Coverage Report - org.apache.giraph.benchmark.AggregatorsBenchmark
 
Classes in this File Line Coverage Branch Coverage Complexity
AggregatorsBenchmark
0%
0/21
0%
0/4
1.769
AggregatorsBenchmark$AggregatorsBenchmarkComputation
0%
0/16
0%
0/4
1.769
AggregatorsBenchmark$AggregatorsBenchmarkMasterCompute
0%
0/18
0%
0/6
1.769
AggregatorsBenchmark$AggregatorsBenchmarkWorkerContext
0%
0/24
0%
0/4
1.769
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.benchmark;
 20  
 
 21  
 import java.io.IOException;
 22  
 import java.util.Set;
 23  
 
 24  
 import org.apache.commons.cli.CommandLine;
 25  
 import org.apache.giraph.aggregators.LongSumAggregator;
 26  
 import org.apache.giraph.conf.GiraphConfiguration;
 27  
 import org.apache.giraph.conf.GiraphConstants;
 28  
 import org.apache.giraph.graph.BasicComputation;
 29  
 import org.apache.giraph.graph.Vertex;
 30  
 import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
 31  
 import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
 32  
 import org.apache.giraph.master.DefaultMasterCompute;
 33  
 import org.apache.giraph.utils.MasterLoggingAggregator;
 34  
 import org.apache.giraph.worker.DefaultWorkerContext;
 35  
 import org.apache.hadoop.conf.Configuration;
 36  
 import org.apache.hadoop.io.DoubleWritable;
 37  
 import org.apache.hadoop.io.LongWritable;
 38  
 import org.apache.hadoop.util.ToolRunner;
 39  
 
 40  
 import com.google.common.collect.Sets;
 41  
 
 42  
 /**
 43  
  * Benchmark for aggregators. Also checks the correctness.
 44  
  */
 45  0
 public class AggregatorsBenchmark extends GiraphBenchmark {
 46  
   /** Number of aggregators setting */
 47  
   private static final String AGGREGATORS_NUM = "aggregatorsbenchmark.num";
 48  
 
 49  
   /** Option for number of aggregators */
 50  0
   private static final BenchmarkOption AGGREGATORS =
 51  
       new BenchmarkOption("a", "aggregators",
 52  
           true, "Aggregators", "Need to set number of aggregators (-a)");
 53  
 
 54  
   /**
 55  
    * Vertex class for AggregatorsBenchmark
 56  
    */
 57  0
   public static class AggregatorsBenchmarkComputation extends
 58  
       BasicComputation<LongWritable, DoubleWritable, DoubleWritable,
 59  
           DoubleWritable> {
 60  
     @Override
 61  
     public void compute(
 62  
         Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
 63  
         Iterable<DoubleWritable> messages) throws IOException {
 64  0
       int n = getNumAggregators(getConf());
 65  0
       long superstep = getSuperstep();
 66  0
       int w = getWorkerContextAggregated(getConf(), superstep);
 67  0
       for (int i = 0; i < n; i++) {
 68  0
         aggregate("w" + i, new LongWritable((superstep + 1) * i));
 69  0
         aggregate("p" + i, new LongWritable(i));
 70  
 
 71  0
         assertEquals(superstep * (getTotalNumVertices() * i) + w,
 72  0
             ((LongWritable) getAggregatedValue("w" + i)).get());
 73  0
         assertEquals(-(superstep * i),
 74  0
             ((LongWritable) getAggregatedValue("m" + i)).get());
 75  0
         assertEquals(superstep * getTotalNumVertices() * i,
 76  0
             ((LongWritable) getAggregatedValue("p" + i)).get());
 77  
       }
 78  0
       if (superstep > 2) {
 79  0
         vertex.voteToHalt();
 80  
       }
 81  0
     }
 82  
   }
 83  
 
 84  
   /**
 85  
    * MasterCompute class for AggregatorsBenchmark
 86  
    */
 87  0
   public static class AggregatorsBenchmarkMasterCompute extends
 88  
       DefaultMasterCompute {
 89  
     @Override
 90  
     public void initialize() throws InstantiationException,
 91  
         IllegalAccessException {
 92  0
       int n = getNumAggregators(getConf());
 93  0
       for (int i = 0; i < n; i++) {
 94  0
         registerAggregator("w" + i, LongSumAggregator.class);
 95  0
         registerAggregator("m" + i, LongSumAggregator.class);
 96  0
         registerPersistentAggregator("p" + i, LongSumAggregator.class);
 97  
       }
 98  0
     }
 99  
 
 100  
     @Override
 101  
     public void compute() {
 102  0
       int n = getNumAggregators(getConf());
 103  0
       long superstep = getSuperstep();
 104  0
       int w = getWorkerContextAggregated(getConf(), superstep);
 105  0
       for (int i = 0; i < n; i++) {
 106  0
         setAggregatedValue("m" + i, new LongWritable(-superstep * i));
 107  
 
 108  0
         if (superstep > 0) {
 109  0
           assertEquals(superstep * (getTotalNumVertices() * i) + w,
 110  0
               ((LongWritable) getAggregatedValue("w" + i)).get());
 111  0
           assertEquals(superstep * getTotalNumVertices() * i,
 112  0
               ((LongWritable) getAggregatedValue("p" + i)).get());
 113  
         }
 114  
       }
 115  0
     }
 116  
   }
 117  
 
 118  
   /**
 119  
    * WorkerContext class for AggregatorsBenchmark
 120  
    */
 121  0
   public static class AggregatorsBenchmarkWorkerContext
 122  
       extends DefaultWorkerContext {
 123  
     @Override
 124  
     public void preSuperstep() {
 125  0
       addToWorkerAggregators(1);
 126  0
       checkAggregators();
 127  0
       MasterLoggingAggregator.aggregate("everything fine", this, getConf());
 128  0
     }
 129  
 
 130  
     @Override
 131  
     public void postSuperstep() {
 132  0
       addToWorkerAggregators(2);
 133  0
       checkAggregators();
 134  0
     }
 135  
 
 136  
     /**
 137  
      * Check if aggregator values are correct for current superstep
 138  
      */
 139  
     private void checkAggregators() {
 140  0
       int n = getNumAggregators(getContext().getConfiguration());
 141  0
       long superstep = getSuperstep();
 142  0
       int w = getWorkerContextAggregated(
 143  0
           getContext().getConfiguration(), superstep);
 144  0
       for (int i = 0; i < n; i++) {
 145  0
         assertEquals(superstep * (getTotalNumVertices() * i) + w,
 146  0
             ((LongWritable) getAggregatedValue("w" + i)).get());
 147  0
         assertEquals(-(superstep * i),
 148  0
             ((LongWritable) getAggregatedValue("m" + i)).get());
 149  0
         assertEquals(superstep * getTotalNumVertices() * i,
 150  0
             ((LongWritable) getAggregatedValue("p" + i)).get());
 151  
       }
 152  0
     }
 153  
 
 154  
     /**
 155  
      * Add some value to worker aggregators.
 156  
      *
 157  
      * @param valueToAdd Which value to add
 158  
      */
 159  
     private void addToWorkerAggregators(int valueToAdd) {
 160  0
       int n = getNumAggregators(getContext().getConfiguration());
 161  0
       for (int i = 0; i < n; i++) {
 162  0
         aggregate("w" + i, new LongWritable(valueToAdd));
 163  
       }
 164  0
     }
 165  
   }
 166  
 
 167  
   /**
 168  
    * Get the number of aggregators from configuration
 169  
    *
 170  
    * @param conf Configuration
 171  
    * @return Number of aggregators
 172  
    */
 173  
   private static int getNumAggregators(Configuration conf) {
 174  0
     return conf.getInt(AGGREGATORS_NUM, 0);
 175  
   }
 176  
 
 177  
   /**
 178  
    * Get the value which should be aggreagted by worker context
 179  
    *
 180  
    * @param conf Configuration
 181  
    * @param superstep Superstep
 182  
    * @return The value which should be aggregated by worker context
 183  
    */
 184  
   private static int getWorkerContextAggregated(Configuration conf,
 185  
       long superstep) {
 186  0
     return (superstep <= 0) ? 0 : conf.getInt("workers", 0) * 3;
 187  
   }
 188  
 
 189  
   /**
 190  
    * Check if values are equal, throw an exception if they aren't
 191  
    *
 192  
    * @param expected Expected value
 193  
    * @param actual Actual value
 194  
    */
 195  
   private static void assertEquals(long expected, long actual) {
 196  0
     if (expected != actual) {
 197  0
       throw new RuntimeException("expected: " + expected +
 198  
           ", actual: " + actual);
 199  
     }
 200  0
   }
 201  
 
 202  
   @Override
 203  
   public Set<BenchmarkOption> getBenchmarkOptions() {
 204  0
     return Sets.newHashSet(BenchmarkOption.VERTICES, AGGREGATORS);
 205  
   }
 206  
 
 207  
   @Override
 208  
   protected void prepareConfiguration(GiraphConfiguration conf,
 209  
       CommandLine cmd) {
 210  0
     conf.setComputationClass(AggregatorsBenchmarkComputation.class);
 211  0
     conf.setMasterComputeClass(AggregatorsBenchmarkMasterCompute.class);
 212  0
     conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
 213  0
     conf.setWorkerContextClass(AggregatorsBenchmarkWorkerContext.class);
 214  0
     conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
 215  0
         BenchmarkOption.VERTICES.getOptionLongValue(cmd));
 216  0
     conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 1);
 217  0
     conf.setInt(AGGREGATORS_NUM, AGGREGATORS.getOptionIntValue(cmd));
 218  0
     conf.setInt("workers", conf.getInt(GiraphConstants.MAX_WORKERS, -1));
 219  0
     MasterLoggingAggregator.setUseMasterLoggingAggregator(true, conf);
 220  0
   }
 221  
 
 222  
   /**
 223  
    * Execute the benchmark.
 224  
    *
 225  
    * @param args Typically the command line arguments.
 226  
    * @throws Exception Any exception from the computation.
 227  
    */
 228  
   public static void main(final String[] args) throws Exception {
 229  0
     System.exit(ToolRunner.run(new AggregatorsBenchmark(), args));
 230  0
   }
 231  
 }