1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc.policy;
20
21 import com.google.common.collect.Maps;
22 import com.sun.management.GarbageCollectionNotificationInfo;
23 import org.apache.giraph.conf.FloatConfOption;
24 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25 import org.apache.giraph.ooc.OutOfCoreEngine;
26 import org.apache.giraph.ooc.OutOfCoreIOStatistics;
27 import org.apache.giraph.ooc.command.IOCommand;
28 import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
29 import org.apache.giraph.ooc.command.WaitIOCommand;
30 import org.apache.log4j.Logger;
31
32 import java.lang.management.MemoryUsage;
33 import java.util.Map;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.atomic.AtomicLong;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public class SimpleGCMonitoringOracle implements OutOfCoreOracle {
61
62
63
64
65
66 public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
67 new FloatConfOption("giraph.optimalMemoryPressure", 0.8f,
68 "The memory pressure (fraction of used memory) at which the job " +
69 "shows the optimal GC behavior. This fraction may be dependent " +
70 "on the GC strategy used in running the job.");
71
72
73 private static final Logger LOG =
74 Logger.getLogger(SimpleGCMonitoringOracle.class);
75
76 private final float optimalMemoryPressure;
77
78 private final OutOfCoreEngine oocEngine;
79
80 private GCObservation lastGCObservation;
81
82 private final AtomicLong desiredDiskToMemoryDataRate =
83 new AtomicLong(0);
84
85 private final Map<IOCommand.IOCommandType, AtomicInteger> commandOccurrences =
86 Maps.newConcurrentMap();
87
88
89
90
91
92
93
94 public SimpleGCMonitoringOracle(ImmutableClassesGiraphConfiguration conf,
95 OutOfCoreEngine oocEngine) {
96 this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
97 this.oocEngine = oocEngine;
98 this.lastGCObservation = new GCObservation(-1, 0, 0);
99 for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
100 commandOccurrences.put(type, new AtomicInteger(0));
101 }
102 }
103
104 @Override
105 public synchronized void gcCompleted(GarbageCollectionNotificationInfo
106 gcInfo) {
107 long time = System.currentTimeMillis();
108 Map<String, MemoryUsage> memAfter = gcInfo.getGcInfo()
109 .getMemoryUsageAfterGc();
110 long usedMemory = 0;
111 long maxMemory = 0;
112 for (MemoryUsage memDetail : memAfter.values()) {
113 usedMemory += memDetail.getUsed();
114 maxMemory += memDetail.getMax();
115 }
116 GCObservation observation = new GCObservation(time, usedMemory, maxMemory);
117 if (LOG.isInfoEnabled()) {
118 LOG.info("gcCompleted: GC completed with: " + observation);
119 }
120
121 if (lastGCObservation.isValid()) {
122 long deltaDataRate =
123 lastGCObservation.getDesiredDeltaDataRate(observation);
124 long diskBandwidthEstimate =
125 oocEngine.getIOStatistics().getDiskBandwidth();
126
127
128
129
130 long dataInjectionRate = desiredDiskToMemoryDataRate.get();
131 desiredDiskToMemoryDataRate.set(Math.max(
132 Math.min(desiredDiskToMemoryDataRate.get() - deltaDataRate,
133 diskBandwidthEstimate), -diskBandwidthEstimate));
134 if (LOG.isInfoEnabled()) {
135 LOG.info("gcCompleted: changing data injection rate from " +
136 String.format("%.2f", dataInjectionRate / 1024.0 / 1024.0) +
137 " to " + String.format("%.2f", desiredDiskToMemoryDataRate.get() /
138 1024.0 / 1024.0));
139 }
140 }
141 lastGCObservation = observation;
142 }
143
144 @Override
145 public void startIteration() {
146 }
147
148
149
150
151
152
153
154
155 private long getCurrentDataInjectionRate() {
156 long effectiveBytesTransferred = 0;
157 long effectiveDuration = 0;
158 for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
159 OutOfCoreIOStatistics.BytesDuration stats =
160 oocEngine.getIOStatistics().getCommandTypeStats(type);
161 int occurrence = commandOccurrences.get(type).get();
162 long typeBytesTransferred = stats.getBytes();
163 long typeDuration = stats.getDuration();
164
165
166
167
168
169
170
171 if (stats.getOccurrence() != 0) {
172 typeBytesTransferred += stats.getBytes() / stats.getOccurrence() *
173 occurrence;
174 typeDuration += stats.getDuration() / stats.getOccurrence() *
175 occurrence;
176 }
177 if (type == IOCommand.IOCommandType.LOAD_PARTITION) {
178 effectiveBytesTransferred += typeBytesTransferred;
179 } else {
180
181 effectiveBytesTransferred -= typeBytesTransferred;
182 }
183 effectiveDuration += typeDuration;
184 }
185 if (effectiveDuration == 0) {
186 return 0;
187 } else {
188 return effectiveBytesTransferred / effectiveDuration;
189 }
190 }
191
192 @Override
193 public IOAction[] getNextIOActions() {
194 long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
195 long desiredRate = desiredDiskToMemoryDataRate.get();
196 long currentRate = getCurrentDataInjectionRate();
197 if (desiredRate > error) {
198
199 if (currentRate > desiredRate + error) {
200
201
202 return new IOAction[]{
203 IOAction.STORE_MESSAGES_AND_BUFFERS,
204 IOAction.STORE_PROCESSED_PARTITION};
205 } else if (currentRate < desiredRate - error) {
206
207 return new IOAction[]{IOAction.LOAD_PARTITION};
208 } else {
209
210
211
212 return new IOAction[]{
213 IOAction.STORE_MESSAGES_AND_BUFFERS,
214 IOAction.STORE_PROCESSED_PARTITION,
215 IOAction.LOAD_PARTITION};
216 }
217 } else if (desiredRate < -error) {
218
219 if (currentRate < desiredRate - error) {
220
221
222 return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
223 } else if (currentRate > desiredRate + error) {
224
225 return new IOAction[]{
226 IOAction.STORE_MESSAGES_AND_BUFFERS,
227 IOAction.STORE_PARTITION};
228 } else {
229
230
231 return new IOAction[]{
232 IOAction.STORE_MESSAGES_AND_BUFFERS,
233 IOAction.STORE_PROCESSED_PARTITION,
234 IOAction.LOAD_UNPROCESSED_PARTITION};
235 }
236 } else {
237
238
239
240 if (currentRate > desiredRate + error) {
241 return new IOAction[]{
242 IOAction.STORE_MESSAGES_AND_BUFFERS,
243 IOAction.STORE_PROCESSED_PARTITION};
244 } else if (currentRate < desiredRate - error) {
245 return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
246 } else {
247 return new IOAction[]{
248 IOAction.STORE_MESSAGES_AND_BUFFERS,
249 IOAction.STORE_PROCESSED_PARTITION,
250 IOAction.LOAD_UNPROCESSED_PARTITION};
251 }
252 }
253 }
254
255 @Override
256 public synchronized boolean approve(IOCommand command) {
257 long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
258 long desiredRate = desiredDiskToMemoryDataRate.get();
259 long currentRate = getCurrentDataInjectionRate();
260
261
262
263 if (currentRate > desiredRate + error &&
264 command instanceof LoadPartitionIOCommand) {
265 return false;
266 }
267 if (currentRate < desiredRate - error &&
268 !(command instanceof LoadPartitionIOCommand) &&
269 !(command instanceof WaitIOCommand)) {
270 return false;
271 }
272 commandOccurrences.get(command.getType()).getAndIncrement();
273 return true;
274 }
275
276 @Override
277 public void commandCompleted(IOCommand command) {
278 commandOccurrences.get(command.getType()).getAndDecrement();
279 }
280
281
282 private class GCObservation {
283
284 private long time;
285
286 private long usedMemory;
287
288 private long maxMemory;
289
290
291
292
293
294
295
296
297 public GCObservation(long time, long usedMemory, long maxMemory) {
298 this.time = time;
299 this.usedMemory = usedMemory;
300 this.maxMemory = maxMemory;
301 }
302
303
304
305
306
307
308 public boolean isValid() {
309 return time > 0;
310 }
311
312
313
314
315
316
317
318
319 public long getDesiredDeltaDataRate(GCObservation newObservation) {
320 long newUsedMemory = newObservation.usedMemory;
321 long newMaxMemory = newObservation.maxMemory;
322 long lastUsedMemory = usedMemory;
323 long lastMaxMemory = maxMemory;
324
325 long scaledMaxMemory = Math.min(lastMaxMemory, newMaxMemory);
326 newUsedMemory =
327 (long) (((double) scaledMaxMemory / newMaxMemory) * newUsedMemory);
328 lastUsedMemory =
329 (long) (((double) scaledMaxMemory / lastMaxMemory) * lastUsedMemory);
330 long desiredUsedMemory = (long) (optimalMemoryPressure * scaledMaxMemory);
331 if (LOG.isInfoEnabled()) {
332 LOG.info("getDesiredDeltaDataRate: " + String.format("previous usage " +
333 "= %.2f MB, ", lastUsedMemory / 1024.0 / 1024.0) + String.format(
334 "current usage = %.2f MB, ", newUsedMemory / 1024.0 / 1024.0) +
335 String.format("ideal usage = %.2f MB", desiredUsedMemory / 1024.0 /
336 1024.0));
337 }
338 long interval = newObservation.time - time;
339 if (interval == 0) {
340 interval = 1;
341 LOG.warn("getDesiredDeltaRate: two GC happened almost at the same " +
342 "time!");
343 }
344 long currentDataRate = (long) ((double) (newUsedMemory -
345 lastUsedMemory) / interval * 1000);
346 long desiredDataRate = (long) ((double) (desiredUsedMemory -
347 newUsedMemory) / interval * 1000);
348 return currentDataRate - desiredDataRate;
349 }
350
351 @Override
352 public String toString() {
353 return String.format("(usedMemory: %.2f MB, maxMemory: %.2f MB at " +
354 "time: %d ms)", usedMemory / 1024.0 / 1024.0,
355 maxMemory / 1024.0 / 1024.0, time);
356 }
357 }
358 }