1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.benchmark;
20
21 import org.apache.giraph.graph.BasicComputation;
22 import org.apache.giraph.edge.Edge;
23 import org.apache.giraph.graph.Vertex;
24 import org.apache.hadoop.io.DoubleWritable;
25 import org.apache.hadoop.io.LongWritable;
26
27 import java.io.IOException;
28
29
30
31
32 public class ShortestPathsComputation extends BasicComputation<LongWritable,
33 DoubleWritable, DoubleWritable, DoubleWritable> {
34
35 public static final String SOURCE_ID =
36 "giraph.shortestPathsBenchmark.sourceId";
37
38 public static final long SOURCE_ID_DEFAULT = 1;
39
40
41
42
43
44
45
46 private boolean isSource(
47 Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex) {
48 return vertex.getId().get() ==
49 getConf().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
50 }
51
52 @Override
53 public void compute(
54 Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
55 Iterable<DoubleWritable> messages) throws IOException {
56 if (getSuperstep() == 0) {
57 vertex.setValue(new DoubleWritable(Double.MAX_VALUE));
58 }
59
60 double minDist = isSource(vertex) ? 0d : Double.MAX_VALUE;
61 for (DoubleWritable message : messages) {
62 minDist = Math.min(minDist, message.get());
63 }
64
65 if (minDist < vertex.getValue().get()) {
66 vertex.setValue(new DoubleWritable(minDist));
67 for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
68 double distance = minDist + edge.getValue().get();
69 sendMessage(edge.getTargetVertexId(),
70 new DoubleWritable(distance));
71 }
72 }
73
74 vertex.voteToHalt();
75 }
76 }