1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty.handler;
20
21 import io.netty.buffer.ByteBufOutputStream;
22
23
24 import org.apache.giraph.comm.requests.RequestType;
25
26 import org.apache.giraph.comm.requests.WritableRequest;
27 import org.apache.log4j.Logger;
28 import io.netty.buffer.ByteBuf;
29 import io.netty.channel.ChannelPromise;
30 import io.netty.channel.ChannelHandlerContext;
31 import io.netty.channel.ChannelOutboundHandlerAdapter;
32
33 import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
34
35
36
37
38
39
40 public class ResponseEncoder extends ChannelOutboundHandlerAdapter {
41
42 private static final Logger LOG = Logger.getLogger(ResponseEncoder.class);
43
44 @Override
45 public void write(ChannelHandlerContext ctx, Object msg,
46 ChannelPromise promise) throws Exception {
47 if (LOG.isDebugEnabled()) {
48 LOG.debug("write(" + ctx + "," + msg);
49 }
50
51 if (!(msg instanceof WritableRequest)) {
52 throw new IllegalArgumentException(
53 "encode: cannot encode message of type " + msg.getClass() +
54 " since it is not an instance of an implementation of " +
55 " WritableRequest.");
56 }
57 @SuppressWarnings("unchecked")
58 WritableRequest writableRequest = (WritableRequest) msg;
59
60 ByteBuf buf = ctx.alloc().buffer(10);
61 ByteBufOutputStream output = new ByteBufOutputStream(buf);
62
63 if (LOG.isDebugEnabled()) {
64 LOG.debug("encode: Encoding a message of type " + msg.getClass());
65 }
66
67
68 output.writeInt(0);
69
70 output.writeByte(writableRequest.getType().ordinal());
71
72 writableRequest.write(output);
73
74 output.flush();
75 output.close();
76
77
78 buf.setInt(0, buf.writerIndex() - SIZE_OF_INT);
79
80 if (LOG.isDebugEnabled()) {
81 LOG.debug("encode: Encoding a message of type " + msg.getClass());
82 }
83 ctx.write(buf, promise);
84
85
86 if (writableRequest.getType() == RequestType.SASL_COMPLETE_REQUEST) {
87
88
89
90
91 if (LOG.isDebugEnabled()) {
92 LOG.debug("encode: Removing RequestEncoder handler: no longer needed," +
93 " since client: " + ctx.channel().remoteAddress() + " has " +
94 "completed authenticating.");
95 }
96 ctx.pipeline().remove(this);
97 }
98
99 ctx.write(buf, promise);
100 }
101 }
102