Coverage Report - org.apache.giraph.block_app.reducers.array.HugeArrayUtils
 
Classes in this File Line Coverage Branch Coverage Complexity
HugeArrayUtils
0%
0/43
0%
0/10
0
HugeArrayUtils$1
0%
0/2
N/A
0
HugeArrayUtils$10
0%
0/5
N/A
0
HugeArrayUtils$2
0%
0/2
N/A
0
HugeArrayUtils$3
0%
0/9
0%
0/4
0
HugeArrayUtils$4
0%
0/4
0%
0/2
0
HugeArrayUtils$5
0%
0/6
0%
0/4
0
HugeArrayUtils$6
0%
0/10
0%
0/4
0
HugeArrayUtils$7
0%
0/2
N/A
0
HugeArrayUtils$8
0%
0/7
0%
0/2
0
HugeArrayUtils$9
0%
0/4
N/A
0
HugeArrayUtils$ObjectStriping
0%
0/17
0%
0/8
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.reducers.array;
 19  
 
 20  
 import java.util.ArrayList;
 21  
 
 22  
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 23  
 import org.apache.giraph.block_app.framework.api.CreateReducersApi;
 24  
 import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
 25  
 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
 26  
 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
 27  
 import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
 28  
 import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
 29  
 import org.apache.giraph.block_app.reducers.array.ArrayOfHandles.ArrayOfBroadcasts;
 30  
 import org.apache.giraph.block_app.reducers.array.ArrayOfHandles.ArrayOfReducers;
 31  
 import org.apache.giraph.conf.IntConfOption;
 32  
 import org.apache.giraph.function.ObjectHolder;
 33  
 import org.apache.giraph.function.Supplier;
 34  
 import org.apache.giraph.function.primitive.Int2ObjFunction;
 35  
 import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
 36  
 import org.apache.giraph.reducers.ReduceOperation;
 37  
 import org.apache.giraph.types.ops.PrimitiveTypeOps;
 38  
 import org.apache.giraph.types.ops.TypeOpsUtils;
 39  
 import org.apache.giraph.types.ops.collections.array.WArrayList;
 40  
 import org.apache.giraph.utils.ArrayWritable;
 41  
 import org.apache.giraph.worker.WorkerBroadcastUsage;
 42  
 import org.apache.hadoop.io.Writable;
 43  
 
 44  
 /**
 45  
  * Utility class when we are dealing with huge arrays (i.e. large number of
 46  
  * elements) within reducing/broadcasting.
 47  
  *
 48  
  * In Giraph, for each reducer there is a worker machine which is its owner,
 49  
  * which does partial aggregation for it. So if we have only single huge
 50  
  * reducer - other workers will have to wait, while that single worker is doing
 51  
  * huge reducing operation. Additionally single reducer should be smaller than
 52  
  * max netty message, which is 1MB.
 53  
  * On the other hand, each reducer has a meaningful overhead, so we should try
 54  
  * to keep number of reducers as low as possible.
 55  
  *
 56  
  * By default we are being conservative, to keep individual reducers small,
 57  
  * striping into 500k reducers. If you know the exact sizes of
 58  
  * your objects, you can specify the exact number you want.
 59  
  *
 60  
  * So when we have huge array, we don't want one reducer/broadcast for each
 61  
  * element, but we also don't want one reducer/broadcast for the whole array.
 62  
  *
 63  
  * This class allows transparent split into reasonable number of reducers
 64  
  * (~500k), which solves both of the above issues.
 65  
  */
 66  
 public class HugeArrayUtils {
 67  
   // Even with a 100GB object, the average stripe will be 200KB,
 68  
   // keeping outliers mostly under 1MB limit
 69  0
   private static final IntConfOption NUM_STRIPES = new IntConfOption(
 70  
       "giraph.reducers.HugeArrayUtils.num_stripes", 500000,
 71  
       "Number of distict reducers to create. If array is smaller then this" +
 72  
       "number, each element will be it's own reducer");
 73  
 
 74  0
   private HugeArrayUtils() { }
 75  
 
 76  
   /**
 77  
    * Create global array of reducers, by splitting the huge array
 78  
    * into NUM_STRIPES number of parts.
 79  
    *
 80  
    * @param fixedSize Number of elements
 81  
    * @param elementReduceOp ReduceOperation for individual elements
 82  
    * @param reduceApi Api for creating reducers
 83  
    * @return Created ReducerArrayHandle
 84  
    */
 85  
   public static <S, R extends Writable>
 86  
   ReducerArrayHandle<S, R> createGlobalReducerArrayHandle(
 87  
       final int fixedSize, final ReduceOperation<S, R> elementReduceOp,
 88  
       final CreateReducersApi reduceApi) {
 89  0
     return createGlobalReducerArrayHandle(
 90  
         fixedSize, elementReduceOp, reduceApi,
 91  0
         NUM_STRIPES.get(reduceApi.getConf()));
 92  
   }
 93  
 
 94  
   /**
 95  
    * Create global array of reducers, by splitting the huge array
 96  
    * into {@code maxNumStripes} number of parts.
 97  
    *
 98  
    * @param fixedSize Number of elements
 99  
    * @param elementReduceOp ReduceOperation for individual elements
 100  
    * @param reduceApi Api for creating reducers
 101  
    * @param maxNumStripes Maximal number of reducers to create.
 102  
    * @return Created ReducerArrayHandle
 103  
    */
 104  
   public static <S, R extends Writable>
 105  
   ReducerArrayHandle<S, R> createGlobalReducerArrayHandle(
 106  
       final int fixedSize, final ReduceOperation<S, R> elementReduceOp,
 107  
       final CreateReducersApi reduceApi, int maxNumStripes) {
 108  0
     PrimitiveTypeOps<R> typeOps = TypeOpsUtils.getPrimitiveTypeOpsOrNull(
 109  0
         (Class<R>) elementReduceOp.createInitialValue().getClass());
 110  
 
 111  
     final CreateReducerFunctionApi
 112  0
     createReducer = new CreateReducerFunctionApi() {
 113  
       @Override
 114  
       public <S, R extends Writable> ReducerHandle<S, R> createReducer(
 115  
           ReduceOperation<S, R> reduceOp) {
 116  0
         return reduceApi.createGlobalReducer(reduceOp);
 117  
       }
 118  
     };
 119  
 
 120  0
     if (fixedSize < maxNumStripes) {
 121  0
       return new ArrayOfReducers<>(
 122  
           fixedSize,
 123  0
           new Supplier<ReducerHandle<S, R>>() {
 124  
             @Override
 125  
             public ReducerHandle<S, R> get() {
 126  0
               return createReducer.createReducer(elementReduceOp);
 127  
             }
 128  
           });
 129  
     } else {
 130  0
       final ObjectStriping striping =
 131  
           new ObjectStriping(fixedSize, maxNumStripes);
 132  
 
 133  0
       final ArrayList<ReducerArrayHandle<S, R>> handles =
 134  0
           new ArrayList<>(striping.getSplits());
 135  0
       for (int i = 0; i < striping.getSplits(); i++) {
 136  0
         if (typeOps != null) {
 137  0
           handles.add(BasicArrayReduce.createArrayHandles(
 138  0
               striping.getSplitSize(i), typeOps,
 139  
               elementReduceOp, createReducer));
 140  
         } else {
 141  0
           handles.add(ArrayReduce.createArrayHandles(
 142  0
               striping.getSplitSize(i), elementReduceOp, createReducer));
 143  
         }
 144  
       }
 145  
 
 146  0
       return new ReducerArrayHandle<S, R>() {
 147  
         @Override
 148  
         public ReducerHandle<S, R> get(int index) {
 149  0
           if ((index >= fixedSize) || (index < 0)) {
 150  0
             throw new RuntimeException(
 151  
                 "Reducer Access out of bounds: requested : " +
 152  
                     index + " from array of size : " + fixedSize);
 153  
           }
 154  0
           int reducerIndex = striping.getSplitIndex(index);
 155  0
           int insideIndex = striping.getInsideIndex(index);
 156  0
           return handles.get(reducerIndex).get(insideIndex);
 157  
         }
 158  
 
 159  
         @Override
 160  
         public int getStaticSize() {
 161  0
           return fixedSize;
 162  
         }
 163  
 
 164  
         @Override
 165  
         public int getReducedSize(BlockMasterApi master) {
 166  0
           return getStaticSize();
 167  
         }
 168  
 
 169  
         @Override
 170  
         public BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master) {
 171  0
           throw new UnsupportedOperationException("for now not supported");
 172  
         }
 173  
       };
 174  
     }
 175  
   }
 176  
 
 177  
   /**
 178  
    * Broadcast a huge array, by splitting into NUM_STRIPES number of parts.
 179  
    *
 180  
    * @param count Number of elements
 181  
    * @param valueSupplier Supplier of value to be broadcasted for a given index
 182  
    * @param master Master API
 183  
    * @return Created BroadcastArrayHandle
 184  
    */
 185  
   public static <V extends Writable> BroadcastArrayHandle<V> broadcast(
 186  
       final int count,
 187  
       final Int2ObjFunction<V> valueSupplier,
 188  
       final BlockMasterApi master) {
 189  0
     return broadcast(count, valueSupplier, null, master);
 190  
   }
 191  
 
 192  
   /**
 193  
    * Broadcast a huge array, by splitting into NUM_STRIPES number of parts.
 194  
    * Efficient for primitive types, using BasicArray underneath.
 195  
    *
 196  
    * @param count Number of elements
 197  
    * @param valueSupplier Supplier of value to be broadcasted for a given index
 198  
    * @param typeOps Element TypeOps
 199  
    * @param master Master API
 200  
    * @return Created BroadcastArrayHandle
 201  
    */
 202  
   public static <V extends Writable> BroadcastArrayHandle<V> broadcast(
 203  
       final int count,
 204  
       final Int2ObjFunction<V> valueSupplier,
 205  
       final PrimitiveTypeOps<V> typeOps,
 206  
       final BlockMasterApi master) {
 207  0
     int numStripes = NUM_STRIPES.get(master.getConf());
 208  0
     if (count < numStripes) {
 209  0
       return new ArrayOfBroadcasts<>(
 210  
           count,
 211  0
           new Int2ObjFunction<BroadcastHandle<V>>() {
 212  
             @Override
 213  
             public BroadcastHandle<V> apply(int i) {
 214  
               // We create a copy because the valueSupplier might return a
 215  
               // reusable obj. This function is NOT safe if typeOps is null
 216  
               // & valueSupplier returns reusable
 217  0
               return master.broadcast(
 218  
                 typeOps != null ?
 219  0
                 typeOps.createCopy(valueSupplier.apply(i)) :
 220  0
                 valueSupplier.apply(i));
 221  
             }
 222  
           });
 223  
     } else {
 224  0
       ObjectStriping striping = new ObjectStriping(count, numStripes);
 225  
       final Int2ObjFunction<BroadcastHandle<V>> handleSupplier;
 226  
 
 227  0
       if (typeOps != null) {
 228  0
         handleSupplier = getPrimitiveBroadcastHandleSupplier(
 229  
             valueSupplier, typeOps, master, striping);
 230  
       } else {
 231  0
         handleSupplier = getObjectBroadcastHandleSupplier(
 232  
             valueSupplier, master, striping);
 233  
       }
 234  0
       return new BroadcastArrayHandle<V>() {
 235  
         @Override
 236  
         public BroadcastHandle<V> get(int index) {
 237  0
           if (index >= count || index < 0) {
 238  0
             throw new RuntimeException(
 239  
                 "Broadcast Access out of bounds: requested: " +
 240  
                   index + " from array of size : " + count);
 241  
           }
 242  0
           return handleSupplier.apply(index);
 243  
         }
 244  
 
 245  
         @Override
 246  
         public int getBroadcastedSize(WorkerBroadcastUsage worker) {
 247  0
           return count;
 248  
         }
 249  
 
 250  
         @Override
 251  
         public int getStaticSize() {
 252  0
           return count;
 253  
         }
 254  
       };
 255  
     }
 256  
   }
 257  
 
 258  
   private static <V extends Writable>
 259  
   Int2ObjFunction<BroadcastHandle<V>> getObjectBroadcastHandleSupplier(
 260  
       final Int2ObjFunction<V> valueSupplier,
 261  
       final BlockMasterApi master, final ObjectStriping striping) {
 262  0
     final ObjectHolder<Class<V>> elementClass = new ObjectHolder<>();
 263  0
     final ArrayOfHandles<BroadcastHandle<ArrayWritable<V>>> arrayOfBroadcasts =
 264  
       new ArrayOfHandles<>(
 265  0
         striping.getSplits(),
 266  0
         new Int2ObjFunction<BroadcastHandle<ArrayWritable<V>>>() {
 267  
           @Override
 268  
           public BroadcastHandle<ArrayWritable<V>> apply(int value) {
 269  0
             int size = striping.getSplitSize(value);
 270  0
             int start = striping.getSplitStart(value);
 271  0
             V[] array = (V[]) new Writable[size];
 272  0
             for (int i = 0; i < size; i++) {
 273  0
               array[i] = valueSupplier.apply(start + i);
 274  0
               if (elementClass.get() == null) {
 275  0
                 elementClass.apply((Class<V>) array[i].getClass());
 276  
               }
 277  
             }
 278  0
             return master.broadcast(
 279  0
                 new ArrayWritable<>(elementClass.get(), array));
 280  
           }
 281  
         });
 282  
 
 283  0
     final IntRef insideIndex = new IntRef(-1);
 284  0
     final ObjectHolder<BroadcastHandle<ArrayWritable<V>>> handleHolder =
 285  
         new ObjectHolder<>();
 286  
 
 287  0
     final BroadcastHandle<V> reusableHandle = new BroadcastHandle<V>() {
 288  
       @Override
 289  
       public V getBroadcast(WorkerBroadcastUsage worker) {
 290  0
         return handleHolder.get().getBroadcast(worker).get()[insideIndex.value];
 291  
       }
 292  
     };
 293  
 
 294  0
     return createBroadcastHandleSupplier(
 295  
         striping, arrayOfBroadcasts, insideIndex, handleHolder,
 296  
         reusableHandle);
 297  
   }
 298  
 
 299  
   private static <V extends Writable>
 300  
   Int2ObjFunction<BroadcastHandle<V>> getPrimitiveBroadcastHandleSupplier(
 301  
       final Int2ObjFunction<V> valueSupplier, final PrimitiveTypeOps<V> typeOps,
 302  
       final BlockMasterApi master, final ObjectStriping striping) {
 303  0
     final ArrayOfHandles<BroadcastHandle<WArrayList<V>>> arrayOfBroadcasts =
 304  
       new ArrayOfHandles<>(
 305  0
         striping.getSplits(),
 306  0
         new Int2ObjFunction<BroadcastHandle<WArrayList<V>>>() {
 307  
           @Override
 308  
           public BroadcastHandle<WArrayList<V>> apply(int value) {
 309  0
             int size = striping.getSplitSize(value);
 310  0
             int start = striping.getSplitStart(value);
 311  0
             WArrayList<V> array = typeOps.createArrayList(size);
 312  0
             for (int i = 0; i < size; i++) {
 313  0
               array.addW(valueSupplier.apply(start + i));
 314  
             }
 315  0
             return master.broadcast(array);
 316  
           }
 317  
         });
 318  
 
 319  0
     final IntRef insideIndex = new IntRef(-1);
 320  0
     final ObjectHolder<BroadcastHandle<WArrayList<V>>> handleHolder =
 321  
             new ObjectHolder<>();
 322  0
     final BroadcastHandle<V> reusableHandle = new BroadcastHandle<V>() {
 323  0
       private final V reusable = typeOps.create();
 324  
       @Override
 325  
       public V getBroadcast(WorkerBroadcastUsage worker) {
 326  0
         handleHolder.get().getBroadcast(worker).getIntoW(
 327  
             insideIndex.value, reusable);
 328  0
         return reusable;
 329  
       }
 330  
     };
 331  
 
 332  0
     return createBroadcastHandleSupplier(
 333  
         striping, arrayOfBroadcasts, insideIndex, handleHolder,
 334  
         reusableHandle);
 335  
   }
 336  
 
 337  
   private static <V extends Writable, A>
 338  
   Int2ObjFunction<BroadcastHandle<V>> createBroadcastHandleSupplier(
 339  
       final ObjectStriping striping,
 340  
       final ArrayOfHandles<BroadcastHandle<A>> arrayOfBroadcasts,
 341  
       final IntRef insideIndex,
 342  
       final ObjectHolder<BroadcastHandle<A>> handleHolder,
 343  
       final BroadcastHandle<V> reusableHandle) {
 344  0
     final Int2ObjFunction<BroadcastHandle<V>> handleProvider =
 345  0
         new Int2ObjFunction<BroadcastHandle<V>>() {
 346  
       @Override
 347  
       public BroadcastHandle<V> apply(int index) {
 348  0
         int broadcastIndex = striping.getSplitIndex(index);
 349  0
         insideIndex.value = striping.getInsideIndex(index);
 350  0
         handleHolder.apply(arrayOfBroadcasts.get(broadcastIndex));
 351  0
         return reusableHandle;
 352  
       }
 353  
     };
 354  0
     return handleProvider;
 355  
   }
 356  
 
 357  
   /**
 358  
    * Handles index calculations when splitting one range into a smaller number
 359  
    * of splits, where indices stay consecutive.
 360  
    */
 361  
   static class ObjectStriping {
 362  
     private final int splits;
 363  
     private final int indicesPerObject;
 364  
     private final int overflowNum;
 365  
     private final int beforeOverflow;
 366  
 
 367  0
     public ObjectStriping(int size, int splits) {
 368  0
       this.splits = splits;
 369  0
       this.indicesPerObject = size / splits;
 370  0
       this.overflowNum = size % splits;
 371  0
       this.beforeOverflow = overflowNum * (indicesPerObject + 1);
 372  0
     }
 373  
 
 374  
     public int getSplits() {
 375  0
       return splits;
 376  
     }
 377  
 
 378  
     public int getSplitSize(int splitIndex) {
 379  0
       return indicesPerObject + (splitIndex < overflowNum ? 1 : 0);
 380  
     }
 381  
 
 382  
     public int getSplitStart(int splitIndex) {
 383  0
       if (splitIndex < overflowNum) {
 384  0
         return splitIndex * (indicesPerObject + 1);
 385  
       } else {
 386  0
         return beforeOverflow + (splitIndex - overflowNum) * indicesPerObject;
 387  
       }
 388  
     }
 389  
 
 390  
     public int getSplitIndex(int objectIndex) {
 391  0
       if (objectIndex < beforeOverflow) {
 392  0
         return objectIndex / (indicesPerObject + 1);
 393  
       } else {
 394  0
         return (objectIndex - beforeOverflow) / indicesPerObject + overflowNum;
 395  
       }
 396  
     }
 397  
 
 398  
     public int getInsideIndex(int objectIndex) {
 399  0
       if (objectIndex < beforeOverflow) {
 400  0
         return objectIndex % (indicesPerObject + 1);
 401  
       } else {
 402  0
         return (objectIndex - beforeOverflow) % indicesPerObject;
 403  
       }
 404  
     }
 405  
   }
 406  
 }