Coverage Report - org.apache.giraph.comm.netty.NettyClient
 
Classes in this File Line Coverage Branch Coverage Complexity
NettyClient
0%
0/422
0%
0/174
0
NettyClient$1
0%
0/58
0%
0/10
0
NettyClient$2
0%
0/3
N/A
0
NettyClient$3
0%
0/9
0%
0/6
0
NettyClient$4
0%
0/4
0%
0/4
0
NettyClient$5
0%
0/2
0%
0/2
0
NettyClient$6
0%
0/4
0%
0/8
0
NettyClient$7
0%
0/5
0%
0/4
0
NettyClient$ChannelFutureAddress
0%
0/7
N/A
0
NettyClient$LogOnErrorChannelFutureListener
0%
0/5
0%
0/4
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.netty;
 20  
 
 21  
 import io.netty.handler.flush.FlushConsolidationHandler;
 22  
 import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
 23  
 import org.apache.giraph.comm.flow_control.FlowControl;
 24  
 import org.apache.giraph.comm.flow_control.NoOpFlowControl;
 25  
 import org.apache.giraph.comm.flow_control.StaticFlowControl;
 26  
 import org.apache.giraph.comm.netty.handler.AckSignalFlag;
 27  
 import org.apache.giraph.comm.netty.handler.TaskRequestIdGenerator;
 28  
 import org.apache.giraph.comm.netty.handler.ClientRequestId;
 29  
 import org.apache.giraph.comm.netty.handler.RequestEncoder;
 30  
 import org.apache.giraph.comm.netty.handler.RequestInfo;
 31  
 import org.apache.giraph.comm.netty.handler.RequestServerHandler;
 32  
 import org.apache.giraph.comm.netty.handler.ResponseClientHandler;
 33  
 /*if_not[HADOOP_NON_SECURE]*/
 34  
 import org.apache.giraph.comm.netty.handler.SaslClientHandler;
 35  
 import org.apache.giraph.comm.requests.RequestType;
 36  
 import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
 37  
 /*end[HADOOP_NON_SECURE]*/
 38  
 import org.apache.giraph.comm.requests.WritableRequest;
 39  
 import org.apache.giraph.conf.BooleanConfOption;
 40  
 import org.apache.giraph.conf.GiraphConstants;
 41  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 42  
 import org.apache.giraph.counters.GiraphHadoopCounter;
 43  
 import org.apache.giraph.function.Predicate;
 44  
 import org.apache.giraph.graph.TaskInfo;
 45  
 import org.apache.giraph.master.MasterInfo;
 46  
 import org.apache.giraph.utils.PipelineUtils;
 47  
 import org.apache.giraph.utils.ProgressableUtils;
 48  
 import org.apache.giraph.utils.ThreadUtils;
 49  
 import org.apache.giraph.utils.TimedLogger;
 50  
 import org.apache.hadoop.mapreduce.Mapper;
 51  
 import org.apache.log4j.Logger;
 52  
 
 53  
 import com.google.common.collect.Lists;
 54  
 import com.google.common.collect.MapMaker;
 55  
 import com.google.common.collect.Maps;
 56  
 
 57  
 /*if_not[HADOOP_NON_SECURE]*/
 58  
 import java.io.IOException;
 59  
 /*end[HADOOP_NON_SECURE]*/
 60  
 import java.net.InetSocketAddress;
 61  
 import java.util.Collection;
 62  
 import java.util.Collections;
 63  
 import java.util.Comparator;
 64  
 import java.util.HashMap;
 65  
 import java.util.HashSet;
 66  
 import java.util.List;
 67  
 import java.util.Map;
 68  
 import java.util.Set;
 69  
 import java.util.concurrent.ConcurrentMap;
 70  
 import java.util.concurrent.atomic.AtomicInteger;
 71  
 import java.util.concurrent.atomic.AtomicLong;
 72  
 
 73  
 import io.netty.bootstrap.Bootstrap;
 74  
 import io.netty.channel.Channel;
 75  
 import io.netty.channel.ChannelFuture;
 76  
 import io.netty.channel.ChannelFutureListener;
 77  
 import io.netty.channel.ChannelHandlerContext;
 78  
 import io.netty.channel.ChannelInitializer;
 79  
 import io.netty.channel.ChannelOption;
 80  
 import io.netty.channel.EventLoopGroup;
 81  
 import io.netty.channel.nio.NioEventLoopGroup;
 82  
 import io.netty.channel.socket.SocketChannel;
 83  
 import io.netty.channel.socket.nio.NioSocketChannel;
 84  
 import io.netty.handler.codec.FixedLengthFrameDecoder;
 85  
 /*if_not[HADOOP_NON_SECURE]*/
 86  
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 87  
 import io.netty.util.AttributeKey;
 88  
 /*end[HADOOP_NON_SECURE]*/
 89  
 import io.netty.util.concurrent.BlockingOperationException;
 90  
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 91  
 import io.netty.util.concurrent.EventExecutorGroup;
 92  
 
 93  
 import static com.google.common.base.Preconditions.checkState;
 94  
 import static org.apache.giraph.conf.GiraphConstants.CLIENT_RECEIVE_BUFFER_SIZE;
 95  
 import static org.apache.giraph.conf.GiraphConstants.CLIENT_SEND_BUFFER_SIZE;
 96  
 import static org.apache.giraph.conf.GiraphConstants.MAX_REQUEST_MILLISECONDS;
 97  
 import static org.apache.giraph.conf.GiraphConstants.MAX_RESOLVE_ADDRESS_ATTEMPTS;
 98  
 import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_AFTER_HANDLER;
 99  
 import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS;
 100  
 import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER;
 101  
 import static org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES;
 102  
 import static org.apache.giraph.conf.GiraphConstants.RESEND_TIMED_OUT_REQUESTS;
 103  
 import static org.apache.giraph.conf.GiraphConstants.WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS;
 104  
 import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
 105  
 
 106  
 /**
 107  
  * Netty client for sending requests.  Thread-safe.
 108  
  */
 109  0
 public class NettyClient {
 110  
   /** Do we have a limit on number of open requests we can have */
 111  0
   public static final BooleanConfOption LIMIT_NUMBER_OF_OPEN_REQUESTS =
 112  
       new BooleanConfOption("giraph.waitForRequestsConfirmation", false,
 113  
           "Whether to have a limit on number of open requests or not");
 114  
   /**
 115  
    * Do we have a limit on number of open requests we can have for each worker.
 116  
    * Note that if this option is enabled, Netty will not keep more than a
 117  
    * certain number of requests open for each other worker in the job. If there
 118  
    * are more requests generated for a worker, Netty will not actually send the
 119  
    * surplus requests, instead, it caches the requests in a local buffer. The
 120  
    * maximum number of these unsent requests in the cache is another
 121  
    * user-defined parameter (MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER).
 122  
    */
 123  0
   public static final BooleanConfOption LIMIT_OPEN_REQUESTS_PER_WORKER =
 124  
       new BooleanConfOption("giraph.waitForPerWorkerRequests", false,
 125  
           "Whether to have a limit on number of open requests for each worker" +
 126  
               "or not");
 127  
   /** Maximum number of requests to list (for debugging) */
 128  
   public static final int MAX_REQUESTS_TO_LIST = 10;
 129  
   /**
 130  
    * Maximum number of destination task ids with open requests to list
 131  
    * (for debugging)
 132  
    */
 133  
   public static final int MAX_DESTINATION_TASK_IDS_TO_LIST = 10;
 134  
   /** 30 seconds to connect by default */
 135  
   public static final int MAX_CONNECTION_MILLISECONDS_DEFAULT = 30 * 1000;
 136  
 /*if_not[HADOOP_NON_SECURE]*/
 137  
   /** Used to authenticate with other workers acting as servers */
 138  0
   public static final AttributeKey<SaslNettyClient> SASL =
 139  0
       AttributeKey.valueOf("saslNettyClient");
 140  
 /*end[HADOOP_NON_SECURE]*/
 141  
 
 142  
   /** Group name for netty counters */
 143  
   public static final String NETTY_COUNTERS_GROUP = "Netty counters";
 144  
   /** How many network requests were resent because they took too long */
 145  
   public static final String NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME =
 146  
       "Network requests resent for timeout";
 147  
   /** How many network requests were resent because channel failed */
 148  
   public static final String NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME =
 149  
       "Network requests resent for channel failure";
 150  
   /** How many network requests were resent because connection failed */
 151  
   public static final String
 152  
       NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME =
 153  
       "Network requests resent for connection or request failure";
 154  
 
 155  
   /** Class logger */
 156  0
   private static final Logger LOG = Logger.getLogger(NettyClient.class);
 157  
   /** Netty related counter names */
 158  0
   private static Map<String, Set<String>> COUNTER_GROUP_AND_NAMES =
 159  
           new HashMap<>();
 160  
   /** Context used to report progress */
 161  
   private final Mapper<?, ?, ?, ?>.Context context;
 162  
   /** Client bootstrap */
 163  
   private final Bootstrap bootstrap;
 164  
   /**
 165  
    * Map of the peer connections, mapping from remote socket address to client
 166  
    * meta data
 167  
    */
 168  0
   private final ConcurrentMap<InetSocketAddress, ChannelRotater>
 169  0
   addressChannelMap = new MapMaker().makeMap();
 170  
   /**
 171  
    * Map from task id to address of its server
 172  
    */
 173  0
   private final Map<Integer, InetSocketAddress> taskIdAddressMap =
 174  0
       new MapMaker().makeMap();
 175  
   /**
 176  
    * Request map of client request ids to request information.
 177  
    */
 178  
   private final ConcurrentMap<ClientRequestId, RequestInfo>
 179  
   clientRequestIdRequestInfoMap;
 180  
   /** Number of channels per server */
 181  
   private final int channelsPerServer;
 182  
   /** Inbound byte counter for this client */
 183  0
   private final InboundByteCounter inboundByteCounter = new
 184  
       InboundByteCounter();
 185  
   /** Outbound byte counter for this client */
 186  0
   private final OutboundByteCounter outboundByteCounter = new
 187  
       OutboundByteCounter();
 188  
   /** Send buffer size */
 189  
   private final int sendBufferSize;
 190  
   /** Receive buffer size */
 191  
   private final int receiveBufferSize;
 192  
   /** Warn if request size is bigger than the buffer size by this factor */
 193  
   private final float requestSizeWarningThreshold;
 194  
   /** Maximum number of connection failures */
 195  
   private final int maxConnectionFailures;
 196  
   /** How long to wait before trying to reconnect failed connections */
 197  
   private final long waitTimeBetweenConnectionRetriesMs;
 198  
   /** Maximum number of milliseconds for a request */
 199  
   private final int maxRequestMilliseconds;
 200  
   /**
 201  
    * Whether to resend request which timed out or fail the job if timeout
 202  
    * happens
 203  
    */
 204  
   private final boolean resendTimedOutRequests;
 205  
   /** Waiting interval for checking outstanding requests msecs */
 206  
   private final int waitingRequestMsecs;
 207  
   /** Timed logger for printing request debugging */
 208  
   private final TimedLogger requestLogger;
 209  
   /** Worker executor group */
 210  
   private final EventLoopGroup workerGroup;
 211  
   /** Task request id generator */
 212  0
   private final TaskRequestIdGenerator taskRequestIdGenerator =
 213  
       new TaskRequestIdGenerator();
 214  
   /** Task info */
 215  
   private final TaskInfo myTaskInfo;
 216  
   /** Maximum thread pool size */
 217  
   private final int maxPoolSize;
 218  
   /** Maximum number of attempts to resolve an address */
 219  
   private final int maxResolveAddressAttempts;
 220  
   /** Use execution handler? */
 221  
   private final boolean useExecutionGroup;
 222  
   /** EventExecutor Group (if used) */
 223  
   private final EventExecutorGroup executionGroup;
 224  
   /** Name of the handler to use execution group for (if used) */
 225  
   private final String handlerToUseExecutionGroup;
 226  
   /** When was the last time we checked if we should resend some requests */
 227  0
   private final AtomicLong lastTimeCheckedRequestsForProblems =
 228  
       new AtomicLong(0);
 229  
   /**
 230  
    * Logger used to dump stack traces for every exception that happens
 231  
    * in netty client threads.
 232  
    */
 233  0
   private final LogOnErrorChannelFutureListener logErrorListener =
 234  
       new LogOnErrorChannelFutureListener();
 235  
   /** Flow control policy used */
 236  
   private final FlowControl flowControl;
 237  
 
 238  
   /** How many network requests were resent because they took too long */
 239  
   private final GiraphHadoopCounter networkRequestsResentForTimeout;
 240  
   /** How many network requests were resent because channel failed */
 241  
   private final GiraphHadoopCounter networkRequestsResentForChannelFailure;
 242  
   /** How many network requests were resent because connection failed */
 243  
   private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;
 244  
 
 245  
   /**
 246  
    * Keeps track of the number of reconnect failures. Once this exceeds the
 247  
    * value of {@link #maxConnectionFailures}, the job will fail.
 248  
    */
 249  0
   private int reconnectFailures = 0;
 250  
 
 251  
   /**
 252  
    * Only constructor
 253  
    *
    * Reads the client settings from the configuration, picks the flow control
    * policy, registers the Netty counters, configures the client bootstrap
    * (with or without the authentication pipeline) and finally starts the
    * "open-requests-observer" thread, which loops forever: it sleeps for the
    * waiting-request interval and then calls {@code checkRequestsForProblems()}.
    *
 254  
    * @param context Context for progress
 255  
    * @param conf Configuration
 256  
    * @param myTaskInfo Current task info
 257  
    * @param exceptionHandler handler for uncaught exception. Will
 258  
    *                         terminate job.
    * @throws IllegalStateException if both the total open requests limit and
    *                               the per-worker open requests limit are
    *                               enabled (raised by checkState)
 259  
    */
 260  
   public NettyClient(Mapper<?, ?, ?, ?>.Context context,
 261  
     final ImmutableClassesGiraphConfiguration conf, TaskInfo myTaskInfo,
 262  0
     final Thread.UncaughtExceptionHandler exceptionHandler) {
 263  
 
 264  0
     this.context = context;
 265  0
     this.myTaskInfo = myTaskInfo;
 266  0
     this.channelsPerServer = GiraphConstants.CHANNELS_PER_SERVER.get(conf);
 267  0
     sendBufferSize = CLIENT_SEND_BUFFER_SIZE.get(conf);
 268  0
     receiveBufferSize = CLIENT_RECEIVE_BUFFER_SIZE.get(conf);
 269  0
     this.requestSizeWarningThreshold =
 270  0
         GiraphConstants.REQUEST_SIZE_WARNING_THRESHOLD.get(conf);
 271  
 
 272  0
     boolean limitNumberOfOpenRequests = LIMIT_NUMBER_OF_OPEN_REQUESTS.get(conf);
 273  0
     boolean limitOpenRequestsPerWorker =
 274  0
         LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
 275  0
     // The two open-request limits are mutually exclusive.
     checkState(!limitNumberOfOpenRequests || !limitOpenRequestsPerWorker,
 276  
         "NettyClient: it is not allowed to have both limitations on the " +
 277  
             "number of total open requests, and on the number of open " +
 278  
             "requests per worker!");
 279  0
     // Pick the flow control policy matching whichever limit (if any) is on.
     if (limitNumberOfOpenRequests) {
 280  0
       flowControl = new StaticFlowControl(conf, this);
 281  0
     } else if (limitOpenRequestsPerWorker) {
 282  0
       flowControl = new CreditBasedFlowControl(conf, this, exceptionHandler);
 283  
     } else {
 284  0
       flowControl = new NoOpFlowControl(this);
 285  
     }
 286  
 
 287  0
     initialiseCounters();
 288  0
     networkRequestsResentForTimeout =
 289  0
         new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
 290  
             NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME));
 291  0
     networkRequestsResentForChannelFailure =
 292  0
         new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
 293  
             NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
 294  0
     networkRequestsResentForConnectionFailure =
 295  0
       new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
 296  
         NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME));
 297  
 
 298  0
     maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
 299  0
     resendTimedOutRequests = RESEND_TIMED_OUT_REQUESTS.get(conf);
 300  0
     maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
 301  0
     waitTimeBetweenConnectionRetriesMs =
 302  0
         WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS.get(conf);
 303  0
     waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
 304  0
     requestLogger = new TimedLogger(waitingRequestMsecs, LOG);
 305  0
     maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(conf);
 306  0
     maxResolveAddressAttempts = MAX_RESOLVE_ADDRESS_ATTEMPTS.get(conf);
 307  
 
 308  0
     clientRequestIdRequestInfoMap =
 309  0
         new MapMaker().concurrencyLevel(maxPoolSize).makeMap();
 310  
 
 311  0
     handlerToUseExecutionGroup =
 312  0
         NETTY_CLIENT_EXECUTION_AFTER_HANDLER.get(conf);
 313  0
     useExecutionGroup = NETTY_CLIENT_USE_EXECUTION_HANDLER.get(conf);
 314  0
     // The execution group is only created when enabled; otherwise the field
     // stays null and is passed as null to the pipeline helpers below.
     if (useExecutionGroup) {
 315  0
       int executionThreads = NETTY_CLIENT_EXECUTION_THREADS.get(conf);
 316  0
       executionGroup = new DefaultEventExecutorGroup(executionThreads,
 317  0
           ThreadUtils.createThreadFactory(
 318  
               "netty-client-exec-%d", exceptionHandler));
 319  0
       if (LOG.isInfoEnabled()) {
 320  0
         LOG.info("NettyClient: Using execution handler with " +
 321  
             executionThreads + " threads after " +
 322  
             handlerToUseExecutionGroup + ".");
 323  
       }
 324  0
     } else {
 325  0
       executionGroup = null;
 326  
     }
 327  
 
 328  0
     workerGroup = new NioEventLoopGroup(maxPoolSize,
 329  0
         ThreadUtils.createThreadFactory(
 330  
             "netty-client-worker-%d", exceptionHandler));
 331  
 
 332  0
     bootstrap = new Bootstrap();
 333  0
     bootstrap.group(workerGroup)
 334  0
         .channel(NioSocketChannel.class)
 335  0
         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
 336  0
             MAX_CONNECTION_MILLISECONDS_DEFAULT)
 337  0
         .option(ChannelOption.TCP_NODELAY, true)
 338  0
         .option(ChannelOption.SO_KEEPALIVE, true)
 339  0
         .option(ChannelOption.SO_SNDBUF, sendBufferSize)
 340  0
         .option(ChannelOption.SO_RCVBUF, receiveBufferSize)
 341  0
         .option(ChannelOption.ALLOCATOR, conf.getNettyAllocator())
 342  0
         .handler(new ChannelInitializer<SocketChannel>() {
 343  
           @Override
 344  
           protected void initChannel(SocketChannel ch) throws Exception {
 345  
             // NOTE(review): the if_not[...]/end[...] markers below look like
             // build-time preprocessor directives that cut the authentication
             // branch out of some builds -- confirm before moving them.
 /*if_not[HADOOP_NON_SECURE]*/
 346  0
             if (conf.authenticate()) {
 347  0
               LOG.info("Using Netty with authentication.");
 348  0
               PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
 349  
                 new FlushConsolidationHandler(FlushConsolidationHandler
 350  
                   .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
 351  0
                 handlerToUseExecutionGroup, executionGroup, ch);
 352  
               // Our pipeline starts with just byteCounter, and then we use
 353  
               // addLast() to incrementally add pipeline elements, so that we
 354  
               // can name them for identification for removal or replacement
 355  
               // after client is authenticated by server.
 356  
               // After authentication is complete, the pipeline's SASL-specific
 357  
               // functionality is removed, restoring the pipeline to exactly the
 358  
               // same configuration as it would be without authentication.
 359  0
               PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter",
 360  0
                   inboundByteCounter, handlerToUseExecutionGroup,
 361  0
                   executionGroup, ch);
 362  0
               if (conf.doCompression()) {
 363  0
                 PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
 364  0
                     conf.getNettyCompressionDecoder(),
 365  0
                     handlerToUseExecutionGroup, executionGroup, ch);
 366  
               }
 367  0
               PipelineUtils.addLastWithExecutorCheck(
 368  
                   "clientOutboundByteCounter",
 369  0
                   outboundByteCounter, handlerToUseExecutionGroup,
 370  0
                   executionGroup, ch);
 371  0
               if (conf.doCompression()) {
 372  0
                 PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
 373  0
                     conf.getNettyCompressionEncoder(),
 374  0
                     handlerToUseExecutionGroup, executionGroup, ch);
 375  
               }
 376  
               // The following pipeline component is needed to decode the
 377  
               // server's SASL tokens. It is replaced with a
 378  
               // FixedLengthFrameDecoder (same as used with the
 379  
               // non-authenticated pipeline) after authentication
 380  
               // completes (as in non-auth pipeline below).
 381  0
               PipelineUtils.addLastWithExecutorCheck(
 382  
                   "length-field-based-frame-decoder",
 383  
                   new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4),
 384  0
                   handlerToUseExecutionGroup, executionGroup, ch);
 385  0
               PipelineUtils.addLastWithExecutorCheck("request-encoder",
 386  0
                   new RequestEncoder(conf), handlerToUseExecutionGroup,
 387  0
                   executionGroup, ch);
 388  
               // The following pipeline component responds to the server's SASL
 389  
               // tokens with its own responses. Both client and server share the
 390  
               // same Hadoop Job token, which is used to create the SASL
 391  
               // tokens to authenticate with each other.
 392  
               // After authentication finishes, this pipeline component
 393  
               // is removed.
 394  0
               PipelineUtils.addLastWithExecutorCheck("sasl-client-handler",
 395  0
                   new SaslClientHandler(conf), handlerToUseExecutionGroup,
 396  0
                   executionGroup, ch);
 397  0
               PipelineUtils.addLastWithExecutorCheck("response-handler",
 398  
                   new ResponseClientHandler(NettyClient.this, conf),
 399  0
                   handlerToUseExecutionGroup, executionGroup, ch);
 400  
             } else {
 401  0
               LOG.info("Using Netty without authentication.");
 402  
 /*end[HADOOP_NON_SECURE]*/
 403  0
               // Non-authenticated pipeline: same handlers as above, minus the
               // SASL handler, and with a fixed-length frame decoder for
               // responses instead of the length-field-based one.
               PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
 404  
                 new FlushConsolidationHandler(FlushConsolidationHandler
 405  
                     .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
 406  0
                 handlerToUseExecutionGroup, executionGroup, ch);
 407  0
               PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter",
 408  0
                   inboundByteCounter, handlerToUseExecutionGroup,
 409  0
                   executionGroup, ch);
 410  0
               if (conf.doCompression()) {
 411  0
                 PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
 412  0
                     conf.getNettyCompressionDecoder(),
 413  0
                     handlerToUseExecutionGroup, executionGroup, ch);
 414  
               }
 415  0
               PipelineUtils.addLastWithExecutorCheck(
 416  
                   "clientOutboundByteCounter",
 417  0
                   outboundByteCounter, handlerToUseExecutionGroup,
 418  0
                   executionGroup, ch);
 419  0
               if (conf.doCompression()) {
 420  0
                 PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
 421  0
                     conf.getNettyCompressionEncoder(),
 422  0
                     handlerToUseExecutionGroup, executionGroup, ch);
 423  
               }
 424  0
               PipelineUtils.addLastWithExecutorCheck(
 425  
                   "fixed-length-frame-decoder",
 426  
                   new FixedLengthFrameDecoder(
 427  
                       RequestServerHandler.RESPONSE_BYTES),
 428  0
                  handlerToUseExecutionGroup, executionGroup, ch);
 429  0
               PipelineUtils.addLastWithExecutorCheck("request-encoder",
 430  0
                     new RequestEncoder(conf), handlerToUseExecutionGroup,
 431  0
                   executionGroup, ch);
 432  0
               PipelineUtils.addLastWithExecutorCheck("response-handler",
 433  
                   new ResponseClientHandler(NettyClient.this, conf),
 434  0
                   handlerToUseExecutionGroup, executionGroup, ch);
 435  
 
 436  
 /*if_not[HADOOP_NON_SECURE]*/
 437  
             }
 438  
 /*end[HADOOP_NON_SECURE]*/
 439  0
           }
 440  
 
 441  
           // An unregistered channel is reported as failed: it is logged and
           // then handed to checkRequestsAfterChannelFailure().
           @Override
 442  
           public void channelUnregistered(ChannelHandlerContext ctx) throws
 443  
               Exception {
 444  0
             super.channelUnregistered(ctx);
 445  0
             LOG.error("Channel failed " + ctx.channel());
 446  0
             checkRequestsAfterChannelFailure(ctx.channel());
 447  0
           }
 448  
         });
 449  
 
 450  
     // Start a thread which will observe if there are any problems with open
 451  
     // requests
 452  0
     ThreadUtils.startThread(new Runnable() {
 453  
       @Override
 454  
       public void run() {
 455  
         // No exit condition: the loop runs until the thread itself dies.
         while (true) {
 456  0
           ThreadUtils.trySleep(waitingRequestMsecs);
 457  0
           checkRequestsForProblems();
 458  
         }
 459  
       }
 460  
     }, "open-requests-observer");
 461  0
   }
 462  
 
 463  
   /**
 464  
    * Put the Netty-related counters in a single map which will be accessed
 465  
    * from the worker/master
 466  
    */
 467  
   private void initialiseCounters() {
 468  0
     Set<String> counters = COUNTER_GROUP_AND_NAMES.getOrDefault(
 469  
             NETTY_COUNTERS_GROUP, new HashSet<>());
 470  0
     counters.add(NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME);
 471  0
     counters.add(NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME);
 472  0
     counters.add(NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME);
 473  0
     COUNTER_GROUP_AND_NAMES.put(NETTY_COUNTERS_GROUP, counters);
 474  0
   }
 475  
 
 476  
   public static Map<String, Set<String>> getCounterGroupsAndNames() {
 477  0
     return COUNTER_GROUP_AND_NAMES;
 478  
   }
 479  
 
 480  
   /**
 481  
    * Whether master task is involved in the communication with a given client
 482  
    *
 483  
    * @param clientId id of the communication (on the end of the communication)
 484  
    * @return true if master is on one end of the communication
 485  
    */
 486  
   public boolean masterInvolved(int clientId) {
 487  0
     return myTaskInfo.getTaskId() == MasterInfo.MASTER_TASK_ID ||
 488  
         clientId == MasterInfo.MASTER_TASK_ID;
 489  
   }
 490  
 
 491  
   /**
 492  
    * Pair object for connectAllAddresses().
 493  
    */
 494  0
   private static class ChannelFutureAddress {
 495  
     /** Future object */
 496  
     private final ChannelFuture future;
 497  
     /** Address of the future */
 498  
     private final InetSocketAddress address;
 499  
     /** Task id */
 500  
     private final Integer taskId;
 501  
 
 502  
     /**
 503  
      * Constructor.
 504  
      *
 505  
      * @param future Immutable future
 506  
      * @param address Immutable address
 507  
      * @param taskId Immutable taskId
 508  
      */
 509  
     ChannelFutureAddress(
 510  0
         ChannelFuture future, InetSocketAddress address, Integer taskId) {
 511  0
       this.future = future;
 512  0
       this.address = address;
 513  0
       this.taskId = taskId;
 514  0
     }
 515  
 
 516  
     @Override
 517  
     public String toString() {
 518  0
       return "(future=" + future + ",address=" + address + ",taskId=" +
 519  
           taskId + ")";
 520  
     }
 521  
   }
 522  
 
 523  
   /**
 524  
    * Connect to a collection of tasks servers
 525  
    *
 526  
    * @param tasks Tasks to connect to (if haven't already connected)
 527  
    */
 528  
   public void connectAllAddresses(Collection<? extends TaskInfo> tasks) {
 529  0
     List<ChannelFutureAddress> waitingConnectionList =
 530  0
         Lists.newArrayListWithCapacity(tasks.size() * channelsPerServer);
 531  0
     for (TaskInfo taskInfo : tasks) {
 532  0
       context.progress();
 533  0
       int taskId = taskInfo.getTaskId();
 534  0
       InetSocketAddress address = taskIdAddressMap.get(taskId);
 535  0
       if (address == null ||
 536  0
           !address.getHostName().equals(taskInfo.getHostname()) ||
 537  0
           address.getPort() != taskInfo.getPort()) {
 538  0
         address = resolveAddress(maxResolveAddressAttempts,
 539  0
             taskInfo.getHostOrIp(), taskInfo.getPort());
 540  0
         taskIdAddressMap.put(taskId, address);
 541  
       }
 542  0
       if (address == null || address.getHostName() == null ||
 543  0
           address.getHostName().isEmpty()) {
 544  0
         throw new IllegalStateException("connectAllAddresses: Null address " +
 545  
             "in addresses " + tasks);
 546  
       }
 547  0
       if (address.isUnresolved()) {
 548  0
         throw new IllegalStateException("connectAllAddresses: Unresolved " +
 549  
             "address " + address);
 550  
       }
 551  
 
 552  0
       if (addressChannelMap.containsKey(address)) {
 553  0
         continue;
 554  
       }
 555  
 
 556  
       // Start channelsPerServer connection attempts to the remote server
 557  0
       for (int i = 0; i < channelsPerServer; ++i) {
 558  0
         ChannelFuture connectionFuture = bootstrap.connect(address);
 559  
 
 560  0
         waitingConnectionList.add(
 561  
             new ChannelFutureAddress(
 562  0
                 connectionFuture, address, taskId));
 563  
       }
 564  0
     }
 565  
 
 566  
     // Wait for all the connections to succeed up to n tries
 567  0
     int failures = 0;
 568  0
     int connected = 0;
 569  0
     while (failures < maxConnectionFailures) {
 570  0
       List<ChannelFutureAddress> nextCheckFutures = Lists.newArrayList();
 571  0
       boolean isFirstFailure = true;
 572  0
       for (ChannelFutureAddress waitingConnection : waitingConnectionList) {
 573  0
         context.progress();
 574  0
         ChannelFuture future = waitingConnection.future;
 575  0
         ProgressableUtils.awaitChannelFuture(future, context);
 576  0
         if (!future.isSuccess() || !future.channel().isOpen()) {
 577  
           // Make a short pause before trying to reconnect failed addresses
 578  
           // again, but to do it just once per iterating through channels
 579  0
           if (isFirstFailure) {
 580  0
             isFirstFailure = false;
 581  
             try {
 582  0
               Thread.sleep(waitTimeBetweenConnectionRetriesMs);
 583  0
             } catch (InterruptedException e) {
 584  0
               throw new IllegalStateException(
 585  
                   "connectAllAddresses: InterruptedException occurred", e);
 586  0
             }
 587  
           }
 588  
 
 589  0
           LOG.warn("connectAllAddresses: Future failed " +
 590  0
               "to connect with " + waitingConnection.address + " with " +
 591  0
               failures + " failures because of " + future.cause());
 592  
 
 593  0
           ChannelFuture connectionFuture =
 594  0
               bootstrap.connect(waitingConnection.address);
 595  0
           nextCheckFutures.add(new ChannelFutureAddress(connectionFuture,
 596  0
               waitingConnection.address, waitingConnection.taskId));
 597  0
           ++failures;
 598  0
         } else {
 599  0
           Channel channel = future.channel();
 600  0
           if (LOG.isDebugEnabled()) {
 601  0
             LOG.debug("connectAllAddresses: Connected to " +
 602  0
                 channel.remoteAddress() + ", open = " + channel.isOpen());
 603  
           }
 604  
 
 605  0
           if (channel.remoteAddress() == null) {
 606  0
             throw new IllegalStateException(
 607  
                 "connectAllAddresses: Null remote address!");
 608  
           }
 609  
 
 610  0
           ChannelRotater rotater =
 611  0
               addressChannelMap.get(waitingConnection.address);
 612  0
           if (rotater == null) {
 613  0
             ChannelRotater newRotater =
 614  0
                 new ChannelRotater(waitingConnection.taskId,
 615  0
                     waitingConnection.address);
 616  0
             rotater = addressChannelMap.putIfAbsent(
 617  0
                 waitingConnection.address, newRotater);
 618  0
             if (rotater == null) {
 619  0
               rotater = newRotater;
 620  
             }
 621  
           }
 622  0
           rotater.addChannel(future.channel());
 623  0
           ++connected;
 624  
         }
 625  0
       }
 626  0
       LOG.info("connectAllAddresses: Successfully added " +
 627  0
           (waitingConnectionList.size() - nextCheckFutures.size()) +
 628  
           " connections, (" + connected + " total connected) " +
 629  0
           nextCheckFutures.size() + " failed, " +
 630  
           failures + " failures total.");
 631  0
       if (nextCheckFutures.isEmpty()) {
 632  0
         break;
 633  
       }
 634  0
       waitingConnectionList = nextCheckFutures;
 635  0
     }
 636  0
     if (failures >= maxConnectionFailures) {
 637  0
       throw new IllegalStateException(
 638  
           "connectAllAddresses: Too many failures (" + failures + ").");
 639  
     }
 640  0
   }
 641  
 
 642  
 /*if_not[HADOOP_NON_SECURE]*/
 643  
   /**
 644  
    * Authenticate all servers in addressChannelMap.
 645  
    */
 646  
   public void authenticate() {
 647  0
     LOG.info("authenticate: NettyClient starting authentication with " +
 648  
         "servers.");
 649  
     for (Map.Entry<InetSocketAddress, ChannelRotater> entry :
 650  0
       addressChannelMap.entrySet()) {
 651  0
       if (LOG.isDebugEnabled()) {
 652  0
         LOG.debug("authenticate: Authenticating with address:" +
 653  0
           entry.getKey());
 654  
       }
 655  0
       ChannelRotater channelRotater = entry.getValue();
 656  0
       for (Channel channel: channelRotater.getChannels()) {
 657  0
         if (LOG.isDebugEnabled()) {
 658  0
           LOG.debug("authenticate: Authenticating with server on channel: " +
 659  
               channel);
 660  
         }
 661  0
         authenticateOnChannel(channelRotater.getTaskId(), channel);
 662  0
       }
 663  0
     }
 664  0
     if (LOG.isInfoEnabled()) {
 665  0
       LOG.info("authenticate: NettyClient successfully authenticated with " +
 666  0
           addressChannelMap.size() + " server" +
 667  0
           ((addressChannelMap.size() != 1) ? "s" : "") +
 668  
           " - continuing with normal work.");
 669  
     }
 670  0
   }
 671  
 
 672  
   /**
 673  
    * Authenticate with server connected at given channel.
 674  
    *
 675  
    * @param taskId Task id of the channel
 676  
    * @param channel Connection to server to authenticate with.
 677  
    */
 678  
   private void authenticateOnChannel(Integer taskId, Channel channel) {
 679  
     try {
 680  0
       SaslNettyClient saslNettyClient = channel.attr(SASL).get();
 681  0
       if (channel.attr(SASL).get() == null) {
 682  0
         if (LOG.isDebugEnabled()) {
 683  0
           LOG.debug("authenticateOnChannel: Creating saslNettyClient now " +
 684  
               "for channel: " + channel);
 685  
         }
 686  0
         saslNettyClient = new SaslNettyClient();
 687  0
         channel.attr(SASL).set(saslNettyClient);
 688  
       }
 689  0
       if (!saslNettyClient.isComplete()) {
 690  0
         if (LOG.isDebugEnabled()) {
 691  0
           LOG.debug("authenticateOnChannel: Waiting for authentication " +
 692  
               "to complete..");
 693  
         }
 694  0
         SaslTokenMessageRequest saslTokenMessage = saslNettyClient.firstToken();
 695  0
         sendWritableRequest(taskId, saslTokenMessage);
 696  
         // We now wait for Netty's thread pool to communicate over this
 697  
         // channel to authenticate with another worker acting as a server.
 698  
         try {
 699  0
           synchronized (saslNettyClient.getAuthenticated()) {
 700  0
             while (!saslNettyClient.isComplete()) {
 701  0
               saslNettyClient.getAuthenticated().wait();
 702  
             }
 703  0
           }
 704  0
         } catch (InterruptedException e) {
 705  0
           LOG.error("authenticateOnChannel: Interrupted while waiting for " +
 706  
               "authentication.");
 707  0
         }
 708  
       }
 709  0
       if (LOG.isDebugEnabled()) {
 710  0
         LOG.debug("authenticateOnChannel: Authentication on channel: " +
 711  
             channel + " has completed successfully.");
 712  
       }
 713  0
     } catch (IOException e) {
 714  0
       LOG.error("authenticateOnChannel: Failed to authenticate with server " +
 715  
           "due to error: " + e);
 716  0
     }
 717  0
     return;
 718  
   }
 719  
 /*end[HADOOP_NON_SECURE]*/
 720  
 
 721  
   /**
 722  
    * Stop the client.
 723  
    */
 724  
   public void stop() {
 725  0
     if (LOG.isInfoEnabled()) {
 726  0
       LOG.info("stop: Halting netty client");
 727  
     }
 728  
     // Close connections asynchronously, in a Netty-approved
 729  
     // way, without cleaning up thread pools until all channels
 730  
     // in addressChannelMap are closed (success or failure)
 731  0
     int channelCount = 0;
 732  0
     for (ChannelRotater channelRotater : addressChannelMap.values()) {
 733  0
       channelCount += channelRotater.size();
 734  0
     }
 735  0
     final int done = channelCount;
 736  0
     final AtomicInteger count = new AtomicInteger(0);
 737  0
     for (ChannelRotater channelRotater : addressChannelMap.values()) {
 738  0
       channelRotater.closeChannels(new ChannelFutureListener() {
 739  
         @Override
 740  
         public void operationComplete(ChannelFuture cf) {
 741  0
           context.progress();
 742  0
           if (count.incrementAndGet() == done) {
 743  0
             if (LOG.isInfoEnabled()) {
 744  0
               LOG.info("stop: reached wait threshold, " +
 745  
                   done + " connections closed, releasing " +
 746  
                   "resources now.");
 747  
             }
 748  0
             workerGroup.shutdownGracefully();
 749  0
             if (executionGroup != null) {
 750  0
               executionGroup.shutdownGracefully();
 751  
             }
 752  
           }
 753  0
         }
 754  
       });
 755  0
     }
 756  0
     ProgressableUtils.awaitTerminationFuture(workerGroup, context);
 757  0
     if (executionGroup != null) {
 758  0
       ProgressableUtils.awaitTerminationFuture(executionGroup, context);
 759  
     }
 760  0
     if (LOG.isInfoEnabled()) {
 761  0
       LOG.info("stop: Netty client halted");
 762  
     }
 763  0
   }
 764  
 
 765  
   /**
 766  
    * Get the next available channel, reconnecting if necessary
 767  
    *
 768  
    * @param remoteServer Remote server to get a channel for
 769  
    * @return Available channel for this remote server
 770  
    */
 771  
   private Channel getNextChannel(InetSocketAddress remoteServer) {
 772  0
     Channel channel = addressChannelMap.get(remoteServer).nextChannel();
 773  0
     if (channel == null) {
 774  0
       LOG.warn("getNextChannel: No channel exists for " + remoteServer);
 775  
     } else {
 776  
       // Return this channel if it is connected
 777  0
       if (channel.isActive()) {
 778  0
         return channel;
 779  
       }
 780  
 
 781  
       // Get rid of the failed channel
 782  0
       if (addressChannelMap.get(remoteServer).removeChannel(channel)) {
 783  0
         LOG.warn("getNextChannel: Unlikely event that the channel " +
 784  
           channel + " was already removed!");
 785  
       }
 786  0
       if (LOG.isInfoEnabled()) {
 787  0
         LOG.info("getNextChannel: Fixing disconnected channel to " +
 788  0
           remoteServer + ", open = " + channel.isOpen() + ", " +
 789  0
           "bound = " + channel.isRegistered());
 790  
       }
 791  
     }
 792  
 
 793  0
     while (reconnectFailures < maxConnectionFailures) {
 794  0
       ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
 795  
       try {
 796  0
         ProgressableUtils.awaitChannelFuture(connectionFuture, context);
 797  0
       } catch (BlockingOperationException e) {
 798  0
         LOG.warn("getNextChannel: Failed connecting to " + remoteServer, e);
 799  0
       }
 800  0
       if (connectionFuture.isSuccess()) {
 801  0
         if (LOG.isInfoEnabled()) {
 802  0
           LOG.info("getNextChannel: Connected to " + remoteServer + "!");
 803  
         }
 804  0
         addressChannelMap.get(remoteServer).addChannel(
 805  0
             connectionFuture.channel());
 806  0
         return connectionFuture.channel();
 807  
       }
 808  0
       ++reconnectFailures;
 809  0
       LOG.warn("getNextChannel: Failed to reconnect to " +  remoteServer +
 810  
           " on attempt " + reconnectFailures + " out of " +
 811  
           maxConnectionFailures + " max attempts, sleeping for 5 secs",
 812  0
           connectionFuture.cause());
 813  0
       ThreadUtils.trySleep(5000);
 814  0
     }
 815  0
     throw new IllegalStateException("getNextChannel: Failed to connect " +
 816  
         "to " + remoteServer + " in " + reconnectFailures +
 817  
         " connect attempts");
 818  
   }
 819  
 
 820  
   /**
 821  
    * Send a request to a remote server honoring the flow control mechanism
 822  
    * (should be already connected)
 823  
    *
 824  
    * @param destTaskId Destination task id
 825  
    * @param request Request to send
 826  
    */
 827  
   public void sendWritableRequest(int destTaskId, WritableRequest request) {
 828  0
     flowControl.sendRequest(destTaskId, request);
 829  0
   }
 830  
 
 831  
   /**
 832  
    * Actual send of a request.
 833  
    *
 834  
    * @param destTaskId destination to send the request to
 835  
    * @param request request itself
 836  
    * @return request id generated for sending the request
 837  
    */
 838  
   public Long doSend(int destTaskId, WritableRequest request) {
 839  0
     InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId);
 840  0
     if (clientRequestIdRequestInfoMap.isEmpty()) {
 841  0
       inboundByteCounter.resetAll();
 842  0
       outboundByteCounter.resetAll();
 843  
     }
 844  0
     boolean registerRequest = true;
 845  0
     Long requestId = null;
 846  
 /*if_not[HADOOP_NON_SECURE]*/
 847  0
     if (request.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
 848  0
       registerRequest = false;
 849  
     }
 850  
 /*end[HADOOP_NON_SECURE]*/
 851  
 
 852  0
     RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);
 853  0
     if (registerRequest) {
 854  0
       request.setClientId(myTaskInfo.getTaskId());
 855  0
       requestId = taskRequestIdGenerator.getNextRequestId(destTaskId);
 856  0
       request.setRequestId(requestId);
 857  0
       ClientRequestId clientRequestId =
 858  0
         new ClientRequestId(destTaskId, request.getRequestId());
 859  0
       RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
 860  
         clientRequestId, newRequestInfo);
 861  0
       if (oldRequestInfo != null) {
 862  0
         throw new IllegalStateException("sendWritableRequest: Impossible to " +
 863  0
           "have a previous request id = " + request.getRequestId() + ", " +
 864  
           "request info of " + oldRequestInfo);
 865  
       }
 866  
     }
 867  0
     if (request.getSerializedSize() >
 868  
         requestSizeWarningThreshold * sendBufferSize) {
 869  0
       LOG.warn("Creating large request of type " + request.getClass() +
 870  0
         ", size " + request.getSerializedSize() +
 871  
         " bytes. Check netty buffer size.");
 872  
     }
 873  0
     writeRequestToChannel(newRequestInfo);
 874  0
     return requestId;
 875  
   }
 876  
 
 877  
   /**
 878  
    * Write request to a channel for its destination.
 879  
    *
 880  
    * Whenever we write to the channel, we also call flush, but we have added a
 881  
    * {@link FlushConsolidationHandler} in the pipeline, which batches the
 882  
    * flushes.
 883  
    *
 884  
    * @param requestInfo Request info
 885  
    */
 886  
   private void writeRequestToChannel(RequestInfo requestInfo) {
 887  0
     Channel channel = getNextChannel(requestInfo.getDestinationAddress());
 888  0
     ChannelFuture writeFuture = channel.writeAndFlush(requestInfo.getRequest());
 889  0
     requestInfo.setWriteFuture(writeFuture);
 890  0
     writeFuture.addListener(logErrorListener);
 891  0
   }
 892  
 
 893  
   /**
 894  
    * Handle receipt of a message. Called by response handler.
 895  
    *
 896  
    * @param senderId Id of sender of the message
 897  
    * @param requestId Id of the request
 898  
    * @param response Actual response
 899  
    * @param shouldDrop Drop the message?
 900  
    */
 901  
   public void messageReceived(int senderId, long requestId, int response,
 902  
       boolean shouldDrop) {
 903  0
     if (shouldDrop) {
 904  0
       synchronized (clientRequestIdRequestInfoMap) {
 905  0
         clientRequestIdRequestInfoMap.notifyAll();
 906  0
       }
 907  0
       return;
 908  
     }
 909  0
     AckSignalFlag responseFlag = flowControl.getAckSignalFlag(response);
 910  0
     if (responseFlag == AckSignalFlag.DUPLICATE_REQUEST) {
 911  0
       LOG.info("messageReceived: Already completed request (taskId = " +
 912  
           senderId + ", requestId = " + requestId + ")");
 913  0
     } else if (responseFlag != AckSignalFlag.NEW_REQUEST) {
 914  0
       throw new IllegalStateException(
 915  
           "messageReceived: Got illegal response " + response);
 916  
     }
 917  0
     RequestInfo requestInfo = clientRequestIdRequestInfoMap
 918  0
         .remove(new ClientRequestId(senderId, requestId));
 919  0
     if (requestInfo == null) {
 920  0
       LOG.info("messageReceived: Already received response for (taskId = " +
 921  
           senderId + ", requestId = " + requestId + ")");
 922  
     } else {
 923  0
       if (LOG.isDebugEnabled()) {
 924  0
         LOG.debug("messageReceived: Completed (taskId = " + senderId + ")" +
 925  
             requestInfo + ".  Waiting on " +
 926  0
             clientRequestIdRequestInfoMap.size() + " requests");
 927  
       }
 928  0
       flowControl.messageAckReceived(senderId, requestId, response);
 929  
       // Help #waitAllRequests() to finish faster
 930  0
       synchronized (clientRequestIdRequestInfoMap) {
 931  0
         clientRequestIdRequestInfoMap.notifyAll();
 932  0
       }
 933  
     }
 934  0
   }
 935  
 
 936  
   /**
 937  
    * Ensure all the request sent so far are complete. Periodically check the
 938  
    * state of current open requests. If there is an issue in any of them,
 939  
    * re-send the request.
 940  
    */
 941  
   public void waitAllRequests() {
 942  0
     flowControl.waitAllRequests();
 943  0
     checkState(flowControl.getNumberOfUnsentRequests() == 0);
 944  0
     while (clientRequestIdRequestInfoMap.size() > 0) {
 945  
       // Wait for requests to complete for some time
 946  0
       synchronized (clientRequestIdRequestInfoMap) {
 947  0
         if (clientRequestIdRequestInfoMap.size() == 0) {
 948  0
           break;
 949  
         }
 950  
         try {
 951  0
           clientRequestIdRequestInfoMap.wait(waitingRequestMsecs);
 952  0
         } catch (InterruptedException e) {
 953  0
           throw new IllegalStateException("waitAllRequests: Got unexpected " +
 954  
               "InterruptedException", e);
 955  0
         }
 956  0
       }
 957  0
       logAndSanityCheck();
 958  
     }
 959  0
     if (LOG.isInfoEnabled()) {
 960  0
       LOG.info("waitAllRequests: Finished all requests. " +
 961  0
           inboundByteCounter.getMetrics() + "\n" + outboundByteCounter
 962  0
           .getMetrics());
 963  
     }
 964  0
   }
 965  
 
 966  
   /**
 967  
    * Log information about the requests and check for problems in requests
 968  
    */
 969  
   public void logAndSanityCheck() {
 970  0
     logInfoAboutOpenRequests();
 971  
     // Make sure that waiting doesn't kill the job
 972  0
     context.progress();
 973  0
   }
 974  
 
 975  
   /**
 976  
    * Log the status of open requests.
 977  
    */
 978  
   private void logInfoAboutOpenRequests() {
 979  0
     if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
 980  0
       LOG.info("logInfoAboutOpenRequests: Waiting interval of " +
 981  
           waitingRequestMsecs + " msecs, " +
 982  0
           clientRequestIdRequestInfoMap.size() +
 983  0
           " open requests, " + inboundByteCounter.getMetrics() + "\n" +
 984  0
           outboundByteCounter.getMetrics());
 985  
 
 986  0
       if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {
 987  
         for (Map.Entry<ClientRequestId, RequestInfo> entry :
 988  0
             clientRequestIdRequestInfoMap.entrySet()) {
 989  0
           LOG.info("logInfoAboutOpenRequests: Waiting for request " +
 990  0
               entry.getKey() + " - " + entry.getValue());
 991  0
         }
 992  
       }
 993  
 
 994  
       // Count how many open requests each task has
 995  0
       Map<Integer, Integer> openRequestCounts = Maps.newHashMap();
 996  
       for (ClientRequestId clientRequestId :
 997  0
           clientRequestIdRequestInfoMap.keySet()) {
 998  0
         int taskId = clientRequestId.getDestinationTaskId();
 999  0
         Integer currentCount = openRequestCounts.get(taskId);
 1000  0
         openRequestCounts.put(taskId,
 1001  0
             (currentCount == null ? 0 : currentCount) + 1);
 1002  0
       }
 1003  
       // Sort it in decreasing order of number of open requests
 1004  0
       List<Map.Entry<Integer, Integer>> sorted =
 1005  0
           Lists.newArrayList(openRequestCounts.entrySet());
 1006  0
       Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() {
 1007  
         @Override
 1008  
         public int compare(Map.Entry<Integer, Integer> entry1,
 1009  
             Map.Entry<Integer, Integer> entry2) {
 1010  0
           int value1 = entry1.getValue();
 1011  0
           int value2 = entry2.getValue();
 1012  0
           return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1);
 1013  
         }
 1014  
       });
 1015  
       // Print task ids which have the most open requests
 1016  0
       StringBuilder message = new StringBuilder();
 1017  0
       message.append("logInfoAboutOpenRequests: ");
 1018  0
       int itemsToPrint =
 1019  0
           Math.min(MAX_DESTINATION_TASK_IDS_TO_LIST, sorted.size());
 1020  0
       for (int i = 0; i < itemsToPrint; i++) {
 1021  0
         message.append(sorted.get(i).getValue())
 1022  0
             .append(" requests for taskId=")
 1023  0
             .append(sorted.get(i).getKey())
 1024  0
             .append(", ");
 1025  
       }
 1026  0
       LOG.info(message);
 1027  0
       flowControl.logInfo();
 1028  
     }
 1029  0
   }
 1030  
 
 1031  
   /**
 1032  
    * Check if there are some open requests which have been sent a long time
 1033  
    * ago, and if so resend them.
 1034  
    */
 1035  
   private void checkRequestsForProblems() {
 1036  0
     long lastTimeChecked = lastTimeCheckedRequestsForProblems.get();
 1037  
     // If not enough time passed from the previous check, return
 1038  0
     if (System.currentTimeMillis() < lastTimeChecked + waitingRequestMsecs) {
 1039  0
       return;
 1040  
     }
 1041  
     // If another thread did the check already, return
 1042  0
     if (!lastTimeCheckedRequestsForProblems.compareAndSet(lastTimeChecked,
 1043  0
         System.currentTimeMillis())) {
 1044  0
       return;
 1045  
     }
 1046  0
     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
 1047  
       @Override
 1048  
       public boolean apply(RequestInfo requestInfo) {
 1049  
         // If the request is taking too long, re-establish and resend
 1050  0
         return requestInfo.getElapsedMsecs() > maxRequestMilliseconds;
 1051  
       }
 1052  
     }, networkRequestsResentForTimeout, resendTimedOutRequests);
 1053  0
     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
 1054  
       @Override
 1055  
       public boolean apply(RequestInfo requestInfo) {
 1056  0
         ChannelFuture writeFuture = requestInfo.getWriteFuture();
 1057  
         // If not connected anymore or request failed re-establish and resend
 1058  0
         return writeFuture != null && (!writeFuture.channel().isActive() ||
 1059  0
             (writeFuture.isDone() && !writeFuture.isSuccess()));
 1060  
       }
 1061  
     }, networkRequestsResentForConnectionFailure, true);
 1062  0
   }
 1063  
 
 1064  
   /**
 1065  
    * Resend requests which satisfy predicate
 1066  
    *  @param shouldResendRequestPredicate Predicate to use to check whether
 1067  
    *                                     request should be resent
 1068  
    * @param counter Counter to increment for every resent network request
 1069  
    * @param resendProblematicRequest Whether to resend problematic request or
 1070  
    *                                fail the job if such request is found
 1071  
    */
 1072  
   private void resendRequestsWhenNeeded(
 1073  
       Predicate<RequestInfo> shouldResendRequestPredicate,
 1074  
       GiraphHadoopCounter counter,
 1075  
       boolean resendProblematicRequest) {
 1076  
     // Check if there are open requests which have been sent a long time ago,
 1077  
     // and if so, resend them.
 1078  0
     List<ClientRequestId> addedRequestIds = Lists.newArrayList();
 1079  0
     List<RequestInfo> addedRequestInfos = Lists.newArrayList();
 1080  
     // Check all the requests for problems
 1081  
     for (Map.Entry<ClientRequestId, RequestInfo> entry :
 1082  0
         clientRequestIdRequestInfoMap.entrySet()) {
 1083  0
       RequestInfo requestInfo = entry.getValue();
 1084  
       // If request should be resent
 1085  0
       if (shouldResendRequestPredicate.apply(requestInfo)) {
 1086  0
         if (!resendProblematicRequest) {
 1087  0
           throw new IllegalStateException("Problem with request id " +
 1088  0
               entry.getKey() + " for " + requestInfo.getDestinationAddress() +
 1089  
               ", failing the job");
 1090  
         }
 1091  0
         ChannelFuture writeFuture = requestInfo.getWriteFuture();
 1092  
         String logMessage;
 1093  0
         if (writeFuture == null) {
 1094  0
           logMessage = "wasn't sent successfully";
 1095  
         } else {
 1096  0
           logMessage = "connected = " +
 1097  0
               writeFuture.channel().isActive() +
 1098  0
               ", future done = " + writeFuture.isDone() + ", " +
 1099  0
               "success = " + writeFuture.isSuccess() + ", " +
 1100  0
               "cause = " + writeFuture.cause() + ", " +
 1101  0
               "channelId = " + writeFuture.channel().hashCode();
 1102  
         }
 1103  0
         LOG.warn("checkRequestsForProblems: Problem with request id " +
 1104  0
             entry.getKey() + ", " + logMessage + ", " +
 1105  0
             "elapsed time = " + requestInfo.getElapsedMsecs() + ", " +
 1106  0
             "destination = " + requestInfo.getDestinationAddress() +
 1107  
             " " + requestInfo);
 1108  0
         addedRequestIds.add(entry.getKey());
 1109  0
         addedRequestInfos.add(new RequestInfo(
 1110  0
             requestInfo.getDestinationAddress(), requestInfo.getRequest()));
 1111  0
         counter.increment();
 1112  
       }
 1113  0
     }
 1114  
 
 1115  
     // Add any new requests to the system, connect if necessary, and re-send
 1116  0
     for (int i = 0; i < addedRequestIds.size(); ++i) {
 1117  0
       ClientRequestId requestId = addedRequestIds.get(i);
 1118  0
       RequestInfo requestInfo = addedRequestInfos.get(i);
 1119  
 
 1120  0
       if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) == null) {
 1121  0
         LOG.warn("checkRequestsForProblems: Request " + requestId +
 1122  
             " completed prior to sending the next request");
 1123  0
         clientRequestIdRequestInfoMap.remove(requestId);
 1124  
       }
 1125  0
       if (LOG.isInfoEnabled()) {
 1126  0
         LOG.info("checkRequestsForProblems: Re-issuing request " + requestInfo);
 1127  
       }
 1128  0
       writeRequestToChannel(requestInfo);
 1129  0
       if (LOG.isInfoEnabled()) {
 1130  0
         LOG.info("checkRequestsForProblems: Request " + requestId +
 1131  
             " was resent through channelId=" +
 1132  0
             requestInfo.getWriteFuture().channel().hashCode());
 1133  
       }
 1134  
     }
 1135  0
     addedRequestIds.clear();
 1136  0
     addedRequestInfos.clear();
 1137  0
   }
 1138  
 
 1139  
   /**
 1140  
    * Utility method for resolving addresses
 1141  
    *
 1142  
    * @param maxResolveAddressAttempts Maximum number of attempts to resolve the
 1143  
    *        address
 1144  
    * @param hostOrIp Known IP or host name
 1145  
    * @param port Target port number
 1146  
    * @return The successfully resolved address.
 1147  
    * @throws IllegalStateException if the address is not resolved
 1148  
    *         in <code>maxResolveAddressAttempts</code> tries.
 1149  
    */
 1150  
   private static InetSocketAddress resolveAddress(
 1151  
       int maxResolveAddressAttempts, String hostOrIp, int port) {
 1152  0
     int resolveAttempts = 0;
 1153  0
     InetSocketAddress address = new InetSocketAddress(hostOrIp, port);
 1154  0
     while (address.isUnresolved() &&
 1155  
         resolveAttempts < maxResolveAddressAttempts) {
 1156  0
       ++resolveAttempts;
 1157  0
       LOG.warn("resolveAddress: Failed to resolve " + address +
 1158  
           " on attempt " + resolveAttempts + " of " +
 1159  
           maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
 1160  0
       ThreadUtils.trySleep(5000);
 1161  0
       address = new InetSocketAddress(hostOrIp,
 1162  0
           address.getPort());
 1163  
     }
 1164  0
     if (resolveAttempts >= maxResolveAddressAttempts) {
 1165  0
       throw new IllegalStateException("resolveAddress: Couldn't " +
 1166  
           "resolve " + address + " in " +  resolveAttempts + " tries.");
 1167  
     }
 1168  0
     return address;
 1169  
   }
 1170  
 
 1171  
   public FlowControl getFlowControl() {
 1172  0
     return flowControl;
 1173  
   }
 1174  
 
 1175  
   /**
 1176  
    * Generate and get the next request id to be used for a given worker
 1177  
    *
 1178  
    * @param taskId id of the worker to generate the next request id
 1179  
    * @return request id
 1180  
    */
 1181  
   public Long getNextRequestId(int taskId) {
 1182  0
     return taskRequestIdGenerator.getNextRequestId(taskId);
 1183  
   }
 1184  
 
 1185  
   /**
 1186  
    * @return number of open requests
 1187  
    */
 1188  
   public int getNumberOfOpenRequests() {
 1189  0
     return clientRequestIdRequestInfoMap.size();
 1190  
   }
 1191  
 
 1192  
   /**
 1193  
    * Resend requests related to channel which failed
 1194  
    *
 1195  
    * @param channel Channel which failed
 1196  
    */
 1197  
   private void checkRequestsAfterChannelFailure(final Channel channel) {
 1198  0
     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
 1199  
       @Override
 1200  
       public boolean apply(RequestInfo requestInfo) {
 1201  0
         if (requestInfo.getWriteFuture() == null ||
 1202  0
             requestInfo.getWriteFuture().channel() == null) {
 1203  0
           return false;
 1204  
         }
 1205  0
         return requestInfo.getWriteFuture().channel().equals(channel);
 1206  
       }
 1207  
     }, networkRequestsResentForChannelFailure, true);
 1208  0
   }
 1209  
 
 1210  
   /**
 1211  
    * This listener class just dumps exception stack traces if
 1212  
    * something happens.
 1213  
    */
 1214  0
   private static class LogOnErrorChannelFutureListener
 1215  
       implements ChannelFutureListener {
 1216  
 
 1217  
     @Override
 1218  
     public void operationComplete(ChannelFuture future) throws Exception {
 1219  0
       if (future.isDone() && !future.isSuccess()) {
 1220  0
         LOG.error("Channel failed channelId=" + future.channel().hashCode(),
 1221  0
             future.cause());
 1222  
       }
 1223  0
     }
 1224  
   }
 1225  
 }