Coverage Report - org.apache.giraph.ooc.data.DiskBackedDataStore
 
Classes in this File Line Coverage Branch Coverage Complexity
DiskBackedDataStore
0%
0/118
0%
0/36
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.ooc.data;
 20  
 
 21  
 import com.google.common.collect.Maps;
 22  
 import com.google.common.collect.Sets;
 23  
 import org.apache.commons.lang3.tuple.MutablePair;
 24  
 import org.apache.commons.lang3.tuple.Pair;
 25  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 26  
 import org.apache.giraph.conf.IntConfOption;
 27  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 28  
 import org.apache.giraph.ooc.persistence.DataIndex;
 29  
 import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
 30  
 import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
 31  
 import org.apache.log4j.Logger;
 32  
 
 33  
 import java.io.DataInput;
 34  
 import java.io.DataOutput;
 35  
 import java.io.IOException;
 36  
 import java.util.ArrayList;
 37  
 import java.util.HashSet;
 38  
 import java.util.List;
 39  
 import java.util.Map;
 40  
 import java.util.Set;
 41  
 import java.util.concurrent.ConcurrentMap;
 42  
 import java.util.concurrent.locks.ReadWriteLock;
 43  
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 44  
 
 45  
 import static com.google.common.base.Preconditions.checkNotNull;
 46  
 import static com.google.common.base.Preconditions.checkState;
 47  
 import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
 48  
 
 49  
 /**
 50  
  * This class provides basic operations for data structures that have to
 51  
  * participate in out-of-core mechanism. Essential subclasses of this class are:
 52  
  *  - DiskBackedPartitionStore (for partition data)
 53  
  *  - DiskBackedMessageStore (for messages)
 54  
  *  - DiskBackedEdgeStore (for edges read in INPUT_SUPERSTEP)
 55  
  * Basically, any data structure that may cause OOM to happen can be implemented
 56  
  * as a subclass of this class.
 57  
  *
 58  
  * There are two different terms used in the rest of this class:
 59  
  *  - "data store" refers to in-memory representation of data. Usually this is
 60  
  *    stored per-partition in in-memory implementations of data structures. For
 61  
  *    instance, "data store" of a DiskBackedPartitionStore would be the
 62  
  *    collection of all partitions kept in the in-memory partition store within
 63  
  *    the DiskBackedPartitionStore.
 64  
  *  - "raw data buffer" refers to raw data which were supposed to be
 65  
  *    de-serialized and added to the data store, but they remain 'as is' in the
 66  
  *    memory because their corresponding partition is offloaded to disk and is
 67  
  *    not available in the data store.
 68  
  *
 69  
  * @param <T> raw data format of the data store subclassing this class
 70  
  */
 71  
 public abstract class DiskBackedDataStore<T> {
 72  
   /**
 73  
    * Minimum size of a buffer (in bytes) to flush to disk. This is used to
 74  
    * decide whether vertex/edge buffers are large enough to flush to disk.
    * The option key is "giraph.flushBufferSize"; the default is 8 * ONE_MB.
 75  
    */
 76  0
   public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH =
 77  
       new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB,
 78  
           "Minimum size of a buffer (in bytes) to flush to disk.");
 79  
 
 80  
   /** Class logger. */
 81  0
   private static final Logger LOG = Logger.getLogger(
 82  
       DiskBackedDataStore.class);
 83  
   /** Out-of-core engine */
 84  
   protected final OutOfCoreEngine oocEngine;
 85  
   /**
 86  
    * Set containing ids of all partitions where the partition data is in some
 87  
    * file on disk.
 88  
    * Note that the out-of-core mechanism may decide to put the data for a
 89  
    * partition on disk, while the partition data is empty. For instance, at the
 90  
    * beginning of a superstep, out-of-core mechanism may decide to put incoming
 91  
    * messages of a partition on disk, while the partition has not received any
 92  
    * messages. In such scenarios, the "out-of-core mechanism" thinks that the
 93  
    * partition data is on disk, while disk-backed data stores may want to
 94  
    * optimize for IO/metadata accesses and decide not to create/write anything
 95  
    * on files on disk.
 96  
    * In summary, there is a subtle difference between this field and
 97  
    * the `hasPartitionDataOnDisk` field. This field is used for optimizing
 98  
    * IO (mainly metadata) accesses by disk-backed stores, while
 99  
    * `hasPartitionDataOnDisk` is the view that out-of-core mechanism has
 100  
    * regarding partition storage statuses. Since out-of-core mechanism does not
 101  
    * know about the actual data for a partition, these two fields have to be
 102  
    * separate.
    * NOTE(review): nothing in this class reads or writes this set; presumably
    * it is maintained by subclasses - confirm against them.
 103  
    */
 104  0
   protected final Set<Integer> hasPartitionDataOnFile =
 105  0
       Sets.newConcurrentHashSet();
 106  
   /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */
 107  
   private final int minBufferSizeToOffload;
 108  
   /**
    * Set containing ids of all out-of-core partitions. Entries are added when
    * a partition is offloaded and removed when it is loaded back; `addEntry`
    * consults it to decide whether an entry goes to a raw data buffer.
    */
 109  0
   private final Set<Integer> hasPartitionDataOnDisk =
 110  0
       Sets.newConcurrentHashSet();
 111  
   /**
 112  
    * Map of partition ids to list of raw data buffers. The map will have entries
 113  
    * only for partitions whose in-memory data structures are currently
 114  
    * offloaded to disk. We keep the aggregate size of buffers for each partition
 115  
    * as part of the values in the map to estimate how much memory we can free up
 116  
    * if we offload data buffers of a particular partition to disk.
    * The left element of each pair is that aggregate size (sum of
    * `entrySerializedSize` of the buffered entries); the right element is the
    * list of buffered entries.
 117  
    */
 118  0
   private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers =
 119  0
       Maps.newConcurrentMap();
 120  
   /**
 121  
    * Map of partition ids to number of raw data buffers offloaded to disk for
 122  
    * each partition. The map will have entries only for partitions whose
 123  
    * in-memory data structures are currently out of core. It is necessary to
 124  
    * know the number of data buffers on disk for a particular partition when we
 125  
    * are loading all these buffers back in memory.
 126  
    */
 127  0
   private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk =
 128  0
       Maps.newConcurrentMap();
 129  
   /**
 130  
    * Locks to avoid overlapping of read and write on data associated with each
 131  
    * partition, keyed by partition id. Locks are created lazily by
    * `getPartitionLock`.
 132  
    */
 133  0
   private final ConcurrentMap<Integer, ReadWriteLock> locks =
 134  0
       Maps.newConcurrentMap();
 135  
 
 136  
   /**
 137  
    * Constructor.
 138  
    *
 139  
    * @param conf Configuration
 140  
    * @param oocEngine Out-of-core engine
 141  
    */
 142  
   DiskBackedDataStore(ImmutableClassesGiraphConfiguration conf,
 143  0
                       OutOfCoreEngine oocEngine) {
 144  0
     this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf);
 145  0
     this.oocEngine = oocEngine;
 146  0
   }
 147  
 
 148  
   /**
 149  
    * Retrieves a lock for a given partition. If the lock for the given partition
 150  
    * does not exist, creates a new lock.
 151  
    *
 152  
    * @param partitionId id of the partition the lock is needed for
 153  
    * @return lock for a given partition
 154  
    */
 155  
   private ReadWriteLock getPartitionLock(int partitionId) {
 156  0
     ReadWriteLock readWriteLock = locks.get(partitionId);
 157  0
     if (readWriteLock == null) {
 158  0
       readWriteLock = new ReentrantReadWriteLock();
 159  0
       ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock);
 160  0
       if (temp != null) {
 161  0
         readWriteLock = temp;
 162  
       }
 163  
     }
 164  0
     return readWriteLock;
 165  
   }
 166  
 
 167  
   /**
 168  
    * Adds a data entry for a given partition to the current data store. If data
 169  
    * of a given partition in data store is already offloaded to disk, adds the
 170  
    * data entry to appropriate raw data buffer list.
 171  
    *
 172  
    * @param partitionId id of the partition to add the data entry to
 173  
    * @param entry data entry to add
 174  
    */
 175  
   protected void addEntry(int partitionId, T entry) {
 176  
     // Addition of data entries to a data store is much more common than
 177  
     // out-of-core operations. Besides, in-memory data store implementations
 178  
     // existing in the code base already account for parallel addition to data
 179  
     // stores. Therefore, using read lock would optimize for parallel addition
 180  
     // to data stores, specially for cases where the addition should happen for
 181  
     // partitions that are entirely in memory.
 182  0
     ReadWriteLock rwLock = getPartitionLock(partitionId);
 183  0
     rwLock.readLock().lock();
 184  0
     if (hasPartitionDataOnDisk.contains(partitionId)) {
 185  0
       List<T> entryList = new ArrayList<>();
 186  0
       entryList.add(entry);
 187  0
       int entrySize = entrySerializedSize(entry);
 188  0
       MutablePair<Integer, List<T>> newPair =
 189  0
           new MutablePair<>(entrySize, entryList);
 190  0
       Pair<Integer, List<T>> oldPair =
 191  0
           dataBuffers.putIfAbsent(partitionId, newPair);
 192  0
       if (oldPair != null) {
 193  0
         synchronized (oldPair) {
 194  0
           newPair = (MutablePair<Integer, List<T>>) oldPair;
 195  0
           newPair.setLeft(oldPair.getLeft() + entrySize);
 196  0
           newPair.getRight().add(entry);
 197  0
         }
 198  
       }
 199  0
     } else {
 200  0
       addEntryToInMemoryPartitionData(partitionId, entry);
 201  
     }
 202  0
     rwLock.readLock().unlock();
 203  0
   }
 204  
 
 205  
   /**
 206  
    * Loads and assembles all data for a given partition, and puts it into the
 207  
    * data store. Returns the number of bytes transferred from disk to memory in
 208  
    * the loading process.
    * The shared implementation of this operation is `loadPartitionDataProxy`,
    * which takes the data index to use from its caller.
 209  
    *
 210  
    * @param partitionId id of the partition to load and assemble all data for
 211  
    * @return number of bytes loaded from disk to memory
 212  
    * @throws IOException if the implementation's I/O fails
 213  
    */
 214  
   public abstract long loadPartitionData(int partitionId) throws IOException;
 215  
 
 216  
   /**
 217  
    * The proxy method that does the actual operation for `loadPartitionData`,
 218  
    * but uses the data index given by the caller.
 219  
    *
 220  
    * @param partitionId id of the partition to load and assemble all data for
 221  
    * @param index data index chain for the data to load
 222  
    * @return number of bytes loaded from disk to memory
 223  
    * @throws IOException
 224  
    */
 225  
   protected long loadPartitionDataProxy(int partitionId, DataIndex index)
 226  
       throws IOException {
 227  0
     long numBytes = 0;
 228  0
     ReadWriteLock rwLock = getPartitionLock(partitionId);
 229  0
     rwLock.writeLock().lock();
 230  0
     if (hasPartitionDataOnDisk.contains(partitionId)) {
 231  0
       int ioThreadId =
 232  0
           oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
 233  0
       numBytes += loadInMemoryPartitionData(partitionId, ioThreadId,
 234  0
           index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)));
 235  0
       hasPartitionDataOnDisk.remove(partitionId);
 236  
       // Loading raw data buffers from disk if there is any and applying those
 237  
       // to already loaded in-memory data.
 238  0
       Integer numBuffers = numDataBuffersOnDisk.remove(partitionId);
 239  0
       if (numBuffers != null) {
 240  0
         checkState(numBuffers > 0);
 241  0
         index.addIndex(DataIndex.TypeIndexEntry.BUFFER);
 242  0
         OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
 243  0
             oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
 244  0
         DataInput dataInput = inputWrapper.getDataInput();
 245  0
         for (int i = 0; i < numBuffers; ++i) {
 246  0
           T entry = readNextEntry(dataInput);
 247  0
           addEntryToInMemoryPartitionData(partitionId, entry);
 248  
         }
 249  0
         numBytes += inputWrapper.finalizeInput(true);
 250  0
         index.removeLastIndex();
 251  
       }
 252  0
       index.removeLastIndex();
 253  
       // Applying in-memory raw data buffers to in-memory partition data.
 254  0
       Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId);
 255  0
       if (pair != null) {
 256  0
         for (T entry : pair.getValue()) {
 257  0
           addEntryToInMemoryPartitionData(partitionId, entry);
 258  0
         }
 259  
       }
 260  
     }
 261  0
     rwLock.writeLock().unlock();
 262  0
     return numBytes;
 263  
   }
 264  
 
 265  
   /**
 266  
    * Offloads partition data of a given partition in the data store to disk, and
 267  
    * returns the number of bytes offloaded from memory to disk.
    * The shared implementation of this operation is
    * `offloadPartitionDataProxy`, which takes the data index to use from its
    * caller.
 268  
    *
 269  
    * @param partitionId id of the partition to offload its data
 270  
    * @return number of bytes offloaded from memory to disk
 271  
    * @throws IOException if the implementation's I/O fails
 272  
    */
 273  
   public abstract long offloadPartitionData(int partitionId) throws IOException;
 274  
 
 275  
   /**
 276  
    * The proxy method that does the actual operation for `offloadPartitionData`,
 277  
    * but uses the data index given by the caller.
 278  
    *
 279  
    * @param partitionId id of the partition to offload its data
 280  
    * @param index data index chain for the data to offload
 281  
    * @return number of bytes offloaded from memory to disk
 282  
    * @throws IOException
 283  
    */
 284  
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
 285  
       "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
 286  
   protected long offloadPartitionDataProxy(
 287  
       int partitionId, DataIndex index) throws IOException {
 288  0
     ReadWriteLock rwLock = getPartitionLock(partitionId);
 289  0
     rwLock.writeLock().lock();
 290  0
     hasPartitionDataOnDisk.add(partitionId);
 291  0
     rwLock.writeLock().unlock();
 292  0
     int ioThreadId =
 293  0
         oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
 294  0
     long numBytes = offloadInMemoryPartitionData(partitionId, ioThreadId,
 295  0
         index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)));
 296  0
     index.removeLastIndex();
 297  0
     return numBytes;
 298  
   }
 299  
 
 300  
   /**
 301  
    * Offloads raw data buffers of a given partition to disk, and returns the
 302  
    * number of bytes offloaded from memory to disk.
    * The shared implementation of this operation is `offloadBuffersProxy`,
    * which takes the data index to use from its caller.
 303  
    *
 304  
    * @param partitionId id of the partition to offload its raw data buffers
 305  
    * @return number of bytes offloaded from memory to disk
 306  
    * @throws IOException if the implementation's I/O fails
 307  
    */
 308  
   public abstract long offloadBuffers(int partitionId) throws IOException;
 309  
 
 310  
   /**
 311  
    * The proxy method that does the actual operation for `offloadBuffers`,
 312  
    * but uses the data index given by the caller.
 313  
    *
 314  
    * @param partitionId id of the partition to offload its raw data buffers
 315  
    * @param index data index chain for the data to offload its buffers
 316  
    * @return number of bytes offloaded from memory to disk
 317  
    * @throws IOException
 318  
    */
 319  
   protected long offloadBuffersProxy(int partitionId, DataIndex index)
 320  
       throws IOException {
 321  0
     Pair<Integer, List<T>> pair = dataBuffers.get(partitionId);
 322  0
     if (pair == null || pair.getLeft() < minBufferSizeToOffload) {
 323  0
       return 0;
 324  
     }
 325  0
     ReadWriteLock rwLock = getPartitionLock(partitionId);
 326  0
     rwLock.writeLock().lock();
 327  0
     pair = dataBuffers.remove(partitionId);
 328  0
     rwLock.writeLock().unlock();
 329  0
     checkNotNull(pair);
 330  0
     checkState(!pair.getRight().isEmpty());
 331  0
     int ioThreadId =
 332  0
         oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
 333  0
     index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId))
 334  0
         .addIndex(DataIndex.TypeIndexEntry.BUFFER);
 335  0
     OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
 336  0
         oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
 337  
             true);
 338  0
     for (T entry : pair.getRight()) {
 339  0
       writeEntry(entry, outputWrapper.getDataOutput());
 340  0
     }
 341  0
     long numBytes = outputWrapper.finalizeOutput();
 342  0
     index.removeLastIndex().removeLastIndex();
 343  0
     int numBuffers = pair.getRight().size();
 344  0
     Integer oldNumBuffersOnDisk =
 345  0
         numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
 346  0
     if (oldNumBuffersOnDisk != null) {
 347  0
       numDataBuffersOnDisk.replace(partitionId,
 348  0
           oldNumBuffersOnDisk + numBuffers);
 349  
     }
 350  0
     return numBytes;
 351  
   }
 352  
 
 353  
   /**
 354  
    * Looks through all partitions that their data is not in the data store (is
 355  
    * offloaded to disk), and sees if any of them has enough raw data buffer in
 356  
    * memory. If so, puts that partition in a list to return.
 357  
    *
 358  
    * @param ioThreadId Id of the IO thread who would offload the buffers
 359  
    * @return Set of partition ids of all partition raw buffers where the
 360  
    *         aggregate size of buffers are large enough and it is worth flushing
 361  
    *         those buffers to disk
 362  
    */
 363  
   public Set<Integer> getCandidateBuffersToOffload(int ioThreadId) {
 364  0
     Set<Integer> result = new HashSet<>();
 365  
     for (Map.Entry<Integer, Pair<Integer, List<T>>> entry :
 366  0
         dataBuffers.entrySet()) {
 367  0
       int partitionId = entry.getKey();
 368  0
       long aggregateBufferSize = entry.getValue().getLeft();
 369  0
       if (aggregateBufferSize > minBufferSizeToOffload &&
 370  0
           oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId) ==
 371  
               ioThreadId) {
 372  0
         result.add(partitionId);
 373  
       }
 374  0
     }
 375  0
     return result;
 376  
   }
 377  
 
 378  
   /**
 379  
    * Writes a single raw entry to a given output stream.
    * `offloadBuffersProxy` calls this once per buffered entry when flushing a
    * partition's raw data buffers to disk.
 380  
    *
 381  
    * @param entry entry to write to output
 382  
    * @param out output stream to write the entry to
 383  
    * @throws IOException if writing to the output fails
 384  
    */
 385  
   protected abstract void writeEntry(T entry, DataOutput out)
 386  
       throws IOException;
 387  
 
 388  
   /**
 389  
    * Reads the next available raw entry from a given input stream.
    * `loadPartitionDataProxy` calls this once for every raw data buffer entry
    * recorded as being on disk for the partition being loaded.
 390  
    *
 391  
    * @param in input stream to read the entry from
 392  
    * @return entry read from an input stream
 393  
    * @throws IOException if reading from the input fails
 394  
    */
 395  
   protected abstract T readNextEntry(DataInput in) throws IOException;
 396  
 
 397  
   /**
 398  
    * Loads data of a partition into data store. Returns number of bytes loaded.
    * `loadPartitionDataProxy` calls this while holding the partition's write
    * lock, passing the caller's index extended with the partition entry.
 399  
    *
 400  
    * @param partitionId id of the partition to load its data
 401  
    * @param ioThreadId id of the IO thread performing the load
 402  
    * @param index data index chain for the data to load
 403  
    * @return number of bytes loaded from disk to memory
 404  
    * @throws IOException if the implementation's I/O fails
 405  
    */
 406  
   protected abstract long loadInMemoryPartitionData(
 407  
       int partitionId, int ioThreadId, DataIndex index) throws IOException;
 408  
 
 409  
   /**
 410  
    * Offloads data of a partition in data store to disk. Returns the number of
 411  
    * bytes offloaded to disk.
    * `offloadPartitionDataProxy` calls this after it has marked the partition
    * as offloaded and released the partition's write lock, passing the
    * caller's index extended with the partition entry.
 412  
    *
 413  
    * @param partitionId id of the partition to offload to disk
 414  
    * @param ioThreadId id of the IO thread performing the offload
 415  
    * @param index data index chain for the data to offload
 416  
    * @return number of bytes offloaded from memory to disk
 417  
    * @throws IOException if the implementation's I/O fails
 418  
    */
 419  
   protected abstract long offloadInMemoryPartitionData(
 420  
       int partitionId, int ioThreadId, DataIndex index) throws IOException;
 421  
 
 422  
   /**
 423  
    * Gets the size of a given entry in bytes.
    * `addEntry` uses this value to maintain the aggregate size of a
    * partition's raw data buffers, which is later compared against the
    * minimum buffer size to flush.
 424  
    *
 425  
    * @param entry input entry to find its size
 426  
    * @return size of given input entry in bytes
 427  
    */
 428  
   protected abstract int entrySerializedSize(T entry);
 429  
 
 430  
   /**
 431  
    * Adds a single entry for a given partition to the in-memory data store.
    * `addEntry` calls this while holding only the partition's read lock, so
    * several threads may invoke it for the same partition at the same time;
    * `loadPartitionDataProxy` calls it while holding the write lock.
 432  
    *
 433  
    * @param partitionId id of the partition to add the data to
 434  
    * @param entry input entry to add to the data store
 435  
    */
 436  
   protected abstract void addEntryToInMemoryPartitionData(int partitionId,
 437  
                                                           T entry);
 438  
 }