View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  /**
20   * Demux parser for system metrics data collected through
21   * org.apache.hadoop.chukwa.datacollection.adaptor.sigar.SystemMetrics.
22   */
23  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
24  
25  import java.util.Calendar;
26  import java.util.Iterator;
27  import java.util.Map;
28  import java.util.TimeZone;
29  
30  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
31  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
32  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
33  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
34  import org.apache.hadoop.mapred.OutputCollector;
35  import org.apache.hadoop.mapred.Reporter;
36  import org.json.simple.JSONArray;
37  import org.json.simple.JSONObject;
38  import org.json.simple.JSONValue;
39  
40  @Tables(annotations={
41      @Table(name="SystemMetrics",columnFamily="cpu"),
42      @Table(name="SystemMetrics",columnFamily="system"),
43      @Table(name="SystemMetrics",columnFamily="memory"),
44      @Table(name="SystemMetrics",columnFamily="network"),
45      @Table(name="SystemMetrics",columnFamily="disk")
46      })
47  public class SystemMetrics extends AbstractProcessor {
48  
49    @Override
50    protected void parse(String recordEntry,
51        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
52        throws Throwable {
53      JSONObject json = (JSONObject) JSONValue.parse(recordEntry);
54      long timestamp = ((Long)json.get("timestamp")).longValue();
55      ChukwaRecord record = new ChukwaRecord();
56      Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
57      cal.setTimeInMillis(timestamp);
58      cal.set(Calendar.SECOND, 0);
59      cal.set(Calendar.MILLISECOND, 0);
60      JSONArray cpuList = (JSONArray) json.get("cpu");
61      double combined = 0.0;
62      double user = 0.0;
63      double sys = 0.0;
64      double idle = 0.0;
65      int actualSize = 0;
66      for(int i = 0; i< cpuList.size(); i++) {
67        JSONObject cpu = (JSONObject) cpuList.get(i);
68        //Work around for sigar returning null sometimes for cpu metrics on pLinux
69        if(cpu.get("combined") == null){
70      	  continue;
71        }
72        actualSize++;
73        combined = combined + Double.parseDouble(cpu.get("combined").toString());
74        user = user + Double.parseDouble(cpu.get("user").toString());
75        sys = sys + Double.parseDouble(cpu.get("sys").toString());
76        idle = idle + Double.parseDouble(cpu.get("idle").toString());
77        @SuppressWarnings("unchecked")
78        Iterator<Map.Entry<String, ?>> keys = cpu.entrySet().iterator();
79        while(keys.hasNext()) {
80          Map.Entry<String, ?> entry = keys.next();
81          String key = entry.getKey();
82          Object value = entry.getValue();
83          record.add(key + "." + i, value.toString());
84        }
85      }
86      combined = combined / actualSize;
87      user = user / actualSize;
88      sys = sys / actualSize;
89      idle = idle / actualSize;
90      record.add("combined", Double.toString(combined));
91      record.add("user", Double.toString(user));
92      record.add("idle", Double.toString(idle));    
93      record.add("sys", Double.toString(sys));
94      buildGenericRecord(record, null, cal.getTimeInMillis(), "cpu");
95      output.collect(key, record);    
96  
97      record = new ChukwaRecord();
98      record.add("Uptime", json.get("uptime").toString());
99      JSONArray loadavg = (JSONArray) json.get("loadavg");
100     record.add("LoadAverage.1", loadavg.get(0).toString());
101     record.add("LoadAverage.5", loadavg.get(1).toString());
102     record.add("LoadAverage.15", loadavg.get(2).toString());
103     buildGenericRecord(record, null, cal.getTimeInMillis(), "system");
104     output.collect(key, record);    
105 
106     record = new ChukwaRecord();
107     JSONObject memory = (JSONObject) json.get("memory");
108     @SuppressWarnings("unchecked")
109     Iterator<Map.Entry<String, ?>> memKeys = memory.entrySet().iterator();
110     while(memKeys.hasNext()) {
111       Map.Entry<String, ?> entry = memKeys.next();
112       String key = entry.getKey();
113       Object value = entry.getValue();
114       record.add(key, value.toString());
115     }
116     buildGenericRecord(record, null, cal.getTimeInMillis(), "memory");
117     output.collect(key, record);    
118 
119     record = new ChukwaRecord();
120     JSONObject swap = (JSONObject) json.get("swap");
121     @SuppressWarnings("unchecked")
122     Iterator<Map.Entry<String, ?>> swapKeys = swap.entrySet().iterator();
123     while(swapKeys.hasNext()) {
124       Map.Entry<String, ?> entry = swapKeys.next();
125       String key = entry.getKey();
126       Object value = entry.getValue();
127       record.add(key, value.toString());
128     }
129     buildGenericRecord(record, null, cal.getTimeInMillis(), "swap");
130     output.collect(key, record);
131     
132     double rxBytes = 0;
133     double rxDropped = 0;
134     double rxErrors = 0;
135     double rxPackets = 0;
136     double txBytes = 0;
137     double txCollisions = 0;
138     double txErrors = 0;
139     double txPackets = 0;
140     record = new ChukwaRecord();
141     JSONArray netList = (JSONArray) json.get("network");
142     for(int i = 0;i < netList.size(); i++) {
143       JSONObject netIf = (JSONObject) netList.get(i);
144       @SuppressWarnings("unchecked")
145       Iterator<Map.Entry<String, ?>> keys = netIf.entrySet().iterator();
146       while(keys.hasNext()) {
147         Map.Entry<String, ?> entry = keys.next();
148         String key = entry.getKey();
149         Object value = entry.getValue();
150         record.add(key + "." + i, value.toString());
151         if(i!=0) {
152           if(key.equals("RxBytes")) {
153             rxBytes = rxBytes + (Long) value;
154           } else if(key.equals("RxDropped")) {
155             rxDropped = rxDropped + (Long) value;
156           } else if(key.equals("RxErrors")) {          
157             rxErrors = rxErrors + (Long) value;
158           } else if(key.equals("RxPackets")) {
159             rxPackets = rxPackets + (Long) value;
160           } else if(key.equals("TxBytes")) {
161             txBytes = txBytes + (Long) value;
162           } else if(key.equals("TxCollisions")) {
163             txCollisions = txCollisions + (Long) value;
164           } else if(key.equals("TxErrors")) {
165             txErrors = txErrors + (Long) value;
166           } else if(key.equals("TxPackets")) {
167             txPackets = txPackets + (Long) netIf.get(key);
168           }
169         }
170       }
171     }
172     buildGenericRecord(record, null, cal.getTimeInMillis(), "network");
173     record.add("RxBytes", Double.toString(rxBytes));
174     record.add("RxDropped", Double.toString(rxDropped));
175     record.add("RxErrors", Double.toString(rxErrors));
176     record.add("RxPackets", Double.toString(rxPackets));
177     record.add("TxBytes", Double.toString(txBytes));
178     record.add("TxCollisions", Double.toString(txCollisions));
179     record.add("TxErrors", Double.toString(txErrors));
180     record.add("TxPackets", Double.toString(txPackets));
181     output.collect(key, record);    
182     
183     double readBytes = 0;
184     double reads = 0;
185     double writeBytes = 0;
186     double writes = 0;
187     double total = 0;
188     double used = 0;
189     record = new ChukwaRecord();
190     JSONArray diskList = (JSONArray) json.get("disk");
191     for(int i = 0;i < diskList.size(); i++) {
192       JSONObject disk = (JSONObject) diskList.get(i);
193       @SuppressWarnings("unchecked")
194       Iterator<Map.Entry<String, ?>> keys = disk.entrySet().iterator();
195       while(keys.hasNext()) {
196         Map.Entry<String, ?> entry = keys.next();
197         String key = entry.getKey();
198         Object value = entry.getValue();
199         record.add(key + "." + i, value.toString());
200         if(key.equals("ReadBytes")) {
201           readBytes = readBytes + (Long) value;
202         } else if(key.equals("Reads")) {
203           reads = reads + (Long) value;
204         } else if(key.equals("WriteBytes")) {
205           writeBytes = writeBytes + (Long) value;
206         } else if(key.equals("Writes")) {
207           writes = writes + (Long) value;
208         }  else if(key.equals("Total")) {
209           total = total + (Long) value;
210         } else if(key.equals("Used")) {
211           used = used + (Long) value;
212         }
213       }
214     }
215     double percentUsed = used/total; 
216     record.add("ReadBytes", Double.toString(readBytes));
217     record.add("Reads", Double.toString(reads));
218     record.add("WriteBytes", Double.toString(writeBytes));
219     record.add("Writes", Double.toString(writes));
220     record.add("Total", Double.toString(total));
221     record.add("Used", Double.toString(used));
222     record.add("PercentUsed", Double.toString(percentUsed));
223     buildGenericRecord(record, null, cal.getTimeInMillis(), "disk");
224     output.collect(key, record);
225     
226     record = new ChukwaRecord();
227     record.add("cluster", chunk.getTag("cluster"));
228     buildGenericRecord(record, null, cal.getTimeInMillis(), "tags");
229     output.collect(key, record);
230   }
231 
232 }