1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.metrics;
20
21 import org.apache.giraph.graph.GraphTaskManager;
22 import org.apache.giraph.ooc.OutOfCoreEngine;
23 import org.apache.giraph.ooc.OutOfCoreIOCallable;
24 import org.apache.giraph.worker.BspServiceWorker;
25
26 import com.google.common.collect.Maps;
27
28 import java.io.PrintStream;
29 import java.util.Map;
30
31
32
33
34 public class AggregatedMetrics {
35
36 private Map<String, AggregatedMetric<?>> metrics = Maps.newHashMap();
37
38
39
40
41
42
43
44
45
46 public AggregatedMetrics add(String name, long value,
47 String hostnamePartitionId) {
48 AggregatedMetricLong aggregatedMetric =
49 (AggregatedMetricLong) metrics.get(name);
50 if (aggregatedMetric == null) {
51 aggregatedMetric = new AggregatedMetricLong();
52 metrics.put(name, aggregatedMetric);
53 }
54 aggregatedMetric.addItem(value, hostnamePartitionId);
55 return this;
56 }
57
58
59
60
61
62
63
64
65
66 public AggregatedMetrics add(String name, double value,
67 String hostnamePartitionId) {
68 AggregatedMetricDouble aggregatedMetric =
69 (AggregatedMetricDouble) metrics.get(name);
70 if (aggregatedMetric == null) {
71 aggregatedMetric = new AggregatedMetricDouble();
72 metrics.put(name, aggregatedMetric);
73 }
74 aggregatedMetric.addItem(value, hostnamePartitionId);
75 return this;
76 }
77
78
79
80
81
82
83
84
85 public AggregatedMetrics add(WorkerSuperstepMetrics workerMetrics,
86 String hostname) {
87 add(GraphTaskManager.TIMER_SUPERSTEP_TIME,
88 workerMetrics.getSuperstepTimer(), hostname);
89 add(GraphTaskManager.TIMER_COMMUNICATION_TIME,
90 workerMetrics.getCommTimer(), hostname);
91 add(GraphTaskManager.TIMER_COMPUTE_ALL,
92 workerMetrics.getComputeAllTimer(), hostname);
93 add(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG,
94 workerMetrics.getTimeToFirstMsg(), hostname);
95 add(BspServiceWorker.TIMER_WAIT_REQUESTS,
96 workerMetrics.getWaitRequestsTimer(), hostname);
97 add(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK,
98 workerMetrics.getBytesLoadedFromDisk(), hostname);
99 add(OutOfCoreIOCallable.BYTES_STORE_TO_DISK,
100 workerMetrics.getBytesStoredOnDisk(), hostname);
101 add(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY,
102 workerMetrics.getGraphPercentageInMemory(), hostname);
103 return this;
104 }
105
106
107
108
109
110
111
112
113 public AggregatedMetrics print(long superstep, PrintStream out) {
114 AggregatedMetric superstepTime = get(GraphTaskManager.TIMER_SUPERSTEP_TIME);
115 AggregatedMetric commTime = get(GraphTaskManager.TIMER_COMMUNICATION_TIME);
116 AggregatedMetric computeAll = get(GraphTaskManager.TIMER_COMPUTE_ALL);
117 AggregatedMetric timeToFirstMsg =
118 get(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG);
119 AggregatedMetric waitRequestsMicros = get(
120 BspServiceWorker.TIMER_WAIT_REQUESTS);
121 AggregatedMetric bytesLoaded =
122 get(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK);
123 AggregatedMetric bytesStored =
124 get(OutOfCoreIOCallable.BYTES_STORE_TO_DISK);
125 AggregatedMetric graphInMem =
126 get(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY);
127
128 out.println();
129 out.println("--- METRICS: superstep " + superstep + " ---");
130 printAggregatedMetric(out, "superstep time", "ms", superstepTime);
131 printAggregatedMetric(out, "compute all partitions", "ms", computeAll);
132 printAggregatedMetric(out, "network communication time", "ms", commTime);
133 printAggregatedMetric(out, "time to first message", "us", timeToFirstMsg);
134 printAggregatedMetric(out, "wait requests time", "us", waitRequestsMicros);
135 printAggregatedMetric(out, "bytes loaded from disk", "bytes", bytesLoaded);
136 printAggregatedMetric(out, "bytes stored to disk", "bytes", bytesStored);
137 printAggregatedMetric(out, "graph in mem", "%", graphInMem);
138
139 return this;
140 }
141
142
143
144
145
146
147
148
149
150 private void printAggregatedMetric(PrintStream out, String header,
151 String unit,
152 AggregatedMetric aggregatedMetric) {
153 if (aggregatedMetric.hasData()) {
154 out.println(header);
155 out.println(" mean: " + aggregatedMetric.mean() + " " + unit);
156 printValueFromHost(out, " smallest: ", unit, aggregatedMetric.min());
157 printValueFromHost(out, " largest: ", unit, aggregatedMetric.max());
158 } else {
159 out.println(header + ": NO DATA");
160 }
161 }
162
163
164
165
166
167
168
169
170
171 private void printValueFromHost(PrintStream out, String prefix,
172 String unit, ValueWithHostname vh) {
173 out.println(prefix + vh.getValue() + ' ' + unit +
174 " from " + vh.getHostname());
175 }
176
177
178
179
180
181
182
183 public AggregatedMetric get(String name) {
184 return metrics.get(name);
185 }
186
187
188
189
190
191
192 public Map<String, AggregatedMetric<?>> getAll() {
193 return metrics;
194 }
195 }