Coverage Report - org.apache.giraph.ooc.data.DiskBackedPartitionStore
 
Classes in this File Line Coverage Branch Coverage Complexity
DiskBackedPartitionStore
0%
0/164
0%
0/46
1.929
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.ooc.data;
 20  
 
 21  
 import com.google.common.collect.Maps;
 22  
 import org.apache.giraph.bsp.BspService;
 23  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 24  
 import org.apache.giraph.edge.OutEdges;
 25  
 import org.apache.giraph.graph.Vertex;
 26  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 27  
 import org.apache.giraph.ooc.persistence.DataIndex;
 28  
 import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
 29  
 import org.apache.giraph.partition.Partition;
 30  
 import org.apache.giraph.partition.PartitionStore;
 31  
 import org.apache.giraph.utils.ExtendedDataOutput;
 32  
 import org.apache.giraph.utils.WritableUtils;
 33  
 import org.apache.giraph.worker.BspServiceWorker;
 34  
 import org.apache.hadoop.io.Writable;
 35  
 import org.apache.hadoop.io.WritableComparable;
 36  
 import org.apache.hadoop.mapreduce.Mapper;
 37  
 import org.apache.log4j.Logger;
 38  
 
 39  
 import java.io.DataInput;
 40  
 import java.io.DataOutput;
 41  
 import java.io.IOException;
 42  
 import java.util.Map;
 43  
 
 44  
 import static com.google.common.base.Preconditions.checkNotNull;
 45  
 
 46  
 /**
 47  
  * Implementation of a partition-store used for out-of-core mechanism.
 48  
  * Partition store is responsible for partition data, as well as data buffers in
 49  
  * INPUT_SUPERSTEP ("raw data buffer" -- defined in DiskBackedDataStore --
 50  
  * refers to vertex buffers in INPUT_SUPERSTEP).
 51  
  *
 52  
  * @param <I> Vertex id
 53  
  * @param <V> Vertex data
 54  
  * @param <E> Edge data
 55  
  */
 56  0
 public class DiskBackedPartitionStore<I extends WritableComparable,
 57  
     V extends Writable, E extends Writable>
 58  
     extends DiskBackedDataStore<ExtendedDataOutput>
 59  
     implements PartitionStore<I, V, E> {
 60  
   /** Class logger. */
 61  0
   private static final Logger LOG =
 62  0
       Logger.getLogger(DiskBackedPartitionStore.class);
 63  
   /** Configuration */
 64  
   private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
 65  
   /** Job context (for progress) */
 66  
   private final Mapper<?, ?, ?, ?>.Context context;
 67  
   /** In-memory partition store */
 68  
   private final PartitionStore<I, V, E> partitionStore;
 69  
   /**
 70  
    * Keeps number of vertices in partitions, right when they are last spilled
 71  
    * to the disk. This value may be inaccurate for in-memory partitions, but
 72  
    * is accurate for out-of-core partitions.
 73  
    */
 74  0
   private final Map<Integer, Long> partitionVertexCount =
 75  0
       Maps.newConcurrentMap();
 76  
   /**
 77  
    * Keeps number of edges in partitions, right when they are last spilled
 78  
    * to the disk. This value may be inaccurate for in-memory partitions, but
 79  
    * is accurate for out-of-core partitions.
 80  
    */
 81  0
   private final Map<Integer, Long> partitionEdgeCount =
 82  0
       Maps.newConcurrentMap();
 83  
 
 84  
   /**
 85  
    * Constructor.
 86  
    *
 87  
    * @param partitionStore In-memory partition store for which out-of-code
 88  
    *                       partition store would be a wrapper
 89  
    * @param conf Configuration
 90  
    * @param context Job context
 91  
    * @param oocEngine Out-of-core engine
 92  
    */
 93  
   public DiskBackedPartitionStore(
 94  
       PartitionStore<I, V, E> partitionStore,
 95  
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
 96  
       Mapper<?, ?, ?, ?>.Context context,
 97  
       OutOfCoreEngine oocEngine) {
 98  0
     super(conf, oocEngine);
 99  0
     this.partitionStore = partitionStore;
 100  0
     this.conf = conf;
 101  0
     this.context = context;
 102  0
   }
 103  
 
 104  
   @Override
 105  
   public boolean addPartition(Partition<I, V, E> partition) {
 106  0
     boolean added = partitionStore.addPartition(partition);
 107  0
     if (added) {
 108  0
       oocEngine.getMetaPartitionManager()
 109  0
           .addPartition(partition.getId());
 110  
     }
 111  0
     return added;
 112  
   }
 113  
 
 114  
   @Override
 115  
   public Partition<I, V, E> removePartition(Integer partitionId) {
 116  
     // Set the partition as 'in process' so its data and messages do not get
 117  
     // spilled to disk until the remove is complete.
 118  0
     oocEngine.getMetaPartitionManager().markPartitionAsInProcess(partitionId);
 119  0
     oocEngine.retrievePartition(partitionId);
 120  0
     Partition<I, V, E> partition = partitionStore.removePartition(partitionId);
 121  0
     checkNotNull(partition, "removePartition: partition " + partitionId +
 122  
         " is not in memory for removal!");
 123  0
     oocEngine.getMetaPartitionManager().removePartition(partitionId);
 124  0
     return partition;
 125  
   }
 126  
 
 127  
   @Override
 128  
   public boolean hasPartition(Integer partitionId) {
 129  0
     return oocEngine.getMetaPartitionManager().hasPartition(partitionId);
 130  
   }
 131  
 
 132  
   @Override
 133  
   public Iterable<Integer> getPartitionIds() {
 134  0
     return oocEngine.getMetaPartitionManager().getPartitionIds();
 135  
   }
 136  
 
 137  
   @Override
 138  
   public int getNumPartitions() {
 139  0
     return oocEngine.getMetaPartitionManager().getNumPartitions();
 140  
   }
 141  
 
 142  
   @Override
 143  
   public long getPartitionVertexCount(Integer partitionId) {
 144  0
     if (partitionStore.hasPartition(partitionId)) {
 145  0
       return partitionStore.getPartitionVertexCount(partitionId);
 146  
     } else {
 147  0
       return partitionVertexCount.get(partitionId);
 148  
     }
 149  
   }
 150  
 
 151  
   @Override
 152  
   public long getPartitionEdgeCount(Integer partitionId) {
 153  0
     if (partitionStore.hasPartition(partitionId)) {
 154  0
       return partitionStore.getPartitionEdgeCount(partitionId);
 155  
     } else {
 156  0
       return partitionEdgeCount.get(partitionId);
 157  
     }
 158  
   }
 159  
 
 160  
   @Override
 161  
   public boolean isEmpty() {
 162  0
     return getNumPartitions() == 0;
 163  
   }
 164  
 
 165  
   @Override
 166  
   public void startIteration() {
 167  0
     oocEngine.startIteration();
 168  0
   }
 169  
 
 170  
   @Override
 171  
   public Partition<I, V, E> getNextPartition() {
 172  0
     Integer partitionId = oocEngine.getNextPartition();
 173  0
     if (partitionId == null) {
 174  0
       return null;
 175  
     }
 176  0
     Partition<I, V, E> partition = partitionStore.removePartition(partitionId);
 177  0
     if (partition == null) {
 178  0
       if (LOG.isInfoEnabled()) {
 179  0
         LOG.info("getNextPartition: partition " + partitionId + " is not in " +
 180  
             "the partition store. Creating an empty partition for it.");
 181  
       }
 182  0
       partition = conf.createPartition(partitionId, context);
 183  
     }
 184  0
     partitionStore.addPartition(partition);
 185  0
     return partition;
 186  
   }
 187  
 
 188  
   @Override
 189  
   public void putPartition(Partition<I, V, E> partition) {
 190  0
     oocEngine.doneProcessingPartition(partition.getId());
 191  0
   }
 192  
 
 193  
   @Override
 194  
   public void addPartitionVertices(Integer partitionId,
 195  
                                    ExtendedDataOutput extendedDataOutput) {
 196  0
     addEntry(partitionId, extendedDataOutput);
 197  0
   }
 198  
 
 199  
   @Override
 200  
   public void shutdown() {
 201  0
     oocEngine.shutdown();
 202  0
   }
 203  
 
 204  
   @Override
 205  
   public void initialize() {
 206  0
     oocEngine.initialize();
 207  0
   }
 208  
 
 209  
   /**
 210  
    * Read vertex data from an input and initialize the vertex.
 211  
    *
 212  
    * @param in     The input stream
 213  
    * @param vertex The vertex to initialize
 214  
    * @throws IOException
 215  
    */
 216  
   private void readVertexData(DataInput in, Vertex<I, V, E> vertex)
 217  
       throws IOException {
 218  0
     I id = conf.createVertexId();
 219  0
     id.readFields(in);
 220  0
     V value = null;
 221  0
     boolean hasNullValue = in.readBoolean();
 222  0
     if (!hasNullValue) {
 223  0
       value = conf.createVertexValue();
 224  0
       value.readFields(in);
 225  
     }
 226  0
     OutEdges<I, E> edges = conf.createAndInitializeOutEdges(0);
 227  0
     vertex.initialize(id, value, edges);
 228  0
     if (in.readBoolean()) {
 229  0
       vertex.voteToHalt();
 230  
     } else {
 231  0
       vertex.wakeUp();
 232  
     }
 233  0
   }
 234  
 
 235  
   /**
 236  
    * Read vertex edges from an input and set them to the vertex.
 237  
    *
 238  
    * @param in        The input stream
 239  
    * @param partition The partition owning the vertex
 240  
    * @throws IOException
 241  
    */
 242  
   private void readOutEdges(DataInput in, Partition<I, V, E> partition)
 243  
       throws IOException {
 244  0
     I id = conf.createVertexId();
 245  0
     id.readFields(in);
 246  0
     Vertex<I, V, E> v = partition.getVertex(id);
 247  0
     if (v == null) {
 248  0
       throw new IllegalStateException("Vertex with ID " + id +
 249  0
         " not found in partition " + partition.getId() +
 250  0
         " which has " + partition.getVertexCount() + " vertices and " +
 251  0
         partition.getEdgeCount() + " edges.");
 252  
     }
 253  0
     OutEdges<I, E> edges = (OutEdges<I, E>) v.getEdges();
 254  0
     edges.readFields(in);
 255  0
     partition.saveVertex(v);
 256  0
   }
 257  
 
 258  
   @Override
 259  
   protected long loadInMemoryPartitionData(int partitionId, int ioThreadId,
 260  
                                            DataIndex index) throws IOException {
 261  0
     long numBytes = 0;
 262  
     // Load vertices
 263  0
     if (hasPartitionDataOnFile.remove(partitionId)) {
 264  0
       Partition<I, V, E> partition = conf.createPartition(partitionId, context);
 265  0
       OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor();
 266  0
       index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES);
 267  0
       OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
 268  0
           dataAccessor.prepareInput(ioThreadId, index.copy());
 269  0
       DataInput dataInput = inputWrapper.getDataInput();
 270  0
       long numVertices = dataInput.readLong();
 271  0
       for (long i = 0; i < numVertices; ++i) {
 272  0
         Vertex<I, V, E> vertex = conf.createVertex();
 273  0
         readVertexData(dataInput, vertex);
 274  0
         partition.putVertex(vertex);
 275  
       }
 276  0
       numBytes += inputWrapper.finalizeInput(true);
 277  
 
 278  
       // Load edges
 279  0
       index.removeLastIndex()
 280  0
           .addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES);
 281  0
       inputWrapper = dataAccessor.prepareInput(ioThreadId, index.copy());
 282  0
       dataInput = inputWrapper.getDataInput();
 283  0
       for (int i = 0; i < numVertices; ++i) {
 284  0
         readOutEdges(dataInput, partition);
 285  
       }
 286  
       // If the graph is static and it is not INPUT_SUPERSTEP, keep the file
 287  
       // around.
 288  0
       boolean shouldDeleteEdges = false;
 289  0
       if (!conf.isStaticGraph() ||
 290  0
           oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) {
 291  0
         shouldDeleteEdges = true;
 292  
       }
 293  0
       numBytes += inputWrapper.finalizeInput(shouldDeleteEdges);
 294  0
       index.removeLastIndex();
 295  0
       partitionStore.addPartition(partition);
 296  
     }
 297  0
     return numBytes;
 298  
   }
 299  
 
 300  
   @Override
 301  
   protected ExtendedDataOutput readNextEntry(DataInput in) throws IOException {
 302  0
     return WritableUtils.readExtendedDataOutput(in, conf);
 303  
   }
 304  
 
 305  
   @Override
 306  
   protected void addEntryToInMemoryPartitionData(int partitionId,
 307  
                                                  ExtendedDataOutput vertices) {
 308  0
     if (!partitionStore.hasPartition(partitionId)) {
 309  0
       oocEngine.getMetaPartitionManager().addPartition(partitionId);
 310  
     }
 311  0
     partitionStore.addPartitionVertices(partitionId, vertices);
 312  0
   }
 313  
 
 314  
   @Override
 315  
   public long loadPartitionData(int partitionId)
 316  
       throws IOException {
 317  0
     return loadPartitionDataProxy(partitionId,
 318  0
         new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
 319  
   }
 320  
 
 321  
   @Override
 322  
   public long offloadPartitionData(int partitionId)
 323  
       throws IOException {
 324  0
     return offloadPartitionDataProxy(partitionId,
 325  0
         new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
 326  
   }
 327  
 
 328  
   /**
 329  
    * Writes vertex data (Id, value and halted state) to stream.
 330  
    *
 331  
    * @param output The output stream
 332  
    * @param vertex The vertex to serialize
 333  
    * @throws IOException
 334  
    */
 335  
   private void writeVertexData(DataOutput output, Vertex<I, V, E> vertex)
 336  
       throws IOException {
 337  0
     vertex.getId().write(output);
 338  0
     V value = vertex.getValue();
 339  0
     if (value != null) {
 340  0
       output.writeBoolean(false);
 341  0
       value.write(output);
 342  
     } else {
 343  0
       output.writeBoolean(true);
 344  
     }
 345  0
     output.writeBoolean(vertex.isHalted());
 346  0
   }
 347  
 
 348  
   /**
 349  
    * Writes vertex edges (Id, edges) to stream.
 350  
    *
 351  
    * @param output The output stream
 352  
    * @param vertex The vertex to serialize
 353  
    * @throws IOException
 354  
    */
 355  
   private void writeOutEdges(DataOutput output, Vertex<I, V, E> vertex)
 356  
       throws IOException {
 357  0
     vertex.getId().write(output);
 358  0
     OutEdges<I, E> edges = (OutEdges<I, E>) vertex.getEdges();
 359  0
     edges.write(output);
 360  0
   }
 361  
 
 362  
   @Override
 363  
   protected long offloadInMemoryPartitionData(
 364  
       int partitionId, int ioThreadId, DataIndex index) throws IOException {
 365  0
     long numBytes = 0;
 366  0
     if (partitionStore.hasPartition(partitionId)) {
 367  0
       OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor();
 368  0
       partitionVertexCount.put(partitionId,
 369  0
           partitionStore.getPartitionVertexCount(partitionId));
 370  0
       partitionEdgeCount.put(partitionId,
 371  0
           partitionStore.getPartitionEdgeCount(partitionId));
 372  0
       Partition<I, V, E> partition =
 373  0
           partitionStore.removePartition(partitionId);
 374  0
       LOG.debug(
 375  
           "Offloading partition " + partition + " DataIndex[" + index + "]");
 376  0
       index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES);
 377  0
       OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
 378  0
           dataAccessor.prepareOutput(ioThreadId, index.copy(), false);
 379  0
       DataOutput dataOutput = outputWrapper.getDataOutput();
 380  0
       dataOutput.writeLong(partition.getVertexCount());
 381  0
       for (Vertex<I, V, E> vertex : partition) {
 382  0
         writeVertexData(dataOutput, vertex);
 383  0
       }
 384  0
       numBytes += outputWrapper.finalizeOutput();
 385  0
       index.removeLastIndex();
 386  
       // Avoid writing back edges if we have already written them once and
 387  
       // the graph is not changing.
 388  
       // If we are in the input superstep, we need to write the files
 389  
       // at least the first time, even though the graph is static.
 390  0
       index.addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES);
 391  0
       if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP ||
 392  0
           !conf.isStaticGraph() ||
 393  0
           !dataAccessor.dataExist(ioThreadId, index)) {
 394  0
         outputWrapper = dataAccessor.prepareOutput(ioThreadId, index.copy(),
 395  
             false);
 396  0
         for (Vertex<I, V, E> vertex : partition) {
 397  0
           writeOutEdges(outputWrapper.getDataOutput(), vertex);
 398  0
         }
 399  0
         numBytes += outputWrapper.finalizeOutput();
 400  
       }
 401  0
       index.removeLastIndex();
 402  0
       hasPartitionDataOnFile.add(partitionId);
 403  
     }
 404  0
     return numBytes;
 405  
   }
 406  
 
 407  
   @Override
 408  
   protected void writeEntry(ExtendedDataOutput vertices, DataOutput out)
 409  
       throws IOException {
 410  0
     WritableUtils.writeExtendedDataOutput(vertices, out);
 411  0
   }
 412  
 
 413  
   @Override
 414  
   public long offloadBuffers(int partitionId)
 415  
       throws IOException {
 416  0
     return offloadBuffersProxy(partitionId,
 417  0
         new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
 418  
   }
 419  
 
 420  
   @Override
 421  
   protected int entrySerializedSize(ExtendedDataOutput vertices) {
 422  0
     return vertices.getPos();
 423  
   }
 424  
 }