Coverage Report - org.apache.giraph.ooc.OutOfCoreIOStatistics
 
Classes in this File Line Coverage Branch Coverage Complexity
OutOfCoreIOStatistics
0%
0/73
0%
0/16
0
OutOfCoreIOStatistics$BytesDuration
0%
0/8
N/A
0
OutOfCoreIOStatistics$StatisticsEntry
0%
0/20
0%
0/2
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.ooc;
 20  
 
 21  
 import com.google.common.collect.Maps;
 22  
 import org.apache.giraph.conf.GiraphConstants;
 23  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 24  
 import org.apache.giraph.conf.IntConfOption;
 25  
 import org.apache.giraph.ooc.command.IOCommand.IOCommandType;
 26  
 import org.apache.log4j.Logger;
 27  
 
 28  
 import java.util.Map;
 29  
 import java.util.Queue;
 30  
 import java.util.concurrent.ArrayBlockingQueue;
 31  
 import java.util.concurrent.atomic.AtomicLong;
 32  
 
 33  
 /**
 34  
  * Class to collect statistics regarding IO operations done in out-of-core
 35  
  * mechanism.
 36  
  */
 37  
 public class OutOfCoreIOStatistics {
 38  
   /**
 39  
    * An estimate of disk bandwidth. This number is only used just at the start
 40  
    * of the computation, and will be updated/replaced later on once a few disk
 41  
    * operations happen.
 42  
    */
 43  0
   public static final IntConfOption DISK_BANDWIDTH_ESTIMATE =
 44  
       new IntConfOption("giraph.diskBandwidthEstimate", 125,
 45  
           "An estimate of disk bandwidth (MB/s). This number is used just at " +
 46  
               "the beginning of the computation, and it will be " +
 47  
               "updated/replaced once a few disk operations happen.");
 48  
   /**
 49  
    * How many recent IO operations should we keep track of? Any report given out
 50  
    * of this statistics collector is only based on most recent IO operations.
 51  
    */
 52  0
   public static final IntConfOption IO_COMMAND_HISTORY_SIZE =
 53  
       new IntConfOption("giraph.ioCommandHistorySize", 50,
 54  
           "Number of most recent IO operations to consider for reporting the" +
 55  
               "statistics.");
 56  
 
 57  
   /**
 58  
    * Use this option to control how frequently to print OOC statistics.
 59  
    */
 60  0
   public static final IntConfOption STATS_PRINT_FREQUENCY =
 61  
       new IntConfOption("giraph.oocStatPrintFrequency", 200,
 62  
           "Number of updates before stats are printed.");
 63  
 
 64  
   /** Class logger */
 65  0
   private static final Logger LOG =
 66  0
       Logger.getLogger(OutOfCoreIOStatistics.class);
 67  
   /** Estimate of disk bandwidth (bytes/second) */
 68  
   private final AtomicLong diskBandwidthEstimate;
 69  
   /** Cached value for IO_COMMAND_HISTORY_SIZE */
 70  
   private final int maxHistorySize;
 71  
   /**
 72  
    * Coefficient/Weight of the most recent IO operation toward the disk
 73  
    * bandwidth estimate. Basically if the disk bandwidth estimate if d, and the
 74  
    * latest IO command happened at the rate of r, the new estimate of disk
 75  
    * bandwidth is calculated as:
 76  
    * d_new = updateCoefficient * r + (1 - updateCoefficient) * d
 77  
    */
 78  
   private final double updateCoefficient;
 79  
   /** Queue of all recent commands */
 80  
   private final Queue<StatisticsEntry> commandHistory;
 81  
   /**
 82  
    * Command statistics for each type of IO command. This is the statistics of
 83  
    * the recent commands in the history we keep track of (with 'maxHistorySize'
 84  
    * command in the history).
 85  
    */
 86  
   private final Map<IOCommandType, StatisticsEntry> aggregateStats;
 87  
   /** How many IO command completed? */
 88  0
   private int numUpdates = 0;
 89  
   /** Cached value for {@link #STATS_PRINT_FREQUENCY} */
 90  0
   private int statsPrintFrequency = 0;
 91  
 
 92  
   /**
 93  
    * Constructor
 94  
    *
 95  
    * @param conf configuration
 96  
    * @param numIOThreads number of disks/IO threads
 97  
    */
 98  
   public OutOfCoreIOStatistics(ImmutableClassesGiraphConfiguration conf,
 99  0
                                int numIOThreads) {
 100  0
     this.diskBandwidthEstimate =
 101  0
         new AtomicLong(DISK_BANDWIDTH_ESTIMATE.get(conf) *
 102  
             (long) GiraphConstants.ONE_MB);
 103  0
     this.maxHistorySize = IO_COMMAND_HISTORY_SIZE.get(conf);
 104  0
     this.updateCoefficient = 1.0 / maxHistorySize;
 105  
     // Adding more entry to the capacity of the queue to have a wiggle room
 106  
     // if all IO threads are adding/removing entries from the queue
 107  0
     this.commandHistory =
 108  
         new ArrayBlockingQueue<>(maxHistorySize + numIOThreads);
 109  0
     this.aggregateStats = Maps.newConcurrentMap();
 110  0
     for (IOCommandType type : IOCommandType.values()) {
 111  0
       aggregateStats.put(type, new StatisticsEntry(type, 0, 0, 0));
 112  
     }
 113  0
     this.statsPrintFrequency = STATS_PRINT_FREQUENCY.get(conf);
 114  0
   }
 115  
 
 116  
   /**
 117  
    * Update statistics with the last IO command that is executed.
 118  
    *
 119  
    * @param type type of the IO command that is executed
 120  
    * @param bytesTransferred number of bytes transferred in the last IO command
 121  
    * @param duration duration it took for the last IO command to complete
 122  
    *                 (milliseconds)
 123  
    */
 124  
   public void update(IOCommandType type, long bytesTransferred,
 125  
                      long duration) {
 126  0
     StatisticsEntry entry = aggregateStats.get(type);
 127  0
     synchronized (entry) {
 128  0
       entry.setOccurrence(entry.getOccurrence() + 1);
 129  0
       entry.setDuration(duration + entry.getDuration());
 130  0
       entry.setBytesTransferred(bytesTransferred + entry.getBytesTransferred());
 131  0
     }
 132  0
     commandHistory.offer(
 133  
         new StatisticsEntry(type, bytesTransferred, duration, 0));
 134  0
     if (type != IOCommandType.WAIT) {
 135  
       // If the current estimate is 'd', the new rate is 'r', and the size of
 136  
       // history is 'n', we can simply model all the past command's rate as:
 137  
       // d, d, d, ..., d, r
 138  
       // where 'd' happens for 'n-1' times. Hence the new estimate of the
 139  
       // bandwidth would be:
 140  
       // d_new = (d * (n-1) + r) / n = (1-1/n)*d + 1/n*r
 141  
       // where updateCoefficient = 1/n
 142  0
       diskBandwidthEstimate.set((long)
 143  
           (updateCoefficient * (bytesTransferred / duration * 1000) +
 144  0
               (1 - updateCoefficient) * diskBandwidthEstimate.get()));
 145  
     }
 146  0
     if (commandHistory.size() > maxHistorySize) {
 147  0
       StatisticsEntry removedEntry = commandHistory.poll();
 148  0
       entry = aggregateStats.get(removedEntry.getType());
 149  0
       synchronized (entry) {
 150  0
         entry.setOccurrence(entry.getOccurrence() - 1);
 151  0
         entry.setDuration(entry.getDuration() - removedEntry.getDuration());
 152  0
         entry.setBytesTransferred(
 153  0
             entry.getBytesTransferred() - removedEntry.getBytesTransferred());
 154  0
       }
 155  
     }
 156  0
     numUpdates++;
 157  
     // Outputting log every so many commands
 158  0
     if (numUpdates % statsPrintFrequency == 0) {
 159  0
       if (LOG.isInfoEnabled()) {
 160  0
         LOG.info(this);
 161  
       }
 162  
     }
 163  0
   }
 164  
 
 165  
   @Override
 166  
   public String toString() {
 167  0
     StringBuffer sb = new StringBuffer();
 168  0
     long waitTime = 0;
 169  0
     long loadTime = 0;
 170  0
     long bytesRead = 0;
 171  0
     long storeTime = 0;
 172  0
     long bytesWritten = 0;
 173  
     for (Map.Entry<IOCommandType, StatisticsEntry> entry :
 174  0
         aggregateStats.entrySet()) {
 175  0
       synchronized (entry.getValue()) {
 176  0
         sb.append(entry.getKey() + ": " + entry.getValue() + ", ");
 177  0
         if (entry.getKey() == IOCommandType.WAIT) {
 178  0
           waitTime += entry.getValue().getDuration();
 179  0
         } else if (entry.getKey() == IOCommandType.LOAD_PARTITION) {
 180  0
           loadTime += entry.getValue().getDuration();
 181  0
           bytesRead += entry.getValue().getBytesTransferred();
 182  
         } else {
 183  0
           storeTime += entry.getValue().getDuration();
 184  0
           bytesWritten += entry.getValue().getBytesTransferred();
 185  
         }
 186  0
       }
 187  0
     }
 188  0
     sb.append(String.format("Average STORE: %.2f MB/s, ",
 189  0
         (double) bytesWritten / storeTime * 1000 / 1024 / 1024));
 190  0
     sb.append(String.format("DATA_INJECTION: %.2f MB/s, ",
 191  0
         (double) (bytesRead - bytesWritten) /
 192  
             (waitTime + loadTime + storeTime) * 1000 / 1024 / 1024));
 193  0
     sb.append(String.format("DISK_BANDWIDTH: %.2f MB/s",
 194  0
         (double) diskBandwidthEstimate.get() / 1024 / 1024));
 195  
 
 196  0
     return sb.toString();
 197  
   }
 198  
 
 199  
   /**
 200  
    * @return most recent estimate of the disk bandwidth
 201  
    */
 202  
   public long getDiskBandwidth() {
 203  0
     return diskBandwidthEstimate.get();
 204  
   }
 205  
 
 206  
   /**
 207  
    * Get aggregate statistics of a given command type in the command history
 208  
    *
 209  
    * @param type type of the command to get the statistics for
 210  
    * @return aggregate statistics for the given command type
 211  
    */
 212  
   public BytesDuration getCommandTypeStats(IOCommandType type) {
 213  0
     StatisticsEntry entry = aggregateStats.get(type);
 214  0
     synchronized (entry) {
 215  0
       return new BytesDuration(entry.getBytesTransferred(), entry.getDuration(),
 216  0
           entry.getOccurrence());
 217  0
     }
 218  
   }
 219  
 
 220  
   /**
 221  
    * Helper class to return results of statistics collector for a certain
 222  
    * command type
 223  
    */
 224  
   public static class BytesDuration {
 225  
     /** Number of bytes transferred in a few commands of the same type */
 226  
     private long bytes;
 227  
     /** Duration of it took to execute a few commands of the same type */
 228  
     private long duration;
 229  
     /** Number of commands executed of the same type */
 230  
     private int occurrence;
 231  
 
 232  
     /**
 233  
      * Constructor
 234  
      * @param bytes number of bytes transferred
 235  
      * @param duration duration it took to execute commands
 236  
      * @param occurrence number of commands
 237  
      */
 238  0
     BytesDuration(long bytes, long duration, int occurrence) {
 239  0
       this.bytes = bytes;
 240  0
       this.duration = duration;
 241  0
       this.occurrence = occurrence;
 242  0
     }
 243  
 
 244  
     /**
 245  
      * @return number of bytes transferred for the same command type
 246  
      */
 247  
     public long getBytes() {
 248  0
       return bytes;
 249  
     }
 250  
 
 251  
     /**
 252  
      * @return duration it took to execute a few commands of the same type
 253  
      */
 254  
     public long getDuration() {
 255  0
       return duration;
 256  
     }
 257  
 
 258  
     /**
 259  
      * @return number of commands that are executed of the same type
 260  
      */
 261  
     public int getOccurrence() {
 262  0
       return occurrence;
 263  
     }
 264  
   }
 265  
 
 266  
   /**
 267  
    * Helper class to keep statistics for a certain command type
 268  
    */
 269  
   private static class StatisticsEntry {
 270  
     /** Type of the command */
 271  
     private IOCommandType type;
 272  
     /**
 273  
      * Aggregate number of bytes transferred executing the particular command
 274  
      * type in the history we keep
 275  
      */
 276  
     private long bytesTransferred;
 277  
     /**
 278  
      * Aggregate duration it took executing the particular command type in the
 279  
      * history we keep
 280  
      */
 281  
     private long duration;
 282  
     /**
 283  
      * Number of occurrences of the particular command type in the history we
 284  
      * keep
 285  
      */
 286  
     private int occurrence;
 287  
 
 288  
     /**
 289  
      * Constructor
 290  
      *
 291  
      * @param type type of the command
 292  
      * @param bytesTransferred aggregate number of bytes transferred
 293  
      * @param duration aggregate execution time
 294  
      * @param occurrence number of occurrences of the particular command type
 295  
      */
 296  
     public StatisticsEntry(IOCommandType type, long bytesTransferred,
 297  0
                            long duration, int occurrence) {
 298  0
       this.type = type;
 299  0
       this.bytesTransferred = bytesTransferred;
 300  0
       this.duration = duration;
 301  0
       this.occurrence = occurrence;
 302  0
     }
 303  
 
 304  
     /**
 305  
      * @return type of the command
 306  
      */
 307  
     public IOCommandType getType() {
 308  0
       return type;
 309  
     }
 310  
 
 311  
     /**
 312  
      * @return aggregate number of bytes transferred in the particular command
 313  
      *         type
 314  
      */
 315  
     public long getBytesTransferred() {
 316  0
       return bytesTransferred;
 317  
     }
 318  
 
 319  
     /**
 320  
      * Update the aggregate number of bytes transferred
 321  
      *
 322  
      * @param bytesTransferred aggregate number of bytes
 323  
      */
 324  
     public void setBytesTransferred(long bytesTransferred) {
 325  0
       this.bytesTransferred = bytesTransferred;
 326  0
     }
 327  
 
 328  
     /**
 329  
      * @return aggregate duration it took to execute the particular command type
 330  
      */
 331  
     public long getDuration() {
 332  0
       return duration;
 333  
     }
 334  
 
 335  
     /**
 336  
      * Update the aggregate duration
 337  
      *
 338  
      * @param duration aggregate duration
 339  
      */
 340  
     public void setDuration(long duration) {
 341  0
       this.duration = duration;
 342  0
     }
 343  
 
 344  
     /**
 345  
      * @return number of occurrences of the particular command type
 346  
      */
 347  
     public int getOccurrence() {
 348  0
       return occurrence;
 349  
     }
 350  
 
 351  
     /**
 352  
      * Update the number of occurrences of the particular command type
 353  
      *
 354  
      * @param occurrence number of occurrences
 355  
      */
 356  
     public void setOccurrence(int occurrence) {
 357  0
       this.occurrence = occurrence;
 358  0
     }
 359  
 
 360  
     @Override
 361  
     public String toString() {
 362  0
       if (type == IOCommandType.WAIT) {
 363  0
         return String.format("%.2f sec", duration / 1000.0);
 364  
       } else {
 365  0
         return String.format("%.2f MB/s",
 366  0
             (double) bytesTransferred / duration * 1000 / 1024 / 2014);
 367  
       }
 368  
     }
 369  
   }
 370  
 }