1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21
22 import java.io.IOException;
23 import java.text.ParseException;
24 import java.text.SimpleDateFormat;
25 import java.util.Calendar;
26 import java.util.Date;
27
28 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
29 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
30 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
31 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
32 import org.apache.hadoop.chukwa.extraction.engine.Record;
33 import org.apache.hadoop.mapred.OutputCollector;
34 import org.apache.hadoop.mapred.Reporter;
35 import org.apache.log4j.Logger;
36
37 @Tables(annotations={
38 @Table(name="Jobs",columnFamily="summary")
39 })
40 public class JobSummary extends AbstractProcessor {
41 static Logger log = Logger.getLogger(JobSummary.class);
42 static final String chukwaTimestampField = "timestamp";
43 static final String contextNameField = "contextName";
44 static final String recordNameField = "recordName";
45
46 private SimpleDateFormat sdf = null;
47
48 public JobSummary() {
49
50 sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
51 }
52
53 @SuppressWarnings("unchecked")
54 @Override
55 protected void parse(String recordEntry,
56 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
57 throws Throwable {
58 try {
59
60 int idx = recordEntry.indexOf('>', 0);
61 String dStr = recordEntry.substring(idx+1, idx+23);
62 int start = idx + 25;
63 idx = recordEntry.indexOf(' ', start);
64
65 start = idx + 1;
66 idx = recordEntry.indexOf(' ', start);
67
68 String body = recordEntry.substring(idx + 1);
69 body = body.replaceAll("\n", "");
70
71 Date d = sdf.parse(dStr);
72
73 ChukwaRecord record = new ChukwaRecord();
74
75 String[] list = body.split(",");
76 for(String pair : list) {
77 String[] kv = pair.split("=");
78 record.add(kv[0], kv[1]);
79 }
80 record.add("cluster", chunk.getTag("cluster"));
81 buildGenericRecord(record, d.getTime(), "summary");
82 output.collect(key, record);
83 } catch (ParseException e) {
84 log.warn("Wrong format in JobSummary [" + recordEntry + "]",
85 e);
86 throw e;
87 } catch (IOException e) {
88 log.warn("Unable to collect output in JobSummary ["
89 + recordEntry + "]", e);
90 throw e;
91 } catch (Exception e) {
92 log.warn("Wrong format in JobSummary [" + recordEntry + "]",
93 e);
94 throw e;
95 }
96
97 }
98
99 protected void buildGenericRecord(ChukwaRecord record,
100 long timestamp, String dataSource) {
101 calendar.setTimeInMillis(timestamp);
102 calendar.set(Calendar.MINUTE, 0);
103 calendar.set(Calendar.SECOND, 0);
104 calendar.set(Calendar.MILLISECOND, 0);
105
106 key.setKey("" + calendar.getTimeInMillis() + "/" + record.getValue("jobId") + "/"
107 + timestamp);
108 key.setReduceType(dataSource);
109 record.setTime(timestamp);
110
111 record.add(Record.tagsField, chunk.getTags());
112 record.add(Record.sourceField, chunk.getSource());
113 record.add(Record.applicationField, chunk.getStreamName());
114
115 }
116
117 public String getDataType() {
118 return JobSummary.class.getName();
119 }
120
121 }