1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
24
25 import java.util.Calendar;
26 import java.util.Iterator;
27 import java.util.Map;
28 import java.util.TimeZone;
29
30 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
31 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
32 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
33 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
34 import org.apache.hadoop.mapred.OutputCollector;
35 import org.apache.hadoop.mapred.Reporter;
36 import org.json.simple.JSONArray;
37 import org.json.simple.JSONObject;
38 import org.json.simple.JSONValue;
39
40 @Tables(annotations={
41 @Table(name="SystemMetrics",columnFamily="cpu"),
42 @Table(name="SystemMetrics",columnFamily="system"),
43 @Table(name="SystemMetrics",columnFamily="memory"),
44 @Table(name="SystemMetrics",columnFamily="network"),
45 @Table(name="SystemMetrics",columnFamily="disk")
46 })
47 public class SystemMetrics extends AbstractProcessor {
48
49 @Override
50 protected void parse(String recordEntry,
51 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
52 throws Throwable {
53 JSONObject json = (JSONObject) JSONValue.parse(recordEntry);
54 long timestamp = ((Long)json.get("timestamp")).longValue();
55 ChukwaRecord record = new ChukwaRecord();
56 Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
57 cal.setTimeInMillis(timestamp);
58 cal.set(Calendar.SECOND, 0);
59 cal.set(Calendar.MILLISECOND, 0);
60 JSONArray cpuList = (JSONArray) json.get("cpu");
61 double combined = 0.0;
62 double user = 0.0;
63 double sys = 0.0;
64 double idle = 0.0;
65 int actualSize = 0;
66 for(int i = 0; i< cpuList.size(); i++) {
67 JSONObject cpu = (JSONObject) cpuList.get(i);
68
69 if(cpu.get("combined") == null){
70 continue;
71 }
72 actualSize++;
73 combined = combined + Double.parseDouble(cpu.get("combined").toString());
74 user = user + Double.parseDouble(cpu.get("user").toString());
75 sys = sys + Double.parseDouble(cpu.get("sys").toString());
76 idle = idle + Double.parseDouble(cpu.get("idle").toString());
77 @SuppressWarnings("unchecked")
78 Iterator<Map.Entry<String, ?>> keys = cpu.entrySet().iterator();
79 while(keys.hasNext()) {
80 Map.Entry<String, ?> entry = keys.next();
81 String key = entry.getKey();
82 Object value = entry.getValue();
83 record.add(key + "." + i, value.toString());
84 }
85 }
86 combined = combined / actualSize;
87 user = user / actualSize;
88 sys = sys / actualSize;
89 idle = idle / actualSize;
90 record.add("combined", Double.toString(combined));
91 record.add("user", Double.toString(user));
92 record.add("idle", Double.toString(idle));
93 record.add("sys", Double.toString(sys));
94 buildGenericRecord(record, null, cal.getTimeInMillis(), "cpu");
95 output.collect(key, record);
96
97 record = new ChukwaRecord();
98 record.add("Uptime", json.get("uptime").toString());
99 JSONArray loadavg = (JSONArray) json.get("loadavg");
100 record.add("LoadAverage.1", loadavg.get(0).toString());
101 record.add("LoadAverage.5", loadavg.get(1).toString());
102 record.add("LoadAverage.15", loadavg.get(2).toString());
103 buildGenericRecord(record, null, cal.getTimeInMillis(), "system");
104 output.collect(key, record);
105
106 record = new ChukwaRecord();
107 JSONObject memory = (JSONObject) json.get("memory");
108 @SuppressWarnings("unchecked")
109 Iterator<Map.Entry<String, ?>> memKeys = memory.entrySet().iterator();
110 while(memKeys.hasNext()) {
111 Map.Entry<String, ?> entry = memKeys.next();
112 String key = entry.getKey();
113 Object value = entry.getValue();
114 record.add(key, value.toString());
115 }
116 buildGenericRecord(record, null, cal.getTimeInMillis(), "memory");
117 output.collect(key, record);
118
119 record = new ChukwaRecord();
120 JSONObject swap = (JSONObject) json.get("swap");
121 @SuppressWarnings("unchecked")
122 Iterator<Map.Entry<String, ?>> swapKeys = swap.entrySet().iterator();
123 while(swapKeys.hasNext()) {
124 Map.Entry<String, ?> entry = swapKeys.next();
125 String key = entry.getKey();
126 Object value = entry.getValue();
127 record.add(key, value.toString());
128 }
129 buildGenericRecord(record, null, cal.getTimeInMillis(), "swap");
130 output.collect(key, record);
131
132 double rxBytes = 0;
133 double rxDropped = 0;
134 double rxErrors = 0;
135 double rxPackets = 0;
136 double txBytes = 0;
137 double txCollisions = 0;
138 double txErrors = 0;
139 double txPackets = 0;
140 record = new ChukwaRecord();
141 JSONArray netList = (JSONArray) json.get("network");
142 for(int i = 0;i < netList.size(); i++) {
143 JSONObject netIf = (JSONObject) netList.get(i);
144 @SuppressWarnings("unchecked")
145 Iterator<Map.Entry<String, ?>> keys = netIf.entrySet().iterator();
146 while(keys.hasNext()) {
147 Map.Entry<String, ?> entry = keys.next();
148 String key = entry.getKey();
149 Object value = entry.getValue();
150 record.add(key + "." + i, value.toString());
151 if(i!=0) {
152 if(key.equals("RxBytes")) {
153 rxBytes = rxBytes + (Long) value;
154 } else if(key.equals("RxDropped")) {
155 rxDropped = rxDropped + (Long) value;
156 } else if(key.equals("RxErrors")) {
157 rxErrors = rxErrors + (Long) value;
158 } else if(key.equals("RxPackets")) {
159 rxPackets = rxPackets + (Long) value;
160 } else if(key.equals("TxBytes")) {
161 txBytes = txBytes + (Long) value;
162 } else if(key.equals("TxCollisions")) {
163 txCollisions = txCollisions + (Long) value;
164 } else if(key.equals("TxErrors")) {
165 txErrors = txErrors + (Long) value;
166 } else if(key.equals("TxPackets")) {
167 txPackets = txPackets + (Long) netIf.get(key);
168 }
169 }
170 }
171 }
172 buildGenericRecord(record, null, cal.getTimeInMillis(), "network");
173 record.add("RxBytes", Double.toString(rxBytes));
174 record.add("RxDropped", Double.toString(rxDropped));
175 record.add("RxErrors", Double.toString(rxErrors));
176 record.add("RxPackets", Double.toString(rxPackets));
177 record.add("TxBytes", Double.toString(txBytes));
178 record.add("TxCollisions", Double.toString(txCollisions));
179 record.add("TxErrors", Double.toString(txErrors));
180 record.add("TxPackets", Double.toString(txPackets));
181 output.collect(key, record);
182
183 double readBytes = 0;
184 double reads = 0;
185 double writeBytes = 0;
186 double writes = 0;
187 double total = 0;
188 double used = 0;
189 record = new ChukwaRecord();
190 JSONArray diskList = (JSONArray) json.get("disk");
191 for(int i = 0;i < diskList.size(); i++) {
192 JSONObject disk = (JSONObject) diskList.get(i);
193 @SuppressWarnings("unchecked")
194 Iterator<Map.Entry<String, ?>> keys = disk.entrySet().iterator();
195 while(keys.hasNext()) {
196 Map.Entry<String, ?> entry = keys.next();
197 String key = entry.getKey();
198 Object value = entry.getValue();
199 record.add(key + "." + i, value.toString());
200 if(key.equals("ReadBytes")) {
201 readBytes = readBytes + (Long) value;
202 } else if(key.equals("Reads")) {
203 reads = reads + (Long) value;
204 } else if(key.equals("WriteBytes")) {
205 writeBytes = writeBytes + (Long) value;
206 } else if(key.equals("Writes")) {
207 writes = writes + (Long) value;
208 } else if(key.equals("Total")) {
209 total = total + (Long) value;
210 } else if(key.equals("Used")) {
211 used = used + (Long) value;
212 }
213 }
214 }
215 double percentUsed = used/total;
216 record.add("ReadBytes", Double.toString(readBytes));
217 record.add("Reads", Double.toString(reads));
218 record.add("WriteBytes", Double.toString(writeBytes));
219 record.add("Writes", Double.toString(writes));
220 record.add("Total", Double.toString(total));
221 record.add("Used", Double.toString(used));
222 record.add("PercentUsed", Double.toString(percentUsed));
223 buildGenericRecord(record, null, cal.getTimeInMillis(), "disk");
224 output.collect(key, record);
225
226 record = new ChukwaRecord();
227 record.add("cluster", chunk.getTag("cluster"));
228 buildGenericRecord(record, null, cal.getTimeInMillis(), "tags");
229 output.collect(key, record);
230 }
231
232 }