1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor;
19
20 import java.io.File;
21 import java.io.IOException;
22
23 import org.apache.log4j.Logger;
24 import org.apache.commons.io.FileUtils;
25 import org.apache.commons.io.filefilter.FileFilterUtils;
26 import org.apache.commons.io.filefilter.IOFileFilter;
27 import org.apache.commons.io.filefilter.WildcardFileFilter;
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 public class DirTailingAdaptor extends AbstractAdaptor implements Runnable {
45
46 static Logger log = Logger.getLogger(DirTailingAdaptor.class);
47
48 Thread scanThread = new Thread(this);
49 long lastSweepStartTime;
50 volatile boolean continueScanning=true;
51 File baseDir;
52 String baseDirName;
53 long scanInterval;
54
55 protected String adaptorName;
56
57 IOFileFilter fileFilter;
58 String wildCharacter;
59
60 @Override
61 public void start(long offset) throws AdaptorException {
62 scanInterval = control.getConfiguration().getInt("adaptor.dirscan.intervalMs", 10000);
63
64 scanThread.start();
65 lastSweepStartTime = offset;
66 try {
67 baseDirName = baseDir.getCanonicalPath();
68 } catch(IOException e) {
69 throw new AdaptorException(e);
70 }
71 }
72
73 public void run() {
74 try {
75 log.debug("dir tailer starting to scan");
76 while(continueScanning) {
77 try {
78 long sweepStartTime = System.currentTimeMillis();
79 scanDirHierarchy(baseDir);
80 lastSweepStartTime=sweepStartTime;
81 control.reportCommit(this, lastSweepStartTime);
82 } catch(IOException e) {
83 log.warn(e);
84 }
85 Thread.sleep(scanInterval);
86 }
87 } catch(InterruptedException e) {
88 }
89 }
90
91
92
93
94 private void scanDirHierarchy(File dir) throws IOException {
95 if(!dir.exists())
96 return;
97 if(!dir.isDirectory()) {
98
99 if(dir.lastModified() >= lastSweepStartTime) {
100 String newAdaptorID = control.processAddCommand(getAdaptorAddCommand(dir));
101
102 log.info("DirTailingAdaptor " + adaptorID + " started new adaptor " + newAdaptorID);
103 }
104
105 } else {
106 log.info("Scanning directory: " + dir.getName());
107
108 for(Object f: FileUtils.listFiles(dir, fileFilter, FileFilterUtils.trueFileFilter())) {
109 scanDirHierarchy((File)f);
110 }
111 }
112 }
113
114 protected String getAdaptorAddCommand(File dir) throws IOException {
115 return "add " + adaptorName + " " + type + " " + dir.getCanonicalPath() + " 0";
116 }
117
118 @Override
119 public String getCurrentStatus() {
120 return this.wildCharacter == null ? (type + " " + baseDirName + " " + adaptorName)
121 :(type + " " + baseDirName + " " + this.wildCharacter + " " + adaptorName);
122 }
123
124 @Override
125 public String parseArgs(String status) {
126
127 String[] args = status.split(" ");
128
129 if(args.length == 2){
130 baseDir = new File(args[0]);
131 fileFilter = FileFilterUtils.trueFileFilter();
132 adaptorName = args[1];
133 }else if(args.length == 3){
134 baseDir = new File(args[0]);
135 this.wildCharacter = args[ 1 ];
136 fileFilter = getFileFilter();
137 adaptorName = args[2];
138 }else{
139 log.warn("bad syntax in DirTailingAdaptor args");
140 return null;
141 }
142
143 return (args.length == 2)? baseDir + " " + adaptorName : baseDir + " " + this.wildCharacter + " " + adaptorName;
144 }
145
146 @Override
147 public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
148 throws AdaptorException {
149 continueScanning = false;
150 return lastSweepStartTime;
151 }
152
153
154
155
156
157
158
159
160 protected IOFileFilter getFileFilter() {
161 return new WildcardFileFilter( this.wildCharacter );
162 }
163 }