1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;

import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
33
34 @Table(name="SystemMetrics",columnFamily="Disk")
35 public class Df extends AbstractProcessor {
36 static Logger log = Logger.getLogger(Df.class);
37
38 private static final String[] headerSplitCols = { "Filesystem", "1K-blocks",
39 "Used", "Available", "Use%", "Mounted", "on" };
40 private static final String[] headerCols = { "Filesystem", "1K-blocks",
41 "Used", "Available", "Use%", "Mounted on" };
42 private SimpleDateFormat sdf = null;
43
44 public Df() {
45 sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
46 }
47
48 @Override
49 protected void parse(String recordEntry,
50 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
51 throws Throwable {
52
53 try {
54 String dStr = recordEntry.substring(0, 23);
55 int start = 24;
56 int idx = recordEntry.indexOf(' ', start);
57
58 start = idx + 1;
59 idx = recordEntry.indexOf(' ', start);
60
61 String body = recordEntry.substring(idx + 1);
62
63 Date d = sdf.parse(dStr);
64 String[] lines = body.split("\n");
65
66 String[] outputCols = lines[0].substring(lines[0].indexOf("Filesystem")).split("[\\s]++");
67
68 if (outputCols.length != headerSplitCols.length
69 || outputCols[0].intern() != headerSplitCols[0].intern()
70 || outputCols[1].intern() != headerSplitCols[1].intern()
71 || outputCols[2].intern() != headerSplitCols[2].intern()
72 || outputCols[3].intern() != headerSplitCols[3].intern()
73 || outputCols[4].intern() != headerSplitCols[4].intern()
74 || outputCols[5].intern() != headerSplitCols[5].intern()
75 || outputCols[6].intern() != headerSplitCols[6].intern()) {
76 throw new DFInvalidRecord("Wrong output format (header) ["
77 + recordEntry + "]");
78 }
79
80 String[] values = null;
81
82
83 ChukwaRecord record = null;
84
85 for (int i = 1; i < lines.length; i++) {
86 values = lines[i].split("[\\s]++");
87 key = new ChukwaRecordKey();
88 record = new ChukwaRecord();
89 this.buildGenericRecord(record, null, d.getTime(), "Df");
90
91 record.add(headerCols[0], values[0]);
92 record.add(headerCols[1], values[1]);
93 record.add(headerCols[2], values[2]);
94 record.add(headerCols[3], values[3]);
95 record.add(headerCols[4], values[4]
96 .substring(0, values[4].length() - 1));
97 record.add(headerCols[5], values[5]);
98
99 output.collect(key, record);
100 }
101
102
103 } catch (ParseException e) {
104 e.printStackTrace();
105 log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
106 throw e;
107 } catch (IOException e) {
108 e.printStackTrace();
109 log.warn("Unable to collect output in DFProcessor [" + recordEntry + "]",
110 e);
111 throw e;
112 } catch (DFInvalidRecord e) {
113 e.printStackTrace();
114 log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
115 throw e;
116 }
117 }
118 }