1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21 import java.util.Calendar;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.Map;
25 import java.util.TimeZone;
26 import java.util.concurrent.ConcurrentHashMap;
27
28 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
29 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
30 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
31 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
32 import org.apache.hadoop.chukwa.util.ExceptionUtil;
33 import org.apache.hadoop.mapred.OutputCollector;
34 import org.apache.hadoop.mapred.Reporter;
35 import org.apache.log4j.Logger;
36 import org.json.simple.JSONObject;
37 import org.json.simple.JSONValue;
38
39 @Tables(annotations = { @Table(name = "Datanode", columnFamily = "dn"),
40 @Table(name = "Datanode", columnFamily = "jvm"),
41 @Table(name = "Datanode", columnFamily = "rpc") })
42 public class DatanodeProcessor extends AbstractProcessor {
43
44 static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>();
45
46 static {
47 long zero = 0L;
48 rateMap.put("blocks_verified", zero);
49 rateMap.put("blocks_written", zero);
50 rateMap.put("blocks_read", zero);
51 rateMap.put("bytes_written", zero);
52 rateMap.put("bytes_read", zero);
53 rateMap.put("heartBeats_num_ops", zero);
54 rateMap.put("SentBytes", zero);
55 rateMap.put("ReceivedBytes", zero);
56 rateMap.put("rpcAuthorizationSuccesses", zero);
57 rateMap.put("rpcAuthorizationFailures", zero);
58 rateMap.put("RpcQueueTime_num_ops", zero);
59 rateMap.put("RpcProcessingTime_num_ops", zero);
60 rateMap.put("gcCount", zero);
61 }
62
63 @Override
64 protected void parse(String recordEntry,
65 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
66 throws Throwable {
67 Logger log = Logger.getLogger(DatanodeProcessor.class);
68 long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
69 .getTimeInMillis();
70
71 final ChukwaRecord hdfs_datanode = new ChukwaRecord();
72 final ChukwaRecord datanode_jvm = new ChukwaRecord();
73 final ChukwaRecord datanode_rpc = new ChukwaRecord();
74
75 Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>() {
76 private static final long serialVersionUID = 1L;
77
78 {
79 put("blocks_verified", hdfs_datanode);
80 put("blocks_written", hdfs_datanode);
81 put("blocks_read", hdfs_datanode);
82 put("blocks_replicated", hdfs_datanode);
83 put("blocks_removed", hdfs_datanode);
84 put("bytes_written", hdfs_datanode);
85 put("bytes_read", hdfs_datanode);
86 put("heartBeats_avg_time", hdfs_datanode);
87 put("heartBeats_num_ops", hdfs_datanode);
88
89 put("gcCount", datanode_jvm);
90 put("gcTimeMillis", datanode_jvm);
91 put("logError", datanode_jvm);
92 put("logFatal", datanode_jvm);
93 put("logInfo", datanode_jvm);
94 put("logWarn", datanode_jvm);
95 put("memHeapCommittedM", datanode_jvm);
96 put("memHeapUsedM", datanode_jvm);
97 put("threadsBlocked", datanode_jvm);
98 put("threadsNew", datanode_jvm);
99 put("threadsRunnable", datanode_jvm);
100 put("threadsTerminated", datanode_jvm);
101 put("threadsTimedWaiting", datanode_jvm);
102 put("threadsWaiting", datanode_jvm);
103
104 put("ReceivedBytes", datanode_rpc);
105 put("RpcProcessingTime_avg_time", datanode_rpc);
106 put("RpcProcessingTime_num_ops", datanode_rpc);
107 put("RpcQueueTime_avg_time", datanode_rpc);
108 put("RpcQueueTime_num_ops", datanode_rpc);
109 put("SentBytes", datanode_rpc);
110 put("rpcAuthorizationSuccesses", datanode_rpc);
111 }
112 };
113 try {
114 JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
115 String ttTag = chunk.getTag("timeStamp");
116 if (ttTag == null) {
117 log.warn("timeStamp tag not set in JMX adaptor for datanode");
118 } else {
119 timeStamp = Long.parseLong(ttTag);
120 }
121 @SuppressWarnings("unchecked")
122 Iterator<Map.Entry<String, ?>> keys = obj.entrySet().iterator();
123
124 while (keys.hasNext()) {
125 Map.Entry<String, ?> entry = keys.next();
126 String key = entry.getKey();
127 Object value = entry.getValue();
128 String valueString = value == null ? "" : value.toString();
129
130
131 if (rateMap.containsKey(key)) {
132 long oldValue = rateMap.get(key);
133 long curValue = Long.parseLong(valueString);
134 rateMap.put(key, curValue);
135 long newValue = curValue - oldValue;
136 if (newValue < 0) {
137 log.error("DatanodeProcessor's rateMap might be reset or corrupted for metric "
138 + key);
139 newValue = 0L;
140 }
141 valueString = Long.toString(newValue);
142 }
143
144 if (metricsMap.containsKey(key)) {
145 ChukwaRecord rec = metricsMap.get(key);
146 rec.add(key, valueString);
147 }
148 }
149 buildGenericRecord(hdfs_datanode, null, timeStamp, "dn");
150 output.collect(key, hdfs_datanode);
151 buildGenericRecord(datanode_jvm, null, timeStamp, "jvm");
152 output.collect(key, datanode_jvm);
153 buildGenericRecord(datanode_rpc, null, timeStamp, "rpc");
154 output.collect(key, datanode_rpc);
155 } catch (Exception e) {
156 log.error(ExceptionUtil.getStackTrace(e));
157 }
158 }
159 }