Coverage Report - org.apache.giraph.comm.netty.NettyServer
 
Classes in this File Line Coverage Branch Coverage Complexity
NettyServer
0%
0/111
0%
0/28
3.4
NettyServer$1
0%
0/61
0%
0/10
3.4
NettyServer$1$1
0%
0/4
N/A
3.4
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.netty;
 20  
 
 21  
 import io.netty.handler.flush.FlushConsolidationHandler;
 22  
 import org.apache.giraph.comm.flow_control.FlowControl;
 23  
 /*if_not[HADOOP_NON_SECURE]*/
 24  
 import org.apache.giraph.comm.netty.handler.AuthorizeServerHandler;
 25  
 /*end[HADOOP_NON_SECURE]*/
 26  
 import org.apache.giraph.comm.netty.handler.RequestDecoder;
 27  
 import org.apache.giraph.comm.netty.handler.RequestServerHandler;
 28  
 /*if_not[HADOOP_NON_SECURE]*/
 29  
 import org.apache.giraph.comm.netty.handler.ResponseEncoder;
 30  
 import org.apache.giraph.comm.netty.handler.SaslServerHandler;
 31  
 /*end[HADOOP_NON_SECURE]*/
 32  
 import org.apache.giraph.comm.netty.handler.WorkerRequestReservedMap;
 33  
 import org.apache.giraph.conf.GiraphConstants;
 34  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 35  
 import org.apache.giraph.graph.TaskInfo;
 36  
 import org.apache.giraph.utils.PipelineUtils;
 37  
 import org.apache.giraph.utils.ProgressableUtils;
 38  
 import org.apache.giraph.utils.ThreadUtils;
 39  
 import org.apache.hadoop.util.Progressable;
 40  
 import org.apache.log4j.Logger;
 41  
 import io.netty.bootstrap.ServerBootstrap;
 42  
 import io.netty.channel.group.ChannelGroup;
 43  
 import io.netty.channel.group.DefaultChannelGroup;
 44  
 import io.netty.channel.nio.NioEventLoopGroup;
 45  
 import io.netty.channel.socket.SocketChannel;
 46  
 import io.netty.channel.ChannelHandlerContext;
 47  
 import io.netty.channel.EventLoopGroup;
 48  
 import io.netty.channel.ChannelOption;
 49  
 import io.netty.channel.ChannelInitializer;
 50  
 import io.netty.channel.ChannelInboundHandlerAdapter;
 51  
 import io.netty.channel.ChannelFuture;
 52  
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 53  
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 54  
 /*if_not[HADOOP_NON_SECURE]*/
 55  
 import io.netty.util.AttributeKey;
 56  
 /*end[HADOOP_NON_SECURE]*/
 57  
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 58  
 import io.netty.util.concurrent.EventExecutorGroup;
 59  
 import io.netty.util.concurrent.ImmediateEventExecutor;
 60  
 import io.netty.channel.AdaptiveRecvByteBufAllocator;
 61  
 
 62  
 import java.net.InetSocketAddress;
 63  
 import java.net.UnknownHostException;
 64  
 
 65  
 import static com.google.common.base.Preconditions.checkState;
 66  
 import static org.apache.giraph.conf.GiraphConstants.MAX_IPC_PORT_BIND_ATTEMPTS;
 67  
 
 68  
 /**
 69  
  * This server uses Netty and will implement all Giraph communication
 70  
  */
 71  0
 public class NettyServer {
 72  
   /** Default maximum thread pool size */
 73  
   public static final int MAXIMUM_THREAD_POOL_SIZE_DEFAULT = 32;
 74  
 
 75  
 /*if_not[HADOOP_NON_SECURE]*/
 76  
   /** Used to authenticate with netty clients */
 77  
   public static final AttributeKey<SaslNettyServer>
 78  0
   CHANNEL_SASL_NETTY_SERVERS = AttributeKey.valueOf("channelSaslServers");
 79  
 /*end[HADOOP_NON_SECURE]*/
 80  
 
 81  
   /** Class logger */
 82  0
   private static final Logger LOG = Logger.getLogger(NettyServer.class);
 83  
   /** Configuration */
 84  
   private final ImmutableClassesGiraphConfiguration conf;
 85  
   /** Progressable for reporting progress */
 86  
   private final Progressable progressable;
 87  
   /** Accepted channels */
 88  0
   private final ChannelGroup accepted = new DefaultChannelGroup(
 89  
       ImmediateEventExecutor.INSTANCE);
 90  
   /** Local hostname */
 91  
   private final String localHostOrIp;
 92  
   /** Address of the server */
 93  
   private InetSocketAddress myAddress;
 94  
   /** Current task info */
 95  
   private TaskInfo myTaskInfo;
 96  
   /** Maximum number of threads */
 97  
   private final int maxPoolSize;
 98  
   /** TCP backlog */
 99  
   private final int tcpBacklog;
 100  
   /** Factory for {@link RequestServerHandler} */
 101  
   private final RequestServerHandler.Factory requestServerHandlerFactory;
 102  
 /*if_not[HADOOP_NON_SECURE]*/
 103  
   /** Factory for {@link RequestServerHandler} */
 104  
   private SaslServerHandler.Factory saslServerHandlerFactory;
 105  
 /*end[HADOOP_NON_SECURE]*/
 106  
   /** Server bootstrap */
 107  
   private ServerBootstrap bootstrap;
 108  
   /** Inbound byte counter for this client */
 109  0
   private final InboundByteCounter inByteCounter = new InboundByteCounter();
 110  
   /** Outbound byte counter for this client */
 111  0
   private final OutboundByteCounter outByteCounter = new OutboundByteCounter();
 112  
   /** Send buffer size */
 113  
   private final int sendBufferSize;
 114  
   /** Receive buffer size */
 115  
   private final int receiveBufferSize;
 116  
   /** Boss eventloop group */
 117  
   private final EventLoopGroup bossGroup;
 118  
   /** Worker eventloop group */
 119  
   private final EventLoopGroup workerGroup;
 120  
   /** Request completed map per worker */
 121  
   private final WorkerRequestReservedMap workerRequestReservedMap;
 122  
   /** Use execution group? */
 123  
   private final boolean useExecutionGroup;
 124  
   /** Execution handler (if used) */
 125  
   private final EventExecutorGroup executionGroup;
 126  
   /** Name of the handler before the execution handler (if used) */
 127  
   private final String handlerToUseExecutionGroup;
 128  
   /** Handles all uncaught exceptions in netty threads */
 129  
   private final Thread.UncaughtExceptionHandler exceptionHandler;
 130  
 
 131  
 
 132  
   /**
 133  
    * Constructor for creating the server
 134  
    *
 135  
    * @param conf Configuration to use
 136  
    * @param requestServerHandlerFactory Factory for request handlers
 137  
    * @param myTaskInfo Current task info
 138  
    * @param progressable Progressable for reporting progress
 139  
    * @param exceptionHandler handle uncaught exceptions
 140  
    */
 141  
   public NettyServer(ImmutableClassesGiraphConfiguration conf,
 142  
       RequestServerHandler.Factory requestServerHandlerFactory,
 143  
       TaskInfo myTaskInfo, Progressable progressable,
 144  0
       Thread.UncaughtExceptionHandler exceptionHandler) {
 145  0
     this.conf = conf;
 146  0
     this.progressable = progressable;
 147  0
     this.requestServerHandlerFactory = requestServerHandlerFactory;
 148  
 /*if_not[HADOOP_NON_SECURE]*/
 149  0
     this.saslServerHandlerFactory = new SaslServerHandler.Factory();
 150  
 /*end[HADOOP_NON_SECURE]*/
 151  0
     this.myTaskInfo = myTaskInfo;
 152  0
     this.exceptionHandler = exceptionHandler;
 153  0
     sendBufferSize = GiraphConstants.SERVER_SEND_BUFFER_SIZE.get(conf);
 154  0
     receiveBufferSize = GiraphConstants.SERVER_RECEIVE_BUFFER_SIZE.get(conf);
 155  
 
 156  0
     workerRequestReservedMap = new WorkerRequestReservedMap(conf);
 157  
 
 158  0
     maxPoolSize = GiraphConstants.NETTY_SERVER_THREADS.get(conf);
 159  
 
 160  0
     bossGroup = new NioEventLoopGroup(4,
 161  0
         ThreadUtils.createThreadFactory(
 162  
             "netty-server-boss-%d", exceptionHandler));
 163  
 
 164  0
     workerGroup = new NioEventLoopGroup(maxPoolSize,
 165  0
         ThreadUtils.createThreadFactory(
 166  
             "netty-server-worker-%d", exceptionHandler));
 167  
 
 168  
     try {
 169  0
       this.localHostOrIp = conf.getLocalHostOrIp();
 170  0
     } catch (UnknownHostException e) {
 171  0
       throw new IllegalStateException("NettyServer: unable to get hostname");
 172  0
     }
 173  
 
 174  0
     tcpBacklog = conf.getInt(GiraphConstants.TCP_BACKLOG.getKey(),
 175  0
         conf.getInt(GiraphConstants.MAX_WORKERS,
 176  0
             GiraphConstants.TCP_BACKLOG.getDefaultValue()));
 177  
 
 178  0
     handlerToUseExecutionGroup =
 179  0
         GiraphConstants.NETTY_SERVER_EXECUTION_AFTER_HANDLER.get(conf);
 180  0
     useExecutionGroup =
 181  0
         GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER.get(conf);
 182  0
     if (useExecutionGroup) {
 183  0
       int executionThreads = conf.getNettyServerExecutionThreads();
 184  0
       executionGroup = new DefaultEventExecutorGroup(executionThreads,
 185  0
           ThreadUtils.createThreadFactory(
 186  
               "netty-server-exec-%d", exceptionHandler));
 187  0
       if (LOG.isInfoEnabled()) {
 188  0
         LOG.info("NettyServer: Using execution group with " +
 189  
             executionThreads + " threads for " +
 190  
             handlerToUseExecutionGroup + ".");
 191  
       }
 192  0
     } else {
 193  0
       executionGroup = null;
 194  
     }
 195  0
   }
 196  
 
 197  
 /*if_not[HADOOP_NON_SECURE]*/
 198  
   /**
 199  
    * Constructor for creating the server
 200  
    *
 201  
    * @param conf Configuration to use
 202  
    * @param requestServerHandlerFactory Factory for request handlers
 203  
    * @param myTaskInfo Current task info
 204  
    * @param progressable Progressable for reporting progress
 205  
    * @param saslServerHandlerFactory  Factory for SASL handlers
 206  
    * @param exceptionHandler handle uncaught exceptions
 207  
    */
 208  
   public NettyServer(ImmutableClassesGiraphConfiguration conf,
 209  
                      RequestServerHandler.Factory requestServerHandlerFactory,
 210  
                      TaskInfo myTaskInfo,
 211  
                      Progressable progressable,
 212  
                      SaslServerHandler.Factory saslServerHandlerFactory,
 213  
                      Thread.UncaughtExceptionHandler exceptionHandler) {
 214  0
     this(conf, requestServerHandlerFactory, myTaskInfo,
 215  
         progressable, exceptionHandler);
 216  0
     this.saslServerHandlerFactory = saslServerHandlerFactory;
 217  0
   }
 218  
 /*end[HADOOP_NON_SECURE]*/
 219  
 
 220  
   /**
 221  
    * Returns a handle on the in-bound byte counter.
 222  
    * @return The {@link InboundByteCounter} object for this server.
 223  
    */
 224  
   public InboundByteCounter getInByteCounter() {
 225  0
     return inByteCounter;
 226  
   }
 227  
 
 228  
   /**
 229  
    * Start the server with the appropriate port
 230  
    */
 231  
   public void start() {
 232  0
     bootstrap = new ServerBootstrap();
 233  0
     bootstrap.group(bossGroup, workerGroup)
 234  0
         .channel(NioServerSocketChannel.class)
 235  0
         .option(ChannelOption.SO_BACKLOG, tcpBacklog)
 236  0
         .option(ChannelOption.ALLOCATOR, conf.getNettyAllocator())
 237  0
         .childOption(ChannelOption.SO_KEEPALIVE, true)
 238  0
         .childOption(ChannelOption.TCP_NODELAY, true)
 239  0
         .childOption(ChannelOption.SO_SNDBUF, sendBufferSize)
 240  0
         .childOption(ChannelOption.SO_RCVBUF, receiveBufferSize)
 241  0
         .childOption(ChannelOption.ALLOCATOR, conf.getNettyAllocator())
 242  0
         .childOption(ChannelOption.RCVBUF_ALLOCATOR,
 243  
             new AdaptiveRecvByteBufAllocator(receiveBufferSize / 4,
 244  
                 receiveBufferSize, receiveBufferSize));
 245  
 
 246  
     /**
 247  
      * Pipeline setup: depends on whether configured to use authentication
 248  
      * or not.
 249  
      */
 250  0
     bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
 251  
       @Override
 252  
       protected void initChannel(SocketChannel ch) throws Exception {
 253  
 /*if_not[HADOOP_NON_SECURE]*/
 254  0
         if (conf.authenticate()) {
 255  0
           LOG.info("start: Will use Netty pipeline with " +
 256  
               "authentication and authorization of clients.");
 257  
           // After a client authenticates, the two authentication-specific
 258  
           // pipeline components SaslServerHandler and ResponseEncoder are
 259  
           // removed, leaving the pipeline the same as in the non-authenticated
 260  
           // configuration except for the presence of the Authorize component.
 261  0
           PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
 262  
             new FlushConsolidationHandler(FlushConsolidationHandler
 263  
               .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
 264  0
             handlerToUseExecutionGroup, executionGroup, ch);
 265  0
           PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
 266  0
               inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
 267  0
           if (conf.doCompression()) {
 268  0
             PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
 269  0
                 conf.getNettyCompressionDecoder(),
 270  0
                 handlerToUseExecutionGroup, executionGroup, ch);
 271  
           }
 272  0
           PipelineUtils.addLastWithExecutorCheck("serverOutboundByteCounter",
 273  0
               outByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
 274  0
           if (conf.doCompression()) {
 275  0
             PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
 276  0
                 conf.getNettyCompressionEncoder(),
 277  0
                 handlerToUseExecutionGroup, executionGroup, ch);
 278  
           }
 279  0
           PipelineUtils.addLastWithExecutorCheck("requestFrameDecoder",
 280  
               new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
 281  0
               handlerToUseExecutionGroup, executionGroup, ch);
 282  0
           PipelineUtils.addLastWithExecutorCheck("requestDecoder",
 283  0
               new RequestDecoder(conf, inByteCounter),
 284  0
               handlerToUseExecutionGroup, executionGroup, ch);
 285  
           // Removed after authentication completes:
 286  0
           PipelineUtils.addLastWithExecutorCheck("saslServerHandler",
 287  0
               saslServerHandlerFactory.newHandler(conf),
 288  0
               handlerToUseExecutionGroup, executionGroup, ch);
 289  0
           PipelineUtils.addLastWithExecutorCheck("authorizeServerHandler",
 290  0
               new AuthorizeServerHandler(), handlerToUseExecutionGroup,
 291  0
               executionGroup, ch);
 292  0
           PipelineUtils.addLastWithExecutorCheck("requestServerHandler",
 293  0
               requestServerHandlerFactory.newHandler(workerRequestReservedMap,
 294  0
                   conf, myTaskInfo, exceptionHandler),
 295  0
               handlerToUseExecutionGroup, executionGroup, ch);
 296  
           // Removed after authentication completes:
 297  0
           PipelineUtils.addLastWithExecutorCheck("responseEncoder",
 298  0
               new ResponseEncoder(), handlerToUseExecutionGroup,
 299  0
               executionGroup, ch);
 300  
         } else {
 301  0
           LOG.info("start: Using Netty without authentication.");
 302  
 /*end[HADOOP_NON_SECURE]*/
 303  
           // Store all connected channels in order to ensure that we can close
 304  
           // them on stop(), or else stop() may hang waiting for the
 305  
           // connections to close on their own
 306  0
           ch.pipeline().addLast("connectedChannels",
 307  0
               new ChannelInboundHandlerAdapter() {
 308  
                 @Override
 309  
                 public void channelActive(ChannelHandlerContext ctx)
 310  
                   throws Exception {
 311  0
                   accepted.add(ctx.channel());
 312  0
                   ctx.fireChannelActive();
 313  0
                 }
 314  
               });
 315  0
           PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
 316  
             new FlushConsolidationHandler(FlushConsolidationHandler
 317  
               .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
 318  0
             handlerToUseExecutionGroup, executionGroup, ch);
 319  0
           PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
 320  0
               inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
 321  0
           if (conf.doCompression()) {
 322  0
             PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
 323  0
                 conf.getNettyCompressionDecoder(),
 324  0
                 handlerToUseExecutionGroup, executionGroup, ch);
 325  
           }
 326  0
           PipelineUtils.addLastWithExecutorCheck("serverOutboundByteCounter",
 327  0
               outByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
 328  0
           if (conf.doCompression()) {
 329  0
             PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
 330  0
                 conf.getNettyCompressionEncoder(),
 331  0
                 handlerToUseExecutionGroup, executionGroup, ch);
 332  
           }
 333  0
           PipelineUtils.addLastWithExecutorCheck("requestFrameDecoder",
 334  
               new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
 335  0
               handlerToUseExecutionGroup, executionGroup, ch);
 336  0
           PipelineUtils.addLastWithExecutorCheck("requestDecoder",
 337  0
               new RequestDecoder(conf, inByteCounter),
 338  0
               handlerToUseExecutionGroup, executionGroup, ch);
 339  0
           PipelineUtils.addLastWithExecutorCheck("requestServerHandler",
 340  0
               requestServerHandlerFactory.newHandler(
 341  0
                   workerRequestReservedMap, conf, myTaskInfo, exceptionHandler),
 342  0
               handlerToUseExecutionGroup, executionGroup, ch);
 343  
 /*if_not[HADOOP_NON_SECURE]*/
 344  
         }
 345  
 /*end[HADOOP_NON_SECURE]*/
 346  0
       }
 347  
     });
 348  
 
 349  0
     int taskId = conf.getTaskPartition();
 350  0
     int numTasks = conf.getInt("mapred.map.tasks", 1);
 351  
     // Number of workers + 1 for master
 352  0
     int numServers = conf.getInt(GiraphConstants.MAX_WORKERS, numTasks) + 1;
 353  0
     int portIncrementConstant =
 354  0
         (int) Math.pow(10, Math.ceil(Math.log10(numServers)));
 355  0
     int bindPort = GiraphConstants.IPC_INITIAL_PORT.get(conf) + taskId;
 356  0
     int bindAttempts = 0;
 357  0
     final int maxIpcPortBindAttempts = MAX_IPC_PORT_BIND_ATTEMPTS.get(conf);
 358  0
     final boolean failFirstPortBindingAttempt =
 359  0
         GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT.get(conf);
 360  
 
 361  
     // Simple handling of port collisions on the same machine while
 362  
     // preserving debugability from the port number alone.
 363  
     // Round up the max number of workers to the next power of 10 and use
 364  
     // it as a constant to increase the port number with.
 365  0
     while (bindAttempts < maxIpcPortBindAttempts) {
 366  0
       this.myAddress = new InetSocketAddress(localHostOrIp, bindPort);
 367  0
       if (failFirstPortBindingAttempt && bindAttempts == 0) {
 368  0
         if (LOG.isInfoEnabled()) {
 369  0
           LOG.info("start: Intentionally fail first " +
 370  
               "binding attempt as giraph.failFirstIpcPortBindAttempt " +
 371  
               "is true, port " + bindPort);
 372  
         }
 373  0
         ++bindAttempts;
 374  0
         bindPort += portIncrementConstant;
 375  0
         continue;
 376  
       }
 377  
 
 378  
       try {
 379  0
         ChannelFuture f = bootstrap.bind(myAddress).sync();
 380  0
         accepted.add(f.channel());
 381  0
         break;
 382  0
       } catch (InterruptedException e) {
 383  0
         throw new IllegalStateException(e);
 384  
         // CHECKSTYLE: stop IllegalCatchCheck
 385  0
       } catch (Exception e) {
 386  
         // CHECKSTYLE: resume IllegalCatchCheck
 387  0
         LOG.warn("start: Likely failed to bind on attempt " +
 388  0
             bindAttempts + " to port " + bindPort, e.getCause());
 389  0
         ++bindAttempts;
 390  0
         bindPort += portIncrementConstant;
 391  0
       }
 392  
     }
 393  0
     if (bindAttempts == maxIpcPortBindAttempts || myAddress == null) {
 394  0
       throw new IllegalStateException(
 395  
           "start: Failed to start NettyServer with " +
 396  
               bindAttempts + " attempts");
 397  
     }
 398  
 
 399  0
     if (LOG.isInfoEnabled()) {
 400  0
       LOG.info("start: Started server " +
 401  
           "communication server: " + myAddress + " with up to " +
 402  
           maxPoolSize + " threads on bind attempt " + bindAttempts +
 403  
           " with sendBufferSize = " + sendBufferSize +
 404  
           " receiveBufferSize = " + receiveBufferSize);
 405  
     }
 406  0
   }
 407  
 
 408  
   /**
 409  
    * Stop the server.
 410  
    */
 411  
   public void stop() {
 412  0
     if (LOG.isInfoEnabled()) {
 413  0
       LOG.info("stop: Halting netty server");
 414  
     }
 415  0
     ProgressableUtils.awaitChannelGroupFuture(accepted.close(), progressable);
 416  0
     if (LOG.isInfoEnabled()) {
 417  0
       LOG.info("stop: Start releasing resources");
 418  
     }
 419  0
     bossGroup.shutdownGracefully();
 420  0
     workerGroup.shutdownGracefully();
 421  0
     ProgressableUtils.awaitTerminationFuture(bossGroup, progressable);
 422  0
     ProgressableUtils.awaitTerminationFuture(workerGroup, progressable);
 423  0
     if (useExecutionGroup) {
 424  0
       executionGroup.shutdownGracefully();
 425  0
       ProgressableUtils.awaitTerminationFuture(executionGroup, progressable);
 426  
     }
 427  0
     if (LOG.isInfoEnabled()) {
 428  0
       LOG.info("stop: Netty server halted");
 429  
     }
 430  0
   }
 431  
 
 432  
   public InetSocketAddress getMyAddress() {
 433  0
     return myAddress;
 434  
   }
 435  
 
 436  
   public String getLocalHostOrIp() {
 437  0
     return localHostOrIp;
 438  
   }
 439  
 
 440  
   /**
 441  
    * Inform the server about the flow control policy used in sending requests
 442  
    *
 443  
    * @param flowControl reference to the flow control used
 444  
    */
 445  
   public void setFlowControl(FlowControl flowControl) {
 446  0
     checkState(requestServerHandlerFactory != null);
 447  0
     requestServerHandlerFactory.setFlowControl(flowControl);
 448  0
   }
 449  
 }
 450