1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.requests;
20
21 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
22
23 import java.io.DataInput;
24 import java.io.DataOutput;
25 import java.io.IOException;
26 import java.util.Map.Entry;
27
28 import org.apache.giraph.bsp.CentralizedServiceWorker;
29 import org.apache.giraph.comm.ServerData;
30 import org.apache.giraph.comm.messages.MessageStore;
31 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
32 import org.apache.giraph.partition.PartitionOwner;
33 import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
34 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
35 import org.apache.giraph.utils.VertexIdMessageIterator;
36 import org.apache.hadoop.io.Writable;
37 import org.apache.hadoop.io.WritableComparable;
38
39
40
41
42
43
44
45 @SuppressWarnings("unchecked")
46 public class SendWorkerOneMessageToManyRequest<I extends WritableComparable,
47 M extends Writable> extends WritableRequest<I, Writable, Writable>
48 implements WorkerRequest<I, Writable, Writable> {
49
50 protected ByteArrayOneMessageToManyIds<I, M> oneMessageToManyIds;
51
52
53
54
55 public SendWorkerOneMessageToManyRequest() { }
56
57
58
59
60
61
62
63 public SendWorkerOneMessageToManyRequest(
64 ByteArrayOneMessageToManyIds<I, M> oneMessageToManyIds,
65 ImmutableClassesGiraphConfiguration conf) {
66 this.oneMessageToManyIds = oneMessageToManyIds;
67 setConf(conf);
68 }
69
70 @Override
71 public RequestType getType() {
72 return RequestType.SEND_WORKER_ONE_MESSAGE_TO_MANY_REQUEST;
73 }
74
75 @Override
76 public void readFieldsRequest(DataInput input) throws IOException {
77 oneMessageToManyIds = new ByteArrayOneMessageToManyIds<>(
78 getConf().<M>createOutgoingMessageValueFactory());
79 oneMessageToManyIds.setConf(getConf());
80 oneMessageToManyIds.readFields(input);
81 }
82
83 @Override
84 public void writeRequest(DataOutput output) throws IOException {
85 this.oneMessageToManyIds.write(output);
86 }
87
88 @Override
89 public int getSerializedSize() {
90 return super.getSerializedSize() +
91 this.oneMessageToManyIds.getSerializedSize();
92 }
93
94 @Override
95 public void doRequest(ServerData serverData) {
96 MessageStore<I, M> messageStore = serverData.getIncomingMessageStore();
97 if (messageStore.isPointerListEncoding()) {
98
99 messageStore.addPartitionMessages(-1, oneMessageToManyIds);
100 } else {
101 CentralizedServiceWorker<I, ?, ?> serviceWorker =
102 serverData.getServiceWorker();
103
104
105
106
107
108
109
110
111
112 int initialSize = oneMessageToManyIds.getSize() /
113 serverData.getPartitionStore().getNumPartitions() * 2;
114
115
116 Int2ObjectOpenHashMap<ByteArrayVertexIdMessages> partitionIdMsgs =
117 new Int2ObjectOpenHashMap<>();
118
119
120
121 VertexIdMessageIterator<I, M> vertexIdMessageIterator =
122 oneMessageToManyIds.getVertexIdMessageIterator();
123 while (vertexIdMessageIterator.hasNext()) {
124 vertexIdMessageIterator.next();
125 M msg = vertexIdMessageIterator.getCurrentMessage();
126 I vertexId = vertexIdMessageIterator.getCurrentVertexId();
127 PartitionOwner owner =
128 serviceWorker.getVertexPartitionOwner(vertexId);
129 int partitionId = owner.getPartitionId();
130 ByteArrayVertexIdMessages<I, M> idMsgs = partitionIdMsgs
131 .get(partitionId);
132 if (idMsgs == null) {
133 idMsgs = new ByteArrayVertexIdMessages<>(
134 getConf().<M>createOutgoingMessageValueFactory());
135 idMsgs.setConf(getConf());
136 idMsgs.initialize(initialSize);
137 partitionIdMsgs.put(partitionId, idMsgs);
138 }
139 idMsgs.add(vertexId, msg);
140 }
141
142
143 for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs :
144 partitionIdMsgs.entrySet()) {
145 if (!idMsgs.getValue().isEmpty()) {
146 serverData.getIncomingMessageStore().addPartitionMessages(
147 idMsgs.getKey(), idMsgs.getValue());
148 }
149 }
150 }
151 }
152 }