Coverage Report - org.apache.giraph.block_app.framework.internal.BlockMasterLogic
 
Classes in this File Line Coverage Branch Coverage Complexity
BlockMasterLogic
0%
0/77
0%
0/24
0
BlockMasterLogic$1
0%
0/8
0%
0/2
0
BlockMasterLogic$TimeStatsPerEvent
0%
0/33
0%
0/6
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.framework.internal;
 19  
 
 20  
 import java.util.HashSet;
 21  
 import java.util.Iterator;
 22  
 import java.util.Map;
 23  
 import java.util.Map.Entry;
 24  
 import java.util.TreeMap;
 25  
 
 26  
 import org.apache.commons.lang3.time.DurationFormatUtils;
 27  
 import org.apache.giraph.block_app.framework.BlockFactory;
 28  
 import org.apache.giraph.block_app.framework.BlockUtils;
 29  
 import org.apache.giraph.block_app.framework.api.BlockApiHandle;
 30  
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 31  
 import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor;
 32  
 import org.apache.giraph.block_app.framework.block.Block;
 33  
 import org.apache.giraph.block_app.framework.block.BlockWithApiHandle;
 34  
 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
 35  
 import org.apache.giraph.conf.GiraphConfiguration;
 36  
 import org.apache.giraph.function.Consumer;
 37  
 import org.apache.giraph.writable.tuple.IntLongWritable;
 38  
 import org.apache.log4j.Logger;
 39  
 import com.google.common.base.Preconditions;
 40  
 
 41  
 /**
 42  
  * Block execution logic on master, iterating over Pieces of the
 43  
  * application Block, executing master logic, and providing what needs to be
 44  
  * executed on the workers.
 45  
  *
 46  
  * @param <S> Execution stage type
 47  
  */
 48  
 @SuppressWarnings("rawtypes")
 49  0
 public class BlockMasterLogic<S> {
 50  0
   private static final Logger LOG = Logger.getLogger(BlockMasterLogic.class);
 51  
 
 52  
   private Iterator<AbstractPiece> pieceIterator;
 53  
   private PairedPieceAndStage<S> previousPiece;
 54  
   private transient BlockMasterApi masterApi;
 55  0
   private long lastTimestamp = -1;
 56  
   private BlockWorkerPieces previousWorkerPieces;
 57  
   private boolean computationDone;
 58  
   private BlockApiHandle blockApiHandle;
 59  
 
 60  
   /** Tracks elapsed time on master for each distinct Piece */
 61  0
   private final TimeStatsPerEvent masterPerPieceTimeStats =
 62  
       new TimeStatsPerEvent("master");
 63  
   /** Tracks elapsed time on workers for each pair of recieve/send pieces. */
 64  0
   private final TimeStatsPerEvent workerPerPieceTimeStats =
 65  
       new TimeStatsPerEvent("worker");
 66  
 
 67  
   /**
 68  
    * Initialize master logic to execute BlockFactory defined in
 69  
    * the configuration.
 70  
    */
 71  
   public void initialize(
 72  
       GiraphConfiguration conf, final BlockMasterApi masterApi) {
 73  0
     BlockFactory<S> factory = BlockUtils.createBlockFactory(conf);
 74  0
     initialize(factory.createBlock(conf), factory.createExecutionStage(conf),
 75  
         masterApi);
 76  0
   }
 77  
 
 78  
   /**
 79  
    * Initialize Master Logic to execute given block, starting
 80  
    * with given executionStage.
 81  
    */
 82  
   public void initialize(
 83  
       Block executionBlock, S executionStage, final BlockMasterApi masterApi) {
 84  0
     this.masterApi = masterApi;
 85  0
     this.computationDone = false;
 86  
 
 87  0
     LOG.info("Executing application - " + executionBlock);
 88  0
     if (executionBlock instanceof BlockWithApiHandle) {
 89  0
       blockApiHandle =
 90  0
         ((BlockWithApiHandle) executionBlock).getBlockApiHandle();
 91  
     }
 92  0
     if (blockApiHandle == null) {
 93  0
       blockApiHandle = new BlockApiHandle();
 94  
     }
 95  0
     blockApiHandle.setMasterApi(masterApi);
 96  
 
 97  
     // We register all possible aggregators at the beginning
 98  0
     executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() {
 99  0
       private final HashSet<AbstractPiece> registeredPieces = new HashSet<>();
 100  
       @SuppressWarnings("deprecation")
 101  
       @Override
 102  
       public void apply(AbstractPiece piece) {
 103  
         // no need to register the same piece twice.
 104  0
         if (registeredPieces.add(piece)) {
 105  
           try {
 106  0
             piece.registerAggregators(masterApi);
 107  0
           } catch (InstantiationException | IllegalAccessException e) {
 108  0
             throw new RuntimeException(e);
 109  0
           }
 110  
         }
 111  0
       }
 112  
     });
 113  
 
 114  0
     pieceIterator = executionBlock.iterator();
 115  
     // Invariant is that ReceiveWorkerPiece of previousPiece has already been
 116  
     // executed and that previousPiece.nextExecutionStage() should be used for
 117  
     // iterating. So passing piece as null, and initial state as current state,
 118  
     // so that nothing get's executed in first half, and calculateNextState
 119  
     // returns initial state.
 120  0
     previousPiece = new PairedPieceAndStage<>(null, executionStage);
 121  0
   }
 122  
 
 123  
   /**
 124  
    * Initialize object after deserializing it.
 125  
    * BlockMasterApi is not serializable, so it is transient, and set via this
 126  
    * method afterwards.
 127  
    */
 128  
   public void initializeAfterRead(BlockMasterApi masterApi) {
 129  0
     this.masterApi = masterApi;
 130  0
   }
 131  
 
 132  
   /**
 133  
    * Executes operations on master (master compute and registering reducers),
 134  
    * and calculates next pieces to be exectued on workers.
 135  
    *
 136  
    * @param superstep Current superstep
 137  
    * @return Next BlockWorkerPieces to be executed on workers, or null
 138  
    *         if computation should be halted.
 139  
    */
 140  
   public BlockWorkerPieces<S> computeNext(long superstep) {
 141  0
     long beforeMaster = System.currentTimeMillis();
 142  0
     if (lastTimestamp != -1) {
 143  0
       BlockCounters.setWorkerTimeCounter(
 144  
           previousWorkerPieces, superstep - 1,
 145  
           beforeMaster - lastTimestamp, masterApi, workerPerPieceTimeStats);
 146  
     }
 147  
 
 148  0
     if (previousPiece == null) {
 149  0
       postApplication();
 150  0
       return null;
 151  
     } else {
 152  0
       boolean logExecutionStatus =
 153  0
           BlockUtils.LOG_EXECUTION_STATUS.get(masterApi.getConf());
 154  0
       if (logExecutionStatus) {
 155  0
         LOG.info("Master executing " + previousPiece +
 156  
             ", in superstep " + superstep);
 157  
       }
 158  0
       previousPiece.masterCompute(masterApi);
 159  0
       ((BlockOutputHandleAccessor) masterApi).getBlockOutputHandle().
 160  0
           returnAllWriters();
 161  0
       long afterMaster = System.currentTimeMillis();
 162  
 
 163  0
       if (previousPiece.getPiece() != null) {
 164  0
         BlockCounters.setMasterTimeCounter(
 165  
             previousPiece, superstep, afterMaster - beforeMaster, masterApi,
 166  
             masterPerPieceTimeStats);
 167  
       }
 168  
 
 169  
       PairedPieceAndStage<S> nextPiece;
 170  0
       if (pieceIterator.hasNext()) {
 171  0
         nextPiece = new PairedPieceAndStage<S>(
 172  0
             pieceIterator.next(), previousPiece.nextExecutionStage());
 173  0
         nextPiece.registerReducers(masterApi);
 174  
       } else {
 175  0
         nextPiece = null;
 176  
       }
 177  0
       BlockCounters.setStageCounters(
 178  0
           "Master finished stage: ", previousPiece.getExecutionStage(),
 179  
           masterApi);
 180  0
       if (logExecutionStatus) {
 181  0
         LOG.info(
 182  
             "Master passing next " + nextPiece + ", in superstep " + superstep);
 183  
       }
 184  
 
 185  
       // if there is nothing more to compute, no need for additional superstep
 186  
       // this can only happen if application uses no pieces.
 187  
       BlockWorkerPieces<S> result;
 188  0
       if (previousPiece.getPiece() == null && nextPiece == null) {
 189  0
         postApplication();
 190  0
         result = null;
 191  
       } else {
 192  0
         result = new BlockWorkerPieces<>(
 193  
           previousPiece, nextPiece, blockApiHandle);
 194  0
         if (logExecutionStatus) {
 195  0
           LOG.info("Master in " + superstep + " superstep passing " +
 196  
               result + " to be executed");
 197  
         }
 198  
       }
 199  
 
 200  0
       previousPiece = nextPiece;
 201  0
       lastTimestamp = afterMaster;
 202  0
       previousWorkerPieces = result;
 203  0
       return result;
 204  
     }
 205  
   }
 206  
 
 207  
   /**
 208  
    * Clean up any master state, after application has finished.
 209  
    */
 210  
   private void postApplication() {
 211  0
     ((BlockOutputHandleAccessor) masterApi).getBlockOutputHandle().
 212  0
         closeAllWriters();
 213  0
     Preconditions.checkState(!computationDone);
 214  0
     computationDone = true;
 215  0
     IntLongWritable masterTimes = masterPerPieceTimeStats.logTimeSums();
 216  0
     IntLongWritable workerTimes = workerPerPieceTimeStats.logTimeSums();
 217  0
     LOG.info("Time split:\n" +
 218  0
         TimeStatsPerEvent.header() +
 219  0
         TimeStatsPerEvent.line(
 220  0
             masterTimes.getLeft().get(),
 221  0
             100.0 * masterTimes.getRight().get() /
 222  0
               (masterTimes.getRight().get() + workerTimes.getRight().get()),
 223  0
             masterTimes.getRight().get(),
 224  
             "master") +
 225  0
         TimeStatsPerEvent.line(
 226  0
             workerTimes.getLeft().get(),
 227  0
             100.0 * workerTimes.getRight().get() /
 228  0
               (masterTimes.getRight().get() + workerTimes.getRight().get()),
 229  0
             workerTimes.getRight().get(),
 230  
             "worker"));
 231  0
   }
 232  
 
 233  
   /**
 234  
    * Class tracking invocation count and elapsed time for a set of events,
 235  
    * each event being having a String name.
 236  
    */
 237  
   public static class TimeStatsPerEvent {
 238  
     private final String groupName;
 239  0
     private final Map<String, IntLongWritable> keyToCountAndTime =
 240  
         new TreeMap<>();
 241  
 
 242  0
     public TimeStatsPerEvent(String groupName) {
 243  0
       this.groupName = groupName;
 244  0
     }
 245  
 
 246  
     public void inc(String name, long millis) {
 247  0
       IntLongWritable val = keyToCountAndTime.get(name);
 248  0
       if (val == null) {
 249  0
         val = new IntLongWritable();
 250  0
         keyToCountAndTime.put(name, val);
 251  
       }
 252  0
       val.getLeft().set(val.getLeft().get() + 1);
 253  0
       val.getRight().set(val.getRight().get() + millis);
 254  0
     }
 255  
 
 256  
     public IntLongWritable logTimeSums() {
 257  0
       StringBuilder sb = new StringBuilder("Time sums " + groupName + ":\n");
 258  0
       sb.append(header());
 259  0
       long total = 0;
 260  0
       int count = 0;
 261  
       for (Entry<String, IntLongWritable> entry :
 262  0
             keyToCountAndTime.entrySet()) {
 263  0
         total += entry.getValue().getRight().get();
 264  0
         count += entry.getValue().getLeft().get();
 265  0
       }
 266  
 
 267  
       for (Entry<String, IntLongWritable> entry :
 268  0
             keyToCountAndTime.entrySet()) {
 269  0
         sb.append(line(
 270  0
             entry.getValue().getLeft().get(),
 271  0
             (100.0 * entry.getValue().getRight().get()) / total,
 272  0
             entry.getValue().getRight().get(),
 273  0
             entry.getKey()));
 274  0
       }
 275  0
       LOG.info(sb);
 276  0
       return new IntLongWritable(count, total);
 277  
     }
 278  
 
 279  
     public static String header() {
 280  0
       return String.format(
 281  
           "%10s%10s%11s   %s%n", "count", "time %", "time", "name");
 282  
     }
 283  
 
 284  
     public static String line(
 285  
         int count, double percTime, long time, String name) {
 286  0
       return String.format("%10d%9.2f%%%11s   %s%n",
 287  0
           count,
 288  0
           percTime,
 289  0
           DurationFormatUtils.formatDuration(time, "HH:mm:ss"),
 290  
           name);
 291  
     }
 292  
   }
 293  
 }