Coverage Report - org.apache.giraph.job.CombinedWorkerProgress
 
Classes in this File Line Coverage Branch Coverage Complexity
CombinedWorkerProgress
0%
0/110
0%
0/50
3.667
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.job;
 20  
 
 21  
 import com.google.common.collect.Iterables;
 22  
 import org.apache.giraph.conf.FloatConfOption;
 23  
 import org.apache.giraph.conf.GiraphConstants;
 24  
 import org.apache.giraph.master.MasterProgress;
 25  
 import org.apache.giraph.worker.WorkerProgress;
 26  
 import org.apache.giraph.worker.WorkerProgressStats;
 27  
 import org.apache.hadoop.conf.Configuration;
 28  
 
 29  
 import javax.annotation.concurrent.NotThreadSafe;
 30  
 import java.text.DecimalFormat;
 31  
 
 32  
 /**
 33  
  * Class which combines multiple workers' progresses to get overall
 34  
  * application progress
 35  
  */
 36  
 @NotThreadSafe
 37  
 public class CombinedWorkerProgress extends WorkerProgressStats {
 38  
   /** Decimal format which rounds numbers to two decimal places */
 39  0
   public static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("#.##");
 40  
   /**
 41  
    * If free memory fraction on some worker goes below this value,
 42  
    * warning will be printed
 43  
    */
 44  0
   public static final FloatConfOption NORMAL_FREE_MEMORY_FRACTION =
 45  
       new FloatConfOption("giraph.normalFreeMemoryFraction", 0.1f,
 46  
           "If free memory fraction on some worker goes below this value, " +
 47  
               "warning will be printed");
 48  
   /**
 49  
    * If free memory fraction on some worker goes below this value,
 50  
    * warning will be printed
 51  
    */
 52  
   private double normalFreeMemoryFraction;
 53  
   /** Total number of supersteps */
 54  
   private final int superstepCount;
 55  
   /**
 56  
    * How many workers have reported that they are in highest reported
 57  
    * superstep
 58  
    */
 59  0
   private int workersInSuperstep = 0;
 60  
   /**
 61  
    * How many workers reported that they finished application
 62  
    */
 63  0
   private int workersDone = 0;
 64  
   /** Minimum amount of free memory on a worker */
 65  0
   private double minFreeMemoryMB = Double.MAX_VALUE;
 66  
   /** Name of the worker with min free memory */
 67  
   private int workerWithMinFreeMemory;
 68  
   /** Minimum fraction of free memory on a worker */
 69  0
   private double minFreeMemoryFraction = Double.MAX_VALUE;
 70  
   /**
 71  
    * Minimum percentage of graph in memory in any worker so far in the
 72  
    * computation
 73  
    */
 74  0
   private int minGraphPercentageInMemory = 100;
 75  
   /** Id of the worker with min percentage of graph in memory */
 76  0
   private int workerWithMinGraphPercentageInMemory = -1;
 77  
   /** Master progress */
 78  
   private MasterProgress masterProgress;
 79  
 
 80  
   /**
 81  
    * Constructor
 82  
    *
 83  
    * @param workerProgresses Worker progresses to combine
 84  
    * @param masterProgress Master progress
 85  
    * @param conf Configuration
 86  
    */
 87  
   public CombinedWorkerProgress(Iterable<WorkerProgress> workerProgresses,
 88  0
       MasterProgress masterProgress, Configuration conf) {
 89  0
     this.masterProgress = masterProgress;
 90  0
     normalFreeMemoryFraction = NORMAL_FREE_MEMORY_FRACTION.get(conf);
 91  0
     superstepCount = GiraphConstants.SUPERSTEP_COUNT.get(conf);
 92  0
     for (WorkerProgress workerProgress : workerProgresses) {
 93  0
       if (workerProgress.getCurrentSuperstep() > currentSuperstep) {
 94  0
         verticesToCompute = 0;
 95  0
         verticesComputed = 0;
 96  0
         partitionsToCompute = 0;
 97  0
         partitionsComputed = 0;
 98  0
         currentSuperstep = workerProgress.getCurrentSuperstep();
 99  0
         workersInSuperstep = 0;
 100  
       }
 101  
 
 102  0
       if (workerProgress.getCurrentSuperstep() == currentSuperstep) {
 103  0
         workersInSuperstep++;
 104  0
         if (isInputSuperstep()) {
 105  0
           verticesLoaded += workerProgress.getVerticesLoaded();
 106  0
           vertexInputSplitsLoaded +=
 107  0
               workerProgress.getVertexInputSplitsLoaded();
 108  0
           edgesLoaded += workerProgress.getEdgesLoaded();
 109  0
           edgeInputSplitsLoaded += workerProgress.getEdgeInputSplitsLoaded();
 110  0
         } else if (isComputeSuperstep()) {
 111  0
           verticesToCompute += workerProgress.getVerticesToCompute();
 112  0
           verticesComputed += workerProgress.getVerticesComputed();
 113  0
           partitionsToCompute += workerProgress.getPartitionsToCompute();
 114  0
           partitionsComputed += workerProgress.getPartitionsComputed();
 115  0
         } else if (isOutputSuperstep()) {
 116  0
           verticesToStore += workerProgress.getVerticesToStore();
 117  0
           verticesStored += workerProgress.getVerticesStored();
 118  0
           partitionsToStore += workerProgress.getPartitionsToStore();
 119  0
           partitionsStored += workerProgress.getPartitionsStored();
 120  
         }
 121  
       }
 122  
 
 123  0
       if (workerProgress.isStoringDone()) {
 124  0
         workersDone++;
 125  
       }
 126  
 
 127  0
       if (workerProgress.getFreeMemoryMB() < minFreeMemoryMB) {
 128  0
         minFreeMemoryMB = workerProgress.getFreeMemoryMB();
 129  0
         workerWithMinFreeMemory = workerProgress.getTaskId();
 130  
       }
 131  0
       minFreeMemoryFraction = Math.min(minFreeMemoryFraction,
 132  0
           workerProgress.getFreeMemoryFraction());
 133  0
       freeMemoryMB += workerProgress.getFreeMemoryMB();
 134  0
       int percentage = workerProgress.getLowestGraphPercentageInMemory();
 135  0
       if (percentage < minGraphPercentageInMemory) {
 136  0
         minGraphPercentageInMemory = percentage;
 137  0
         workerWithMinGraphPercentageInMemory = workerProgress.getTaskId();
 138  
       }
 139  0
     }
 140  0
     if (!Iterables.isEmpty(workerProgresses)) {
 141  0
       freeMemoryMB /= Iterables.size(workerProgresses);
 142  
     }
 143  0
   }
 144  
 
 145  
   /**
 146  
    * Get Current superstep
 147  
    * @return Current superstep
 148  
    */
 149  
   public long getCurrentSuperstep() {
 150  0
     return currentSuperstep;
 151  
   }
 152  
 
 153  
   /**
 154  
    * Get workers in superstep
 155  
    * @return Workers in superstep.
 156  
    */
 157  
   public long getWorkersInSuperstep() {
 158  0
     return workersInSuperstep;
 159  
   }
 160  
 
 161  
   /**
 162  
    * Get vertices computed
 163  
    * @return Vertices computed
 164  
    */
 165  
   public long getVerticesComputed() {
 166  0
     return verticesComputed;
 167  
   }
 168  
 
 169  
   /**
 170  
    * Get vertices to compute
 171  
    * @return Vertices to compute
 172  
    */
 173  
   public long getVerticesToCompute() {
 174  0
     return verticesToCompute;
 175  
   }
 176  
 
 177  
   /**
 178  
    * Is the application done
 179  
    *
 180  
    * @param expectedWorkersDone Number of workers which should be done in
 181  
    *                            order for application to be done
 182  
    * @return True if application is done
 183  
    */
 184  
   public boolean isDone(int expectedWorkersDone) {
 185  0
     return workersDone == expectedWorkersDone;
 186  
   }
 187  
 
 188  
   /**
 189  
    * Get string describing total job progress
 190  
    *
 191  
    * @return String describing total job progress
 192  
    */
 193  
   protected String getProgressString() {
 194  0
     StringBuilder sb = new StringBuilder();
 195  0
     if (isInputSuperstep()) {
 196  0
       sb.append("Loading data: ");
 197  0
       if (!masterProgress.vertexInputSplitsSet() ||
 198  0
           masterProgress.getVertexInputSplitCount() > 0) {
 199  0
         sb.append(verticesLoaded).append(" vertices loaded, ");
 200  0
         sb.append(vertexInputSplitsLoaded).append(
 201  
             " vertex input splits loaded");
 202  0
         if (masterProgress.getVertexInputSplitCount() > 0) {
 203  0
           sb.append(" (out of ").append(
 204  0
               masterProgress.getVertexInputSplitCount()).append(")");
 205  
         }
 206  0
         sb.append("; ");
 207  
       }
 208  0
       if (!masterProgress.edgeInputSplitsSet() ||
 209  0
           masterProgress.getEdgeInputSplitsCount() > 0) {
 210  0
         sb.append(edgesLoaded).append(" edges loaded, ");
 211  0
         sb.append(edgeInputSplitsLoaded).append(" edge input splits loaded");
 212  0
         if (masterProgress.getEdgeInputSplitsCount() > 0) {
 213  0
           sb.append(" (out of ").append(
 214  0
               masterProgress.getEdgeInputSplitsCount()).append(")");
 215  
         }
 216  
       }
 217  0
     } else if (isComputeSuperstep()) {
 218  0
       sb.append("Compute superstep ").append(currentSuperstep);
 219  0
       if (superstepCount > 0) {
 220  
         // Supersteps are 0..superstepCount-1 so subtract 1 here
 221  0
         sb.append(" (out of ").append(superstepCount - 1).append(")");
 222  
       }
 223  0
       sb.append(": ").append(verticesComputed).append(" out of ").append(
 224  0
           verticesToCompute).append(" vertices computed; ");
 225  0
       sb.append(partitionsComputed).append(" out of ").append(
 226  0
           partitionsToCompute).append(" partitions computed");
 227  0
     } else if (isOutputSuperstep()) {
 228  0
       sb.append("Storing data: ");
 229  0
       sb.append(verticesStored).append(" out of ").append(
 230  0
           verticesToStore).append(" vertices stored; ");
 231  0
       sb.append(partitionsStored).append(" out of ").append(
 232  0
           partitionsToStore).append(" partitions stored");
 233  
     }
 234  0
     return sb.toString();
 235  
   }
 236  
 
 237  
   @Override
 238  
   public String toString() {
 239  0
     StringBuilder sb = new StringBuilder();
 240  0
     sb.append("Data from ").append(workersInSuperstep).append(" workers - ");
 241  0
     sb.append(getProgressString());
 242  0
     sb.append("; min free memory on worker ").append(
 243  0
         workerWithMinFreeMemory).append(" - ").append(
 244  0
         DECIMAL_FORMAT.format(minFreeMemoryMB)).append("MB, average ").append(
 245  0
         DECIMAL_FORMAT.format(freeMemoryMB)).append("MB");
 246  0
     if (minFreeMemoryFraction < normalFreeMemoryFraction) {
 247  0
       sb.append(", ******* YOUR JOB IS RUNNING LOW ON MEMORY *******");
 248  
     }
 249  0
     if (minGraphPercentageInMemory < 100) {
 250  0
       sb.append(" Spilling ")
 251  0
           .append(100 - minGraphPercentageInMemory)
 252  0
           .append("% of data to external storage on worker ")
 253  0
           .append(workerWithMinGraphPercentageInMemory);
 254  
     }
 255  0
     return sb.toString();
 256  
   }
 257  
 
 258  
   /**
 259  
    * Check if this instance made progress from another instance
 260  
    *
 261  
    * @param lastProgress Instance to compare with
 262  
    * @return True iff progress was made
 263  
    */
 264  
   public boolean madeProgressFrom(CombinedWorkerProgress lastProgress) {
 265  
     // If progress strings are different there was progress made
 266  0
     if (!getProgressString().equals(lastProgress.getProgressString())) {
 267  0
       return true;
 268  
     }
 269  
     // If more workers were done there was progress made
 270  0
     return workersDone != lastProgress.workersDone;
 271  
   }
 272  
 }