1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.piece;
19
20 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
21 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
22 import org.apache.giraph.block_app.framework.api.CreateReducersApi;
23 import org.apache.giraph.block_app.framework.piece.global_comm.ReduceUtilsObject;
24 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
25 import org.apache.giraph.block_app.framework.piece.global_comm.internal.CreateReducersApiWrapper;
26 import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler;
27 import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
28 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
29 import org.apache.giraph.block_app.framework.piece.messages.ObjectMessageClasses;
30 import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf;
31 import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf.DefaultMessageFactorySupplierFromConf;
32 import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf.SupplierFromConfByCopy;
33 import org.apache.giraph.combiner.MessageCombiner;
34 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
35 import org.apache.giraph.conf.EnumConfOption;
36 import org.apache.giraph.conf.GiraphConfigurationSettable;
37 import org.apache.giraph.conf.GiraphConstants;
38 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
39 import org.apache.giraph.conf.MessageClasses;
40 import org.apache.giraph.factories.MessageValueFactory;
41 import org.apache.giraph.graph.Vertex;
42 import org.apache.giraph.types.NoMessage;
43 import org.apache.hadoop.io.DoubleWritable;
44 import org.apache.hadoop.io.FloatWritable;
45 import org.apache.hadoop.io.IntWritable;
46 import org.apache.hadoop.io.LongWritable;
47 import org.apache.hadoop.io.Writable;
48 import org.apache.hadoop.io.WritableComparable;
49
50 import com.google.common.base.Preconditions;
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 @SuppressWarnings({ "rawtypes", "unchecked" })
68 public abstract class DefaultParentPiece<I extends WritableComparable,
69 V extends Writable, E extends Writable, M extends Writable, WV,
70 WM extends Writable, S> extends AbstractPiece<I, V, E, M, WV, WM, S> {
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93 public static final EnumConfOption<MessageEncodeAndStoreType>
94 MESSAGE_ENCODE_AND_STORE_TYPE_MIN_FORCE =
95 EnumConfOption.create("giraph.messageEncodeAndStoreTypeMinForce",
96 MessageEncodeAndStoreType.class,
97 MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION,
98 "Select the message_encode_and_store_type min force to use");
99
100 private final ReduceUtilsObject reduceUtils = new ReduceUtilsObject();
101 private ReducersForPieceHandler reducersHandler;
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116 public void registerReducers(CreateReducersApi reduceApi, S executionStage) {
117 }
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133 public VertexSender<I, V, E> getVertexSender(
134 BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
135 return null;
136 }
137
138
139
140
141
142
143
144 protected Class<M> getMessageClass() {
145 return null;
146 }
147
148
149
150
151
152
153
154
155 protected MessageValueFactory<M> getMessageFactory(
156 ImmutableClassesGiraphConfiguration conf) {
157 return null;
158 }
159
160
161
162
163
164
165
166 protected MessageCombiner<? super I, M> getMessageCombiner(
167 ImmutableClassesGiraphConfiguration conf) {
168 return null;
169 }
170
171
172
173
174
175
176
177 protected boolean allowOneMessageToManyIdsEncoding() {
178 return false;
179 }
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195 protected boolean receiveIgnoreExistingVertices() {
196 return false;
197 }
198
199 @Override
200 public MessageClasses<I, M> getMessageClasses(
201 ImmutableClassesGiraphConfiguration conf) {
202 Class<M> messageClass = null;
203 MessageValueFactory<M> messageFactory = getMessageFactory(conf);
204 MessageCombiner<? super I, M> messageCombiner = getMessageCombiner(conf);
205
206 if (messageFactory != null) {
207 messageClass = (Class) messageFactory.newInstance().getClass();
208 } else if (messageCombiner != null) {
209 messageClass = (Class) messageCombiner.createInitialMessage().getClass();
210 }
211
212 if (messageClass != null) {
213 Preconditions.checkState(getMessageClass() == null,
214 "Piece %s defines getMessageFactory or getMessageCombiner, " +
215 "so it doesn't need to define getMessageClass.",
216 toString());
217 } else {
218 messageClass = getMessageClass();
219 if (messageClass == null) {
220 messageClass = (Class) NoMessage.class;
221 }
222 }
223
224 SupplierFromConf<MessageValueFactory<M>> messageFactorySupplier;
225 if (messageFactory != null) {
226 messageFactorySupplier =
227 new SupplierFromConfByCopy<MessageValueFactory<M>>(messageFactory);
228 } else {
229 messageFactorySupplier =
230 new DefaultMessageFactorySupplierFromConf<>(messageClass);
231 }
232
233 SupplierFromConf<? extends MessageCombiner<? super I, M>>
234 messageCombinerSupplier;
235 if (messageCombiner != null) {
236 messageCombinerSupplier = new SupplierFromConfByCopy<>(messageCombiner);
237 } else {
238 messageCombinerSupplier = null;
239 }
240
241 int maxAllowed =
242 GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.get(conf).ordinal();
243 int minForce = MESSAGE_ENCODE_AND_STORE_TYPE_MIN_FORCE.get(conf).ordinal();
244 Preconditions.checkState(maxAllowed >= minForce);
245
246 int pieceEncodeType = (allowOneMessageToManyIdsEncoding() ?
247 MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX :
248 MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION).ordinal();
249
250 pieceEncodeType = Math.max(minForce, Math.min(maxAllowed, pieceEncodeType));
251
252 MessageEncodeAndStoreType messageEncodeAndStoreType =
253 MessageEncodeAndStoreType.values()[pieceEncodeType];
254
255 if (messageFactory instanceof GiraphConfigurationSettable) {
256 throw new IllegalStateException(
257 messageFactory.getClass() + " MessageFactory in " + this +
258 " Piece implements GiraphConfigurationSettable");
259 }
260 if (messageCombiner instanceof GiraphConfigurationSettable) {
261 throw new IllegalStateException(
262 messageCombiner.getClass() + " MessageCombiner in " + this +
263 " Piece implements GiraphConfigurationSettable");
264 }
265
266 return new ObjectMessageClasses<>(
267 messageClass, messageFactorySupplier,
268 messageCombinerSupplier, messageEncodeAndStoreType,
269 receiveIgnoreExistingVertices());
270 }
271
272
273
274 @Override
275 public final InnerVertexSender getWrappedVertexSender(
276 final BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
277 reducersHandler.vertexSenderWorkerPreprocess(workerApi);
278 final VertexSender<I, V, E> functions =
279 getVertexSender(workerApi, executionStage);
280 return new InnerVertexSender() {
281 @Override
282 public void vertexSend(Vertex<I, V, E> vertex) {
283 if (functions != null) {
284 functions.vertexSend(vertex);
285 }
286 }
287 @Override
288 public void postprocess() {
289 if (functions instanceof VertexPostprocessor) {
290 ((VertexPostprocessor) functions).postprocess();
291 }
292 reducersHandler.vertexSenderWorkerPostprocess(workerApi);
293 }
294 };
295 }
296
297 @Override
298 public final void wrappedRegisterReducers(
299 BlockMasterApi masterApi, S executionStage) {
300 reducersHandler = new ReducersForPieceHandler();
301 registerReducers(new CreateReducersApiWrapper(
302 masterApi, reducersHandler), executionStage);
303 }
304
305
306
307 protected final void reduceDouble(
308 ReducerHandle<DoubleWritable, ?> reduceHandle, double value) {
309 reduceUtils.reduceDouble(reduceHandle, value);
310 }
311
312 protected final void reduceFloat(
313 ReducerHandle<FloatWritable, ?> reduceHandle, float value) {
314 reduceUtils.reduceFloat(reduceHandle, value);
315 }
316
317 protected final void reduceLong(
318 ReducerHandle<LongWritable, ?> reduceHandle, long value) {
319 reduceUtils.reduceLong(reduceHandle, value);
320 }
321
322 protected final void reduceInt(
323 ReducerHandle<IntWritable, ?> reduceHandle, int value) {
324 reduceUtils.reduceInt(reduceHandle, value);
325 }
326 }