Coverage Report - org.apache.giraph.io.hcatalog.HCatGiraphRunner
 
Classes in this File | Line Coverage | Branch Coverage | Complexity
HCatGiraphRunner     | 0% (0/165)    | 0% (0/80)       | 5.308
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.io.hcatalog;
 20  
 
 21  
 import org.apache.commons.cli.CommandLine;
 22  
 import org.apache.commons.cli.CommandLineParser;
 23  
 import org.apache.commons.cli.GnuParser;
 24  
 import org.apache.commons.cli.HelpFormatter;
 25  
 import org.apache.commons.cli.Options;
 26  
 import org.apache.commons.cli.ParseException;
 27  
 import org.apache.giraph.graph.Computation;
 28  
 import org.apache.giraph.io.EdgeInputFormat;
 29  
 import org.apache.giraph.io.VertexInputFormat;
 30  
 import org.apache.giraph.io.VertexOutputFormat;
 31  
 import org.apache.giraph.job.GiraphJob;
 32  
 import org.apache.hadoop.conf.Configuration;
 33  
 import org.apache.hadoop.hive.conf.HiveConf;
 34  
 import org.apache.hadoop.util.Tool;
 35  
 import org.apache.hadoop.util.ToolRunner;
 36  
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 37  
 import org.apache.hcatalog.mapreduce.InputJobInfo;
 38  
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 39  
 import org.apache.log4j.Logger;
 40  
 
 41  
 import com.google.common.collect.Lists;
 42  
 
 43  
 import java.io.File;
 44  
 import java.util.Arrays;
 45  
 import java.util.Collection;
 46  
 import java.util.List;
 47  
 import java.util.Map;
 48  
 
 49  
 /**
 50  
  * Hive Giraph Runner
 51  
  */
 52  
 public class HCatGiraphRunner implements Tool {
 53  
   /**
 54  
    * logger
 55  
    */
 56  0
   private static final Logger LOG = Logger.getLogger(HCatGiraphRunner.class);
 57  
   /**
 58  
    * workers
 59  
    */
 60  
   protected int workers;
 61  
   /**
 62  
    * is verbose
 63  
    */
 64  
   protected boolean isVerbose;
 65  
   /**
 66  
    * output table partitions
 67  
    */
 68  
   protected Map<String, String> outputTablePartitionValues;
 69  
   /**
 70  
    * dbName
 71  
    */
 72  
   protected String dbName;
 73  
   /**
 74  
    * vertex input table name
 75  
    */
 76  
   protected String vertexInputTableName;
 77  
   /**
 78  
    * vertex input table filter
 79  
    */
 80  
   protected String vertexInputTableFilterExpr;
 81  
   /**
 82  
    * edge input table name
 83  
    */
 84  
   protected String edgeInputTableName;
 85  
   /**
 86  
    * edge input table filter
 87  
    */
 88  
   protected String edgeInputTableFilterExpr;
 89  
   /**
 90  
    * output table name
 91  
    */
 92  
   protected String outputTableName;
 93  
   /** Configuration */
 94  
   private Configuration conf;
 95  
   /** Skip output? (Useful for testing without writing) */
 96  0
   private boolean skipOutput = false;
 97  
 
 98  
   /**
 99  
   * computation class.
 100  
   */
 101  
   private Class<? extends Computation> computationClass;
 102  
   /**
 103  
    * vertex input format internal.
 104  
    */
 105  
   private Class<? extends VertexInputFormat> vertexInputFormatClass;
 106  
   /**
 107  
    * edge input format internal.
 108  
    */
 109  
   private Class<? extends EdgeInputFormat> edgeInputFormatClass;
 110  
   /**
 111  
   * vertex output format internal.
 112  
   */
 113  
   private Class<? extends VertexOutputFormat> vertexOutputFormatClass;
 114  
 
 115  
   /**
 116  
   * Giraph runner class.
 117  
    *
 118  
   * @param computationClass Computation class
 119  
   * @param vertexInputFormatClass Vertex input format
 120  
   * @param edgeInputFormatClass Edge input format
 121  
   * @param vertexOutputFormatClass Output format
 122  
   */
 123  
   protected HCatGiraphRunner(
 124  
       Class<? extends Computation> computationClass,
 125  
       Class<? extends VertexInputFormat> vertexInputFormatClass,
 126  
       Class<? extends EdgeInputFormat> edgeInputFormatClass,
 127  0
       Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
 128  0
     this.computationClass = computationClass;
 129  0
     this.vertexInputFormatClass = vertexInputFormatClass;
 130  0
     this.edgeInputFormatClass = edgeInputFormatClass;
 131  0
     this.vertexOutputFormatClass = vertexOutputFormatClass;
 132  0
     this.conf = new HiveConf(getClass());
 133  0
   }
 134  
 
 135  
   /**
 136  
   * main method
 137  
   * @param args system arguments
 138  
   * @throws Exception any errors from Hive Giraph Runner
 139  
   */
 140  
   public static void main(String[] args) throws Exception {
 141  0
     System.exit(ToolRunner.run(
 142  
         new HCatGiraphRunner(null, null, null, null), args));
 143  0
   }
 144  
 
 145  
   @Override
 146  
   public final int run(String[] args) throws Exception {
 147  
     // process args
 148  
     try {
 149  0
       processArguments(args);
 150  0
     } catch (InterruptedException e) {
 151  0
       return 0;
 152  0
     } catch (IllegalArgumentException e) {
 153  0
       System.err.println(e.getMessage());
 154  0
       return -1;
 155  0
     }
 156  
 
 157  
     // additional configuration for Hive
 158  0
     adjustConfigurationForHive(getConf());
 159  
 
 160  
     // setup GiraphJob
 161  0
     GiraphJob job = new GiraphJob(getConf(), getClass().getName());
 162  0
     job.getConfiguration().setComputationClass(computationClass);
 163  
 
 164  
     // setup input from Hive
 165  0
     if (vertexInputFormatClass != null) {
 166  0
       InputJobInfo vertexInputJobInfo = InputJobInfo.create(dbName,
 167  
           vertexInputTableName, vertexInputTableFilterExpr);
 168  0
       GiraphHCatInputFormat.setVertexInput(job.getInternalJob(),
 169  
           vertexInputJobInfo);
 170  0
       job.getConfiguration().setVertexInputFormatClass(vertexInputFormatClass);
 171  
     }
 172  0
     if (edgeInputFormatClass != null) {
 173  0
       InputJobInfo edgeInputJobInfo = InputJobInfo.create(dbName,
 174  
           edgeInputTableName, edgeInputTableFilterExpr);
 175  0
       GiraphHCatInputFormat.setEdgeInput(job.getInternalJob(),
 176  
           edgeInputJobInfo);
 177  0
       job.getConfiguration().setEdgeInputFormatClass(edgeInputFormatClass);
 178  
     }
 179  
 
 180  
     // setup output to Hive
 181  0
     HCatOutputFormat.setOutput(job.getInternalJob(), OutputJobInfo.create(
 182  
         dbName, outputTableName, outputTablePartitionValues));
 183  0
     HCatOutputFormat.setSchema(job.getInternalJob(),
 184  0
         HCatOutputFormat.getTableSchema(job.getInternalJob()));
 185  0
     if (skipOutput) {
 186  0
       LOG.warn("run: Warning - Output will be skipped!");
 187  
     } else {
 188  0
       job.getConfiguration().setVertexOutputFormatClass(
 189  
           vertexOutputFormatClass);
 190  
     }
 191  
 
 192  0
     job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
 193  0
     initGiraphJob(job);
 194  
 
 195  0
     return job.run(isVerbose) ? 0 : -1;
 196  
   }
 197  
 
 198  
   /**
 199  
   * set hive configuration
 200  
   * @param conf Configuration argument
 201  
   */
 202  
   private static void adjustConfigurationForHive(Configuration conf) {
 203  
     // when output partitions are used, workers register them to the
 204  
     // metastore at cleanup stage, and on HiveConf's initialization, it
 205  
     // looks for hive-site.xml from.
 206  0
     addToStringCollection(conf, "tmpfiles", conf.getClassLoader()
 207  0
         .getResource("hive-site.xml").toString());
 208  
 
 209  
     // Also, you need hive.aux.jars as well
 210  
     // addToStringCollection(conf, "tmpjars",
 211  
     // conf.getStringCollection("hive.aux.jars.path"));
 212  
 
 213  
     // Or, more effectively, we can provide all the jars client needed to
 214  
     // the workers as well
 215  0
     String[] hadoopJars = System.getenv("HADOOP_CLASSPATH").split(
 216  
         File.pathSeparator);
 217  0
     List<String> hadoopJarURLs = Lists.newArrayList();
 218  0
     for (String jarPath : hadoopJars) {
 219  0
       File file = new File(jarPath);
 220  0
       if (file.exists() && file.isFile()) {
 221  0
         String jarURL = file.toURI().toString();
 222  0
         hadoopJarURLs.add(jarURL);
 223  
       }
 224  
     }
 225  0
     addToStringCollection(conf, "tmpjars", hadoopJarURLs);
 226  0
   }
 227  
 
 228  
   /**
 229  
   * process arguments
 230  
   * @param args to process
 231  
   * @return CommandLine instance
 232  
   * @throws ParseException error parsing arguments
 233  
   * @throws InterruptedException interrupted
 234  
   */
 235  
   private CommandLine processArguments(String[] args) throws ParseException,
 236  
             InterruptedException {
 237  0
     Options options = new Options();
 238  0
     options.addOption("h", "help", false, "Help");
 239  0
     options.addOption("v", "verbose", false, "Verbose");
 240  0
     options.addOption("D", "hiveconf", true,
 241  
                 "property=value for Hive/Hadoop configuration");
 242  0
     options.addOption("w", "workers", true, "Number of workers");
 243  0
     if (computationClass == null) {
 244  0
       options.addOption(null, "computationClass", true,
 245  
           "Giraph Computation class to use");
 246  
     }
 247  0
     if (vertexInputFormatClass == null) {
 248  0
       options.addOption(null, "vertexInputFormatClass", true,
 249  
           "Giraph HCatalogVertexInputFormat class to use");
 250  
     }
 251  0
     if (edgeInputFormatClass == null) {
 252  0
       options.addOption(null, "edgeInputFormatClass", true,
 253  
           "Giraph HCatalogEdgeInputFormat class to use");
 254  
     }
 255  
 
 256  0
     if (vertexOutputFormatClass == null) {
 257  0
       options.addOption(null, "vertexOutputFormatClass", true,
 258  
           "Giraph HCatalogVertexOutputFormat class to use");
 259  
     }
 260  
 
 261  0
     options.addOption("db", "dbName", true, "Hive database name");
 262  0
     options.addOption("vi", "vertexInputTable", true,
 263  
         "Vertex input table name");
 264  0
     options.addOption("VI", "vertexInputFilter", true,
 265  
         "Vertex input table filter expression (e.g., \"a<2 AND b='two'\"");
 266  0
     options.addOption("ei", "edgeInputTable", true,
 267  
         "Edge input table name");
 268  0
     options.addOption("EI", "edgeInputFilter", true,
 269  
         "Edge input table filter expression (e.g., \"a<2 AND b='two'\"");
 270  0
     options.addOption("o", "outputTable", true, "Output table name");
 271  0
     options.addOption("O", "outputPartition", true,
 272  
         "Output table partition values (e.g., \"a=1,b=two\")");
 273  0
     options.addOption("s", "skipOutput", false, "Skip output?");
 274  
 
 275  0
     addMoreOptions(options);
 276  
 
 277  0
     CommandLineParser parser = new GnuParser();
 278  0
     final CommandLine cmdln = parser.parse(options, args);
 279  0
     if (args.length == 0 || cmdln.hasOption("help")) {
 280  0
       new HelpFormatter().printHelp(getClass().getName(), options, true);
 281  0
       throw new InterruptedException();
 282  
     }
 283  
 
 284  
     // Giraph classes
 285  0
     if (cmdln.hasOption("computationClass")) {
 286  0
       computationClass = findClass(cmdln.getOptionValue("computationClass"),
 287  
           Computation.class);
 288  
     }
 289  0
     if (cmdln.hasOption("vertexInputFormatClass")) {
 290  0
       vertexInputFormatClass = findClass(
 291  0
           cmdln.getOptionValue("vertexInputFormatClass"),
 292  
           HCatalogVertexInputFormat.class);
 293  
     }
 294  0
     if (cmdln.hasOption("edgeInputFormatClass")) {
 295  0
       edgeInputFormatClass = findClass(
 296  0
           cmdln.getOptionValue("edgeInputFormatClass"),
 297  
           HCatalogEdgeInputFormat.class);
 298  
     }
 299  
 
 300  0
     if (cmdln.hasOption("vertexOutputFormatClass")) {
 301  0
       vertexOutputFormatClass = findClass(
 302  0
           cmdln.getOptionValue("vertexOutputFormatClass"),
 303  
           HCatalogVertexOutputFormat.class);
 304  
     }
 305  
 
 306  0
     if (cmdln.hasOption("skipOutput")) {
 307  0
       skipOutput = true;
 308  
     }
 309  
 
 310  0
     if (computationClass == null) {
 311  0
       throw new IllegalArgumentException(
 312  
           "Need the Giraph Computation class name (-computationClass) to use");
 313  
     }
 314  0
     if (vertexInputFormatClass == null && edgeInputFormatClass == null) {
 315  0
       throw new IllegalArgumentException(
 316  
           "Need at least one of Giraph VertexInputFormat " +
 317  
               "class name (-vertexInputFormatClass) and " +
 318  
               "EdgeInputFormat class name (-edgeInputFormatClass)");
 319  
     }
 320  0
     if (vertexOutputFormatClass == null) {
 321  0
       throw new IllegalArgumentException(
 322  
           "Need the Giraph VertexOutputFormat " +
 323  
               "class name (-vertexOutputFormatClass) to use");
 324  
     }
 325  0
     if (!cmdln.hasOption("workers")) {
 326  0
       throw new IllegalArgumentException(
 327  
           "Need to choose the number of workers (-w)");
 328  
     }
 329  0
     if (!cmdln.hasOption("vertexInputTable") &&
 330  
         vertexInputFormatClass != null) {
 331  0
       throw new IllegalArgumentException(
 332  
           "Need to set the vertex input table name (-vi)");
 333  
     }
 334  0
     if (!cmdln.hasOption("edgeInputTable") &&
 335  
         edgeInputFormatClass != null) {
 336  0
       throw new IllegalArgumentException(
 337  
           "Need to set the edge input table name (-ei)");
 338  
     }
 339  0
     if (!cmdln.hasOption("outputTable")) {
 340  0
       throw new IllegalArgumentException(
 341  
           "Need to set the output table name (-o)");
 342  
     }
 343  0
     dbName = cmdln.getOptionValue("dbName", "default");
 344  0
     vertexInputTableName = cmdln.getOptionValue("vertexInputTable");
 345  0
     vertexInputTableFilterExpr = cmdln.getOptionValue("vertexInputFilter");
 346  0
     edgeInputTableName = cmdln.getOptionValue("edgeInputTable");
 347  0
     edgeInputTableFilterExpr = cmdln.getOptionValue("edgeInputFilter");
 348  0
     outputTableName = cmdln.getOptionValue("outputTable");
 349  0
     outputTablePartitionValues = HiveUtils.parsePartitionValues(cmdln
 350  0
                 .getOptionValue("outputPartition"));
 351  0
     workers = Integer.parseInt(cmdln.getOptionValue("workers"));
 352  0
     isVerbose = cmdln.hasOption("verbose");
 353  
 
 354  
     // pick up -hiveconf arguments
 355  0
     for (String hiveconf : cmdln.getOptionValues("hiveconf")) {
 356  0
       String[] keyval = hiveconf.split("=", 2);
 357  0
       if (keyval.length == 2) {
 358  0
         String name = keyval[0];
 359  0
         String value = keyval[1];
 360  0
         if (name.equals("tmpjars") || name.equals("tmpfiles")) {
 361  0
           addToStringCollection(
 362  
                   conf, name, value);
 363  
         } else {
 364  0
           conf.set(name, value);
 365  
         }
 366  
       }
 367  
     }
 368  
 
 369  0
     processMoreArguments(cmdln);
 370  
 
 371  0
     return cmdln;
 372  
   }
 373  
 
 374  
   /**
 375  
   * add string to collection
 376  
   * @param conf Configuration
 377  
   * @param name name to add
 378  
   * @param values values for collection
 379  
   */
 380  
   private static void addToStringCollection(Configuration conf, String name,
 381  
                                               String... values) {
 382  0
     addToStringCollection(conf, name, Arrays.asList(values));
 383  0
   }
 384  
 
 385  
   /**
 386  
   * add string to collection
 387  
   * @param conf Configuration
 388  
   * @param name to add
 389  
   * @param values values for collection
 390  
   */
 391  
   private static void addToStringCollection(
 392  
           Configuration conf, String name, Collection
 393  
           <? extends String> values) {
 394  0
     Collection<String> tmpfiles = conf.getStringCollection(name);
 395  0
     tmpfiles.addAll(values);
 396  0
     conf.setStrings(name, tmpfiles.toArray(new String[tmpfiles.size()]));
 397  0
   }
 398  
 
 399  
   /**
 400  
   *
 401  
   * @param className to find
 402  
   * @param base  base class
 403  
   * @param <T> class type found
 404  
   * @return type found
 405  
   */
 406  
   private <T> Class<? extends T> findClass(String className, Class<T> base) {
 407  
     try {
 408  0
       Class<?> cls = Class.forName(className);
 409  0
       if (base.isAssignableFrom(cls)) {
 410  0
         return cls.asSubclass(base);
 411  
       }
 412  0
       return null;
 413  0
     } catch (ClassNotFoundException e) {
 414  0
       throw new IllegalArgumentException(className + ": Invalid class name");
 415  
     }
 416  
   }
 417  
 
 418  
   @Override
 419  
   public final Configuration getConf() {
 420  0
     return conf;
 421  
   }
 422  
 
 423  
   @Override
 424  
   public final void setConf(Configuration conf) {
 425  0
     this.conf = conf;
 426  0
   }
 427  
 
 428  
   /**
 429  
   * Override this method to add more command-line options. You can process
 430  
   * them by also overriding {@link #processMoreArguments(CommandLine)}.
 431  
   *
 432  
   * @param options Options
 433  
   */
 434  
   protected void addMoreOptions(Options options) {
 435  0
   }
 436  
 
 437  
   /**
 438  
   * Override this method to process additional command-line arguments. You
 439  
   * may want to declare additional options by also overriding
 440  
   * {@link #addMoreOptions(Options)}.
 441  
   *
 442  
   * @param cmd Command
 443  
   */
 444  
   protected void processMoreArguments(CommandLine cmd) {
 445  0
   }
 446  
 
 447  
   /**
 448  
   * Override this method to do additional setup with the GiraphJob that will
 449  
   * run.
 450  
   *
 451  
   * @param job
 452  
   *            GiraphJob that is going to run
 453  
   */
 454  
   protected void initGiraphJob(GiraphJob job) {
 455  0
     LOG.info(getClass().getSimpleName() + " with");
 456  0
     String prefix = "\t";
 457  0
     LOG.info(prefix + "-computationClass=" +
 458  0
          computationClass.getCanonicalName());
 459  0
     if (vertexInputFormatClass != null) {
 460  0
       LOG.info(prefix + "-vertexInputFormatClass=" +
 461  0
           vertexInputFormatClass.getCanonicalName());
 462  
     }
 463  0
     if (edgeInputFormatClass != null) {
 464  0
       LOG.info(prefix + "-edgeInputFormatClass=" +
 465  0
           edgeInputFormatClass.getCanonicalName());
 466  
     }
 467  0
     LOG.info(prefix + "-vertexOutputFormatClass=" +
 468  0
         vertexOutputFormatClass.getCanonicalName());
 469  0
     if (vertexInputTableName != null) {
 470  0
       LOG.info(prefix + "-vertexInputTable=" + vertexInputTableName);
 471  
     }
 472  0
     if (vertexInputTableFilterExpr != null) {
 473  0
       LOG.info(prefix + "-vertexInputFilter=\"" +
 474  
           vertexInputTableFilterExpr + "\"");
 475  
     }
 476  0
     if (edgeInputTableName != null) {
 477  0
       LOG.info(prefix + "-edgeInputTable=" + edgeInputTableName);
 478  
     }
 479  0
     if (edgeInputTableFilterExpr != null) {
 480  0
       LOG.info(prefix + "-edgeInputFilter=\"" +
 481  
           edgeInputTableFilterExpr + "\"");
 482  
     }
 483  0
     LOG.info(prefix + "-outputTable=" + outputTableName);
 484  0
     if (outputTablePartitionValues != null) {
 485  0
       LOG.info(prefix + "-outputPartition=\"" +
 486  
           outputTablePartitionValues + "\"");
 487  
     }
 488  0
     LOG.info(prefix + "-workers=" + workers);
 489  0
   }
 490  
 }