1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework;
19
20 import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
21
22 import java.lang.reflect.Field;
23 import java.lang.reflect.Modifier;
24
25 import org.apache.giraph.block_app.framework.api.giraph.BlockComputation;
26 import org.apache.giraph.block_app.framework.api.giraph.BlockMasterCompute;
27 import org.apache.giraph.block_app.framework.api.giraph.BlockWorkerContext;
28 import org.apache.giraph.block_app.framework.block.Block;
29 import org.apache.giraph.block_app.framework.block.PieceCount;
30 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
31 import org.apache.giraph.block_app.framework.piece.Piece;
32 import org.apache.giraph.conf.BooleanConfOption;
33 import org.apache.giraph.conf.ClassConfOption;
34 import org.apache.giraph.conf.GiraphConfiguration;
35 import org.apache.giraph.conf.GiraphConstants;
36 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
37 import org.apache.giraph.conf.MessageClasses;
38 import org.apache.giraph.function.Consumer;
39 import org.apache.giraph.types.NoMessage;
40 import org.apache.giraph.utils.ReflectionUtils;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.log4j.Logger;
43
44 import com.google.common.base.Preconditions;
45
46
47
48
49 @SuppressWarnings({ "rawtypes", "unchecked" })
50 public class BlockUtils {
51
52 public static final ClassConfOption<BlockFactory> BLOCK_FACTORY_CLASS =
53 ClassConfOption.create("digraph.block_factory", null, BlockFactory.class,
54 "block factory describing giraph job");
55
56
57 public static final ClassConfOption<Object> BLOCK_WORKER_CONTEXT_VALUE_CLASS =
58 ClassConfOption.create(
59 "digraph.block_worker_context_value_class",
60 Object.class, Object.class,
61 "block worker context value class");
62
63
64 public static final
65 BooleanConfOption LOG_EXECUTION_STATUS = new BooleanConfOption(
66 "giraph.block_utils.log_execution_status", true,
67 "Log execution status (of which pieces are being executed, etc)");
68
69 private static final Logger LOG = Logger.getLogger(BlockUtils.class);
70
71
72 private BlockUtils() { }
73
74
75
76
77 public static <S> BlockFactory<S> createBlockFactory(Configuration conf) {
78 return ReflectionUtils.newInstance(BLOCK_FACTORY_CLASS.get(conf));
79 }
80
81
82
83
84
85 public static void setBlockFactoryClass(Configuration conf,
86 Class<? extends BlockFactory<?>> clazz) {
87 BLOCK_FACTORY_CLASS.set(conf, clazz);
88 }
89
90
91
92
93
94
95 public static void setAndInitBlockFactoryClass(GiraphConfiguration conf,
96 Class<? extends BlockFactory<?>> clazz) {
97 BLOCK_FACTORY_CLASS.set(conf, clazz);
98 initAndCheckConfig(conf);
99 }
100
101
102
103
104
105
106 public static void initAndCheckConfig(GiraphConfiguration conf) {
107 conf.setMasterComputeClass(BlockMasterCompute.class);
108 conf.setComputationClass(BlockComputation.class);
109 conf.setWorkerContextClass(BlockWorkerContext.class);
110
111 Preconditions.checkState(
112 GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.get(conf) == null,
113 "Message types should only be specified in Pieces, " +
114 "but outgoing was specified globally");
115 Preconditions.checkState(
116 GiraphConstants.OUTGOING_MESSAGE_VALUE_FACTORY_CLASS
117 .isDefaultValue(conf),
118 "Message types should only be specified in Pieces, " +
119 "but factory was specified globally");
120 Preconditions.checkState(
121 GiraphConstants.MESSAGE_COMBINER_CLASS.get(conf) == null,
122 "Message combiner should only be specified in Pieces, " +
123 "but was specified globally");
124
125 BlockFactory<?> blockFactory = createBlockFactory(conf);
126 blockFactory.initConfig(conf);
127
128 Preconditions.checkState(
129 GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.get(conf) == null,
130 "Outgoing message type was specified in blockFactory.initConfig");
131 Preconditions.checkState(
132 GiraphConstants.OUTGOING_MESSAGE_VALUE_FACTORY_CLASS
133 .isDefaultValue(conf),
134 "Outgoing message factory type was specified in " +
135 "blockFactory.initConfig");
136 Preconditions.checkState(
137 GiraphConstants.MESSAGE_COMBINER_CLASS.get(conf) == null,
138 "Message combiner type was specified in blockFactory.initConfig");
139
140 GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.set(conf, NoMessage.class);
141
142 final ImmutableClassesGiraphConfiguration immConf =
143 new ImmutableClassesGiraphConfiguration<>(conf);
144
145
146
147 Block executionBlock = blockFactory.createBlock(immConf);
148 checkBlockTypes(
149 executionBlock, blockFactory.createExecutionStage(immConf), immConf);
150
151 PieceCount pieceCount = executionBlock.getPieceCount();
152 if (pieceCount.isKnown()) {
153 GiraphConstants.SUPERSTEP_COUNT.set(conf, pieceCount.getCount() + 1);
154 }
155
156
157 Class<?> bfClass = blockFactory.getClass();
158 while (!bfClass.equals(Object.class)) {
159 for (Field field : bfClass.getDeclaredFields()) {
160 if (!Modifier.isStatic(field.getModifiers()) ||
161 !Modifier.isFinal(field.getModifiers())) {
162 throw new IllegalStateException("BlockFactory (" + bfClass +
163 ") cannot have any mutable (non 'static final') fields as a " +
164 "safety measure, as createBlock function is called from a " +
165 "different context then all other functions, use conf argument " +
166 "instead, or make it 'static final'. Field present: " + field);
167 }
168 }
169 bfClass = bfClass.getSuperclass();
170 }
171
172
173 blockFactory.registerOutputs(conf);
174 }
175
176 public static void checkBlockTypes(
177 Block executionBlock, Object executionStage,
178 final ImmutableClassesGiraphConfiguration conf) {
179 LOG.info("Executing application - " + executionBlock);
180
181 final Class<?> vertexIdClass = conf.getVertexIdClass();
182 final Class<?> vertexValueClass = conf.getVertexValueClass();
183 final Class<?> edgeValueClass = conf.getEdgeValueClass();
184 final Class<?> workerContextValueClass =
185 BLOCK_WORKER_CONTEXT_VALUE_CLASS.get(conf);
186 final Class<?> executionStageClass = executionStage.getClass();
187
188
189 executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() {
190 @Override
191 public void apply(AbstractPiece piece) {
192 if (!piece.getClass().equals(Piece.class)) {
193 Class<?>[] classList = getTypeArguments(
194 AbstractPiece.class, piece.getClass());
195 Preconditions.checkArgument(classList.length == 7);
196
197 ReflectionUtils.verifyTypes(
198 vertexIdClass, classList[0], "vertexId", piece.getClass());
199 ReflectionUtils.verifyTypes(
200 vertexValueClass, classList[1], "vertexValue", piece.getClass());
201 ReflectionUtils.verifyTypes(
202 edgeValueClass, classList[2], "edgeValue", piece.getClass());
203
204 MessageClasses classes = piece.getMessageClasses(conf);
205 Class<?> messageType = classes.getMessageClass();
206 if (messageType == null) {
207 messageType = NoMessage.class;
208 }
209 ReflectionUtils.verifyTypes(
210 messageType, classList[3], "message", piece.getClass());
211
212 ReflectionUtils.verifyTypes(
213 workerContextValueClass, classList[4],
214 "workerContextValue", piece.getClass());
215
216
217 ReflectionUtils.verifyTypes(
218 executionStageClass, classList[6],
219 "executionStage", piece.getClass());
220 }
221 }
222 });
223 }
224 }