1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.flow_control;
20
21 import static com.google.common.base.Preconditions.checkState;
22 import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
23 import com.google.common.collect.Lists;
24 import com.google.common.collect.Maps;
25 import com.google.common.collect.Sets;
26 import org.apache.commons.lang3.tuple.ImmutablePair;
27 import org.apache.commons.lang3.tuple.MutablePair;
28 import org.apache.commons.lang3.tuple.Pair;
29 import org.apache.giraph.comm.netty.NettyClient;
30 import org.apache.giraph.comm.netty.handler.AckSignalFlag;
31 import org.apache.giraph.comm.requests.SendResumeRequest;
32 import org.apache.giraph.comm.requests.WritableRequest;
33 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
34 import org.apache.giraph.conf.IntConfOption;
35 import org.apache.giraph.utils.AdjustableSemaphore;
36 import org.apache.giraph.utils.ThreadUtils;
37 import org.apache.log4j.Logger;
38
39 import java.util.ArrayDeque;
40 import java.util.ArrayList;
41 import java.util.Collections;
42 import java.util.Comparator;
43 import java.util.Deque;
44 import java.util.Map;
45 import java.util.Set;
46 import java.util.concurrent.ArrayBlockingQueue;
47 import java.util.concurrent.BlockingQueue;
48 import java.util.concurrent.ConcurrentMap;
49 import java.util.concurrent.Semaphore;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.atomic.AtomicInteger;
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77 public class CreditBasedFlowControl implements FlowControl {
78
79
80
81
82 public static final IntConfOption MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER =
83 new IntConfOption("giraph.maxOpenRequestsPerWorker", 20,
84 "Maximum number of requests without confirmation we can have per " +
85 "worker");
86
87 public static final IntConfOption MAX_NUM_OF_UNSENT_REQUESTS =
88 new IntConfOption("giraph.maxNumberOfUnsentRequests", 2000,
89 "Maximum number of unsent requests we can keep in memory");
90
91
92
93 public static final IntConfOption UNSENT_CACHE_WAIT_INTERVAL =
94 new IntConfOption("giraph.unsentCacheWaitInterval", 1000,
95 "Time interval to wait on unsent requests cache (in milliseconds)");
96
97 private static final Logger LOG =
98 Logger.getLogger(CreditBasedFlowControl.class);
99
100
101 private final int unsentWaitMsecs;
102
103 private final int waitingRequestMsecs;
104
105
106
107
108 private volatile short maxOpenRequestsPerWorker;
109
110 private final AtomicInteger aggregateUnsentRequests = new AtomicInteger(0);
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151 private final ConcurrentMap<Integer, Pair<AdjustableSemaphore, Integer>>
152 perWorkerOpenRequestMap = Maps.newConcurrentMap();
153
154 private final ConcurrentMap<Integer, Deque<WritableRequest>>
155 perWorkerUnsentRequestMap = Maps.newConcurrentMap();
156
157
158
159
160 private final Set<Integer> workersToResume = Sets.newHashSet();
161
162
163
164
165
166
167
168
169
170 private final ConcurrentMap<Integer, Set<Long>> resumeRequestsId =
171 Maps.newConcurrentMap();
172
173
174
175
176
177
178
179 private final BlockingQueue<Pair<Integer, WritableRequest>> toBeSent;
180
181
182
183
184 private final Semaphore unsentRequestPermit;
185
186 private final NettyClient nettyClient;
187
188
189
190
191
192
193
194 public CreditBasedFlowControl(ImmutableClassesGiraphConfiguration conf,
195 final NettyClient nettyClient,
196 Thread.UncaughtExceptionHandler
197 exceptionHandler) {
198 this.nettyClient = nettyClient;
199 maxOpenRequestsPerWorker =
200 (short) MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
201 checkState(maxOpenRequestsPerWorker < 0x4000 &&
202 maxOpenRequestsPerWorker > 0, "NettyClient: max number of open " +
203 "requests should be in range (0, " + 0x4FFF + ")");
204 int maxUnsentRequests = MAX_NUM_OF_UNSENT_REQUESTS.get(conf);
205 unsentRequestPermit = new Semaphore(maxUnsentRequests);
206 this.toBeSent = new ArrayBlockingQueue<Pair<Integer, WritableRequest>>(
207 maxUnsentRequests);
208 unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf);
209 waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
210
211
212 ThreadUtils.startThread(new Runnable() {
213 @Override
214 public void run() {
215 while (true) {
216 synchronized (workersToResume) {
217 for (Integer workerId : workersToResume) {
218 if (maxOpenRequestsPerWorker != 0) {
219 sendResumeSignal(workerId);
220 } else {
221 break;
222 }
223 }
224 try {
225 workersToResume.wait();
226 } catch (InterruptedException e) {
227 throw new IllegalStateException("run: caught exception " +
228 "while waiting for resume-sender thread to be notified!",
229 e);
230 }
231 }
232 }
233 }
234 }, "resume-sender", exceptionHandler);
235
236
237 ThreadUtils.startThread(new Runnable() {
238 @Override
239 public void run() {
240 while (true) {
241 Pair<Integer, WritableRequest> pair = null;
242 try {
243 pair = toBeSent.take();
244 } catch (InterruptedException e) {
245 throw new IllegalStateException("run: failed while waiting to " +
246 "take an element from the request queue!", e);
247 }
248 int taskId = pair.getLeft();
249 WritableRequest request = pair.getRight();
250 nettyClient.doSend(taskId, request);
251 if (aggregateUnsentRequests.decrementAndGet() == 0) {
252 synchronized (aggregateUnsentRequests) {
253 aggregateUnsentRequests.notifyAll();
254 }
255 }
256 }
257 }
258 }, "cached-req-sender", exceptionHandler);
259 }
260
261
262
263
264
265
266 private void sendResumeSignal(int workerId) {
267 if (maxOpenRequestsPerWorker == 0) {
268 LOG.warn("sendResumeSignal: method called while the max open requests " +
269 "for worker " + workerId + " is still 0");
270 return;
271 }
272 WritableRequest request = new SendResumeRequest(maxOpenRequestsPerWorker);
273 Long resumeId = nettyClient.doSend(workerId, request);
274 checkState(resumeId != null);
275 if (LOG.isDebugEnabled()) {
276 LOG.debug("sendResumeSignal: sending signal to worker " + workerId +
277 " with credit=" + maxOpenRequestsPerWorker + ", ID=" +
278 (resumeId & 0xFFFF));
279 }
280 resumeRequestsId.get(workerId).add(resumeId);
281 }
282
283 @Override
284 public void sendRequest(int destTaskId, WritableRequest request) {
285 Pair<AdjustableSemaphore, Integer> pair =
286 perWorkerOpenRequestMap.get(destTaskId);
287
288
289 if (pair == null) {
290 pair = new MutablePair<>(
291 new AdjustableSemaphore(maxOpenRequestsPerWorker), -1);
292 Pair<AdjustableSemaphore, Integer> temp =
293 perWorkerOpenRequestMap.putIfAbsent(destTaskId, pair);
294 perWorkerUnsentRequestMap.putIfAbsent(
295 destTaskId, new ArrayDeque<WritableRequest>());
296 resumeRequestsId.putIfAbsent(
297 destTaskId, Sets.<Long>newConcurrentHashSet());
298 if (temp != null) {
299 pair = temp;
300 }
301 }
302 AdjustableSemaphore openRequestPermit = pair.getLeft();
303
304
305 boolean shouldSend = openRequestPermit.tryAcquire();
306 boolean shouldCache = false;
307 while (!shouldSend) {
308
309
310
311
312
313
314
315 try {
316 shouldCache = unsentRequestPermit.tryAcquire(unsentWaitMsecs,
317 TimeUnit.MILLISECONDS);
318 } catch (InterruptedException e) {
319 throw new IllegalStateException("shouldSend: failed " +
320 "while waiting on the unsent request cache to have some more " +
321 "room for extra unsent requests!");
322 }
323 if (shouldCache) {
324 break;
325 }
326
327
328 shouldSend = openRequestPermit.tryAcquire();
329 if (shouldSend) {
330 break;
331 }
332
333
334
335
336
337 nettyClient.logAndSanityCheck();
338 }
339
340 if (shouldCache) {
341 Deque<WritableRequest> unsentRequests =
342 perWorkerUnsentRequestMap.get(destTaskId);
343
344
345
346
347
348
349
350
351
352
353
354
355 synchronized (unsentRequests) {
356 shouldSend = openRequestPermit.tryAcquire();
357 if (!shouldSend) {
358 aggregateUnsentRequests.getAndIncrement();
359 unsentRequests.add(request);
360 return;
361 }
362 }
363
364
365 unsentRequestPermit.release();
366 }
367 nettyClient.doSend(destTaskId, request);
368 }
369
370
371
372
373
374
375
376 private boolean shouldIgnoreCredit(int response) {
377 return ((short) ((response >> (14 + 16)) & 1)) == 1;
378 }
379
380
381
382
383
384
385
386 private short getCredit(int response) {
387 return (short) ((response >> 16) & 0x3FFF);
388 }
389
390
391
392
393
394
395
396 private int getTimestamp(int response) {
397 return response & 0xFFFF;
398 }
399
400
401
402
403
404
405
406 @Override
407 public AckSignalFlag getAckSignalFlag(int response) {
408 return AckSignalFlag.values()[(response >> (16 + 14 + 1)) & 1];
409 }
410
411 @Override
412 public int calculateResponse(AckSignalFlag flag, int taskId) {
413 boolean ignoreCredit = nettyClient.masterInvolved(taskId);
414 if (!ignoreCredit && maxOpenRequestsPerWorker == 0) {
415 synchronized (workersToResume) {
416 workersToResume.add(taskId);
417 }
418 }
419 int timestamp = (int) (nettyClient.getNextRequestId(taskId) & 0xFFFF);
420 return (flag.ordinal() << (16 + 14 + 1)) |
421 ((ignoreCredit ? 1 : 0) << (16 + 14)) |
422 (maxOpenRequestsPerWorker << 16) |
423 timestamp;
424 }
425
426 @Override
427 public void logInfo() {
428 if (LOG.isInfoEnabled()) {
429
430 Map<Integer, Integer> unsentRequestCounts = Maps.newHashMap();
431 for (Map.Entry<Integer, Deque<WritableRequest>> entry :
432 perWorkerUnsentRequestMap.entrySet()) {
433 unsentRequestCounts.put(entry.getKey(), entry.getValue().size());
434 }
435 ArrayList<Map.Entry<Integer, Integer>> sorted =
436 Lists.newArrayList(unsentRequestCounts.entrySet());
437 Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() {
438 @Override
439 public int compare(Map.Entry<Integer, Integer> entry1,
440 Map.Entry<Integer, Integer> entry2) {
441 int value1 = entry1.getValue();
442 int value2 = entry2.getValue();
443 return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1);
444 }
445 });
446 StringBuilder message = new StringBuilder();
447 message.append("logInfo: ").append(aggregateUnsentRequests.get())
448 .append(" unsent requests in total. ");
449 int itemsToPrint = Math.min(10, sorted.size());
450 for (int i = 0; i < itemsToPrint; ++i) {
451 message.append(sorted.get(i).getValue())
452 .append(" unsent requests for taskId=")
453 .append(sorted.get(i).getKey()).append(" (credit=")
454 .append(perWorkerOpenRequestMap.get(sorted.get(i).getKey())
455 .getKey().getMaxPermits())
456 .append("), ");
457 }
458 LOG.info(message);
459 }
460 }
461
462 @Override
463 public void waitAllRequests() {
464 while (true) {
465 synchronized (aggregateUnsentRequests) {
466 if (aggregateUnsentRequests.get() == 0) {
467 break;
468 }
469 try {
470 aggregateUnsentRequests.wait(waitingRequestMsecs);
471 } catch (InterruptedException e) {
472 throw new IllegalStateException("waitAllRequests: failed while " +
473 "waiting on open/cached requests");
474 }
475 }
476 if (aggregateUnsentRequests.get() == 0) {
477 break;
478 }
479 nettyClient.logAndSanityCheck();
480 }
481 }
482
483 @Override
484 public int getNumberOfUnsentRequests() {
485 return aggregateUnsentRequests.get();
486 }
487
488 @Override
489 public void messageAckReceived(int taskId, long requestId, int response) {
490 boolean ignoreCredit = shouldIgnoreCredit(response);
491 short credit = getCredit(response);
492 int timestamp = getTimestamp(response);
493 MutablePair<AdjustableSemaphore, Integer> pair =
494 (MutablePair<AdjustableSemaphore, Integer>)
495 perWorkerOpenRequestMap.get(taskId);
496 AdjustableSemaphore openRequestPermit = pair.getLeft();
497
498
499
500 if (!resumeRequestsId.get(taskId).remove(requestId)) {
501 openRequestPermit.release();
502 } else if (LOG.isDebugEnabled()) {
503 LOG.debug("messageAckReceived: ACK of resume received from " + taskId +
504 " timestamp=" + timestamp);
505 }
506 if (!ignoreCredit) {
507 synchronized (pair) {
508 if (compareTimestamps(timestamp, pair.getRight()) > 0) {
509 pair.setRight(timestamp);
510 openRequestPermit.setMaxPermits(credit);
511 } else if (LOG.isDebugEnabled()) {
512 LOG.debug("messageAckReceived: received out-of-order messages." +
513 "Received timestamp=" + timestamp + " and current timestamp=" +
514 pair.getRight());
515 }
516 }
517 }
518
519
520
521
522 trySendCachedRequests(taskId);
523 }
524
525
526
527
528
529
530 private void trySendCachedRequests(int taskId) {
531 Deque<WritableRequest> requestDeque =
532 perWorkerUnsentRequestMap.get(taskId);
533 AdjustableSemaphore openRequestPermit =
534 perWorkerOpenRequestMap.get(taskId).getLeft();
535 while (true) {
536 WritableRequest request;
537 synchronized (requestDeque) {
538 request = requestDeque.pollFirst();
539 if (request == null) {
540 break;
541 }
542
543 if (!openRequestPermit.tryAcquire()) {
544 requestDeque.offerFirst(request);
545 break;
546 }
547 }
548 unsentRequestPermit.release();
549
550
551
552 try {
553 toBeSent.put(
554 new ImmutablePair<Integer, WritableRequest>(taskId, request));
555 } catch (InterruptedException e) {
556 throw new IllegalStateException("trySendCachedRequests: failed while" +
557 "waiting to put element in send queue!", e);
558 }
559 }
560 }
561
562
563
564
565
566
567 public void updateCredit(short newCredit) {
568 newCredit = (short) Math.max(0, Math.min(0x3FFF, newCredit));
569
570 if (maxOpenRequestsPerWorker == 0 && newCredit != 0) {
571 maxOpenRequestsPerWorker = newCredit;
572 synchronized (workersToResume) {
573 workersToResume.notifyAll();
574 }
575 } else {
576 maxOpenRequestsPerWorker = newCredit;
577 }
578 }
579
580
581
582
583
584
585
586
587
588
589
590
591 private int compareTimestamps(int ts1, int ts2) {
592 int diff = ts1 - ts2;
593 if (Math.abs(diff) < 0x7FFF) {
594 return diff;
595 } else {
596 return -diff;
597 }
598 }
599
600
601
602
603
604
605
606
607 public void processResumeSignal(int clientId, short credit, long requestId) {
608 int timestamp = (int) (requestId & 0xFFFF);
609 if (LOG.isDebugEnabled()) {
610 LOG.debug("processResumeSignal: resume signal from " + clientId +
611 " with timestamp=" + timestamp);
612 }
613 MutablePair<AdjustableSemaphore, Integer> pair =
614 (MutablePair<AdjustableSemaphore, Integer>)
615 perWorkerOpenRequestMap.get(clientId);
616 synchronized (pair) {
617 if (compareTimestamps(timestamp, pair.getRight()) > 0) {
618 pair.setRight(timestamp);
619 pair.getLeft().setMaxPermits(credit);
620 } else if (LOG.isDebugEnabled()) {
621 LOG.debug("processResumeSignal: received out-of-order messages. " +
622 "Received timestamp=" + timestamp + " and current timestamp=" +
623 pair.getRight());
624 }
625 }
626 trySendCachedRequests(clientId);
627 }
628 }