1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc.policy;
20
import com.sun.management.GarbageCollectionNotificationInfo;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.conf.FloatConfOption;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.LongConfOption;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.ooc.command.IOCommand;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.log4j.Logger;

import java.util.Locale;

import static com.google.common.base.Preconditions.checkState;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77 public class ThresholdBasedOracle implements OutOfCoreOracle {
78
79 public static final FloatConfOption FAIL_MEMORY_PRESSURE =
80 new FloatConfOption("giraph.threshold.failPressure", 0.975f,
81 "The memory pressure (fraction of used memory) at/above which the " +
82 "job would fail.");
83
84
85
86
87
88 public static final FloatConfOption EMERGENCY_MEMORY_PRESSURE =
89 new FloatConfOption("giraph.threshold.emergencyPressure", 0.925f,
90 "The memory pressure (fraction of used memory) at which the job " +
91 "is close to fail, hence we should reduce its processing rate " +
92 "as much as possible.");
93
94 public static final FloatConfOption HIGH_MEMORY_PRESSURE =
95 new FloatConfOption("giraph.threshold.highPressure", 0.875f,
96 "The memory pressure (fraction of used memory) at which the job " +
97 "is suffering from GC overhead.");
98
99
100
101
102 public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
103 new FloatConfOption("giraph.threshold.optimalPressure", 0.8f,
104 "The memory pressure (fraction of used memory) at which a " +
105 "memory-intensive job shows the optimal GC behavior.");
106
107
108
109
110 public static final FloatConfOption LOW_MEMORY_PRESSURE =
111 new FloatConfOption("giraph.threshold.lowPressure", 0.7f,
112 "The memory pressure (fraction of used memory) at/below which the " +
113 "job can use more memory without suffering the performance.");
114
115 public static final LongConfOption CHECK_MEMORY_INTERVAL =
116 new LongConfOption("giraph.threshold.checkMemoryInterval", 2500,
117 "The interval/period where memory observer thread wakes up and " +
118 "monitors memory footprint (in milliseconds)");
119
120
121
122
123
124 public static final LongConfOption LAST_GC_CALL_INTERVAL =
125 new LongConfOption("giraph.threshold.lastGcCallInterval", 10 * 1000,
126 "How long after last major/full GC should we call manual GC?");
127
128
129 private static final Logger LOG =
130 Logger.getLogger(ThresholdBasedOracle.class);
131
132 private final float failMemoryPressure;
133
134 private final float emergencyMemoryPressure;
135
136 private final float highMemoryPressure;
137
138 private final float optimalMemoryPressure;
139
140 private final float lowMemoryPressure;
141
142 private final long checkMemoryInterval;
143
144 private final long lastGCCallInterval;
145
146 private final OutOfCoreEngine oocEngine;
147
148 private volatile long lastMajorGCTime;
149
150 private volatile long lastMinorGCTime;
151
152
153
154
155
156
157
158 public ThresholdBasedOracle(ImmutableClassesGiraphConfiguration conf,
159 OutOfCoreEngine oocEngine) {
160 this.failMemoryPressure = FAIL_MEMORY_PRESSURE.get(conf);
161 this.emergencyMemoryPressure = EMERGENCY_MEMORY_PRESSURE.get(conf);
162 this.highMemoryPressure = HIGH_MEMORY_PRESSURE.get(conf);
163 this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
164 this.lowMemoryPressure = LOW_MEMORY_PRESSURE.get(conf);
165 this.checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
166 this.lastGCCallInterval = LAST_GC_CALL_INTERVAL.get(conf);
167 NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.setIfUnset(conf, true);
168 boolean useCredit = NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
169 checkState(useCredit, "ThresholdBasedOracle: credit-based flow control " +
170 "must be enabled. Use giraph.waitForPerWorkerRequests=true");
171 this.oocEngine = oocEngine;
172 this.lastMajorGCTime = 0;
173
174 ThreadUtils.startThread(new Runnable() {
175 @Override
176 public void run() {
177 while (true) {
178 double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
179 long time = System.currentTimeMillis();
180 if ((usedMemoryFraction > highMemoryPressure &&
181 time - lastMajorGCTime >= lastGCCallInterval) ||
182 (usedMemoryFraction > optimalMemoryPressure &&
183 time - lastMajorGCTime >= lastGCCallInterval &&
184 time - lastMinorGCTime >= lastGCCallInterval)) {
185 if (LOG.isInfoEnabled()) {
186 LOG.info("call: last GC happened a while ago and the " +
187 "amount of used memory is high (used memory " +
188 "fraction is " +
189 String.format("%.2f", usedMemoryFraction) + "). " +
190 "Calling GC manually");
191 }
192 System.gc();
193 time = System.currentTimeMillis() - time;
194 usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
195 if (LOG.isInfoEnabled()) {
196 LOG.info("call: manual GC is done. It took " +
197 String.format("%.2f", (double) time / 1000) +
198 " seconds. Used memory fraction is " +
199 String.format("%.2f", usedMemoryFraction));
200 }
201 }
202 updateRates(usedMemoryFraction);
203 try {
204 Thread.sleep(checkMemoryInterval);
205 } catch (InterruptedException e) {
206 LOG.warn("run: exception occurred!", e);
207 return;
208 }
209 }
210 }
211 }, "memory-checker", oocEngine.getServiceWorker().getGraphTaskManager().
212 createUncaughtExceptionHandler());
213 }
214
215
216
217
218
219
220
221 public void updateRates(double usedMemoryFraction) {
222
223 if (usedMemoryFraction >= failMemoryPressure) {
224 oocEngine.updateActiveThreadsFraction(0);
225 } else if (usedMemoryFraction < emergencyMemoryPressure) {
226 oocEngine.updateActiveThreadsFraction(1);
227 } else {
228 oocEngine.updateActiveThreadsFraction(1 -
229 (usedMemoryFraction - emergencyMemoryPressure) /
230 (failMemoryPressure - emergencyMemoryPressure));
231 }
232
233
234
235 if (usedMemoryFraction >= emergencyMemoryPressure) {
236 oocEngine.updateRequestsCreditFraction(0);
237 } else if (usedMemoryFraction < optimalMemoryPressure) {
238 oocEngine.updateRequestsCreditFraction(1);
239 } else {
240 oocEngine.updateRequestsCreditFraction(1 -
241 (usedMemoryFraction - optimalMemoryPressure) /
242 (emergencyMemoryPressure - optimalMemoryPressure));
243 }
244 }
245
246 @Override
247 public IOAction[] getNextIOActions() {
248 double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
249 if (LOG.isDebugEnabled()) {
250 LOG.debug(String.format("getNextIOActions: usedMemoryFraction = %.2f",
251 usedMemoryFraction));
252 }
253 if (usedMemoryFraction > highMemoryPressure) {
254 return new IOAction[]{
255 IOAction.STORE_MESSAGES_AND_BUFFERS,
256 IOAction.STORE_PARTITION};
257 } else if (usedMemoryFraction > optimalMemoryPressure) {
258 return new IOAction[]{
259 IOAction.LOAD_UNPROCESSED_PARTITION,
260 IOAction.STORE_MESSAGES_AND_BUFFERS,
261 IOAction.STORE_PROCESSED_PARTITION};
262 } else if (usedMemoryFraction > lowMemoryPressure) {
263 return new IOAction[]{
264 IOAction.LOAD_UNPROCESSED_PARTITION,
265 IOAction.STORE_MESSAGES_AND_BUFFERS,
266 IOAction.LOAD_PARTITION};
267 } else {
268 return new IOAction[]{IOAction.LOAD_PARTITION};
269 }
270 }
271
272 @Override
273 public boolean approve(IOCommand command) {
274 return true;
275 }
276
277 @Override
278 public void commandCompleted(IOCommand command) {
279
280 }
281
282 @Override
283 public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) {
284 String gcAction = gcInfo.getGcAction().toLowerCase();
285 if (gcAction.contains("full") || gcAction.contains("major")) {
286 if (!gcInfo.getGcCause().contains("No GC")) {
287 lastMajorGCTime = System.currentTimeMillis();
288 }
289 } else {
290 lastMinorGCTime = System.currentTimeMillis();
291 }
292 }
293
294 @Override
295 public void startIteration() {
296 }
297 }