Coverage Report - org.apache.giraph.benchmark.ReducersBenchmark
 
Classes in this File Line Coverage Branch Coverage Complexity
ReducersBenchmark
0%
0/20
0%
0/4
1.688
ReducersBenchmark$ReducersBenchmarkComputation
0%
0/17
0%
0/6
1.688
ReducersBenchmark$ReducersBenchmarkMasterCompute
0%
0/22
0%
0/4
1.688
ReducersBenchmark$ReducersBenchmarkWorkerContext
0%
0/24
0%
0/6
1.688
ReducersBenchmark$TestLongSumReducer
0%
0/7
N/A
1.688
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.benchmark;
 20  
 
 21  
 import java.io.DataInput;
 22  
 import java.io.DataOutput;
 23  
 import java.io.IOException;
 24  
 import java.util.Set;
 25  
 
 26  
 import org.apache.commons.cli.CommandLine;
 27  
 import org.apache.giraph.conf.GiraphConfiguration;
 28  
 import org.apache.giraph.conf.GiraphConstants;
 29  
 import org.apache.giraph.graph.BasicComputation;
 30  
 import org.apache.giraph.graph.Vertex;
 31  
 import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
 32  
 import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
 33  
 import org.apache.giraph.master.DefaultMasterCompute;
 34  
 import org.apache.giraph.reducers.ReduceSameTypeOperation;
 35  
 import org.apache.giraph.worker.DefaultWorkerContext;
 36  
 import org.apache.hadoop.conf.Configuration;
 37  
 import org.apache.hadoop.io.DoubleWritable;
 38  
 import org.apache.hadoop.io.LongWritable;
 39  
 import org.apache.hadoop.util.ToolRunner;
 40  
 
 41  
 import com.google.common.collect.Sets;
 42  
 
 43  
 /**
 44  
  * Benchmark for reducers. Also checks the correctness.
 45  
  */
 46  0
 public class ReducersBenchmark extends GiraphBenchmark {
 47  
   /** Number of reducers setting */
 48  
   private static final String REDUCERS_NUM = "reducersbenchmark.num";
 49  
 
 50  
   /** Option for number of reducers */
 51  0
   private static final BenchmarkOption REDUCERS =
 52  
       new BenchmarkOption("r", "reducers",
 53  
           true, "Reducers", "Need to set number of reducers (-r)");
 54  
 
 55  
   /** LongSumReducer */
 56  0
   public static class TestLongSumReducer
 57  
       extends ReduceSameTypeOperation<LongWritable> {
 58  
     /** Singleton */
 59  0
     public static final TestLongSumReducer INSTANCE = new TestLongSumReducer();
 60  
 
 61  
     @Override
 62  
     public LongWritable createInitialValue() {
 63  0
       return new LongWritable();
 64  
     }
 65  
 
 66  
     @Override
 67  
     public LongWritable reduce(
 68  
         LongWritable curValue, LongWritable valueToReduce) {
 69  0
       curValue.set(curValue.get() + valueToReduce.get());
 70  0
       return curValue;
 71  
     }
 72  
 
 73  
     @Override
 74  
     public void readFields(DataInput in) throws IOException {
 75  0
     }
 76  
 
 77  
     @Override
 78  
     public void write(DataOutput out) throws IOException {
 79  0
     }
 80  
   }
 81  
 
 82  
   /**
 83  
    * Vertex class for ReducersBenchmark
 84  
    */
 85  0
   public static class ReducersBenchmarkComputation extends
 86  
       BasicComputation<LongWritable, DoubleWritable, DoubleWritable,
 87  
           DoubleWritable> {
 88  
     @Override
 89  
     public void compute(
 90  
         Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
 91  
         Iterable<DoubleWritable> messages) throws IOException {
 92  0
       int n = getNumReducers(getConf());
 93  0
       long superstep = getSuperstep();
 94  0
       int w = getWorkerContextReduced(getConf(), superstep);
 95  0
       for (int i = 0; i < n; i++) {
 96  0
         reduce("w" + i, new LongWritable((superstep + 1) * i));
 97  0
         reduce("p" + i, new LongWritable(i));
 98  
 
 99  0
         if (superstep > 0) {
 100  0
           assertEquals(superstep * (getTotalNumVertices() * i) + w,
 101  0
               ((LongWritable) getBroadcast("w" + i)).get());
 102  0
           assertEquals(-(superstep * i),
 103  0
               ((LongWritable) getBroadcast("m" + i)).get());
 104  0
           assertEquals(superstep * getTotalNumVertices() * i,
 105  0
               ((LongWritable) getBroadcast("p" + i)).get());
 106  
         }
 107  
       }
 108  0
       if (superstep > 2) {
 109  0
         vertex.voteToHalt();
 110  
       }
 111  0
     }
 112  
   }
 113  
 
 114  
   /**
 115  
    * MasterCompute class for ReducersBenchmark
 116  
    */
 117  0
   public static class ReducersBenchmarkMasterCompute extends
 118  
       DefaultMasterCompute {
 119  
     @Override
 120  
     public void compute() {
 121  0
       int n = getNumReducers(getConf());
 122  0
       long superstep = getSuperstep();
 123  0
       int w = getWorkerContextReduced(getConf(), superstep);
 124  0
       for (int i = 0; i < n; i++) {
 125  0
         String wi = "w" + i;
 126  0
         String mi = "m" + i;
 127  0
         String pi = "p" + i;
 128  
 
 129  0
         registerReducer(wi, TestLongSumReducer.INSTANCE);
 130  0
         registerReducer(mi, new TestLongSumReducer());
 131  
 
 132  0
         if (superstep > 0) {
 133  0
           broadcast(wi, getReduced(wi));
 134  0
           broadcast(mi, new LongWritable(-superstep * i));
 135  0
           broadcast(pi, getReduced(pi));
 136  
 
 137  0
           registerReducer(pi, new TestLongSumReducer(),
 138  0
               (LongWritable) getReduced(pi));
 139  
 
 140  0
           assertEquals(superstep * (getTotalNumVertices() * i) + w,
 141  0
               ((LongWritable) getReduced(wi)).get());
 142  0
           assertEquals(superstep * getTotalNumVertices() * i,
 143  0
               ((LongWritable) getReduced(pi)).get());
 144  
         } else {
 145  0
           registerReducer(pi, new TestLongSumReducer());
 146  
         }
 147  
       }
 148  0
     }
 149  
   }
 150  
 
 151  
   /**
 152  
    * WorkerContext class for ReducersBenchmark
 153  
    */
 154  0
   public static class ReducersBenchmarkWorkerContext
 155  
       extends DefaultWorkerContext {
 156  
     @Override
 157  
     public void preSuperstep() {
 158  0
       addToWorkerReducers(1);
 159  0
       checkReducers();
 160  0
     }
 161  
 
 162  
     @Override
 163  
     public void postSuperstep() {
 164  0
       addToWorkerReducers(2);
 165  0
       checkReducers();
 166  0
     }
 167  
 
 168  
     /**
 169  
      * Check if reducer values are correct for current superstep
 170  
      */
 171  
     private void checkReducers() {
 172  0
       int n = getNumReducers(getContext().getConfiguration());
 173  0
       long superstep = getSuperstep();
 174  0
       int w = getWorkerContextReduced(
 175  0
           getContext().getConfiguration(), superstep);
 176  0
       for (int i = 0; i < n; i++) {
 177  0
         if (superstep > 0) {
 178  0
           assertEquals(superstep * (getTotalNumVertices() * i) + w,
 179  0
               ((LongWritable) getBroadcast("w" + i)).get());
 180  0
           assertEquals(-(superstep * i),
 181  0
               ((LongWritable) getBroadcast("m" + i)).get());
 182  0
           assertEquals(superstep * getTotalNumVertices() * i,
 183  0
               ((LongWritable) getBroadcast("p" + i)).get());
 184  
         }
 185  
       }
 186  0
     }
 187  
 
 188  
     /**
 189  
      * Add some value to worker reducers.
 190  
      *
 191  
      * @param valueToAdd Which value to add
 192  
      */
 193  
     private void addToWorkerReducers(int valueToAdd) {
 194  0
       int n = getNumReducers(getContext().getConfiguration());
 195  0
       for (int i = 0; i < n; i++) {
 196  0
         reduce("w" + i, new LongWritable(valueToAdd));
 197  
       }
 198  0
     }
 199  
   }
 200  
 
 201  
   /**
 202  
    * Get the number of reducers from configuration
 203  
    *
 204  
    * @param conf Configuration
 205  
    * @return Number of reducers
 206  
    */
 207  
   private static int getNumReducers(Configuration conf) {
 208  0
     return conf.getInt(REDUCERS_NUM, 0);
 209  
   }
 210  
 
 211  
   /**
 212  
    * Get the value which should be reduced by worker context
 213  
    *
 214  
    * @param conf Configuration
 215  
    * @param superstep Superstep
 216  
    * @return The value which should be reduced by worker context
 217  
    */
 218  
   private static int getWorkerContextReduced(Configuration conf,
 219  
       long superstep) {
 220  0
     return (superstep <= 0) ? 0 : conf.getInt("workers", 0) * 3;
 221  
   }
 222  
 
 223  
   /**
 224  
    * Check if values are equal, throw an exception if they aren't
 225  
    *
 226  
    * @param expected Expected value
 227  
    * @param actual Actual value
 228  
    */
 229  
   private static void assertEquals(long expected, long actual) {
 230  0
     if (expected != actual) {
 231  0
       throw new RuntimeException("expected: " + expected +
 232  
           ", actual: " + actual);
 233  
     }
 234  0
   }
 235  
 
 236  
   @Override
 237  
   public Set<BenchmarkOption> getBenchmarkOptions() {
 238  0
     return Sets.newHashSet(BenchmarkOption.VERTICES, REDUCERS);
 239  
   }
 240  
 
 241  
   @Override
 242  
   protected void prepareConfiguration(GiraphConfiguration conf,
 243  
       CommandLine cmd) {
 244  0
     conf.setComputationClass(ReducersBenchmarkComputation.class);
 245  0
     conf.setMasterComputeClass(ReducersBenchmarkMasterCompute.class);
 246  0
     conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
 247  0
     conf.setWorkerContextClass(ReducersBenchmarkWorkerContext.class);
 248  0
     conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
 249  0
         BenchmarkOption.VERTICES.getOptionLongValue(cmd));
 250  0
     conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 1);
 251  0
     conf.setInt(REDUCERS_NUM, REDUCERS.getOptionIntValue(cmd));
 252  0
     conf.setInt("workers", conf.getInt(GiraphConstants.MAX_WORKERS, -1));
 253  0
   }
 254  
 
 255  
   /**
 256  
    * Execute the benchmark.
 257  
    *
 258  
    * @param args Typically the command line arguments.
 259  
    * @throws Exception Any exception from the computation.
 260  
    */
 261  
   public static void main(final String[] args) throws Exception {
 262  0
     System.exit(ToolRunner.run(new ReducersBenchmark(), args));
 263  0
   }
 264  
 }