1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21
import java.io.IOException;
import java.util.Hashtable;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
32
33 public class Log4jJobHistoryProcessor extends AbstractProcessor {
34 static Logger log = Logger.getLogger(Log4jJobHistoryProcessor.class);
35
36 private static final String recordType = "JobLogHistory";
37 private static String internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
38 private Pattern ip = null;
39
40 private Matcher internalMatcher = null;
41
42 public Log4jJobHistoryProcessor() {
43 ip = Pattern.compile(internalRegex);
44 internalMatcher = ip.matcher("-");
45 }
46
47 @Override
48 protected void parse(String recordEntry,
49 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
50 throws Throwable {
51
52
53
54
55 try {
56
57
58 int start = 24;
59 int idx = recordEntry.indexOf(' ', start);
60
61 start = idx + 1;
62 idx = recordEntry.indexOf(' ', start);
63
64 String body = recordEntry.substring(idx + 1);
65
66 Hashtable<String, String> keys = new Hashtable<String, String>();
67 ChukwaRecord record = null;
68
69 int firstSep = body.indexOf(" ");
70 keys.put("RECORD_TYPE", body.substring(0, firstSep));
71
72
73
74 body = body.substring(firstSep);
75
76 internalMatcher.reset(body);
77
78
79
80
81 while (internalMatcher.matches()) {
82
83 keys.put(internalMatcher.group(1).trim(), internalMatcher.group(2)
84 .trim());
85
86
87
88
89
90
91
92 internalMatcher.reset(internalMatcher.group(3));
93 }
94
95 if (!keys.containsKey("JOBID")) {
96
97
98
99 String jobId = keys.get("TASKID");
100 int idx1 = jobId.indexOf('_', 0);
101 int idx2 = jobId.indexOf('_', idx1 + 1);
102 idx2 = jobId.indexOf('_', idx2 + 1);
103 keys.put("JOBID", "job" + jobId.substring(idx1, idx2));
104
105
106 }
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
150 && keys.containsKey("START_TIME")) {
151
152
153
154
155
156
157 key = new ChukwaRecordKey();
158 key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
159 + keys.get("START_TIME"));
160 key.setReduceType("JobLogHistoryReduceProcessor");
161 record = new ChukwaRecord();
162 record.setTime(Long.parseLong(keys.get("START_TIME")));
163 record.add("JOBID", keys.get("JOBID"));
164 record.add("START_TIME", keys.get("START_TIME"));
165 record.add(Record.tagsField, chunk.getTags());
166
167 output.collect(key, record);
168
169 } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
170 && keys.containsKey("FINISH_TIME")) {
171
172
173
174
175
176
177
178 key = new ChukwaRecordKey();
179 key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
180 + keys.get("FINISH_TIME"));
181 key.setReduceType("JobLogHistoryReduceProcessor");
182 record = new ChukwaRecord();
183 record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
184 record.add("JOBID", keys.get("JOBID"));
185 record.add("FINISH_TIME", keys.get("FINISH_TIME"));
186 record.add(Record.tagsField, chunk.getTags());
187
188 output.collect(key, record);
189 }
190
191 else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
192 && keys.containsKey("START_TIME")) {
193
194
195
196
197
198
199 key = new ChukwaRecordKey();
200 key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
201 + keys.get("START_TIME"));
202 key.setReduceType("JobLogHistoryReduceProcessor");
203 record = new ChukwaRecord();
204 record.setTime(Long.parseLong(keys.get("START_TIME")));
205 record.add("JOBID", keys.get("JOBID"));
206 record.add("START_TIME", keys.get("START_TIME"));
207 record.add(Record.tagsField, chunk.getTags());
208
209 output.collect(key, record);
210
211 } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
212 && keys.containsKey("FINISH_TIME")) {
213
214
215
216
217
218
219
220 key = new ChukwaRecordKey();
221 key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
222 + keys.get("SHUFFLE_FINISHED"));
223 key.setReduceType("JobLogHistoryReduceProcessor");
224 record = new ChukwaRecord();
225 record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
226 record.add("JOBID", keys.get("JOBID"));
227 record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
228 record.add(Record.tagsField, chunk.getTags());
229
230 output.collect(key, record);
231
232
233 key = new ChukwaRecordKey();
234 key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
235 + keys.get("SHUFFLE_FINISHED"));
236 key.setReduceType("JobLogHistoryReduceProcessor");
237 record = new ChukwaRecord();
238 record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
239 record.add("JOBID", keys.get("JOBID"));
240 record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
241 record.add(Record.tagsField, chunk.getTags());
242
243 output.collect(key, record);
244
245 key = new ChukwaRecordKey();
246 key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
247 + keys.get("SORT_FINISHED"));
248 key.setReduceType("JobLogHistoryReduceProcessor");
249 record = new ChukwaRecord();
250 record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
251 record.add("JOBID", keys.get("JOBID"));
252 record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
253 record.add(Record.tagsField, chunk.getTags());
254
255 output.collect(key, record);
256
257
258 key = new ChukwaRecordKey();
259 key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
260 + keys.get("SORT_FINISHED"));
261 key.setReduceType("JobLogHistoryReduceProcessor");
262 record = new ChukwaRecord();
263 record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
264 record.add("JOBID", keys.get("JOBID"));
265 record.add("START_TIME", keys.get("SORT_FINISHED"));
266 record.add(Record.tagsField, chunk.getTags());
267
268 output.collect(key, record);
269
270 key = new ChukwaRecordKey();
271 key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
272 + keys.get("FINISH_TIME"));
273 key.setReduceType("JobLogHistoryReduceProcessor");
274 record = new ChukwaRecord();
275 record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
276 record.add("JOBID", keys.get("JOBID"));
277 record.add("FINISH_TIME", keys.get("SORT_FINISHED"));
278 record.add(Record.tagsField, chunk.getTags());
279
280 output.collect(key, record);
281
282 } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job")
283 && keys.containsKey("COUNTERS")) {
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304 record = new ChukwaRecord();
305 key = new ChukwaRecordKey();
306 buildGenericRecord(record, null, Long
307 .parseLong(keys.get("FINISH_TIME")), "MRJobCounters");
308 extractCounters(record, keys.get("COUNTERS"));
309
310 String jobId = keys.get("JOBID").replace("_", "").substring(3);
311 record.add("JobId", "" + jobId);
312
313
314 if (keys.containsKey("HODID")) {
315 record.add("HodId", keys.get("HODID"));
316 }
317
318
319 output.collect(key, record);
320 }
321
322 if (keys.containsKey("TASK_TYPE")
323 && keys.containsKey("COUNTERS")
324 && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys.get(
325 "TASK_TYPE").equalsIgnoreCase("MAP"))) {
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350 record = new ChukwaRecord();
351 key = new ChukwaRecordKey();
352 buildGenericRecord(record, null, Long
353 .parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
354 extractCounters(record, keys.get("COUNTERS"));
355 record.add("JOBID", keys.get("JOBID"));
356 record.add("TASKID", keys.get("TASKID"));
357 record.add("TASK_TYPE", keys.get("TASK_TYPE"));
358
359
360 output.collect(key, record);
361
362 }
363 } catch (IOException e) {
364 log.warn("Unable to collect output in JobLogHistoryProcessor ["
365 + recordEntry + "]", e);
366 e.printStackTrace();
367 throw e;
368 }
369
370 }
371
372 protected void extractCounters(ChukwaRecord record, String input) {
373
374 String[] data = null;
375 String[] counters = input.split(",");
376
377 for (String counter : counters) {
378 data = counter.split(":");
379 record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
380 .toUpperCase(), data[1]);
381 }
382 }
383
384 public String getDataType() {
385 return Log4jJobHistoryProcessor.recordType;
386 }
387 }