1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty.handler;
20
21 import io.netty.buffer.ByteBufOutputStream;
22 import org.apache.giraph.comm.requests.WritableRequest;
23 import org.apache.giraph.conf.GiraphConfiguration;
24 import org.apache.giraph.conf.GiraphConstants;
25 import org.apache.giraph.time.SystemTime;
26 import org.apache.giraph.time.Time;
27 import org.apache.giraph.time.Times;
28 import org.apache.log4j.Logger;
29
30 import io.netty.buffer.ByteBuf;
31 import io.netty.channel.ChannelHandlerContext;
32 import io.netty.channel.ChannelOutboundHandlerAdapter;
33 import io.netty.channel.ChannelPromise;
34
35 import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
36 import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
37
38
39
40
41 public class RequestEncoder extends ChannelOutboundHandlerAdapter {
42
43 private static final Logger LOG = Logger.getLogger(RequestEncoder.class);
44
45 private static final Time TIME = SystemTime.get();
46
47 private final int bufferStartingSize;
48
49 private long startEncodingNanoseconds = -1;
50
51
52
53
54
55
56 public RequestEncoder(GiraphConfiguration conf) {
57 bufferStartingSize =
58 GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE.get(conf);
59 }
60
61 @Override
62 public void write(ChannelHandlerContext ctx, Object msg,
63 ChannelPromise promise) throws Exception {
64 if (!(msg instanceof WritableRequest)) {
65 throw new IllegalArgumentException(
66 "encode: Got a message of type " + msg.getClass());
67 }
68
69
70 if (LOG.isDebugEnabled()) {
71 startEncodingNanoseconds = TIME.getNanoseconds();
72 }
73
74 ByteBuf buf;
75 WritableRequest request = (WritableRequest) msg;
76 int requestSize = request.getSerializedSize();
77 if (requestSize == WritableRequest.UNKNOWN_SIZE) {
78 buf = ctx.alloc().buffer(bufferStartingSize);
79 } else {
80 requestSize += SIZE_OF_INT + SIZE_OF_BYTE;
81 buf = ctx.alloc().buffer(requestSize);
82 }
83 ByteBufOutputStream output = new ByteBufOutputStream(buf);
84
85
86 output.writeInt(0);
87 output.writeByte(request.getType().ordinal());
88 try {
89 request.write(output);
90 } catch (IndexOutOfBoundsException e) {
91 LOG.error("write: Most likely the size of request was not properly " +
92 "specified (this buffer is too small) - see getSerializedSize() " +
93 "in " + request.getType().getRequestClass());
94 throw new IllegalStateException(e);
95 }
96 output.flush();
97 output.close();
98
99
100 buf.setInt(0, buf.writerIndex() - SIZE_OF_INT);
101 if (LOG.isDebugEnabled()) {
102 LOG.debug("write: Client " + request.getClientId() + ", " +
103 "requestId " + request.getRequestId() +
104 ", size = " + buf.readableBytes() + ", " +
105 request.getType() + " took " +
106 Times.getNanosSince(TIME, startEncodingNanoseconds) + " ns");
107 }
108 ctx.write(buf, promise);
109 }
110 }