1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21 import java.nio.charset.Charset;
22 import java.util.Calendar;
23 import java.util.HashMap;
24 import java.util.Iterator;
25 import java.util.Map;
26 import java.util.TimeZone;
27 import java.util.TreeMap;
28 import java.util.concurrent.ConcurrentHashMap;
29
30 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
31 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
32 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
33 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
34 import org.apache.hadoop.chukwa.util.ExceptionUtil;
35 import org.apache.hadoop.mapred.OutputCollector;
36 import org.apache.hadoop.mapred.Reporter;
37 import org.apache.hadoop.record.Buffer;
38 import org.apache.log4j.Logger;
39 import org.json.simple.JSONObject;
40 import org.json.simple.JSONValue;
41
42 @Tables(annotations = { @Table(name = "HBase", columnFamily = "master") })
43 public class HBaseMasterProcessor extends AbstractProcessor {
44 static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>();
45 static {
46 long zero = 0L;
47 rateMap.put("splitSizeNumOps", zero);
48 rateMap.put("splitTimeNumOps", zero);
49 }
50
51 @Override
52 protected void parse(String recordEntry,
53 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
54 throws Throwable {
55
56 Logger log = Logger.getLogger(HBaseMasterProcessor.class);
57 long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
58 .getTimeInMillis();
59 ChukwaRecord record = new ChukwaRecord();
60
61 Map<String, Buffer> metricsMap = new HashMap<String, Buffer>();
62
63 try {
64 JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
65 String ttTag = chunk.getTag("timeStamp");
66 if (ttTag == null) {
67 log.warn("timeStamp tag not set in JMX adaptor for hbase master");
68 } else {
69 timeStamp = Long.parseLong(ttTag);
70 }
71 @SuppressWarnings("unchecked")
72 Iterator<Map.Entry<String, ?>> keys = obj.entrySet().iterator();
73
74 while (keys.hasNext()) {
75 Map.Entry<String, ?> entry = keys.next();
76 String key = entry.getKey();
77 Object value = entry.getValue();
78 String valueString = value == null ? "" : value.toString();
79
80
81 if (rateMap.containsKey(key)) {
82 long oldValue = rateMap.get(key);
83 long curValue = Long.parseLong(valueString);
84 rateMap.put(key, curValue);
85 long newValue = curValue - oldValue;
86 if (newValue < 0) {
87 log.warn("HBaseMaster rateMap might be reset or corrupted for metric "
88 + key);
89 newValue = 0L;
90 }
91 valueString = Long.toString(newValue);
92 }
93
94 Buffer b = new Buffer(valueString.getBytes(Charset.forName("UTF-8")));
95 metricsMap.put(key, b);
96 }
97
98 TreeMap<String, Buffer> t = new TreeMap<String, Buffer>(metricsMap);
99 record.setMapFields(t);
100 buildGenericRecord(record, null, timeStamp, "master");
101 output.collect(key, record);
102 } catch (Exception e) {
103 log.error(ExceptionUtil.getStackTrace(e));
104 }
105 }
106 }