Coverage Report - org.apache.giraph.utils.InternalVertexRunner
 
Classes in this File | Line Coverage | Branch Coverage | Complexity
InternalVertexRunner | 0% (0/102) | 0% (0/22) | 0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.utils;
 20  
 
 21  
 import com.google.common.base.Charsets;
 22  
 import com.google.common.collect.ImmutableList;
 23  
 import com.google.common.io.Files;
 24  
 import org.apache.giraph.conf.GiraphConfiguration;
 25  
 import org.apache.giraph.conf.GiraphConstants;
 26  
 import org.apache.giraph.io.formats.FileOutputFormatUtil;
 27  
 import org.apache.giraph.io.formats.GiraphFileInputFormat;
 28  
 import org.apache.giraph.io.formats.InMemoryVertexOutputFormat;
 29  
 import org.apache.giraph.job.GiraphJob;
 30  
 import org.apache.giraph.zk.InProcessZooKeeperRunner;
 31  
 import org.apache.giraph.zk.ZookeeperConfig;
 32  
 import org.apache.hadoop.fs.Path;
 33  
 import org.apache.hadoop.io.Writable;
 34  
 import org.apache.hadoop.io.WritableComparable;
 35  
 import org.apache.hadoop.mapreduce.Job;
 36  
 import org.apache.log4j.Logger;
 37  
 
 38  
 import java.io.File;
 39  
 import java.io.IOException;
 40  
 import java.net.InetSocketAddress;
 41  
 
 42  
 /**
 43  
  * A base class for running internal tests on a vertex
 44  
  *
 45  
  * Extending classes only have to invoke the run() method to test their vertex.
 46  
  * All data is written to a local tmp directory that is removed afterwards.
 47  
  * A local zookeeper instance is started in an extra thread and
 48  
  * shutdown at the end.
 49  
  *
 50  
  * Heavily inspired from Apache Mahout's MahoutTestCase
 51  
  */
 52  
 @SuppressWarnings("unchecked")
 53  
 public class InternalVertexRunner {
 54  
 
 55  
   /** Logger */
 56  0
   private static final Logger LOG =
 57  0
       Logger.getLogger(InternalVertexRunner.class);
 58  
 
 59  
   /** Don't construct */
 60  0
   private InternalVertexRunner() { }
 61  
 
 62  
   /**
 63  
    * Attempts to run the vertex internally in the current JVM, reading from and
 64  
    * writing to a temporary folder on local disk. Will start its own zookeeper
 65  
    * instance.
 66  
    *
 67  
    * @param conf GiraphClasses specifying which types to use
 68  
    * @param vertexInputData linewise vertex input data
 69  
    * @return linewise output data, or null if job fails
 70  
    * @throws Exception if anything goes wrong
 71  
    */
 72  
   public static Iterable<String> run(
 73  
       GiraphConfiguration conf,
 74  
       String[] vertexInputData) throws Exception {
 75  0
     return run(conf, vertexInputData, null);
 76  
   }
 77  
 
 78  
   /**
 79  
    * Run the ZooKeeper in-process and the job.
 80  
    *
 81  
    * @param zookeeperConfig Quorum peer configuration
 82  
    * @param giraphJob Giraph job to run
 83  
    * @return True if successful, false otherwise
 84  
    */
 85  
   private static boolean runZooKeeperAndJob(
 86  
       final ZookeeperConfig zookeeperConfig,
 87  
       GiraphJob giraphJob) throws IOException {
 88  0
     final InProcessZooKeeperRunner.ZooKeeperServerRunner zookeeper =
 89  
         new InProcessZooKeeperRunner.ZooKeeperServerRunner();
 90  
 
 91  0
     int port = zookeeper.start(zookeeperConfig);
 92  
 
 93  0
     LOG.info("Started test zookeeper on port " + port);
 94  0
     GiraphConstants.ZOOKEEPER_LIST.set(giraphJob.getConfiguration(),
 95  
         "localhost:" + port);
 96  
     try {
 97  0
       return giraphJob.run(true);
 98  0
     } catch (InterruptedException |
 99  
         ClassNotFoundException | IOException e) {
 100  0
       LOG.error("runZooKeeperAndJob: Got exception on running", e);
 101  
     } finally {
 102  0
       zookeeper.stop();
 103  0
     }
 104  
 
 105  0
     return false;
 106  
   }
 107  
 
 108  
   /**
 109  
    * Attempts to run the vertex internally in the current JVM, reading from and
 110  
    * writing to a temporary folder on local disk. Will start its own zookeeper
 111  
    * instance.
 112  
    *
 113  
    *
 114  
    * @param conf GiraphClasses specifying which types to use
 115  
    * @param vertexInputData linewise vertex input data
 116  
    * @param edgeInputData linewise edge input data
 117  
    * @return linewise output data, or null if job fails
 118  
    * @throws Exception if anything goes wrong
 119  
    */
 120  
   public static Iterable<String> run(
 121  
       GiraphConfiguration conf,
 122  
       String[] vertexInputData,
 123  
       String[] edgeInputData) throws Exception {
 124  
     // Prepare input file, output folder and temporary folders
 125  0
     File tmpDir = FileUtils.createTestDir(conf.getComputationName());
 126  
     try {
 127  0
       return run(conf, vertexInputData, edgeInputData, null, tmpDir);
 128  
     } finally {
 129  0
       FileUtils.delete(tmpDir);
 130  0
     }
 131  
   }
 132  
 
 133  
   /**
 134  
    * Attempts to run the vertex internally in the current JVM, reading from and
 135  
    * writing to a temporary folder on local disk. Will start its own zookeeper
 136  
    * instance.
 137  
    *
 138  
    *
 139  
    * @param conf GiraphClasses specifying which types to use
 140  
    * @param vertexInputData linewise vertex input data
 141  
    * @param edgeInputData linewise edge input data
 142  
    * @param checkpointsDir if set, will use this folder
 143  
    *                          for storing checkpoints.
 144  
    * @param tmpDir file path for storing temporary files.
 145  
    * @return linewise output data, or null if job fails
 146  
    * @throws Exception if anything goes wrong
 147  
    */
 148  
   public static Iterable<String> run(
 149  
       GiraphConfiguration conf,
 150  
       String[] vertexInputData,
 151  
       String[] edgeInputData,
 152  
       String checkpointsDir,
 153  
       File tmpDir) throws Exception {
 154  0
     File vertexInputFile = null;
 155  0
     File edgeInputFile = null;
 156  0
     if (conf.hasVertexInputFormat()) {
 157  0
       vertexInputFile = FileUtils.createTempFile(tmpDir, "vertices.txt");
 158  
     }
 159  0
     if (conf.hasEdgeInputFormat()) {
 160  0
       edgeInputFile = FileUtils.createTempFile(tmpDir, "edges.txt");
 161  
     }
 162  
 
 163  0
     File outputDir = FileUtils.createTempDir(tmpDir, "output");
 164  0
     File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
 165  0
     File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
 166  0
     File mrLocalDir = FileUtils.createTempDir(tmpDir, "_mapred");
 167  
     // Write input data to disk
 168  0
     if (conf.hasVertexInputFormat()) {
 169  0
       FileUtils.writeLines(vertexInputFile, vertexInputData);
 170  
     }
 171  0
     if (conf.hasEdgeInputFormat()) {
 172  0
       FileUtils.writeLines(edgeInputFile, edgeInputData);
 173  
     }
 174  
 
 175  0
     conf.setWorkerConfiguration(1, 1, 100.0f);
 176  0
     GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
 177  0
     GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
 178  0
     conf.setIfUnset("mapred.job.tracker", "local");
 179  0
     conf.setIfUnset("mapred.local.dir", mrLocalDir.toString());
 180  
 
 181  0
     conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
 182  0
     GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
 183  0
         zkMgrDir.toString());
 184  
 
 185  0
     if (checkpointsDir == null) {
 186  0
       checkpointsDir = FileUtils.createTempDir(
 187  0
           tmpDir, "_checkpoints").toString();
 188  
     }
 189  0
     GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir);
 190  
 
 191  
     // Create and configure the job to run the vertex
 192  0
     GiraphJob job = new GiraphJob(conf, conf.getComputationName());
 193  
 
 194  0
     Job internalJob = job.getInternalJob();
 195  0
     if (conf.hasVertexInputFormat()) {
 196  0
       GiraphFileInputFormat.setVertexInputPath(internalJob.getConfiguration(),
 197  0
           new Path(vertexInputFile.toString()));
 198  
     }
 199  0
     if (conf.hasEdgeInputFormat()) {
 200  0
       GiraphFileInputFormat.setEdgeInputPath(internalJob.getConfiguration(),
 201  0
           new Path(edgeInputFile.toString()));
 202  
     }
 203  0
     FileOutputFormatUtil.setOutputPath(job.getInternalJob(),
 204  0
         new Path(outputDir.toString()));
 205  
 
 206  
     // Configure a local zookeeper instance
 207  0
     ZookeeperConfig qpConfig = configLocalZooKeeper(zkDir);
 208  
 
 209  0
     boolean success = runZooKeeperAndJob(qpConfig, job);
 210  0
     if (!success) {
 211  0
       return null;
 212  
     }
 213  
 
 214  0
     File outFile = new File(outputDir, "part-m-00000");
 215  0
     if (conf.hasVertexOutputFormat() && outFile.canRead()) {
 216  0
       return Files.readLines(outFile, Charsets.UTF_8);
 217  
     } else {
 218  0
       return ImmutableList.of();
 219  
     }
 220  
 
 221  
   }
 222  
 
 223  
   /**
 224  
    * Attempts to run the vertex internally in the current JVM,
 225  
    * reading from an in-memory graph. Will start its own zookeeper
 226  
    * instance.
 227  
    *
 228  
    * @param <I> Vertex ID
 229  
    * @param <V> Vertex Value
 230  
    * @param <E> Edge Value
 231  
    * @param conf GiraphClasses specifying which types to use
 232  
    * @param graph input graph
 233  
    * @throws Exception if anything goes wrong
 234  
    */
 235  
   public static <I extends WritableComparable,
 236  
       V extends Writable,
 237  
       E extends Writable> void run(
 238  
       GiraphConfiguration conf,
 239  
       TestGraph<I, V, E> graph) throws Exception {
 240  
     // Prepare temporary folders
 241  0
     File tmpDir = FileUtils.createTestDir(conf.getComputationName());
 242  
     try {
 243  0
       run(conf, graph, tmpDir, null);
 244  
     } finally {
 245  0
       FileUtils.delete(tmpDir);
 246  0
     }
 247  0
   }
 248  
 
 249  
   /**
 250  
    * Attempts to run the vertex internally in the current JVM,
 251  
    * reading from an in-memory graph. Will start its own zookeeper
 252  
    * instance.
 253  
    *
 254  
    * @param <I> Vertex ID
 255  
    * @param <V> Vertex Value
 256  
    * @param <E> Edge Value
 257  
    * @param conf GiraphClasses specifying which types to use
 258  
    * @param graph input graph
 259  
    * @param tmpDir file path for storing temporary files.
 260  
    * @param checkpointsDir if set, will use this folder
 261  
    *                          for storing checkpoints.
 262  
    * @throws Exception if anything goes wrong
 263  
    */
 264  
   public static <I extends WritableComparable,
 265  
       V extends Writable,
 266  
       E extends Writable> void run(
 267  
       GiraphConfiguration conf,
 268  
       TestGraph<I, V, E> graph,
 269  
       File tmpDir,
 270  
       String checkpointsDir) throws Exception {
 271  0
     File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
 272  0
     File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
 273  0
     File mrLocalDir = FileUtils.createTempDir(tmpDir, "_mapred");
 274  
 
 275  0
     if (checkpointsDir == null) {
 276  0
       checkpointsDir = FileUtils.
 277  0
           createTempDir(tmpDir, "_checkpoints").toString();
 278  
     }
 279  
 
 280  0
     conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class);
 281  
 
 282  
     // Create and configure the job to run the vertex
 283  0
     GiraphJob job = new GiraphJob(conf, conf.getComputationName());
 284  
 
 285  0
     InMemoryVertexInputFormat.setGraph(graph);
 286  
 
 287  0
     conf.setWorkerConfiguration(1, 1, 100.0f);
 288  0
     GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
 289  0
     GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
 290  0
     GiraphConstants.ZOOKEEPER_SERVER_PORT.set(conf, 0);
 291  0
     conf.setIfUnset("mapred.job.tracker", "local");
 292  0
     conf.setIfUnset("mapred.local.dir", mrLocalDir.toString());
 293  
 
 294  0
     conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
 295  0
     GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
 296  0
         zkMgrDir.toString());
 297  0
     GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir);
 298  
 
 299  0
     runZooKeeperAndJob(configLocalZooKeeper(zkDir), job);
 300  0
   }
 301  
 
 302  
   /**
 303  
    * Attempts to run the vertex internally in the current JVM, reading and
 304  
    * writing to an in-memory graph. Will start its own zookeeper
 305  
    * instance.
 306  
    *
 307  
    * @param <I> Vertex ID
 308  
    * @param <V> Vertex Value
 309  
    * @param <E> Edge Value
 310  
    * @param conf GiraphClasses specifying which types to use
 311  
    * @param graph input graph
 312  
    * @return Output graph
 313  
    * @throws Exception if anything goes wrong
 314  
    */
 315  
   public static <I extends WritableComparable,
 316  
       V extends Writable,
 317  
       E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput(
 318  
       GiraphConfiguration conf,
 319  
       TestGraph<I, V, E> graph) throws Exception {
 320  
     // Prepare temporary folders
 321  0
     File tmpDir = FileUtils.createTestDir(conf.getComputationName());
 322  
     try {
 323  0
       return runWithInMemoryOutput(conf, graph, tmpDir, null);
 324  
     } finally {
 325  0
       FileUtils.delete(tmpDir);
 326  0
     }
 327  
   }
 328  
 
 329  
   /**
 330  
    * Attempts to run the vertex internally in the current JVM, reading and
 331  
    * writing to an in-memory graph. Will start its own zookeeper
 332  
    * instance.
 333  
    *
 334  
    * @param <I> Vertex ID
 335  
    * @param <V> Vertex Value
 336  
    * @param <E> Edge Value
 337  
    * @param conf GiraphClasses specifying which types to use
 338  
    * @param graph input graph
 339  
    * @param tmpDir file path for storing temporary files.
 340  
    * @param checkpointsDir if set, will use this folder
 341  
    *                       for storing checkpoints.
 342  
    * @return Output graph
 343  
    * @throws Exception if anything goes wrong
 344  
    */
 345  
   public static <I extends WritableComparable,
 346  
       V extends Writable,
 347  
       E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput(
 348  
       GiraphConfiguration conf,
 349  
       TestGraph<I, V, E> graph,
 350  
       File tmpDir,
 351  
       String checkpointsDir) throws Exception {
 352  0
     conf.setVertexOutputFormatClass(InMemoryVertexOutputFormat.class);
 353  0
     InMemoryVertexOutputFormat.initializeOutputGraph(conf);
 354  0
     InternalVertexRunner.run(conf, graph, tmpDir, checkpointsDir);
 355  0
     return InMemoryVertexOutputFormat.getOutputGraph();
 356  
   }
 357  
 
 358  
   /**
 359  
    * Configuration options for running local ZK.
 360  
    *
 361  
    * @param zkDir directory for ZK to hold files in.
 362  
    * @return zookeeper configuration object
 363  
    */
 364  
   private static ZookeeperConfig configLocalZooKeeper(File zkDir) {
 365  0
     ZookeeperConfig config = new ZookeeperConfig();
 366  0
     config.setMaxSessionTimeout(100000);
 367  0
     config.setMinSessionTimeout(10000);
 368  0
     config.setClientPortAddress(new InetSocketAddress("localhost", 0));
 369  0
     config.setDataDir(zkDir.getAbsolutePath());
 370  0
     return config;
 371  
   }
 372  
 
 373  
 }