1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.ooc; |
20 | |
|
21 | |
import com.sun.management.GarbageCollectionNotificationInfo; |
22 | |
import com.yammer.metrics.core.Gauge; |
23 | |
import org.apache.giraph.bsp.BspService; |
24 | |
import org.apache.giraph.bsp.CentralizedServiceWorker; |
25 | |
import org.apache.giraph.comm.NetworkMetrics; |
26 | |
import org.apache.giraph.comm.ServerData; |
27 | |
import org.apache.giraph.comm.flow_control.CreditBasedFlowControl; |
28 | |
import org.apache.giraph.comm.flow_control.FlowControl; |
29 | |
import org.apache.giraph.conf.GiraphConstants; |
30 | |
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; |
31 | |
import org.apache.giraph.metrics.GiraphMetrics; |
32 | |
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; |
33 | |
import org.apache.giraph.metrics.SuperstepMetricsRegistry; |
34 | |
import org.apache.giraph.ooc.data.MetaPartitionManager; |
35 | |
import org.apache.giraph.ooc.command.IOCommand; |
36 | |
import org.apache.giraph.ooc.command.LoadPartitionIOCommand; |
37 | |
import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor; |
38 | |
import org.apache.giraph.ooc.policy.FixedPartitionsOracle; |
39 | |
import org.apache.giraph.ooc.policy.OutOfCoreOracle; |
40 | |
import org.apache.giraph.utils.AdjustableSemaphore; |
41 | |
import org.apache.giraph.worker.BspServiceWorker; |
42 | |
import org.apache.log4j.Logger; |
43 | |
|
44 | |
import java.lang.reflect.Constructor; |
45 | |
import java.lang.reflect.InvocationTargetException; |
46 | |
import java.util.concurrent.locks.ReadWriteLock; |
47 | |
import java.util.concurrent.locks.ReentrantReadWriteLock; |
48 | |
|
49 | |
import static com.google.common.base.Preconditions.checkState; |
50 | |
|
51 | |
|
52 | |
|
53 | |
|
54 | 0 | public class OutOfCoreEngine implements ResetSuperstepMetricsObserver { |
55 | |
|
56 | |
|
57 | |
|
58 | |
|
59 | |
|
60 | |
|
61 | |
|
62 | |
public static final int CHECK_IN_INTERVAL = (1 << 10) - 1; |
63 | |
|
64 | |
public static final String GRAPH_PERCENTAGE_IN_MEMORY = "ooc-graph-in-mem-%"; |
65 | |
|
66 | 0 | private static final Logger LOG = Logger.getLogger(OutOfCoreEngine.class); |
67 | |
|
68 | |
|
69 | |
|
70 | |
|
71 | |
private static final long MSEC_TO_WAIT = 10000; |
72 | |
|
73 | |
private final CentralizedServiceWorker<?, ?, ?> service; |
74 | |
|
75 | |
private FlowControl flowControl; |
76 | |
|
77 | |
private final OutOfCoreIOScheduler ioScheduler; |
78 | |
|
79 | |
private final MetaPartitionManager metaPartitionManager; |
80 | |
|
81 | |
private final OutOfCoreOracle oracle; |
82 | |
|
83 | |
private final OutOfCoreIOStatistics statistics; |
84 | |
|
85 | |
|
86 | |
|
87 | |
|
88 | |
|
89 | 0 | private final ReadWriteLock superstepLock = new ReentrantReadWriteLock(); |
90 | |
|
91 | |
private final OutOfCoreDataAccessor dataAccessor; |
92 | |
|
93 | |
private final OutOfCoreIOCallableFactory oocIOCallableFactory; |
94 | |
|
95 | |
|
96 | |
|
97 | |
|
98 | 0 | private final Object partitionAvailable = new Object(); |
99 | |
|
100 | |
private int numComputeThreads; |
101 | |
|
102 | |
private volatile int numProcessingThreads; |
103 | |
|
104 | |
private final AdjustableSemaphore activeThreadsPermit; |
105 | |
|
106 | |
|
107 | |
|
108 | |
|
109 | |
private final short maxRequestsCredit; |
110 | |
|
111 | |
|
112 | |
|
113 | |
|
114 | |
|
115 | |
|
116 | |
|
117 | |
|
118 | |
|
119 | |
|
120 | |
|
121 | |
|
122 | |
|
123 | |
|
124 | |
|
125 | |
|
126 | |
|
127 | |
|
128 | |
|
129 | |
private long superstep; |
130 | |
|
131 | |
|
132 | |
|
133 | |
|
134 | |
|
135 | |
|
136 | |
|
137 | |
|
138 | |
|
139 | |
|
140 | |
|
141 | |
|
142 | |
|
143 | |
|
144 | |
|
145 | |
|
146 | |
|
147 | |
|
148 | |
private boolean resetDone; |
149 | |
|
150 | |
|
151 | |
|
152 | |
|
153 | |
|
154 | |
private final NetworkMetrics networkMetrics; |
155 | |
|
156 | |
|
157 | |
|
158 | |
|
159 | |
|
160 | |
|
161 | |
|
162 | |
|
163 | |
public OutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf, |
164 | |
CentralizedServiceWorker<?, ?, ?> service, |
165 | 0 | NetworkMetrics networkMetrics) { |
166 | 0 | this.service = service; |
167 | 0 | this.networkMetrics = networkMetrics; |
168 | 0 | Class<? extends OutOfCoreDataAccessor> accessorClass = |
169 | 0 | GiraphConstants.OUT_OF_CORE_DATA_ACCESSOR.get(conf); |
170 | |
try { |
171 | 0 | Constructor<?> constructor = accessorClass.getConstructor( |
172 | |
ImmutableClassesGiraphConfiguration.class); |
173 | 0 | this.dataAccessor = (OutOfCoreDataAccessor) constructor.newInstance(conf); |
174 | 0 | } catch (NoSuchMethodException | InstantiationException | |
175 | |
InvocationTargetException | IllegalAccessException e) { |
176 | 0 | throw new IllegalStateException("OutOfCoreEngine: caught exception " + |
177 | |
"while creating the data accessor instance!", e); |
178 | 0 | } |
179 | 0 | int numIOThreads = dataAccessor.getNumAccessorThreads(); |
180 | 0 | this.oocIOCallableFactory = |
181 | |
new OutOfCoreIOCallableFactory(this, numIOThreads, |
182 | 0 | service.getGraphTaskManager().createUncaughtExceptionHandler()); |
183 | 0 | this.ioScheduler = new OutOfCoreIOScheduler(conf, this, numIOThreads); |
184 | 0 | this.metaPartitionManager = new MetaPartitionManager(numIOThreads, this); |
185 | 0 | this.statistics = new OutOfCoreIOStatistics(conf, numIOThreads); |
186 | 0 | int maxPartitionsInMemory = |
187 | 0 | GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf); |
188 | 0 | Class<? extends OutOfCoreOracle> oracleClass = |
189 | 0 | GiraphConstants.OUT_OF_CORE_ORACLE.get(conf); |
190 | 0 | if (maxPartitionsInMemory != 0 && |
191 | |
oracleClass != FixedPartitionsOracle.class) { |
192 | 0 | LOG.warn("OutOfCoreEngine: Max number of partitions in memory is set " + |
193 | |
"but the out-of-core oracle used is not tailored for fixed " + |
194 | |
"out-of-core policy. Setting the oracle to be FixedPartitionsOracle"); |
195 | 0 | oracleClass = FixedPartitionsOracle.class; |
196 | |
} |
197 | 0 | this.numComputeThreads = conf.getNumComputeThreads(); |
198 | |
|
199 | 0 | this.numProcessingThreads = conf.getNumInputSplitsThreads(); |
200 | 0 | this.activeThreadsPermit = new AdjustableSemaphore(numProcessingThreads); |
201 | 0 | this.maxRequestsCredit = (short) |
202 | 0 | CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf); |
203 | 0 | this.superstep = BspService.INPUT_SUPERSTEP; |
204 | 0 | this.resetDone = false; |
205 | 0 | GiraphMetrics.get().addSuperstepResetObserver(this); |
206 | |
try { |
207 | 0 | Constructor<?> constructor = oracleClass.getConstructor( |
208 | |
ImmutableClassesGiraphConfiguration.class, OutOfCoreEngine.class); |
209 | 0 | this.oracle = (OutOfCoreOracle) constructor.newInstance(conf, this); |
210 | 0 | } catch (NoSuchMethodException | IllegalAccessException | |
211 | |
InstantiationException | InvocationTargetException e) { |
212 | 0 | throw new IllegalStateException("OutOfCoreEngine: caught exception " + |
213 | |
"while creating the oracle!", e); |
214 | 0 | } |
215 | 0 | } |
216 | |
|
217 | |
|
218 | |
|
219 | |
|
220 | |
public void initialize() { |
221 | 0 | dataAccessor.initialize(); |
222 | 0 | oocIOCallableFactory.createCallable(); |
223 | 0 | } |
224 | |
|
225 | |
|
226 | |
|
227 | |
|
228 | |
public void shutdown() { |
229 | 0 | if (LOG.isInfoEnabled()) { |
230 | 0 | LOG.info("shutdown: out-of-core engine shutting down, signalling IO " + |
231 | |
"threads to shutdown"); |
232 | |
} |
233 | 0 | ioScheduler.shutdown(); |
234 | 0 | oocIOCallableFactory.shutdown(); |
235 | 0 | dataAccessor.shutdown(); |
236 | 0 | } |
237 | |
|
238 | |
|
239 | |
|
240 | |
|
241 | |
|
242 | |
|
243 | |
public ServerData getServerData() { |
244 | 0 | return service.getServerData(); |
245 | |
} |
246 | |
|
247 | |
|
248 | |
|
249 | |
|
250 | |
|
251 | |
|
252 | |
public CentralizedServiceWorker getServiceWorker() { |
253 | 0 | return service; |
254 | |
} |
255 | |
|
256 | |
|
257 | |
|
258 | |
|
259 | |
|
260 | |
|
261 | |
public OutOfCoreIOScheduler getIOScheduler() { |
262 | 0 | return ioScheduler; |
263 | |
} |
264 | |
|
265 | |
|
266 | |
|
267 | |
|
268 | |
|
269 | |
|
270 | |
public MetaPartitionManager getMetaPartitionManager() { |
271 | 0 | return metaPartitionManager; |
272 | |
} |
273 | |
|
274 | |
|
275 | |
|
276 | |
|
277 | |
|
278 | |
|
279 | |
public ReadWriteLock getSuperstepLock() { |
280 | 0 | return superstepLock; |
281 | |
} |
282 | |
|
283 | |
|
284 | |
|
285 | |
|
286 | |
|
287 | |
|
288 | |
public OutOfCoreIOStatistics getIOStatistics() { |
289 | 0 | return statistics; |
290 | |
} |
291 | |
|
292 | |
|
293 | |
|
294 | |
|
295 | |
|
296 | |
|
297 | |
public OutOfCoreOracle getOracle() { |
298 | 0 | return oracle; |
299 | |
} |
300 | |
|
301 | |
|
302 | |
|
303 | |
|
304 | |
|
305 | |
|
306 | |
|
307 | |
|
308 | |
|
309 | |
public Integer getNextPartition() { |
310 | |
Integer partitionId; |
311 | 0 | synchronized (partitionAvailable) { |
312 | 0 | while ((partitionId = metaPartitionManager.getNextPartition()) == null) { |
313 | |
try { |
314 | 0 | if (LOG.isInfoEnabled()) { |
315 | 0 | LOG.info("getNextPartition: waiting until a partition becomes " + |
316 | |
"available!"); |
317 | |
} |
318 | 0 | partitionAvailable.wait(MSEC_TO_WAIT); |
319 | 0 | } catch (InterruptedException e) { |
320 | 0 | throw new IllegalStateException("getNextPartition: caught " + |
321 | |
"InterruptedException while waiting to retrieve a partition to " + |
322 | |
"process"); |
323 | 0 | } |
324 | |
} |
325 | 0 | if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) { |
326 | 0 | partitionAvailable.notifyAll(); |
327 | 0 | partitionId = null; |
328 | |
} |
329 | 0 | } |
330 | 0 | return partitionId; |
331 | |
} |
332 | |
|
333 | |
|
334 | |
|
335 | |
|
336 | |
|
337 | |
|
338 | |
public void doneProcessingPartition(int partitionId) { |
339 | 0 | metaPartitionManager.setPartitionIsProcessed(partitionId); |
340 | 0 | if (LOG.isInfoEnabled()) { |
341 | 0 | LOG.info("doneProcessingPartition: processing partition " + partitionId + |
342 | |
" is done!"); |
343 | |
} |
344 | 0 | } |
345 | |
|
346 | |
|
347 | |
|
348 | |
|
349 | |
|
350 | |
@edu.umd.cs.findbugs.annotations.SuppressWarnings( |
351 | |
"UL_UNRELEASED_LOCK_EXCEPTION_PATH") |
352 | |
public void startIteration() { |
353 | 0 | oracle.startIteration(); |
354 | 0 | if (!resetDone) { |
355 | 0 | superstepLock.writeLock().lock(); |
356 | 0 | metaPartitionManager.resetPartitions(); |
357 | 0 | superstepLock.writeLock().unlock(); |
358 | |
} |
359 | 0 | if (superstep != BspServiceWorker.INPUT_SUPERSTEP && |
360 | |
numProcessingThreads != numComputeThreads) { |
361 | |
|
362 | |
|
363 | |
|
364 | |
|
365 | |
|
366 | 0 | activeThreadsPermit.setMaxPermits(activeThreadsPermit.availablePermits() * |
367 | |
numComputeThreads / numProcessingThreads); |
368 | 0 | numProcessingThreads = numComputeThreads; |
369 | |
} |
370 | 0 | if (LOG.isInfoEnabled()) { |
371 | 0 | LOG.info("startIteration: with " + |
372 | 0 | metaPartitionManager.getNumInMemoryPartitions() + |
373 | |
" partitions in memory and " + |
374 | 0 | activeThreadsPermit.availablePermits() + " active threads"); |
375 | |
} |
376 | 0 | resetDone = false; |
377 | 0 | } |
378 | |
|
379 | |
|
380 | |
|
381 | |
|
382 | |
|
383 | |
|
384 | |
|
385 | |
public void retrievePartition(int partitionId) { |
386 | 0 | if (metaPartitionManager.isPartitionOnDisk(partitionId)) { |
387 | 0 | ioScheduler.addIOCommand(new LoadPartitionIOCommand(this, partitionId, |
388 | |
superstep)); |
389 | 0 | synchronized (partitionAvailable) { |
390 | 0 | while (metaPartitionManager.isPartitionOnDisk(partitionId)) { |
391 | |
try { |
392 | 0 | if (LOG.isInfoEnabled()) { |
393 | 0 | LOG.info("retrievePartition: waiting until partition " + |
394 | |
partitionId + " becomes available"); |
395 | |
} |
396 | 0 | partitionAvailable.wait(); |
397 | 0 | } catch (InterruptedException e) { |
398 | 0 | throw new IllegalStateException("retrievePartition: caught " + |
399 | |
"InterruptedException while waiting to retrieve partition " + |
400 | |
partitionId); |
401 | 0 | } |
402 | |
} |
403 | 0 | } |
404 | |
} |
405 | 0 | } |
406 | |
|
407 | |
|
408 | |
|
409 | |
|
410 | |
|
411 | |
|
412 | |
public void ioCommandCompleted(IOCommand command) { |
413 | 0 | oracle.commandCompleted(command); |
414 | 0 | if (command instanceof LoadPartitionIOCommand) { |
415 | |
|
416 | |
|
417 | 0 | synchronized (partitionAvailable) { |
418 | 0 | partitionAvailable.notifyAll(); |
419 | 0 | } |
420 | |
} |
421 | 0 | } |
422 | |
|
423 | |
|
424 | |
|
425 | |
|
426 | |
|
427 | |
|
428 | |
|
429 | |
|
430 | |
|
431 | |
public void updateActiveThreadsFraction(double fraction) { |
432 | 0 | checkState(fraction >= 0 && fraction <= 1); |
433 | 0 | int numActiveThreads = (int) (numProcessingThreads * fraction); |
434 | 0 | if (LOG.isInfoEnabled()) { |
435 | 0 | LOG.info("updateActiveThreadsFraction: updating the number of active " + |
436 | |
"threads to " + numActiveThreads); |
437 | |
} |
438 | 0 | activeThreadsPermit.setMaxPermits(numActiveThreads); |
439 | 0 | } |
440 | |
|
441 | |
|
442 | |
|
443 | |
|
444 | |
|
445 | |
|
446 | |
|
447 | |
|
448 | |
public void activeThreadCheckIn() { |
449 | 0 | activeThreadsPermit.release(); |
450 | |
try { |
451 | 0 | activeThreadsPermit.acquire(); |
452 | 0 | } catch (InterruptedException e) { |
453 | 0 | LOG.error("activeThreadCheckIn: exception while acquiring a permit to " + |
454 | |
"remain an active thread"); |
455 | 0 | throw new IllegalStateException(e); |
456 | 0 | } |
457 | 0 | } |
458 | |
|
459 | |
|
460 | |
|
461 | |
|
462 | |
|
463 | |
public void processingThreadStart() { |
464 | |
try { |
465 | 0 | activeThreadsPermit.acquire(); |
466 | 0 | } catch (InterruptedException e) { |
467 | 0 | LOG.error("processingThreadStart: exception while acquiring a permit to" + |
468 | |
" start the processing thread!"); |
469 | 0 | throw new IllegalStateException(e); |
470 | 0 | } |
471 | 0 | } |
472 | |
|
473 | |
|
474 | |
|
475 | |
|
476 | |
|
477 | |
public void processingThreadFinish() { |
478 | 0 | activeThreadsPermit.release(); |
479 | 0 | } |
480 | |
|
481 | |
|
482 | |
|
483 | |
|
484 | |
|
485 | |
|
486 | |
|
487 | |
|
488 | |
|
489 | |
|
490 | |
public void updateRequestsCreditFraction(double fraction) { |
491 | 0 | checkState(fraction >= 0 && fraction <= 1); |
492 | 0 | short newCredit = (short) (maxRequestsCredit * fraction); |
493 | 0 | if (LOG.isInfoEnabled()) { |
494 | 0 | LOG.info("updateRequestsCreditFraction: updating the credit to " + |
495 | |
newCredit); |
496 | |
} |
497 | 0 | if (flowControl != null) { |
498 | 0 | ((CreditBasedFlowControl) flowControl).updateCredit(newCredit); |
499 | |
} |
500 | 0 | } |
501 | |
|
502 | |
|
503 | |
|
504 | |
|
505 | |
|
506 | |
public void reset() { |
507 | 0 | metaPartitionManager.resetPartitions(); |
508 | 0 | metaPartitionManager.resetMessages(); |
509 | 0 | superstep = service.getSuperstep(); |
510 | 0 | resetDone = true; |
511 | 0 | } |
512 | |
|
513 | |
|
514 | |
|
515 | |
|
516 | |
public long getSuperstep() { |
517 | 0 | return superstep; |
518 | |
} |
519 | |
|
520 | |
|
521 | |
|
522 | |
|
523 | |
|
524 | |
|
525 | |
public void gcCompleted(GarbageCollectionNotificationInfo info) { |
526 | 0 | oracle.gcCompleted(info); |
527 | 0 | } |
528 | |
|
529 | |
@Override |
530 | |
public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) { |
531 | 0 | superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new Gauge<Double>() { |
532 | |
@Override |
533 | |
public Double value() { |
534 | 0 | return metaPartitionManager.getGraphFractionInMemory() * 100; |
535 | |
} |
536 | |
}); |
537 | 0 | } |
538 | |
|
539 | |
public FlowControl getFlowControl() { |
540 | 0 | return flowControl; |
541 | |
} |
542 | |
|
543 | |
public void setFlowControl(FlowControl flowControl) { |
544 | 0 | this.flowControl = flowControl; |
545 | 0 | } |
546 | |
|
547 | |
public OutOfCoreDataAccessor getDataAccessor() { |
548 | 0 | return dataAccessor; |
549 | |
} |
550 | |
|
551 | |
public NetworkMetrics getNetworkMetrics() { |
552 | 0 | return networkMetrics; |
553 | |
} |
554 | |
} |