1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import org.apache.giraph.graph.BasicComputation;
22 import org.apache.giraph.graph.Vertex;
23 import org.apache.hadoop.io.FloatWritable;
24 import org.apache.hadoop.io.IntWritable;
25 import org.apache.hadoop.io.LongWritable;
26 import org.apache.log4j.Logger;
27
28 import java.io.IOException;
29
30
31
32
33 public class SimpleCombinerComputation extends
34 BasicComputation<LongWritable, IntWritable, FloatWritable, IntWritable> {
35
36 private static Logger LOG = Logger.getLogger(SimpleCombinerComputation.class);
37
38 @Override
39 public void compute(
40 Vertex<LongWritable, IntWritable, FloatWritable> vertex,
41 Iterable<IntWritable> messages) throws IOException {
42 if (vertex.getId().equals(new LongWritable(2))) {
43 sendMessage(new LongWritable(1), new IntWritable(101));
44 sendMessage(new LongWritable(1), new IntWritable(102));
45 sendMessage(new LongWritable(1), new IntWritable(103));
46 }
47 if (!vertex.getId().equals(new LongWritable(1))) {
48 vertex.voteToHalt();
49 } else {
50
51 int sum = 0;
52 int num = 0;
53 for (IntWritable message : messages) {
54 sum += message.get();
55 num++;
56 }
57 LOG.info("TestCombinerVertex: Received a sum of " + sum +
58 " (should have 306 with a single message value)");
59
60 if (num == 1 && sum == 306) {
61 vertex.voteToHalt();
62 }
63 }
64 if (getSuperstep() > 3) {
65 throw new IllegalStateException(
66 "TestCombinerVertex: Vertex 1 failed to receive " +
67 "messages in time");
68 }
69 }
70 }