1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.internal;
19
20 import java.util.List;
21
22 import org.apache.giraph.block_app.framework.BlockUtils;
23 import org.apache.giraph.block_app.framework.api.BlockWorkerContextApi;
24 import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
25 import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
26 import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.log4j.Logger;
29
30
31
32
33 @SuppressWarnings({ "rawtypes" })
34 public class BlockWorkerContextLogic {
35 public static final Logger LOG =
36 Logger.getLogger(BlockWorkerContextLogic.class);
37
38 private Object workerValue;
39 private BlockWorkerPieces workerPieces;
40 private BlockOutputHandle outputHandle;
41
42 private transient BlockWorkerContextSendApi sendApi;
43
44 public BlockWorkerContextLogic() {
45 }
46
47 public void preApplication(BlockWorkerContextApi api,
48 BlockOutputHandle outputHandle) {
49 workerValue =
50 BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.newInstance(api.getConf());
51 this.outputHandle = outputHandle;
52 }
53
54 public Object getWorkerValue() {
55 return workerValue;
56 }
57
58 public BlockOutputHandle getOutputHandle() {
59 return outputHandle;
60 }
61
62 @SuppressWarnings("unchecked")
63 public void preSuperstep(
64 BlockWorkerContextReceiveApi receiveApi,
65 BlockWorkerContextSendApi sendApi,
66 BlockWorkerPieces workerPieces, long superstep,
67 List<Writable> messages) {
68 workerPieces.getBlockApiHandle().setWorkerContextReceiveApi(receiveApi);
69 workerPieces.getBlockApiHandle().setWorkerContextSendApi(sendApi);
70 if (BlockUtils.LOG_EXECUTION_STATUS.get(receiveApi.getConf())) {
71 LOG.info("Worker executing " + workerPieces + " in " + superstep +
72 " superstep");
73 }
74 this.sendApi = sendApi;
75 this.workerPieces = workerPieces;
76 if (workerPieces.getReceiver() != null) {
77 workerPieces.getReceiver().workerContextReceive(
78 receiveApi, workerValue, messages);
79 }
80 }
81
82 public void postSuperstep() {
83 if (workerPieces.getSender() != null) {
84 workerPieces.getSender().workerContextSend(sendApi, workerValue);
85 }
86 workerPieces = null;
87 sendApi = null;
88 outputHandle.returnAllWriters();
89 }
90
91 public void postApplication() {
92 outputHandle.closeAllWriters();
93
94 }
95 }