Coverage Report - org.apache.giraph.comm.SendMutationsCache
 
Classes in this File Line Coverage Branch Coverage Complexity
SendMutationsCache
0%
0/44
0%
0/6
1.375
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.comm;
 19  
 
 20  
 import org.apache.giraph.edge.Edge;
 21  
 import org.apache.giraph.graph.Vertex;
 22  
 import org.apache.giraph.graph.VertexMutations;
 23  
 import org.apache.hadoop.io.Writable;
 24  
 import org.apache.hadoop.io.WritableComparable;
 25  
 
 26  
 import java.util.HashMap;
 27  
 import java.util.Map;
 28  
 
 29  
 /**
 30  
  * Aggregates the mutations to be sent to partitions so they can be sent in
 31  
  * bulk. Not thread-safe.
 32  
  *
 33  
  * @param <I> Vertex id
 34  
  * @param <V> Vertex data
 35  
  * @param <E> Edge data
 36  
  */
 37  
 @SuppressWarnings("rawtypes")
 38  0
 public class SendMutationsCache<I extends WritableComparable,
 39  
     V extends Writable, E extends Writable> {
 40  
   /** Internal cache */
 41  0
   private Map<Integer, Map<I, VertexMutations<I, V, E>>> mutationCache =
 42  
       new HashMap<Integer, Map<I, VertexMutations<I, V, E>>>();
 43  
   /** Number of mutations in each partition */
 44  0
   private final Map<Integer, Integer> mutationCountMap =
 45  
       new HashMap<Integer, Integer>();
 46  
 
 47  
   /**
 48  
    * Get the mutations for a partition and destination vertex (creating if
 49  
    * it doesn't exist).
 50  
    *
 51  
    * @param partitionId Partition id
 52  
    * @param destVertexId Destination vertex id
 53  
    * @return Mutations for the vertex
 54  
    */
 55  
   private VertexMutations<I, V, E> getVertexMutations(
 56  
       Integer partitionId, I destVertexId) {
 57  0
     Map<I, VertexMutations<I, V, E>> idMutations =
 58  0
         mutationCache.get(partitionId);
 59  0
     if (idMutations == null) {
 60  0
       idMutations = new HashMap<I, VertexMutations<I, V, E>>();
 61  0
       mutationCache.put(partitionId, idMutations);
 62  
     }
 63  0
     VertexMutations<I, V, E> mutations = idMutations.get(destVertexId);
 64  0
     if (mutations == null) {
 65  0
       mutations = new VertexMutations<I, V, E>();
 66  0
       idMutations.put(destVertexId, mutations);
 67  
     }
 68  0
     return mutations;
 69  
   }
 70  
 
 71  
   /**
 72  
    * Increment the number of mutations in a partition.
 73  
    *
 74  
    * @param partitionId Partition id
 75  
    * @return Number of mutations in a partition after the increment
 76  
    */
 77  
   private int incrementPartitionMutationCount(int partitionId) {
 78  0
     Integer currentPartitionMutationCount = mutationCountMap.get(partitionId);
 79  0
     if (currentPartitionMutationCount == null) {
 80  0
       currentPartitionMutationCount = 0;
 81  
     }
 82  0
     Integer updatedPartitionMutationCount =
 83  0
         currentPartitionMutationCount + 1;
 84  0
     mutationCountMap.put(partitionId, updatedPartitionMutationCount);
 85  0
     return updatedPartitionMutationCount;
 86  
   }
 87  
 
 88  
   /**
 89  
    * Add an add edge mutation to the cache.
 90  
    *
 91  
    * @param partitionId Partition id
 92  
    * @param destVertexId Destination vertex id
 93  
    * @param edge Edge to be added
 94  
    * @return Number of mutations in the partition.
 95  
    */
 96  
   public int addEdgeMutation(
 97  
       Integer partitionId, I destVertexId, Edge<I, E> edge) {
 98  
     // Get the mutations for this partition
 99  0
     VertexMutations<I, V, E> mutations =
 100  0
         getVertexMutations(partitionId, destVertexId);
 101  
 
 102  
     // Add the edge
 103  0
     mutations.addEdge(edge);
 104  
 
 105  
     // Update the number of mutations per partition
 106  0
     return incrementPartitionMutationCount(partitionId);
 107  
   }
 108  
 
 109  
   /**
 110  
    * Add a remove edge mutation to the cache.
 111  
    *
 112  
    * @param partitionId Partition id
 113  
    * @param vertexIndex Destination vertex id
 114  
    * @param destinationVertexIndex Edge vertex index to be removed
 115  
    * @return Number of mutations in the partition.
 116  
    */
 117  
   public int removeEdgeMutation(
 118  
       Integer partitionId, I vertexIndex, I destinationVertexIndex) {
 119  
     // Get the mutations for this partition
 120  0
     VertexMutations<I, V, E> mutations =
 121  0
         getVertexMutations(partitionId, vertexIndex);
 122  
 
 123  
     // Remove the edge
 124  0
     mutations.removeEdge(destinationVertexIndex);
 125  
 
 126  
     // Update the number of mutations per partition
 127  0
     return incrementPartitionMutationCount(partitionId);
 128  
   }
 129  
 
 130  
   /**
 131  
    * Add a add vertex mutation to the cache.
 132  
    *
 133  
    * @param partitionId Partition id
 134  
    * @param vertex Vertex to be added
 135  
    * @return Number of mutations in the partition.
 136  
    */
 137  
   public int addVertexMutation(
 138  
       Integer partitionId, Vertex<I, V, E> vertex) {
 139  
     // Get the mutations for this partition
 140  0
     VertexMutations<I, V, E> mutations =
 141  0
         getVertexMutations(partitionId, vertex.getId());
 142  
 
 143  
     // Add the vertex
 144  0
     mutations.addVertex(vertex);
 145  
 
 146  
     // Update the number of mutations per partition
 147  0
     return incrementPartitionMutationCount(partitionId);
 148  
   }
 149  
 
 150  
   /**
 151  
    * Add a remove vertex mutation to the cache.
 152  
    *
 153  
    * @param partitionId Partition id
 154  
    * @param destVertexId Vertex index to be removed
 155  
    * @return Number of mutations in the partition.
 156  
    */
 157  
   public int removeVertexMutation(
 158  
       Integer partitionId, I destVertexId) {
 159  
     // Get the mutations for this partition
 160  0
     VertexMutations<I, V, E> mutations =
 161  0
         getVertexMutations(partitionId, destVertexId);
 162  
 
 163  
     // Remove the vertex
 164  0
     mutations.removeVertex();
 165  
 
 166  
     // Update the number of mutations per partition
 167  0
     return incrementPartitionMutationCount(partitionId);
 168  
   }
 169  
 
 170  
   /**
 171  
    * Gets the mutations for a partition and removes it from the cache.
 172  
    *
 173  
    * @param partitionId Partition id
 174  
    * @return Removed partition mutations
 175  
    */
 176  
   public Map<I, VertexMutations<I, V, E>> removePartitionMutations(
 177  
       int partitionId) {
 178  0
     Map<I, VertexMutations<I, V, E>> idMutations =
 179  0
         mutationCache.remove(partitionId);
 180  0
     mutationCountMap.put(partitionId, 0);
 181  0
     return idMutations;
 182  
   }
 183  
 
 184  
   /**
 185  
    * Gets all the mutations and removes them from the cache.
 186  
    *
 187  
    * @return All vertex mutations for all partitions
 188  
    */
 189  
   public Map<Integer, Map<I, VertexMutations<I, V, E>>>
 190  
   removeAllPartitionMutations() {
 191  0
     Map<Integer, Map<I, VertexMutations<I, V, E>>> allMutations =
 192  
         mutationCache;
 193  0
     mutationCache =
 194  
         new HashMap<Integer, Map<I, VertexMutations<I, V, E>>>();
 195  0
     mutationCountMap.clear();
 196  0
     return allMutations;
 197  
   }
 198  
 }