Coverage Report - org.apache.giraph.ooc.OutOfCoreIOCallable
 
Classes in this File Line Coverage Branch Coverage Complexity
OutOfCoreIOCallable
0%
0/59
0%
0/26
6.333
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.ooc;
 20  
 
 21  
 import com.yammer.metrics.core.Counter;
 22  
 import com.yammer.metrics.core.Histogram;
 23  
 import org.apache.giraph.metrics.GiraphMetrics;
 24  
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 25  
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 26  
 import org.apache.giraph.ooc.command.IOCommand;
 27  
 import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
 28  
 import org.apache.giraph.ooc.command.WaitIOCommand;
 29  
 import org.apache.log4j.Logger;
 30  
 
 31  
 import java.util.concurrent.Callable;
 32  
 
 33  
 /**
 34  
  * IO threads for out-of-core mechanism.
 35  
  */
 36  0
 public class OutOfCoreIOCallable implements Callable<Void>,
 37  
     ResetSuperstepMetricsObserver {
 38  
   /** Name of Metric for number of bytes read from disk */
 39  
   public static final String BYTES_LOAD_FROM_DISK = "ooc-bytes-load";
 40  
   /** Name of Metric for number of bytes written to disk */
 41  
   public static final String BYTES_STORE_TO_DISK = "ooc-bytes-store";
 42  
   /** Name of Metric for size of loads */
 43  
   public static final String HISTOGRAM_LOAD_SIZE = "ooc-load-size-bytes";
 44  
   /** Name of Metric for size of stores */
 45  
   public static final String HISTOGRAM_STORE_SIZE = "ooc-store-size-bytes";
 46  
   /** Class logger. */
 47  0
   private static final Logger LOG = Logger.getLogger(OutOfCoreIOCallable.class);
 48  
   /** Out-of-core engine */
 49  
   private final OutOfCoreEngine oocEngine;
 50  
   /** Thread id/Disk id */
 51  
   private final int diskId;
 52  
   /** How many bytes of data is read from disk */
 53  
   private Counter bytesReadPerSuperstep;
 54  
   /** How many bytes of data is written to disk */
 55  
   private Counter bytesWrittenPerSuperstep;
 56  
   /** Size of load IO commands */
 57  
   private Histogram histogramLoadSize;
 58  
   /** Size of store IO commands */
 59  
   private Histogram histogramStoreSize;
 60  
 
 61  
   /**
 62  
    * Constructor
 63  
    *
 64  
    * @param oocEngine out-of-core engine
 65  
    * @param diskId thread id/disk id
 66  
    */
 67  0
   public OutOfCoreIOCallable(OutOfCoreEngine oocEngine, int diskId) {
 68  0
     this.oocEngine = oocEngine;
 69  0
     this.diskId = diskId;
 70  0
     newSuperstep(GiraphMetrics.get().perSuperstep());
 71  0
     GiraphMetrics.get().addSuperstepResetObserver(this);
 72  0
   }
 73  
 
 74  
   @Override
 75  
   public Void call() throws Exception {
 76  
     while (true) {
 77  0
       oocEngine.getSuperstepLock().readLock().lock();
 78  0
       IOCommand command = oocEngine.getIOScheduler().getNextIOCommand(diskId);
 79  0
       if (LOG.isDebugEnabled() && !(command instanceof WaitIOCommand)) {
 80  0
         LOG.debug("call: thread " + diskId + "'s next IO command is: " +
 81  
             command);
 82  
       }
 83  0
       if (command == null) {
 84  0
         oocEngine.getSuperstepLock().readLock().unlock();
 85  0
         break;
 86  
       }
 87  0
       if (command instanceof WaitIOCommand) {
 88  0
         oocEngine.getSuperstepLock().readLock().unlock();
 89  
       }
 90  
 
 91  0
       boolean commandExecuted = false;
 92  0
       long duration = 0;
 93  
       long bytes;
 94  
       // CHECKSTYLE: stop IllegalCatch
 95  
       try {
 96  0
         long timeInGC = oocEngine.getServiceWorker().getGraphTaskManager()
 97  0
             .getSuperstepGCTime();
 98  0
         long startTime = System.currentTimeMillis();
 99  0
         commandExecuted = command.execute();
 100  0
         duration = System.currentTimeMillis() - startTime;
 101  0
         timeInGC = oocEngine.getServiceWorker().getGraphTaskManager()
 102  0
             .getSuperstepGCTime() - timeInGC;
 103  0
         bytes = command.bytesTransferred();
 104  0
         if (LOG.isDebugEnabled() && !(command instanceof WaitIOCommand)) {
 105  0
           LOG.debug("call: thread " + diskId + "'s command " + command +
 106  
               " completed: bytes= " + bytes + ", duration=" + duration + ", " +
 107  0
               "bandwidth=" + String.format("%.2f", (double) bytes / duration *
 108  
               1000 / 1024 / 1024) +
 109  
               ((command instanceof WaitIOCommand) ? "" :
 110  0
                   (", bandwidth (excluding GC time)=" + String.format("%.2f",
 111  0
                       (double) bytes / (duration - timeInGC) *
 112  
                           1000 / 1024 / 1024))));
 113  
         }
 114  0
       } catch (Exception e) {
 115  0
         throw new RuntimeException(
 116  
             "call: execution of IO command " + command + " failed!", e);
 117  0
       }
 118  
       // CHECKSTYLE: resume IllegalCatch
 119  0
       if (!(command instanceof WaitIOCommand)) {
 120  0
         oocEngine.getSuperstepLock().readLock().unlock();
 121  0
         if (bytes != 0) {
 122  0
           if (command instanceof LoadPartitionIOCommand) {
 123  0
             bytesReadPerSuperstep.inc(bytes);
 124  0
             histogramLoadSize.update(bytes);
 125  
           } else {
 126  0
             bytesWrittenPerSuperstep.inc(bytes);
 127  0
             histogramStoreSize.update(bytes);
 128  
           }
 129  
         }
 130  
       }
 131  
 
 132  0
       if (commandExecuted && duration > 0) {
 133  0
         oocEngine.getIOStatistics().update(command.getType(),
 134  0
             command.bytesTransferred(), duration);
 135  
       }
 136  0
       oocEngine.getIOScheduler().ioCommandCompleted(command);
 137  0
     }
 138  0
     if (LOG.isInfoEnabled()) {
 139  0
       LOG.info("call: out-of-core IO thread " + diskId + " terminating!");
 140  
     }
 141  0
     return null;
 142  
   }
 143  
 
 144  
   @Override
 145  
   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
 146  0
     bytesReadPerSuperstep = superstepMetrics.getCounter(BYTES_LOAD_FROM_DISK);
 147  0
     bytesWrittenPerSuperstep =
 148  0
         superstepMetrics.getCounter(BYTES_STORE_TO_DISK);
 149  0
     histogramLoadSize =
 150  0
         superstepMetrics.getUniformHistogram(HISTOGRAM_LOAD_SIZE);
 151  0
     histogramStoreSize =
 152  0
         superstepMetrics.getUniformHistogram(HISTOGRAM_STORE_SIZE);
 153  0
   }
 154  
 }
 155