1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
19
20 import java.io.File;
21 import java.io.IOException;
22 import java.io.RandomAccessFile;
23 import java.util.regex.Matcher;
24 import java.util.regex.Pattern;
25 import org.apache.hadoop.chukwa.ChunkImpl;
26 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
27 import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
28 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
29 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.log4j.Logger;
32
33
34
35
36
37
38
39
40
41
42 public class LWFTAdaptor extends AbstractAdaptor {
43
44
45
46
47
48
49
50 public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024;
51 public static final String MAX_READ_SIZE_OPT =
52 "chukwaAgent.fileTailingAdaptor.maxReadSize";
53
54 public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;
55
56 static Logger log;
57 protected static FileTailer tailer;
58
59 static {
60 tailer = null;
61 log = Logger.getLogger(FileTailingAdaptor.class);
62 }
63
64
65
66
67
68 protected long fileReadOffset;
69
70
71
72
73 protected long offsetOfFirstByte = 0;
74 protected Configuration conf = null;
75
76
77
78 protected long lastSlurpTime = 0l;
79
80 File toWatch;
81
82 @Override
83 public void start(long offset) {
84 synchronized(LWFTAdaptor.class) {
85 if(tailer == null)
86 tailer = new FileTailer(control.getConfiguration());
87 }
88 this.fileReadOffset = offset - offsetOfFirstByte;
89 tailer.startWatchingFile(this);
90 }
91
92
93
94
95 public String getCurrentStatus() {
96 return type.trim() + " " + offsetOfFirstByte + " " + toWatch.getPath();
97 }
98
99 public String toString() {
100 return "Lightweight Tailer on " + toWatch;
101 }
102
103 public String getStreamName() {
104 return toWatch.getPath();
105 }
106
107 @Override
108 public String parseArgs(String params) {
109 conf = control.getConfiguration();
110 MAX_READ_SIZE = conf.getInt(MAX_READ_SIZE_OPT, DEFAULT_MAX_READ_SIZE);
111
112 Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?");
113 Matcher m = cmd.matcher(params);
114 if (m.matches()) {
115 offsetOfFirstByte = Long.parseLong(m.group(1));
116 toWatch = new File(m.group(2));
117 } else {
118 toWatch = new File(params.trim());
119 }
120 return toWatch.getAbsolutePath();
121 }
122
123 @Override
124 public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
125 throws AdaptorException {
126 tailer.stopWatchingFile(this);
127 return fileReadOffset + offsetOfFirstByte;
128 }
129
130
131
132
133
134
135
136
137
138
139
140 protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
141 byte[] buf) throws InterruptedException {
142 if(buf.length == 0)
143 return 0;
144
145 ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
146 buffOffsetInFile + buf.length, buf, this);
147
148 eq.add(chunk);
149 return buf.length;
150 }
151
152 protected boolean slurp(long len, RandomAccessFile reader) throws IOException,
153 InterruptedException{
154 boolean hasMoreData = false;
155
156 log.debug("Adaptor|" + adaptorID + "|seeking|" + fileReadOffset);
157 reader.seek(fileReadOffset);
158
159 long bufSize = len - fileReadOffset;
160
161 if (bufSize > MAX_READ_SIZE) {
162 bufSize = MAX_READ_SIZE;
163 hasMoreData = true;
164 }
165 byte[] buf = new byte[(int) bufSize];
166
167 long curOffset = fileReadOffset;
168
169 lastSlurpTime = System.currentTimeMillis();
170 int bufferRead = reader.read(buf);
171 assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: "
172 + " pointer is "
173 + reader.getFilePointer()
174 + " but offset is "
175 + fileReadOffset + bufSize;
176
177 int bytesUsed = extractRecords(dest,
178 fileReadOffset + offsetOfFirstByte, buf);
179
180
181
182
183
184
185
186 if (bytesUsed == 0 && bufferRead == MAX_READ_SIZE) {
187 log.warn("bufferRead == MAX_READ_SIZE AND bytesUsed == 0, dropping current buffer: startOffset="
188 + curOffset
189 + ", MAX_READ_SIZE="
190 + MAX_READ_SIZE
191 + ", for "
192 + toWatch.getPath());
193 bytesUsed = buf.length;
194 }
195
196 fileReadOffset = fileReadOffset + bytesUsed;
197
198 log.debug("Adaptor|" + adaptorID + "|start|" + curOffset + "|end|"
199 + fileReadOffset);
200 return hasMoreData;
201 }
202
203 public synchronized boolean tailFile()
204 throws InterruptedException {
205 boolean hasMoreData = false;
206 try {
207
208
209
210
211
212 long len = toWatch.length();
213 if(len < fileReadOffset) {
214
215 handleShrunkenFile(len);
216 } else if(len > fileReadOffset) {
217 RandomAccessFile reader = new RandomAccessFile(toWatch, "r");
218 hasMoreData = slurp(len, reader);
219 reader.close();
220 }
221 } catch(IOException e) {
222 log.warn("IOException in tailer", e);
223 deregisterAndStop();
224 }
225
226 return hasMoreData;
227 }
228
229 private void handleShrunkenFile(long measuredLen) {
230 log.info("file "+ toWatch +"shrank from " + fileReadOffset + " to " + measuredLen);
231 offsetOfFirstByte = measuredLen;
232 fileReadOffset = 0;
233 }
234
235 }