1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.metrics.spi;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.Iterator;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.Timer;
32 import java.util.TimerTask;
33 import java.util.TreeMap;
34 import java.util.Map.Entry;
35
36 import org.apache.hadoop.metrics.ContextFactory;
37 import org.apache.hadoop.metrics.MetricsContext;
38 import org.apache.hadoop.metrics.MetricsException;
39 import org.apache.hadoop.metrics.MetricsRecord;
40 import org.apache.hadoop.metrics.Updater;
41
42
43
44
45
46
47
48
49
50
51
52 public abstract class AbstractMetricsContext implements MetricsContext {
53
54 private int period = MetricsContext.DEFAULT_PERIOD;
55 private Timer timer = null;
56 private boolean computeRate = true;
57 private Set<Updater> updaters = new HashSet<Updater>(1);
58 private volatile boolean isMonitoring = false;
59
60 private ContextFactory factory = null;
61 private String contextName = null;
62
63 static class TagMap extends TreeMap<String,Object> {
64 private static final long serialVersionUID = 3546309335061952993L;
65 TagMap() {
66 super();
67 }
68 TagMap(TagMap orig) {
69 super(orig);
70 }
71
72
73
74
75
76 public boolean containsAll(TagMap other) {
77 for (Map.Entry<String,Object> entry : other.entrySet()) {
78 Object value = get(entry.getKey());
79 if (value == null || !value.equals(entry.getValue())) {
80
81 return false;
82 }
83 }
84 return true;
85 }
86 }
87
88 static class MetricMap extends TreeMap<String,Number> {
89 private static final long serialVersionUID = -7495051861141631609L;
90 }
91
92 static class RecordMap extends HashMap<TagMap,MetricMap> {
93 private static final long serialVersionUID = 259835619700264611L;
94 }
95
96 private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>();
97
98
99
100
101
102 protected AbstractMetricsContext() {
103 }
104
105
106
107
108 public void init(String contextName, ContextFactory factory)
109 {
110 this.contextName = contextName;
111 this.factory = factory;
112 }
113
114
115
116
117 protected String getAttribute(String attributeName) {
118 String factoryAttribute = contextName + "." + attributeName;
119 return (String) factory.getAttribute(factoryAttribute);
120 }
121
122
123
124
125
126
127
128 protected Map<String,String> getAttributeTable(String tableName) {
129 String prefix = contextName + "." + tableName + ".";
130 Map<String,String> result = new HashMap<String,String>();
131 for (String attributeName : factory.getAttributeNames()) {
132 if (attributeName.startsWith(prefix)) {
133 String name = attributeName.substring(prefix.length());
134 String value = (String) factory.getAttribute(attributeName);
135 result.put(name, value);
136 }
137 }
138 return result;
139 }
140
141
142
143
144 public String getContextName() {
145 return contextName;
146 }
147
148
149
150
151
152 public ContextFactory getContextFactory() {
153 return factory;
154 }
155
156
157
158
159 public synchronized void startMonitoring()
160 throws IOException {
161 if (!isMonitoring) {
162 startTimer();
163 isMonitoring = true;
164 }
165 }
166
167
168
169
170
171 public synchronized void stopMonitoring() {
172 if (isMonitoring) {
173 stopTimer();
174 isMonitoring = false;
175 }
176 }
177
178
179
180
181 public boolean isMonitoring() {
182 return isMonitoring;
183 }
184
185
186
187
188
189 public synchronized void close() {
190 stopMonitoring();
191 clearUpdaters();
192 }
193
194
195
196
197
198
199
200
201
202 public final synchronized MetricsRecord createRecord(String recordName) {
203 if (bufferedData.get(recordName) == null) {
204 bufferedData.put(recordName, new RecordMap());
205 }
206 return newRecord(recordName);
207 }
208
209
210
211
212
213
214 protected MetricsRecord newRecord(String recordName) {
215 return new MetricsRecordImpl(recordName, this);
216 }
217
218
219
220
221
222
223
224
225 public synchronized void registerUpdater(final Updater updater) {
226 if (!updaters.contains(updater)) {
227 updaters.add(updater);
228 }
229 }
230
231
232
233
234
235
236 public synchronized void unregisterUpdater(Updater updater) {
237 updaters.remove(updater);
238 }
239
240 private synchronized void clearUpdaters() {
241 updaters.clear();
242 }
243
244
245
246
247 private synchronized void startTimer() {
248 if (timer == null) {
249 timer = new Timer("Timer thread for monitoring " + getContextName(),
250 true);
251 TimerTask task = new TimerTask() {
252 public void run() {
253 try {
254 timerEvent();
255 }
256 catch (IOException ioe) {
257 ioe.printStackTrace();
258 }
259 }
260 };
261 long millis = period * 1000;
262 timer.scheduleAtFixedRate(task, millis, millis);
263 }
264 }
265
266
267
268
269 private synchronized void stopTimer() {
270 if (timer != null) {
271 timer.cancel();
272 timer = null;
273 }
274 }
275
276
277
278
279 private void timerEvent() throws IOException {
280 if (isMonitoring) {
281 Collection<Updater> myUpdaters;
282 synchronized (this) {
283 myUpdaters = new ArrayList<Updater>(updaters);
284 }
285
286
287 for (Updater updater : myUpdaters) {
288 try {
289 updater.doUpdates(this);
290 }
291 catch (Throwable throwable) {
292 throwable.printStackTrace();
293 }
294 }
295 emitRecords();
296 }
297 }
298
299
300
301
302 private synchronized void emitRecords() throws IOException {
303 for (Entry<String, RecordMap> record : bufferedData.entrySet()) {
304 String recordName = record.getKey();
305 RecordMap recordMap = record.getValue();
306 for (Entry<TagMap, MetricMap> entry : record.getValue().entrySet()) {
307 OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
308 emitRecord(contextName, recordName, outRec);
309 }
310 }
311 flush();
312 }
313
314
315
316
317 protected abstract void emitRecord(String contextName, String recordName,
318 OutputRecord outRec) throws IOException;
319
320
321
322
323
324 protected void flush() throws IOException {
325 }
326
327
328
329
330
331 protected void update(MetricsRecordImpl record) {
332
333 String recordName = record.getRecordName();
334 TagMap tagTable = record.getTagTable();
335 Map<String,MetricValue> metricUpdates = record.getMetricTable();
336
337 RecordMap recordMap = getRecordMap(recordName);
338 synchronized (recordMap) {
339 MetricMap metricMap = recordMap.get(tagTable);
340 if (metricMap == null) {
341 metricMap = new MetricMap();
342 TagMap tagMap = new TagMap(tagTable);
343 recordMap.put(tagMap, metricMap);
344 }
345
346 Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet();
347 for (Entry<String, MetricValue> entry : entrySet) {
348 String metricName = entry.getKey ();
349 MetricValue updateValue = entry.getValue ();
350 Number updateNumber = updateValue.getNumber();
351 Number currentNumber = metricMap.get(metricName);
352 if (currentNumber == null || updateValue.isAbsolute()) {
353 metricMap.put(metricName, updateNumber);
354 }
355 else {
356 Number newNumber = sum(updateNumber, currentNumber);
357 metricMap.put(metricName, newNumber);
358 metricMap.put(metricName+"_raw", updateNumber);
359 if (computeRate ) {
360 double rate = updateNumber.doubleValue() * 60.0 / period;
361 metricMap.put(metricName+"_rate", rate);
362 }
363 computeRate = true;
364 }
365 }
366 }
367 }
368
369 private synchronized RecordMap getRecordMap(String recordName) {
370 return bufferedData.get(recordName);
371 }
372
373
374
375
376
377 private Number sum(Number a, Number b) {
378 if (a instanceof Integer) {
379 return Integer.valueOf(a.intValue() + b.intValue());
380 }
381 else if (a instanceof Float) {
382 return new Float(a.floatValue() + b.floatValue());
383 }
384 else if (a instanceof Short) {
385 return Short.valueOf((short)(a.shortValue() + b.shortValue()));
386 }
387 else if (a instanceof Byte) {
388 return Byte.valueOf((byte)(a.byteValue() + b.byteValue()));
389 }
390 else if (a instanceof Long) {
391 return Long.valueOf((a.longValue() + b.longValue()));
392 }
393 else {
394
395 throw new MetricsException("Invalid number type");
396 }
397
398 }
399
400
401
402
403
404
405
406 protected void remove(MetricsRecordImpl record) {
407 String recordName = record.getRecordName();
408 TagMap tagTable = record.getTagTable();
409
410 RecordMap recordMap = getRecordMap(recordName);
411 synchronized (recordMap) {
412 Iterator<TagMap> it = recordMap.keySet().iterator();
413 while (it.hasNext()) {
414 TagMap rowTags = it.next();
415 if (rowTags.containsAll(tagTable)) {
416 it.remove();
417 }
418 }
419 }
420 }
421
422
423
424
425 public int getPeriod() {
426 return period;
427 }
428
429
430
431
432 protected void setPeriod(int period) {
433 this.period = period;
434 }
435 }