Coverage Report - org.apache.giraph.zk.PredicateLock
 
Classes in this File Line Coverage Branch Coverage Complexity
PredicateLock
0%
0/52
0%
0/14
3
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.zk;
 20  
 
 21  
 import java.util.concurrent.TimeUnit;
 22  
 import java.util.concurrent.locks.Condition;
 23  
 import java.util.concurrent.locks.Lock;
 24  
 import java.util.concurrent.locks.ReentrantLock;
 25  
 
 26  
 import org.apache.giraph.time.SystemTime;
 27  
 import org.apache.giraph.time.Time;
 28  
 import org.apache.hadoop.util.Progressable;
 29  
 import org.apache.log4j.Logger;
 30  
 
 31  
 /**
 32  
  * A lock with a predicate that can be used to synchronize events and keep the
 33  
  * job context updated while waiting.
 34  
  */
 35  
 public class PredicateLock implements BspEvent {
 36  
   /** Class logger */
 37  0
   private static final Logger LOG = Logger.getLogger(PredicateLock.class);
 38  
   /** Default msecs to refresh the progress meter */
 39  
   private static final int DEFAULT_MSEC_PERIOD = 10000;
 40  
   /** Progressable for reporting progress (Job context) */
 41  
   protected final Progressable progressable;
 42  
   /** Actual mses to refresh the progress meter */
 43  
   private final int msecPeriod;
 44  
   /** Lock */
 45  0
   private Lock lock = new ReentrantLock();
 46  
   /** Condition associated with lock */
 47  0
   private Condition cond = lock.newCondition();
 48  
   /** Predicate */
 49  0
   private boolean eventOccurred = false;
 50  
   /** Keeps track of the time */
 51  
   private final Time time;
 52  
 
 53  
   /**
 54  
    * Constructor with default values.
 55  
    *
 56  
    * @param progressable used to report progress() (usually a Mapper.Context)
 57  
    */
 58  
   public PredicateLock(Progressable progressable) {
 59  0
     this(progressable, DEFAULT_MSEC_PERIOD, SystemTime.get());
 60  0
   }
 61  
 
 62  
   /**
 63  
    * Constructor.
 64  
    *
 65  
    * @param progressable used to report progress() (usually a Mapper.Context)
 66  
    * @param msecPeriod Msecs between progress reports
 67  
    * @param time Time implementation
 68  
    */
 69  0
   public PredicateLock(Progressable progressable, int msecPeriod, Time time) {
 70  0
     this.progressable = progressable;
 71  0
     this.msecPeriod = msecPeriod;
 72  0
     this.time = time;
 73  0
   }
 74  
 
 75  
   @Override
 76  
   public void reset() {
 77  0
     lock.lock();
 78  
     try {
 79  0
       eventOccurred = false;
 80  
     } finally {
 81  0
       lock.unlock();
 82  0
     }
 83  0
   }
 84  
 
 85  
   @Override
 86  
   public void signal() {
 87  0
     lock.lock();
 88  
     try {
 89  0
       eventOccurred = true;
 90  0
       cond.signalAll();
 91  
     } finally {
 92  0
       lock.unlock();
 93  0
     }
 94  0
   }
 95  
 
 96  
   @Override
 97  
   public boolean waitMsecs(int msecs) {
 98  0
     if (msecs < 0) {
 99  0
       throw new RuntimeException("waitMsecs: msecs cannot be negative!");
 100  
     }
 101  0
     long maxMsecs = time.getMilliseconds() + msecs;
 102  0
     int curMsecTimeout = 0;
 103  0
     lock.lock();
 104  
     try {
 105  0
       while (!eventOccurred) {
 106  0
         curMsecTimeout =
 107  0
             Math.min(msecs, msecPeriod);
 108  0
         if (LOG.isDebugEnabled()) {
 109  0
           LOG.debug("waitMsecs: Wait for " + curMsecTimeout);
 110  
         }
 111  
         try {
 112  0
           boolean signaled =
 113  0
               cond.await(curMsecTimeout, TimeUnit.MILLISECONDS);
 114  0
           if (LOG.isDebugEnabled()) {
 115  0
             LOG.debug("waitMsecs: Got timed signaled of " +
 116  
               signaled);
 117  
           }
 118  0
         } catch (InterruptedException e) {
 119  0
           throw new IllegalStateException(
 120  
             "waitMsecs: Caught interrupted " +
 121  
             "exception on cond.await() " +
 122  
             curMsecTimeout, e);
 123  0
         }
 124  0
         if (time.getMilliseconds() > maxMsecs) {
 125  0
           return false;
 126  
         }
 127  0
         msecs = Math.max(0, msecs - curMsecTimeout);
 128  0
         progressable.progress(); // go around again
 129  
       }
 130  
     } finally {
 131  0
       lock.unlock();
 132  0
     }
 133  0
     return true;
 134  
   }
 135  
 
 136  
   @Override
 137  
   public void waitForTimeoutOrFail(long timeout) {
 138  0
     long t0 = System.currentTimeMillis();
 139  0
     while (!waitMsecs(msecPeriod)) {
 140  0
       if (System.currentTimeMillis() > t0 + timeout) {
 141  0
         throw new RuntimeException("Timeout waiting");
 142  
       }
 143  0
       progressable.progress();
 144  
     }
 145  0
   }
 146  
 }