Coverage Report - org.apache.giraph.conf.GiraphConfiguration
 
Classes in this File Line Coverage Branch Coverage Complexity
GiraphConfiguration
0%
0/229
0%
0/40
1.148
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.conf;
 20  
 
 21  
 import io.netty.buffer.ByteBufAllocator;
 22  
 import io.netty.buffer.PooledByteBufAllocator;
 23  
 import io.netty.buffer.UnpooledByteBufAllocator;
 24  
 
 25  
 import java.net.InetAddress;
 26  
 import java.net.UnknownHostException;
 27  
 
 28  
 import org.apache.giraph.aggregators.AggregatorWriter;
 29  
 import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
 30  
 import org.apache.giraph.combiner.MessageCombiner;
 31  
 import org.apache.giraph.edge.OutEdges;
 32  
 import org.apache.giraph.edge.ReuseObjectsOutEdges;
 33  
 import org.apache.giraph.factories.ComputationFactory;
 34  
 import org.apache.giraph.factories.VertexValueFactory;
 35  
 import org.apache.giraph.graph.Computation;
 36  
 import org.apache.giraph.graph.MapperObserver;
 37  
 import org.apache.giraph.graph.Vertex;
 38  
 import org.apache.giraph.graph.VertexResolver;
 39  
 import org.apache.giraph.graph.VertexValueCombiner;
 40  
 import org.apache.giraph.io.EdgeInputFormat;
 41  
 import org.apache.giraph.io.EdgeOutputFormat;
 42  
 import org.apache.giraph.io.MappingInputFormat;
 43  
 import org.apache.giraph.io.VertexInputFormat;
 44  
 import org.apache.giraph.io.VertexOutputFormat;
 45  
 import org.apache.giraph.io.filters.EdgeInputFilter;
 46  
 import org.apache.giraph.io.filters.VertexInputFilter;
 47  
 import org.apache.giraph.job.GiraphJobObserver;
 48  
 import org.apache.giraph.job.GiraphJobRetryChecker;
 49  
 import org.apache.giraph.master.MasterCompute;
 50  
 import org.apache.giraph.master.MasterObserver;
 51  
 import org.apache.giraph.partition.GraphPartitionerFactory;
 52  
 import org.apache.giraph.partition.Partition;
 53  
 import org.apache.giraph.partition.ReusesObjectsPartition;
 54  
 import org.apache.giraph.utils.GcObserver;
 55  
 import org.apache.giraph.utils.ReflectionUtils;
 56  
 import org.apache.giraph.worker.WorkerContext;
 57  
 import org.apache.giraph.worker.WorkerObserver;
 58  
 import org.apache.hadoop.conf.Configuration;
 59  
 import org.apache.hadoop.mapreduce.Mapper;
 60  
 import org.apache.hadoop.net.DNS;
 61  
 
 62  
 /**
 63  
  * Adds user methods specific to Giraph.  This will be put into an
 64  
  * ImmutableClassesGiraphConfiguration that provides the configuration plus
 65  
  * the immutable classes.
 66  
  *
 67  
  * Keeps track of parameters which were set so it easily set them in another
 68  
  * copy of configuration.
 69  
  */
 70  
 public class GiraphConfiguration extends Configuration
 71  
     implements GiraphConstants {
 72  
   /** ByteBufAllocator to be used by netty */
 73  0
   private ByteBufAllocator nettyBufferAllocator = null;
 74  
 
 75  
   /**
 76  
    * Constructor that creates the configuration
 77  
    */
 78  0
   public GiraphConfiguration() {
 79  0
     configureHadoopSecurity();
 80  0
   }
 81  
 
 82  
   /**
 83  
    * Constructor.
 84  
    *
 85  
    * @param conf Configuration
 86  
    */
 87  
   public GiraphConfiguration(Configuration conf) {
 88  0
     super(conf);
 89  0
     configureHadoopSecurity();
 90  0
   }
 91  
 
 92  
   /**
 93  
    * Get name of computation being run. We leave this up to the
 94  
    * {@link ComputationFactory} to decide what to return.
 95  
    *
 96  
    * @return Name of computation being run
 97  
    */
 98  
   public String getComputationName() {
 99  0
     ComputationFactory compFactory = ReflectionUtils.newInstance(
 100  0
         getComputationFactoryClass());
 101  0
     return compFactory.computationName(this);
 102  
   }
 103  
 
 104  
   /**
 105  
    * Get the user's subclassed {@link ComputationFactory}
 106  
    *
 107  
    * @return User's computation factory class
 108  
    */
 109  
   public Class<? extends ComputationFactory> getComputationFactoryClass() {
 110  0
     return COMPUTATION_FACTORY_CLASS.get(this);
 111  
   }
 112  
 
 113  
   /**
 114  
    * Get the user's subclassed {@link Computation}
 115  
    *
 116  
    * @return User's computation class
 117  
    */
 118  
   public Class<? extends Computation> getComputationClass() {
 119  0
     return COMPUTATION_CLASS.get(this);
 120  
   }
 121  
 
 122  
   /**
 123  
    * Set the computation class (required)
 124  
    *
 125  
    * @param computationClass Runs vertex computation
 126  
    */
 127  
   public void setComputationClass(
 128  
       Class<? extends Computation> computationClass) {
 129  0
     COMPUTATION_CLASS.set(this, computationClass);
 130  0
   }
 131  
 
 132  
   /**
 133  
    * Set the vertex value factory class
 134  
    *
 135  
    * @param vertexValueFactoryClass Creates default vertex values
 136  
    */
 137  
   public final void setVertexValueFactoryClass(
 138  
       Class<? extends VertexValueFactory> vertexValueFactoryClass) {
 139  0
     VERTEX_VALUE_FACTORY_CLASS.set(this, vertexValueFactoryClass);
 140  0
   }
 141  
 
 142  
   /**
 143  
    * Set the edge input filter class
 144  
    *
 145  
    * @param edgeFilterClass class to use
 146  
    */
 147  
   public void setEdgeInputFilterClass(
 148  
       Class<? extends EdgeInputFilter> edgeFilterClass) {
 149  0
     EDGE_INPUT_FILTER_CLASS.set(this, edgeFilterClass);
 150  0
   }
 151  
 
 152  
   /**
 153  
    * Set the vertex input filter class
 154  
    *
 155  
    * @param vertexFilterClass class to use
 156  
    */
 157  
   public void setVertexInputFilterClass(
 158  
       Class<? extends VertexInputFilter> vertexFilterClass) {
 159  0
     VERTEX_INPUT_FILTER_CLASS.set(this, vertexFilterClass);
 160  0
   }
 161  
 
 162  
   /**
 163  
    * Get the vertex edges class
 164  
    *
 165  
    * @return vertex edges class
 166  
    */
 167  
   public Class<? extends OutEdges> getOutEdgesClass() {
 168  0
     return VERTEX_EDGES_CLASS.get(this);
 169  
   }
 170  
 
 171  
   /**
 172  
    * Set the vertex edges class
 173  
    *
 174  
    * @param outEdgesClass Determines the way edges are stored
 175  
    */
 176  
   public final void setOutEdgesClass(
 177  
       Class<? extends OutEdges> outEdgesClass) {
 178  0
     VERTEX_EDGES_CLASS.set(this, outEdgesClass);
 179  0
   }
 180  
 
 181  
   /**
 182  
    * Set the vertex implementation class
 183  
    *
 184  
    * @param vertexClass class of the vertex implementation
 185  
    */
 186  
   public final void setVertexClass(Class<? extends Vertex> vertexClass) {
 187  0
     VERTEX_CLASS.set(this, vertexClass);
 188  0
   }
 189  
 
 190  
 
 191  
   /**
 192  
    * Set the vertex edges class used during edge-based input (if different
 193  
    * from the one used during computation)
 194  
    *
 195  
    * @param inputOutEdgesClass Determines the way edges are stored
 196  
    */
 197  
   public final void setInputOutEdgesClass(
 198  
       Class<? extends OutEdges> inputOutEdgesClass) {
 199  0
     INPUT_VERTEX_EDGES_CLASS.set(this, inputOutEdgesClass);
 200  0
   }
 201  
 
 202  
   /**
 203  
    * True if the {@link org.apache.giraph.edge.OutEdges} implementation
 204  
    * copies the passed edges to its own data structure,
 205  
    * i.e. it doesn't keep references to Edge objects, target vertex ids or edge
 206  
    * values passed to add() or initialize().
 207  
    * This makes it possible to reuse edge objects passed to the data
 208  
    * structure, to minimize object instantiation (see for example
 209  
    * EdgeStore#addPartitionEdges()).
 210  
    *
 211  
    * @return True iff we can reuse the edge objects
 212  
    */
 213  
   public boolean reuseEdgeObjects() {
 214  0
     return ReuseObjectsOutEdges.class.isAssignableFrom(
 215  0
         getOutEdgesClass());
 216  
   }
 217  
 
 218  
   /**
 219  
    * True if the {@link Partition} implementation copies the passed vertices
 220  
    * to its own data structure, i.e. it doesn't keep references to Vertex
 221  
    * objects passed to it.
 222  
    * This makes it possible to reuse vertex objects passed to the data
 223  
    * structure, to minimize object instantiation.
 224  
    *
 225  
    * @return True iff we can reuse the vertex objects
 226  
    */
 227  
   public boolean reuseVertexObjects() {
 228  0
     return ReusesObjectsPartition.class.isAssignableFrom(getPartitionClass());
 229  
   }
 230  
 
 231  
   /**
 232  
    * Get Partition class used
 233  
    * @return Partition class
 234  
    */
 235  
   public Class<? extends Partition> getPartitionClass() {
 236  0
     return PARTITION_CLASS.get(this);
 237  
   }
 238  
 
 239  
   /**
 240  
    * Does the job have a {@link VertexInputFormat}?
 241  
    *
 242  
    * @return True iff a {@link VertexInputFormat} has been specified.
 243  
    */
 244  
   public boolean hasVertexInputFormat() {
 245  0
     return VERTEX_INPUT_FORMAT_CLASS.get(this) != null;
 246  
   }
 247  
 
 248  
   /**
 249  
    * Set the vertex input format class (required)
 250  
    *
 251  
    * @param vertexInputFormatClass Determines how graph is input
 252  
    */
 253  
   public void setVertexInputFormatClass(
 254  
       Class<? extends VertexInputFormat> vertexInputFormatClass) {
 255  0
     VERTEX_INPUT_FORMAT_CLASS.set(this, vertexInputFormatClass);
 256  0
   }
 257  
 
 258  
   /**
 259  
    * Does the job have a {@link EdgeInputFormat}?
 260  
    *
 261  
    * @return True iff a {@link EdgeInputFormat} has been specified.
 262  
    */
 263  
   public boolean hasEdgeInputFormat() {
 264  0
     return EDGE_INPUT_FORMAT_CLASS.get(this) != null;
 265  
   }
 266  
 
 267  
   /**
 268  
    * Set the edge input format class (required)
 269  
    *
 270  
    * @param edgeInputFormatClass Determines how graph is input
 271  
    */
 272  
   public void setEdgeInputFormatClass(
 273  
       Class<? extends EdgeInputFormat> edgeInputFormatClass) {
 274  0
     EDGE_INPUT_FORMAT_CLASS.set(this, edgeInputFormatClass);
 275  0
   }
 276  
 
 277  
   /**
 278  
    * Set the mapping input format class (optional)
 279  
    *
 280  
    * @param mappingInputFormatClass Determines how mappings are input
 281  
    */
 282  
   public void setMappingInputFormatClass(
 283  
     Class<? extends MappingInputFormat> mappingInputFormatClass) {
 284  0
     MAPPING_INPUT_FORMAT_CLASS.set(this, mappingInputFormatClass);
 285  0
   }
 286  
 
 287  
   /**
 288  
    * Set the master class (optional)
 289  
    *
 290  
    * @param masterComputeClass Runs master computation
 291  
    */
 292  
   public final void setMasterComputeClass(
 293  
       Class<? extends MasterCompute> masterComputeClass) {
 294  0
     MASTER_COMPUTE_CLASS.set(this, masterComputeClass);
 295  0
   }
 296  
 
 297  
   /**
 298  
    * Add a MasterObserver class (optional)
 299  
    *
 300  
    * @param masterObserverClass MasterObserver class to add.
 301  
    */
 302  
   public final void addMasterObserverClass(
 303  
       Class<? extends MasterObserver> masterObserverClass) {
 304  0
     MASTER_OBSERVER_CLASSES.add(this, masterObserverClass);
 305  0
   }
 306  
 
 307  
   /**
 308  
    * Add a WorkerObserver class (optional)
 309  
    *
 310  
    * @param workerObserverClass WorkerObserver class to add.
 311  
    */
 312  
   public final void addWorkerObserverClass(
 313  
       Class<? extends WorkerObserver> workerObserverClass) {
 314  0
     WORKER_OBSERVER_CLASSES.add(this, workerObserverClass);
 315  0
   }
 316  
 
 317  
   /**
 318  
    * Add a MapperObserver class (optional)
 319  
    *
 320  
    * @param mapperObserverClass MapperObserver class to add.
 321  
    */
 322  
   public final void addMapperObserverClass(
 323  
       Class<? extends MapperObserver> mapperObserverClass) {
 324  0
     MAPPER_OBSERVER_CLASSES.add(this, mapperObserverClass);
 325  0
   }
 326  
 
 327  
   /**
 328  
    * Add a GcObserver class (optional)
 329  
    *
 330  
    * @param gcObserverClass GcObserver class to add.
 331  
    */
 332  
   public final void addGcObserverClass(
 333  
       Class<? extends GcObserver> gcObserverClass) {
 334  0
     GC_OBSERVER_CLASSES.add(this, gcObserverClass);
 335  0
   }
 336  
 
 337  
   /**
 338  
    * Get job observer class
 339  
    *
 340  
    * @return GiraphJobObserver class set.
 341  
    */
 342  
   public Class<? extends GiraphJobObserver> getJobObserverClass() {
 343  0
     return JOB_OBSERVER_CLASS.get(this);
 344  
   }
 345  
 
 346  
   /**
 347  
    * Set job observer class
 348  
    *
 349  
    * @param klass GiraphJobObserver class to set.
 350  
    */
 351  
   public void setJobObserverClass(Class<? extends GiraphJobObserver> klass) {
 352  0
     JOB_OBSERVER_CLASS.set(this, klass);
 353  0
   }
 354  
 
 355  
   /**
 356  
    * Get job retry checker class
 357  
    *
 358  
    * @return GiraphJobRetryChecker class set.
 359  
    */
 360  
   public Class<? extends GiraphJobRetryChecker> getJobRetryCheckerClass() {
 361  0
     return JOB_RETRY_CHECKER_CLASS.get(this);
 362  
   }
 363  
 
 364  
   /**
 365  
    * Set job retry checker class
 366  
    *
 367  
    * @param klass GiraphJobRetryChecker class to set.
 368  
    */
 369  
   public void setJobRetryCheckerClass(
 370  
       Class<? extends GiraphJobRetryChecker> klass) {
 371  0
     JOB_RETRY_CHECKER_CLASS.set(this, klass);
 372  0
   }
 373  
 
 374  
   /**
 375  
    * Check whether to enable jmap dumping thread.
 376  
    *
 377  
    * @return true if jmap dumper is enabled.
 378  
    */
 379  
   public boolean isJMapHistogramDumpEnabled() {
 380  0
     return JMAP_ENABLE.get(this);
 381  
   }
 382  
 
 383  
   /**
 384  
    * Check whether to enable heap memory supervisor thread
 385  
    *
 386  
    * @return true if jmap dumper is reactively enabled
 387  
    */
 388  
   public boolean isReactiveJmapHistogramDumpEnabled() {
 389  0
     return REACTIVE_JMAP_ENABLE.get(this);
 390  
   }
 391  
 
 392  
   /**
 393  
    * Set mapping from a key name to a list of classes.
 394  
    *
 395  
    * @param name String key name to use.
 396  
    * @param xface interface of the classes being set.
 397  
    * @param klasses Classes to set.
 398  
    */
 399  
   public final void setClasses(String name, Class<?> xface,
 400  
                                Class<?> ... klasses) {
 401  0
     String[] klassNames = new String[klasses.length];
 402  0
     for (int i = 0; i < klasses.length; ++i) {
 403  0
       Class<?> klass = klasses[i];
 404  0
       if (!xface.isAssignableFrom(klass)) {
 405  0
         throw new RuntimeException(klass + " does not implement " +
 406  0
             xface.getName());
 407  
       }
 408  0
       klassNames[i] = klasses[i].getName();
 409  
     }
 410  0
     setStrings(name, klassNames);
 411  0
   }
 412  
 
 413  
   /**
 414  
    * Does the job have a {@link VertexOutputFormat}?
 415  
    *
 416  
    * @return True iff a {@link VertexOutputFormat} has been specified.
 417  
    */
 418  
   public boolean hasVertexOutputFormat() {
 419  0
     return VERTEX_OUTPUT_FORMAT_CLASS.get(this) != null;
 420  
   }
 421  
 
 422  
   /**
 423  
    * Set the vertex output format class (optional)
 424  
    *
 425  
    * @param vertexOutputFormatClass Determines how graph is output
 426  
    */
 427  
   public final void setVertexOutputFormatClass(
 428  
       Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
 429  0
     VERTEX_OUTPUT_FORMAT_CLASS.set(this, vertexOutputFormatClass);
 430  0
   }
 431  
 
 432  
 
 433  
   /**
 434  
    * Does the job have a {@link EdgeOutputFormat} subdir?
 435  
    *
 436  
    * @return True iff a {@link EdgeOutputFormat} subdir has been specified.
 437  
    */
 438  
   public boolean hasVertexOutputFormatSubdir() {
 439  0
     return !VERTEX_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty();
 440  
   }
 441  
 
 442  
   /**
 443  
    * Set the vertex output format path
 444  
    *
 445  
    * @param path path where the verteces will be written
 446  
    */
 447  
   public final void setVertexOutputFormatSubdir(String path) {
 448  0
     VERTEX_OUTPUT_FORMAT_SUBDIR.set(this, path);
 449  0
   }
 450  
 
 451  
   /**
 452  
    * Check if output should be done during computation
 453  
    *
 454  
    * @return True iff output should be done during computation
 455  
    */
 456  
   public final boolean doOutputDuringComputation() {
 457  0
     return DO_OUTPUT_DURING_COMPUTATION.get(this);
 458  
   }
 459  
 
 460  
   /**
 461  
    * Set whether or not we should do output during computation
 462  
    *
 463  
    * @param doOutputDuringComputation True iff we want output to happen
 464  
    *                                  during computation
 465  
    */
 466  
   public final void setDoOutputDuringComputation(
 467  
       boolean doOutputDuringComputation) {
 468  0
     DO_OUTPUT_DURING_COMPUTATION.set(this, doOutputDuringComputation);
 469  0
   }
 470  
 
 471  
   /**
 472  
    * Check if VertexOutputFormat is thread-safe
 473  
    *
 474  
    * @return True iff VertexOutputFormat is thread-safe
 475  
    */
 476  
   public final boolean vertexOutputFormatThreadSafe() {
 477  0
     return VERTEX_OUTPUT_FORMAT_THREAD_SAFE.get(this);
 478  
   }
 479  
 
 480  
   /**
 481  
    * Set whether or not selected VertexOutputFormat is thread-safe
 482  
    *
 483  
    * @param vertexOutputFormatThreadSafe True iff selected VertexOutputFormat
 484  
    *                                     is thread-safe
 485  
    */
 486  
   public final void setVertexOutputFormatThreadSafe(
 487  
       boolean vertexOutputFormatThreadSafe) {
 488  0
     VERTEX_OUTPUT_FORMAT_THREAD_SAFE.set(this, vertexOutputFormatThreadSafe);
 489  0
   }
 490  
 
 491  
   /**
 492  
    * Does the job have a {@link EdgeOutputFormat}?
 493  
    *
 494  
    * @return True iff a {@link EdgeOutputFormat} has been specified.
 495  
    */
 496  
   public boolean hasEdgeOutputFormat() {
 497  0
     return EDGE_OUTPUT_FORMAT_CLASS.get(this) != null;
 498  
   }
 499  
 
 500  
   /**
 501  
    * Set the edge output format class (optional)
 502  
    *
 503  
    * @param edgeOutputFormatClass Determines how graph is output
 504  
    */
 505  
   public final void setEdgeOutputFormatClass(
 506  
       Class<? extends EdgeOutputFormat> edgeOutputFormatClass) {
 507  0
     EDGE_OUTPUT_FORMAT_CLASS.set(this, edgeOutputFormatClass);
 508  0
   }
 509  
 
 510  
   /**
 511  
    * Does the job have a {@link EdgeOutputFormat} subdir?
 512  
    *
 513  
    * @return True iff a {@link EdgeOutputFormat} subdir has been specified.
 514  
    */
 515  
   public boolean hasEdgeOutputFormatSubdir() {
 516  0
     return !EDGE_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty();
 517  
   }
 518  
 
 519  
   /**
 520  
    * Set the edge output format path
 521  
    *
 522  
    * @param path path where the edges will be written
 523  
    */
 524  
   public final void setEdgeOutputFormatSubdir(String path) {
 525  0
     EDGE_OUTPUT_FORMAT_SUBDIR.set(this, path);
 526  0
   }
 527  
 
 528  
   /**
 529  
    * Get the number of threads to use for writing output in the end of the
 530  
    * application. If output format is not thread safe, returns 1.
 531  
    *
 532  
    * @return Number of output threads
 533  
    */
 534  
   public final int getNumOutputThreads() {
 535  0
     if (!vertexOutputFormatThreadSafe()) {
 536  0
       return 1;
 537  
     } else {
 538  0
       return NUM_OUTPUT_THREADS.get(this);
 539  
     }
 540  
   }
 541  
 
 542  
   /**
 543  
    * Set the number of threads to use for writing output in the end of the
 544  
    * application. Will be used only if {#vertexOutputFormatThreadSafe} is true.
 545  
    *
 546  
    * @param numOutputThreads Number of output threads
 547  
    */
 548  
   public void setNumOutputThreads(int numOutputThreads) {
 549  0
     NUM_OUTPUT_THREADS.set(this, numOutputThreads);
 550  0
   }
 551  
 
 552  
   /**
 553  
    * Set the message combiner class (optional)
 554  
    *
 555  
    * @param messageCombinerClass Determines how vertex messages are combined
 556  
    */
 557  
   public void setMessageCombinerClass(
 558  
       Class<? extends MessageCombiner> messageCombinerClass) {
 559  0
     MESSAGE_COMBINER_CLASS.set(this, messageCombinerClass);
 560  0
   }
 561  
 
 562  
   /**
 563  
    * Set the graph partitioner class (optional)
 564  
    *
 565  
    * @param graphPartitionerFactoryClass Determines how the graph is partitioned
 566  
    */
 567  
   public final void setGraphPartitionerFactoryClass(
 568  
       Class<? extends GraphPartitionerFactory> graphPartitionerFactoryClass) {
 569  0
     GRAPH_PARTITIONER_FACTORY_CLASS.set(this, graphPartitionerFactoryClass);
 570  0
   }
 571  
 
 572  
   /**
 573  
    * Set the vertex resolver class (optional)
 574  
    *
 575  
    * @param vertexResolverClass Determines how vertex mutations are resolved
 576  
    */
 577  
   public final void setVertexResolverClass(
 578  
       Class<? extends VertexResolver> vertexResolverClass) {
 579  0
     VERTEX_RESOLVER_CLASS.set(this, vertexResolverClass);
 580  0
   }
 581  
 
 582  
   /**
 583  
    * Whether to create a vertex that doesn't exist when it receives messages.
 584  
    * This only affects DefaultVertexResolver.
 585  
    *
 586  
    * @return true if we should create non existent vertices that get messages.
 587  
    */
 588  
   public final boolean getResolverCreateVertexOnMessages() {
 589  0
     return RESOLVER_CREATE_VERTEX_ON_MSGS.get(this);
 590  
   }
 591  
 
 592  
   /**
 593  
    * Set whether to create non existent vertices when they receive messages.
 594  
    *
 595  
    * @param v true if we should create vertices when they get messages.
 596  
    */
 597  
   public final void setResolverCreateVertexOnMessages(boolean v) {
 598  0
     RESOLVER_CREATE_VERTEX_ON_MSGS.set(this, v);
 599  0
   }
 600  
 
 601  
   /**
 602  
    * Set the vertex value combiner class (optional)
 603  
    *
 604  
    * @param vertexValueCombinerClass Determines how vertices are combined
 605  
    */
 606  
   public final void setVertexValueCombinerClass(
 607  
       Class<? extends VertexValueCombiner> vertexValueCombinerClass) {
 608  0
     VERTEX_VALUE_COMBINER_CLASS.set(this, vertexValueCombinerClass);
 609  0
   }
 610  
 
 611  
   /**
 612  
    * Set the worker context class (optional)
 613  
    *
 614  
    * @param workerContextClass Determines what code is executed on a each
 615  
    *        worker before and after each superstep and computation
 616  
    */
 617  
   public final void setWorkerContextClass(
 618  
       Class<? extends WorkerContext> workerContextClass) {
 619  0
     WORKER_CONTEXT_CLASS.set(this, workerContextClass);
 620  0
   }
 621  
 
 622  
   /**
 623  
    * Set the aggregator writer class (optional)
 624  
    *
 625  
    * @param aggregatorWriterClass Determines how the aggregators are
 626  
    *        written to file at the end of the job
 627  
    */
 628  
   public final void setAggregatorWriterClass(
 629  
       Class<? extends AggregatorWriter> aggregatorWriterClass) {
 630  0
     AGGREGATOR_WRITER_CLASS.set(this, aggregatorWriterClass);
 631  0
   }
 632  
 
 633  
   /**
 634  
    * Set the partition class (optional)
 635  
    *
 636  
    * @param partitionClass Determines the why partitions are stored
 637  
    */
 638  
   public final void setPartitionClass(
 639  
       Class<? extends Partition> partitionClass) {
 640  0
     PARTITION_CLASS.set(this, partitionClass);
 641  0
   }
 642  
 
 643  
   /**
 644  
    * Set worker configuration for determining what is required for
 645  
    * a superstep.
 646  
    *
 647  
    * @param minWorkers Minimum workers to do a superstep
 648  
    * @param maxWorkers Maximum workers to do a superstep
 649  
    *        (max map tasks in job)
 650  
    * @param minPercentResponded 0 - 100 % of the workers required to
 651  
    *        have responded before continuing the superstep
 652  
    */
 653  
   public final void setWorkerConfiguration(int minWorkers,
 654  
                                            int maxWorkers,
 655  
                                            float minPercentResponded) {
 656  0
     setInt(MIN_WORKERS, minWorkers);
 657  0
     setInt(MAX_WORKERS, maxWorkers);
 658  0
     MIN_PERCENT_RESPONDED.set(this, minPercentResponded);
 659  0
   }
 660  
 
 661  
   public final int getMinWorkers() {
 662  0
     return getInt(MIN_WORKERS, -1);
 663  
   }
 664  
 
 665  
   public final int getMaxWorkers() {
 666  0
     return getInt(MAX_WORKERS, -1);
 667  
   }
 668  
 
 669  
   public final float getMinPercentResponded() {
 670  0
     return MIN_PERCENT_RESPONDED.get(this);
 671  
   }
 672  
 
 673  
   /**
 674  
    * How many mappers is job asking for, taking into account whether master
 675  
    * is running on the same mapper as worker or not
 676  
    *
 677  
    * @return How many mappers is job asking for
 678  
    */
 679  
   public final int getMaxMappers() {
 680  0
     return getMaxWorkers() + (SPLIT_MASTER_WORKER.get(this) ? 1 : 0);
 681  
   }
 682  
 
 683  
   /**
 684  
    * Utilize an existing ZooKeeper service.  If this is not set, ZooKeeper
 685  
    * will be dynamically started by Giraph for this job.
 686  
    *
 687  
    * @param serverList Comma separated list of servers and ports
 688  
    *        (i.e. zk1:2221,zk2:2221)
 689  
    */
 690  
   public final void setZooKeeperConfiguration(String serverList) {
 691  0
     ZOOKEEPER_LIST.set(this, serverList);
 692  0
   }
 693  
 
 694  
   /**
 695  
    * Getter for SPLIT_MASTER_WORKER flag.
 696  
    *
 697  
    * @return boolean flag value.
 698  
    */
 699  
   public final boolean getSplitMasterWorker() {
 700  0
     return SPLIT_MASTER_WORKER.get(this);
 701  
   }
 702  
 
 703  
   /**
 704  
    * Get array of MasterObserver classes set in the configuration.
 705  
    *
 706  
    * @return array of MasterObserver classes.
 707  
    */
 708  
   public Class<? extends MasterObserver>[] getMasterObserverClasses() {
 709  0
     return MASTER_OBSERVER_CLASSES.getArray(this);
 710  
   }
 711  
 
 712  
   /**
 713  
    * Get array of WorkerObserver classes set in configuration.
 714  
    *
 715  
    * @return array of WorkerObserver classes.
 716  
    */
 717  
   public Class<? extends WorkerObserver>[] getWorkerObserverClasses() {
 718  0
     return WORKER_OBSERVER_CLASSES.getArray(this);
 719  
   }
 720  
 
 721  
   /**
 722  
    * Get array of MapperObserver classes set in configuration.
 723  
    *
 724  
    * @return array of MapperObserver classes.
 725  
    */
 726  
   public Class<? extends MapperObserver>[] getMapperObserverClasses() {
 727  0
     return MAPPER_OBSERVER_CLASSES.getArray(this);
 728  
   }
 729  
 
 730  
   /**
 731  
    * Get array of GcObserver classes set in configuration.
 732  
    *
 733  
    * @return array of GcObserver classes.
 734  
    */
 735  
   public Class<? extends GcObserver>[] getGcObserverClasses() {
 736  0
     return GC_OBSERVER_CLASSES.getArray(this);
 737  
   }
 738  
 
 739  
   /**
 740  
    * Whether to track, print, and aggregate metrics.
 741  
    *
 742  
    * @return true if metrics are enabled, false otherwise (default)
 743  
    */
 744  
   public boolean metricsEnabled() {
 745  0
     return METRICS_ENABLE.isTrue(this);
 746  
   }
 747  
 
 748  
   /**
 749  
    * Get the task partition
 750  
    *
 751  
    * @return The task partition or -1 if not set
 752  
    */
 753  
   public int getTaskPartition() {
 754  0
     return getInt("mapred.task.partition", -1);
 755  
   }
 756  
 
 757  
   /**
 758  
    * Is this a "pure YARN" Giraph job, or is a MapReduce layer (v1 or v2)
 759  
    * actually managing our cluster nodes, i.e. each task is a Mapper.
 760  
    *
 761  
    * @return TRUE if this is a pure YARN job.
 762  
    */
 763  
   public boolean isPureYarnJob() {
 764  0
     return IS_PURE_YARN_JOB.get(this);
 765  
   }
 766  
 
 767  
   /**
 768  
    * Jars required in "Pure YARN" jobs (names only, no paths) should
 769  
    * be listed here in full, including Giraph framework jar(s).
 770  
    *
 771  
    * @return the comma-separated list of jar names for export to cluster.
 772  
    */
 773  
   public String getYarnLibJars() {
 774  0
     return GIRAPH_YARN_LIBJARS.get(this);
 775  
   }
 776  
 
 777  
   /**
 778  
    * Populate jar list for Pure YARN jobs.
 779  
    *
 780  
    * @param jarList a comma-separated list of jar names
 781  
    */
 782  
   public void setYarnLibJars(String jarList) {
 783  0
     GIRAPH_YARN_LIBJARS.set(this, jarList);
 784  0
   }
 785  
 
 786  
   /**
 787  
    * Get heap size (in MB) for each task in our Giraph job run,
 788  
    * assuming this job will run on the "pure YARN" profile.
 789  
    *
 790  
    * @return the heap size for all tasks, in MB
 791  
    */
 792  
   public int getYarnTaskHeapMb() {
 793  0
     return GIRAPH_YARN_TASK_HEAP_MB.get(this);
 794  
   }
 795  
 
 796  
   /**
 797  
    * Set heap size for Giraph tasks in our job run, assuming
 798  
    * the job will run on the "pure YARN" profile.
 799  
    *
 800  
    * @param heapMb the heap size for all tasks
 801  
    */
 802  
   public void setYarnTaskHeapMb(int heapMb) {
 803  0
     GIRAPH_YARN_TASK_HEAP_MB.set(this, heapMb);
 804  0
   }
 805  
 
 806  
   /**
 807  
    * Get the ZooKeeper list.
 808  
    *
 809  
    * @return ZooKeeper list of strings, comma separated or null if none set.
 810  
    */
 811  
   public String getZookeeperList() {
 812  0
     return ZOOKEEPER_LIST.get(this);
 813  
   }
 814  
 
 815  
   /**
 816  
    * Set the ZooKeeper list to the provided list. This method is used when the
 817  
    * ZooKeeper is started internally and will set the zkIsExternal option to
 818  
    * false as well.
 819  
    *
 820  
    * @param zkList list of strings, comma separated of zookeeper servers
 821  
    */
 822  
   public void setZookeeperList(String zkList) {
 823  0
     ZOOKEEPER_LIST.set(this, zkList);
 824  0
     ZOOKEEPER_IS_EXTERNAL.set(this, false);
 825  0
   }
 826  
 
 827  
   /**
 828  
    * Was ZooKeeper provided externally?
 829  
    *
 830  
    * @return true iff was zookeeper is external
 831  
    */
 832  
   public boolean isZookeeperExternal() {
 833  0
     return ZOOKEEPER_IS_EXTERNAL.get(this);
 834  
   }
 835  
 
 836  
   public String getLocalLevel() {
 837  0
     return LOG_LEVEL.get(this);
 838  
   }
 839  
 
 840  
   /**
 841  
    * Use the log thread layout option?
 842  
    *
 843  
    * @return True if use the log thread layout option, false otherwise
 844  
    */
 845  
   public boolean useLogThreadLayout() {
 846  0
     return LOG_THREAD_LAYOUT.get(this);
 847  
   }
 848  
 
 849  
   /**
 850  
    * is this job run a local test?
 851  
    *
 852  
    * @return the test status as recorded in the Configuration
 853  
    */
 854  
   public boolean getLocalTestMode() {
 855  0
     return LOCAL_TEST_MODE.get(this);
 856  
   }
 857  
 
 858  
   /**
 859  
    * Flag this job as a local test run.
 860  
    *
 861  
    * @param flag the test status for this job
 862  
    */
 863  
   public void setLocalTestMode(boolean flag) {
 864  0
     LOCAL_TEST_MODE.set(this, flag);
 865  0
   }
 866  
 
 867  
   public int getZooKeeperSessionTimeout() {
 868  0
     return ZOOKEEPER_SESSION_TIMEOUT.get(this);
 869  
   }
 870  
 
 871  
   public int getZookeeperOpsMaxAttempts() {
 872  0
     return ZOOKEEPER_OPS_MAX_ATTEMPTS.get(this);
 873  
   }
 874  
 
 875  
   public int getZookeeperOpsRetryWaitMsecs() {
 876  0
     return ZOOKEEPER_OPS_RETRY_WAIT_MSECS.get(this);
 877  
   }
 878  
 
 879  
   public boolean getNettyServerUseExecutionHandler() {
 880  0
     return NETTY_SERVER_USE_EXECUTION_HANDLER.get(this);
 881  
   }
 882  
 
 883  
   public int getNettyServerThreads() {
 884  0
     return NETTY_SERVER_THREADS.get(this);
 885  
   }
 886  
 
 887  
   public int getNettyServerExecutionThreads() {
 888  0
     return NETTY_SERVER_EXECUTION_THREADS.get(this);
 889  
   }
 890  
 
 891  
   /**
 892  
    * Get the netty server execution concurrency.  This depends on whether the
 893  
    * netty server execution handler exists.
 894  
    *
 895  
    * @return Server concurrency
 896  
    */
 897  
   public int getNettyServerExecutionConcurrency() {
 898  0
     if (getNettyServerUseExecutionHandler()) {
 899  0
       return getNettyServerExecutionThreads();
 900  
     } else {
 901  0
       return getNettyServerThreads();
 902  
     }
 903  
   }
 904  
 
 905  
   /**
 906  
    * Used by netty client and server to create ByteBufAllocator
 907  
    *
 908  
    * @return ByteBufAllocator
 909  
    */
 910  
   public ByteBufAllocator getNettyAllocator() {
 911  0
     if (nettyBufferAllocator == null) {
 912  0
       if (NETTY_USE_POOLED_ALLOCATOR.get(this)) { // Use pooled allocator
 913  0
         nettyBufferAllocator = new PooledByteBufAllocator(
 914  0
           NETTY_USE_DIRECT_MEMORY.get(this));
 915  
       } else { // Use un-pooled allocator
 916  
         // Note: Current default settings create un-pooled heap allocator
 917  0
         nettyBufferAllocator = new UnpooledByteBufAllocator(
 918  0
             NETTY_USE_DIRECT_MEMORY.get(this));
 919  
       }
 920  
     }
 921  0
     return nettyBufferAllocator;
 922  
   }
 923  
 
 924  
   public int getZookeeperConnectionAttempts() {
 925  0
     return ZOOKEEPER_CONNECTION_ATTEMPTS.get(this);
 926  
   }
 927  
 
 928  
   public int getZooKeeperMinSessionTimeout() {
 929  0
     return ZOOKEEPER_MIN_SESSION_TIMEOUT.get(this);
 930  
   }
 931  
 
 932  
   public int getZooKeeperMaxSessionTimeout() {
 933  0
     return ZOOKEEPER_MAX_SESSION_TIMEOUT.get(this);
 934  
   }
 935  
 
 936  
   /**
 937  
    * Get the number of map tasks in this job
 938  
    *
 939  
    * @return Number of map tasks in this job
 940  
    */
 941  
   public int getMapTasks() {
 942  0
     int mapTasks = getInt("mapred.map.tasks", -1);
 943  0
     if (mapTasks == -1) {
 944  0
       throw new IllegalStateException("getMapTasks: Failed to get the map " +
 945  
           "tasks!");
 946  
     }
 947  0
     return mapTasks;
 948  
   }
 949  
 
 950  
   /**
 951  
    * Use authentication? (if supported)
 952  
    *
 953  
    * @return True if should authenticate, false otherwise
 954  
    */
 955  
   public boolean authenticate() {
 956  0
     return AUTHENTICATE.get(this);
 957  
   }
 958  
 
 959  
   /**
 960  
    * Set the number of compute threads
 961  
    *
 962  
    * @param numComputeThreads Number of compute threads to use
 963  
    */
 964  
   public void setNumComputeThreads(int numComputeThreads) {
 965  0
     NUM_COMPUTE_THREADS.set(this, numComputeThreads);
 966  0
   }
 967  
 
 968  
   public int getNumComputeThreads() {
 969  0
     return NUM_COMPUTE_THREADS.get(this);
 970  
   }
 971  
 
 972  
   /**
 973  
    * Set the number of input split threads
 974  
    *
 975  
    * @param numInputSplitsThreads Number of input split threads to use
 976  
    */
 977  
   public void setNumInputSplitsThreads(int numInputSplitsThreads) {
 978  0
     NUM_INPUT_THREADS.set(this, numInputSplitsThreads);
 979  0
   }
 980  
 
 981  
   public int getNumInputSplitsThreads() {
 982  0
     return NUM_INPUT_THREADS.get(this);
 983  
   }
 984  
 
 985  
   public long getInputSplitMaxVertices() {
 986  0
     return INPUT_SPLIT_MAX_VERTICES.get(this);
 987  
   }
 988  
 
 989  
   public long getInputSplitMaxEdges() {
 990  0
     return INPUT_SPLIT_MAX_EDGES.get(this);
 991  
   }
 992  
 
 993  
   /**
 994  
    * Set whether to use unsafe serialization
 995  
    *
 996  
    * @param useUnsafeSerialization If true, use unsafe serialization
 997  
    */
 998  
   public void useUnsafeSerialization(boolean useUnsafeSerialization) {
 999  0
     USE_UNSAFE_SERIALIZATION.set(this, useUnsafeSerialization);
 1000  0
   }
 1001  
 
 1002  
   /**
 1003  
    * Set the checkpoint frequeuncy of how many supersteps to wait before
 1004  
    * checkpointing
 1005  
    *
 1006  
    * @param checkpointFrequency How often to checkpoint (0 means never)
 1007  
    */
 1008  
   public void setCheckpointFrequency(int checkpointFrequency) {
 1009  0
     CHECKPOINT_FREQUENCY.set(this, checkpointFrequency);
 1010  0
   }
 1011  
 
 1012  
   /**
 1013  
    * Get the checkpoint frequeuncy of how many supersteps to wait
 1014  
    * before checkpointing
 1015  
    *
 1016  
    * @return Checkpoint frequency (0 means never)
 1017  
    */
 1018  
   public int getCheckpointFrequency() {
 1019  0
     return CHECKPOINT_FREQUENCY.get(this);
 1020  
   }
 1021  
 
 1022  
   /**
 1023  
    * Check if checkpointing is used
 1024  
    *
 1025  
    * @return True iff checkpointing is used
 1026  
    */
 1027  
   public boolean useCheckpointing() {
 1028  0
     return getCheckpointFrequency() != 0;
 1029  
   }
 1030  
 
 1031  
   /**
 1032  
    * Set runtime checkpoint support checker.
 1033  
    * The instance of this class will have to decide whether
 1034  
    * checkpointing is allowed on current superstep.
 1035  
    *
 1036  
    * @param clazz checkpoint supported checker class
 1037  
    */
 1038  
   public void setCheckpointSupportedChecker(
 1039  
       Class<? extends CheckpointSupportedChecker> clazz) {
 1040  0
     GiraphConstants.CHECKPOINT_SUPPORTED_CHECKER.set(this, clazz);
 1041  0
   }
 1042  
 
 1043  
   /**
 1044  
    * Set the max task attempts
 1045  
    *
 1046  
    * @param maxTaskAttempts Max task attempts to use
 1047  
    */
 1048  
   public void setMaxTaskAttempts(int maxTaskAttempts) {
 1049  0
     MAX_TASK_ATTEMPTS.set(this, maxTaskAttempts);
 1050  0
   }
 1051  
 
 1052  
   /**
 1053  
    * Get the max task attempts
 1054  
    *
 1055  
    * @return Max task attempts or -1, if not set
 1056  
    */
 1057  
   public int getMaxTaskAttempts() {
 1058  0
     return MAX_TASK_ATTEMPTS.get(this);
 1059  
   }
 1060  
 
 1061  
   /**
 1062  
    * Get the number of milliseconds to wait for an event before continuing on
 1063  
    *
 1064  
    * @return Number of milliseconds to wait for an event before continuing on
 1065  
    */
 1066  
   public int getEventWaitMsecs() {
 1067  0
     return EVENT_WAIT_MSECS.get(this);
 1068  
   }
 1069  
 
 1070  
   /**
 1071  
    * Set the number of milliseconds to wait for an event before continuing on
 1072  
    *
 1073  
    * @param eventWaitMsecs Number of milliseconds to wait for an event before
 1074  
    *                       continuing on
 1075  
    */
 1076  
   public void setEventWaitMsecs(int eventWaitMsecs) {
 1077  0
     EVENT_WAIT_MSECS.set(this, eventWaitMsecs);
 1078  0
   }
 1079  
 
 1080  
   /**
 1081  
    * Get the maximum milliseconds to wait before giving up trying to get the
 1082  
    * minimum number of workers before a superstep.
 1083  
    *
 1084  
    * @return Maximum milliseconds to wait before giving up trying to get the
 1085  
    *         minimum number of workers before a superstep
 1086  
    */
 1087  
   public int getMaxMasterSuperstepWaitMsecs() {
 1088  0
     return MAX_MASTER_SUPERSTEP_WAIT_MSECS.get(this);
 1089  
   }
 1090  
 
 1091  
   public int getMaxCounterWaitMsecs() {
 1092  0
     return MAX_COUNTER_WAIT_MSECS.get(this);
 1093  
   }
 1094  
 
 1095  
   /**
 1096  
    * Set the maximum milliseconds to wait before giving up trying to get the
 1097  
    * minimum number of workers before a superstep.
 1098  
    *
 1099  
    * @param maxMasterSuperstepWaitMsecs Maximum milliseconds to wait before
 1100  
    *                                    giving up trying to get the minimum
 1101  
    *                                    number of workers before a superstep
 1102  
    */
 1103  
   public void setMaxMasterSuperstepWaitMsecs(int maxMasterSuperstepWaitMsecs) {
 1104  0
     MAX_MASTER_SUPERSTEP_WAIT_MSECS.set(this, maxMasterSuperstepWaitMsecs);
 1105  0
   }
 1106  
 
 1107  
   /**
 1108  
    * Check environment for Hadoop security token location in case we are
 1109  
    * executing the Giraph job on a MRv1 Hadoop cluster.
 1110  
    */
 1111  
   public void configureHadoopSecurity() {
 1112  0
     String hadoopTokenFilePath = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
 1113  0
     if (hadoopTokenFilePath != null) {
 1114  0
       set("mapreduce.job.credentials.binary", hadoopTokenFilePath);
 1115  
     }
 1116  0
   }
 1117  
 
 1118  
   /**
 1119  
    * Check if we want to prioritize input splits which reside on the host.
 1120  
    *
 1121  
    * @return True iff we want to use input split locality
 1122  
    */
 1123  
   public boolean useInputSplitLocality() {
 1124  0
     return USE_INPUT_SPLIT_LOCALITY.get(this);
 1125  
   }
 1126  
 
 1127  
   /**
 1128  
    * Get the local hostname on the given interface.
 1129  
    *
 1130  
    * @return The local hostname
 1131  
    * @throws UnknownHostException IP address of a host could not be determined
 1132  
    */
 1133  
   public String getLocalHostname() throws UnknownHostException {
 1134  0
     return DNS.getDefaultHost(
 1135  0
         GiraphConstants.DNS_INTERFACE.get(this),
 1136  0
         GiraphConstants.DNS_NAMESERVER.get(this)).toLowerCase();
 1137  
   }
 1138  
 
 1139  
   /**
 1140  
    * Return local host name by default. Or local host IP if preferIP
 1141  
    * option is set.
 1142  
    * @return local host name or IP
 1143  
    * @throws UnknownHostException IP address of a host could not be determined
 1144  
    */
 1145  
   public String getLocalHostOrIp() throws UnknownHostException {
 1146  0
     if (GiraphConstants.PREFER_IP_ADDRESSES.get(this)) {
 1147  0
       return InetAddress.getLocalHost().getHostAddress();
 1148  
     }
 1149  0
     return getLocalHostname();
 1150  
   }
 1151  
 
 1152  
   /**
 1153  
    * Set the maximum number of supersteps of this application.  After this
 1154  
    * many supersteps are executed, the application will shutdown.
 1155  
    *
 1156  
    * @param maxNumberOfSupersteps Maximum number of supersteps
 1157  
    */
 1158  
   public void setMaxNumberOfSupersteps(int maxNumberOfSupersteps) {
 1159  0
     MAX_NUMBER_OF_SUPERSTEPS.set(this, maxNumberOfSupersteps);
 1160  0
   }
 1161  
 
 1162  
   /**
 1163  
    * Get the maximum number of supersteps of this application.  After this
 1164  
    * many supersteps are executed, the application will shutdown.
 1165  
    *
 1166  
    * @return Maximum number of supersteps
 1167  
    */
 1168  
   public int getMaxNumberOfSupersteps() {
 1169  0
     return MAX_NUMBER_OF_SUPERSTEPS.get(this);
 1170  
   }
 1171  
 
 1172  
   /**
 1173  
    * Get the output directory to write YourKit snapshots to
 1174  
    *
 1175  
    * @param context Map context
 1176  
    * @return output directory
 1177  
    */
 1178  
   public String getYourKitOutputDir(Mapper.Context context) {
 1179  0
     final String cacheKey = "giraph.yourkit.outputDirCached";
 1180  0
     String outputDir = get(cacheKey);
 1181  0
     if (outputDir == null) {
 1182  0
       outputDir = getStringVars(YOURKIT_OUTPUT_DIR, YOURKIT_OUTPUT_DIR_DEFAULT,
 1183  
           context);
 1184  0
       set(cacheKey, outputDir);
 1185  
     }
 1186  0
     return outputDir;
 1187  
   }
 1188  
 
 1189  
   /**
 1190  
    * Get string, replacing variables in the output.
 1191  
    *
 1192  
    * %JOB_ID% =&gt; job id
 1193  
    * %TASK_ID% =&gt; task id
 1194  
    * %USER% =&gt; owning user name
 1195  
    *
 1196  
    * @param key name of key to lookup
 1197  
    * @param context mapper context
 1198  
    * @return value for key, with variables expanded
 1199  
    */
 1200  
   public String getStringVars(String key, Mapper.Context context) {
 1201  0
     return getStringVars(key, null, context);
 1202  
   }
 1203  
 
 1204  
   /**
 1205  
    * Get string, replacing variables in the output.
 1206  
    *
 1207  
    * %JOB_ID% =&gt; job id
 1208  
    * %TASK_ID% =&gt; task id
 1209  
    * %USER% =&gt; owning user name
 1210  
    *
 1211  
    * @param key name of key to lookup
 1212  
    * @param defaultValue value to return if no mapping exists. This can also
 1213  
    *                     have variables, which will be substituted.
 1214  
    * @param context mapper context
 1215  
    * @return value for key, with variables expanded
 1216  
    */
 1217  
   public String getStringVars(String key, String defaultValue,
 1218  
                               Mapper.Context context) {
 1219  0
     String value = get(key);
 1220  0
     if (value == null) {
 1221  0
       if (defaultValue == null) {
 1222  0
         return null;
 1223  
       }
 1224  0
       value = defaultValue;
 1225  
     }
 1226  0
     value = value.replace("%JOB_ID%", context.getJobID().toString());
 1227  0
     value = value.replace("%TASK_ID%", context.getTaskAttemptID().toString());
 1228  0
     value = value.replace("%USER%", get("user.name", "unknown_user"));
 1229  0
     return value;
 1230  
   }
 1231  
 
 1232  
   /**
 1233  
    * Get option whether to create a source vertex present only in edge input
 1234  
    *
 1235  
    * @return CREATE_EDGE_SOURCE_VERTICES option
 1236  
    */
 1237  
   public boolean getCreateSourceVertex() {
 1238  0
     return CREATE_EDGE_SOURCE_VERTICES.get(this);
 1239  
   }
 1240  
 
 1241  
   /**
 1242  
    * set option whether to create a source vertex present only in edge input
 1243  
    * @param createVertex create source vertex option
 1244  
    */
 1245  
   public void setCreateSourceVertex(boolean createVertex) {
 1246  0
     CREATE_EDGE_SOURCE_VERTICES.set(this, createVertex);
 1247  0
   }
 1248  
 
 1249  
   /**
 1250  
    * Get the maximum timeout (in milliseconds) for waiting for all tasks
 1251  
    * to complete after the job is done.
 1252  
    *
 1253  
    * @return Wait task done timeout in milliseconds.
 1254  
    */
 1255  
   public int getWaitTaskDoneTimeoutMs() {
 1256  0
     return WAIT_TASK_DONE_TIMEOUT_MS.get(this);
 1257  
   }
 1258  
 
 1259  
   /**
 1260  
    * Set the maximum timeout (in milliseconds) for waiting for all tasks
 1261  
    * to complete after the job is done.
 1262  
    *
 1263  
    * @param ms Milliseconds to set
 1264  
    */
 1265  
   public void setWaitTaskDoneTimeoutMs(int ms) {
 1266  0
     WAIT_TASK_DONE_TIMEOUT_MS.set(this, ms);
 1267  0
   }
 1268  
 
 1269  
   /**
 1270  
    * Check whether to track job progress on client or not
 1271  
    *
 1272  
    * @return True if job progress should be tracked on client
 1273  
    */
 1274  
   public boolean trackJobProgressOnClient() {
 1275  0
     return TRACK_JOB_PROGRESS_ON_CLIENT.get(this);
 1276  
   }
 1277  
 
 1278  
   /**
 1279  
    * @return Number of retries when creating an HDFS file before failing.
 1280  
    */
 1281  
   public int getHdfsFileCreationRetries() {
 1282  0
     return HDFS_FILE_CREATION_RETRIES.get(this);
 1283  
   }
 1284  
 
 1285  
   /**
 1286  
    * @return Milliseconds to wait before retrying an HDFS file creation
 1287  
    *         operation.
 1288  
    */
 1289  
   public int getHdfsFileCreationRetryWaitMs() {
 1290  0
     return HDFS_FILE_CREATION_RETRY_WAIT_MS.get(this);
 1291  
   }
 1292  
 }