/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;


import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

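/**
 * Demux processor for Hadoop metrics log entries. Each entry carries a
 * JSON body emitted by the Hadoop metrics system; the processor copies
 * the JSON fields into a ChukwaRecord and, for HBase output, the
 * {@code @Tables} annotation below maps records to the per-context
 * column families.
 */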
@Tables(annotations={
@Table(name="Hadoop",columnFamily="jvm_metrics"),
@Table(name="Hadoop",columnFamily="mapred_metrics"),
@Table(name="Hadoop",columnFamily="dfs_metrics"),
@Table(name="Hadoop",columnFamily="dfs_namenode"),
@Table(name="Hadoop",columnFamily="dfs_FSNamesystem"),
@Table(name="Hadoop",columnFamily="dfs_datanode"),
@Table(name="Hadoop",columnFamily="mapred_jobtracker"),
@Table(name="Hadoop",columnFamily="mapred_shuffleInput"),
@Table(name="Hadoop",columnFamily="mapred_shuffleOutput"),
@Table(name="Hadoop",columnFamily="mapred_tasktracker"),
@Table(name="Hadoop",columnFamily="rpc_metrics")
})
public class HadoopMetricsProcessor extends AbstractProcessor {
  static Logger log = Logger.getLogger(HadoopMetricsProcessor.class);
  static final String chukwaTimestampField = "timestamp";
  static final String contextNameField = "contextName";
  static final String recordNameField = "recordName";

  private SimpleDateFormat sdf = null;

  public HadoopMetricsProcessor() {
    // TODO: move the date format to configuration
    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
  }

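  /**
   * Parses one Hadoop metrics log line into a ChukwaRecord. A minimal
   * sketch of the assumed input layout, inferred from the offset
   * arithmetic below (the exact header format is an assumption, not
   * documented in this file):
   *
   *   &lt;PRI&gt;2009-04-14 13:45:21,123 INFO chukwa.metrics.jvm: {"timestamp":1239713121123,"contextName":"jvm","recordName":"metrics",...}
   *
   * The optional syslog PRI, date header, log level, and logger name are
   * skipped by offset; the trailing JSON object supplies the record fields.
   */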
  @SuppressWarnings("unchecked")
  @Override
  protected void parse(String recordEntry,
      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
      throws Throwable {
    try {
      // Look for the syslog PRI. If no PRI is present, indexOf returns -1
      // and parsing effectively starts at offset 0.
      int idx = recordEntry.indexOf('>', 0);
      String dStr = recordEntry.substring(idx + 1, idx + 23);
      int start = idx + 25;
      idx = recordEntry.indexOf(' ', start);
      // String level = recordEntry.substring(start, idx);
      start = idx + 1;
      idx = recordEntry.indexOf(' ', start);
      // String className = recordEntry.substring(start, idx-1);
      String body = recordEntry.substring(idx + 1);
      body = body.replaceAll("\n", "");
      // log.info("record [" + recordEntry + "] body [" + body + "]");
      Date d = sdf.parse(dStr);

      start = body.indexOf('{');
      JSONObject json = (JSONObject) JSONValue.parse(body.substring(start));

      ChukwaRecord record = new ChukwaRecord();
      StringBuilder datasource = new StringBuilder();
      String contextName = null;
      String recordName = null;

      Iterator<Map.Entry<String, ?>> ki = json.entrySet().iterator();
      while (ki.hasNext()) {
        Map.Entry<String, ?> entry = ki.next();
        String keyName = entry.getKey();
        Object value = entry.getValue();
        if (chukwaTimestampField.equals(keyName)) {
          // Round the metric timestamp down to the start of its minute.
          d = new Date((Long) value);
          Calendar cal = Calendar.getInstance();
          cal.setTimeInMillis(d.getTime());
          cal.set(Calendar.SECOND, 0);
          cal.set(Calendar.MILLISECOND, 0);
          d.setTime(cal.getTimeInMillis());
        } else if (contextNameField.equals(keyName)) {
          contextName = (String) value;
        } else if (recordNameField.equals(keyName)) {
          recordName = (String) value;
          record.add(keyName, value.toString());
        } else if (value != null) {
          record.add(keyName, value.toString());
        }
      }
      if (contextName != null) {
        datasource.append(contextName);
        datasource.append("_");
      }
      datasource.append(recordName);
      record.add("cluster", chunk.getTag("cluster"));
      if (contextName != null && contextName.equals("jvm")) {
        buildJVMRecord(record, d.getTime(), datasource.toString());
      } else {
        buildGenericRecord(record, null, d.getTime(), datasource.toString());
      }
      output.collect(key, record);
    } catch (ParseException e) {
      log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]",
          e);
      throw e;
    } catch (IOException e) {
      log.warn("Unable to collect output in HadoopMetricsProcessor ["
          + recordEntry + "]", e);
      throw e;
    } catch (Exception e) {
      log.warn("Unexpected error in HadoopMetricsProcessor [" + recordEntry
          + "]", e);
      throw e;
    }
  }

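  /**
   * Builds the output key for a JVM metrics record. The key has the form
   * hourBucket/source:processName/timestamp, where hourBucket is the
   * record timestamp truncated to the hour, so all JVM records for one
   * process and hour sort together; the reduce type is set to the
   * datasource name and standard chunk metadata is copied into the record.
   */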
  protected void buildJVMRecord(ChukwaRecord record, long timestamp, String dataSource) {
    calendar.setTimeInMillis(timestamp);
    calendar.set(Calendar.MINUTE, 0);
    calendar.set(Calendar.SECOND, 0);
    calendar.set(Calendar.MILLISECOND, 0);

    key.setKey("" + calendar.getTimeInMillis() + "/" + chunk.getSource() + ":" +
        record.getValue("processName") + "/" + timestamp);
    key.setReduceType(dataSource);
    record.setTime(timestamp);

    record.add(Record.tagsField, chunk.getTags());
    record.add(Record.sourceField, chunk.getSource());
    record.add(Record.applicationField, chunk.getStreamName());
  }

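  /** Returns the data type string for records handled by this processor
   *  (the fully qualified class name). */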
  public String getDataType() {
    return HadoopMetricsProcessor.class.getName();
  }

}