1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty;
20
21 import org.apache.giraph.metrics.GiraphMetrics;
22 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
23 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
24 import org.apache.log4j.Logger;
25
26 import io.netty.buffer.ByteBuf;
27 import io.netty.channel.ChannelHandlerContext;
28 import io.netty.channel.ChannelInboundHandlerAdapter;
29 import io.netty.channel.ChannelHandler.Sharable;
30
31
32
33
34
35
36 @Sharable
37 public class InboundByteCounter extends ChannelInboundHandlerAdapter implements
38 ByteCounter, ResetSuperstepMetricsObserver {
39
40 private static final Logger LOG =
41 Logger.getLogger(InboundByteCounter.class);
42
43 private final ByteCounterDelegate delegate = new ByteCounterDelegate(true);
44
45
46 public InboundByteCounter() {
47
48 GiraphMetrics.get().addSuperstepResetObserver(this);
49 }
50
51 public long getBytesReceived() {
52 return delegate.getBytesProcessed();
53 }
54
55
56
57
58
59 public long getBytesReceivedPerSuperstep() {
60 return delegate.getBytesProcessedPerSuperstep();
61 }
62
63
64
65
66 public void resetBytesReceivedPerSuperstep() {
67 delegate.resetBytesProcessedPerSuperstep();
68 }
69
70
71
72
73 public double getMbytesPerSecReceived() {
74 return delegate.getMbytesPerSecProcessed();
75 }
76
77 @Override
78 public void channelRead(ChannelHandlerContext ctx, Object msg)
79 throws Exception {
80 if (msg instanceof ByteBuf) {
81 ByteBuf buf = (ByteBuf) msg;
82 int receivedBytes = delegate.byteBookkeeper(buf);
83 if (LOG.isDebugEnabled()) {
84 LOG.debug("channelRead: " + ctx.channel().toString() + " buffer " +
85 "size = " + receivedBytes + ", total bytes = " +
86 getBytesReceived());
87 }
88 }
89 ctx.fireChannelRead(msg);
90 }
91
92 @Override
93 public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
94 delegate.newSuperstep(superstepMetrics);
95 }
96
97 @Override
98 public void resetAll() {
99 delegate.resetAll();
100 }
101
102 @Override
103 public String getMetrics() {
104 return delegate.getMetrics();
105 }
106
107 @Override
108 public String getMetricsWindow(int minMsecsWindow) {
109 return delegate.getMetricsWindow(minMsecsWindow);
110 }
111 }