1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.utils;
20
21 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
22 import org.apache.giraph.conf.GiraphConstants;
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.master.MasterObserver;
25 import org.apache.giraph.metrics.AggregatedMetrics;
26 import org.apache.giraph.partition.PartitionStats;
27 import org.apache.giraph.worker.WorkerObserver;
28 import org.apache.log4j.Logger;
29
30 import java.util.List;
31
32
33
34
35
36
37 public class ReactiveJMapHistoDumper extends
38 DefaultImmutableClassesGiraphConfigurable implements
39 MasterObserver, WorkerObserver {
40
41 private static final Logger LOG = Logger.getLogger(
42 ReactiveJMapHistoDumper.class);
43
44 private static final int MB = 1024 * 1024;
45
46
47 private int sleepMillis;
48
49 private int linesToPrint;
50
51 private int minFreeMemory;
52
53
54 private Thread thread;
55
56 private volatile boolean stop = false;
57
58 private String jmapPath;
59
60 @Override
61 public void preLoad() {
62
63 startSupervisorThread();
64 }
65
66 @Override
67 public void postSave() {
68
69 joinSupervisorThread();
70 }
71
72 @Override
73 public void preApplication() {
74 }
75
76 @Override
77 public void postApplication() {
78 }
79
80
81
82
83 private void joinSupervisorThread() {
84 stop = true;
85 try {
86 thread.join(sleepMillis + 5000);
87 } catch (InterruptedException e) {
88 LOG.error("Failed to join jmap thread");
89 }
90 }
91
92
93
94
95 public void startSupervisorThread() {
96 stop = false;
97 final Runtime runtime = Runtime.getRuntime();
98 thread = ThreadUtils.startThread(new Runnable() {
99 @Override
100 public void run() {
101 while (!stop) {
102 long potentialMemory = (runtime.maxMemory() -
103 runtime.totalMemory()) + runtime.freeMemory();
104 if (potentialMemory / MB < minFreeMemory) {
105 JMap.heapHistogramDump(linesToPrint, jmapPath);
106 }
107 ThreadUtils.trySleep(sleepMillis);
108 }
109 }
110 }, "ReactiveJMapHistoDumperSupervisorThread");
111 }
112
113 @Override
114 public void preSuperstep(long superstep) { }
115
116 @Override
117 public void postSuperstep(long superstep) { }
118
119 @Override
120 public void superstepMetricsUpdate(long superstep,
121 AggregatedMetrics aggregatedMetrics,
122 List<PartitionStats> partitionStatsList) { }
123
124 @Override
125 public void applicationFailed(Exception e) { }
126
127 @Override
128 public void setConf(ImmutableClassesGiraphConfiguration configuration) {
129 sleepMillis = GiraphConstants.JMAP_SLEEP_MILLIS.get(configuration);
130 linesToPrint = GiraphConstants.JMAP_PRINT_LINES.get(configuration);
131 minFreeMemory = GiraphConstants.MIN_FREE_MBS_ON_HEAP.get(configuration);
132 jmapPath = GiraphConstants.JMAP_PATH.get(configuration);
133 }
134 }