1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.requests;
20
21 import org.apache.giraph.comm.ServerData;
22 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
23 import org.apache.giraph.utils.PairList;
24 import org.apache.giraph.utils.VertexIdMessages;
25 import org.apache.hadoop.io.Writable;
26 import org.apache.hadoop.io.WritableComparable;
27
28
29
30
31
32
33
34 @SuppressWarnings("unchecked")
35 public class SendWorkerMessagesRequest<I extends WritableComparable,
36 M extends Writable> extends SendWorkerDataRequest<I, M,
37 VertexIdMessages<I, M>> {
38
39
40 public SendWorkerMessagesRequest() {
41 }
42
43
44
45
46
47
48
49 public SendWorkerMessagesRequest(
50 PairList<Integer, VertexIdMessages<I, M>> partVertMsgs) {
51 this.partitionVertexData = partVertMsgs;
52 }
53
54 @Override
55 public VertexIdMessages<I, M> createVertexIdData() {
56 return new ByteArrayVertexIdMessages<I, M>(
57 getConf().createOutgoingMessageValueFactory());
58 }
59
60 @Override
61 public RequestType getType() {
62 return RequestType.SEND_WORKER_MESSAGES_REQUEST;
63 }
64
65 @Override
66 public void doRequest(ServerData serverData) {
67 PairList<Integer, VertexIdMessages<I, M>>.Iterator
68 iterator = partitionVertexData.getIterator();
69 while (iterator.hasNext()) {
70 iterator.next();
71 serverData.getIncomingMessageStore().
72 addPartitionMessages(iterator.getCurrentFirst(),
73 iterator.getCurrentSecond());
74 }
75 }
76 }