Coverage Report - org.apache.giraph.ooc.OutOfCoreIOScheduler
 
Classes in this File Line Coverage Branch Coverage Complexity
OutOfCoreIOScheduler
0%
0/103
0%
0/72
0
OutOfCoreIOScheduler$1
0%
0/1
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.ooc;
 20  
 
 21  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 22  
 import org.apache.giraph.conf.IntConfOption;
 23  
 import org.apache.giraph.ooc.command.IOCommand;
 24  
 import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
 25  
 import org.apache.giraph.ooc.command.StoreDataBufferIOCommand;
 26  
 import org.apache.giraph.ooc.command.StoreIncomingMessageIOCommand;
 27  
 import org.apache.giraph.ooc.command.StorePartitionIOCommand;
 28  
 import org.apache.giraph.ooc.command.WaitIOCommand;
 29  
 import org.apache.giraph.ooc.policy.OutOfCoreOracle;
 30  
 import org.apache.log4j.Logger;
 31  
 
 32  
 import java.util.ArrayList;
 33  
 import java.util.Arrays;
 34  
 import java.util.List;
 35  
 import java.util.Queue;
 36  
 import java.util.concurrent.ConcurrentLinkedQueue;
 37  
 
 38  
 import static com.google.common.base.Preconditions.checkNotNull;
 39  
 
 40  
 /**
 41  
  * Representation of IO thread scheduler for out-of-core mechanism
 42  
  */
 43  
 public class OutOfCoreIOScheduler {
 44  
   /**
 45  
    * If an IO thread does not have any command to do, it waits for certain a
 46  
    * period and check back again to see if there exist any command to perform.
 47  
    * This constant determines this wait period in milliseconds.
 48  
    */
 49  0
   public static final IntConfOption OOC_WAIT_INTERVAL =
 50  
       new IntConfOption("giraph.oocWaitInterval", 1000,
 51  
           "Duration (in milliseconds) which IO threads in out-of-core " +
 52  
               "mechanism would wait until a command becomes available");
 53  
   /** Class logger. */
 54  0
   private static final Logger LOG =
 55  0
       Logger.getLogger(OutOfCoreIOScheduler.class);
 56  
   /** Out-of-core engine */
 57  
   private final OutOfCoreEngine oocEngine;
 58  
   /** How much an IO thread should wait if there is no IO command */
 59  
   private final int waitInterval;
 60  
   /**
 61  
    * Queue of IO commands for loading partitions to memory. Load commands are
 62  
    * urgent and should be done once loading data is a viable IO command.
 63  
    */
 64  
   private final List<Queue<IOCommand>> threadLoadCommandQueue;
 65  
   /** Whether IO threads should terminate */
 66  
   private volatile boolean shouldTerminate;
 67  
 
 68  
   /**
 69  
    * Constructor
 70  
    *
 71  
    * @param conf configuration
 72  
    * @param oocEngine out-of-core engine
 73  
    * @param numDisks number of disks (IO threads)
 74  
    */
 75  
   OutOfCoreIOScheduler(final ImmutableClassesGiraphConfiguration conf,
 76  0
                        OutOfCoreEngine oocEngine, int numDisks) {
 77  0
     this.oocEngine = oocEngine;
 78  0
     this.waitInterval = OOC_WAIT_INTERVAL.get(conf);
 79  0
     threadLoadCommandQueue = new ArrayList<>(numDisks);
 80  0
     for (int i = 0; i < numDisks; ++i) {
 81  0
       threadLoadCommandQueue.add(
 82  
           new ConcurrentLinkedQueue<IOCommand>());
 83  
     }
 84  0
     shouldTerminate = false;
 85  0
   }
 86  
 
 87  
   /**
 88  
    * Generate and return the next appropriate IO command for a given thread
 89  
    *
 90  
    * @param threadId id of the thread ready to execute the next IO command
 91  
    * @return next IO command to be executed by the given thread
 92  
    */
 93  
   public IOCommand getNextIOCommand(int threadId) {
 94  0
     if (shouldTerminate) {
 95  0
       return null;
 96  
     }
 97  0
     IOCommand command = null;
 98  
     do {
 99  0
       if (command != null && LOG.isInfoEnabled()) {
 100  0
         LOG.info("getNextIOCommand: command " + command + " was proposed to " +
 101  
             "the oracle, but got denied. Generating another command!");
 102  
       }
 103  0
       OutOfCoreOracle.IOAction[] actions =
 104  0
           oocEngine.getOracle().getNextIOActions();
 105  0
       if (LOG.isDebugEnabled()) {
 106  0
         LOG.debug("getNextIOCommand: actions are " + Arrays.toString(actions));
 107  
       }
 108  
       // Check whether there are any urgent outstanding load requests
 109  0
       if (!threadLoadCommandQueue.get(threadId).isEmpty()) {
 110  
         // Check whether loading a partition is a viable (allowed) action to do
 111  0
         boolean canLoad = false;
 112  0
         for (OutOfCoreOracle.IOAction action : actions) {
 113  0
           if (action == OutOfCoreOracle.IOAction.LOAD_PARTITION ||
 114  
               action == OutOfCoreOracle.IOAction.LOAD_UNPROCESSED_PARTITION ||
 115  
               action == OutOfCoreOracle.IOAction.LOAD_TO_SWAP_PARTITION ||
 116  
               action == OutOfCoreOracle.IOAction.URGENT_LOAD_PARTITION) {
 117  0
             canLoad = true;
 118  0
             break;
 119  
           }
 120  
         }
 121  0
         if (canLoad) {
 122  0
           command = threadLoadCommandQueue.get(threadId).poll();
 123  0
           checkNotNull(command);
 124  0
           if (oocEngine.getOracle().approve(command)) {
 125  0
             return command;
 126  
           } else {
 127  
             // Loading is not viable at this moment. We should put the command
 128  
             // back in the load queue and wait until loading becomes viable.
 129  0
             threadLoadCommandQueue.get(threadId).offer(command);
 130  
           }
 131  
         }
 132  
       }
 133  0
       command = null;
 134  0
       for (OutOfCoreOracle.IOAction action : actions) {
 135  
         Integer partitionId;
 136  0
         switch (action) {
 137  
         case STORE_MESSAGES_AND_BUFFERS:
 138  0
           partitionId = oocEngine.getMetaPartitionManager()
 139  0
               .getOffloadPartitionBufferId(threadId);
 140  0
           if (partitionId != null) {
 141  0
             command = new StoreDataBufferIOCommand(oocEngine, partitionId,
 142  
                 StoreDataBufferIOCommand.DataBufferType.PARTITION);
 143  
           } else {
 144  0
             partitionId = oocEngine.getMetaPartitionManager()
 145  0
                 .getOffloadMessageBufferId(threadId);
 146  0
             if (partitionId != null) {
 147  0
               command = new StoreDataBufferIOCommand(oocEngine, partitionId,
 148  
                   StoreDataBufferIOCommand.DataBufferType.MESSAGE);
 149  
             } else {
 150  0
               partitionId = oocEngine.getMetaPartitionManager()
 151  0
                   .getOffloadMessageId(threadId);
 152  0
               if (partitionId != null) {
 153  0
                 command = new StoreIncomingMessageIOCommand(oocEngine,
 154  0
                     partitionId);
 155  
               }
 156  
             }
 157  
           }
 158  
           break;
 159  
         case STORE_PROCESSED_PARTITION:
 160  0
           partitionId = oocEngine.getMetaPartitionManager()
 161  0
               .getOffloadPartitionId(threadId);
 162  0
           if (partitionId != null &&
 163  0
               oocEngine.getMetaPartitionManager()
 164  0
                   .isPartitionProcessed(partitionId)) {
 165  0
             command = new StorePartitionIOCommand(oocEngine, partitionId);
 166  
           }
 167  
           break;
 168  
         case STORE_PARTITION:
 169  0
           partitionId = oocEngine.getMetaPartitionManager()
 170  0
               .getOffloadPartitionId(threadId);
 171  0
           if (partitionId != null) {
 172  0
             command = new StorePartitionIOCommand(oocEngine, partitionId);
 173  
           }
 174  
           break;
 175  
         case LOAD_UNPROCESSED_PARTITION:
 176  0
           partitionId = oocEngine.getMetaPartitionManager()
 177  0
               .getLoadPartitionId(threadId);
 178  0
           if (partitionId != null &&
 179  0
               !oocEngine.getMetaPartitionManager()
 180  0
                   .isPartitionProcessed(partitionId)) {
 181  0
             command = new LoadPartitionIOCommand(oocEngine, partitionId,
 182  0
                 oocEngine.getSuperstep());
 183  
           }
 184  
           break;
 185  
         case LOAD_TO_SWAP_PARTITION:
 186  0
           partitionId = oocEngine.getMetaPartitionManager()
 187  0
               .getLoadPartitionId(threadId);
 188  0
           if (partitionId != null &&
 189  0
               !oocEngine.getMetaPartitionManager()
 190  0
                   .isPartitionProcessed(partitionId) &&
 191  0
               oocEngine.getMetaPartitionManager().hasProcessedOnMemory()) {
 192  0
             command = new LoadPartitionIOCommand(oocEngine, partitionId,
 193  0
                 oocEngine.getSuperstep());
 194  
           }
 195  
           break;
 196  
         case LOAD_PARTITION:
 197  0
           partitionId = oocEngine.getMetaPartitionManager()
 198  0
               .getLoadPartitionId(threadId);
 199  0
           if (partitionId != null) {
 200  0
             if (oocEngine.getMetaPartitionManager()
 201  0
                 .isPartitionProcessed(partitionId)) {
 202  0
               command = new LoadPartitionIOCommand(oocEngine, partitionId,
 203  0
                   oocEngine.getSuperstep() + 1);
 204  
             } else {
 205  0
               command = new LoadPartitionIOCommand(oocEngine, partitionId,
 206  0
                   oocEngine.getSuperstep());
 207  
             }
 208  
           }
 209  
           break;
 210  
         case URGENT_LOAD_PARTITION:
 211  
           // Do nothing
 212  0
           break;
 213  
         default:
 214  0
           throw new IllegalStateException("getNextIOCommand: the IO action " +
 215  
               "is not defined!");
 216  
         }
 217  0
         if (command != null) {
 218  0
           break;
 219  
         }
 220  
       }
 221  0
       if (command == null) {
 222  0
         command = new WaitIOCommand(oocEngine, waitInterval);
 223  
       }
 224  0
     } while (!oocEngine.getOracle().approve(command));
 225  0
     return command;
 226  
   }
 227  
 
 228  
   /**
 229  
    * Notify IO scheduler that the IO command is completed
 230  
    *
 231  
    * @param command completed command
 232  
    */
 233  
   public void ioCommandCompleted(IOCommand command) {
 234  0
     oocEngine.ioCommandCompleted(command);
 235  0
   }
 236  
 
 237  
   /**
 238  
    * Add an IO command to the scheduling queue of the IO scheduler
 239  
    *
 240  
    * @param ioCommand IO command to add to the scheduler
 241  
    */
 242  
   public void addIOCommand(IOCommand ioCommand) {
 243  0
     if (ioCommand instanceof LoadPartitionIOCommand) {
 244  0
       int ownerThread = oocEngine.getMetaPartitionManager()
 245  0
           .getOwnerThreadId(ioCommand.getPartitionId());
 246  0
       threadLoadCommandQueue.get(ownerThread).offer(ioCommand);
 247  0
     } else {
 248  0
       throw new IllegalStateException("addIOCommand: IO command type is not " +
 249  
           "supported for addition");
 250  
     }
 251  0
   }
 252  
 
 253  
   /**
 254  
    * Shutdown/Terminate the IO scheduler, and notify all IO threads to halt
 255  
    */
 256  
   public void shutdown() {
 257  0
     shouldTerminate = true;
 258  0
     if (LOG.isInfoEnabled()) {
 259  0
       LOG.info("shutdown: OutOfCoreIOScheduler shutting down!");
 260  
     }
 261  0
   }
 262  
 }