View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  
22  import java.io.IOException;
23  import java.text.ParseException;
24  import java.text.SimpleDateFormat;
25  import java.util.Calendar;
26  import java.util.Date;
27  
28  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
29  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
30  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
31  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
32  import org.apache.hadoop.chukwa.extraction.engine.Record;
33  import org.apache.hadoop.mapred.OutputCollector;
34  import org.apache.hadoop.mapred.Reporter;
35  import org.apache.log4j.Logger;
36  
37  @Tables(annotations={
38  @Table(name="Jobs",columnFamily="summary")
39  })
40  public class JobSummary extends AbstractProcessor {  
41    static Logger log = Logger.getLogger(JobSummary.class);
42    static final String chukwaTimestampField = "chukwa_timestamp";
43    static final String contextNameField = "contextName";
44    static final String recordNameField = "recordName";
45  
46    private SimpleDateFormat sdf = null;
47  
48    public JobSummary() {
49      // TODO move that to config
50      sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
51    }
52  
53    @SuppressWarnings("unchecked")
54    @Override
55    protected void parse(String recordEntry,
56        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
57        throws Throwable {
58      try {
59        // Look for syslog PRI, if PRI is not found, start from offset of 0.
60        int idx = recordEntry.indexOf('>', 0);  
61        String dStr = recordEntry.substring(idx+1, idx+23);
62        int start = idx + 25;
63        idx = recordEntry.indexOf(' ', start);
64        // String level = recordEntry.substring(start, idx);
65        start = idx + 1;
66        idx = recordEntry.indexOf(' ', start);
67        // String className = recordEntry.substring(start, idx-1);
68        String body = recordEntry.substring(idx + 1);
69        body = body.replaceAll("\n", "");
70        // log.info("record [" + recordEntry + "] body [" + body +"]");
71        Date d = sdf.parse(dStr);
72  
73        ChukwaRecord record = new ChukwaRecord();
74  
75        String[] list = body.split(",");
76        for(String pair : list) {
77          String[] kv = pair.split("=");
78          record.add(kv[0], kv[1]);
79        }
80        record.add("cluster", chunk.getTag("cluster"));
81        buildGenericRecord(record, d.getTime(), "summary");
82        output.collect(key, record);
83      } catch (ParseException e) {
84        log.warn("Wrong format in JobSummary [" + recordEntry + "]",
85            e);
86        throw e;
87      } catch (IOException e) {
88        log.warn("Unable to collect output in JobSummary ["
89            + recordEntry + "]", e);
90        throw e;
91      } catch (Exception e) {
92        log.warn("Wrong format in JobSummary [" + recordEntry + "]",
93            e);
94        throw e;
95      }
96  
97    }
98  
99    protected void buildGenericRecord(ChukwaRecord record,
100       long timestamp, String dataSource) {
101     calendar.setTimeInMillis(timestamp);
102     calendar.set(Calendar.MINUTE, 0);
103     calendar.set(Calendar.SECOND, 0);
104     calendar.set(Calendar.MILLISECOND, 0);
105 
106     key.setKey("" + calendar.getTimeInMillis() + "/" + record.getValue("jobId") + "/"
107         + timestamp);
108     key.setReduceType(dataSource);
109     record.setTime(timestamp);
110 
111     record.add(Record.tagsField, chunk.getTags());
112     record.add(Record.sourceField, chunk.getSource());
113     record.add(Record.applicationField, chunk.getStreamName());
114 
115   }
116   
117   public String getDataType() {
118     return JobSummary.class.getName();
119   }
120 
121 }