1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21 import java.util.Calendar;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.Map;
25 import java.util.TimeZone;
26 import java.util.TreeMap;
27 import java.util.concurrent.ConcurrentHashMap;
28
29 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
30 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
31 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
32 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
33 import org.apache.hadoop.chukwa.util.ExceptionUtil;
34 import org.apache.hadoop.mapred.OutputCollector;
35 import org.apache.hadoop.mapred.Reporter;
36 import org.apache.hadoop.record.Buffer;
37 import org.apache.log4j.Logger;
38 import org.json.simple.JSONObject;
39 import org.json.simple.JSONValue;
40
41 @Tables(annotations={
42 @Table(name="HBase",columnFamily="master")
43 })
44 public class HBaseMasterProcessor extends AbstractProcessor{
45 static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>();
46 static {
47 long zero = 0L;
48 rateMap.put("splitSizeNumOps", zero);
49 rateMap.put("splitTimeNumOps", zero);
50 }
51
52 @Override
53 protected void parse(String recordEntry,
54 OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
55 Reporter reporter) throws Throwable {
56
57 Logger log = Logger.getLogger(HBaseMasterProcessor.class);
58 long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
59 ChukwaRecord record = new ChukwaRecord();
60
61 Map<String, Buffer> metricsMap = new HashMap<String,Buffer>();
62
63 try{
64 JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
65 String ttTag = chunk.getTag("timeStamp");
66 if(ttTag == null){
67 log.warn("timeStamp tag not set in JMX adaptor for hbase master");
68 }
69 else{
70 timeStamp = Long.parseLong(ttTag);
71 }
72 Iterator<JSONObject> iter = obj.entrySet().iterator();
73
74 while(iter.hasNext()){
75 Map.Entry entry = (Map.Entry)iter.next();
76 String key = (String) entry.getKey();
77 Object value = entry.getValue();
78 String valueString = value == null?"":value.toString();
79
80
81 if(rateMap.containsKey(key)){
82 long oldValue = rateMap.get(key);
83 long curValue = Long.parseLong(valueString);
84 rateMap.put(key, curValue);
85 long newValue = curValue - oldValue;
86 if(newValue < 0){
87 log.warn("HBaseMaster rateMap might be reset or corrupted for metric "+key);
88 newValue = 0L;
89 }
90 valueString = Long.toString(newValue);
91 }
92
93 Buffer b = new Buffer(valueString.getBytes());
94 metricsMap.put(key,b);
95 }
96
97 TreeMap<String, Buffer> t = new TreeMap<String, Buffer>(metricsMap);
98 record.setMapFields(t);
99 buildGenericRecord(record, null, timeStamp, "master");
100 output.collect(key, record);
101 }
102 catch(Exception e){
103 log.error(ExceptionUtil.getStackTrace(e));
104 }
105 }
106 }