Coverage Report - org.apache.giraph.worker.WorkerInputSplitsHandler
 
Classes in this File Line Coverage Branch Coverage Complexity
WorkerInputSplitsHandler
0%
0/19
0%
0/4
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.worker;
 20  
 
 21  
 import org.apache.giraph.comm.WorkerClient;
 22  
 import org.apache.giraph.comm.requests.AskForInputSplitRequest;
 23  
 import org.apache.giraph.io.InputType;
 24  
 
 25  
 import java.util.EnumMap;
 26  
 import java.util.Map;
 27  
 import java.util.concurrent.BlockingQueue;
 28  
 import java.util.concurrent.LinkedBlockingQueue;
 29  
 
 30  
 /**
 31  
  * Requests splits from master and keeps track of them
 32  
  */
 33  
 public class WorkerInputSplitsHandler {
 34  
   /** Worker info of this worker */
 35  
   private final WorkerInfo workerInfo;
 36  
   /** Task id of master */
 37  
   private final int masterTaskId;
 38  
   /** Worker client, used for communication */
 39  
   private final WorkerClient workerClient;
 40  
   /** Map with currently available splits received from master */
 41  
   private final Map<InputType, BlockingQueue<byte[]>> availableInputSplits;
 42  
 
 43  
   /**
 44  
    * Constructor
 45  
    *
 46  
    * @param workerInfo   Worker info of this worker
 47  
    * @param masterTaskId Task id of master
 48  
    * @param workerClient Worker client, used for communication
 49  
    */
 50  
   public WorkerInputSplitsHandler(WorkerInfo workerInfo, int masterTaskId,
 51  0
       WorkerClient workerClient) {
 52  0
     this.workerInfo = workerInfo;
 53  0
     this.masterTaskId = masterTaskId;
 54  0
     this.workerClient = workerClient;
 55  0
     availableInputSplits = new EnumMap<>(InputType.class);
 56  0
     for (InputType inputType : InputType.values()) {
 57  0
       availableInputSplits.put(
 58  
           inputType, new LinkedBlockingQueue<byte[]>());
 59  
     }
 60  0
   }
 61  
 
 62  
   /**
 63  
    * Called when an input split has been received from master, adding it to
 64  
    * the map
 65  
    *
 66  
    * @param splitType            Type of split
 67  
    * @param serializedInputSplit Split
 68  
    */
 69  
   public void receivedInputSplit(InputType splitType,
 70  
       byte[] serializedInputSplit) {
 71  
     try {
 72  0
       availableInputSplits.get(splitType).put(serializedInputSplit);
 73  0
     } catch (InterruptedException e) {
 74  0
       throw new IllegalStateException("Interrupted", e);
 75  0
     }
 76  0
   }
 77  
 
 78  
   /**
 79  
    * Try to reserve an InputSplit for loading.  While InputSplits exists that
 80  
    * are not finished, wait until they are.
 81  
    *
 82  
    * NOTE: iterations on the InputSplit list only halt for each worker when it
 83  
    * has scanned the entire list once and found every split marked RESERVED.
 84  
    * When a worker fails, its Ephemeral RESERVED znodes will disappear,
 85  
    * allowing other iterating workers to claim it's previously read splits.
 86  
    * Only when the last worker left iterating on the list fails can a danger
 87  
    * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
 88  
    * causes job failure, this is OK. As the failure model evolves, this
 89  
    * behavior might need to change. We could add watches on
 90  
    * inputSplitFinishedNodes and stop iterating only when all these nodes
 91  
    * have been created.
 92  
    *
 93  
    * @param splitType Type of split
 94  
    * @param isFirstSplit Whether this is the first split input thread reads
 95  
    * @return reserved InputSplit or null if no unfinished InputSplits exist
 96  
    */
 97  
   public byte[] reserveInputSplit(InputType splitType, boolean isFirstSplit) {
 98  
     // Send request
 99  0
     workerClient.sendWritableRequest(masterTaskId,
 100  
         new AskForInputSplitRequest(
 101  0
             splitType, workerInfo.getTaskId(), isFirstSplit));
 102  
     try {
 103  
       // Wait for some split to become available
 104  0
       byte[] serializedInputSplit = availableInputSplits.get(splitType).take();
 105  0
       return serializedInputSplit.length == 0 ? null : serializedInputSplit;
 106  0
     } catch (InterruptedException e) {
 107  0
       throw new IllegalStateException("Interrupted", e);
 108  
     }
 109  
   }
 110  
 }