1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.comm.netty.handler; |
20 | |
|
21 | |
import io.netty.buffer.ByteBufOutputStream; |
22 | |
|
23 | |
|
24 | |
import org.apache.giraph.comm.requests.RequestType; |
25 | |
|
26 | |
import org.apache.giraph.comm.requests.WritableRequest; |
27 | |
import org.apache.log4j.Logger; |
28 | |
import io.netty.buffer.ByteBuf; |
29 | |
import io.netty.channel.ChannelPromise; |
30 | |
import io.netty.channel.ChannelHandlerContext; |
31 | |
import io.netty.channel.ChannelOutboundHandlerAdapter; |
32 | |
|
33 | |
import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT; |
34 | |
|
35 | |
|
36 | |
|
37 | |
|
38 | |
|
39 | |
|
40 | 0 | public class ResponseEncoder extends ChannelOutboundHandlerAdapter { |
41 | |
|
42 | 0 | private static final Logger LOG = Logger.getLogger(ResponseEncoder.class); |
43 | |
|
44 | |
@Override |
45 | |
public void write(ChannelHandlerContext ctx, Object msg, |
46 | |
ChannelPromise promise) throws Exception { |
47 | 0 | if (LOG.isDebugEnabled()) { |
48 | 0 | LOG.debug("write(" + ctx + "," + msg); |
49 | |
} |
50 | |
|
51 | 0 | if (!(msg instanceof WritableRequest)) { |
52 | 0 | throw new IllegalArgumentException( |
53 | 0 | "encode: cannot encode message of type " + msg.getClass() + |
54 | |
" since it is not an instance of an implementation of " + |
55 | |
" WritableRequest."); |
56 | |
} |
57 | |
@SuppressWarnings("unchecked") |
58 | 0 | WritableRequest writableRequest = (WritableRequest) msg; |
59 | |
|
60 | 0 | ByteBuf buf = ctx.alloc().buffer(10); |
61 | 0 | ByteBufOutputStream output = new ByteBufOutputStream(buf); |
62 | |
|
63 | 0 | if (LOG.isDebugEnabled()) { |
64 | 0 | LOG.debug("encode: Encoding a message of type " + msg.getClass()); |
65 | |
} |
66 | |
|
67 | |
|
68 | 0 | output.writeInt(0); |
69 | |
|
70 | 0 | output.writeByte(writableRequest.getType().ordinal()); |
71 | |
|
72 | 0 | writableRequest.write(output); |
73 | |
|
74 | 0 | output.flush(); |
75 | 0 | output.close(); |
76 | |
|
77 | |
|
78 | 0 | buf.setInt(0, buf.writerIndex() - SIZE_OF_INT); |
79 | |
|
80 | 0 | if (LOG.isDebugEnabled()) { |
81 | 0 | LOG.debug("encode: Encoding a message of type " + msg.getClass()); |
82 | |
} |
83 | 0 | ctx.write(buf, promise); |
84 | |
|
85 | |
|
86 | 0 | if (writableRequest.getType() == RequestType.SASL_COMPLETE_REQUEST) { |
87 | |
|
88 | |
|
89 | |
|
90 | |
|
91 | 0 | if (LOG.isDebugEnabled()) { |
92 | 0 | LOG.debug("encode: Removing RequestEncoder handler: no longer needed," + |
93 | 0 | " since client: " + ctx.channel().remoteAddress() + " has " + |
94 | |
"completed authenticating."); |
95 | |
} |
96 | 0 | ctx.pipeline().remove(this); |
97 | |
} |
98 | |
|
99 | 0 | ctx.write(buf, promise); |
100 | 0 | } |
101 | |
} |
102 | |
|