Coverage Report - org.apache.giraph.partition.SimplePartition
 
Classes in this File Line Coverage Branch Coverage Complexity
SimplePartition
0%
0/51
0%
0/16
1.714
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.partition;
 20  
 
 21  
 import java.io.DataInput;
 22  
 import java.io.DataOutput;
 23  
 import java.io.IOException;
 24  
 import java.util.Iterator;
 25  
 import java.util.concurrent.ConcurrentMap;
 26  
 
 27  
 import javax.annotation.concurrent.ThreadSafe;
 28  
 
 29  
 import org.apache.giraph.edge.Edge;
 30  
 import org.apache.giraph.graph.Vertex;
 31  
 import org.apache.giraph.utils.WritableUtils;
 32  
 import org.apache.hadoop.io.Writable;
 33  
 import org.apache.hadoop.io.WritableComparable;
 34  
 import org.apache.hadoop.util.Progressable;
 35  
 
 36  
 import com.google.common.collect.Maps;
 37  
 
 38  
 /**
 39  
  * A simple map-based container that stores vertices.  Vertex ids will map to
 40  
  * exactly one partition.
 41  
  *
 42  
  * @param <I> Vertex id
 43  
  * @param <V> Vertex data
 44  
  * @param <E> Edge data
 45  
  */
 46  
 @ThreadSafe
 47  
 @SuppressWarnings("rawtypes")
 48  
 public class SimplePartition<I extends WritableComparable,
 49  
     V extends Writable, E extends Writable>
 50  
     extends BasicPartition<I, V, E> {
 51  
   /** Vertex map for this range (keyed by index) */
 52  
   private ConcurrentMap<I, Vertex<I, V, E>> vertexMap;
 53  
 
 54  
   /**
 55  
    * Constructor for reflection.
 56  
    */
 57  0
   public SimplePartition() { }
 58  
 
 59  
   @Override
 60  
   public void initialize(int partitionId, Progressable progressable) {
 61  0
     super.initialize(partitionId, progressable);
 62  0
     vertexMap = Maps.newConcurrentMap();
 63  0
   }
 64  
 
 65  
   @Override
 66  
   public Vertex<I, V, E> getVertex(I vertexIndex) {
 67  0
     return vertexMap.get(vertexIndex);
 68  
   }
 69  
 
 70  
   @Override
 71  
   public Vertex<I, V, E> putVertex(Vertex<I, V, E> vertex) {
 72  0
     return vertexMap.put(vertex.getId(), vertex);
 73  
   }
 74  
 
 75  
   @Override
 76  
   public Vertex<I, V, E> removeVertex(I vertexIndex) {
 77  0
     return vertexMap.remove(vertexIndex);
 78  
   }
 79  
 
 80  
   @Override
 81  
   public boolean putOrCombine(Vertex<I, V, E> vertex) {
 82  0
     Vertex<I, V, E> originalVertex = vertexMap.get(vertex.getId());
 83  0
     if (originalVertex == null) {
 84  0
       originalVertex =
 85  0
           vertexMap.putIfAbsent(vertex.getId(), vertex);
 86  0
       if (originalVertex == null) {
 87  0
         return true;
 88  
       }
 89  
     }
 90  
 
 91  0
     synchronized (originalVertex) {
 92  
       // Combine the vertex values
 93  0
       getVertexValueCombiner().combine(
 94  0
           originalVertex.getValue(), vertex.getValue());
 95  
 
 96  
       // Add the edges to the representative vertex
 97  0
       for (Edge<I, E> edge : vertex.getEdges()) {
 98  0
         originalVertex.addEdge(edge);
 99  0
       }
 100  0
     }
 101  
 
 102  0
     return false;
 103  
   }
 104  
 
 105  
   @Override
 106  
   public void addPartition(Partition<I, V, E> partition) {
 107  0
     for (Vertex<I, V, E> vertex : partition) {
 108  0
       putOrCombine(vertex);
 109  0
     }
 110  0
   }
 111  
 
 112  
   @Override
 113  
   public long getVertexCount() {
 114  0
     return vertexMap.size();
 115  
   }
 116  
 
 117  
   @Override
 118  
   public long getEdgeCount() {
 119  0
     long edges = 0;
 120  0
     for (Vertex<I, V, E> vertex : vertexMap.values()) {
 121  0
       edges += vertex.getNumEdges();
 122  0
     }
 123  0
     return edges;
 124  
   }
 125  
 
 126  
   @Override
 127  
   public void saveVertex(Vertex<I, V, E> vertex) {
 128  
     // No-op, vertices are stored as Java objects in this partition
 129  0
   }
 130  
 
 131  
   @Override
 132  
   public String toString() {
 133  0
     return "(id=" + getId() + ",V=" + vertexMap.size() + ")";
 134  
   }
 135  
 
 136  
   @Override
 137  
   public void readFields(DataInput input) throws IOException {
 138  0
     super.readFields(input);
 139  0
     vertexMap = Maps.newConcurrentMap();
 140  0
     int vertices = input.readInt();
 141  0
     for (int i = 0; i < vertices; ++i) {
 142  0
       progress();
 143  0
       Vertex<I, V, E> vertex =
 144  0
           WritableUtils.readVertexFromDataInput(input, getConf());
 145  0
       if (vertexMap.put(vertex.getId(), vertex) != null) {
 146  0
         throw new IllegalStateException(
 147  
             "readFields: " + this +
 148  
             " already has same id " + vertex);
 149  
       }
 150  
     }
 151  0
   }
 152  
 
 153  
   @Override
 154  
   public void write(DataOutput output) throws IOException {
 155  0
     super.write(output);
 156  0
     output.writeInt(vertexMap.size());
 157  0
     for (Vertex<I, V, E> vertex : vertexMap.values()) {
 158  0
       progress();
 159  0
       WritableUtils.writeVertexToDataOutput(output, vertex, getConf());
 160  0
     }
 161  0
   }
 162  
 
 163  
   @Override
 164  
   public Iterator<Vertex<I, V, E>> iterator() {
 165  0
     return vertexMap.values().iterator();
 166  
   }
 167  
 }