Coverage Report - org.apache.giraph.io.formats.PseudoRandomEdgeInputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
PseudoRandomEdgeInputFormat
0%
0/4
N/A
2.444
PseudoRandomEdgeInputFormat$1
N/A
N/A
2.444
PseudoRandomEdgeInputFormat$PseudoRandomEdgeReader
0%
0/58
0%
0/22
2.444
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.io.formats;
 20  
 
 21  
 import com.google.common.collect.Sets;
 22  
 import java.io.IOException;
 23  
 import java.util.List;
 24  
 import java.util.Random;
 25  
 import java.util.Set;
 26  
 import org.apache.giraph.bsp.BspInputSplit;
 27  
 import org.apache.giraph.edge.Edge;
 28  
 import org.apache.giraph.edge.EdgeFactory;
 29  
 import org.apache.giraph.io.EdgeInputFormat;
 30  
 import org.apache.giraph.io.EdgeReader;
 31  
 import org.apache.hadoop.conf.Configuration;
 32  
 import org.apache.hadoop.io.DoubleWritable;
 33  
 import org.apache.hadoop.io.LongWritable;
 34  
 import org.apache.hadoop.mapreduce.InputSplit;
 35  
 import org.apache.hadoop.mapreduce.JobContext;
 36  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 37  
 import org.apache.log4j.Logger;
 38  
 
 39  
 /**
 40  
  * This {@link EdgeInputFormat} generates pseudo-random edges on the fly.
 41  
  * As with {@link PseudoRandomVertexInputFormat}, the user specifies the
 42  
  * number of vertices and the number of edges per vertex.
 43  
  */
 44  0
 public class PseudoRandomEdgeInputFormat
 45  
     extends EdgeInputFormat<LongWritable, DoubleWritable> {
 46  0
   @Override public void checkInputSpecs(Configuration conf) { }
 47  
 
 48  
   @Override
 49  
   public final List<InputSplit> getSplits(final JobContext context,
 50  
                                           final int minSplitCountHint)
 51  
     throws IOException, InterruptedException {
 52  0
     return PseudoRandomUtils.getSplits(minSplitCountHint);
 53  
   }
 54  
 
 55  
   @Override
 56  
   public EdgeReader<LongWritable, DoubleWritable> createEdgeReader(
 57  
       InputSplit split, TaskAttemptContext context) throws IOException {
 58  0
     return new PseudoRandomEdgeReader();
 59  
   }
 60  
 
 61  
   /**
 62  
    * {@link EdgeReader} that generates pseudo-random edges.
 63  
    */
 64  0
   private static class PseudoRandomEdgeReader
 65  
       extends EdgeReader<LongWritable, DoubleWritable>  {
 66  
     /** Logger. */
 67  0
     private static final Logger LOG =
 68  0
         Logger.getLogger(PseudoRandomEdgeReader.class);
 69  
     /** Starting vertex id. */
 70  0
     private long startingVertexId = -1;
 71  
     /** Vertices read so far. */
 72  0
     private long verticesRead = 0;
 73  
     /** Total vertices to read (on this split alone). */
 74  0
     private long totalSplitVertices = -1;
 75  
     /** Current vertex id. */
 76  0
     private LongWritable currentVertexId = new LongWritable(-1);
 77  
     /** Edges read for the current vertex. */
 78  0
     private int currentOutEdgesRead = 0;
 79  
     /** Target vertices of edges for current vertex. */
 80  0
     private Set<LongWritable> currentVertexDestVertices = Sets.newHashSet();
 81  
     /** Random number generator for the current vertex (for consistency
 82  
      * across runs on different numbers of workers). */
 83  0
     private Random random = new Random();
 84  
     /** Aggregate vertices (all input splits). */
 85  0
     private long aggregateVertices = -1;
 86  
     /** Edges per vertex. */
 87  0
     private int edgesPerVertex = -1;
 88  
     /** BspInputSplit (used only for index). */
 89  
     private BspInputSplit bspInputSplit;
 90  
     /** Helper for generating pseudo-random local edges. */
 91  
     private PseudoRandomLocalEdgesHelper localEdgesHelper;
 92  
 
 93  
     @Override
 94  
     public void initialize(InputSplit inputSplit, TaskAttemptContext context)
 95  
       throws IOException, InterruptedException {
 96  0
       aggregateVertices = getConf().getLong(
 97  
               PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0);
 98  0
       if (aggregateVertices <= 0) {
 99  0
         throw new IllegalArgumentException(
 100  
             PseudoRandomInputFormatConstants.AGGREGATE_VERTICES + " <= 0");
 101  
       }
 102  0
       if (inputSplit instanceof BspInputSplit) {
 103  0
         bspInputSplit = (BspInputSplit) inputSplit;
 104  0
         long extraVertices =
 105  0
             aggregateVertices % bspInputSplit.getNumSplits();
 106  0
         totalSplitVertices =
 107  0
             aggregateVertices / bspInputSplit.getNumSplits();
 108  0
         if (bspInputSplit.getSplitIndex() < extraVertices) {
 109  0
           ++totalSplitVertices;
 110  
         }
 111  0
         startingVertexId = (bspInputSplit.getSplitIndex() *
 112  0
             (aggregateVertices / bspInputSplit.getNumSplits())) +
 113  0
             Math.min(bspInputSplit.getSplitIndex(),
 114  
                 extraVertices);
 115  0
       } else {
 116  0
         throw new IllegalArgumentException(
 117  0
             "initialize: Got " + inputSplit.getClass() +
 118  
                 " instead of " + BspInputSplit.class);
 119  
       }
 120  0
       edgesPerVertex = getConf().getInt(
 121  
           PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
 122  0
       if (edgesPerVertex <= 0) {
 123  0
         throw new IllegalArgumentException(
 124  
             PseudoRandomInputFormatConstants.EDGES_PER_VERTEX + " <= 0");
 125  
       }
 126  0
       float minLocalEdgesRatio = getConf().getFloat(
 127  
           PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
 128  
           PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT);
 129  0
       localEdgesHelper = new PseudoRandomLocalEdgesHelper(aggregateVertices,
 130  0
           minLocalEdgesRatio, getConf());
 131  0
     }
 132  
 
 133  
     @Override
 134  
     public boolean nextEdge() throws IOException, InterruptedException {
 135  0
       return totalSplitVertices > verticesRead + 1 ||
 136  
           (totalSplitVertices == verticesRead + 1 &&
 137  
               edgesPerVertex > currentOutEdgesRead);
 138  
     }
 139  
 
 140  
     @Override
 141  
     public LongWritable getCurrentSourceId() throws IOException,
 142  
         InterruptedException {
 143  0
       if (currentOutEdgesRead == edgesPerVertex) {
 144  0
         ++verticesRead;
 145  0
         currentVertexId = new LongWritable(-1);
 146  
       }
 147  
 
 148  0
       if (currentVertexId.get() == -1) {
 149  0
         currentVertexId.set(startingVertexId + verticesRead);
 150  0
         currentOutEdgesRead = 0;
 151  
         // Seed on the vertex id to keep the vertex data the same when
 152  
         // on different number of workers, but other parameters are the
 153  
         // same.
 154  0
         random.setSeed(currentVertexId.get());
 155  0
         currentVertexDestVertices.clear();
 156  
       }
 157  0
       return currentVertexId;
 158  
     }
 159  
 
 160  
     @Override
 161  
     public Edge<LongWritable, DoubleWritable> getCurrentEdge()
 162  
       throws IOException, InterruptedException {
 163  0
       LongWritable destVertexId = new LongWritable();
 164  
       do {
 165  0
         destVertexId.set(localEdgesHelper.generateDestVertex(
 166  0
             currentVertexId.get(), random));
 167  0
       } while (currentVertexDestVertices.contains(destVertexId));
 168  0
       ++currentOutEdgesRead;
 169  0
       currentVertexDestVertices.add(destVertexId);
 170  0
       if (LOG.isTraceEnabled()) {
 171  0
         LOG.trace("getCurrentEdge: Return edge (" + currentVertexId + ", " +
 172  
             "" + destVertexId + ")");
 173  
       }
 174  0
       return EdgeFactory.create(
 175  
           destVertexId,
 176  0
           new DoubleWritable(random.nextDouble()));
 177  
     }
 178  
 
 179  
     @Override
 180  0
     public void close() throws IOException { }
 181  
 
 182  
     @Override
 183  
     public float getProgress() throws IOException, InterruptedException {
 184  0
       return (verticesRead * edgesPerVertex + currentOutEdgesRead) *
 185  
           100.0f / (totalSplitVertices * edgesPerVertex);
 186  
     }
 187  
   }
 188  
 }