1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm;
20
21 import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE;
22 import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
23
24 import java.util.Iterator;
25
26 import org.apache.giraph.bsp.CentralizedServiceWorker;
27 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
28 import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
29 import org.apache.giraph.comm.requests.WritableRequest;
30 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
31 import org.apache.giraph.edge.Edge;
32 import org.apache.giraph.factories.MessageValueFactory;
33 import org.apache.giraph.graph.Vertex;
34 import org.apache.giraph.partition.PartitionOwner;
35 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
36 import org.apache.giraph.utils.PairList;
37 import org.apache.giraph.utils.VertexIdMessages;
38 import org.apache.giraph.worker.WorkerInfo;
39 import org.apache.hadoop.io.Writable;
40 import org.apache.hadoop.io.WritableComparable;
41 import org.apache.log4j.Logger;
42
43
44
45
46
47
48
49
50 @SuppressWarnings("unchecked")
51 public class SendMessageCache<I extends WritableComparable, M extends Writable>
52 extends SendVertexIdDataCache<I, M, VertexIdMessages<I, M>> {
53
54 private static final Logger LOG =
55 Logger.getLogger(SendMessageCache.class);
56
57 protected long totalMsgsSentInSuperstep = 0;
58
59 protected long totalMsgBytesSentInSuperstep = 0;
60
61 protected final int maxMessagesSizePerWorker;
62
63 protected final NettyWorkerClientRequestProcessor<I, ?, ?> clientProcessor;
64
65 protected MessageValueFactory<M> messageValueFactory;
66
67
68
69
70
71
72
73
74 public SendMessageCache(ImmutableClassesGiraphConfiguration conf,
75 CentralizedServiceWorker<?, ?, ?> serviceWorker,
76 NettyWorkerClientRequestProcessor<I, ?, ?> processor,
77 int maxMsgSize) {
78 super(conf, serviceWorker, MAX_MSG_REQUEST_SIZE.get(conf),
79 ADDITIONAL_MSG_REQUEST_SIZE.get(conf));
80 maxMessagesSizePerWorker = maxMsgSize;
81 clientProcessor = processor;
82 messageValueFactory =
83 conf.createOutgoingMessageValueFactory();
84 }
85
86 @Override
87 public VertexIdMessages<I, M> createVertexIdData() {
88 return new ByteArrayVertexIdMessages<I, M>(messageValueFactory);
89 }
90
91
92
93
94
95
96
97
98
99
100 public int addMessage(WorkerInfo workerInfo,
101 int partitionId, I destVertexId, M message) {
102 return addData(workerInfo, partitionId, destVertexId, message);
103 }
104
105
106
107
108
109
110
111
112
113
114
115 protected int addMessage(WorkerInfo workerInfo, int partitionId,
116 byte[] serializedId, int idSerializerPos, M message) {
117 return addData(
118 workerInfo, partitionId, serializedId,
119 idSerializerPos, message);
120 }
121
122
123
124
125
126
127
128
129
130 protected PairList<Integer, VertexIdMessages<I, M>>
131 removeWorkerMessages(WorkerInfo workerInfo) {
132 return removeWorkerData(workerInfo);
133 }
134
135
136
137
138
139
140 private PairList<WorkerInfo, PairList<
141 Integer, VertexIdMessages<I, M>>> removeAllMessages() {
142 return removeAllData();
143 }
144
145
146
147
148
149
150
151 public void sendMessageRequest(I destVertexId, M message) {
152 PartitionOwner owner =
153 getServiceWorker().getVertexPartitionOwner(destVertexId);
154 WorkerInfo workerInfo = owner.getWorkerInfo();
155 final int partitionId = owner.getPartitionId();
156 if (LOG.isTraceEnabled()) {
157 LOG.trace("sendMessageRequest: Send bytes (" + message.toString() +
158 ") to " + destVertexId + " on worker " + workerInfo);
159 }
160 ++totalMsgsSentInSuperstep;
161
162 int workerMessageSize = addMessage(
163 workerInfo, partitionId, destVertexId, message);
164
165
166 if (workerMessageSize >= maxMessagesSizePerWorker) {
167 PairList<Integer, VertexIdMessages<I, M>>
168 workerMessages = removeWorkerMessages(workerInfo);
169 WritableRequest writableRequest =
170 new SendWorkerMessagesRequest<I, M>(workerMessages);
171 totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
172 clientProcessor.doRequest(workerInfo, writableRequest);
173
174 getServiceWorker().getGraphTaskManager().notifySentMessages();
175 }
176 }
177
178
179
180
181
182 public static class TargetVertexIdIterator<I extends WritableComparable>
183 implements Iterator<I> {
184
185 private final Iterator<Edge<I, Writable>> edgesIterator;
186
187
188
189
190
191
192 public TargetVertexIdIterator(Vertex<I, ?, ?> vertex) {
193 edgesIterator =
194 ((Vertex<I, Writable, Writable>) vertex).getEdges().iterator();
195 }
196
197 @Override
198 public boolean hasNext() {
199 return edgesIterator.hasNext();
200 }
201
202 @Override
203 public I next() {
204 return edgesIterator.next().getTargetVertexId();
205 }
206
207 @Override
208 public void remove() {
209 throw new UnsupportedOperationException();
210 }
211 }
212
213
214
215
216
217
218
219 public void sendMessageToAllRequest(Vertex<I, ?, ?> vertex, M message) {
220 TargetVertexIdIterator targetVertexIterator =
221 new TargetVertexIdIterator(vertex);
222 sendMessageToAllRequest(targetVertexIterator, message);
223 }
224
225
226
227
228
229
230
231 public void sendMessageToAllRequest(Iterator<I> vertexIdIterator, M message) {
232 while (vertexIdIterator.hasNext()) {
233 sendMessageRequest(vertexIdIterator.next(), message);
234 }
235 }
236
237
238
239
240 public void flush() {
241 PairList<WorkerInfo, PairList<Integer,
242 VertexIdMessages<I, M>>>
243 remainingMessageCache = removeAllMessages();
244 PairList<WorkerInfo, PairList<
245 Integer, VertexIdMessages<I, M>>>.Iterator
246 iterator = remainingMessageCache.getIterator();
247 while (iterator.hasNext()) {
248 iterator.next();
249 WritableRequest writableRequest =
250 new SendWorkerMessagesRequest<I, M>(
251 iterator.getCurrentSecond());
252 totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
253 clientProcessor.doRequest(
254 iterator.getCurrentFirst(), writableRequest);
255 }
256 }
257
258
259
260
261
262
263 public long resetMessageCount() {
264 long messagesSentInSuperstep = totalMsgsSentInSuperstep;
265 totalMsgsSentInSuperstep = 0;
266 return messagesSentInSuperstep;
267 }
268
269
270
271
272
273
274 public long resetMessageBytesCount() {
275 long messageBytesSentInSuperstep = totalMsgBytesSentInSuperstep;
276 totalMsgBytesSentInSuperstep = 0;
277 return messageBytesSentInSuperstep;
278 }
279 }