Coverage Report - org.apache.giraph.comm.netty.ByteCounterDelegate
 
Classes in this File Line Coverage Branch Coverage Complexity
ByteCounterDelegate
0%
0/62
0%
0/10
1.538
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.netty;
 20  
 
 21  
 import com.yammer.metrics.core.Histogram;
 22  
 import com.yammer.metrics.core.Meter;
 23  
 import com.yammer.metrics.core.NoOpHistogram;
 24  
 import com.yammer.metrics.core.NoOpMeter;
 25  
 import io.netty.buffer.ByteBuf;
 26  
 import org.apache.giraph.metrics.MeterDesc;
 27  
 import org.apache.giraph.metrics.MetricNames;
 28  
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 29  
 import org.apache.giraph.time.SystemTime;
 30  
 import org.apache.giraph.time.Time;
 31  
 
 32  
 import java.text.DecimalFormat;
 33  
 import java.util.concurrent.atomic.AtomicLong;
 34  
 
 35  
 /**
 36  
  * Delegate Object to help keep track of the bytes processed and provide some
 37  
  * metrics when desired as part of the Netty Channel stack.
 38  
  */
 39  
 public class ByteCounterDelegate implements ByteCounter {
 40  
   /** Megabyte in bytes */
 41  
   public static final double MEGABYTE = 1024f * 1024f;
 42  
   /** Helper to format the doubles */
 43  0
   private static final DecimalFormat DOUBLE_FORMAT =
 44  
       new DecimalFormat("#######.####");
 45  
   /** Class timer */
 46  0
   private static final Time TIME = SystemTime.get();
 47  
   /** Bytes processed during the most recent time interval */
 48  0
   private final AtomicLong bytesProcessed = new AtomicLong();
 49  
   /** Aggregate bytes per superstep */
 50  0
   private final AtomicLong bytesProcessedPerSuperstep = new AtomicLong();
 51  
   /** Total processed requests */
 52  0
   private final AtomicLong processedRequests = new AtomicLong();
 53  
   /** Start time (for bandwidth calculation) */
 54  0
   private final AtomicLong startMsecs = new AtomicLong();
 55  
   /** Last updated msecs for getMetricsWindow */
 56  0
   private final AtomicLong metricsWindowLastUpdatedMsecs = new AtomicLong();
 57  
 
 58  
   // Metrics
 59  
   /** Meter of requests sent */
 60  0
   private Meter processedRequestsMeter = NoOpMeter.INSTANCE;
 61  
   /** Histogram of bytes sent */
 62  0
   private Histogram processedBytesHist = NoOpHistogram.INSTANCE;
 63  
 
 64  
   /** Is it delegate for InBoundByteCounter */
 65  
   private final boolean isInbound;
 66  
 
 67  
   /**
 68  
    * Constructor to specify if delegate is created by InBound/ Outbound counter
 69  
    *
 70  
    * @param isInBound switch to specify if instantiated by inbound counter
 71  
    */
 72  0
   public ByteCounterDelegate(boolean isInBound) {
 73  0
     this.isInbound = isInBound;
 74  0
   }
 75  
 
 76  
   /**
 77  
    * Called by Inbound/ Outbound counters to refresh meters on a new superstep
 78  
    *
 79  
    * @param superstepMetrics superstepmetrics registry
 80  
    */
 81  
   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
 82  0
     if (isInbound) {
 83  0
       processedRequestsMeter = superstepMetrics.getMeter(
 84  
           MeterDesc.RECEIVED_REQUESTS);
 85  0
       processedBytesHist = superstepMetrics.getUniformHistogram(
 86  
           MetricNames.RECEIVED_BYTES);
 87  
     } else {
 88  0
       processedRequestsMeter = superstepMetrics.getMeter(
 89  
           MeterDesc.SENT_REQUESTS);
 90  0
       processedBytesHist = superstepMetrics.getUniformHistogram(
 91  
           MetricNames.SENT_BYTES);
 92  
     }
 93  0
   }
 94  
 
 95  
   /**
 96  
    * Updates properties based on bytes sent / received
 97  
    *
 98  
    * @param buf ByteBuf received by the counter
 99  
    * @return number of readable bytes
 100  
    */
 101  
   public int byteBookkeeper(ByteBuf buf) {
 102  0
     int processedBytes = buf.readableBytes();
 103  0
     bytesProcessed.addAndGet(processedBytes);
 104  0
     bytesProcessedPerSuperstep.addAndGet(processedBytes);
 105  0
     processedBytesHist.update(processedBytes);
 106  0
     processedRequests.incrementAndGet();
 107  0
     processedRequestsMeter.mark();
 108  0
     return processedBytes;
 109  
   }
 110  
 
 111  
   /**
 112  
    * Reset all the bytes kept track of.
 113  
    */
 114  
   public void resetBytes() {
 115  0
     bytesProcessed.set(0);
 116  0
     processedRequests.set(0);
 117  0
   }
 118  
 
 119  
   /**
 120  
    * Reset the start msecs.
 121  
    */
 122  
   public void resetStartMsecs() {
 123  0
     startMsecs.set(TIME.getMilliseconds());
 124  0
   }
 125  
 
 126  
   @Override
 127  
   public void resetAll() {
 128  0
     resetBytes();
 129  0
     resetStartMsecs();
 130  0
   }
 131  
 
 132  
   /**
 133  
    * Returns bytes processed per superstep.
 134  
    * @return Number of bytes.
 135  
    */
 136  
   public long getBytesProcessedPerSuperstep() {
 137  0
     return bytesProcessedPerSuperstep.get();
 138  
   }
 139  
 
 140  
   /**
 141  
    * Set bytes processed per superstep to 0.
 142  
    */
 143  
   public void resetBytesProcessedPerSuperstep() {
 144  0
     bytesProcessedPerSuperstep.set(0);
 145  0
   }
 146  
 
 147  
   public long getBytesProcessed() {
 148  0
     return bytesProcessed.get();
 149  
   }
 150  
 
 151  
   /**
 152  
    * @return Mbytes processed / sec in the current interval
 153  
    */
 154  
   public double getMbytesPerSecProcessed() {
 155  0
     return bytesProcessed.get() * 1000f /
 156  0
         (1 + TIME.getMilliseconds() - startMsecs.get()) / MEGABYTE;
 157  
   }
 158  
 
 159  
   /**
 160  
    * Helper method used by getMetrics to create its return string
 161  
    * @param mBytesProcessed mbytes processed
 162  
    * @param mBytesProcessedPerReq mbytes processed per request
 163  
    * @return A string containing all the metrics
 164  
    */
 165  
   public String getMetricsHelper(double mBytesProcessed,
 166  
                                  double mBytesProcessedPerReq) {
 167  0
     if (isInbound) {
 168  0
       return "MBytes/sec received = " +
 169  0
           DOUBLE_FORMAT.format(getMbytesPerSecProcessed()) +
 170  0
           ", MBytesReceived = " + DOUBLE_FORMAT.format(mBytesProcessed) +
 171  
           ", ave received req MBytes = " +
 172  0
           DOUBLE_FORMAT.format(mBytesProcessedPerReq) +
 173  
           ", secs waited = " +
 174  0
           ((TIME.getMilliseconds() - startMsecs.get()) / 1000f);
 175  
     } else {
 176  0
       return "MBytes/sec sent = " +
 177  0
           DOUBLE_FORMAT.format(getMbytesPerSecProcessed()) +
 178  0
           ", MBytesSent = " + DOUBLE_FORMAT.format(mBytesProcessed) +
 179  
           ", ave sent req MBytes = " +
 180  0
           DOUBLE_FORMAT.format(mBytesProcessedPerReq) +
 181  
           ", secs waited = " +
 182  0
           ((TIME.getMilliseconds() - startMsecs.get()) / 1000f);
 183  
     }
 184  
   }
 185  
 
 186  
   @Override
 187  
   public String getMetrics() {
 188  0
     double mBytesProcessed = bytesProcessed.get() / MEGABYTE;
 189  0
     long curProcessedRequests = processedRequests.get();
 190  0
     double mBytesProcessedPerReq = (curProcessedRequests == 0) ? 0 :
 191  
         mBytesProcessed / curProcessedRequests;
 192  
 
 193  0
     return getMetricsHelper(mBytesProcessed, mBytesProcessedPerReq);
 194  
   }
 195  
 
 196  
   @Override
 197  
   public String getMetricsWindow(int minMsecsWindow) {
 198  0
     long lastUpdatedMsecs =  metricsWindowLastUpdatedMsecs.get();
 199  0
     long curMsecs = TIME.getMilliseconds();
 200  0
     if (curMsecs - lastUpdatedMsecs > minMsecsWindow) {
 201  
       // Make sure that only one thread does this update
 202  0
       if (metricsWindowLastUpdatedMsecs.compareAndSet(lastUpdatedMsecs,
 203  
           curMsecs)) {
 204  0
         String metrics = getMetrics();
 205  0
         resetAll();
 206  0
         return metrics;
 207  
       }
 208  
     }
 209  0
     return null;
 210  
   }
 211  
 }