1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty.handler;
20
21 import org.apache.giraph.comm.netty.NettyServer;
22 import org.apache.giraph.comm.netty.SaslNettyServer;
23 import org.apache.giraph.comm.requests.RequestType;
24 import org.apache.giraph.comm.requests.SaslCompleteRequest;
25 import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
26 import org.apache.giraph.comm.requests.WritableRequest;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.mapred.JobConf;
29 import org.apache.hadoop.mapreduce.security.TokenCache;
30 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
31 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
32 import org.apache.hadoop.security.Credentials;
33 import org.apache.hadoop.security.UserGroupInformation;
34 import org.apache.hadoop.security.token.Token;
35 import org.apache.hadoop.security.token.TokenIdentifier;
36 import org.apache.hadoop.util.ReflectionUtils;
37 import org.apache.log4j.Logger;
38 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
39
40 import io.netty.channel.ChannelHandlerContext;
41 import io.netty.channel.ChannelInboundHandlerAdapter;
42
43 import java.io.ByteArrayInputStream;
44 import java.io.DataInputStream;
45 import java.io.IOException;
46 import java.util.Collection;
47
48 import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
49
50
51
52
53
54 public class SaslServerHandler extends
55 ChannelInboundHandlerAdapter {
56
57 private static final Logger LOG =
58 Logger.getLogger(SaslServerHandler.class);
59
60
61
62
63 private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
64
65
66 private final boolean closeFirstRequest;
67
68 private JobTokenSecretManager secretManager;
69
70
71
72
73
74
75 public SaslServerHandler(
76 Configuration conf) throws IOException {
77 SaslNettyServer.init(conf);
78 setupSecretManager(conf);
79 closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
80 }
81
82 @Override
83 public void channelRead(ChannelHandlerContext ctx, Object msg)
84 throws Exception {
85
86 if (LOG.isDebugEnabled()) {
87 LOG.debug("messageReceived: Got " + msg.getClass());
88 }
89
90 WritableRequest writableRequest = (WritableRequest) msg;
91
92
93 if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
94 LOG.info("messageReceived: Simulating closing channel on first " +
95 "request " + writableRequest.getRequestId() + " from " +
96 writableRequest.getClientId());
97 setAlreadyClosedFirstRequest();
98 ctx.close();
99 return;
100 }
101
102 if (writableRequest.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
103
104
105
106 SaslNettyServer saslNettyServer =
107 ctx.attr(NettyServer.CHANNEL_SASL_NETTY_SERVERS).get();
108 if (saslNettyServer == null) {
109 if (LOG.isDebugEnabled()) {
110 LOG.debug("No saslNettyServer for " + ctx.channel() +
111 " yet; creating now, with secret manager: " + secretManager);
112 }
113 try {
114 saslNettyServer = new SaslNettyServer(secretManager,
115 AuthMethod.SIMPLE);
116 } catch (IOException ioe) {
117 throw new RuntimeException(ioe);
118 }
119 ctx.attr(NettyServer.CHANNEL_SASL_NETTY_SERVERS).set(saslNettyServer);
120 } else {
121 if (LOG.isDebugEnabled()) {
122 LOG.debug("Found existing saslNettyServer on server:" +
123 ctx.channel().localAddress() + " for client " +
124 ctx.channel().remoteAddress());
125 }
126 }
127
128 ((SaslTokenMessageRequest) writableRequest).processToken(saslNettyServer);
129
130 ctx.write(writableRequest);
131 if (saslNettyServer.isComplete()) {
132
133
134 if (LOG.isDebugEnabled()) {
135 LOG.debug("SASL authentication is complete for client with " +
136 "username: " + saslNettyServer.getUserName());
137 }
138 SaslCompleteRequest saslComplete = new SaslCompleteRequest();
139 ctx.write(saslComplete);
140 if (LOG.isDebugEnabled()) {
141 LOG.debug("Removing SaslServerHandler from pipeline since SASL " +
142 "authentication is complete.");
143 }
144 ctx.pipeline().remove(this);
145 }
146 ctx.flush();
147
148
149 return;
150 } else {
151
152
153
154
155
156 LOG.warn("Sending upstream an unexpected non-SASL message : " +
157 writableRequest);
158 ctx.fireChannelRead(msg);
159 }
160 }
161
162
163
164
165 private static void setAlreadyClosedFirstRequest() {
166 ALREADY_CLOSED_FIRST_REQUEST = true;
167 }
168
169
170
171
172
173
174
175 private void setupSecretManager(Configuration conf) throws IOException {
176 secretManager = new JobTokenSecretManager();
177 String localJobTokenFile = System.getenv().get(
178 UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
179 if (localJobTokenFile == null) {
180 throw new IOException("Could not find job credentials: environment " +
181 "variable: " + UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION +
182 " was not defined.");
183 }
184 JobConf jobConf = new JobConf(conf);
185
186
187
188 Credentials credentials =
189 TokenCache.loadTokens(localJobTokenFile, jobConf);
190 Collection<Token<? extends TokenIdentifier>> collection =
191 credentials.getAllTokens();
192 for (Token<? extends TokenIdentifier> token: collection) {
193 TokenIdentifier tokenIdentifier = decodeIdentifier(token,
194 JobTokenIdentifier.class);
195 if (tokenIdentifier instanceof JobTokenIdentifier) {
196 Token<JobTokenIdentifier> theToken =
197 (Token<JobTokenIdentifier>) token;
198 JobTokenIdentifier jobTokenIdentifier =
199 (JobTokenIdentifier) tokenIdentifier;
200 secretManager.addTokenForJob(
201 jobTokenIdentifier.getJobId().toString(), theToken);
202 }
203 }
204 if (LOG.isDebugEnabled()) {
205 LOG.debug("loaded JobToken credentials: " + credentials + " from " +
206 "localJobTokenFile: " + localJobTokenFile);
207 }
208 }
209
210
211
212
213
214
215
216
217
218
219
220
221
222 @SuppressWarnings("unchecked")
223 private TokenIdentifier decodeIdentifier(
224 Token<? extends TokenIdentifier> token,
225 Class<? extends TokenIdentifier> cls) throws IOException {
226 TokenIdentifier tokenIdentifier = ReflectionUtils.newInstance(cls, null);
227 ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
228 DataInputStream in = new DataInputStream(buf);
229 tokenIdentifier.readFields(in);
230 in.close();
231 return tokenIdentifier;
232 }
233
234
235 public static class Factory {
236
237
238
239 public Factory() {
240 }
241
242
243
244
245
246
247 public SaslServerHandler newHandler(
248 Configuration conf) throws IOException {
249 return new SaslServerHandler(conf);
250 }
251 }
252 }