Coverage Report - org.apache.giraph.metrics.WorkerSuperstepMetrics
 
Classes in this File Line Coverage Branch Coverage Complexity
WorkerSuperstepMetrics
0%
0/74
0%
0/6
1.286
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.metrics;
 20  
 
 21  
 import org.apache.giraph.graph.GraphTaskManager;
 22  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 23  
 import org.apache.giraph.ooc.OutOfCoreIOCallable;
 24  
 import org.apache.giraph.worker.BspServiceWorker;
 25  
 import org.apache.hadoop.io.Writable;
 26  
 
 27  
 import com.yammer.metrics.core.Gauge;
 28  
 
 29  
 import java.io.DataInput;
 30  
 import java.io.DataOutput;
 31  
 import java.io.IOException;
 32  
 import java.io.PrintStream;
 33  
 import java.util.concurrent.TimeUnit;
 34  
 
 35  
 /**
 36  
  * Per-superstep metrics for a Worker.
 37  
  */
 38  
 public class WorkerSuperstepMetrics implements Writable {
 39  
   /** Total network communication time */
 40  
   private LongAndTimeUnit commTimer;
 41  
   /** Time for all compute calls to complete */
 42  
   private LongAndTimeUnit computeAllTimer;
 43  
   /** Time till first message gets flushed */
 44  
   private LongAndTimeUnit timeToFirstMsg;
 45  
   /** Total superstep time */
 46  
   private LongAndTimeUnit superstepTimer;
 47  
   /** Time spent waiting for other workers to finish */
 48  
   private LongAndTimeUnit waitRequestsTimer;
 49  
   /** Time spent doing GC in a superstep */
 50  
   private LongAndTimeUnit superstepGCTimer;
 51  
   /** Number of bytes loaded from disk to memory in out-of-core mechanism */
 52  
   private long bytesLoadedFromDisk;
 53  
   /** Number of bytes stored from memory to disk in out-of-core mechanism */
 54  
   private long bytesStoredOnDisk;
 55  
   /** Percentage of graph kept in memory */
 56  
   private double graphPercentageInMemory;
 57  
 
 58  
   /**
 59  
    * Constructor
 60  
    */
 61  0
   public WorkerSuperstepMetrics() {
 62  0
     commTimer = new LongAndTimeUnit();
 63  0
     computeAllTimer = new LongAndTimeUnit();
 64  0
     timeToFirstMsg = new LongAndTimeUnit();
 65  0
     superstepTimer = new LongAndTimeUnit();
 66  0
     waitRequestsTimer = new LongAndTimeUnit();
 67  0
     superstepGCTimer = new LongAndTimeUnit();
 68  0
     superstepGCTimer.setTimeUnit(TimeUnit.MILLISECONDS);
 69  0
     bytesLoadedFromDisk = 0;
 70  0
     bytesStoredOnDisk = 0;
 71  0
     graphPercentageInMemory = 100;
 72  0
   }
 73  
 
 74  
   /**
 75  
    * Read metric values from global MetricsRegistry.
 76  
    *
 77  
    * @return this object, for chaining
 78  
    */
 79  
   public WorkerSuperstepMetrics readFromRegistry() {
 80  0
     readGiraphTimer(GraphTaskManager.TIMER_COMMUNICATION_TIME, commTimer);
 81  0
     readGiraphTimer(GraphTaskManager.TIMER_COMPUTE_ALL, computeAllTimer);
 82  0
     readGiraphTimer(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG, timeToFirstMsg);
 83  0
     readGiraphTimer(GraphTaskManager.TIMER_SUPERSTEP_TIME, superstepTimer);
 84  0
     readGiraphTimer(BspServiceWorker.TIMER_WAIT_REQUESTS, waitRequestsTimer);
 85  0
     SuperstepMetricsRegistry registry = GiraphMetrics.get().perSuperstep();
 86  0
     superstepGCTimer.setValue(
 87  0
         registry.getCounter(GraphTaskManager.TIMER_SUPERSTEP_GC_TIME).count());
 88  0
     bytesLoadedFromDisk =
 89  0
         registry.getCounter(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK).count();
 90  0
     bytesStoredOnDisk =
 91  0
         registry.getCounter(OutOfCoreIOCallable.BYTES_STORE_TO_DISK).count();
 92  0
     Gauge<Double> gauge =
 93  0
         registry.getExistingGauge(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY);
 94  0
     if (gauge != null) {
 95  0
       graphPercentageInMemory = gauge.value();
 96  
     }
 97  0
     return this;
 98  
   }
 99  
 
 100  
   /**
 101  
    * Read data from GiraphTimer into a LongAndTimeUnit.
 102  
    *
 103  
    * @param name String name of Gauge to retrieve.
 104  
    * @param data LongAndTimeUnit to read data into.
 105  
    */
 106  
   private void readGiraphTimer(String name, LongAndTimeUnit data) {
 107  0
     Gauge<Long> gauge = GiraphMetrics.get().perSuperstep().
 108  0
         getExistingGauge(name);
 109  0
     if (gauge instanceof GiraphTimer) {
 110  0
       GiraphTimer giraphTimer = (GiraphTimer) gauge;
 111  0
       data.setTimeUnit(giraphTimer.getTimeUnit());
 112  0
       data.setValue(giraphTimer.value());
 113  0
     } else if (gauge != null) {
 114  0
       throw new IllegalStateException(name + " is not a GiraphTimer");
 115  
     }
 116  0
   }
 117  
 
 118  
   /**
 119  
    * Human readable dump of metrics stored here.
 120  
    *
 121  
    * @param superstep long number of superstep.
 122  
    * @param out PrintStream to write to.
 123  
    * @return this object, for chaining
 124  
    */
 125  
   public WorkerSuperstepMetrics print(long superstep, PrintStream out) {
 126  0
     out.println();
 127  0
     out.println("--- METRICS: superstep " + superstep + " ---");
 128  0
     out.println("  superstep time: " + superstepTimer);
 129  0
     out.println("  compute all partitions: " + computeAllTimer);
 130  0
     out.println("  time spent in gc: " + superstepGCTimer);
 131  0
     out.println("  bytes transferred in out-of-core: " +
 132  
         (bytesLoadedFromDisk + bytesStoredOnDisk));
 133  0
     out.println("  network communication time: " + commTimer);
 134  0
     out.println("  time to first message: " + timeToFirstMsg);
 135  0
     out.println("  wait on requests time: " + waitRequestsTimer);
 136  0
     return this;
 137  
   }
 138  
 
 139  
   /**
 140  
    * @return Communication timer
 141  
    */
 142  
   public long getCommTimer() {
 143  0
     return commTimer.getValue();
 144  
   }
 145  
 
 146  
   /**
 147  
    * @return Total compute timer
 148  
    */
 149  
   public long getComputeAllTimer() {
 150  0
     return computeAllTimer.getValue();
 151  
   }
 152  
 
 153  
   /**
 154  
    * @return timer between start time and first message flushed.
 155  
    */
 156  
   public long getTimeToFirstMsg() {
 157  0
     return timeToFirstMsg.getValue();
 158  
   }
 159  
 
 160  
   /**
 161  
    * @return timer for superstep time
 162  
    */
 163  
   public long getSuperstepTimer() {
 164  0
     return superstepTimer.getValue();
 165  
   }
 166  
 
 167  
   /**
 168  
    * @return timer waiting for other workers
 169  
    */
 170  
   public long getWaitRequestsTimer() {
 171  0
     return waitRequestsTimer.getValue();
 172  
   }
 173  
 
 174  
   /**
 175  
    * @return number of bytes loaded from disk by out-of-core mechanism (if any
 176  
    *         is used)
 177  
    */
 178  
   public long getBytesLoadedFromDisk() {
 179  0
     return bytesLoadedFromDisk;
 180  
   }
 181  
 
 182  
   /**
 183  
    * @return number of bytes stored on disk by out-of-core mechanism (if any is
 184  
    *         used)
 185  
    */
 186  
   public long getBytesStoredOnDisk() {
 187  0
     return bytesStoredOnDisk;
 188  
   }
 189  
 
 190  
   /**
 191  
    * @return a rough estimate of percentage of graph in memory
 192  
    */
 193  
   public double getGraphPercentageInMemory() {
 194  0
     return graphPercentageInMemory;
 195  
   }
 196  
 
 197  
   @Override
 198  
   public void readFields(DataInput dataInput) throws IOException {
 199  0
     commTimer.setValue(dataInput.readLong());
 200  0
     computeAllTimer.setValue(dataInput.readLong());
 201  0
     timeToFirstMsg.setValue(dataInput.readLong());
 202  0
     superstepTimer.setValue(dataInput.readLong());
 203  0
     waitRequestsTimer.setValue(dataInput.readLong());
 204  0
     bytesLoadedFromDisk = dataInput.readLong();
 205  0
     bytesStoredOnDisk = dataInput.readLong();
 206  0
     graphPercentageInMemory = dataInput.readDouble();
 207  0
   }
 208  
 
 209  
   @Override
 210  
   public void write(DataOutput dataOutput) throws IOException {
 211  0
     dataOutput.writeLong(commTimer.getValue());
 212  0
     dataOutput.writeLong(computeAllTimer.getValue());
 213  0
     dataOutput.writeLong(timeToFirstMsg.getValue());
 214  0
     dataOutput.writeLong(superstepTimer.getValue());
 215  0
     dataOutput.writeLong(waitRequestsTimer.getValue());
 216  0
     dataOutput.writeLong(bytesLoadedFromDisk);
 217  0
     dataOutput.writeLong(bytesStoredOnDisk);
 218  0
     dataOutput.writeDouble(graphPercentageInMemory);
 219  0
   }
 220  
 }