1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.api.giraph;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23 import java.util.List;
24
25 import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
26 import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
27 import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
28 import org.apache.giraph.worker.WorkerContext;
29 import org.apache.giraph.writable.kryo.HadoopKryo;
30 import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
31 import org.apache.hadoop.io.Writable;
32 import org.apache.hadoop.io.WritableComparable;
33 import org.apache.log4j.Logger;
34
35
36
37
38
39 public final class BlockWorkerContext extends WorkerContext
40 implements KryoIgnoreWritable {
41 public static final Logger LOG = Logger.getLogger(BlockWorkerContext.class);
42
43 private BlockWorkerContextLogic workerLogic;
44
45 @Override
46 public void preApplication()
47 throws InstantiationException, IllegalAccessException {
48 workerLogic = new BlockWorkerContextLogic();
49 workerLogic.preApplication(new BlockWorkerContextApiWrapper<>(this),
50 new BlockOutputHandle(getContext().getJobID().toString(),
51 getConf(), getContext()));
52 }
53
54 @Override
55 public void preSuperstep() {
56 List<Writable> messages = getAndClearMessagesFromOtherWorkers();
57 BlockWorkerContextApiWrapper<WritableComparable, Writable> workerApi =
58 new BlockWorkerContextApiWrapper<>(this);
59 BlockWorkerPieces<Object> workerPieces =
60 BlockWorkerPieces.getNextWorkerPieces(this);
61
62 LOG.info("PassedComputation in " + getSuperstep() +
63 " superstep executing " + workerPieces);
64
65 workerLogic.preSuperstep(
66 workerApi, workerApi, workerPieces, getSuperstep(), messages);
67 }
68
69 @Override
70 public void postSuperstep() {
71 workerLogic.postSuperstep();
72 }
73
74 @Override
75 public void postApplication() {
76 workerLogic.postApplication();
77 }
78
79 public Object getWorkerValue() {
80 return workerLogic.getWorkerValue();
81 }
82
83 public BlockOutputHandle getOutputHandle() {
84 return workerLogic.getOutputHandle();
85 }
86
87
88
89
90
91
92
93 @Override
94 public void write(DataOutput out) throws IOException {
95 HadoopKryo.writeClassAndObj(out, workerLogic);
96 }
97
98 @Override
99 public void readFields(DataInput in) throws IOException {
100 workerLogic = HadoopKryo.readClassAndObj(in);
101 workerLogic.getOutputHandle().initialize(getConf(), getContext());
102 }
103 }