1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc;
20
21 import com.google.common.collect.Maps;
22 import org.apache.giraph.conf.GiraphConstants;
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.conf.IntConfOption;
25 import org.apache.giraph.ooc.command.IOCommand.IOCommandType;
26 import org.apache.log4j.Logger;
27
28 import java.util.Map;
29 import java.util.Queue;
30 import java.util.concurrent.ArrayBlockingQueue;
31 import java.util.concurrent.atomic.AtomicLong;
32
33
34
35
36
37 public class OutOfCoreIOStatistics {
38
39
40
41
42
43 public static final IntConfOption DISK_BANDWIDTH_ESTIMATE =
44 new IntConfOption("giraph.diskBandwidthEstimate", 125,
45 "An estimate of disk bandwidth (MB/s). This number is used just at " +
46 "the beginning of the computation, and it will be " +
47 "updated/replaced once a few disk operations happen.");
48
49
50
51
52 public static final IntConfOption IO_COMMAND_HISTORY_SIZE =
53 new IntConfOption("giraph.ioCommandHistorySize", 50,
54 "Number of most recent IO operations to consider for reporting the" +
55 "statistics.");
56
57
58
59
60 public static final IntConfOption STATS_PRINT_FREQUENCY =
61 new IntConfOption("giraph.oocStatPrintFrequency", 200,
62 "Number of updates before stats are printed.");
63
64
65 private static final Logger LOG =
66 Logger.getLogger(OutOfCoreIOStatistics.class);
67
68 private final AtomicLong diskBandwidthEstimate;
69
70 private final int maxHistorySize;
71
72
73
74
75
76
77
78 private final double updateCoefficient;
79
80 private final Queue<StatisticsEntry> commandHistory;
81
82
83
84
85
86 private final Map<IOCommandType, StatisticsEntry> aggregateStats;
87
88 private int numUpdates = 0;
89
90 private int statsPrintFrequency = 0;
91
92
93
94
95
96
97
98 public OutOfCoreIOStatistics(ImmutableClassesGiraphConfiguration conf,
99 int numIOThreads) {
100 this.diskBandwidthEstimate =
101 new AtomicLong(DISK_BANDWIDTH_ESTIMATE.get(conf) *
102 (long) GiraphConstants.ONE_MB);
103 this.maxHistorySize = IO_COMMAND_HISTORY_SIZE.get(conf);
104 this.updateCoefficient = 1.0 / maxHistorySize;
105
106
107 this.commandHistory =
108 new ArrayBlockingQueue<>(maxHistorySize + numIOThreads);
109 this.aggregateStats = Maps.newConcurrentMap();
110 for (IOCommandType type : IOCommandType.values()) {
111 aggregateStats.put(type, new StatisticsEntry(type, 0, 0, 0));
112 }
113 this.statsPrintFrequency = STATS_PRINT_FREQUENCY.get(conf);
114 }
115
116
117
118
119
120
121
122
123
124 public void update(IOCommandType type, long bytesTransferred,
125 long duration) {
126 StatisticsEntry entry = aggregateStats.get(type);
127 synchronized (entry) {
128 entry.setOccurrence(entry.getOccurrence() + 1);
129 entry.setDuration(duration + entry.getDuration());
130 entry.setBytesTransferred(bytesTransferred + entry.getBytesTransferred());
131 }
132 commandHistory.offer(
133 new StatisticsEntry(type, bytesTransferred, duration, 0));
134 if (type != IOCommandType.WAIT) {
135
136
137
138
139
140
141
142 diskBandwidthEstimate.set((long)
143 (updateCoefficient * (bytesTransferred / duration * 1000) +
144 (1 - updateCoefficient) * diskBandwidthEstimate.get()));
145 }
146 if (commandHistory.size() > maxHistorySize) {
147 StatisticsEntry removedEntry = commandHistory.poll();
148 entry = aggregateStats.get(removedEntry.getType());
149 synchronized (entry) {
150 entry.setOccurrence(entry.getOccurrence() - 1);
151 entry.setDuration(entry.getDuration() - removedEntry.getDuration());
152 entry.setBytesTransferred(
153 entry.getBytesTransferred() - removedEntry.getBytesTransferred());
154 }
155 }
156 numUpdates++;
157
158 if (numUpdates % statsPrintFrequency == 0) {
159 if (LOG.isInfoEnabled()) {
160 LOG.info(this);
161 }
162 }
163 }
164
165 @Override
166 public String toString() {
167 StringBuffer sb = new StringBuffer();
168 long waitTime = 0;
169 long loadTime = 0;
170 long bytesRead = 0;
171 long storeTime = 0;
172 long bytesWritten = 0;
173 for (Map.Entry<IOCommandType, StatisticsEntry> entry :
174 aggregateStats.entrySet()) {
175 synchronized (entry.getValue()) {
176 sb.append(entry.getKey() + ": " + entry.getValue() + ", ");
177 if (entry.getKey() == IOCommandType.WAIT) {
178 waitTime += entry.getValue().getDuration();
179 } else if (entry.getKey() == IOCommandType.LOAD_PARTITION) {
180 loadTime += entry.getValue().getDuration();
181 bytesRead += entry.getValue().getBytesTransferred();
182 } else {
183 storeTime += entry.getValue().getDuration();
184 bytesWritten += entry.getValue().getBytesTransferred();
185 }
186 }
187 }
188 sb.append(String.format("Average STORE: %.2f MB/s, ",
189 (double) bytesWritten / storeTime * 1000 / 1024 / 1024));
190 sb.append(String.format("DATA_INJECTION: %.2f MB/s, ",
191 (double) (bytesRead - bytesWritten) /
192 (waitTime + loadTime + storeTime) * 1000 / 1024 / 1024));
193 sb.append(String.format("DISK_BANDWIDTH: %.2f MB/s",
194 (double) diskBandwidthEstimate.get() / 1024 / 1024));
195
196 return sb.toString();
197 }
198
199
200
201
202 public long getDiskBandwidth() {
203 return diskBandwidthEstimate.get();
204 }
205
206
207
208
209
210
211
212 public BytesDuration getCommandTypeStats(IOCommandType type) {
213 StatisticsEntry entry = aggregateStats.get(type);
214 synchronized (entry) {
215 return new BytesDuration(entry.getBytesTransferred(), entry.getDuration(),
216 entry.getOccurrence());
217 }
218 }
219
220
221
222
223
224 public static class BytesDuration {
225
226 private long bytes;
227
228 private long duration;
229
230 private int occurrence;
231
232
233
234
235
236
237
238 BytesDuration(long bytes, long duration, int occurrence) {
239 this.bytes = bytes;
240 this.duration = duration;
241 this.occurrence = occurrence;
242 }
243
244
245
246
247 public long getBytes() {
248 return bytes;
249 }
250
251
252
253
254 public long getDuration() {
255 return duration;
256 }
257
258
259
260
261 public int getOccurrence() {
262 return occurrence;
263 }
264 }
265
266
267
268
269 private static class StatisticsEntry {
270
271 private IOCommandType type;
272
273
274
275
276 private long bytesTransferred;
277
278
279
280
281 private long duration;
282
283
284
285
286 private int occurrence;
287
288
289
290
291
292
293
294
295
296 public StatisticsEntry(IOCommandType type, long bytesTransferred,
297 long duration, int occurrence) {
298 this.type = type;
299 this.bytesTransferred = bytesTransferred;
300 this.duration = duration;
301 this.occurrence = occurrence;
302 }
303
304
305
306
307 public IOCommandType getType() {
308 return type;
309 }
310
311
312
313
314
315 public long getBytesTransferred() {
316 return bytesTransferred;
317 }
318
319
320
321
322
323
324 public void setBytesTransferred(long bytesTransferred) {
325 this.bytesTransferred = bytesTransferred;
326 }
327
328
329
330
331 public long getDuration() {
332 return duration;
333 }
334
335
336
337
338
339
340 public void setDuration(long duration) {
341 this.duration = duration;
342 }
343
344
345
346
347 public int getOccurrence() {
348 return occurrence;
349 }
350
351
352
353
354
355
356 public void setOccurrence(int occurrence) {
357 this.occurrence = occurrence;
358 }
359
360 @Override
361 public String toString() {
362 if (type == IOCommandType.WAIT) {
363 return String.format("%.2f sec", duration / 1000.0);
364 } else {
365 return String.format("%.2f MB/s",
366 (double) bytesTransferred / duration * 1000 / 1024 / 2014);
367 }
368 }
369 }
370 }