1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty;
20
21 import org.apache.giraph.metrics.GiraphMetrics;
22 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
23 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
24 import org.apache.log4j.Logger;
25
26 import io.netty.buffer.ByteBuf;
27 import io.netty.channel.ChannelHandlerContext;
28 import io.netty.channel.ChannelOutboundHandlerAdapter;
29 import io.netty.channel.ChannelPromise;
30 import io.netty.channel.ChannelHandler.Sharable;
31
32
33
34
35
36 @Sharable
37 public class OutboundByteCounter extends ChannelOutboundHandlerAdapter
38 implements ByteCounter, ResetSuperstepMetricsObserver {
39
40 private static final Logger LOG =
41 Logger.getLogger(OutboundByteCounter.class);
42
43 private final ByteCounterDelegate delegate = new ByteCounterDelegate(false);
44
45
46 public OutboundByteCounter() {
47
48 GiraphMetrics.get().addSuperstepResetObserver(this);
49 }
50
51 public long getBytesSent() {
52 return delegate.getBytesProcessed();
53 }
54
55
56
57
58 public double getMbytesPerSecSent() {
59 return delegate.getMbytesPerSecProcessed();
60 }
61
62 @Override
63 public void write(ChannelHandlerContext ctx, Object msg,
64 ChannelPromise promise) throws Exception {
65 if (msg instanceof ByteBuf) {
66 ByteBuf buf = (ByteBuf) msg;
67 int sentBytes = delegate.byteBookkeeper(buf);
68 if (LOG.isDebugEnabled()) {
69 LOG.debug("write: " + ctx.channel().toString() +
70 " buffer size = " + sentBytes + ", total bytes = " + getBytesSent()
71 );
72 }
73 }
74 ctx.writeAndFlush(msg, promise);
75 }
76
77 @Override
78 public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
79 delegate.newSuperstep(superstepMetrics);
80 }
81
82 @Override
83 public void resetAll() {
84 delegate.resetAll();
85 }
86
87 @Override
88 public String getMetrics() {
89 return delegate.getMetrics();
90 }
91
92 @Override
93 public String getMetricsWindow(int minMsecsWindow) {
94 return delegate.getMetricsWindow(minMsecsWindow);
95 }
96 }
97