1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.examples.scc;
19
20 import org.apache.giraph.aggregators.BooleanOrAggregator;
21 import org.apache.giraph.aggregators.IntOverwriteAggregator;
22 import org.apache.giraph.master.DefaultMasterCompute;
23 import org.apache.hadoop.io.BooleanWritable;
24 import org.apache.hadoop.io.IntWritable;
25
26
27
28
29
30
31
32
33
34 public class SccPhaseMasterCompute extends DefaultMasterCompute {
35
36
37
38
39 public static final String PHASE = "scccompute.phase";
40
41
42
43
44 public static final String NEW_MAXIMUM = "scccompute.max";
45
46
47
48
49 public static final String CONVERGED = "scccompute.converged";
50
51
52
53
54 public enum Phases {
55
56 TRANSPOSE, TRIMMING,
57
58 FORWARD_TRAVERSAL,
59
60 BACKWARD_TRAVERSAL_START, BACKWARD_TRAVERSAL_REST
61 };
62
63 @Override
64 public void initialize() throws InstantiationException,
65 IllegalAccessException {
66 registerPersistentAggregator(PHASE, IntOverwriteAggregator.class);
67 registerAggregator(NEW_MAXIMUM, BooleanOrAggregator.class);
68 registerAggregator(CONVERGED, BooleanOrAggregator.class);
69 }
70
71 @Override
72 public void compute() {
73 if (getSuperstep() == 0) {
74 setPhase(Phases.TRANSPOSE);
75 } else {
76 Phases currPhase = getPhase();
77 switch (currPhase) {
78 case TRANSPOSE:
79 setPhase(Phases.TRIMMING);
80 break;
81 case TRIMMING :
82 setPhase(Phases.FORWARD_TRAVERSAL);
83 break;
84 case FORWARD_TRAVERSAL :
85 BooleanWritable newMaxFound = getAggregatedValue(NEW_MAXIMUM);
86
87
88 if (!newMaxFound.get()) {
89 setPhase(Phases.BACKWARD_TRAVERSAL_START);
90 }
91 break;
92 case BACKWARD_TRAVERSAL_START :
93 setPhase(Phases.BACKWARD_TRAVERSAL_REST);
94 break;
95 case BACKWARD_TRAVERSAL_REST :
96 BooleanWritable converged = getAggregatedValue(CONVERGED);
97 if (!converged.get()) {
98 setPhase(Phases.TRANSPOSE);
99 }
100 break;
101 default :
102 break;
103 }
104 }
105 }
106
107
108
109
110
111
112 private void setPhase(Phases phase) {
113 setAggregatedValue(PHASE, new IntWritable(phase.ordinal()));
114 }
115
116
117
118
119
120 private Phases getPhase() {
121 IntWritable phaseInt = getAggregatedValue(PHASE);
122 return getPhase(phaseInt);
123 }
124
125
126
127
128
129
130
131
132 public static Phases getPhase(IntWritable phaseInt) {
133 return Phases.values()[phaseInt.get()];
134 }
135
136 }