1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.io.hcatalog; |
20 | |
|
21 | |
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.giraph.graph.Computation;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.job.GiraphJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
import org.apache.log4j.Logger;

import com.google.common.collect.Lists;

import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
48 | |
|
49 | |
|
50 | |
|
51 | |
|
52 | |
/**
 * Hadoop {@link Tool} that configures and runs a Giraph job whose vertex
 * and/or edge input comes from HCatalog tables and whose vertex output is
 * written to an HCatalog table. Subclasses may fix the computation and
 * format classes via the constructor; anything left null is read from the
 * command line.
 */
public class HCatGiraphRunner implements Tool {
  /** Class logger. */
  private static final Logger LOG = Logger.getLogger(HCatGiraphRunner.class);
  /** Number of Giraph workers to use (-w). */
  protected int workers;
  /** Whether to run the job with verbose client output (-v). */
  protected boolean isVerbose;
  /** Partition values for the HCatalog output table (-O); may be null. */
  protected Map<String, String> outputTablePartitionValues;
  /** Hive database name (-db); defaults to "default" when not given. */
  protected String dbName;
  /** Vertex input table name (-vi); null when no vertex input is used. */
  protected String vertexInputTableName;
  /** Optional filter expression for the vertex input table (-VI). */
  protected String vertexInputTableFilterExpr;
  /** Edge input table name (-ei); null when no edge input is used. */
  protected String edgeInputTableName;
  /** Optional filter expression for the edge input table (-EI). */
  protected String edgeInputTableFilterExpr;
  /** Output table name (-o). */
  protected String outputTableName;
  /** Hadoop/Hive configuration used for the job. */
  private Configuration conf;
  /** When true (-s), the job runs without setting a vertex output format. */
  private boolean skipOutput = false;

  /** Giraph Computation class to run; may be set via the command line. */
  private Class<? extends Computation> computationClass;
  /** VertexInputFormat class; null when only edge input is used. */
  private Class<? extends VertexInputFormat> vertexInputFormatClass;
  /** EdgeInputFormat class; null when only vertex input is used. */
  private Class<? extends EdgeInputFormat> edgeInputFormatClass;
  /** VertexOutputFormat class, used unless output is skipped. */
  private Class<? extends VertexOutputFormat> vertexOutputFormatClass;

  /**
   * Constructor. Any argument left null must later be supplied on the
   * command line (see {@code processArguments}).
   *
   * @param computationClass Computation class, or null to read it from the
   *        command line
   * @param vertexInputFormatClass VertexInputFormat class, or null to read
   *        it from the command line
   * @param edgeInputFormatClass EdgeInputFormat class, or null to read it
   *        from the command line
   * @param vertexOutputFormatClass VertexOutputFormat class, or null to
   *        read it from the command line
   */
  protected HCatGiraphRunner(
      Class<? extends Computation> computationClass,
      Class<? extends VertexInputFormat> vertexInputFormatClass,
      Class<? extends EdgeInputFormat> edgeInputFormatClass,
      Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
    this.computationClass = computationClass;
    this.vertexInputFormatClass = vertexInputFormatClass;
    this.edgeInputFormatClass = edgeInputFormatClass;
    this.vertexOutputFormatClass = vertexOutputFormatClass;
    this.conf = new HiveConf(getClass());
  }
134 | |
|
135 | |
|
136 | |
|
137 | |
|
138 | |
|
139 | |
|
140 | |
public static void main(String[] args) throws Exception { |
141 | 0 | System.exit(ToolRunner.run( |
142 | |
new HCatGiraphRunner(null, null, null, null), args)); |
143 | 0 | } |
144 | |
|
145 | |
  @Override
  public final int run(String[] args) throws Exception {
    // Parse arguments: InterruptedException signals that usage/help was
    // printed (exit 0); IllegalArgumentException is a user error (exit -1).
    try {
      processArguments(args);
    } catch (InterruptedException e) {
      return 0;
    } catch (IllegalArgumentException e) {
      System.err.println(e.getMessage());
      return -1;
    }

    // Ship hive-site.xml and client classpath jars with the job.
    adjustConfigurationForHive(getConf());

    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
    job.getConfiguration().setComputationClass(computationClass);

    // Wire up vertex and/or edge input from HCatalog; at least one of the
    // two input format classes is non-null (enforced by processArguments).
    if (vertexInputFormatClass != null) {
      InputJobInfo vertexInputJobInfo = InputJobInfo.create(dbName,
          vertexInputTableName, vertexInputTableFilterExpr);
      GiraphHCatInputFormat.setVertexInput(job.getInternalJob(),
          vertexInputJobInfo);
      job.getConfiguration().setVertexInputFormatClass(vertexInputFormatClass);
    }
    if (edgeInputFormatClass != null) {
      InputJobInfo edgeInputJobInfo = InputJobInfo.create(dbName,
          edgeInputTableName, edgeInputTableFilterExpr);
      GiraphHCatInputFormat.setEdgeInput(job.getInternalJob(),
          edgeInputJobInfo);
      job.getConfiguration().setEdgeInputFormatClass(edgeInputFormatClass);
    }

    // NOTE(review): setOutput is deliberately called before
    // setSchema/getTableSchema — presumably getTableSchema reads the info
    // recorded by setOutput; confirm before reordering.
    HCatOutputFormat.setOutput(job.getInternalJob(), OutputJobInfo.create(
        dbName, outputTableName, outputTablePartitionValues));
    HCatOutputFormat.setSchema(job.getInternalJob(),
        HCatOutputFormat.getTableSchema(job.getInternalJob()));
    if (skipOutput) {
      LOG.warn("run: Warning - Output will be skipped!");
    } else {
      job.getConfiguration().setVertexOutputFormatClass(
          vertexOutputFormatClass);
    }

    // Min and max workers are set equal; 100% of them must be available.
    job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
    initGiraphJob(job);

    return job.run(isVerbose) ? 0 : -1;
  }
197 | |
|
198 | |
|
199 | |
|
200 | |
|
201 | |
|
202 | |
private static void adjustConfigurationForHive(Configuration conf) { |
203 | |
|
204 | |
|
205 | |
|
206 | 0 | addToStringCollection(conf, "tmpfiles", conf.getClassLoader() |
207 | 0 | .getResource("hive-site.xml").toString()); |
208 | |
|
209 | |
|
210 | |
|
211 | |
|
212 | |
|
213 | |
|
214 | |
|
215 | 0 | String[] hadoopJars = System.getenv("HADOOP_CLASSPATH").split( |
216 | |
File.pathSeparator); |
217 | 0 | List<String> hadoopJarURLs = Lists.newArrayList(); |
218 | 0 | for (String jarPath : hadoopJars) { |
219 | 0 | File file = new File(jarPath); |
220 | 0 | if (file.exists() && file.isFile()) { |
221 | 0 | String jarURL = file.toURI().toString(); |
222 | 0 | hadoopJarURLs.add(jarURL); |
223 | |
} |
224 | |
} |
225 | 0 | addToStringCollection(conf, "tmpjars", hadoopJarURLs); |
226 | 0 | } |
227 | |
|
228 | |
|
229 | |
|
230 | |
|
231 | |
|
232 | |
|
233 | |
|
234 | |
|
235 | |
private CommandLine processArguments(String[] args) throws ParseException, |
236 | |
InterruptedException { |
237 | 0 | Options options = new Options(); |
238 | 0 | options.addOption("h", "help", false, "Help"); |
239 | 0 | options.addOption("v", "verbose", false, "Verbose"); |
240 | 0 | options.addOption("D", "hiveconf", true, |
241 | |
"property=value for Hive/Hadoop configuration"); |
242 | 0 | options.addOption("w", "workers", true, "Number of workers"); |
243 | 0 | if (computationClass == null) { |
244 | 0 | options.addOption(null, "computationClass", true, |
245 | |
"Giraph Computation class to use"); |
246 | |
} |
247 | 0 | if (vertexInputFormatClass == null) { |
248 | 0 | options.addOption(null, "vertexInputFormatClass", true, |
249 | |
"Giraph HCatalogVertexInputFormat class to use"); |
250 | |
} |
251 | 0 | if (edgeInputFormatClass == null) { |
252 | 0 | options.addOption(null, "edgeInputFormatClass", true, |
253 | |
"Giraph HCatalogEdgeInputFormat class to use"); |
254 | |
} |
255 | |
|
256 | 0 | if (vertexOutputFormatClass == null) { |
257 | 0 | options.addOption(null, "vertexOutputFormatClass", true, |
258 | |
"Giraph HCatalogVertexOutputFormat class to use"); |
259 | |
} |
260 | |
|
261 | 0 | options.addOption("db", "dbName", true, "Hive database name"); |
262 | 0 | options.addOption("vi", "vertexInputTable", true, |
263 | |
"Vertex input table name"); |
264 | 0 | options.addOption("VI", "vertexInputFilter", true, |
265 | |
"Vertex input table filter expression (e.g., \"a<2 AND b='two'\""); |
266 | 0 | options.addOption("ei", "edgeInputTable", true, |
267 | |
"Edge input table name"); |
268 | 0 | options.addOption("EI", "edgeInputFilter", true, |
269 | |
"Edge input table filter expression (e.g., \"a<2 AND b='two'\""); |
270 | 0 | options.addOption("o", "outputTable", true, "Output table name"); |
271 | 0 | options.addOption("O", "outputPartition", true, |
272 | |
"Output table partition values (e.g., \"a=1,b=two\")"); |
273 | 0 | options.addOption("s", "skipOutput", false, "Skip output?"); |
274 | |
|
275 | 0 | addMoreOptions(options); |
276 | |
|
277 | 0 | CommandLineParser parser = new GnuParser(); |
278 | 0 | final CommandLine cmdln = parser.parse(options, args); |
279 | 0 | if (args.length == 0 || cmdln.hasOption("help")) { |
280 | 0 | new HelpFormatter().printHelp(getClass().getName(), options, true); |
281 | 0 | throw new InterruptedException(); |
282 | |
} |
283 | |
|
284 | |
|
285 | 0 | if (cmdln.hasOption("computationClass")) { |
286 | 0 | computationClass = findClass(cmdln.getOptionValue("computationClass"), |
287 | |
Computation.class); |
288 | |
} |
289 | 0 | if (cmdln.hasOption("vertexInputFormatClass")) { |
290 | 0 | vertexInputFormatClass = findClass( |
291 | 0 | cmdln.getOptionValue("vertexInputFormatClass"), |
292 | |
HCatalogVertexInputFormat.class); |
293 | |
} |
294 | 0 | if (cmdln.hasOption("edgeInputFormatClass")) { |
295 | 0 | edgeInputFormatClass = findClass( |
296 | 0 | cmdln.getOptionValue("edgeInputFormatClass"), |
297 | |
HCatalogEdgeInputFormat.class); |
298 | |
} |
299 | |
|
300 | 0 | if (cmdln.hasOption("vertexOutputFormatClass")) { |
301 | 0 | vertexOutputFormatClass = findClass( |
302 | 0 | cmdln.getOptionValue("vertexOutputFormatClass"), |
303 | |
HCatalogVertexOutputFormat.class); |
304 | |
} |
305 | |
|
306 | 0 | if (cmdln.hasOption("skipOutput")) { |
307 | 0 | skipOutput = true; |
308 | |
} |
309 | |
|
310 | 0 | if (computationClass == null) { |
311 | 0 | throw new IllegalArgumentException( |
312 | |
"Need the Giraph Computation class name (-computationClass) to use"); |
313 | |
} |
314 | 0 | if (vertexInputFormatClass == null && edgeInputFormatClass == null) { |
315 | 0 | throw new IllegalArgumentException( |
316 | |
"Need at least one of Giraph VertexInputFormat " + |
317 | |
"class name (-vertexInputFormatClass) and " + |
318 | |
"EdgeInputFormat class name (-edgeInputFormatClass)"); |
319 | |
} |
320 | 0 | if (vertexOutputFormatClass == null) { |
321 | 0 | throw new IllegalArgumentException( |
322 | |
"Need the Giraph VertexOutputFormat " + |
323 | |
"class name (-vertexOutputFormatClass) to use"); |
324 | |
} |
325 | 0 | if (!cmdln.hasOption("workers")) { |
326 | 0 | throw new IllegalArgumentException( |
327 | |
"Need to choose the number of workers (-w)"); |
328 | |
} |
329 | 0 | if (!cmdln.hasOption("vertexInputTable") && |
330 | |
vertexInputFormatClass != null) { |
331 | 0 | throw new IllegalArgumentException( |
332 | |
"Need to set the vertex input table name (-vi)"); |
333 | |
} |
334 | 0 | if (!cmdln.hasOption("edgeInputTable") && |
335 | |
edgeInputFormatClass != null) { |
336 | 0 | throw new IllegalArgumentException( |
337 | |
"Need to set the edge input table name (-ei)"); |
338 | |
} |
339 | 0 | if (!cmdln.hasOption("outputTable")) { |
340 | 0 | throw new IllegalArgumentException( |
341 | |
"Need to set the output table name (-o)"); |
342 | |
} |
343 | 0 | dbName = cmdln.getOptionValue("dbName", "default"); |
344 | 0 | vertexInputTableName = cmdln.getOptionValue("vertexInputTable"); |
345 | 0 | vertexInputTableFilterExpr = cmdln.getOptionValue("vertexInputFilter"); |
346 | 0 | edgeInputTableName = cmdln.getOptionValue("edgeInputTable"); |
347 | 0 | edgeInputTableFilterExpr = cmdln.getOptionValue("edgeInputFilter"); |
348 | 0 | outputTableName = cmdln.getOptionValue("outputTable"); |
349 | 0 | outputTablePartitionValues = HiveUtils.parsePartitionValues(cmdln |
350 | 0 | .getOptionValue("outputPartition")); |
351 | 0 | workers = Integer.parseInt(cmdln.getOptionValue("workers")); |
352 | 0 | isVerbose = cmdln.hasOption("verbose"); |
353 | |
|
354 | |
|
355 | 0 | for (String hiveconf : cmdln.getOptionValues("hiveconf")) { |
356 | 0 | String[] keyval = hiveconf.split("=", 2); |
357 | 0 | if (keyval.length == 2) { |
358 | 0 | String name = keyval[0]; |
359 | 0 | String value = keyval[1]; |
360 | 0 | if (name.equals("tmpjars") || name.equals("tmpfiles")) { |
361 | 0 | addToStringCollection( |
362 | |
conf, name, value); |
363 | |
} else { |
364 | 0 | conf.set(name, value); |
365 | |
} |
366 | |
} |
367 | |
} |
368 | |
|
369 | 0 | processMoreArguments(cmdln); |
370 | |
|
371 | 0 | return cmdln; |
372 | |
} |
373 | |
|
374 | |
|
375 | |
|
376 | |
|
377 | |
|
378 | |
|
379 | |
|
380 | |
  /**
   * Adds one or more values to a comma-separated configuration entry.
   *
   * @param conf Configuration to update
   * @param name name of the configuration entry
   * @param values values to append
   */
  private static void addToStringCollection(Configuration conf, String name,
      String... values) {
    addToStringCollection(conf, name, Arrays.asList(values));
  }
384 | |
|
385 | |
|
386 | |
|
387 | |
|
388 | |
|
389 | |
|
390 | |
|
391 | |
private static void addToStringCollection( |
392 | |
Configuration conf, String name, Collection |
393 | |
<? extends String> values) { |
394 | 0 | Collection<String> tmpfiles = conf.getStringCollection(name); |
395 | 0 | tmpfiles.addAll(values); |
396 | 0 | conf.setStrings(name, tmpfiles.toArray(new String[tmpfiles.size()])); |
397 | 0 | } |
398 | |
|
399 | |
|
400 | |
|
401 | |
|
402 | |
|
403 | |
|
404 | |
|
405 | |
|
406 | |
private <T> Class<? extends T> findClass(String className, Class<T> base) { |
407 | |
try { |
408 | 0 | Class<?> cls = Class.forName(className); |
409 | 0 | if (base.isAssignableFrom(cls)) { |
410 | 0 | return cls.asSubclass(base); |
411 | |
} |
412 | 0 | return null; |
413 | 0 | } catch (ClassNotFoundException e) { |
414 | 0 | throw new IllegalArgumentException(className + ": Invalid class name"); |
415 | |
} |
416 | |
} |
417 | |
|
418 | |
  @Override
  public final Configuration getConf() {
    // Hadoop Tool accessor; returns the configuration set in the
    // constructor or via setConf().
    return conf;
  }
422 | |
|
423 | |
  @Override
  public final void setConf(Configuration conf) {
    // Hadoop Tool mutator; replaces the configuration used for the job.
    this.conf = conf;
  }
427 | |
|
428 | |
|
429 | |
|
430 | |
|
431 | |
|
432 | |
|
433 | |
|
434 | |
  /**
   * Extension hook: override to add more command-line options. The base
   * implementation does nothing.
   *
   * @param options Options to add to
   */
  protected void addMoreOptions(Options options) {
  }
436 | |
|
437 | |
|
438 | |
|
439 | |
|
440 | |
|
441 | |
|
442 | |
|
443 | |
|
444 | |
  /**
   * Extension hook: override to process the options added by
   * {@link #addMoreOptions(Options)}. The base implementation does nothing.
   *
   * @param cmd parsed CommandLine to read option values from
   */
  protected void processMoreArguments(CommandLine cmd) {
  }
446 | |
|
447 | |
|
448 | |
|
449 | |
|
450 | |
|
451 | |
|
452 | |
|
453 | |
|
454 | |
protected void initGiraphJob(GiraphJob job) { |
455 | 0 | LOG.info(getClass().getSimpleName() + " with"); |
456 | 0 | String prefix = "\t"; |
457 | 0 | LOG.info(prefix + "-computationClass=" + |
458 | 0 | computationClass.getCanonicalName()); |
459 | 0 | if (vertexInputFormatClass != null) { |
460 | 0 | LOG.info(prefix + "-vertexInputFormatClass=" + |
461 | 0 | vertexInputFormatClass.getCanonicalName()); |
462 | |
} |
463 | 0 | if (edgeInputFormatClass != null) { |
464 | 0 | LOG.info(prefix + "-edgeInputFormatClass=" + |
465 | 0 | edgeInputFormatClass.getCanonicalName()); |
466 | |
} |
467 | 0 | LOG.info(prefix + "-vertexOutputFormatClass=" + |
468 | 0 | vertexOutputFormatClass.getCanonicalName()); |
469 | 0 | if (vertexInputTableName != null) { |
470 | 0 | LOG.info(prefix + "-vertexInputTable=" + vertexInputTableName); |
471 | |
} |
472 | 0 | if (vertexInputTableFilterExpr != null) { |
473 | 0 | LOG.info(prefix + "-vertexInputFilter=\"" + |
474 | |
vertexInputTableFilterExpr + "\""); |
475 | |
} |
476 | 0 | if (edgeInputTableName != null) { |
477 | 0 | LOG.info(prefix + "-edgeInputTable=" + edgeInputTableName); |
478 | |
} |
479 | 0 | if (edgeInputTableFilterExpr != null) { |
480 | 0 | LOG.info(prefix + "-edgeInputFilter=\"" + |
481 | |
edgeInputTableFilterExpr + "\""); |
482 | |
} |
483 | 0 | LOG.info(prefix + "-outputTable=" + outputTableName); |
484 | 0 | if (outputTablePartitionValues != null) { |
485 | 0 | LOG.info(prefix + "-outputPartition=\"" + |
486 | |
outputTablePartitionValues + "\""); |
487 | |
} |
488 | 0 | LOG.info(prefix + "-workers=" + workers); |
489 | 0 | } |
490 | |
} |