1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.conf;
20
21 import org.apache.commons.lang3.StringUtils;
22 import org.apache.giraph.comm.flow_control.StaticFlowControl;
23 import org.apache.giraph.comm.netty.NettyClient;
24 import org.apache.giraph.master.BspServiceMaster;
25 import org.apache.giraph.worker.MemoryObserver;
26 import org.apache.hadoop.conf.Configuration;
27
28 import com.google.common.base.Preconditions;
29
30 import java.util.ArrayList;
31 import java.util.List;
32
33
34
35
36 public class FacebookConfiguration implements BulkConfigurator {
37
38
39
40 public static final IntConfOption MAPPER_MEMORY =
41 new IntConfOption("giraph.mapperMemoryGb", 10,
42 "How many GBs of memory to give to the mappers");
43
44
45
46 public static final IntConfOption MAPPER_CORES =
47 new IntConfOption("giraph.mapperCores", 10,
48 "How many cores will mapper be allowed to use");
49
50
51
52
53 public static final FloatConfOption NEW_GEN_MEMORY_FRACTION =
54 new FloatConfOption("giraph.newGenMemoryFraction", 0.1f,
55 "Fraction of total mapper memory to use for new generation");
56
57
58
59
60 public static final BooleanConfOption USE_G1_COLLECTOR =
61 new BooleanConfOption("giraph.useG1Collector", false,
62 "Whether or not to use G1 garbage collector");
63
64
65
66
67 public static final FloatConfOption CORES_FRACTION_DURING_COMMUNICATION =
68 new FloatConfOption("giraph.coresFractionDuringCommunication", 0.7f,
69 "Fraction of mapper cores to use for threads which overlap with" +
70 " network communication");
71
72
73
74
75 public static final BooleanConfOption CONFIGURE_JAVA_OPTS =
76 new BooleanConfOption("giraph.configureJavaOpts", true,
77 "Whether to configure java opts");
78
79
80
81
82 public static final StrConfOption MAPRED_JAVA_JOB_OPTIONS =
83 new StrConfOption("mapred.child.java.opts", null,
84 "Java options passed to mappers");
85
86
87
88
89
90
91
92
93
94
95
96
97
98 @Override
99 public void configure(GiraphConfiguration conf) {
100 int workers = conf.getInt(GiraphConstants.MIN_WORKERS, -1);
101 Preconditions.checkArgument(workers > 0, "Number of workers not set");
102 int cores = MAPPER_CORES.get(conf);
103
104
105
106 conf.setIfUnset(BspServiceMaster.NUM_MASTER_ZK_INPUT_SPLIT_THREADS,
107 Integer.toString(cores));
108
109 GiraphConstants.NUM_OUTPUT_THREADS.setIfUnset(conf, cores);
110
111 int threadsDuringCommunication = Math.max(1,
112 (int) (cores * CORES_FRACTION_DURING_COMMUNICATION.get(conf)));
113
114 GiraphConstants.NUM_INPUT_THREADS.setIfUnset(
115 conf, threadsDuringCommunication);
116
117 GiraphConstants.NUM_COMPUTE_THREADS.setIfUnset(
118 conf, threadsDuringCommunication);
119
120
121
122 GiraphConstants.NETTY_SERVER_THREADS.setIfUnset(
123 conf, threadsDuringCommunication);
124
125
126
127 GiraphConstants.CHANNELS_PER_SERVER.setIfUnset(conf,
128 Math.max(1, 2 * threadsDuringCommunication / workers));
129
130
131 NettyClient.LIMIT_NUMBER_OF_OPEN_REQUESTS.setIfUnset(conf, true);
132 StaticFlowControl.MAX_NUMBER_OF_OPEN_REQUESTS.setIfUnset(conf, 100);
133
134 GiraphConstants.NETTY_USE_POOLED_ALLOCATOR.setIfUnset(conf, true);
135
136
137 MemoryObserver.USE_MEMORY_OBSERVER.setIfUnset(conf, true);
138
139
140 GiraphConstants.MIN_PARTITIONS_PER_COMPUTE_THREAD.setIfUnset(conf, 3);
141
142
143 GiraphConstants.PREFER_IP_ADDRESSES.setIfUnset(conf, true);
144
145
146 GiraphConstants.TRACK_JOB_PROGRESS_ON_CLIENT.setIfUnset(conf, true);
147
148 GiraphConstants.LOG_THREAD_LAYOUT.setIfUnset(conf, true);
149
150 GiraphConstants.METRICS_ENABLE.setIfUnset(conf, true);
151
152 if (CONFIGURE_JAVA_OPTS.get(conf)) {
153 List<String> javaOpts = getMemoryJavaOpts(conf);
154 javaOpts.addAll(getGcJavaOpts(conf));
155 MAPRED_JAVA_JOB_OPTIONS.set(conf, StringUtils.join(javaOpts, " "));
156 }
157 }
158
159
160
161
162
163
164
165 public static List<String> getMemoryJavaOpts(Configuration conf) {
166 int memoryGb = MAPPER_MEMORY.get(conf);
167 List<String> javaOpts = new ArrayList<>();
168
169 javaOpts.add("-Xms" + memoryGb + "g");
170 javaOpts.add("-Xmx" + memoryGb + "g");
171
172
173 javaOpts.add("-XX:+UseNUMA");
174 return javaOpts;
175 }
176
177
178
179
180
181
182
183 public static List<String> getGcJavaOpts(Configuration conf) {
184 List<String> gcJavaOpts = new ArrayList<>();
185 if (USE_G1_COLLECTOR.get(conf)) {
186 gcJavaOpts.add("-XX:+UseG1GC");
187 gcJavaOpts.add("-XX:MaxGCPauseMillis=500");
188 } else {
189 int newGenMemoryGb = Math.max(1,
190 (int) (MAPPER_MEMORY.get(conf) * NEW_GEN_MEMORY_FRACTION.get(conf)));
191
192 gcJavaOpts.add("-XX:+UseParallelGC");
193 gcJavaOpts.add("-XX:+UseParallelOldGC");
194
195 gcJavaOpts.add("-XX:NewSize=" + newGenMemoryGb + "g");
196 gcJavaOpts.add("-XX:MaxNewSize=" + newGenMemoryGb + "g");
197 }
198 return gcJavaOpts;
199 }
200 }