Coverage Report - org.apache.giraph.block_app.framework.internal.BlockWorkerPieces
 
Classes in this File Line Coverage Branch Coverage Complexity
BlockWorkerPieces
0%
0/67
0%
0/22
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.framework.internal;
 19  
 
 20  
 import java.util.ArrayList;
 21  
 import java.util.Arrays;
 22  
 import java.util.Objects;
 23  
 
 24  
 import org.apache.giraph.block_app.framework.api.BlockApiHandle;
 25  
 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
 26  
 import org.apache.giraph.conf.DefaultMessageClasses;
 27  
 import org.apache.giraph.conf.GiraphConstants;
 28  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 29  
 import org.apache.giraph.conf.MessageClasses;
 30  
 import org.apache.giraph.counters.CustomCounter;
 31  
 import org.apache.giraph.counters.CustomCounters;
 32  
 import org.apache.giraph.factories.DefaultMessageValueFactory;
 33  
 import org.apache.giraph.master.MasterCompute;
 34  
 import org.apache.giraph.types.NoMessage;
 35  
 import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
 36  
 import org.apache.giraph.utils.WritableUtils;
 37  
 import org.apache.giraph.worker.WorkerGlobalCommUsage;
 38  
 import org.apache.giraph.writable.kryo.KryoWritableWrapper;
 39  
 import org.apache.hadoop.io.IntWritable;
 40  
 import org.apache.hadoop.io.Writable;
 41  
 import org.apache.log4j.Logger;
 42  
 
 43  
 /**
 44  
  * Pair of pieces to be executed on workers in a superstep
 45  
  *
 46  
  * @param <S> Execution stage type
 47  
  */
 48  
 @SuppressWarnings({ "rawtypes", "unchecked" })
 49  
 public class BlockWorkerPieces<S> {
 50  0
   private static final Logger LOG = Logger.getLogger(BlockWorkerPieces.class);
 51  
 
 52  
   /** Aggregator holding next worker computation */
 53  
   private static final
 54  
   String NEXT_WORKER_PIECES = "giraph.blocks.next_worker_pieces";
 55  
 
 56  
   /** Passed worker stats counter group */
 57  
   private static final String PASSED_WORKER_STATS_GROUP = "PassedWorker Stats";
 58  
 
 59  
   /** Total serialised size counter name */
 60  
   private static final String TOTAL_SERIALISED_SIZE_NAME =
 61  
           "total serialized size";
 62  
 
 63  
   /** Split parts counter name */
 64  
   private static final String SPLIT_PARTS_NAME = "split parts";
 65  
 
 66  
   private final PairedPieceAndStage<S> receiver;
 67  
   private final PairedPieceAndStage<S> sender;
 68  
   private final BlockApiHandle blockApiHandle;
 69  
 
 70  
   public BlockWorkerPieces(
 71  
       PairedPieceAndStage<S> receiver, PairedPieceAndStage<S> sender,
 72  0
       BlockApiHandle blockApiHandle) {
 73  0
     this.receiver = receiver;
 74  0
     this.sender = sender;
 75  0
     this.blockApiHandle = blockApiHandle;
 76  0
   }
 77  
 
 78  
   public PairedPieceAndStage<S> getReceiver() {
 79  0
     return receiver;
 80  
   }
 81  
 
 82  
   public PairedPieceAndStage<S> getSender() {
 83  0
     return sender;
 84  
   }
 85  
 
 86  
   public BlockApiHandle getBlockApiHandle() {
 87  0
     return blockApiHandle;
 88  
   }
 89  
 
 90  
   public MessageClasses getOutgoingMessageClasses(
 91  
       ImmutableClassesGiraphConfiguration conf) {
 92  
     MessageClasses messageClasses;
 93  0
     if (sender == null || sender.getPiece() == null) {
 94  0
       messageClasses = new DefaultMessageClasses(
 95  
           NoMessage.class,
 96  
           DefaultMessageValueFactory.class,
 97  
           null,
 98  
           MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION);
 99  
     } else {
 100  0
       messageClasses = sender.getPiece().getMessageClasses(conf);
 101  
     }
 102  
 
 103  0
     messageClasses.verifyConsistent(conf);
 104  0
     return messageClasses;
 105  
   }
 106  
 
 107  
   @Override
 108  
   public String toString() {
 109  0
     return "[receiver=" + receiver + ",sender=" + sender + "]";
 110  
   }
 111  
 
 112  
   public String toStringShort() {
 113  0
     String receiverString =
 114  0
         Objects.toString(receiver != null ? receiver.getPiece() : null);
 115  0
     String senderString =
 116  0
         Objects.toString(sender != null ? sender.getPiece() : null);
 117  0
     if (receiverString.equals(senderString)) {
 118  0
       return "[receiver&sender=" + receiverString + "]";
 119  
     } else {
 120  0
       return "[receiver=" + receiverString + ",sender=" + senderString + "]";
 121  
     }
 122  
   }
 123  
 
 124  
   /**
 125  
    * Sets which WorkerComputation is going to be executed in the next superstep.
 126  
    */
 127  
   public static <S> void setNextWorkerPieces(
 128  
       MasterCompute master, BlockWorkerPieces<S> nextWorkerPieces) {
 129  0
     Writable toBroadcast = new KryoWritableWrapper<>(nextWorkerPieces);
 130  0
     byte[] data = WritableUtils.toByteArrayUnsafe(toBroadcast);
 131  
 
 132  
     // TODO: extract splitting logic into common utility
 133  0
     int overhead = 4096;
 134  0
     int singleSize = Math.max(
 135  
         overhead,
 136  0
         GiraphConstants.MAX_MSG_REQUEST_SIZE.get(master.getConf()) - overhead);
 137  
 
 138  0
     ArrayList<byte[]> splittedData = new ArrayList<>();
 139  0
     if (data.length < singleSize) {
 140  0
       splittedData.add(data);
 141  
     } else {
 142  0
       for (int start = 0; start < data.length; start += singleSize) {
 143  0
         splittedData.add(Arrays.copyOfRange(
 144  0
             data, start, Math.min(data.length, start + singleSize)));
 145  
       }
 146  
     }
 147  
 
 148  0
     LOG.info("Next worker piece - total serialized size: " + data.length +
 149  0
         ", split into " + splittedData.size());
 150  0
     CustomCounters.addCustomCounter(PASSED_WORKER_STATS_GROUP,
 151  
             TOTAL_SERIALISED_SIZE_NAME, CustomCounter.Aggregation.SUM);
 152  0
     master.getContext().getCounter(PASSED_WORKER_STATS_GROUP,
 153  
             TOTAL_SERIALISED_SIZE_NAME)
 154  0
         .increment(data.length);
 155  0
     CustomCounters.addCustomCounter(PASSED_WORKER_STATS_GROUP,
 156  
             SPLIT_PARTS_NAME, CustomCounter.Aggregation.SUM);
 157  0
     master.getContext().getCounter(PASSED_WORKER_STATS_GROUP, SPLIT_PARTS_NAME)
 158  0
         .increment(splittedData.size());
 159  
 
 160  0
     master.broadcast(NEXT_WORKER_PIECES, new IntWritable(splittedData.size()));
 161  
 
 162  0
     for (int i = 0; i < splittedData.size(); i++) {
 163  0
       master.broadcast(NEXT_WORKER_PIECES + "_part_" + i,
 164  0
           KryoWritableWrapper.wrapIfNeeded(splittedData.get(i)));
 165  
     }
 166  
 
 167  0
     master.setOutgoingMessageClasses(
 168  0
         nextWorkerPieces.getOutgoingMessageClasses(master.getConf()));
 169  0
   }
 170  
 
 171  
   public static <S> BlockWorkerPieces<S> getNextWorkerPieces(
 172  
       WorkerGlobalCommUsage worker) {
 173  0
     int splits = worker.<IntWritable>getBroadcast(NEXT_WORKER_PIECES).get();
 174  
 
 175  0
     int totalLength = 0;
 176  0
     ArrayList<byte[]> splittedData = new ArrayList<>();
 177  0
     for (int i = 0; i < splits; i++) {
 178  0
       byte[] cur = KryoWritableWrapper.<byte[]>unwrapIfNeeded(
 179  0
           worker.getBroadcast(NEXT_WORKER_PIECES + "_part_" + i));
 180  0
       splittedData.add(cur);
 181  0
       totalLength += cur.length;
 182  
     }
 183  
 
 184  
     byte[] merged;
 185  0
     if (splits == 1) {
 186  0
       merged = splittedData.get(0);
 187  
     } else {
 188  0
       merged = new byte[totalLength];
 189  0
       int index = 0;
 190  0
       for (int i = 0; i < splits; i++) {
 191  0
         System.arraycopy(
 192  0
             splittedData.get(i), 0, merged, index, splittedData.get(i).length);
 193  0
         index += splittedData.get(i).length;
 194  
       }
 195  
     }
 196  
 
 197  0
     KryoWritableWrapper<BlockWorkerPieces<S>> wrapper =
 198  
         new KryoWritableWrapper<>();
 199  0
     WritableUtils.fromByteArrayUnsafe(
 200  
         merged, wrapper, new UnsafeReusableByteArrayInput());
 201  0
     return wrapper.get();
 202  
   }
 203  
 }