Coverage Report - org.apache.giraph.io.formats.WattsStrogatzVertexInputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
WattsStrogatzVertexInputFormat
0%
0/4
N/A
1.909
WattsStrogatzVertexInputFormat$WattsStrogatzVertexReader
0%
0/61
0%
0/22
1.909
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.io.formats;
 19  
 
 20  
 import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
 21  
 import it.unimi.dsi.fastutil.longs.LongSet;
 22  
 
 23  
 import java.io.IOException;
 24  
 import java.util.List;
 25  
 import java.util.Random;
 26  
 
 27  
 import org.apache.giraph.bsp.BspInputSplit;
 28  
 import org.apache.giraph.edge.Edge;
 29  
 import org.apache.giraph.edge.OutEdges;
 30  
 import org.apache.giraph.edge.ReusableEdge;
 31  
 import org.apache.giraph.graph.Vertex;
 32  
 import org.apache.giraph.io.VertexInputFormat;
 33  
 import org.apache.giraph.io.VertexReader;
 34  
 import org.apache.hadoop.conf.Configuration;
 35  
 import org.apache.hadoop.io.DoubleWritable;
 36  
 import org.apache.hadoop.io.LongWritable;
 37  
 import org.apache.hadoop.mapreduce.InputSplit;
 38  
 import org.apache.hadoop.mapreduce.JobContext;
 39  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 40  
 
 41  
 /**
 42  
  * Generates a random Watts-Strogatz graph by re-wiring a ring lattice.
 43  
  * The resulting graph is a random graph with high clustering coefficient
 44  
  * and low average path length. The graph has these two characteristics that
 45  
  * are typical of small-world scale-free graphs, however the degree
 46  
  * distribution is more similar to a random graph.
 47  
  * It supports a seed for pseudo-random generation.
 48  
  */
 49  0
public class WattsStrogatzVertexInputFormat extends
  VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> {
  /** Configuration key: the total number of vertices in the generated graph */
  private static final String AGGREGATE_VERTICES =
      "wattsStrogatz.aggregateVertices";
  /** Configuration key: the number of outgoing edges per vertex */
  private static final String EDGES_PER_VERTEX =
      "wattsStrogatz.edgesPerVertex";
  /**
   * Configuration key: the probability to re-wire an outgoing edge from
   * the ring lattice
   */
  private static final String BETA =
      "wattsStrogatz.beta";
  /**
   * Configuration key: the seed for the pseudo-random generator
   * (-1, the default, means no fixed seed)
   */
  private static final String SEED =
      "wattsStrogatz.seed";

  /** No input specs to check: the graph is generated, not read from files. */
  @Override
  public void checkInputSpecs(Configuration conf) { }

  /**
   * Create synthetic input splits; the input is generated, so splits only
   * partition the id space.
   *
   * @param context the job context
   * @param minSplitCountHint minimum number of splits to create
   * @return the list of generated splits
   */
  @Override
  public final List<InputSplit> getSplits(final JobContext context,
      final int minSplitCountHint) throws IOException, InterruptedException {
    return PseudoRandomUtils.getSplits(minSplitCountHint);
  }

  @Override
  public VertexReader<LongWritable, DoubleWritable, DoubleWritable>
  createVertexReader(InputSplit split,
      TaskAttemptContext context) throws IOException {
    return new WattsStrogatzVertexReader();
  }

  /**
   * Vertex reader used to generate the graph. Each reader produces the
   * contiguous range of vertex ids assigned to its split.
   */
  private static class WattsStrogatzVertexReader extends
    VertexReader<LongWritable, DoubleWritable, DoubleWritable> {
    /** the re-wiring probability */
    private float beta = 0;
    /** The total number of vertices */
    private long aggregateVertices = 0;
    /** The starting vertex id for this split */
    private long startingVertexId = -1;
    /** The number of vertices read so far */
    private long verticesRead = 0;
    /** The total number of vertices in the split */
    private long totalSplitVertices = -1;
    /** the total number of outgoing edges per vertex */
    private int edgesPerVertex = -1;
    /**
     * The target ids of the outgoing edges of the current vertex; also
     * contains the vertex's own id so re-wiring never creates a self-loop
     */
    private final LongSet destVertices = new LongOpenHashSet();
    /** The random values generator */
    private Random rnd;
    /** The reusable edge, set only when edge objects may be reused */
    private ReusableEdge<LongWritable, DoubleWritable> reusableEdge = null;

    /**
     * Default constructor
     */
    public WattsStrogatzVertexReader() { }

    @Override
    public void initialize(InputSplit inputSplit,
        TaskAttemptContext context) throws IOException {
      beta = getConf().getFloat(
          BETA, 0.0f);
      aggregateVertices = getConf().getLong(
          AGGREGATE_VERTICES, 0);
      // Distribute vertices across splits: the first
      // (aggregateVertices % numSplits) splits each take one extra vertex.
      BspInputSplit bspInputSplit = (BspInputSplit) inputSplit;
      long extraVertices = aggregateVertices % bspInputSplit.getNumSplits();
      totalSplitVertices = aggregateVertices / bspInputSplit.getNumSplits();
      if (bspInputSplit.getSplitIndex() < extraVertices) {
        ++totalSplitVertices;
      }
      // First vertex id owned by this split, shifted by the extra vertices
      // assigned to lower-indexed splits.
      startingVertexId = bspInputSplit.getSplitIndex() *
          (aggregateVertices / bspInputSplit.getNumSplits()) +
          Math.min(bspInputSplit.getSplitIndex(), extraVertices);
      edgesPerVertex = getConf().getInt(
          EDGES_PER_VERTEX, 0);
      if (getConf().reuseEdgeObjects()) {
        reusableEdge = getConf().createReusableEdge();
      }
      // A non-default seed makes generation reproducible for this reader;
      // -1 means no seed was configured.
      // NOTE(review): each split seeds its own Random with the same value,
      // so all splits share one seed — presumably intended; confirm.
      int seed = getConf().getInt(SEED, -1);
      if (seed != -1) {
        rnd = new Random(seed);
      } else {
        rnd = new Random();
      }
    }

    @Override
    public boolean nextVertex() throws IOException, InterruptedException {
      // More vertices remain while fewer than totalSplitVertices were read.
      return totalSplitVertices > verticesRead;
    }

    /**
     * Return a long value uniformly distributed between 0 (inclusive) and
     * n (exclusive), using the same rejection-sampling technique as
     * {@link Random#nextInt(int)} to avoid modulo bias.
     *
     * @param n the exclusive upper bound for the random long value;
     *          must be positive (no check is performed here)
     * @return the random value
     */
    private long nextLong(long n) {
      long bits;
      long val;
      do {
        // Shift-then-unsigned-shift clears the sign bit, yielding a
        // non-negative long.
        bits = (rnd.nextLong() << 1) >>> 1;
        val = bits % n;
        // Reject samples from the incomplete top interval that would bias
        // the modulo result; the subtraction overflows negative exactly
        // in that case.
      } while (bits - val + (n - 1) < 0L);
      return val;
    }

    /**
     * Get a destination id that is not already in the neighborhood and
     * that is not the vertex itself (no self-loops). For the second condition
     * it expects destVertices to contain the own id already.
     * The chosen id is recorded in destVertices as a side effect.
     *
     * NOTE(review): loops until an unused id is found — if beta re-wiring
     * requests more distinct targets than aggregateVertices provides this
     * cannot terminate; confirm configurations keep
     * edgesPerVertex well below aggregateVertices.
     *
     * @return the destination vertex id
     */
    private long getRandomDestination() {
      long randomId;
      do {
        randomId = nextLong(aggregateVertices);
      } while (!destVertices.add(randomId));
      return randomId;
    }

    @Override
    public Vertex<LongWritable, DoubleWritable, DoubleWritable>
    getCurrentVertex() throws IOException, InterruptedException {
      Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex =
          getConf().createVertex();
      long vertexId = startingVertexId + verticesRead;
      OutEdges<LongWritable, DoubleWritable> edges =
          getConf().createOutEdges();
      edges.initialize(edgesPerVertex);
      // Seed the used-target set with the vertex's own id so re-wired
      // edges are unique among themselves and never self-loops.
      // NOTE(review): kept lattice neighbors are never added to
      // destVertices, so a re-wired edge may duplicate a kept lattice
      // edge — confirm whether the OutEdges implementation tolerates that.
      destVertices.clear();
      destVertices.add(vertexId);
      // Start from the ring-lattice neighbor edgesPerVertex/2 positions
      // behind this vertex, wrapping around the ring of all vertices.
      long destVertexId = vertexId - edgesPerVertex / 2;
      if (destVertexId < 0) {
        destVertexId = aggregateVertices + destVertexId;
      }
      // Walk edgesPerVertex + 1 consecutive ring positions; the vertex's
      // own position is skipped, leaving exactly edgesPerVertex edges.
      for (int i = 0; i < edgesPerVertex + 1; ++i) {
        if (destVertexId != vertexId) {
          Edge<LongWritable, DoubleWritable> edge =
              (reusableEdge == null) ? getConf().createEdge() : reusableEdge;
          // With probability beta, re-wire this edge to a random target;
          // otherwise keep the lattice neighbor. The order of rnd calls
          // here determines the graph produced by a fixed seed.
          edge.getTargetVertexId().set(
              rnd.nextFloat() < beta ? getRandomDestination() : destVertexId);
          edge.getValue().set(rnd.nextDouble());
          edges.add(edge);
        }
        destVertexId = (destVertexId + 1) % aggregateVertices;
      }
      // Vertex value is a random double as well.
      vertex.initialize(new LongWritable(vertexId),
          new DoubleWritable(rnd.nextDouble()), edges);
      ++verticesRead;
      return vertex;
    }

    /** Nothing to release: the reader holds no external resources. */
    @Override
    public void close() throws IOException { }

    @Override
    public float getProgress() throws IOException {
      // NOTE(review): reports progress as a 0-100 percentage; Hadoop
      // record readers conventionally return 0.0-1.0 — confirm callers
      // expect the percentage scale before changing it.
      return verticesRead * 100.0f / totalSplitVertices;
    }
  }
}