1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.comm.flow_control; |
20 | |
|
21 | |
import static com.google.common.base.Preconditions.checkState; |
22 | |
import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS; |
23 | |
import com.google.common.collect.Lists; |
24 | |
import com.google.common.collect.Maps; |
25 | |
import com.google.common.collect.Sets; |
26 | |
import org.apache.commons.lang3.tuple.ImmutablePair; |
27 | |
import org.apache.commons.lang3.tuple.MutablePair; |
28 | |
import org.apache.commons.lang3.tuple.Pair; |
29 | |
import org.apache.giraph.comm.netty.NettyClient; |
30 | |
import org.apache.giraph.comm.netty.handler.AckSignalFlag; |
31 | |
import org.apache.giraph.comm.requests.SendResumeRequest; |
32 | |
import org.apache.giraph.comm.requests.WritableRequest; |
33 | |
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; |
34 | |
import org.apache.giraph.conf.IntConfOption; |
35 | |
import org.apache.giraph.utils.AdjustableSemaphore; |
36 | |
import org.apache.giraph.utils.ThreadUtils; |
37 | |
import org.apache.log4j.Logger; |
38 | |
|
39 | |
import java.util.ArrayDeque; |
40 | |
import java.util.ArrayList; |
41 | |
import java.util.Collections; |
42 | |
import java.util.Comparator; |
43 | |
import java.util.Deque; |
44 | |
import java.util.Map; |
45 | |
import java.util.Set; |
46 | |
import java.util.concurrent.ArrayBlockingQueue; |
47 | |
import java.util.concurrent.BlockingQueue; |
48 | |
import java.util.concurrent.ConcurrentMap; |
49 | |
import java.util.concurrent.Semaphore; |
50 | |
import java.util.concurrent.TimeUnit; |
51 | |
import java.util.concurrent.atomic.AtomicInteger; |
52 | |
|
53 | |
|
54 | |
|
55 | |
|
56 | |
|
57 | |
|
58 | |
|
59 | |
|
60 | |
|
61 | |
|
62 | |
|
63 | |
|
64 | |
|
65 | |
|
66 | |
|
67 | |
|
68 | |
|
69 | |
|
70 | |
|
71 | |
|
72 | |
|
73 | |
|
74 | |
|
75 | |
|
76 | |
|
77 | 0 | public class CreditBasedFlowControl implements FlowControl { |
78 | |
|
79 | |
|
80 | |
|
81 | |
|
82 | 0 | public static final IntConfOption MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER = |
83 | |
new IntConfOption("giraph.maxOpenRequestsPerWorker", 20, |
84 | |
"Maximum number of requests without confirmation we can have per " + |
85 | |
"worker"); |
86 | |
|
87 | 0 | public static final IntConfOption MAX_NUM_OF_UNSENT_REQUESTS = |
88 | |
new IntConfOption("giraph.maxNumberOfUnsentRequests", 2000, |
89 | |
"Maximum number of unsent requests we can keep in memory"); |
90 | |
|
91 | |
|
92 | |
|
93 | 0 | public static final IntConfOption UNSENT_CACHE_WAIT_INTERVAL = |
94 | |
new IntConfOption("giraph.unsentCacheWaitInterval", 1000, |
95 | |
"Time interval to wait on unsent requests cache (in milliseconds)"); |
96 | |
|
97 | 0 | private static final Logger LOG = |
98 | 0 | Logger.getLogger(CreditBasedFlowControl.class); |
99 | |
|
100 | |
|
101 | |
private final int unsentWaitMsecs; |
102 | |
|
103 | |
private final int waitingRequestMsecs; |
104 | |
|
105 | |
|
106 | |
|
107 | |
|
108 | |
private volatile short maxOpenRequestsPerWorker; |
109 | |
|
110 | 0 | private final AtomicInteger aggregateUnsentRequests = new AtomicInteger(0); |
111 | |
|
112 | |
|
113 | |
|
114 | |
|
115 | |
|
116 | |
|
117 | |
|
118 | |
|
119 | |
|
120 | |
|
121 | |
|
122 | |
|
123 | |
|
124 | |
|
125 | |
|
126 | |
|
127 | |
|
128 | |
|
129 | |
|
130 | |
|
131 | |
|
132 | |
|
133 | |
|
134 | |
|
135 | |
|
136 | |
|
137 | |
|
138 | |
|
139 | |
|
140 | |
|
141 | |
|
142 | |
|
143 | |
|
144 | |
|
145 | |
|
146 | |
|
147 | |
|
148 | |
|
149 | |
|
150 | |
|
151 | 0 | private final ConcurrentMap<Integer, Pair<AdjustableSemaphore, Integer>> |
152 | 0 | perWorkerOpenRequestMap = Maps.newConcurrentMap(); |
153 | |
|
154 | 0 | private final ConcurrentMap<Integer, Deque<WritableRequest>> |
155 | 0 | perWorkerUnsentRequestMap = Maps.newConcurrentMap(); |
156 | |
|
157 | |
|
158 | |
|
159 | |
|
160 | 0 | private final Set<Integer> workersToResume = Sets.newHashSet(); |
161 | |
|
162 | |
|
163 | |
|
164 | |
|
165 | |
|
166 | |
|
167 | |
|
168 | |
|
169 | |
|
170 | 0 | private final ConcurrentMap<Integer, Set<Long>> resumeRequestsId = |
171 | 0 | Maps.newConcurrentMap(); |
172 | |
|
173 | |
|
174 | |
|
175 | |
|
176 | |
|
177 | |
|
178 | |
|
179 | |
private final BlockingQueue<Pair<Integer, WritableRequest>> toBeSent; |
180 | |
|
181 | |
|
182 | |
|
183 | |
|
184 | |
private final Semaphore unsentRequestPermit; |
185 | |
|
186 | |
private final NettyClient nettyClient; |
187 | |
|
188 | |
|
189 | |
|
190 | |
|
191 | |
|
192 | |
|
193 | |
|
194 | |
public CreditBasedFlowControl(ImmutableClassesGiraphConfiguration conf, |
195 | |
final NettyClient nettyClient, |
196 | |
Thread.UncaughtExceptionHandler |
197 | 0 | exceptionHandler) { |
198 | 0 | this.nettyClient = nettyClient; |
199 | 0 | maxOpenRequestsPerWorker = |
200 | 0 | (short) MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf); |
201 | 0 | checkState(maxOpenRequestsPerWorker < 0x4000 && |
202 | |
maxOpenRequestsPerWorker > 0, "NettyClient: max number of open " + |
203 | |
"requests should be in range (0, " + 0x4FFF + ")"); |
204 | 0 | int maxUnsentRequests = MAX_NUM_OF_UNSENT_REQUESTS.get(conf); |
205 | 0 | unsentRequestPermit = new Semaphore(maxUnsentRequests); |
206 | 0 | this.toBeSent = new ArrayBlockingQueue<Pair<Integer, WritableRequest>>( |
207 | |
maxUnsentRequests); |
208 | 0 | unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf); |
209 | 0 | waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf); |
210 | |
|
211 | |
|
212 | 0 | ThreadUtils.startThread(new Runnable() { |
213 | |
@Override |
214 | |
public void run() { |
215 | |
while (true) { |
216 | 0 | synchronized (workersToResume) { |
217 | 0 | for (Integer workerId : workersToResume) { |
218 | 0 | if (maxOpenRequestsPerWorker != 0) { |
219 | 0 | sendResumeSignal(workerId); |
220 | |
} else { |
221 | |
break; |
222 | |
} |
223 | 0 | } |
224 | |
try { |
225 | 0 | workersToResume.wait(); |
226 | 0 | } catch (InterruptedException e) { |
227 | 0 | throw new IllegalStateException("run: caught exception " + |
228 | |
"while waiting for resume-sender thread to be notified!", |
229 | |
e); |
230 | 0 | } |
231 | 0 | } |
232 | |
} |
233 | |
} |
234 | |
}, "resume-sender", exceptionHandler); |
235 | |
|
236 | |
|
237 | 0 | ThreadUtils.startThread(new Runnable() { |
238 | |
@Override |
239 | |
public void run() { |
240 | |
while (true) { |
241 | 0 | Pair<Integer, WritableRequest> pair = null; |
242 | |
try { |
243 | 0 | pair = toBeSent.take(); |
244 | 0 | } catch (InterruptedException e) { |
245 | 0 | throw new IllegalStateException("run: failed while waiting to " + |
246 | |
"take an element from the request queue!", e); |
247 | 0 | } |
248 | 0 | int taskId = pair.getLeft(); |
249 | 0 | WritableRequest request = pair.getRight(); |
250 | 0 | nettyClient.doSend(taskId, request); |
251 | 0 | if (aggregateUnsentRequests.decrementAndGet() == 0) { |
252 | 0 | synchronized (aggregateUnsentRequests) { |
253 | 0 | aggregateUnsentRequests.notifyAll(); |
254 | 0 | } |
255 | |
} |
256 | 0 | } |
257 | |
} |
258 | |
}, "cached-req-sender", exceptionHandler); |
259 | 0 | } |
260 | |
|
261 | |
|
262 | |
|
263 | |
|
264 | |
|
265 | |
|
266 | |
private void sendResumeSignal(int workerId) { |
267 | 0 | if (maxOpenRequestsPerWorker == 0) { |
268 | 0 | LOG.warn("sendResumeSignal: method called while the max open requests " + |
269 | |
"for worker " + workerId + " is still 0"); |
270 | 0 | return; |
271 | |
} |
272 | 0 | WritableRequest request = new SendResumeRequest(maxOpenRequestsPerWorker); |
273 | 0 | Long resumeId = nettyClient.doSend(workerId, request); |
274 | 0 | checkState(resumeId != null); |
275 | 0 | if (LOG.isDebugEnabled()) { |
276 | 0 | LOG.debug("sendResumeSignal: sending signal to worker " + workerId + |
277 | |
" with credit=" + maxOpenRequestsPerWorker + ", ID=" + |
278 | 0 | (resumeId & 0xFFFF)); |
279 | |
} |
280 | 0 | resumeRequestsId.get(workerId).add(resumeId); |
281 | 0 | } |
282 | |
|
283 | |
@Override |
284 | |
public void sendRequest(int destTaskId, WritableRequest request) { |
285 | 0 | Pair<AdjustableSemaphore, Integer> pair = |
286 | 0 | perWorkerOpenRequestMap.get(destTaskId); |
287 | |
|
288 | |
|
289 | 0 | if (pair == null) { |
290 | 0 | pair = new MutablePair<>( |
291 | 0 | new AdjustableSemaphore(maxOpenRequestsPerWorker), -1); |
292 | 0 | Pair<AdjustableSemaphore, Integer> temp = |
293 | 0 | perWorkerOpenRequestMap.putIfAbsent(destTaskId, pair); |
294 | 0 | perWorkerUnsentRequestMap.putIfAbsent( |
295 | 0 | destTaskId, new ArrayDeque<WritableRequest>()); |
296 | 0 | resumeRequestsId.putIfAbsent( |
297 | 0 | destTaskId, Sets.<Long>newConcurrentHashSet()); |
298 | 0 | if (temp != null) { |
299 | 0 | pair = temp; |
300 | |
} |
301 | |
} |
302 | 0 | AdjustableSemaphore openRequestPermit = pair.getLeft(); |
303 | |
|
304 | |
|
305 | 0 | boolean shouldSend = openRequestPermit.tryAcquire(); |
306 | 0 | boolean shouldCache = false; |
307 | 0 | while (!shouldSend) { |
308 | |
|
309 | |
|
310 | |
|
311 | |
|
312 | |
|
313 | |
|
314 | |
|
315 | |
try { |
316 | 0 | shouldCache = unsentRequestPermit.tryAcquire(unsentWaitMsecs, |
317 | |
TimeUnit.MILLISECONDS); |
318 | 0 | } catch (InterruptedException e) { |
319 | 0 | throw new IllegalStateException("shouldSend: failed " + |
320 | |
"while waiting on the unsent request cache to have some more " + |
321 | |
"room for extra unsent requests!"); |
322 | 0 | } |
323 | 0 | if (shouldCache) { |
324 | 0 | break; |
325 | |
} |
326 | |
|
327 | |
|
328 | 0 | shouldSend = openRequestPermit.tryAcquire(); |
329 | 0 | if (shouldSend) { |
330 | 0 | break; |
331 | |
} |
332 | |
|
333 | |
|
334 | |
|
335 | |
|
336 | |
|
337 | 0 | nettyClient.logAndSanityCheck(); |
338 | |
} |
339 | |
|
340 | 0 | if (shouldCache) { |
341 | 0 | Deque<WritableRequest> unsentRequests = |
342 | 0 | perWorkerUnsentRequestMap.get(destTaskId); |
343 | |
|
344 | |
|
345 | |
|
346 | |
|
347 | |
|
348 | |
|
349 | |
|
350 | |
|
351 | |
|
352 | |
|
353 | |
|
354 | |
|
355 | 0 | synchronized (unsentRequests) { |
356 | 0 | shouldSend = openRequestPermit.tryAcquire(); |
357 | 0 | if (!shouldSend) { |
358 | 0 | aggregateUnsentRequests.getAndIncrement(); |
359 | 0 | unsentRequests.add(request); |
360 | 0 | return; |
361 | |
} |
362 | 0 | } |
363 | |
|
364 | |
|
365 | 0 | unsentRequestPermit.release(); |
366 | |
} |
367 | 0 | nettyClient.doSend(destTaskId, request); |
368 | 0 | } |
369 | |
|
370 | |
|
371 | |
|
372 | |
|
373 | |
|
374 | |
|
375 | |
|
376 | |
private boolean shouldIgnoreCredit(int response) { |
377 | 0 | return ((short) ((response >> (14 + 16)) & 1)) == 1; |
378 | |
} |
379 | |
|
380 | |
|
381 | |
|
382 | |
|
383 | |
|
384 | |
|
385 | |
|
386 | |
private short getCredit(int response) { |
387 | 0 | return (short) ((response >> 16) & 0x3FFF); |
388 | |
} |
389 | |
|
390 | |
|
391 | |
|
392 | |
|
393 | |
|
394 | |
|
395 | |
|
396 | |
private int getTimestamp(int response) { |
397 | 0 | return response & 0xFFFF; |
398 | |
} |
399 | |
|
400 | |
|
401 | |
|
402 | |
|
403 | |
|
404 | |
|
405 | |
|
406 | |
@Override |
407 | |
public AckSignalFlag getAckSignalFlag(int response) { |
408 | 0 | return AckSignalFlag.values()[(response >> (16 + 14 + 1)) & 1]; |
409 | |
} |
410 | |
|
411 | |
@Override |
412 | |
public int calculateResponse(AckSignalFlag flag, int taskId) { |
413 | 0 | boolean ignoreCredit = nettyClient.masterInvolved(taskId); |
414 | 0 | if (!ignoreCredit && maxOpenRequestsPerWorker == 0) { |
415 | 0 | synchronized (workersToResume) { |
416 | 0 | workersToResume.add(taskId); |
417 | 0 | } |
418 | |
} |
419 | 0 | int timestamp = (int) (nettyClient.getNextRequestId(taskId) & 0xFFFF); |
420 | 0 | return (flag.ordinal() << (16 + 14 + 1)) | |
421 | |
((ignoreCredit ? 1 : 0) << (16 + 14)) | |
422 | |
(maxOpenRequestsPerWorker << 16) | |
423 | |
timestamp; |
424 | |
} |
425 | |
|
426 | |
@Override |
427 | |
public void logInfo() { |
428 | 0 | if (LOG.isInfoEnabled()) { |
429 | |
|
430 | 0 | Map<Integer, Integer> unsentRequestCounts = Maps.newHashMap(); |
431 | |
for (Map.Entry<Integer, Deque<WritableRequest>> entry : |
432 | 0 | perWorkerUnsentRequestMap.entrySet()) { |
433 | 0 | unsentRequestCounts.put(entry.getKey(), entry.getValue().size()); |
434 | 0 | } |
435 | 0 | ArrayList<Map.Entry<Integer, Integer>> sorted = |
436 | 0 | Lists.newArrayList(unsentRequestCounts.entrySet()); |
437 | 0 | Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() { |
438 | |
@Override |
439 | |
public int compare(Map.Entry<Integer, Integer> entry1, |
440 | |
Map.Entry<Integer, Integer> entry2) { |
441 | 0 | int value1 = entry1.getValue(); |
442 | 0 | int value2 = entry2.getValue(); |
443 | 0 | return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1); |
444 | |
} |
445 | |
}); |
446 | 0 | StringBuilder message = new StringBuilder(); |
447 | 0 | message.append("logInfo: ").append(aggregateUnsentRequests.get()) |
448 | 0 | .append(" unsent requests in total. "); |
449 | 0 | int itemsToPrint = Math.min(10, sorted.size()); |
450 | 0 | for (int i = 0; i < itemsToPrint; ++i) { |
451 | 0 | message.append(sorted.get(i).getValue()) |
452 | 0 | .append(" unsent requests for taskId=") |
453 | 0 | .append(sorted.get(i).getKey()).append(" (credit=") |
454 | 0 | .append(perWorkerOpenRequestMap.get(sorted.get(i).getKey()) |
455 | 0 | .getKey().getMaxPermits()) |
456 | 0 | .append("), "); |
457 | |
} |
458 | 0 | LOG.info(message); |
459 | |
} |
460 | 0 | } |
461 | |
|
462 | |
@Override |
463 | |
public void waitAllRequests() { |
464 | |
while (true) { |
465 | 0 | synchronized (aggregateUnsentRequests) { |
466 | 0 | if (aggregateUnsentRequests.get() == 0) { |
467 | 0 | break; |
468 | |
} |
469 | |
try { |
470 | 0 | aggregateUnsentRequests.wait(waitingRequestMsecs); |
471 | 0 | } catch (InterruptedException e) { |
472 | 0 | throw new IllegalStateException("waitAllRequests: failed while " + |
473 | |
"waiting on open/cached requests"); |
474 | 0 | } |
475 | 0 | } |
476 | 0 | if (aggregateUnsentRequests.get() == 0) { |
477 | 0 | break; |
478 | |
} |
479 | 0 | nettyClient.logAndSanityCheck(); |
480 | |
} |
481 | 0 | } |
482 | |
|
483 | |
@Override |
484 | |
public int getNumberOfUnsentRequests() { |
485 | 0 | return aggregateUnsentRequests.get(); |
486 | |
} |
487 | |
|
488 | |
@Override |
489 | |
public void messageAckReceived(int taskId, long requestId, int response) { |
490 | 0 | boolean ignoreCredit = shouldIgnoreCredit(response); |
491 | 0 | short credit = getCredit(response); |
492 | 0 | int timestamp = getTimestamp(response); |
493 | 0 | MutablePair<AdjustableSemaphore, Integer> pair = |
494 | |
(MutablePair<AdjustableSemaphore, Integer>) |
495 | 0 | perWorkerOpenRequestMap.get(taskId); |
496 | 0 | AdjustableSemaphore openRequestPermit = pair.getLeft(); |
497 | |
|
498 | |
|
499 | |
|
500 | 0 | if (!resumeRequestsId.get(taskId).remove(requestId)) { |
501 | 0 | openRequestPermit.release(); |
502 | 0 | } else if (LOG.isDebugEnabled()) { |
503 | 0 | LOG.debug("messageAckReceived: ACK of resume received from " + taskId + |
504 | |
" timestamp=" + timestamp); |
505 | |
} |
506 | 0 | if (!ignoreCredit) { |
507 | 0 | synchronized (pair) { |
508 | 0 | if (compareTimestamps(timestamp, pair.getRight()) > 0) { |
509 | 0 | pair.setRight(timestamp); |
510 | 0 | openRequestPermit.setMaxPermits(credit); |
511 | 0 | } else if (LOG.isDebugEnabled()) { |
512 | 0 | LOG.debug("messageAckReceived: received out-of-order messages." + |
513 | |
"Received timestamp=" + timestamp + " and current timestamp=" + |
514 | 0 | pair.getRight()); |
515 | |
} |
516 | 0 | } |
517 | |
} |
518 | |
|
519 | |
|
520 | |
|
521 | |
|
522 | 0 | trySendCachedRequests(taskId); |
523 | 0 | } |
524 | |
|
525 | |
|
526 | |
|
527 | |
|
528 | |
|
529 | |
|
530 | |
private void trySendCachedRequests(int taskId) { |
531 | 0 | Deque<WritableRequest> requestDeque = |
532 | 0 | perWorkerUnsentRequestMap.get(taskId); |
533 | 0 | AdjustableSemaphore openRequestPermit = |
534 | 0 | perWorkerOpenRequestMap.get(taskId).getLeft(); |
535 | |
while (true) { |
536 | |
WritableRequest request; |
537 | 0 | synchronized (requestDeque) { |
538 | 0 | request = requestDeque.pollFirst(); |
539 | 0 | if (request == null) { |
540 | 0 | break; |
541 | |
} |
542 | |
|
543 | 0 | if (!openRequestPermit.tryAcquire()) { |
544 | 0 | requestDeque.offerFirst(request); |
545 | 0 | break; |
546 | |
} |
547 | 0 | } |
548 | 0 | unsentRequestPermit.release(); |
549 | |
|
550 | |
|
551 | |
|
552 | |
try { |
553 | 0 | toBeSent.put( |
554 | 0 | new ImmutablePair<Integer, WritableRequest>(taskId, request)); |
555 | 0 | } catch (InterruptedException e) { |
556 | 0 | throw new IllegalStateException("trySendCachedRequests: failed while" + |
557 | |
"waiting to put element in send queue!", e); |
558 | 0 | } |
559 | 0 | } |
560 | 0 | } |
561 | |
|
562 | |
|
563 | |
|
564 | |
|
565 | |
|
566 | |
|
567 | |
public void updateCredit(short newCredit) { |
568 | 0 | newCredit = (short) Math.max(0, Math.min(0x3FFF, newCredit)); |
569 | |
|
570 | 0 | if (maxOpenRequestsPerWorker == 0 && newCredit != 0) { |
571 | 0 | maxOpenRequestsPerWorker = newCredit; |
572 | 0 | synchronized (workersToResume) { |
573 | 0 | workersToResume.notifyAll(); |
574 | 0 | } |
575 | |
} else { |
576 | 0 | maxOpenRequestsPerWorker = newCredit; |
577 | |
} |
578 | 0 | } |
579 | |
|
580 | |
|
581 | |
|
582 | |
|
583 | |
|
584 | |
|
585 | |
|
586 | |
|
587 | |
|
588 | |
|
589 | |
|
590 | |
|
591 | |
private int compareTimestamps(int ts1, int ts2) { |
592 | 0 | int diff = ts1 - ts2; |
593 | 0 | if (Math.abs(diff) < 0x7FFF) { |
594 | 0 | return diff; |
595 | |
} else { |
596 | 0 | return -diff; |
597 | |
} |
598 | |
} |
599 | |
|
600 | |
|
601 | |
|
602 | |
|
603 | |
|
604 | |
|
605 | |
|
606 | |
|
607 | |
public void processResumeSignal(int clientId, short credit, long requestId) { |
608 | 0 | int timestamp = (int) (requestId & 0xFFFF); |
609 | 0 | if (LOG.isDebugEnabled()) { |
610 | 0 | LOG.debug("processResumeSignal: resume signal from " + clientId + |
611 | |
" with timestamp=" + timestamp); |
612 | |
} |
613 | 0 | MutablePair<AdjustableSemaphore, Integer> pair = |
614 | |
(MutablePair<AdjustableSemaphore, Integer>) |
615 | 0 | perWorkerOpenRequestMap.get(clientId); |
616 | 0 | synchronized (pair) { |
617 | 0 | if (compareTimestamps(timestamp, pair.getRight()) > 0) { |
618 | 0 | pair.setRight(timestamp); |
619 | 0 | pair.getLeft().setMaxPermits(credit); |
620 | 0 | } else if (LOG.isDebugEnabled()) { |
621 | 0 | LOG.debug("processResumeSignal: received out-of-order messages. " + |
622 | |
"Received timestamp=" + timestamp + " and current timestamp=" + |
623 | 0 | pair.getRight()); |
624 | |
} |
625 | 0 | } |
626 | 0 | trySendCachedRequests(clientId); |
627 | 0 | } |
628 | |
} |