1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.benchmark; |
20 | |
|
21 | |
import org.apache.commons.cli.CommandLine; |
22 | |
import org.apache.giraph.aggregators.LongSumAggregator; |
23 | |
import org.apache.giraph.graph.BasicComputation; |
24 | |
import org.apache.giraph.conf.GiraphConfiguration; |
25 | |
import org.apache.giraph.conf.GiraphConstants; |
26 | |
import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants; |
27 | |
import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat; |
28 | |
import org.apache.giraph.master.DefaultMasterCompute; |
29 | |
import org.apache.giraph.graph.Vertex; |
30 | |
import org.apache.giraph.worker.WorkerContext; |
31 | |
import org.apache.hadoop.io.BytesWritable; |
32 | |
import org.apache.hadoop.io.DoubleWritable; |
33 | |
import org.apache.hadoop.io.LongWritable; |
34 | |
import org.apache.hadoop.util.ToolRunner; |
35 | |
import org.apache.log4j.Logger; |
36 | |
|
37 | |
import com.google.common.collect.Sets; |
38 | |
|
39 | |
import java.io.IOException; |
40 | |
import java.util.Random; |
41 | |
import java.util.Set; |
42 | |
|
43 | |
|
44 | |
|
45 | |
|
46 | 0 | public class RandomMessageBenchmark extends GiraphBenchmark { |
47 | |
|
48 | |
public static final String SUPERSTEP_COUNT = |
49 | |
"giraph.randomMessageBenchmark.superstepCount"; |
50 | |
|
51 | |
public static final String NUM_BYTES_PER_MESSAGE = |
52 | |
"giraph.randomMessageBenchmark.numBytesPerMessage"; |
53 | |
|
54 | |
public static final int DEFAULT_NUM_BYTES_PER_MESSAGE = 16; |
55 | |
|
56 | |
public static final String NUM_MESSAGES_PER_EDGE = |
57 | |
"giraph.randomMessageBenchmark.numMessagesPerEdge"; |
58 | |
|
59 | |
public static final int DEFAULT_NUM_MESSAGES_PER_EDGE = 1; |
60 | |
|
61 | |
public static final String AGG_SUPERSTEP_TOTAL_BYTES = |
62 | |
"superstep total bytes sent"; |
63 | |
|
64 | |
public static final String AGG_TOTAL_BYTES = "total bytes sent"; |
65 | |
|
66 | |
public static final String AGG_SUPERSTEP_TOTAL_MESSAGES = |
67 | |
"superstep total messages"; |
68 | |
|
69 | |
public static final String AGG_TOTAL_MESSAGES = "total messages"; |
70 | |
|
71 | |
public static final String AGG_SUPERSTEP_TOTAL_MILLIS = |
72 | |
"superstep total millis"; |
73 | |
|
74 | |
public static final String AGG_TOTAL_MILLIS = "total millis"; |
75 | |
|
76 | |
public static final String WORKERS_NUM = "workers"; |
77 | |
|
78 | |
|
79 | 0 | private static final BenchmarkOption BYTES_PER_MESSAGE = new BenchmarkOption( |
80 | |
"b", "bytes", true, "Message bytes per memssage", |
81 | |
"Need to set the number of message bytes (-b)"); |
82 | |
|
83 | 0 | private static final BenchmarkOption MESSAGES_PER_EDGE = new BenchmarkOption( |
84 | |
"n", "number", true, "Number of messages per edge", |
85 | |
"Need to set the number of messages per edge (-n)"); |
86 | |
|
87 | 0 | private static final BenchmarkOption FLUSH_THREADS = new BenchmarkOption( |
88 | |
"f", "flusher", true, "Number of flush threads"); |
89 | |
|
90 | |
|
91 | 0 | private static final Logger LOG = |
92 | 0 | Logger.getLogger(RandomMessageBenchmarkWorkerContext.class); |
93 | |
|
94 | |
|
95 | |
|
96 | |
|
97 | 0 | public static class RandomMessageBenchmarkWorkerContext extends |
98 | |
WorkerContext { |
99 | |
|
100 | 0 | private static final Logger LOG = |
101 | 0 | Logger.getLogger(RandomMessageBenchmarkWorkerContext.class); |
102 | |
|
103 | |
private byte[] messageBytes; |
104 | |
|
105 | 0 | private int numMessagesPerEdge = -1; |
106 | |
|
107 | 0 | private int numSupersteps = -1; |
108 | |
|
109 | 0 | private final Random random = new Random(System.currentTimeMillis()); |
110 | |
|
111 | 0 | private long startSuperstepMillis = 0; |
112 | |
|
113 | 0 | private long totalBytes = 0; |
114 | |
|
115 | 0 | private long totalMessages = 0; |
116 | |
|
117 | 0 | private long totalMillis = 0; |
118 | |
|
119 | |
@Override |
120 | |
public void preApplication() |
121 | |
throws InstantiationException, IllegalAccessException { |
122 | 0 | messageBytes = |
123 | 0 | new byte[getContext().getConfiguration(). |
124 | 0 | getInt(NUM_BYTES_PER_MESSAGE, |
125 | |
DEFAULT_NUM_BYTES_PER_MESSAGE)]; |
126 | 0 | numMessagesPerEdge = |
127 | 0 | getContext().getConfiguration(). |
128 | 0 | getInt(NUM_MESSAGES_PER_EDGE, |
129 | |
DEFAULT_NUM_MESSAGES_PER_EDGE); |
130 | 0 | numSupersteps = getContext().getConfiguration(). |
131 | 0 | getInt(SUPERSTEP_COUNT, -1); |
132 | 0 | } |
133 | |
|
134 | |
@Override |
135 | |
public void preSuperstep() { |
136 | 0 | long superstepBytes = this.<LongWritable> |
137 | 0 | getAggregatedValue(AGG_SUPERSTEP_TOTAL_BYTES).get(); |
138 | 0 | long superstepMessages = this.<LongWritable> |
139 | 0 | getAggregatedValue(AGG_SUPERSTEP_TOTAL_MESSAGES).get(); |
140 | 0 | long superstepMillis = this.<LongWritable> |
141 | 0 | getAggregatedValue(AGG_SUPERSTEP_TOTAL_MILLIS).get(); |
142 | 0 | long workers = this.<LongWritable>getAggregatedValue(WORKERS_NUM).get(); |
143 | |
|
144 | |
|
145 | |
|
146 | |
|
147 | |
|
148 | 0 | if (getSuperstep() == 0) { |
149 | 0 | startSuperstepMillis = System.currentTimeMillis(); |
150 | |
} else { |
151 | 0 | totalBytes += superstepBytes; |
152 | 0 | totalMessages += superstepMessages; |
153 | 0 | totalMillis += superstepMillis; |
154 | 0 | double superstepMegabytesPerSecond = |
155 | |
superstepBytes * workers * 1000d / 1024d / 1024d / superstepMillis; |
156 | 0 | double megabytesPerSecond = totalBytes * |
157 | |
workers * 1000d / 1024d / 1024d / totalMillis; |
158 | 0 | double superstepMessagesPerSecond = |
159 | |
superstepMessages * workers * 1000d / superstepMillis; |
160 | 0 | double messagesPerSecond = |
161 | |
totalMessages * workers * 1000d / totalMillis; |
162 | 0 | if (LOG.isInfoEnabled()) { |
163 | 0 | LOG.info("Outputing statistics for superstep " + getSuperstep()); |
164 | 0 | LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " + superstepBytes); |
165 | 0 | LOG.info(AGG_TOTAL_BYTES + " : " + totalBytes); |
166 | 0 | LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " + superstepMessages); |
167 | 0 | LOG.info(AGG_TOTAL_MESSAGES + " : " + totalMessages); |
168 | 0 | LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " + superstepMillis); |
169 | 0 | LOG.info(AGG_TOTAL_MILLIS + " : " + totalMillis); |
170 | 0 | LOG.info(WORKERS_NUM + " : " + workers); |
171 | 0 | LOG.info("Superstep megabytes / second = " + |
172 | |
superstepMegabytesPerSecond); |
173 | 0 | LOG.info("Total megabytes / second = " + |
174 | |
megabytesPerSecond); |
175 | 0 | LOG.info("Superstep messages / second = " + |
176 | |
superstepMessagesPerSecond); |
177 | 0 | LOG.info("Total messages / second = " + |
178 | |
messagesPerSecond); |
179 | 0 | LOG.info("Superstep megabytes / second / worker = " + |
180 | |
superstepMegabytesPerSecond / workers); |
181 | 0 | LOG.info("Total megabytes / second / worker = " + |
182 | |
megabytesPerSecond / workers); |
183 | 0 | LOG.info("Superstep messages / second / worker = " + |
184 | |
superstepMessagesPerSecond / workers); |
185 | 0 | LOG.info("Total messages / second / worker = " + |
186 | |
messagesPerSecond / workers); |
187 | |
} |
188 | |
} |
189 | |
|
190 | 0 | aggregate(WORKERS_NUM, new LongWritable(1)); |
191 | 0 | } |
192 | |
|
193 | |
@Override |
194 | |
public void postSuperstep() { |
195 | 0 | long endSuperstepMillis = System.currentTimeMillis(); |
196 | 0 | long superstepMillis = endSuperstepMillis - startSuperstepMillis; |
197 | 0 | startSuperstepMillis = endSuperstepMillis; |
198 | 0 | aggregate(AGG_SUPERSTEP_TOTAL_MILLIS, new LongWritable(superstepMillis)); |
199 | 0 | } |
200 | |
|
201 | |
@Override |
202 | 0 | public void postApplication() { } |
203 | |
|
204 | |
|
205 | |
|
206 | |
|
207 | |
|
208 | |
|
209 | |
public byte[] getMessageBytes() { |
210 | 0 | return messageBytes; |
211 | |
} |
212 | |
|
213 | |
|
214 | |
|
215 | |
|
216 | |
|
217 | |
|
218 | |
public int getNumMessagePerEdge() { |
219 | 0 | return numMessagesPerEdge; |
220 | |
} |
221 | |
|
222 | |
|
223 | |
|
224 | |
|
225 | |
|
226 | |
|
227 | |
public int getNumSupersteps() { |
228 | 0 | return numSupersteps; |
229 | |
} |
230 | |
|
231 | |
|
232 | |
|
233 | |
|
234 | |
public void randomizeMessageBytes() { |
235 | 0 | random.nextBytes(messageBytes); |
236 | 0 | } |
237 | |
} |
238 | |
|
239 | |
|
240 | |
|
241 | |
|
242 | |
|
243 | 0 | public static class RandomMessageBenchmarkMasterCompute extends |
244 | |
DefaultMasterCompute { |
245 | |
@Override |
246 | |
public void initialize() throws InstantiationException, |
247 | |
IllegalAccessException { |
248 | 0 | registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES, |
249 | |
LongSumAggregator.class); |
250 | 0 | registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES, |
251 | |
LongSumAggregator.class); |
252 | 0 | registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS, |
253 | |
LongSumAggregator.class); |
254 | 0 | registerAggregator(WORKERS_NUM, |
255 | |
LongSumAggregator.class); |
256 | 0 | } |
257 | |
} |
258 | |
|
259 | |
|
260 | |
|
261 | |
|
262 | 0 | public static class RandomMessageComputation extends BasicComputation< |
263 | |
LongWritable, DoubleWritable, DoubleWritable, BytesWritable> { |
264 | |
@Override |
265 | |
public void compute( |
266 | |
Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex, |
267 | |
Iterable<BytesWritable> messages) throws IOException { |
268 | 0 | RandomMessageBenchmarkWorkerContext workerContext = getWorkerContext(); |
269 | 0 | if (getSuperstep() < workerContext.getNumSupersteps()) { |
270 | 0 | for (int i = 0; i < workerContext.getNumMessagePerEdge(); i++) { |
271 | 0 | workerContext.randomizeMessageBytes(); |
272 | 0 | sendMessageToAllEdges(vertex, |
273 | 0 | new BytesWritable(workerContext.getMessageBytes())); |
274 | 0 | long bytesSent = workerContext.getMessageBytes().length * |
275 | 0 | vertex.getNumEdges(); |
276 | 0 | aggregate(AGG_SUPERSTEP_TOTAL_BYTES, new LongWritable(bytesSent)); |
277 | 0 | aggregate(AGG_SUPERSTEP_TOTAL_MESSAGES, |
278 | 0 | new LongWritable(vertex.getNumEdges())); |
279 | |
} |
280 | |
} else { |
281 | 0 | vertex.voteToHalt(); |
282 | |
} |
283 | 0 | } |
284 | |
} |
285 | |
|
286 | |
@Override |
287 | |
public Set<BenchmarkOption> getBenchmarkOptions() { |
288 | 0 | return Sets.newHashSet(BenchmarkOption.SUPERSTEPS, |
289 | |
BenchmarkOption.VERTICES, BenchmarkOption.EDGES_PER_VERTEX, |
290 | |
BYTES_PER_MESSAGE, MESSAGES_PER_EDGE, FLUSH_THREADS); |
291 | |
} |
292 | |
|
293 | |
@Override |
294 | |
protected void prepareConfiguration(GiraphConfiguration conf, |
295 | |
CommandLine cmd) { |
296 | 0 | conf.setComputationClass(RandomMessageComputation.class); |
297 | 0 | conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class); |
298 | 0 | conf.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class); |
299 | 0 | conf.setMasterComputeClass(RandomMessageBenchmarkMasterCompute.class); |
300 | 0 | conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, |
301 | 0 | BenchmarkOption.VERTICES.getOptionLongValue(cmd)); |
302 | 0 | conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, |
303 | 0 | BenchmarkOption.EDGES_PER_VERTEX.getOptionLongValue(cmd)); |
304 | 0 | conf.setInt(SUPERSTEP_COUNT, |
305 | 0 | BenchmarkOption.SUPERSTEPS.getOptionIntValue(cmd)); |
306 | 0 | conf.setInt(RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE, |
307 | 0 | BYTES_PER_MESSAGE.getOptionIntValue(cmd)); |
308 | 0 | conf.setInt(RandomMessageBenchmark.NUM_MESSAGES_PER_EDGE, |
309 | 0 | MESSAGES_PER_EDGE.getOptionIntValue(cmd)); |
310 | 0 | if (FLUSH_THREADS.optionTurnedOn(cmd)) { |
311 | 0 | conf.setInt(GiraphConstants.MSG_NUM_FLUSH_THREADS, |
312 | 0 | FLUSH_THREADS.getOptionIntValue(cmd)); |
313 | |
} |
314 | 0 | } |
315 | |
|
316 | |
|
317 | |
|
318 | |
|
319 | |
|
320 | |
|
321 | |
|
322 | |
public static void main(String[] args) throws Exception { |
323 | 0 | System.exit(ToolRunner.run(new RandomMessageBenchmark(), args)); |
324 | 0 | } |
325 | |
} |