Coverage Report - org.apache.giraph.master.input.MasterInputSplitsHandler
 
Classes in this File: MasterInputSplitsHandler
Line Coverage: 0% (0/66)
Branch Coverage: 0% (0/18)
Complexity: 0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.master.input;
 20  
 
 21  
 import org.apache.giraph.comm.MasterClient;
 22  
 import org.apache.giraph.comm.requests.ReplyWithInputSplitRequest;
 23  
 import org.apache.giraph.conf.StrConfOption;
 24  
 import org.apache.giraph.io.GiraphInputFormat;
 25  
 import org.apache.giraph.io.InputType;
 26  
 import org.apache.giraph.worker.WorkerInfo;
 27  
 import org.apache.hadoop.mapreduce.Counter;
 28  
 import org.apache.hadoop.mapreduce.InputSplit;
 29  
 import org.apache.hadoop.mapreduce.Mapper;
 30  
 
 31  
 import java.io.ByteArrayOutputStream;
 32  
 import java.io.DataOutput;
 33  
 import java.io.DataOutputStream;
 34  
 import java.io.IOException;
 35  
 import java.util.ArrayList;
 36  
 import java.util.EnumMap;
 37  
 import java.util.HashSet;
 38  
 import java.util.List;
 39  
 import java.util.Map;
 40  
 import java.util.Set;
 41  
 import java.util.concurrent.ConcurrentHashMap;
 42  
 import java.util.concurrent.CountDownLatch;
 43  
 import java.util.concurrent.atomic.AtomicInteger;
 44  
 
 45  
 /**
 46  
  * Handler for input splits on master
 47  
  *
 48  
  * Since currently Giraph fails if worker fails while reading input, we
 49  
  * didn't complicate this part with retries yet, later it could be added by
 50  
  * keeping track of which worker got which split and then if worker dies put
 51  
  * these splits back to queues.
 52  
  */
 53  
 public class MasterInputSplitsHandler {
 54  
   /**
 55  
    * Store in counters timestamps when we finished reading
 56  
    * these fractions of input
 57  
    */
 58  0
   public static final StrConfOption DONE_FRACTIONS_TO_STORE_IN_COUNTERS =
 59  
       new StrConfOption("giraph.master.input.doneFractionsToStoreInCounters",
 60  
           "0.99,1", "Store in counters timestamps when we finished reading " +
 61  
           "these fractions of input");
 62  
   /** Map of counter group and names */
 63  0
   private static Map<String, Set<String>> COUNTER_GROUP_AND_NAMES =
 64  
           new ConcurrentHashMap<>();
 65  
 
 66  
   /** Whether to use locality information */
 67  
   private final boolean useLocality;
 68  
   /** Master client */
 69  
   private MasterClient masterClient;
 70  
   /** Master client */
 71  
   private List<WorkerInfo> workers;
 72  
   /** Map of splits organizers for each split type */
 73  0
   private Map<InputType, InputSplitsMasterOrganizer> splitsMap =
 74  
       new EnumMap<>(InputType.class);
 75  
   /** Latches to say when one input splits type is ready to be accessed */
 76  0
   private Map<InputType, CountDownLatch> latchesMap =
 77  
       new EnumMap<>(InputType.class);
 78  
   /** Context for accessing counters */
 79  
   private final Mapper.Context context;
 80  
   /** How many splits per type are there total */
 81  0
   private final Map<InputType, Integer> numSplitsPerType =
 82  
       new EnumMap<>(InputType.class);
 83  
   /** How many splits per type have been read so far */
 84  0
   private final Map<InputType, AtomicInteger> numSplitsReadPerType =
 85  
       new EnumMap<>(InputType.class);
 86  
   /** Timestamps when various splits were created */
 87  0
   private final Map<InputType, Long> splitsCreatedTimestamp =
 88  
       new EnumMap<>(InputType.class);
 89  
   /**
 90  
    * Store in counters timestamps when we finished reading
 91  
    * these fractions of input
 92  
    */
 93  
   private final double[] doneFractionsToStoreInCounters;
 94  
 
 95  
   /**
 96  
    * Constructor
 97  
    *
 98  
    * @param useLocality Whether to use locality information or not
 99  
    * @param context Context for accessing counters
 100  
    */
 101  0
   public MasterInputSplitsHandler(boolean useLocality, Mapper.Context context) {
 102  0
     this.useLocality = useLocality;
 103  0
     this.context = context;
 104  0
     for (InputType inputType : InputType.values()) {
 105  0
       latchesMap.put(inputType, new CountDownLatch(1));
 106  0
       numSplitsReadPerType.put(inputType, new AtomicInteger(0));
 107  
     }
 108  
 
 109  0
     String[] tmp = DONE_FRACTIONS_TO_STORE_IN_COUNTERS.get(
 110  0
         context.getConfiguration()).split(",");
 111  0
     doneFractionsToStoreInCounters = new double[tmp.length];
 112  0
     for (int i = 0; i < tmp.length; i++) {
 113  0
       doneFractionsToStoreInCounters[i] = Double.parseDouble(tmp[i].trim());
 114  
     }
 115  0
   }
 116  
 
 117  
   /**
 118  
    * Initialize
 119  
    *
 120  
    * @param masterClient Master client
 121  
    * @param workers List of workers
 122  
    */
 123  
   public void initialize(MasterClient masterClient, List<WorkerInfo> workers) {
 124  0
     this.masterClient = masterClient;
 125  0
     this.workers = workers;
 126  0
   }
 127  
 
 128  
   /**
 129  
    * Add splits
 130  
    *
 131  
    * @param splitsType Type of splits
 132  
    * @param inputSplits Splits
 133  
    * @param inputFormat Format
 134  
    */
 135  
   public void addSplits(InputType splitsType, List<InputSplit> inputSplits,
 136  
       GiraphInputFormat inputFormat) {
 137  0
     splitsCreatedTimestamp.put(splitsType, System.currentTimeMillis());
 138  0
     List<byte[]> serializedSplits = new ArrayList<>();
 139  0
     for (InputSplit inputSplit : inputSplits) {
 140  
       try {
 141  0
         ByteArrayOutputStream byteArrayOutputStream =
 142  
             new ByteArrayOutputStream();
 143  0
         DataOutput outputStream =
 144  
             new DataOutputStream(byteArrayOutputStream);
 145  0
         inputFormat.writeInputSplit(inputSplit, outputStream);
 146  0
         serializedSplits.add(byteArrayOutputStream.toByteArray());
 147  0
       } catch (IOException e) {
 148  0
         throw new IllegalStateException("IOException occurred", e);
 149  0
       }
 150  0
     }
 151  
     InputSplitsMasterOrganizer inputSplitsOrganizer;
 152  0
     if (splitsType == InputType.MAPPING) {
 153  0
       inputSplitsOrganizer = new MappingInputSplitsMasterOrganizer(
 154  
           serializedSplits, workers);
 155  
     } else {
 156  0
       inputSplitsOrganizer = useLocality ?
 157  
           new LocalityAwareInputSplitsMasterOrganizer(serializedSplits,
 158  
               inputSplits, workers) :
 159  
           new BasicInputSplitsMasterOrganizer(serializedSplits);
 160  
     }
 161  0
     splitsMap.put(splitsType, inputSplitsOrganizer);
 162  0
     latchesMap.get(splitsType).countDown();
 163  0
     numSplitsPerType.put(splitsType, serializedSplits.size());
 164  0
   }
 165  
 
 166  
   /**
 167  
    * Called after we receive a split request from some worker, should send
 168  
    * split back to it if available, or send it information that there is no
 169  
    * more available
 170  
    *
 171  
    * @param splitType Type of split requested
 172  
    * @param workerTaskId Id of worker who requested split
 173  
    * @param isFirstSplit Whether this is the first split a thread is requesting,
 174  
    *   or this request indicates that previously requested input split was done
 175  
    */
 176  
   public void sendSplitTo(InputType splitType, int workerTaskId,
 177  
       boolean isFirstSplit) {
 178  
     try {
 179  
       // Make sure we don't try to retrieve splits before they were added
 180  0
       latchesMap.get(splitType).await();
 181  0
     } catch (InterruptedException e) {
 182  0
       throw new IllegalStateException("Interrupted", e);
 183  0
     }
 184  0
     byte[] serializedInputSplit =
 185  0
         splitsMap.get(splitType).getSerializedSplitFor(workerTaskId);
 186  0
     masterClient.sendWritableRequest(workerTaskId,
 187  
         new ReplyWithInputSplitRequest(splitType,
 188  
             serializedInputSplit == null ? new byte[0] : serializedInputSplit));
 189  0
     if (!isFirstSplit) {
 190  0
       incrementSplitsRead(splitType);
 191  
     }
 192  0
   }
 193  
 
 194  
   /**
 195  
    * Increment splits read
 196  
    *
 197  
    * @param splitType Type of split which was read
 198  
    */
 199  
   private void incrementSplitsRead(InputType splitType) {
 200  0
     int splitsRead = numSplitsReadPerType.get(splitType).incrementAndGet();
 201  0
     int splits = numSplitsPerType.get(splitType);
 202  0
     for (int i = 0; i < doneFractionsToStoreInCounters.length; i++) {
 203  0
       if (splitsRead == (int) (splits * doneFractionsToStoreInCounters[i])) {
 204  0
         splitFractionReached(
 205  
             splitType, doneFractionsToStoreInCounters[i], context);
 206  
       }
 207  
     }
 208  0
   }
 209  
 
 210  
   /**
 211  
    * Call when we reached some fraction of split type done to set the
 212  
    * timestamp counter
 213  
    *
 214  
    * @param inputType Type of input
 215  
    * @param fraction Which fraction of input type was done reading
 216  
    * @param context Context for accessing counters
 217  
    */
 218  
   private void splitFractionReached(
 219  
       InputType inputType, double fraction, Mapper.Context context) {
 220  0
     getSplitFractionDoneTimestampCounter(inputType, fraction, context).setValue(
 221  0
         System.currentTimeMillis() - splitsCreatedTimestamp.get(inputType));
 222  0
   }
 223  
 
 224  
   /**
 225  
    * Get counter
 226  
    *
 227  
    * @param inputType Type of input for counter
 228  
    * @param fraction Fraction for counter
 229  
    * @param context Context to get counter from
 230  
    * @return Counter
 231  
    */
 232  
   public static Counter getSplitFractionDoneTimestampCounter(
 233  
       InputType inputType, double fraction, Mapper.Context context) {
 234  0
     String groupName = inputType.name() + " input";
 235  0
     String counterName = String.format("%.2f%% done time (ms)", fraction * 100);
 236  0
     Set<String> counters = COUNTER_GROUP_AND_NAMES.getOrDefault(
 237  
             groupName, new HashSet<>());
 238  0
     counters.add(counterName);
 239  0
     COUNTER_GROUP_AND_NAMES.put(groupName, counters);
 240  0
     return context.getCounter(groupName, counterName);
 241  
   }
 242  
 
 243  
   public static Map<String, Set<String>> getCounterGroupAndNames() {
 244  0
     return COUNTER_GROUP_AND_NAMES;
 245  
   }
 246  
 }