Coverage Report - org.apache.giraph.comm.aggregators.AllAggregatorServerData
 
Classes in this File Line Coverage Branch Coverage Complexity
AllAggregatorServerData
0%
0/51
0%
0/11
0
AllAggregatorServerData$1
0%
0/1
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.aggregators;
 20  
 
 21  
 import java.util.Collections;
 22  
 import java.util.List;
 23  
 import java.util.Map;
 24  
 import java.util.Map.Entry;
 25  
 import java.util.Set;
 26  
 import java.util.concurrent.ConcurrentMap;
 27  
 
 28  
 import org.apache.giraph.comm.GlobalCommType;
 29  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 30  
 import org.apache.giraph.master.MasterInfo;
 31  
 import org.apache.giraph.reducers.ReduceOperation;
 32  
 import org.apache.giraph.reducers.Reducer;
 33  
 import org.apache.giraph.utils.TaskIdsPermitsBarrier;
 34  
 import org.apache.hadoop.io.Writable;
 35  
 import org.apache.hadoop.util.Progressable;
 36  
 import org.apache.log4j.Logger;
 37  
 
 38  
 import com.google.common.base.Preconditions;
 39  
 import com.google.common.collect.Lists;
 40  
 import com.google.common.collect.Maps;
 41  
 
 42  
 /**
 43  
  * Accepts aggregators and their values from previous superstep from master
 44  
  * and workers which own aggregators. Keeps data received from master so it
 45  
  * could be distributed later. Also counts the requests so we would know
 46  
  * when we are done receiving requests.
 47  
  *
 48  
  * Only restriction is that we need to call registerAggregatorClass before
 49  
  * calling createAggregatorInitialValue, other than that methods of this class
 50  
  * are thread-safe.
 51  
  */
 52  
 public class AllAggregatorServerData {
 53  
   /** Class logger */
 54  0
   private static final Logger LOG =
 55  0
       Logger.getLogger(AllAggregatorServerData.class);
 56  
   /** Map of broadcasted values from master */
 57  0
   private final ConcurrentMap<String, Writable>
 58  0
   broadcastedMap = Maps.newConcurrentMap();
 59  
   /** Map of registered reducers for current superstep */
 60  0
   private final ConcurrentMap<String, ReduceOperation<Object, Writable>>
 61  0
   reduceOpMap = Maps.newConcurrentMap();
 62  
   /**
 63  
    * Counts the requests with final aggregators from master.
 64  
    * It uses values from special aggregators
 65  
    * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)
 66  
    * to know how many requests it has to receive.
 67  
    */
 68  
   private final TaskIdsPermitsBarrier masterBarrier;
 69  
   /**
 70  
    * Aggregator data which this worker received from master and which it is
 71  
    * going to distribute before starting next superstep. Thread-safe.
 72  
    */
 73  0
   private final List<byte[]> masterData =
 74  0
       Collections.synchronizedList(Lists.<byte[]>newArrayList());
 75  
   /**
 76  
    * Counts the requests with final aggregators from other workers.
 77  
    * It uses values from special aggregators
 78  
    * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)
 79  
    * to know how many requests it has to receive.
 80  
    */
 81  
   private final TaskIdsPermitsBarrier workersBarrier;
 82  
   /** Progressable used to report progress */
 83  
   private final Progressable progressable;
 84  
   /** Configuration */
 85  
   private final ImmutableClassesGiraphConfiguration conf;
 86  
 
 87  
   /**
 88  
    * Constructor
 89  
    *
 90  
    * @param progressable Progressable used to report progress
 91  
    * @param conf Configuration
 92  
    */
 93  
   public AllAggregatorServerData(Progressable progressable,
 94  0
       ImmutableClassesGiraphConfiguration conf) {
 95  0
     this.progressable = progressable;
 96  0
     this.conf = conf;
 97  0
     workersBarrier = new TaskIdsPermitsBarrier(progressable);
 98  0
     masterBarrier = new TaskIdsPermitsBarrier(progressable);
 99  0
   }
 100  
 
 101  
   /**
 102  
    * Received value through global communication from master.
 103  
    * @param name Name
 104  
    * @param type Global communication type
 105  
    * @param value Object value
 106  
    */
 107  
   public void receiveValueFromMaster(
 108  
       String name, GlobalCommType type, Writable value) {
 109  0
     switch (type) {
 110  
     case BROADCAST:
 111  0
       broadcastedMap.put(name, value);
 112  0
       break;
 113  
 
 114  
     case REDUCE_OPERATIONS:
 115  0
       reduceOpMap.put(name, (ReduceOperation<Object, Writable>) value);
 116  0
       break;
 117  
 
 118  
     default:
 119  0
       throw new IllegalArgumentException("Unkown request type " + type);
 120  
     }
 121  0
     progressable.progress();
 122  0
   }
 123  
 
 124  
   /**
 125  
    * Notify this object that an aggregator request from master has been
 126  
    * received.
 127  
    *
 128  
    * @param data Byte request with data received from master
 129  
    */
 130  
   public void receivedRequestFromMaster(byte[] data) {
 131  0
     masterData.add(data);
 132  0
     masterBarrier.releaseOnePermit();
 133  0
   }
 134  
 
 135  
   /**
 136  
    * Notify this object about the total number of requests which should
 137  
    * arrive from master.
 138  
    *
 139  
    * @param requestCount Number of requests which should arrive
 140  
    * @param taskId Task id of master
 141  
    */
 142  
   public void receivedRequestCountFromMaster(long requestCount, int taskId) {
 143  0
     masterBarrier.requirePermits(requestCount, taskId);
 144  0
   }
 145  
 
 146  
   /**
 147  
    * Notify this object that an aggregator request from some worker has been
 148  
    * received.
 149  
    */
 150  
   public void receivedRequestFromWorker() {
 151  0
     workersBarrier.releaseOnePermit();
 152  0
   }
 153  
 
 154  
   /**
 155  
    * Notify this object about the total number of requests which should
 156  
    * arrive from one of the workers.
 157  
    *
 158  
    * @param requestCount Number of requests which should arrive
 159  
    * @param taskId Task id of that worker
 160  
    */
 161  
   public void receivedRequestCountFromWorker(long requestCount, int taskId) {
 162  0
     workersBarrier.requirePermits(requestCount, taskId);
 163  0
   }
 164  
 
 165  
   /**
 166  
    * This function will wait until all aggregator requests from master have
 167  
    * arrived, and return that data afterwards.
 168  
    *
 169  
    * @param masterInfo Master info
 170  
    * @return Iterable through data received from master
 171  
    */
 172  
   public Iterable<byte[]> getDataFromMasterWhenReady(MasterInfo masterInfo) {
 173  0
     masterBarrier.waitForRequiredPermits(
 174  0
         Collections.singleton(masterInfo.getTaskId()));
 175  0
     if (LOG.isDebugEnabled()) {
 176  0
       LOG.debug("getDataFromMasterWhenReady: " +
 177  
           "Aggregator data for distribution ready");
 178  
     }
 179  0
     return masterData;
 180  
   }
 181  
 
 182  
   /**
 183  
    * This function will wait until all aggregator requests from workers have
 184  
    * arrived, and fill the maps for next superstep when ready.
 185  
    *
 186  
    * @param workerIds All workers in the job apart from the current one
 187  
    * @param broadcastedMapToFill Broadcast map to fill out
 188  
    * @param reducerMapToFill Registered reducer map to fill out.
 189  
    */
 190  
   public void fillNextSuperstepMapsWhenReady(
 191  
       Set<Integer> workerIds,
 192  
       Map<String, Writable> broadcastedMapToFill,
 193  
       Map<String, Reducer<Object, Writable>> reducerMapToFill) {
 194  0
     workersBarrier.waitForRequiredPermits(workerIds);
 195  0
     if (LOG.isDebugEnabled()) {
 196  0
       LOG.debug("fillNextSuperstepMapsWhenReady: Global data ready");
 197  
     }
 198  
 
 199  0
     Preconditions.checkArgument(broadcastedMapToFill.isEmpty(),
 200  
         "broadcastedMap needs to be empty for filling");
 201  0
     Preconditions.checkArgument(reducerMapToFill.isEmpty(),
 202  
         "reducerMap needs to be empty for filling");
 203  
 
 204  0
     broadcastedMapToFill.putAll(broadcastedMap);
 205  
 
 206  
     for (Entry<String, ReduceOperation<Object, Writable>> entry :
 207  0
         reduceOpMap.entrySet()) {
 208  0
       reducerMapToFill.put(entry.getKey(), new Reducer<>(entry.getValue()));
 209  0
     }
 210  
 
 211  0
     broadcastedMap.clear();
 212  0
     reduceOpMap.clear();
 213  0
     masterData.clear();
 214  0
     if (LOG.isDebugEnabled()) {
 215  0
       LOG.debug("reset: Ready for next superstep");
 216  
     }
 217  0
   }
 218  
 }
 219