Coverage Report - org.apache.giraph.utils.TaskIdsPermitsBarrier
 
Classes in this File Line Coverage Branch Coverage Complexity
TaskIdsPermitsBarrier
0%
0/37
0%
0/10
2.4
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.utils;
 20  
 
 21  
 import org.apache.hadoop.util.Progressable;
 22  
 import org.apache.log4j.Logger;
 23  
 
 24  
 import com.google.common.collect.Sets;
 25  
 
 26  
 import java.util.HashSet;
 27  
 import java.util.Set;
 28  
 
 29  
 /**
 30  
  * This barrier is used when we don't know how many events we are waiting on
 31  
  * from the start. Instead we have a set of task ids, and each of those will,
 32  
  * at some point of time, give the information about how many events from it
 33  
  * we should expect. Barrier will be waiting for all the tasks to notify it
 34  
  * about that number of events, and then it will also wait for all the events
 35  
  * to happen.
 36  
  *
 37  
  * requirePermits() corresponds to task notifying us how many events from it
 38  
  * to expect, and releasePermits() notifies us about events happening.
 39  
  *
 40  
  * This class is currently used during preparation of aggregators.
 41  
  *
 42  
  * User must follow this protocol for concurrent access:
 43  
  *
 44  
  * (1) an object instance is constructed
 45  
  * (2) arbitrarily many times
 46  
  *     (2a) concurrent calls to requirePermits(), releasePermits() and
 47  
  *          waitForRequiredPermits() are issued
 48  
  *     (2b) waitForRequiredPermits() returns
 49  
  *
 50  
  * Note that the next cycle of calls to requirePermits() or releasePermits()
 51  
  * cannot start until the previous call to waitForRequiredPermits()
 52  
  * has returned.
 53  
  *
 54  
  * Methods of this class are thread-safe.
 55  
  */
 56  
 public class TaskIdsPermitsBarrier {
 57  
   /** Class logger */
 58  0
   private static final Logger LOG =
 59  0
       Logger.getLogger(TaskIdsPermitsBarrier.class);
 60  
   /** Wait timeout in msecs between progress reports; also given to logger */
 61  
   private static final int MSEC_PERIOD = 10000;
 62  
   /** Maximum number of task ids to list in the log */
 63  
   private static final int MAX_TASK_IDS_TO_LOG = 10;
 64  
   /** Progressable for reporting progress */
 65  
   private final Progressable progressable;
 66  
   /** Number of permits we are currently waiting for */
 67  0
   private long waitingOnPermits = 0;
 68  
   /** Set of task ids which have already required permits */
 69  0
   private final Set<Integer> arrivedTaskIds = new HashSet<Integer>();
 70  
   /** Timed logger built from MSEC_PERIOD and LOG, used while waiting */
 71  
   private final TimedLogger logger;
 72  
 
 73  
   /**
 74  
    * Constructor
 75  
    *
 76  
    * @param progressable Progressable for reporting progress
 77  
    */
 78  0
   public TaskIdsPermitsBarrier(Progressable progressable) {
 79  0
     this.progressable = progressable;
 80  0
     logger = new TimedLogger(MSEC_PERIOD, LOG);
 81  0
   }
 82  
 
 83  
   /**
 84  
    * Wait until as many task ids have required permits as are expected,
 85  
    * and all required permits were released. Resets the state on return.
 86  
    *
 87  
    * @param expectedTaskIds Set of task ids we are waiting for permits from
 88  
    */
 89  
   public synchronized void waitForRequiredPermits(
 90  
       Set<Integer> expectedTaskIds) {
 91  0
     while (arrivedTaskIds.size() < expectedTaskIds.size() ||
 92  
         waitingOnPermits > 0) {
 93  
       try {
 94  0
         wait(MSEC_PERIOD);
 95  0
       } catch (InterruptedException e) { // NOTE(review): cause is dropped and interrupt status is not restored
 96  0
         throw new IllegalStateException("waitForRequiredPermits: " +
 97  
             "InterruptedException occurred");
 98  0
       }
 99  0
       progressable.progress();
 100  0
       if (LOG.isInfoEnabled()) {
 101  0
         if (arrivedTaskIds.size() < expectedTaskIds.size()) {
 102  0
           String logSuffix = "";
 103  0
           if (expectedTaskIds.size() - arrivedTaskIds.size() <=
 104  
               MAX_TASK_IDS_TO_LOG) {
 105  0
             Sets.SetView<Integer> difference =
 106  0
                 Sets.difference(expectedTaskIds, arrivedTaskIds);
 107  0
             logSuffix = ", task ids: " + difference;
 108  
           }
 109  0
           logger.info("waitForRequiredPermits: " +
 110  
               "Waiting for " +
 111  0
               (expectedTaskIds.size() - arrivedTaskIds.size()) +
 112  
               " more tasks to send their aggregator data" +
 113  
               logSuffix);
 114  0
         } else {
 115  0
           logger.info("waitForRequiredPermits: " +
 116  
               "Waiting for " + waitingOnPermits + " more aggregator requests");
 117  
         }
 118  
       }
 119  
     }
 120  
 
 121  
     // Reset the state so the barrier can be reused for the next cycle
 122  0
     arrivedTaskIds.clear();
 123  0
     waitingOnPermits = 0;
 124  0
   }
 125  
 
 126  
   /**
 127  
    * Require more permits. Records the task id as arrived and wakes up
 128  
    * waiting threads. Doesn't wait for permits to become available.
 129  
    *
 130  
    * @param permits Number of permits to require
 131  
    * @param taskId Task id which required permits
 132  
    */
 133  
   public synchronized void requirePermits(long permits, int taskId) {
 134  0
     arrivedTaskIds.add(taskId);
 135  0
     waitingOnPermits += permits;
 136  0
     notifyAll();
 137  0
   }
 138  
 
 139  
   /**
 140  
    * Release one permit.
 141  
    */
 142  
   public synchronized void releaseOnePermit() {
 143  0
     releasePermits(1);
 144  0
   }
 145  
 
 146  
   /**
 147  
    * Release some permits and wake up waiting threads.
 148  
    *
 149  
    * @param permits Number of permits to release
 150  
    */
 151  
   public synchronized void releasePermits(long permits) {
 152  0
     waitingOnPermits -= permits;
 153  0
     notifyAll();
 154  0
   }
 155  
 }