Coverage Report - org.apache.giraph.ooc.OutOfCoreEngine
 
Classes in this File Line Coverage Branch Coverage Complexity
OutOfCoreEngine
0%
0/149
0%
0/44
0
OutOfCoreEngine$1
0%
0/2
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.ooc;
 20  
 
 21  
 import com.sun.management.GarbageCollectionNotificationInfo;
 22  
 import com.yammer.metrics.core.Gauge;
 23  
 import org.apache.giraph.bsp.BspService;
 24  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 25  
 import org.apache.giraph.comm.NetworkMetrics;
 26  
 import org.apache.giraph.comm.ServerData;
 27  
 import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
 28  
 import org.apache.giraph.comm.flow_control.FlowControl;
 29  
 import org.apache.giraph.conf.GiraphConstants;
 30  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 31  
 import org.apache.giraph.metrics.GiraphMetrics;
 32  
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 33  
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 34  
 import org.apache.giraph.ooc.data.MetaPartitionManager;
 35  
 import org.apache.giraph.ooc.command.IOCommand;
 36  
 import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
 37  
 import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
 38  
 import org.apache.giraph.ooc.policy.FixedPartitionsOracle;
 39  
 import org.apache.giraph.ooc.policy.OutOfCoreOracle;
 40  
 import org.apache.giraph.utils.AdjustableSemaphore;
 41  
 import org.apache.giraph.worker.BspServiceWorker;
 42  
 import org.apache.log4j.Logger;
 43  
 
 44  
 import java.lang.reflect.Constructor;
 45  
 import java.lang.reflect.InvocationTargetException;
 46  
 import java.util.concurrent.locks.ReadWriteLock;
 47  
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 48  
 
 49  
 import static com.google.common.base.Preconditions.checkState;
 50  
 
 51  
 /**
 52  
  * Class to represent an out-of-core engine.
 53  
  */
 54  0
 public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
 55  
   /**
 56  
    * Number of 'units of processing' after which an active thread should
 57  
    * check-in with the out-of-core engine in order to re-claim its permission to
 58  
    * stay active. For a compute thread, the 'unit of processing' is processing
 59  
    * of one vertex, and for an input thread, the 'unit of processing' is reading
 60  
    * a row of input data.
 61  
    */
 62  
   public static final int CHECK_IN_INTERVAL = (1 << 10) - 1;
 63  
   /** Name of metric for percentage of graph in memory */
 64  
   public static final String GRAPH_PERCENTAGE_IN_MEMORY = "ooc-graph-in-mem-%";
 65  
   /** Class logger. */
 66  0
   private static final Logger LOG = Logger.getLogger(OutOfCoreEngine.class);
 67  
   /**
 68  
    * When getting partitions, how many milliseconds to wait if no partition was
 69  
    * available in memory
 70  
    */
 71  
   private static final long MSEC_TO_WAIT = 10000;
 72  
   /** Service worker */
 73  
   private final CentralizedServiceWorker<?, ?, ?> service;
 74  
   /** Flow control used in sending requests */
 75  
   private FlowControl flowControl;
 76  
   /** Scheduler for IO threads */
 77  
   private final OutOfCoreIOScheduler ioScheduler;
 78  
   /** Data structure to keep meta partition information */
 79  
   private final MetaPartitionManager metaPartitionManager;
 80  
   /** Out-of-core oracle (brain of out-of-core mechanism) */
 81  
   private final OutOfCoreOracle oracle;
 82  
   /** IO statistics collector */
 83  
   private final OutOfCoreIOStatistics statistics;
 84  
   /**
 85  
    * Global lock for entire superstep. This lock helps to avoid overlapping of
 86  
    * out-of-core decisions (what to do next to help the out-of-core mechanism)
 87  
    * with out-of-core operations (actual IO operations).
 88  
    */
 89  0
   private final ReadWriteLock superstepLock = new ReentrantReadWriteLock();
 90  
   /** Data accessor object (DAO) used as persistence layer in out-of-core */
 91  
   private final OutOfCoreDataAccessor dataAccessor;
 92  
   /** Callable factory for IO threads */
 93  
   private final OutOfCoreIOCallableFactory oocIOCallableFactory;
 94  
   /**
 95  
    * Dummy object to wait on until a partition becomes available in memory
 96  
    * for processing
 97  
    */
 98  0
   private final Object partitionAvailable = new Object();
 99  
   /** How many compute threads do we have? */
 100  
   private int numComputeThreads;
 101  
   /** How many threads (input/compute) are processing data? */
 102  
   private volatile int numProcessingThreads;
 103  
   /** Semaphore used for controlling number of active threads at each moment */
 104  
   private final AdjustableSemaphore activeThreadsPermit;
 105  
   /**
 106  
    * Cached value for NettyClient.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER (max
 107  
    * credit used for credit-based flow-control mechanism)
 108  
    */
 109  
   private final short maxRequestsCredit;
 110  
   /**
 111  
    * Generally, the logic in Giraph for change of the superstep happens in the
 112  
    * following order:
 113  
    *   (1) Compute threads are done processing all partitions
 114  
    *   (2) Superstep number increases
 115  
    *   (3) New message store is created and message stores are prepared
 116  
    *   (4) Iteration over partitions starts
 117  
    * Note that there are other operations happening at the same time as well as
 118  
    * the above operations, but the above operations are the ones which may
 119  
    * interfere with out-of-core operations. The goal of `superstepLock` is to
 120  
    * isolate operations 2, 3, and 4 from the rest of computations and IO
 121  
    * operations. Specifically, increasing the superstep counter (operation 2)
 122  
    * should be exclusive and no IO operation should happen at the same time.
 123  
    * This is due to the fact that prefetching mechanism uses superstep counter
 124  
    * as a means to identify which data should be read. That being said, superstep
 125  
    * counter should be cached in out-of-core engine, and all IO operations and
 126  
    * out-of-core logic should access superstep counter through this cached
 127  
    * value.
 128  
    */
 129  
   private long superstep;
 130  
   /**
 131  
    * Generally, the logic of a graph computation happens in the following order
 132  
    * with respect to `startIteration` and `reset` method:
 133  
    * ...
 134  
    * startIteration (for moving edges)
 135  
    * ...
 136  
    * reset (to prepare messages/partitions for superstep 0)
 137  
    * ...
 138  
    * startIteration (superstep 0)
 139  
    * ...
 140  
    * reset (to prepare messages/partitions for superstep 1)
 141  
    * ...
 142  
    *
 143  
    * However, in the unit tests, we usually consider only one superstep (usually
 144  
    * INPUT_SUPERSTEP), and we move through partitions multiple times. Out-of-
 145  
    * core mechanism works only if partitions are reset in a proper way. So,
 146  
    * we keep the following flag to reset partitions if necessary.
 147  
    */
 148  
   private boolean resetDone;
 149  
 
 150  
   /**
 151  
    * Provides statistics about network traffic (e.g. received bytes per
 152  
    * superstep etc).
 153  
    */
 154  
   private final NetworkMetrics networkMetrics;
 155  
 
 156  
   /**
 157  
    * Constructor
 158  
    *
 159  
    * @param conf Configuration
 160  
    * @param service Service worker
 161  
    * @param networkMetrics Interface for network stats
 162  
    */
 163  
   public OutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
 164  
                          CentralizedServiceWorker<?, ?, ?> service,
 165  0
                          NetworkMetrics networkMetrics) {
 166  0
     this.service = service;
 167  0
     this.networkMetrics = networkMetrics;
 168  0
     Class<? extends OutOfCoreDataAccessor> accessorClass =
 169  0
         GiraphConstants.OUT_OF_CORE_DATA_ACCESSOR.get(conf);
 170  
     try {
 171  0
       Constructor<?> constructor = accessorClass.getConstructor(
 172  
           ImmutableClassesGiraphConfiguration.class);
 173  0
       this.dataAccessor = (OutOfCoreDataAccessor) constructor.newInstance(conf);
 174  0
     } catch (NoSuchMethodException | InstantiationException |
 175  
         InvocationTargetException | IllegalAccessException e) {
 176  0
       throw new IllegalStateException("OutOfCoreEngine: caught exception " +
 177  
           "while creating the data accessor instance!", e);
 178  0
     }
 179  0
     int numIOThreads = dataAccessor.getNumAccessorThreads();
 180  0
     this.oocIOCallableFactory =
 181  
         new OutOfCoreIOCallableFactory(this, numIOThreads,
 182  0
             service.getGraphTaskManager().createUncaughtExceptionHandler());
 183  0
     this.ioScheduler = new OutOfCoreIOScheduler(conf, this, numIOThreads);
 184  0
     this.metaPartitionManager = new MetaPartitionManager(numIOThreads, this);
 185  0
     this.statistics = new OutOfCoreIOStatistics(conf, numIOThreads);
 186  0
     int maxPartitionsInMemory =
 187  0
         GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
 188  0
     Class<? extends OutOfCoreOracle> oracleClass =
 189  0
         GiraphConstants.OUT_OF_CORE_ORACLE.get(conf);
 190  0
     if (maxPartitionsInMemory != 0 &&
 191  
         oracleClass != FixedPartitionsOracle.class) {
 192  0
       LOG.warn("OutOfCoreEngine: Max number of partitions in memory is set " +
 193  
           "but the out-of-core oracle used is not tailored for fixed " +
 194  
           "out-of-core policy. Setting the oracle to be FixedPartitionsOracle");
 195  0
       oracleClass = FixedPartitionsOracle.class;
 196  
     }
 197  0
     this.numComputeThreads = conf.getNumComputeThreads();
 198  
     // At the beginning of the execution, only input threads are processing data
 199  0
     this.numProcessingThreads = conf.getNumInputSplitsThreads();
 200  0
     this.activeThreadsPermit = new AdjustableSemaphore(numProcessingThreads);
 201  0
     this.maxRequestsCredit = (short)
 202  0
         CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
 203  0
     this.superstep = BspService.INPUT_SUPERSTEP;
 204  0
     this.resetDone = false;
 205  0
     GiraphMetrics.get().addSuperstepResetObserver(this);
 206  
     try {
 207  0
       Constructor<?> constructor = oracleClass.getConstructor(
 208  
         ImmutableClassesGiraphConfiguration.class, OutOfCoreEngine.class);
 209  0
       this.oracle = (OutOfCoreOracle) constructor.newInstance(conf, this);
 210  0
     } catch (NoSuchMethodException | IllegalAccessException |
 211  
       InstantiationException | InvocationTargetException e) {
 212  0
       throw new IllegalStateException("OutOfCoreEngine: caught exception " +
 213  
         "while creating the oracle!", e);
 214  0
     }
 215  0
   }
 216  
 
 217  
   /**
 218  
    * Initialize/Start the out-of-core engine.
 219  
    */
 220  
   public void initialize() {
 221  0
     dataAccessor.initialize();
 222  0
     oocIOCallableFactory.createCallable();
 223  0
   }
 224  
 
 225  
   /**
 226  
    * Shutdown/Stop the out-of-core engine.
 227  
    */
 228  
   public void shutdown() {
 229  0
     if (LOG.isInfoEnabled()) {
 230  0
       LOG.info("shutdown: out-of-core engine shutting down, signalling IO " +
 231  
           "threads to shutdown");
 232  
     }
 233  0
     ioScheduler.shutdown();
 234  0
     oocIOCallableFactory.shutdown();
 235  0
     dataAccessor.shutdown();
 236  0
   }
 237  
 
 238  
   /**
 239  
    * Get a reference to the server data
 240  
    *
 241  
    * @return ServerData
 242  
    */
 243  
   public ServerData getServerData() {
 244  0
     return service.getServerData();
 245  
   }
 246  
 
 247  
   /**
 248  
    * Get a reference to the service worker
 249  
    *
 250  
    * @return CentralizedServiceWorker
 251  
    */
 252  
   public CentralizedServiceWorker getServiceWorker() {
 253  0
     return service;
 254  
   }
 255  
 
 256  
   /**
 257  
    * Get a reference to IO scheduler
 258  
    *
 259  
    * @return OutOfCoreIOScheduler
 260  
    */
 261  
   public OutOfCoreIOScheduler getIOScheduler() {
 262  0
     return ioScheduler;
 263  
   }
 264  
 
 265  
   /**
 266  
    * Get a reference to meta partition information
 267  
    *
 268  
    * @return MetaPartitionManager
 269  
    */
 270  
   public MetaPartitionManager getMetaPartitionManager() {
 271  0
     return metaPartitionManager;
 272  
   }
 273  
 
 274  
   /**
 275  
    * Get a reference to superstep lock
 276  
    *
 277  
    * @return read/write lock used for global superstep lock
 278  
    */
 279  
   public ReadWriteLock getSuperstepLock() {
 280  0
     return superstepLock;
 281  
   }
 282  
 
 283  
   /**
 284  
    * Get a reference to IO statistics collector
 285  
    *
 286  
    * @return IO statistics collector
 287  
    */
 288  
   public OutOfCoreIOStatistics getIOStatistics() {
 289  0
     return statistics;
 290  
   }
 291  
 
 292  
   /**
 293  
    * Get a reference to out-of-core oracle
 294  
    *
 295  
    * @return out-of-core oracle
 296  
    */
 297  
   public OutOfCoreOracle getOracle() {
 298  0
     return oracle;
 299  
   }
 300  
 
 301  
   /**
 302  
    * Get the id of the next partition to process in the current iteration over
 303  
    * all the partitions. If all partitions are already processed, this method
 304  
    * returns null.
 305  
    *
 306  
    * @return id of a partition to process. 'null' if all partitions are
 307  
    *         processed in current iteration over partitions.
 308  
    */
 309  
   public Integer getNextPartition() {
 310  
     Integer partitionId;
 311  0
     synchronized (partitionAvailable) {
 312  0
       while ((partitionId = metaPartitionManager.getNextPartition()) == null) {
 313  
         try {
 314  0
           if (LOG.isInfoEnabled()) {
 315  0
             LOG.info("getNextPartition: waiting until a partition becomes " +
 316  
                 "available!");
 317  
           }
 318  0
           partitionAvailable.wait(MSEC_TO_WAIT);
 319  0
         } catch (InterruptedException e) {
 320  0
           throw new IllegalStateException("getNextPartition: caught " +
 321  
               "InterruptedException while waiting to retrieve a partition to " +
 322  
               "process");
 323  0
         }
 324  
       }
 325  0
       if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) {
 326  0
         partitionAvailable.notifyAll();
 327  0
         partitionId = null;
 328  
       }
 329  0
     }
 330  0
     return partitionId;
 331  
   }
 332  
 
 333  
   /**
 334  
    * Notify out-of-core engine that processing of a particular partition is done
 335  
    *
 336  
    * @param partitionId id of the partition that its processing is done
 337  
    */
 338  
   public void doneProcessingPartition(int partitionId) {
 339  0
     metaPartitionManager.setPartitionIsProcessed(partitionId);
 340  0
     if (LOG.isInfoEnabled()) {
 341  0
       LOG.info("doneProcessingPartition: processing partition " + partitionId +
 342  
           " is done!");
 343  
     }
 344  0
   }
 345  
 
 346  
   /**
 347  
    * Notify out-of-core engine that iteration cycle over all partitions is about
 348  
    * to begin.
 349  
    */
 350  
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
 351  
       "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
 352  
   public void startIteration() {
 353  0
     oracle.startIteration();
 354  0
     if (!resetDone) {
 355  0
       superstepLock.writeLock().lock();
 356  0
       metaPartitionManager.resetPartitions();
 357  0
       superstepLock.writeLock().unlock();
 358  
     }
 359  0
     if (superstep != BspServiceWorker.INPUT_SUPERSTEP &&
 360  
         numProcessingThreads != numComputeThreads) {
 361  
       // This method is only executed by the main thread, and at this point
 362  
       // no other input/compute thread is alive. So, all the permits in
 363  
       // `activeThreadsPermit` is available. However, now that we are changing
 364  
       // the maximum number of active threads, we need to adjust the number
 365  
       // of available permits on `activeThreadsPermit`.
 366  0
       activeThreadsPermit.setMaxPermits(activeThreadsPermit.availablePermits() *
 367  
           numComputeThreads / numProcessingThreads);
 368  0
       numProcessingThreads = numComputeThreads;
 369  
     }
 370  0
     if (LOG.isInfoEnabled()) {
 371  0
       LOG.info("startIteration: with " +
 372  0
           metaPartitionManager.getNumInMemoryPartitions() +
 373  
           " partitions in memory and " +
 374  0
           activeThreadsPermit.availablePermits() + " active threads");
 375  
     }
 376  0
     resetDone = false;
 377  0
   }
 378  
 
 379  
   /**
 380  
    * Retrieve a particular partition. After this method is complete the
 381  
    * requested partition should be in memory.
 382  
    *
 383  
    * @param partitionId id of the partition to retrieve
 384  
    */
 385  
   public void retrievePartition(int partitionId) {
 386  0
     if (metaPartitionManager.isPartitionOnDisk(partitionId)) {
 387  0
       ioScheduler.addIOCommand(new LoadPartitionIOCommand(this, partitionId,
 388  
           superstep));
 389  0
       synchronized (partitionAvailable) {
 390  0
         while (metaPartitionManager.isPartitionOnDisk(partitionId)) {
 391  
           try {
 392  0
             if (LOG.isInfoEnabled()) {
 393  0
               LOG.info("retrievePartition: waiting until partition " +
 394  
                   partitionId + " becomes available");
 395  
             }
 396  0
             partitionAvailable.wait();
 397  0
           } catch (InterruptedException e) {
 398  0
             throw new IllegalStateException("retrievePartition: caught " +
 399  
                 "InterruptedException while waiting to retrieve partition " +
 400  
                 partitionId);
 401  0
           }
 402  
         }
 403  0
       }
 404  
     }
 405  0
   }
 406  
 
 407  
   /**
 408  
    * Notify out-of-core engine that an IO command is completed by an IO thread
 409  
    *
 410  
    * @param command the IO command that is completed
 411  
    */
 412  
   public void ioCommandCompleted(IOCommand command) {
 413  0
     oracle.commandCompleted(command);
 414  0
     if (command instanceof LoadPartitionIOCommand) {
 415  
       // Notifying compute threads who are waiting for a partition to become
 416  
       // available in memory to process.
 417  0
       synchronized (partitionAvailable) {
 418  0
         partitionAvailable.notifyAll();
 419  0
       }
 420  
     }
 421  0
   }
 422  
 
 423  
   /**
 424  
    * Update the fraction of processing threads that should remain active. It is
 425  
    * the responsibility of out-of-core oracle to update the number of active
 426  
    * threads.
 427  
    *
 428  
    * @param fraction the fraction of processing threads to remain active. This
 429  
    *                 number is in range [0, 1]
 430  
    */
 431  
   public void updateActiveThreadsFraction(double fraction) {
 432  0
     checkState(fraction >= 0 && fraction <= 1);
 433  0
     int numActiveThreads = (int) (numProcessingThreads * fraction);
 434  0
     if (LOG.isInfoEnabled()) {
 435  0
       LOG.info("updateActiveThreadsFraction: updating the number of active " +
 436  
           "threads to " + numActiveThreads);
 437  
     }
 438  0
     activeThreadsPermit.setMaxPermits(numActiveThreads);
 439  0
   }
 440  
 
 441  
   /**
 442  
    * A processing thread would check in with out-of-core engine every once in a
 443  
    * while to make sure that it can still remain active. It is the
 444  
    * responsibility of the out-of-core oracle to update the number of active
 445  
    * threads in a way that the computation never fails, and yet achieve the
 446  
    * optimal performance it can achieve.
 447  
    */
 448  
   public void activeThreadCheckIn() {
 449  0
     activeThreadsPermit.release();
 450  
     try {
 451  0
       activeThreadsPermit.acquire();
 452  0
     } catch (InterruptedException e) {
 453  0
       LOG.error("activeThreadCheckIn: exception while acquiring a permit to " +
 454  
           "remain an active thread");
 455  0
       throw new IllegalStateException(e);
 456  0
     }
 457  0
   }
 458  
 
 459  
   /**
 460  
    * Notify the out-of-core engine that a processing (input/compute) thread has
 461  
    * started.
 462  
    */
 463  
   public void processingThreadStart() {
 464  
     try {
 465  0
       activeThreadsPermit.acquire();
 466  0
     } catch (InterruptedException e) {
 467  0
       LOG.error("processingThreadStart: exception while acquiring a permit to" +
 468  
           " start the processing thread!");
 469  0
       throw new IllegalStateException(e);
 470  0
     }
 471  0
   }
 472  
 
 473  
   /**
 474  
    * Notify the out-of-core engine that a processing (input/compute) thread has
 475  
    * finished.
 476  
    */
 477  
   public void processingThreadFinish() {
 478  0
     activeThreadsPermit.release();
 479  0
   }
 480  
 
 481  
   /**
 482  
    * Update the credit announced for this worker in Netty. The lower the credit
 483  
    * is, the lower rate incoming messages arrive at this worker. Thus, credit
 484  
    * is an indirect way of controlling amount of memory incoming messages would
 485  
    * take.
 486  
    *
 487  
    * @param fraction the fraction of max credits others can use to send requests
 488  
    *                 to this worker
 489  
    */
 490  
   public void updateRequestsCreditFraction(double fraction) {
 491  0
     checkState(fraction >= 0 && fraction <= 1);
 492  0
     short newCredit = (short) (maxRequestsCredit * fraction);
 493  0
     if (LOG.isInfoEnabled()) {
 494  0
       LOG.info("updateRequestsCreditFraction: updating the credit to " +
 495  
           newCredit);
 496  
     }
 497  0
     if (flowControl != null) {
 498  0
       ((CreditBasedFlowControl) flowControl).updateCredit(newCredit);
 499  
     }
 500  0
   }
 501  
 
 502  
   /**
 503  
    * Reset partitions and messages meta data. Also, reset the cached value of
 504  
    * superstep counter.
 505  
    */
 506  
   public void reset() {
 507  0
     metaPartitionManager.resetPartitions();
 508  0
     metaPartitionManager.resetMessages();
 509  0
     superstep = service.getSuperstep();
 510  0
     resetDone = true;
 511  0
   }
 512  
 
 513  
   /**
 514  
    * @return cached value of the superstep counter
 515  
    */
 516  
   public long getSuperstep() {
 517  0
     return superstep;
 518  
   }
 519  
 
 520  
   /**
 521  
    * Notify the out-of-core engine that a GC has just been completed
 522  
    *
 523  
    * @param info GC information
 524  
    */
 525  
   public void gcCompleted(GarbageCollectionNotificationInfo info) {
 526  0
     oracle.gcCompleted(info);
 527  0
   }
 528  
 
 529  
   @Override
 530  
   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
 531  0
     superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new Gauge<Double>() {
 532  
       @Override
 533  
       public Double value() {
 534  0
         return metaPartitionManager.getGraphFractionInMemory() * 100;
 535  
       }
 536  
     });
 537  0
   }
 538  
 
 539  
   public FlowControl getFlowControl() {
 540  0
     return flowControl;
 541  
   }
 542  
 
 543  
   public void setFlowControl(FlowControl flowControl) {
 544  0
     this.flowControl = flowControl;
 545  0
   }
 546  
 
 547  
   public OutOfCoreDataAccessor getDataAccessor() {
 548  0
     return dataAccessor;
 549  
   }
 550  
 
 551  
   public NetworkMetrics getNetworkMetrics() {
 552  0
     return networkMetrics;
 553  
   }
 554  
 }