Coverage Report - org.apache.giraph.ooc.data.MetaPartitionManager
 
Classes in this File Line Coverage Branch Coverage Complexity
MetaPartitionManager
0%
0/293
0%
0/120
0
MetaPartitionManager$MetaPartition
0%
0/39
0%
0/12
0
MetaPartitionManager$MetaPartitionDictionary
0%
0/64
0%
0/40
0
MetaPartitionManager$PartitionStorageState
0%
0/2
N/A
0
MetaPartitionManager$ProcessingState
0%
0/1
N/A
0
MetaPartitionManager$StorageState
0%
0/1
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.ooc.data;
 20  
 
 21  
 import com.google.common.collect.Maps;
 22  
 import com.google.common.collect.Sets;
 23  
 import com.google.common.util.concurrent.AtomicDouble;
 24  
 import org.apache.giraph.bsp.BspService;
 25  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 26  
 import org.apache.giraph.worker.BspServiceWorker;
 27  
 import org.apache.giraph.worker.WorkerProgress;
 28  
 import org.apache.log4j.Logger;
 29  
 
 30  
 import java.util.ArrayList;
 31  
 import java.util.Iterator;
 32  
 import java.util.List;
 33  
 import java.util.Random;
 34  
 import java.util.Set;
 35  
 import java.util.concurrent.ConcurrentMap;
 36  
 import java.util.concurrent.atomic.AtomicInteger;
 37  
 
 38  
 import static com.google.common.base.Preconditions.checkState;
 39  
 
 40  
 /**
 41  
  * Class to keep meta-information about partition data, edge data, and message
 42  
  * data of each partition on a worker.
 43  
  */
 44  0
 public class MetaPartitionManager {
 45  
   /**
 46  
    * Flag representing no partitions is left to process in the current iteration
 47  
    * cycle over all partitions.
 48  
    */
 49  
   public static final int NO_PARTITION_TO_PROCESS = -1;
 50  
 
 51  
   /** Class logger */
 52  0
   private static final Logger LOG =
 53  0
       Logger.getLogger(MetaPartitionManager.class);
 54  
   /** Different storage states for data */
 55  0
   private enum StorageState { IN_MEM, ON_DISK, IN_TRANSIT };
 56  
   /**
 57  
    * Different storage states for a partition as a whole (i.e. the partition
 58  
    * and its current messages)
 59  
    */
 60  0
   private enum PartitionStorageState
 61  
     /**
 62  
      * Either both partition and its current messages are in memory, or both
 63  
      * are on disk, or one part is on disk and the other part is in memory.
 64  
      */
 65  0
   { FULLY_IN_MEM, PARTIALLY_IN_MEM, FULLY_ON_DISK };
 66  
   /**
 67  
    * Different processing states for partitions. Processing states are reset
 68  
    * at the beginning of each iteration cycle over partitions.
 69  
    */
 70  0
   private enum ProcessingState { PROCESSED, UNPROCESSED, IN_PROCESS };
 71  
 
 72  
   /**
 73  
    * Number of partitions in-memory (partition and current messages in memory)
 74  
    */
 75  0
   private final AtomicInteger numInMemoryPartitions = new AtomicInteger(0);
 76  
   /**
 77  
    * Number of partitions that are partially in-memory (either partition or its
 78  
    * current messages is in memory and the other part is not)
 79  
    */
 80  0
   private final AtomicInteger numPartiallyInMemoryPartitions =
 81  
       new AtomicInteger(0);
 82  
   /** Map (dictionary) of partitions to their meta information */
 83  0
   private final ConcurrentMap<Integer, MetaPartition> partitions =
 84  0
       Maps.newConcurrentMap();
 85  
   /** Reverse dictionaries of partitions assigned to each IO thread */
 86  
   private final List<MetaPartitionDictionary> perThreadPartitionDictionary;
 87  
   /** For each IO thread, set of partition ids that are on-disk and have
 88  
    * 'large enough' vertex/edge buffers to be offloaded on disk
 89  
    */
 90  
   private final List<Set<Integer>> perThreadVertexEdgeBuffers;
 91  
   /**
 92  
    * For each IO thread, set of partition ids that are on-disk and have
 93  
    * 'large enough' message buffers to be offloaded on disk
 94  
    */
 95  
   private final List<Set<Integer>> perThreadMessageBuffers;
 96  
   /**
 97  
    * Out-of-core engine
 98  
    */
 99  
   private final OutOfCoreEngine oocEngine;
 100  
   /**
 101  
    * Number of processed partitions in the current iteration cycle over all
 102  
    * partitions
 103  
    */
 104  0
   private final AtomicInteger numPartitionsProcessed = new AtomicInteger(0);
 105  
   /**
 106  
    * Random number generator to choose a thread to get one of its partition for
 107  
    * processing
 108  
    */
 109  
   private final Random randomGenerator;
 110  
   /**
 111  
    * What is the lowest fraction of partitions in memory, relative to the total
 112  
    * number of available partitions? This is an indirect estimation of the
 113  
    * amount of graph in memory, which can be used to estimate how many more
 114  
    * machines needed to avoid out-of-core execution. At the beginning all the
 115  
    * graph is in memory, so the fraction is 1. This fraction is calculated per
 116  
    * superstep.
 117  
    */
 118  0
   private final AtomicDouble lowestGraphFractionInMemory =
 119  
       new AtomicDouble(1);
 120  
   /**
 121  
    * Map of partition ids to their indices. index of a partition is the order
 122  
    * with which the partition has been inserted. Partitions are indexed as 0, 1,
 123  
    * 2, etc. This indexing is later used to find the id of the IO thread who is
 124  
    * responsible for handling a partition. Partitions are assigned to IO threads
 125  
    * in a round-robin fashion based on their indices.
 126  
    */
 127  0
   private final ConcurrentMap<Integer, Integer> partitionIndex =
 128  0
       Maps.newConcurrentMap();
 129  
   /**
 130  
    * Sequential counter used to assign indices to partitions as they are added
 131  
    */
 132  0
   private final AtomicInteger indexCounter = new AtomicInteger(0);
 133  
   /** How many disks (i.e. IO threads) do we have? */
 134  
   private final int numIOThreads;
 135  
 
 136  
   /**
 137  
    * Constructor
 138  
    *
 139  
    * @param numIOThreads number of IO threads
 140  
    * @param oocEngine out-of-core engine
 141  
    */
 142  0
   public MetaPartitionManager(int numIOThreads, OutOfCoreEngine oocEngine) {
 143  0
     perThreadPartitionDictionary = new ArrayList<>(numIOThreads);
 144  0
     perThreadVertexEdgeBuffers = new ArrayList<>(numIOThreads);
 145  0
     perThreadMessageBuffers = new ArrayList<>(numIOThreads);
 146  0
     for (int i = 0; i < numIOThreads; ++i) {
 147  0
       perThreadPartitionDictionary.add(new MetaPartitionDictionary());
 148  0
       perThreadMessageBuffers.add(Sets.<Integer>newConcurrentHashSet());
 149  0
       perThreadVertexEdgeBuffers.add(Sets.<Integer>newConcurrentHashSet());
 150  
     }
 151  0
     this.oocEngine = oocEngine;
 152  0
     this.randomGenerator = new Random();
 153  0
     this.numIOThreads = numIOThreads;
 154  0
   }
 155  
 
 156  
   /**
 157  
    * @return number of partitions in memory
 158  
    */
 159  
   public int getNumInMemoryPartitions() {
 160  0
     return numInMemoryPartitions.get();
 161  
   }
 162  
 
 163  
   /**
 164  
    * @return number of partitions that are partially in memory
 165  
    */
 166  
   public int getNumPartiallyInMemoryPartitions() {
 167  0
     return numPartiallyInMemoryPartitions.get();
 168  
   }
 169  
 
 170  
   /**
 171  
    * Get total number of partitions
 172  
    *
 173  
    * @return total number of partitions
 174  
    */
 175  
   public int getNumPartitions() {
 176  0
     return partitions.size();
 177  
   }
 178  
 
 179  
   /**
 180  
    * Since the statistics are based on estimates, we assume each partial
 181  
    * partition is taking about half of the full partition in terms of memory
 182  
    * footprint.
 183  
    *
 184  
    * @return estimate of fraction of graph in memory
 185  
    */
 186  
   public double getGraphFractionInMemory() {
 187  0
     return (getNumInMemoryPartitions() +
 188  0
         getNumPartiallyInMemoryPartitions() / 2.0) / getNumPartitions();
 189  
   }
 190  
 
 191  
   /**
 192  
    * Update the lowest fraction of graph in memory so to have a more accurate
 193  
    * information in one of the counters.
 194  
    */
 195  
   private synchronized void updateGraphFractionInMemory() {
 196  0
     double graphInMemory = getGraphFractionInMemory();
 197  0
     if (graphInMemory < lowestGraphFractionInMemory.get()) {
 198  0
       lowestGraphFractionInMemory.set(graphInMemory);
 199  0
       WorkerProgress.get().updateLowestGraphPercentageInMemory(
 200  
           (int) (graphInMemory * 100));
 201  
     }
 202  0
   }
 203  
 
 204  
   /**
 205  
    * Update the book-keeping about number of in-memory partitions and partially
 206  
    * in-memory partitions with regard to the storage status of the partition and
 207  
    * its current messages before and after an update to its status.
 208  
    *
 209  
    * @param stateBefore the storage state of the partition and its current
 210  
    *                    messages before an update
 211  
    * @param stateAfter the storage state of the partition and its current
 212  
    *                   messages after an update
 213  
    */
 214  
   private void updateCounters(PartitionStorageState stateBefore,
 215  
                               PartitionStorageState stateAfter) {
 216  0
     numInMemoryPartitions.getAndAdd(
 217  
         ((stateAfter == PartitionStorageState.FULLY_IN_MEM) ? 1 : 0) -
 218  
             ((stateBefore == PartitionStorageState.FULLY_IN_MEM) ? 1 : 0));
 219  0
     numPartiallyInMemoryPartitions.getAndAdd(
 220  
         ((stateAfter == PartitionStorageState.PARTIALLY_IN_MEM) ? 1 : 0) -
 221  
             ((stateBefore == PartitionStorageState.PARTIALLY_IN_MEM) ? 1 : 0));
 222  0
   }
 223  
 
 224  
   /**
 225  
    * Whether a given partition is available
 226  
    *
 227  
    * @param partitionId id of the partition to check if this worker owns it
 228  
    * @return true if the worker owns the partition, false otherwise
 229  
    */
 230  
   public boolean hasPartition(Integer partitionId) {
 231  0
     return partitions.containsKey(partitionId);
 232  
   }
 233  
 
 234  
   /**
 235  
    * Return the list of all available partitions as an iterable
 236  
    *
 237  
    * @return list of all available partitions
 238  
    */
 239  
   public Iterable<Integer> getPartitionIds() {
 240  0
     return partitions.keySet();
 241  
   }
 242  
 
 243  
   /**
 244  
    * Get the thread id that is responsible for a particular partition
 245  
    *
 246  
    * @param partitionId id of the given partition
 247  
    * @return id of the thread responsible for the given partition
 248  
    */
 249  
   public int getOwnerThreadId(int partitionId) {
 250  0
     Integer index = partitionIndex.get(partitionId);
 251  0
     checkState(index != null);
 252  0
     return index % numIOThreads;
 253  
   }
 254  
 
 255  
   /**
 256  
    * Add a partition
 257  
    *
 258  
    * @param partitionId id of a partition to add
 259  
    */
 260  
   public void addPartition(int partitionId) {
 261  0
     MetaPartition meta = new MetaPartition(partitionId);
 262  0
     MetaPartition temp = partitions.putIfAbsent(partitionId, meta);
 263  
     // Check if the given partition is new
 264  0
     if (temp == null) {
 265  0
       int index = indexCounter.getAndIncrement();
 266  0
       checkState(partitionIndex.putIfAbsent(partitionId, index) == null);
 267  0
       int ownerThread = getOwnerThreadId(partitionId);
 268  0
       perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
 269  0
       numInMemoryPartitions.getAndIncrement();
 270  
     }
 271  0
   }
 272  
 
 273  
   /**
 274  
    * Remove a partition. This method assumes that the partition is already
 275  
    * retrieved and is in memory)
 276  
    *
 277  
    * @param partitionId id of a partition to remove
 278  
    */
 279  
   public void removePartition(Integer partitionId) {
 280  0
     MetaPartition meta = partitions.remove(partitionId);
 281  0
     int ownerThread = getOwnerThreadId(partitionId);
 282  0
     perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
 283  0
     checkState(!meta.isOnDisk());
 284  0
     numInMemoryPartitions.getAndDecrement();
 285  0
   }
 286  
 
 287  
   /**
 288  
    * Pops an entry from the specified set.
 289  
    *
 290  
    * @param set set to pop an entry from
 291  
    * @param <T> Type of entries in the set
 292  
    * @return popped entry from the given set
 293  
    */
 294  
   private static <T> T popFromSet(Set<T> set) {
 295  0
     if (!set.isEmpty()) {
 296  0
       Iterator<T> it = set.iterator();
 297  0
       T entry = it.next();
 298  0
       it.remove();
 299  0
       return entry;
 300  
     }
 301  0
     return null;
 302  
   }
 303  
 
 304  
   /**
 305  
    * Peeks an entry from the specified set.
 306  
    *
 307  
    * @param set set to peek an entry from
 308  
    * @param <T> Type of entries in the set
 309  
    * @return peeked entry from the given set
 310  
    */
 311  
   private static <T> T peekFromSet(Set<T> set) {
 312  0
     if (!set.isEmpty()) {
 313  0
       return set.iterator().next();
 314  
     }
 315  0
     return null;
 316  
   }
 317  
 
 318  
   /**
 319  
    * Get id of a partition to offload to disk. Prioritize offloading processed
 320  
    * partitions over unprocessed partition. Also, prioritize offloading
 321  
    * partitions partially in memory over partitions fully in memory.
 322  
    *
 323  
    * @param threadId id of the thread who is going to store the partition on
 324  
    *                 disk
 325  
    * @return id of the partition to offload on disk
 326  
    */
 327  
   public Integer getOffloadPartitionId(int threadId) {
 328  
     // First, look for a processed partition partially on disk
 329  0
     MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
 330  
         ProcessingState.PROCESSED,
 331  
         StorageState.IN_MEM,
 332  
         StorageState.ON_DISK,
 333  
         null);
 334  0
     if (meta != null) {
 335  0
       return meta.getPartitionId();
 336  
     }
 337  0
     meta = perThreadPartitionDictionary.get(threadId).lookup(
 338  
         ProcessingState.PROCESSED,
 339  
         StorageState.ON_DISK,
 340  
         StorageState.IN_MEM,
 341  
         null);
 342  0
     if (meta != null) {
 343  0
       return meta.getPartitionId();
 344  
     }
 345  
     // Second, look for a processed partition entirely in memory
 346  0
     meta = perThreadPartitionDictionary.get(threadId).lookup(
 347  
         ProcessingState.PROCESSED,
 348  
         StorageState.IN_MEM,
 349  
         StorageState.IN_MEM,
 350  
         null);
 351  0
     if (meta != null) {
 352  0
       return meta.getPartitionId();
 353  
     }
 354  
 
 355  
     // Third, look for an unprocessed partition partially on disk
 356  0
     meta = perThreadPartitionDictionary.get(threadId).lookup(
 357  
         ProcessingState.UNPROCESSED,
 358  
         StorageState.IN_MEM,
 359  
         StorageState.ON_DISK,
 360  
         null);
 361  0
     if (meta != null) {
 362  0
       return meta.getPartitionId();
 363  
     }
 364  0
     meta = perThreadPartitionDictionary.get(threadId).lookup(
 365  
         ProcessingState.UNPROCESSED,
 366  
         StorageState.ON_DISK,
 367  
         StorageState.IN_MEM,
 368  
         null);
 369  0
     if (meta != null) {
 370  0
       return meta.getPartitionId();
 371  
     }
 372  
     // Forth, look for an unprocessed partition entirely in memory
 373  0
     meta = perThreadPartitionDictionary.get(threadId).lookup(
 374  
         ProcessingState.UNPROCESSED,
 375  
         StorageState.IN_MEM,
 376  
         StorageState.IN_MEM,
 377  
         null);
 378  0
     if (meta != null) {
 379  0
       return meta.getPartitionId();
 380  
     }
 381  0
     return null;
 382  
   }
 383  
 
 384  
   /**
 385  
    * Get id of a partition to offload its vertex/edge buffers on disk
 386  
    *
 387  
    * @param threadId id of the thread who is going to store the buffers on disk
 388  
    * @return id of the partition to offload its vertex/edge buffers on disk
 389  
    */
 390  
   public Integer getOffloadPartitionBufferId(int threadId) {
 391  0
     if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
 392  0
       Integer partitionId =
 393  0
           popFromSet(perThreadVertexEdgeBuffers.get(threadId));
 394  0
       if (partitionId == null) {
 395  0
         DiskBackedPartitionStore<?, ?, ?> partitionStore =
 396  0
             (DiskBackedPartitionStore<?, ?, ?>) (oocEngine.getServerData()
 397  0
                 .getPartitionStore());
 398  0
         perThreadVertexEdgeBuffers.get(threadId)
 399  0
             .addAll(partitionStore.getCandidateBuffersToOffload(threadId));
 400  0
         DiskBackedEdgeStore<?, ?, ?> edgeStore =
 401  0
             (DiskBackedEdgeStore<?, ?, ?>) (oocEngine.getServerData())
 402  0
                 .getEdgeStore();
 403  0
         perThreadVertexEdgeBuffers.get(threadId)
 404  0
             .addAll(edgeStore.getCandidateBuffersToOffload(threadId));
 405  0
         partitionId = popFromSet(perThreadVertexEdgeBuffers.get(threadId));
 406  
       }
 407  0
       return partitionId;
 408  
     }
 409  0
     return null;
 410  
   }
 411  
 
 412  
   /**
 413  
    * Get id of a partition to offload its incoming message buffers on disk
 414  
    *
 415  
    * @param threadId id of the thread who is going to store the buffers on disk
 416  
    * @return id of the partition to offload its message buffer on disk
 417  
    */
 418  
   public Integer getOffloadMessageBufferId(int threadId) {
 419  0
     if (oocEngine.getSuperstep() != BspServiceWorker.INPUT_SUPERSTEP) {
 420  0
       Integer partitionId =
 421  0
           popFromSet(perThreadMessageBuffers.get(threadId));
 422  0
       if (partitionId == null) {
 423  0
         DiskBackedMessageStore<?, ?> messageStore =
 424  0
             (DiskBackedMessageStore<?, ?>) (oocEngine.getServerData()
 425  0
                 .getIncomingMessageStore());
 426  0
         if (messageStore != null) {
 427  0
           perThreadMessageBuffers.get(threadId)
 428  0
               .addAll(messageStore.getCandidateBuffersToOffload(threadId));
 429  0
           partitionId = popFromSet(perThreadMessageBuffers.get(threadId));
 430  
         }
 431  
       }
 432  0
       return partitionId;
 433  
     }
 434  0
     return null;
 435  
   }
 436  
 
 437  
   /**
 438  
    * Get id of a partition to offload its incoming message on disk. Prioritize
 439  
    * offloading messages of partitions already on disk, and then partitions
 440  
    * in-transit, over partitions in-memory. Also, prioritize processed
 441  
    * partitions over unprocessed (processed partitions would go on disk with
 442  
    * more chances that unprocessed partitions)
 443  
    *
 444  
    * @param threadId id of the thread who is going to store the incoming
 445  
    *                 messages on disk
 446  
    * @return id of the partition to offload its message on disk
 447  
    */
 448  
   public Integer getOffloadMessageId(int threadId) {
 449  0
     if (oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) {
 450  0
       return null;
 451  
     }
 452  0
     MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
 453  
         ProcessingState.PROCESSED,
 454  
         StorageState.ON_DISK,
 455  
         null,
 456  
         StorageState.IN_MEM);
 457  0
     if (meta != null) {
 458  0
       return meta.getPartitionId();
 459  
     }
 460  0
     meta = perThreadPartitionDictionary.get(threadId).lookup(
 461  
         ProcessingState.PROCESSED,
 462  
         StorageState.IN_TRANSIT,
 463  
         null,
 464  
         StorageState.IN_MEM);
 465  0
     if (meta != null) {
 466  0
       return meta.getPartitionId();
 467  
     }
 468  0
     meta = perThreadPartitionDictionary.get(threadId).lookup(
 469  
         ProcessingState.UNPROCESSED,
 470  
         StorageState.ON_DISK,
 471  
         null,
 472  
         StorageState.IN_MEM);
 473  0
     if (meta != null) {
 474  0
       return meta.getPartitionId();
 475  
     }
 476  0
     meta = perThreadPartitionDictionary.get(threadId).lookup(
 477  
         ProcessingState.UNPROCESSED,
 478  
         StorageState.IN_TRANSIT,
 479  
         null,
 480  
         StorageState.IN_MEM);
 481  0
     if (meta != null) {
 482  0
       return meta.getPartitionId();
 483  
     }
 484  0
     return null;
 485  
   }
 486  
 
 487  
   /**
 488  
    * Get id of a partition to load its data to memory. Prioritize loading an
 489  
    * unprocessed partition over loading processed partition. Also, prioritize
 490  
    * loading a partition partially in memory over partitions entirely on disk.
 491  
    *
 492  
    * @param threadId id of the thread who is going to load the partition data
 493  
    * @return id of the partition to load its data to memory
 494  
    */
 495  
   public Integer getLoadPartitionId(int threadId) {
 496  
     // First, look for an unprocessed partition partially in memory
 497  0
     MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
 498  
         ProcessingState.UNPROCESSED,
 499  
         StorageState.IN_MEM,
 500  
         StorageState.ON_DISK,
 501  
         null);
 502  0
     if (meta != null) {
 503  0
       return meta.getPartitionId();
 504  
     }
 505  
 
 506  0
     meta = perThreadPartitionDictionary.get(threadId).lookup(
 507  
         ProcessingState.UNPROCESSED,
 508  
         StorageState.ON_DISK,
 509  
         StorageState.IN_MEM,
 510  
         null);
 511  0
     if (meta != null) {
 512  0
       return meta.getPartitionId();
 513  
     }
 514  
 
 515  
     // Second, look for an unprocessed partition entirely on disk
 516  0
     meta = perThreadPartitionDictionary.get(threadId).lookup(
 517  
         ProcessingState.UNPROCESSED,
 518  
         StorageState.ON_DISK,
 519  
         StorageState.ON_DISK,
 520  
         null);
 521  0
     if (meta != null) {
 522  0
       return meta.getPartitionId();
 523  
     }
 524  
 
 525  
     // Third, look for a processed partition partially in memory
 526  0
     meta = perThreadPartitionDictionary.get(threadId).lookup(
 527  
         ProcessingState.PROCESSED,
 528  
         StorageState.IN_MEM,
 529  
         null,
 530  
         StorageState.ON_DISK);
 531  0
     if (meta != null) {
 532  0
       return meta.getPartitionId();
 533  
     }
 534  
 
 535  0
     meta = perThreadPartitionDictionary.get(threadId).lookup(
 536  
         ProcessingState.PROCESSED,
 537  
         StorageState.ON_DISK,
 538  
         null,
 539  
         StorageState.IN_MEM);
 540  0
     if (meta != null) {
 541  0
       return meta.getPartitionId();
 542  
     }
 543  
 
 544  0
     meta = perThreadPartitionDictionary.get(threadId).lookup(
 545  
         ProcessingState.PROCESSED,
 546  
         StorageState.ON_DISK,
 547  
         null,
 548  
         StorageState.ON_DISK);
 549  0
     if (meta != null) {
 550  0
       return meta.getPartitionId();
 551  
     }
 552  
 
 553  0
     return null;
 554  
   }
 555  
 
 556  
   /**
 557  
    * Mark a partition as being 'IN_PROCESS'
 558  
    *
 559  
    * @param partitionId id of the partition to mark
 560  
    */
 561  
   public void markPartitionAsInProcess(int partitionId) {
 562  0
     MetaPartition meta = partitions.get(partitionId);
 563  0
     int ownerThread = getOwnerThreadId(partitionId);
 564  0
     synchronized (meta) {
 565  0
       perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
 566  0
       meta.setProcessingState(ProcessingState.IN_PROCESS);
 567  0
       perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
 568  0
     }
 569  0
   }
 570  
 
 571  
   /**
 572  
    * Whether there is any processed partition stored in memory (excluding those
 573  
    * that are prefetched to execute in the next superstep).
 574  
    *
 575  
    * @return true iff there is any processed partition in memory
 576  
    */
 577  
   public boolean hasProcessedOnMemory() {
 578  0
     for (MetaPartitionDictionary dictionary : perThreadPartitionDictionary) {
 579  0
       if (dictionary.hasProcessedOnMemory()) {
 580  0
         return true;
 581  
       }
 582  0
     }
 583  0
     return false;
 584  
   }
 585  
 
 586  
   /**
 587  
    * Whether a partition is *processed* in the current iteration cycle over
 588  
    * partitions.
 589  
    *
 590  
    * @param partitionId id of the partition to check
 591  
    * @return true iff processing the given partition is done
 592  
    */
 593  
   public boolean isPartitionProcessed(Integer partitionId) {
 594  0
     MetaPartition meta = partitions.get(partitionId);
 595  0
     synchronized (meta) {
 596  0
       return meta.getProcessingState() == ProcessingState.PROCESSED;
 597  0
     }
 598  
   }
 599  
 
 600  
   /**
 601  
    * Mark a partition as 'PROCESSED'
 602  
    *
 603  
    * @param partitionId id of the partition to mark
 604  
    */
 605  
   public void setPartitionIsProcessed(int partitionId) {
 606  0
     MetaPartition meta = partitions.get(partitionId);
 607  0
     int ownerThread = getOwnerThreadId(partitionId);
 608  0
     synchronized (meta) {
 609  0
       perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
 610  0
       meta.setProcessingState(ProcessingState.PROCESSED);
 611  0
       perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
 612  0
     }
 613  0
     numPartitionsProcessed.getAndIncrement();
 614  0
   }
 615  
 
 616  
   /**
 617  
    * Notify this meta store that load of a partition for a specific superstep
 618  
    * is about to start.
 619  
    *
 620  
    * @param partitionId id of the partition to load to memory
 621  
    * @param superstep superstep in which the partition is needed for
 622  
    * @return true iff load of the given partition is viable
 623  
    */
 624  
   public boolean startLoadingPartition(int partitionId, long superstep) {
 625  0
     MetaPartition meta = partitions.get(partitionId);
 626  0
     synchronized (meta) {
 627  0
       boolean shouldLoad = meta.getPartitionState() == StorageState.ON_DISK;
 628  0
       if (superstep == oocEngine.getSuperstep()) {
 629  0
         shouldLoad |= meta.getCurrentMessagesState() == StorageState.ON_DISK;
 630  
       } else {
 631  0
         shouldLoad |= meta.getIncomingMessagesState() == StorageState.ON_DISK;
 632  
       }
 633  0
       return shouldLoad;
 634  0
     }
 635  
   }
 636  
 
 637  
   /**
 638  
    * Notify this meta store that load of a partition for a specific superstep
 639  
    * is completed
 640  
    *
 641  
    * @param partitionId id of the partition for which the load is completed
 642  
    * @param superstep superstep in which the partition is loaded for
 643  
    */
 644  
   public void doneLoadingPartition(int partitionId, long superstep) {
 645  0
     MetaPartition meta = partitions.get(partitionId);
 646  0
     int owner = getOwnerThreadId(partitionId);
 647  0
     synchronized (meta) {
 648  0
       PartitionStorageState stateBefore = meta.getPartitionStorageState();
 649  0
       perThreadPartitionDictionary.get(owner).removePartition(meta);
 650  0
       meta.setPartitionState(StorageState.IN_MEM);
 651  0
       if (superstep == oocEngine.getSuperstep()) {
 652  0
         meta.setCurrentMessagesState(StorageState.IN_MEM);
 653  
       } else {
 654  0
         meta.setIncomingMessagesState(StorageState.IN_MEM);
 655  
       }
 656  0
       PartitionStorageState stateAfter = meta.getPartitionStorageState();
 657  0
       updateCounters(stateBefore, stateAfter);
 658  
       // Check whether load was to prefetch a partition from disk to memory for
 659  
       // the next superstep
 660  0
       if (meta.getProcessingState() == ProcessingState.PROCESSED) {
 661  0
         perThreadPartitionDictionary.get(owner).increaseNumPrefetch();
 662  
       }
 663  0
       perThreadPartitionDictionary.get(owner).addPartition(meta);
 664  0
     }
 665  0
     updateGraphFractionInMemory();
 666  0
   }
 667  
 
 668  
   /**
 669  
    * Notify this meta store that offload of messages for a particular partition
 670  
    * is about to start.
 671  
    *
 672  
    * @param partitionId id of the partition that its messages is being offloaded
 673  
    * @return true iff offload of messages of the given partition is viable
 674  
    */
 675  
   public boolean startOffloadingMessages(int partitionId) {
 676  0
     MetaPartition meta = partitions.get(partitionId);
 677  0
     int ownerThread = getOwnerThreadId(partitionId);
 678  0
     synchronized (meta) {
 679  0
       if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
 680  0
         perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
 681  0
         meta.setIncomingMessagesState(StorageState.IN_TRANSIT);
 682  0
         perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
 683  0
         return true;
 684  
       } else {
 685  0
         return false;
 686  
       }
 687  0
     }
 688  
   }
 689  
 
 690  
   /**
 691  
    * Notify this meta store that offload of messages for a particular partition
 692  
    * is complete.
 693  
    *
 694  
    * @param partitionId id of the partition that its messages is offloaded to
 695  
    *                    disk
 696  
    */
 697  
   public void doneOffloadingMessages(int partitionId) {
 698  0
     MetaPartition meta = partitions.get(partitionId);
 699  0
     int ownerThread = getOwnerThreadId(partitionId);
 700  0
     synchronized (meta) {
 701  0
       perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
 702  0
       meta.setIncomingMessagesState(StorageState.ON_DISK);
 703  0
       perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
 704  0
     }
 705  0
   }
 706  
 
 707  
   /**
 708  
    * Notify this meta store that offload of raw data buffers (vertex/edges/
 709  
    * messages) of a particular partition is about to start.
 710  
    *
 711  
    * @param partitionId id of the partition that its buffer is being offloaded
 712  
    * @return true iff offload of buffers of the given partition is viable
 713  
    */
 714  
   public boolean startOffloadingBuffer(int partitionId) {
 715  
     // Do nothing
 716  0
     return true;
 717  
   }
 718  
 
 719  
   /**
 720  
    * Notify this meta store that offload of raw data buffers (vertex/edges/
 721  
    * messages) of a particular partition is completed.
 722  
    *
 723  
    * @param partitionId id of the partition that its buffer is offloaded
 724  
    */
 725  
   public void doneOffloadingBuffer(int partitionId) {
 726  
     // Do nothing
 727  0
   }
 728  
 
 729  
   /**
 730  
    * Notify this meta store that offload of a partition (partition data and its
 731  
    * current messages) is about to start.
 732  
    *
 733  
    * @param partitionId id of the partition that its data is being offloaded
 734  
    * @return true iff offload of the given partition is viable
 735  
    */
 736  
   public boolean startOffloadingPartition(int partitionId) {
 737  0
     MetaPartition meta = partitions.get(partitionId);
 738  0
     int owner = getOwnerThreadId(partitionId);
 739  0
     synchronized (meta) {
 740  0
       if (meta.getProcessingState() != ProcessingState.IN_PROCESS &&
 741  0
           (meta.getPartitionState() == StorageState.IN_MEM ||
 742  0
           meta.getCurrentMessagesState() == StorageState.IN_MEM)) {
 743  0
         perThreadPartitionDictionary.get(owner).removePartition(meta);
 744  
         // We may only need to offload either partition or current messages of
 745  
         // that partition to disk. So, if either of the components (partition
 746  
         // or its current messages) is already on disk, we should not update its
 747  
         // metadata.
 748  0
         if (meta.getPartitionState() != StorageState.ON_DISK) {
 749  0
           meta.setPartitionState(StorageState.IN_TRANSIT);
 750  
         }
 751  0
         if (meta.getCurrentMessagesState() != StorageState.ON_DISK) {
 752  0
           meta.setCurrentMessagesState(StorageState.IN_TRANSIT);
 753  
         }
 754  0
         perThreadPartitionDictionary.get(owner).addPartition(meta);
 755  0
         return true;
 756  
       } else {
 757  0
         return false;
 758  
       }
 759  0
     }
 760  
   }
 761  
 
 762  
   /**
 763  
    * Notify this meta store that offload of a partition (partition data and its
 764  
    * current messages) is completed.
 765  
    *
 766  
    * @param partitionId id of the partition that its data is offloaded
 767  
    */
 768  
   public void doneOffloadingPartition(int partitionId) {
 769  0
     MetaPartition meta = partitions.get(partitionId);
 770  0
     int owner = getOwnerThreadId(partitionId);
 771  0
     synchronized (meta) {
 772  
       // We either offload both partition and its messages to disk, or we only
 773  
       // offload one of the components.
 774  0
       if (meta.getCurrentMessagesState() == StorageState.IN_TRANSIT &&
 775  0
           meta.getPartitionState() == StorageState.IN_TRANSIT) {
 776  0
         numInMemoryPartitions.getAndDecrement();
 777  
       } else {
 778  0
         numPartiallyInMemoryPartitions.getAndDecrement();
 779  
       }
 780  0
       perThreadPartitionDictionary.get(owner).removePartition(meta);
 781  0
       meta.setPartitionState(StorageState.ON_DISK);
 782  0
       meta.setCurrentMessagesState(StorageState.ON_DISK);
 783  0
       perThreadPartitionDictionary.get(owner).addPartition(meta);
 784  0
     }
 785  0
     updateGraphFractionInMemory();
 786  0
   }
 787  
 
 788  
   /**
 789  
    * Reset the meta store for a new iteration cycle over all partitions.
 790  
    * Note: this is not thread-safe and should be called from a single thread.
 791  
    */
 792  
   public void resetPartitions() {
 793  0
     for (MetaPartition meta : partitions.values()) {
 794  0
       int owner = getOwnerThreadId(meta.getPartitionId());
 795  0
       perThreadPartitionDictionary.get(owner).removePartition(meta);
 796  0
       meta.resetPartition();
 797  0
       perThreadPartitionDictionary.get(owner).addPartition(meta);
 798  0
     }
 799  0
     for (MetaPartitionDictionary dictionary : perThreadPartitionDictionary) {
 800  0
       dictionary.reset();
 801  0
     }
 802  0
     numPartitionsProcessed.set(0);
 803  0
   }
 804  
 
 805  
   /**
 806  
    * Reset messages in the meta store.
 807  
    * Note: this is not thread-safe and should be called from a single thread.
 808  
    */
 809  
   public void resetMessages() {
 810  0
     for (MetaPartition meta : partitions.values()) {
 811  0
       int owner = getOwnerThreadId(meta.getPartitionId());
 812  0
       perThreadPartitionDictionary.get(owner).removePartition(meta);
 813  0
       PartitionStorageState stateBefore = meta.getPartitionStorageState();
 814  0
       meta.resetMessages();
 815  0
       PartitionStorageState stateAfter = meta.getPartitionStorageState();
 816  0
       updateCounters(stateBefore, stateAfter);
 817  0
       perThreadPartitionDictionary.get(owner).addPartition(meta);
 818  0
     }
 819  0
   }
 820  
 
 821  
   /**
 822  
    * Return the id of an unprocessed partition in memory. If all partitions are
 823  
    * processed, return an appropriate 'finisher signal'. If there are
 824  
    * unprocessed partitions, but none are in memory, return null.
 825  
    *
 826  
    * @return id of the partition to be processed next.
 827  
    */
 828  
   public Integer getNextPartition() {
 829  0
     if (numPartitionsProcessed.get() >= partitions.size()) {
 830  0
       return NO_PARTITION_TO_PROCESS;
 831  
     }
 832  0
     int numThreads = perThreadPartitionDictionary.size();
 833  0
     int index = randomGenerator.nextInt(numThreads);
 834  0
     int startIndex = index;
 835  
     MetaPartition meta;
 836  
     do {
 837  
       // We first look up a partition in the reverse dictionary. If there is a
 838  
       // partition with the given properties, we then check whether we can
 839  
       // return it as the next partition to process. If we cannot, there may
 840  
       // still be other partitions in the dictionary, so we will continue
 841  
       // looping through all of them. If all the partitions with our desired
 842  
       // properties has been examined, we will break the loop.
 843  
       while (true) {
 844  0
         meta = perThreadPartitionDictionary.get(index).lookup(
 845  
             ProcessingState.UNPROCESSED,
 846  
             StorageState.IN_MEM,
 847  
             StorageState.IN_MEM,
 848  
             null);
 849  0
         if (meta != null) {
 850  
           // Here we should check if the 'meta' still has the same property as
 851  
           // when it was looked up in the dictionary. There may be a case where
 852  
           // meta changes from the time it is looked up until the moment the
 853  
           // synchronize block is granted to progress.
 854  0
           synchronized (meta) {
 855  0
             if (meta.getProcessingState() == ProcessingState.UNPROCESSED &&
 856  0
                 meta.getPartitionState() == StorageState.IN_MEM &&
 857  0
                 meta.getCurrentMessagesState() == StorageState.IN_MEM) {
 858  0
               perThreadPartitionDictionary.get(index).removePartition(meta);
 859  0
               meta.setProcessingState(ProcessingState.IN_PROCESS);
 860  0
               perThreadPartitionDictionary.get(index).addPartition(meta);
 861  0
               return meta.getPartitionId();
 862  
             }
 863  0
           }
 864  
         } else {
 865  
           break;
 866  
         }
 867  
       }
 868  0
       index = (index + 1) % numThreads;
 869  0
     } while (index != startIndex);
 870  0
     return null;
 871  
   }
 872  
 
 873  
   /**
 874  
    * Whether a partition is on disk (both its data and its current messages)
 875  
    *
 876  
    * @param partitionId id of the partition to check if it is on disk
 877  
    * @return true if partition data or its current messages are on disk, false
 878  
    *         otherwise
 879  
    */
 880  
   public boolean isPartitionOnDisk(int partitionId) {
 881  0
     MetaPartition meta = partitions.get(partitionId);
 882  0
     synchronized (meta) {
 883  0
       return meta.isOnDisk();
 884  0
     }
 885  
   }
 886  
 
 887  
   /**
 888  
    * Representation of meta information of a partition
 889  
    */
 890  
   private static class MetaPartition {
 891  
     /** Id of the partition */
 892  
     private int partitionId;
 893  
     /** Storage state of incoming messages */
 894  
     private StorageState incomingMessagesState;
 895  
     /** Storage state of current messages */
 896  
     private StorageState currentMessagesState;
 897  
     /** Storage state of partition data */
 898  
     private StorageState partitionState;
 899  
     /** Processing state of a partition */
 900  
     private ProcessingState processingState;
 901  
 
 902  
     /**
 903  
      * Constructor
 904  
      *
 905  
      * @param partitionId id of the partition
 906  
      */
 907  0
     public MetaPartition(int partitionId) {
 908  0
       this.partitionId = partitionId;
 909  0
       this.processingState = ProcessingState.UNPROCESSED;
 910  0
       this.partitionState = StorageState.IN_MEM;
 911  0
       this.currentMessagesState = StorageState.IN_MEM;
 912  0
       this.incomingMessagesState = StorageState.IN_MEM;
 913  0
     }
 914  
 
 915  
     @Override
 916  
     public String toString() {
 917  0
       StringBuffer sb = new StringBuffer();
 918  0
       sb.append("\nMetaData: {");
 919  0
       sb.append("ID: " + partitionId + "; ");
 920  0
       sb.append("Partition: " + partitionState + "; ");
 921  0
       sb.append("Current Messages: " + currentMessagesState + "; ");
 922  0
       sb.append("Incoming Messages: " + incomingMessagesState + "; ");
 923  0
       sb.append("Processed? : " + processingState + "}");
 924  0
       return sb.toString();
 925  
     }
 926  
 
 927  
     public int getPartitionId() {
 928  0
       return partitionId;
 929  
     }
 930  
 
 931  
     public StorageState getIncomingMessagesState() {
 932  0
       return incomingMessagesState;
 933  
     }
 934  
 
 935  
     public void setIncomingMessagesState(StorageState incomingMessagesState) {
 936  0
       this.incomingMessagesState = incomingMessagesState;
 937  0
     }
 938  
 
 939  
     public StorageState getCurrentMessagesState() {
 940  0
       return currentMessagesState;
 941  
     }
 942  
 
 943  
     public void setCurrentMessagesState(StorageState currentMessagesState) {
 944  0
       this.currentMessagesState = currentMessagesState;
 945  0
     }
 946  
 
 947  
     public StorageState getPartitionState() {
 948  0
       return partitionState;
 949  
     }
 950  
 
 951  
     public void setPartitionState(StorageState state) {
 952  0
       this.partitionState = state;
 953  0
     }
 954  
 
 955  
     public ProcessingState getProcessingState() {
 956  0
       return processingState;
 957  
     }
 958  
 
 959  
     public void setProcessingState(ProcessingState processingState) {
 960  0
       this.processingState = processingState;
 961  0
     }
 962  
 
 963  
     /**
 964  
      * Whether the partition is on disk (either its data or its current
 965  
      * messages)
 966  
      *
 967  
      * @return true if the partition is on disk, false otherwise
 968  
      */
 969  
     public boolean isOnDisk() {
 970  0
       return partitionState == StorageState.ON_DISK ||
 971  
           currentMessagesState == StorageState.ON_DISK;
 972  
     }
 973  
 
 974  
     /**
 975  
      * Reset the partition meta information for the next iteration cycle
 976  
      */
 977  
     public void resetPartition() {
 978  0
       processingState = ProcessingState.UNPROCESSED;
 979  0
     }
 980  
 
 981  
     /**
 982  
      * Reset messages meta information for the next iteration cycle
 983  
      */
 984  
     public void resetMessages() {
 985  0
       currentMessagesState = incomingMessagesState;
 986  0
       incomingMessagesState = StorageState.IN_MEM;
 987  0
     }
 988  
 
 989  
     /**
 990  
      * @return the state of the partition and its current messages as a whole
 991  
      */
 992  
     public PartitionStorageState getPartitionStorageState() {
 993  0
       if (partitionState == StorageState.ON_DISK &&
 994  
           currentMessagesState == StorageState.ON_DISK) {
 995  0
         return PartitionStorageState.FULLY_ON_DISK;
 996  0
       } else if (partitionState == StorageState.IN_MEM &&
 997  
           currentMessagesState == StorageState.IN_MEM) {
 998  0
         return PartitionStorageState.FULLY_IN_MEM;
 999  
       } else {
 1000  0
         return PartitionStorageState.PARTIALLY_IN_MEM;
 1001  
       }
 1002  
     }
 1003  
   }
 1004  
 
 1005  
   /**
 1006  
    * Class representing reverse dictionary for partitions. The main operation
 1007  
    * of the reverse dictionary is to lookup for a partition with certain
 1008  
    * properties. The responsibility of keeping the dictionary consistent
 1009  
    * when partition property changes in on the code that changes the property.
 1010  
    * One can simply remove a partition from the dictionary, change the property
 1011  
    * (or properties), and then add the partition to the dictionary.
 1012  
    */
 1013  
   private static class MetaPartitionDictionary {
 1014  
     /**
 1015  
      * Sets of partitions for each possible combination of properties. Each
 1016  
      * partition can have 4 properties, and each property can have any of 3
 1017  
      * different values. The properties are as follows (in the order in which
 1018  
      * it is used as the dimensions of the following 4-D array):
 1019  
      *  - processing status (PROCESSED, UN_PROCESSED, or IN_PROCESS)
 1020  
      *  - partition storage status (IN_MEM, IN_TRANSIT, ON_DISK)
 1021  
      *  - current messages storage status (IN_MEM, IN_TRANSIT, ON_DISK)
 1022  
      *  - incoming messages storage status (IN_MEM, IN_TRANSIT, ON_DISK)
 1023  
      */
 1024  0
     private final Set<MetaPartition>[][][][] partitions =
 1025  
         (Set<MetaPartition>[][][][]) new Set<?>[3][3][3][3];
 1026  
     /**
 1027  
      * Number of partitions that has been prefetched to be computed in the
 1028  
      * next superstep
 1029  
      */
 1030  0
     private final AtomicInteger numPrefetch = new AtomicInteger(0);
 1031  
 
 1032  
     /**
 1033  
      * Constructor
 1034  
      */
 1035  0
     public MetaPartitionDictionary() {
 1036  0
       for (int i = 0; i < 3; ++i) {
 1037  0
         for (int j = 0; j < 3; ++j) {
 1038  0
           for (int k = 0; k < 3; ++k) {
 1039  0
             for (int t = 0; t < 3; ++t) {
 1040  0
               partitions[i][j][k][t] = Sets.newLinkedHashSet();
 1041  
             }
 1042  
           }
 1043  
         }
 1044  
       }
 1045  0
     }
 1046  
 
 1047  
     /**
 1048  
      * Get a partition set associated with property combination that a given
 1049  
      * partition has
 1050  
      *
 1051  
      * @param meta meta partition containing properties of a partition
 1052  
      * @return partition set with the same property combination as the given
 1053  
      *         meta partition
 1054  
      */
 1055  
     private Set<MetaPartition> getSet(MetaPartition meta) {
 1056  0
       return partitions[meta.getProcessingState().ordinal()]
 1057  0
           [meta.getPartitionState().ordinal()]
 1058  0
           [meta.getCurrentMessagesState().ordinal()]
 1059  0
           [meta.getIncomingMessagesState().ordinal()];
 1060  
     }
 1061  
 
 1062  
     /**
 1063  
      * Add a partition to the dictionary
 1064  
      *
 1065  
      * @param meta meta information of the partition to add
 1066  
      */
 1067  
     public void addPartition(MetaPartition meta) {
 1068  0
       Set<MetaPartition> partitionSet = getSet(meta);
 1069  0
       synchronized (partitionSet) {
 1070  0
         partitionSet.add(meta);
 1071  0
       }
 1072  0
     }
 1073  
 
 1074  
     /**
 1075  
      * Remove a partition to the dictionary
 1076  
      *
 1077  
      * @param meta meta infomation of the partition to remove
 1078  
      */
 1079  
     public void removePartition(MetaPartition meta) {
 1080  0
       Set<MetaPartition> partitionSet = getSet(meta);
 1081  0
       synchronized (partitionSet) {
 1082  0
         partitionSet.remove(meta);
 1083  0
       }
 1084  0
     }
 1085  
 
 1086  
     /**
 1087  
      * Lookup for a partition with given properties. One can use wildcard as
 1088  
      * a property in lookup operation (by passing null as the property).
 1089  
      *
 1090  
      * @param processingState processing state property
 1091  
      * @param partitionStorageState partition storage property
 1092  
      * @param currentMessagesState current messages storage property
 1093  
      * @param incomingMessagesState incoming messages storage property
 1094  
      * @return a meta partition in the dictionary with the given combination of
 1095  
      *         properties. If there is no such partition, return null
 1096  
      */
 1097  
     public MetaPartition lookup(ProcessingState processingState,
 1098  
                                 StorageState partitionStorageState,
 1099  
                                 StorageState currentMessagesState,
 1100  
                                 StorageState incomingMessagesState) {
 1101  0
       int iStart =
 1102  0
           (processingState == null) ? 0 : processingState.ordinal();
 1103  0
       int iEnd =
 1104  0
           (processingState == null) ? 3 : (processingState.ordinal() + 1);
 1105  0
       int jStart =
 1106  0
           (partitionStorageState == null) ? 0 : partitionStorageState.ordinal();
 1107  0
       int jEnd = (partitionStorageState == null) ? 3 :
 1108  0
               (partitionStorageState.ordinal() + 1);
 1109  0
       int kStart =
 1110  0
           (currentMessagesState == null) ? 0 : currentMessagesState.ordinal();
 1111  0
       int kEnd = (currentMessagesState == null) ? 3 :
 1112  0
               (currentMessagesState.ordinal() + 1);
 1113  0
       int tStart =
 1114  0
           (incomingMessagesState == null) ? 0 : incomingMessagesState.ordinal();
 1115  0
       int tEnd = (incomingMessagesState == null) ? 3 :
 1116  0
           (incomingMessagesState.ordinal() + 1);
 1117  0
       for (int i = iStart; i < iEnd; ++i) {
 1118  0
         for (int j = jStart; j < jEnd; ++j) {
 1119  0
           for (int k = kStart; k < kEnd; ++k) {
 1120  0
             for (int t = tStart; t < tEnd; ++t) {
 1121  0
               Set<MetaPartition> partitionSet = partitions[i][j][k][t];
 1122  0
               synchronized (partitionSet) {
 1123  0
                 MetaPartition meta = peekFromSet(partitionSet);
 1124  0
                 if (meta != null) {
 1125  0
                   return meta;
 1126  
                 }
 1127  0
               }
 1128  
             }
 1129  
           }
 1130  
         }
 1131  
       }
 1132  0
       return null;
 1133  
     }
 1134  
 
 1135  
     /**
 1136  
      * Whether there is an in-memory partition that is processed already,
 1137  
      * excluding those partitions that are prefetched
 1138  
      *
 1139  
      * @return true if there is a processed in-memory partition
 1140  
      */
 1141  
     public boolean hasProcessedOnMemory() {
 1142  0
       int count = 0;
 1143  0
       for (int i = 0; i < 3; ++i) {
 1144  0
         for (int j = 0; j < 3; ++j) {
 1145  0
           Set<MetaPartition> partitionSet =
 1146  0
               partitions[ProcessingState.PROCESSED.ordinal()]
 1147  0
                   [StorageState.IN_MEM.ordinal()][i][j];
 1148  0
           synchronized (partitionSet) {
 1149  0
             count += partitionSet.size();
 1150  0
           }
 1151  
         }
 1152  
       }
 1153  0
       return count - numPrefetch.get() != 0;
 1154  
     }
 1155  
 
 1156  
     /** Increase number of prefetch-ed partition by 1 */
 1157  
     public void increaseNumPrefetch() {
 1158  0
       numPrefetch.getAndIncrement();
 1159  0
     }
 1160  
 
 1161  
     /**
 1162  
      * Reset the dictionary preparing it for the next iteration cycle over
 1163  
      * partitions
 1164  
      */
 1165  
     public void reset() {
 1166  0
       numPrefetch.set(0);
 1167  0
     }
 1168  
   }
 1169  
 }