1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples.scc;
20
21 import static org.apache.giraph.examples.scc.SccPhaseMasterCompute.PHASE;
22 import static org.apache.giraph.examples.scc.SccPhaseMasterCompute.NEW_MAXIMUM;
23 import static org.apache.giraph.examples.scc.SccPhaseMasterCompute.CONVERGED;
24
25 import java.io.IOException;
26
27 import org.apache.giraph.Algorithm;
28 import org.apache.giraph.examples.scc.SccPhaseMasterCompute.Phases;
29 import org.apache.giraph.graph.BasicComputation;
30 import org.apache.giraph.graph.Vertex;
31 import org.apache.hadoop.io.BooleanWritable;
32 import org.apache.hadoop.io.IntWritable;
33 import org.apache.hadoop.io.LongWritable;
34 import org.apache.hadoop.io.NullWritable;
35
36
37
38
39 @Algorithm(name = "Strongly Connected Components",
40 description = "Finds strongly connected components of the graph")
41 public class SccComputation extends
42 BasicComputation<LongWritable, SccVertexValue, NullWritable, LongWritable> {
43
44
45
46
47 private Phases currPhase;
48
49
50
51
52
53 private LongWritable messageValue = new LongWritable();
54
55
56
57
58 private LongWritable parentId = new LongWritable();
59
60 @Override
61 public void preSuperstep() {
62 IntWritable phaseInt = getAggregatedValue(PHASE);
63 currPhase = SccPhaseMasterCompute.getPhase(phaseInt);
64 }
65
66 @Override
67 public void compute(
68 Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
69 Iterable<LongWritable> messages) throws IOException {
70
71 SccVertexValue vertexValue = vertex.getValue();
72
73 if (!vertexValue.isActive()) {
74 vertex.voteToHalt();
75 return;
76 }
77
78 switch (currPhase) {
79 case TRANSPOSE :
80 vertexValue.clearParents();
81 sendMessageToAllEdges(vertex, vertex.getId());
82 break;
83 case TRIMMING :
84 trim(vertex, messages);
85 break;
86 case FORWARD_TRAVERSAL :
87 forwardTraversal(vertex, messages);
88 break;
89 case BACKWARD_TRAVERSAL_START :
90 backwardTraversalStart(vertex);
91 break;
92 case BACKWARD_TRAVERSAL_REST :
93 backwardTraversalRest(vertex, messages);
94 break;
95 default :
96 break;
97 }
98
99 }
100
101
102
103
104
105
106
107
108 private void trim(Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
109 Iterable<LongWritable> messages) {
110 SccVertexValue vertexValue = vertex.getValue();
111
112 for (LongWritable parent : messages) {
113 vertexValue.addParent(parent.get());
114 }
115
116
117 vertexValue.set(vertex.getId().get());
118 if (vertex.getNumEdges() == 0 || vertexValue.getParents() == null) {
119 vertexValue.deactivate();
120 } else {
121 messageValue.set(vertexValue.get());
122 sendMessageToAllEdges(vertex, messageValue);
123 }
124 }
125
126
127
128
129
130
131
132
133 private void forwardTraversal(
134 Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
135 Iterable<LongWritable> messages) {
136 SccVertexValue vertexValue = vertex.getValue();
137 boolean changed = setMaxValue(vertexValue, messages);
138 if (changed) {
139 messageValue.set(vertexValue.get());
140 sendMessageToAllEdges(vertex, messageValue);
141 aggregate(NEW_MAXIMUM, new BooleanWritable(true));
142 }
143 }
144
145
146
147
148
149 private void backwardTraversalStart(
150 Vertex<LongWritable, SccVertexValue, NullWritable> vertex) {
151 SccVertexValue vertexValue = vertex.getValue();
152 if (vertexValue.get() == vertex.getId().get()) {
153 messageValue.set(vertexValue.get());
154 sendMessageToAllParents(vertex, messageValue);
155 }
156 }
157
158
159
160
161
162
163 private void backwardTraversalRest(
164 Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
165 Iterable<LongWritable> messages) {
166 SccVertexValue vertexValue = vertex.getValue();
167 for (LongWritable m : messages) {
168 if (vertexValue.get() == m.get()) {
169 sendMessageToAllParents(vertex, m);
170 aggregate(CONVERGED, new BooleanWritable(true));
171 vertexValue.deactivate();
172 vertex.voteToHalt();
173 break;
174 }
175 }
176 }
177
178
179
180
181
182
183
184
185
186
187 private boolean setMaxValue(SccVertexValue vertexValue,
188 Iterable<LongWritable> messages) {
189 boolean changed = false;
190 for (LongWritable m : messages) {
191 if (vertexValue.get() < m.get()) {
192 vertexValue.set(m.get());
193 changed = true;
194 }
195 }
196 return changed;
197 }
198
199
200
201
202
203
204
205 private void sendMessageToAllParents(
206 Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
207 LongWritable message) {
208 for (Long id : vertex.getValue().getParents()) {
209 parentId.set(id);
210 sendMessage(parentId, message);
211 }
212 }
213 }