Coverage Report - org.apache.giraph.block_app.library.internal.SendMessageWithCombinerPiece
 
Classes in this File Line Coverage Branch Coverage Complexity
SendMessageWithCombinerPiece
0%
0/14
N/A
0
SendMessageWithCombinerPiece$1
0%
0/6
N/A
0
SendMessageWithCombinerPiece$1$1
0%
0/2
N/A
0
SendMessageWithCombinerPiece$2
0%
0/6
0%
0/6
0
SendMessageWithCombinerPiece$3
0%
0/8
0%
0/4
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.library.internal;
 19  
 
 20  
 import java.util.Iterator;
 21  
 import java.util.Spliterator;
 22  
 import java.util.Spliterators;
 23  
 import java.util.stream.StreamSupport;
 24  
 
 25  
 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
 26  
 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
 27  
 import org.apache.giraph.block_app.framework.block.Block;
 28  
 import org.apache.giraph.block_app.framework.piece.Piece;
 29  
 import org.apache.giraph.block_app.framework.piece.delegate.FilteringPiece;
 30  
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
 31  
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
 32  
 import org.apache.giraph.block_app.library.striping.StripingUtils;
 33  
 import org.apache.giraph.combiner.MessageCombiner;
 34  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 35  
 import org.apache.giraph.function.Function;
 36  
 import org.apache.giraph.function.Predicate;
 37  
 import org.apache.giraph.function.primitive.Int2ObjFunction;
 38  
 import org.apache.giraph.function.vertex.ConsumerWithVertex;
 39  
 import org.apache.giraph.function.vertex.SupplierFromVertex;
 40  
 import org.apache.giraph.graph.Vertex;
 41  
 import org.apache.hadoop.io.Writable;
 42  
 import org.apache.hadoop.io.WritableComparable;
 43  
 
 44  
 import com.google.common.base.Preconditions;
 45  
 
 46  
 /**
 47  
  * Piece that sends a message provided through messageProducer to given set of
 48  
  * neighbors, uses a message combiner and passes them to messagesConsumer.
 49  
  *
 50  
  * @param <I> Vertex id type
 51  
  * @param <V> Vertex value type
 52  
  * @param <E> Edge value type
 53  
  * @param <M> Message type
 54  
  */
 55  0
 public class SendMessageWithCombinerPiece<I extends WritableComparable,
 56  
     V extends Writable, E extends Writable, M extends Writable>
 57  
     extends Piece<I, V, E, M, Object> {
 58  
   private final String name;
 59  
   private final MessageCombiner<? super I, M> messageCombiner;
 60  
   private final SupplierFromVertex<I, V, E, M> messageSupplier;
 61  
   private final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier;
 62  
   private final ConsumerWithVertex<I, V, E, M> messagesConsumer;
 63  
 
 64  
   public SendMessageWithCombinerPiece(String name,
 65  
       MessageCombiner<? super I, M> messageCombiner,
 66  
       SupplierFromVertex<I, V, E, M> messageSupplier,
 67  
       SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
 68  0
       ConsumerWithVertex<I, V, E, M> messagesConsumer) {
 69  0
     Preconditions.checkNotNull(messageCombiner);
 70  0
     this.name = name;
 71  0
     this.messageCombiner = messageCombiner;
 72  0
     this.messageSupplier = messageSupplier;
 73  0
     this.targetsSupplier = targetsSupplier;
 74  0
     this.messagesConsumer = messagesConsumer;
 75  0
   }
 76  
 
 77  
   /**
 78  
    * Stripe message sending computation across multiple stripes, in
 79  
    * each stripe only part of the vertices will receive messages.
 80  
    *
 81  
    * @param stripes Number of stripes
 82  
    * @param stripeSupplier Stripe supplier function, if IDs are Longs, you can
 83  
    *                       use StripingUtils::fastHashStripingPredicate
 84  
    * @return Resulting block
 85  
    */
 86  
   public Block stripeByReceiver(
 87  
       int stripes,
 88  
       Int2ObjFunction<Int2ObjFunction<Predicate<I>>> stripeSupplier) {
 89  0
     return StripingUtils.generateStripedBlock(
 90  
         stripes,
 91  0
         new Function<Predicate<I>, Block>() {
 92  
           @Override
 93  
           public Block apply(final Predicate<I> stripePredicate) {
 94  0
             return FilteringPiece.createReceiveFiltering(
 95  0
                 new SupplierFromVertex<I, V, E, Boolean>() {
 96  
                   @Override
 97  
                   public Boolean get(Vertex<I, V, E> vertex) {
 98  0
                     return stripePredicate.apply(vertex.getId());
 99  
                   }
 100  
                 },
 101  
                 new SendMessageWithCombinerPiece<>(
 102  0
                   name,
 103  0
                   messageCombiner,
 104  0
                   messageSupplier,
 105  
                   new SupplierFromVertex<I, V, E, Iterator<I>>() {
 106  
                     @Override
 107  
                     public Iterator<I> get(Vertex<I, V, E> vertex) {
 108  
                       return StreamSupport.stream(
 109  
                         Spliterators.spliteratorUnknownSize(
 110  
                           targetsSupplier.get(vertex), Spliterator.ORDERED),
 111  
                         false).filter(stripePredicate::apply).iterator();
 112  
                     }
 113  
                   },
 114  0
                   messagesConsumer));
 115  
           }
 116  
         },
 117  
         stripeSupplier);
 118  
   }
 119  
 
 120  
   @Override
 121  
   public VertexSender<I, V, E> getVertexSender(
 122  
       final BlockWorkerSendApi<I, V, E, M> workerApi,
 123  
       Object executionStage) {
 124  0
     return new InnerVertexSender() {
 125  
       @Override
 126  
       public void vertexSend(Vertex<I, V, E> vertex) {
 127  0
         Iterator<I> targets = targetsSupplier.get(vertex);
 128  0
         M message = messageSupplier.get(vertex);
 129  0
         if (message != null && targets != null && targets.hasNext()) {
 130  0
           workerApi.sendMessageToMultipleEdges(targets, message);
 131  
         }
 132  0
       }
 133  
     };
 134  
   }
 135  
 
 136  
   @Override
 137  
   public VertexReceiver<I, V, E, M> getVertexReceiver(
 138  
       BlockWorkerReceiveApi<I> workerApi,
 139  
       Object executionStage) {
 140  0
     return new InnerVertexReceiver() {
 141  
       @Override
 142  
       public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
 143  0
         Iterator<M> iter = messages.iterator();
 144  0
         M combinedMessage = null;
 145  0
         if (iter.hasNext()) {
 146  0
           combinedMessage = iter.next();
 147  
           // When message combiner is used, there is never more then one message
 148  0
           Preconditions.checkArgument(!iter.hasNext());
 149  
         }
 150  0
         messagesConsumer.apply(vertex, combinedMessage);
 151  0
       }
 152  
     };
 153  
   }
 154  
 
 155  
   @Override
 156  
   public MessageCombiner<? super I, M> getMessageCombiner(
 157  
       ImmutableClassesGiraphConfiguration conf) {
 158  0
     return messageCombiner;
 159  
   }
 160  
 
 161  
   @Override
 162  
   public String toString() {
 163  0
     return name;
 164  
   }
 165  
 }