Coverage Report - org.apache.giraph.utils.ProgressableUtils
 
Classes in this File Line Coverage Branch Coverage Complexity
ProgressableUtils
0%
0/74
0%
0/18
0
ProgressableUtils$1
N/A
N/A
0
ProgressableUtils$ChannelFutureWaitable
0%
0/6
N/A
0
ProgressableUtils$ChannelGroupFutureWaitable
0%
0/6
N/A
0
ProgressableUtils$ExecutorServiceWaitable
0%
0/6
N/A
0
ProgressableUtils$FutureWaitable
0%
0/12
0%
0/2
0
ProgressableUtils$SemaphoreWaitable
0%
0/10
0%
0/4
0
ProgressableUtils$Waitable
N/A
N/A
0
ProgressableUtils$WaitableWithoutResult
0%
0/3
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.utils;
 20  
 
 21  
 import org.apache.hadoop.util.Progressable;
 22  
 import org.apache.log4j.Logger;
 23  
 
 24  
 import io.netty.channel.ChannelFuture;
 25  
 import io.netty.channel.group.ChannelGroupFuture;
 26  
 import io.netty.util.concurrent.EventExecutorGroup;
 27  
 
 28  
 import java.util.ArrayList;
 29  
 import java.util.Collections;
 30  
 import java.util.HashMap;
 31  
 import java.util.Iterator;
 32  
 import java.util.List;
 33  
 import java.util.Map;
 34  
 import java.util.concurrent.Callable;
 35  
 import java.util.concurrent.ExecutionException;
 36  
 import java.util.concurrent.ExecutorService;
 37  
 import java.util.concurrent.Executors;
 38  
 import java.util.concurrent.Future;
 39  
 import java.util.concurrent.Semaphore;
 40  
 import java.util.concurrent.TimeUnit;
 41  
 import java.util.concurrent.TimeoutException;
 42  
 
 43  
 /** Functions for waiting on some events to happen while reporting progress */
 44  0
 public class ProgressableUtils {
 45  
   /** Class logger */
 46  0
   private static final Logger LOG =
 47  0
       Logger.getLogger(ProgressableUtils.class);
 48  
   /**
    * Default period, in milliseconds, between progress reports (one minute).
    * Used by the waitForever overload that takes no explicit period.
    * NOTE(review): the identifier is misspelled ("DEFUALT"); renaming it
    * requires updating its use in waitForever at the same time.
    */
 49  
   private static final int DEFUALT_MSEC_PERIOD = 60 * 1000;
 50  
   /**
 51  
    * When getting results with many threads, how many milliseconds to wait
 52  
    * on each future when looping through them (used as the per-future
    * timeout in getResultsWithNCallables)
 53  
    */
 54  
   private static final int MSEC_TO_WAIT_ON_EACH_FUTURE = 10;
 55  
 
 56  
   /** Do not instantiate: this class only exposes static helpers. */
 57  0
   private ProgressableUtils() {
 58  0
   }
 59  
 
 60  
   /**
 61  
    * Wait for executor tasks to terminate, while periodically reporting
 62  
    * progress.
 63  
    *
 64  
    * @param executor     Executor which we are waiting for
 65  
    * @param progressable Progressable for reporting progress (Job context)
 66  
    * @param msecsPeriod How often to report progress
 67  
    */
 68  
   public static void awaitExecutorTermination(ExecutorService executor,
 69  
       Progressable progressable, int msecsPeriod) {
 70  0
     waitForever(new ExecutorServiceWaitable(executor), progressable,
 71  
         msecsPeriod);
 72  0
   }
 73  
 
 74  
   /**
 75  
    * Wait for executor tasks to terminate, while periodically reporting
 76  
    * progress.
 77  
    *
 78  
    * @param executor     Executor which we are waiting for
 79  
    * @param progressable Progressable for reporting progress (Job context)
 80  
    */
 81  
   public static void awaitExecutorTermination(ExecutorService executor,
 82  
       Progressable progressable) {
 83  0
     waitForever(new ExecutorServiceWaitable(executor), progressable);
 84  0
   }
 85  
 
 86  
   /**
 87  
    * Wait for executorgroup to terminate, while periodically reporting progress
 88  
    *
 89  
    * @param group ExecutorGroup whose termination we are awaiting
 90  
    * @param progressable Progressable for reporting progress (Job context)
 91  
    */
 92  
   public static void awaitTerminationFuture(EventExecutorGroup group,
 93  
                                             Progressable progressable) {
 94  0
     waitForever(new FutureWaitable<>(group.terminationFuture()), progressable);
 95  0
   }
 96  
 
 97  
   /**
 98  
    * Wait for the result of the future to be ready, while periodically
 99  
    * reporting progress.
 100  
    *
 101  
    * @param <T>          Type of the return value of the future
 102  
    * @param future       Future
 103  
    * @param progressable Progressable for reporting progress (Job context)
 104  
    * @return Computed result of the future.
 105  
    */
 106  
   public static <T> T getFutureResult(Future<T> future,
 107  
       Progressable progressable) {
 108  0
     return waitForever(new FutureWaitable<T>(future), progressable);
 109  
   }
 110  
 
 111  
   /**
 112  
    * Wait for {@link ChannelGroupFuture} to finish, while periodically
 113  
    * reporting progress.
 114  
    *
 115  
    * @param future       ChannelGroupFuture
 116  
    * @param progressable Progressable for reporting progress (Job context)
 117  
    */
 118  
   public static void awaitChannelGroupFuture(ChannelGroupFuture future,
 119  
       Progressable progressable) {
 120  0
     waitForever(new ChannelGroupFutureWaitable(future), progressable);
 121  0
   }
 122  
 
 123  
   /**
 124  
    * Wait for {@link ChannelFuture} to finish, while periodically
 125  
    * reporting progress.
 126  
    *
 127  
    * @param future       ChannelFuture
 128  
    * @param progressable Progressable for reporting progress (Job context)
 129  
    */
 130  
   public static void awaitChannelFuture(ChannelFuture future,
 131  
       Progressable progressable) {
 132  0
     waitForever(new ChannelFutureWaitable(future), progressable);
 133  0
   }
 134  
 
 135  
   /**
 136  
    * Wait to acquire enough permits from {@link Semaphore}, while periodically
 137  
    * reporting progress.
 138  
    *
 139  
    * @param semaphore    Semaphore
 140  
    * @param permits      How many permits to acquire
 141  
    * @param progressable Progressable for reporting progress (Job context)
 142  
    */
 143  
   public static void awaitSemaphorePermits(final Semaphore semaphore,
 144  
       int permits, Progressable progressable) {
 145  
     while (true) {
 146  0
       waitForever(new SemaphoreWaitable(semaphore, permits), progressable);
 147  
       // Verify permits were not taken by another thread,
 148  
       // if they were keep looping
 149  0
       if (semaphore.tryAcquire(permits)) {
 150  0
         return;
 151  
       }
 152  
     }
 153  
   }
 154  
 
 155  
   /**
 156  
    * Wait forever for waitable to finish. Periodically reports progress.
 157  
    *
 158  
    * @param waitable Waitable which we wait for
 159  
    * @param progressable Progressable for reporting progress (Job context)
 160  
    * @param <T> Result type
 161  
    * @return Result of waitable
 162  
    */
 163  
   private static <T> T waitForever(Waitable<T> waitable,
 164  
       Progressable progressable) {
 165  0
     return waitForever(waitable, progressable, DEFUALT_MSEC_PERIOD);
 166  
   }
 167  
 
 168  
   /**
 169  
    * Wait forever for waitable to finish. Periodically reports progress.
 170  
    *
 171  
    * @param waitable Waitable which we wait for
 172  
    * @param progressable Progressable for reporting progress (Job context)
 173  
    * @param msecsPeriod How often to report progress
 174  
    * @param <T> Result type
 175  
    * @return Result of waitable
 176  
    */
 177  
   private static <T> T waitForever(Waitable<T> waitable,
 178  
       Progressable progressable, int msecsPeriod) {
 179  
     while (true) {
 180  0
       waitFor(waitable, progressable, msecsPeriod, msecsPeriod);
 181  0
       if (waitable.isFinished()) {
 182  
         try {
 183  0
           return waitable.getResult();
 184  0
         } catch (ExecutionException e) {
 185  0
           throw new IllegalStateException("waitForever: " +
 186  
               "ExecutionException occurred while waiting for " + waitable, e);
 187  0
         } catch (InterruptedException e) {
 188  0
           throw new IllegalStateException("waitForever: " +
 189  
               "InterruptedException occurred while waiting for " + waitable, e);
 190  
         }
 191  
       }
 192  
     }
 193  
   }
 194  
 
 195  
   /**
 196  
    *  Wait for desired number of milliseconds for waitable to finish.
 197  
    *  Periodically reports progress.
 198  
    *
 199  
    * @param waitable Waitable which we wait for
 200  
    * @param progressable Progressable for reporting progress (Job context)
 201  
    * @param msecs Number of milliseconds to wait for
 202  
    * @param msecsPeriod How often to report progress
 203  
    * @param <T> Result type
 204  
    * @return Result of waitable
 205  
    */
 206  
   private static <T> T waitFor(Waitable<T> waitable, Progressable progressable,
 207  
       int msecs, int msecsPeriod) {
 208  0
     long timeoutTimeMsecs = System.currentTimeMillis() + msecs;
 209  
     int currentWaitMsecs;
 210  
     while (true) {
 211  0
       progressable.progress();
 212  0
       currentWaitMsecs = Math.min(msecs, msecsPeriod);
 213  
       try {
 214  0
         waitable.waitFor(currentWaitMsecs);
 215  0
         if (waitable.isFinished()) {
 216  0
           return waitable.getResult();
 217  
         }
 218  0
       } catch (InterruptedException e) {
 219  0
         throw new IllegalStateException("waitFor: " +
 220  
             "InterruptedException occurred while waiting for " + waitable, e);
 221  0
       } catch (ExecutionException e) {
 222  0
         throw new IllegalStateException("waitFor: " +
 223  
             "ExecutionException occurred while waiting for " + waitable, e);
 224  0
       }
 225  0
       if (LOG.isInfoEnabled()) {
 226  0
         LOG.info("waitFor: Waiting for " + waitable);
 227  
       }
 228  0
       if (System.currentTimeMillis() >= timeoutTimeMsecs) {
 229  0
         return waitable.getTimeoutResult();
 230  
       }
 231  0
       msecs = Math.max(0, msecs - currentWaitMsecs);
 232  
     }
 233  
   }
 234  
 
 235  
   /**
 236  
    * Create {#link numThreads} callables from {#link callableFactory},
 237  
    * execute them and gather results.
 238  
    *
 239  
    * @param callableFactory Factory for Callables
 240  
    * @param numThreads Number of threads to use
 241  
    * @param threadNameFormat Format for thread name
 242  
    * @param progressable Progressable for reporting progress
 243  
    * @param <R> Type of Callable's results
 244  
    * @return List of results from Callables
 245  
    */
 246  
   public static <R> List<R> getResultsWithNCallables(
 247  
       CallableFactory<R> callableFactory, int numThreads,
 248  
       String threadNameFormat, Progressable progressable) {
 249  0
     ExecutorService executorService = Executors.newFixedThreadPool(numThreads,
 250  0
         ThreadUtils.createThreadFactory(threadNameFormat));
 251  0
     HashMap<Integer, Future<R>> futures = new HashMap<>(numThreads);
 252  0
     for (int i = 0; i < numThreads; i++) {
 253  0
       Callable<R> callable = callableFactory.newCallable(i);
 254  0
       Future<R> future = executorService.submit(
 255  
           new LogStacktraceCallable<R>(callable));
 256  0
       futures.put(i, future);
 257  
     }
 258  0
     executorService.shutdown();
 259  0
     List<R> futureResults =
 260  0
         new ArrayList<>(Collections.<R>nCopies(numThreads, null));
 261  
     // Loop through the futures until all are finished
 262  
     // We do this in order to get any exceptions from the futures early
 263  0
     while (!futures.isEmpty()) {
 264  0
       Iterator<Map.Entry<Integer, Future<R>>> iterator =
 265  0
           futures.entrySet().iterator();
 266  0
       while (iterator.hasNext()) {
 267  0
         Map.Entry<Integer, Future<R>> entry = iterator.next();
 268  
         R result;
 269  
         try {
 270  
           // Try to get result from the future
 271  0
           result = entry.getValue().get(
 272  
               MSEC_TO_WAIT_ON_EACH_FUTURE, TimeUnit.MILLISECONDS);
 273  0
         } catch (InterruptedException e) {
 274  0
           throw new IllegalStateException("Interrupted", e);
 275  0
         } catch (ExecutionException e) {
 276  
           // Execution exception wraps the actual cause
 277  0
           if (e.getCause() instanceof RuntimeException) {
 278  0
             throw (RuntimeException) e.getCause();
 279  
           } else {
 280  0
             throw new IllegalStateException("Exception occurred", e.getCause());
 281  
           }
 282  
 
 283  0
         } catch (TimeoutException e) {
 284  
           // If result is not ready yet just keep waiting
 285  0
           continue;
 286  0
         }
 287  
         // Result is ready, put it to final results
 288  0
         futureResults.set(entry.getKey(), result);
 289  
         // Remove current future since we are done with it
 290  0
         iterator.remove();
 291  0
       }
 292  0
       progressable.progress();
 293  0
     }
 294  0
     return futureResults;
 295  
   }
 296  
 
 297  
   /**
 298  
    * Interface for waiting on a result from some operation.
 299  
    *
 300  
    * @param <T> Result type.
 301  
    */
 302  
   private interface Waitable<T> {
 303  
     /**
 304  
      * Wait for desired number of milliseconds for waitable to finish.
 305  
      *
 306  
      * @param msecs Number of milliseconds to wait.
 307  
      */
 308  
     void waitFor(int msecs) throws InterruptedException, ExecutionException;
 309  
 
 310  
     /**
 311  
      * Check if waitable is finished.
 312  
      *
 313  
      * @return True iff waitable finished.
 314  
      */
 315  
     boolean isFinished();
 316  
 
 317  
     /**
 318  
      * Get result of waitable. Call after isFinished() returns true.
 319  
      *
 320  
      * @return Result of waitable.
 321  
      */
 322  
     T getResult() throws ExecutionException, InterruptedException;
 323  
 
 324  
     /**
 325  
      * Get the result which we want to return in case of timeout.
 326  
      *
 327  
      * @return Timeout result.
 328  
      */
 329  
     T getTimeoutResult();
 330  
   }
 331  
 
 332  
   /**
 333  
    * abstract class for waitables which don't have the result.
 334  
    */
 335  0
   private abstract static class WaitableWithoutResult
 336  
       implements Waitable<Void> {
 337  
     @Override
 338  
     public Void getResult() throws ExecutionException, InterruptedException {
 339  0
       return null;
 340  
     }
 341  
 
 342  
     @Override
 343  
     public Void getTimeoutResult() {
 344  0
       return null;
 345  
     }
 346  
   }
 347  
 
 348  
   /**
 349  
    * {@link Waitable} for waiting on a result of a {@link Future}.
 350  
    *
 351  
    * @param <T> Future result type
 352  
    */
 353  
   private static class FutureWaitable<T> implements Waitable<T> {
 354  
     /** Future which we want to wait for */
 355  
     private final Future<T> future;
 356  
 
 357  
     /**
 358  
      * Constructor
 359  
      *
 360  
      * @param future Future which we want to wait for
 361  
      */
 362  0
     public FutureWaitable(Future<T> future) {
 363  0
       this.future = future;
 364  0
     }
 365  
 
 366  
     @Override
 367  
     public void waitFor(int msecs) throws InterruptedException,
 368  
         ExecutionException {
 369  
       try {
 370  0
         future.get(msecs, TimeUnit.MILLISECONDS);
 371  0
       } catch (TimeoutException e) {
 372  0
         if (LOG.isInfoEnabled()) {
 373  0
           LOG.info("waitFor: Future result not ready yet " + future);
 374  
         }
 375  0
       }
 376  0
     }
 377  
 
 378  
     @Override
 379  
     public boolean isFinished() {
 380  0
       return future.isDone();
 381  
     }
 382  
 
 383  
     @Override
 384  
     public T getResult() throws ExecutionException, InterruptedException {
 385  0
       return future.get();
 386  
     }
 387  
 
 388  
     @Override
 389  
     public T getTimeoutResult() {
 390  0
       return null;
 391  
     }
 392  
   }
 393  
 
 394  
   /**
 395  
    * {@link Waitable} for waiting on an {@link ExecutorService} to terminate.
 396  
    */
 397  
   private static class ExecutorServiceWaitable extends WaitableWithoutResult {
 398  
     /** ExecutorService which we want to wait for */
 399  
     private final ExecutorService executorService;
 400  
 
 401  
     /**
 402  
      * Constructor
 403  
      *
 404  
      * @param executorService ExecutorService which we want to wait for
 405  
      */
 406  0
     public ExecutorServiceWaitable(ExecutorService executorService) {
 407  0
       this.executorService = executorService;
 408  0
     }
 409  
 
 410  
     @Override
 411  
     public void waitFor(int msecs) throws InterruptedException {
 412  0
       executorService.awaitTermination(msecs, TimeUnit.MILLISECONDS);
 413  0
     }
 414  
 
 415  
     @Override
 416  
     public boolean isFinished() {
 417  0
       return executorService.isTerminated();
 418  
     }
 419  
   }
 420  
 
 421  
   /**
 422  
    * {@link Waitable} for waiting on a {@link ChannelGroupFuture} to
 423  
    * terminate.
 424  
    */
 425  
   private static class ChannelGroupFutureWaitable extends
 426  
       WaitableWithoutResult {
 427  
     /** ChannelGroupFuture which we want to wait for */
 428  
     private final ChannelGroupFuture future;
 429  
 
 430  
     /**
 431  
      * Constructor
 432  
      *
 433  
      * @param future ChannelGroupFuture which we want to wait for
 434  
      */
 435  0
     public ChannelGroupFutureWaitable(ChannelGroupFuture future) {
 436  0
       this.future = future;
 437  0
     }
 438  
 
 439  
     @Override
 440  
     public void waitFor(int msecs) throws InterruptedException {
 441  0
       future.await(msecs, TimeUnit.MILLISECONDS);
 442  0
     }
 443  
 
 444  
     @Override
 445  
     public boolean isFinished() {
 446  0
       return future.isDone();
 447  
     }
 448  
   }
 449  
 
 450  
   /**
 451  
    * {@link Waitable} for waiting on a {@link ChannelFuture} to
 452  
    * terminate.
 453  
    */
 454  
   private static class ChannelFutureWaitable extends WaitableWithoutResult {
 455  
     /** ChannelGroupFuture which we want to wait for */
 456  
     private final ChannelFuture future;
 457  
 
 458  
     /**
 459  
      * Constructor
 460  
      *
 461  
      * @param future ChannelFuture which we want to wait for
 462  
      */
 463  0
     public ChannelFutureWaitable(ChannelFuture future) {
 464  0
       this.future = future;
 465  0
     }
 466  
 
 467  
     @Override
 468  
     public void waitFor(int msecs) throws InterruptedException {
 469  0
       future.await(msecs, TimeUnit.MILLISECONDS);
 470  0
     }
 471  
 
 472  
     @Override
 473  
     public boolean isFinished() {
 474  0
       return future.isDone();
 475  
     }
 476  
   }
 477  
 
 478  
   /**
 479  
    * {@link Waitable} for waiting on required number of permits in a
 480  
    * {@link Semaphore} to become available.
 481  
    */
 482  
   private static class SemaphoreWaitable extends WaitableWithoutResult {
 483  
     /** Semaphore to wait on */
 484  
     private final Semaphore semaphore;
 485  
     /** How many permits to wait on */
 486  
     private final int permits;
 487  
 
 488  
     /**
 489  
      * Constructor
 490  
      *
 491  
      * @param semaphore Semaphore to wait on
 492  
      * @param permits How many permits to wait on
 493  
      */
 494  0
     public SemaphoreWaitable(Semaphore semaphore, int permits) {
 495  0
       this.semaphore = semaphore;
 496  0
       this.permits = permits;
 497  0
     }
 498  
 
 499  
     @Override
 500  
     public void waitFor(int msecs) throws InterruptedException {
 501  0
       boolean acquired =
 502  0
           semaphore.tryAcquire(permits, msecs, TimeUnit.MILLISECONDS);
 503  
       // Return permits if we managed to acquire them
 504  0
       if (acquired) {
 505  0
         semaphore.release(permits);
 506  
       }
 507  0
     }
 508  
 
 509  
     @Override
 510  
     public boolean isFinished() {
 511  0
       return semaphore.availablePermits() >= permits;
 512  
     }
 513  
   }
 514  
 }