Coverage Report - org.apache.giraph.comm.netty.handler.RequestEncoder
 
Classes in this File Line Coverage Branch Coverage Complexity
RequestEncoder
0%
0/38
0%
0/8
4.5
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.netty.handler;
 20  
 
 21  
 import io.netty.buffer.ByteBufOutputStream;
 22  
 import org.apache.giraph.comm.requests.WritableRequest;
 23  
 import org.apache.giraph.conf.GiraphConfiguration;
 24  
 import org.apache.giraph.conf.GiraphConstants;
 25  
 import org.apache.giraph.time.SystemTime;
 26  
 import org.apache.giraph.time.Time;
 27  
 import org.apache.giraph.time.Times;
 28  
 import org.apache.log4j.Logger;
 29  
 
 30  
 import io.netty.buffer.ByteBuf;
 31  
 import io.netty.channel.ChannelHandlerContext;
 32  
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 33  
 import io.netty.channel.ChannelPromise;
 34  
 
 35  
 import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
 36  
 import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
 37  
 
 38  
 /**
 39  
  * Requests have a request type and an encoded request.
 40  
  */
 41  
 public class RequestEncoder extends ChannelOutboundHandlerAdapter {
 42  
   /** Class logger */
 43  0
   private static final Logger LOG = Logger.getLogger(RequestEncoder.class);
 44  
   /** Time class to use */
 45  0
   private static final Time TIME = SystemTime.get();
 46  
   /** Buffer starting size */
 47  
   private final int bufferStartingSize;
 48  
   /** Start nanoseconds for the encoding time */
 49  0
   private long startEncodingNanoseconds = -1;
 50  
 
 51  
   /**
 52  
    * Constructor.
 53  
    *
 54  
    * @param conf Giraph configuration
 55  
    */
 56  0
   public RequestEncoder(GiraphConfiguration conf) {
 57  0
     bufferStartingSize =
 58  0
         GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE.get(conf);
 59  0
   }
 60  
 
 61  
   @Override
 62  
   public void write(ChannelHandlerContext ctx, Object msg,
 63  
     ChannelPromise promise) throws Exception {
 64  0
     if (!(msg instanceof WritableRequest)) {
 65  0
       throw new IllegalArgumentException(
 66  0
           "encode: Got a message of type " + msg.getClass());
 67  
     }
 68  
 
 69  
     // Encode the request
 70  0
     if (LOG.isDebugEnabled()) {
 71  0
       startEncodingNanoseconds = TIME.getNanoseconds();
 72  
     }
 73  
 
 74  
     ByteBuf buf;
 75  0
     WritableRequest request = (WritableRequest) msg;
 76  0
     int requestSize = request.getSerializedSize();
 77  0
     if (requestSize == WritableRequest.UNKNOWN_SIZE) {
 78  0
       buf = ctx.alloc().buffer(bufferStartingSize);
 79  
     } else {
 80  0
       requestSize +=  SIZE_OF_INT + SIZE_OF_BYTE;
 81  0
       buf = ctx.alloc().buffer(requestSize);
 82  
     }
 83  0
     ByteBufOutputStream output = new ByteBufOutputStream(buf);
 84  
 
 85  
     // This will later be filled with the correct size of serialized request
 86  0
     output.writeInt(0);
 87  0
     output.writeByte(request.getType().ordinal());
 88  
     try {
 89  0
       request.write(output);
 90  0
     } catch (IndexOutOfBoundsException e) {
 91  0
       LOG.error("write: Most likely the size of request was not properly " +
 92  
           "specified (this buffer is too small) - see getSerializedSize() " +
 93  0
           "in " + request.getType().getRequestClass());
 94  0
       throw new IllegalStateException(e);
 95  0
     }
 96  0
     output.flush();
 97  0
     output.close();
 98  
 
 99  
     // Set the correct size at the end
 100  0
     buf.setInt(0, buf.writerIndex() - SIZE_OF_INT);
 101  0
     if (LOG.isDebugEnabled()) {
 102  0
       LOG.debug("write: Client " + request.getClientId() + ", " +
 103  0
           "requestId " + request.getRequestId() +
 104  0
           ", size = " + buf.readableBytes() + ", " +
 105  0
           request.getType() + " took " +
 106  0
           Times.getNanosSince(TIME, startEncodingNanoseconds) + " ns");
 107  
     }
 108  0
     ctx.write(buf, promise);
 109  0
   }
 110  
 }