1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.metrics;
20
21 import org.apache.giraph.graph.GraphTaskManager;
22 import org.apache.giraph.ooc.OutOfCoreEngine;
23 import org.apache.giraph.ooc.OutOfCoreIOCallable;
24 import org.apache.giraph.worker.BspServiceWorker;
25 import org.apache.hadoop.io.Writable;
26
27 import com.yammer.metrics.core.Gauge;
28
29 import java.io.DataInput;
30 import java.io.DataOutput;
31 import java.io.IOException;
32 import java.io.PrintStream;
33 import java.util.concurrent.TimeUnit;
34
35
36
37
38 public class WorkerSuperstepMetrics implements Writable {
39
40 private LongAndTimeUnit commTimer;
41
42 private LongAndTimeUnit computeAllTimer;
43
44 private LongAndTimeUnit timeToFirstMsg;
45
46 private LongAndTimeUnit superstepTimer;
47
48 private LongAndTimeUnit waitRequestsTimer;
49
50 private LongAndTimeUnit superstepGCTimer;
51
52 private long bytesLoadedFromDisk;
53
54 private long bytesStoredOnDisk;
55
56 private double graphPercentageInMemory;
57
58
59
60
61 public WorkerSuperstepMetrics() {
62 commTimer = new LongAndTimeUnit();
63 computeAllTimer = new LongAndTimeUnit();
64 timeToFirstMsg = new LongAndTimeUnit();
65 superstepTimer = new LongAndTimeUnit();
66 waitRequestsTimer = new LongAndTimeUnit();
67 superstepGCTimer = new LongAndTimeUnit();
68 superstepGCTimer.setTimeUnit(TimeUnit.MILLISECONDS);
69 bytesLoadedFromDisk = 0;
70 bytesStoredOnDisk = 0;
71 graphPercentageInMemory = 100;
72 }
73
74
75
76
77
78
79 public WorkerSuperstepMetrics readFromRegistry() {
80 readGiraphTimer(GraphTaskManager.TIMER_COMMUNICATION_TIME, commTimer);
81 readGiraphTimer(GraphTaskManager.TIMER_COMPUTE_ALL, computeAllTimer);
82 readGiraphTimer(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG, timeToFirstMsg);
83 readGiraphTimer(GraphTaskManager.TIMER_SUPERSTEP_TIME, superstepTimer);
84 readGiraphTimer(BspServiceWorker.TIMER_WAIT_REQUESTS, waitRequestsTimer);
85 SuperstepMetricsRegistry registry = GiraphMetrics.get().perSuperstep();
86 superstepGCTimer.setValue(
87 registry.getCounter(GraphTaskManager.TIMER_SUPERSTEP_GC_TIME).count());
88 bytesLoadedFromDisk =
89 registry.getCounter(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK).count();
90 bytesStoredOnDisk =
91 registry.getCounter(OutOfCoreIOCallable.BYTES_STORE_TO_DISK).count();
92 Gauge<Double> gauge =
93 registry.getExistingGauge(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY);
94 if (gauge != null) {
95 graphPercentageInMemory = gauge.value();
96 }
97 return this;
98 }
99
100
101
102
103
104
105
106 private void readGiraphTimer(String name, LongAndTimeUnit data) {
107 Gauge<Long> gauge = GiraphMetrics.get().perSuperstep().
108 getExistingGauge(name);
109 if (gauge instanceof GiraphTimer) {
110 GiraphTimer giraphTimer = (GiraphTimer) gauge;
111 data.setTimeUnit(giraphTimer.getTimeUnit());
112 data.setValue(giraphTimer.value());
113 } else if (gauge != null) {
114 throw new IllegalStateException(name + " is not a GiraphTimer");
115 }
116 }
117
118
119
120
121
122
123
124
125 public WorkerSuperstepMetrics print(long superstep, PrintStream out) {
126 out.println();
127 out.println("--- METRICS: superstep " + superstep + " ---");
128 out.println(" superstep time: " + superstepTimer);
129 out.println(" compute all partitions: " + computeAllTimer);
130 out.println(" time spent in gc: " + superstepGCTimer);
131 out.println(" bytes transferred in out-of-core: " +
132 (bytesLoadedFromDisk + bytesStoredOnDisk));
133 out.println(" network communication time: " + commTimer);
134 out.println(" time to first message: " + timeToFirstMsg);
135 out.println(" wait on requests time: " + waitRequestsTimer);
136 return this;
137 }
138
139
140
141
142 public long getCommTimer() {
143 return commTimer.getValue();
144 }
145
146
147
148
149 public long getComputeAllTimer() {
150 return computeAllTimer.getValue();
151 }
152
153
154
155
156 public long getTimeToFirstMsg() {
157 return timeToFirstMsg.getValue();
158 }
159
160
161
162
163 public long getSuperstepTimer() {
164 return superstepTimer.getValue();
165 }
166
167
168
169
170 public long getWaitRequestsTimer() {
171 return waitRequestsTimer.getValue();
172 }
173
174
175
176
177
178 public long getBytesLoadedFromDisk() {
179 return bytesLoadedFromDisk;
180 }
181
182
183
184
185
186 public long getBytesStoredOnDisk() {
187 return bytesStoredOnDisk;
188 }
189
190
191
192
193 public double getGraphPercentageInMemory() {
194 return graphPercentageInMemory;
195 }
196
197 @Override
198 public void readFields(DataInput dataInput) throws IOException {
199 commTimer.setValue(dataInput.readLong());
200 computeAllTimer.setValue(dataInput.readLong());
201 timeToFirstMsg.setValue(dataInput.readLong());
202 superstepTimer.setValue(dataInput.readLong());
203 waitRequestsTimer.setValue(dataInput.readLong());
204 bytesLoadedFromDisk = dataInput.readLong();
205 bytesStoredOnDisk = dataInput.readLong();
206 graphPercentageInMemory = dataInput.readDouble();
207 }
208
209 @Override
210 public void write(DataOutput dataOutput) throws IOException {
211 dataOutput.writeLong(commTimer.getValue());
212 dataOutput.writeLong(computeAllTimer.getValue());
213 dataOutput.writeLong(timeToFirstMsg.getValue());
214 dataOutput.writeLong(superstepTimer.getValue());
215 dataOutput.writeLong(waitRequestsTimer.getValue());
216 dataOutput.writeLong(bytesLoadedFromDisk);
217 dataOutput.writeLong(bytesStoredOnDisk);
218 dataOutput.writeDouble(graphPercentageInMemory);
219 }
220 }