1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
package org.apache.giraph.block_app.framework.api.local; |
19 | |
|
20 | |
import java.io.IOException; |
21 | |
import java.util.List; |
22 | |
import java.util.concurrent.CountDownLatch; |
23 | |
import java.util.concurrent.ExecutorService; |
24 | |
import java.util.concurrent.Executors; |
25 | |
import java.util.concurrent.atomic.AtomicBoolean; |
26 | |
import java.util.concurrent.atomic.AtomicReference; |
27 | |
|
28 | |
import org.apache.giraph.block_app.framework.BlockFactory; |
29 | |
import org.apache.giraph.block_app.framework.BlockUtils; |
30 | |
import org.apache.giraph.block_app.framework.api.local.InternalApi.InternalWorkerApi; |
31 | |
import org.apache.giraph.block_app.framework.block.Block; |
32 | |
import org.apache.giraph.block_app.framework.internal.BlockMasterLogic; |
33 | |
import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic; |
34 | |
import org.apache.giraph.block_app.framework.internal.BlockWorkerLogic; |
35 | |
import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces; |
36 | |
import org.apache.giraph.block_app.framework.output.BlockOutputHandle; |
37 | |
import org.apache.giraph.conf.BooleanConfOption; |
38 | |
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; |
39 | |
import org.apache.giraph.conf.IntConfOption; |
40 | |
import org.apache.giraph.graph.OnlyIdVertex; |
41 | |
import org.apache.giraph.graph.Vertex; |
42 | |
import org.apache.giraph.io.SimpleVertexWriter; |
43 | |
import org.apache.giraph.partition.Partition; |
44 | |
import org.apache.giraph.utils.InternalVertexRunner; |
45 | |
import org.apache.giraph.utils.TestGraph; |
46 | |
import org.apache.giraph.utils.Trimmable; |
47 | |
import org.apache.giraph.utils.WritableUtils; |
48 | |
import org.apache.giraph.writable.kryo.KryoWritableWrapper; |
49 | |
import org.apache.hadoop.io.Writable; |
50 | |
import org.apache.hadoop.io.WritableComparable; |
51 | |
import org.apache.hadoop.util.Progressable; |
52 | |
|
53 | |
import com.google.common.base.Preconditions; |
54 | |
import com.google.common.collect.Iterables; |
55 | |
|
56 | |
|
57 | |
|
58 | |
|
59 | |
|
60 | |
|
61 | |
|
62 | |
@SuppressWarnings({ "rawtypes", "unchecked" }) |
63 | |
public class LocalBlockRunner { |
64 | |
|
65 | 0 | public static final IntConfOption NUM_THREADS = new IntConfOption( |
66 | |
"test.LocalBlockRunner.NUM_THREADS", 3, ""); |
67 | |
|
68 | 0 | public static final IntConfOption NUM_PARTITIONS = new IntConfOption( |
69 | |
"test.LocalBlockRunner.NUM_PARTITIONS", 16, ""); |
70 | |
|
71 | |
|
72 | |
|
73 | |
|
74 | 0 | public static final BooleanConfOption RUN_ALL_CHECKS = new BooleanConfOption( |
75 | |
"test.LocalBlockRunner.RUN_ALL_CHECKS", true, ""); |
76 | |
|
77 | 0 | public static final BooleanConfOption SERIALIZE_MASTER = |
78 | |
new BooleanConfOption( |
79 | |
"test.LocalBlockRunner.SERIALIZE_MASTER", false, ""); |
80 | |
|
81 | 0 | private LocalBlockRunner() { } |
82 | |
|
83 | |
|
84 | |
|
85 | |
|
86 | |
|
87 | |
|
88 | |
|
89 | |
|
90 | |
public static |
91 | |
<I extends WritableComparable, V extends Writable, E extends Writable> |
92 | |
TestGraph<I, V, E> runApp( |
93 | |
TestGraph<I, V, E> graph, boolean useFullDigraphTests) throws Exception { |
94 | 0 | if (useFullDigraphTests) { |
95 | 0 | return InternalVertexRunner.runWithInMemoryOutput(graph.getConf(), graph); |
96 | |
} else { |
97 | 0 | runApp(graph); |
98 | 0 | return graph; |
99 | |
} |
100 | |
} |
101 | |
|
102 | |
|
103 | |
|
104 | |
|
105 | |
|
106 | |
public static |
107 | |
<I extends WritableComparable, V extends Writable, E extends Writable> |
108 | |
void runApp(TestGraph<I, V, E> graph) { |
109 | 0 | SimpleVertexWriter<I, V, E> noOpVertexSaver = noOpVertexSaver(); |
110 | 0 | runAppWithVertexOutput(graph, noOpVertexSaver); |
111 | 0 | } |
112 | |
|
113 | |
|
114 | |
|
115 | |
|
116 | |
|
117 | |
public static |
118 | |
<I extends WritableComparable, V extends Writable, E extends Writable> |
119 | |
void runBlock( |
120 | |
TestGraph<I, V, E> graph, Block block, Object executionStage) { |
121 | 0 | SimpleVertexWriter<I, V, E> noOpVertexSaver = noOpVertexSaver(); |
122 | 0 | runBlockWithVertexOutput( |
123 | |
block, executionStage, graph, noOpVertexSaver); |
124 | 0 | } |
125 | |
|
126 | |
|
127 | |
|
128 | |
|
129 | |
|
130 | |
|
131 | |
public static |
132 | |
<I extends WritableComparable, V extends Writable, E extends Writable> |
133 | |
void runAppWithVertexOutput( |
134 | |
TestGraph<I, V, E> graph, final SimpleVertexWriter<I, V, E> vertexSaver) { |
135 | 0 | BlockFactory<?> factory = BlockUtils.createBlockFactory(graph.getConf()); |
136 | 0 | runBlockWithVertexOutput( |
137 | 0 | factory.createBlock(graph.getConf()), |
138 | 0 | factory.createExecutionStage(graph.getConf()), |
139 | |
graph, vertexSaver); |
140 | 0 | } |
141 | |
|
142 | |
|
143 | |
|
144 | |
|
145 | |
|
146 | |
public static |
147 | |
<I extends WritableComparable, V extends Writable, E extends Writable> |
148 | |
void runBlockWithVertexOutput( |
149 | |
Block block, Object executionStage, TestGraph<I, V, E> graph, |
150 | |
final SimpleVertexWriter<I, V, E> vertexSaver |
151 | |
) { |
152 | 0 | Preconditions.checkNotNull(block); |
153 | 0 | Preconditions.checkNotNull(graph); |
154 | 0 | ImmutableClassesGiraphConfiguration<I, V, E> conf = graph.getConf(); |
155 | 0 | int numThreads = NUM_THREADS.get(conf); |
156 | 0 | int numPartitions = NUM_PARTITIONS.get(conf); |
157 | 0 | boolean runAllChecks = RUN_ALL_CHECKS.get(conf); |
158 | 0 | boolean serializeMaster = SERIALIZE_MASTER.get(conf); |
159 | 0 | final boolean doOutputDuringComputation = conf.doOutputDuringComputation(); |
160 | |
|
161 | 0 | final InternalApi internalApi = |
162 | |
new InternalApi(graph, conf, numPartitions, runAllChecks); |
163 | 0 | final InternalWorkerApi internalWorkerApi = internalApi.getWorkerApi(); |
164 | |
|
165 | 0 | BlockUtils.checkBlockTypes(block, executionStage, conf); |
166 | |
|
167 | 0 | BlockMasterLogic<Object> blockMasterLogic = new BlockMasterLogic<>(); |
168 | 0 | blockMasterLogic.initialize(block, executionStage, internalApi); |
169 | |
|
170 | 0 | BlockWorkerContextLogic workerContextLogic = |
171 | 0 | internalApi.getWorkerContextLogic(); |
172 | 0 | workerContextLogic.preApplication(internalWorkerApi, |
173 | 0 | new BlockOutputHandle("", conf, new Progressable() { |
174 | |
@Override |
175 | |
public void progress() { |
176 | 0 | } |
177 | |
})); |
178 | |
|
179 | 0 | ExecutorService executor = Executors.newFixedThreadPool(numThreads); |
180 | |
|
181 | 0 | if (runAllChecks) { |
182 | 0 | for (Vertex<I, V, E> vertex : graph) { |
183 | 0 | V value = conf.createVertexValue(); |
184 | 0 | WritableUtils.copyInto(vertex.getValue(), value); |
185 | 0 | vertex.setValue(value); |
186 | |
|
187 | 0 | vertex.setEdges((Iterable) WritableUtils.createCopy( |
188 | 0 | (Writable) vertex.getEdges(), conf.getOutEdgesClass(), conf)); |
189 | 0 | } |
190 | |
} |
191 | |
|
192 | 0 | final AtomicBoolean anyVertexAlive = new AtomicBoolean(true); |
193 | |
|
194 | 0 | for (int superstep = 0;; superstep++) { |
195 | |
|
196 | 0 | if (serializeMaster) { |
197 | 0 | blockMasterLogic = (BlockMasterLogic) WritableUtils.createCopy( |
198 | |
new KryoWritableWrapper<>(blockMasterLogic), |
199 | |
KryoWritableWrapper.class, |
200 | 0 | conf).get(); |
201 | 0 | blockMasterLogic.initializeAfterRead(internalApi); |
202 | |
} |
203 | |
|
204 | 0 | if (!anyVertexAlive.get()) { |
205 | 0 | break; |
206 | |
} |
207 | |
|
208 | 0 | final BlockWorkerPieces workerPieces = |
209 | 0 | blockMasterLogic.computeNext(superstep); |
210 | 0 | if (workerPieces == null) { |
211 | 0 | if (!conf.doOutputDuringComputation()) { |
212 | 0 | List<Partition<I, V, E>> partitions = internalApi.getPartitions(); |
213 | 0 | for (Partition<I, V, E> partition : partitions) { |
214 | 0 | for (Vertex<I, V, E> vertex : partition) { |
215 | |
try { |
216 | 0 | vertexSaver.writeVertex(vertex); |
217 | 0 | } catch (IOException | InterruptedException e) { |
218 | 0 | throw new RuntimeException(e); |
219 | 0 | } |
220 | 0 | } |
221 | 0 | } |
222 | |
} |
223 | 0 | int left = executor.shutdownNow().size(); |
224 | 0 | Preconditions.checkState(0 == left, "Some work still left to be done?"); |
225 | 0 | break; |
226 | |
} else { |
227 | 0 | internalApi.afterMasterBeforeWorker(workerPieces); |
228 | 0 | List<Partition<I, V, E>> partitions = internalApi.getPartitions(); |
229 | |
|
230 | 0 | workerContextLogic.preSuperstep( |
231 | |
internalWorkerApi, |
232 | |
internalWorkerApi, |
233 | 0 | KryoWritableWrapper.wrapAndCopy(workerPieces), superstep, |
234 | 0 | internalApi.takeWorkerMessages()); |
235 | |
|
236 | 0 | final CountDownLatch latch = new CountDownLatch(numPartitions); |
237 | 0 | final AtomicReference<Throwable> exception = new AtomicReference<>(); |
238 | 0 | anyVertexAlive.set(false); |
239 | 0 | for (final Partition<I, V, E> partition : partitions) { |
240 | 0 | executor.execute(new Runnable() { |
241 | |
@Override |
242 | |
public void run() { |
243 | |
try { |
244 | 0 | boolean anyCurVertexAlive = false; |
245 | 0 | BlockWorkerPieces localPieces = |
246 | 0 | KryoWritableWrapper.wrapAndCopy(workerPieces); |
247 | |
|
248 | 0 | BlockWorkerLogic localLogic = new BlockWorkerLogic(localPieces); |
249 | 0 | localLogic.preSuperstep(internalWorkerApi, internalWorkerApi); |
250 | |
|
251 | 0 | if (internalApi.ignoreExistingVertices()) { |
252 | 0 | Iterable<I> destinations = |
253 | 0 | internalApi.getPartitionDestinationVertices( |
254 | 0 | partition.getId()); |
255 | 0 | if (!Iterables.isEmpty(destinations)) { |
256 | 0 | OnlyIdVertex<I> vertex = new OnlyIdVertex<>(); |
257 | |
|
258 | 0 | for (I vertexId : destinations) { |
259 | 0 | Iterable messages = internalApi.takeMessages(vertexId); |
260 | 0 | Preconditions.checkState(!Iterables.isEmpty(messages)); |
261 | 0 | vertex.setId(vertexId); |
262 | 0 | localLogic.compute(vertex, messages); |
263 | |
|
264 | 0 | anyCurVertexAlive = true; |
265 | 0 | } |
266 | |
} |
267 | 0 | } else { |
268 | 0 | for (Vertex<I, V, E> vertex : partition) { |
269 | 0 | Iterable messages = |
270 | 0 | internalApi.takeMessages(vertex.getId()); |
271 | 0 | if (vertex.isHalted() && !Iterables.isEmpty(messages)) { |
272 | 0 | vertex.wakeUp(); |
273 | |
} |
274 | |
|
275 | 0 | if (!vertex.isHalted()) { |
276 | 0 | localLogic.compute(vertex, messages); |
277 | |
|
278 | |
|
279 | 0 | vertex.unwrapMutableEdges(); |
280 | |
|
281 | 0 | if (vertex instanceof Trimmable) { |
282 | 0 | ((Trimmable) vertex).trim(); |
283 | |
} |
284 | |
|
285 | |
|
286 | 0 | if (doOutputDuringComputation) { |
287 | 0 | vertexSaver.writeVertex(vertex); |
288 | |
} |
289 | |
|
290 | 0 | partition.saveVertex(vertex); |
291 | |
} |
292 | |
|
293 | 0 | if (!vertex.isHalted()) { |
294 | 0 | anyCurVertexAlive = true; |
295 | |
} |
296 | 0 | } |
297 | |
} |
298 | |
|
299 | 0 | if (anyCurVertexAlive) { |
300 | 0 | anyVertexAlive.set(true); |
301 | |
} |
302 | 0 | localLogic.postSuperstep(); |
303 | |
|
304 | |
|
305 | 0 | } catch (Throwable t) { |
306 | |
|
307 | 0 | t.printStackTrace(); |
308 | 0 | exception.set(t); |
309 | 0 | } |
310 | |
|
311 | 0 | latch.countDown(); |
312 | 0 | } |
313 | |
}); |
314 | 0 | } |
315 | |
|
316 | |
try { |
317 | 0 | latch.await(); |
318 | 0 | } catch (InterruptedException e) { |
319 | 0 | throw new RuntimeException("Thread intentionally interrupted", e); |
320 | 0 | } |
321 | |
|
322 | 0 | if (exception.get() != null) { |
323 | 0 | throw new RuntimeException("Worker failed", exception.get()); |
324 | |
} |
325 | |
|
326 | 0 | workerContextLogic.postSuperstep(); |
327 | |
|
328 | 0 | internalApi.afterWorkerBeforeMaster(); |
329 | |
} |
330 | |
} |
331 | |
|
332 | 0 | workerContextLogic.postApplication(); |
333 | 0 | internalApi.postApplication(); |
334 | 0 | } |
335 | |
|
336 | |
private static |
337 | |
<I extends WritableComparable, E extends Writable, V extends Writable> |
338 | |
SimpleVertexWriter<I, V, E> noOpVertexSaver() { |
339 | 0 | return new SimpleVertexWriter<I, V, E>() { |
340 | |
@Override |
341 | |
public void writeVertex(Vertex<I, V, E> vertex) |
342 | |
throws IOException, InterruptedException { |
343 | |
|
344 | 0 | } |
345 | |
}; |
346 | |
} |
347 | |
|
348 | |
} |