/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

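/**
 * Demux mapper processor for Hadoop metrics log entries. Each entry carries a
 * JSON payload from the Hadoop metrics system; the processor turns it into a
 * ChukwaRecord keyed by time, source host, and metrics context. The
 * annotations below declare the HBase "Hadoop" table column families that the
 * resulting record types map to.
 */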
@Tables(annotations={
    @Table(name="Hadoop",columnFamily="jvm_metrics"),
    @Table(name="Hadoop",columnFamily="mapred_metrics"),
    @Table(name="Hadoop",columnFamily="dfs_metrics"),
    @Table(name="Hadoop",columnFamily="dfs_namenode"),
    @Table(name="Hadoop",columnFamily="dfs_FSNamesystem"),
    @Table(name="Hadoop",columnFamily="dfs_datanode"),
    @Table(name="Hadoop",columnFamily="mapred_jobtracker"),
    @Table(name="Hadoop",columnFamily="mapred_shuffleInput"),
    @Table(name="Hadoop",columnFamily="mapred_shuffleOutput"),
    @Table(name="Hadoop",columnFamily="mapred_tasktracker"),
    @Table(name="Hadoop",columnFamily="rpc_metrics")
})
public class HadoopMetricsProcessor extends AbstractProcessor {
  static Logger log = Logger.getLogger(HadoopMetricsProcessor.class);

  // Names of the JSON fields emitted with every Hadoop metrics record.
  static final String chukwaTimestampField = "timestamp";
  static final String contextNameField = "contextName";
  static final String recordNameField = "recordName";

  // Parses the leading portion of the log timestamp (down to the minute).
  private SimpleDateFormat sdf = null;

  public HadoopMetricsProcessor() {
    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
  }

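  /**
   * Parses a single HadoopMetrics log entry. The text after the first '>' is
   * expected to begin with a log4j-style timestamp (yyyy-MM-dd HH:mm:ss,SSS),
   * followed by two space-delimited tokens and a JSON payload of metric
   * values. The JSON contextName/recordName fields select the data source
   * (e.g. "jvm_metrics"); the remaining fields become ChukwaRecord values.
   */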
  @SuppressWarnings("unchecked")
  @Override
  protected void parse(String recordEntry,
      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
      throws Throwable {
    try {
      // Skip the leading header up to '>' and pull out the timestamp that
      // immediately follows it.
      int idx = recordEntry.indexOf('>', 0);
      String dStr = recordEntry.substring(idx + 1, idx + 23);

      // Skip the next two space-delimited tokens; everything after them is
      // the message body.
      int start = idx + 25;
      idx = recordEntry.indexOf(' ', start);
      start = idx + 1;
      idx = recordEntry.indexOf(' ', start);
      String body = recordEntry.substring(idx + 1);
      body = body.replaceAll("\n", "");

      Date d = sdf.parse(dStr);

      // The JSON metrics payload starts at the first '{' of the body.
      start = body.indexOf('{');
      JSONObject json = (JSONObject) JSONValue.parse(body.substring(start));

      ChukwaRecord record = new ChukwaRecord();
      StringBuilder datasource = new StringBuilder();
      String contextName = null;
      String recordName = null;

      Iterator<Map.Entry<String, ?>> ki = json.entrySet().iterator();
      while (ki.hasNext()) {
        Map.Entry<String, ?> entry = ki.next();
        String keyName = entry.getKey();
        Object value = entry.getValue();
        if (chukwaTimestampField.equals(keyName)) {
          // Prefer the timestamp carried in the JSON, rounded down to the minute.
          d = new Date((Long) value);
          Calendar cal = Calendar.getInstance();
          cal.setTimeInMillis(d.getTime());
          cal.set(Calendar.SECOND, 0);
          cal.set(Calendar.MILLISECOND, 0);
          d.setTime(cal.getTimeInMillis());
        } else if (contextNameField.equals(keyName)) {
          contextName = (String) value;
        } else if (recordNameField.equals(keyName)) {
          recordName = (String) value;
          record.add(keyName, value.toString());
        } else if (value != null) {
          record.add(keyName, value.toString());
        }
      }
      if (contextName != null) {
        datasource.append(contextName);
        datasource.append("_");
      }
      datasource.append(recordName);
      record.add("cluster", chunk.getTag("cluster"));

      // JVM metrics get a key built from the process name; everything else
      // uses the generic key layout.
      if (contextName != null && contextName.equals("jvm")) {
        buildJVMRecord(record, d.getTime(), datasource.toString());
      } else {
        buildGenericRecord(record, null, d.getTime(), datasource.toString());
      }
      output.collect(key, record);
    } catch (ParseException e) {
      log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]", e);
      throw e;
    } catch (IOException e) {
      log.warn("Unable to collect output in HadoopMetricsProcessor ["
          + recordEntry + "]", e);
      throw e;
    } catch (Exception e) {
      log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]", e);
      throw e;
    }
  }

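  /**
   * Builds the output key for a JVM metrics record: the timestamp rounded down
   * to the hour, the source host, the JVM process name, and the original
   * timestamp, with the data source (e.g. "jvm_metrics") as the reduce type.
   */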
  protected void buildJVMRecord(ChukwaRecord record, long timestamp, String dataSource) {
    calendar.setTimeInMillis(timestamp);
    calendar.set(Calendar.MINUTE, 0);
    calendar.set(Calendar.SECOND, 0);
    calendar.set(Calendar.MILLISECOND, 0);

    key.setKey("" + calendar.getTimeInMillis() + "/" + chunk.getSource() + ":" +
        record.getValue("processName") + "/" + timestamp);
    key.setReduceType(dataSource);
    record.setTime(timestamp);

    record.add(Record.tagsField, chunk.getTags());
    record.add(Record.sourceField, chunk.getSource());
    record.add(Record.applicationField, chunk.getStreamName());
  }

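  /** The data type reported for this processor's records: its fully qualified class name. */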
  public String getDataType() {
    return HadoopMetricsProcessor.class.getName();
  }

}