Coverage Report - org.apache.giraph.worker.WorkerAggregatorHandler
 
Classes in this File Line Coverage Branch Coverage Complexity
WorkerAggregatorHandler
0%
0/113
0%
0/34
0
WorkerAggregatorHandler$ThreadLocalWorkerGlobalCommUsage
0%
0/35
0%
0/12
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.worker;
 19  
 
 20  
 import java.io.IOException;
 21  
 import java.util.Map;
 22  
 import java.util.Map.Entry;
 23  
 import java.util.Set;
 24  
 
 25  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 26  
 import org.apache.giraph.comm.GlobalCommType;
 27  
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
 28  
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
 29  
 import org.apache.giraph.comm.aggregators.GlobalCommValueOutputStream;
 30  
 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
 31  
 import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
 32  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 33  
 import org.apache.giraph.reducers.ReduceOperation;
 34  
 import org.apache.giraph.reducers.Reducer;
 35  
 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
 36  
 import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
 37  
 import org.apache.giraph.utils.WritableUtils;
 38  
 import org.apache.hadoop.io.Writable;
 39  
 import org.apache.hadoop.util.Progressable;
 40  
 import org.apache.log4j.Logger;
 41  
 
 42  
 import com.google.common.collect.Maps;
 43  
 import com.google.common.collect.Sets;
 44  
 
 45  
 /** Handler for reduce/broadcast on the workers */
 46  0
 public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
 47  
   /** Class logger */
 48  0
   private static final Logger LOG =
 49  0
       Logger.getLogger(WorkerAggregatorHandler.class);
 50  
   /** Map of broadcasted values */
 51  0
   private final Map<String, Writable> broadcastedMap =
 52  0
       Maps.newHashMap();
 53  
   /** Map of reducers currently being reduced */
 54  0
   private final Map<String, Reducer<Object, Writable>> reducerMap =
 55  0
       Maps.newHashMap();
 56  
 
 57  
   /** Service worker */
 58  
   private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
 59  
   /** Progressable for reporting progress */
 60  
   private final Progressable progressable;
 61  
   /** How big a single aggregator request can be */
 62  
   private final int maxBytesPerAggregatorRequest;
 63  
   /** Giraph configuration */
 64  
   private final ImmutableClassesGiraphConfiguration conf;
 65  
 
 66  
   /**
 67  
    * Constructor
 68  
    *
 69  
    * @param serviceWorker Service worker
 70  
    * @param conf          Giraph configuration
 71  
    * @param progressable  Progressable for reporting progress
 72  
    */
 73  
   public WorkerAggregatorHandler(
 74  
       CentralizedServiceWorker<?, ?, ?> serviceWorker,
 75  
       ImmutableClassesGiraphConfiguration conf,
 76  0
       Progressable progressable) {
 77  0
     this.serviceWorker = serviceWorker;
 78  0
     this.progressable = progressable;
 79  0
     this.conf = conf;
 80  0
     maxBytesPerAggregatorRequest = conf.getInt(
 81  
         AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
 82  
         AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
 83  0
   }
 84  
 
 85  
   @Override
 86  
   public <B extends Writable> B getBroadcast(String name) {
 87  0
     B value = (B) broadcastedMap.get(name);
 88  0
     if (value == null) {
 89  0
       LOG.warn("getBroadcast: " +
 90  0
           AggregatorUtils.getUnregisteredBroadcastMessage(name,
 91  0
               broadcastedMap.size() != 0, conf));
 92  
     }
 93  0
     return value;
 94  
   }
 95  
 
 96  
   @Override
 97  
   public void reduce(String name, Object value) {
 98  0
     Reducer<Object, Writable> reducer = reducerMap.get(name);
 99  0
     if (reducer != null) {
 100  0
       progressable.progress();
 101  0
       synchronized (reducer) {
 102  0
         reducer.reduce(value);
 103  0
       }
 104  
     } else {
 105  0
       throw new IllegalStateException("reduce: " +
 106  0
           AggregatorUtils.getUnregisteredReducerMessage(name,
 107  0
               reducerMap.size() != 0, conf));
 108  
     }
 109  0
   }
 110  
 
 111  
   /**
 112  
    * Combine partially reduced value into currently reduced value.
 113  
    * @param name Name of the reducer
 114  
    * @param valueToReduce Partial value to reduce
 115  
    */
 116  
   @Override
 117  
   public void reduceMerge(String name, Writable valueToReduce) {
 118  0
     Reducer<Object, Writable> reducer = reducerMap.get(name);
 119  0
     if (reducer != null) {
 120  0
       progressable.progress();
 121  0
       synchronized (reducer) {
 122  0
         reducer.reduceMerge(valueToReduce);
 123  0
       }
 124  
     } else {
 125  0
       throw new IllegalStateException("reduce: " +
 126  0
           AggregatorUtils.getUnregisteredReducerMessage(name,
 127  0
               reducerMap.size() != 0, conf));
 128  
     }
 129  0
   }
 130  
 
 131  
   /**
 132  
    * Prepare aggregators for current superstep
 133  
    *
 134  
    * @param requestProcessor Request processor for aggregators
 135  
    */
 136  
   public void prepareSuperstep(
 137  
       WorkerAggregatorRequestProcessor requestProcessor) {
 138  0
     broadcastedMap.clear();
 139  0
     reducerMap.clear();
 140  
 
 141  0
     if (LOG.isDebugEnabled()) {
 142  0
       LOG.debug("prepareSuperstep: Start preparing aggregators");
 143  
     }
 144  0
     AllAggregatorServerData allGlobalCommData =
 145  0
         serviceWorker.getServerData().getAllAggregatorData();
 146  
     // Wait for my aggregators
 147  0
     Iterable<byte[]> dataToDistribute =
 148  0
         allGlobalCommData.getDataFromMasterWhenReady(
 149  0
             serviceWorker.getMasterInfo());
 150  
     try {
 151  
       // Distribute my aggregators
 152  0
       requestProcessor.distributeReducedValues(dataToDistribute);
 153  0
     } catch (IOException e) {
 154  0
       throw new IllegalStateException("prepareSuperstep: " +
 155  
           "IOException occurred while trying to distribute aggregators", e);
 156  0
     }
 157  
     // Wait for all other aggregators and store them
 158  0
     allGlobalCommData.fillNextSuperstepMapsWhenReady(
 159  0
         getOtherWorkerIdsSet(), broadcastedMap,
 160  
         reducerMap);
 161  0
     if (LOG.isDebugEnabled()) {
 162  0
       LOG.debug("prepareSuperstep: Aggregators prepared");
 163  
     }
 164  0
   }
 165  
 
 166  
   /**
 167  
    * Send aggregators to their owners and in the end to the master
 168  
    *
 169  
    * @param requestProcessor Request processor for aggregators
 170  
    */
 171  
   public void finishSuperstep(
 172  
       WorkerAggregatorRequestProcessor requestProcessor) {
 173  0
     if (LOG.isInfoEnabled()) {
 174  0
       LOG.info("finishSuperstep: Start gathering aggregators, " +
 175  
           "workers will send their aggregated values " +
 176  
           "once they are done with superstep computation");
 177  
     }
 178  0
     OwnerAggregatorServerData ownerGlobalCommData =
 179  0
         serviceWorker.getServerData().getOwnerAggregatorData();
 180  
     // First send partial aggregated values to their owners and determine
 181  
     // which aggregators belong to this worker
 182  
     for (Map.Entry<String, Reducer<Object, Writable>> entry :
 183  0
         reducerMap.entrySet()) {
 184  
       try {
 185  0
         boolean sent = requestProcessor.sendReducedValue(entry.getKey(),
 186  0
             entry.getValue().getCurrentValue());
 187  0
         if (!sent) {
 188  
           // If it's my aggregator, add it directly
 189  0
           ownerGlobalCommData.reduce(entry.getKey(),
 190  0
               entry.getValue().getCurrentValue());
 191  
         }
 192  0
       } catch (IOException e) {
 193  0
         throw new IllegalStateException("finishSuperstep: " +
 194  
             "IOException occurred while sending aggregator " +
 195  0
             entry.getKey() + " to its owner", e);
 196  0
       }
 197  0
       progressable.progress();
 198  0
     }
 199  
     try {
 200  
       // Flush
 201  0
       requestProcessor.flush();
 202  0
     } catch (IOException e) {
 203  0
       throw new IllegalStateException("finishSuperstep: " +
 204  
           "IOException occurred while sending aggregators to owners", e);
 205  0
     }
 206  
 
 207  
     // Wait to receive partial aggregated values from all other workers
 208  0
     Iterable<Map.Entry<String, Writable>> myReducedValues =
 209  0
         ownerGlobalCommData.getMyReducedValuesWhenReady(
 210  0
             getOtherWorkerIdsSet());
 211  
 
 212  
     // Send final aggregated values to master
 213  0
     GlobalCommValueOutputStream globalOutput =
 214  
         new GlobalCommValueOutputStream(false);
 215  0
     for (Map.Entry<String, Writable> entry : myReducedValues) {
 216  
       try {
 217  0
         int currentSize = globalOutput.addValue(entry.getKey(),
 218  
             GlobalCommType.REDUCED_VALUE,
 219  0
             entry.getValue());
 220  0
         if (currentSize > maxBytesPerAggregatorRequest) {
 221  0
           requestProcessor.sendReducedValuesToMaster(
 222  0
               globalOutput.flush());
 223  
         }
 224  0
         progressable.progress();
 225  0
       } catch (IOException e) {
 226  0
         throw new IllegalStateException("finishSuperstep: " +
 227  
             "IOException occurred while writing aggregator " +
 228  0
             entry.getKey(), e);
 229  0
       }
 230  0
     }
 231  
     try {
 232  0
       requestProcessor.sendReducedValuesToMaster(globalOutput.flush());
 233  0
     } catch (IOException e) {
 234  0
       throw new IllegalStateException("finishSuperstep: " +
 235  
           "IOException occured while sending aggregators to master", e);
 236  0
     }
 237  
     // Wait for master to receive aggregated values before proceeding
 238  0
     serviceWorker.getWorkerClient().waitAllRequests();
 239  
 
 240  0
     ownerGlobalCommData.reset();
 241  0
     if (LOG.isDebugEnabled()) {
 242  0
       LOG.debug("finishSuperstep: Aggregators finished");
 243  
     }
 244  0
   }
 245  
 
 246  
   /**
 247  
    * Create new aggregator usage which will be used by one of the compute
 248  
    * threads.
 249  
    *
 250  
    * @return New aggregator usage
 251  
    */
 252  
   public WorkerThreadGlobalCommUsage newThreadAggregatorUsage() {
 253  0
     if (AggregatorUtils.useThreadLocalAggregators(conf)) {
 254  0
       return new ThreadLocalWorkerGlobalCommUsage();
 255  
     } else {
 256  0
       return this;
 257  
     }
 258  
   }
 259  
 
 260  
   @Override
 261  
   public void finishThreadComputation() {
 262  
     // If we don't use thread-local aggregators, all the aggregated values
 263  
     // are already in this object
 264  0
   }
 265  
 
 266  
   /**
 267  
    * Get set of all worker task ids except the current one
 268  
    *
 269  
    * @return Set of all other worker task ids
 270  
    */
 271  
   public Set<Integer> getOtherWorkerIdsSet() {
 272  0
     Set<Integer> otherWorkers = Sets.newHashSetWithExpectedSize(
 273  0
         serviceWorker.getWorkerInfoList().size());
 274  0
     for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
 275  0
       if (workerInfo.getTaskId() != serviceWorker.getWorkerInfo().getTaskId()) {
 276  0
         otherWorkers.add(workerInfo.getTaskId());
 277  
       }
 278  0
     }
 279  0
     return otherWorkers;
 280  
   }
 281  
 
 282  
   /**
 283  
   * Not thread-safe implementation of {@link WorkerThreadGlobalCommUsage}.
 284  
   * We can use one instance of this object per thread to prevent
 285  
   * synchronizing on each reduce() call. At the end of the superstep,
 286  
   * values from each of these will be aggregated back to {@link
 287  
   * WorkerAggregatorHandler}
 288  
   */
 289  
   public class ThreadLocalWorkerGlobalCommUsage
 290  
     implements WorkerThreadGlobalCommUsage {
 291  
     /** Thread-local reducer map */
 292  
     private final Map<String, Reducer<Object, Writable>> threadReducerMap;
 293  
 
 294  
     /**
 295  
     * Constructor
 296  
     *
 297  
     * Creates new instances of all reducers from
 298  
     * {@link WorkerAggregatorHandler}
 299  
     */
 300  0
     public ThreadLocalWorkerGlobalCommUsage() {
 301  0
       threadReducerMap = Maps.newHashMapWithExpectedSize(
 302  0
           WorkerAggregatorHandler.this.reducerMap.size());
 303  
 
 304  0
       UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
 305  0
       UnsafeReusableByteArrayInput in = new UnsafeReusableByteArrayInput();
 306  
 
 307  
       for (Entry<String, Reducer<Object, Writable>> entry :
 308  0
           reducerMap.entrySet()) {
 309  0
         ReduceOperation<Object, Writable> globalReduceOp =
 310  0
             entry.getValue().getReduceOp();
 311  
 
 312  0
         ReduceOperation<Object, Writable> threadLocalCopy =
 313  0
             WritableUtils.createCopy(out, in, globalReduceOp, conf);
 314  
 
 315  0
         threadReducerMap.put(entry.getKey(), new Reducer<>(threadLocalCopy));
 316  0
       }
 317  0
     }
 318  
 
 319  
     @Override
 320  
     public void reduce(String name, Object value) {
 321  0
       Reducer<Object, Writable> reducer = threadReducerMap.get(name);
 322  0
       if (reducer != null) {
 323  0
         progressable.progress();
 324  0
         reducer.reduce(value);
 325  
       } else {
 326  0
         throw new IllegalStateException("reduce: " +
 327  0
             AggregatorUtils.getUnregisteredAggregatorMessage(name,
 328  0
                 threadReducerMap.size() != 0, conf));
 329  
       }
 330  0
     }
 331  
 
 332  
     @Override
 333  
     public void reduceMerge(String name, Writable value) {
 334  0
       Reducer<Object, Writable> reducer = threadReducerMap.get(name);
 335  0
       if (reducer != null) {
 336  0
         progressable.progress();
 337  0
         reducer.reduceMerge(value);
 338  
       } else {
 339  0
         throw new IllegalStateException("reduceMerge: " +
 340  0
             AggregatorUtils.getUnregisteredAggregatorMessage(name,
 341  0
                 threadReducerMap.size() != 0, conf));
 342  
       }
 343  0
     }
 344  
 
 345  
     @Override
 346  
     public <B extends Writable> B getBroadcast(String name) {
 347  0
       return WorkerAggregatorHandler.this.getBroadcast(name);
 348  
     }
 349  
 
 350  
     @Override
 351  
     public void finishThreadComputation() {
 352  
       // Aggregate the values this thread's vertices provided back to
 353  
       // WorkerAggregatorHandler
 354  
       for (Entry<String, Reducer<Object, Writable>> entry :
 355  0
           threadReducerMap.entrySet()) {
 356  0
         WorkerAggregatorHandler.this.reduceMerge(entry.getKey(),
 357  0
             entry.getValue().getCurrentValue());
 358  0
       }
 359  0
     }
 360  
   }
 361  
 
 362  
 }