Coverage Report - org.apache.giraph.ooc.policy.SimpleGCMonitoringOracle
 
Classes in this File Line Coverage Branch Coverage Complexity
SimpleGCMonitoringOracle
0%
0/90
0%
0/44
4.182
SimpleGCMonitoringOracle$GCObservation
0%
0/29
0%
0/6
4.182
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.ooc.policy;
 20  
 
 21  
 import com.google.common.collect.Maps;
 22  
 import com.sun.management.GarbageCollectionNotificationInfo;
 23  
 import org.apache.giraph.conf.FloatConfOption;
 24  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 25  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 26  
 import org.apache.giraph.ooc.OutOfCoreIOStatistics;
 27  
 import org.apache.giraph.ooc.command.IOCommand;
 28  
 import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
 29  
 import org.apache.giraph.ooc.command.WaitIOCommand;
 30  
 import org.apache.log4j.Logger;
 31  
 
 32  
 import java.lang.management.MemoryUsage;
 33  
 import java.util.Map;
 34  
 import java.util.concurrent.atomic.AtomicInteger;
 35  
 import java.util.concurrent.atomic.AtomicLong;
 36  
 
 37  
 /**
 38  
  * Out-of-core oracle to adaptively control data kept in memory, with the goal
 39  
  * of keeping the memory state constantly at a desired state. This oracle
 40  
  * monitors GC behavior to keep track of memory pressure.
 41  
  *
 42  
  * After each GC is done, this oracle retrieve statistics about the memory
 43  
  * pressure (memory used, max memory, and how far away memory is compared to a
 44  
  * max optimal pressure). Based on the the past 2 recent memory statistics,
 45  
  * the oracle predicts the status of the memory, and sets the rate of load/store
 46  
  * of data from/to disk. If the rate of loading data from disk is 'l', and the
 47  
  * rate of storing data to disk is 's', the rate of data injection to memory
 48  
  * from disk can be denoted as 'l-s'. This oracle determines what 'l-s' should
 49  
  * be based on the prediction of memory status.
 50  
  *
 51  
  * Assume that based on the previous GC call the memory usage at time t_0 is
 52  
  * m_0, and based on the most recent GC call the memory usage at time t_1 is
 53  
  * m_1. So, the rate of memory increase is alpha = (m_1 - m_0) / (t_1 - t_0).
 54  
  * Assume that the ideal memory pressure happens when the memory usage is
 55  
  * m_ideal. So, at time 't_2 = t_1 + (t_1 - t_0)', we want m_ideal. That means
 56  
  * the ideal rate would be beta = (m_ideal - m_1) / (t_2 - t_1). If the date
 57  
  * injection rate to memory so far was i, the new injection rate should be:
 58  
  * i_new = i - (alpha - beta)
 59  
  */
 60  0
 public class SimpleGCMonitoringOracle implements OutOfCoreOracle {
 61  
   /**
 62  
    * The optimal memory pressure at which GC behavior is close to ideal. This
 63  
    * fraction may be dependant on the GC strategy used for running a job, but
 64  
    * generally should not be dependent on the graph processing application.
 65  
    */
 66  0
   public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
 67  
       new FloatConfOption("giraph.optimalMemoryPressure", 0.8f,
 68  
           "The memory pressure (fraction of used memory) at which the job " +
 69  
               "shows the optimal GC behavior. This fraction may be dependent " +
 70  
               "on the GC strategy used in running the job.");
 71  
 
 72  
   /** Class logger */
 73  0
   private static final Logger LOG =
 74  0
       Logger.getLogger(SimpleGCMonitoringOracle.class);
 75  
   /** Cached value for OPTIMAL_MEMORY_PRESSURE */
 76  
   private final float optimalMemoryPressure;
 77  
   /** Out-of-core engine */
 78  
   private final OutOfCoreEngine oocEngine;
 79  
   /** Status of memory from the last GC call */
 80  
   private GCObservation lastGCObservation;
 81  
   /** Desired rate of data injection to memory */
 82  0
   private final AtomicLong desiredDiskToMemoryDataRate =
 83  
       new AtomicLong(0);
 84  
   /** Number of on the fly (outstanding) IO commands for each command type */
 85  0
   private final Map<IOCommand.IOCommandType, AtomicInteger> commandOccurrences =
 86  0
       Maps.newConcurrentMap();
 87  
 
 88  
   /**
 89  
    * Constructor
 90  
    *
 91  
    * @param conf configuration
 92  
    * @param oocEngine out-of-core engine
 93  
    */
 94  
   public SimpleGCMonitoringOracle(ImmutableClassesGiraphConfiguration conf,
 95  0
                                   OutOfCoreEngine oocEngine) {
 96  0
     this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
 97  0
     this.oocEngine = oocEngine;
 98  0
     this.lastGCObservation = new GCObservation(-1, 0, 0);
 99  0
     for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
 100  0
       commandOccurrences.put(type, new AtomicInteger(0));
 101  
     }
 102  0
   }
 103  
 
 104  
   @Override
 105  
   public synchronized void gcCompleted(GarbageCollectionNotificationInfo
 106  
                                              gcInfo) {
 107  0
     long time = System.currentTimeMillis();
 108  0
     Map<String, MemoryUsage> memAfter = gcInfo.getGcInfo()
 109  0
         .getMemoryUsageAfterGc();
 110  0
     long usedMemory = 0;
 111  0
     long maxMemory = 0;
 112  0
     for (MemoryUsage memDetail : memAfter.values()) {
 113  0
       usedMemory += memDetail.getUsed();
 114  0
       maxMemory += memDetail.getMax();
 115  0
     }
 116  0
     GCObservation observation = new GCObservation(time, usedMemory, maxMemory);
 117  0
     if (LOG.isInfoEnabled()) {
 118  0
       LOG.info("gcCompleted: GC completed with: " + observation);
 119  
     }
 120  
     // Whether this is not the first GC call in the application
 121  0
     if (lastGCObservation.isValid()) {
 122  0
       long deltaDataRate =
 123  0
           lastGCObservation.getDesiredDeltaDataRate(observation);
 124  0
       long diskBandwidthEstimate =
 125  0
           oocEngine.getIOStatistics().getDiskBandwidth();
 126  
       // Update the desired data injection rate to memory. The data injection
 127  
       // rate cannot be less than -disk_bandwidth (the extreme case happens if
 128  
       // we only do 'store'), and cannot be more than disk_bandwidth (the
 129  
       // extreme case happens if we only do 'load').
 130  0
       long dataInjectionRate = desiredDiskToMemoryDataRate.get();
 131  0
       desiredDiskToMemoryDataRate.set(Math.max(
 132  0
           Math.min(desiredDiskToMemoryDataRate.get() - deltaDataRate,
 133  
               diskBandwidthEstimate), -diskBandwidthEstimate));
 134  0
       if (LOG.isInfoEnabled()) {
 135  0
         LOG.info("gcCompleted: changing data injection rate from " +
 136  0
             String.format("%.2f", dataInjectionRate / 1024.0 / 1024.0) +
 137  0
             " to " + String.format("%.2f", desiredDiskToMemoryDataRate.get() /
 138  
             1024.0 / 1024.0));
 139  
       }
 140  
     }
 141  0
     lastGCObservation = observation;
 142  0
   }
 143  
 
 144  
   @Override
 145  
   public void startIteration() {
 146  0
   }
 147  
 
 148  
   /**
 149  
    * Get the current data injection rate to memory based on the commands ran
 150  
    * in the history (retrieved from statistics collector), and outstanding
 151  
    * commands issued by the IO scheduler.
 152  
    *
 153  
    * @return the current data injection rate to memory
 154  
    */
 155  
   private long getCurrentDataInjectionRate() {
 156  0
     long effectiveBytesTransferred = 0;
 157  0
     long effectiveDuration = 0;
 158  0
     for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
 159  0
       OutOfCoreIOStatistics.BytesDuration stats =
 160  0
           oocEngine.getIOStatistics().getCommandTypeStats(type);
 161  0
       int occurrence = commandOccurrences.get(type).get();
 162  0
       long typeBytesTransferred = stats.getBytes();
 163  0
       long typeDuration = stats.getDuration();
 164  
       // If there is an outstanding command, we still do not know how many bytes
 165  
       // it will transfer, and how long it will take. So, we guesstimate these
 166  
       // numbers based on other similar commands happened in the history. We
 167  
       // simply take the average number of bytes transferred for the particular
 168  
       // command, and we take average duration for the particular command. We
 169  
       // should multiply these numbers by the number of outstanding commands of
 170  
       // this particular command type.
 171  0
       if (stats.getOccurrence() != 0) {
 172  0
         typeBytesTransferred += stats.getBytes() / stats.getOccurrence() *
 173  
             occurrence;
 174  0
         typeDuration += stats.getDuration() / stats.getOccurrence() *
 175  
             occurrence;
 176  
       }
 177  0
       if (type == IOCommand.IOCommandType.LOAD_PARTITION) {
 178  0
         effectiveBytesTransferred += typeBytesTransferred;
 179  
       } else {
 180  
         // Store (data going out of memory), or wait (no data transferred)
 181  0
         effectiveBytesTransferred -= typeBytesTransferred;
 182  
       }
 183  0
       effectiveDuration += typeDuration;
 184  
     }
 185  0
     if (effectiveDuration == 0) {
 186  0
       return 0;
 187  
     } else {
 188  0
       return effectiveBytesTransferred / effectiveDuration;
 189  
     }
 190  
   }
 191  
 
 192  
   @Override
 193  
   public IOAction[] getNextIOActions() {
 194  0
     long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
 195  0
     long desiredRate = desiredDiskToMemoryDataRate.get();
 196  0
     long currentRate = getCurrentDataInjectionRate();
 197  0
     if (desiredRate > error) {
 198  
       // 'l-s' is positive, we should do more load than store.
 199  0
       if (currentRate > desiredRate + error) {
 200  
         // We should decrease 'l-s'. This can be done either by increasing 's'
 201  
         // or issuing wait command. We prioritize wait over hard store.
 202  0
         return new IOAction[]{
 203  
           IOAction.STORE_MESSAGES_AND_BUFFERS,
 204  
           IOAction.STORE_PROCESSED_PARTITION};
 205  0
       } else if (currentRate < desiredRate - error) {
 206  
         // We should increase 'l-s'. We can simply load partitions/data.
 207  0
         return new IOAction[]{IOAction.LOAD_PARTITION};
 208  
       } else {
 209  
         // We are in a proper state and we should keep up with the rate. We can
 210  
         // either soft store data or load data (hard load, since we desired rate
 211  
         // is positive).
 212  0
         return new IOAction[]{
 213  
           IOAction.STORE_MESSAGES_AND_BUFFERS,
 214  
           IOAction.STORE_PROCESSED_PARTITION,
 215  
           IOAction.LOAD_PARTITION};
 216  
       }
 217  0
     } else if (desiredRate < -error) {
 218  
       // 'l-s' is negative, we should do more store than load.
 219  0
       if (currentRate < desiredRate - error) {
 220  
         // We should increase 'l-s', but we should be cautious. We only do soft
 221  
         // load, or wait.
 222  0
         return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
 223  0
       } else if (currentRate > desiredRate + error) {
 224  
         // We should reduce 'l-s', we do hard store.
 225  0
         return new IOAction[]{
 226  
           IOAction.STORE_MESSAGES_AND_BUFFERS,
 227  
           IOAction.STORE_PARTITION};
 228  
       } else {
 229  
         // We should keep up with the rate. We can either soft store data, or
 230  
         // soft load data.
 231  0
         return new IOAction[]{
 232  
           IOAction.STORE_MESSAGES_AND_BUFFERS,
 233  
           IOAction.STORE_PROCESSED_PARTITION,
 234  
           IOAction.LOAD_UNPROCESSED_PARTITION};
 235  
       }
 236  
     } else {
 237  
       // 'l-s' is almost zero. If current rate is over the desired rate, we do
 238  
       // soft store. If the current rate is below the desired rate, we do soft
 239  
       // load.
 240  0
       if (currentRate > desiredRate + error) {
 241  0
         return new IOAction[]{
 242  
           IOAction.STORE_MESSAGES_AND_BUFFERS,
 243  
           IOAction.STORE_PROCESSED_PARTITION};
 244  0
       } else if (currentRate < desiredRate - error) {
 245  0
         return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
 246  
       } else {
 247  0
         return new IOAction[]{
 248  
           IOAction.STORE_MESSAGES_AND_BUFFERS,
 249  
           IOAction.STORE_PROCESSED_PARTITION,
 250  
           IOAction.LOAD_UNPROCESSED_PARTITION};
 251  
       }
 252  
     }
 253  
   }
 254  
 
 255  
   @Override
 256  
   public synchronized boolean approve(IOCommand command) {
 257  0
     long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
 258  0
     long desiredRate = desiredDiskToMemoryDataRate.get();
 259  0
     long currentRate = getCurrentDataInjectionRate();
 260  
     // The command is denied iff the current rate is above the desired rate and
 261  
     // we are doing load (instead of store), or the current rate is below the
 262  
     // desired rate and we are doing store (instead of loading).
 263  0
     if (currentRate > desiredRate + error &&
 264  
         command instanceof LoadPartitionIOCommand) {
 265  0
       return false;
 266  
     }
 267  0
     if (currentRate < desiredRate - error &&
 268  
         !(command instanceof LoadPartitionIOCommand) &&
 269  
         !(command instanceof WaitIOCommand)) {
 270  0
       return false;
 271  
     }
 272  0
     commandOccurrences.get(command.getType()).getAndIncrement();
 273  0
     return true;
 274  
   }
 275  
 
 276  
   @Override
 277  
   public void commandCompleted(IOCommand command) {
 278  0
     commandOccurrences.get(command.getType()).getAndDecrement();
 279  0
   }
 280  
 
 281  
   /** Helper class to record memory status after GC calls */
 282  
   private class GCObservation {
 283  
     /** The time at which the GC happened (in milliseconds) */
 284  
     private long time;
 285  
     /** Amount of memory used after the GC call */
 286  
     private long usedMemory;
 287  
     /** Maximum amounts of memory reported by GC listener */
 288  
     private long maxMemory;
 289  
 
 290  
     /**
 291  
      * Constructor
 292  
      *
 293  
      * @param time time of GC
 294  
      * @param usedMemory amount of used memory after GC
 295  
      * @param maxMemory amount of all available memory based on GC observation
 296  
      */
 297  0
     public GCObservation(long time, long usedMemory, long maxMemory) {
 298  0
       this.time = time;
 299  0
       this.usedMemory = usedMemory;
 300  0
       this.maxMemory = maxMemory;
 301  0
     }
 302  
 
 303  
     /**
 304  
      * Is this a valid observation?
 305  
      *
 306  
      * @return true iff it is a valid observation
 307  
      */
 308  
     public boolean isValid() {
 309  0
       return time > 0;
 310  
     }
 311  
 
 312  
     /**
 313  
      * Considering a new observation of memory status after the most recent GC,
 314  
      * what is the desired rate for data injection to memory.
 315  
      *
 316  
      * @param newObservation the most recent GC observation
 317  
      * @return desired rate of data injection to memory
 318  
      */
 319  
     public long getDesiredDeltaDataRate(GCObservation newObservation) {
 320  0
       long newUsedMemory = newObservation.usedMemory;
 321  0
       long newMaxMemory = newObservation.maxMemory;
 322  0
       long lastUsedMemory = usedMemory;
 323  0
       long lastMaxMemory = maxMemory;
 324  
       // Scale the memory status of two GC observation to be the same
 325  0
       long scaledMaxMemory = Math.min(lastMaxMemory, newMaxMemory);
 326  0
       newUsedMemory =
 327  
           (long) (((double) scaledMaxMemory / newMaxMemory) * newUsedMemory);
 328  0
       lastUsedMemory =
 329  
           (long) (((double) scaledMaxMemory / lastMaxMemory) * lastUsedMemory);
 330  0
       long desiredUsedMemory = (long) (optimalMemoryPressure * scaledMaxMemory);
 331  0
       if (LOG.isInfoEnabled()) {
 332  0
         LOG.info("getDesiredDeltaDataRate: " + String.format("previous usage " +
 333  0
             "= %.2f MB, ", lastUsedMemory / 1024.0 / 1024.0) + String.format(
 334  0
             "current usage = %.2f MB, ", newUsedMemory / 1024.0 / 1024.0) +
 335  0
             String.format("ideal usage = %.2f MB", desiredUsedMemory / 1024.0 /
 336  
                 1024.0));
 337  
       }
 338  0
       long interval = newObservation.time - time;
 339  0
       if (interval == 0) {
 340  0
         interval = 1;
 341  0
         LOG.warn("getDesiredDeltaRate: two GC happened almost at the same " +
 342  
             "time!");
 343  
       }
 344  0
       long currentDataRate = (long) ((double) (newUsedMemory -
 345  
           lastUsedMemory) / interval * 1000);
 346  0
       long desiredDataRate = (long) ((double) (desiredUsedMemory -
 347  
           newUsedMemory) / interval * 1000);
 348  0
       return currentDataRate - desiredDataRate;
 349  
     }
 350  
 
 351  
     @Override
 352  
     public String toString() {
 353  0
       return String.format("(usedMemory: %.2f MB, maxMemory: %.2f MB at " +
 354  0
           "time: %d ms)", usedMemory / 1024.0 / 1024.0,
 355  0
           maxMemory / 1024.0 / 1024.0, time);
 356  
     }
 357  
   }
 358  
 }