Coverage Report - org.apache.giraph.block_app.reducers.array.ArrayReduce
 
Classes in this File Line Coverage Branch Coverage Complexity
ArrayReduce
0%
0/37
0%
0/4
0
ArrayReduce$1
0%
0/8
N/A
0
ArrayReduce$2
0%
0/10
N/A
0
ArrayReduce$2$1
0%
0/3
N/A
0
ArrayReduce$2$2
0%
0/5
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.reducers.array;
 19  
 
 20  
 import java.io.DataInput;
 21  
 import java.io.DataOutput;
 22  
 import java.io.IOException;
 23  
 import java.lang.reflect.Array;
 24  
 
 25  
 import org.apache.commons.lang3.tuple.MutablePair;
 26  
 import org.apache.commons.lang3.tuple.Pair;
 27  
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 28  
 import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
 29  
 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
 30  
 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
 31  
 import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
 32  
 import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
 33  
 import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
 34  
 import org.apache.giraph.master.MasterGlobalCommUsage;
 35  
 import org.apache.giraph.reducers.ReduceOperation;
 36  
 import org.apache.giraph.utils.ArrayWritable;
 37  
 import org.apache.giraph.utils.WritableUtils;
 38  
 import org.apache.giraph.worker.WorkerBroadcastUsage;
 39  
 import org.apache.hadoop.io.Writable;
 40  
 
 41  
 /**
 42  
  * One reducer representing reduction of array of individual values.
 43  
  * Elements are represented as object, and so BasicArrayReduce should be
 44  
  * used instead when elements are primitive types.
 45  
  *
 46  
  * @param <S> Single value type, objects passed on workers
 47  
  * @param <R> Reduced value type
 48  
  */
 49  0
 public class ArrayReduce<S, R extends Writable>
 50  
     implements ReduceOperation<Pair<IntRef, S>, ArrayWritable<R>> {
 51  
   private int fixedSize;
 52  
   private ReduceOperation<S, R> elementReduceOp;
 53  
   private Class<R> elementClass;
 54  
 
 55  0
   public ArrayReduce() {
 56  0
   }
 57  
 
 58  
   /**
 59  
    * Create ReduceOperation that reduces arrays by reducing individual
 60  
    * elements.
 61  
    *
 62  
    * @param fixedSize Number of elements
 63  
    * @param elementReduceOp ReduceOperation for individual elements
 64  
    */
 65  0
   public ArrayReduce(int fixedSize, ReduceOperation<S, R> elementReduceOp) {
 66  0
     this.fixedSize = fixedSize;
 67  0
     this.elementReduceOp = elementReduceOp;
 68  0
     init();
 69  0
   }
 70  
 
 71  
   /**
 72  
    * Registers one new reducer, that will reduce array of objects,
 73  
    * by reducing individual elements using {@code elementReduceOp}.
 74  
    *
 75  
    * This function will return ReducerArrayHandle to it, by which
 76  
    * individual elements can be manipulated separately.
 77  
    *
 78  
    * @param fixedSize Number of elements
 79  
    * @param elementReduceOp ReduceOperation for individual elements
 80  
    * @param createFunction Function for creating a reducer
 81  
    * @return Created ReducerArrayHandle
 82  
    */
 83  
   public static <S, T extends Writable>
 84  
   ReducerArrayHandle<S, T> createArrayHandles(
 85  
       final int fixedSize, ReduceOperation<S, T> elementReduceOp,
 86  
       CreateReducerFunctionApi createFunction) {
 87  0
     final ReducerHandle<Pair<IntRef, S>, ArrayWritable<T>> reduceHandle =
 88  0
         createFunction.createReducer(
 89  
             new ArrayReduce<>(fixedSize, elementReduceOp));
 90  
 
 91  0
     final IntRef curIndex = new IntRef(0);
 92  0
     final MutablePair<IntRef, S> reusablePair =
 93  0
         MutablePair.of(new IntRef(0), null);
 94  0
     final ReducerHandle<S, T> elementReduceHandle = new ReducerHandle<S, T>() {
 95  
       @Override
 96  
       public T getReducedValue(MasterGlobalCommUsage master) {
 97  0
         ArrayWritable<T> result = reduceHandle.getReducedValue(master);
 98  0
         return result.get()[curIndex.value];
 99  
       }
 100  
 
 101  
       @Override
 102  
       public void reduce(S valueToReduce) {
 103  0
         reusablePair.getLeft().value = curIndex.value;
 104  0
         reusablePair.setRight(valueToReduce);
 105  0
         reduceHandle.reduce(reusablePair);
 106  0
       }
 107  
 
 108  
       @Override
 109  
       public BroadcastHandle<T> broadcastValue(BlockMasterApi master) {
 110  0
         throw new UnsupportedOperationException();
 111  
       }
 112  
     };
 113  
 
 114  0
     return new ReducerArrayHandle<S, T>() {
 115  
       @Override
 116  
       public ReducerHandle<S, T> get(int index) {
 117  0
         curIndex.value = index;
 118  0
         return elementReduceHandle;
 119  
       }
 120  
 
 121  
       @Override
 122  
       public int getStaticSize() {
 123  0
         return fixedSize;
 124  
       }
 125  
 
 126  
       @Override
 127  
       public int getReducedSize(BlockMasterApi master) {
 128  0
         return getStaticSize();
 129  
       }
 130  
 
 131  
       @Override
 132  
       public BroadcastArrayHandle<T> broadcastValue(BlockMasterApi master) {
 133  0
         final BroadcastHandle<ArrayWritable<T>> broadcastHandle =
 134  0
             reduceHandle.broadcastValue(master);
 135  0
         final IntRef curIndex = new IntRef(0);
 136  
         final BroadcastHandle<T>
 137  0
         elementBroadcastHandle = new BroadcastHandle<T>() {
 138  
           @Override
 139  
           public T getBroadcast(WorkerBroadcastUsage worker) {
 140  0
             ArrayWritable<T> result = broadcastHandle.getBroadcast(worker);
 141  0
             return result.get()[curIndex.value];
 142  
           }
 143  
         };
 144  0
         return new BroadcastArrayHandle<T>() {
 145  
           @Override
 146  
           public BroadcastHandle<T> get(int index) {
 147  0
             curIndex.value = index;
 148  0
             return elementBroadcastHandle;
 149  
           }
 150  
 
 151  
           @Override
 152  
           public int getStaticSize() {
 153  0
             return fixedSize;
 154  
           }
 155  
 
 156  
           @Override
 157  
           public int getBroadcastedSize(WorkerBroadcastUsage worker) {
 158  0
             return getStaticSize();
 159  
           }
 160  
         };
 161  
       }
 162  
     };
 163  
   }
 164  
 
 165  
   private void init() {
 166  0
     elementClass = (Class<R>) elementReduceOp.createInitialValue().getClass();
 167  0
   }
 168  
 
 169  
   @Override
 170  
   public ArrayWritable<R> createInitialValue() {
 171  0
     R[] values = (R[]) Array.newInstance(elementClass, fixedSize);
 172  0
     for (int i = 0; i < fixedSize; i++) {
 173  0
       values[i] = elementReduceOp.createInitialValue();
 174  
     }
 175  0
     return new ArrayWritable<>(elementClass, values);
 176  
   }
 177  
 
 178  
   @Override
 179  
   public ArrayWritable<R> reduce(
 180  
       ArrayWritable<R> curValue, Pair<IntRef, S> valueToReduce) {
 181  0
     int index = valueToReduce.getLeft().value;
 182  0
     curValue.get()[index] =
 183  0
         elementReduceOp.reduce(curValue.get()[index], valueToReduce.getRight());
 184  0
     return curValue;
 185  
   }
 186  
 
 187  
   @Override
 188  
   public ArrayWritable<R> reduceMerge(
 189  
       ArrayWritable<R> curValue, ArrayWritable<R> valueToReduce) {
 190  0
     for (int i = 0; i < fixedSize; i++) {
 191  0
       curValue.get()[i] =
 192  0
           elementReduceOp.reduceMerge(
 193  0
               curValue.get()[i], valueToReduce.get()[i]);
 194  
     }
 195  0
     return curValue;
 196  
   }
 197  
 
 198  
   @Override
 199  
   public void write(DataOutput out) throws IOException {
 200  0
     out.writeInt(fixedSize);
 201  0
     WritableUtils.writeWritableObject(elementReduceOp, out);
 202  0
   }
 203  
 
 204  
   @Override
 205  
   public void readFields(DataInput in) throws IOException {
 206  0
     fixedSize = in.readInt();
 207  0
     elementReduceOp = WritableUtils.readWritableObject(in, null);
 208  0
     init();
 209  0
   }
 210  
 
 211  
 }