Coverage Report - org.apache.giraph.comm.netty.handler.SaslServerHandler
 
Classes in this File Line Coverage Branch Coverage Complexity
SaslServerHandler
0%
0/77
0%
0/28
3.714
SaslServerHandler$Factory
0%
0/3
N/A
3.714
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.netty.handler;
 20  
 
 21  
 import org.apache.giraph.comm.netty.NettyServer;
 22  
 import org.apache.giraph.comm.netty.SaslNettyServer;
 23  
 import org.apache.giraph.comm.requests.RequestType;
 24  
 import org.apache.giraph.comm.requests.SaslCompleteRequest;
 25  
 import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
 26  
 import org.apache.giraph.comm.requests.WritableRequest;
 27  
 import org.apache.hadoop.conf.Configuration;
 28  
 import org.apache.hadoop.mapred.JobConf;
 29  
 import org.apache.hadoop.mapreduce.security.TokenCache;
 30  
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 31  
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 32  
 import org.apache.hadoop.security.Credentials;
 33  
 import org.apache.hadoop.security.UserGroupInformation;
 34  
 import org.apache.hadoop.security.token.Token;
 35  
 import org.apache.hadoop.security.token.TokenIdentifier;
 36  
 import org.apache.hadoop.util.ReflectionUtils;
 37  
 import org.apache.log4j.Logger;
 38  
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 39  
 
 40  
 import io.netty.channel.ChannelHandlerContext;
 41  
 import io.netty.channel.ChannelInboundHandlerAdapter;
 42  
 
 43  
 import java.io.ByteArrayInputStream;
 44  
 import java.io.DataInputStream;
 45  
 import java.io.IOException;
 46  
 import java.util.Collection;
 47  
 
 48  
 import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
 49  
 
 50  
 /**
 51  
  * Generate SASL response tokens to client SASL tokens, allowing clients to
 52  
  * authenticate themselves with this server.
 53  
  */
 54  
 public class SaslServerHandler extends
 55  
     ChannelInboundHandlerAdapter {
 56  
     /** Class logger */
 57  0
   private static final Logger LOG =
 58  0
       Logger.getLogger(SaslServerHandler.class);
 59  
 
 60  
   // TODO: Move out into a separate, dedicated handler: ("FirstRequestHandler")
 61  
   // or similar.
 62  
   /** Already closed first request? */
 63  0
   private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
 64  
 
 65  
   /** Close connection on first request (used for simulating failure) */
 66  
   private final boolean closeFirstRequest;
 67  
   /** Used to store Hadoop Job Tokens to authenticate clients. */
 68  
   private JobTokenSecretManager secretManager;
 69  
 
 70  
   /**
 71  
    * Constructor
 72  
    *
 73  
    * @param conf Configuration
 74  
    */
 75  
   public SaslServerHandler(
 76  0
       Configuration conf) throws IOException {
 77  0
     SaslNettyServer.init(conf);
 78  0
     setupSecretManager(conf);
 79  0
     closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
 80  0
   }
 81  
 
 82  
   @Override
 83  
   public void channelRead(ChannelHandlerContext ctx, Object msg)
 84  
     throws Exception {
 85  
 
 86  0
     if (LOG.isDebugEnabled()) {
 87  0
       LOG.debug("messageReceived: Got " + msg.getClass());
 88  
     }
 89  
 
 90  0
     WritableRequest writableRequest = (WritableRequest) msg;
 91  
     // Simulate a closed connection on the first request (if desired)
 92  
     // TODO: Move out into a separate, dedicated handler.
 93  0
     if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
 94  0
       LOG.info("messageReceived: Simulating closing channel on first " +
 95  0
           "request " + writableRequest.getRequestId() + " from " +
 96  0
           writableRequest.getClientId());
 97  0
       setAlreadyClosedFirstRequest();
 98  0
       ctx.close();
 99  0
       return;
 100  
     }
 101  
 
 102  0
     if (writableRequest.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
 103  
       // initialize server-side SASL functionality, if we haven't yet
 104  
       // (in which case we are looking at the first SASL message from the
 105  
       // client).
 106  0
       SaslNettyServer saslNettyServer =
 107  0
           ctx.attr(NettyServer.CHANNEL_SASL_NETTY_SERVERS).get();
 108  0
       if (saslNettyServer == null) {
 109  0
         if (LOG.isDebugEnabled()) {
 110  0
           LOG.debug("No saslNettyServer for " + ctx.channel() +
 111  
               " yet; creating now, with secret manager: " + secretManager);
 112  
         }
 113  
         try {
 114  0
           saslNettyServer = new SaslNettyServer(secretManager,
 115  
             AuthMethod.SIMPLE);
 116  0
         } catch (IOException ioe) { //TODO:
 117  0
           throw new RuntimeException(ioe);
 118  0
         }
 119  0
         ctx.attr(NettyServer.CHANNEL_SASL_NETTY_SERVERS).set(saslNettyServer);
 120  
       } else {
 121  0
         if (LOG.isDebugEnabled()) {
 122  0
           LOG.debug("Found existing saslNettyServer on server:" +
 123  0
               ctx.channel().localAddress() + " for client " +
 124  0
               ctx.channel().remoteAddress());
 125  
         }
 126  
       }
 127  
 
 128  0
       ((SaslTokenMessageRequest) writableRequest).processToken(saslNettyServer);
 129  
       // Send response to client.
 130  0
       ctx.write(writableRequest);
 131  0
       if (saslNettyServer.isComplete()) {
 132  
         // If authentication of client is complete, we will also send a
 133  
         // SASL-Complete message to the client.
 134  0
         if (LOG.isDebugEnabled()) {
 135  0
           LOG.debug("SASL authentication is complete for client with " +
 136  0
               "username: " + saslNettyServer.getUserName());
 137  
         }
 138  0
         SaslCompleteRequest saslComplete = new SaslCompleteRequest();
 139  0
         ctx.write(saslComplete);
 140  0
         if (LOG.isDebugEnabled()) {
 141  0
           LOG.debug("Removing SaslServerHandler from pipeline since SASL " +
 142  
               "authentication is complete.");
 143  
         }
 144  0
         ctx.pipeline().remove(this);
 145  
       }
 146  0
       ctx.flush();
 147  
       // do not send upstream to other handlers: no further action needs to be
 148  
       // done for SASL_TOKEN_MESSAGE_REQUEST requests.
 149  0
       return;
 150  
     } else {
 151  
       // Client should not be sending other-than-SASL messages before
 152  
       // SaslServerHandler has removed itself from the pipeline. Such non-SASL
 153  
       // requests will be denied by the Authorize channel handler (the next
 154  
       // handler upstream in the server pipeline) if SASL authentication has
 155  
       // not completed.
 156  0
       LOG.warn("Sending upstream an unexpected non-SASL message :  " +
 157  
           writableRequest);
 158  0
       ctx.fireChannelRead(msg);
 159  
     }
 160  0
   }
 161  
 
 162  
   /**
 163  
    * Set already closed first request flag
 164  
    */
 165  
   private static void setAlreadyClosedFirstRequest() {
 166  0
     ALREADY_CLOSED_FIRST_REQUEST = true;
 167  0
   }
 168  
 
 169  
   /**
 170  
    * Load Hadoop Job Token into secret manager.
 171  
    *
 172  
    * @param conf Configuration
 173  
    * @throws IOException
 174  
    */
 175  
   private void setupSecretManager(Configuration conf) throws IOException {
 176  0
     secretManager = new JobTokenSecretManager();
 177  0
     String localJobTokenFile = System.getenv().get(
 178  
         UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
 179  0
     if (localJobTokenFile == null) {
 180  0
       throw new IOException("Could not find job credentials: environment " +
 181  
           "variable: " + UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION +
 182  
           " was not defined.");
 183  
     }
 184  0
     JobConf jobConf = new JobConf(conf);
 185  
 
 186  
     // Find the JobTokenIdentifiers among all the tokens available in the
 187  
     // jobTokenFile and store them in the secretManager.
 188  0
     Credentials credentials =
 189  0
         TokenCache.loadTokens(localJobTokenFile, jobConf);
 190  0
     Collection<Token<? extends TokenIdentifier>> collection =
 191  0
         credentials.getAllTokens();
 192  0
     for (Token<? extends TokenIdentifier> token:  collection) {
 193  0
       TokenIdentifier tokenIdentifier = decodeIdentifier(token,
 194  
           JobTokenIdentifier.class);
 195  0
       if (tokenIdentifier instanceof JobTokenIdentifier) {
 196  0
         Token<JobTokenIdentifier> theToken =
 197  
             (Token<JobTokenIdentifier>) token;
 198  0
         JobTokenIdentifier jobTokenIdentifier =
 199  
             (JobTokenIdentifier) tokenIdentifier;
 200  0
         secretManager.addTokenForJob(
 201  0
             jobTokenIdentifier.getJobId().toString(), theToken);
 202  
       }
 203  0
     }
 204  0
     if (LOG.isDebugEnabled()) {
 205  0
       LOG.debug("loaded JobToken credentials: " + credentials + " from " +
 206  
           "localJobTokenFile: " + localJobTokenFile);
 207  
     }
 208  0
   }
 209  
 
 210  
   /**
 211  
    * Get the token identifier object, or null if it could not be constructed
 212  
    * (because the class could not be loaded, for example).
 213  
    * Hadoop 2.0.0 (and older Hadoop2 versions? (verify)) need this.
 214  
    * Hadoop 2.0.1 and newer have a Token.decodeIdentifier() method and do not
 215  
    * need this. Might want to create a munge flag to distinguish 2.0.0 vs newer.
 216  
    *
 217  
    * @param token the token to decode into a TokenIdentifier
 218  
    * @param cls the subclass of TokenIdentifier to decode the token into.
 219  
    * @return the token identifier.
 220  
    * @throws IOException
 221  
    */
 222  
   @SuppressWarnings("unchecked")
 223  
   private TokenIdentifier decodeIdentifier(
 224  
       Token<? extends TokenIdentifier> token,
 225  
       Class<? extends TokenIdentifier> cls) throws IOException {
 226  0
     TokenIdentifier tokenIdentifier = ReflectionUtils.newInstance(cls, null);
 227  0
     ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
 228  0
     DataInputStream in = new DataInputStream(buf);
 229  0
     tokenIdentifier.readFields(in);
 230  0
     in.close();
 231  0
     return tokenIdentifier;
 232  
   }
 233  
 
 234  
   /** Factory for {@link SaslServerHandler} */
 235  
   public static class Factory {
 236  
     /**
 237  
      * Constructor
 238  
      */
 239  0
     public Factory() {
 240  0
     }
 241  
     /**
 242  
      * Create new {@link SaslServerHandler}
 243  
      *
 244  
      * @param conf Configuration to use
 245  
      * @return New {@link SaslServerHandler}
 246  
      */
 247  
     public SaslServerHandler newHandler(
 248  
         Configuration conf) throws IOException {
 249  0
       return new SaslServerHandler(conf);
 250  
     }
 251  
   }
 252  
 }