Coverage Report - org.apache.giraph.block_app.library.SendMessageChain
 
Classes in this File Line Coverage Branch Coverage Complexity
SendMessageChain
0%
0/27
N/A
0
SendMessageChain$1
0%
0/2
N/A
0
SendMessageChain$2
0%
0/2
N/A
0
SendMessageChain$3
0%
0/5
N/A
0
SendMessageChain$3$1
0%
0/3
N/A
0
SendMessageChain$4
0%
0/5
N/A
0
SendMessageChain$4$1
0%
0/3
N/A
0
SendMessageChain$5
0%
0/2
N/A
0
SendMessageChain$5$1
0%
0/2
N/A
0
SendMessageChain$6
0%
0/2
N/A
0
SendMessageChain$6$1
0%
0/2
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.library;
 19  
 
 20  
 import java.util.Iterator;
 21  
 
 22  
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 23  
 import org.apache.giraph.block_app.framework.block.Block;
 24  
 import org.apache.giraph.block_app.framework.block.SequenceBlock;
 25  
 import org.apache.giraph.combiner.MessageCombiner;
 26  
 import org.apache.giraph.function.Consumer;
 27  
 import org.apache.giraph.function.Function;
 28  
 import org.apache.giraph.function.ObjectTransfer;
 29  
 import org.apache.giraph.function.PairConsumer;
 30  
 import org.apache.giraph.function.vertex.ConsumerWithVertex;
 31  
 import org.apache.giraph.function.vertex.FunctionWithVertex;
 32  
 import org.apache.giraph.function.vertex.SupplierFromVertex;
 33  
 import org.apache.giraph.graph.Vertex;
 34  
 import org.apache.giraph.reducers.ReduceOperation;
 35  
 import org.apache.hadoop.io.Writable;
 36  
 import org.apache.hadoop.io.WritableComparable;
 37  
 
 38  
 /**
 39  
  * Utility class for creating sequences of sending replies to received
 40  
  * messages. Current instance of this object represents partial chain,
 41  
  * where we have specified which messages will be sent at the most recently defined
 42  
  * link in the chain thus far, but we haven't specified yet what to do when
 43  
  * vertices receive them.
 44  
  *
 45  
  * Contains set of:
 46  
  * - static startX methods, used to create the chain
 47  
  * - thenX methods, used to add one more Piece to the chain, can be
 48  
  *   "chained" an arbitrary number of times.
 49  
  * - endX methods, used to finish the chain, returning
 50  
  *   the Block representing the whole chain
 51  
  *
 52  
  * If messageSupplier or targetsSupplier returns null, the current vertex
 53  
  * is not going to send any messages.
 54  
  *
 55  
  * @param <I> Vertex id type
 56  
  * @param <V> Vertex value type
 57  
  * @param <E> Edge value type
 58  
  * @param <P> Previous value
 59  
  */
 60  0
 public class SendMessageChain<I extends WritableComparable, V extends Writable,
 61  
     E extends Writable, P> {
 62  
   /**
 63  
    * Represents the current partial chain. Given a way to consume messages
 64  
    * received in the most recently defined link in this chain, it will produce a block
 65  
    * representing a chain created thus far.
 66  
    */
 67  
   private final Function<ConsumerWithVertex<I, V, E, P>, Block> blockCreator;
 68  
 
 69  
   private SendMessageChain(
 70  0
       Function<ConsumerWithVertex<I, V, E, P>, Block> blockCreator) {
 71  0
     this.blockCreator = blockCreator;
 72  0
   }
 73  
 
 74  
   /**
 75  
    * Start chain with sending message provided by messageSupplier to all
 76  
    * targets provided by targetsSupplier.
 77  
    */
 78  
   public static <I extends WritableComparable, V extends Writable,
 79  
   E extends Writable, M extends Writable>
 80  
   SendMessageChain<I, V, E, Iterable<M>> startSend(
 81  
       final String name,
 82  
       final Class<M> messageClass,
 83  
       final SupplierFromVertex<I, V, E, M> messageSupplier,
 84  
       final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
 85  0
     return new SendMessageChain<>(
 86  0
         new Function<ConsumerWithVertex<I, V, E, Iterable<M>>, Block>() {
 87  
           @Override
 88  
           public Block apply(
 89  
               ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
 90  0
             return Pieces.sendMessage(
 91  
                 name, messageClass, messageSupplier,
 92  
                 targetsSupplier, messagesConsumer);
 93  
           }
 94  
         });
 95  
   }
 96  
 
 97  
   /**
 98  
    * Start chain with sending message provided by messageSupplier to all
 99  
    * targets provided by targetsSupplier, and use given messageCombiner to
 100  
    * combine messages together.
 101  
    */
 102  
   public static <I extends WritableComparable, V extends Writable,
 103  
   E extends Writable, M extends Writable>
 104  
   SendMessageChain<I, V, E, M> startSend(
 105  
       final String name,
 106  
       final MessageCombiner<? super I, M> messageCombiner,
 107  
       final SupplierFromVertex<I, V, E, M> messageSupplier,
 108  
       final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
 109  0
     return new SendMessageChain<>(
 110  0
         new Function<ConsumerWithVertex<I, V, E, M>, Block>() {
 111  
           @Override
 112  
           public Block apply(ConsumerWithVertex<I, V, E, M> messagesConsumer) {
 113  0
             return Pieces.sendMessage(
 114  
                 name, messageCombiner, messageSupplier,
 115  
                 targetsSupplier, messagesConsumer);
 116  
           }
 117  
         });
 118  
   }
 119  
 
 120  
   /**
 121  
    * Start chain with sending message provided by messageSupplier to all
 122  
    * neighbors of a current vertex.
 123  
    */
 124  
   public static <I extends WritableComparable, V extends Writable,
 125  
   E extends Writable, M extends Writable>
 126  
   SendMessageChain<I, V, E, Iterable<M>> startSendToNeighbors(
 127  
       final String name,
 128  
       final Class<M> messageClass,
 129  
       final SupplierFromVertex<I, V, E, M> messageSupplier) {
 130  0
     return startSend(name, messageClass, messageSupplier,
 131  0
         VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
 132  
   }
 133  
 
 134  
   /**
 135  
    * Start chain with sending message provided by messageSupplier to all
 136  
    * neighbors of a current vertex, and use given messageCombiner to
 137  
    * combine messages together.
 138  
    */
 139  
   public static <I extends WritableComparable, V extends Writable,
 140  
   E extends Writable, M extends Writable>
 141  
   SendMessageChain<I, V, E, M> startSendToNeighbors(
 142  
       final String name,
 143  
       final MessageCombiner<? super I, M> messageCombiner,
 144  
       final SupplierFromVertex<I, V, E, M> messageSupplier) {
 145  0
     return startSend(name, messageCombiner, messageSupplier,
 146  0
         VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
 147  
   }
 148  
 
 149  
   /**
 150  
    * Start chain by providing a function that will produce Block representing
 151  
    * beginning of the chain, given a consumer of messages sent
 152  
    * by the last link in the created block.
 153  
    */
 154  
   public static <I extends WritableComparable, V extends Writable,
 155  
   E extends Writable, P extends Writable>
 156  
   SendMessageChain<I, V, E, P> startCustom(
 157  
       Function<ConsumerWithVertex<I, V, E, P>, Block> createStartingBlock) {
 158  0
     return new SendMessageChain<>(createStartingBlock);
 159  
   }
 160  
 
 161  
   /**
 162  
    * Give previously received message(s) to messageSupplier, and send message
 163  
    * it returns to all targets provided by targetsSupplier.
 164  
    */
 165  
   public <M extends Writable>
 166  
   SendMessageChain<I, V, E, Iterable<M>> thenSend(
 167  
       final String name,
 168  
       final Class<M> messageClass,
 169  
       final FunctionWithVertex<I, V, E, P, M> messageSupplier,
 170  
       final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
 171  0
     final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
 172  
 
 173  0
     return new SendMessageChain<>(
 174  0
         new Function<ConsumerWithVertex<I, V, E, Iterable<M>>, Block>() {
 175  
           @Override
 176  
           public Block apply(
 177  
               ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
 178  0
             return new SequenceBlock(
 179  0
               blockCreator.apply(
 180  0
                   prevMessagesTransfer.<I, V, E>castToConsumer()),
 181  0
               Pieces.sendMessage(
 182  
                 name, messageClass,
 183  0
                 new SupplierFromVertex<I, V, E, M>() {
 184  
                   @Override
 185  
                   public M get(Vertex<I, V, E> vertex) {
 186  0
                     return messageSupplier.apply(
 187  0
                         vertex, prevMessagesTransfer.get());
 188  
                   }
 189  
                 },
 190  
                 targetsSupplier, messagesConsumer));
 191  
           }
 192  
         });
 193  
   }
 194  
 
 195  
   /**
 196  
    * Give previously received message(s) to messageSupplier, and send message
 197  
    * it returns to all neighbors of current vertex.
 198  
    */
 199  
   public <M extends Writable>
 200  
   SendMessageChain<I, V, E, Iterable<M>> thenSendToNeighbors(
 201  
       final String name,
 202  
       final Class<M> messageClass,
 203  
       final FunctionWithVertex<I, V, E, P, M> messageSupplier) {
 204  0
     return thenSend(name, messageClass, messageSupplier,
 205  0
         VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
 206  
   }
 207  
 
 208  
   /**
 209  
    * Give previously received message(s) to messageSupplier, and send message
 210  
    * it returns to all targets provided by targetsSupplier, and use given
 211  
    * messageCombiner to combine messages together.
 212  
    */
 213  
   public <M extends Writable>
 214  
   SendMessageChain<I, V, E, M> thenSend(
 215  
       final String name,
 216  
       final MessageCombiner<? super I, M> messageCombiner,
 217  
       final FunctionWithVertex<I, V, E, P, M> messageSupplier,
 218  
       final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
 219  0
     final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
 220  
 
 221  0
     return new SendMessageChain<>(
 222  0
         new Function<ConsumerWithVertex<I, V, E, M>, Block>() {
 223  
           @Override
 224  
           public Block apply(ConsumerWithVertex<I, V, E, M> messagesConsumer) {
 225  0
             return new SequenceBlock(
 226  0
               blockCreator.apply(
 227  0
                   prevMessagesTransfer.<I, V, E>castToConsumer()),
 228  0
               Pieces.sendMessage(
 229  
                 name, messageCombiner,
 230  0
                 new SupplierFromVertex<I, V, E, M>() {
 231  
                   @Override
 232  
                   public M get(Vertex<I, V, E> vertex) {
 233  0
                     return messageSupplier.apply(
 234  0
                         vertex, prevMessagesTransfer.get());
 235  
                   }
 236  
                 },
 237  
                 targetsSupplier, messagesConsumer));
 238  
           }
 239  
         });
 240  
   }
 241  
 
 242  
   /**
 243  
    * Give previously received message(s) to messageSupplier, and send message
 244  
    * it returns to all neighbors of current vertex, and use given
 245  
    * messageCombiner to combine messages together.
 246  
    */
 247  
   public <M extends Writable>
 248  
   SendMessageChain<I, V, E, M> thenSendToNeighbors(
 249  
       final String name,
 250  
       final MessageCombiner<? super I, M> messageCombiner,
 251  
       final FunctionWithVertex<I, V, E, P, M> messageSupplier) {
 252  0
     return thenSend(name, messageCombiner, messageSupplier,
 253  0
         VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
 254  
   }
 255  
 
 256  
   /**
 257  
    * End chain by giving received messages to valueSupplier,
 258  
    * to produce value that should be reduced, and consumed on master
 259  
    * by reducedValueConsumer.
 260  
    */
 261  
   public <S, R extends Writable>
 262  
   Block endReduce(final String name, final ReduceOperation<S, R> reduceOp,
 263  
       final FunctionWithVertex<I, V, E, P, S> valueSupplier,
 264  
       final Consumer<R> reducedValueConsumer) {
 265  0
     return endCustom(new Function<SupplierFromVertex<I, V, E, P>, Block>() {
 266  
       @Override
 267  
       public Block apply(final SupplierFromVertex<I, V, E, P> prevMessages) {
 268  0
         return Pieces.reduce(
 269  
             name,
 270  
             reduceOp,
 271  0
             new SupplierFromVertex<I, V, E, S>() {
 272  
               @Override
 273  
               public S get(Vertex<I, V, E> vertex) {
 274  0
                 return valueSupplier.apply(vertex, prevMessages.get(vertex));
 275  
               }
 276  
             },
 277  
             reducedValueConsumer);
 278  
       }
 279  
     });
 280  
   }
 281  
 
 282  
   /**
 283  
    * End chain by giving received messages to valueSupplier,
 284  
    * to produce value that should be reduced, and consumed on master
 285  
    * by reducedValueConsumer, which is also given the BlockMasterApi.
 286  
    */
 287  
   public <S, R extends Writable>
 288  
   Block endReduceWithMaster(
 289  
       final String name, final ReduceOperation<S, R> reduceOp,
 290  
       final FunctionWithVertex<I, V, E, P, S> valueSupplier,
 291  
       final PairConsumer<R, BlockMasterApi> reducedValueConsumer) {
 292  0
     return endCustom(new Function<SupplierFromVertex<I, V, E, P>, Block>() {
 293  
       @Override
 294  
       public Block apply(final SupplierFromVertex<I, V, E, P> prevMessages) {
 295  0
         return Pieces.reduceWithMaster(
 296  
             name,
 297  
             reduceOp,
 298  0
             new SupplierFromVertex<I, V, E, S>() {
 299  
               @Override
 300  
               public S get(Vertex<I, V, E> vertex) {
 301  0
                 return valueSupplier.apply(vertex, prevMessages.get(vertex));
 302  
               }
 303  
             },
 304  
             reducedValueConsumer);
 305  
       }
 306  
     });
 307  
   }
 308  
 
 309  
   /**
 310  
    * End chain by processing messages received within the last link
 311  
    * in the chain.
 312  
    */
 313  
   public Block endConsume(ConsumerWithVertex<I, V, E, P> messagesConsumer) {
 314  0
     return blockCreator.apply(messagesConsumer);
 315  
   }
 316  
 
 317  
   /**
 318  
    * End chain by providing a function that will produce Block to be attached
 319  
    * to the end of current chain, given a supplier of messages received
 320  
    * within the last link in the chain.
 321  
    */
 322  
   public Block endCustom(
 323  
       Function<SupplierFromVertex<I, V, E, P>, Block> createBlockToAttach) {
 324  0
     final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
 325  0
     return new SequenceBlock(
 326  0
         blockCreator.apply(prevMessagesTransfer.<I, V, E>castToConsumer()),
 327  0
         createBlockToAttach.apply(
 328  0
             prevMessagesTransfer.<I, V, E>castToSupplier()));
 329  
   }
 330  
 }