Coverage Report - org.apache.giraph.master.MasterAggregatorHandler
 
Classes in this File Line Coverage Branch Coverage Complexity
MasterAggregatorHandler
0%
0/132
0%
0/50
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.master;
 19  
 
 20  
 import java.io.DataInput;
 21  
 import java.io.DataOutput;
 22  
 import java.io.IOException;
 23  
 import java.util.Map;
 24  
 import java.util.Map.Entry;
 25  
 
 26  
 import org.apache.giraph.aggregators.AggregatorWriter;
 27  
 import org.apache.giraph.bsp.BspService;
 28  
 import org.apache.giraph.bsp.SuperstepState;
 29  
 import org.apache.giraph.comm.GlobalCommType;
 30  
 import org.apache.giraph.comm.MasterClient;
 31  
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
 32  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 33  
 import org.apache.giraph.reducers.ReduceOperation;
 34  
 import org.apache.giraph.reducers.Reducer;
 35  
 import org.apache.giraph.utils.WritableUtils;
 36  
 import org.apache.hadoop.io.Writable;
 37  
 import org.apache.hadoop.util.Progressable;
 38  
 import org.apache.log4j.Logger;
 39  
 
 40  
 import com.google.common.base.Preconditions;
 41  
 import com.google.common.collect.Maps;
 42  
 
 43  
 /** Handler for reduce/broadcast on the master */
 44  
 public class MasterAggregatorHandler
 45  
     implements MasterGlobalCommUsageAggregators, Writable {
 46  
   /** Class logger */
 47  0
   private static final Logger LOG =
 48  0
       Logger.getLogger(MasterAggregatorHandler.class);
 49  
 
 50  
   /** Map of reducers registered for the next worker computation */
 51  0
   private final Map<String, Reducer<Object, Writable>> reducerMap =
 52  0
       Maps.newHashMap();
 53  
   /** Map of values to be sent to workers for next computation */
 54  0
   private final Map<String, Writable> broadcastMap =
 55  0
       Maps.newHashMap();
 56  
   /** Values reduced from previous computation */
 57  0
   private final Map<String, Writable> reducedMap =
 58  0
       Maps.newHashMap();
 59  
 
 60  
   /** Aggregator writer - for writing reduced values */
 61  
   private final AggregatorWriter aggregatorWriter;
 62  
   /** Progressable used to report progress */
 63  
   private final Progressable progressable;
 64  
 
 65  
   /** Conf */
 66  
   private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
 67  
 
 68  
   /**
 69  
    * Constructor
 70  
    *
 71  
    * @param conf Configuration
 72  
    * @param progressable Progress reporter
 73  
    */
 74  
   public MasterAggregatorHandler(
 75  
       ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
 76  0
       Progressable progressable) {
 77  0
     this.progressable = progressable;
 78  0
     this.conf = conf;
 79  0
     aggregatorWriter = conf.createAggregatorWriter();
 80  0
   }
 81  
 
 82  
   @Override
 83  
   public final <S, R extends Writable> void registerReducer(
 84  
       String name, ReduceOperation<S, R> reduceOp) {
 85  0
     registerReducer(name, reduceOp, reduceOp.createInitialValue());
 86  0
   }
 87  
 
 88  
   @Override
 89  
   public <S, R extends Writable> void registerReducer(
 90  
       String name, ReduceOperation<S, R> reduceOp,
 91  
       R globalInitialValue) {
 92  0
     if (reducerMap.containsKey(name)) {
 93  0
       throw new IllegalArgumentException(
 94  
           "Reducer with name " + name + " was already registered, " +
 95  0
           " and is " + reducerMap.get(name) + ", and we are trying to " +
 96  
           " register " + reduceOp);
 97  
     }
 98  0
     if (reduceOp == null) {
 99  0
       throw new IllegalArgumentException(
 100  
           "null reducer cannot be registered, with name " + name);
 101  
     }
 102  0
     if (globalInitialValue == null) {
 103  0
       throw new IllegalArgumentException(
 104  
           "global initial value for reducer cannot be null, but is for " +
 105  
           reduceOp + " with naem" + name);
 106  
     }
 107  
 
 108  0
     Reducer<S, R> reducer = new Reducer<>(reduceOp, globalInitialValue);
 109  0
     reducerMap.put(name, (Reducer<Object, Writable>) reducer);
 110  0
   }
 111  
 
 112  
   @Override
 113  
   public <T extends Writable> T getReduced(String name) {
 114  0
     T value = (T) reducedMap.get(name);
 115  0
     if (value == null) {
 116  0
       LOG.warn("getReduced: " +
 117  0
         AggregatorUtils.getUnregisteredReducerMessage(name,
 118  0
             reducedMap.size() != 0, conf));
 119  
     }
 120  0
     return value;
 121  
   }
 122  
 
 123  
   @Override
 124  
   public void broadcast(String name, Writable object) {
 125  0
     if (broadcastMap.containsKey(name)) {
 126  0
       throw new IllegalArgumentException(
 127  
           "Value already broadcasted for name " + name);
 128  
     }
 129  0
     if (object == null) {
 130  0
       throw new IllegalArgumentException("null cannot be broadcasted");
 131  
     }
 132  
 
 133  0
     broadcastMap.put(name, object);
 134  0
   }
 135  
 
 136  
   /** Prepare reduced values for current superstep's master compute */
 137  
   public void prepareSuperstep() {
 138  0
     if (LOG.isDebugEnabled()) {
 139  0
       LOG.debug("prepareSuperstep: Start preparing reducers");
 140  
     }
 141  
 
 142  0
     Preconditions.checkState(reducedMap.isEmpty(),
 143  
         "reducedMap must be empty before start of the superstep");
 144  0
     Preconditions.checkState(broadcastMap.isEmpty(),
 145  
         "broadcastMap must be empty before start of the superstep");
 146  
 
 147  
     for (Entry<String, Reducer<Object, Writable>> entry :
 148  0
         reducerMap.entrySet()) {
 149  0
       Writable value = entry.getValue().getCurrentValue();
 150  0
       if (value == null) {
 151  0
         value = entry.getValue().createInitialValue();
 152  
       }
 153  
 
 154  0
       reducedMap.put(entry.getKey(), value);
 155  0
     }
 156  
 
 157  0
     reducerMap.clear();
 158  
 
 159  0
     if (LOG.isDebugEnabled()) {
 160  0
       LOG.debug("prepareSuperstep: Aggregators prepared");
 161  
     }
 162  0
   }
 163  
 
 164  
   /** Finalize aggregators for current superstep */
 165  
   public void finishSuperstep() {
 166  0
     if (LOG.isDebugEnabled()) {
 167  0
       LOG.debug("finishSuperstep: Start finishing aggregators");
 168  
     }
 169  
 
 170  0
     reducedMap.clear();
 171  
 
 172  0
     if (LOG.isDebugEnabled()) {
 173  0
       LOG.debug("finishSuperstep: Aggregators finished");
 174  
     }
 175  0
   }
 176  
 
 177  
   /**
 178  
    * Send data to workers (through owner workers)
 179  
    *
 180  
    * @param masterClient IPC client on master
 181  
    */
 182  
   public void sendDataToOwners(MasterClient masterClient) {
 183  
     // send broadcast values and reduceOperations to their owners
 184  
     try {
 185  
       for (Entry<String, Reducer<Object, Writable>> entry :
 186  0
           reducerMap.entrySet()) {
 187  0
         masterClient.sendToOwner(entry.getKey(),
 188  
             GlobalCommType.REDUCE_OPERATIONS,
 189  0
             entry.getValue().getReduceOp());
 190  0
         progressable.progress();
 191  0
       }
 192  
 
 193  0
       for (Entry<String, Writable> entry : broadcastMap.entrySet()) {
 194  0
         masterClient.sendToOwner(entry.getKey(),
 195  
             GlobalCommType.BROADCAST,
 196  0
             entry.getValue());
 197  0
         progressable.progress();
 198  0
       }
 199  0
       masterClient.finishSendingValues();
 200  
 
 201  0
       broadcastMap.clear();
 202  0
     } catch (IOException e) {
 203  0
       throw new IllegalStateException("finishSuperstep: " +
 204  
           "IOException occurred while sending aggregators", e);
 205  0
     }
 206  0
   }
 207  
 
 208  
   /**
 209  
    * Accept reduced values sent by worker. Every value will be sent
 210  
    * only once, by its owner.
 211  
    * We don't need to count the number of these requests because global
 212  
    * superstep barrier will happen after workers ensure all requests of this
 213  
    * type have been received and processed by master.
 214  
    *
 215  
    * @param reducedValuesInput Input in which aggregated values are
 216  
    *                              written in the following format:
 217  
    *                              numReducers
 218  
    *                              name_1  REDUCED_VALUE  value_1
 219  
    *                              name_2  REDUCED_VALUE  value_2
 220  
    *                              ...
 221  
    * @throws IOException
 222  
    */
 223  
   public void acceptReducedValues(
 224  
       DataInput reducedValuesInput) throws IOException {
 225  0
     int numReducers = reducedValuesInput.readInt();
 226  0
     for (int i = 0; i < numReducers; i++) {
 227  0
       String name = reducedValuesInput.readUTF();
 228  
       GlobalCommType type =
 229  0
           GlobalCommType.values()[reducedValuesInput.readByte()];
 230  0
       if (type != GlobalCommType.REDUCED_VALUE) {
 231  0
         throw new IllegalStateException(
 232  
             "SendReducedToMasterRequest received " + type);
 233  
       }
 234  0
       Reducer<Object, Writable> reducer = reducerMap.get(name);
 235  0
       if (reducer == null) {
 236  0
         throw new IllegalStateException(
 237  
             "acceptReducedValues: " +
 238  
                 "Master received reduced value which isn't registered: " +
 239  
                 name);
 240  
       }
 241  
 
 242  0
       Writable valueToReduce = reducer.createInitialValue();
 243  0
       valueToReduce.readFields(reducedValuesInput);
 244  
 
 245  0
       if (reducer.getCurrentValue() != null) {
 246  0
         reducer.reduceMerge(valueToReduce);
 247  
       } else {
 248  0
         reducer.setCurrentValue(valueToReduce);
 249  
       }
 250  0
       progressable.progress();
 251  
     }
 252  0
     if (LOG.isDebugEnabled()) {
 253  0
       LOG.debug("acceptReducedValues: Accepted one set with " +
 254  
           numReducers + " aggregated values");
 255  
     }
 256  0
   }
 257  
 
 258  
   /**
 259  
    * Write aggregators to {@link AggregatorWriter}
 260  
    *
 261  
    * @param superstep      Superstep which just finished
 262  
    * @param superstepState State of the superstep which just finished
 263  
    */
 264  
   public void writeAggregators(
 265  
       long superstep, SuperstepState superstepState) {
 266  
     try {
 267  0
       aggregatorWriter.writeAggregator(reducedMap.entrySet(),
 268  
           (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
 269  
               AggregatorWriter.LAST_SUPERSTEP : superstep);
 270  0
     } catch (IOException e) {
 271  0
       throw new IllegalStateException(
 272  
           "coordinateSuperstep: IOException while " +
 273  
               "writing aggregators data", e);
 274  0
     }
 275  0
   }
 276  
 
 277  
   /**
 278  
    * Initialize {@link AggregatorWriter}
 279  
    *
 280  
    * @param service BspService
 281  
    */
 282  
   public void initialize(BspService service) {
 283  
     try {
 284  0
       aggregatorWriter.initialize(service.getContext(),
 285  0
           service.getApplicationAttempt());
 286  0
     } catch (IOException e) {
 287  0
       throw new IllegalStateException("initialize: " +
 288  
           "Couldn't initialize aggregatorWriter", e);
 289  0
     }
 290  0
   }
 291  
 
 292  
   /**
 293  
    * Close {@link AggregatorWriter}
 294  
    *
 295  
    * @throws IOException
 296  
    */
 297  
   public void close() throws IOException {
 298  0
     aggregatorWriter.close();
 299  0
   }
 300  
 
 301  
   @Override
 302  
   public void write(DataOutput out) throws IOException {
 303  
     // At the end of superstep, only reduceOpMap can be non-empty
 304  0
     Preconditions.checkState(reducedMap.isEmpty(),
 305  
         "reducedMap must be empty at the end of the superstep");
 306  
 
 307  0
     out.writeInt(reducerMap.size());
 308  
     for (Entry<String, Reducer<Object, Writable>> entry :
 309  0
         reducerMap.entrySet()) {
 310  0
       out.writeUTF(entry.getKey());
 311  0
       entry.getValue().write(out);
 312  0
       progressable.progress();
 313  0
     }
 314  
 
 315  0
     out.writeInt(broadcastMap.size());
 316  0
     for (Entry<String, Writable> entry : broadcastMap.entrySet()) {
 317  0
       out.writeUTF(entry.getKey());
 318  0
       WritableUtils.writeWritableObject(entry.getValue(), out);
 319  0
     }
 320  0
   }
 321  
 
 322  
   @Override
 323  
   public void readFields(DataInput in) throws IOException {
 324  0
     reducedMap.clear();
 325  0
     broadcastMap.clear();
 326  0
     reducerMap.clear();
 327  
 
 328  0
     int numReducers = in.readInt();
 329  0
     for (int i = 0; i < numReducers; i++) {
 330  0
       String name = in.readUTF();
 331  0
       Reducer<Object, Writable> reducer = new Reducer<>();
 332  0
       reducer.readFields(in, conf);
 333  0
       reducerMap.put(name, reducer);
 334  
     }
 335  
 
 336  0
     int numBroadcast = in.readInt();
 337  0
     for (int i = 0; i < numBroadcast; i++) {
 338  0
       String name = in.readUTF();
 339  0
       Writable value = WritableUtils.readWritableObject(in, conf);
 340  0
       broadcastMap.put(name, value);
 341  
     }
 342  0
   }
 343  
 }