1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor;
19
20 import java.util.*;
21 import java.util.regex.*;
22 import org.apache.hadoop.chukwa.Chunk;
23 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
24 import org.apache.hadoop.chukwa.datacollection.agent.AdaptorFactory;
25 import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
26
27 public class AbstractWrapper implements NotifyOnCommitAdaptor,ChunkReceiver {
28
29 Adaptor inner;
30 String innerClassName;
31 String innerType;
32 ChunkReceiver dest;
33 AdaptorManager manager;
34 String adaptorID;
35 @Override
36 public String getCurrentStatus() {
37 return innerClassName + " " + inner.getCurrentStatus();
38 }
39
40
41 static Pattern p = Pattern.compile("([^ ]+) +([^ ].*)");
42
43
44
45
46 @Override
47 public String parseArgs(String innerClassName, String params, AdaptorManager a) {
48 manager = a;
49 Matcher m = p.matcher(params);
50 this.innerClassName = innerClassName;
51 String innerCoreParams;
52 if(m.matches()) {
53 innerType = m.group(1);
54 inner = AdaptorFactory.createAdaptor(innerClassName);
55 innerCoreParams = inner.parseArgs(innerType,m.group(2),a);
56 return innerClassName + innerCoreParams;
57 }
58 else return null;
59 }
60
61 @Override
62 public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
63 throws AdaptorException {
64 return inner.shutdown(shutdownPolicy);
65 }
66
67 @Override
68 public String getType() {
69 return innerType;
70 }
71
72
73
74
75 @Override
76 public void start(String adaptorID, String type, long offset,
77 ChunkReceiver dest) throws AdaptorException {
78 String dummyAdaptorID = adaptorID;
79 this.dest = dest;
80 this.adaptorID = adaptorID;
81 inner.start(dummyAdaptorID, type, offset, this);
82 }
83
84 @Override
85 public void add(Chunk event) throws InterruptedException {
86 dest.add(event);
87 }
88
89 @Override
90 public void committed(long commitedByte) { }
91
92 }