Coverage Report - org.apache.giraph.master.input.LocalityAwareInputSplitsMasterOrganizer
 
Classes in this File Line Coverage Branch Coverage Complexity
LocalityAwareInputSplitsMasterOrganizer
0%
0/36
0%
0/20
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.master.input;
 20  
 
 21  
 import org.apache.giraph.worker.WorkerInfo;
 22  
 import org.apache.hadoop.mapreduce.InputSplit;
 23  
 
 24  
 import java.io.IOException;
 25  
 import java.util.HashMap;
 26  
 import java.util.List;
 27  
 import java.util.Map;
 28  
 import java.util.concurrent.ConcurrentLinkedQueue;
 29  
 import java.util.concurrent.atomic.AtomicBoolean;
 30  
 import java.util.concurrent.atomic.AtomicInteger;
 31  
 
 32  
 /**
 33  
  * Input splits organizer for vertex and edge input splits on master, which
 34  
  * uses locality information
 35  
  */
 36  
 public class LocalityAwareInputSplitsMasterOrganizer
 37  
     implements InputSplitsMasterOrganizer {
 38  
   /** All splits before this pointer were taken */
 39  0
   private final AtomicInteger listPointer = new AtomicInteger();
 40  
   /** List of serialized splits */
 41  
   private final List<byte[]> serializedSplits;
 42  
   /** Array containing information about whether a split was taken or not */
 43  
   private final AtomicBoolean[] splitsTaken;
 44  
 
 45  
   /** Map with preferred splits for each worker */
 46  
   private final Map<Integer, ConcurrentLinkedQueue<Integer>>
 47  
       workerToPreferredSplitsMap;
 48  
 
 49  
 
 50  
   /**
 51  
    * Constructor
 52  
    *
 53  
    * @param serializedSplits Serialized splits
 54  
    * @param splits           Splits
 55  
    * @param workers          List of workers
 56  
    */
 57  
   public LocalityAwareInputSplitsMasterOrganizer(List<byte[]> serializedSplits,
 58  0
       List<InputSplit> splits, List<WorkerInfo> workers) {
 59  0
     this.serializedSplits = serializedSplits;
 60  0
     splitsTaken = new AtomicBoolean[serializedSplits.size()];
 61  
     // Mark all splits as not taken initially
 62  0
     for (int i = 0; i < serializedSplits.size(); i++) {
 63  0
       splitsTaken[i] = new AtomicBoolean(false);
 64  
     }
 65  
 
 66  0
     workerToPreferredSplitsMap = new HashMap<>();
 67  0
     for (WorkerInfo worker : workers) {
 68  0
       workerToPreferredSplitsMap.put(worker.getTaskId(),
 69  
           new ConcurrentLinkedQueue<Integer>());
 70  0
     }
 71  
     // Go through all splits
 72  0
     for (int i = 0; i < splits.size(); i++) {
 73  
       try {
 74  0
         String[] locations = splits.get(i).getLocations();
 75  
         // For every worker
 76  0
         for (WorkerInfo worker : workers) {
 77  
           // Check splits locations
 78  0
           for (String location : locations) {
 79  
             // If split is local for the worker, add it to preferred list
 80  0
             if (location.contains(worker.getHostname())) {
 81  0
               workerToPreferredSplitsMap.get(worker.getTaskId()).add(i);
 82  0
               break;
 83  
             }
 84  
           }
 85  0
         }
 86  0
       } catch (IOException | InterruptedException e) {
 87  0
         throw new IllegalStateException(
 88  
             "Exception occurred while getting splits locations", e);
 89  0
       }
 90  
     }
 91  0
   }
 92  
 
 93  
   @Override
 94  
   public byte[] getSerializedSplitFor(int workerTaskId) {
 95  0
     ConcurrentLinkedQueue<Integer> preferredSplits =
 96  0
         workerToPreferredSplitsMap.get(workerTaskId);
 97  
     // Try to find a local split
 98  
     while (true) {
 99  
       // Get position to check
 100  0
       Integer splitIndex = preferredSplits.poll();
 101  
       // Check if all local splits were already processed for this worker
 102  0
       if (splitIndex == null) {
 103  0
         break;
 104  
       }
 105  
       // Try to reserve the split
 106  0
       if (splitsTaken[splitIndex].compareAndSet(false, true)) {
 107  0
         return serializedSplits.get(splitIndex);
 108  
       }
 109  0
     }
 110  
 
 111  
     // No more local splits available, proceed linearly from splits list
 112  
     while (true) {
 113  
       // Get position to check
 114  0
       int splitIndex = listPointer.getAndIncrement();
 115  
       // Check if all splits were already taken
 116  0
       if (splitIndex >= serializedSplits.size()) {
 117  0
         return null;
 118  
       }
 119  
       // Try to reserve the split
 120  0
       if (splitsTaken[splitIndex].compareAndSet(false, true)) {
 121  0
         return serializedSplits.get(splitIndex);
 122  
       }
 123  0
     }
 124  
   }
 125  
 }