Coverage Report - org.apache.giraph.comm.netty.handler.RequestServerHandler
 
Classes in this File Line Coverage Branch Coverage Complexity
RequestServerHandler
0%
0/60
0%
0/16
2
RequestServerHandler$Factory
N/A
N/A
2
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.netty.handler;
 20  
 
 21  
 import org.apache.giraph.comm.flow_control.FlowControl;
 22  
 import org.apache.giraph.comm.requests.WritableRequest;
 23  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 24  
 import org.apache.giraph.graph.TaskInfo;
 25  
 import org.apache.giraph.time.SystemTime;
 26  
 import org.apache.giraph.time.Time;
 27  
 import org.apache.giraph.time.Times;
 28  
 import org.apache.log4j.Logger;
 29  
 
 30  
 import io.netty.buffer.ByteBuf;
 31  
 import io.netty.channel.ChannelHandlerContext;
 32  
 import io.netty.channel.ChannelInboundHandlerAdapter;
 33  
 
 34  
 import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
 35  
 
 36  
 /**
 37  
  * Generic handler of requests.
 38  
  *
 39  
  * @param <R> Request type
 40  
  */
 41  
 public abstract class RequestServerHandler<R> extends
 42  
   ChannelInboundHandlerAdapter {
 43  
   /** Number of bytes in the encoded response */
 44  
   public static final int RESPONSE_BYTES = 16;
 45  
   /** Time class to use */
 46  0
   private static Time TIME = SystemTime.get();
 47  
   /** Class logger */
 48  0
   private static final Logger LOG =
 49  0
       Logger.getLogger(RequestServerHandler.class);
 50  
   /** Already closed first request? */
 51  0
   private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
 52  
   /** Flow control used in sending requests */
 53  
   protected FlowControl flowControl;
 54  
   /** Close connection on first request (used for simulating failure) */
 55  
   private final boolean closeFirstRequest;
 56  
   /** Request reserved map (for exactly one semantics) */
 57  
   private final WorkerRequestReservedMap workerRequestReservedMap;
 58  
   /** My task info */
 59  
   private final TaskInfo myTaskInfo;
 60  
   /** Start nanoseconds for the processing time */
 61  0
   private long startProcessingNanoseconds = -1;
 62  
   /** Handler for uncaught exceptions */
 63  
   private final Thread.UncaughtExceptionHandler exceptionHandler;
 64  
 
 65  
   /**
 66  
    * Constructor
 67  
    *
 68  
    * @param workerRequestReservedMap Worker request reservation map
 69  
    * @param conf Configuration
 70  
    * @param myTaskInfo Current task info
 71  
    * @param exceptionHandler Handles uncaught exceptions
 72  
    */
 73  
   public RequestServerHandler(
 74  
       WorkerRequestReservedMap workerRequestReservedMap,
 75  
       ImmutableClassesGiraphConfiguration conf,
 76  
       TaskInfo myTaskInfo,
 77  0
       Thread.UncaughtExceptionHandler exceptionHandler) {
 78  0
     this.workerRequestReservedMap = workerRequestReservedMap;
 79  0
     closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
 80  0
     this.myTaskInfo = myTaskInfo;
 81  0
     this.exceptionHandler = exceptionHandler;
 82  0
   }
 83  
 
 84  
   @Override
 85  
   public void channelRead(ChannelHandlerContext ctx, Object msg)
 86  
     throws Exception {
 87  0
     if (LOG.isTraceEnabled()) {
 88  0
       LOG.trace("messageReceived: Got " + msg.getClass());
 89  
     }
 90  
 
 91  0
     WritableRequest request = (WritableRequest) msg;
 92  
 
 93  
     // Simulate a closed connection on the first request (if desired)
 94  0
     if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
 95  0
       LOG.info("messageReceived: Simulating closing channel on first " +
 96  0
           "request " + request.getRequestId() + " from " +
 97  0
           request.getClientId());
 98  0
       setAlreadyClosedFirstRequest();
 99  0
       ctx.close();
 100  0
       return;
 101  
     }
 102  
 
 103  
     // Only execute this request exactly once
 104  0
     AckSignalFlag alreadyDone = AckSignalFlag.DUPLICATE_REQUEST;
 105  0
     if (workerRequestReservedMap.reserveRequest(
 106  0
         request.getClientId(),
 107  0
         request.getRequestId())) {
 108  0
       if (LOG.isDebugEnabled()) {
 109  0
         startProcessingNanoseconds = TIME.getNanoseconds();
 110  
       }
 111  0
       processRequest((R) request);
 112  0
       if (LOG.isDebugEnabled()) {
 113  0
         LOG.debug("messageReceived: Processing client " +
 114  0
             request.getClientId() + ", " +
 115  0
             "requestId " + request.getRequestId() +
 116  0
             ", " +  request.getType() + " took " +
 117  0
             Times.getNanosSince(TIME, startProcessingNanoseconds) + " ns");
 118  
       }
 119  0
       alreadyDone = AckSignalFlag.NEW_REQUEST;
 120  
     } else {
 121  0
       LOG.info("messageReceived: Request id " +
 122  0
           request.getRequestId() + " from client " +
 123  0
           request.getClientId() +
 124  
           " was already processed, " +
 125  
           "not processing again.");
 126  
     }
 127  
 
 128  
     // Send the response with the request id
 129  0
     ByteBuf buffer = ctx.alloc().buffer(RESPONSE_BYTES);
 130  0
     buffer.writeInt(myTaskInfo.getTaskId());
 131  0
     buffer.writeLong(request.getRequestId());
 132  0
     int signal =
 133  0
         flowControl.calculateResponse(alreadyDone, request.getClientId());
 134  0
     buffer.writeInt(signal);
 135  0
     ctx.write(buffer);
 136  0
   }
 137  
 
 138  
   /**
 139  
    * Set the flag indicating already closed first request
 140  
    */
 141  
   private static void setAlreadyClosedFirstRequest() {
 142  0
     ALREADY_CLOSED_FIRST_REQUEST = true;
 143  0
   }
 144  
 
 145  
   /**
 146  
    * Process request
 147  
    *
 148  
    * @param request Request to process
 149  
    */
 150  
   public abstract void processRequest(R request);
 151  
 
 152  
   @Override
 153  
   public void channelActive(ChannelHandlerContext ctx) throws Exception {
 154  0
     if (LOG.isDebugEnabled()) {
 155  0
       LOG.debug("channelActive: Connected the channel on " +
 156  0
           ctx.channel().remoteAddress());
 157  
     }
 158  0
     ctx.fireChannelActive();
 159  0
   }
 160  
 
 161  
   @Override
 162  
   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
 163  0
     if (LOG.isDebugEnabled()) {
 164  0
       LOG.debug("channelInactive: Closed the channel on " +
 165  0
           ctx.channel().remoteAddress());
 166  
     }
 167  0
     ctx.fireChannelInactive();
 168  0
   }
 169  
 
 170  
   @Override
 171  
   public void exceptionCaught(
 172  
       ChannelHandlerContext ctx, Throwable cause) throws Exception {
 173  0
     exceptionHandler.uncaughtException(Thread.currentThread(), cause);
 174  0
   }
 175  
 
 176  
   /**
 177  
    * Factory for {@link RequestServerHandler}
 178  
    */
 179  
   public interface Factory {
 180  
     /**
 181  
      * Create new {@link RequestServerHandler}
 182  
      *
 183  
      * @param workerRequestReservedMap Worker request reservation map
 184  
      * @param conf Configuration to use
 185  
      * @param myTaskInfo Current task info
 186  
      * @param exceptionHandler Handles uncaught exceptions
 187  
      * @return New {@link RequestServerHandler}
 188  
      */
 189  
     RequestServerHandler newHandler(
 190  
         WorkerRequestReservedMap workerRequestReservedMap,
 191  
         ImmutableClassesGiraphConfiguration conf,
 192  
         TaskInfo myTaskInfo,
 193  
         Thread.UncaughtExceptionHandler exceptionHandler);
 194  
 
 195  
     /**
 196  
      * Inform the factory about the flow control policy used (this method should
 197  
      * be called before any call to `#newHandle()`)
 198  
      *
 199  
      * @param flowControl reference to flow control used
 200  
      */
 201  
     void setFlowControl(FlowControl flowControl);
 202  
   }
 203  
 }