Coverage Report - org.apache.giraph.partition.PartitionUtils
 
Classes in this File Line Coverage Branch Coverage Complexity
PartitionUtils
0%
0/65
0%
0/16
3
PartitionUtils$1
N/A
N/A
3
PartitionUtils$EdgeCountComparator
0%
0/3
N/A
3
PartitionUtils$VertexCountComparator
0%
0/3
N/A
3
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.partition;
 20  
 
 21  
 import org.apache.giraph.conf.GiraphConstants;
 22  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 23  
 import org.apache.giraph.graph.VertexEdgeCount;
 24  
 import org.apache.giraph.worker.WorkerInfo;
 25  
 import org.apache.log4j.Logger;
 26  
 
 27  
 import com.google.common.collect.Lists;
 28  
 import com.google.common.collect.Maps;
 29  
 
 30  
 import java.io.Serializable;
 31  
 import java.util.Collection;
 32  
 import java.util.Collections;
 33  
 import java.util.Comparator;
 34  
 import java.util.HashMap;
 35  
 import java.util.List;
 36  
 import java.util.Map;
 37  
 import java.util.Map.Entry;
 38  
 
 39  
 import static org.apache.giraph.conf.GiraphConstants
 40  
     .MIN_PARTITIONS_PER_COMPUTE_THREAD;
 41  
 import static org.apache.giraph.conf.GiraphConstants.NUM_COMPUTE_THREADS;
 42  
 import static org.apache.giraph.conf.GiraphConstants.USER_PARTITION_COUNT;
 43  
 
 44  
 /**
 45  
  * Helper class for {@link Partition} related operations.
 46  
  */
 47  
 public class PartitionUtils {
 48  
   /** Class logger */
 49  0
   private static Logger LOG = Logger.getLogger(PartitionUtils.class);
 50  
 
 51  
   /**
 52  
    * Do not construct this object.
 53  
    */
 54  0
   private PartitionUtils() { }
 55  
 
 56  
   /**
 57  
    * Compare edge counts for Entry<WorkerInfo, VertexEdgeCount> objects.
 58  
    */
 59  0
   private static class EdgeCountComparator implements
 60  
       Comparator<Entry<WorkerInfo, VertexEdgeCount>>, Serializable {
 61  
     /** Serialization version. */
 62  
     private static final long serialVersionUID = 1L;
 63  
 
 64  
     @Override
 65  
     public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
 66  
         Entry<WorkerInfo, VertexEdgeCount> worker2) {
 67  0
       return Long.compare(worker1.getValue().getEdgeCount(),
 68  0
         worker2.getValue().getEdgeCount());
 69  
     }
 70  
   }
 71  
 
 72  
   /**
 73  
    * Compare vertex counts between a {@link WorkerInfo} and
 74  
    * {@link VertexEdgeCount}.
 75  
    */
 76  0
   private static class VertexCountComparator implements
 77  
       Comparator<Entry<WorkerInfo, VertexEdgeCount>>, Serializable {
 78  
     /** Serialization version. */
 79  
     private static final long serialVersionUID = 1L;
 80  
 
 81  
     @Override
 82  
     public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
 83  
         Entry<WorkerInfo, VertexEdgeCount> worker2) {
 84  0
       return Long.compare(worker1.getValue().getVertexCount(),
 85  0
         worker2.getValue().getVertexCount());
 86  
     }
 87  
   }
 88  
 
 89  
   /**
 90  
    * Check for imbalances on a per worker basis, by calculating the
 91  
    * mean, high and low workers by edges and vertices.
 92  
    *
 93  
    * @param partitionOwnerList List of partition owners.
 94  
    * @param allPartitionStats All the partition stats.
 95  
    */
 96  
   public static void analyzePartitionStats(
 97  
       Collection<PartitionOwner> partitionOwnerList,
 98  
       List<PartitionStats> allPartitionStats) {
 99  0
     Map<Integer, PartitionOwner> idOwnerMap =
 100  
         new HashMap<Integer, PartitionOwner>();
 101  0
     for (PartitionOwner partitionOwner : partitionOwnerList) {
 102  0
       if (idOwnerMap.put(partitionOwner.getPartitionId(),
 103  
           partitionOwner) != null) {
 104  0
         throw new IllegalStateException(
 105  
             "analyzePartitionStats: Duplicate partition " +
 106  
                 partitionOwner);
 107  
       }
 108  0
     }
 109  
 
 110  0
     Map<WorkerInfo, VertexEdgeCount> workerStatsMap = Maps.newHashMap();
 111  0
     VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount();
 112  0
     for (PartitionStats partitionStats : allPartitionStats) {
 113  0
       WorkerInfo workerInfo =
 114  0
           idOwnerMap.get(partitionStats.getPartitionId()).getWorkerInfo();
 115  0
       VertexEdgeCount vertexEdgeCount =
 116  0
           workerStatsMap.get(workerInfo);
 117  0
       if (vertexEdgeCount == null) {
 118  0
         workerStatsMap.put(
 119  
             workerInfo,
 120  0
             new VertexEdgeCount(partitionStats.getVertexCount(),
 121  0
                 partitionStats.getEdgeCount(), 0));
 122  
       } else {
 123  0
         workerStatsMap.put(
 124  
             workerInfo,
 125  0
             vertexEdgeCount.incrVertexEdgeCount(
 126  0
                 partitionStats.getVertexCount(),
 127  0
                 partitionStats.getEdgeCount()));
 128  
       }
 129  0
       totalVertexEdgeCount =
 130  0
           totalVertexEdgeCount.incrVertexEdgeCount(
 131  0
               partitionStats.getVertexCount(),
 132  0
               partitionStats.getEdgeCount());
 133  0
     }
 134  
 
 135  0
     List<Entry<WorkerInfo, VertexEdgeCount>> workerEntryList =
 136  0
         Lists.newArrayList(workerStatsMap.entrySet());
 137  
 
 138  0
     if (LOG.isInfoEnabled()) {
 139  0
       Collections.sort(workerEntryList, new VertexCountComparator());
 140  0
       LOG.info("analyzePartitionStats: Vertices - Mean: " +
 141  0
           (totalVertexEdgeCount.getVertexCount() /
 142  0
               workerStatsMap.size()) +
 143  
               ", Min: " +
 144  0
               workerEntryList.get(0).getKey() + " - " +
 145  0
               workerEntryList.get(0).getValue().getVertexCount() +
 146  
               ", Max: " +
 147  0
               workerEntryList.get(workerEntryList.size() - 1).getKey() +
 148  
               " - " +
 149  0
               workerEntryList.get(workerEntryList.size() - 1).
 150  0
               getValue().getVertexCount());
 151  0
       Collections.sort(workerEntryList, new EdgeCountComparator());
 152  0
       LOG.info("analyzePartitionStats: Edges - Mean: " +
 153  0
           (totalVertexEdgeCount.getEdgeCount() /
 154  0
               workerStatsMap.size()) +
 155  
               ", Min: " +
 156  0
               workerEntryList.get(0).getKey() + " - " +
 157  0
               workerEntryList.get(0).getValue().getEdgeCount() +
 158  
               ", Max: " +
 159  0
               workerEntryList.get(workerEntryList.size() - 1).getKey() +
 160  
               " - " +
 161  0
               workerEntryList.get(workerEntryList.size() - 1).
 162  0
               getValue().getEdgeCount());
 163  
     }
 164  0
   }
 165  
 
 166  
   /**
 167  
    * Compute the number of partitions, based on the configuration.
 168  
    *
 169  
    * If USER_PARTITION_COUNT is set, it will follow that, otherwise it will
 170  
    * choose the max of what MIN_PARTITIONS_PER_COMPUTE_THREAD and
 171  
    * PARTITION_COUNT_MULTIPLIER settings would choose, capped by max
 172  
    * partitions limited constrained by zookeeper.
 173  
    *
 174  
    * @param availableWorkerCount Number of available workers
 175  
    * @param conf Configuration.
 176  
    * @return Number of partitions for the job.
 177  
    */
 178  
   public static int computePartitionCount(int availableWorkerCount,
 179  
       ImmutableClassesGiraphConfiguration conf) {
 180  0
     if (availableWorkerCount == 0) {
 181  0
       throw new IllegalArgumentException(
 182  
           "computePartitionCount: No available workers");
 183  
     }
 184  
 
 185  0
     int userPartitionCount = USER_PARTITION_COUNT.get(conf);
 186  
     int partitionCount;
 187  0
     if (userPartitionCount == USER_PARTITION_COUNT.getDefaultValue()) {
 188  0
       float multiplier = GiraphConstants.PARTITION_COUNT_MULTIPLIER.get(conf);
 189  0
       partitionCount = Math.max(
 190  
           (int) (multiplier * availableWorkerCount * availableWorkerCount), 1);
 191  0
       int minPartitionsPerComputeThread =
 192  0
           MIN_PARTITIONS_PER_COMPUTE_THREAD.get(conf);
 193  0
       int totalComputeThreads =
 194  0
           NUM_COMPUTE_THREADS.get(conf) * availableWorkerCount;
 195  0
       partitionCount = Math.max(partitionCount,
 196  
           minPartitionsPerComputeThread * totalComputeThreads);
 197  0
     } else {
 198  0
       partitionCount = userPartitionCount;
 199  
     }
 200  0
     if (LOG.isInfoEnabled()) {
 201  0
       LOG.info("computePartitionCount: Creating " +
 202  
           partitionCount + " partitions.");
 203  
     }
 204  0
     return partitionCount;
 205  
   }
 206  
 }