Coverage Report - org.apache.giraph.comm.flow_control.CreditBasedFlowControl
 
Classes in this File Line Coverage Branch Coverage Complexity
CreditBasedFlowControl
0%
0/191
0%
0/68
0
CreditBasedFlowControl$1
0%
0/11
0%
0/4
0
CreditBasedFlowControl$2
0%
0/14
0%
0/2
0
CreditBasedFlowControl$3
0%
0/4
0%
0/4
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.flow_control;
 20  
 
 21  
 import static com.google.common.base.Preconditions.checkState;
 22  
 import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
 23  
 import com.google.common.collect.Lists;
 24  
 import com.google.common.collect.Maps;
 25  
 import com.google.common.collect.Sets;
 26  
 import org.apache.commons.lang3.tuple.ImmutablePair;
 27  
 import org.apache.commons.lang3.tuple.MutablePair;
 28  
 import org.apache.commons.lang3.tuple.Pair;
 29  
 import org.apache.giraph.comm.netty.NettyClient;
 30  
 import org.apache.giraph.comm.netty.handler.AckSignalFlag;
 31  
 import org.apache.giraph.comm.requests.SendResumeRequest;
 32  
 import org.apache.giraph.comm.requests.WritableRequest;
 33  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 34  
 import org.apache.giraph.conf.IntConfOption;
 35  
 import org.apache.giraph.utils.AdjustableSemaphore;
 36  
 import org.apache.giraph.utils.ThreadUtils;
 37  
 import org.apache.log4j.Logger;
 38  
 
 39  
 import java.util.ArrayDeque;
 40  
 import java.util.ArrayList;
 41  
 import java.util.Collections;
 42  
 import java.util.Comparator;
 43  
 import java.util.Deque;
 44  
 import java.util.Map;
 45  
 import java.util.Set;
 46  
 import java.util.concurrent.ArrayBlockingQueue;
 47  
 import java.util.concurrent.BlockingQueue;
 48  
 import java.util.concurrent.ConcurrentMap;
 49  
 import java.util.concurrent.Semaphore;
 50  
 import java.util.concurrent.TimeUnit;
 51  
 import java.util.concurrent.atomic.AtomicInteger;
 52  
 
 53  
 /**
 54  
  * Representation of credit-based flow control policy. With this policy there
 55  
  * can be limited number of open requests from any worker x to any other worker
 56  
  * y. This number is called 'credit'. Let's denote this number by C{x->y}.
 57  
  * This implementation assumes that for a particular worker W, all values of
 58  
  * C{x->W} are the same. Let's denote this value by CR_W. CR_W may change
 59  
  * due to other reasons (e.g. memory pressure observed in an out-of-core
 60  
  * mechanism). However, CR_W is always in range [0, MAX_CR], where MAX_CR
 61  
  * is a user-defined constant. Note that MAX_CR should be representable by
 62  
  * at most 14 bits.
 63  
  *
 64  
  * In this implementation, the value of CR_W is announced to other workers along
 65  
  * with the ACK response envelope for all ACK response envelope going out of W.
 66  
  * Therefore, for non-zero values of CR_W, other workers know the instant value
 67  
  * of CR_W, hence they can control the number of open requests they have to W.
 68  
  * However, it is possible that W announces 0 as CR_W. In this case, other
 69  
  * workers stop opening more requests to W, hence they will not get any new
 70  
  * response envelope from W. This means other workers should be notified with
 71  
  * a dedicated request to resume sending more requests once CR_W becomes
 72  
  * non-zero. In this implementation, once CR_W is announced as 0 to a particular
 73  
  * worker U, we keep U in a set, so later on we can send 'resume signal' to U
 74  
  * once CR_W becomes non-zero. Sending resume signals is done through a
 75  
  * separate thread.
 76  
  */
 77  0
 public class CreditBasedFlowControl implements FlowControl {
 78  
   /**
 79  
    * Maximum number of requests we can have per worker without confirmation
 80  
    * (i.e. open requests)
 81  
    */
 82  0
   public static final IntConfOption MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER =
 83  
       new IntConfOption("giraph.maxOpenRequestsPerWorker", 20,
 84  
           "Maximum number of requests without confirmation we can have per " +
 85  
               "worker");
 86  
   /** Aggregate number of in-memory unsent requests */
 87  0
   public static final IntConfOption MAX_NUM_OF_UNSENT_REQUESTS =
 88  
       new IntConfOption("giraph.maxNumberOfUnsentRequests", 2000,
 89  
           "Maximum number of unsent requests we can keep in memory");
 90  
   /**
 91  
    * Time interval to wait on unsent requests cache until we find a spot in it
 92  
    */
 93  0
   public static final IntConfOption UNSENT_CACHE_WAIT_INTERVAL =
 94  
       new IntConfOption("giraph.unsentCacheWaitInterval", 1000,
 95  
           "Time interval to wait on unsent requests cache (in milliseconds)");
 96  
   /** Class logger */
 97  0
   private static final Logger LOG =
 98  0
       Logger.getLogger(CreditBasedFlowControl.class);
 99  
 
 100  
   /** Waiting interval on unsent requests cache until it frees up */
 101  
   private final int unsentWaitMsecs;
 102  
   /** Waiting interval for checking outstanding requests msecs */
 103  
   private final int waitingRequestMsecs;
 104  
   /**
 105  
    * Maximum number of open requests each worker can have to this worker at each
 106  
    * moment (CR_W -defined above- for this worker)
 107  
    */
 108  
   private volatile short maxOpenRequestsPerWorker;
 109  
   /** Total number of unsent, cached requests */
 110  0
   private final AtomicInteger aggregateUnsentRequests = new AtomicInteger(0);
 111  
   /**
 112  
    * Map of requests permits per worker. Keys in the map are worker ids and
 113  
    * values are pairs (X, Y) where:
 114  
    *   X: is the semaphore to control the number of open requests for a
 115  
    *      particular worker. Basically, the number of available permits on a
 116  
    *      semaphore is the credit available for the worker associated with that
 117  
    *      semaphore.
 118  
    *   Y: is the timestamp of the latest message (resume signal or ACK response)
 119  
    *      that changed the number of permits in the semaphore.
 120  
    * The idea behind keeping a timestamp is to avoid any issue that may happen
 121  
    * due to out-of-order message delivery. For example, consider this scenario:
 122  
    * an ACK response is sent to a worker announcing the credit is 0. Later on,
 123  
    * a resume signal announcing a non-zero credit is sent to the same worker.
 124  
    * Now, if the resume signal is received before the ACK message, the worker
 125  
    * would incorrectly assume credit value of 0, and would avoid sending any
 126  
    * messages, which may lead to a live-lock.
 127  
    *
 128  
    * The timestamp value is simply the request id generated by NettyClient.
 129  
    * These ids are generated in consecutive order, hence simulating the concept
 130  
    * of timestamp. However, the timestamp value should be sent along with
 131  
    * any ACK response envelope. The ACK response envelope is already very small
 132  
    * (maybe 10-20 bytes). So, the timestamp value should not add much overhead
 133  
    * to it. Instead of sending the whole long value request id (8 bytes) as the
 134  
    * timestamp, we can simply send the least significant 2 bytes of it. This is
 135  
    * a valid timestamp, as the credit value can be 0x3FFF (=16383) at most. This
 136  
    * means there will be at most 0x3FFF messages on the fly at each moment,
 137  
    * which means that the timestamp value sent by all messages in fly will fall
 138  
    * into a range of size 0x3FFF. So, it is enough to only consider timestamp
 139  
    * values twice as big as the mentioned range to be able to accurately
 140  
    * determine ordering even when values wrap around. This means we only need to
 141  
    * consider 15 least significant bits of request ids as timestamp values.
 142  
    *
 143  
    * The ACK response value contains following information (from least
 144  
    * significant to most significant):
 145  
    *  - 16 bits timestamp
 146  
    *  - 14 bits credit value
 147  
    *  - 1 bit specifying whether one end of communication is master and hence
 148  
    *    credit based flow control should be ignored
 149  
    *  - 1 bit response flag
 150  
    */
 151  0
   private final ConcurrentMap<Integer, Pair<AdjustableSemaphore, Integer>>
 152  0
       perWorkerOpenRequestMap = Maps.newConcurrentMap();
 153  
   /** Map of unsent cached requests per worker */
 154  0
   private final ConcurrentMap<Integer, Deque<WritableRequest>>
 155  0
       perWorkerUnsentRequestMap = Maps.newConcurrentMap();
 156  
   /**
 157  
    * Set of workers that should be notified to resume sending more requests if
 158  
    * the credit becomes non-zero
 159  
    */
 160  0
   private final Set<Integer> workersToResume = Sets.newHashSet();
 161  
   /**
 162  
    * Resume signals are not using any credit, so they should be treated
 163  
    * differently than normal requests. Resume signals should be ignored in
 164  
    * accounting for credits in credit-based flow control. The following map
 165  
    * keeps sets of request ids, for resume signals sent to other workers that
 166  
    * are still "open". The set of request ids used for resume signals for a
 167  
    * worker is important so we can determine if a received response is for a
 168  
    * resume signal or not.
 169  
    */
 170  0
   private final ConcurrentMap<Integer, Set<Long>> resumeRequestsId =
 171  0
       Maps.newConcurrentMap();
 172  
   /**
 173  
    * Queue of the cached requests to be sent out. The queue keeps pairs of
 174  
    * (destination id, request). The thread-safe blocking queue is used here for
 175  
    * the sake of simplicity. The blocking queue should be bounded (with bounds
 176  
    * no more than user-defined max number of unsent/cached requests) to avoid
 177  
    * excessive memory footprint.
 178  
    */
 179  
   private final BlockingQueue<Pair<Integer, WritableRequest>> toBeSent;
 180  
   /**
 181  
    * Semaphore to control number of cached unsent requests. Maximum number of
 182  
    * permits of this semaphore should be equal to MAX_NUM_OF_UNSENT_REQUESTS.
 183  
    */
 184  
   private final Semaphore unsentRequestPermit;
 185  
   /** Netty client used for sending requests */
 186  
   private final NettyClient nettyClient;
 187  
 
 188  
   /**
 189  
    * Constructor
 190  
    * @param conf configuration
 191  
    * @param nettyClient netty client
 192  
    * @param exceptionHandler Exception handler
 193  
    */
 194  
   public CreditBasedFlowControl(ImmutableClassesGiraphConfiguration conf,
 195  
                                 final NettyClient nettyClient,
 196  
                                 Thread.UncaughtExceptionHandler
 197  0
                                     exceptionHandler) {
 198  0
     this.nettyClient = nettyClient;
 199  0
     maxOpenRequestsPerWorker =
 200  0
         (short) MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
 201  0
     checkState(maxOpenRequestsPerWorker < 0x4000 &&
 202  
         maxOpenRequestsPerWorker > 0, "NettyClient: max number of open " +
 203  
         "requests should be in range (0, " + 0x4FFF + ")");
 204  0
     int maxUnsentRequests = MAX_NUM_OF_UNSENT_REQUESTS.get(conf);
 205  0
     unsentRequestPermit = new Semaphore(maxUnsentRequests);
 206  0
     this.toBeSent = new ArrayBlockingQueue<Pair<Integer, WritableRequest>>(
 207  
         maxUnsentRequests);
 208  0
     unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf);
 209  0
     waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
 210  
 
 211  
     // Thread to handle/send resume signals when necessary
 212  0
     ThreadUtils.startThread(new Runnable() {
 213  
       @Override
 214  
       public void run() {
 215  
         while (true) {
 216  0
           synchronized (workersToResume) {
 217  0
             for (Integer workerId : workersToResume) {
 218  0
               if (maxOpenRequestsPerWorker != 0) {
 219  0
                 sendResumeSignal(workerId);
 220  
               } else {
 221  
                 break;
 222  
               }
 223  0
             }
 224  
             try {
 225  0
               workersToResume.wait();
 226  0
             } catch (InterruptedException e) {
 227  0
               throw new IllegalStateException("run: caught exception " +
 228  
                   "while waiting for resume-sender thread to be notified!",
 229  
                   e);
 230  0
             }
 231  0
           }
 232  
         }
 233  
       }
 234  
     }, "resume-sender", exceptionHandler);
 235  
 
 236  
     // Thread to handle/send cached requests
 237  0
     ThreadUtils.startThread(new Runnable() {
 238  
       @Override
 239  
       public void run() {
 240  
         while (true) {
 241  0
           Pair<Integer, WritableRequest> pair = null;
 242  
           try {
 243  0
             pair = toBeSent.take();
 244  0
           } catch (InterruptedException e) {
 245  0
             throw new IllegalStateException("run: failed while waiting to " +
 246  
                 "take an element from the request queue!", e);
 247  0
           }
 248  0
           int taskId = pair.getLeft();
 249  0
           WritableRequest request = pair.getRight();
 250  0
           nettyClient.doSend(taskId, request);
 251  0
           if (aggregateUnsentRequests.decrementAndGet() == 0) {
 252  0
             synchronized (aggregateUnsentRequests) {
 253  0
               aggregateUnsentRequests.notifyAll();
 254  0
             }
 255  
           }
 256  0
         }
 257  
       }
 258  
     }, "cached-req-sender", exceptionHandler);
 259  0
   }
 260  
 
 261  
   /**
 262  
    * Send resume signal request to a given worker
 263  
    *
 264  
    * @param workerId id of the worker to send the resume signal to
 265  
    */
 266  
   private void sendResumeSignal(int workerId) {
 267  0
     if (maxOpenRequestsPerWorker == 0) {
 268  0
       LOG.warn("sendResumeSignal: method called while the max open requests " +
 269  
           "for worker " + workerId + " is still 0");
 270  0
       return;
 271  
     }
 272  0
     WritableRequest request = new SendResumeRequest(maxOpenRequestsPerWorker);
 273  0
     Long resumeId = nettyClient.doSend(workerId, request);
 274  0
     checkState(resumeId != null);
 275  0
     if (LOG.isDebugEnabled()) {
 276  0
       LOG.debug("sendResumeSignal: sending signal to worker " + workerId +
 277  
           " with credit=" + maxOpenRequestsPerWorker + ", ID=" +
 278  0
           (resumeId & 0xFFFF));
 279  
     }
 280  0
     resumeRequestsId.get(workerId).add(resumeId);
 281  0
   }
 282  
 
 283  
   @Override
 284  
   public void sendRequest(int destTaskId, WritableRequest request) {
 285  0
     Pair<AdjustableSemaphore, Integer> pair =
 286  0
         perWorkerOpenRequestMap.get(destTaskId);
 287  
     // Check if this is the first time sending a request to a worker. If so, we
 288  
     // should the worker id to necessary bookkeeping data structure.
 289  0
     if (pair == null) {
 290  0
       pair = new MutablePair<>(
 291  0
           new AdjustableSemaphore(maxOpenRequestsPerWorker), -1);
 292  0
       Pair<AdjustableSemaphore, Integer> temp =
 293  0
           perWorkerOpenRequestMap.putIfAbsent(destTaskId, pair);
 294  0
       perWorkerUnsentRequestMap.putIfAbsent(
 295  0
           destTaskId, new ArrayDeque<WritableRequest>());
 296  0
       resumeRequestsId.putIfAbsent(
 297  0
           destTaskId, Sets.<Long>newConcurrentHashSet());
 298  0
       if (temp != null) {
 299  0
         pair = temp;
 300  
       }
 301  
     }
 302  0
     AdjustableSemaphore openRequestPermit = pair.getLeft();
 303  
     // Try to reserve a spot for the request amongst the open requests of
 304  
     // the destination worker.
 305  0
     boolean shouldSend = openRequestPermit.tryAcquire();
 306  0
     boolean shouldCache = false;
 307  0
     while (!shouldSend) {
 308  
       // We should not send the request, and should cache the request instead.
 309  
       // It may be possible that the unsent message cache is also full, so we
 310  
       // should try to acquire a space on the cache, and if there is no extra
 311  
       // space in unsent request cache, we should wait until some space
 312  
       // become available. However, it is possible that during the time we are
 313  
       // waiting on the unsent messages cache, actual buffer for open requests
 314  
       // frees up space.
 315  
       try {
 316  0
         shouldCache = unsentRequestPermit.tryAcquire(unsentWaitMsecs,
 317  
             TimeUnit.MILLISECONDS);
 318  0
       } catch (InterruptedException e) {
 319  0
         throw new IllegalStateException("shouldSend: failed " +
 320  
             "while waiting on the unsent request cache to have some more " +
 321  
             "room for extra unsent requests!");
 322  0
       }
 323  0
       if (shouldCache) {
 324  0
         break;
 325  
       }
 326  
       // We may have an open spot in the meantime that we were waiting on the
 327  
       // unsent requests.
 328  0
       shouldSend = openRequestPermit.tryAcquire();
 329  0
       if (shouldSend) {
 330  0
         break;
 331  
       }
 332  
       // The current thread will be at this point only if it could not make
 333  
       // space amongst open requests for the destination worker and has been
 334  
       // timed-out in trying to acquire a space amongst unsent messages. So,
 335  
       // we should report logs, report progress, and check for request
 336  
       // failures.
 337  0
       nettyClient.logAndSanityCheck();
 338  
     }
 339  
     // Either shouldSend == true or shouldCache == true
 340  0
     if (shouldCache) {
 341  0
       Deque<WritableRequest> unsentRequests =
 342  0
           perWorkerUnsentRequestMap.get(destTaskId);
 343  
       // This synchronize block is necessary for the following reason:
 344  
       // Once we are at this point, it means there was no room for this
 345  
       // request to become an open request, hence we have to put it into
 346  
       // unsent cache. Consider the case that since last time we checked if
 347  
       // there is any room for an additional open request so far, all open
 348  
       // requests are delivered and their acknowledgements are also processed.
 349  
       // Now, if we put this request in the unsent cache, it is not being
 350  
       // considered to become an open request, as the only one who checks
 351  
       // on this matter would be the one who receives an acknowledgment for an
 352  
       // open request for the destination worker. So, a lock is necessary
 353  
       // to forcefully serialize the execution if this scenario is about to
 354  
       // happen.
 355  0
       synchronized (unsentRequests) {
 356  0
         shouldSend = openRequestPermit.tryAcquire();
 357  0
         if (!shouldSend) {
 358  0
           aggregateUnsentRequests.getAndIncrement();
 359  0
           unsentRequests.add(request);
 360  0
           return;
 361  
         }
 362  0
       }
 363  
       // We found a spot amongst open requests to send this request. So, this
 364  
       // request won't be cached anymore.
 365  0
       unsentRequestPermit.release();
 366  
     }
 367  0
     nettyClient.doSend(destTaskId, request);
 368  0
   }
 369  
 
 370  
   /**
 371  
    * Whether response specifies that credit should be ignored
 372  
    *
 373  
    * @param response response received
 374  
    * @return true iff credit should be ignored, false otherwise
 375  
    */
 376  
   private boolean shouldIgnoreCredit(int response) {
 377  0
     return ((short) ((response >> (14 + 16)) & 1)) == 1;
 378  
   }
 379  
 
 380  
   /**
 381  
    * Get the credit from a response
 382  
    *
 383  
    * @param response response received
 384  
    * @return credit from the received response
 385  
    */
 386  
   private short getCredit(int response) {
 387  0
     return (short) ((response >> 16) & 0x3FFF);
 388  
   }
 389  
 
 390  
   /**
 391  
    * Get the timestamp from a response
 392  
    *
 393  
    * @param response response received
 394  
    * @return timestamp from the received response
 395  
    */
 396  
   private int getTimestamp(int response) {
 397  0
     return response & 0xFFFF;
 398  
   }
 399  
 
 400  
   /**
 401  
    * Get the response flag from a response
 402  
    *
 403  
    * @param response response received
 404  
    * @return AckSignalFlag coming with the response
 405  
    */
 406  
   @Override
 407  
   public AckSignalFlag getAckSignalFlag(int response) {
 408  0
     return AckSignalFlag.values()[(response >> (16 + 14 + 1)) & 1];
 409  
   }
 410  
 
 411  
   @Override
 412  
   public int calculateResponse(AckSignalFlag flag, int taskId) {
 413  0
     boolean ignoreCredit = nettyClient.masterInvolved(taskId);
 414  0
     if (!ignoreCredit && maxOpenRequestsPerWorker == 0) {
 415  0
       synchronized (workersToResume) {
 416  0
         workersToResume.add(taskId);
 417  0
       }
 418  
     }
 419  0
     int timestamp = (int) (nettyClient.getNextRequestId(taskId) & 0xFFFF);
 420  0
     return (flag.ordinal() << (16 + 14 + 1)) |
 421  
         ((ignoreCredit ? 1 : 0) << (16 + 14)) |
 422  
         (maxOpenRequestsPerWorker << 16) |
 423  
         timestamp;
 424  
   }
 425  
 
 426  
   @Override
 427  
   public void logInfo() {
 428  0
     if (LOG.isInfoEnabled()) {
 429  
       // Count how many unsent requests each task has
 430  0
       Map<Integer, Integer> unsentRequestCounts = Maps.newHashMap();
 431  
       for (Map.Entry<Integer, Deque<WritableRequest>> entry :
 432  0
           perWorkerUnsentRequestMap.entrySet()) {
 433  0
         unsentRequestCounts.put(entry.getKey(), entry.getValue().size());
 434  0
       }
 435  0
       ArrayList<Map.Entry<Integer, Integer>> sorted =
 436  0
           Lists.newArrayList(unsentRequestCounts.entrySet());
 437  0
       Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() {
 438  
         @Override
 439  
         public int compare(Map.Entry<Integer, Integer> entry1,
 440  
                            Map.Entry<Integer, Integer> entry2) {
 441  0
           int value1 = entry1.getValue();
 442  0
           int value2 = entry2.getValue();
 443  0
           return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1);
 444  
         }
 445  
       });
 446  0
       StringBuilder message = new StringBuilder();
 447  0
       message.append("logInfo: ").append(aggregateUnsentRequests.get())
 448  0
           .append(" unsent requests in total. ");
 449  0
       int itemsToPrint = Math.min(10, sorted.size());
 450  0
       for (int i = 0; i < itemsToPrint; ++i) {
 451  0
         message.append(sorted.get(i).getValue())
 452  0
             .append(" unsent requests for taskId=")
 453  0
             .append(sorted.get(i).getKey()).append(" (credit=")
 454  0
             .append(perWorkerOpenRequestMap.get(sorted.get(i).getKey())
 455  0
                 .getKey().getMaxPermits())
 456  0
             .append("), ");
 457  
       }
 458  0
       LOG.info(message);
 459  
     }
 460  0
   }
 461  
 
 462  
   @Override
 463  
   public void waitAllRequests() {
 464  
     while (true) {
 465  0
       synchronized (aggregateUnsentRequests) {
 466  0
         if (aggregateUnsentRequests.get() == 0) {
 467  0
           break;
 468  
         }
 469  
         try {
 470  0
           aggregateUnsentRequests.wait(waitingRequestMsecs);
 471  0
         } catch (InterruptedException e) {
 472  0
           throw new IllegalStateException("waitAllRequests: failed while " +
 473  
               "waiting on open/cached requests");
 474  0
         }
 475  0
       }
 476  0
       if (aggregateUnsentRequests.get() == 0) {
 477  0
         break;
 478  
       }
 479  0
       nettyClient.logAndSanityCheck();
 480  
     }
 481  0
   }
 482  
 
 483  
   @Override
 484  
   public int getNumberOfUnsentRequests() {
 485  0
     return aggregateUnsentRequests.get();
 486  
   }
 487  
 
 488  
   @Override
 489  
   public void messageAckReceived(int taskId, long requestId, int response) {
 490  0
     boolean ignoreCredit = shouldIgnoreCredit(response);
 491  0
     short credit = getCredit(response);
 492  0
     int timestamp = getTimestamp(response);
 493  0
     MutablePair<AdjustableSemaphore, Integer> pair =
 494  
         (MutablePair<AdjustableSemaphore, Integer>)
 495  0
             perWorkerOpenRequestMap.get(taskId);
 496  0
     AdjustableSemaphore openRequestPermit = pair.getLeft();
 497  
     // Release a permit on open requests if we received ACK of a request other
 498  
     // than a Resume request (resume requests are always sent regardless of
 499  
     // number of open requests)
 500  0
     if (!resumeRequestsId.get(taskId).remove(requestId)) {
 501  0
       openRequestPermit.release();
 502  0
     } else if (LOG.isDebugEnabled()) {
 503  0
       LOG.debug("messageAckReceived: ACK of resume received from " + taskId +
 504  
           " timestamp=" + timestamp);
 505  
     }
 506  0
     if (!ignoreCredit) {
 507  0
       synchronized (pair) {
 508  0
         if (compareTimestamps(timestamp, pair.getRight()) > 0) {
 509  0
           pair.setRight(timestamp);
 510  0
           openRequestPermit.setMaxPermits(credit);
 511  0
         } else if (LOG.isDebugEnabled()) {
 512  0
           LOG.debug("messageAckReceived: received out-of-order messages." +
 513  
               "Received timestamp=" + timestamp + " and current timestamp=" +
 514  0
               pair.getRight());
 515  
         }
 516  0
       }
 517  
     }
 518  
     // Since we received a response and we changed the credit of the sender
 519  
     // client, we may be able to send some more requests to the sender
 520  
     // client. So, we try to send as much request as we can to the sender
 521  
     // client.
 522  0
     trySendCachedRequests(taskId);
 523  0
   }
 524  
 
 525  
   /**
 526  
    * Try to send as much as cached requests to a given worker
 527  
    *
 528  
    * @param taskId id of the worker to send cached requests to
 529  
    */
 530  
   private void trySendCachedRequests(int taskId) {
 531  0
     Deque<WritableRequest> requestDeque =
 532  0
         perWorkerUnsentRequestMap.get(taskId);
 533  0
     AdjustableSemaphore openRequestPermit =
 534  0
         perWorkerOpenRequestMap.get(taskId).getLeft();
 535  
     while (true) {
 536  
       WritableRequest request;
 537  0
       synchronized (requestDeque) {
 538  0
         request = requestDeque.pollFirst();
 539  0
         if (request == null) {
 540  0
           break;
 541  
         }
 542  
         // See whether the sender client has any unused credit
 543  0
         if (!openRequestPermit.tryAcquire()) {
 544  0
           requestDeque.offerFirst(request);
 545  0
           break;
 546  
         }
 547  0
       }
 548  0
       unsentRequestPermit.release();
 549  
       // At this point, we have a request, and we reserved a credit for the
 550  
       // sender client. So, we put the request in a queue to be sent to the
 551  
       // client.
 552  
       try {
 553  0
         toBeSent.put(
 554  0
             new ImmutablePair<Integer, WritableRequest>(taskId, request));
 555  0
       } catch (InterruptedException e) {
 556  0
         throw new IllegalStateException("trySendCachedRequests: failed while" +
 557  
             "waiting to put element in send queue!", e);
 558  0
       }
 559  0
     }
 560  0
   }
 561  
 
 562  
   /**
 563  
    * Update the max credit that is announced to other workers
 564  
    *
 565  
    * @param newCredit new credit
 566  
    */
 567  
   public void updateCredit(short newCredit) {
 568  0
     newCredit = (short) Math.max(0, Math.min(0x3FFF, newCredit));
 569  
     // Check whether we should send resume signals to some workers
 570  0
     if (maxOpenRequestsPerWorker == 0 && newCredit != 0) {
 571  0
       maxOpenRequestsPerWorker = newCredit;
 572  0
       synchronized (workersToResume) {
 573  0
         workersToResume.notifyAll();
 574  0
       }
 575  
     } else {
 576  0
       maxOpenRequestsPerWorker = newCredit;
 577  
     }
 578  0
   }
 579  
 
 580  
   /**
 581  
    * Compare two timestamps accounting for wrap around. Note that the timestamp
 582  
    * values should be in a range that fits into 14 bits values. This means if
 583  
    * the difference of the two given timestamp is large, we are dealing with one
 584  
    * value being wrapped around.
 585  
    *
 586  
    * @param ts1 first timestamp
 587  
    * @param ts2 second timestamp
 588  
    * @return positive value if first timestamp is later than second timestamp,
 589  
    *         negative otherwise
 590  
    */
 591  
   private int compareTimestamps(int ts1, int ts2) {
 592  0
     int diff = ts1 - ts2;
 593  0
     if (Math.abs(diff) < 0x7FFF) {
 594  0
       return diff;
 595  
     } else {
 596  0
       return -diff;
 597  
     }
 598  
   }
 599  
 
 600  
   /**
 601  
    * Process a resume signal came from a given worker
 602  
    *
 603  
    * @param clientId id of the worker that sent the signal
 604  
    * @param credit the credit value sent along with the resume signal
 605  
    * @param requestId timestamp (request id) of the resume signal
 606  
    */
 607  
   public void processResumeSignal(int clientId, short credit, long requestId) {
 608  0
     int timestamp = (int) (requestId & 0xFFFF);
 609  0
     if (LOG.isDebugEnabled()) {
 610  0
       LOG.debug("processResumeSignal: resume signal from " + clientId +
 611  
           " with timestamp=" + timestamp);
 612  
     }
 613  0
     MutablePair<AdjustableSemaphore, Integer> pair =
 614  
         (MutablePair<AdjustableSemaphore, Integer>)
 615  0
             perWorkerOpenRequestMap.get(clientId);
 616  0
     synchronized (pair) {
 617  0
       if (compareTimestamps(timestamp, pair.getRight()) > 0) {
 618  0
         pair.setRight(timestamp);
 619  0
         pair.getLeft().setMaxPermits(credit);
 620  0
       } else if (LOG.isDebugEnabled()) {
 621  0
         LOG.debug("processResumeSignal: received out-of-order messages. " +
 622  
             "Received timestamp=" + timestamp + " and current timestamp=" +
 623  0
             pair.getRight());
 624  
       }
 625  0
     }
 626  0
     trySendCachedRequests(clientId);
 627  0
   }
 628  
 }