1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.requests;
20
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.IOException;
24
25 import org.apache.giraph.comm.ServerData;
26 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.hadoop.io.WritableComparable;
29
30
31
32
33
34
35
36
37
38
39 public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
40 V extends Writable, E extends Writable, M extends Writable> extends
41 WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
42
43 private int partitionId;
44
45 private ByteArrayVertexIdMessages<I, M> vertexIdMessageMap;
46
47
48 public SendPartitionCurrentMessagesRequest() { }
49
50
51
52
53
54
55
56 public SendPartitionCurrentMessagesRequest(int partitionId,
57 ByteArrayVertexIdMessages<I, M> vertexIdMessages) {
58 super();
59 this.partitionId = partitionId;
60 this.vertexIdMessageMap = vertexIdMessages;
61 }
62
63 @Override
64 public RequestType getType() {
65 return RequestType.SEND_PARTITION_CURRENT_MESSAGES_REQUEST;
66 }
67
68 @Override
69 public void readFieldsRequest(DataInput input) throws IOException {
70 partitionId = input.readInt();
71
72
73 vertexIdMessageMap = new ByteArrayVertexIdMessages<>(
74 getConf().<M>createIncomingMessageValueFactory());
75 vertexIdMessageMap.setConf(getConf());
76 vertexIdMessageMap.initialize();
77 vertexIdMessageMap.readFields(input);
78 }
79
80 @Override
81 public void writeRequest(DataOutput output) throws IOException {
82 output.writeInt(partitionId);
83 vertexIdMessageMap.write(output);
84 }
85
86 @Override
87 public void doRequest(ServerData<I, V, E> serverData) {
88 serverData.<M>getCurrentMessageStore().addPartitionMessages(partitionId,
89 vertexIdMessageMap);
90 }
91
92 @Override
93 public int getSerializedSize() {
94 return super.getSerializedSize() + 4 +
95 vertexIdMessageMap.getSerializedSize();
96 }
97 }