Coverage Report - org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler
 
Classes in this File Line Coverage Branch Coverage Complexity
ReducersForPieceHandler
0%
0/23
0%
0/4
0
ReducersForPieceHandler$1
0%
0/2
N/A
0
ReducersForPieceHandler$BroadcastHandleImpl
0%
0/5
N/A
0
ReducersForPieceHandler$GlobalReduceHandle
0%
0/8
N/A
0
ReducersForPieceHandler$LocalReduceHandle
0%
0/9
N/A
0
ReducersForPieceHandler$ReduceHandleImpl
0%
0/10
N/A
0
ReducersForPieceHandler$WrappedReducedValue
0%
0/14
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.framework.piece.global_comm.internal;
 19  
 
 20  
 import java.io.DataInput;
 21  
 import java.io.DataOutput;
 22  
 import java.io.IOException;
 23  
 import java.util.ArrayList;
 24  
 import java.util.concurrent.atomic.AtomicInteger;
 25  
 
 26  
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 27  
 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
 28  
 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
 29  
 import org.apache.giraph.master.MasterGlobalCommUsage;
 30  
 import org.apache.giraph.reducers.ReduceOperation;
 31  
 import org.apache.giraph.reducers.Reducer;
 32  
 import org.apache.giraph.utils.WritableUtils;
 33  
 import org.apache.giraph.worker.WorkerBroadcastUsage;
 34  
 import org.apache.giraph.worker.WorkerReduceUsage;
 35  
 import org.apache.hadoop.io.Writable;
 36  
 
 37  
 /**
 38  
  * All logic for transforming Giraph's reducer API to reducer handles.
 39  
  * Contains the state of active reducers, and is kept within a Piece.
 40  
  */
 41  0
 public class ReducersForPieceHandler implements VertexSenderObserver {
 42  0
   private static final AtomicInteger HANDLER_COUNTER = new AtomicInteger();
 43  0
   private static final AtomicInteger BROADCAST_COUNTER = new AtomicInteger();
 44  
 
 45  0
   private final int handleIndex = HANDLER_COUNTER.incrementAndGet();
 46  0
   private final AtomicInteger reduceCounter = new AtomicInteger();
 47  
 
 48  0
   private final ArrayList<VertexSenderObserver> observers = new ArrayList<>();
 49  
 
 50  
   @Override
 51  
   public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) {
 52  0
     for (VertexSenderObserver observer : observers) {
 53  0
       observer.vertexSenderWorkerPreprocess(usage);
 54  0
     }
 55  0
   }
 56  
 
 57  
   @Override
 58  
   public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) {
 59  0
     for (VertexSenderObserver observer : observers) {
 60  0
       observer.vertexSenderWorkerPostprocess(usage);
 61  0
     }
 62  0
   }
 63  
 
 64  
   public <S, R extends Writable> ReducerHandle<S, R> createLocalReducer(
 65  
       MasterGlobalCommUsage master,  ReduceOperation<S, R> reduceOp,
 66  
       R globalInitialValue) {
 67  0
     LocalReduceHandle<S, R> handle = new LocalReduceHandle<>(reduceOp);
 68  0
     master.registerReducer(handle.getName(), reduceOp, globalInitialValue);
 69  0
     observers.add(handle);
 70  0
     return handle;
 71  
   }
 72  
 
 73  
   public <S, R extends Writable> ReducerHandle<S, R> createGlobalReducer(
 74  
       MasterGlobalCommUsage master,  ReduceOperation<S, R> reduceOp,
 75  
       R globalInitialValue) {
 76  0
     ReduceHandleImpl<S, R> handle = new GlobalReduceHandle<>(reduceOp);
 77  0
     master.registerReducer(handle.getName(), reduceOp, globalInitialValue);
 78  0
     observers.add(handle);
 79  0
     return handle;
 80  
   }
 81  
 
 82  
   /**
 83  
    * Implementation of BroadcastHandle
 84  
    *
 85  
    * @param <T> Value type
 86  
    */
 87  
   public static class BroadcastHandleImpl<T> implements BroadcastHandle<T> {
 88  
     private final String name;
 89  
 
 90  0
     public BroadcastHandleImpl() {
 91  0
       this.name = "_utils.broadcast." + BROADCAST_COUNTER.incrementAndGet();
 92  0
     }
 93  
 
 94  
     public String getName() {
 95  0
       return name;
 96  
     }
 97  
 
 98  
     @Override
 99  
     public T getBroadcast(WorkerBroadcastUsage worker) {
 100  0
       return worker.getBroadcast(name);
 101  
     }
 102  
   }
 103  
 
 104  
   /**
 105  
    * Parent implementation of ReducerHandle
 106  
    *
 107  
    * @param <S> Single value type
 108  
    * @param <R> Reduced value type
 109  
    */
 110  0
   public abstract class ReduceHandleImpl<S, R extends Writable>
 111  
       implements ReducerHandle<S, R>, VertexSenderObserver {
 112  
     protected final ReduceOperation<S, R> reduceOp;
 113  
     private final String name;
 114  
 
 115  0
     private ReduceHandleImpl(ReduceOperation<S, R> reduceOp) {
 116  0
       this.reduceOp = reduceOp;
 117  0
       name = "_utils." + handleIndex +
 118  0
           ".reduce." + reduceCounter.incrementAndGet();
 119  0
     }
 120  
 
 121  
     public String getName() {
 122  0
       return name;
 123  
     }
 124  
 
 125  
     @Override
 126  
     public R getReducedValue(MasterGlobalCommUsage master) {
 127  0
       return master.getReduced(name);
 128  
     }
 129  
 
 130  
     @Override
 131  
     public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
 132  0
       return unwrapHandle(master.broadcast(
 133  0
           new WrappedReducedValue<>(reduceOp, getReducedValue(master))));
 134  
     }
 135  
   }
 136  
 
 137  
   private static <R extends Writable> BroadcastHandle<R> unwrapHandle(
 138  
       final BroadcastHandle<WrappedReducedValue<R>> handle) {
 139  0
     return new BroadcastHandle<R>() {
 140  
       @Override
 141  
       public R getBroadcast(WorkerBroadcastUsage worker) {
 142  0
         return handle.getBroadcast(worker).getValue();
 143  
       }
 144  
     };
 145  
   }
 146  
 
 147  
   /**
 148  
    * Wrapper that makes reduced values self-serializable,
 149  
    * and allows them to be broadcast.
 150  
    *
 151  
    * @param <R> Reduced value type
 152  
    */
 153  
   public static class WrappedReducedValue<R extends Writable>
 154  
       implements Writable {
 155  
     private ReduceOperation<?, R> reduceOp;
 156  
     private R value;
 157  
 
 158  0
     public WrappedReducedValue() {
 159  0
     }
 160  
 
 161  0
     public WrappedReducedValue(ReduceOperation<?, R> reduceOp, R value) {
 162  0
       this.reduceOp = reduceOp;
 163  0
       this.value = value;
 164  0
     }
 165  
 
 166  
     @Override
 167  
     public void write(DataOutput out) throws IOException {
 168  0
       WritableUtils.writeWritableObject(reduceOp, out);
 169  0
       value.write(out);
 170  0
     }
 171  
 
 172  
     @Override
 173  
     public void readFields(DataInput in) throws IOException {
 174  0
       reduceOp = WritableUtils.readWritableObject(in, null);
 175  0
       value = reduceOp.createInitialValue();
 176  0
       value.readFields(in);
 177  0
     }
 178  
 
 179  
     public R getValue() {
 180  0
       return value;
 181  
     }
 182  
   }
 183  
 
 184  
   /**
 185  
    * Global Reduce Handle is an implementation of ReducerHandle that keeps
 186  
    * only one value for each worker; each call to reduce has to obtain
 187  
    * a global lock and incurs synchronization costs.
 188  
    * Use only when objects are so large that many copies would not fit
 189  
    * into memory.
 190  
    *
 191  
    * @param <S> Single value type
 192  
    * @param <R> Reduced value type
 193  
    */
 194  
   public class GlobalReduceHandle<S, R extends Writable>
 195  
       extends ReduceHandleImpl<S, R> {
 196  
     private transient WorkerReduceUsage usage;
 197  
 
 198  0
     public GlobalReduceHandle(ReduceOperation<S, R> reduceOp) {
 199  0
       super(reduceOp);
 200  0
     }
 201  
 
 202  
     @Override
 203  
     public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) {
 204  0
       this.usage = usage;
 205  0
     }
 206  
 
 207  
     @Override
 208  
     public void reduce(S valueToReduce) {
 209  0
       usage.reduce(getName(), valueToReduce);
 210  0
     }
 211  
 
 212  
     @Override
 213  
     public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) {
 214  0
     }
 215  
   }
 216  
 
 217  
   /**
 218  
    * Local Reduce Handle is an implementation of ReducerHandle that builds a
 219  
    * partially reduced value on each worker thread; these partial values are
 220  
    * all reduced together at the end.
 221  
    * This is the preferred implementation, unless it cannot be used due to
 222  
    * memory overhead (the partially reduced values would not fit in memory).
 223  
    *
 224  
    * @param <S> Single value type
 225  
    * @param <R> Reduced value type
 226  
    */
 227  
   public class LocalReduceHandle<S, R extends Writable>
 228  
       extends ReduceHandleImpl<S, R> {
 229  
     private transient Reducer<S, R> reducer;
 230  
 
 231  0
     public LocalReduceHandle(ReduceOperation<S, R> reduceOp) {
 232  0
       super(reduceOp);
 233  0
     }
 234  
 
 235  
     @Override
 236  
     public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) {
 237  0
       this.reducer = new Reducer<>(reduceOp);
 238  0
     }
 239  
 
 240  
     @Override
 241  
     public void reduce(S valueToReduce) {
 242  0
       reducer.reduce(valueToReduce);
 243  0
     }
 244  
 
 245  
     @Override
 246  
     public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) {
 247  0
       usage.reduceMerge(getName(), reducer.getCurrentValue());
 248  0
     }
 249  
   }
 250  
 }