Coverage Report - org.apache.giraph.comm.requests.SendWorkerVerticesRequest
 
Classes in this File Line Coverage Branch Coverage Complexity
SendWorkerVerticesRequest
0%
0/40
0%
0/8
1.571
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.requests;
 20  
 
 21  
 import org.apache.giraph.comm.ServerData;
 22  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 23  
 import org.apache.giraph.utils.ExtendedDataOutput;
 24  
 import org.apache.giraph.utils.PairList;
 25  
 import org.apache.giraph.utils.WritableUtils;
 26  
 import org.apache.hadoop.io.Writable;
 27  
 import org.apache.hadoop.io.WritableComparable;
 28  
 import org.apache.log4j.Logger;
 29  
 
 30  
 import java.io.DataInput;
 31  
 import java.io.DataOutput;
 32  
 import java.io.IOException;
 33  
 
 34  
 /**
 35  
  * Send to a worker one or more partitions of vertices
 36  
  *
 37  
  * @param <I> Vertex id
 38  
  * @param <V> Vertex data
 39  
  * @param <E> Edge data
 40  
  */
 41  
 @SuppressWarnings("rawtypes")
 42  
 public class SendWorkerVerticesRequest<I extends WritableComparable,
 43  
     V extends Writable, E extends Writable> extends
 44  
     WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
 45  
   /** Class logger */
 46  0
   private static final Logger LOG =
 47  0
       Logger.getLogger(SendWorkerVerticesRequest.class);
 48  
   /** Worker partitions to be sent */
 49  
   private PairList<Integer, ExtendedDataOutput> workerPartitions;
 50  
 
 51  
   /**
 52  
    * Constructor used for reflection only
 53  
    */
 54  0
   public SendWorkerVerticesRequest() { }
 55  
 
 56  
   /**
 57  
    * Constructor for sending a request.
 58  
    *
 59  
    * @param conf Configuration
 60  
    * @param workerPartitions Partitions to be send in this request
 61  
    */
 62  
   public SendWorkerVerticesRequest(
 63  
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
 64  0
       PairList<Integer, ExtendedDataOutput> workerPartitions) {
 65  0
     this.workerPartitions = workerPartitions;
 66  0
     setConf(conf);
 67  0
   }
 68  
 
 69  
   @Override
 70  
   public void readFieldsRequest(DataInput input) throws IOException {
 71  0
     int numPartitions = input.readInt();
 72  0
     workerPartitions = new PairList<Integer, ExtendedDataOutput>();
 73  0
     workerPartitions.initialize(numPartitions);
 74  0
     while (numPartitions-- > 0) {
 75  0
       final int partitionId = input.readInt();
 76  0
       ExtendedDataOutput partitionData =
 77  0
           WritableUtils.readExtendedDataOutput(input, getConf());
 78  0
       workerPartitions.add(partitionId, partitionData);
 79  0
     }
 80  0
   }
 81  
 
 82  
   @Override
 83  
   public void writeRequest(DataOutput output) throws IOException {
 84  0
     output.writeInt(workerPartitions.getSize());
 85  
     PairList<Integer, ExtendedDataOutput>.Iterator
 86  0
         iterator = workerPartitions.getIterator();
 87  0
     while (iterator.hasNext()) {
 88  0
       iterator.next();
 89  0
       output.writeInt(iterator.getCurrentFirst());
 90  0
       WritableUtils.writeExtendedDataOutput(
 91  0
           iterator.getCurrentSecond(), output);
 92  
     }
 93  0
   }
 94  
 
 95  
   @Override
 96  
   public RequestType getType() {
 97  0
     return RequestType.SEND_WORKER_VERTICES_REQUEST;
 98  
   }
 99  
 
 100  
   @Override
 101  
   public void doRequest(ServerData<I, V, E> serverData) {
 102  
     PairList<Integer, ExtendedDataOutput>.Iterator
 103  0
         iterator = workerPartitions.getIterator();
 104  0
     while (iterator.hasNext()) {
 105  0
       iterator.next();
 106  0
       serverData.getPartitionStore()
 107  0
           .addPartitionVertices(iterator.getCurrentFirst(),
 108  0
               iterator.getCurrentSecond());
 109  
     }
 110  0
   }
 111  
 
 112  
   @Override
 113  
   public int getSerializedSize() {
 114  
     // 4 for number of partitions
 115  0
     int size = super.getSerializedSize() + 4;
 116  0
     PairList<Integer, ExtendedDataOutput>.Iterator iterator =
 117  0
         workerPartitions.getIterator();
 118  0
     while (iterator.hasNext()) {
 119  0
       iterator.next();
 120  
       // 4 bytes for the partition id and 4 bytes for the size
 121  0
       size += 8 + iterator.getCurrentSecond().getPos();
 122  
     }
 123  0
     return size;
 124  
   }
 125  
 }
 126