Coverage Report - org.apache.giraph.metrics.AggregatedMetrics
 
Classes in this File Line Coverage Branch Coverage Complexity
AggregatedMetrics
0%
0/68
0%
0/6
1.375
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.metrics;
 20  
 
 21  
 import org.apache.giraph.graph.GraphTaskManager;
 22  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 23  
 import org.apache.giraph.ooc.OutOfCoreIOCallable;
 24  
 import org.apache.giraph.worker.BspServiceWorker;
 25  
 
 26  
 import com.google.common.collect.Maps;
 27  
 
 28  
 import java.io.PrintStream;
 29  
 import java.util.Map;
 30  
 
 31  
 /**
 32  
  * Map of a bunch of aggregated metrics
 33  
  */
 34  0
 public class AggregatedMetrics {
 35  
   /** Mapping from name to aggregated metric */
 36  0
   private Map<String, AggregatedMetric<?>> metrics = Maps.newHashMap();
 37  
 
 38  
   /**
 39  
    * Add value from hostname for a metric.
 40  
    *
 41  
    * @param name String name of metric
 42  
    * @param value long value to track
 43  
    * @param hostnamePartitionId String host it came from
 44  
    * @return this
 45  
    */
 46  
   public AggregatedMetrics add(String name, long value,
 47  
                                String hostnamePartitionId) {
 48  0
     AggregatedMetricLong aggregatedMetric =
 49  0
         (AggregatedMetricLong) metrics.get(name);
 50  0
     if (aggregatedMetric == null) {
 51  0
       aggregatedMetric = new AggregatedMetricLong();
 52  0
       metrics.put(name, aggregatedMetric);
 53  
     }
 54  0
     aggregatedMetric.addItem(value, hostnamePartitionId);
 55  0
     return this;
 56  
   }
 57  
 
 58  
   /**
 59  
    * Add value from hostname for a metric.
 60  
    *
 61  
    * @param name String name of metric
 62  
    * @param value double value to track
 63  
    * @param hostnamePartitionId String host it came from
 64  
    * @return this
 65  
    */
 66  
   public AggregatedMetrics add(String name, double value,
 67  
                                String hostnamePartitionId) {
 68  0
     AggregatedMetricDouble aggregatedMetric =
 69  0
         (AggregatedMetricDouble) metrics.get(name);
 70  0
     if (aggregatedMetric == null) {
 71  0
       aggregatedMetric = new AggregatedMetricDouble();
 72  0
       metrics.put(name, aggregatedMetric);
 73  
     }
 74  0
     aggregatedMetric.addItem(value, hostnamePartitionId);
 75  0
     return this;
 76  
   }
 77  
 
 78  
   /**
 79  
    * Add metrics from worker.
 80  
    *
 81  
    * @param workerMetrics WorkerSuperstepMetrics from work
 82  
    * @param hostname String hostname of worker
 83  
    * @return this
 84  
    */
 85  
   public AggregatedMetrics add(WorkerSuperstepMetrics workerMetrics,
 86  
                                String hostname) {
 87  0
     add(GraphTaskManager.TIMER_SUPERSTEP_TIME,
 88  0
         workerMetrics.getSuperstepTimer(), hostname);
 89  0
     add(GraphTaskManager.TIMER_COMMUNICATION_TIME,
 90  0
         workerMetrics.getCommTimer(), hostname);
 91  0
     add(GraphTaskManager.TIMER_COMPUTE_ALL,
 92  0
         workerMetrics.getComputeAllTimer(), hostname);
 93  0
     add(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG,
 94  0
         workerMetrics.getTimeToFirstMsg(), hostname);
 95  0
     add(BspServiceWorker.TIMER_WAIT_REQUESTS,
 96  0
         workerMetrics.getWaitRequestsTimer(), hostname);
 97  0
     add(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK,
 98  0
         workerMetrics.getBytesLoadedFromDisk(), hostname);
 99  0
     add(OutOfCoreIOCallable.BYTES_STORE_TO_DISK,
 100  0
         workerMetrics.getBytesStoredOnDisk(), hostname);
 101  0
     add(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY,
 102  0
         workerMetrics.getGraphPercentageInMemory(), hostname);
 103  0
     return this;
 104  
   }
 105  
 
 106  
   /**
 107  
    * Print the aggregated metrics to the stream provided.
 108  
    *
 109  
    * @param superstep long number of superstep.
 110  
    * @param out PrintStream to write to.
 111  
    * @return this
 112  
    */
 113  
   public AggregatedMetrics print(long superstep, PrintStream out) {
 114  0
     AggregatedMetric superstepTime = get(GraphTaskManager.TIMER_SUPERSTEP_TIME);
 115  0
     AggregatedMetric commTime = get(GraphTaskManager.TIMER_COMMUNICATION_TIME);
 116  0
     AggregatedMetric computeAll = get(GraphTaskManager.TIMER_COMPUTE_ALL);
 117  0
     AggregatedMetric timeToFirstMsg =
 118  0
         get(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG);
 119  0
     AggregatedMetric waitRequestsMicros = get(
 120  
         BspServiceWorker.TIMER_WAIT_REQUESTS);
 121  0
     AggregatedMetric bytesLoaded =
 122  0
         get(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK);
 123  0
     AggregatedMetric bytesStored =
 124  0
         get(OutOfCoreIOCallable.BYTES_STORE_TO_DISK);
 125  0
     AggregatedMetric graphInMem =
 126  0
         get(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY);
 127  
 
 128  0
     out.println();
 129  0
     out.println("--- METRICS: superstep " + superstep + " ---");
 130  0
     printAggregatedMetric(out, "superstep time", "ms", superstepTime);
 131  0
     printAggregatedMetric(out, "compute all partitions", "ms", computeAll);
 132  0
     printAggregatedMetric(out, "network communication time", "ms", commTime);
 133  0
     printAggregatedMetric(out, "time to first message", "us", timeToFirstMsg);
 134  0
     printAggregatedMetric(out, "wait requests time", "us", waitRequestsMicros);
 135  0
     printAggregatedMetric(out, "bytes loaded from disk", "bytes", bytesLoaded);
 136  0
     printAggregatedMetric(out, "bytes stored to disk", "bytes", bytesStored);
 137  0
     printAggregatedMetric(out, "graph in mem", "%", graphInMem);
 138  
 
 139  0
     return this;
 140  
   }
 141  
 
 142  
   /**
 143  
    * Print batch of lines for AggregatedMetric
 144  
    *
 145  
    * @param out PrintStream to write to
 146  
    * @param header String header to print.
 147  
    * @param unit String unit of metric
 148  
    * @param aggregatedMetric AggregatedMetric to write
 149  
    */
 150  
   private void printAggregatedMetric(PrintStream out, String header,
 151  
                                      String unit,
 152  
                                      AggregatedMetric aggregatedMetric) {
 153  0
     if (aggregatedMetric.hasData()) {
 154  0
       out.println(header);
 155  0
       out.println("  mean: " + aggregatedMetric.mean() + " " + unit);
 156  0
       printValueFromHost(out, "  smallest: ", unit, aggregatedMetric.min());
 157  0
       printValueFromHost(out, "  largest: ", unit, aggregatedMetric.max());
 158  
     } else {
 159  0
       out.println(header + ": NO DATA");
 160  
     }
 161  0
   }
 162  
 
 163  
   /**
 164  
    * Print a line for a value with the host it came from.
 165  
    *
 166  
    * @param out PrintStream to write to
 167  
    * @param prefix String to write at beginning
 168  
    * @param unit String unit of metric
 169  
    * @param vh ValueWithHostname to write
 170  
    */
 171  
   private void printValueFromHost(PrintStream out, String prefix,
 172  
                                   String unit, ValueWithHostname vh) {
 173  0
     out.println(prefix + vh.getValue() + ' ' + unit +
 174  0
         " from " + vh.getHostname());
 175  0
   }
 176  
 
 177  
   /**
 178  
    * Get AggregatedMetric with given name.
 179  
    *
 180  
    * @param name String metric to lookup.
 181  
    * @return AggregatedMetric for given metric name.
 182  
    */
 183  
   public AggregatedMetric get(String name) {
 184  0
     return metrics.get(name);
 185  
   }
 186  
 
 187  
   /**
 188  
    * Get map of all aggregated metrics.
 189  
    *
 190  
    * @return Map of all the aggregated metrics.
 191  
    */
 192  
   public Map<String, AggregatedMetric<?>> getAll() {
 193  0
     return metrics;
 194  
   }
 195  
 }