1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.benchmark;
20
21 import org.apache.giraph.graph.BasicComputation;
22 import org.apache.giraph.edge.Edge;
23 import org.apache.giraph.edge.MutableEdge;
24 import org.apache.giraph.graph.Vertex;
25 import org.apache.hadoop.io.DoubleWritable;
26 import org.apache.hadoop.io.LongWritable;
27
28 import java.io.IOException;
29
30
31
32
33 public class WeightedPageRankComputation extends BasicComputation<LongWritable,
34 DoubleWritable, DoubleWritable, DoubleWritable> {
35
36 public static final String SUPERSTEP_COUNT =
37 "giraph.weightedPageRank.superstepCount";
38
39 @Override
40 public void compute(
41 Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
42 Iterable<DoubleWritable> messages) throws IOException {
43 if (getSuperstep() == 0) {
44
45 double outEdgeSum = 0;
46 for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
47 outEdgeSum += edge.getValue().get();
48 }
49 for (MutableEdge<LongWritable, DoubleWritable> edge :
50 vertex.getMutableEdges()) {
51 edge.setValue(new DoubleWritable(edge.getValue().get() / outEdgeSum));
52 }
53 } else {
54 double messageSum = 0;
55 for (DoubleWritable message : messages) {
56 messageSum += message.get();
57 }
58 vertex.getValue().set(
59 (0.15f / getTotalNumVertices()) + 0.85f * messageSum);
60 }
61
62 if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, 0)) {
63 for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
64 sendMessage(edge.getTargetVertexId(),
65 new DoubleWritable(
66 vertex.getValue().get() * edge.getValue().get()));
67 }
68 } else {
69 vertex.voteToHalt();
70 }
71 }
72 }