/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;

import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

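/**
 * Demux processor that parses the JSON metric dump emitted by the JMX adaptor
 * for an HDFS datanode and splits the metrics across three ChukwaRecords, one
 * per HBase column family: "dn" (datanode metrics), "jvm" (JVM metrics) and
 * "rpc" (RPC metrics). For the counters listed in {@link #rateMap}, the
 * per-interval delta is emitted instead of the raw cumulative value.
 *
 * The record body is assumed to be a flat JSON object of metric name to
 * value, e.g. (illustrative values only):
 * {"blocks_read": 1024, "gcCount": 7, "SentBytes": 52480}
 */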
@Tables(annotations = { @Table(name = "Datanode", columnFamily = "dn"),
    @Table(name = "Datanode", columnFamily = "jvm"),
    @Table(name = "Datanode", columnFamily = "rpc") })
public class DatanodeProcessor extends AbstractProcessor {

  private static final Logger log = Logger
      .getLogger(DatanodeProcessor.class);

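  // Last sample of each monotonically increasing counter, keyed by metric
  // name; parse() replaces these values on every call to compute deltas.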
  static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>();

  static {
    long zero = 0L;
    rateMap.put("blocks_verified", zero);
    rateMap.put("blocks_written", zero);
    rateMap.put("blocks_read", zero);
    rateMap.put("bytes_written", zero);
    rateMap.put("bytes_read", zero);
    rateMap.put("heartBeats_num_ops", zero);
    rateMap.put("SentBytes", zero);
    rateMap.put("ReceivedBytes", zero);
    rateMap.put("rpcAuthorizationSuccesses", zero);
    rateMap.put("rpcAuthorizationFailures", zero);
    rateMap.put("RpcQueueTime_num_ops", zero);
    rateMap.put("RpcProcessingTime_num_ops", zero);
    rateMap.put("gcCount", zero);
  }

  @Override
  protected void parse(String recordEntry,
      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
      throws Throwable {
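    // Default to "now" in UTC; overridden below when the JMX adaptor has
    // stamped the chunk with a "timeStamp" tag.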
    long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
        .getTimeInMillis();
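    // One ChukwaRecord per HBase column family declared in @Tables above.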

    final ChukwaRecord hdfs_datanode = new ChukwaRecord();
    final ChukwaRecord datanode_jvm = new ChukwaRecord();
    final ChukwaRecord datanode_rpc = new ChukwaRecord();

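    // Routing table: maps each metric name to the record (and hence column
    // family) that should carry it. Metrics absent from this map are dropped.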
    Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>() {
      private static final long serialVersionUID = 1L;

      {
        put("blocks_verified", hdfs_datanode);
        put("blocks_written", hdfs_datanode);
        put("blocks_read", hdfs_datanode);
        put("blocks_replicated", hdfs_datanode);
        put("blocks_removed", hdfs_datanode);
        put("bytes_written", hdfs_datanode);
        put("bytes_read", hdfs_datanode);
        put("heartBeats_avg_time", hdfs_datanode);
        put("heartBeats_num_ops", hdfs_datanode);

        put("gcCount", datanode_jvm);
        put("gcTimeMillis", datanode_jvm);
        put("logError", datanode_jvm);
        put("logFatal", datanode_jvm);
        put("logInfo", datanode_jvm);
        put("logWarn", datanode_jvm);
        put("memHeapCommittedM", datanode_jvm);
        put("memHeapUsedM", datanode_jvm);
        put("threadsBlocked", datanode_jvm);
        put("threadsNew", datanode_jvm);
        put("threadsRunnable", datanode_jvm);
        put("threadsTerminated", datanode_jvm);
        put("threadsTimedWaiting", datanode_jvm);
        put("threadsWaiting", datanode_jvm);

        put("ReceivedBytes", datanode_rpc);
        put("RpcProcessingTime_avg_time", datanode_rpc);
        put("RpcProcessingTime_num_ops", datanode_rpc);
        put("RpcQueueTime_avg_time", datanode_rpc);
        put("RpcQueueTime_num_ops", datanode_rpc);
        put("SentBytes", datanode_rpc);
        put("rpcAuthorizationSuccesses", datanode_rpc);
        // Tracked in rateMap, so route its delta to the rpc record as well.
        put("rpcAuthorizationFailures", datanode_rpc);
      }
    };
    try {
      JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
      String ttTag = chunk.getTag("timeStamp");
      if (ttTag == null) {
        log.warn("timeStamp tag not set in JMX adaptor for datanode");
      } else {
        timeStamp = Long.parseLong(ttTag);
      }
      @SuppressWarnings("unchecked")
      Iterator<Map.Entry<String, ?>> keys = obj.entrySet().iterator();
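      // Walk every metric reported in the JSON dump.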
      while (keys.hasNext()) {
        Map.Entry<String, ?> entry = keys.next();
        // "metricName" rather than "key": the inherited ChukwaRecordKey field
        // named "key" is still needed by output.collect() below.
        String metricName = entry.getKey();
        Object value = entry.getValue();
        String valueString = value == null ? "" : value.toString();

        // For cumulative counters, emit the delta since the previous sample
        // instead of the raw running total.
        if (rateMap.containsKey(metricName)) {
          long oldValue = rateMap.get(metricName);
          long curValue = Long.parseLong(valueString);
          rateMap.put(metricName, curValue);
          long delta = curValue - oldValue;
          if (delta < 0) {
            log.error("DatanodeProcessor's rateMap might be reset or corrupted for metric "
                + metricName);
            delta = 0L;
          }
          valueString = Long.toString(delta);
        }

        if (metricsMap.containsKey(metricName)) {
          ChukwaRecord rec = metricsMap.get(metricName);
          rec.add(metricName, valueString);
        }
      }
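      // Emit one record per column family; the inherited ChukwaRecordKey
      // "key" is refreshed by each buildGenericRecord() call.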
      buildGenericRecord(hdfs_datanode, null, timeStamp, "dn");
      output.collect(key, hdfs_datanode);
      buildGenericRecord(datanode_jvm, null, timeStamp, "jvm");
      output.collect(key, datanode_jvm);
      buildGenericRecord(datanode_rpc, null, timeStamp, "rpc");
      output.collect(key, datanode_rpc);
    } catch (Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
    }
  }
}