/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;

import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

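/**
 * Demux mapper processor for Datanode metrics collected by the JMX adaptor.
 * Each record entry is a JSON snapshot of Datanode JMX attributes; the values
 * are split into HDFS, JVM and RPC records (the dn, jvm and rpc column
 * families of the Datanode table), and monotonically increasing counters are
 * converted into per-interval deltas before being emitted.
 */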
@Tables(annotations={
@Table(name="Datanode",columnFamily="dn"),
@Table(name="Datanode",columnFamily="jvm"),
@Table(name="Datanode",columnFamily="rpc")
})
public class DatanodeProcessor extends AbstractProcessor {

  static final Logger log = Logger.getLogger(DatanodeProcessor.class);

  // Last observed value of each counter metric, keyed by metric name, so that
  // per-interval deltas can be computed across successive calls to parse().
  static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>();

  static {
    long zero = 0L;
    rateMap.put("blocks_verified", zero);
    rateMap.put("blocks_written", zero);
    rateMap.put("blocks_read", zero);
    rateMap.put("bytes_written", zero);
    rateMap.put("bytes_read", zero);
    rateMap.put("heartBeats_num_ops", zero);
    rateMap.put("SentBytes", zero);
    rateMap.put("ReceivedBytes", zero);
    rateMap.put("rpcAuthorizationSuccesses", zero);
    rateMap.put("rpcAuthorizationFailures", zero);
    rateMap.put("RpcQueueTime_num_ops", zero);
    rateMap.put("RpcProcessingTime_num_ops", zero);
    rateMap.put("gcCount", zero);
  }

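  /**
   * Parses one JSON record of Datanode JMX metrics, turns the counters listed
   * in rateMap into deltas since the previous sample, and emits one
   * ChukwaRecord per column family (dn, jvm, rpc).
   */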
  @Override
  protected void parse(String recordEntry,
      OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
      Reporter reporter) throws Throwable {
    // Fall back to the current UTC time; overridden below when the chunk
    // carries a timeStamp tag from the JMX adaptor.
    long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();

    final ChukwaRecord hdfs_datanode = new ChukwaRecord();
    final ChukwaRecord datanode_jvm = new ChukwaRecord();
    final ChukwaRecord datanode_rpc = new ChukwaRecord();

    // Routes each metric name to the record (and hence column family) it
    // belongs to.
    Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>() {
      private static final long serialVersionUID = 1L;

      {
        put("blocks_verified", hdfs_datanode);
        put("blocks_written", hdfs_datanode);
        put("blocks_read", hdfs_datanode);
        put("blocks_replicated", hdfs_datanode);
        put("blocks_removed", hdfs_datanode);
        put("bytes_written", hdfs_datanode);
        put("bytes_read", hdfs_datanode);
        put("heartBeats_avg_time", hdfs_datanode);
        put("heartBeats_num_ops", hdfs_datanode);

        put("gcCount", datanode_jvm);
        put("gcTimeMillis", datanode_jvm);
        put("logError", datanode_jvm);
        put("logFatal", datanode_jvm);
        put("logInfo", datanode_jvm);
        put("logWarn", datanode_jvm);
        put("memHeapCommittedM", datanode_jvm);
        put("memHeapUsedM", datanode_jvm);
        put("threadsBlocked", datanode_jvm);
        put("threadsNew", datanode_jvm);
        put("threadsRunnable", datanode_jvm);
        put("threadsTerminated", datanode_jvm);
        put("threadsTimedWaiting", datanode_jvm);
        put("threadsWaiting", datanode_jvm);

        put("ReceivedBytes", datanode_rpc);
        put("RpcProcessingTime_avg_time", datanode_rpc);
        put("RpcProcessingTime_num_ops", datanode_rpc);
        put("RpcQueueTime_avg_time", datanode_rpc);
        put("RpcQueueTime_num_ops", datanode_rpc);
        put("SentBytes", datanode_rpc);
        put("rpcAuthorizationSuccesses", datanode_rpc);
      }
    };
    try {
      JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
      String ttTag = chunk.getTag("timeStamp");
      if (ttTag == null) {
        log.warn("timeStamp tag not set in JMX adaptor for datanode");
      } else {
        timeStamp = Long.parseLong(ttTag);
      }
      // json-simple's JSONObject is a raw Map, so this iteration is unchecked;
      // the entries are Map.Entry instances, not JSONObjects.
      @SuppressWarnings("unchecked")
      Iterator<Map.Entry<String, Object>> iter = obj.entrySet().iterator();

      while (iter.hasNext()) {
        Map.Entry<String, Object> entry = iter.next();
        String key = entry.getKey();
        Object value = entry.getValue();
        String valueString = value == null ? "" : value.toString();

        // For counter metrics, report the delta since the previous sample
        // instead of the raw, ever-increasing value.
        if (rateMap.containsKey(key)) {
          long oldValue = rateMap.get(key);
          long curValue = Long.parseLong(valueString);
          rateMap.put(key, curValue);
          long newValue = curValue - oldValue;
          if (newValue < 0) {
            log.error("DatanodeProcessor's rateMap might be reset or corrupted for metric " + key);
            newValue = 0L;
          }
          valueString = Long.toString(newValue);
        }

        if (metricsMap.containsKey(key)) {
          ChukwaRecord rec = metricsMap.get(key);
          rec.add(key, valueString);
        }
      }
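      // Emit one record per column family; buildGenericRecord fills in the
      // inherited ChukwaRecordKey (key) with the matching reduce type.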
      buildGenericRecord(hdfs_datanode, null, timeStamp, "dn");
      output.collect(key, hdfs_datanode);
      buildGenericRecord(datanode_jvm, null, timeStamp, "jvm");
      output.collect(key, datanode_jvm);
      buildGenericRecord(datanode_rpc, null, timeStamp, "rpc");
      output.collect(key, datanode_rpc);
    } catch (Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
    }
  }
}