Coverage Report - org.apache.giraph.master.AggregatorToGlobalCommTranslation
 
Classes in this File Line Coverage Branch Coverage Complexity
AggregatorToGlobalCommTranslation
0%
0/78
0%
0/28
0
AggregatorToGlobalCommTranslation$AggregatorWrapper
0%
0/22
0%
0/2
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.master;
 19  
 
 20  
 import java.io.DataInput;
 21  
 import java.io.DataOutput;
 22  
 import java.io.IOException;
 23  
 import java.util.HashMap;
 24  
 import java.util.Map.Entry;
 25  
 
 26  
 import org.apache.giraph.aggregators.Aggregator;
 27  
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
 28  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 29  
 import org.apache.giraph.utils.MasterLoggingAggregator;
 30  
 import org.apache.giraph.utils.WritableUtils;
 31  
 import org.apache.hadoop.io.Writable;
 32  
 import org.apache.log4j.Logger;
 33  
 
 34  
 import com.google.common.base.Preconditions;
 35  
 
 36  
 /**
 37  
  * Class that translates aggregator handling on the master to
 38  
  * reduce and broadcast operations supported by the MasterAggregatorHandler.
 39  
  */
 40  0
 public class AggregatorToGlobalCommTranslation
 41  
     implements MasterAggregatorUsage, Writable {
 42  
   /** Class logger */
 43  0
   private static final Logger LOG =
 44  0
       Logger.getLogger(AggregatorToGlobalCommTranslation.class);
 45  
 
 46  
   /** Class providing reduce and broadcast interface to use */
 47  
   private final MasterGlobalCommUsage globalComm;
 48  
   /** List of registered aggregators */
 49  0
   private final HashMap<String, AggregatorWrapper<Writable>>
 50  
   registeredAggregators = new HashMap<>();
 51  
 
 52  
   /**
 53  
    * List of init aggregator values, in case someone tries to
 54  
    * access aggregator immediately after registering it.
 55  
    *
 56  
    * Instead of simply returning value, we need to store it during
 57  
    * that superstep, so consecutive calls will return identical object,
 58  
    * which they can modify.
 59  
    */
 60  0
   private final HashMap<String, Writable>
 61  
   initAggregatorValues = new HashMap<>();
 62  
 
 63  
   /** Conf */
 64  
   private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
 65  
 
 66  
   /**
 67  
    * Constructor
 68  
    * @param conf Configuration
 69  
    * @param globalComm Global communication interface
 70  
    */
 71  
   public AggregatorToGlobalCommTranslation(
 72  
       ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
 73  0
       MasterGlobalCommUsage globalComm) {
 74  0
     this.conf = conf;
 75  0
     this.globalComm = globalComm;
 76  0
     MasterLoggingAggregator.registerAggregator(this, conf);
 77  0
   }
 78  
 
 79  
   @Override
 80  
   public <A extends Writable> A getAggregatedValue(String name) {
 81  0
     AggregatorWrapper<Writable> agg = registeredAggregators.get(name);
 82  0
     if (agg == null) {
 83  0
       LOG.warn("getAggregatedValue: " +
 84  0
         AggregatorUtils.getUnregisteredAggregatorMessage(name,
 85  0
             registeredAggregators.size() != 0, conf));
 86  
       // to make sure we are not accessing reducer of the same name.
 87  0
       return null;
 88  
     }
 89  
 
 90  0
     A value = globalComm.getReduced(name);
 91  0
     if (value == null) {
 92  0
       value = (A) initAggregatorValues.get(name);
 93  
     }
 94  
 
 95  0
     if (value == null) {
 96  0
       value = (A) agg.getReduceOp().createInitialValue();
 97  0
       initAggregatorValues.put(name, value);
 98  
     }
 99  
 
 100  0
     Preconditions.checkState(value != null);
 101  0
     return value;
 102  
   }
 103  
 
 104  
   @Override
 105  
   public <A extends Writable> void setAggregatedValue(String name, A value) {
 106  0
     AggregatorWrapper<Writable> aggregator = registeredAggregators.get(name);
 107  0
     if (aggregator == null) {
 108  0
       throw new IllegalArgumentException("setAggregatedValue: "  +
 109  0
           AggregatorUtils.getUnregisteredAggregatorMessage(name,
 110  0
               registeredAggregators.size() != 0, conf));
 111  
     }
 112  0
     aggregator.setCurrentValue(value);
 113  0
   }
 114  
 
 115  
   /**
 116  
    * Called after master compute, to do aggregator-&gt;reduce/broadcast
 117  
    * translation
 118  
    */
 119  
   public void postMasterCompute() {
 120  
     // broadcast what master set, or if it didn't broadcast reduced value
 121  
     // register reduce with the same value
 122  
     for (Entry<String, AggregatorWrapper<Writable>> entry :
 123  0
         registeredAggregators.entrySet()) {
 124  0
       Writable value = entry.getValue().getCurrentValue();
 125  0
       if (value == null) {
 126  0
         value = globalComm.getReduced(entry.getKey());
 127  
       }
 128  0
       Preconditions.checkState(value != null);
 129  
 
 130  0
       globalComm.broadcast(entry.getKey(), new AggregatorBroadcast<>(
 131  0
           entry.getValue().getReduceOp().getAggregatorClass(), value));
 132  
 
 133  
       // Always register clean instance of reduceOp, not to conflict with
 134  
       // reduceOp from previous superstep.
 135  0
       AggregatorReduceOperation<Writable> cleanReduceOp =
 136  0
           entry.getValue().createReduceOp();
 137  0
       if (entry.getValue().isPersistent()) {
 138  0
         globalComm.registerReducer(
 139  0
             entry.getKey(), cleanReduceOp, value);
 140  
       } else {
 141  0
         globalComm.registerReducer(
 142  0
             entry.getKey(), cleanReduceOp);
 143  
       }
 144  0
       entry.getValue().setCurrentValue(null);
 145  0
     }
 146  0
     initAggregatorValues.clear();
 147  0
   }
 148  
 
 149  
   /** Prepare before calling master compute */
 150  
   public void prepareSuperstep() {
 151  0
     MasterLoggingAggregator.logAggregatedValue(this, conf);
 152  0
   }
 153  
 
 154  
   @Override
 155  
   public <A extends Writable> boolean registerAggregator(String name,
 156  
       Class<? extends Aggregator<A>> aggregatorClass) throws
 157  
       InstantiationException, IllegalAccessException {
 158  0
     registerAggregator(name, aggregatorClass, false);
 159  0
     return true;
 160  
   }
 161  
 
 162  
   @Override
 163  
   public <A extends Writable> boolean registerPersistentAggregator(String name,
 164  
       Class<? extends Aggregator<A>> aggregatorClass) throws
 165  
       InstantiationException, IllegalAccessException {
 166  0
     registerAggregator(name, aggregatorClass, true);
 167  0
     return true;
 168  
   }
 169  
 
 170  
   @Override
 171  
   public void write(DataOutput out) throws IOException {
 172  0
     out.writeInt(registeredAggregators.size());
 173  
     for (Entry<String, AggregatorWrapper<Writable>> entry :
 174  0
         registeredAggregators.entrySet()) {
 175  0
       out.writeUTF(entry.getKey());
 176  0
       entry.getValue().write(out);
 177  0
     }
 178  0
   }
 179  
 
 180  
   @Override
 181  
   public void readFields(DataInput in) throws IOException {
 182  0
     registeredAggregators.clear();
 183  0
     int numAggregators = in.readInt();
 184  0
     for (int i = 0; i < numAggregators; i++) {
 185  0
       String name = in.readUTF();
 186  0
       AggregatorWrapper<Writable> agg = new AggregatorWrapper<>();
 187  0
       agg.readFields(in);
 188  0
       registeredAggregators.put(name, agg);
 189  
     }
 190  0
     initAggregatorValues.clear();
 191  0
   }
 192  
 
 193  
   /**
 194  
    * Helper function for registering aggregators.
 195  
    *
 196  
    * @param name            Name of the aggregator
 197  
    * @param aggregatorClass Aggregator class
 198  
    * @param persistent      Whether aggregator is persistent or not
 199  
    * @param <A>             Aggregated value type
 200  
    * @return Newly registered aggregator or aggregator which was previously
 201  
    *         created with selected name, if any
 202  
    */
 203  
   private <A extends Writable> AggregatorWrapper<A> registerAggregator
 204  
   (String name, Class<? extends Aggregator<A>> aggregatorClass,
 205  
       boolean persistent) throws InstantiationException,
 206  
       IllegalAccessException {
 207  0
     AggregatorWrapper<A> aggregatorWrapper =
 208  0
         (AggregatorWrapper<A>) registeredAggregators.get(name);
 209  0
     if (aggregatorWrapper == null) {
 210  0
       aggregatorWrapper =
 211  
           new AggregatorWrapper<A>(aggregatorClass, persistent);
 212  
       // postMasterCompute uses previously reduced value to broadcast,
 213  
       // unless current value is set. After aggregator is registered,
 214  
       // there was no previously reduced value, so set current value
 215  
       // to default to avoid calling getReduced() on unregistered reducer.
 216  
       // (which logs unnecessary warnings)
 217  0
       aggregatorWrapper.setCurrentValue(
 218  0
           aggregatorWrapper.getReduceOp().createInitialValue());
 219  0
       registeredAggregators.put(
 220  
           name, (AggregatorWrapper<Writable>) aggregatorWrapper);
 221  
     }
 222  0
     return aggregatorWrapper;
 223  
   }
 224  
 
 225  
   /**
 226  
    * Object holding all needed data related to single Aggregator
 227  
    * @param <A> Aggregated value type
 228  
    */
 229  
   private class AggregatorWrapper<A extends Writable>
 230  
       implements Writable {
 231  
     /** False iff aggregator should be reset at the end of each super step */
 232  
     private boolean persistent;
 233  
     /** Translation of aggregator to reduce operations */
 234  
     private AggregatorReduceOperation<A> reduceOp;
 235  
     /** Current value, set by master manually */
 236  
     private A currentValue;
 237  
 
 238  
     /** Constructor */
 239  0
     public AggregatorWrapper() {
 240  0
     }
 241  
 
 242  
     /**
 243  
      * Constructor
 244  
      * @param aggregatorClass Aggregator class
 245  
      * @param persistent Is persistent
 246  
      */
 247  
     public AggregatorWrapper(
 248  
         Class<? extends Aggregator<A>> aggregatorClass,
 249  0
         boolean persistent) {
 250  0
       this.persistent = persistent;
 251  0
       this.reduceOp = new AggregatorReduceOperation<>(aggregatorClass, conf);
 252  0
     }
 253  
 
 254  
     public AggregatorReduceOperation<A> getReduceOp() {
 255  0
       return reduceOp;
 256  
     }
 257  
 
 258  
     /**
 259  
      * Create a fresh instance of AggregatorReduceOperation
 260  
      * @return fresh instance of AggregatorReduceOperation
 261  
      */
 262  
     public AggregatorReduceOperation<A> createReduceOp() {
 263  0
       return reduceOp.createCopy();
 264  
     }
 265  
 
 266  
     public A getCurrentValue() {
 267  0
       return currentValue;
 268  
     }
 269  
 
 270  
     public void setCurrentValue(A currentValue) {
 271  0
       this.currentValue = currentValue;
 272  0
     }
 273  
 
 274  
     public boolean isPersistent() {
 275  0
       return persistent;
 276  
     }
 277  
 
 278  
     @Override
 279  
     public void write(DataOutput out) throws IOException {
 280  0
       out.writeBoolean(persistent);
 281  0
       reduceOp.write(out);
 282  
 
 283  0
       Preconditions.checkState(currentValue == null, "AggregatorWrapper " +
 284  
           "shouldn't have value at the end of the superstep");
 285  0
     }
 286  
 
 287  
     @Override
 288  
     public void readFields(DataInput in) throws IOException {
 289  0
       persistent = in.readBoolean();
 290  0
       reduceOp = WritableUtils.createWritable(
 291  0
           AggregatorReduceOperation.class, conf);
 292  0
       reduceOp.readFields(in);
 293  0
       currentValue = null;
 294  0
     }
 295  
   }
 296  
 }