Coverage Report - org.apache.giraph.ooc.data.DiskBackedEdgeStore
 
Classes in this File Line Coverage Branch Coverage Complexity
DiskBackedEdgeStore
0%
0/46
0%
0/4
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.ooc.data;
 20  
 
 21  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 22  
 import org.apache.giraph.edge.EdgeStore;
 23  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 24  
 import org.apache.giraph.ooc.persistence.DataIndex;
 25  
 import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
 26  
 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
 27  
 import org.apache.giraph.utils.VertexIdEdges;
 28  
 import org.apache.hadoop.io.Writable;
 29  
 import org.apache.hadoop.io.WritableComparable;
 30  
 import org.apache.log4j.Logger;
 31  
 
 32  
 import java.io.DataInput;
 33  
 import java.io.DataOutput;
 34  
 import java.io.IOException;
 35  
 
 36  
 /**
 37  
  * Implementation of an edge-store used for out-of-core mechanism.
 38  
  *
 39  
  * @param <I> Vertex id
 40  
  * @param <V> Vertex data
 41  
  * @param <E> Edge data
 42  
  */
 43  0
 public class DiskBackedEdgeStore<I extends WritableComparable,
 44  
     V extends Writable, E extends Writable>
 45  
     extends DiskBackedDataStore<VertexIdEdges<I, E>>
 46  
     implements EdgeStore<I, V, E> {
 47  
   /** Class logger. */
 48  0
   private static final Logger LOG = Logger.getLogger(DiskBackedEdgeStore.class);
 49  
   /** In-memory message store */
 50  
   private final EdgeStore<I, V, E> edgeStore;
 51  
   /** Configuration */
 52  
   private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
 53  
 
 54  
   /**
 55  
    * Constructor
 56  
    *
 57  
    * @param edgeStore In-memory edge store for which out-of-core edge store
 58  
    *                  would be a wrapper
 59  
    * @param conf Configuration
 60  
    * @param oocEngine Out-of-core engine
 61  
    */
 62  
   public DiskBackedEdgeStore(
 63  
       EdgeStore<I, V, E> edgeStore,
 64  
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
 65  
       OutOfCoreEngine oocEngine) {
 66  0
     super(conf, oocEngine);
 67  0
     this.edgeStore = edgeStore;
 68  0
     this.conf = conf;
 69  0
   }
 70  
 
 71  
   @Override
 72  
   public void addPartitionEdges(int partitionId, VertexIdEdges<I, E> edges) {
 73  0
     addEntry(partitionId, edges);
 74  0
   }
 75  
 
 76  
   @Override
 77  
   public void moveEdgesToVertices() {
 78  0
     edgeStore.moveEdgesToVertices();
 79  0
   }
 80  
 
 81  
   @Override
 82  
   public void writePartitionEdgeStore(int partitionId, DataOutput output)
 83  
       throws IOException {
 84  
     // This method is only called (should only be called) on in-memory edge
 85  
     // stores
 86  0
     throw new IllegalStateException("writePartitionEdgeStore: this method " +
 87  
         "should not be called for DiskBackedEdgeStore!");
 88  
   }
 89  
 
 90  
   @Override
 91  
   public void readPartitionEdgeStore(int partitionId, DataInput input)
 92  
       throws IOException {
 93  
     // This method is only called (should only be called) on in-memory edge
 94  
     // stores
 95  0
     throw new IllegalStateException("readPartitionEdgeStore: this method " +
 96  
         "should not be called for DiskBackedEdgeStore!");
 97  
   }
 98  
 
 99  
   @Override
 100  
   public boolean hasEdgesForPartition(int partitionId) {
 101  
     // This method is only called (should only be called) on in-memory edge
 102  
     // stores
 103  0
     throw new IllegalStateException("hasEdgesForPartition: this method " +
 104  
         "should not be called for DiskBackedEdgeStore!");
 105  
   }
 106  
 
 107  
   @Override
 108  
   public long loadPartitionData(int partitionId)
 109  
       throws IOException {
 110  0
     return loadPartitionDataProxy(partitionId,
 111  0
         new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
 112  
   }
 113  
 
 114  
   @Override
 115  
   public long offloadPartitionData(int partitionId)
 116  
       throws IOException {
 117  0
     return offloadPartitionDataProxy(partitionId,
 118  0
         new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
 119  
   }
 120  
 
 121  
   @Override
 122  
   public long offloadBuffers(int partitionId)
 123  
       throws IOException {
 124  0
     return offloadBuffersProxy(partitionId,
 125  0
         new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
 126  
   }
 127  
 
 128  
   @Override
 129  
   protected void writeEntry(VertexIdEdges<I, E> edges, DataOutput out)
 130  
       throws IOException {
 131  0
     edges.write(out);
 132  0
   }
 133  
 
 134  
   @Override
 135  
   protected VertexIdEdges<I, E> readNextEntry(DataInput in) throws IOException {
 136  0
     VertexIdEdges<I, E> vertexIdEdges = new ByteArrayVertexIdEdges<>();
 137  0
     vertexIdEdges.setConf(conf);
 138  0
     vertexIdEdges.readFields(in);
 139  0
     return vertexIdEdges;
 140  
   }
 141  
 
 142  
   @Override
 143  
   protected long loadInMemoryPartitionData(
 144  
       int partitionId, int ioThreadId, DataIndex index) throws IOException {
 145  0
     long numBytes = 0;
 146  0
     if (hasPartitionDataOnFile.remove(partitionId)) {
 147  0
       OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
 148  0
           oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
 149  0
       edgeStore.readPartitionEdgeStore(partitionId,
 150  0
           inputWrapper.getDataInput());
 151  0
       numBytes = inputWrapper.finalizeInput(true);
 152  
     }
 153  0
     return numBytes;
 154  
   }
 155  
 
 156  
   @Override
 157  
   protected long offloadInMemoryPartitionData(
 158  
       int partitionId, int ioThreadId, DataIndex index) throws IOException {
 159  0
     long numBytes = 0;
 160  0
     if (edgeStore.hasEdgesForPartition(partitionId)) {
 161  0
       OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
 162  0
           oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
 163  
               false);
 164  0
       edgeStore.writePartitionEdgeStore(partitionId,
 165  0
           outputWrapper.getDataOutput());
 166  0
       numBytes = outputWrapper.finalizeOutput();
 167  0
       hasPartitionDataOnFile.add(partitionId);
 168  
     }
 169  0
     return numBytes;
 170  
   }
 171  
 
 172  
   @Override
 173  
   protected int entrySerializedSize(VertexIdEdges<I, E> edges) {
 174  0
     return edges.getSerializedSize();
 175  
   }
 176  
 
 177  
   @Override
 178  
   protected void addEntryToInMemoryPartitionData(int partitionId,
 179  
                                                  VertexIdEdges<I, E> edges) {
 180  0
     oocEngine.getMetaPartitionManager().addPartition(partitionId);
 181  0
     edgeStore.addPartitionEdges(partitionId, edges);
 182  0
   }
 183  
 }