View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.library;
19  
20  import java.util.Iterator;
21  
22  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
23  import org.apache.giraph.block_app.framework.block.Block;
24  import org.apache.giraph.block_app.framework.block.SequenceBlock;
25  import org.apache.giraph.combiner.MessageCombiner;
26  import org.apache.giraph.function.Consumer;
27  import org.apache.giraph.function.Function;
28  import org.apache.giraph.function.ObjectTransfer;
29  import org.apache.giraph.function.PairConsumer;
30  import org.apache.giraph.function.vertex.ConsumerWithVertex;
31  import org.apache.giraph.function.vertex.FunctionWithVertex;
32  import org.apache.giraph.function.vertex.SupplierFromVertex;
33  import org.apache.giraph.graph.Vertex;
34  import org.apache.giraph.reducers.ReduceOperation;
35  import org.apache.hadoop.io.Writable;
36  import org.apache.hadoop.io.WritableComparable;
37  
38  /**
39   * Utility class for creating sequences of sending replies to received
40   * messages. Current instance of this object represents partial chain,
41   * where we have specified which messages will be send at the lastly defined
42   * link in the chain thus far, but we haven't specified yet what to do when
43   * vertices receive them.
44   *
45   * Contains set of:
46   * - static startX methods, used to create the chain
47   * - thenX methods, used to add one more Piece to the chain, can be
48   *   "chained" arbitrary number of times.
49   * - endX methods, used to finish the chain, returning
50   *   the Block representing the whole chain
51   *
52   * If messageSupplier or targetsSupplier returns null, current vertex
53   * is not going to send any messages.
54   *
55   * @param <I> Vertex id type
56   * @param <V> Vertex value type
57   * @param <E> Edge value type
58   * @param <P> Previous value
59   */
60  public class SendMessageChain<I extends WritableComparable, V extends Writable,
61      E extends Writable, P> {
62    /**
63     * Represent current partial chain. Given a way to consume messages
64     * received in lastly defined link in this chain, it will produce block
65     * representing a chain created thus far.
66     */
67    private final Function<ConsumerWithVertex<I, V, E, P>, Block> blockCreator;
68  
69    private SendMessageChain(
70        Function<ConsumerWithVertex<I, V, E, P>, Block> blockCreator) {
71      this.blockCreator = blockCreator;
72    }
73  
74    /**
75     * Start chain with sending message provided by messageSupplier to all
76     * targets provided by targetsSupplier.
77     */
78    public static <I extends WritableComparable, V extends Writable,
79    E extends Writable, M extends Writable>
80    SendMessageChain<I, V, E, Iterable<M>> startSend(
81        final String name,
82        final Class<M> messageClass,
83        final SupplierFromVertex<I, V, E, M> messageSupplier,
84        final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
85      return new SendMessageChain<>(
86          new Function<ConsumerWithVertex<I, V, E, Iterable<M>>, Block>() {
87            @Override
88            public Block apply(
89                ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
90              return Pieces.sendMessage(
91                  name, messageClass, messageSupplier,
92                  targetsSupplier, messagesConsumer);
93            }
94          });
95    }
96  
97    /**
98     * Start chain with sending message provided by messageSupplier to all
99     * targets provided by targetsSupplier, and use given messageCombiner to
100    * combine messages together.
101    */
102   public static <I extends WritableComparable, V extends Writable,
103   E extends Writable, M extends Writable>
104   SendMessageChain<I, V, E, M> startSend(
105       final String name,
106       final MessageCombiner<? super I, M> messageCombiner,
107       final SupplierFromVertex<I, V, E, M> messageSupplier,
108       final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
109     return new SendMessageChain<>(
110         new Function<ConsumerWithVertex<I, V, E, M>, Block>() {
111           @Override
112           public Block apply(ConsumerWithVertex<I, V, E, M> messagesConsumer) {
113             return Pieces.sendMessage(
114                 name, messageCombiner, messageSupplier,
115                 targetsSupplier, messagesConsumer);
116           }
117         });
118   }
119 
120   /**
121    * Start chain with sending message provided by messageSupplier to all
122    * neighbors of a current vertex.
123    */
124   public static <I extends WritableComparable, V extends Writable,
125   E extends Writable, M extends Writable>
126   SendMessageChain<I, V, E, Iterable<M>> startSendToNeighbors(
127       final String name,
128       final Class<M> messageClass,
129       final SupplierFromVertex<I, V, E, M> messageSupplier) {
130     return startSend(name, messageClass, messageSupplier,
131         VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
132   }
133 
134   /**
135    * Start chain with sending message provided by messageSupplier to all
136    * neighbors of a current vertex, and use given messageCombiner to
137    * combine messages together.
138    */
139   public static <I extends WritableComparable, V extends Writable,
140   E extends Writable, M extends Writable>
141   SendMessageChain<I, V, E, M> startSendToNeighbors(
142       final String name,
143       final MessageCombiner<? super I, M> messageCombiner,
144       final SupplierFromVertex<I, V, E, M> messageSupplier) {
145     return startSend(name, messageCombiner, messageSupplier,
146         VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
147   }
148 
149   /**
150    * Start chain by providing a function that will produce Block representing
151    * beginning of the chain, given a consumer of messages send
152    * by the last link in the created block.
153    */
154   public static <I extends WritableComparable, V extends Writable,
155   E extends Writable, P extends Writable>
156   SendMessageChain<I, V, E, P> startCustom(
157       Function<ConsumerWithVertex<I, V, E, P>, Block> createStartingBlock) {
158     return new SendMessageChain<>(createStartingBlock);
159   }
160 
161   /**
162    * Give previously received message(s) to messageSupplier, and send message
163    * it returns to all targets provided by targetsSupplier.
164    */
165   public <M extends Writable>
166   SendMessageChain<I, V, E, Iterable<M>> thenSend(
167       final String name,
168       final Class<M> messageClass,
169       final FunctionWithVertex<I, V, E, P, M> messageSupplier,
170       final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
171     final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
172 
173     return new SendMessageChain<>(
174         new Function<ConsumerWithVertex<I, V, E, Iterable<M>>, Block>() {
175           @Override
176           public Block apply(
177               ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
178             return new SequenceBlock(
179               blockCreator.apply(
180                   prevMessagesTransfer.<I, V, E>castToConsumer()),
181               Pieces.sendMessage(
182                 name, messageClass,
183                 new SupplierFromVertex<I, V, E, M>() {
184                   @Override
185                   public M get(Vertex<I, V, E> vertex) {
186                     return messageSupplier.apply(
187                         vertex, prevMessagesTransfer.get());
188                   }
189                 },
190                 targetsSupplier, messagesConsumer));
191           }
192         });
193   }
194 
195   /**
196    * Give previously received message(s) to messageSupplier, and send message
197    * it returns to all neighbors of current vertex.
198    */
199   public <M extends Writable>
200   SendMessageChain<I, V, E, Iterable<M>> thenSendToNeighbors(
201       final String name,
202       final Class<M> messageClass,
203       final FunctionWithVertex<I, V, E, P, M> messageSupplier) {
204     return thenSend(name, messageClass, messageSupplier,
205         VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
206   }
207 
208   /**
209    * Give previously received message(s) to messageSupplier, and send message
210    * it returns to all targets provided by targetsSupplier, and use given
211    * messageCombiner to combine messages together.
212    */
213   public <M extends Writable>
214   SendMessageChain<I, V, E, M> thenSend(
215       final String name,
216       final MessageCombiner<? super I, M> messageCombiner,
217       final FunctionWithVertex<I, V, E, P, M> messageSupplier,
218       final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
219     final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
220 
221     return new SendMessageChain<>(
222         new Function<ConsumerWithVertex<I, V, E, M>, Block>() {
223           @Override
224           public Block apply(ConsumerWithVertex<I, V, E, M> messagesConsumer) {
225             return new SequenceBlock(
226               blockCreator.apply(
227                   prevMessagesTransfer.<I, V, E>castToConsumer()),
228               Pieces.sendMessage(
229                 name, messageCombiner,
230                 new SupplierFromVertex<I, V, E, M>() {
231                   @Override
232                   public M get(Vertex<I, V, E> vertex) {
233                     return messageSupplier.apply(
234                         vertex, prevMessagesTransfer.get());
235                   }
236                 },
237                 targetsSupplier, messagesConsumer));
238           }
239         });
240   }
241 
242   /**
243    * Give previously received message(s) to messageSupplier, and send message
244    * it returns to all neighbors of current vertex, and use given
245    * messageCombiner to combine messages together.
246    */
247   public <M extends Writable>
248   SendMessageChain<I, V, E, M> thenSendToNeighbors(
249       final String name,
250       final MessageCombiner<? super I, M> messageCombiner,
251       final FunctionWithVertex<I, V, E, P, M> messageSupplier) {
252     return thenSend(name, messageCombiner, messageSupplier,
253         VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
254   }
255 
256   /**
257    * End chain by giving received messages to valueSupplier,
258    * to produce value that should be reduced, and consumed on master
259    * by reducedValueConsumer.
260    */
261   public <S, R extends Writable>
262   Block endReduce(final String name, final ReduceOperation<S, R> reduceOp,
263       final FunctionWithVertex<I, V, E, P, S> valueSupplier,
264       final Consumer<R> reducedValueConsumer) {
265     return endCustom(new Function<SupplierFromVertex<I, V, E, P>, Block>() {
266       @Override
267       public Block apply(final SupplierFromVertex<I, V, E, P> prevMessages) {
268         return Pieces.reduce(
269             name,
270             reduceOp,
271             new SupplierFromVertex<I, V, E, S>() {
272               @Override
273               public S get(Vertex<I, V, E> vertex) {
274                 return valueSupplier.apply(vertex, prevMessages.get(vertex));
275               }
276             },
277             reducedValueConsumer);
278       }
279     });
280   }
281 
282   /**
283    * End chain by giving received messages to valueSupplier,
284    * to produce value that should be reduced, and consumed on master
285    * by reducedValueConsumer.
286    */
287   public <S, R extends Writable>
288   Block endReduceWithMaster(
289       final String name, final ReduceOperation<S, R> reduceOp,
290       final FunctionWithVertex<I, V, E, P, S> valueSupplier,
291       final PairConsumer<R, BlockMasterApi> reducedValueConsumer) {
292     return endCustom(new Function<SupplierFromVertex<I, V, E, P>, Block>() {
293       @Override
294       public Block apply(final SupplierFromVertex<I, V, E, P> prevMessages) {
295         return Pieces.reduceWithMaster(
296             name,
297             reduceOp,
298             new SupplierFromVertex<I, V, E, S>() {
299               @Override
300               public S get(Vertex<I, V, E> vertex) {
301                 return valueSupplier.apply(vertex, prevMessages.get(vertex));
302               }
303             },
304             reducedValueConsumer);
305       }
306     });
307   }
308 
309   /**
310    * End chain by processing messages received within the last link
311    * in the chain.
312    */
313   public Block endConsume(ConsumerWithVertex<I, V, E, P> messagesConsumer) {
314     return blockCreator.apply(messagesConsumer);
315   }
316 
317   /**
318    * End chain by providing a function that will produce Block to be attached
319    * to the end of current chain, given a supplier of messages received
320    * within the last link in the chain.
321    */
322   public Block endCustom(
323       Function<SupplierFromVertex<I, V, E, P>, Block> createBlockToAttach) {
324     final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
325     return new SequenceBlock(
326         blockCreator.apply(prevMessagesTransfer.<I, V, E>castToConsumer()),
327         createBlockToAttach.apply(
328             prevMessagesTransfer.<I, V, E>castToSupplier()));
329   }
330 }