Coverage Report - org.apache.giraph.ooc.policy.ThresholdBasedOracle
 
Classes in this File Line Coverage Branch Coverage Complexity
ThresholdBasedOracle
0%
0/58
0%
0/22
3.875
ThresholdBasedOracle$1
0%
0/25
0%
0/14
3.875
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.ooc.policy;
 20  
 
 21  
 import com.sun.management.GarbageCollectionNotificationInfo;
 22  
 import org.apache.giraph.comm.netty.NettyClient;
 23  
 import org.apache.giraph.conf.FloatConfOption;
 24  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 25  
 import org.apache.giraph.conf.LongConfOption;
 26  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 27  
 import org.apache.giraph.ooc.command.IOCommand;
 28  
 import org.apache.giraph.utils.MemoryUtils;
 29  
 import org.apache.giraph.utils.ThreadUtils;
 30  
 import org.apache.log4j.Logger;
 31  
 
 32  
 import static com.google.common.base.Preconditions.checkState;
 33  
 
 34  
 /**
 35  
  * Out-of-core oracle to adaptively control data kept in memory, with the goal
 36  
  * of keeping the memory usage at a desired state. Out-of-core policy in this
 37  
  * oracle is based on several user-defined thresholds. Also, this oracle spawns
 38  
  * a thread to periodically check the memory usage. This thread would issue
 39  
  * manual GC calls if JVM fails to call major/full GC for a while and the amount
 40  
  * of used memory is about to cause high-memory pressure. This oracle, also,
 41  
  * monitors GC activities. The monitoring mechanism looks for major/full GC
 42  
  * calls, and updates out-of-core decisions based on the amount of available
 43  
  * memory after such GCs. There are three out-of-core decisions:
 44  
  *  - Which IO operations should be done (load/offload of partitions and
 45  
  *    messages)
 46  
  *  - What the incoming messages rate should be (updating credits announced by
 47  
  *    this worker in credit-based flow-control mechanism)
 48  
  *  - How many processing threads should remain active (tethering rate of
 49  
  *    data generation)
 50  
  *
 51  
  * The following table shows the relationship of these decisions and
 52  
  * user-defined thresholds.
 53  
  * --------------------------------------------------------------
 54  
  * Memory Pressure     |  Manual |   IO   | Credit   | Active   |
 55  
  * (memory usage)      |   GC?   | Action |          | Threads  |
 56  
  * --------------------------------------------------------------
 57  
  *                     |  Yes    | hard   |  0       |  0       |
 58  
  *                     |         | store  |          |          |
 59  
  * failPressure -------------------------------------------------
 60  
  *                     |  Yes    | hard   |  0       | fraction |
 61  
  *                     |         | store  |          |          |
 62  
  * emergencyPressure --------------------------------------------
 63  
  *                     |  Yes    | hard   | fraction |  max     |
 64  
  *                     |         | store  |          |          |
 65  
  * highPressure -------------------------------------------------
 66  
  *                     |  No     | soft   | fraction |  max     |
 67  
  *                     |         | store  |          |          |
 68  
  * optimalPressure ----------------------------------------------
 69  
  *                     |  No     | soft   |  max     |  max     |
 70  
  *                     |         | load   |          |          |
 71  
  * lowPressure --------------------------------------------------
 72  
  *                     |  No     | hard   |  max     |  max     |
 73  
  *                     |         | load   |          |          |
 74  
  * --------------------------------------------------------------
 75  
  *
 76  
  */
 77  0
 public class ThresholdBasedOracle implements OutOfCoreOracle {
 78  
   /** The memory pressure at/above which the job would fail */
 79  0
   public static final FloatConfOption FAIL_MEMORY_PRESSURE =
 80  
       new FloatConfOption("giraph.threshold.failPressure", 0.975f,
 81  
           "The memory pressure (fraction of used memory) at/above which the " +
 82  
               "job would fail.");
 83  
   /**
 84  
    * The memory pressure at which the job is close to fail, even though we were
 85  
    * using maximal disk bandwidth and minimal network rate. We should reduce
 86  
    * job processing rate.
 87  
    */
 88  0
   public static final FloatConfOption EMERGENCY_MEMORY_PRESSURE =
 89  
       new FloatConfOption("giraph.threshold.emergencyPressure", 0.925f,
 90  
           "The memory pressure (fraction of used memory) at which the job " +
 91  
               "is close to fail, hence we should reduce its processing rate " +
 92  
               "as much as possible.");
 93  
   /** The memory pressure at which the job is suffering from GC overhead. */
 94  0
   public static final FloatConfOption HIGH_MEMORY_PRESSURE =
 95  
       new FloatConfOption("giraph.threshold.highPressure", 0.875f,
 96  
           "The memory pressure (fraction of used memory) at which the job " +
 97  
               "is suffering from GC overhead.");
 98  
   /**
 99  
    * The memory pressure at which we expect GC to perform optimally for a
 100  
    * memory intensive job.
 101  
    */
 102  0
   public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
 103  
       new FloatConfOption("giraph.threshold.optimalPressure", 0.8f,
 104  
           "The memory pressure (fraction of used memory) at which a " +
 105  
               "memory-intensive job shows the optimal GC behavior.");
 106  
   /**
 107  
    * The memory pressure at/below which the job can use more memory without
 108  
    * suffering from GC overhead.
 109  
    */
 110  0
   public static final FloatConfOption LOW_MEMORY_PRESSURE =
 111  
       new FloatConfOption("giraph.threshold.lowPressure", 0.7f,
 112  
           "The memory pressure (fraction of used memory) at/below which the " +
 113  
               "job can use more memory without suffering the performance.");
 114  
   /** The interval at which memory observer thread wakes up. */
 115  0
   public static final LongConfOption CHECK_MEMORY_INTERVAL =
 116  
       new LongConfOption("giraph.threshold.checkMemoryInterval", 2500,
 117  
           "The interval/period where memory observer thread wakes up and " +
 118  
               "monitors memory footprint (in milliseconds)");
 119  
   /**
 120  
    * Memory observer thread would manually call GC if major/full GC has not
 121  
    * been called for a while. This parameter specifies how long ago the
 121  
    * last major/full GC must have happened before a manual GC is issued.
 123  
    */
 124  0
   public static final LongConfOption LAST_GC_CALL_INTERVAL =
 125  
       new LongConfOption("giraph.threshold.lastGcCallInterval", 10 * 1000,
 126  
           "How long after last major/full GC should we call manual GC?");
 127  
 
 128  
   /** Class logger */
 129  0
   private static final Logger LOG =
 130  0
       Logger.getLogger(ThresholdBasedOracle.class);
 131  
   /** Cached value for FAIL_MEMORY_PRESSURE */
 132  
   private final float failMemoryPressure;
 133  
   /** Cached value for EMERGENCY_MEMORY_PRESSURE */
 134  
   private final float emergencyMemoryPressure;
 135  
   /** Cached value for HIGH_MEMORY_PRESSURE */
 136  
   private final float highMemoryPressure;
 137  
   /** Cached value for OPTIMAL_MEMORY_PRESSURE */
 138  
   private final float optimalMemoryPressure;
 139  
   /** Cached value for LOW_MEMORY_PRESSURE */
 140  
   private final float lowMemoryPressure;
 141  
   /** Cached value for CHECK_MEMORY_INTERVAL */
 142  
   private final long checkMemoryInterval;
 143  
   /** Cached value for LAST_GC_CALL_INTERVAL */
 144  
   private final long lastGCCallInterval;
 145  
   /** Out-of-core engine */
 146  
   private final OutOfCoreEngine oocEngine;
 147  
   /** Last time a major/full GC has been called (in milliseconds) */
 148  
   private volatile long lastMajorGCTime;
 149  
   /** Last time a non major/full GC has been called (in milliseconds) */
 150  
   private volatile long lastMinorGCTime;
 151  
 
 152  
   /**
 153  
    * Constructor
 154  
    *
 155  
    * @param conf configuration
 156  
    * @param oocEngine out-of-core engine
 157  
    */
 158  
   public ThresholdBasedOracle(ImmutableClassesGiraphConfiguration conf,
 159  0
                               OutOfCoreEngine oocEngine) {
 160  0
     this.failMemoryPressure = FAIL_MEMORY_PRESSURE.get(conf);
 161  0
     this.emergencyMemoryPressure = EMERGENCY_MEMORY_PRESSURE.get(conf);
 162  0
     this.highMemoryPressure = HIGH_MEMORY_PRESSURE.get(conf);
 163  0
     this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
 164  0
     this.lowMemoryPressure = LOW_MEMORY_PRESSURE.get(conf);
 165  0
     this.checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
 166  0
     this.lastGCCallInterval = LAST_GC_CALL_INTERVAL.get(conf);
 167  0
     NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.setIfUnset(conf, true);
 168  0
     boolean useCredit = NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
 169  0
     checkState(useCredit, "ThresholdBasedOracle: credit-based flow control " +
 170  
         "must be enabled. Use giraph.waitForPerWorkerRequests=true");
 171  0
     this.oocEngine = oocEngine;
 172  0
     this.lastMajorGCTime = 0;
 173  
 
 174  0
     ThreadUtils.startThread(new Runnable() {
 175  
       @Override
 176  
       public void run() {
 177  
         while (true) {
 178  0
           double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
 179  0
           long time = System.currentTimeMillis();
 180  0
           if ((usedMemoryFraction > highMemoryPressure &&
 181  0
               time - lastMajorGCTime >= lastGCCallInterval) ||
 182  0
               (usedMemoryFraction > optimalMemoryPressure &&
 183  0
                   time - lastMajorGCTime >= lastGCCallInterval &&
 184  0
                   time - lastMinorGCTime >= lastGCCallInterval)) {
 185  0
             if (LOG.isInfoEnabled()) {
 186  0
               LOG.info("call: last GC happened a while ago and the " +
 187  
                   "amount of used memory is high (used memory " +
 188  
                   "fraction is " +
 189  0
                   String.format("%.2f", usedMemoryFraction) + "). " +
 190  
                   "Calling GC manually");
 191  
             }
 192  0
             System.gc();
 193  0
             time = System.currentTimeMillis() - time;
 194  0
             usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
 195  0
             if (LOG.isInfoEnabled()) {
 196  0
               LOG.info("call: manual GC is done. It took " +
 197  0
                   String.format("%.2f", (double) time / 1000) +
 198  
                   " seconds. Used memory fraction is " +
 199  0
                   String.format("%.2f", usedMemoryFraction));
 200  
             }
 201  
           }
 202  0
           updateRates(usedMemoryFraction);
 203  
           try {
 204  0
             Thread.sleep(checkMemoryInterval);
 205  0
           } catch (InterruptedException e) {
 206  0
             LOG.warn("run: exception occurred!", e);
 207  0
             return;
 208  0
           }
 209  0
         }
 210  
       }
 211  0
     }, "memory-checker", oocEngine.getServiceWorker().getGraphTaskManager().
 212  0
         createUncaughtExceptionHandler());
 213  0
   }
 214  
 
 215  
   /**
 216  
    * Update statistics and rate regarding communication credits and number of
 217  
    * active threads.
 218  
    *
 219  
    * @param usedMemoryFraction the fraction of used memory over max memory
 220  
    */
 221  
   public void updateRates(double usedMemoryFraction) {
 222  
     // Update the fraction of processing threads that should remain active
 223  0
     if (usedMemoryFraction >= failMemoryPressure) {
 224  0
       oocEngine.updateActiveThreadsFraction(0);
 225  0
     } else if (usedMemoryFraction < emergencyMemoryPressure) {
 226  0
       oocEngine.updateActiveThreadsFraction(1);
 227  
     } else {
 228  0
       oocEngine.updateActiveThreadsFraction(1 -
 229  
           (usedMemoryFraction - emergencyMemoryPressure) /
 230  
               (failMemoryPressure - emergencyMemoryPressure));
 231  
     }
 232  
 
 233  
     // Update the fraction of credit that should be used in credit-based flow-
 234  
     // control
 235  0
     if (usedMemoryFraction >= emergencyMemoryPressure) {
 236  0
       oocEngine.updateRequestsCreditFraction(0);
 237  0
     } else if (usedMemoryFraction < optimalMemoryPressure) {
 238  0
       oocEngine.updateRequestsCreditFraction(1);
 239  
     } else {
 240  0
       oocEngine.updateRequestsCreditFraction(1 -
 241  
           (usedMemoryFraction - optimalMemoryPressure) /
 242  
               (emergencyMemoryPressure - optimalMemoryPressure));
 243  
     }
 244  0
   }
 245  
 
 246  
   @Override
 247  
   public IOAction[] getNextIOActions() {
 248  0
     double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
 249  0
     if (LOG.isDebugEnabled()) {
 250  0
       LOG.debug(String.format("getNextIOActions: usedMemoryFraction = %.2f",
 251  0
           usedMemoryFraction));
 252  
     }
 253  0
     if (usedMemoryFraction > highMemoryPressure) {
 254  0
       return new IOAction[]{
 255  
         IOAction.STORE_MESSAGES_AND_BUFFERS,
 256  
         IOAction.STORE_PARTITION};
 257  0
     } else if (usedMemoryFraction > optimalMemoryPressure) {
 258  0
       return new IOAction[]{
 259  
         IOAction.LOAD_UNPROCESSED_PARTITION,
 260  
         IOAction.STORE_MESSAGES_AND_BUFFERS,
 261  
         IOAction.STORE_PROCESSED_PARTITION};
 262  0
     } else if (usedMemoryFraction > lowMemoryPressure) {
 263  0
       return new IOAction[]{
 264  
         IOAction.LOAD_UNPROCESSED_PARTITION,
 265  
         IOAction.STORE_MESSAGES_AND_BUFFERS,
 266  
         IOAction.LOAD_PARTITION};
 267  
     } else {
 268  0
       return new IOAction[]{IOAction.LOAD_PARTITION};
 269  
     }
 270  
   }
 271  
 
 272  
   @Override
 273  
   public boolean approve(IOCommand command) {
 274  0
     return true;
 275  
   }
 276  
 
 277  
   @Override
 278  
   public void commandCompleted(IOCommand command) {
 279  
     // Do nothing
 280  0
   }
 281  
 
 282  
   @Override
 283  
   public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) {
 284  0
     String gcAction = gcInfo.getGcAction().toLowerCase();
 285  0
     if (gcAction.contains("full") || gcAction.contains("major")) {
 286  0
       if (!gcInfo.getGcCause().contains("No GC")) {
 287  0
         lastMajorGCTime = System.currentTimeMillis();
 288  
       }
 289  
     } else {
 290  0
       lastMinorGCTime = System.currentTimeMillis();
 291  
     }
 292  0
   }
 293  
 
 294  
   @Override
 295  
   public void startIteration() {
 296  0
   }
 297  
 }