1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.internal;
19
20 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
21 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
22 import org.apache.giraph.block_app.framework.piece.AbstractPiece.InnerVertexSender;
23 import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
24 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
25 import org.apache.giraph.graph.Vertex;
26
27
28
29
30 @SuppressWarnings({ "rawtypes", "unchecked" })
31 public class BlockWorkerLogic {
32 private final BlockWorkerPieces pieces;
33
34 private transient VertexReceiver receiveFunctions;
35 private transient InnerVertexSender sendFunctions;
36
37 public BlockWorkerLogic(BlockWorkerPieces pieces) {
38 this.pieces = pieces;
39 }
40
41 public void preSuperstep(
42 BlockWorkerReceiveApi receiveApi, BlockWorkerSendApi sendApi) {
43 pieces.getBlockApiHandle().setWorkerReceiveApi(receiveApi);
44 pieces.getBlockApiHandle().setWorkerSendApi(sendApi);
45 if (pieces.getReceiver() != null) {
46 receiveFunctions = pieces.getReceiver().getVertexReceiver(receiveApi);
47 }
48 if (pieces.getSender() != null) {
49 sendFunctions = pieces.getSender().getVertexSender(sendApi);
50 }
51 }
52
53 public void compute(Vertex vertex, Iterable messages) {
54 if (receiveFunctions != null) {
55 receiveFunctions.vertexReceive(vertex, messages);
56 }
57 if (sendFunctions != null) {
58 sendFunctions.vertexSend(vertex);
59 }
60 }
61
62 public void postSuperstep() {
63 if (receiveFunctions instanceof VertexPostprocessor) {
64 ((VertexPostprocessor) receiveFunctions).postprocess();
65 }
66 if (sendFunctions != null) {
67 sendFunctions.postprocess();
68 }
69 }
70 }