Coverage Report - org.apache.giraph.graph.GiraphTransferRegulator
 
Classes in this File Line Coverage Branch Coverage Complexity
GiraphTransferRegulator
0%
0/36
0%
0/8
1.5
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.graph;
 19  
 
 20  
 import org.apache.hadoop.conf.Configuration;
 21  
 import org.apache.hadoop.io.Writable;
 22  
 import org.apache.hadoop.io.WritableComparable;
 23  
 import org.apache.giraph.partition.PartitionOwner;
 24  
 import com.google.common.collect.Maps;
 25  
 import java.util.Map;
 26  
 
 27  
 
 28  
 /** Utility class to manage data transfers from
 29  
  * a local worker reading InputSplits.
 30  
  * Currently, this class measures # of vertices and edges
 31  
  * per outgoing Collection of graph data (destined for a
 32  
  * particular Partition and remote worker node, preselected
 33  
  * by the master.)
 34  
  *
 35  
  * TODO: implement defaults and configurable options for
 36  
  * measuring the size of input <V> or <E> data
 37  
  * per read vertex, and setting limits on totals per outgoing
 38  
  * graph data Collection etc. (See GIRAPH-260)
 39  
  */
 40  
 public class GiraphTransferRegulator {
 41  
   /** Maximum vertices to read from an InputSplit locally that are
 42  
    * to be routed to a remote worker, before sending them. */
 43  
   public static final String MAX_VERTICES_PER_TRANSFER =
 44  
     "giraph.maxVerticesPerTransfer";
 45  
   /** Default maximum number of vertices per
 46  
    * temp partition before sending. */
 47  
   public static final int MAX_VERTICES_PER_TRANSFER_DEFAULT = 10000;
 48  
   /**
 49  
    * Maximum edges to read from an InputSplit locally that are
 50  
    * to be routed to a remote worker, before sending them.
 51  
    */
 52  
   public static final String MAX_EDGES_PER_TRANSFER =
 53  
     "giraph.maxEdgesPerTransfer";
 54  
   /** Default maximum number of vertices per
 55  
    * temp partition before sending. */
 56  
   public static final int MAX_EDGES_PER_TRANSFER_DEFAULT = 80000;
 57  
 
 58  
   /** Internal state to measure when
 59  
    * the next data transfer of a Collection
 60  
    * of vertices read by the local worker that
 61  
    * owns this regulator is ready to be sent
 62  
    * to the remote worker node that the master
 63  
    * has assigned the vertices to */
 64  
   private Map<Integer, Integer> edgeAccumulator;
 65  
 
 66  
   /** Internal state to measure when
 67  
    * the next data transfer of a Collection
 68  
    * of vertices read by the local worker that
 69  
    * owns this regulator is ready to be sent
 70  
    * to the remote worker node that the master
 71  
    * has assigned the vertices to */
 72  
   private Map<Integer, Integer> vertexAccumulator;
 73  
 
 74  
   /** Number of vertices per data transfer */
 75  
   private final int maxVerticesPerTransfer;
 76  
 
 77  
   /** Number of edges per data transfer */
 78  
   private final int maxEdgesPerTransfer;
 79  
 
 80  
   /** Vertex count total for this InputSplit */
 81  
   private long totalVertexCount;
 82  
 
 83  
   /** Edge count total for this InputSplit */
 84  
   private long totalEdgeCount;
 85  
 
 86  
   /** Default constructor
 87  
    * @param conf the Configuration for this job
 88  
    */
 89  0
   public GiraphTransferRegulator(Configuration conf) {
 90  0
     vertexAccumulator = Maps.<Integer, Integer>newHashMap();
 91  0
     edgeAccumulator = Maps.<Integer, Integer>newHashMap();
 92  0
     maxVerticesPerTransfer = conf.getInt(
 93  
         MAX_VERTICES_PER_TRANSFER,
 94  
         MAX_VERTICES_PER_TRANSFER_DEFAULT);
 95  0
     maxEdgesPerTransfer = conf.getInt(
 96  
         MAX_EDGES_PER_TRANSFER,
 97  
         MAX_EDGES_PER_TRANSFER_DEFAULT);
 98  0
     totalEdgeCount = 0;
 99  0
     totalVertexCount = 0;
 100  0
   }
 101  
 
 102  
   /** Is this outbound data Collection full,
 103  
    * and ready to transfer?
 104  
    * @param owner the partition owner for the outbound data
 105  
    * @return 'true' if the temp partition data is ready to transfer
 106  
    */
 107  
   public boolean transferThisPartition(PartitionOwner owner) {
 108  0
     final int partitionId = owner.getPartitionId();
 109  0
     if (getEdgesForPartition(partitionId) >=
 110  
       maxEdgesPerTransfer ||
 111  0
       getVerticesForPartition(partitionId) >=
 112  
       maxVerticesPerTransfer) {
 113  0
       vertexAccumulator.put(partitionId, 0);
 114  0
       edgeAccumulator.put(partitionId, 0);
 115  0
       return true;
 116  
     }
 117  0
     return false;
 118  
   }
 119  
 
 120  
   /** get current vertex count for a given Collection of
 121  
    * data soon to be transfered to its permanent home.
 122  
    * @param partId the partition id to check the count on.
 123  
    * @return the count of vertices.
 124  
    */
 125  
   private int getVerticesForPartition(final int partId) {
 126  0
     return vertexAccumulator.get(partId) == null ?
 127  0
       0 : vertexAccumulator.get(partId);
 128  
   }
 129  
 
 130  
   /** get current edge count for a given Collection of
 131  
    * data soon to be transfered to its permanent home.
 132  
    * @param partId the partition id to check the count on.
 133  
    * @return the count of edges.
 134  
    */
 135  
   private int getEdgesForPartition(final int partId) {
 136  0
     return edgeAccumulator.get(partId) == null ?
 137  0
       0 : edgeAccumulator.get(partId);
 138  
   }
 139  
 
 140  
   /** Clear storage to reset for reading new InputSplit */
 141  
   public void clearCounters() {
 142  0
     totalEdgeCount = 0;
 143  0
     totalVertexCount = 0;
 144  0
     vertexAccumulator.clear();
 145  0
     edgeAccumulator.clear();
 146  0
   }
 147  
 
 148  
   /** Increment V &amp; E counts for new vertex read, store values
 149  
    * for that outgoing _temporary_ Partition, which shares the
 150  
    * Partition ID for the actual remote Partition the collection
 151  
    * will eventually be processed in.
 152  
    * @param partitionOwner the owner of the Partition this data
 153  
    *  will eventually belong to.
 154  
    * @param vertex the vertex to extract counts from.
 155  
    * @param <I> the vertex id type.
 156  
    * @param <V> the vertex value type.
 157  
    * @param <E> the edge value type.
 158  
    */
 159  
   public <I extends WritableComparable, V extends Writable,
 160  
   E extends Writable> void
 161  
   incrementCounters(PartitionOwner partitionOwner,
 162  
     Vertex<I, V, E> vertex) {
 163  0
     final int id = partitionOwner.getPartitionId();
 164  
     // vertex counts
 165  0
     vertexAccumulator
 166  0
       .put(id, getVerticesForPartition(id) + 1);
 167  0
     totalVertexCount++;
 168  
     // edge counts
 169  0
     totalEdgeCount += vertex.getNumEdges();
 170  0
     edgeAccumulator.put(id, getEdgesForPartition(id) +
 171  0
       vertex.getNumEdges());
 172  0
   }
 173  
 
 174  
   /** Getter for MAX edge count to initiate a transfer
 175  
     * @return max edge count per transfer */
 176  
   public long getMaxEdgesPerTransfer() {
 177  0
     return maxEdgesPerTransfer;
 178  
   }
 179  
 
 180  
   /** Getter for MAX vertex count to initiate a transfer
 181  
    * @return max edge count per transfer */
 182  
   public long getMaxVerticesPerTransfer() {
 183  0
     return maxVerticesPerTransfer;
 184  
   }
 185  
 
 186  
   /** Getter for total edge count for the current InputSplit
 187  
     * @return the # of total edges counted in this InputSplit */
 188  
   public long getTotalEdges() {
 189  0
     return totalEdgeCount;
 190  
   }
 191  
 
 192  
   /** Getter for total vetex count for the current InputSplit
 193  
    * @return the total # of vertices counted in this InputSplit */
 194  
   public long getTotalVertices() {
 195  0
     return totalVertexCount;
 196  
   }
 197  
 }
 198