View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  import java.util.Calendar;
22  import java.util.HashMap;
23  import java.util.Iterator;
24  import java.util.Map;
25  import java.util.TimeZone;
26  import java.util.TreeMap;
27  import java.util.concurrent.ConcurrentHashMap;
28  
29  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
30  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
31  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
32  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
33  import org.apache.hadoop.chukwa.util.ExceptionUtil;
34  import org.apache.hadoop.mapred.OutputCollector;
35  import org.apache.hadoop.mapred.Reporter;
36  import org.apache.hadoop.record.Buffer;
37  import org.apache.log4j.Logger;
38  import org.json.simple.JSONObject;
39  import org.json.simple.JSONValue;
40  
41  @Tables(annotations={
42  @Table(name="HBase",columnFamily="master")
43  })
44  public class HBaseMasterProcessor extends AbstractProcessor{
45  	static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>();
46  	static {
47  		long zero = 0L;	
48  		rateMap.put("splitSizeNumOps", zero);
49  		rateMap.put("splitTimeNumOps", zero);
50  	}
51  	
52  	@Override
53  	protected void parse(String recordEntry,
54  			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
55  			Reporter reporter) throws Throwable {
56  		
57  		Logger log = Logger.getLogger(HBaseMasterProcessor.class); 
58  		long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
59  		ChukwaRecord record = new ChukwaRecord();
60  		
61  		Map<String, Buffer> metricsMap = new HashMap<String,Buffer>();
62  
63  		try{
64  			JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);	
65  			String ttTag = chunk.getTag("timeStamp");
66  			if(ttTag == null){
67  				log.warn("timeStamp tag not set in JMX adaptor for hbase master");
68  			}
69  			else{
70  				timeStamp = Long.parseLong(ttTag);
71  			}
72  			Iterator<JSONObject> iter = obj.entrySet().iterator();
73  			
74  			while(iter.hasNext()){
75  				Map.Entry entry = (Map.Entry)iter.next();
76  				String key = (String) entry.getKey();
77  				Object value = entry.getValue();
78  				String valueString = value == null?"":value.toString();	
79  				
80  				//Calculate rate for some of the metrics
81  				if(rateMap.containsKey(key)){
82  					long oldValue = rateMap.get(key);
83  					long curValue = Long.parseLong(valueString);
84  					rateMap.put(key, curValue);
85  					long newValue = curValue - oldValue;
86  					if(newValue < 0){
87  						log.warn("HBaseMaster rateMap might be reset or corrupted for metric "+key);						
88  						newValue = 0L;
89  					}					
90  					valueString = Long.toString(newValue);
91  				}
92  				
93  				Buffer b = new Buffer(valueString.getBytes());
94  				metricsMap.put(key,b);				
95  			}			
96  			
97  			TreeMap<String, Buffer> t = new TreeMap<String, Buffer>(metricsMap);
98  			record.setMapFields(t);			
99  			buildGenericRecord(record, null, timeStamp, "master");
100 			output.collect(key, record);
101 		}
102 		catch(Exception e){
103 			log.error(ExceptionUtil.getStackTrace(e));
104 		}		
105 	}	
106 }