1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.api.giraph;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23
24 import org.apache.giraph.block_app.framework.internal.BlockMasterLogic;
25 import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
26 import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
27 import org.apache.giraph.master.MasterCompute;
28 import org.apache.giraph.writable.kryo.KryoWritableWrapper;
29
30
31
32
33
34
35 public final class BlockMasterCompute<S> extends MasterCompute {
36 private BlockMasterLogic<S> blockMasterLogic = new BlockMasterLogic<>();
37
38 @Override
39 public void initialize() throws InstantiationException,
40 IllegalAccessException {
41 blockMasterLogic.initialize(getConf(), new BlockMasterApiWrapper(this,
42 new BlockOutputHandle(getContext().getJobID().toString(),
43 getConf(), getContext())));
44 }
45
46 @Override
47 public void compute() {
48 BlockWorkerPieces<S> workerPieces =
49 blockMasterLogic.computeNext(getSuperstep());
50 if (workerPieces == null) {
51 haltComputation();
52 } else {
53 BlockWorkerPieces.setNextWorkerPieces(this, workerPieces);
54 }
55 }
56
57 @Override
58 public void write(DataOutput out) throws IOException {
59 new KryoWritableWrapper<>(blockMasterLogic).write(out);
60 }
61
62 @Override
63 public void readFields(DataInput in) throws IOException {
64 KryoWritableWrapper<BlockMasterLogic<S>> object =
65 new KryoWritableWrapper<>();
66 object.readFields(in);
67 blockMasterLogic = object.get();
68 blockMasterLogic.initializeAfterRead(new BlockMasterApiWrapper(this,
69 new BlockOutputHandle()));
70 }
71 }