Coverage Report - org.apache.giraph.block_app.framework.piece.delegate.DelegatePiece
 
Classes in this File Line Coverage Branch Coverage Complexity
DelegatePiece
0%
0/68
0%
0/30
0
DelegatePiece$DelegateWorkerReceiveFunctions
0%
0/13
0%
0/8
0
DelegatePiece$DelegateWorkerSendFunctions
0%
0/13
0%
0/8
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.framework.piece.delegate;
 19  
 
 20  
 import java.util.ArrayList;
 21  
 import java.util.Arrays;
 22  
 import java.util.List;
 23  
 
 24  
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 25  
 import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
 26  
 import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
 27  
 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
 28  
 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
 29  
 import org.apache.giraph.block_app.framework.block.PieceCount;
 30  
 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
 31  
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
 32  
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
 33  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 34  
 import org.apache.giraph.conf.MessageClasses;
 35  
 import org.apache.giraph.function.Consumer;
 36  
 import org.apache.giraph.graph.Vertex;
 37  
 import org.apache.giraph.types.NoMessage;
 38  
 import org.apache.hadoop.io.Writable;
 39  
 import org.apache.hadoop.io.WritableComparable;
 40  
 
 41  
 import com.google.common.base.Preconditions;
 42  
 
 43  
 /**
 44  
  * Delegate Piece which allows combining multiple pieces in the same iteration:
 45  
  * new DelegatePiece(new LogicPiece(), new StatsPiece())
 46  
  * You should be careful when doing so, since those pieces must not interact,
 47  
  * and only one can send messages.
 48  
  * Execution of any of the Piece methods by the framework is going to trigger
 49  
  * sequential execution of that method on all of the pieces this DelegatePiece
 50  
  * wraps. That means for example, getVertexSender is going to be called on all
 51  
  * pieces before masterCompute is called on all pieces, which is called before
 52  
  * getVertexReceiver on all pieces.
 53  
  *
 54  
  * Also, via overriding, it provides a base class for filtering. I.e. if
 55  
  * you want a piece that filters out calls to masterCompute, you can have:
 56  
  * new FilterMasterPiece(new LogicPiece()),
 57  
  * with FilterMasterPiece extends DelegatePiece, and only overrides getMaster
 58  
  * function and DelegateMasterPiece class.
 59  
  *
 60  
  * @param <I> Vertex id type
 61  
  * @param <V> Vertex value type
 62  
  * @param <E> Edge value type
 63  
  * @param <M> Message type
 64  
  * @param <WV> Worker value type
 65  
  * @param <WM> Worker message type
 66  
  * @param <S> Execution stage type
 67  
  */
 68  0
 @SuppressWarnings("rawtypes")
 69  
 public class DelegatePiece<I extends WritableComparable, V extends Writable,
 70  
     E extends Writable, M extends Writable, WV, WM extends Writable, S>
 71  
     extends AbstractPiece<I, V, E, M, WV, WM, S> {
 72  
 
 73  
   private final List<AbstractPiece<I, V, E, M, WV, WM, S>> innerPieces;
 74  
 
 75  
   /**
    * Creates a delegate that wraps the given pieces. The pieces are copied
    * into a new list in the order in which they were passed.
    *
    * @param innerPieces Pieces to delegate to
    */
   @SafeVarargs
 76  
   @SuppressWarnings("unchecked")
 77  
   public DelegatePiece(AbstractPiece<? super I, ? super V, ? super E,
 78  0
       ? super M, ? super WV, ? super WM, ? super S>... innerPieces) {
 79  
     // Pieces are contravariant, but Java generics cannot express that,
 80  
     // so use unchecked cast inside to allow callers to be typesafe
 81  0
     this.innerPieces = new ArrayList(Arrays.asList(innerPieces));
 82  0
   }
 83  
 
 84  
   /**
    * Creates a delegate that wraps exactly one piece. The piece is stored
    * in a new single-element list.
    *
    * @param innerPiece Piece to delegate to
    */
   @SuppressWarnings("unchecked")
 85  
   public DelegatePiece(AbstractPiece<? super I, ? super V, ? super E,
 86  0
       ? super M, ? super WV, ? super WM, ? super S> innerPiece) {
 87  
     // Pieces are contravariant, but Java generics cannot express that,
 88  
     // so use unchecked cast inside to allow callers to be typesafe
 89  0
     this.innerPieces = new ArrayList(Arrays.asList(innerPiece));
 90  0
   }
 91  
 
 92  
   /**
    * Wraps the senders collected from the inner pieces into a single
    * delegating sender. Being protected and non-final, it can be overridden
    * by subclasses; this implementation does not use workerApi or
    * executionStage.
    *
    * @param workerSendFunctions Senders of the inner pieces; null entries
    *                            are skipped by the returned sender
    * @param workerApi Worker send api (unused here)
    * @param executionStage Execution stage (unused here)
    * @return Sender that forwards calls to all given senders
    */
   protected DelegateWorkerSendFunctions delegateWorkerSendFunctions(
 93  
       ArrayList<InnerVertexSender> workerSendFunctions,
 94  
       BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
 95  0
     return new DelegateWorkerSendFunctions(workerSendFunctions);
 96  
   }
 97  
 
 98  
   /**
    * Wraps the receivers collected from the inner pieces into a single
    * delegating receiver. Being protected and non-final, it can be
    * overridden by subclasses; this implementation does not use workerApi
    * or executionStage.
    *
    * @param workerReceiveFunctions Receivers of the inner pieces; null
    *                               entries are skipped by the returned
    *                               receiver
    * @param workerApi Worker receive api (unused here)
    * @param executionStage Execution stage (unused here)
    * @return Receiver that forwards calls to all given receivers
    */
   protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions(
 99  
       ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions,
 100  
       BlockWorkerReceiveApi<I> workerApi, S executionStage) {
 101  0
     return new DelegateWorkerReceiveFunctions(workerReceiveFunctions);
 102  
   }
 103  
 
 104  
   /**
    * Asks every inner piece, in list order, for its wrapped vertex sender
    * and combines the results through delegateWorkerSendFunctions.
    *
    * @param workerApi Worker send api, passed unchanged to every inner piece
    * @param executionStage Execution stage, passed unchanged to every inner
    *                       piece
    * @return Combined sender produced by delegateWorkerSendFunctions
    */
   @Override
 105  
   public InnerVertexSender getWrappedVertexSender(
 106  
       BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
 107  0
     ArrayList<InnerVertexSender> workerSendFunctions =
 108  0
         new ArrayList<>(innerPieces.size());
 109  0
     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
 110  0
       // Whatever the inner piece returns is stored as is; the delegating
       // sender checks entries for null before using them.
       workerSendFunctions.add(
 111  0
           innerPiece.getWrappedVertexSender(workerApi, executionStage));
 112  0
     }
 113  0
     return delegateWorkerSendFunctions(
 114  
         workerSendFunctions, workerApi, executionStage);
 115  
   }
 116  
 
 117  
   /**
    * Asks every inner piece, in list order, for its vertex receiver and
    * combines the results through delegateWorkerReceiveFunctions.
    *
    * @param workerApi Worker receive api, passed unchanged to every inner
    *                  piece
    * @param executionStage Execution stage, passed unchanged to every inner
    *                       piece
    * @return Combined receiver produced by delegateWorkerReceiveFunctions
    */
   @Override
 118  
   public InnerVertexReceiver getVertexReceiver(
 119  
       BlockWorkerReceiveApi<I> workerApi, S executionStage) {
 120  0
     ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions =
 121  0
         new ArrayList<>(innerPieces.size());
 122  0
     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
 123  0
       // Whatever the inner piece returns is stored as is; the delegating
       // receiver checks entries for null before using them.
       workerReceiveFunctions.add(
 124  0
           innerPiece.getVertexReceiver(workerApi, executionStage));
 125  0
     }
 126  0
     return delegateWorkerReceiveFunctions(
 127  
         workerReceiveFunctions, workerApi, executionStage);
 128  
   }
 129  
 
 130  
   /**
    * Vertex sender that forwards every call to a list of senders, in list
    * order, skipping entries that are null.
    */
 131  
   protected class DelegateWorkerSendFunctions extends InnerVertexSender {
 132  
     // Senders to forward to; the list passed to the constructor is kept
     // by reference, not copied.
     private final ArrayList<InnerVertexSender> workerSendFunctions;
 133  
 
 134  
     /**
      * @param workerSendFunctions Senders to forward calls to; may contain
      *                            null entries
      */
     public DelegateWorkerSendFunctions(
 135  0
         ArrayList<InnerVertexSender> workerSendFunctions) {
 136  0
       this.workerSendFunctions = workerSendFunctions;
 137  0
     }
 138  
 
 139  
     /**
      * Calls vertexSend with the given vertex on every non-null sender.
      *
      * @param vertex Vertex passed to each sender
      */
     @Override
 140  
     public void vertexSend(Vertex<I, V, E> vertex) {
 141  0
       for (InnerVertexSender functions : workerSendFunctions) {
 142  0
         if (functions != null) {
 143  0
           functions.vertexSend(vertex);
 144  
         }
 145  0
       }
 146  0
     }
 147  
 
 148  
     /** Calls postprocess on every non-null sender. */
     @Override
 149  
     public void postprocess() {
 150  0
       for (InnerVertexSender functions : workerSendFunctions) {
 151  0
         if (functions != null) {
 152  0
           functions.postprocess();
 153  
         }
 154  0
       }
 155  0
     }
 156  
   }
 157  
 
 158  
   /**
    * Vertex receiver that forwards every call to a list of receivers, in
    * list order, skipping entries that are null.
    */
 159  
   protected class DelegateWorkerReceiveFunctions extends InnerVertexReceiver {
 160  
     // Receivers to forward to; the list passed to the constructor is kept
     // by reference, not copied.
     private final ArrayList<VertexReceiver<I, V, E, M>>
 161  
     workerReceiveFunctions;
 162  
 
 163  
     /**
      * @param workerReceiveFunctions Receivers to forward calls to; may
      *                               contain null entries
      */
     public DelegateWorkerReceiveFunctions(
 164  0
         ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions) {
 165  0
       this.workerReceiveFunctions = workerReceiveFunctions;
 166  0
     }
 167  
 
 168  
     /**
      * Calls vertexReceive on every non-null receiver. The same messages
      * object is handed to each of them.
      * NOTE(review): with more than one receiver reading messages this
      * assumes the iterable can be traversed repeatedly - confirm.
      *
      * @param vertex Vertex passed to each receiver
      * @param messages Messages passed to each receiver
      */
     @Override
 169  
     public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
 170  
       for (VertexReceiver<I, V, E, M> functions :
 171  0
             workerReceiveFunctions) {
 172  0
         if (functions != null) {
 173  0
           functions.vertexReceive(vertex, messages);
 174  
         }
 175  0
       }
 176  0
     }
 177  
 
 178  
     /**
      * Calls postprocess on every receiver that also implements
      * VertexPostprocessor; other receivers are left untouched.
      */
     @Override
 179  
     public void postprocess() {
 180  
       for (VertexReceiver<I, V, E, M> functions :
 181  0
             workerReceiveFunctions) {
 182  0
         // instanceof is false for null, so null entries are skipped too
         if (functions instanceof VertexPostprocessor) {
 183  0
           ((VertexPostprocessor) functions).postprocess();
 184  
         }
 185  0
       }
 186  0
     }
 187  
   }
 188  
 
 189  
   /**
    * Calls masterCompute on every inner piece, in list order.
    *
    * @param api Master api, passed unchanged to every inner piece
    * @param executionStage Execution stage, passed unchanged to every inner
    *                       piece
    */
   @Override
 190  
   public void masterCompute(BlockMasterApi api, S executionStage) {
 191  0
     for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
 192  0
       piece.masterCompute(api, executionStage);
 193  0
     }
 194  0
   }
 195  
 
 196  
   /**
    * Calls workerContextSend on every inner piece, in list order, with the
    * same arguments.
    *
    * @param workerContextApi Worker context send api
    * @param executionStage Execution stage
    * @param workerValue Worker value, shared by all inner pieces
    */
   @Override
 197  
   public void workerContextSend(
 198  
       BlockWorkerContextSendApi<I, WM> workerContextApi, S executionStage,
 199  
       WV workerValue) {
 200  0
     for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
 201  0
       piece.workerContextSend(workerContextApi, executionStage, workerValue);
 202  0
     }
 203  0
   }
 204  
 
 205  
   /**
    * Calls workerContextReceive on every inner piece, in list order, with
    * the same arguments. Every inner piece is given the same workerMessages
    * list.
    *
    * @param workerContextApi Worker context receive api
    * @param executionStage Execution stage
    * @param workerValue Worker value, shared by all inner pieces
    * @param workerMessages Worker messages, shared by all inner pieces
    */
   @Override
 206  
   public void workerContextReceive(
 207  
       BlockWorkerContextReceiveApi workerContextApi, S executionStage,
 208  
       WV workerValue, List<WM> workerMessages) {
 209  0
     for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
 210  0
       piece.workerContextReceive(
 211  
           workerContextApi, executionStage, workerValue, workerMessages);
 212  0
     }
 213  0
   }
 214  
 
 215  
   /**
    * Chains nextExecutionStage through the inner pieces in list order: the
    * stage returned by one inner piece is the input of the next one.
    *
    * @param executionStage Stage before this piece
    * @return Stage returned by the last inner piece, or the given stage
    *         unchanged if there are no inner pieces
    */
   @Override
 216  
   public S nextExecutionStage(S executionStage) {
 217  0
     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
 218  0
       executionStage = innerPiece.nextExecutionStage(executionStage);
 219  0
     }
 220  0
     return executionStage;
 221  
   }
 222  
 
 223  
   /**
    * Picks the message classes for the combined piece. At most one inner
    * piece may report a message class other than NoMessage; its message
    * classes are returned. If every inner piece reports NoMessage, the
    * message classes of the first inner piece are returned (null if there
    * are no inner pieces).
    *
    * @param conf Configuration, passed unchanged to every inner piece
    * @return Message classes as described above
    * @throws IllegalStateException if an inner piece returns null message
    *         classes (raised by Preconditions.checkState)
    * @throws RuntimeException if more than one inner piece reports a
    *         message class other than NoMessage
    */
   @Override
 224  
   public MessageClasses<I, M> getMessageClasses(
 225  
       ImmutableClassesGiraphConfiguration conf) {
 226  0
     // Message classes of the one piece whose message class is not NoMessage
     MessageClasses<I, M> messageClasses = null;
 227  0
     // Fallback used when no inner piece has a non-NoMessage message class
     MessageClasses<I, M> firstMessageClasses = null;
 228  0
     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
 229  0
       MessageClasses<I, M> cur = innerPiece.getMessageClasses(conf);
 230  0
       Preconditions.checkState(cur != null);
 231  0
       if (!cur.getMessageClass().equals(NoMessage.class)) {
 232  0
         // A second piece with a real message class is a configuration error
         if (messageClasses != null) {
 233  0
           throw new RuntimeException(
 234  
               "Only one piece combined through delegate (" +
 235  0
               toString() + ") can send messages");
 236  
         }
 237  0
         messageClasses = cur;
 238  
       }
 239  0
       if (firstMessageClasses == null) {
 240  0
         firstMessageClasses = cur;
 241  
       }
 242  0
     }
 243  0
     return messageClasses != null ? messageClasses : firstMessageClasses;
 244  
   }
 245  
 
 246  
   /**
    * Forwards the consumer to forAllPossiblePieces of every inner piece, in
    * list order. This delegate does not pass itself to the consumer.
    *
    * @param consumer Consumer handed to every inner piece
    */
   @Override
 247  
   public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
 248  0
     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
 249  0
       innerPiece.forAllPossiblePieces(consumer);
 250  0
     }
 251  0
   }
 252  
 
 253  
   /**
    * Reports this delegate as a single piece, independent of how many inner
    * pieces it wraps.
    *
    * @return New PieceCount constructed with 1
    */
   @Override
 254  
   public PieceCount getPieceCount() {
 255  0
     return new PieceCount(1);
 256  
   }
 257  
 
 258  
   /**
    * Calls registerAggregators on every inner piece, in list order.
    * NOTE(review): the deprecation suppression suggests the overridden
    * method is deprecated in AbstractPiece - confirm.
    *
    * @param master Master api, passed unchanged to every inner piece
    * @throws InstantiationException Propagated from an inner piece
    * @throws IllegalAccessException Propagated from an inner piece
    */
   @SuppressWarnings("deprecation")
 259  
   @Override
 260  
   public void registerAggregators(BlockMasterApi master)
 261  
       throws InstantiationException, IllegalAccessException {
 262  0
     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
 263  0
       innerPiece.registerAggregators(master);
 264  0
     }
 265  0
   }
 266  
 
 267  
   /**
    * Calls wrappedRegisterReducers on every inner piece, in list order.
    *
    * @param masterApi Master api, passed unchanged to every inner piece
    * @param executionStage Execution stage, passed unchanged to every inner
    *                       piece
    */
   @Override
 268  
   public void wrappedRegisterReducers(
 269  
       BlockMasterApi masterApi, S executionStage) {
 270  0
     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
 271  0
       innerPiece.wrappedRegisterReducers(masterApi, executionStage);
 272  0
     }
 273  0
   }
 274  
 
 275  
   /**
    * Name used as the prefix of toString(). Protected and non-final, so
    * subclasses can override it.
    *
    * @return The string "Delegate"
    */
   protected String delegationName() {
 276  0
     return "Delegate";
 277  
   }
 278  
 
 279  
   /**
    * Builds the description from delegationName() followed by the string
    * form of the inner piece list.
    *
    * @return delegationName() concatenated with innerPieces.toString()
    */
   @Override
 280  
   public String toString() {
 281  0
     return delegationName() + innerPieces.toString();
 282  
   }
 283  
 }