1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import org.apache.giraph.graph.BasicComputation;
22 import org.apache.giraph.edge.Edge;
23 import org.apache.giraph.graph.Vertex;
24 import org.apache.hadoop.io.DoubleWritable;
25 import org.apache.hadoop.io.LongWritable;
26 import org.apache.hadoop.io.Writable;
27
28 import java.io.IOException;
29
30
31
32
33
34
35 public abstract class RandomWalkComputation<E extends Writable>
36 extends BasicComputation<LongWritable, DoubleWritable, E, DoubleWritable> {
37
38 static final String MAX_SUPERSTEPS = RandomWalkComputation.class.getName() +
39 ".maxSupersteps";
40
41 static final String TELEPORTATION_PROBABILITY = RandomWalkComputation.class
42 .getName() + ".teleportationProbability";
43
44 static final String CUMULATIVE_DANGLING_PROBABILITY =
45 RandomWalkComputation.class.getName() + ".cumulativeDanglingProbability";
46
47 static final String CUMULATIVE_PROBABILITY = RandomWalkComputation.class
48 .getName() + ".cumulativeProbability";
49
50 static final String NUM_DANGLING_VERTICES = RandomWalkComputation.class
51 .getName() + ".numDanglingVertices";
52
53
54 static final String L1_NORM_OF_PROBABILITY_DIFFERENCE =
55 RandomWalkComputation.class.getName() + ".l1NormOfProbabilityDifference";
56
57 private final DoubleWritable doubleWritable = new DoubleWritable();
58
59 private final LongWritable one = new LongWritable(1);
60
61
62
63
64
65
66 protected double initialProbability() {
67 return 1.0 / getTotalNumVertices();
68 }
69
70
71
72
73
74
75
76
77 protected abstract double transitionProbability(
78 Vertex<LongWritable, DoubleWritable, E> vertex,
79 double stateProbability,
80 Edge<LongWritable, E> edge);
81
82
83
84
85
86
87
88
89
90 protected abstract double recompute(
91 Vertex<LongWritable, DoubleWritable, E> vertex,
92 Iterable<DoubleWritable> messages,
93 double teleportationProbability);
94
95
96
97
98
99 protected double getDanglingProbability() {
100 return this.<DoubleWritable>getAggregatedValue(
101 RandomWalkComputation.CUMULATIVE_DANGLING_PROBABILITY).get();
102 }
103
104
105
106
107
108 protected double getPreviousCumulativeProbability() {
109 return this.<DoubleWritable>getAggregatedValue(
110 RandomWalkComputation.CUMULATIVE_PROBABILITY).get();
111 }
112
113 @Override
114 public void compute(
115 Vertex<LongWritable, DoubleWritable, E> vertex,
116 Iterable<DoubleWritable> messages) throws IOException {
117 double stateProbability;
118
119 if (getSuperstep() > 0) {
120
121 double previousStateProbability = vertex.getValue().get();
122 stateProbability =
123 recompute(vertex, messages, teleportationProbability());
124
125
126 stateProbability /= getPreviousCumulativeProbability();
127
128 doubleWritable.set(Math.abs(stateProbability - previousStateProbability));
129 aggregate(L1_NORM_OF_PROBABILITY_DIFFERENCE, doubleWritable);
130
131 } else {
132 stateProbability = initialProbability();
133 }
134
135 vertex.getValue().set(stateProbability);
136
137 aggregate(CUMULATIVE_PROBABILITY, vertex.getValue());
138
139
140 if (vertex.getNumEdges() == 0) {
141 aggregate(NUM_DANGLING_VERTICES, one);
142 aggregate(CUMULATIVE_DANGLING_PROBABILITY, vertex.getValue());
143 }
144
145 if (getSuperstep() < maxSupersteps()) {
146 for (Edge<LongWritable, E> edge : vertex.getEdges()) {
147 double transitionProbability =
148 transitionProbability(vertex, stateProbability, edge);
149 doubleWritable.set(transitionProbability);
150 sendMessage(edge.getTargetVertexId(), doubleWritable);
151 }
152 } else {
153 vertex.voteToHalt();
154 }
155 }
156
157
158
159
160
161 private int maxSupersteps() {
162 return ((RandomWalkWorkerContext) getWorkerContext()).getMaxSupersteps();
163 }
164
165
166
167
168
169 protected double teleportationProbability() {
170 return ((RandomWalkWorkerContext) getWorkerContext())
171 .getTeleportationProbability();
172 }
173 }