1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc;
20
21 import org.apache.giraph.utils.CallableFactory;
22 import org.apache.giraph.utils.ThreadUtils;
23 import org.apache.log4j.Logger;
24
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.ThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34
35
36
37
38 public class OutOfCoreIOCallableFactory {
39
40 private static final Logger LOG =
41 Logger.getLogger(OutOfCoreIOCallableFactory.class);
42
43 private final OutOfCoreEngine oocEngine;
44
45 private final List<Future> results;
46
47 private final int numIOThreads;
48
49 private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
50
51 private ExecutorService outOfCoreIOExecutor;
52
53
54
55
56
57
58
59 public OutOfCoreIOCallableFactory(OutOfCoreEngine oocEngine,
60 int numIOThreads,
61 Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
62 this.oocEngine = oocEngine;
63 this.numIOThreads = numIOThreads;
64 this.results = new ArrayList<>(numIOThreads);
65 this.uncaughtExceptionHandler = uncaughtExceptionHandler;
66 }
67
68
69
70
71 public void createCallable() {
72 CallableFactory<Void> outOfCoreIOCallableFactory =
73 new CallableFactory<Void>() {
74 @Override
75 public Callable<Void> newCallable(int callableId) {
76 return new OutOfCoreIOCallable(oocEngine, callableId);
77 }
78 };
79 outOfCoreIOExecutor = new ThreadPoolExecutor(numIOThreads, numIOThreads, 0L,
80 TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
81 ThreadUtils.createThreadFactory("ooc-io-%d"));
82
83 for (int i = 0; i < numIOThreads; ++i) {
84 Future<Void> future = ThreadUtils.submitToExecutor(outOfCoreIOExecutor,
85 outOfCoreIOCallableFactory.newCallable(i), uncaughtExceptionHandler);
86 results.add(future);
87 }
88
89 outOfCoreIOExecutor.shutdown();
90 }
91
92
93
94
95 public void shutdown() {
96 boolean threadsTerminated = false;
97 while (!threadsTerminated) {
98 if (LOG.isInfoEnabled()) {
99 LOG.info("shutdown: waiting for IO threads to finish!");
100 }
101 try {
102 threadsTerminated =
103 outOfCoreIOExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
104 } catch (InterruptedException e) {
105 throw new IllegalStateException("shutdown: caught " +
106 "InterruptedException while waiting for IO threads to finish");
107 }
108 }
109 for (int i = 0; i < numIOThreads; ++i) {
110 try {
111
112 results.get(i).get();
113 } catch (InterruptedException e) {
114 LOG.error("shutdown: IO thread " + i + " was interrupted during its " +
115 "execution");
116 throw new IllegalStateException(e);
117 } catch (ExecutionException e) {
118 LOG.error("shutdown: IO thread " + i + " threw an exception during " +
119 "its execution");
120 throw new IllegalStateException(e);
121 }
122 }
123 }
124 }