Coverage Report - org.apache.giraph.ooc.policy.MemoryEstimatorOracle
 
Classes in this File Line Coverage Branch Coverage Complexity
MemoryEstimatorOracle
0%
0/134
0%
0/70
0
MemoryEstimatorOracle$1
0%
0/26
0%
0/10
0
MemoryEstimatorOracle$MemoryEstimator
0%
0/183
0%
0/112
0
MemoryEstimatorOracle$State
0%
0/3
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.ooc.policy;
 19  
 
 20  
 import com.sun.management.GarbageCollectionNotificationInfo;
 21  
 import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
 22  
 import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression;
 23  
 import org.apache.giraph.comm.NetworkMetrics;
 24  
 import org.apache.giraph.conf.FloatConfOption;
 25  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 26  
 import org.apache.giraph.conf.LongConfOption;
 27  
 import org.apache.giraph.edge.AbstractEdgeStore;
 28  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 29  
 import org.apache.giraph.ooc.command.IOCommand;
 30  
 import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
 31  
 import org.apache.giraph.ooc.command.WaitIOCommand;
 32  
 import org.apache.giraph.utils.ThreadUtils;
 33  
 import org.apache.giraph.worker.EdgeInputSplitsCallable;
 34  
 import org.apache.giraph.worker.VertexInputSplitsCallable;
 35  
 import org.apache.giraph.worker.WorkerProgress;
 36  
 import org.apache.log4j.Logger;
 37  
 
 38  
 import javax.annotation.Nullable;
 39  
 import java.lang.management.ManagementFactory;
 40  
 import java.lang.management.MemoryPoolMXBean;
 41  
 import java.lang.management.MemoryUsage;
 42  
 import java.util.ArrayList;
 43  
 import java.util.Arrays;
 44  
 import java.util.List;
 45  
 import java.util.Map;
 46  
 import java.util.concurrent.atomic.AtomicLong;
 47  
 import java.util.concurrent.locks.Lock;
 48  
 import java.util.concurrent.locks.ReentrantLock;
 49  
 
 50  
 import static com.google.common.base.Preconditions.checkState;
 51  
 
 52  
 /**
 53  
  * Implementation of {@link OutOfCoreOracle} that uses a linear regression model
 54  
  * to estimate actual memory usage based on the current state of computation.
 55  
  * The model takes into consideration 5 parameters:
 56  
  *
 57  
  * y = c0 + c1*x1 + c2*x2 + c3*x3 + c4*x4 + c5*x5
 58  
  *
 59  
  * y: memory usage
 60  
  * x1: edges loaded
 61  
  * x2: vertices loaded
 62  
  * x3: vertices processed
 63  
  * x4: bytes received due to messages
 64  
  * x5: bytes loaded/stored from/to disk due to OOC.
 65  
  *
 66  
  */
 67  0
 public class MemoryEstimatorOracle implements OutOfCoreOracle {
 68  
   /** Memory check interval in msec */
 69  0
   public static final LongConfOption CHECK_MEMORY_INTERVAL =
 70  
     new LongConfOption("giraph.garbageEstimator.checkMemoryInterval", 1000,
 71  
         "The interval where memory checker thread wakes up and " +
 72  
             "monitors memory footprint (in milliseconds)");
 73  
   /**
 74  
    * If mem-usage is above this threshold and no Full GC has been called,
 75  
    * we call it manually
 76  
    */
 77  0
   public static final FloatConfOption MANUAL_GC_MEMORY_PRESSURE =
 78  
     new FloatConfOption("giraph.garbageEstimator.manualGCPressure", 0.95f,
 79  
         "The threshold above which GC is called manually if Full GC has not " +
 80  
             "happened in a while");
 81  
   /** Used to detect a high memory pressure situation */
 82  0
   public static final FloatConfOption GC_MINIMUM_RECLAIM_FRACTION =
 83  
     new FloatConfOption("giraph.garbageEstimator.gcReclaimFraction", 0.05f,
 84  
         "Minimum percentage of memory we expect to be reclaimed after a Full " +
 85  
             "GC. If less than this amount is reclaimed, it is sage to say " +
 86  
             "we are in a high memory situation and the estimation mechanism " +
 87  
             "has not recognized it yet!");
 88  
   /** If mem-usage is above this threshold, active threads are set to 0 */
 89  0
   public static final FloatConfOption AM_HIGH_THRESHOLD =
 90  
     new FloatConfOption("giraph.amHighThreshold", 0.95f,
 91  
         "If mem-usage is above this threshold, all active threads " +
 92  
             "(compute/input) are paused.");
 93  
   /** If mem-usage is below this threshold, active threads are set to max */
 94  0
   public static final FloatConfOption AM_LOW_THRESHOLD =
 95  
     new FloatConfOption("giraph.amLowThreshold", 0.90f,
 96  
         "If mem-usage is below this threshold, all active threads " +
 97  
             "(compute/input) are running.");
 98  
   /** If mem-usage is above this threshold, credit is set to 0 */
 99  0
   public static final FloatConfOption CREDIT_HIGH_THRESHOLD =
 100  
     new FloatConfOption("giraph.creditHighThreshold", 0.95f,
 101  
         "If mem-usage is above this threshold, credit is set to 0");
 102  
   /** If mem-usage is below this threshold, credit is set to max */
 103  0
   public static final FloatConfOption CREDIT_LOW_THRESHOLD =
 104  
     new FloatConfOption("giraph.creditLowThreshold", 0.90f,
 105  
         "If mem-usage is below this threshold, credit is set to max");
 106  
   /** OOC starts if mem-usage is above this threshold */
 107  0
   public static final FloatConfOption OOC_THRESHOLD =
 108  
     new FloatConfOption("giraph.oocThreshold", 0.90f,
 109  
         "If mem-usage is above this threshold, out of core threads starts " +
 110  
             "writing data to disk");
 111  
 
 112  
   /** Logger */
 113  0
   private static final Logger LOG =
 114  0
     Logger.getLogger(MemoryEstimatorOracle.class);
 115  
 
 116  
   /** Cached value for {@link #MANUAL_GC_MEMORY_PRESSURE} */
 117  
   private final float manualGCMemoryPressure;
 118  
   /** Cached value for {@link #GC_MINIMUM_RECLAIM_FRACTION} */
 119  
   private final float gcReclaimFraction;
 120  
   /** Cached value for {@link #AM_HIGH_THRESHOLD} */
 121  
   private final float amHighThreshold;
 122  
   /** Cached value for {@link #AM_LOW_THRESHOLD} */
 123  
   private final float amLowThreshold;
 124  
   /** Cached value for {@link #CREDIT_HIGH_THRESHOLD} */
 125  
   private final float creditHighThreshold;
 126  
   /** Cached value for {@link #CREDIT_LOW_THRESHOLD} */
 127  
   private final float creditLowThreshold;
 128  
   /** Cached value for {@link #OOC_THRESHOLD} */
 129  
   private final float oocThreshold;
 130  
 
 131  
   /** Reference to running OOC engine */
 132  
   private final OutOfCoreEngine oocEngine;
 133  
   /** Memory estimator instance */
 134  
   private final MemoryEstimator memoryEstimator;
 135  
   /** Keeps track of the number of bytes stored/loaded by OOC */
 136  0
   private final AtomicLong oocBytesInjected = new AtomicLong(0);
 137  
   /** How many bytes to offload */
 138  0
   private final AtomicLong numBytesToOffload = new AtomicLong(0);
 139  
   /** Current state of the OOC */
 140  0
   private volatile State state = State.STABLE;
 141  
   /** Timestamp of the last major GC */
 142  0
   private volatile long lastMajorGCTime = 0;
 143  
 
 144  
   /**
 145  
    * Different states the OOC can be in.
 146  
    */
 147  0
   private enum State {
 148  
     /** No offloading */
 149  0
     STABLE,
 150  
     /** Current offloading */
 151  0
     OFFLOADING,
 152  
   }
 153  
 
 154  
   /**
 155  
    * Constructor.
 156  
    * @param conf Configuration
 157  
    * @param oocEngine OOC engine.:w
 158  
    *
 159  
    */
 160  
   public MemoryEstimatorOracle(ImmutableClassesGiraphConfiguration conf,
 161  0
                                final OutOfCoreEngine oocEngine) {
 162  0
     this.oocEngine = oocEngine;
 163  0
     this.memoryEstimator = new MemoryEstimator(this.oocBytesInjected,
 164  0
       oocEngine.getNetworkMetrics());
 165  
 
 166  0
     this.manualGCMemoryPressure = MANUAL_GC_MEMORY_PRESSURE.get(conf);
 167  0
     this.gcReclaimFraction = GC_MINIMUM_RECLAIM_FRACTION.get(conf);
 168  0
     this.amHighThreshold = AM_HIGH_THRESHOLD.get(conf);
 169  0
     this.amLowThreshold = AM_LOW_THRESHOLD.get(conf);
 170  0
     this.creditHighThreshold = CREDIT_HIGH_THRESHOLD.get(conf);
 171  0
     this.creditLowThreshold = CREDIT_LOW_THRESHOLD.get(conf);
 172  0
     this.oocThreshold = OOC_THRESHOLD.get(conf);
 173  
 
 174  0
     final long checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
 175  
 
 176  0
     ThreadUtils.startThread(new Runnable() {
 177  
       @Override
 178  
       public void run() {
 179  
         while (true) {
 180  0
           long oldGenUsageEstimate = memoryEstimator.getUsageEstimate();
 181  0
           MemoryUsage usage = getOldGenUsed();
 182  0
           if (oldGenUsageEstimate > 0) {
 183  0
             updateRates(oldGenUsageEstimate, usage.getMax());
 184  
           } else {
 185  0
             long time = System.currentTimeMillis();
 186  0
             if (time - lastMajorGCTime >= 10000) {
 187  0
               double used = (double) usage.getUsed() / usage.getMax();
 188  0
               if (used > manualGCMemoryPressure) {
 189  0
                 if (LOG.isInfoEnabled()) {
 190  0
                   LOG.info(
 191  
                     "High memory pressure with no full GC from the JVM. " +
 192  
                       "Calling GC manually. Used fraction of old-gen is " +
 193  0
                       String.format("%.2f", used) + ".");
 194  
                 }
 195  0
                 System.gc();
 196  0
                 time = System.currentTimeMillis() - time;
 197  0
                 usage = getOldGenUsed();
 198  0
                 used = (double) usage.getUsed() / usage.getMax();
 199  0
                 if (LOG.isInfoEnabled()) {
 200  0
                   LOG.info("Manual GC done. It took " +
 201  0
                     String.format("%.2f", time / 1000.0) +
 202  
                     " seconds. Used fraction of old-gen is " +
 203  0
                     String.format("%.2f", used) + ".");
 204  
                 }
 205  
               }
 206  
             }
 207  
           }
 208  
           try {
 209  0
             Thread.sleep(checkMemoryInterval);
 210  0
           } catch (InterruptedException e) {
 211  0
             LOG.warn("run: exception occurred!", e);
 212  0
             return;
 213  0
           }
 214  0
         }
 215  
       }
 216  0
     }, "ooc-memory-checker", oocEngine.getServiceWorker().getGraphTaskManager()
 217  0
         .createUncaughtExceptionHandler());
 218  0
   }
 219  
 
 220  
   /**
 221  
    * Resets all the counters used in the memory estimation. This is called at
 222  
    * the beginning of a new superstep.
 223  
    * <p>
 224  
    * The number of vertices to compute in the next superstep gets reset in
 225  
    * {@link org.apache.giraph.graph.GraphTaskManager#processGraphPartitions}
 226  
    * right before
 227  
    * {@link org.apache.giraph.partition.PartitionStore#startIteration()} gets
 228  
    * called.
 229  
    */
 230  
   @Override
 231  
   public void startIteration() {
 232  0
     AbstractEdgeStore.PROGRESS_COUNTER.reset();
 233  0
     oocBytesInjected.set(0);
 234  0
     memoryEstimator.clear();
 235  0
     memoryEstimator.setCurrentSuperstep(oocEngine.getSuperstep());
 236  0
     oocEngine.updateRequestsCreditFraction(1);
 237  0
     oocEngine.updateActiveThreadsFraction(1);
 238  0
   }
 239  
 
 240  
 
 241  
   @Override
 242  
   public IOAction[] getNextIOActions() {
 243  0
     if (state == State.OFFLOADING) {
 244  0
       return new IOAction[]{
 245  
         IOAction.STORE_MESSAGES_AND_BUFFERS, IOAction.STORE_PARTITION};
 246  
     }
 247  0
     long oldGenUsage = memoryEstimator.getUsageEstimate();
 248  0
     MemoryUsage usage = getOldGenUsed();
 249  0
     if (oldGenUsage > 0) {
 250  0
       double usageEstimate = (double) oldGenUsage / usage.getMax();
 251  0
       if (usageEstimate > oocThreshold) {
 252  0
         return new IOAction[]{
 253  
           IOAction.STORE_MESSAGES_AND_BUFFERS,
 254  
           IOAction.STORE_PARTITION};
 255  
       } else {
 256  0
         return new IOAction[]{IOAction.LOAD_PARTITION};
 257  
       }
 258  
     } else {
 259  0
       return new IOAction[]{IOAction.LOAD_PARTITION};
 260  
     }
 261  
   }
 262  
 
 263  
   @Override
 264  
   public boolean approve(IOCommand command) {
 265  0
     return true;
 266  
   }
 267  
 
 268  
   @Override
 269  
   public void commandCompleted(IOCommand command) {
 270  0
     if (command instanceof LoadPartitionIOCommand) {
 271  0
       oocBytesInjected.getAndAdd(command.bytesTransferred());
 272  0
       if (state == State.OFFLOADING) {
 273  0
         numBytesToOffload.getAndAdd(command.bytesTransferred());
 274  
       }
 275  0
     } else if (!(command instanceof WaitIOCommand)) {
 276  0
       oocBytesInjected.getAndAdd(0 - command.bytesTransferred());
 277  0
       if (state == State.OFFLOADING) {
 278  0
         numBytesToOffload.getAndAdd(0 - command.bytesTransferred());
 279  
       }
 280  
     }
 281  
 
 282  0
     if (state == State.OFFLOADING && numBytesToOffload.get() <= 0) {
 283  0
       numBytesToOffload.set(0);
 284  0
       state = State.STABLE;
 285  0
       updateRates(-1, 1);
 286  
     }
 287  0
   }
 288  
 
 289  
   /**
 290  
    * When a new GC has completed, we can get an accurate measurement of the
 291  
    * memory usage. We use this to update the linear regression model.
 292  
    *
 293  
    * @param gcInfo GC information
 294  
    */
 295  
   @Override
 296  
   public synchronized void gcCompleted(
 297  
     GarbageCollectionNotificationInfo gcInfo) {
 298  0
     String action = gcInfo.getGcAction().toLowerCase();
 299  0
     String cause = gcInfo.getGcCause().toLowerCase();
 300  0
     if (action.contains("major") &&
 301  0
       (cause.contains("ergo") || cause.contains("system"))) {
 302  0
       lastMajorGCTime = System.currentTimeMillis();
 303  0
       MemoryUsage before = null;
 304  0
       MemoryUsage after = null;
 305  
 
 306  
       for (Map.Entry<String, MemoryUsage> entry :
 307  0
         gcInfo.getGcInfo().getMemoryUsageBeforeGc().entrySet()) {
 308  0
         String poolName = entry.getKey();
 309  0
         if (poolName.toLowerCase().contains("old")) {
 310  0
           before = entry.getValue();
 311  0
           after = gcInfo.getGcInfo().getMemoryUsageAfterGc().get(poolName);
 312  0
           break;
 313  
         }
 314  0
       }
 315  0
       if (after == null) {
 316  0
         throw new IllegalStateException("Missing Memory Usage After GC info");
 317  
       }
 318  0
       if (before == null) {
 319  0
         throw new IllegalStateException("Missing Memory Usage Before GC info");
 320  
       }
 321  
 
 322  
       // Compare the estimation with the actual value
 323  0
       long usedMemoryEstimate = memoryEstimator.getUsageEstimate();
 324  0
       long usedMemoryReal = after.getUsed();
 325  0
       if (usedMemoryEstimate >= 0) {
 326  0
         if (LOG.isInfoEnabled()) {
 327  0
           LOG.info("gcCompleted: estimate=" + usedMemoryEstimate + " real=" +
 328  
             usedMemoryReal + " error=" +
 329  0
             ((double) Math.abs(usedMemoryEstimate - usedMemoryReal) /
 330  
               usedMemoryReal * 100));
 331  
         }
 332  
       }
 333  
 
 334  
       // Number of edges loaded so far (if in input superstep)
 335  0
       long edgesLoaded = oocEngine.getSuperstep() >= 0 ? 0 :
 336  0
         EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count();
 337  
       // Number of vertices loaded so far (if in input superstep)
 338  0
       long verticesLoaded = oocEngine.getSuperstep() >= 0 ? 0 :
 339  0
         VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count();
 340  
       // Number of vertices computed (if either in compute or store phase)
 341  0
       long verticesComputed = WorkerProgress.get().getVerticesComputed() +
 342  0
         WorkerProgress.get().getVerticesStored() +
 343  0
         AbstractEdgeStore.PROGRESS_COUNTER.getProgress();
 344  
       // Number of bytes received
 345  0
       long receivedBytes =
 346  0
         oocEngine.getNetworkMetrics().getBytesReceivedPerSuperstep();
 347  
       // Number of OOC bytes
 348  0
       long oocBytes = oocBytesInjected.get();
 349  
 
 350  0
       memoryEstimator.addRecord(getOldGenUsed().getUsed(), edgesLoaded,
 351  
         verticesLoaded, verticesComputed, receivedBytes, oocBytes);
 352  
 
 353  0
       long garbage = before.getUsed() - after.getUsed();
 354  0
       long maxMem = after.getMax();
 355  0
       long memUsed = after.getUsed();
 356  0
       boolean isTight = (maxMem - memUsed) < 2 * gcReclaimFraction * maxMem &&
 357  
         garbage < gcReclaimFraction * maxMem;
 358  0
       boolean predictionExist = memoryEstimator.getUsageEstimate() > 0;
 359  0
       if (isTight && !predictionExist) {
 360  0
         if (LOG.isInfoEnabled()) {
 361  0
           LOG.info("gcCompleted: garbage=" + garbage + " memUsed=" +
 362  
             memUsed + " maxMem=" + maxMem);
 363  
         }
 364  0
         numBytesToOffload.set((long) (2 * gcReclaimFraction * maxMem) -
 365  
           (maxMem - memUsed));
 366  0
         if (LOG.isInfoEnabled()) {
 367  0
           LOG.info("gcCompleted: tight memory usage. Starting to offload " +
 368  0
             "until " + numBytesToOffload.get() + " bytes are offloaded");
 369  
         }
 370  0
         state = State.OFFLOADING;
 371  0
         updateRates(1, 1);
 372  
       }
 373  
     }
 374  0
   }
 375  
 
 376  
   /**
 377  
    * Given an estimate for the current memory usage and the maximum available
 378  
    * memory, it updates the active threads and flow control credit in the
 379  
    * OOC engine.
 380  
    *
 381  
    * @param usageEstimateMem Estimate of memory usage.
 382  
    * @param maxMemory Maximum memory.
 383  
    */
 384  
   private void updateRates(long usageEstimateMem, long maxMemory) {
 385  0
     double usageEstimate = (double) usageEstimateMem / maxMemory;
 386  0
     if (usageEstimate > 0) {
 387  0
       if (usageEstimate >= amHighThreshold) {
 388  0
         oocEngine.updateActiveThreadsFraction(0);
 389  0
       } else if (usageEstimate < amLowThreshold) {
 390  0
         oocEngine.updateActiveThreadsFraction(1);
 391  
       } else {
 392  0
         oocEngine.updateActiveThreadsFraction(1 -
 393  
           (usageEstimate - amLowThreshold) /
 394  
             (amHighThreshold - amLowThreshold));
 395  
       }
 396  
 
 397  0
       if (usageEstimate >= creditHighThreshold) {
 398  0
         oocEngine.updateRequestsCreditFraction(0);
 399  0
       } else if (usageEstimate < creditLowThreshold) {
 400  0
         oocEngine.updateRequestsCreditFraction(1);
 401  
       } else {
 402  0
         oocEngine.updateRequestsCreditFraction(1 -
 403  
           (usageEstimate - creditLowThreshold) /
 404  
             (creditHighThreshold - creditLowThreshold));
 405  
       }
 406  
     } else {
 407  0
       oocEngine.updateActiveThreadsFraction(1);
 408  0
       oocEngine.updateRequestsCreditFraction(1);
 409  
     }
 410  0
   }
 411  
 
 412  
   /**
 413  
    * Returns statistics about the old gen pool.
 414  
    * @return {@link MemoryUsage}.
 415  
    */
 416  
   private MemoryUsage getOldGenUsed() {
 417  
     List<MemoryPoolMXBean> memoryPoolList =
 418  0
       ManagementFactory.getMemoryPoolMXBeans();
 419  0
     for (MemoryPoolMXBean pool : memoryPoolList) {
 420  0
       String normalName = pool.getName().toLowerCase();
 421  0
       if (normalName.contains("old") || normalName.contains("tenured")) {
 422  0
         return pool.getUsage();
 423  
       }
 424  0
     }
 425  0
     throw new IllegalStateException("Bad Memory Pool");
 426  
   }
 427  
 
 428  
   /**
 429  
    * Maintains statistics about the current state and progress of the
 430  
    * computation and produces estimates of memory usage using a technique
 431  
    * based on linear regression.
 432  
    *
 433  
    * Upon a GC event, it gets updated with the most recent statistics through
 434  
    * the {@link #addRecord} method.
 435  
    */
 436  
   private static class MemoryEstimator {
 437  
     /** Stores the (x1,x2,...,x5) arrays of data samples, one for each sample */
 438  0
     private List<double[]> dataSamples = new ArrayList<>();
 439  
     /** Stores the y memory usage dataSamples, one for each sample */
 440  0
     private DoubleArrayList memorySamples = new DoubleArrayList();
 441  
     /** Stores the coefficients computed by the linear regression model */
 442  0
     private double[] coefficient = new double[6];
 443  
     /** Stores the column indices that can be used in the regression model */
 444  0
     private List<Integer> validColumnIndices = new ArrayList<>();
 445  
     /** Potentially out-of-range coefficient values */
 446  0
     private double[] extreme = new double[6];
 447  
     /** Indicates whether current coefficients can be used for estimation */
 448  0
     private boolean isValid = false;
 449  
     /** Implementation of linear regression */
 450  0
     private OLSMultipleLinearRegression mlr = new OLSMultipleLinearRegression();
 451  
     /** Used to synchronize access to the data samples */
 452  0
     private Lock lock = new ReentrantLock();
 453  
     /** The estimation method depends on the current superstep. */
 454  0
     private long currentSuperstep = -1;
 455  
     /** The estimation method depends on the bytes injected. */
 456  
     private final AtomicLong oocBytesInjected;
 457  
     /** Provides network statistics */
 458  
     private final NetworkMetrics networkMetrics;
 459  
 
 460  
     /**
 461  
      * Constructor
 462  
      * @param oocBytesInjected Reference to {@link AtomicLong} object
 463  
      *                         maintaining the number of OOC bytes stored.
 464  
      * @param networkMetrics Interface to get network stats.
 465  
      */
 466  
     public MemoryEstimator(AtomicLong oocBytesInjected,
 467  0
                            NetworkMetrics networkMetrics) {
 468  0
       this.oocBytesInjected = oocBytesInjected;
 469  0
       this.networkMetrics = networkMetrics;
 470  0
     }
 471  
 
 472  
 
 473  
     /**
 474  
      * Clear data structure (called from single threaded program).
 475  
      */
 476  
     public void clear() {
 477  0
       dataSamples.clear();
 478  0
       memorySamples.clear();
 479  0
       isValid = false;
 480  0
     }
 481  
 
 482  
     public void setCurrentSuperstep(long superstep) {
 483  0
       this.currentSuperstep = superstep;
 484  0
     }
 485  
 
 486  
     /**
 487  
      * Given the current state of computation (i.e. current edges loaded,
 488  
      * vertices computed etc) and the current model (i.e. the regression
 489  
      * coefficient), it returns a prediction about the memory usage in bytes.
 490  
      *
 491  
      * @return Memory estimate in bytes.
 492  
      */
 493  
     public long getUsageEstimate() {
 494  0
       long usage = -1;
 495  0
       lock.lock();
 496  
       try {
 497  0
         if (isValid) {
 498  
           // Number of edges loaded so far (if in input superstep)
 499  0
           long edgesLoaded = currentSuperstep >= 0 ? 0 :
 500  0
             EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count();
 501  
           // Number of vertices loaded so far (if in input superstep)
 502  0
           long verticesLoaded = currentSuperstep >= 0 ? 0 :
 503  0
             VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count();
 504  
           // Number of vertices computed (if either in compute or store phase)
 505  0
           long verticesComputed = WorkerProgress.get().getVerticesComputed() +
 506  0
             WorkerProgress.get().getVerticesStored() +
 507  0
             AbstractEdgeStore.PROGRESS_COUNTER.getProgress();
 508  
           // Number of bytes received
 509  0
           long receivedBytes = networkMetrics.getBytesReceivedPerSuperstep();
 510  
           // Number of OOC bytes
 511  0
           long oocBytes = this.oocBytesInjected.get();
 512  
 
 513  0
           usage = (long) (edgesLoaded * coefficient[0] +
 514  
             verticesLoaded * coefficient[1] +
 515  
             verticesComputed * coefficient[2] +
 516  
             receivedBytes * coefficient[3] +
 517  
             oocBytes * coefficient[4] +
 518  
             coefficient[5]);
 519  
         }
 520  
       } finally {
 521  0
         lock.unlock();
 522  0
       }
 523  0
       return usage;
 524  
     }
 525  
 
 526  
     /**
 527  
      * Updates the linear regression model with a new data point.
 528  
      *
 529  
      * @param memUsed Current real value of memory usage.
 530  
      * @param edges Number of edges loaded.
 531  
      * @param vertices Number of vertices loaded.
 532  
      * @param verticesProcessed Number of vertices processed.
 533  
      * @param bytesReceived Number of bytes received.
 534  
      * @param oocBytesInjected Number of bytes stored/loaded due to OOC.
 535  
      */
 536  
     public void addRecord(long memUsed, long edges, long vertices,
 537  
                           long verticesProcessed,
 538  
                           long bytesReceived, long oocBytesInjected) {
 539  0
       checkState(memUsed > 0, "Memory Usage cannot be negative");
 540  0
       if (dataSamples.size() > 0) {
 541  0
         double[] last = dataSamples.get(dataSamples.size() - 1);
 542  0
         if (edges == last[0] && vertices == last[1] &&
 543  
           verticesProcessed == last[2] && bytesReceived == last[3] &&
 544  
           oocBytesInjected == last[4]) {
 545  0
           if (LOG.isDebugEnabled()) {
 546  0
             LOG.debug(
 547  
               "addRecord: avoiding to add the same entry as the last one!");
 548  
           }
 549  0
           return;
 550  
         }
 551  
       }
 552  0
       dataSamples.add(new double[] {edges, vertices, verticesProcessed,
 553  
         bytesReceived, oocBytesInjected});
 554  0
       memorySamples.add((double) memUsed);
 555  
 
 556  
       // Weed out the columns that are all zero
 557  0
       validColumnIndices.clear();
 558  0
       for (int i = 0; i < 5; ++i) {
 559  0
         boolean validIndex = false;
 560  
         // Check if there is a non-zero entry in the column
 561  0
         for (double[] value : dataSamples) {
 562  0
           if (value[i] != 0) {
 563  0
             validIndex = true;
 564  0
             break;
 565  
           }
 566  0
         }
 567  0
         if (validIndex) {
 568  
           // check if all entries are not equal to each other
 569  0
           double firstValue = -1;
 570  0
           boolean allEqual = true;
 571  0
           for (double[] value : dataSamples) {
 572  0
             if (firstValue == -1) {
 573  0
               firstValue = value[i];
 574  
             } else {
 575  0
               if (Math.abs((value[i] - firstValue) / firstValue) > 0.01) {
 576  0
                 allEqual = false;
 577  0
                 break;
 578  
               }
 579  
             }
 580  0
           }
 581  0
           validIndex = !allEqual;
 582  0
           if (validIndex) {
 583  
             // Check if the column has linear dependency with another column
 584  0
             for (int col = i + 1; col < 5; ++col) {
 585  0
               if (isLinearDependence(dataSamples, i, col)) {
 586  0
                 validIndex = false;
 587  0
                 break;
 588  
               }
 589  
             }
 590  
           }
 591  
         }
 592  
 
 593  0
         if (validIndex) {
 594  0
           validColumnIndices.add(i);
 595  
         }
 596  
       }
 597  
 
 598  
       // If we filtered out columns in the previous step, we are going to run
 599  
       // the regression without those columns.
 600  
 
 601  
       // Create the coefficient table
 602  0
       boolean setIsValid = false;
 603  0
       lock.lock();
 604  
       try {
 605  0
         if (validColumnIndices.size() >= 1 &&
 606  0
           dataSamples.size() >= validColumnIndices.size() + 1) {
 607  
 
 608  0
           double[][] xValues = new double[dataSamples.size()][];
 609  0
           fillXMatrix(dataSamples, validColumnIndices, xValues);
 610  0
           double[] yValues =
 611  0
               memorySamples.toDoubleArray(new double[memorySamples.size()]);
 612  0
           mlr.newSampleData(yValues, xValues);
 613  0
           boolean isRegressionValid =
 614  0
             calculateRegression(coefficient, validColumnIndices, mlr);
 615  
 
 616  0
           if (!isRegressionValid) { // invalid regression result
 617  0
             return; // The finally-block at the end will release any locks.
 618  
           }
 619  
 
 620  
           // After the computation of the regression, some coefficients may have
 621  
           // values outside the valid value range. In this case, we set the
 622  
           // coefficient to the minimum or maximum value allowed, and re-run the
 623  
           // regression.
 624  
           // We only care about coefficients of two of the variables:
 625  
           // bytes received due to messages (receivedBytes -- index 3 in
 626  
           // `coefficient` array) and bytes transferred due to OOC (oocBytes --
 627  
           // index 4 in `coefficient` array).
 628  
           //
 629  
           // receivedByte's coefficient cannot be negative, meaning that it does
 630  
           // not make sense that memory footprint decreases because of receipt
 631  
           // of messages. We either have message combiner or we do not have
 632  
           // combiner. If message combiner is used, the memory footprint
 633  
           // will not change much due to messages leading to the coefficient for
 634  
           // receivedBytes to be close to 0. If message combiner is not used, the
 635  
           // memory only increase with messages, and the coefficient should be
 636  
           // positive. In this case, a message is usually deserialized and then
 637  
           // written to the message store. We assume that the process of
 638  
           // deserializing the message and putting it into the message store
 639  
           // will not result in more than twice the size of the serialized form
 640  
           // of the message (meaning that the memory footprint for message store
 641  
           // will not be more than 2*receivedBytes). Based on this assumption
 642  
           // the upper bound for coefficient of receivedBytes should be 2.
 643  
           //
 644  
           // "oocBytes" represents the size of the serialized form of data that
 645  
           // has transferred to/from secondary storage. On the other hand, in
 646  
           // our estimation mechanism, we are estimating the aggregate size of
 647  
           // all live objects in memory, meaning that we are estimating the size
 648  
           // of deserialized form of data in memory. Since we are not using any
 649  
           // (de)compression for (de)serialization of data, we assume that
 650  
           // size of serialized data <= size of deserialized data <=
 651  
           // 2 * (size of serialized data)
 652  
           // This basically means that the coefficient for oocBytes should be
 653  
           // between 1 and 2.
 654  
 
 655  
           boolean changed;
 656  0
           extreme[3] = -1;
 657  0
           extreme[4] = -1;
 658  
           do {
 659  0
             Boolean result = null;
 660  
 
 661  0
             result = refineCoefficient(4, 1, 2, xValues, yValues);
 662  0
             if (result == null) { // invalid regression result
 663  0
               return;  // finally-block will release lock
 664  
             }
 665  0
             changed = result;
 666  
 
 667  0
             result = refineCoefficient(3, 0, 2, xValues, yValues);
 668  0
             if (result == null) { // invalid regression result
 669  0
               return;  // finally-block will release lock
 670  
             }
 671  0
             changed |= result;
 672  0
           } while (changed);
 673  0
           if (extreme[3] != -1) {
 674  0
             coefficient[3] = extreme[3];
 675  
           }
 676  0
           if (extreme[4] != -1) {
 677  0
             coefficient[4] = extreme[4];
 678  
           }
 679  0
           setIsValid = true;
 680  0
           return; // the finally-block will execute before return
 681  
         }
 682  
       } finally {
 683  
         // This inner try-finally block is necessary to ensure that the
 684  
         // lock is always released.
 685  0
         try {
 686  0
           isValid = setIsValid;
 687  0
           printStats();
 688  
         } finally {
 689  0
           lock.unlock();
 690  0
         }
 691  0
       }
 692  0
     }
 693  
 
 694  
     /**
 695  
      * Certain coefficients need to be within a specific range.
 696  
      * If the coefficient is not in this range, we set it to the closest bound
 697  
      * and re-run the linear regression. In this case, we keep the possible
 698  
      * extremum coefficient in an intermediate array ("extreme"). Also, if
 699  
      * we choose the extremum coefficient for an index, that index is removed
 700  
      * from the regression calculation as well as the rest of the refinement
 701  
      * process.
 702  
      *
 703  
      * Note that the regression calculation here is based on the method of
 704  
      * least square for minimizing the error. The sum of squares of errors for
 705  
      * all points is a convex function. This means if we solve the
 706  
      * non-constrained linear regression and then refine the coefficient to
 707  
      * apply their bounds, we will achieve a solution to our constrained
 708  
      * linear regression problem.
 709  
      *
 710  
      * This method is called in a loop to refine certain coefficients. The loop
 711  
      * should continue until all coefficients are refined and are within their
 712  
      * range.
 713  
      *
 714  
      * @param coefIndex Coefficient index
 715  
      * @param lowerBound Lower bound
 716  
      * @param upperBound Upper bound
 717  
      * @param xValues double[][] matrix with data samples
 718  
      * @param yValues double[] matrix with y samples
 719  
      * @return True if coefficients were out-of-range, false otherwise. A null
 720  
      *         value means the regression result was invalid and the result of
 721  
      *         this method is invalid too.
 722  
      */
 723  
     @Nullable
 724  
     private Boolean refineCoefficient(int coefIndex, double lowerBound,
 725  
       double upperBound, double[][] xValues, double[] yValues) {
 726  
 
 727  0
       boolean result = false;
 728  0
       if (coefficient[coefIndex] < lowerBound ||
 729  
         coefficient[coefIndex] > upperBound) {
 730  
 
 731  
         double value;
 732  0
         if (coefficient[coefIndex] < lowerBound) {
 733  0
           value = lowerBound;
 734  
         } else {
 735  0
           value = upperBound;
 736  
         }
 737  0
         int ptr = -1;
 738  
         // Finding the 'coefIndex' in the valid indices. Since this method is
 739  
         // used only for the variables with higher indices, we use a reverse
 740  
         // loop to lookup the 'coefIndex' for faster termination of the loop.
 741  0
         for (int i = validColumnIndices.size() - 1; i >= 0; --i) {
 742  0
           if (validColumnIndices.get(i) == coefIndex) {
 743  0
             ptr = i;
 744  0
             break;
 745  
           }
 746  
         }
 747  0
         if (ptr != -1) {
 748  0
           if (LOG.isDebugEnabled()) {
 749  0
             LOG.debug("addRecord: coefficient at index " + coefIndex +
 750  
               " is wrong in the regression, setting it to " + value);
 751  
           }
 752  
           // remove from valid column
 753  0
           validColumnIndices.remove(ptr);
 754  
           // re-create the X matrix
 755  0
           fillXMatrix(dataSamples, validColumnIndices, xValues);
 756  
           // adjust Y values
 757  0
           for (int i = 0; i < memorySamples.size(); ++i) {
 758  0
             yValues[i] -= value * dataSamples.get(i)[coefIndex];
 759  
           }
 760  
           // save new coefficient value in intermediate array
 761  0
           extreme[coefIndex] = value;
 762  
           // re-run regression
 763  0
           mlr.newSampleData(yValues, xValues);
 764  0
           result = calculateRegression(coefficient, validColumnIndices, mlr);
 765  0
           if (!result) { // invalid regression result
 766  0
             return null;
 767  
           }
 768  
         } else {
 769  0
           if (LOG.isDebugEnabled()) {
 770  0
             LOG.debug(
 771  
               "addRecord: coefficient was not in the regression, " +
 772  
                 "setting it to the extreme of the bound");
 773  
           }
 774  0
           result = false;
 775  
         }
 776  0
         coefficient[coefIndex] = value;
 777  
       }
 778  0
       return result;
 779  
     }
 780  
 
 781  
     /**
 782  
      * Calculates the regression.
 783  
      * @param coefficient Array of coefficients
 784  
      * @param validColumnIndices List of valid columns
 785  
      * @param mlr {@link OLSMultipleLinearRegression} instance.
 786  
      * @return True if the result is valid, false otherwise.
 787  
      */
 788  
     private static boolean calculateRegression(double[] coefficient,
 789  
       List<Integer> validColumnIndices, OLSMultipleLinearRegression mlr) {
 790  
 
 791  0
       if (coefficient.length != validColumnIndices.size()) {
 792  0
         LOG.info("There are " + coefficient.length +
 793  0
           " coefficients, and " + validColumnIndices.size() +
 794  
           " valid columns in the regression");
 795  
       }
 796  
 
 797  0
       double[] beta = mlr.estimateRegressionParameters();
 798  0
       Arrays.fill(coefficient, 0);
 799  0
       for (int i = 0; i < validColumnIndices.size(); ++i) {
 800  0
         coefficient[validColumnIndices.get(i)] = beta[i];
 801  
       }
 802  0
       coefficient[5] = beta[validColumnIndices.size()];
 803  0
       return true;
 804  
     }
 805  
 
 806  
     /**
 807  
      * Copies values from a List of double[] to a double[][]. Takes into
 808  
      * consideration the list of valid column indices.
 809  
      * @param sourceValues Source List of double[]
 810  
      * @param validColumnIndices Valid column indices
 811  
      * @param xValues Target double[][] matrix.
 812  
      */
 813  
     private static void fillXMatrix(List<double[]> sourceValues,
 814  
                                     List<Integer> validColumnIndices,
 815  
                                     double[][] xValues) {
 816  
 
 817  0
       for (int i = 0; i < sourceValues.size(); ++i) {
 818  0
         xValues[i] = new double[validColumnIndices.size() + 1];
 819  0
         for (int j = 0; j < validColumnIndices.size(); ++j) {
 820  0
           xValues[i][j] = sourceValues.get(i)[validColumnIndices.get(j)];
 821  
         }
 822  0
         xValues[i][validColumnIndices.size()] = 1;
 823  
       }
 824  0
     }
 825  
 
 826  
     /**
 827  
      * Utility function that checks whether two doubles are equals given an
 828  
      * accuracy tolerance.
 829  
      *
 830  
      * @param val1 First value
 831  
      * @param val2 Second value
 832  
      * @return True if within a threshold
 833  
      */
 834  
     private static boolean equal(double val1, double val2) {
 835  0
       return Math.abs(val1 - val2) < 0.01;
 836  
     }
 837  
 
 838  
     /**
 839  
      * Utility function that checks if two columns have linear dependence.
 840  
      *
 841  
      * @param values Matrix in the form of a List of double[] values.
 842  
      * @param col1 First column index
 843  
      * @param col2 Second column index
 844  
      * @return True if there is linear dependence.
 845  
      */
 846  
     private static boolean isLinearDependence(List<double[]> values,
 847  
                                               int col1, int col2) {
 848  0
       boolean firstValSeen = false;
 849  0
       double firstVal = 0;
 850  0
       for (double[] value : values) {
 851  0
         double val1 = value[col1];
 852  0
         double val2 = value[col2];
 853  0
         if (equal(val1, 0)) {
 854  0
           if (equal(val2, 0)) {
 855  0
             continue;
 856  
           } else {
 857  0
             return false;
 858  
           }
 859  
         }
 860  0
         if (equal(val2, 0)) {
 861  0
           return false;
 862  
         }
 863  0
         if (!firstValSeen) {
 864  0
           firstVal = val1 / val2;
 865  0
           firstValSeen = true;
 866  
         } else {
 867  0
           if (!equal((val1 / val2 - firstVal) / firstVal, 0)) {
 868  0
             return false;
 869  
           }
 870  
         }
 871  0
       }
 872  0
       return true;
 873  
     }
 874  
 
 875  
     /**
 876  
      * Prints statistics about the regression model.
 877  
      */
 878  
     private void printStats() {
 879  0
       if (LOG.isDebugEnabled()) {
 880  0
         StringBuilder sb = new StringBuilder();
 881  0
         sb.append(
 882  
           "\nEDGES\t\tVERTICES\t\tV_PROC\t\tRECEIVED\t\tOOC\t\tMEM_USED\n");
 883  0
         for (int i = 0; i < dataSamples.size(); ++i) {
 884  0
           for (int j = 0; j < dataSamples.get(i).length; ++j) {
 885  0
             sb.append(String.format("%.2f\t\t", dataSamples.get(i)[j]));
 886  
           }
 887  0
           sb.append(memorySamples.get(i));
 888  0
           sb.append("\n");
 889  
         }
 890  0
         sb.append("COEFFICIENT:\n");
 891  0
         for (int i = 0; i < coefficient.length; ++i) {
 892  0
           sb.append(String.format("%.2f\t\t", coefficient[i]));
 893  
         }
 894  0
         sb.append("\n");
 895  0
         LOG.debug("printStats: isValid=" + isValid + sb.toString());
 896  
       }
 897  0
     }
 898  
   }
 899  
 }