Coverage Report - org.apache.giraph.partition.PartitionBalancer
 
Classes in this File Line Coverage Branch Coverage Complexity
PartitionBalancer
0%
0/75
0%
0/41
3.75
PartitionBalancer$1
0%
0/1
N/A
3.75
PartitionBalancer$BalanceValue
0%
0/4
N/A
3.75
PartitionBalancer$PartitionOwnerComparator
0%
0/8
N/A
3.75
PartitionBalancer$WorkerInfoAssignments
0%
0/20
0%
0/6
3.75
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.partition;
 20  
 
 21  
 import org.apache.giraph.worker.WorkerInfo;
 22  
 import org.apache.hadoop.conf.Configuration;
 23  
 import org.apache.log4j.Logger;
 24  
 
 25  
 import com.google.common.base.Objects;
 26  
 
 27  
 import java.util.ArrayList;
 28  
 import java.util.Collection;
 29  
 import java.util.Collections;
 30  
 import java.util.Comparator;
 31  
 import java.util.HashMap;
 32  
 import java.util.HashSet;
 33  
 import java.util.List;
 34  
 import java.util.Map;
 35  
 import java.util.PriorityQueue;
 36  
 import java.util.Set;
 37  
 
 38  
 /**
 39  
  * Helper class for balancing partitions across a set of workers.
 40  
  */
 41  0
 public class PartitionBalancer {
 42  
   /** Partition balancing algorithm */
 43  
   public static final String PARTITION_BALANCE_ALGORITHM =
 44  
     "hash.partitionBalanceAlgorithm";
 45  
   /** No rebalancing during the supersteps */
 46  
   public static final String STATIC_BALANCE_ALGORITHM =
 47  
     "static";
 48  
   /** Rebalance across supersteps by edges */
 49  
   public static final String EGDE_BALANCE_ALGORITHM =
 50  
     "edges";
 51  
   /** Rebalance across supersteps by vertices */
 52  
   public static final String VERTICES_BALANCE_ALGORITHM =
 53  
     "vertices";
 54  
   /** Class logger */
 55  0
   private static Logger LOG = Logger.getLogger(PartitionBalancer.class);
 56  
 
 57  
   /**
 58  
    * What value to balance partitions with?  Edges, vertices?
 59  
    */
 60  0
   private enum BalanceValue {
 61  
     /** Not chosen */
 62  0
     UNSET,
 63  
     /** Balance with edges */
 64  0
     EDGES,
 65  
     /** Balance with vertices */
 66  0
     VERTICES
 67  
   }
 68  
 
 69  
   /**
 70  
    * Do not construct this class.
 71  
    */
 72  0
   private PartitionBalancer() { }
 73  
 
 74  
   /**
 75  
    * Get the value used to balance.
 76  
    *
 77  
    * @param partitionStat Stats of this partition.
 78  
    * @param balanceValue Type of the value to balance.
 79  
    * @return Balance value.
 80  
    */
 81  
   private static long getBalanceValue(PartitionStats partitionStat,
 82  
       BalanceValue balanceValue) {
 83  0
     switch (balanceValue) {
 84  
     case EDGES:
 85  0
       return partitionStat.getEdgeCount();
 86  
     case VERTICES:
 87  0
       return partitionStat.getVertexCount();
 88  
     default:
 89  0
       throw new IllegalArgumentException(
 90  
           "getBalanceValue: Illegal balance value " + balanceValue);
 91  
     }
 92  
   }
 93  
 
 94  
   /**
 95  
    * Used to sort the partition owners from lowest value to highest value
 96  
    */
 97  0
   private static class PartitionOwnerComparator implements
 98  
       Comparator<PartitionOwner> {
 99  
     /** Map of owner to stats */
 100  
     private final Map<PartitionOwner, PartitionStats> ownerStatMap;
 101  
     /** Value type to compare on */
 102  
     private final BalanceValue balanceValue;
 103  
 
 104  
 
 105  
     /**
 106  
      * Only constructor.
 107  
      *
 108  
      * @param ownerStatMap Map of owners to stats.
 109  
      * @param balanceValue Value to balance with.
 110  
      */
 111  
     public PartitionOwnerComparator(
 112  
         Map<PartitionOwner, PartitionStats> ownerStatMap,
 113  0
         BalanceValue balanceValue) {
 114  0
       this.ownerStatMap = ownerStatMap;
 115  0
       this.balanceValue = balanceValue;
 116  0
     }
 117  
 
 118  
     @Override
 119  
     public int compare(PartitionOwner owner1, PartitionOwner owner2) {
 120  0
       return (int)
 121  0
           (getBalanceValue(ownerStatMap.get(owner1), balanceValue) -
 122  0
               getBalanceValue(ownerStatMap.get(owner2), balanceValue));
 123  
     }
 124  
   }
 125  
 
 126  
   /**
 127  
    * Structure to keep track of how much value a {@link WorkerInfo} has
 128  
    * been assigned.
 129  
    */
 130  0
   private static class WorkerInfoAssignments implements
 131  
       Comparable<WorkerInfoAssignments> {
 132  
     /** Worker info associated */
 133  
     private final WorkerInfo workerInfo;
 134  
     /** Balance value */
 135  
     private final BalanceValue balanceValue;
 136  
     /** Map of owner to stats */
 137  
     private final Map<PartitionOwner, PartitionStats> ownerStatsMap;
 138  
     /** Current value of this object */
 139  0
     private long value = 0;
 140  
 
 141  
     /**
 142  
      * Constructor with final values.
 143  
      *
 144  
      * @param workerInfo Worker info for assignment.
 145  
      * @param balanceValue Value used to balance.
 146  
      * @param ownerStatsMap Map of owner to stats.
 147  
      */
 148  
     public WorkerInfoAssignments(
 149  
         WorkerInfo workerInfo,
 150  
         BalanceValue balanceValue,
 151  0
         Map<PartitionOwner, PartitionStats> ownerStatsMap) {
 152  0
       this.workerInfo = workerInfo;
 153  0
       this.balanceValue = balanceValue;
 154  0
       this.ownerStatsMap = ownerStatsMap;
 155  0
     }
 156  
 
 157  
     /**
 158  
      * Get the total value of all partitions assigned to this worker.
 159  
      *
 160  
      * @return Total value of all partition assignments.
 161  
      */
 162  
     public long getValue() {
 163  0
       return value;
 164  
     }
 165  
 
 166  
     /**
 167  
      * Assign a {@link PartitionOwner} to this {@link WorkerInfo}.
 168  
      *
 169  
      * @param partitionOwner PartitionOwner to assign.
 170  
      */
 171  
     public void assignPartitionOwner(
 172  
         PartitionOwner partitionOwner) {
 173  0
       value += getBalanceValue(ownerStatsMap.get(partitionOwner),
 174  
           balanceValue);
 175  0
       if (!partitionOwner.getWorkerInfo().equals(workerInfo)) {
 176  0
         partitionOwner.setPreviousWorkerInfo(
 177  0
             partitionOwner.getWorkerInfo());
 178  0
         partitionOwner.setWorkerInfo(workerInfo);
 179  
       } else {
 180  0
         partitionOwner.setPreviousWorkerInfo(null);
 181  
       }
 182  0
     }
 183  
 
 184  
     @Override
 185  
     public int compareTo(WorkerInfoAssignments other) {
 186  0
       return (int)
 187  0
           (getValue() - ((WorkerInfoAssignments) other).getValue());
 188  
     }
 189  
 
 190  
     @Override
 191  
     public boolean equals(Object obj) {
 192  0
       return obj instanceof WorkerInfoAssignments &&
 193  0
           compareTo((WorkerInfoAssignments) obj) == 0;
 194  
     }
 195  
 
 196  
     @Override
 197  
     public int hashCode() {
 198  0
       return Objects.hashCode(value);
 199  
     }
 200  
   }
 201  
 
 202  
   /**
 203  
    * Balance the partitions with an algorithm based on a value.
 204  
    *
 205  
    * @param conf Configuration to find the algorithm
 206  
    * @param partitionOwners All the owners of all partitions
 207  
    * @param allPartitionStats All the partition stats
 208  
    * @param availableWorkerInfos All the available workers
 209  
    * @return Balanced partition owners
 210  
    */
 211  
   public static Collection<PartitionOwner> balancePartitionsAcrossWorkers(
 212  
       Configuration conf,
 213  
       Collection<PartitionOwner> partitionOwners,
 214  
       Collection<PartitionStats> allPartitionStats,
 215  
       Collection<WorkerInfo> availableWorkerInfos) {
 216  
 
 217  0
     String balanceAlgorithm =
 218  0
         conf.get(PARTITION_BALANCE_ALGORITHM, STATIC_BALANCE_ALGORITHM);
 219  0
     if (LOG.isInfoEnabled()) {
 220  0
       LOG.info("balancePartitionsAcrossWorkers: Using algorithm " +
 221  
           balanceAlgorithm);
 222  
     }
 223  0
     BalanceValue balanceValue = BalanceValue.UNSET;
 224  0
     if (balanceAlgorithm.equals(STATIC_BALANCE_ALGORITHM)) {
 225  0
       return partitionOwners;
 226  0
     } else if (balanceAlgorithm.equals(EGDE_BALANCE_ALGORITHM)) {
 227  0
       balanceValue = BalanceValue.EDGES;
 228  0
     } else if (balanceAlgorithm.equals(VERTICES_BALANCE_ALGORITHM)) {
 229  0
       balanceValue = BalanceValue.VERTICES;
 230  
     } else {
 231  0
       throw new IllegalArgumentException(
 232  
           "balancePartitionsAcrossWorkers: Illegal balance " +
 233  
               "algorithm - " + balanceAlgorithm);
 234  
     }
 235  
 
 236  
     // Join the partition stats and partition owners by partition id
 237  0
     Map<Integer, PartitionStats> idStatMap =
 238  
         new HashMap<Integer, PartitionStats>();
 239  0
     for (PartitionStats partitionStats : allPartitionStats) {
 240  0
       if (idStatMap.put(partitionStats.getPartitionId(), partitionStats) !=
 241  
           null) {
 242  0
         throw new IllegalStateException(
 243  
             "balancePartitionsAcrossWorkers: Duplicate partition id " +
 244  
                 "for " + partitionStats);
 245  
       }
 246  0
     }
 247  0
     Map<PartitionOwner, PartitionStats> ownerStatsMap =
 248  
         new HashMap<PartitionOwner, PartitionStats>();
 249  0
     for (PartitionOwner partitionOwner : partitionOwners) {
 250  0
       PartitionStats partitionStats =
 251  0
           idStatMap.get(partitionOwner.getPartitionId());
 252  0
       if (partitionStats == null) {
 253  0
         throw new IllegalStateException(
 254  
             "balancePartitionsAcrossWorkers: Missing partition " +
 255  
                 "stats for " + partitionOwner);
 256  
       }
 257  0
       if (ownerStatsMap.put(partitionOwner, partitionStats) != null) {
 258  0
         throw new IllegalStateException(
 259  
             "balancePartitionsAcrossWorkers: Duplicate partition " +
 260  
                 "owner " + partitionOwner);
 261  
       }
 262  0
     }
 263  0
     if (ownerStatsMap.size() != partitionOwners.size()) {
 264  0
       throw new IllegalStateException(
 265  
           "balancePartitionsAcrossWorkers: ownerStats count = " +
 266  0
               ownerStatsMap.size() + ", partitionOwners count = " +
 267  0
               partitionOwners.size() + " and should match.");
 268  
     }
 269  
 
 270  0
     List<WorkerInfoAssignments> workerInfoAssignmentsList =
 271  0
         new ArrayList<WorkerInfoAssignments>(availableWorkerInfos.size());
 272  0
     for (WorkerInfo workerInfo : availableWorkerInfos) {
 273  0
       workerInfoAssignmentsList.add(
 274  
           new WorkerInfoAssignments(
 275  
               workerInfo, balanceValue, ownerStatsMap));
 276  0
     }
 277  
 
 278  
     // A simple heuristic for balancing the partitions across the workers
 279  
     // using a value (edges, vertices).  An improvement would be to
 280  
     // take into account the already existing partition worker assignments.
 281  
     // 1.  Sort the partitions by size
 282  
     // 2.  Place the workers in a min heap sorted by their total balance
 283  
     //     value.
 284  
     // 3.  From largest partition to the smallest, take the partition
 285  
     //     worker at the top of the heap, add the partition to it, and
 286  
     //     then put it back in the heap
 287  0
     List<PartitionOwner> partitionOwnerList =
 288  
         new ArrayList<PartitionOwner>(partitionOwners);
 289  0
     Collections.sort(partitionOwnerList,
 290  0
         Collections.reverseOrder(
 291  
             new PartitionOwnerComparator(ownerStatsMap, balanceValue)));
 292  0
     PriorityQueue<WorkerInfoAssignments> minQueue =
 293  
         new PriorityQueue<WorkerInfoAssignments>(workerInfoAssignmentsList);
 294  0
     for (PartitionOwner partitionOwner : partitionOwnerList) {
 295  0
       WorkerInfoAssignments chosenWorker = minQueue.remove();
 296  0
       chosenWorker.assignPartitionOwner(partitionOwner);
 297  0
       minQueue.add(chosenWorker);
 298  0
     }
 299  
 
 300  0
     return partitionOwnerList;
 301  
   }
 302  
 
 303  
   /**
 304  
    * Helper function to update partition owners and determine which
 305  
    * partitions need to be sent from a specific worker.
 306  
    *
 307  
    * @param partitionOwnerList Local {@link PartitionOwner} list for the
 308  
    *                           given worker
 309  
    * @param myWorkerInfo Worker info
 310  
    * @param masterSetPartitionOwners Master set partition owners, received
 311  
    *        prior to beginning the superstep
 312  
    * @return Information for the partition exchange.
 313  
    */
 314  
   public static PartitionExchange updatePartitionOwners(
 315  
       List<PartitionOwner> partitionOwnerList,
 316  
       WorkerInfo myWorkerInfo,
 317  
       Collection<? extends PartitionOwner> masterSetPartitionOwners) {
 318  0
     partitionOwnerList.clear();
 319  0
     partitionOwnerList.addAll(masterSetPartitionOwners);
 320  
 
 321  0
     Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
 322  0
     Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
 323  
         new HashMap<WorkerInfo, List<Integer>>();
 324  0
     for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
 325  0
       if (partitionOwner.getPreviousWorkerInfo() == null) {
 326  0
         continue;
 327  0
       } else if (partitionOwner.getWorkerInfo().equals(
 328  
           myWorkerInfo) &&
 329  0
           partitionOwner.getPreviousWorkerInfo().equals(
 330  
               myWorkerInfo)) {
 331  0
         throw new IllegalStateException(
 332  
             "updatePartitionOwners: Impossible to have the same " +
 333  
                 "previous and current worker info " + partitionOwner +
 334  
                 " as me " + myWorkerInfo);
 335  0
       } else if (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) {
 336  0
         dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo());
 337  0
       } else if (partitionOwner.getPreviousWorkerInfo().equals(
 338  
           myWorkerInfo)) {
 339  0
         if (workerPartitionOwnerMap.containsKey(
 340  0
             partitionOwner.getWorkerInfo())) {
 341  0
           workerPartitionOwnerMap.get(
 342  0
               partitionOwner.getWorkerInfo()).add(
 343  0
               partitionOwner.getPartitionId());
 344  
         } else {
 345  0
           List<Integer> tmpPartitionOwnerList = new ArrayList<Integer>();
 346  0
           tmpPartitionOwnerList.add(partitionOwner.getPartitionId());
 347  0
           workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(),
 348  
               tmpPartitionOwnerList);
 349  
         }
 350  
       }
 351  0
     }
 352  
 
 353  0
     return new PartitionExchange(dependentWorkerSet,
 354  
         workerPartitionOwnerMap);
 355  
   }
 356  
 }
 357