1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.master;
20
21 import java.util.List;
22
23 import org.apache.giraph.aggregators.Aggregator;
24 import org.apache.giraph.bsp.CentralizedServiceMaster;
25 import org.apache.giraph.combiner.MessageCombiner;
26 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
27 import org.apache.giraph.conf.MessageClasses;
28 import org.apache.giraph.graph.Computation;
29 import org.apache.giraph.graph.GraphState;
30 import org.apache.giraph.reducers.ReduceOperation;
31 import org.apache.giraph.worker.WorkerInfo;
32 import org.apache.hadoop.io.Writable;
33 import org.apache.hadoop.io.WritableComparable;
34 import org.apache.hadoop.mapreduce.Mapper;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 public abstract class MasterCompute
50 extends DefaultImmutableClassesGiraphConfigurable
51 implements MasterAggregatorUsage, MasterGlobalCommUsage, Writable {
52
53 private boolean halt = false;
54
55 private CentralizedServiceMaster serviceMaster;
56
57 private GraphState graphState;
58
59
60
61
62 private SuperstepClasses superstepClasses;
63
64
65
66
67 public abstract void compute();
68
69
70
71
72
73
74
75
76 public abstract void initialize() throws InstantiationException,
77 IllegalAccessException;
78
79
80
81
82
83
84 public final long getSuperstep() {
85 return graphState.getSuperstep();
86 }
87
88
89
90
91
92
93
94 public final long getTotalNumVertices() {
95 return graphState.getTotalNumVertices();
96 }
97
98
99
100
101
102
103
104 public final long getTotalNumEdges() {
105 return graphState.getTotalNumEdges();
106 }
107
108
109
110
111
112 public final void haltComputation() {
113 halt = true;
114 }
115
116
117
118
119
120
121 public final boolean isHalted() {
122 return halt;
123 }
124
125
126
127
128
129
130 public final Mapper.Context getContext() {
131 return graphState.getContext();
132 }
133
134
135
136
137
138
139 public final List<WorkerInfo> getWorkerInfoList() {
140 return serviceMaster.getWorkerInfoList();
141 }
142
143
144
145
146
147
148 public final void setComputation(
149 Class<? extends Computation> computationClass) {
150 superstepClasses.setComputationClass(computationClass);
151 }
152
153
154
155
156
157
158 public final Class<? extends Computation> getComputation() {
159
160 if (superstepClasses == null) {
161 return null;
162 }
163
164 return superstepClasses.getComputationClass();
165 }
166
167
168
169
170
171
172 public final void setMessageCombiner(
173 Class<? extends MessageCombiner> combinerClass) {
174 superstepClasses.setMessageCombinerClass(combinerClass);
175 }
176
177
178
179
180
181
182 public final Class<? extends MessageCombiner> getMessageCombiner() {
183
184 if (superstepClasses == null) {
185 return null;
186 }
187
188 return superstepClasses.getMessageCombinerClass();
189 }
190
191
192
193
194
195
196 @Deprecated
197 public final void setIncomingMessage(
198 Class<? extends Writable> incomingMessageClass) {
199 superstepClasses.setIncomingMessageClass(incomingMessageClass);
200 }
201
202
203
204
205
206
207 public final void setOutgoingMessage(
208 Class<? extends Writable> outgoingMessageClass) {
209 superstepClasses.setOutgoingMessageClass(outgoingMessageClass);
210 }
211
212
213
214
215
216
217 public void setOutgoingMessageClasses(
218 MessageClasses<? extends WritableComparable, ? extends Writable>
219 outgoingMessageClasses) {
220 superstepClasses.setOutgoingMessageClasses(outgoingMessageClasses);
221 }
222
223 @Override
224 public final <S, R extends Writable> void registerReducer(
225 String name, ReduceOperation<S, R> reduceOp) {
226 serviceMaster.getGlobalCommHandler().registerReducer(name, reduceOp);
227 }
228
229 @Override
230 public final <S, R extends Writable> void registerReducer(
231 String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) {
232 serviceMaster.getGlobalCommHandler().registerReducer(
233 name, reduceOp, globalInitialValue);
234 }
235
236 @Override
237 public final <T extends Writable> T getReduced(String name) {
238 return serviceMaster.getGlobalCommHandler().getReduced(name);
239 }
240
241 @Override
242 public final void broadcast(String name, Writable object) {
243 serviceMaster.getGlobalCommHandler().broadcast(name, object);
244 }
245
246 @Override
247 public final <A extends Writable> boolean registerAggregator(
248 String name, Class<? extends Aggregator<A>> aggregatorClass)
249 throws InstantiationException, IllegalAccessException {
250 return serviceMaster.getAggregatorTranslationHandler().registerAggregator(
251 name, aggregatorClass);
252 }
253
254 @Override
255 public final <A extends Writable> boolean registerPersistentAggregator(
256 String name,
257 Class<? extends Aggregator<A>> aggregatorClass) throws
258 InstantiationException, IllegalAccessException {
259 return serviceMaster.getAggregatorTranslationHandler()
260 .registerPersistentAggregator(name, aggregatorClass);
261 }
262
263 @Override
264 public final <A extends Writable> A getAggregatedValue(String name) {
265 return serviceMaster.getAggregatorTranslationHandler()
266 .<A>getAggregatedValue(name);
267 }
268
269 @Override
270 public final <A extends Writable> void setAggregatedValue(
271 String name, A value) {
272 serviceMaster.getAggregatorTranslationHandler()
273 .setAggregatedValue(name, value);
274 }
275
276
277
278
279
280
281
282 public void logToCommandLine(String line) {
283 serviceMaster.getJobProgressTracker().logInfo(line);
284 }
285
286 public final void setGraphState(GraphState graphState) {
287 this.graphState = graphState;
288 }
289
290 public final void setMasterService(CentralizedServiceMaster serviceMaster) {
291 this.serviceMaster = serviceMaster;
292 }
293
294 public final void setSuperstepClasses(SuperstepClasses superstepClasses) {
295 this.superstepClasses = superstepClasses;
296 }
297 }