Coverage Report - org.apache.giraph.block_app.library.Pieces
 
Classes in this File Line Coverage Branch Coverage Complexity
Pieces
0%
0/17
N/A
0
Pieces$1
0%
0/3
N/A
0
Pieces$1$1
0%
0/3
N/A
0
Pieces$2
0%
0/3
N/A
0
Pieces$3
0%
0/3
N/A
0
Pieces$3$1
0%
0/3
N/A
0
Pieces$4
0%
0/7
N/A
0
Pieces$4$1
0%
0/5
0%
0/2
0
Pieces$5
0%
0/3
N/A
0
Pieces$6
0%
0/7
N/A
0
Pieces$6$1
0%
0/3
N/A
0
Pieces$7
0%
0/10
N/A
0
Pieces$7$1
0%
0/3
N/A
0
Pieces$7$2
0%
0/3
N/A
0
Pieces$8
0%
0/12
0%
0/2
0
Pieces$8$1
0%
0/2
N/A
0
Pieces$8$2
0%
0/4
N/A
0
Pieces$8$3
0%
0/4
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.library;
 19  
 
 20  
 import java.util.ArrayList;
 21  
 import java.util.Iterator;
 22  
 import java.util.List;
 23  
 
 24  
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 25  
 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
 26  
 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
 27  
 import org.apache.giraph.block_app.framework.api.CreateReducersApi;
 28  
 import org.apache.giraph.block_app.framework.piece.Piece;
 29  
 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerAndBroadcastWrapperHandle;
 30  
 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
 31  
 import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
 32  
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
 33  
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
 34  
 import org.apache.giraph.block_app.library.internal.SendMessagePiece;
 35  
 import org.apache.giraph.block_app.library.internal.SendMessageWithCombinerPiece;
 36  
 import org.apache.giraph.block_app.reducers.array.ArrayOfHandles;
 37  
 import org.apache.giraph.combiner.MessageCombiner;
 38  
 import org.apache.giraph.function.Consumer;
 39  
 import org.apache.giraph.function.PairConsumer;
 40  
 import org.apache.giraph.function.Supplier;
 41  
 import org.apache.giraph.function.vertex.ConsumerWithVertex;
 42  
 import org.apache.giraph.function.vertex.SupplierFromVertex;
 43  
 import org.apache.giraph.graph.Vertex;
 44  
 import org.apache.giraph.reducers.ReduceOperation;
 45  
 import org.apache.giraph.reducers.impl.SumReduce;
 46  
 import org.apache.giraph.types.NoMessage;
 47  
 import org.apache.hadoop.io.LongWritable;
 48  
 import org.apache.hadoop.io.Writable;
 49  
 import org.apache.hadoop.io.WritableComparable;
 50  
 import org.apache.log4j.Logger;
 51  
 
 52  
 /**
 53  
  * Utility class for creating common Pieces and computations for processing
 54  
  * graphs.
 55  
  */
 56  0
 public class Pieces {
 57  0
   private static final Logger LOG = Logger.getLogger(Pieces.class);
 58  
 
 59  0
   private Pieces() { }
 60  
 
 61  
   /**
 62  
    * For each vertex execute given process function.
 63  
    * Computation is happening in send phase of the returned Piece.
 64  
    */
 65  
   public static
 66  
   <I extends WritableComparable, V extends Writable, E extends Writable>
 67  
   Piece<I, V, E, NoMessage, Object> forAllVertices(
 68  
       final String pieceName, final Consumer<Vertex<I, V, E>> process) {
 69  0
     return new Piece<I, V, E, NoMessage, Object>() {
 70  
       @Override
 71  
       public VertexSender<I, V, E> getVertexSender(
 72  
           BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
 73  
           Object executionStage) {
 74  0
         return new InnerVertexSender() {
 75  
           @Override
 76  
           public void vertexSend(Vertex<I, V, E> vertex) {
 77  0
             process.apply(vertex);
 78  0
           }
 79  
         };
 80  
       }
 81  
 
 82  
       @Override
 83  
       public String toString() {
 84  0
         return pieceName;
 85  
       }
 86  
     };
 87  
   }
 88  
 
 89  
   /**
 90  
    * Execute given function on master.
 91  
    */
 92  
   public static
 93  
   Piece<WritableComparable, Writable,  Writable, NoMessage,
 94  
     Object> masterCompute(
 95  
       final String pieceName, final Consumer<BlockMasterApi> process) {
 96  0
     return new Piece<WritableComparable, Writable,  Writable, NoMessage,
 97  0
         Object>() {
 98  
       @Override
 99  
       public void masterCompute(
 100  
           BlockMasterApi masterApi, Object executionStage) {
 101  0
         process.apply(masterApi);
 102  0
       }
 103  
     };
 104  
   }
 105  
 
 106  
   /**
 107  
    * For each vertex execute given process function.
 108  
    * Computation is happening in the receive phase of the returned Piece.
 109  
    * This function should be used if you need returned Piece to interact with
 110  
    * subsequent Piece, as that requires passed function to be executed
 111  
    * during receive phase,
 112  
    */
 113  
   public static
 114  
   <I extends WritableComparable, V extends Writable, E extends Writable>
 115  
   Piece<I, V, E, NoMessage, Object> forAllVerticesOnReceive(
 116  
       final String pieceName, final Consumer<Vertex<I, V, E>> process) {
 117  0
     return new Piece<I, V, E, NoMessage, Object>() {
 118  
       @Override
 119  
       public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
 120  
           BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
 121  0
         return new InnerVertexReceiver() {
 122  
           @Override
 123  
           public void vertexReceive(
 124  
               Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
 125  0
             process.apply(vertex);
 126  0
           }
 127  
         };
 128  
       }
 129  
 
 130  
       @Override
 131  
       public String toString() {
 132  0
         return pieceName;
 133  
       }
 134  
     };
 135  
   }
 136  
 
 137  
   /**
 138  
    * Creates Piece which removes vertices for which supplier returns true.
 139  
    */
 140  
   public static
 141  
   <I extends WritableComparable, V extends Writable, E extends Writable>
 142  
   Piece<I, V, E, NoMessage, Object> removeVertices(
 143  
       final String pieceName,
 144  
       final SupplierFromVertex<I, V, E, Boolean> shouldRemoveVertex) {
 145  0
     return new Piece<I, V, E, NoMessage, Object>() {
 146  
       private ReducerHandle<LongWritable, LongWritable> countRemovedAgg;
 147  
 
 148  
       @Override
 149  
       public void registerReducers(
 150  
           CreateReducersApi reduceApi, Object executionStage) {
 151  0
         countRemovedAgg = reduceApi.createLocalReducer(SumReduce.LONG);
 152  0
       }
 153  
 
 154  
       @Override
 155  
       public VertexSender<I, V, E> getVertexSender(
 156  
           final BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
 157  
           Object executionStage) {
 158  0
         return new InnerVertexSender() {
 159  
           @Override
 160  
           public void vertexSend(Vertex<I, V, E> vertex) {
 161  0
             if (shouldRemoveVertex.get(vertex)) {
 162  0
               workerApi.removeVertexRequest(vertex.getId());
 163  0
               reduceLong(countRemovedAgg, 1);
 164  
             }
 165  0
           }
 166  
         };
 167  
       }
 168  
 
 169  
       @Override
 170  
       public void masterCompute(BlockMasterApi master, Object executionStage) {
 171  0
         LOG.info("Removed " + countRemovedAgg.getReducedValue(master) +
 172  
             " vertices from the graph, during stage " + executionStage);
 173  0
       }
 174  
 
 175  
       @Override
 176  
       public String toString() {
 177  0
         return pieceName;
 178  
       }
 179  
     };
 180  
   }
 181  
 
 182  
   /**
 183  
    * Creates single reducer piece - given reduce class, supplier of values on
 184  
    * worker, reduces and passes the result to given consumer on master.
 185  
    *
 186  
    * @param <S> Single value type, objects passed on workers
 187  
    * @param <R> Reduced value type
 188  
    * @param <I> Vertex id type
 189  
    * @param <V> Vertex value type
 190  
    * @param <E> Edge value type
 191  
    */
 192  
   public static
 193  
   <S, R extends Writable, I extends WritableComparable, V extends Writable,
 194  
   E extends Writable>
 195  
   Piece<I, V, E, NoMessage, Object> reduce(
 196  
       String name,
 197  
       ReduceOperation<S, R> reduceOp,
 198  
       SupplierFromVertex<I, V, E, S> valueSupplier,
 199  
       final Consumer<R> reducedValueConsumer) {
 200  0
     return reduceWithMaster(
 201  
         name, reduceOp, valueSupplier,
 202  0
         new PairConsumer<R, BlockMasterApi>() {
 203  
           @Override
 204  
           public void apply(R input, BlockMasterApi master) {
 205  0
             reducedValueConsumer.apply(input);
 206  0
           }
 207  
         });
 208  
   }
 209  
 
 210  
   /**
 211  
    * Creates single reducer piece - given reduce class, supplier of values on
 212  
    * worker, reduces and passes the result to given consumer on master.
 213  
    *
 214  
    * @param <S> Single value type, objects passed on workers
 215  
    * @param <R> Reduced value type
 216  
    * @param <I> Vertex id type
 217  
    * @param <V> Vertex value type
 218  
    * @param <E> Edge value type
 219  
    */
 220  
   public static
 221  
   <S, R extends Writable, I extends WritableComparable, V extends Writable,
 222  
   E extends Writable>
 223  
   Piece<I, V, E, NoMessage, Object> reduceWithMaster(
 224  
       final String name,
 225  
       final ReduceOperation<S, R> reduceOp,
 226  
       final SupplierFromVertex<I, V, E, S> valueSupplier,
 227  
       final PairConsumer<R, BlockMasterApi> reducedValueConsumer) {
 228  0
     return new Piece<I, V, E, NoMessage, Object>() {
 229  
       private ReducerHandle<S, R> handle;
 230  
 
 231  
       @Override
 232  
       public void registerReducers(
 233  
           CreateReducersApi reduceApi, Object executionStage) {
 234  0
         handle = reduceApi.createLocalReducer(reduceOp);
 235  0
       }
 236  
 
 237  
       @Override
 238  
       public VertexSender<I, V, E> getVertexSender(
 239  
           BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
 240  
           Object executionStage) {
 241  0
         return new InnerVertexSender() {
 242  
           @Override
 243  
           public void vertexSend(Vertex<I, V, E> vertex) {
 244  0
             handle.reduce(valueSupplier.get(vertex));
 245  0
           }
 246  
         };
 247  
       }
 248  
 
 249  
       @Override
 250  
       public void masterCompute(BlockMasterApi master, Object executionStage) {
 251  0
         reducedValueConsumer.apply(handle.getReducedValue(master), master);
 252  0
       }
 253  
 
 254  
       @Override
 255  
       public String toString() {
 256  0
         return name;
 257  
       }
 258  
     };
 259  
   }
 260  
 
 261  
   /**
 262  
    * Creates single reducer and broadcast piece - given reduce class, supplier
 263  
    * of values on worker, reduces and broadcasts the value, passing it to the
 264  
    * consumer on worker for each vertex.
 265  
    *
 266  
    * @param <S> Single value type, objects passed on workers
 267  
    * @param <R> Reduced value type
 268  
    * @param <I> Vertex id type
 269  
    * @param <V> Vertex value type
 270  
    * @param <E> Edge value type
 271  
    */
 272  
   public static
 273  
   <S, R extends Writable, I extends WritableComparable, V extends Writable,
 274  
   E extends Writable>
 275  
   Piece<I, V, E, NoMessage, Object> reduceAndBroadcast(
 276  
       final String name,
 277  
       final ReduceOperation<S, R> reduceOp,
 278  
       final SupplierFromVertex<I, V, E, S> valueSupplier,
 279  
       final ConsumerWithVertex<I, V, E, R> reducedValueConsumer) {
 280  0
     return new Piece<I, V, E, NoMessage, Object>() {
 281  0
       private final ReducerAndBroadcastWrapperHandle<S, R> handle =
 282  
           new ReducerAndBroadcastWrapperHandle<>();
 283  
 
 284  
       @Override
 285  
       public void registerReducers(
 286  
           CreateReducersApi reduceApi, Object executionStage) {
 287  0
         handle.registeredReducer(reduceApi.createLocalReducer(reduceOp));
 288  0
       }
 289  
 
 290  
       @Override
 291  
       public VertexSender<I, V, E> getVertexSender(
 292  
           BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
 293  
           Object executionStage) {
 294  0
         return new InnerVertexSender() {
 295  
           @Override
 296  
           public void vertexSend(Vertex<I, V, E> vertex) {
 297  0
             handle.reduce(valueSupplier.get(vertex));
 298  0
           }
 299  
         };
 300  
       }
 301  
 
 302  
       @Override
 303  
       public void masterCompute(BlockMasterApi master, Object executionStage) {
 304  0
         handle.broadcastValue(master);
 305  0
       }
 306  
 
 307  
       @Override
 308  
       public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
 309  
           BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
 310  0
         final R value = handle.getBroadcast(workerApi);
 311  0
         return new InnerVertexReceiver() {
 312  
           @Override
 313  
           public void vertexReceive(
 314  
               Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
 315  0
             reducedValueConsumer.apply(vertex, value);
 316  0
           }
 317  
         };
 318  
       }
 319  
 
 320  
       @Override
 321  
       public String toString() {
 322  0
         return name;
 323  
       }
 324  
     };
 325  
   }
 326  
 
 327  
   /**
 328  
    * Like reduceAndBroadcast, but uses array of handles for reducers and
 329  
    * broadcasts, to make it feasible and performant when values are large.
 330  
    * Each supplied value to reduce will be reduced in the handle defined by
 331  
    * handleHashSupplier%numHandles
 332  
    *
 333  
    * @param <S> Single value type, objects passed on workers
 334  
    * @param <R> Reduced value type
 335  
    * @param <I> Vertex id type
 336  
    * @param <V> Vertex value type
 337  
    * @param <E> Edge value type
 338  
    */
 339  
   public static
 340  
   <S, R extends Writable, I extends WritableComparable, V extends Writable,
 341  
       E extends Writable>
 342  
   Piece<I, V, E, NoMessage, Object> reduceAndBroadcastWithArrayOfHandles(
 343  
       final String name,
 344  
       final int numHandles,
 345  
       final Supplier<ReduceOperation<S, R>> reduceOp,
 346  
       final SupplierFromVertex<I, V, E, Long> handleHashSupplier,
 347  
       final SupplierFromVertex<I, V, E, S> valueSupplier,
 348  
       final ConsumerWithVertex<I, V, E, R> reducedValueConsumer) {
 349  0
     return new Piece<I, V, E, NoMessage, Object>() {
 350  
       protected ArrayOfHandles.ArrayOfReducers<S, R> reducers;
 351  
       protected BroadcastArrayHandle<R> broadcasts;
 352  
 
 353  
       private int getHandleIndex(Vertex<I, V, E> vertex) {
 354  0
         return (int) Math.abs(handleHashSupplier.get(vertex) % numHandles);
 355  
       }
 356  
 
 357  
       @Override
 358  
       public void registerReducers(
 359  
           final CreateReducersApi reduceApi, Object executionStage) {
 360  0
         reducers = new ArrayOfHandles.ArrayOfReducers<>(
 361  
             numHandles,
 362  0
             new Supplier<ReducerHandle<S, R>>() {
 363  
               @Override
 364  
               public ReducerHandle<S, R> get() {
 365  0
                 return reduceApi.createLocalReducer(reduceOp.get());
 366  
               }
 367  
             });
 368  0
       }
 369  
 
 370  
       @Override
 371  
       public VertexSender<I, V, E> getVertexSender(
 372  
           BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
 373  
           Object executionStage) {
 374  0
         return new InnerVertexSender() {
 375  
           @Override
 376  
           public void vertexSend(Vertex<I, V, E> vertex) {
 377  0
             reducers.get(getHandleIndex(vertex)).reduce(
 378  0
                 valueSupplier.get(vertex));
 379  0
           }
 380  
         };
 381  
       }
 382  
 
 383  
       @Override
 384  
       public void masterCompute(BlockMasterApi master, Object executionStage) {
 385  0
         broadcasts = reducers.broadcastValue(master);
 386  0
       }
 387  
 
 388  
       @Override
 389  
       public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
 390  
           BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
 391  0
         final List<R> values = new ArrayList<>();
 392  0
         for (int i = 0; i < numHandles; i++) {
 393  0
           values.add(broadcasts.get(i).getBroadcast(workerApi));
 394  
         }
 395  0
         return new InnerVertexReceiver() {
 396  
           @Override
 397  
           public void vertexReceive(
 398  
               Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
 399  0
             reducedValueConsumer.apply(
 400  0
                 vertex, values.get(getHandleIndex(vertex)));
 401  0
           }
 402  
         };
 403  
       }
 404  
 
 405  
       @Override
 406  
       public String toString() {
 407  0
         return name;
 408  
       }
 409  
     };
 410  
   }
 411  
 
 412  
   /**
 413  
    * Creates Piece that for each vertex, sends message provided by
 414  
    * messageSupplier to all targets provided by targetsSupplier.
 415  
    * Received messages are then passed to and processed by provided
 416  
    * messagesConsumer.
 417  
    *
 418  
    * If messageSupplier or targetsSupplier returns null, current vertex
 419  
    * is not going to send any messages.
 420  
    */
 421  
   public static
 422  
   <I extends WritableComparable, V extends Writable, E extends Writable,
 423  
   M extends Writable>
 424  
   SendMessagePiece<I, V, E, M> sendMessage(
 425  
       String name,
 426  
       Class<M> messageClass,
 427  
       SupplierFromVertex<I, V, E, M> messageSupplier,
 428  
       SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
 429  
       ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
 430  0
     return new SendMessagePiece<>(
 431  
         name, messageClass, messageSupplier, targetsSupplier, messagesConsumer);
 432  
   }
 433  
 
 434  
   /**
 435  
    * Creates Piece that for each vertex, sends message provided by
 436  
    * messageSupplier to all neighbors of current vertex.
 437  
    * Received messages are then passed to and processed by provided
 438  
    * messagesConsumer.
 439  
    *
 440  
    * If messageSupplier returns null, current vertex
 441  
    * is not going to send any messages.
 442  
    */
 443  
   public static
 444  
   <I extends WritableComparable, V extends Writable, E extends Writable,
 445  
   M extends Writable>
 446  
   SendMessagePiece<I, V, E, M> sendMessageToNeighbors(
 447  
       String name,
 448  
       Class<M> messageClass,
 449  
       SupplierFromVertex<I, V, E, M> messageSupplier,
 450  
       ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
 451  0
     return sendMessage(
 452  
         name, messageClass, messageSupplier,
 453  0
         VertexSuppliers.<I, V, E>vertexNeighborsSupplier(),
 454  
         messagesConsumer);
 455  
   }
 456  
 
 457  
   /**
 458  
    * Creates Piece that for each vertex, sends message provided by
 459  
    * messageSupplier to all targets provided by targetsSupplier,
 460  
    * and uses given messageCombiner to combine messages together.
 461  
    * Received combined message is then passed to and processed by provided
 462  
    * messageConsumer. (null is passed to it, if vertex received no messages)
 463  
    *
 464  
    * If messageSupplier or targetsSupplier returns null, current vertex
 465  
    * is not going to send any messages.
 466  
    */
 467  
   public static
 468  
   <I extends WritableComparable, V extends Writable, E extends Writable,
 469  
   M extends Writable>
 470  
   SendMessageWithCombinerPiece<I, V, E, M> sendMessage(
 471  
       String name,
 472  
       MessageCombiner<? super I, M> messageCombiner,
 473  
       SupplierFromVertex<I, V, E, M> messageSupplier,
 474  
       SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
 475  
       ConsumerWithVertex<I, V, E, M> messagesConsumer) {
 476  0
     return new SendMessageWithCombinerPiece<>(
 477  
         name, messageCombiner,
 478  
         messageSupplier, targetsSupplier, messagesConsumer);
 479  
   }
 480  
 
 481  
   /**
 482  
    * Creates Piece that for each vertex, sends message provided by
 483  
    * messageSupplier to all neighbors of current vertex,
 484  
    * and uses given messageCombiner to combine messages together.
 485  
    * Received combined message is then passed to and processed by provided
 486  
    * messageConsumer. (null is passed to it, if vertex received no messages)
 487  
    *
 488  
    * If messageSupplier returns null, current vertex
 489  
    * is not going to send any messages.
 490  
    */
 491  
   public static
 492  
   <I extends WritableComparable, V extends Writable, E extends Writable,
 493  
   M extends Writable>
 494  
   SendMessageWithCombinerPiece<I, V, E, M> sendMessageToNeighbors(
 495  
       String name,
 496  
       MessageCombiner<? super I, M> messageCombiner,
 497  
       SupplierFromVertex<I, V, E, M> messageSupplier,
 498  
       ConsumerWithVertex<I, V, E, M> messagesConsumer) {
 499  0
     return sendMessage(
 500  
         name, messageCombiner, messageSupplier,
 501  0
         VertexSuppliers.<I, V, E>vertexNeighborsSupplier(),
 502  
         messagesConsumer);
 503  
   }
 504  
 }