View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  import java.nio.charset.Charset;
22  import java.util.Calendar;
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.Map;
26  import java.util.TimeZone;
27  import java.util.TreeMap;
28  import java.util.concurrent.ConcurrentHashMap;
29  
30  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
31  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
32  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
33  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
34  import org.apache.hadoop.chukwa.util.ExceptionUtil;
35  import org.apache.hadoop.mapred.OutputCollector;
36  import org.apache.hadoop.mapred.Reporter;
37  import org.apache.hadoop.record.Buffer;
38  import org.apache.log4j.Logger;
39  import org.json.simple.JSONObject;
40  import org.json.simple.JSONValue;
41  
42  @Tables(annotations = { @Table(name = "HBase", columnFamily = "master") })
43  public class HBaseMasterProcessor extends AbstractProcessor {
44    static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>();
45    static {
46      long zero = 0L;
47      rateMap.put("splitSizeNumOps", zero);
48      rateMap.put("splitTimeNumOps", zero);
49    }
50  
51    @Override
52    protected void parse(String recordEntry,
53        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
54        throws Throwable {
55  
56      Logger log = Logger.getLogger(HBaseMasterProcessor.class);
57      long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
58          .getTimeInMillis();
59      ChukwaRecord record = new ChukwaRecord();
60  
61      Map<String, Buffer> metricsMap = new HashMap<String, Buffer>();
62  
63      try {
64        JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
65        String ttTag = chunk.getTag("timeStamp");
66        if (ttTag == null) {
67          log.warn("timeStamp tag not set in JMX adaptor for hbase master");
68        } else {
69          timeStamp = Long.parseLong(ttTag);
70        }
71        @SuppressWarnings("unchecked")
72        Iterator<Map.Entry<String, ?>> keys = obj.entrySet().iterator();
73  
74        while (keys.hasNext()) {
75          Map.Entry<String, ?> entry = keys.next();
76          String key = entry.getKey();
77          Object value = entry.getValue();
78          String valueString = value == null ? "" : value.toString();
79  
80          // Calculate rate for some of the metrics
81          if (rateMap.containsKey(key)) {
82            long oldValue = rateMap.get(key);
83            long curValue = Long.parseLong(valueString);
84            rateMap.put(key, curValue);
85            long newValue = curValue - oldValue;
86            if (newValue < 0) {
87              log.warn("HBaseMaster rateMap might be reset or corrupted for metric "
88                  + key);
89              newValue = 0L;
90            }
91            valueString = Long.toString(newValue);
92          }
93  
94          Buffer b = new Buffer(valueString.getBytes(Charset.forName("UTF-8")));
95          metricsMap.put(key, b);
96        }
97  
98        TreeMap<String, Buffer> t = new TreeMap<String, Buffer>(metricsMap);
99        record.setMapFields(t);
100       buildGenericRecord(record, null, timeStamp, "master");
101       output.collect(key, record);
102     } catch (Exception e) {
103       log.error(ExceptionUtil.getStackTrace(e));
104     }
105   }
106 }