Coverage Report - org.apache.giraph.comm.aggregators.OwnerAggregatorServerData
 
Classes in this File Line Coverage Branch Coverage Complexity
OwnerAggregatorServerData
0%
0/35
0%
0/8
0
OwnerAggregatorServerData$1
0%
0/4
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.aggregators;
 20  
 
 21  
 import java.util.AbstractMap;
 22  
 import java.util.Map;
 23  
 import java.util.Set;
 24  
 import java.util.concurrent.ConcurrentMap;
 25  
 
 26  
 import org.apache.giraph.reducers.ReduceOperation;
 27  
 import org.apache.giraph.reducers.Reducer;
 28  
 import org.apache.giraph.utils.TaskIdsPermitsBarrier;
 29  
 import org.apache.hadoop.io.Writable;
 30  
 import org.apache.hadoop.util.Progressable;
 31  
 import org.apache.log4j.Logger;
 32  
 
 33  
 import com.google.common.base.Function;
 34  
 import com.google.common.collect.Iterables;
 35  
 import com.google.common.collect.Maps;
 36  
 
 37  
 /**
 38  
  * Class for holding aggregators which current worker owns,
 39  
  * and aggregating partial aggregator values from workers.
 40  
  *
 41  
  * Protocol:
 42  
  * 1. Before the beginning of superstep, worker receives its aggregators
 43  
  * from master, and these aggregators will be registered to this class.
 44  
  * Multiple registrations can be called concurrently.
 45  
  * 2. During the superstep, whenever a worker finishes computation,
 46  
  * it will send partial aggregated values to worker owner. This class is used
 47  
  * to help deserialize the arriving aggregator values, and aggregate the values
 48  
  * at the destination owner worker; these can happen concurrently.
 49  
  * (we know step 1. is finished before anything from step 2. happens because
 50  
  * other workers can't start computation before they receive aggregators
 51  
  * which this worker owns)
 52  
  * 3. This class also tracks the number of partial aggregator requests which
 53  
  * worker received. In the end of superstep, getMyAggregatorValuesWhenReady
 54  
  * will be called to ensure everything was received and get the values which
 55  
  * need to be sent to master.
 56  
  * Because of this counting, in step 2. even if worker owns no aggregators,
 57  
  * it will still send a message without aggregator data.
 58  
  * 4. In the end we reset to prepare for the next superstep.
 59  
  */
 60  
 public class OwnerAggregatorServerData {
 61  
   /** Class logger */
 62  0
   private static final Logger LOG =
 63  0
       Logger.getLogger(OwnerAggregatorServerData.class);
 64  
   /** Map of aggregators which current worker owns */
 65  0
   private final ConcurrentMap<String, Reducer<Object, Writable>>
 66  0
   myReducerMap = Maps.newConcurrentMap();
 67  
   /**
 68  
    * Counts the requests with partial aggregated values from other workers.
 69  
    * It uses GlobalCommType.SPECIAL_COUNT to know how many requests it
 70  
    * has to receive.
 71  
    */
 72  
   private final TaskIdsPermitsBarrier workersBarrier;
 73  
   /** Progressable used to report progress */
 74  
   private final Progressable progressable;
 75  
 
 76  
   /**
 77  
    * Constructor
 78  
    *
 79  
    * @param progressable Progressable used to report progress
 80  
    */
 81  0
   public OwnerAggregatorServerData(Progressable progressable) {
 82  0
     this.progressable = progressable;
 83  0
     workersBarrier = new TaskIdsPermitsBarrier(progressable);
 84  0
   }
 85  
 
 86  
   /**
 87  
    * Register a reducer which current worker owns. Thread-safe.
 88  
    *
 89  
    * @param name Name of aggregator
 90  
    * @param reduceOp Reduce operation
 91  
    */
 92  
   public void registerReducer(String name,
 93  
       ReduceOperation<Object, Writable> reduceOp) {
 94  0
     if (LOG.isDebugEnabled() && myReducerMap.isEmpty()) {
 95  0
       LOG.debug("registerAggregator: The first registration after a reset()");
 96  
     }
 97  0
     myReducerMap.putIfAbsent(name, new Reducer<>(reduceOp));
 98  0
     progressable.progress();
 99  0
   }
 100  
 
 101  
   /**
 102  
    * Reduce partial value of one of current worker's reducers.
 103  
    *
 104  
    * Thread-safe. Call only after reducers have been registered.
 105  
    *
 106  
    * @param name Name of the reducer
 107  
    * @param value Value to reduce to it
 108  
    */
 109  
   public void reduce(String name, Writable value) {
 110  0
     Reducer<Object, Writable> reducer = myReducerMap.get(name);
 111  0
     synchronized (reducer) {
 112  0
       reducer.reduceMerge(value);
 113  0
     }
 114  0
     progressable.progress();
 115  0
   }
 116  
 
 117  
 
 118  
   /**
 119  
    * Create initial value for a reducer. Used so requests
 120  
    * would be able to deserialize data.
 121  
    *
 122  
    * Thread-safe. Call only after reducer has been registered.
 123  
    *
 124  
    * @param name Name of the reducer
 125  
    * @return Empty value
 126  
    */
 127  
   public Writable createInitialValue(String name) {
 128  0
     Reducer<Object, Writable> reducer = myReducerMap.get(name);
 129  0
     synchronized (reducer) {
 130  0
       return reducer.createInitialValue();
 131  0
     }
 132  
   }
 133  
 
 134  
   /**
 135  
    * Notify this object that a partial aggregated values request from some
 136  
    * worker have been received. Thread-safe.
 137  
    */
 138  
   public void receivedRequestFromWorker() {
 139  0
     workersBarrier.releaseOnePermit();
 140  0
   }
 141  
 
 142  
   /**
 143  
    * Notify this object about the total number of requests which should
 144  
    * arrive from one of the workers. Thread-safe.
 145  
    *
 146  
    * @param requestCount Number of requests which should arrive
 147  
    * @param taskId Task id of that worker
 148  
    */
 149  
   public void receivedRequestCountFromWorker(long requestCount, int taskId) {
 150  0
     workersBarrier.requirePermits(requestCount, taskId);
 151  0
   }
 152  
 
 153  
   /**
 154  
    * This function will wait until all partial aggregated values from all
 155  
    * workers are ready and aggregated, and return final aggregated values
 156  
    * afterwards.
 157  
    *
 158  
    * @param workerIds All workers in the job apart from the current one
 159  
    * @return Iterable through final aggregated values which this worker owns
 160  
    */
 161  
   public Iterable<Map.Entry<String, Writable>>
 162  
   getMyReducedValuesWhenReady(Set<Integer> workerIds) {
 163  0
     workersBarrier.waitForRequiredPermits(workerIds);
 164  0
     if (LOG.isDebugEnabled()) {
 165  0
       LOG.debug("getMyAggregatorValuesWhenReady: Values ready");
 166  
     }
 167  0
     return Iterables.transform(myReducerMap.entrySet(),
 168  
         new Function<Map.Entry<String, Reducer<Object, Writable>>,
 169  0
             Map.Entry<String, Writable>>() {
 170  
           @Override
 171  
           public Map.Entry<String, Writable> apply(
 172  
               Map.Entry<String, Reducer<Object, Writable>> aggregator) {
 173  0
             return new AbstractMap.SimpleEntry<String, Writable>(
 174  0
                 aggregator.getKey(),
 175  0
                 aggregator.getValue().getCurrentValue());
 176  
           }
 177  
         });
 178  
   }
 179  
 
 180  
   /**
 181  
    * Prepare for next superstep
 182  
    */
 183  
   public void reset() {
 184  0
     myReducerMap.clear();
 185  0
     if (LOG.isDebugEnabled()) {
 186  0
       LOG.debug("reset: Ready for next superstep");
 187  
     }
 188  0
   }
 189  
 
 190  
 }