1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
package org.apache.giraph.ooc.policy; |
19 | |
|
20 | |
import com.sun.management.GarbageCollectionNotificationInfo; |
21 | |
import it.unimi.dsi.fastutil.doubles.DoubleArrayList; |
22 | |
import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression; |
23 | |
import org.apache.giraph.comm.NetworkMetrics; |
24 | |
import org.apache.giraph.conf.FloatConfOption; |
25 | |
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; |
26 | |
import org.apache.giraph.conf.LongConfOption; |
27 | |
import org.apache.giraph.edge.AbstractEdgeStore; |
28 | |
import org.apache.giraph.ooc.OutOfCoreEngine; |
29 | |
import org.apache.giraph.ooc.command.IOCommand; |
30 | |
import org.apache.giraph.ooc.command.LoadPartitionIOCommand; |
31 | |
import org.apache.giraph.ooc.command.WaitIOCommand; |
32 | |
import org.apache.giraph.utils.ThreadUtils; |
33 | |
import org.apache.giraph.worker.EdgeInputSplitsCallable; |
34 | |
import org.apache.giraph.worker.VertexInputSplitsCallable; |
35 | |
import org.apache.giraph.worker.WorkerProgress; |
36 | |
import org.apache.log4j.Logger; |
37 | |
|
38 | |
import javax.annotation.Nullable; |
39 | |
import java.lang.management.ManagementFactory; |
40 | |
import java.lang.management.MemoryPoolMXBean; |
41 | |
import java.lang.management.MemoryUsage; |
42 | |
import java.util.ArrayList; |
43 | |
import java.util.Arrays; |
44 | |
import java.util.List; |
45 | |
import java.util.Map; |
46 | |
import java.util.concurrent.atomic.AtomicLong; |
47 | |
import java.util.concurrent.locks.Lock; |
48 | |
import java.util.concurrent.locks.ReentrantLock; |
49 | |
|
50 | |
import static com.google.common.base.Preconditions.checkState; |
51 | |
|
52 | |
|
53 | |
|
54 | |
|
55 | |
|
56 | |
|
57 | |
|
58 | |
|
59 | |
|
60 | |
|
61 | |
|
62 | |
|
63 | |
|
64 | |
|
65 | |
|
66 | |
|
67 | 0 | public class MemoryEstimatorOracle implements OutOfCoreOracle { |
68 | |
|
69 | 0 | public static final LongConfOption CHECK_MEMORY_INTERVAL = |
70 | |
new LongConfOption("giraph.garbageEstimator.checkMemoryInterval", 1000, |
71 | |
"The interval where memory checker thread wakes up and " + |
72 | |
"monitors memory footprint (in milliseconds)"); |
73 | |
|
74 | |
|
75 | |
|
76 | |
|
77 | 0 | public static final FloatConfOption MANUAL_GC_MEMORY_PRESSURE = |
78 | |
new FloatConfOption("giraph.garbageEstimator.manualGCPressure", 0.95f, |
79 | |
"The threshold above which GC is called manually if Full GC has not " + |
80 | |
"happened in a while"); |
81 | |
|
82 | 0 | public static final FloatConfOption GC_MINIMUM_RECLAIM_FRACTION = |
83 | |
new FloatConfOption("giraph.garbageEstimator.gcReclaimFraction", 0.05f, |
84 | |
"Minimum percentage of memory we expect to be reclaimed after a Full " + |
85 | |
"GC. If less than this amount is reclaimed, it is sage to say " + |
86 | |
"we are in a high memory situation and the estimation mechanism " + |
87 | |
"has not recognized it yet!"); |
88 | |
|
89 | 0 | public static final FloatConfOption AM_HIGH_THRESHOLD = |
90 | |
new FloatConfOption("giraph.amHighThreshold", 0.95f, |
91 | |
"If mem-usage is above this threshold, all active threads " + |
92 | |
"(compute/input) are paused."); |
93 | |
|
94 | 0 | public static final FloatConfOption AM_LOW_THRESHOLD = |
95 | |
new FloatConfOption("giraph.amLowThreshold", 0.90f, |
96 | |
"If mem-usage is below this threshold, all active threads " + |
97 | |
"(compute/input) are running."); |
98 | |
|
99 | 0 | public static final FloatConfOption CREDIT_HIGH_THRESHOLD = |
100 | |
new FloatConfOption("giraph.creditHighThreshold", 0.95f, |
101 | |
"If mem-usage is above this threshold, credit is set to 0"); |
102 | |
|
103 | 0 | public static final FloatConfOption CREDIT_LOW_THRESHOLD = |
104 | |
new FloatConfOption("giraph.creditLowThreshold", 0.90f, |
105 | |
"If mem-usage is below this threshold, credit is set to max"); |
106 | |
|
107 | 0 | public static final FloatConfOption OOC_THRESHOLD = |
108 | |
new FloatConfOption("giraph.oocThreshold", 0.90f, |
109 | |
"If mem-usage is above this threshold, out of core threads starts " + |
110 | |
"writing data to disk"); |
111 | |
|
112 | |
|
113 | 0 | private static final Logger LOG = |
114 | 0 | Logger.getLogger(MemoryEstimatorOracle.class); |
115 | |
|
116 | |
|
117 | |
private final float manualGCMemoryPressure; |
118 | |
|
119 | |
private final float gcReclaimFraction; |
120 | |
|
121 | |
private final float amHighThreshold; |
122 | |
|
123 | |
private final float amLowThreshold; |
124 | |
|
125 | |
private final float creditHighThreshold; |
126 | |
|
127 | |
private final float creditLowThreshold; |
128 | |
|
129 | |
private final float oocThreshold; |
130 | |
|
131 | |
|
132 | |
private final OutOfCoreEngine oocEngine; |
133 | |
|
134 | |
private final MemoryEstimator memoryEstimator; |
135 | |
|
136 | 0 | private final AtomicLong oocBytesInjected = new AtomicLong(0); |
137 | |
|
138 | 0 | private final AtomicLong numBytesToOffload = new AtomicLong(0); |
139 | |
|
140 | 0 | private volatile State state = State.STABLE; |
141 | |
|
142 | 0 | private volatile long lastMajorGCTime = 0; |
143 | |
|
144 | |
|
145 | |
|
146 | |
|
147 | 0 | private enum State { |
148 | |
|
149 | 0 | STABLE, |
150 | |
|
151 | 0 | OFFLOADING, |
152 | |
} |
153 | |
|
154 | |
|
155 | |
|
156 | |
|
157 | |
|
158 | |
|
159 | |
|
/**
 * Creates the oracle: reads all thresholds from the configuration, builds
 * the memory estimator and starts the "ooc-memory-checker" thread.
 *
 * In every round the checker thread either updates the thread/credit
 * fractions from the current usage estimate (when an estimate is available)
 * or, when there is none, considers forcing a GC. It then sleeps for
 * {@link #CHECK_MEMORY_INTERVAL} milliseconds. The thread ends when it is
 * interrupted while sleeping.
 *
 * @param conf Configuration the options are read from
 * @param oocEngine Out-of-core engine this oracle works with
 */
public MemoryEstimatorOracle(ImmutableClassesGiraphConfiguration conf,
                             final OutOfCoreEngine oocEngine) {
  this.oocEngine = oocEngine;
  this.memoryEstimator = new MemoryEstimator(this.oocBytesInjected,
      oocEngine.getNetworkMetrics());

  this.manualGCMemoryPressure = MANUAL_GC_MEMORY_PRESSURE.get(conf);
  this.gcReclaimFraction = GC_MINIMUM_RECLAIM_FRACTION.get(conf);
  this.amHighThreshold = AM_HIGH_THRESHOLD.get(conf);
  this.amLowThreshold = AM_LOW_THRESHOLD.get(conf);
  this.creditHighThreshold = CREDIT_HIGH_THRESHOLD.get(conf);
  this.creditLowThreshold = CREDIT_LOW_THRESHOLD.get(conf);
  this.oocThreshold = OOC_THRESHOLD.get(conf);

  final long checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);

  ThreadUtils.startThread(new Runnable() {
    @Override
    public void run() {
      while (true) {
        long oldGenUsageEstimate = memoryEstimator.getUsageEstimate();
        MemoryUsage usage = getOldGenUsed();
        if (oldGenUsageEstimate > 0) {
          updateRates(oldGenUsageEstimate, usage.getMax());
        } else {
          collectGarbageIfUnderPressure(usage);
        }
        try {
          Thread.sleep(checkMemoryInterval);
        } catch (InterruptedException e) {
          LOG.warn("run: exception occurred!", e);
          // Restore the interrupt status so it is not lost to whoever
          // inspects this thread after the loop ends.
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }, "ooc-memory-checker", oocEngine.getServiceWorker().getGraphTaskManager()
      .createUncaughtExceptionHandler());
}

/**
 * Calls {@code System.gc()} when no major GC has been recorded for at least
 * 10 seconds and the used fraction of the old generation exceeds the
 * manual-GC pressure threshold. Logs the used fraction before and after.
 *
 * @param usage Old-generation usage sampled by the caller
 */
private void collectGarbageIfUnderPressure(MemoryUsage usage) {
  long time = System.currentTimeMillis();
  if (time - lastMajorGCTime >= 10000) {
    double used = (double) usage.getUsed() / usage.getMax();
    if (used > manualGCMemoryPressure) {
      if (LOG.isInfoEnabled()) {
        LOG.info(
            "High memory pressure with no full GC from the JVM. " +
                "Calling GC manually. Used fraction of old-gen is " +
                String.format("%.2f", used) + ".");
      }
      System.gc();
      // From here on 'time' holds the duration of the manual GC call.
      time = System.currentTimeMillis() - time;
      MemoryUsage usageAfter = getOldGenUsed();
      used = (double) usageAfter.getUsed() / usageAfter.getMax();
      if (LOG.isInfoEnabled()) {
        LOG.info("Manual GC done. It took " +
            String.format("%.2f", time / 1000.0) +
            " seconds. Used fraction of old-gen is " +
            String.format("%.2f", used) + ".");
      }
    }
  }
}
219 | |
|
220 | |
|
221 | |
|
222 | |
|
223 | |
|
224 | |
|
225 | |
|
226 | |
|
227 | |
|
228 | |
|
229 | |
|
230 | |
@Override |
231 | |
public void startIteration() { |
232 | 0 | AbstractEdgeStore.PROGRESS_COUNTER.reset(); |
233 | 0 | oocBytesInjected.set(0); |
234 | 0 | memoryEstimator.clear(); |
235 | 0 | memoryEstimator.setCurrentSuperstep(oocEngine.getSuperstep()); |
236 | 0 | oocEngine.updateRequestsCreditFraction(1); |
237 | 0 | oocEngine.updateActiveThreadsFraction(1); |
238 | 0 | } |
239 | |
|
240 | |
|
241 | |
@Override |
242 | |
public IOAction[] getNextIOActions() { |
243 | 0 | if (state == State.OFFLOADING) { |
244 | 0 | return new IOAction[]{ |
245 | |
IOAction.STORE_MESSAGES_AND_BUFFERS, IOAction.STORE_PARTITION}; |
246 | |
} |
247 | 0 | long oldGenUsage = memoryEstimator.getUsageEstimate(); |
248 | 0 | MemoryUsage usage = getOldGenUsed(); |
249 | 0 | if (oldGenUsage > 0) { |
250 | 0 | double usageEstimate = (double) oldGenUsage / usage.getMax(); |
251 | 0 | if (usageEstimate > oocThreshold) { |
252 | 0 | return new IOAction[]{ |
253 | |
IOAction.STORE_MESSAGES_AND_BUFFERS, |
254 | |
IOAction.STORE_PARTITION}; |
255 | |
} else { |
256 | 0 | return new IOAction[]{IOAction.LOAD_PARTITION}; |
257 | |
} |
258 | |
} else { |
259 | 0 | return new IOAction[]{IOAction.LOAD_PARTITION}; |
260 | |
} |
261 | |
} |
262 | |
|
263 | |
@Override |
264 | |
public boolean approve(IOCommand command) { |
265 | 0 | return true; |
266 | |
} |
267 | |
|
268 | |
@Override |
269 | |
public void commandCompleted(IOCommand command) { |
270 | 0 | if (command instanceof LoadPartitionIOCommand) { |
271 | 0 | oocBytesInjected.getAndAdd(command.bytesTransferred()); |
272 | 0 | if (state == State.OFFLOADING) { |
273 | 0 | numBytesToOffload.getAndAdd(command.bytesTransferred()); |
274 | |
} |
275 | 0 | } else if (!(command instanceof WaitIOCommand)) { |
276 | 0 | oocBytesInjected.getAndAdd(0 - command.bytesTransferred()); |
277 | 0 | if (state == State.OFFLOADING) { |
278 | 0 | numBytesToOffload.getAndAdd(0 - command.bytesTransferred()); |
279 | |
} |
280 | |
} |
281 | |
|
282 | 0 | if (state == State.OFFLOADING && numBytesToOffload.get() <= 0) { |
283 | 0 | numBytesToOffload.set(0); |
284 | 0 | state = State.STABLE; |
285 | 0 | updateRates(-1, 1); |
286 | |
} |
287 | 0 | } |
288 | |
|
289 | |
|
290 | |
|
291 | |
|
292 | |
|
293 | |
|
294 | |
|
295 | |
@Override |
296 | |
public synchronized void gcCompleted( |
297 | |
GarbageCollectionNotificationInfo gcInfo) { |
298 | 0 | String action = gcInfo.getGcAction().toLowerCase(); |
299 | 0 | String cause = gcInfo.getGcCause().toLowerCase(); |
300 | 0 | if (action.contains("major") && |
301 | 0 | (cause.contains("ergo") || cause.contains("system"))) { |
302 | 0 | lastMajorGCTime = System.currentTimeMillis(); |
303 | 0 | MemoryUsage before = null; |
304 | 0 | MemoryUsage after = null; |
305 | |
|
306 | |
for (Map.Entry<String, MemoryUsage> entry : |
307 | 0 | gcInfo.getGcInfo().getMemoryUsageBeforeGc().entrySet()) { |
308 | 0 | String poolName = entry.getKey(); |
309 | 0 | if (poolName.toLowerCase().contains("old")) { |
310 | 0 | before = entry.getValue(); |
311 | 0 | after = gcInfo.getGcInfo().getMemoryUsageAfterGc().get(poolName); |
312 | 0 | break; |
313 | |
} |
314 | 0 | } |
315 | 0 | if (after == null) { |
316 | 0 | throw new IllegalStateException("Missing Memory Usage After GC info"); |
317 | |
} |
318 | 0 | if (before == null) { |
319 | 0 | throw new IllegalStateException("Missing Memory Usage Before GC info"); |
320 | |
} |
321 | |
|
322 | |
|
323 | 0 | long usedMemoryEstimate = memoryEstimator.getUsageEstimate(); |
324 | 0 | long usedMemoryReal = after.getUsed(); |
325 | 0 | if (usedMemoryEstimate >= 0) { |
326 | 0 | if (LOG.isInfoEnabled()) { |
327 | 0 | LOG.info("gcCompleted: estimate=" + usedMemoryEstimate + " real=" + |
328 | |
usedMemoryReal + " error=" + |
329 | 0 | ((double) Math.abs(usedMemoryEstimate - usedMemoryReal) / |
330 | |
usedMemoryReal * 100)); |
331 | |
} |
332 | |
} |
333 | |
|
334 | |
|
335 | 0 | long edgesLoaded = oocEngine.getSuperstep() >= 0 ? 0 : |
336 | 0 | EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count(); |
337 | |
|
338 | 0 | long verticesLoaded = oocEngine.getSuperstep() >= 0 ? 0 : |
339 | 0 | VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count(); |
340 | |
|
341 | 0 | long verticesComputed = WorkerProgress.get().getVerticesComputed() + |
342 | 0 | WorkerProgress.get().getVerticesStored() + |
343 | 0 | AbstractEdgeStore.PROGRESS_COUNTER.getProgress(); |
344 | |
|
345 | 0 | long receivedBytes = |
346 | 0 | oocEngine.getNetworkMetrics().getBytesReceivedPerSuperstep(); |
347 | |
|
348 | 0 | long oocBytes = oocBytesInjected.get(); |
349 | |
|
350 | 0 | memoryEstimator.addRecord(getOldGenUsed().getUsed(), edgesLoaded, |
351 | |
verticesLoaded, verticesComputed, receivedBytes, oocBytes); |
352 | |
|
353 | 0 | long garbage = before.getUsed() - after.getUsed(); |
354 | 0 | long maxMem = after.getMax(); |
355 | 0 | long memUsed = after.getUsed(); |
356 | 0 | boolean isTight = (maxMem - memUsed) < 2 * gcReclaimFraction * maxMem && |
357 | |
garbage < gcReclaimFraction * maxMem; |
358 | 0 | boolean predictionExist = memoryEstimator.getUsageEstimate() > 0; |
359 | 0 | if (isTight && !predictionExist) { |
360 | 0 | if (LOG.isInfoEnabled()) { |
361 | 0 | LOG.info("gcCompleted: garbage=" + garbage + " memUsed=" + |
362 | |
memUsed + " maxMem=" + maxMem); |
363 | |
} |
364 | 0 | numBytesToOffload.set((long) (2 * gcReclaimFraction * maxMem) - |
365 | |
(maxMem - memUsed)); |
366 | 0 | if (LOG.isInfoEnabled()) { |
367 | 0 | LOG.info("gcCompleted: tight memory usage. Starting to offload " + |
368 | 0 | "until " + numBytesToOffload.get() + " bytes are offloaded"); |
369 | |
} |
370 | 0 | state = State.OFFLOADING; |
371 | 0 | updateRates(1, 1); |
372 | |
} |
373 | |
} |
374 | 0 | } |
375 | |
|
376 | |
|
377 | |
|
378 | |
|
379 | |
|
380 | |
|
381 | |
|
382 | |
|
383 | |
|
384 | |
private void updateRates(long usageEstimateMem, long maxMemory) { |
385 | 0 | double usageEstimate = (double) usageEstimateMem / maxMemory; |
386 | 0 | if (usageEstimate > 0) { |
387 | 0 | if (usageEstimate >= amHighThreshold) { |
388 | 0 | oocEngine.updateActiveThreadsFraction(0); |
389 | 0 | } else if (usageEstimate < amLowThreshold) { |
390 | 0 | oocEngine.updateActiveThreadsFraction(1); |
391 | |
} else { |
392 | 0 | oocEngine.updateActiveThreadsFraction(1 - |
393 | |
(usageEstimate - amLowThreshold) / |
394 | |
(amHighThreshold - amLowThreshold)); |
395 | |
} |
396 | |
|
397 | 0 | if (usageEstimate >= creditHighThreshold) { |
398 | 0 | oocEngine.updateRequestsCreditFraction(0); |
399 | 0 | } else if (usageEstimate < creditLowThreshold) { |
400 | 0 | oocEngine.updateRequestsCreditFraction(1); |
401 | |
} else { |
402 | 0 | oocEngine.updateRequestsCreditFraction(1 - |
403 | |
(usageEstimate - creditLowThreshold) / |
404 | |
(creditHighThreshold - creditLowThreshold)); |
405 | |
} |
406 | |
} else { |
407 | 0 | oocEngine.updateActiveThreadsFraction(1); |
408 | 0 | oocEngine.updateRequestsCreditFraction(1); |
409 | |
} |
410 | 0 | } |
411 | |
|
412 | |
|
413 | |
|
414 | |
|
415 | |
|
416 | |
private MemoryUsage getOldGenUsed() { |
417 | |
List<MemoryPoolMXBean> memoryPoolList = |
418 | 0 | ManagementFactory.getMemoryPoolMXBeans(); |
419 | 0 | for (MemoryPoolMXBean pool : memoryPoolList) { |
420 | 0 | String normalName = pool.getName().toLowerCase(); |
421 | 0 | if (normalName.contains("old") || normalName.contains("tenured")) { |
422 | 0 | return pool.getUsage(); |
423 | |
} |
424 | 0 | } |
425 | 0 | throw new IllegalStateException("Bad Memory Pool"); |
426 | |
} |
427 | |
|
428 | |
|
429 | |
|
430 | |
|
431 | |
|
432 | |
|
433 | |
|
434 | |
|
435 | |
|
436 | |
private static class MemoryEstimator { |
437 | |
|
438 | 0 | private List<double[]> dataSamples = new ArrayList<>(); |
439 | |
|
440 | 0 | private DoubleArrayList memorySamples = new DoubleArrayList(); |
441 | |
|
442 | 0 | private double[] coefficient = new double[6]; |
443 | |
|
444 | 0 | private List<Integer> validColumnIndices = new ArrayList<>(); |
445 | |
|
446 | 0 | private double[] extreme = new double[6]; |
447 | |
|
448 | 0 | private boolean isValid = false; |
449 | |
|
450 | 0 | private OLSMultipleLinearRegression mlr = new OLSMultipleLinearRegression(); |
451 | |
|
452 | 0 | private Lock lock = new ReentrantLock(); |
453 | |
|
454 | 0 | private long currentSuperstep = -1; |
455 | |
|
456 | |
private final AtomicLong oocBytesInjected; |
457 | |
|
458 | |
private final NetworkMetrics networkMetrics; |
459 | |
|
460 | |
|
461 | |
|
462 | |
|
463 | |
|
464 | |
|
465 | |
|
/**
 * Creates an estimator that reads its inputs from the given sources.
 *
 * @param oocBytesInjected Shared counter of bytes injected by OOC IO
 * @param networkMetrics Provider of the bytes-received metric
 */
public MemoryEstimator(AtomicLong oocBytesInjected,
                       NetworkMetrics networkMetrics) {
  this.networkMetrics = networkMetrics;
  this.oocBytesInjected = oocBytesInjected;
}
471 | |
|
472 | |
|
473 | |
|
474 | |
|
475 | |
|
476 | |
public void clear() { |
477 | 0 | dataSamples.clear(); |
478 | 0 | memorySamples.clear(); |
479 | 0 | isValid = false; |
480 | 0 | } |
481 | |
|
482 | |
public void setCurrentSuperstep(long superstep) { |
483 | 0 | this.currentSuperstep = superstep; |
484 | 0 | } |
485 | |
|
486 | |
|
487 | |
|
488 | |
|
489 | |
|
490 | |
|
491 | |
|
492 | |
|
493 | |
public long getUsageEstimate() { |
494 | 0 | long usage = -1; |
495 | 0 | lock.lock(); |
496 | |
try { |
497 | 0 | if (isValid) { |
498 | |
|
499 | 0 | long edgesLoaded = currentSuperstep >= 0 ? 0 : |
500 | 0 | EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count(); |
501 | |
|
502 | 0 | long verticesLoaded = currentSuperstep >= 0 ? 0 : |
503 | 0 | VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count(); |
504 | |
|
505 | 0 | long verticesComputed = WorkerProgress.get().getVerticesComputed() + |
506 | 0 | WorkerProgress.get().getVerticesStored() + |
507 | 0 | AbstractEdgeStore.PROGRESS_COUNTER.getProgress(); |
508 | |
|
509 | 0 | long receivedBytes = networkMetrics.getBytesReceivedPerSuperstep(); |
510 | |
|
511 | 0 | long oocBytes = this.oocBytesInjected.get(); |
512 | |
|
513 | 0 | usage = (long) (edgesLoaded * coefficient[0] + |
514 | |
verticesLoaded * coefficient[1] + |
515 | |
verticesComputed * coefficient[2] + |
516 | |
receivedBytes * coefficient[3] + |
517 | |
oocBytes * coefficient[4] + |
518 | |
coefficient[5]); |
519 | |
} |
520 | |
} finally { |
521 | 0 | lock.unlock(); |
522 | 0 | } |
523 | 0 | return usage; |
524 | |
} |
525 | |
|
526 | |
|
527 | |
|
528 | |
|
529 | |
|
530 | |
|
531 | |
|
532 | |
|
533 | |
|
534 | |
|
535 | |
|
536 | |
public void addRecord(long memUsed, long edges, long vertices, |
537 | |
long verticesProcessed, |
538 | |
long bytesReceived, long oocBytesInjected) { |
539 | 0 | checkState(memUsed > 0, "Memory Usage cannot be negative"); |
540 | 0 | if (dataSamples.size() > 0) { |
541 | 0 | double[] last = dataSamples.get(dataSamples.size() - 1); |
542 | 0 | if (edges == last[0] && vertices == last[1] && |
543 | |
verticesProcessed == last[2] && bytesReceived == last[3] && |
544 | |
oocBytesInjected == last[4]) { |
545 | 0 | if (LOG.isDebugEnabled()) { |
546 | 0 | LOG.debug( |
547 | |
"addRecord: avoiding to add the same entry as the last one!"); |
548 | |
} |
549 | 0 | return; |
550 | |
} |
551 | |
} |
552 | 0 | dataSamples.add(new double[] {edges, vertices, verticesProcessed, |
553 | |
bytesReceived, oocBytesInjected}); |
554 | 0 | memorySamples.add((double) memUsed); |
555 | |
|
556 | |
|
557 | 0 | validColumnIndices.clear(); |
558 | 0 | for (int i = 0; i < 5; ++i) { |
559 | 0 | boolean validIndex = false; |
560 | |
|
561 | 0 | for (double[] value : dataSamples) { |
562 | 0 | if (value[i] != 0) { |
563 | 0 | validIndex = true; |
564 | 0 | break; |
565 | |
} |
566 | 0 | } |
567 | 0 | if (validIndex) { |
568 | |
|
569 | 0 | double firstValue = -1; |
570 | 0 | boolean allEqual = true; |
571 | 0 | for (double[] value : dataSamples) { |
572 | 0 | if (firstValue == -1) { |
573 | 0 | firstValue = value[i]; |
574 | |
} else { |
575 | 0 | if (Math.abs((value[i] - firstValue) / firstValue) > 0.01) { |
576 | 0 | allEqual = false; |
577 | 0 | break; |
578 | |
} |
579 | |
} |
580 | 0 | } |
581 | 0 | validIndex = !allEqual; |
582 | 0 | if (validIndex) { |
583 | |
|
584 | 0 | for (int col = i + 1; col < 5; ++col) { |
585 | 0 | if (isLinearDependence(dataSamples, i, col)) { |
586 | 0 | validIndex = false; |
587 | 0 | break; |
588 | |
} |
589 | |
} |
590 | |
} |
591 | |
} |
592 | |
|
593 | 0 | if (validIndex) { |
594 | 0 | validColumnIndices.add(i); |
595 | |
} |
596 | |
} |
597 | |
|
598 | |
|
599 | |
|
600 | |
|
601 | |
|
602 | 0 | boolean setIsValid = false; |
603 | 0 | lock.lock(); |
604 | |
try { |
605 | 0 | if (validColumnIndices.size() >= 1 && |
606 | 0 | dataSamples.size() >= validColumnIndices.size() + 1) { |
607 | |
|
608 | 0 | double[][] xValues = new double[dataSamples.size()][]; |
609 | 0 | fillXMatrix(dataSamples, validColumnIndices, xValues); |
610 | 0 | double[] yValues = |
611 | 0 | memorySamples.toDoubleArray(new double[memorySamples.size()]); |
612 | 0 | mlr.newSampleData(yValues, xValues); |
613 | 0 | boolean isRegressionValid = |
614 | 0 | calculateRegression(coefficient, validColumnIndices, mlr); |
615 | |
|
616 | 0 | if (!isRegressionValid) { |
617 | 0 | return; |
618 | |
} |
619 | |
|
620 | |
|
621 | |
|
622 | |
|
623 | |
|
624 | |
|
625 | |
|
626 | |
|
627 | |
|
628 | |
|
629 | |
|
630 | |
|
631 | |
|
632 | |
|
633 | |
|
634 | |
|
635 | |
|
636 | |
|
637 | |
|
638 | |
|
639 | |
|
640 | |
|
641 | |
|
642 | |
|
643 | |
|
644 | |
|
645 | |
|
646 | |
|
647 | |
|
648 | |
|
649 | |
|
650 | |
|
651 | |
|
652 | |
|
653 | |
|
654 | |
|
655 | |
boolean changed; |
656 | 0 | extreme[3] = -1; |
657 | 0 | extreme[4] = -1; |
658 | |
do { |
659 | 0 | Boolean result = null; |
660 | |
|
661 | 0 | result = refineCoefficient(4, 1, 2, xValues, yValues); |
662 | 0 | if (result == null) { |
663 | 0 | return; |
664 | |
} |
665 | 0 | changed = result; |
666 | |
|
667 | 0 | result = refineCoefficient(3, 0, 2, xValues, yValues); |
668 | 0 | if (result == null) { |
669 | 0 | return; |
670 | |
} |
671 | 0 | changed |= result; |
672 | 0 | } while (changed); |
673 | 0 | if (extreme[3] != -1) { |
674 | 0 | coefficient[3] = extreme[3]; |
675 | |
} |
676 | 0 | if (extreme[4] != -1) { |
677 | 0 | coefficient[4] = extreme[4]; |
678 | |
} |
679 | 0 | setIsValid = true; |
680 | 0 | return; |
681 | |
} |
682 | |
} finally { |
683 | |
|
684 | |
|
685 | 0 | try { |
686 | 0 | isValid = setIsValid; |
687 | 0 | printStats(); |
688 | |
} finally { |
689 | 0 | lock.unlock(); |
690 | 0 | } |
691 | 0 | } |
692 | 0 | } |
693 | |
|
694 | |
|
695 | |
|
696 | |
|
697 | |
|
698 | |
|
699 | |
|
700 | |
|
701 | |
|
702 | |
|
703 | |
|
704 | |
|
705 | |
|
706 | |
|
707 | |
|
708 | |
|
709 | |
|
710 | |
|
711 | |
|
712 | |
|
713 | |
|
714 | |
|
715 | |
|
716 | |
|
717 | |
|
718 | |
|
719 | |
|
720 | |
|
721 | |
|
722 | |
|
723 | |
@Nullable |
724 | |
private Boolean refineCoefficient(int coefIndex, double lowerBound, |
725 | |
double upperBound, double[][] xValues, double[] yValues) { |
726 | |
|
727 | 0 | boolean result = false; |
728 | 0 | if (coefficient[coefIndex] < lowerBound || |
729 | |
coefficient[coefIndex] > upperBound) { |
730 | |
|
731 | |
double value; |
732 | 0 | if (coefficient[coefIndex] < lowerBound) { |
733 | 0 | value = lowerBound; |
734 | |
} else { |
735 | 0 | value = upperBound; |
736 | |
} |
737 | 0 | int ptr = -1; |
738 | |
|
739 | |
|
740 | |
|
741 | 0 | for (int i = validColumnIndices.size() - 1; i >= 0; --i) { |
742 | 0 | if (validColumnIndices.get(i) == coefIndex) { |
743 | 0 | ptr = i; |
744 | 0 | break; |
745 | |
} |
746 | |
} |
747 | 0 | if (ptr != -1) { |
748 | 0 | if (LOG.isDebugEnabled()) { |
749 | 0 | LOG.debug("addRecord: coefficient at index " + coefIndex + |
750 | |
" is wrong in the regression, setting it to " + value); |
751 | |
} |
752 | |
|
753 | 0 | validColumnIndices.remove(ptr); |
754 | |
|
755 | 0 | fillXMatrix(dataSamples, validColumnIndices, xValues); |
756 | |
|
757 | 0 | for (int i = 0; i < memorySamples.size(); ++i) { |
758 | 0 | yValues[i] -= value * dataSamples.get(i)[coefIndex]; |
759 | |
} |
760 | |
|
761 | 0 | extreme[coefIndex] = value; |
762 | |
|
763 | 0 | mlr.newSampleData(yValues, xValues); |
764 | 0 | result = calculateRegression(coefficient, validColumnIndices, mlr); |
765 | 0 | if (!result) { |
766 | 0 | return null; |
767 | |
} |
768 | |
} else { |
769 | 0 | if (LOG.isDebugEnabled()) { |
770 | 0 | LOG.debug( |
771 | |
"addRecord: coefficient was not in the regression, " + |
772 | |
"setting it to the extreme of the bound"); |
773 | |
} |
774 | 0 | result = false; |
775 | |
} |
776 | 0 | coefficient[coefIndex] = value; |
777 | |
} |
778 | 0 | return result; |
779 | |
} |
780 | |
|
781 | |
|
782 | |
|
783 | |
|
784 | |
|
785 | |
|
786 | |
|
787 | |
|
788 | |
private static boolean calculateRegression(double[] coefficient, |
789 | |
List<Integer> validColumnIndices, OLSMultipleLinearRegression mlr) { |
790 | |
|
791 | 0 | if (coefficient.length != validColumnIndices.size()) { |
792 | 0 | LOG.info("There are " + coefficient.length + |
793 | 0 | " coefficients, and " + validColumnIndices.size() + |
794 | |
" valid columns in the regression"); |
795 | |
} |
796 | |
|
797 | 0 | double[] beta = mlr.estimateRegressionParameters(); |
798 | 0 | Arrays.fill(coefficient, 0); |
799 | 0 | for (int i = 0; i < validColumnIndices.size(); ++i) { |
800 | 0 | coefficient[validColumnIndices.get(i)] = beta[i]; |
801 | |
} |
802 | 0 | coefficient[5] = beta[validColumnIndices.size()]; |
803 | 0 | return true; |
804 | |
} |
805 | |
|
806 | |
|
807 | |
|
808 | |
|
809 | |
|
810 | |
|
811 | |
|
812 | |
|
813 | |
private static void fillXMatrix(List<double[]> sourceValues, |
814 | |
List<Integer> validColumnIndices, |
815 | |
double[][] xValues) { |
816 | |
|
817 | 0 | for (int i = 0; i < sourceValues.size(); ++i) { |
818 | 0 | xValues[i] = new double[validColumnIndices.size() + 1]; |
819 | 0 | for (int j = 0; j < validColumnIndices.size(); ++j) { |
820 | 0 | xValues[i][j] = sourceValues.get(i)[validColumnIndices.get(j)]; |
821 | |
} |
822 | 0 | xValues[i][validColumnIndices.size()] = 1; |
823 | |
} |
824 | 0 | } |
825 | |
|
826 | |
|
827 | |
|
828 | |
|
829 | |
|
830 | |
|
831 | |
|
832 | |
|
833 | |
|
834 | |
private static boolean equal(double val1, double val2) { |
835 | 0 | return Math.abs(val1 - val2) < 0.01; |
836 | |
} |
837 | |
|
838 | |
|
839 | |
|
840 | |
|
841 | |
|
842 | |
|
843 | |
|
844 | |
|
845 | |
|
846 | |
private static boolean isLinearDependence(List<double[]> values, |
847 | |
int col1, int col2) { |
848 | 0 | boolean firstValSeen = false; |
849 | 0 | double firstVal = 0; |
850 | 0 | for (double[] value : values) { |
851 | 0 | double val1 = value[col1]; |
852 | 0 | double val2 = value[col2]; |
853 | 0 | if (equal(val1, 0)) { |
854 | 0 | if (equal(val2, 0)) { |
855 | 0 | continue; |
856 | |
} else { |
857 | 0 | return false; |
858 | |
} |
859 | |
} |
860 | 0 | if (equal(val2, 0)) { |
861 | 0 | return false; |
862 | |
} |
863 | 0 | if (!firstValSeen) { |
864 | 0 | firstVal = val1 / val2; |
865 | 0 | firstValSeen = true; |
866 | |
} else { |
867 | 0 | if (!equal((val1 / val2 - firstVal) / firstVal, 0)) { |
868 | 0 | return false; |
869 | |
} |
870 | |
} |
871 | 0 | } |
872 | 0 | return true; |
873 | |
} |
874 | |
|
875 | |
|
876 | |
|
877 | |
|
878 | |
private void printStats() { |
879 | 0 | if (LOG.isDebugEnabled()) { |
880 | 0 | StringBuilder sb = new StringBuilder(); |
881 | 0 | sb.append( |
882 | |
"\nEDGES\t\tVERTICES\t\tV_PROC\t\tRECEIVED\t\tOOC\t\tMEM_USED\n"); |
883 | 0 | for (int i = 0; i < dataSamples.size(); ++i) { |
884 | 0 | for (int j = 0; j < dataSamples.get(i).length; ++j) { |
885 | 0 | sb.append(String.format("%.2f\t\t", dataSamples.get(i)[j])); |
886 | |
} |
887 | 0 | sb.append(memorySamples.get(i)); |
888 | 0 | sb.append("\n"); |
889 | |
} |
890 | 0 | sb.append("COEFFICIENT:\n"); |
891 | 0 | for (int i = 0; i < coefficient.length; ++i) { |
892 | 0 | sb.append(String.format("%.2f\t\t", coefficient[i])); |
893 | |
} |
894 | 0 | sb.append("\n"); |
895 | 0 | LOG.debug("printStats: isValid=" + isValid + sb.toString()); |
896 | |
} |
897 | 0 | } |
898 | |
} |
899 | |
} |