Coverage Report - org.apache.giraph.comm.SendPartitionCache
 
Classes in this File Line Coverage Branch Coverage Complexity
SendPartitionCache
0%
0/20
0%
0/2
2.5
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.comm;
 19  
 
 20  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 21  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 22  
 import org.apache.giraph.graph.Vertex;
 23  
 import org.apache.giraph.partition.PartitionOwner;
 24  
 import org.apache.giraph.utils.ExtendedDataOutput;
 25  
 import org.apache.giraph.utils.WritableUtils;
 26  
 import org.apache.hadoop.io.Writable;
 27  
 import org.apache.hadoop.io.WritableComparable;
 28  
 import org.apache.log4j.Logger;
 29  
 
 30  
 import java.io.IOException;
 31  
 
 32  
 import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_VERTEX_REQUEST_SIZE;
 33  
 import static org.apache.giraph.conf.GiraphConstants.MAX_VERTEX_REQUEST_SIZE;
 34  
 
 35  
 /**
 36  
  * Caches serialized vertices, one buffer per partition, prior to sending.
 37  
  * Aggregating them makes larger, more efficient requests.  Not thread-safe.
 38  
  *
 39  
  * @param <I> Vertex index value
 40  
  * @param <V> Vertex value
 41  
  * @param <E> Edge value
 42  
  */
 43  
 public class SendPartitionCache<I extends WritableComparable,
 44  
     V extends Writable, E extends Writable> extends
 45  
     SendDataCache<ExtendedDataOutput> {
 46  
   /** Class logger (not referenced by the code in this class) */
 47  0
   private static final Logger LOG =
 48  0
       Logger.getLogger(SendPartitionCache.class);
 49  
 
 50  
   /**
 51  
    * Constructor.  Reads MAX_VERTEX_REQUEST_SIZE and
 52  
    * ADDITIONAL_VERTEX_REQUEST_SIZE from conf and passes them to the superclass.
 53  
    * @param conf Giraph configuration
 54  
    * @param serviceWorker Service worker
 55  
    */
 56  
   public SendPartitionCache(ImmutableClassesGiraphConfiguration<I, V, E> conf,
 57  
                             CentralizedServiceWorker<?, ?, ?> serviceWorker) {
 58  0
     super(conf, serviceWorker, MAX_VERTEX_REQUEST_SIZE.get(conf),
 59  0
         ADDITIONAL_VERTEX_REQUEST_SIZE.get(conf));
 60  0
   }
 61  
 
 62  
   /**
 63  
    * Add a vertex to the cache by serializing it into its partition's buffer,
 64  
    * created on first use.  An IOException becomes an IllegalStateException.
 65  
    * @param partitionOwner Partition owner of the vertex
 66  
    * @param vertex Vertex to add
 67  
    * @return Size of cached data for this worker, as returned by incrDataSize
 68  
    */
 69  
   public int addVertex(PartitionOwner partitionOwner,
 70  
       Vertex<I, V, E> vertex) {
 71  
     // Fetch the buffer cached for this partition; null is handled below
 72  0
     ExtendedDataOutput partitionData =
 73  0
         getData(partitionOwner.getPartitionId());
 74  0
     int taskId = partitionOwner.getWorkerInfo().getTaskId();
 75  0
     int originalSize = 0; // stays 0 when the buffer is newly created
 76  0
     if (partitionData == null) {
 77  0
       partitionData = getConf().createExtendedDataOutput(
 78  0
           getInitialBufferSize(taskId));
 79  0
       setData(partitionOwner.getPartitionId(), partitionData);
 80  
     } else {
 81  0
       originalSize = partitionData.getPos();
 82  
     }
 83  
     try {
 84  0
       WritableUtils.<I, V, E>writeVertexToDataOutput(
 85  0
           partitionData, vertex, getConf());
 86  0
     } catch (IOException e) {
 87  0
       throw new IllegalStateException("addVertex: Failed to serialize", e);
 88  0
     }
 89  
 
 90  
     // Add the buffer growth from this write to the worker's cached data size
 91  0
     return incrDataSize(taskId, partitionData.getPos() - originalSize);
 92  
   }
 93  
 }
 94