1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc;
20
21 import com.sun.management.GarbageCollectionNotificationInfo;
22 import com.yammer.metrics.core.Gauge;
23 import org.apache.giraph.bsp.BspService;
24 import org.apache.giraph.bsp.CentralizedServiceWorker;
25 import org.apache.giraph.comm.NetworkMetrics;
26 import org.apache.giraph.comm.ServerData;
27 import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
28 import org.apache.giraph.comm.flow_control.FlowControl;
29 import org.apache.giraph.conf.GiraphConstants;
30 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
31 import org.apache.giraph.metrics.GiraphMetrics;
32 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
33 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
34 import org.apache.giraph.ooc.data.MetaPartitionManager;
35 import org.apache.giraph.ooc.command.IOCommand;
36 import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
37 import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
38 import org.apache.giraph.ooc.policy.FixedPartitionsOracle;
39 import org.apache.giraph.ooc.policy.OutOfCoreOracle;
40 import org.apache.giraph.utils.AdjustableSemaphore;
41 import org.apache.giraph.worker.BspServiceWorker;
42 import org.apache.log4j.Logger;
43
44 import java.lang.reflect.Constructor;
45 import java.lang.reflect.InvocationTargetException;
46 import java.util.concurrent.locks.ReadWriteLock;
47 import java.util.concurrent.locks.ReentrantReadWriteLock;
48
49 import static com.google.common.base.Preconditions.checkState;
50
51
52
53
54 public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
55
56
57
58
59
60
61
62 public static final int CHECK_IN_INTERVAL = (1 << 10) - 1;
63
64 public static final String GRAPH_PERCENTAGE_IN_MEMORY = "ooc-graph-in-mem-%";
65
66 private static final Logger LOG = Logger.getLogger(OutOfCoreEngine.class);
67
68
69
70
71 private static final long MSEC_TO_WAIT = 10000;
72
73 private final CentralizedServiceWorker<?, ?, ?> service;
74
75 private FlowControl flowControl;
76
77 private final OutOfCoreIOScheduler ioScheduler;
78
79 private final MetaPartitionManager metaPartitionManager;
80
81 private final OutOfCoreOracle oracle;
82
83 private final OutOfCoreIOStatistics statistics;
84
85
86
87
88
89 private final ReadWriteLock superstepLock = new ReentrantReadWriteLock();
90
91 private final OutOfCoreDataAccessor dataAccessor;
92
93 private final OutOfCoreIOCallableFactory oocIOCallableFactory;
94
95
96
97
98 private final Object partitionAvailable = new Object();
99
100 private int numComputeThreads;
101
102 private volatile int numProcessingThreads;
103
104 private final AdjustableSemaphore activeThreadsPermit;
105
106
107
108
109 private final short maxRequestsCredit;
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129 private long superstep;
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148 private boolean resetDone;
149
150
151
152
153
154 private final NetworkMetrics networkMetrics;
155
156
157
158
159
160
161
162
163 public OutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
164 CentralizedServiceWorker<?, ?, ?> service,
165 NetworkMetrics networkMetrics) {
166 this.service = service;
167 this.networkMetrics = networkMetrics;
168 Class<? extends OutOfCoreDataAccessor> accessorClass =
169 GiraphConstants.OUT_OF_CORE_DATA_ACCESSOR.get(conf);
170 try {
171 Constructor<?> constructor = accessorClass.getConstructor(
172 ImmutableClassesGiraphConfiguration.class);
173 this.dataAccessor = (OutOfCoreDataAccessor) constructor.newInstance(conf);
174 } catch (NoSuchMethodException | InstantiationException |
175 InvocationTargetException | IllegalAccessException e) {
176 throw new IllegalStateException("OutOfCoreEngine: caught exception " +
177 "while creating the data accessor instance!", e);
178 }
179 int numIOThreads = dataAccessor.getNumAccessorThreads();
180 this.oocIOCallableFactory =
181 new OutOfCoreIOCallableFactory(this, numIOThreads,
182 service.getGraphTaskManager().createUncaughtExceptionHandler());
183 this.ioScheduler = new OutOfCoreIOScheduler(conf, this, numIOThreads);
184 this.metaPartitionManager = new MetaPartitionManager(numIOThreads, this);
185 this.statistics = new OutOfCoreIOStatistics(conf, numIOThreads);
186 int maxPartitionsInMemory =
187 GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
188 Class<? extends OutOfCoreOracle> oracleClass =
189 GiraphConstants.OUT_OF_CORE_ORACLE.get(conf);
190 if (maxPartitionsInMemory != 0 &&
191 oracleClass != FixedPartitionsOracle.class) {
192 LOG.warn("OutOfCoreEngine: Max number of partitions in memory is set " +
193 "but the out-of-core oracle used is not tailored for fixed " +
194 "out-of-core policy. Setting the oracle to be FixedPartitionsOracle");
195 oracleClass = FixedPartitionsOracle.class;
196 }
197 this.numComputeThreads = conf.getNumComputeThreads();
198
199 this.numProcessingThreads = conf.getNumInputSplitsThreads();
200 this.activeThreadsPermit = new AdjustableSemaphore(numProcessingThreads);
201 this.maxRequestsCredit = (short)
202 CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
203 this.superstep = BspService.INPUT_SUPERSTEP;
204 this.resetDone = false;
205 GiraphMetrics.get().addSuperstepResetObserver(this);
206 try {
207 Constructor<?> constructor = oracleClass.getConstructor(
208 ImmutableClassesGiraphConfiguration.class, OutOfCoreEngine.class);
209 this.oracle = (OutOfCoreOracle) constructor.newInstance(conf, this);
210 } catch (NoSuchMethodException | IllegalAccessException |
211 InstantiationException | InvocationTargetException e) {
212 throw new IllegalStateException("OutOfCoreEngine: caught exception " +
213 "while creating the oracle!", e);
214 }
215 }
216
217
218
219
220 public void initialize() {
221 dataAccessor.initialize();
222 oocIOCallableFactory.createCallable();
223 }
224
225
226
227
228 public void shutdown() {
229 if (LOG.isInfoEnabled()) {
230 LOG.info("shutdown: out-of-core engine shutting down, signalling IO " +
231 "threads to shutdown");
232 }
233 ioScheduler.shutdown();
234 oocIOCallableFactory.shutdown();
235 dataAccessor.shutdown();
236 }
237
238
239
240
241
242
243 public ServerData getServerData() {
244 return service.getServerData();
245 }
246
247
248
249
250
251
252 public CentralizedServiceWorker getServiceWorker() {
253 return service;
254 }
255
256
257
258
259
260
261 public OutOfCoreIOScheduler getIOScheduler() {
262 return ioScheduler;
263 }
264
265
266
267
268
269
270 public MetaPartitionManager getMetaPartitionManager() {
271 return metaPartitionManager;
272 }
273
274
275
276
277
278
279 public ReadWriteLock getSuperstepLock() {
280 return superstepLock;
281 }
282
283
284
285
286
287
288 public OutOfCoreIOStatistics getIOStatistics() {
289 return statistics;
290 }
291
292
293
294
295
296
297 public OutOfCoreOracle getOracle() {
298 return oracle;
299 }
300
301
302
303
304
305
306
307
308
309 public Integer getNextPartition() {
310 Integer partitionId;
311 synchronized (partitionAvailable) {
312 while ((partitionId = metaPartitionManager.getNextPartition()) == null) {
313 try {
314 if (LOG.isInfoEnabled()) {
315 LOG.info("getNextPartition: waiting until a partition becomes " +
316 "available!");
317 }
318 partitionAvailable.wait(MSEC_TO_WAIT);
319 } catch (InterruptedException e) {
320 throw new IllegalStateException("getNextPartition: caught " +
321 "InterruptedException while waiting to retrieve a partition to " +
322 "process");
323 }
324 }
325 if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) {
326 partitionAvailable.notifyAll();
327 partitionId = null;
328 }
329 }
330 return partitionId;
331 }
332
333
334
335
336
337
338 public void doneProcessingPartition(int partitionId) {
339 metaPartitionManager.setPartitionIsProcessed(partitionId);
340 if (LOG.isInfoEnabled()) {
341 LOG.info("doneProcessingPartition: processing partition " + partitionId +
342 " is done!");
343 }
344 }
345
346
347
348
349
350 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
351 "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
352 public void startIteration() {
353 oracle.startIteration();
354 if (!resetDone) {
355 superstepLock.writeLock().lock();
356 metaPartitionManager.resetPartitions();
357 superstepLock.writeLock().unlock();
358 }
359 if (superstep != BspServiceWorker.INPUT_SUPERSTEP &&
360 numProcessingThreads != numComputeThreads) {
361
362
363
364
365
366 activeThreadsPermit.setMaxPermits(activeThreadsPermit.availablePermits() *
367 numComputeThreads / numProcessingThreads);
368 numProcessingThreads = numComputeThreads;
369 }
370 if (LOG.isInfoEnabled()) {
371 LOG.info("startIteration: with " +
372 metaPartitionManager.getNumInMemoryPartitions() +
373 " partitions in memory and " +
374 activeThreadsPermit.availablePermits() + " active threads");
375 }
376 resetDone = false;
377 }
378
379
380
381
382
383
384
385 public void retrievePartition(int partitionId) {
386 if (metaPartitionManager.isPartitionOnDisk(partitionId)) {
387 ioScheduler.addIOCommand(new LoadPartitionIOCommand(this, partitionId,
388 superstep));
389 synchronized (partitionAvailable) {
390 while (metaPartitionManager.isPartitionOnDisk(partitionId)) {
391 try {
392 if (LOG.isInfoEnabled()) {
393 LOG.info("retrievePartition: waiting until partition " +
394 partitionId + " becomes available");
395 }
396 partitionAvailable.wait();
397 } catch (InterruptedException e) {
398 throw new IllegalStateException("retrievePartition: caught " +
399 "InterruptedException while waiting to retrieve partition " +
400 partitionId);
401 }
402 }
403 }
404 }
405 }
406
407
408
409
410
411
412 public void ioCommandCompleted(IOCommand command) {
413 oracle.commandCompleted(command);
414 if (command instanceof LoadPartitionIOCommand) {
415
416
417 synchronized (partitionAvailable) {
418 partitionAvailable.notifyAll();
419 }
420 }
421 }
422
423
424
425
426
427
428
429
430
431 public void updateActiveThreadsFraction(double fraction) {
432 checkState(fraction >= 0 && fraction <= 1);
433 int numActiveThreads = (int) (numProcessingThreads * fraction);
434 if (LOG.isInfoEnabled()) {
435 LOG.info("updateActiveThreadsFraction: updating the number of active " +
436 "threads to " + numActiveThreads);
437 }
438 activeThreadsPermit.setMaxPermits(numActiveThreads);
439 }
440
441
442
443
444
445
446
447
448 public void activeThreadCheckIn() {
449 activeThreadsPermit.release();
450 try {
451 activeThreadsPermit.acquire();
452 } catch (InterruptedException e) {
453 LOG.error("activeThreadCheckIn: exception while acquiring a permit to " +
454 "remain an active thread");
455 throw new IllegalStateException(e);
456 }
457 }
458
459
460
461
462
463 public void processingThreadStart() {
464 try {
465 activeThreadsPermit.acquire();
466 } catch (InterruptedException e) {
467 LOG.error("processingThreadStart: exception while acquiring a permit to" +
468 " start the processing thread!");
469 throw new IllegalStateException(e);
470 }
471 }
472
473
474
475
476
477 public void processingThreadFinish() {
478 activeThreadsPermit.release();
479 }
480
481
482
483
484
485
486
487
488
489
490 public void updateRequestsCreditFraction(double fraction) {
491 checkState(fraction >= 0 && fraction <= 1);
492 short newCredit = (short) (maxRequestsCredit * fraction);
493 if (LOG.isInfoEnabled()) {
494 LOG.info("updateRequestsCreditFraction: updating the credit to " +
495 newCredit);
496 }
497 if (flowControl != null) {
498 ((CreditBasedFlowControl) flowControl).updateCredit(newCredit);
499 }
500 }
501
502
503
504
505
506 public void reset() {
507 metaPartitionManager.resetPartitions();
508 metaPartitionManager.resetMessages();
509 superstep = service.getSuperstep();
510 resetDone = true;
511 }
512
513
514
515
516 public long getSuperstep() {
517 return superstep;
518 }
519
520
521
522
523
524
525 public void gcCompleted(GarbageCollectionNotificationInfo info) {
526 oracle.gcCompleted(info);
527 }
528
529 @Override
530 public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
531 superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new Gauge<Double>() {
532 @Override
533 public Double value() {
534 return metaPartitionManager.getGraphFractionInMemory() * 100;
535 }
536 });
537 }
538
539 public FlowControl getFlowControl() {
540 return flowControl;
541 }
542
543 public void setFlowControl(FlowControl flowControl) {
544 this.flowControl = flowControl;
545 }
546
547 public OutOfCoreDataAccessor getDataAccessor() {
548 return dataAccessor;
549 }
550
551 public NetworkMetrics getNetworkMetrics() {
552 return networkMetrics;
553 }
554 }