Coverage Report - org.apache.giraph.benchmark.RandomMessageBenchmark
 
Classes in this File Line Coverage Branch Coverage Complexity
RandomMessageBenchmark
0%
0/27
0%
0/2
1.385
RandomMessageBenchmark$RandomMessageBenchmarkMasterCompute
0%
0/6
N/A
1.385
RandomMessageBenchmark$RandomMessageBenchmarkWorkerContext
0%
0/65
0%
0/4
1.385
RandomMessageBenchmark$RandomMessageComputation
0%
0/14
0%
0/4
1.385
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.benchmark;
 20  
 
 21  
 import org.apache.commons.cli.CommandLine;
 22  
 import org.apache.giraph.aggregators.LongSumAggregator;
 23  
 import org.apache.giraph.graph.BasicComputation;
 24  
 import org.apache.giraph.conf.GiraphConfiguration;
 25  
 import org.apache.giraph.conf.GiraphConstants;
 26  
 import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
 27  
 import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
 28  
 import org.apache.giraph.master.DefaultMasterCompute;
 29  
 import org.apache.giraph.graph.Vertex;
 30  
 import org.apache.giraph.worker.WorkerContext;
 31  
 import org.apache.hadoop.io.BytesWritable;
 32  
 import org.apache.hadoop.io.DoubleWritable;
 33  
 import org.apache.hadoop.io.LongWritable;
 34  
 import org.apache.hadoop.util.ToolRunner;
 35  
 import org.apache.log4j.Logger;
 36  
 
 37  
 import com.google.common.collect.Sets;
 38  
 
 39  
 import java.io.IOException;
 40  
 import java.util.Random;
 41  
 import java.util.Set;
 42  
 
 43  
 /**
 44  
  * Random Message Benchmark for evaluating the messaging performance.
 45  
  */
 46  0
 public class RandomMessageBenchmark extends GiraphBenchmark {
 47  
   /** How many supersteps to run */
 48  
   public static final String SUPERSTEP_COUNT =
 49  
       "giraph.randomMessageBenchmark.superstepCount";
 50  
   /** How many bytes per message */
 51  
   public static final String NUM_BYTES_PER_MESSAGE =
 52  
       "giraph.randomMessageBenchmark.numBytesPerMessage";
 53  
   /** Default bytes per message */
 54  
   public static final int DEFAULT_NUM_BYTES_PER_MESSAGE = 16;
 55  
   /** How many messages per edge */
 56  
   public static final String NUM_MESSAGES_PER_EDGE =
 57  
       "giraph.randomMessageBenchmark.numMessagesPerEdge";
 58  
   /** Default messages per edge */
 59  
   public static final int DEFAULT_NUM_MESSAGES_PER_EDGE = 1;
 60  
   /** All bytes sent during this superstep */
 61  
   public static final String AGG_SUPERSTEP_TOTAL_BYTES =
 62  
       "superstep total bytes sent";
 63  
   /** All bytes sent during this application */
 64  
   public static final String AGG_TOTAL_BYTES = "total bytes sent";
 65  
   /** All messages during this superstep */
 66  
   public static final String AGG_SUPERSTEP_TOTAL_MESSAGES =
 67  
       "superstep total messages";
 68  
   /** All messages during this application */
 69  
   public static final String AGG_TOTAL_MESSAGES = "total messages";
 70  
   /** All millis during this superstep */
 71  
   public static final String AGG_SUPERSTEP_TOTAL_MILLIS =
 72  
       "superstep total millis";
 73  
   /** All millis during this application */
 74  
   public static final String AGG_TOTAL_MILLIS = "total millis";
 75  
   /** Workers for that superstep */
 76  
   public static final String WORKERS_NUM = "workers";
 77  
 
 78  
   /** Option for number of bytes per message */
 79  0
   private static final BenchmarkOption BYTES_PER_MESSAGE = new BenchmarkOption(
 80  
       "b", "bytes", true, "Message bytes per memssage",
 81  
       "Need to set the number of message bytes (-b)");
 82  
   /** Option for number of messages per edge */
 83  0
   private static final BenchmarkOption MESSAGES_PER_EDGE = new BenchmarkOption(
 84  
       "n", "number", true, "Number of messages per edge",
 85  
       "Need to set the number of messages per edge (-n)");
 86  
   /** Option for number of flush threads */
 87  0
   private static final BenchmarkOption FLUSH_THREADS = new BenchmarkOption(
 88  
       "f", "flusher", true, "Number of flush threads");
 89  
 
 90  
   /** Class logger */
 91  0
   private static final Logger LOG =
 92  0
     Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);
 93  
 
 94  
   /**
 95  
    * {@link WorkerContext} forRandomMessageBenchmark.
 96  
    */
 97  0
   public static class RandomMessageBenchmarkWorkerContext extends
 98  
       WorkerContext {
 99  
     /** Class logger */
 100  0
     private static final Logger LOG =
 101  0
       Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);
 102  
     /** Bytes to be sent */
 103  
     private byte[] messageBytes;
 104  
     /** Number of messages sent per edge */
 105  0
     private int numMessagesPerEdge = -1;
 106  
     /** Number of supersteps */
 107  0
     private int numSupersteps = -1;
 108  
     /** Random generator for random bytes message */
 109  0
     private final Random random = new Random(System.currentTimeMillis());
 110  
     /** Start superstep millis */
 111  0
     private long startSuperstepMillis = 0;
 112  
     /** Total bytes */
 113  0
     private long totalBytes = 0;
 114  
     /** Total messages */
 115  0
     private long totalMessages = 0;
 116  
     /** Total millis */
 117  0
     private long totalMillis = 0;
 118  
 
 119  
     @Override
 120  
     public void preApplication()
 121  
       throws InstantiationException, IllegalAccessException {
 122  0
       messageBytes =
 123  0
         new byte[getContext().getConfiguration().
 124  0
                  getInt(NUM_BYTES_PER_MESSAGE,
 125  
                  DEFAULT_NUM_BYTES_PER_MESSAGE)];
 126  0
       numMessagesPerEdge =
 127  0
           getContext().getConfiguration().
 128  0
           getInt(NUM_MESSAGES_PER_EDGE,
 129  
               DEFAULT_NUM_MESSAGES_PER_EDGE);
 130  0
       numSupersteps = getContext().getConfiguration().
 131  0
           getInt(SUPERSTEP_COUNT, -1);
 132  0
     }
 133  
 
 134  
     @Override
 135  
     public void preSuperstep() {
 136  0
       long superstepBytes = this.<LongWritable>
 137  0
           getAggregatedValue(AGG_SUPERSTEP_TOTAL_BYTES).get();
 138  0
       long superstepMessages = this.<LongWritable>
 139  0
           getAggregatedValue(AGG_SUPERSTEP_TOTAL_MESSAGES).get();
 140  0
       long superstepMillis = this.<LongWritable>
 141  0
           getAggregatedValue(AGG_SUPERSTEP_TOTAL_MILLIS).get();
 142  0
       long workers = this.<LongWritable>getAggregatedValue(WORKERS_NUM).get();
 143  
 
 144  
       // For timing and tracking the supersteps
 145  
       // - superstep 0 starts the time, but cannot display any stats
 146  
       //   since nothing has been aggregated yet
 147  
       // - supersteps > 0 can display the stats
 148  0
       if (getSuperstep() == 0) {
 149  0
         startSuperstepMillis = System.currentTimeMillis();
 150  
       } else {
 151  0
         totalBytes += superstepBytes;
 152  0
         totalMessages += superstepMessages;
 153  0
         totalMillis += superstepMillis;
 154  0
         double superstepMegabytesPerSecond =
 155  
             superstepBytes * workers * 1000d / 1024d / 1024d / superstepMillis;
 156  0
         double megabytesPerSecond = totalBytes *
 157  
             workers * 1000d / 1024d / 1024d / totalMillis;
 158  0
         double superstepMessagesPerSecond =
 159  
             superstepMessages * workers * 1000d / superstepMillis;
 160  0
         double messagesPerSecond =
 161  
             totalMessages * workers * 1000d / totalMillis;
 162  0
         if (LOG.isInfoEnabled()) {
 163  0
           LOG.info("Outputing statistics for superstep " + getSuperstep());
 164  0
           LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " + superstepBytes);
 165  0
           LOG.info(AGG_TOTAL_BYTES + " : " + totalBytes);
 166  0
           LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " + superstepMessages);
 167  0
           LOG.info(AGG_TOTAL_MESSAGES + " : " + totalMessages);
 168  0
           LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " + superstepMillis);
 169  0
           LOG.info(AGG_TOTAL_MILLIS + " : " + totalMillis);
 170  0
           LOG.info(WORKERS_NUM + " : " + workers);
 171  0
           LOG.info("Superstep megabytes / second = " +
 172  
               superstepMegabytesPerSecond);
 173  0
           LOG.info("Total megabytes / second = " +
 174  
               megabytesPerSecond);
 175  0
           LOG.info("Superstep messages / second = " +
 176  
               superstepMessagesPerSecond);
 177  0
           LOG.info("Total messages / second = " +
 178  
               messagesPerSecond);
 179  0
           LOG.info("Superstep megabytes / second / worker = " +
 180  
               superstepMegabytesPerSecond / workers);
 181  0
           LOG.info("Total megabytes / second / worker = " +
 182  
               megabytesPerSecond / workers);
 183  0
           LOG.info("Superstep messages / second / worker = " +
 184  
               superstepMessagesPerSecond / workers);
 185  0
           LOG.info("Total messages / second / worker = " +
 186  
               messagesPerSecond / workers);
 187  
         }
 188  
       }
 189  
 
 190  0
       aggregate(WORKERS_NUM, new LongWritable(1));
 191  0
     }
 192  
 
 193  
     @Override
 194  
     public void postSuperstep() {
 195  0
       long endSuperstepMillis = System.currentTimeMillis();
 196  0
       long superstepMillis = endSuperstepMillis - startSuperstepMillis;
 197  0
       startSuperstepMillis = endSuperstepMillis;
 198  0
       aggregate(AGG_SUPERSTEP_TOTAL_MILLIS, new LongWritable(superstepMillis));
 199  0
     }
 200  
 
 201  
     @Override
 202  0
     public void postApplication() { }
 203  
 
 204  
     /**
 205  
      * Get the message bytes to be used for sending.
 206  
      *
 207  
      * @return Byte array used for messages.
 208  
      */
 209  
     public byte[] getMessageBytes() {
 210  0
       return messageBytes;
 211  
     }
 212  
 
 213  
     /**
 214  
      * Get the number of edges per message.
 215  
      *
 216  
      * @return Messages per edge.
 217  
      */
 218  
     public int getNumMessagePerEdge() {
 219  0
       return numMessagesPerEdge;
 220  
     }
 221  
 
 222  
     /**
 223  
      * Get the number of supersteps.
 224  
      *
 225  
      * @return Number of supersteps.
 226  
      */
 227  
     public int getNumSupersteps() {
 228  0
       return numSupersteps;
 229  
     }
 230  
 
 231  
     /**
 232  
      * Randomize the message bytes.
 233  
      */
 234  
     public void randomizeMessageBytes() {
 235  0
       random.nextBytes(messageBytes);
 236  0
     }
 237  
   }
 238  
 
 239  
   /**
 240  
    * Master compute associated with {@link RandomMessageBenchmark}.
 241  
    * It registers required aggregators.
 242  
    */
 243  0
   public static class RandomMessageBenchmarkMasterCompute extends
 244  
       DefaultMasterCompute {
 245  
     @Override
 246  
     public void initialize() throws InstantiationException,
 247  
         IllegalAccessException {
 248  0
       registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES,
 249  
           LongSumAggregator.class);
 250  0
       registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES,
 251  
           LongSumAggregator.class);
 252  0
       registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS,
 253  
           LongSumAggregator.class);
 254  0
       registerAggregator(WORKERS_NUM,
 255  
           LongSumAggregator.class);
 256  0
     }
 257  
   }
 258  
 
 259  
   /**
 260  
    * Actual message computation (messaging in this case)
 261  
    */
 262  0
   public static class RandomMessageComputation extends BasicComputation<
 263  
       LongWritable, DoubleWritable, DoubleWritable, BytesWritable> {
 264  
     @Override
 265  
     public void compute(
 266  
         Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
 267  
         Iterable<BytesWritable> messages) throws IOException {
 268  0
       RandomMessageBenchmarkWorkerContext workerContext = getWorkerContext();
 269  0
       if (getSuperstep() < workerContext.getNumSupersteps()) {
 270  0
         for (int i = 0; i < workerContext.getNumMessagePerEdge(); i++) {
 271  0
           workerContext.randomizeMessageBytes();
 272  0
           sendMessageToAllEdges(vertex,
 273  0
               new BytesWritable(workerContext.getMessageBytes()));
 274  0
           long bytesSent = workerContext.getMessageBytes().length *
 275  0
               vertex.getNumEdges();
 276  0
           aggregate(AGG_SUPERSTEP_TOTAL_BYTES, new LongWritable(bytesSent));
 277  0
           aggregate(AGG_SUPERSTEP_TOTAL_MESSAGES,
 278  0
               new LongWritable(vertex.getNumEdges()));
 279  
         }
 280  
       } else {
 281  0
         vertex.voteToHalt();
 282  
       }
 283  0
     }
 284  
   }
 285  
 
 286  
   @Override
 287  
   public Set<BenchmarkOption> getBenchmarkOptions() {
 288  0
     return Sets.newHashSet(BenchmarkOption.SUPERSTEPS,
 289  
         BenchmarkOption.VERTICES, BenchmarkOption.EDGES_PER_VERTEX,
 290  
         BYTES_PER_MESSAGE, MESSAGES_PER_EDGE, FLUSH_THREADS);
 291  
   }
 292  
 
 293  
   @Override
 294  
   protected void prepareConfiguration(GiraphConfiguration conf,
 295  
       CommandLine cmd) {
 296  0
     conf.setComputationClass(RandomMessageComputation.class);
 297  0
     conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
 298  0
     conf.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
 299  0
     conf.setMasterComputeClass(RandomMessageBenchmarkMasterCompute.class);
 300  0
     conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
 301  0
         BenchmarkOption.VERTICES.getOptionLongValue(cmd));
 302  0
     conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
 303  0
         BenchmarkOption.EDGES_PER_VERTEX.getOptionLongValue(cmd));
 304  0
     conf.setInt(SUPERSTEP_COUNT,
 305  0
         BenchmarkOption.SUPERSTEPS.getOptionIntValue(cmd));
 306  0
     conf.setInt(RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE,
 307  0
         BYTES_PER_MESSAGE.getOptionIntValue(cmd));
 308  0
     conf.setInt(RandomMessageBenchmark.NUM_MESSAGES_PER_EDGE,
 309  0
         MESSAGES_PER_EDGE.getOptionIntValue(cmd));
 310  0
     if (FLUSH_THREADS.optionTurnedOn(cmd)) {
 311  0
       conf.setInt(GiraphConstants.MSG_NUM_FLUSH_THREADS,
 312  0
           FLUSH_THREADS.getOptionIntValue(cmd));
 313  
     }
 314  0
   }
 315  
 
 316  
   /**
 317  
    * Execute the benchmark.
 318  
    *
 319  
    * @param args Typically, this is the command line arguments.
 320  
    * @throws Exception Any exception thrown during computation.
 321  
    */
 322  
   public static void main(String[] args) throws Exception {
 323  0
     System.exit(ToolRunner.run(new RandomMessageBenchmark(), args));
 324  0
   }
 325  
 }