1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import org.apache.giraph.graph.BasicComputation;
22 import org.apache.giraph.graph.Vertex;
23 import org.apache.hadoop.io.FloatWritable;
24 import org.apache.hadoop.io.IntWritable;
25 import org.apache.hadoop.io.LongWritable;
26 import org.apache.log4j.Logger;
27
28 import java.io.IOException;
29
30
31
32
33 public class SimpleMsgComputation extends
34 BasicComputation<LongWritable, IntWritable, FloatWritable, IntWritable> {
35
36 private static Logger LOG = Logger.getLogger(SimpleMsgComputation.class);
37 @Override
38 public void compute(
39 Vertex<LongWritable, IntWritable, FloatWritable> vertex,
40 Iterable<IntWritable> messages) throws IOException {
41 if (vertex.getId().equals(new LongWritable(2))) {
42 sendMessage(new LongWritable(1), new IntWritable(101));
43 sendMessage(new LongWritable(1), new IntWritable(102));
44 sendMessage(new LongWritable(1), new IntWritable(103));
45 }
46 if (!vertex.getId().equals(new LongWritable(1))) {
47 vertex.voteToHalt();
48 } else {
49
50 int sum = 0;
51 for (IntWritable message : messages) {
52 sum += message.get();
53 }
54 LOG.info("compute: Received a sum of " + sum +
55 " (will stop on 306)");
56
57 if (sum == 306) {
58 vertex.voteToHalt();
59 }
60 }
61 if (getSuperstep() > 3) {
62 System.err.println("compute: Vertex 1 failed to receive " +
63 "messages in time");
64 vertex.voteToHalt();
65 }
66 }
67 }