Coverage Report - org.apache.giraph.block_app.reducers.array.BasicArrayReduce
 
Classes in this File Line Coverage Branch Coverage Complexity
BasicArrayReduce
0%
0/61
0%
0/12
0
BasicArrayReduce$1
0%
0/2
N/A
0
BasicArrayReduce$2
0%
0/11
0%
0/4
0
BasicArrayReduce$3
0%
0/13
0%
0/2
0
BasicArrayReduce$3$1
0%
0/6
0%
0/4
0
BasicArrayReduce$3$2
0%
0/7
0%
0/2
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.reducers.array;
 19  
 
 20  
 import java.io.DataInput;
 21  
 import java.io.DataOutput;
 22  
 import java.io.IOException;
 23  
 
 24  
 import org.apache.commons.lang3.tuple.MutablePair;
 25  
 import org.apache.commons.lang3.tuple.Pair;
 26  
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 27  
 import org.apache.giraph.block_app.framework.api.CreateReducersApi;
 28  
 import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
 29  
 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
 30  
 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
 31  
 import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
 32  
 import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
 33  
 import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
 34  
 import org.apache.giraph.master.MasterGlobalCommUsage;
 35  
 import org.apache.giraph.reducers.ReduceOperation;
 36  
 import org.apache.giraph.types.ops.PrimitiveTypeOps;
 37  
 import org.apache.giraph.types.ops.TypeOpsUtils;
 38  
 import org.apache.giraph.types.ops.collections.array.WArrayList;
 39  
 import org.apache.giraph.utils.WritableUtils;
 40  
 import org.apache.giraph.worker.WorkerBroadcastUsage;
 41  
 import org.apache.hadoop.io.Writable;
 42  
 
 43  
 /**
 44  
  * Efficient generic primitive array reduce operation.
 45  
  *
 46  
  * Supports two modes: fixed size, and unbounded size
 47  
  * (keeping only the elements actually used, and growing as needed)
 48  
  *
 49  
  * @param <S> Single value type
 50  
  * @param <R> Reduced value type
 51  
  */
 52  0
 public class BasicArrayReduce<S, R extends Writable>
 53  
     implements ReduceOperation<Pair<IntRef, S>, WArrayList<R>> {
 54  
   private int fixedSize;
 55  
   private PrimitiveTypeOps<R> typeOps;
 56  
   private ReduceOperation<S, R> elementReduceOp;
 57  
   private R initialElement;
 58  
   private R reusable;
 59  
   private R reusable2;
 60  
 
 61  0
   /**
    * No-argument constructor; performs no initialization, so all fields keep
    * their default values until {@link #readFields(DataInput)} populates them.
    * NOTE(review): presumably required so the class can be instantiated
    * reflectively before deserialization - confirm.
    */
   public BasicArrayReduce() {
 62  0
   }
 63  
 
 64  
 
 65  
   /**
 66  
    * Create ReduceOperation that reduces BasicArrays by reducing individual
 67  
    * elements, with predefined size.
 68  
    *
 69  
    * @param fixedSize Number of elements
 70  
    * @param typeOps TypeOps of individual elements
 71  
    * @param elementReduceOp ReduceOperation for individual elements
 72  
    */
 73  
   public BasicArrayReduce(
 74  
       int fixedSize,
 75  
       PrimitiveTypeOps<R> typeOps,
 76  0
       ReduceOperation<S, R> elementReduceOp) {
 77  0
     this.fixedSize = fixedSize;
 78  0
     this.typeOps = typeOps;
 79  0
     this.elementReduceOp = elementReduceOp;
 80  0
     init();
 81  0
   }
 82  
 
 83  
 
 84  
   /**
 85  
    * Create ReduceOperation that reduces BasicArrays by reducing individual
 86  
    * elements, with unbounded size.
 87  
    *
 88  
    * @param typeOps TypeOps of individual elements
 89  
    * @param elementReduceOp ReduceOperation for individual elements
 90  
    */
 91  
   public BasicArrayReduce(
 92  
       PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp) {
 93  0
     this(-1, typeOps, elementReduceOp);
 94  0
   }
 95  
 
 96  
 
 97  
   /**
 98  
    * Registers one new local reducer, that will reduce BasicArray,
 99  
    * by reducing individual elements using {@code elementReduceOp},
 100  
    * with unbounded size.
 101  
    *
 102  
    * This function will return ReducerArrayHandle, by which
 103  
    * individual elements can be manipulated separately.
 104  
    *
 105  
    * @param typeOps TypeOps of individual elements
 106  
    * @param elementReduceOp ReduceOperation for individual elements
 107  
    * @param reduceApi API for creating reducers
 108  
    * @return Created ReducerArrayHandle
 109  
    */
 110  
   public static <S, R extends Writable>
 111  
   ReducerArrayHandle<S, R> createLocalArrayHandles(
 112  
       PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp,
 113  
       CreateReducersApi reduceApi) {
 114  0
     return createLocalArrayHandles(-1, typeOps, elementReduceOp, reduceApi);
 115  
   }
 116  
 
 117  
   /**
 118  
    * Registers one new local reducer, that will reduce BasicArray,
 119  
    * by reducing individual elements using {@code elementReduceOp},
 120  
    * with predefined size.
 121  
    *
 122  
    * This function will return ReducerArrayHandle, by which
 123  
    * individual elements can be manipulated separately.
 124  
    *
 125  
    * @param fixedSize Number of elements
 126  
    * @param typeOps TypeOps of individual elements
 127  
    * @param elementReduceOp ReduceOperation for individual elements
 128  
    * @param reduceApi API for creating reducers
 129  
    * @return Created ReducerArrayHandle
 130  
    */
 131  
   public static <S, R extends Writable>
 132  
   ReducerArrayHandle<S, R> createLocalArrayHandles(
 133  
       int fixedSize, PrimitiveTypeOps<R> typeOps,
 134  
       ReduceOperation<S, R> elementReduceOp,
 135  
       final CreateReducersApi reduceApi) {
 136  0
     return createArrayHandles(fixedSize, typeOps, elementReduceOp,
 137  0
         new CreateReducerFunctionApi() {
 138  
           @Override
 139  
           public <S, R extends Writable> ReducerHandle<S, R> createReducer(
 140  
               ReduceOperation<S, R> reduceOp) {
 141  0
             return reduceApi.createLocalReducer(reduceOp);
 142  
           }
 143  
         });
 144  
   }
 145  
 
 146  
   /**
 147  
    * Registers one new reducer, that will reduce BasicArray,
 148  
    * by reducing individual elements using {@code elementReduceOp},
 149  
    * with unbounded size.
 150  
    *
 151  
    * This function will return ReducerArrayHandle, by which
 152  
    * individual elements can be manipulated separately.
 153  
    *
 154  
    * @param typeOps TypeOps of individual elements
 155  
    * @param elementReduceOp ReduceOperation for individual elements
 156  
    * @param createFunction Function for creating a reducer
 157  
    * @return Created ReducerArrayHandle
 158  
    */
 159  
   public static <S, R extends Writable>
 160  
   ReducerArrayHandle<S, R> createArrayHandles(
 161  
       PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp,
 162  
       CreateReducerFunctionApi createFunction) {
 163  0
     return createArrayHandles(-1, typeOps, elementReduceOp, createFunction);
 164  
   }
 165  
 
 166  
   /**
 167  
    * Registers one new reducer, that will reduce BasicArray,
 168  
    * by reducing individual elements using {@code elementReduceOp},
 169  
    * with predefined size.
 170  
    *
 171  
    * This function will return ReducerArrayHandle, by which
 172  
    * individual elements can be manipulated separately.
 173  
    *
 174  
    * @param fixedSize Number of elements
 175  
    * @param typeOps TypeOps of individual elements
 176  
    * @param elementReduceOp ReduceOperation for individual elements
 177  
    * @param createFunction Function for creating a reducer
 178  
    * @return Created ReducerArrayHandle
 179  
    */
 180  
   public static <S, R extends Writable>
 181  
   ReducerArrayHandle<S, R> createArrayHandles(
 182  
       final int fixedSize, final PrimitiveTypeOps<R> typeOps,
 183  
       ReduceOperation<S, R> elementReduceOp,
 184  
       CreateReducerFunctionApi createFunction) {
 185  0
     final ReducerHandle<Pair<IntRef, S>, WArrayList<R>> reduceHandle =
 186  0
         createFunction.createReducer(
 187  
             new BasicArrayReduce<>(fixedSize, typeOps, elementReduceOp));
 188  0
     final IntRef curIndex = new IntRef(0);
 189  0
     final R reusableValue = typeOps.create();
 190  0
     final R initialValue = elementReduceOp.createInitialValue();
 191  0
     final MutablePair<IntRef, S> reusablePair =
 192  0
         MutablePair.of(new IntRef(0), null);
 193  0
     final ReducerHandle<S, R> elementReduceHandle = new ReducerHandle<S, R>() {
 194  
       @Override
 195  
       public R getReducedValue(MasterGlobalCommUsage master) {
 196  0
         WArrayList<R> result = reduceHandle.getReducedValue(master);
 197  0
         if (fixedSize == -1 && curIndex.value >= result.size()) {
 198  0
           typeOps.set(reusableValue, initialValue);
 199  
         } else {
 200  0
           result.getIntoW(curIndex.value, reusableValue);
 201  
         }
 202  0
         return reusableValue;
 203  
       }
 204  
 
 205  
       @Override
 206  
       public void reduce(S valueToReduce) {
 207  0
         reusablePair.getLeft().value = curIndex.value;
 208  0
         reusablePair.setRight(valueToReduce);
 209  0
         reduceHandle.reduce(reusablePair);
 210  0
       }
 211  
 
 212  
       @Override
 213  
       public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
 214  0
         throw new UnsupportedOperationException();
 215  
       }
 216  
     };
 217  
 
 218  0
     return new ReducerArrayHandle<S, R>() {
 219  
       @Override
 220  
       public ReducerHandle<S, R> get(int index) {
 221  0
         curIndex.value = index;
 222  0
         return elementReduceHandle;
 223  
       }
 224  
 
 225  
       @Override
 226  
       public int getStaticSize() {
 227  0
         if (fixedSize == -1) {
 228  0
           throw new UnsupportedOperationException(
 229  
               "Cannot call size, when one is not specified upfront");
 230  
         }
 231  0
         return fixedSize;
 232  
       }
 233  
 
 234  
       @Override
 235  
       public int getReducedSize(BlockMasterApi master) {
 236  0
         return reduceHandle.getReducedValue(master).size();
 237  
       }
 238  
 
 239  
       @Override
 240  
       public BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master) {
 241  0
         final BroadcastHandle<WArrayList<R>> broadcastHandle =
 242  0
             reduceHandle.broadcastValue(master);
 243  0
         final IntRef curIndex = new IntRef(0);
 244  0
         final R reusableValue = typeOps.create();
 245  
         final BroadcastHandle<R>
 246  0
         elementBroadcastHandle = new BroadcastHandle<R>() {
 247  
           @Override
 248  
           public R getBroadcast(WorkerBroadcastUsage worker) {
 249  0
             WArrayList<R> result = broadcastHandle.getBroadcast(worker);
 250  0
             if (fixedSize == -1 && curIndex.value >= result.size()) {
 251  0
               typeOps.set(reusableValue, initialValue);
 252  
             } else {
 253  0
               result.getIntoW(curIndex.value, reusableValue);
 254  
             }
 255  0
             return reusableValue;
 256  
           }
 257  
         };
 258  0
         return new BroadcastArrayHandle<R>() {
 259  
           @Override
 260  
           public BroadcastHandle<R> get(int index) {
 261  0
             curIndex.value = index;
 262  0
             return elementBroadcastHandle;
 263  
           }
 264  
 
 265  
           @Override
 266  
           public int getStaticSize() {
 267  0
             if (fixedSize == -1) {
 268  0
               throw new UnsupportedOperationException(
 269  
                   "Cannot call size, when one is not specified upfront");
 270  
             }
 271  0
             return fixedSize;
 272  
           }
 273  
 
 274  
           @Override
 275  
           public int getBroadcastedSize(WorkerBroadcastUsage worker) {
 276  0
             return broadcastHandle.getBroadcast(worker).size();
 277  
           }
 278  
         };
 279  
       }
 280  
     };
 281  
   }
 282  
 
 283  
 
 284  
   private void init() {
 285  0
     initialElement = elementReduceOp.createInitialValue();
 286  0
     reusable = typeOps.create();
 287  0
     reusable2 = typeOps.create();
 288  0
   }
 289  
 
 290  
   @Override
 291  
   public WArrayList<R> createInitialValue() {
 292  0
     if (fixedSize != -1) {
 293  0
       WArrayList<R> list = typeOps.createArrayList(fixedSize);
 294  0
       fill(list, fixedSize);
 295  0
       return list;
 296  
     } else {
 297  0
       return typeOps.createArrayList(1);
 298  
     }
 299  
   }
 300  
 
 301  
   private void fill(WArrayList<R> list, int newSize) {
 302  0
     if (fixedSize != -1 && newSize > fixedSize) {
 303  0
       throw new IllegalArgumentException(newSize + " larger then " + fixedSize);
 304  
     }
 305  
 
 306  0
     if (list.capacity() < newSize) {
 307  0
       list.setCapacity(newSize);
 308  
     }
 309  0
     while (list.size() < newSize) {
 310  0
       list.addW(initialElement);
 311  
     }
 312  0
   }
 313  
 
 314  
   @Override
 315  
   public WArrayList<R> reduce(
 316  
       WArrayList<R> curValue, Pair<IntRef, S> valueToReduce) {
 317  0
     int index = valueToReduce.getLeft().value;
 318  0
     fill(curValue, index + 1);
 319  0
     curValue.getIntoW(index, reusable);
 320  0
     R result = elementReduceOp.reduce(reusable, valueToReduce.getRight());
 321  0
     curValue.setW(index, result);
 322  0
     return curValue;
 323  
   }
 324  
 
 325  
   @Override
 326  
   public WArrayList<R> reduceMerge(
 327  
       WArrayList<R> curValue, WArrayList<R> valueToReduce) {
 328  0
     fill(curValue, valueToReduce.size());
 329  0
     for (int i = 0; i < valueToReduce.size(); i++) {
 330  0
       valueToReduce.getIntoW(i, reusable2);
 331  0
       curValue.getIntoW(i, reusable);
 332  0
       R result = elementReduceOp.reduceMerge(reusable, reusable2);
 333  0
       curValue.setW(i, result);
 334  
     }
 335  
 
 336  0
     return curValue;
 337  
   }
 338  
 
 339  
   @Override
 340  
   public void write(DataOutput out) throws IOException {
 341  0
     out.writeInt(fixedSize);
 342  0
     TypeOpsUtils.writeTypeOps(typeOps, out);
 343  0
     WritableUtils.writeWritableObject(elementReduceOp, out);
 344  0
   }
 345  
 
 346  
   @Override
 347  
   public void readFields(DataInput in) throws IOException {
 348  0
     fixedSize = in.readInt();
 349  0
     typeOps = TypeOpsUtils.readTypeOps(in);
 350  0
     elementReduceOp = WritableUtils.readWritableObject(in, null);
 351  0
     init();
 352  0
   }
 353  
 }