Coverage Report - org.apache.giraph.block_app.reducers.map.BasicMapReduce
 
Classes in this File Line Coverage Branch Coverage Complexity
BasicMapReduce
0%
0/45
0%
0/6
0
BasicMapReduce$1
0%
0/2
N/A
0
BasicMapReduce$2
0%
0/12
0%
0/2
0
BasicMapReduce$3
0%
0/10
N/A
0
BasicMapReduce$3$1
0%
0/7
0%
0/2
0
BasicMapReduce$3$2
0%
0/4
N/A
0
BasicMapReduce$4
0%
0/6
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.reducers.map;
 19  
 
 20  
 import java.io.DataInput;
 21  
 import java.io.DataOutput;
 22  
 import java.io.IOException;
 23  
 import java.util.Iterator;
 24  
 
 25  
 import org.apache.commons.lang3.tuple.MutablePair;
 26  
 import org.apache.commons.lang3.tuple.Pair;
 27  
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 28  
 import org.apache.giraph.block_app.framework.api.CreateReducersApi;
 29  
 import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
 30  
 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
 31  
 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
 32  
 import org.apache.giraph.block_app.framework.piece.global_comm.map.BroadcastMapHandle;
 33  
 import org.apache.giraph.block_app.framework.piece.global_comm.map.ReducerMapHandle;
 34  
 import org.apache.giraph.master.MasterGlobalCommUsage;
 35  
 import org.apache.giraph.reducers.ReduceOperation;
 36  
 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
 37  
 import org.apache.giraph.types.ops.PrimitiveTypeOps;
 38  
 import org.apache.giraph.types.ops.TypeOpsUtils;
 39  
 import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
 40  
 import org.apache.giraph.types.ops.collections.WritableWriter;
 41  
 import org.apache.giraph.utils.WritableUtils;
 42  
 import org.apache.giraph.worker.WorkerBroadcastUsage;
 43  
 import org.apache.hadoop.io.Writable;
 44  
 import org.apache.hadoop.io.WritableComparable;
 45  
 
 46  
 
 47  
 /**
 48  
  * Efficient generic primitive map of values reduce operation.
 49  
  * (it is BasicMap Reduce, not to be confused with MapReduce)
 50  
  *
 51  
  * @param <K> Key type
 52  
  * @param <S> Single value type
 53  
  * @param <R> Reduced value type
 54  
  */
 55  0
 public class BasicMapReduce<K extends WritableComparable, S,
 56  
     R extends Writable>
 57  
     implements ReduceOperation<Pair<K, S>, Basic2ObjectMap<K, R>> {
 58  
   private PrimitiveIdTypeOps<K> keyTypeOps;
 59  
   private PrimitiveTypeOps<R> typeOps;
 60  
   private ReduceOperation<S, R> elementReduceOp;
 61  
   private WritableWriter<R> writer;
 62  
 
 63  0
   public BasicMapReduce() {
 64  0
   }
 65  
 
 66  
   /**
 67  
    * Create ReduceOperation that reduces BasicMaps by reducing individual
 68  
    * elements corresponding to the same key.
 69  
    *
 70  
    * @param keyTypeOps TypeOps of keys
 71  
    * @param typeOps TypeOps of individual elements
 72  
    * @param elementReduceOp ReduceOperation for individual elements
 73  
    */
 74  
   public BasicMapReduce(
 75  
       PrimitiveIdTypeOps<K> keyTypeOps, PrimitiveTypeOps<R> typeOps,
 76  0
       ReduceOperation<S, R> elementReduceOp) {
 77  0
     this.keyTypeOps = keyTypeOps;
 78  0
     this.typeOps = typeOps;
 79  0
     this.elementReduceOp = elementReduceOp;
 80  0
     init();
 81  0
   }
 82  
 
 83  
   /**
 84  
    * Registers one new local reducer, that will reduce BasicMap,
 85  
    * by reducing individual elements corresponding to the same key
 86  
    * using {@code elementReduceOp}.
 87  
    *
 88  
    * This function will return ReducerMapHandle, by which
 89  
    * individual elements can be manipulated separately.
 90  
    *
 91  
    * @param keyTypeOps TypeOps of keys
 92  
    * @param typeOps TypeOps of individual elements
 93  
    * @param elementReduceOp ReduceOperation for individual elements
 94  
    * @param reduceApi API for creating reducers
 95  
    * @return Created ReducerMapHandle
 96  
    */
 97  
   public static <K extends WritableComparable, S, R extends Writable>
 98  
   ReducerMapHandle<K, S, R> createLocalMapHandles(
 99  
       PrimitiveIdTypeOps<K> keyTypeOps, PrimitiveTypeOps<R> typeOps,
 100  
       ReduceOperation<S, R> elementReduceOp,
 101  
       final CreateReducersApi reduceApi) {
 102  0
     return createMapHandles(
 103  
         keyTypeOps, typeOps, elementReduceOp,
 104  0
         new CreateReducerFunctionApi() {
 105  
           @Override
 106  
           public <S, R extends Writable> ReducerHandle<S, R> createReducer(
 107  
               ReduceOperation<S, R> reduceOp) {
 108  0
             return reduceApi.createLocalReducer(reduceOp);
 109  
           }
 110  
         });
 111  
   }
 112  
 
 113  
   /**
 114  
    * Registers one new reducer, that will reduce BasicMap,
 115  
    * by reducing individual elements corresponding to the same key
 116  
    * using {@code elementReduceOp}.
 117  
    *
 118  
    * This function will return ReducerMapHandle, by which
 119  
    * individual elements can be manipulated separately.
 120  
    *
 121  
    * @param keyTypeOps TypeOps of keys
 122  
    * @param typeOps TypeOps of individual elements
 123  
    * @param elementReduceOp ReduceOperation for individual elements
 124  
    * @param createFunction Function for creating a reducer
 125  
    * @return Created ReducerMapHandle
 126  
    */
 127  
   public static <K extends WritableComparable, S, R extends Writable>
 128  
   ReducerMapHandle<K, S, R> createMapHandles(
 129  
       final PrimitiveIdTypeOps<K> keyTypeOps, final PrimitiveTypeOps<R> typeOps,
 130  
       ReduceOperation<S, R> elementReduceOp,
 131  
       CreateReducerFunctionApi createFunction) {
 132  0
     final ReducerHandle<Pair<K, S>, Basic2ObjectMap<K, R>> reduceHandle =
 133  0
       createFunction.createReducer(
 134  
           new BasicMapReduce<>(keyTypeOps, typeOps, elementReduceOp));
 135  0
     final K curIndex = keyTypeOps.create();
 136  0
     final R reusableValue = typeOps.create();
 137  0
     final R initialValue = elementReduceOp.createInitialValue();
 138  0
     final MutablePair<K, S> reusablePair = MutablePair.of(null, null);
 139  0
     final ReducerHandle<S, R> elementReduceHandle = new ReducerHandle<S, R>() {
 140  
       @Override
 141  
       public R getReducedValue(MasterGlobalCommUsage master) {
 142  0
         Basic2ObjectMap<K, R> result = reduceHandle.getReducedValue(master);
 143  0
         R value = result.get(curIndex);
 144  0
         if (value == null) {
 145  0
           typeOps.set(reusableValue, initialValue);
 146  
         } else {
 147  0
           typeOps.set(reusableValue, value);
 148  
         }
 149  0
         return reusableValue;
 150  
       }
 151  
 
 152  
       @Override
 153  
       public void reduce(S valueToReduce) {
 154  0
         reusablePair.setLeft(curIndex);
 155  0
         reusablePair.setRight(valueToReduce);
 156  0
         reduceHandle.reduce(reusablePair);
 157  0
       }
 158  
 
 159  
       @Override
 160  
       public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
 161  0
         throw new UnsupportedOperationException();
 162  
       }
 163  
     };
 164  
 
 165  0
     return new ReducerMapHandle<K, S, R>() {
 166  
       @Override
 167  
       public ReducerHandle<S, R> get(K key) {
 168  0
         keyTypeOps.set(curIndex, key);
 169  0
         return elementReduceHandle;
 170  
       }
 171  
 
 172  
       @Override
 173  
       public int getReducedSize(BlockMasterApi master) {
 174  0
         return reduceHandle.getReducedValue(master).size();
 175  
       }
 176  
 
 177  
       @Override
 178  
       public BroadcastMapHandle<K, R> broadcastValue(BlockMasterApi master) {
 179  0
         final BroadcastHandle<Basic2ObjectMap<K, R>> broadcastHandle =
 180  0
           reduceHandle.broadcastValue(master);
 181  0
         final K curIndex = keyTypeOps.create();
 182  0
         final R reusableValue = typeOps.create();
 183  
         final BroadcastHandle<R>
 184  0
         elementBroadcastHandle = new BroadcastHandle<R>() {
 185  
           @Override
 186  
           public R getBroadcast(WorkerBroadcastUsage worker) {
 187  0
             Basic2ObjectMap<K, R> result = broadcastHandle.getBroadcast(worker);
 188  0
             R value = result.get(curIndex);
 189  0
             if (value == null) {
 190  0
               typeOps.set(reusableValue, initialValue);
 191  
             } else {
 192  0
               typeOps.set(reusableValue, value);
 193  
             }
 194  0
             return reusableValue;
 195  
           }
 196  
         };
 197  0
         return new BroadcastMapHandle<K, R>() {
 198  
           @Override
 199  
           public BroadcastHandle<R> get(K key) {
 200  0
             keyTypeOps.set(curIndex, key);
 201  0
             return elementBroadcastHandle;
 202  
           }
 203  
 
 204  
           @Override
 205  
           public int getBroadcastedSize(WorkerBroadcastUsage worker) {
 206  0
             return broadcastHandle.getBroadcast(worker).size();
 207  
           }
 208  
         };
 209  
       }
 210  
     };
 211  
   }
 212  
 
 213  
   private void init() {
 214  0
     writer = new WritableWriter<R>() {
 215  
       @Override
 216  
       public void write(DataOutput out, R value) throws IOException {
 217  0
         value.write(out);
 218  0
       }
 219  
 
 220  
       @Override
 221  
       public R readFields(DataInput in) throws IOException {
 222  0
         R result = typeOps.create();
 223  0
         result.readFields(in);
 224  0
         return result;
 225  
       }
 226  
     };
 227  0
   }
 228  
 
 229  
   @Override
 230  
   public Basic2ObjectMap<K, R> createInitialValue() {
 231  0
     return keyTypeOps.create2ObjectOpenHashMap(writer);
 232  
   }
 233  
 
 234  
   @Override
 235  
   public Basic2ObjectMap<K, R> reduce(
 236  
       Basic2ObjectMap<K, R> curValue, Pair<K, S> valueToReduce) {
 237  0
     R result = curValue.get(valueToReduce.getLeft());
 238  0
     if (result == null) {
 239  0
       result = typeOps.create();
 240  
     }
 241  0
     result = elementReduceOp.reduce(result, valueToReduce.getRight());
 242  0
     curValue.put(valueToReduce.getLeft(), result);
 243  0
     return curValue;
 244  
   }
 245  
 
 246  
   @Override
 247  
   public Basic2ObjectMap<K, R> reduceMerge(
 248  
       Basic2ObjectMap<K, R> curValue, Basic2ObjectMap<K, R> valueToReduce) {
 249  0
     for (Iterator<K> iter = valueToReduce.fastKeyIterator(); iter.hasNext();) {
 250  0
       K key = iter.next();
 251  
 
 252  0
       R result = curValue.get(key);
 253  0
       if (result == null) {
 254  0
         result = typeOps.create();
 255  
       }
 256  0
       result = elementReduceOp.reduceMerge(result, valueToReduce.get(key));
 257  0
       curValue.put(key, result);
 258  0
     }
 259  0
     return curValue;
 260  
   }
 261  
 
 262  
   @Override
 263  
   public void write(DataOutput out) throws IOException {
 264  0
     TypeOpsUtils.writeTypeOps(keyTypeOps, out);
 265  0
     TypeOpsUtils.writeTypeOps(typeOps, out);
 266  0
     WritableUtils.writeWritableObject(elementReduceOp, out);
 267  0
   }
 268  
 
 269  
   @Override
 270  
   public void readFields(DataInput in) throws IOException {
 271  0
     keyTypeOps = TypeOpsUtils.readTypeOps(in);
 272  0
     typeOps = TypeOpsUtils.readTypeOps(in);
 273  0
     elementReduceOp = WritableUtils.readWritableObject(in, null);
 274  0
     init();
 275  0
   }
 276  
 }