Coverage Report - org.apache.giraph.conf.FacebookConfiguration
 
Classes in this File Line Coverage Branch Coverage Complexity
FacebookConfiguration
0%
0/52
0%
0/6
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.conf;
 20  
 
 21  
 import org.apache.commons.lang3.StringUtils;
 22  
 import org.apache.giraph.comm.flow_control.StaticFlowControl;
 23  
 import org.apache.giraph.comm.netty.NettyClient;
 24  
 import org.apache.giraph.master.BspServiceMaster;
 25  
 import org.apache.giraph.worker.MemoryObserver;
 26  
 import org.apache.hadoop.conf.Configuration;
 27  
 
 28  
 import com.google.common.base.Preconditions;
 29  
 
 30  
 import java.util.ArrayList;
 31  
 import java.util.List;
 32  
 
 33  
 /**
 34  
  * Default configuration used in Facebook
 35  
  */
 36  0
 public class FacebookConfiguration implements BulkConfigurator {
 37  
   /**
 38  
    * How much memory per mapper should we use
 39  
    */
 40  0
   public static final IntConfOption MAPPER_MEMORY =
 41  
       new IntConfOption("giraph.mapperMemoryGb", 10,
 42  
           "How many GBs of memory to give to the mappers");
 43  
   /**
 44  
    * How many cores per mapper should we use
 45  
    */
 46  0
   public static final IntConfOption MAPPER_CORES =
 47  
       new IntConfOption("giraph.mapperCores", 10,
 48  
           "How many cores will mapper be allowed to use");
 49  
 
 50  
   /**
 51  
    * Fraction of {@link #MAPPER_MEMORY} to use for new generation
 52  
    */
 53  0
   public static final FloatConfOption NEW_GEN_MEMORY_FRACTION =
 54  
       new FloatConfOption("giraph.newGenMemoryFraction", 0.1f,
 55  
           "Fraction of total mapper memory to use for new generation");
 56  
   /**
 57  
    * Note: using G1 is often faster, but we've seen it add off heap memory
 58  
    * overhead which can cause issues.
 59  
    */
 60  0
   public static final BooleanConfOption USE_G1_COLLECTOR =
 61  
       new BooleanConfOption("giraph.useG1Collector", false,
 62  
           "Whether or not to use G1 garbage collector");
 63  
   /**
 64  
    * Which fraction of cores to use for threads when computation and
 65  
    * communication overlap
 66  
    */
 67  0
   public static final FloatConfOption CORES_FRACTION_DURING_COMMUNICATION =
 68  
       new FloatConfOption("giraph.coresFractionDuringCommunication", 0.7f,
 69  
           "Fraction of mapper cores to use for threads which overlap with" +
 70  
               " network communication");
 71  
 
 72  
   /**
 73  
    * Whether to configure java opts.
 74  
    */
 75  0
   public static final BooleanConfOption CONFIGURE_JAVA_OPTS =
 76  
       new BooleanConfOption("giraph.configureJavaOpts", true,
 77  
           "Whether to configure java opts");
 78  
 
 79  
   /**
 80  
    * Java options passed to mappers.
 81  
    */
 82  0
   public static final StrConfOption MAPRED_JAVA_JOB_OPTIONS =
 83  
       new StrConfOption("mapred.child.java.opts", null,
 84  
           "Java options passed to mappers");
 85  
 
 86  
   /**
 87  
    * Expand GiraphConfiguration with default Facebook settings.
 88  
    * Assumes {@link #MAPPER_CORES} and number of workers to use
 89  
    * are already set correctly in Configuration.
 90  
    *
 91  
    * For all conf options it changed it will only do so if they are not set,
 92  
    * so it won't override any of your custom settings. The only exception is
 93  
    * mapred.child.java.opts, this one will be overwritten depending on the
 94  
    * {@link #CONFIGURE_JAVA_OPTS} setting
 95  
    *
 96  
    * @param conf Configuration
 97  
    */
 98  
   @Override
 99  
   public void configure(GiraphConfiguration conf) {
 100  0
     int workers = conf.getInt(GiraphConstants.MIN_WORKERS, -1);
 101  0
     Preconditions.checkArgument(workers > 0, "Number of workers not set");
 102  0
     int cores = MAPPER_CORES.get(conf);
 103  
 
 104  
     // Nothing else happens while we write input splits to zk,
 105  
     // so we can use all threads
 106  0
     conf.setIfUnset(BspServiceMaster.NUM_MASTER_ZK_INPUT_SPLIT_THREADS,
 107  0
         Integer.toString(cores));
 108  
     // Nothing else happens while we write output, so we can use all threads
 109  0
     GiraphConstants.NUM_OUTPUT_THREADS.setIfUnset(conf, cores);
 110  
 
 111  0
     int threadsDuringCommunication = Math.max(1,
 112  0
         (int) (cores * CORES_FRACTION_DURING_COMMUNICATION.get(conf)));
 113  
     // Input overlaps with communication, set threads properly
 114  0
     GiraphConstants.NUM_INPUT_THREADS.setIfUnset(
 115  
         conf, threadsDuringCommunication);
 116  
     // Compute overlaps with communication, set threads properly
 117  0
     GiraphConstants.NUM_COMPUTE_THREADS.setIfUnset(
 118  
         conf, threadsDuringCommunication);
 119  
     // Netty server threads are the ones adding messages to stores,
 120  
     // or adding vertices and edges to stores during input,
 121  
     // these are expensive operations so set threads properly
 122  0
     GiraphConstants.NETTY_SERVER_THREADS.setIfUnset(
 123  
         conf, threadsDuringCommunication);
 124  
 
 125  
     // Ensure we can utilize all communication threads by having enough
 126  
     // channels per server, in cases when we have just a few machines
 127  0
     GiraphConstants.CHANNELS_PER_SERVER.setIfUnset(conf,
 128  0
         Math.max(1, 2 * threadsDuringCommunication / workers));
 129  
 
 130  
     // Limit number of open requests to 2000
 131  0
     NettyClient.LIMIT_NUMBER_OF_OPEN_REQUESTS.setIfUnset(conf, true);
 132  0
     StaticFlowControl.MAX_NUMBER_OF_OPEN_REQUESTS.setIfUnset(conf, 100);
 133  
     // Pooled allocator in netty is faster
 134  0
     GiraphConstants.NETTY_USE_POOLED_ALLOCATOR.setIfUnset(conf, true);
 135  
 
 136  
     // Synchronize full gc calls across workers
 137  0
     MemoryObserver.USE_MEMORY_OBSERVER.setIfUnset(conf, true);
 138  
 
 139  
     // Increase number of partitions per compute thread
 140  0
     GiraphConstants.MIN_PARTITIONS_PER_COMPUTE_THREAD.setIfUnset(conf, 3);
 141  
 
 142  
     // Prefer ip addresses
 143  0
     GiraphConstants.PREFER_IP_ADDRESSES.setIfUnset(conf, true);
 144  
 
 145  
     // Track job progress
 146  0
     GiraphConstants.TRACK_JOB_PROGRESS_ON_CLIENT.setIfUnset(conf, true);
 147  
     // Thread-level debugging for easier understanding
 148  0
     GiraphConstants.LOG_THREAD_LAYOUT.setIfUnset(conf, true);
 149  
     // Enable tracking and printing of metrics
 150  0
     GiraphConstants.METRICS_ENABLE.setIfUnset(conf, true);
 151  
 
 152  0
     if (CONFIGURE_JAVA_OPTS.get(conf)) {
 153  0
       List<String> javaOpts = getMemoryJavaOpts(conf);
 154  0
       javaOpts.addAll(getGcJavaOpts(conf));
 155  0
       MAPRED_JAVA_JOB_OPTIONS.set(conf, StringUtils.join(javaOpts, " "));
 156  
     }
 157  0
   }
 158  
 
 159  
   /**
 160  
    * Get memory java opts to use
 161  
    *
 162  
    * @param conf Configuration
 163  
    * @return Java opts
 164  
    */
 165  
   public static List<String> getMemoryJavaOpts(Configuration conf) {
 166  0
     int memoryGb = MAPPER_MEMORY.get(conf);
 167  0
     List<String> javaOpts = new ArrayList<>();
 168  
     // Set xmx and xms to the same value
 169  0
     javaOpts.add("-Xms" + memoryGb + "g");
 170  0
     javaOpts.add("-Xmx" + memoryGb + "g");
 171  
     // Non-uniform memory allocator (great for multi-threading and appears to
 172  
     // have no impact when single threaded)
 173  0
     javaOpts.add("-XX:+UseNUMA");
 174  0
     return javaOpts;
 175  
   }
 176  
 
 177  
   /**
 178  
    * Get garbage collection java opts to use
 179  
    *
 180  
    * @param conf Configuration
 181  
    * @return Java opts
 182  
    */
 183  
   public static List<String> getGcJavaOpts(Configuration conf) {
 184  0
     List<String> gcJavaOpts = new ArrayList<>();
 185  0
     if (USE_G1_COLLECTOR.get(conf)) {
 186  0
       gcJavaOpts.add("-XX:+UseG1GC");
 187  0
       gcJavaOpts.add("-XX:MaxGCPauseMillis=500");
 188  
     } else {
 189  0
       int newGenMemoryGb = Math.max(1,
 190  0
           (int) (MAPPER_MEMORY.get(conf) * NEW_GEN_MEMORY_FRACTION.get(conf)));
 191  
       // Use parallel gc collector
 192  0
       gcJavaOpts.add("-XX:+UseParallelGC");
 193  0
       gcJavaOpts.add("-XX:+UseParallelOldGC");
 194  
       // Fix new size generation
 195  0
       gcJavaOpts.add("-XX:NewSize=" + newGenMemoryGb + "g");
 196  0
       gcJavaOpts.add("-XX:MaxNewSize=" + newGenMemoryGb + "g");
 197  
     }
 198  0
     return gcJavaOpts;
 199  
   }
 200  
 }