Coverage Report - org.apache.giraph.comm.SendDataCache
 
Classes in this File Line Coverage Branch Coverage Complexity
SendDataCache
0%
0/59
0%
0/16
1.667
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm;
 20  
 
 21  
 import com.google.common.collect.Lists;
 22  
 import com.google.common.collect.Maps;
 23  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 24  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 25  
 import org.apache.giraph.partition.PartitionOwner;
 26  
 import org.apache.giraph.utils.PairList;
 27  
 import org.apache.giraph.worker.WorkerInfo;
 28  
 
 29  
 import javax.annotation.concurrent.NotThreadSafe;
 30  
 import java.util.List;
 31  
 import java.util.Map;
 32  
 
 33  
 /**
 34  
  * An abstract structure for caching data by partitions
 35  
  * to be sent to workers in bulk. Not thread-safe.
 36  
  *
 37  
  * @param <D> Data type of partition cache
 38  
  */
 39  
 @NotThreadSafe
 40  
 @SuppressWarnings("unchecked")
 41  
 public abstract class SendDataCache<D> {
 42  
   /**
 43  
    * Internal cache of partitions (index) to their partition caches of
 44  
    * type D.
 45  
    */
 46  
   private final D[] dataCache;
 47  
   /** How big to initially make output streams for each worker's partitions */
 48  
   private final int[] initialBufferSizes;
 49  
   /** Service worker */
 50  
   private final CentralizedServiceWorker serviceWorker;
 51  
   /** Size of data (in bytes) for each worker */
 52  
   private final int[] dataSizes;
 53  
   /** Total number of workers */
 54  
   private final int numWorkers;
 55  
   /** List of partition ids belonging to a worker */
 56  0
   private final Map<WorkerInfo, List<Integer>> workerPartitions =
 57  0
       Maps.newHashMap();
 58  
   /** Giraph configuration */
 59  
   private final ImmutableClassesGiraphConfiguration conf;
 60  
 
 61  
   /**
 62  
    * Constructor.
 63  
    *
 64  
    * @param conf Giraph configuration
 65  
    * @param serviceWorker Service worker
 66  
    * @param maxRequestSize Maximum request size (in bytes)
 67  
    * @param additionalRequestSize Additional request size (expressed as a
 68  
    *                              ratio of the average request size)
 69  
    */
 70  
   public SendDataCache(ImmutableClassesGiraphConfiguration conf,
 71  
                        CentralizedServiceWorker<?, ?, ?> serviceWorker,
 72  
                        int maxRequestSize,
 73  0
                        float additionalRequestSize) {
 74  0
     this.conf = conf;
 75  0
     this.serviceWorker = serviceWorker;
 76  0
     int maxPartition = 0;
 77  0
     for (PartitionOwner partitionOwner : serviceWorker.getPartitionOwners()) {
 78  0
       List<Integer> workerPartitionIds =
 79  0
           workerPartitions.get(partitionOwner.getWorkerInfo());
 80  0
       if (workerPartitionIds == null) {
 81  0
         workerPartitionIds = Lists.newArrayList();
 82  0
         workerPartitions.put(partitionOwner.getWorkerInfo(),
 83  
             workerPartitionIds);
 84  
       }
 85  0
       workerPartitionIds.add(partitionOwner.getPartitionId());
 86  0
       maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
 87  0
     }
 88  0
     dataCache = (D[]) new Object[maxPartition + 1];
 89  
 
 90  0
     int maxWorker = 0;
 91  0
     for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
 92  0
       maxWorker = Math.max(maxWorker, workerInfo.getTaskId());
 93  0
     }
 94  0
     dataSizes = new int[maxWorker + 1];
 95  
 
 96  0
     int initialRequestSize =
 97  
         (int) (maxRequestSize * (1 + additionalRequestSize));
 98  0
     initialBufferSizes = new int[maxWorker + 1];
 99  0
     for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
 100  0
       initialBufferSizes[workerInfo.getTaskId()] =
 101  0
           initialRequestSize / workerPartitions.get(workerInfo).size();
 102  0
     }
 103  0
     numWorkers = maxWorker + 1;
 104  0
   }
 105  
 
 106  
   /**
 107  
    * Gets the data for a worker and removes it from the cache.
 108  
    *
 109  
    * @param workerInfo the address of the worker who owns the data
 110  
    *                   partitions that are receiving the data
 111  
    * @return List of pairs (partitionId, ByteArrayVertexIdData),
 112  
    *         where all partition ids belong to workerInfo
 113  
    */
 114  
   public PairList<Integer, D>
 115  
   removeWorkerData(WorkerInfo workerInfo) {
 116  0
     PairList<Integer, D> workerData = new PairList<Integer, D>();
 117  0
     List<Integer> partitions = workerPartitions.get(workerInfo);
 118  0
     workerData.initialize(partitions.size());
 119  0
     for (Integer partitionId : partitions) {
 120  0
       if (dataCache[partitionId] != null) {
 121  0
         workerData.add(partitionId, (D) dataCache[partitionId]);
 122  0
         dataCache[partitionId] = null;
 123  
       }
 124  0
     }
 125  0
     dataSizes[workerInfo.getTaskId()] = 0;
 126  0
     return workerData;
 127  
   }
 128  
 
 129  
   /**
 130  
    * Gets all the data and removes it from the cache.
 131  
    *
 132  
    * @return All data for all vertices for all partitions
 133  
    */
 134  
   public PairList<WorkerInfo, PairList<Integer, D>> removeAllData() {
 135  0
     PairList<WorkerInfo, PairList<Integer, D>> allData =
 136  
         new PairList<WorkerInfo, PairList<Integer, D>>();
 137  0
     allData.initialize(dataSizes.length);
 138  0
     for (WorkerInfo workerInfo : workerPartitions.keySet()) {
 139  0
       PairList<Integer, D> workerData = removeWorkerData(workerInfo);
 140  0
       if (!workerData.isEmpty()) {
 141  0
         allData.add(workerInfo, workerData);
 142  
       }
 143  0
       dataSizes[workerInfo.getTaskId()] = 0;
 144  0
     }
 145  0
     return allData;
 146  
   }
 147  
 
 148  
   /**
 149  
    * Get the data cache for a partition id
 150  
    *
 151  
    * @param partitionId Partition id
 152  
    * @return Data cache for a partition
 153  
    */
 154  
   public D getData(int partitionId) {
 155  0
     return dataCache[partitionId];
 156  
   }
 157  
 
 158  
   /**
 159  
    * Set the data cache for a partition id
 160  
    *
 161  
    * @param partitionId Partition id
 162  
    * @param data Data to be set for a partition id
 163  
    */
 164  
   public void setData(int partitionId, D data) {
 165  0
     dataCache[partitionId] = data;
 166  0
   }
 167  
 
 168  
   /**
 169  
    * Get initial buffer size of a partition.
 170  
    *
 171  
    * @param partitionId Partition id
 172  
    * @return Initial buffer size of a partition
 173  
    */
 174  
   public int getInitialBufferSize(int partitionId) {
 175  0
     return initialBufferSizes[partitionId];
 176  
   }
 177  
 
 178  
   /**
 179  
    * Increment the data size
 180  
    *
 181  
    * @param partitionId Partition id
 182  
    * @param size Size to increment by
 183  
    * @return new data size
 184  
    */
 185  
   public int incrDataSize(int partitionId, int size) {
 186  0
     dataSizes[partitionId] += size;
 187  0
     return dataSizes[partitionId];
 188  
   }
 189  
 
 190  
   public ImmutableClassesGiraphConfiguration getConf() {
 191  0
     return conf;
 192  
   }
 193  
 
 194  
   /**
 195  
    * Get the service worker.
 196  
    *
 197  
    * @return CentralizedServiceWorker
 198  
    */
 199  
   protected CentralizedServiceWorker getServiceWorker() {
 200  0
     return serviceWorker;
 201  
   }
 202  
 
 203  
   /**
 204  
    * Get the initial buffer size for the messages sent to a worker.
 205  
    *
 206  
    * @param taskId The task ID of a worker.
 207  
    * @return The initial buffer size for a worker.
 208  
    */
 209  
   protected int getSendWorkerInitialBufferSize(int taskId) {
 210  0
     return initialBufferSizes[taskId];
 211  
   }
 212  
 
 213  
   protected int getNumWorkers() {
 214  0
     return this.numWorkers;
 215  
   }
 216  
 
 217  
   protected Map<WorkerInfo, List<Integer>> getWorkerPartitions() {
 218  0
     return workerPartitions;
 219  
   }
 220  
 }