Coverage Report - org.apache.giraph.job.GiraphJob
 
Classes in this File Line Coverage Branch Coverage Complexity
GiraphJob
0%
0/89
0%
0/34
3
GiraphJob$DelegatedJob
0%
0/8
0%
0/2
3
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.job;
 20  
 
 21  
 import com.google.common.collect.ImmutableList;
 22  
 import org.apache.giraph.bsp.BspInputFormat;
 23  
 import org.apache.giraph.conf.GiraphConfiguration;
 24  
 import org.apache.giraph.conf.GiraphConstants;
 25  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 26  
 import org.apache.giraph.graph.GraphMapper;
 27  
 import org.apache.hadoop.conf.Configuration;
 28  
 import org.apache.hadoop.ipc.Client;
 29  
 import org.apache.hadoop.mapreduce.Job;
 30  
 import org.apache.log4j.Logger;
 31  
 
 32  
 import java.io.IOException;
 33  
 
 34  
 /**
 35  
  * Generates an appropriate internal {@link Job} for using Giraph in Hadoop.
 36  
  * Uses composition to avoid unwanted {@link Job} methods from exposure
 37  
  * to the user.
 38  
  */
 39  0
 public class GiraphJob {
 40  
   static {
 41  0
     Configuration.addDefaultResource("giraph-site.xml");
 42  
   }
 43  
 
 44  
   /** Class logger */
 45  0
   private static final Logger LOG = Logger.getLogger(GiraphJob.class);
 46  
   /** Internal delegated job to proxy interface requests for Job */
 47  
   private final DelegatedJob delegatedJob;
 48  
   /** Name of the job */
 49  
   private String jobName;
 50  
   /** Helper configuration from the job */
 51  
   private final GiraphConfiguration giraphConfiguration;
 52  
 
 53  
   /**
 54  
    * Delegated job that simply passes along the class GiraphConfiguration.
 55  
    */
 56  0
   private class DelegatedJob extends Job {
 57  
     /** Ensure that for job initiation the super.getConfiguration() is used */
 58  0
     private boolean jobInited = false;
 59  
 
 60  
     /**
 61  
      * Constructor
 62  
      *
 63  
      * @param conf Configuration
 64  
      * @throws IOException
 65  
      */
 66  0
     DelegatedJob(Configuration conf) throws IOException {
 67  0
       super(conf);
 68  0
     }
 69  
 
 70  
     @Override
 71  
     public Configuration getConfiguration() {
 72  0
       if (jobInited) {
 73  0
         return giraphConfiguration;
 74  
       } else {
 75  0
         return super.getConfiguration();
 76  
       }
 77  
     }
 78  
   }
 79  
 
 80  
   /**
 81  
    * Constructor that will instantiate the configuration
 82  
    *
 83  
    * @param jobName User-defined job name
 84  
    * @throws IOException
 85  
    */
 86  
   public GiraphJob(String jobName) throws IOException {
 87  0
     this(new GiraphConfiguration(), jobName);
 88  0
   }
 89  
 
 90  
   /**
 91  
    * Constructor.
 92  
    *
 93  
    * @param configuration User-defined configuration
 94  
    * @param jobName User-defined job name
 95  
    * @throws IOException
 96  
    */
 97  
   public GiraphJob(Configuration configuration,
 98  
                    String jobName) throws IOException {
 99  0
     this(new GiraphConfiguration(configuration), jobName);
 100  0
   }
 101  
 
 102  
   /**
 103  
    * Constructor.
 104  
    *
 105  
    * @param giraphConfiguration User-defined configuration
 106  
    * @param jobName User-defined job name
 107  
    * @throws IOException
 108  
    */
 109  
   public GiraphJob(GiraphConfiguration giraphConfiguration,
 110  0
                    String jobName) throws IOException {
 111  0
     this.jobName = jobName;
 112  0
     this.giraphConfiguration = giraphConfiguration;
 113  0
     this.delegatedJob = new DelegatedJob(giraphConfiguration);
 114  0
   }
 115  
 
 116  
   public String getJobName() {
 117  0
     return jobName;
 118  
   }
 119  
 
 120  
   public void setJobName(String jobName) {
 121  0
     this.jobName = jobName;
 122  0
   }
 123  
 
 124  
   /**
 125  
    * Get the configuration from the internal job.
 126  
    *
 127  
    * @return Configuration used by the job.
 128  
    */
 129  
   public GiraphConfiguration getConfiguration() {
 130  0
     return giraphConfiguration;
 131  
   }
 132  
 
 133  
   /**
 134  
    * Be very cautious when using this method as it returns the internal job
 135  
    * of {@link GiraphJob}.  This should only be used for methods that require
 136  
    * access to the actual {@link Job}, i.e. FileInputFormat#addInputPath().
 137  
    *
 138  
    * @return Internal job that will actually be submitted to Hadoop.
 139  
    */
 140  
   public Job getInternalJob() {
 141  0
     delegatedJob.jobInited = true;
 142  0
     return delegatedJob;
 143  
   }
 144  
 
 145  
   /**
 146  
    * Check if the configuration is local.  If it is local, do additional
 147  
    * checks due to the restrictions of LocalJobRunner. This checking is
 148  
    * performed here because the local job runner is MRv1-configured.
 149  
    *
 150  
    * @param conf Configuration
 151  
    */
 152  
   private static void checkLocalJobRunnerConfiguration(
 153  
       ImmutableClassesGiraphConfiguration conf) {
 154  0
     String jobTracker = conf.get("mapred.job.tracker", null);
 155  0
     if (!jobTracker.equals("local")) {
 156  
       // Nothing to check
 157  0
       return;
 158  
     }
 159  
 
 160  0
     int maxWorkers = conf.getMaxWorkers();
 161  0
     if (maxWorkers != 1) {
 162  0
       throw new IllegalArgumentException(
 163  
           "checkLocalJobRunnerConfiguration: When using " +
 164  
               "LocalJobRunner, must have only one worker since " +
 165  
           "only 1 task at a time!");
 166  
     }
 167  0
     if (conf.getSplitMasterWorker()) {
 168  0
       throw new IllegalArgumentException(
 169  
           "checkLocalJobRunnerConfiguration: When using " +
 170  
               "LocalJobRunner, you cannot run in split master / worker " +
 171  
           "mode since there is only 1 task at a time!");
 172  
     }
 173  0
   }
 174  
 
 175  
   /**
 176  
    * Check whether a specified int conf value is set and if not, set it.
 177  
    *
 178  
    * @param param Conf value to check
 179  
    * @param defaultValue Assign to value if not set
 180  
    */
 181  
   private void setIntConfIfDefault(String param, int defaultValue) {
 182  0
     if (giraphConfiguration.getInt(param, Integer.MIN_VALUE) ==
 183  
         Integer.MIN_VALUE) {
 184  0
       giraphConfiguration.setInt(param, defaultValue);
 185  
     }
 186  0
   }
 187  
 
 188  
   /**
 189  
    * Runs the actual graph application through Hadoop Map-Reduce.
 190  
    *
 191  
    * @param verbose If true, provide verbose output, false otherwise
 192  
    * @return True if success, false otherwise
 193  
    * @throws ClassNotFoundException
 194  
    * @throws InterruptedException
 195  
    * @throws IOException
 196  
    */
 197  
   public final boolean run(boolean verbose)
 198  
     throws IOException, InterruptedException, ClassNotFoundException {
 199  
     // Most users won't hit this hopefully and can set it higher if desired
 200  0
     setIntConfIfDefault("mapreduce.job.counters.limit", 512);
 201  
 
 202  
     // Capacity scheduler-specific settings.  These should be enough for
 203  
     // a reasonable Giraph job
 204  0
     setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
 205  0
     setIntConfIfDefault("mapred.job.reduce.memory.mb", 0);
 206  
 
 207  
     // Speculative execution doesn't make sense for Giraph
 208  0
     giraphConfiguration.setBoolean(
 209  
         "mapred.map.tasks.speculative.execution", false);
 210  
 
 211  
     // Set the ping interval to 5 minutes instead of one minute
 212  
     // (DEFAULT_PING_INTERVAL)
 213  0
     Client.setPingInterval(giraphConfiguration, 60000 * 5);
 214  
 
 215  
     // Should work in MAPREDUCE-1938 to let the user jars/classes
 216  
     // get loaded first
 217  0
     giraphConfiguration.setBoolean("mapreduce.user.classpath.first", true);
 218  0
     giraphConfiguration.setBoolean("mapreduce.job.user.classpath.first", true);
 219  
 
 220  
     // If the checkpoint frequency is 0 (no failure handling), set the max
 221  
     // tasks attempts to be 1 to encourage faster failure of unrecoverable jobs
 222  0
     if (giraphConfiguration.getCheckpointFrequency() == 0) {
 223  0
       int oldMaxTaskAttempts = giraphConfiguration.getMaxTaskAttempts();
 224  0
       giraphConfiguration.setMaxTaskAttempts(1);
 225  0
       if (LOG.isInfoEnabled()) {
 226  0
         LOG.info("run: Since checkpointing is disabled (default), " +
 227  
             "do not allow any task retries (setting " +
 228  0
             GiraphConstants.MAX_TASK_ATTEMPTS.getKey() + " = 1, " +
 229  
             "old value = " + oldMaxTaskAttempts + ")");
 230  
       }
 231  
     }
 232  
 
 233  
     // Set the job properties, check them, and submit the job
 234  0
     ImmutableClassesGiraphConfiguration conf =
 235  
         new ImmutableClassesGiraphConfiguration(giraphConfiguration);
 236  0
     checkLocalJobRunnerConfiguration(conf);
 237  
 
 238  0
     int tryCount = 0;
 239  0
     GiraphJobRetryChecker retryChecker = conf.getJobRetryChecker();
 240  
     while (true) {
 241  0
       GiraphJobObserver jobObserver = conf.getJobObserver();
 242  
 
 243  0
       JobProgressTrackerService jobProgressTrackerService =
 244  0
           DefaultJobProgressTrackerService.createJobProgressTrackerService(
 245  
               conf, jobObserver);
 246  0
       ClientThriftServer clientThriftServer = null;
 247  0
       if (jobProgressTrackerService != null) {
 248  0
         clientThriftServer = new ClientThriftServer(
 249  0
             conf, ImmutableList.of(jobProgressTrackerService));
 250  
       }
 251  
 
 252  0
       tryCount++;
 253  0
       Job submittedJob = new Job(conf, jobName);
 254  0
       if (submittedJob.getJar() == null) {
 255  0
         submittedJob.setJarByClass(getClass());
 256  
       }
 257  0
       submittedJob.setNumReduceTasks(0);
 258  0
       submittedJob.setMapperClass(GraphMapper.class);
 259  0
       submittedJob.setInputFormatClass(BspInputFormat.class);
 260  0
       submittedJob.setOutputFormatClass(
 261  0
           GiraphConstants.HADOOP_OUTPUT_FORMAT_CLASS.get(conf));
 262  0
       if (jobProgressTrackerService != null) {
 263  0
         jobProgressTrackerService.setJob(submittedJob);
 264  
       }
 265  
 
 266  0
       jobObserver.launchingJob(submittedJob);
 267  0
       submittedJob.submit();
 268  0
       if (LOG.isInfoEnabled()) {
 269  0
         LOG.info("Tracking URL: " + submittedJob.getTrackingURL());
 270  0
         LOG.info(
 271  
             "Waiting for resources... Job will start only when it gets all " +
 272  0
                 (conf.getMinWorkers() + 1) + " mappers");
 273  
       }
 274  0
       jobObserver.jobRunning(submittedJob);
 275  0
       HaltApplicationUtils.printHaltInfo(submittedJob, conf);
 276  
 
 277  0
       boolean passed = submittedJob.waitForCompletion(verbose);
 278  0
       if (jobProgressTrackerService != null) {
 279  0
         jobProgressTrackerService.stop(passed);
 280  
       }
 281  0
       if (clientThriftServer != null) {
 282  0
         clientThriftServer.stopThriftServer();
 283  
       }
 284  
 
 285  0
       jobObserver.jobFinished(submittedJob, passed);
 286  
 
 287  0
       if (!passed) {
 288  0
         String restartFrom = retryChecker.shouldRestartCheckpoint(submittedJob);
 289  0
         if (restartFrom != null) {
 290  0
           GiraphConstants.RESTART_JOB_ID.set(conf, restartFrom);
 291  0
           continue;
 292  
         }
 293  
       }
 294  
 
 295  0
       if (passed || !retryChecker.shouldRetry(submittedJob, tryCount)) {
 296  0
         return passed;
 297  
       }
 298  0
       if (LOG.isInfoEnabled()) {
 299  0
         LOG.info("run: Retrying job, " + tryCount + " try");
 300  
       }
 301  0
     }
 302  
   }
 303  
 }