Coverage Report - org.apache.giraph.comm.SendVertexIdDataCache
 
Classes in this File Line Coverage Branch Coverage Complexity
SendVertexIdDataCache
0%
0/21
0%
0/2
1.2
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm;
 20  
 
 21  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 22  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 23  
 import org.apache.giraph.utils.VertexIdData;
 24  
 import org.apache.giraph.worker.WorkerInfo;
 25  
 import org.apache.hadoop.io.WritableComparable;
 26  
 
 27  
 import javax.annotation.concurrent.NotThreadSafe;
 28  
 
 29  
 /**
 30  
  * An abstract structure for caching data indexed by vertex id,
 31  
  * to be sent to workers in bulk. Not thread-safe.
 32  
  *
 33  
  * @param <I> Vertex id
 34  
  * @param <T> Data
 35  
  * @param <B> Specialization of {@link VertexIdData} for T
 36  
  */
 37  
 @NotThreadSafe
 38  
 @SuppressWarnings("unchecked")
 39  
 public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
 40  
     B extends VertexIdData<I, T>> extends SendDataCache<B> {
 41  
   /**
 42  
    * Constructor.
 43  
    *
 44  
    * @param conf Giraph configuration
 45  
    * @param serviceWorker Service worker
 46  
    * @param maxRequestSize Maximum request size (in bytes)
 47  
    * @param additionalRequestSize Additional request size (expressed as a
 48  
    *                              ratio of the average request size)
 49  
    */
 50  
   public SendVertexIdDataCache(ImmutableClassesGiraphConfiguration conf,
 51  
                                CentralizedServiceWorker<?, ?, ?> serviceWorker,
 52  
                                int maxRequestSize,
 53  
                                float additionalRequestSize) {
 54  0
     super(conf, serviceWorker, maxRequestSize, additionalRequestSize);
 55  0
   }
 56  
 
 57  
   /**
 58  
    * Create a new {@link VertexIdData} specialized for the use case.
 59  
    *
 60  
    * @return A new instance of {@link VertexIdData}
 61  
    */
 62  
   public abstract B createVertexIdData();
 63  
 
 64  
   /**
 65  
    * Add data for a destination vertex to the cache.
 66  
    *
 67  
    * @param workerInfo The remote worker destination
 68  
    * @param partitionId The remote partition this data belongs to
 69  
    * @param destVertexId Vertex id that is the ultimate destination
 70  
    * @param data Data to send to remote worker
 71  
    * @return Result of incrDataSize (presumably the worker's cached size)
 72  
    */
 73  
   public int addData(WorkerInfo workerInfo,
 74  
                      int partitionId, I destVertexId, T data) {
 75  
     // Get (or lazily create) the data collection for this partition
 76  0
     VertexIdData<I, T> partitionData =
 77  0
         getPartitionData(workerInfo, partitionId);
 78  0
     int originalSize = partitionData.getSize();
 79  0
     partitionData.add(destVertexId, data);
 80  
     // Account only for the bytes this add contributed (size delta)
 81  0
     return incrDataSize(workerInfo.getTaskId(),
 82  0
         partitionData.getSize() - originalSize);
 83  
   }
 84  
 
 85  
   /**
 86  
    * Add data to the cache, like the vertex-id variant of addData,
 87  
    * but with the destination vertex id supplied already serialized
 88  
    * in a byte array.
 89  
    *
 90  
    * @param workerInfo The remote worker destination
 91  
    * @param partitionId The remote Partition this message belongs to
 92  
    * @param serializedId Byte array holding the serialized target vertex id
 93  
    * @param idPos Number of bytes of the serialized id within serializedId
 94  
    * @param data Data to send to remote worker
 95  
    * @return Result of incrDataSize (presumably the worker's cached size)
 96  
    */
 97  
   public int addData(WorkerInfo workerInfo, int partitionId,
 98  
                      byte[] serializedId, int idPos, T data) {
 99  
     // Get (or lazily create) the data collection for this partition
 100  0
     VertexIdData<I, T> partitionData =
 101  0
         getPartitionData(workerInfo, partitionId);
 102  0
     int originalSize = partitionData.getSize();
 103  0
     partitionData.add(serializedId, idPos, data);
 104  
     // Account only for the bytes this add contributed (size delta)
 105  0
     return incrDataSize(workerInfo.getTaskId(),
 106  0
         partitionData.getSize() - originalSize);
 107  
   }
 108  
 
 109  
   /**
 110  
    * Get the partition data from the data cache. If none is cached, a new
 111  
    * one is created, configured, initialized and stored in the cache.
 112  
    *
 113  
    * @param workerInfo The remote worker destination
 114  
    * @param partitionId The remote Partition this message belongs to
 115  
    * @return The partition data held in the data cache (never null)
 116  
    */
 117  
   private VertexIdData<I, T> getPartitionData(WorkerInfo workerInfo,
 118  
                                                        int partitionId) {
 119  
     // Look up the cached data; create it lazily on first use
 120  0
     B partitionData = getData(partitionId);
 121  0
     if (partitionData == null) {
 122  0
       partitionData = createVertexIdData();
 123  0
       partitionData.setConf(getConf());
 124  0
       partitionData.initialize(getInitialBufferSize(workerInfo.getTaskId()));
 125  0
       setData(partitionId, partitionData);
 126  
     }
 127  
 
 128  0
     return partitionData;
 129  
   }
 130  
 }