/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;


import java.io.IOException;
import java.util.Hashtable;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;

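/**
 * Demux mapper-side processor for Hadoop job-history events captured through
 * log4j. Each record is one log line whose body is a job-history event of the
 * form TYPE KEY1="v1" KEY2="v2" ...; the processor tokenizes those key/value
 * pairs and emits ChukwaRecords keyed for JobLogHistoryReduceProcessor.
 */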
public class Log4jJobHistoryProcessor extends AbstractProcessor {
  static Logger log = Logger.getLogger(Log4jJobHistoryProcessor.class);

  private static final String recordType = "JobLogHistory";
  private static final String internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
  private Pattern ip = null;

  private Matcher internalMatcher = null;

  public Log4jJobHistoryProcessor() {
    ip = Pattern.compile(internalRegex);
    internalMatcher = ip.matcher("-");
  }

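  /**
   * Parses one log4j-formatted job-history line into key/value pairs and
   * emits the corresponding ChukwaRecords. The fields <code>key</code> and
   * <code>chunk</code> used below are inherited from AbstractProcessor.
   */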
  @Override
  protected void parse(String recordEntry,
      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
      throws Throwable {

    // log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type["
    // + chunk.getDataType() + "]");

    try {

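      // The log4j prefix is fixed-width: a 23-character timestamp, then the
      // level and the logger class name, each followed by a space. Skip past
      // them to reach the job-history body.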
      // String dStr = recordEntry.substring(0, 23);
      int start = 24;
      int idx = recordEntry.indexOf(' ', start);
      // String level = recordEntry.substring(start, idx);
      start = idx + 1;
      idx = recordEntry.indexOf(' ', start);
      // String className = recordEntry.substring(start, idx-1);
      String body = recordEntry.substring(idx + 1);

      Hashtable<String, String> keys = new Hashtable<String, String>();
      ChukwaRecord record = null;

      int firstSep = body.indexOf(" ");
      keys.put("RECORD_TYPE", body.substring(0, firstSep));
      // log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE]["
      // + keys.get("RECORD_TYPE") + "]");

      body = body.substring(firstSep);

      internalMatcher.reset(body);

      // String fieldName = null;
      // String fieldValue = null;

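      // Each attribute is KEY="value". Consume one pair per iteration and
      // re-run the matcher on the remainder (group 3) until nothing matches.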
      while (internalMatcher.matches()) {

        keys.put(internalMatcher.group(1).trim(), internalMatcher.group(2)
            .trim());

        // TODO Remove debug info before production
        // fieldName = internalMatcher.group(1).trim();
        // fieldValue = internalMatcher.group(2).trim();
        // log.info("JobLogHistoryProcessor Add field: [" + fieldName +
        // "][" + fieldValue +"]" );
        // log.info("EOL : [" + internalMatcher.group(3) + "]" );
        internalMatcher.reset(internalMatcher.group(3));
      }

      if (!keys.containsKey("JOBID")) {
        // Extract JobID from taskID
        // JOBID = "job_200804210403_0005"
        // TASKID = "tip_200804210403_0005_m_000018"
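        // Scan from the first to the third underscore: substring(idx1, idx2)
        // yields "_200804210403_0005", so prefixing "job" rebuilds the job
        // id. This assumes a TASKID field is present whenever JOBID is not.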
        String jobId = keys.get("TASKID");
        int idx1 = jobId.indexOf('_', 0);
        int idx2 = jobId.indexOf('_', idx1 + 1);
        idx2 = jobId.indexOf('_', idx2 + 1);
        keys.put("JOBID", "job" + jobId.substring(idx1, idx2));
        // log.info("JobLogHistoryProcessor Add field: [JOBID]["
        // + keys.get("JOBID") + "]");
      }

      // if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
      // keys.containsKey("SUBMIT_TIME"))
      // {
      // // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB"
      // USER="userxxx"
      // // SUBMIT_TIME="1208760436751"
      // JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml"
      //
      //
      // }
      // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
      // keys.containsKey("LAUNCH_TIME"))
      // {
      // // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110"
      // TOTAL_MAPS="5912" TOTAL_REDUCES="739"
      //
      // }
      // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
      // keys.containsKey("FINISH_TIME"))
      // {
      // // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
      // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739"
      // FAILED_MAPS="0" FAILED_REDUCES="0"
      // // COUNTERS="File Systems.Local bytes read:1735053407244,File
      // Systems.Local bytes written:2610106384012,File Systems.HDFS bytes
      // read:801605644910,File Systems.HDFS bytes written:44135800,
      // // Job Counters .Launched map tasks:5912,Job Counters .Launched
      // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
      // Counters .Rack-local map tasks:316,Map-Reduce Framework.
      // // Map input records:9410696067,Map-Reduce Framework.Map output
      // records:9410696067,Map-Reduce Framework.Map input
      // bytes:801599188816,Map-Reduce Framework.Map output
      // bytes:784427968116,
      // // Map-Reduce Framework.Combine input records:0,Map-Reduce
      // Framework.Combine output records:0,Map-Reduce Framework.Reduce
      // input groups:477265,Map-Reduce Framework.Reduce input
      // records:739000,
      // // Map-Reduce Framework.Reduce output records:739000"
      //
      // }
      // else
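      // Each handled event becomes a ChukwaRecord keyed
      // "JobLogHist/<phase>/<JOBID>/<timestamp>" and routed to
      // JobLogHistoryReduceProcessor.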
      if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
          && keys.containsKey("START_TIME")) {
        // MapAttempt TASK_TYPE="MAP"
        // TASKID="tip_200804210403_0005_m_000018"
        // TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0"
        // START_TIME="1208760437531"
        // HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734"

        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
            + keys.get("START_TIME"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("START_TIME")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("START_TIME", keys.get("START_TIME"));
        record.add(Record.tagsField, chunk.getTags());
        // log.info("JobLogHist/Map/S");
        output.collect(key, record);

      } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
          && keys.containsKey("FINISH_TIME")) {
        // MapAttempt TASK_TYPE="MAP"
        // TASKID="tip_200804210403_0005_m_005494"
        // TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0"
        // TASK_STATUS="SUCCESS"
        // FINISH_TIME="1208760624124"
        // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491"

        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
            + keys.get("FINISH_TIME"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("FINISH_TIME", keys.get("FINISH_TIME"));
        record.add(Record.tagsField, chunk.getTags());
        // log.info("JobLogHist/Map/E");
        output.collect(key, record);
      }

      else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
          && keys.containsKey("START_TIME")) {
        // ReduceAttempt TASK_TYPE="REDUCE"
        // TASKID="tip_200804210403_0005_r_000138"
        // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
        // START_TIME="1208760454885"
        // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"

        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
            + keys.get("START_TIME"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("START_TIME")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("START_TIME", keys.get("START_TIME"));
        record.add(Record.tagsField, chunk.getTags());
        // log.info("JobLogHist/SHUFFLE/S");
        output.collect(key, record);

      } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
          && keys.containsKey("FINISH_TIME")) {
        // ReduceAttempt TASK_TYPE="REDUCE"
        // TASKID="tip_200804210403_0005_r_000138"
        // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
        // TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167"
        // SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395"
        // HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"

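        // A finished reduce attempt carries three timestamps
        // (SHUFFLE_FINISHED, SORT_FINISHED, FINISH_TIME), so it fans out into
        // five records: shuffle end, sort start/end, and reduce start/end.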
        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
            + keys.get("SHUFFLE_FINISHED"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
        record.add(Record.tagsField, chunk.getTags());
        // log.info("JobLogHist/SHUFFLE/E");
        output.collect(key, record);

        // SORT
        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
            + keys.get("SHUFFLE_FINISHED"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
        record.add(Record.tagsField, chunk.getTags());
        // log.info("JobLogHist/SORT/S");
        output.collect(key, record);

        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
            + keys.get("SORT_FINISHED"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
        record.add(Record.tagsField, chunk.getTags());
        // log.info("JobLogHist/SORT/E");
        output.collect(key, record);

        // Reduce
        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
            + keys.get("SORT_FINISHED"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("START_TIME", keys.get("SORT_FINISHED"));
        record.add(Record.tagsField, chunk.getTags());
        // log.info("JobLogHist/REDUCE/S");
        output.collect(key, record);

        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
            + keys.get("FINISH_TIME"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("FINISH_TIME", keys.get("FINISH_TIME"));
        record.add(Record.tagsField, chunk.getTags());
        // log.info("JobLogHist/REDUCE/E");
        output.collect(key, record);

      } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job")
          && keys.containsKey("COUNTERS")) {
        // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
        // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912"
        // FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0"
        // COUNTERS="File Systems.Local bytes read:1735053407244,File
        // Systems.Local bytes written:2610106384012,File Systems.HDFS
        // bytes read:801605644910,File Systems.HDFS bytes
        // written:44135800,
        // Job Counters .Launched map tasks:5912,Job Counters .Launched
        // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
        // Counters .Rack-local map tasks:316,Map-Reduce Framework.
        // Map input records:9410696067,Map-Reduce Framework.Map output
        // records:9410696067,Map-Reduce Framework.Map input
        // bytes:801599188816,Map-Reduce Framework.Map output
        // bytes:784427968116,
        // Map-Reduce Framework.Combine input records:0,Map-Reduce
        // Framework.Combine output records:0,Map-Reduce
        // Framework.Reduce input groups:477265,Map-Reduce
        // Framework.Reduce input records:739000,
        // Map-Reduce Framework.Reduce output records:739000"

        record = new ChukwaRecord();
        key = new ChukwaRecordKey();
        buildGenericRecord(record, null, Long
            .parseLong(keys.get("FINISH_TIME")), "MRJobCounters");
        extractCounters(record, keys.get("COUNTERS"));

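        // Normalize the job id: "job_200804210403_0005" becomes
        // "2008042104030005" (underscores removed, "job" prefix dropped).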
        String jobId = keys.get("JOBID").replace("_", "").substring(3);
        record.add("JobId", jobId);

        // FIXME validate this once HodId is available
        if (keys.containsKey("HODID")) {
          record.add("HodId", keys.get("HODID"));
        }

        // log.info("MRJobCounters +1");
        output.collect(key, record);
      }

      if (keys.containsKey("TASK_TYPE")
          && keys.containsKey("COUNTERS")
          && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys.get(
              "TASK_TYPE").equalsIgnoreCase("MAP"))) {
        // MAP
        // Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP"
        // TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883"
        // COUNTERS="File Systems.Local bytes read:159265655,File
        // Systems.Local bytes written:318531310,
        // File Systems.HDFS bytes read:145882417,Map-Reduce
        // Framework.Map input records:1706604,
        // Map-Reduce Framework.Map output records:1706604,Map-Reduce
        // Framework.Map input bytes:145882057,
        // Map-Reduce Framework.Map output bytes:142763253,Map-Reduce
        // Framework.Combine input records:0,Map-Reduce
        // Framework.Combine output records:0"

        // REDUCE
        // Task TASKID="tip_200804210403_0005_r_000524"
        // TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS"
        // FINISH_TIME="1208760877072"
        // COUNTERS="File Systems.Local bytes read:1179319677,File
        // Systems.Local bytes written:1184474889,File Systems.HDFS
        // bytes written:59021,
        // Map-Reduce Framework.Reduce input groups:684,Map-Reduce
        // Framework.Reduce input records:1000,Map-Reduce
        // Framework.Reduce output records:1000"

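        // Per-task counters become a "SizeVsFinishTime" record stamped with
        // the task's FINISH_TIME.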
        record = new ChukwaRecord();
        key = new ChukwaRecordKey();
        buildGenericRecord(record, null, Long
            .parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
        extractCounters(record, keys.get("COUNTERS"));
        record.add("JOBID", keys.get("JOBID"));
        record.add("TASKID", keys.get("TASKID"));
        record.add("TASK_TYPE", keys.get("TASK_TYPE"));

        // log.info("MR_Graph +1");
        output.collect(key, record);

      }
    } catch (IOException e) {
      log.warn("Unable to collect output in JobLogHistoryProcessor ["
          + recordEntry + "]", e);
      throw e;
    }

  }

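  /**
   * Splits a COUNTERS attribute of the form
   * "Group.Counter:value,Group.Counter:value,..." and adds each counter to
   * the record, with spaces and dots in the name replaced by underscores and
   * the name upper-cased. Assumes every comma-separated entry contains a
   * colon.
   */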
  protected void extractCounters(ChukwaRecord record, String input) {

    String[] counters = input.split(",");

    for (String counter : counters) {
      String[] data = counter.split(":");
      record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
          .toUpperCase(), data[1]);
    }
  }

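  /** Returns the record type handled by this processor ("JobLogHistory"). */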
  public String getDataType() {
    return Log4jJobHistoryProcessor.recordType;
  }
}