1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21 import java.util.Calendar;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.Map;
25 import java.util.TimeZone;
26 import java.util.concurrent.ConcurrentHashMap;
27
28 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
29 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
30 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
31 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
32 import org.apache.hadoop.chukwa.util.ExceptionUtil;
33 import org.apache.hadoop.mapred.OutputCollector;
34 import org.apache.hadoop.mapred.Reporter;
35 import org.apache.log4j.Logger;
36 import org.json.simple.JSONObject;
37 import org.json.simple.JSONValue;
38
39 @Tables(annotations={
40 @Table(name="Datanode",columnFamily="dn"),
41 @Table(name="Datanode",columnFamily="jvm"),
42 @Table(name="Datanode",columnFamily="rpc")
43 })
44 public class DatanodeProcessor extends AbstractProcessor{
45
46 static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>();
47
48 static {
49 long zero = 0L;
50 rateMap.put("blocks_verified", zero);
51 rateMap.put("blocks_written", zero);
52 rateMap.put("blocks_read", zero);
53 rateMap.put("bytes_written", zero);
54 rateMap.put("bytes_read", zero);
55 rateMap.put("heartBeats_num_ops", zero);
56 rateMap.put("SentBytes", zero);
57 rateMap.put("ReceivedBytes", zero);
58 rateMap.put("rpcAuthorizationSuccesses", zero);
59 rateMap.put("rpcAuthorizationFailures", zero);
60 rateMap.put("RpcQueueTime_num_ops", zero);
61 rateMap.put("RpcProcessingTime_num_ops", zero);
62 rateMap.put("gcCount", zero);
63 }
64
65 @Override
66 protected void parse(String recordEntry,
67 OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
68 Reporter reporter) throws Throwable {
69 Logger log = Logger.getLogger(DatanodeProcessor.class);
70 long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
71
72 final ChukwaRecord hdfs_datanode = new ChukwaRecord();
73 final ChukwaRecord datanode_jvm = new ChukwaRecord();
74 final ChukwaRecord datanode_rpc = new ChukwaRecord();
75
76 Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>(){
77 private static final long serialVersionUID = 1L;
78
79 {
80 put("blocks_verified", hdfs_datanode);
81 put("blocks_written", hdfs_datanode);
82 put("blocks_read", hdfs_datanode);
83 put("blocks_replicated", hdfs_datanode);
84 put("blocks_removed", hdfs_datanode);
85 put("bytes_written", hdfs_datanode);
86 put("bytes_read", hdfs_datanode);
87 put("heartBeats_avg_time", hdfs_datanode);
88 put("heartBeats_num_ops", hdfs_datanode);
89
90 put("gcCount", datanode_jvm);
91 put("gcTimeMillis", datanode_jvm);
92 put("logError", datanode_jvm);
93 put("logFatal", datanode_jvm);
94 put("logInfo", datanode_jvm);
95 put("logWarn", datanode_jvm);
96 put("memHeapCommittedM", datanode_jvm);
97 put("memHeapUsedM", datanode_jvm);
98 put("threadsBlocked", datanode_jvm);
99 put("threadsNew", datanode_jvm);
100 put("threadsRunnable", datanode_jvm);
101 put("threadsTerminated", datanode_jvm);
102 put("threadsTimedWaiting", datanode_jvm);
103 put("threadsWaiting", datanode_jvm);
104
105 put("ReceivedBytes", datanode_rpc);
106 put("RpcProcessingTime_avg_time", datanode_rpc);
107 put("RpcProcessingTime_num_ops", datanode_rpc);
108 put("RpcQueueTime_avg_time", datanode_rpc);
109 put("RpcQueueTime_num_ops", datanode_rpc);
110 put("SentBytes", datanode_rpc);
111 put("rpcAuthorizationSuccesses", datanode_rpc);
112 }
113 };
114 try{
115 JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
116 String ttTag = chunk.getTag("timeStamp");
117 if(ttTag == null){
118 log.warn("timeStamp tag not set in JMX adaptor for datanode");
119 }
120 else{
121 timeStamp = Long.parseLong(ttTag);
122 }
123 Iterator<JSONObject> iter = obj.entrySet().iterator();
124
125 while(iter.hasNext()){
126 Map.Entry entry = (Map.Entry)iter.next();
127 String key = (String) entry.getKey();
128 Object value = entry.getValue();
129 String valueString = value == null?"":value.toString();
130
131
132 if(rateMap.containsKey(key)){
133 long oldValue = rateMap.get(key);
134 long curValue = Long.parseLong(valueString);
135 rateMap.put(key, curValue);
136 long newValue = curValue - oldValue;
137 if(newValue < 0){
138 log.error("DatanodeProcessor's rateMap might be reset or corrupted for metric "+key);
139 newValue = 0L;
140 }
141 valueString = Long.toString(newValue);
142 }
143
144 if(metricsMap.containsKey(key)){
145 ChukwaRecord rec = metricsMap.get(key);
146 rec.add(key, valueString);
147 }
148 }
149 buildGenericRecord(hdfs_datanode, null, timeStamp, "dn");
150 output.collect(key, hdfs_datanode);
151 buildGenericRecord(datanode_jvm, null, timeStamp, "jvm");
152 output.collect(key, datanode_jvm);
153 buildGenericRecord(datanode_rpc, null, timeStamp, "rpc");
154 output.collect(key, datanode_rpc);
155 }
156 catch(Exception e){
157 log.error(ExceptionUtil.getStackTrace(e));
158 }
159 }
160 }