Coverage Report - org.apache.giraph.partition.ByteArrayPartition
 
Classes in this File Line Coverage Branch Coverage Complexity
ByteArrayPartition
0%
0/126
0%
0/28
2.235
ByteArrayPartition$1
N/A
N/A
2.235
ByteArrayPartition$RepresentativeVertexIterator
0%
0/9
N/A
2.235
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.partition;
 19  
 
 20  
 import com.google.common.collect.MapMaker;
 21  
 import com.google.common.primitives.Ints;
 22  
 import org.apache.giraph.edge.Edge;
 23  
 import org.apache.giraph.graph.Vertex;
 24  
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 25  
 import org.apache.giraph.utils.WritableUtils;
 26  
 import org.apache.hadoop.io.Writable;
 27  
 import org.apache.hadoop.io.WritableComparable;
 28  
 import org.apache.hadoop.util.Progressable;
 29  
 
 30  
 import javax.annotation.concurrent.NotThreadSafe;
 31  
 import java.io.DataInput;
 32  
 import java.io.DataOutput;
 33  
 import java.io.IOException;
 34  
 import java.util.Iterator;
 35  
 import java.util.Map;
 36  
 import java.util.concurrent.ConcurrentMap;
 37  
 
 38  
 /**
 39  
  * Byte array based partition.  Should reduce the amount of memory used since
 40  
  * the entire graph is compressed into byte arrays.  Must guarantee, however,
 41  
  * that only one thread at a time will call getVertex since it is a singleton.
 42  
  *
 43  
  * @param <I> Vertex index value
 44  
  * @param <V> Vertex value
 45  
  * @param <E> Edge value
 46  
  */
 47  
 @NotThreadSafe
 48  0
 public class ByteArrayPartition<I extends WritableComparable,
 49  
     V extends Writable, E extends Writable>
 50  
     extends BasicPartition<I, V, E>
 51  
     implements ReusesObjectsPartition<I, V, E> {
 52  
   /**
 53  
    * Vertex map for this range (keyed by index).  Note that the byte[] is a
 54  
    * serialized vertex with the first four bytes as the length of the vertex
 55  
    * to read.
 56  
    */
 57  
   private ConcurrentMap<I, byte[]> vertexMap;
 58  
   /** Representative vertex */
 59  
   private Vertex<I, V, E> representativeVertex;
 60  
   /** Representative combiner vertex */
 61  
   private Vertex<I, V, E> representativeCombinerVertex;
 62  
   /** Use unsafe serialization */
 63  
   private boolean useUnsafeSerialization;
 64  
 
 65  
   /**
 66  
    * Constructor for reflection.
 67  
    */
 68  0
   public ByteArrayPartition() { }
 69  
 
 70  
   @Override
 71  
   public void initialize(int partitionId, Progressable progressable) {
 72  0
     super.initialize(partitionId, progressable);
 73  0
     vertexMap = new MapMaker().concurrencyLevel(
 74  0
         getConf().getNettyServerExecutionConcurrency()).makeMap();
 75  0
     representativeVertex = getConf().createVertex();
 76  0
     representativeVertex.initialize(
 77  0
         getConf().createVertexId(),
 78  0
         getConf().createVertexValue(),
 79  0
         getConf().createOutEdges());
 80  0
     representativeCombinerVertex = getConf().createVertex();
 81  0
     representativeCombinerVertex.initialize(
 82  0
         getConf().createVertexId(),
 83  0
         getConf().createVertexValue(),
 84  0
         getConf().createOutEdges());
 85  0
     useUnsafeSerialization = getConf().useUnsafeSerialization();
 86  0
   }
 87  
 
 88  
   @Override
 89  
   public Vertex<I, V, E> getVertex(I vertexIndex) {
 90  0
     byte[] vertexData = vertexMap.get(vertexIndex);
 91  0
     if (vertexData == null) {
 92  0
       return null;
 93  
     }
 94  0
     WritableUtils.reinitializeVertexFromByteArray(
 95  0
         vertexData, representativeVertex, useUnsafeSerialization, getConf());
 96  0
     return representativeVertex;
 97  
   }
 98  
 
 99  
   @Override
 100  
   public Vertex<I, V, E> putVertex(Vertex<I, V, E> vertex) {
 101  0
     byte[] vertexData =
 102  0
         WritableUtils.writeVertexToByteArray(
 103  0
             vertex, useUnsafeSerialization, getConf());
 104  0
     byte[] oldVertexBytes = vertexMap.put(vertex.getId(), vertexData);
 105  0
     if (oldVertexBytes == null) {
 106  0
       return null;
 107  
     } else {
 108  0
       WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
 109  0
           representativeVertex, useUnsafeSerialization, getConf());
 110  0
       return representativeVertex;
 111  
     }
 112  
   }
 113  
 
 114  
   @Override
 115  
   public Vertex<I, V, E> removeVertex(I vertexIndex) {
 116  0
     byte[] vertexBytes = vertexMap.remove(vertexIndex);
 117  0
     if (vertexBytes == null) {
 118  0
       return null;
 119  
     }
 120  0
     WritableUtils.reinitializeVertexFromByteArray(vertexBytes,
 121  0
         representativeVertex, useUnsafeSerialization, getConf());
 122  0
     return representativeVertex;
 123  
   }
 124  
 
 125  
   @Override
 126  
   public void addPartition(Partition<I, V, E> partition) {
 127  
     // Only work with other ByteArrayPartition instances
 128  0
     if (!(partition instanceof ByteArrayPartition)) {
 129  0
       throw new IllegalStateException("addPartition: Cannot add partition " +
 130  0
           "of type " + partition.getClass());
 131  
     }
 132  
 
 133  0
     ByteArrayPartition<I, V, E> byteArrayPartition =
 134  
         (ByteArrayPartition<I, V, E>) partition;
 135  
     for (Map.Entry<I, byte[]> entry :
 136  0
         byteArrayPartition.vertexMap.entrySet()) {
 137  
 
 138  0
       byte[] oldVertexBytes =
 139  0
           vertexMap.putIfAbsent(entry.getKey(), entry.getValue());
 140  0
       if (oldVertexBytes == null) {
 141  0
         continue;
 142  
       }
 143  
 
 144  
       // Note that vertex combining is going to be expensive compared to
 145  
       // SimplePartition since here we have to deserialize the vertices,
 146  
       // combine them, and then reserialize them.  If the vertex doesn't exist,
 147  
       // just add the new vertex as a byte[]
 148  0
       synchronized (this) {
 149  
         // Combine the vertex values
 150  0
         WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
 151  0
             representativeVertex, useUnsafeSerialization, getConf());
 152  0
         WritableUtils.reinitializeVertexFromByteArray(entry.getValue(),
 153  0
             representativeCombinerVertex, useUnsafeSerialization, getConf());
 154  0
         combine(representativeVertex, representativeCombinerVertex);
 155  0
       }
 156  0
     }
 157  0
   }
 158  
 
 159  
   @Override
 160  
   public synchronized boolean putOrCombine(Vertex<I, V, E> vertex) {
 161  
     // Optimistically try to first put and then combine if this fails
 162  0
     byte[] vertexData =
 163  0
         WritableUtils.writeVertexToByteArray(
 164  0
             vertex, useUnsafeSerialization, getConf());
 165  0
     byte[] oldVertexBytes = vertexMap.putIfAbsent(vertex.getId(), vertexData);
 166  0
     if (oldVertexBytes == null) {
 167  0
       return true;
 168  
     }
 169  
 
 170  0
     WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
 171  0
         representativeVertex, useUnsafeSerialization, getConf());
 172  0
     combine(representativeVertex, vertex);
 173  0
     return false;
 174  
   }
 175  
 
 176  
   /**
 177  
    * Combine two vertices together and store the serialized bytes
 178  
    * in the vertex map.
 179  
    *
 180  
    * @param representativeVertex existing vertex
 181  
    * @param representativeCombinerVertex new vertex to combine
 182  
    */
 183  
   private void combine(Vertex<I, V, E> representativeVertex,
 184  
       Vertex<I, V, E> representativeCombinerVertex) {
 185  0
     getVertexValueCombiner().combine(representativeVertex.getValue(),
 186  0
         representativeCombinerVertex.getValue());
 187  
     // Add the edges to the representative vertex
 188  0
     for (Edge<I, E> edge : representativeCombinerVertex.getEdges()) {
 189  0
       representativeVertex.addEdge(edge);
 190  0
     }
 191  0
     vertexMap.put(representativeCombinerVertex.getId(),
 192  0
         WritableUtils.writeVertexToByteArray(
 193  0
             representativeVertex, useUnsafeSerialization, getConf()));
 194  0
   }
 195  
 
 196  
   @Override
 197  
   public long getVertexCount() {
 198  0
     return vertexMap.size();
 199  
   }
 200  
 
 201  
   @Override
 202  
   public long getEdgeCount() {
 203  0
     long edges = 0;
 204  0
     for (byte[] vertexBytes : vertexMap.values()) {
 205  0
       WritableUtils.reinitializeVertexFromByteArray(vertexBytes,
 206  0
           representativeVertex, useUnsafeSerialization, getConf());
 207  0
       edges += representativeVertex.getNumEdges();
 208  0
     }
 209  0
     return edges;
 210  
   }
 211  
 
 212  
   @Override
 213  
   public void saveVertex(Vertex<I, V, E> vertex) {
 214  
     // Reuse the old buffer whenever possible
 215  0
     byte[] oldVertexData = vertexMap.get(vertex.getId());
 216  0
     if (oldVertexData != null) {
 217  0
       vertexMap.put(vertex.getId(),
 218  0
           WritableUtils.writeVertexToByteArray(
 219  0
               vertex, oldVertexData, useUnsafeSerialization, getConf()));
 220  
     } else {
 221  0
       vertexMap.put(vertex.getId(),
 222  0
           WritableUtils.writeVertexToByteArray(
 223  0
               vertex, useUnsafeSerialization, getConf()));
 224  
     }
 225  0
   }
 226  
 
 227  
   @Override
 228  
   public void write(DataOutput output) throws IOException {
 229  0
     super.write(output);
 230  0
     output.writeInt(vertexMap.size());
 231  0
     for (Map.Entry<I, byte[]> entry : vertexMap.entrySet()) {
 232  0
       progress();
 233  0
       entry.getKey().write(output);
 234  
       // Note here that we are writing the size of the vertex data first
 235  
       // as it is encoded in the first four bytes of the byte[]
 236  
       int vertexDataSize;
 237  0
       if (useUnsafeSerialization) {
 238  0
         vertexDataSize = UnsafeByteArrayInputStream.getInt(entry.getValue(),
 239  
             0);
 240  
       } else {
 241  0
         vertexDataSize = Ints.fromByteArray(entry.getValue());
 242  
       }
 243  
 
 244  0
       output.writeInt(vertexDataSize);
 245  0
       output.write(entry.getValue(), 0, vertexDataSize);
 246  0
     }
 247  0
   }
 248  
 
 249  
   @Override
 250  
   public void readFields(DataInput input) throws IOException {
 251  0
     super.readFields(input);
 252  0
     int size = input.readInt();
 253  0
     vertexMap = new MapMaker().concurrencyLevel(
 254  0
         getConf().getNettyServerExecutionConcurrency()).initialCapacity(
 255  0
         size).makeMap();
 256  0
     representativeVertex = getConf().createVertex();
 257  0
     representativeVertex.initialize(
 258  0
         getConf().createVertexId(),
 259  0
         getConf().createVertexValue(),
 260  0
         getConf().createOutEdges());
 261  0
     useUnsafeSerialization = getConf().useUnsafeSerialization();
 262  0
     for (int i = 0; i < size; ++i) {
 263  0
       progress();
 264  0
       I vertexId = getConf().createVertexId();
 265  0
       vertexId.readFields(input);
 266  0
       int vertexDataSize = input.readInt();
 267  0
       byte[] vertexData = new byte[vertexDataSize];
 268  0
       input.readFully(vertexData);
 269  0
       if (vertexMap.put(vertexId, vertexData) != null) {
 270  0
         throw new IllegalStateException("readFields: Already saw vertex " +
 271  
             vertexId);
 272  
       }
 273  
     }
 274  0
   }
 275  
 
 276  
   @Override
 277  
   public Iterator<Vertex<I, V, E>> iterator() {
 278  0
     return new RepresentativeVertexIterator();
 279  
   }
 280  
 
 281  
   /**
 282  
    * Iterator that deserializes a vertex from a byte array on the fly, using
 283  
    * the same representative vertex object.
 284  
    */
 285  0
   private class RepresentativeVertexIterator implements
 286  
       Iterator<Vertex<I, V, E>> {
 287  
     /** Iterator to the vertex values */
 288  0
     private Iterator<byte[]> vertexDataIterator =
 289  0
         vertexMap.values().iterator();
 290  
 
 291  
     @Override
 292  
     public boolean hasNext() {
 293  0
       return vertexDataIterator.hasNext();
 294  
     }
 295  
 
 296  
     @Override
 297  
     public Vertex<I, V, E> next() {
 298  0
       WritableUtils.reinitializeVertexFromByteArray(
 299  0
           vertexDataIterator.next(), representativeVertex,
 300  0
           useUnsafeSerialization, getConf());
 301  0
       return representativeVertex;
 302  
     }
 303  
 
 304  
     @Override
 305  
     public void remove() {
 306  0
       throw new IllegalAccessError("remove: This method is not supported.");
 307  
     }
 308  
   }
 309  
 }