1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.benchmark;
20
21 import org.apache.giraph.graph.BasicComputation;
22 import org.apache.giraph.graph.Vertex;
23 import org.apache.hadoop.io.FloatWritable;
24 import org.apache.hadoop.io.IntWritable;
25 import org.apache.hadoop.io.NullWritable;
26
27 import java.io.IOException;
28
29
30
31
32
33 public class PageRankComputation extends BasicComputation<IntWritable,
34 FloatWritable, NullWritable, FloatWritable> {
35
36 public static final String SUPERSTEP_COUNT =
37 "giraph.pageRank.superstepCount";
38
39 @Override
40 public void compute(
41 Vertex<IntWritable, FloatWritable, NullWritable> vertex,
42 Iterable<FloatWritable> messages) throws IOException {
43 if (getSuperstep() >= 1) {
44 float sum = 0;
45 for (FloatWritable message : messages) {
46 sum += message.get();
47 }
48 vertex.getValue().set((0.15f / getTotalNumVertices()) + 0.85f * sum);
49 }
50
51 if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, 0)) {
52 sendMessageToAllEdges(vertex,
53 new FloatWritable(vertex.getValue().get() / vertex.getNumEdges()));
54 } else {
55 vertex.voteToHalt();
56 }
57 }
58 }