1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor;
19
20 import java.util.regex.*;
21 import org.apache.hadoop.chukwa.Chunk;
22 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
23 import org.apache.hadoop.chukwa.datacollection.agent.AdaptorFactory;
24 import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
25
26 public class AbstractWrapper implements NotifyOnCommitAdaptor,ChunkReceiver {
27
28 Adaptor inner;
29 String innerClassName;
30 String innerType;
31 ChunkReceiver dest;
32 AdaptorManager manager;
33 String adaptorID;
34 @Override
35 public String getCurrentStatus() {
36 return innerClassName + " " + inner.getCurrentStatus();
37 }
38
39
40 static Pattern p = Pattern.compile("([^ ]+) +([^ ].*)");
41
42
43
44
45 @Override
46 public String parseArgs(String innerClassName, String params, AdaptorManager a) {
47 manager = a;
48 Matcher m = p.matcher(params);
49 this.innerClassName = innerClassName;
50 String innerCoreParams;
51 if(m.matches()) {
52 innerType = m.group(1);
53 inner = AdaptorFactory.createAdaptor(innerClassName);
54 innerCoreParams = inner.parseArgs(innerType,m.group(2),a);
55 return innerClassName + innerCoreParams;
56 }
57 else return null;
58 }
59
60 @Override
61 public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
62 throws AdaptorException {
63 return inner.shutdown(shutdownPolicy);
64 }
65
66 @Override
67 public String getType() {
68 return innerType;
69 }
70
71
72
73
74 @Override
75 public void start(String adaptorID, String type, long offset,
76 ChunkReceiver dest) throws AdaptorException {
77 String dummyAdaptorID = adaptorID;
78 this.dest = dest;
79 this.adaptorID = adaptorID;
80 inner.start(dummyAdaptorID, type, offset, this);
81 }
82
83 @Override
84 public void add(Chunk event) throws InterruptedException {
85 dest.add(event);
86 }
87
88 @Override
89 public void committed(long commitedByte) { }
90
91 }