Coverage Report - org.apache.giraph.ooc.policy.FixedPartitionsOracle
 
Classes in this File Line Coverage Branch Coverage Complexity
FixedPartitionsOracle
0%
0/44
0%
0/24
3.333
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.ooc.policy;
 20  
 
 21  
 import com.sun.management.GarbageCollectionNotificationInfo;
 22  
 import org.apache.giraph.conf.GiraphConstants;
 23  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 24  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 25  
 import org.apache.giraph.ooc.command.IOCommand;
 26  
 import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
 27  
 import org.apache.giraph.ooc.command.StorePartitionIOCommand;
 28  
 import org.apache.log4j.Logger;
 29  
 
 30  
 import java.util.concurrent.atomic.AtomicInteger;
 31  
 
 32  
 import static com.google.common.base.Preconditions.checkState;
 33  
 
 34  
 /** Oracle for fixed out-of-core mechanism */
 35  
 public class FixedPartitionsOracle implements OutOfCoreOracle {
 36  
   /** Class logger */
 37  0
   private static final Logger LOG =
 38  0
       Logger.getLogger(FixedPartitionsOracle.class);
 39  
   /** Maximum number of partitions to be kept in memory */
 40  
   private final int maxPartitionsInMemory;
 41  
   /**
 42  
    * Number of partitions to be added (loaded) or removed (stored) to/from
 43  
    * memory. Each outstanding load partition counts +1 and each outstanding
 44  
    * store partition counts -1 toward this counter.
 45  
    */
 46  0
   private final AtomicInteger deltaNumPartitionsInMemory =
 47  
       new AtomicInteger(0);
 48  
   /** Out-of-core engine */
 49  
   private final OutOfCoreEngine oocEngine;
 50  
 
 51  
   /**
 52  
    * Constructor
 53  
    *
 54  
    * @param conf configuration
 55  
    * @param oocEngine out-of-core engine
 56  
    */
 57  
   public FixedPartitionsOracle(ImmutableClassesGiraphConfiguration conf,
 58  0
                                OutOfCoreEngine oocEngine) {
 59  0
     this.maxPartitionsInMemory =
 60  0
         GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
 61  0
     this.oocEngine = oocEngine;
 62  0
   }
 63  
 
 64  
   @Override
 65  
   public IOAction[] getNextIOActions() {
 66  0
     int numPartitionsInMemory =
 67  0
         oocEngine.getMetaPartitionManager().getNumInMemoryPartitions();
 68  0
     int numPartialPartitionsInMemory =
 69  0
         oocEngine.getMetaPartitionManager().getNumPartiallyInMemoryPartitions();
 70  0
     if (LOG.isDebugEnabled()) {
 71  0
       LOG.debug("getNextIOActions: calling with " + numPartitionsInMemory +
 72  
           " partitions entirely in memory and " + numPartialPartitionsInMemory +
 73  
           " partitions partially in memory, " +
 74  0
           deltaNumPartitionsInMemory.get() + " to be loaded");
 75  
     }
 76  0
     checkState(numPartitionsInMemory >= 0);
 77  0
     checkState(numPartialPartitionsInMemory >= 0);
 78  0
     int numPartitions =
 79  0
         numPartitionsInMemory + deltaNumPartitionsInMemory.get();
 80  
     // Fixed out-of-core policy:
 81  
     //   - if the number of partitions in memory is less than the max number of
 82  
     //     partitions in memory, we should load a partition to memory. This
 83  
     //     basically means we are prefetching partition's data either for the
 84  
     //     current superstep, or for the next superstep.
 85  
     //   - if the number of partitions in memory is equal to the the max number
 86  
     //     of partitions in memory, we do a 'soft store', meaning, we store
 87  
     //     processed partition to disk only if there is an unprocessed partition
 88  
     //     on disk. This basically makes room for unprocessed partitions on disk
 89  
     //     to be prefetched.
 90  
     //   - if the number of partitions in memory is more than the max number of
 91  
     //     partitions in memory, we do a 'hard store', meaning we store a
 92  
     //     partition to disk, regardless of its processing state.
 93  0
     if (numPartitions < maxPartitionsInMemory) {
 94  0
       return new IOAction[]{
 95  
         IOAction.LOAD_PARTITION,
 96  
         IOAction.STORE_MESSAGES_AND_BUFFERS};
 97  0
     } else if (numPartitions > maxPartitionsInMemory) {
 98  0
       if (LOG.isDebugEnabled()) {
 99  0
         LOG.debug("getNextIOActions: number of partitions in memory passed " +
 100  
           "the specified threshold!");
 101  
       }
 102  0
       return new IOAction[]{
 103  
         IOAction.STORE_PARTITION,
 104  
         IOAction.STORE_MESSAGES_AND_BUFFERS};
 105  
     } else {
 106  0
       return new IOAction[]{
 107  
         IOAction.STORE_MESSAGES_AND_BUFFERS,
 108  
         IOAction.LOAD_TO_SWAP_PARTITION};
 109  
     }
 110  
   }
 111  
 
 112  
   @Override
 113  
   public boolean approve(IOCommand command) {
 114  0
     int numPartitionsInMemory = oocEngine.getMetaPartitionManager()
 115  0
         .getNumInMemoryPartitions();
 116  
     // If loading a partition result in having more partition in memory, the
 117  
     // command should be denied. Also, if number of partitions in memory is
 118  
     // already less than the max number of partitions, any command for storing
 119  
     // a partition should be denied.
 120  0
     if (command instanceof LoadPartitionIOCommand &&
 121  0
         numPartitionsInMemory + deltaNumPartitionsInMemory.getAndIncrement() >
 122  
             maxPartitionsInMemory) {
 123  0
       deltaNumPartitionsInMemory.getAndDecrement();
 124  0
       return false;
 125  
 
 126  0
     } else if (command instanceof StorePartitionIOCommand &&
 127  0
         numPartitionsInMemory + deltaNumPartitionsInMemory.getAndDecrement() <
 128  
             maxPartitionsInMemory) {
 129  0
       deltaNumPartitionsInMemory.getAndIncrement();
 130  0
       return false;
 131  
     }
 132  0
     return true;
 133  
   }
 134  
 
 135  
   @Override
 136  
   public void commandCompleted(IOCommand command) {
 137  0
     if (command instanceof LoadPartitionIOCommand) {
 138  0
       deltaNumPartitionsInMemory.getAndDecrement();
 139  0
     } else if (command instanceof StorePartitionIOCommand) {
 140  0
       deltaNumPartitionsInMemory.getAndIncrement();
 141  
     }
 142  0
   }
 143  
 
 144  
   @Override
 145  0
   public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { }
 146  
 
 147  
   @Override
 148  
   public void startIteration() {
 149  0
   }
 150  
 }