1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24
25 import org.apache.giraph.examples.utils.BrachaTouegDeadlockVertexValue;
26 import org.apache.giraph.examples.utils.BrachaTouegDeadlockMessage;
27 import org.apache.giraph.conf.LongConfOption;
28 import org.apache.giraph.edge.Edge;
29 import org.apache.giraph.graph.BasicComputation;
30 import org.apache.giraph.graph.Vertex;
31 import org.apache.hadoop.io.LongWritable;
32 import org.apache.log4j.Logger;
33
34
35
36
37
38
39
40
41
42
43
44
45 @Algorithm(
46 name = "Bracha Toueg deadlock detection"
47 )
48 public class BrachaTouegDeadlockComputation
49 extends BasicComputation<LongWritable, BrachaTouegDeadlockVertexValue,
50 LongWritable, BrachaTouegDeadlockMessage> {
51
52
53 public static final LongConfOption BRACHA_TOUEG_DL_INITIATOR_ID =
54 new LongConfOption("BrachaTouegDeadlockVertex.initiatorId", 1,
55 "The deadlock detection initiator id");
56
57
58 private static final Logger LOG =
59 Logger.getLogger(BrachaTouegDeadlockComputation.class);
60
61 @Override
62 public void compute(
63 Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex,
64 Iterable<BrachaTouegDeadlockMessage> messages)
65 throws IOException {
66
67 BrachaTouegDeadlockVertexValue value;
68 long superstep = getSuperstep();
69
70 if (superstep == 0) {
71
72
73 initAlgorithm(vertex);
74
75
76
77 } else if (superstep == 1) {
78
79 value = vertex.getValue();
80
81 if (LOG.isDebugEnabled()) {
82 LOG.debug("Vertex ID " + vertex.getId() + " status is:");
83 LOG.debug("\tpending requests? " + value.hasPendingRequests());
84 LOG.debug("\tis free? " + value.isFree());
85 LOG.debug("\tis notified? " + value.isNotified());
86 }
87
88
89 for (BrachaTouegDeadlockMessage message : messages) {
90 value.addParent(Long.valueOf(message.getSenderId()));
91 }
92
93
94 if (LOG.isDebugEnabled()) {
95 logParents(vertex);
96 if (isInitiator(vertex)) {
97 LOG.debug("Vertex ID " + vertex.getId() + " start the algorithm.");
98 }
99 }
100
101 if (isInitiator(vertex)) {
102
103 notifyVertices(vertex);
104 } else {
105
106
107
108
109
110
111
112 vertex.voteToHalt();
113 return;
114 }
115
116
117 } else {
118 Long ackSenderId;
119
120 value = vertex.getValue();
121
122
123
124 for (BrachaTouegDeadlockMessage message : messages) {
125 long type = message.getType();
126
127 if (LOG.isDebugEnabled()) {
128 LOG.debug("Vertex ID " + vertex.getId() + " received: " + message);
129 }
130
131 if (type == BrachaTouegDeadlockMessage.NOTIFY) {
132 handleNotifyMessage(vertex, message);
133 } else if (type == BrachaTouegDeadlockMessage.GRANT) {
134 handleGrantMessage(vertex, message);
135 } else if (type == BrachaTouegDeadlockMessage.DONE ||
136 type == BrachaTouegDeadlockMessage.ACK) {
137
138
139
140 value.receivedMessage(message.getSenderId(), message.getType());
141 }
142 }
143
144 ackSenderId = value.getIdWithInHoldAck();
145 if (value.isFree() &&
146 !value.isWaitingForMessage(BrachaTouegDeadlockMessage.ACK) &&
147 !ackSenderId.equals(BrachaTouegDeadlockVertexValue.INVALID_ID)) {
148
149 sendAckMessage(ackSenderId, vertex);
150 value.setIdWithInHoldAck(BrachaTouegDeadlockVertexValue.INVALID_ID);
151 }
152
153
154
155 if (value.isNotified() &&
156 !value.isWaitingForMessage(BrachaTouegDeadlockMessage.ACK) &&
157 !value.isWaitingForMessage(BrachaTouegDeadlockMessage.DONE)) {
158
159 Long senderId = value.getIdWithInHoldDone();
160
161 if (LOG.isDebugEnabled()) {
162 LOG.debug("Vertex ID " + vertex.getId() +
163 " sent the last DONE message.");
164 LOG.debug("Vertex ID " + vertex.getId() + " voted to halt.");
165 }
166
167
168
169 if (!isInitiator(vertex) &&
170 !senderId.equals(BrachaTouegDeadlockVertexValue.INVALID_ID)) {
171 sendMessage(vertex.getId().get(), senderId,
172 BrachaTouegDeadlockMessage.DONE);
173 value.setIdWithInHoldDone(BrachaTouegDeadlockVertexValue.INVALID_ID);
174 }
175
176 vertex.voteToHalt();
177 }
178 }
179 }
180
181
182
183
184
185
186
187 private boolean isInitiator(Vertex<LongWritable, ?, ?> vertex) {
188 return vertex.getId().get() == BRACHA_TOUEG_DL_INITIATOR_ID.get(getConf());
189 }
190
191
192
193
194
195
196
197 private void initAlgorithm(Vertex<LongWritable,
198 BrachaTouegDeadlockVertexValue, LongWritable> vertex) {
199
200 BrachaTouegDeadlockVertexValue value;
201 HashMap<Long, ArrayList<Long>> requests =
202 new HashMap<Long, ArrayList<Long>>();
203 long vertexId = vertex.getId().get();
204
205
206 for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
207 ArrayList<Long> targets;
208 Long tag = Long.valueOf(edge.getValue().get());
209 Long target = Long.valueOf(edge.getTargetVertexId().get());
210
211 if (requests.containsKey(tag)) {
212 targets = requests.get(tag);
213 } else {
214 targets = new ArrayList<Long>();
215 }
216
217 targets.add(target);
218 requests.put(tag, targets);
219 }
220
221
222
223 value = new BrachaTouegDeadlockVertexValue(requests);
224 vertex.setValue(value);
225
226
227 for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
228 sendMessage(vertexId, edge.getTargetVertexId().get(),
229 BrachaTouegDeadlockMessage.CTRL_IN_EDGE);
230 }
231 }
232
233
234
235
236
237
238
239
240 private void sendAckMessage(long receiver, Vertex<LongWritable,
241 BrachaTouegDeadlockVertexValue, LongWritable> vertex) {
242
243 this.sendMessage(Long.valueOf(vertex.getId().get()),
244 receiver, BrachaTouegDeadlockMessage.ACK);
245
246 if (!vertex.getValue().isNotified()) {
247 vertex.voteToHalt();
248 }
249 }
250
251
252
253
254
255
256
257
258 private void sendMessage(long sender, long receiver, long messageType) {
259 BrachaTouegDeadlockMessage message;
260
261 message = new BrachaTouegDeadlockMessage(sender, messageType);
262 sendMessage(new LongWritable(receiver), message);
263 if (LOG.isDebugEnabled()) {
264 LOG.debug("sent message " + message + " from " + sender +
265 " to " + receiver);
266 }
267 }
268
269
270
271
272
273
274
275 private void logParents(Vertex<LongWritable,
276 BrachaTouegDeadlockVertexValue,
277 LongWritable> vertex) {
278 ArrayList<Long> parents = vertex.getValue().getParents();
279 int sz = parents.size();
280 StringBuffer buffer = new StringBuffer();
281
282 buffer.append("Vertex " + vertex.getId() + " parents:");
283 for (int i = 0; i < sz; ++i) {
284 buffer.append(" - " + parents.get(i));
285 }
286 LOG.debug(buffer.toString());
287 }
288
289
290
291
292
293
294
295
296
297
298
299 private void notifyVertices(
300 Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex) {
301
302 BrachaTouegDeadlockVertexValue value = vertex.getValue();
303 long vertexId = vertex.getId().get();
304 boolean hasOutEdges = false;
305
306 value.setNotified();
307
308 for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
309 hasOutEdges = true;
310 sendMessage(vertexId,
311 edge.getTargetVertexId().get(),
312 BrachaTouegDeadlockMessage.NOTIFY);
313
314
315 value.waitForMessage(Long.valueOf(edge.getTargetVertexId().get()),
316 Long.valueOf(BrachaTouegDeadlockMessage.DONE));
317 }
318
319
320
321 if (!hasOutEdges && isInitiator(vertex)) {
322 value.setFree();
323 } else if (!value.hasPendingRequests() && !value.isFree()) {
324 grantVertices(vertex);
325 }
326 }
327
328
329
330
331 private void grantVertices(
332 Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex) {
333
334 BrachaTouegDeadlockVertexValue value = vertex.getValue();
335 ArrayList<Long> parents = value.getParents();
336 long vertexId = vertex.getId().get();
337
338 value.setFree();
339
340
341 for (Long parent : parents) {
342 sendMessage(vertexId, parent,
343 BrachaTouegDeadlockMessage.GRANT);
344
345
346 value.waitForMessage(parent,
347 Long.valueOf(BrachaTouegDeadlockMessage.ACK));
348 }
349 }
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366 private void handleNotifyMessage(
367 Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex,
368 BrachaTouegDeadlockMessage message) {
369
370 BrachaTouegDeadlockVertexValue value = vertex.getValue();
371
372 if (!value.isNotified()) {
373 notifyVertices(vertex);
374 value.setIdWithInHoldDone(message.getSenderId());
375 } else {
376 sendMessage(vertex.getId().get(), message.getSenderId(),
377 BrachaTouegDeadlockMessage.DONE);
378 }
379 }
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398 private void handleGrantMessage(
399 Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex,
400 BrachaTouegDeadlockMessage message) {
401
402 BrachaTouegDeadlockVertexValue value = vertex.getValue();
403 Long senderId = Long.valueOf(message.getSenderId());
404 LongWritable wId = new LongWritable(senderId);
405 LongWritable tag = vertex.getEdgeValue(wId);
406
407 value.removeRequest(tag, wId);
408
409 if (value.isFree() || value.getNumOfRequests(tag) > 0) {
410 sendAckMessage(senderId, vertex);
411 return;
412 } else {
413 grantVertices(vertex);
414 value.setIdWithInHoldAck(senderId);
415 }
416 }
417 }