Coverage Report - org.apache.giraph.conf.ImmutableClassesGiraphConfiguration
 
Classes in this File Line Coverage Branch Coverage Complexity
ImmutableClassesGiraphConfiguration
0%
0/238
0%
0/76
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.conf;
 20  
 
 21  
 import io.netty.handler.codec.ByteToMessageDecoder;
 22  
 import io.netty.handler.codec.MessageToByteEncoder;
 23  
 import io.netty.handler.codec.compression.JdkZlibDecoder;
 24  
 import io.netty.handler.codec.compression.JdkZlibEncoder;
 25  
 import io.netty.handler.codec.compression.SnappyFramedDecoder;
 26  
 import io.netty.handler.codec.compression.SnappyFramedEncoder;
 27  
 
 28  
 import org.apache.giraph.aggregators.AggregatorWriter;
 29  
 import org.apache.giraph.combiner.MessageCombiner;
 30  
 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
 31  
 import org.apache.giraph.edge.Edge;
 32  
 import org.apache.giraph.edge.EdgeFactory;
 33  
 import org.apache.giraph.edge.EdgeStoreFactory;
 34  
 import org.apache.giraph.edge.OutEdges;
 35  
 import org.apache.giraph.edge.ReusableEdge;
 36  
 import org.apache.giraph.factories.ComputationFactory;
 37  
 import org.apache.giraph.factories.EdgeValueFactory;
 38  
 import org.apache.giraph.factories.MessageValueFactory;
 39  
 import org.apache.giraph.factories.OutEdgesFactory;
 40  
 import org.apache.giraph.factories.ValueFactories;
 41  
 import org.apache.giraph.factories.VertexIdFactory;
 42  
 import org.apache.giraph.factories.VertexValueFactory;
 43  
 import org.apache.giraph.graph.Computation;
 44  
 import org.apache.giraph.graph.Language;
 45  
 import org.apache.giraph.graph.MapperObserver;
 46  
 import org.apache.giraph.graph.Vertex;
 47  
 import org.apache.giraph.graph.VertexResolver;
 48  
 import org.apache.giraph.graph.VertexValueCombiner;
 49  
 import org.apache.giraph.io.EdgeInputFormat;
 50  
 import org.apache.giraph.io.EdgeOutputFormat;
 51  
 import org.apache.giraph.io.MappingInputFormat;
 52  
 import org.apache.giraph.io.VertexInputFormat;
 53  
 import org.apache.giraph.io.VertexOutputFormat;
 54  
 import org.apache.giraph.io.filters.EdgeInputFilter;
 55  
 import org.apache.giraph.io.filters.VertexInputFilter;
 56  
 import org.apache.giraph.io.internal.WrappedEdgeInputFormat;
 57  
 import org.apache.giraph.io.internal.WrappedEdgeOutputFormat;
 58  
 import org.apache.giraph.io.internal.WrappedMappingInputFormat;
 59  
 import org.apache.giraph.io.internal.WrappedVertexInputFormat;
 60  
 import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
 61  
 import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput;
 62  
 import org.apache.giraph.io.superstep_output.NoOpSuperstepOutput;
 63  
 import org.apache.giraph.io.superstep_output.SuperstepOutput;
 64  
 import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput;
 65  
 import org.apache.giraph.job.GiraphJobObserver;
 66  
 import org.apache.giraph.job.GiraphJobRetryChecker;
 67  
 import org.apache.giraph.mapping.MappingStore;
 68  
 import org.apache.giraph.mapping.MappingStoreOps;
 69  
 import org.apache.giraph.mapping.translate.TranslateEdge;
 70  
 import org.apache.giraph.master.MasterCompute;
 71  
 import org.apache.giraph.master.MasterObserver;
 72  
 import org.apache.giraph.master.SuperstepClasses;
 73  
 import org.apache.giraph.partition.GraphPartitionerFactory;
 74  
 import org.apache.giraph.partition.Partition;
 75  
 import org.apache.giraph.utils.ExtendedByteArrayDataInput;
 76  
 import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
 77  
 import org.apache.giraph.utils.ExtendedDataInput;
 78  
 import org.apache.giraph.utils.ExtendedDataOutput;
 79  
 import org.apache.giraph.utils.GcObserver;
 80  
 import org.apache.giraph.utils.ReflectionUtils;
 81  
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 82  
 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
 83  
 import org.apache.giraph.utils.WritableUtils;
 84  
 import org.apache.giraph.utils.io.BigDataInputOutput;
 85  
 import org.apache.giraph.utils.io.DataInputOutput;
 86  
 import org.apache.giraph.utils.io.ExtendedDataInputOutput;
 87  
 import org.apache.giraph.worker.WorkerContext;
 88  
 import org.apache.giraph.worker.WorkerObserver;
 89  
 import org.apache.hadoop.conf.Configuration;
 90  
 import org.apache.hadoop.io.NullWritable;
 91  
 import org.apache.hadoop.io.Writable;
 92  
 import org.apache.hadoop.io.WritableComparable;
 93  
 import org.apache.hadoop.mapreduce.Mapper;
 94  
 import org.apache.hadoop.util.Progressable;
 95  
 
 96  
 import com.google.common.base.Preconditions;
 97  
 
 98  
 
 99  
 /**
 100  
  * The classes set here are immutable, the remaining configuration is mutable.
 101  
  * Classes are immutable and final to provide the best performance for
 102  
  * instantiation.  Everything is thread-safe.
 103  
  *
 104  
  * @param <I> Vertex id
 105  
  * @param <V> Vertex data
 106  
  * @param <E> Edge data
 107  
  */
 108  
 @SuppressWarnings("unchecked")
 109  
 public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
 110  
     V extends Writable, E extends Writable> extends GiraphConfiguration {
 111  
   /** Holder for all the classes */
 112  
   private final GiraphClasses classes;
 113  
   /** Mapping target class */
 114  0
   private Class<? extends Writable> mappingTargetClass = null;
 115  
   /** Value (IVEMM) Factories */
 116  
   private final ValueFactories<I, V, E> valueFactories;
 117  
   /** Factory to create {@link OutEdges} for computation */
 118  
   private final OutEdgesFactory<I, E> outEdgesFactory;
 119  
   /** Factory to create {@link OutEdges} for input */
 120  
   private final OutEdgesFactory<I, E> inputOutEdgesFactory;
 121  
   /** Language values (IVEMM) are implemented in */
 122  
   private final PerGraphTypeEnum<Language> valueLanguages;
 123  
   /** Whether values (IVEMM) need Jython wrappers */
 124  
   private final PerGraphTypeBoolean valueNeedsWrappers;
 125  
 
 126  
 
 127  
   /**
 128  
    * Use unsafe serialization? Cached for fast access to instantiate the
 129  
    * extended data input/output classes
 130  
    */
 131  
   private final boolean useUnsafeSerialization;
 132  
   /**
 133  
    * Use BigDataIO for messages? Cached for fast access to instantiate the
 134  
    * extended data input/output classes for messages
 135  
    */
 136  
   private final boolean useBigDataIOForMessages;
 137  
   /** Is the graph static (meaning there is no mutation)? */
 138  
   private final boolean isStaticGraph;
 139  
   /** Whether or not to use message size encoding */
 140  
   private final boolean useMessageSizeEncoding;
 141  
 
 142  
   /**
 143  
    * Constructor.  Takes the configuration and then gets the classes out of
 144  
    * them for Giraph
 145  
    *
 146  
    * @param conf Configuration
 147  
    */
 148  
   public ImmutableClassesGiraphConfiguration(Configuration conf) {
 149  0
     super(conf);
 150  0
     classes = new GiraphClasses<I, V, E>(conf);
 151  0
     useUnsafeSerialization = USE_UNSAFE_SERIALIZATION.get(this);
 152  0
     useBigDataIOForMessages = USE_BIG_DATA_IO_FOR_MESSAGES.get(this);
 153  0
     valueLanguages = PerGraphTypeEnum.readFromConf(
 154  
         GiraphConstants.GRAPH_TYPE_LANGUAGES, conf);
 155  0
     valueNeedsWrappers = PerGraphTypeBoolean.readFromConf(
 156  
         GiraphConstants.GRAPH_TYPES_NEEDS_WRAPPERS, conf);
 157  0
     isStaticGraph = GiraphConstants.STATIC_GRAPH.get(this);
 158  0
     valueFactories = new ValueFactories<I, V, E>(this);
 159  0
     outEdgesFactory = VERTEX_EDGES_FACTORY_CLASS.newInstance(this);
 160  0
     inputOutEdgesFactory = INPUT_VERTEX_EDGES_FACTORY_CLASS.newInstance(this);
 161  0
     useMessageSizeEncoding = USE_MESSAGE_SIZE_ENCODING.get(conf);
 162  0
   }
 163  
 
 164  
   /**
 165  
    * Configure an object with this instance if the object is configurable.
 166  
    *
 167  
    * @param obj Object
 168  
    */
 169  
   public void configureIfPossible(Object obj) {
 170  0
     if (obj instanceof GiraphConfigurationSettable) {
 171  0
       ((GiraphConfigurationSettable) obj).setConf(this);
 172  
     }
 173  0
   }
 174  
 
 175  
   public PerGraphTypeBoolean getValueNeedsWrappers() {
 176  0
     return valueNeedsWrappers;
 177  
   }
 178  
 
 179  
   public PerGraphTypeEnum<Language> getValueLanguages() {
 180  0
     return valueLanguages;
 181  
   }
 182  
 
 183  
   /**
 184  
    * Get the class used for edge translation during vertex input
 185  
    *
 186  
    * @return edge translation class
 187  
    */
 188  
   public Class<? extends TranslateEdge> edgeTranslationClass() {
 189  0
     return EDGE_TRANSLATION_CLASS.get(this);
 190  
   }
 191  
 
 192  
   /**
 193  
    * Instance of TranslateEdge that contains helper method for edge translation
 194  
    *
 195  
    * @return instance of TranslateEdge
 196  
    */
 197  
   public TranslateEdge<I, E> edgeTranslationInstance() {
 198  0
     if (edgeTranslationClass() != null) {
 199  0
       return ReflectionUtils.newInstance(edgeTranslationClass(), this);
 200  
     }
 201  0
     return null;
 202  
   }
 203  
 
 204  
   /**
 205  
    * Get the vertex input filter class
 206  
    *
 207  
    * @return VertexInputFilter class
 208  
    */
 209  
   public Class<? extends EdgeInputFilter<I, E>>
 210  
   getEdgeInputFilterClass() {
 211  0
     return classes.getEdgeInputFilterClass();
 212  
   }
 213  
 
 214  
   /**
 215  
    * Get the edge input filter to use
 216  
    *
 217  
    * @return EdgeInputFilter
 218  
    */
 219  
   public EdgeInputFilter getEdgeInputFilter() {
 220  0
     return ReflectionUtils.newInstance(getEdgeInputFilterClass(), this);
 221  
   }
 222  
 
 223  
   /**
 224  
    * Get the vertex input filter class
 225  
    *
 226  
    * @return VertexInputFilter class
 227  
    */
 228  
   public Class<? extends VertexInputFilter<I, V, E>>
 229  
   getVertexInputFilterClass() {
 230  0
     return classes.getVertexInputFilterClass();
 231  
   }
 232  
 
 233  
   /**
 234  
    * Get the vertex input filter to use
 235  
    *
 236  
    * @return VertexInputFilter
 237  
    */
 238  
   public VertexInputFilter getVertexInputFilter() {
 239  0
     return ReflectionUtils.newInstance(getVertexInputFilterClass(), this);
 240  
   }
 241  
 
 242  
   /**
 243  
    * Get the user's subclassed
 244  
    * {@link org.apache.giraph.partition.GraphPartitionerFactory}.
 245  
    *
 246  
    * @return User's graph partitioner
 247  
    */
 248  
   public Class<? extends GraphPartitionerFactory<I, V, E>>
 249  
   getGraphPartitionerClass() {
 250  0
     return classes.getGraphPartitionerFactoryClass();
 251  
   }
 252  
 
 253  
   /**
 254  
    * Create a user graph partitioner class
 255  
    *
 256  
    * @return Instantiated user graph partitioner class
 257  
    */
 258  
   public GraphPartitionerFactory<I, V, E> createGraphPartitioner() {
 259  0
     Class<? extends GraphPartitionerFactory<I, V, E>> klass =
 260  0
         classes.getGraphPartitionerFactoryClass();
 261  0
     return ReflectionUtils.newInstance(klass, this);
 262  
   }
 263  
 
 264  
   @Override
 265  
   public boolean hasVertexInputFormat() {
 266  0
     return classes.hasVertexInputFormat();
 267  
   }
 268  
 
 269  
   /**
 270  
    * Get the user's subclassed
 271  
    * {@link org.apache.giraph.io.VertexInputFormat}.
 272  
    *
 273  
    * @return User's vertex input format class
 274  
    */
 275  
   public Class<? extends VertexInputFormat<I, V, E>>
 276  
   getVertexInputFormatClass() {
 277  0
     return classes.getVertexInputFormatClass();
 278  
   }
 279  
 
 280  
   /**
 281  
    * Create a user vertex input format class.
 282  
    * Note: Giraph should only use WrappedVertexInputFormat,
 283  
    * which makes sure that Configuration parameters are set properly.
 284  
    *
 285  
    * @return Instantiated user vertex input format class
 286  
    */
 287  
   private VertexInputFormat<I, V, E> createVertexInputFormat() {
 288  0
     Class<? extends VertexInputFormat<I, V, E>> klass =
 289  0
         getVertexInputFormatClass();
 290  0
     return ReflectionUtils.newInstance(klass, this);
 291  
   }
 292  
 
 293  
   /**
 294  
    * Create a wrapper for user vertex input format,
 295  
    * which makes sure that Configuration parameters are set properly in all
 296  
    * methods related to this format.
 297  
    *
 298  
    * @return Wrapper around user vertex input format
 299  
    */
 300  
   public WrappedVertexInputFormat<I, V, E> createWrappedVertexInputFormat() {
 301  0
     WrappedVertexInputFormat<I, V, E> wrappedVertexInputFormat =
 302  0
         new WrappedVertexInputFormat<I, V, E>(createVertexInputFormat());
 303  0
     configureIfPossible(wrappedVertexInputFormat);
 304  0
     return wrappedVertexInputFormat;
 305  
   }
 306  
 
 307  
   @Override
 308  
   public void setVertexInputFormatClass(
 309  
       Class<? extends VertexInputFormat> vertexInputFormatClass) {
 310  0
     super.setVertexInputFormatClass(vertexInputFormatClass);
 311  0
     classes.setVertexInputFormatClass(vertexInputFormatClass);
 312  0
   }
 313  
 
 314  
   @Override
 315  
   public boolean hasVertexOutputFormat() {
 316  0
     return classes.hasVertexOutputFormat();
 317  
   }
 318  
 
 319  
   /**
 320  
    * Get the user's subclassed
 321  
    * {@link org.apache.giraph.io.VertexOutputFormat}.
 322  
    *
 323  
    * @return User's vertex output format class
 324  
    */
 325  
   public Class<? extends VertexOutputFormat<I, V, E>>
 326  
   getVertexOutputFormatClass() {
 327  0
     return classes.getVertexOutputFormatClass();
 328  
   }
 329  
 
 330  
   /**
 331  
    * Get MappingInputFormatClass
 332  
    *
 333  
    * @return MappingInputFormatClass
 334  
    */
 335  
   public Class<? extends MappingInputFormat<I, V, E, ? extends Writable>>
 336  
   getMappingInputFormatClass() {
 337  0
     return classes.getMappingInputFormatClass();
 338  
   }
 339  
 
 340  
   /**
 341  
    * Set MappingInputFormatClass
 342  
    *
 343  
    * @param mappingInputFormatClass Determines how mappings are input
 344  
    */
 345  
   @Override
 346  
   public void setMappingInputFormatClass(
 347  
     Class<? extends MappingInputFormat> mappingInputFormatClass) {
 348  0
     super.setMappingInputFormatClass(mappingInputFormatClass);
 349  0
     classes.setMappingInputFormatClass(mappingInputFormatClass);
 350  0
   }
 351  
 
 352  
   /**
 353  
    * Check if mappingInputFormat is set
 354  
    *
 355  
    * @return true if mappingInputFormat is set
 356  
    */
 357  
   public boolean hasMappingInputFormat() {
 358  0
     return classes.hasMappingInputFormat();
 359  
   }
 360  
 
 361  
   /**
 362  
    * Create a user vertex output format class.
 363  
    * Note: Giraph should only use WrappedVertexOutputFormat,
 364  
    * which makes sure that Configuration parameters are set properly.
 365  
    *
 366  
    * @return Instantiated user vertex output format class
 367  
    */
 368  
   private VertexOutputFormat<I, V, E> createVertexOutputFormat() {
 369  0
     Class<? extends VertexOutputFormat<I, V, E>> klass =
 370  0
         getVertexOutputFormatClass();
 371  0
     return ReflectionUtils.newInstance(klass, this);
 372  
   }
 373  
 
 374  
   /**
 375  
    * Create a user mapping input format class.
 376  
    * Note: Giraph should only use WrappedMappingInputFormat,
 377  
    * which makes sure that Configuration parameters are set properly.
 378  
    *
 379  
    * @return Instantiated user mapping input format class
 380  
    */
 381  
   private MappingInputFormat<I, V, E, ? extends Writable>
 382  
   createMappingInputFormat() {
 383  0
     Class<? extends MappingInputFormat<I, V, E, ? extends Writable>> klass =
 384  0
         getMappingInputFormatClass();
 385  0
     return ReflectionUtils.newInstance(klass, this);
 386  
   }
 387  
 
 388  
   /**
 389  
    * Create a wrapper for user vertex output format,
 390  
    * which makes sure that Configuration parameters are set properly in all
 391  
    * methods related to this format.
 392  
    *
 393  
    * @return Wrapper around user vertex output format
 394  
    */
 395  
   public WrappedVertexOutputFormat<I, V, E> createWrappedVertexOutputFormat() {
 396  0
     WrappedVertexOutputFormat<I, V, E> wrappedVertexOutputFormat =
 397  0
         new WrappedVertexOutputFormat<I, V, E>(createVertexOutputFormat());
 398  0
     configureIfPossible(wrappedVertexOutputFormat);
 399  0
     return wrappedVertexOutputFormat;
 400  
   }
 401  
 
 402  
   /**
 403  
    * Create a wrapper for user mapping input format,
 404  
    * which makes sure that Configuration parameters are set properly in all
 405  
    * methods related to this format.
 406  
    *
 407  
    * @return Wrapper around user mapping input format
 408  
    */
 409  
   public WrappedMappingInputFormat<I, V, E, ? extends Writable>
 410  
   createWrappedMappingInputFormat() {
 411  
     WrappedMappingInputFormat<I, V, E, ? extends Writable>
 412  0
       wrappedMappingInputFormat =
 413  0
         new WrappedMappingInputFormat<>(createMappingInputFormat());
 414  0
     configureIfPossible(wrappedMappingInputFormat);
 415  0
     return wrappedMappingInputFormat;
 416  
   }
 417  
 
 418  
   @Override
 419  
   public boolean hasEdgeOutputFormat() {
 420  0
     return classes.hasEdgeOutputFormat();
 421  
   }
 422  
 
 423  
   /**
 424  
    * Get the user's subclassed
 425  
    * {@link org.apache.giraph.io.EdgeOutputFormat}.
 426  
    *
 427  
    * @return User's edge output format class
 428  
    */
 429  
   public Class<? extends EdgeOutputFormat<I, V, E>>
 430  
   getEdgeOutputFormatClass() {
 431  0
     return classes.getEdgeOutputFormatClass();
 432  
   }
 433  
 
 434  
   /**
 435  
    * Create a user edge output format class.
 436  
    * Note: Giraph should only use WrappedEdgeOutputFormat,
 437  
    * which makes sure that Configuration parameters are set properly.
 438  
    *
 439  
    * @return Instantiated user edge output format class
 440  
    */
 441  
   private EdgeOutputFormat<I, V, E> createEdgeOutputFormat() {
 442  0
     Class<? extends EdgeOutputFormat<I, V, E>> klass =
 443  0
         getEdgeOutputFormatClass();
 444  0
     return ReflectionUtils.newInstance(klass, this);
 445  
   }
 446  
 
 447  
   /**
 448  
    * Create a wrapper for user edge output format,
 449  
    * which makes sure that Configuration parameters are set properly in all
 450  
    * methods related to this format.
 451  
    *
 452  
    * @return Wrapper around user edge output format
 453  
    */
 454  
   public WrappedEdgeOutputFormat<I, V, E> createWrappedEdgeOutputFormat() {
 455  0
     WrappedEdgeOutputFormat<I, V, E> wrappedEdgeOutputFormat =
 456  0
         new WrappedEdgeOutputFormat<I, V, E>(createEdgeOutputFormat());
 457  0
     configureIfPossible(wrappedEdgeOutputFormat);
 458  0
     return wrappedEdgeOutputFormat;
 459  
   }
 460  
 
 461  
   /**
 462  
    * Create the proper superstep output, based on the configuration settings.
 463  
    *
 464  
    * @param context Mapper context
 465  
    * @return SuperstepOutput
 466  
    */
 467  
   public SuperstepOutput<I, V, E> createSuperstepOutput(
 468  
       Mapper<?, ?, ?, ?>.Context context) {
 469  0
     if (doOutputDuringComputation()) {
 470  0
       if (vertexOutputFormatThreadSafe()) {
 471  0
         return new MultiThreadedSuperstepOutput<I, V, E>(this, context);
 472  
       } else {
 473  0
         return new SynchronizedSuperstepOutput<I, V, E>(this, context);
 474  
       }
 475  
     } else {
 476  0
       return new NoOpSuperstepOutput<I, V, E>();
 477  
     }
 478  
   }
 479  
 
 480  
   @Override
 481  
   public boolean hasEdgeInputFormat() {
 482  0
     return classes.hasEdgeInputFormat();
 483  
   }
 484  
 
 485  
   /**
 486  
    * Get the user's subclassed
 487  
    * {@link org.apache.giraph.io.EdgeInputFormat}.
 488  
    *
 489  
    * @return User's edge input format class
 490  
    */
 491  
   public Class<? extends EdgeInputFormat<I, E>> getEdgeInputFormatClass() {
 492  0
     return classes.getEdgeInputFormatClass();
 493  
   }
 494  
 
 495  
   /**
 496  
    * Create a user edge input format class.
 497  
    * Note: Giraph should only use WrappedEdgeInputFormat,
 498  
    * which makes sure that Configuration parameters are set properly.
 499  
    *
 500  
    * @return Instantiated user edge input format class
 501  
    */
 502  
   private EdgeInputFormat<I, E> createEdgeInputFormat() {
 503  0
     Class<? extends EdgeInputFormat<I, E>> klass = getEdgeInputFormatClass();
 504  0
     return ReflectionUtils.newInstance(klass, this);
 505  
   }
 506  
 
 507  
   /**
 508  
    * Create a wrapper for user edge input format,
 509  
    * which makes sure that Configuration parameters are set properly in all
 510  
    * methods related to this format.
 511  
    *
 512  
    * @return Wrapper around user edge input format
 513  
    */
 514  
   public WrappedEdgeInputFormat<I, E> createWrappedEdgeInputFormat() {
 515  0
     WrappedEdgeInputFormat<I, E> wrappedEdgeInputFormat =
 516  0
         new WrappedEdgeInputFormat<I, E>(createEdgeInputFormat());
 517  0
     configureIfPossible(wrappedEdgeInputFormat);
 518  0
     return wrappedEdgeInputFormat;
 519  
   }
 520  
 
 521  
   @Override
 522  
   public void setEdgeInputFormatClass(
 523  
       Class<? extends EdgeInputFormat> edgeInputFormatClass) {
 524  0
     super.setEdgeInputFormatClass(edgeInputFormatClass);
 525  0
     classes.setEdgeInputFormatClass(edgeInputFormatClass);
 526  0
   }
 527  
 
 528  
   /**
 529  
    * Get the user's subclassed {@link AggregatorWriter}.
 530  
    *
 531  
    * @return User's aggregator writer class
 532  
    */
 533  
   public Class<? extends AggregatorWriter> getAggregatorWriterClass() {
 534  0
     return classes.getAggregatorWriterClass();
 535  
   }
 536  
 
 537  
   /**
 538  
    * Create a user aggregator output format class
 539  
    *
 540  
    * @return Instantiated user aggregator writer class
 541  
    */
 542  
   public AggregatorWriter createAggregatorWriter() {
 543  0
     return ReflectionUtils.newInstance(getAggregatorWriterClass(), this);
 544  
   }
 545  
 
 546  
   /**
 547  
    * Get the user's subclassed
 548  
    * {@link org.apache.giraph.graph.VertexValueCombiner} class.
 549  
    *
 550  
    * @return User's vertex value combiner class
 551  
    */
 552  
   public Class<? extends VertexValueCombiner<V>>
 553  
   getVertexValueCombinerClass() {
 554  0
     return classes.getVertexValueCombinerClass();
 555  
   }
 556  
 
 557  
   /**
 558  
    * Create a user vertex value combiner class
 559  
    *
 560  
    * @return Instantiated user vertex value combiner class
 561  
    */
 562  
   @SuppressWarnings("rawtypes")
 563  
   public VertexValueCombiner<V> createVertexValueCombiner() {
 564  0
     return ReflectionUtils.newInstance(getVertexValueCombinerClass(), this);
 565  
   }
 566  
 
 567  
   /**
 568  
    * Get the user's subclassed VertexResolver.
 569  
    *
 570  
    * @return User's vertex resolver class
 571  
    */
 572  
   public Class<? extends VertexResolver<I, V, E>> getVertexResolverClass() {
 573  0
     return classes.getVertexResolverClass();
 574  
   }
 575  
 
 576  
   /**
 577  
    * Create a user vertex revolver
 578  
    *
 579  
    * @return Instantiated user vertex resolver
 580  
    */
 581  
   public VertexResolver<I, V, E> createVertexResolver() {
 582  0
     return ReflectionUtils.newInstance(getVertexResolverClass(), this);
 583  
   }
 584  
 
 585  
   /**
 586  
    * Get the user's subclassed WorkerContext.
 587  
    *
 588  
    * @return User's worker context class
 589  
    */
 590  
   public Class<? extends WorkerContext> getWorkerContextClass() {
 591  0
     return classes.getWorkerContextClass();
 592  
   }
 593  
 
 594  
   /**
 595  
    * Create a user worker context
 596  
    *
 597  
    * @return Instantiated user worker context
 598  
    */
 599  
   public WorkerContext createWorkerContext() {
 600  0
     return ReflectionUtils.newInstance(getWorkerContextClass(), this);
 601  
   }
 602  
 
 603  
   /**
 604  
    * Get the user's subclassed {@link org.apache.giraph.master.MasterCompute}
 605  
    *
 606  
    * @return User's master class
 607  
    */
 608  
   public Class<? extends MasterCompute> getMasterComputeClass() {
 609  0
     return classes.getMasterComputeClass();
 610  
   }
 611  
 
 612  
   /**
 613  
    * Create a user master
 614  
    *
 615  
    * @return Instantiated user master
 616  
    */
 617  
   public MasterCompute createMasterCompute() {
 618  0
     return ReflectionUtils.newInstance(getMasterComputeClass(), this);
 619  
   }
 620  
 
 621  
   @Override
 622  
   public Class<? extends Computation<I, V, E,
 623  
       ? extends Writable, ? extends Writable>>
 624  
   getComputationClass() {
 625  0
     return classes.getComputationClass();
 626  
   }
 627  
 
 628  
   /**
 629  
    * Get computation factory class
 630  
    *
 631  
    * @return computation factory class
 632  
    */
 633  
   @Override
 634  
   public Class<? extends ComputationFactory<I, V, E,
 635  
       ? extends Writable, ? extends Writable>>
 636  
   getComputationFactoryClass() {
 637  0
     return classes.getComputationFactoryClass();
 638  
   }
 639  
 
 640  
   /**
 641  
    * Get computation factory
 642  
    *
 643  
    * @return computation factory
 644  
    */
 645  
   public ComputationFactory<I, V, E, ? extends Writable, ? extends Writable>
 646  
   createComputationFactory() {
 647  0
     return ReflectionUtils.newInstance(getComputationFactoryClass(), this);
 648  
   }
 649  
 
 650  
   /**
 651  
    * Create a user computation
 652  
    *
 653  
    * @return Instantiated user computation
 654  
    */
 655  
   public Computation<I, V, E, ? extends Writable, ? extends Writable>
 656  
   createComputation() {
 657  0
     return createComputationFactory().createComputation(this);
 658  
   }
 659  
 
 660  
   /**
 661  
    * Get user types describing graph (I,V,E,M1,M2)
 662  
    *
 663  
    * @return GiraphTypes
 664  
    */
 665  
   public GiraphTypes<I, V, E> getGiraphTypes() {
 666  0
     return classes.getGiraphTypes();
 667  
   }
 668  
 
 669  
   /**
 670  
    * Create a vertex
 671  
    *
 672  
    * @return Instantiated vertex
 673  
    */
 674  
   public Vertex<I, V, E> createVertex() {
 675  0
     Class vertexClass = classes.getVertexClass();
 676  0
     return (Vertex<I, V, E>) ReflectionUtils.newInstance(vertexClass, this);
 677  
   }
 678  
 
 679  
 
 680  
  /**
 681  
    * Get the user's subclassed vertex index class.
 682  
    *
 683  
    * @return User's vertex index class
 684  
    */
 685  
   public Class<I> getVertexIdClass() {
 686  0
     return classes.getVertexIdClass();
 687  
   }
 688  
 
 689  
   /**
 690  
    * Get vertex ID factory
 691  
    *
 692  
    * @return {@link VertexIdFactory}
 693  
    */
 694  
   public VertexIdFactory<I> getVertexIdFactory() {
 695  0
     return valueFactories.getVertexIdFactory();
 696  
   }
 697  
 
 698  
   /**
 699  
    * Create a user vertex index
 700  
    *
 701  
    * @return Instantiated user vertex index
 702  
    */
 703  
   public I createVertexId() {
 704  0
     return getVertexIdFactory().newInstance();
 705  
   }
 706  
 
 707  
   /**
 708  
    * Get the user's subclassed vertex value class.
 709  
    *
 710  
    * @return User's vertex value class
 711  
    */
 712  
   public Class<V> getVertexValueClass() {
 713  0
     return classes.getVertexValueClass();
 714  
   }
 715  
 
 716  
   /**
 717  
    * Get vertex value factory
 718  
    *
 719  
    * @return {@link VertexValueFactory}
 720  
    */
 721  
   public VertexValueFactory<V> getVertexValueFactory() {
 722  0
     return valueFactories.getVertexValueFactory();
 723  
   }
 724  
 
 725  
   /**
 726  
    * Create a user vertex value
 727  
    *
 728  
    * @return Instantiated user vertex value
 729  
    */
 730  
   @SuppressWarnings("unchecked")
 731  
   public V createVertexValue() {
 732  0
     return getVertexValueFactory().newInstance();
 733  
   }
 734  
 
 735  
   /**
 736  
    * Get the user's subclassed vertex value factory class
 737  
    *
 738  
    * @return User's vertex value factory class
 739  
    */
 740  
   public Class<? extends VertexValueFactory<V>> getVertexValueFactoryClass() {
 741  0
     return (Class<? extends VertexValueFactory<V>>)
 742  0
         valueFactories.getVertexValueFactory().getClass();
 743  
   }
 744  
 
 745  
   /**
 746  
    * Create array of MasterObservers.
 747  
    *
 748  
    * @param context Mapper context
 749  
    * @return Instantiated array of MasterObservers.
 750  
    */
 751  
   public MasterObserver[] createMasterObservers(
 752  
       Mapper<?, ?, ?, ?>.Context context) {
 753  0
     Class<? extends MasterObserver>[] klasses = getMasterObserverClasses();
 754  0
     MasterObserver[] objects = new MasterObserver[klasses.length];
 755  0
     for (int i = 0; i < klasses.length; ++i) {
 756  0
       objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
 757  
     }
 758  0
     return objects;
 759  
   }
 760  
 
 761  
   /**
 762  
    * Create array of WorkerObservers.
 763  
    *
 764  
    * @param context Mapper context
 765  
    * @return Instantiated array of WorkerObservers.
 766  
    */
 767  
   public WorkerObserver[] createWorkerObservers(
 768  
       Mapper<?, ?, ?, ?>.Context context) {
 769  0
     Class<? extends WorkerObserver>[] klasses = getWorkerObserverClasses();
 770  0
     WorkerObserver[] objects = new WorkerObserver[klasses.length];
 771  0
     for (int i = 0; i < klasses.length; ++i) {
 772  0
       objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
 773  
     }
 774  0
     return objects;
 775  
   }
 776  
 
 777  
   /**
 778  
    * Create array of MapperObservers.
 779  
    *
 780  
    * @param context Mapper context
 781  
    * @return Instantiated array of MapperObservers.
 782  
    */
 783  
   public MapperObserver[] createMapperObservers(
 784  
       Mapper<?, ?, ?, ?>.Context context) {
 785  0
     Class<? extends MapperObserver>[] klasses = getMapperObserverClasses();
 786  0
     MapperObserver[] objects = new MapperObserver[klasses.length];
 787  0
     for (int i = 0; i < klasses.length; ++i) {
 788  0
       objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
 789  
     }
 790  0
     return objects;
 791  
   }
 792  
 
 793  
   /**
 794  
    * Create array of GcObservers.
 795  
    *
 796  
    * @param context Mapper context
 797  
    * @return Instantiated array of GcObservers.
 798  
    */
 799  
   public GcObserver[] createGcObservers(
 800  
       Mapper<?, ?, ?, ?>.Context context) {
 801  0
     Class<? extends GcObserver>[] klasses = getGcObserverClasses();
 802  0
     GcObserver[] objects = new GcObserver[klasses.length];
 803  0
     for (int i = 0; i < klasses.length; ++i) {
 804  0
       objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
 805  
     }
 806  0
     return objects;
 807  
   }
 808  
 
 809  
   /**
 810  
    * Create job observer
 811  
    *
 812  
    * @return GiraphJobObserver set in configuration.
 813  
    */
 814  
   public GiraphJobObserver getJobObserver() {
 815  0
     return ReflectionUtils.newInstance(getJobObserverClass(), this);
 816  
   }
 817  
 
 818  
   /**
 819  
    * Create job retry checker
 820  
    *
 821  
    * @return GiraphJobRetryChecker set in configuration.
 822  
    */
 823  
   public GiraphJobRetryChecker getJobRetryChecker() {
 824  0
     return ReflectionUtils.newInstance(getJobRetryCheckerClass(), this);
 825  
   }
 826  
 
 827  
   /**
 828  
    * Get the user's subclassed edge value class.
 829  
    *
 830  
    * @return User's vertex edge value class
 831  
    */
 832  
   public Class<E> getEdgeValueClass() {
 833  0
     return classes.getEdgeValueClass();
 834  
   }
 835  
 
 836  
   /**
 837  
    * Tell if we are using NullWritable for Edge value.
 838  
    *
 839  
    * @return true if NullWritable is class for
 840  
    */
 841  
   public boolean isEdgeValueNullWritable() {
 842  0
     return getEdgeValueClass() == NullWritable.class;
 843  
   }
 844  
 
 845  
   /**
 846  
    * Get Factory for creating edge values
 847  
    *
 848  
    * @return {@link EdgeValueFactory}
 849  
    */
 850  
   public EdgeValueFactory<E> getEdgeValueFactory() {
 851  0
     return valueFactories.getEdgeValueFactory();
 852  
   }
 853  
 
 854  
   /**
 855  
    * Create a user edge value
 856  
    *
 857  
    * @return Instantiated user edge value
 858  
    */
 859  
   public E createEdgeValue() {
 860  0
     return getEdgeValueFactory().newInstance();
 861  
   }
 862  
 
 863  
   /**
 864  
    * Create a user edge.
 865  
    *
 866  
    * @return Instantiated user edge.
 867  
    */
 868  
   public Edge<I, E> createEdge() {
 869  0
     if (isEdgeValueNullWritable()) {
 870  0
       return (Edge<I, E>) EdgeFactory.create(createVertexId());
 871  
     } else {
 872  0
       return EdgeFactory.create(createVertexId(), createEdgeValue());
 873  
     }
 874  
   }
 875  
 
 876  
   /**
 877  
    * Create edge based on #createEdge definition
 878  
    *
 879  
    * @param translateEdge instance of TranslateEdge
 880  
    * @param edge edge to be translated
 881  
    * @return translated edge
 882  
    */
 883  
   public Edge<I, E> createEdge(TranslateEdge<I, E>
 884  
     translateEdge, Edge<I, E> edge) {
 885  0
     I translatedId = translateEdge.translateId(edge.getTargetVertexId());
 886  0
     if (isEdgeValueNullWritable()) {
 887  0
       return (Edge<I, E>) EdgeFactory.create(translatedId);
 888  
     } else {
 889  0
       return EdgeFactory.create(translatedId,
 890  0
         translateEdge.cloneValue(edge.getValue()));
 891  
     }
 892  
   }
 893  
 
 894  
   /**
 895  
    * Create a reusable edge.
 896  
    *
 897  
    * @return Instantiated reusable edge.
 898  
    */
 899  
   public ReusableEdge<I, E> createReusableEdge() {
 900  0
     if (isEdgeValueNullWritable()) {
 901  0
       return (ReusableEdge<I, E>) EdgeFactory.createReusable(createVertexId());
 902  
     } else {
 903  0
       return EdgeFactory.createReusable(createVertexId(), createEdgeValue());
 904  
     }
 905  
   }
 906  
 
 907  
   /**
 908  
    * Create edge store factory
 909  
    *
 910  
    * @return edge store factory
 911  
    */
 912  
   public EdgeStoreFactory<I, V, E> createEdgeStoreFactory() {
 913  0
     Class<? extends EdgeStoreFactory> edgeStoreFactoryClass =
 914  0
         EDGE_STORE_FACTORY_CLASS.get(this);
 915  0
     return ReflectionUtils.newInstance(edgeStoreFactoryClass);
 916  
   }
 917  
 
 918  
   /**
 919  
    * Get the user's subclassed incoming message value class.
 920  
    *
 921  
    * @param <M> Message data
 922  
    * @return User's vertex message value class
 923  
    */
 924  
   public <M extends Writable> Class<M> getIncomingMessageValueClass() {
 925  0
     return classes.getIncomingMessageClasses().getMessageClass();
 926  
   }
 927  
 
 928  
   /**
 929  
    * Get the user's subclassed outgoing message value class.
 930  
    *
 931  
    * @param <M> Message type
 932  
    * @return User's vertex message value class
 933  
    */
 934  
   public <M extends Writable> Class<M> getOutgoingMessageValueClass() {
 935  0
     return classes.getOutgoingMessageClasses().getMessageClass();
 936  
   }
 937  
 
 938  
   /**
 939  
    * Get incoming message classes
 940  
    * @param <M> message type
 941  
    * @return incoming message classes
 942  
    */
 943  
   public <M extends Writable>
 944  
   MessageClasses<I, M> getIncomingMessageClasses() {
 945  0
     return classes.getIncomingMessageClasses();
 946  
   }
 947  
 
 948  
   /**
 949  
    * Get outgoing message classes
 950  
    * @param <M> message type
 951  
    * @return outgoing message classes
 952  
    */
 953  
   public <M extends Writable>
 954  
   MessageClasses<I, M> getOutgoingMessageClasses() {
 955  0
     return classes.getOutgoingMessageClasses();
 956  
   }
 957  
 
 958  
   /**
 959  
    * Create new outgoing message value factory
 960  
    * @param <M> message type
 961  
    * @return outgoing message value factory
 962  
    */
 963  
   public <M extends Writable>
 964  
   MessageValueFactory<M> createOutgoingMessageValueFactory() {
 965  0
     return classes.getOutgoingMessageClasses().createMessageValueFactory(this);
 966  
   }
 967  
 
 968  
   /**
 969  
    * Create new incoming message value factory
 970  
    * @param <M> message type
 971  
    * @return incoming message value factory
 972  
    */
 973  
   public <M extends Writable>
 974  
   MessageValueFactory<M> createIncomingMessageValueFactory() {
 975  0
     return classes.getIncomingMessageClasses().createMessageValueFactory(this);
 976  
   }
 977  
 
 978  
   @Override
 979  
   public void setMessageCombinerClass(
 980  
       Class<? extends MessageCombiner> messageCombinerClass) {
 981  0
     throw new IllegalArgumentException(
 982  
         "Cannot set message combiner on ImmutableClassesGiraphConfigurable");
 983  
   }
 984  
 
 985  
   /**
 986  
    * Create a user combiner class
 987  
    *
 988  
    * @param <M> Message data
 989  
    * @return Instantiated user combiner class
 990  
    */
 991  
   public <M extends Writable> MessageCombiner<? super I, M>
 992  
   createOutgoingMessageCombiner() {
 993  0
     return classes.getOutgoingMessageClasses().createMessageCombiner(this);
 994  
   }
 995  
 
 996  
   /**
 997  
    * Check if user set a combiner
 998  
    *
 999  
    * @return True iff user set a combiner class
 1000  
    */
 1001  
   public boolean useOutgoingMessageCombiner() {
 1002  0
     return classes.getOutgoingMessageClasses().useMessageCombiner();
 1003  
   }
 1004  
 
 1005  
   /**
 1006  
    * Get outgoing message encode and store type
 1007  
    * @return outgoing message encode and store type
 1008  
    */
 1009  
   public MessageEncodeAndStoreType getOutgoingMessageEncodeAndStoreType() {
 1010  0
     return classes.getOutgoingMessageClasses().getMessageEncodeAndStoreType();
 1011  
   }
 1012  
 
 1013  
   @Override
 1014  
   public Class<? extends OutEdges<I, E>> getOutEdgesClass() {
 1015  0
     return classes.getOutEdgesClass();
 1016  
   }
 1017  
 
 1018  
   /**
 1019  
    * Get the user's subclassed {@link org.apache.giraph.edge.OutEdges} used for
 1020  
    * input
 1021  
    *
 1022  
    * @return User's input vertex edges class
 1023  
    */
 1024  
   public Class<? extends OutEdges<I, E>> getInputOutEdgesClass() {
 1025  0
     return classes.getInputOutEdgesClass();
 1026  
   }
 1027  
 
 1028  
   /**
 1029  
    * Check whether the user has specified a different
 1030  
    * {@link org.apache.giraph.edge.OutEdges} class to be used during
 1031  
    * edge-based input.
 1032  
    *
 1033  
    * @return True iff there is a special edges class for input
 1034  
    */
 1035  
   public boolean useInputOutEdges() {
 1036  0
     return classes.getInputOutEdgesClass() != classes.getOutEdgesClass();
 1037  
   }
 1038  
 
 1039  
   /**
 1040  
    * Get MappingStore class to be used
 1041  
    *
 1042  
    * @return MappingStore class set by user
 1043  
    */
 1044  
   public Class<? extends MappingStore> getMappingStoreClass() {
 1045  0
     return MAPPING_STORE_CLASS.get(this);
 1046  
   }
 1047  
 
 1048  
   /**
 1049  
    * Create a {@link org.apache.giraph.mapping.MappingStore} instance
 1050  
    *
 1051  
    * @return MappingStore Instance
 1052  
    */
 1053  
   public MappingStore<I, ? extends Writable> createMappingStore() {
 1054  0
     if (getMappingStoreClass() != null) {
 1055  0
       return ReflectionUtils.newInstance(getMappingStoreClass(), this);
 1056  
     } else {
 1057  0
       return null;
 1058  
     }
 1059  
   }
 1060  
 
 1061  
   /**
 1062  
    * Get MappingStoreOps class to be used
 1063  
    *
 1064  
    * @return MappingStoreOps class set by user
 1065  
    */
 1066  
   public Class<? extends MappingStoreOps> getMappingStoreOpsClass() {
 1067  0
     return MAPPING_STORE_OPS_CLASS.get(this);
 1068  
   }
 1069  
 
 1070  
   /**
 1071  
    * Create a {@link org.apache.giraph.mapping.MappingStoreOps} instance
 1072  
    *
 1073  
    * @return MappingStoreOps Instance
 1074  
    */
 1075  
   public MappingStoreOps<I, ? extends Writable> createMappingStoreOps() {
 1076  0
     if (getMappingStoreOpsClass() != null) {
 1077  0
       return ReflectionUtils.newInstance(getMappingStoreOpsClass(), this);
 1078  
     } else {
 1079  0
       return null;
 1080  
     }
 1081  
   }
 1082  
 
 1083  
   /**
 1084  
    * Get mappingTarget class
 1085  
    *
 1086  
    * @return mappingTarget class
 1087  
    */
 1088  
   public Class<? extends Writable> getMappingTargetClass() {
 1089  0
     if (mappingTargetClass == null) {
 1090  0
       Class<?>[] classList = ReflectionUtils.getTypeArguments(
 1091  0
         MappingStore.class, getMappingStoreClass());
 1092  0
       Preconditions.checkArgument(classList.length == 2);
 1093  0
       mappingTargetClass = (Class<? extends Writable>) classList[1];
 1094  
     }
 1095  0
     return mappingTargetClass;
 1096  
   }
 1097  
 
 1098  
   /**
 1099  
    * Create and return mappingTarget instance
 1100  
    *
 1101  
    * @return mappingTarget instance
 1102  
    */
 1103  
   public Writable createMappingTarget() {
 1104  0
     return WritableUtils.createWritable(getMappingTargetClass());
 1105  
   }
 1106  
 
 1107  
   /**
 1108  
    * Create a user {@link org.apache.giraph.edge.OutEdges}
 1109  
    *
 1110  
    * @return Instantiated user OutEdges
 1111  
    */
 1112  
   public OutEdges<I, E> createOutEdges() {
 1113  0
     return outEdgesFactory.newInstance();
 1114  
   }
 1115  
 
 1116  
   /**
 1117  
    * Create a {@link org.apache.giraph.edge.OutEdges} instance and initialize
 1118  
    * it with the default capacity.
 1119  
    *
 1120  
    * @return Instantiated OutEdges
 1121  
    */
 1122  
   public OutEdges<I, E> createAndInitializeOutEdges() {
 1123  0
     OutEdges<I, E> outEdges = createOutEdges();
 1124  0
     outEdges.initialize();
 1125  0
     return outEdges;
 1126  
   }
 1127  
 
 1128  
   /**
 1129  
    * Create a {@link org.apache.giraph.edge.OutEdges} instance and initialize
 1130  
    * it with the given capacity (the number of edges that will be added).
 1131  
    *
 1132  
    * @param capacity Number of edges that will be added
 1133  
    * @return Instantiated OutEdges
 1134  
    */
 1135  
   public OutEdges<I, E> createAndInitializeOutEdges(int capacity) {
 1136  0
     OutEdges<I, E> outEdges = createOutEdges();
 1137  0
     outEdges.initialize(capacity);
 1138  0
     return outEdges;
 1139  
   }
 1140  
 
 1141  
   /**
 1142  
    * Create a {@link org.apache.giraph.edge.OutEdges} instance and initialize
 1143  
    * it with the given iterable of edges.
 1144  
    *
 1145  
    * @param edges Iterable of edges to add
 1146  
    * @return Instantiated OutEdges
 1147  
    */
 1148  
   public OutEdges<I, E> createAndInitializeOutEdges(
 1149  
       Iterable<Edge<I, E>> edges) {
 1150  0
     OutEdges<I, E> outEdges = createOutEdges();
 1151  0
     outEdges.initialize(edges);
 1152  0
     return outEdges;
 1153  
   }
 1154  
 
 1155  
   /**
 1156  
    * Create a user {@link org.apache.giraph.edge.OutEdges} used during
 1157  
    * edge-based input
 1158  
    *
 1159  
    * @return Instantiated user input OutEdges
 1160  
    */
 1161  
   public OutEdges<I, E> createInputOutEdges() {
 1162  0
     return inputOutEdgesFactory.newInstance();
 1163  
   }
 1164  
 
 1165  
   /**
 1166  
    * Create an input {@link org.apache.giraph.edge.OutEdges} instance and
 1167  
    * initialize it with the default capacity.
 1168  
    *
 1169  
    * @return Instantiated input OutEdges
 1170  
    */
 1171  
   public OutEdges<I, E> createAndInitializeInputOutEdges() {
 1172  0
     OutEdges<I, E> outEdges = createInputOutEdges();
 1173  0
     outEdges.initialize();
 1174  0
     return outEdges;
 1175  
   }
 1176  
 
 1177  
   /**
 1178  
    * Create a partition
 1179  
    *
 1180  
    * @param id Partition id
 1181  
    * @param progressable Progressable for reporting progress
 1182  
    * @return Instantiated partition
 1183  
    */
 1184  
   public Partition<I, V, E> createPartition(
 1185  
       int id, Progressable progressable) {
 1186  0
     Class<? extends Partition<I, V, E>> klass = classes.getPartitionClass();
 1187  0
     Partition<I, V, E> partition = ReflectionUtils.newInstance(klass, this);
 1188  0
     partition.initialize(id, progressable);
 1189  0
     return partition;
 1190  
   }
 1191  
 
 1192  
   /**
 1193  
    * Use unsafe serialization?
 1194  
    *
 1195  
    * @return True if using unsafe serialization, false otherwise.
 1196  
    */
 1197  
   public boolean useUnsafeSerialization() {
 1198  0
     return useUnsafeSerialization;
 1199  
   }
 1200  
 
 1201  
   /**
 1202  
    * Create DataInputOutput to store messages
 1203  
    *
 1204  
    * @return DataInputOutput object
 1205  
    */
 1206  
   public DataInputOutput createMessagesInputOutput() {
 1207  0
     if (useBigDataIOForMessages) {
 1208  0
       return new BigDataInputOutput(this);
 1209  
     } else {
 1210  0
       return new ExtendedDataInputOutput(this);
 1211  
     }
 1212  
   }
 1213  
 
 1214  
   /**
 1215  
    * Create an extended data output (can be subclassed)
 1216  
    *
 1217  
    * @return ExtendedDataOutput object
 1218  
    */
 1219  
   public ExtendedDataOutput createExtendedDataOutput() {
 1220  0
     if (useUnsafeSerialization) {
 1221  0
       return new UnsafeByteArrayOutputStream();
 1222  
     } else {
 1223  0
       return new ExtendedByteArrayDataOutput();
 1224  
     }
 1225  
   }
 1226  
 
 1227  
   /**
 1228  
    * Create an extended data output (can be subclassed)
 1229  
    *
 1230  
    * @param expectedSize Expected size
 1231  
    * @return ExtendedDataOutput object
 1232  
    */
 1233  
   public ExtendedDataOutput createExtendedDataOutput(int expectedSize) {
 1234  0
     if (useUnsafeSerialization) {
 1235  0
       return new UnsafeByteArrayOutputStream(expectedSize);
 1236  
     } else {
 1237  0
       return new ExtendedByteArrayDataOutput(expectedSize);
 1238  
     }
 1239  
   }
 1240  
 
 1241  
   /**
 1242  
    * Create an extended data output (can be subclassed)
 1243  
    *
 1244  
    * @param buf Buffer to use for the output (reuse perhaps)
 1245  
    * @param pos How much of the buffer is already used
 1246  
    * @return ExtendedDataOutput object
 1247  
    */
 1248  
   public ExtendedDataOutput createExtendedDataOutput(byte[] buf,
 1249  
                                                      int pos) {
 1250  0
     if (useUnsafeSerialization) {
 1251  0
       return new UnsafeByteArrayOutputStream(buf, pos);
 1252  
     } else {
 1253  0
       return new ExtendedByteArrayDataOutput(buf, pos);
 1254  
     }
 1255  
   }
 1256  
 
 1257  
   /**
 1258  
    * Create an extended data input (can be subclassed)
 1259  
    *
 1260  
    * @param buf Buffer to use for the input
 1261  
    * @param off Where to start reading in the buffer
 1262  
    * @param length Maximum length of the buffer
 1263  
    * @return ExtendedDataInput object
 1264  
    */
 1265  
   public ExtendedDataInput createExtendedDataInput(
 1266  
       byte[] buf, int off, int length) {
 1267  0
     if (useUnsafeSerialization) {
 1268  0
       return new UnsafeByteArrayInputStream(buf, off, length);
 1269  
     } else {
 1270  0
       return new ExtendedByteArrayDataInput(buf, off, length);
 1271  
     }
 1272  
   }
 1273  
 
 1274  
   /**
 1275  
    * Create an extended data input (can be subclassed)
 1276  
    *
 1277  
    * @param buf Buffer to use for the input
 1278  
    * @return ExtendedDataInput object
 1279  
    */
 1280  
   public ExtendedDataInput createExtendedDataInput(byte[] buf) {
 1281  0
     if (useUnsafeSerialization) {
 1282  0
       return new UnsafeByteArrayInputStream(buf);
 1283  
     } else {
 1284  0
       return new ExtendedByteArrayDataInput(buf);
 1285  
     }
 1286  
   }
 1287  
 
 1288  
   /**
 1289  
    * Create extendedDataInput based on extendedDataOutput
 1290  
    *
 1291  
    * @param extendedDataOutput extendedDataOutput
 1292  
    * @return extendedDataInput
 1293  
    */
 1294  
   public ExtendedDataInput createExtendedDataInput(
 1295  
     ExtendedDataOutput extendedDataOutput) {
 1296  0
     return createExtendedDataInput(extendedDataOutput.getByteArray(), 0,
 1297  0
         extendedDataOutput.getPos());
 1298  
   }
 1299  
 
 1300  
   /**
 1301  
    * Whether to use an unsafe serialization
 1302  
    *
 1303  
    * @return whether to use unsafe serialization
 1304  
    */
 1305  
   public boolean getUseUnsafeSerialization() {
 1306  0
     return useUnsafeSerialization;
 1307  
   }
 1308  
 
 1309  
   /**
 1310  
    * Update Computation and MessageCombiner class used
 1311  
    *
 1312  
    * @param superstepClasses SuperstepClasses
 1313  
    */
 1314  
   public void updateSuperstepClasses(SuperstepClasses superstepClasses) {
 1315  0
     superstepClasses.updateGiraphClasses(classes);
 1316  0
   }
 1317  
 
 1318  
   /**
 1319  
    * Has the user enabled compression in netty client &amp; server
 1320  
    *
 1321  
    * @return true if ok to do compression of netty requests
 1322  
    */
 1323  
   public boolean doCompression() {
 1324  0
     switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
 1325  
     case "SNAPPY":
 1326  0
       return true;
 1327  
     case "INFLATE":
 1328  0
       return true;
 1329  
     default:
 1330  0
       return false;
 1331  
     }
 1332  
   }
 1333  
 
 1334  
   /**
 1335  
    * Get encoder for message compression in netty
 1336  
    *
 1337  
    * @return message to byte encoder
 1338  
    */
 1339  
   public MessageToByteEncoder getNettyCompressionEncoder() {
 1340  0
     switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
 1341  
     case "SNAPPY":
 1342  0
       return new SnappyFramedEncoder();
 1343  
     case "INFLATE":
 1344  0
       return new JdkZlibEncoder();
 1345  
     default:
 1346  0
       return null;
 1347  
     }
 1348  
   }
 1349  
 
 1350  
   /**
 1351  
    * Get decoder for message decompression in netty
 1352  
    *
 1353  
    * @return byte to message decoder
 1354  
    */
 1355  
   public ByteToMessageDecoder getNettyCompressionDecoder() {
 1356  0
     switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
 1357  
     case "SNAPPY":
 1358  0
       return new SnappyFramedDecoder();
 1359  
     case "INFLATE":
 1360  0
       return new JdkZlibDecoder();
 1361  
     default:
 1362  0
       return null;
 1363  
     }
 1364  
   }
 1365  
 
 1366  
   /**
 1367  
    * Whether the application with change or not the graph topology.
 1368  
    *
 1369  
    * @return true if the graph is static, false otherwise.
 1370  
    */
 1371  
   public boolean isStaticGraph() {
 1372  0
     return isStaticGraph;
 1373  
   }
 1374  
 
 1375  
   /**
 1376  
    * @return job id
 1377  
    */
 1378  
   public String getJobId() {
 1379  0
     return get("mapred.job.id", "UnknownJob");
 1380  
   }
 1381  
 
 1382  
   /**
 1383  
    * Use message size encoding?  This feature may help with complex message
 1384  
    * objects.
 1385  
    *
 1386  
    * @return Whether to use message size encoding
 1387  
    */
 1388  
   public boolean useMessageSizeEncoding() {
 1389  0
     return useMessageSizeEncoding;
 1390  
   }
 1391  
 }