1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.master; |
20 | |
|
21 | |
import com.google.common.collect.Lists; |
22 | |
import com.google.common.collect.Sets; |
23 | |
import net.iharder.Base64; |
24 | |
import org.apache.commons.io.FilenameUtils; |
25 | |
import org.apache.giraph.bsp.ApplicationState; |
26 | |
import org.apache.giraph.bsp.BspInputFormat; |
27 | |
import org.apache.giraph.bsp.BspService; |
28 | |
import org.apache.giraph.bsp.CentralizedServiceMaster; |
29 | |
import org.apache.giraph.bsp.SuperstepState; |
30 | |
import org.apache.giraph.bsp.checkpoints.CheckpointStatus; |
31 | |
import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker; |
32 | |
import org.apache.giraph.comm.MasterClient; |
33 | |
import org.apache.giraph.comm.MasterServer; |
34 | |
import org.apache.giraph.comm.netty.NettyClient; |
35 | |
import org.apache.giraph.comm.netty.NettyMasterClient; |
36 | |
import org.apache.giraph.comm.netty.NettyMasterServer; |
37 | |
import org.apache.giraph.comm.requests.AddressesAndPartitionsRequest; |
38 | |
import org.apache.giraph.conf.GiraphConfiguration; |
39 | |
import org.apache.giraph.conf.GiraphConstants; |
40 | |
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; |
41 | |
import org.apache.giraph.counters.CustomCounter; |
42 | |
import org.apache.giraph.counters.CustomCounters; |
43 | |
import org.apache.giraph.counters.GiraphCountersThriftStruct; |
44 | |
import org.apache.giraph.counters.GiraphStats; |
45 | |
import org.apache.giraph.counters.GiraphTimers; |
46 | |
import org.apache.giraph.graph.AddressesAndPartitionsWritable; |
47 | |
import org.apache.giraph.graph.GlobalStats; |
48 | |
import org.apache.giraph.graph.GraphFunctions; |
49 | |
import org.apache.giraph.graph.GraphState; |
50 | |
import org.apache.giraph.graph.GraphTaskManager; |
51 | |
import org.apache.giraph.io.EdgeInputFormat; |
52 | |
import org.apache.giraph.io.GiraphInputFormat; |
53 | |
import org.apache.giraph.io.InputType; |
54 | |
import org.apache.giraph.io.MappingInputFormat; |
55 | |
import org.apache.giraph.io.VertexInputFormat; |
56 | |
import org.apache.giraph.master.input.MasterInputSplitsHandler; |
57 | |
import org.apache.giraph.metrics.AggregatedMetrics; |
58 | |
import org.apache.giraph.metrics.GiraphMetrics; |
59 | |
import org.apache.giraph.metrics.GiraphTimer; |
60 | |
import org.apache.giraph.metrics.GiraphTimerContext; |
61 | |
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; |
62 | |
import org.apache.giraph.metrics.SuperstepMetricsRegistry; |
63 | |
import org.apache.giraph.metrics.WorkerSuperstepMetrics; |
64 | |
import org.apache.giraph.partition.BasicPartitionOwner; |
65 | |
import org.apache.giraph.partition.MasterGraphPartitioner; |
66 | |
import org.apache.giraph.partition.PartitionOwner; |
67 | |
import org.apache.giraph.partition.PartitionStats; |
68 | |
import org.apache.giraph.partition.PartitionUtils; |
69 | |
import org.apache.giraph.time.SystemTime; |
70 | |
import org.apache.giraph.time.Time; |
71 | |
import org.apache.giraph.utils.CheckpointingUtils; |
72 | |
import org.apache.giraph.utils.JMapHistoDumper; |
73 | |
import org.apache.giraph.utils.ReactiveJMapHistoDumper; |
74 | |
import org.apache.giraph.utils.ReflectionUtils; |
75 | |
import org.apache.giraph.utils.WritableUtils; |
76 | |
import org.apache.giraph.worker.WorkerInfo; |
77 | |
import org.apache.giraph.zk.BspEvent; |
78 | |
import org.apache.giraph.zk.PredicateLock; |
79 | |
import org.apache.hadoop.fs.FSDataOutputStream; |
80 | |
import org.apache.hadoop.fs.FileSystem; |
81 | |
import org.apache.hadoop.fs.Path; |
82 | |
import org.apache.hadoop.io.Writable; |
83 | |
import org.apache.hadoop.io.WritableComparable; |
84 | |
import org.apache.hadoop.mapred.JobID; |
85 | |
import org.apache.hadoop.mapred.RunningJob; |
86 | |
import org.apache.hadoop.mapreduce.Counter; |
87 | |
import org.apache.hadoop.mapreduce.InputSplit; |
88 | |
import org.apache.hadoop.mapreduce.Mapper; |
89 | |
import org.apache.log4j.Logger; |
90 | |
import org.apache.zookeeper.CreateMode; |
91 | |
import org.apache.zookeeper.KeeperException; |
92 | |
import org.apache.zookeeper.WatchedEvent; |
93 | |
import org.apache.zookeeper.Watcher.Event.EventType; |
94 | |
import org.apache.zookeeper.ZooDefs.Ids; |
95 | |
import org.json.JSONArray; |
96 | |
import org.json.JSONException; |
97 | |
import org.json.JSONObject; |
98 | |
|
99 | |
import java.io.DataInputStream; |
100 | |
import java.io.IOException; |
101 | |
import java.io.PrintStream; |
102 | |
import java.nio.charset.Charset; |
103 | |
import java.util.ArrayList; |
104 | |
import java.util.Collection; |
105 | |
import java.util.Collections; |
106 | |
import java.util.Comparator; |
107 | |
import java.util.HashSet; |
108 | |
import java.util.List; |
109 | |
import java.util.Map; |
110 | |
import java.util.Set; |
111 | |
import java.util.TreeSet; |
112 | |
import java.util.concurrent.TimeUnit; |
113 | |
|
114 | |
import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT; |
115 | |
import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA; |
116 | |
import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT; |
117 | |
|
118 | |
|
119 | |
|
120 | |
|
121 | |
|
122 | |
|
123 | |
|
124 | |
|
125 | |
@SuppressWarnings("rawtypes, unchecked") |
126 | |
public class BspServiceMaster<I extends WritableComparable, |
127 | |
V extends Writable, E extends Writable> |
128 | |
extends BspService<I, V, E> |
129 | |
implements CentralizedServiceMaster<I, V, E>, |
130 | |
ResetSuperstepMetricsObserver { |
131 | |
|
132 | |
public static final int MAX_PRINTABLE_REMAINING_WORKERS = 10; |
133 | |
|
134 | |
public static final String NUM_MASTER_ZK_INPUT_SPLIT_THREADS = |
135 | |
"giraph.numMasterZkInputSplitThreads"; |
136 | |
|
137 | |
public static final int DEFAULT_INPUT_SPLIT_THREAD_COUNT = 1; |
138 | |
|
139 | 0 | private static final Time TIME = SystemTime.get(); |
140 | |
|
141 | 0 | private static final Logger LOG = Logger.getLogger(BspServiceMaster.class); |
142 | |
|
143 | 0 | private boolean isMaster = false; |
144 | |
|
145 | |
private final int maxWorkers; |
146 | |
|
147 | |
private final int minWorkers; |
148 | |
|
149 | |
private final int maxNumberOfSupersteps; |
150 | |
|
151 | |
private final float minPercentResponded; |
152 | |
|
153 | |
private final int eventWaitMsecs; |
154 | |
|
155 | |
private final int maxSuperstepWaitMsecs; |
156 | |
|
157 | |
|
158 | |
private final int maxCounterWaitMsecs; |
159 | |
|
160 | |
private final int partitionLongTailMinPrint; |
161 | |
|
162 | 0 | private long lastCheckpointedSuperstep = -1; |
163 | |
|
164 | |
private final BspEvent workerWroteCheckpoint; |
165 | |
|
166 | |
private final BspEvent superstepStateChanged; |
167 | |
|
168 | |
private final MasterGraphPartitioner<I, V, E> masterGraphPartitioner; |
169 | |
|
170 | 0 | private final List<PartitionStats> allPartitionStatsList = |
171 | |
new ArrayList<PartitionStats>(); |
172 | |
|
173 | |
private MasterGlobalCommHandler globalCommHandler; |
174 | |
|
175 | |
private AggregatorToGlobalCommTranslation aggregatorTranslation; |
176 | |
|
177 | |
private MasterCompute masterCompute; |
178 | |
|
179 | |
private MasterClient masterClient; |
180 | |
|
181 | |
private MasterServer masterServer; |
182 | |
|
183 | |
private MasterInfo masterInfo; |
184 | |
|
185 | 0 | private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList(); |
186 | |
|
187 | |
private final MasterObserver[] observers; |
188 | |
|
189 | |
|
190 | |
|
191 | |
private GiraphTimer masterComputeTimer; |
192 | |
|
193 | |
|
194 | |
private final int checkpointFrequency; |
195 | |
|
196 | |
private CheckpointStatus checkpointStatus; |
197 | |
|
198 | |
private final CheckpointSupportedChecker checkpointSupportedChecker; |
199 | |
|
200 | |
private final GiraphCountersThriftStruct giraphCountersThriftStruct; |
201 | |
|
202 | |
|
203 | |
|
204 | |
|
205 | |
|
206 | |
|
207 | |
|
208 | |
public BspServiceMaster( |
209 | |
Mapper<?, ?, ?, ?>.Context context, |
210 | |
GraphTaskManager<I, V, E> graphTaskManager) { |
211 | 0 | super(context, graphTaskManager); |
212 | 0 | workerWroteCheckpoint = new PredicateLock(context); |
213 | 0 | registerBspEvent(workerWroteCheckpoint); |
214 | 0 | superstepStateChanged = new PredicateLock(context); |
215 | 0 | registerBspEvent(superstepStateChanged); |
216 | |
|
217 | 0 | ImmutableClassesGiraphConfiguration<I, V, E> conf = |
218 | 0 | getConfiguration(); |
219 | |
|
220 | 0 | maxWorkers = conf.getMaxWorkers(); |
221 | 0 | minWorkers = conf.getMinWorkers(); |
222 | 0 | maxNumberOfSupersteps = conf.getMaxNumberOfSupersteps(); |
223 | 0 | minPercentResponded = GiraphConstants.MIN_PERCENT_RESPONDED.get(conf); |
224 | 0 | eventWaitMsecs = conf.getEventWaitMsecs(); |
225 | 0 | maxSuperstepWaitMsecs = conf.getMaxMasterSuperstepWaitMsecs(); |
226 | 0 | maxCounterWaitMsecs = conf.getMaxCounterWaitMsecs(); |
227 | 0 | partitionLongTailMinPrint = PARTITION_LONG_TAIL_MIN_PRINT.get(conf); |
228 | 0 | masterGraphPartitioner = |
229 | 0 | getGraphPartitionerFactory().createMasterGraphPartitioner(); |
230 | 0 | if (conf.isJMapHistogramDumpEnabled()) { |
231 | 0 | conf.addMasterObserverClass(JMapHistoDumper.class); |
232 | |
} |
233 | 0 | if (conf.isReactiveJmapHistogramDumpEnabled()) { |
234 | 0 | conf.addMasterObserverClass(ReactiveJMapHistoDumper.class); |
235 | |
} |
236 | 0 | observers = conf.createMasterObservers(context); |
237 | |
|
238 | 0 | this.checkpointFrequency = conf.getCheckpointFrequency(); |
239 | 0 | this.checkpointStatus = CheckpointStatus.NONE; |
240 | 0 | this.checkpointSupportedChecker = |
241 | 0 | ReflectionUtils.newInstance( |
242 | 0 | GiraphConstants.CHECKPOINT_SUPPORTED_CHECKER.get(conf)); |
243 | 0 | this.giraphCountersThriftStruct = new GiraphCountersThriftStruct(); |
244 | |
|
245 | 0 | GiraphMetrics.get().addSuperstepResetObserver(this); |
246 | 0 | GiraphStats.init(context); |
247 | 0 | } |
248 | |
|
249 | |
@Override |
250 | |
public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) { |
251 | 0 | masterComputeTimer = new GiraphTimer(superstepMetrics, |
252 | |
"master-compute-call", TimeUnit.MILLISECONDS); |
253 | 0 | } |
254 | |
|
255 | |
@Override |
256 | |
public void setJobState(ApplicationState state, |
257 | |
long applicationAttempt, |
258 | |
long desiredSuperstep) { |
259 | 0 | setJobState(state, applicationAttempt, desiredSuperstep, true); |
260 | 0 | } |
261 | |
|
262 | |
|
263 | |
|
264 | |
|
265 | |
|
266 | |
|
267 | |
|
268 | |
|
269 | |
|
270 | |
|
271 | |
private void setJobState(ApplicationState state, |
272 | |
long applicationAttempt, |
273 | |
long desiredSuperstep, |
274 | |
boolean killJobOnFailure) { |
275 | 0 | JSONObject jobState = new JSONObject(); |
276 | |
try { |
277 | 0 | jobState.put(JSONOBJ_STATE_KEY, state.toString()); |
278 | 0 | jobState.put(JSONOBJ_APPLICATION_ATTEMPT_KEY, applicationAttempt); |
279 | 0 | jobState.put(JSONOBJ_SUPERSTEP_KEY, desiredSuperstep); |
280 | 0 | } catch (JSONException e) { |
281 | 0 | throw new RuntimeException("setJobState: Couldn't put " + |
282 | 0 | state.toString()); |
283 | 0 | } |
284 | 0 | if (LOG.isInfoEnabled()) { |
285 | 0 | LOG.info("setJobState: " + jobState.toString() + " on superstep " + |
286 | 0 | getSuperstep()); |
287 | |
} |
288 | |
try { |
289 | 0 | getZkExt().createExt(masterJobStatePath + "/jobState", |
290 | 0 | jobState.toString().getBytes(Charset.defaultCharset()), |
291 | |
Ids.OPEN_ACL_UNSAFE, |
292 | |
CreateMode.PERSISTENT_SEQUENTIAL, |
293 | |
true); |
294 | 0 | LOG.info("setJobState: " + jobState); |
295 | 0 | } catch (KeeperException.NodeExistsException e) { |
296 | 0 | throw new IllegalStateException( |
297 | |
"setJobState: Imposible that " + |
298 | |
masterJobStatePath + " already exists!", e); |
299 | 0 | } catch (KeeperException e) { |
300 | 0 | throw new IllegalStateException( |
301 | |
"setJobState: Unknown KeeperException for " + |
302 | |
masterJobStatePath, e); |
303 | 0 | } catch (InterruptedException e) { |
304 | 0 | throw new IllegalStateException( |
305 | |
"setJobState: Unknown InterruptedException for " + |
306 | |
masterJobStatePath, e); |
307 | 0 | } |
308 | 0 | if (state == ApplicationState.FAILED && killJobOnFailure) { |
309 | 0 | failJob(new IllegalStateException("FAILED")); |
310 | |
} |
311 | |
|
312 | 0 | } |
313 | |
|
314 | |
|
315 | |
|
316 | |
|
317 | |
|
318 | |
|
319 | |
|
320 | |
private void setJobStateFailed(String reason) { |
321 | 0 | getGraphTaskManager().getJobProgressTracker().logFailure(reason); |
322 | 0 | setJobState(ApplicationState.FAILED, -1, -1, false); |
323 | 0 | failJob(new IllegalStateException(reason)); |
324 | 0 | } |
325 | |
|
326 | |
|
327 | |
|
328 | |
|
329 | |
|
330 | |
|
331 | |
|
332 | |
|
333 | |
|
334 | |
private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat, |
335 | |
int minSplitCountHint, |
336 | |
InputType inputSplitType) { |
337 | 0 | String logPrefix = "generate" + inputSplitType + "InputSplits"; |
338 | |
List<InputSplit> splits; |
339 | |
try { |
340 | 0 | splits = inputFormat.getSplits(getContext(), minSplitCountHint); |
341 | 0 | } catch (IOException e) { |
342 | 0 | throw new IllegalStateException(logPrefix + ": Got IOException", e); |
343 | 0 | } catch (InterruptedException e) { |
344 | 0 | throw new IllegalStateException( |
345 | |
logPrefix + ": Got InterruptedException", e); |
346 | 0 | } |
347 | 0 | float samplePercent = |
348 | 0 | INPUT_SPLIT_SAMPLE_PERCENT.get(getConfiguration()); |
349 | 0 | if (samplePercent != INPUT_SPLIT_SAMPLE_PERCENT.getDefaultValue()) { |
350 | 0 | int lastIndex = (int) (samplePercent * splits.size() / 100f); |
351 | 0 | Collections.shuffle(splits); |
352 | 0 | List<InputSplit> sampleSplits = splits.subList(0, lastIndex); |
353 | 0 | LOG.warn(logPrefix + ": Using sampling - Processing only " + |
354 | 0 | sampleSplits.size() + " instead of " + splits.size() + |
355 | |
" expected splits."); |
356 | 0 | return sampleSplits; |
357 | |
} else { |
358 | 0 | if (LOG.isInfoEnabled()) { |
359 | 0 | LOG.info(logPrefix + ": Got " + splits.size() + |
360 | |
" input splits for " + minSplitCountHint + " input threads"); |
361 | |
} |
362 | 0 | return splits; |
363 | |
} |
364 | |
} |
365 | |
|
366 | |
|
367 | |
|
368 | |
|
369 | |
|
370 | |
|
371 | |
private void failJob(Exception e) { |
372 | 0 | LOG.fatal("failJob: Killing job " + getJobId()); |
373 | 0 | LOG.fatal("failJob: exception " + e.toString()); |
374 | |
try { |
375 | 0 | if (getConfiguration().isPureYarnJob()) { |
376 | 0 | throw new RuntimeException( |
377 | |
"BspServiceMaster (YARN profile) is " + |
378 | |
"FAILING this task, throwing exception to end job run.", e); |
379 | |
} else { |
380 | |
@SuppressWarnings("deprecation") |
381 | 0 | org.apache.hadoop.mapred.JobClient jobClient = |
382 | |
new org.apache.hadoop.mapred.JobClient( |
383 | |
(org.apache.hadoop.mapred.JobConf) |
384 | 0 | getContext().getConfiguration()); |
385 | |
try { |
386 | |
@SuppressWarnings("deprecation") |
387 | 0 | JobID jobId = JobID.forName(getJobId()); |
388 | 0 | RunningJob job = jobClient.getJob(jobId); |
389 | 0 | if (job != null) { |
390 | 0 | job.killJob(); |
391 | |
} else { |
392 | 0 | LOG.error("Job not found for jobId=" + getJobId()); |
393 | |
} |
394 | 0 | } catch (IllegalArgumentException iae) { |
395 | 0 | LOG.info("This job (" + getJobId() + |
396 | |
") is not a legacy Hadoop job and will " + |
397 | |
"continue with failure cleanup." + |
398 | 0 | e.getMessage(), |
399 | |
e); |
400 | 0 | } |
401 | |
} |
402 | 0 | } catch (IOException ioe) { |
403 | 0 | throw new RuntimeException(ioe); |
404 | |
} finally { |
405 | 0 | failureCleanup(e); |
406 | 0 | } |
407 | 0 | } |
408 | |
|
409 | |
|
410 | |
|
411 | |
|
412 | |
|
413 | |
|
414 | |
|
415 | |
|
416 | |
|
417 | |
private List<WorkerInfo> getWorkerInfosFromPath(String workerInfosPath, |
418 | |
boolean watch) { |
419 | 0 | List<WorkerInfo> workerInfoList = new ArrayList<WorkerInfo>(); |
420 | |
List<String> workerInfoPathList; |
421 | |
try { |
422 | 0 | workerInfoPathList = |
423 | 0 | getZkExt().getChildrenExt(workerInfosPath, watch, false, true); |
424 | 0 | } catch (KeeperException e) { |
425 | 0 | throw new IllegalStateException( |
426 | |
"getWorkers: Got KeeperException", e); |
427 | 0 | } catch (InterruptedException e) { |
428 | 0 | throw new IllegalStateException( |
429 | |
"getWorkers: Got InterruptedStateException", e); |
430 | 0 | } |
431 | 0 | for (String workerInfoPath : workerInfoPathList) { |
432 | 0 | WorkerInfo workerInfo = new WorkerInfo(); |
433 | |
try { |
434 | 0 | WritableUtils.readFieldsFromZnode( |
435 | 0 | getZkExt(), workerInfoPath, true, null, workerInfo); |
436 | 0 | workerInfoList.add(workerInfo); |
437 | 0 | } catch (IllegalStateException e) { |
438 | 0 | LOG.warn("Can't get info from worker, did it die in between? " + |
439 | |
"workerInfoPath=" + workerInfoPath, e); |
440 | 0 | } |
441 | 0 | } |
442 | 0 | return workerInfoList; |
443 | |
} |
444 | |
|
445 | |
|
446 | |
|
447 | |
|
448 | |
|
449 | |
|
450 | |
|
451 | |
|
452 | |
|
453 | |
private void getAllWorkerInfos( |
454 | |
long superstep, |
455 | |
List<WorkerInfo> healthyWorkerInfoList, |
456 | |
List<WorkerInfo> unhealthyWorkerInfoList) { |
457 | 0 | String healthyWorkerInfoPath = |
458 | 0 | getWorkerInfoHealthyPath(getApplicationAttempt(), superstep); |
459 | 0 | String unhealthyWorkerInfoPath = |
460 | 0 | getWorkerInfoUnhealthyPath(getApplicationAttempt(), superstep); |
461 | |
|
462 | |
try { |
463 | 0 | getZkExt().createOnceExt(healthyWorkerInfoPath, |
464 | |
null, |
465 | |
Ids.OPEN_ACL_UNSAFE, |
466 | |
CreateMode.PERSISTENT, |
467 | |
true); |
468 | 0 | } catch (KeeperException e) { |
469 | 0 | throw new IllegalStateException("getWorkers: KeeperException", e); |
470 | 0 | } catch (InterruptedException e) { |
471 | 0 | throw new IllegalStateException("getWorkers: InterruptedException", e); |
472 | 0 | } |
473 | |
|
474 | |
try { |
475 | 0 | getZkExt().createOnceExt(unhealthyWorkerInfoPath, |
476 | |
null, |
477 | |
Ids.OPEN_ACL_UNSAFE, |
478 | |
CreateMode.PERSISTENT, |
479 | |
true); |
480 | 0 | } catch (KeeperException e) { |
481 | 0 | throw new IllegalStateException("getWorkers: KeeperException", e); |
482 | 0 | } catch (InterruptedException e) { |
483 | 0 | throw new IllegalStateException("getWorkers: InterruptedException", e); |
484 | 0 | } |
485 | |
|
486 | 0 | List<WorkerInfo> currentHealthyWorkerInfoList = |
487 | 0 | getWorkerInfosFromPath(healthyWorkerInfoPath, true); |
488 | 0 | List<WorkerInfo> currentUnhealthyWorkerInfoList = |
489 | 0 | getWorkerInfosFromPath(unhealthyWorkerInfoPath, false); |
490 | |
|
491 | 0 | healthyWorkerInfoList.clear(); |
492 | 0 | if (currentHealthyWorkerInfoList != null) { |
493 | |
for (WorkerInfo healthyWorkerInfo : |
494 | 0 | currentHealthyWorkerInfoList) { |
495 | 0 | healthyWorkerInfoList.add(healthyWorkerInfo); |
496 | 0 | } |
497 | |
} |
498 | |
|
499 | 0 | unhealthyWorkerInfoList.clear(); |
500 | 0 | if (currentUnhealthyWorkerInfoList != null) { |
501 | |
for (WorkerInfo unhealthyWorkerInfo : |
502 | 0 | currentUnhealthyWorkerInfoList) { |
503 | 0 | unhealthyWorkerInfoList.add(unhealthyWorkerInfo); |
504 | 0 | } |
505 | |
} |
506 | 0 | } |
507 | |
|
508 | |
@Override |
509 | |
public List<WorkerInfo> checkWorkers() { |
510 | 0 | boolean failJob = true; |
511 | |
long failWorkerCheckMsecs = |
512 | 0 | SystemTime.get().getMilliseconds() + maxSuperstepWaitMsecs; |
513 | 0 | List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>(); |
514 | 0 | List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>(); |
515 | 0 | int totalResponses = -1; |
516 | 0 | while (SystemTime.get().getMilliseconds() < failWorkerCheckMsecs) { |
517 | 0 | getContext().progress(); |
518 | 0 | getAllWorkerInfos( |
519 | 0 | getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList); |
520 | 0 | totalResponses = healthyWorkerInfoList.size() + |
521 | 0 | unhealthyWorkerInfoList.size(); |
522 | 0 | if ((totalResponses * 100.0f / maxWorkers) >= |
523 | |
minPercentResponded) { |
524 | 0 | failJob = false; |
525 | 0 | break; |
526 | |
} |
527 | 0 | getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " " + |
528 | |
"checkWorkers: Only found " + |
529 | |
totalResponses + |
530 | |
" responses of " + maxWorkers + |
531 | |
" needed to start superstep " + |
532 | 0 | getSuperstep()); |
533 | 0 | if (getWorkerHealthRegistrationChangedEvent().waitMsecs( |
534 | |
eventWaitMsecs)) { |
535 | 0 | if (LOG.isDebugEnabled()) { |
536 | 0 | LOG.debug("checkWorkers: Got event that health " + |
537 | |
"registration changed, not using poll attempt"); |
538 | |
} |
539 | 0 | getWorkerHealthRegistrationChangedEvent().reset(); |
540 | 0 | continue; |
541 | |
} |
542 | 0 | if (LOG.isInfoEnabled()) { |
543 | 0 | LOG.info("checkWorkers: Only found " + totalResponses + |
544 | |
" responses of " + maxWorkers + |
545 | |
" needed to start superstep " + |
546 | 0 | getSuperstep() + ". Reporting every " + |
547 | |
eventWaitMsecs + " msecs, " + |
548 | 0 | (failWorkerCheckMsecs - SystemTime.get().getMilliseconds()) + |
549 | |
" more msecs left before giving up."); |
550 | |
|
551 | 0 | if ((maxWorkers - totalResponses) <= |
552 | |
partitionLongTailMinPrint) { |
553 | 0 | logMissingWorkersOnSuperstep(healthyWorkerInfoList, |
554 | |
unhealthyWorkerInfoList); |
555 | |
} |
556 | |
} |
557 | |
} |
558 | 0 | if (failJob) { |
559 | 0 | LOG.error("checkWorkers: Did not receive enough processes in " + |
560 | |
"time (only " + totalResponses + " of " + |
561 | |
minWorkers + " required) after waiting " + maxSuperstepWaitMsecs + |
562 | |
"msecs). This occurs if you do not have enough map tasks " + |
563 | |
"available simultaneously on your Hadoop instance to fulfill " + |
564 | |
"the number of requested workers."); |
565 | 0 | return null; |
566 | |
} |
567 | |
|
568 | 0 | if (healthyWorkerInfoList.size() < minWorkers) { |
569 | 0 | LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() + |
570 | |
" available when " + minWorkers + " are required."); |
571 | 0 | logMissingWorkersOnSuperstep(healthyWorkerInfoList, |
572 | |
unhealthyWorkerInfoList); |
573 | 0 | return null; |
574 | |
} |
575 | |
|
576 | 0 | getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " " + |
577 | |
"checkWorkers: Done - Found " + totalResponses + |
578 | |
" responses of " + maxWorkers + " needed to start superstep " + |
579 | 0 | getSuperstep()); |
580 | |
|
581 | 0 | return healthyWorkerInfoList; |
582 | |
} |
583 | |
|
584 | |
|
585 | |
|
586 | |
|
587 | |
|
588 | |
|
589 | |
|
590 | |
private void logMissingWorkersOnSuperstep( |
591 | |
List<WorkerInfo> healthyWorkerInfoList, |
592 | |
List<WorkerInfo> unhealthyWorkerInfoList) { |
593 | 0 | if (LOG.isInfoEnabled()) { |
594 | 0 | Set<Integer> partitionSet = new TreeSet<Integer>(); |
595 | 0 | for (WorkerInfo workerInfo : healthyWorkerInfoList) { |
596 | 0 | partitionSet.add(workerInfo.getTaskId() % maxWorkers); |
597 | 0 | } |
598 | 0 | for (WorkerInfo workerInfo : unhealthyWorkerInfoList) { |
599 | 0 | partitionSet.add(workerInfo.getTaskId() % maxWorkers); |
600 | 0 | } |
601 | 0 | for (int i = 1; i <= maxWorkers; ++i) { |
602 | 0 | if (partitionSet.contains(Integer.valueOf(i))) { |
603 | 0 | continue; |
604 | 0 | } else if (i == getTaskId() % maxWorkers) { |
605 | 0 | continue; |
606 | |
} else { |
607 | 0 | LOG.info("logMissingWorkersOnSuperstep: No response from " + |
608 | |
"partition " + i + " (could be master)"); |
609 | |
} |
610 | |
} |
611 | |
} |
612 | 0 | } |
613 | |
|
614 | |
|
615 | |
|
616 | |
|
617 | |
|
618 | |
|
619 | |
|
620 | |
|
621 | |
|
622 | |
private int createInputSplits(GiraphInputFormat inputFormat, |
623 | |
InputType inputSplitType) { |
624 | 0 | ImmutableClassesGiraphConfiguration conf = getConfiguration(); |
625 | 0 | String logPrefix = "create" + inputSplitType + "InputSplits"; |
626 | |
|
627 | |
|
628 | |
|
629 | |
|
630 | 0 | List<WorkerInfo> healthyWorkerInfoList = checkWorkers(); |
631 | 0 | if (healthyWorkerInfoList == null) { |
632 | 0 | setJobStateFailed("Not enough healthy workers to create input splits"); |
633 | 0 | return -1; |
634 | |
} |
635 | 0 | globalCommHandler.getInputSplitsHandler().initialize(masterClient, |
636 | |
healthyWorkerInfoList); |
637 | |
|
638 | |
|
639 | 0 | int minSplitCountHint = healthyWorkerInfoList.size() * |
640 | 0 | conf.getNumInputSplitsThreads(); |
641 | |
|
642 | |
|
643 | |
|
644 | 0 | List<InputSplit> splitList = generateInputSplits(inputFormat, |
645 | |
minSplitCountHint, inputSplitType); |
646 | |
|
647 | 0 | if (splitList.isEmpty() && GiraphConstants.FAIL_ON_EMPTY_INPUT.get(conf)) { |
648 | 0 | LOG.fatal(logPrefix + ": Failing job due to 0 input splits, " + |
649 | 0 | "check input of " + inputFormat.getClass().getName() + "!"); |
650 | 0 | getContext().setStatus("Failing job due to 0 input splits, " + |
651 | 0 | "check input of " + inputFormat.getClass().getName() + "!"); |
652 | 0 | setJobStateFailed("******* PLEASE CHECK YOUR INPUT TABLES - PARTITIONS " + |
653 | |
"WHICH YOU SPECIFIED ARE MISSING (for " + inputSplitType + |
654 | |
" input). FAILING THE JOB *******"); |
655 | |
} |
656 | 0 | if (minSplitCountHint > splitList.size()) { |
657 | 0 | LOG.warn(logPrefix + ": Number of inputSplits=" + |
658 | 0 | splitList.size() + " < " + |
659 | |
minSplitCountHint + |
660 | |
"=total number of input threads, " + |
661 | |
"some threads will be not used"); |
662 | |
} |
663 | |
|
664 | 0 | globalCommHandler.getInputSplitsHandler().addSplits(inputSplitType, |
665 | |
splitList, inputFormat); |
666 | |
|
667 | 0 | return splitList.size(); |
668 | |
} |
669 | |
|
670 | |
@Override |
671 | |
public int createMappingInputSplits() { |
672 | 0 | if (!getConfiguration().hasMappingInputFormat()) { |
673 | 0 | return 0; |
674 | |
} |
675 | 0 | MappingInputFormat<I, V, E, ? extends Writable> mappingInputFormat = |
676 | 0 | getConfiguration().createWrappedMappingInputFormat(); |
677 | 0 | return createInputSplits(mappingInputFormat, InputType.MAPPING); |
678 | |
} |
679 | |
|
680 | |
@Override |
681 | |
public int createVertexInputSplits() { |
682 | 0 | int splits = 0; |
683 | 0 | if (getConfiguration().hasVertexInputFormat()) { |
684 | 0 | VertexInputFormat<I, V, E> vertexInputFormat = |
685 | 0 | getConfiguration().createWrappedVertexInputFormat(); |
686 | 0 | splits = createInputSplits(vertexInputFormat, InputType.VERTEX); |
687 | |
} |
688 | 0 | MasterProgress.get().setVertexInputSplitCount(splits); |
689 | 0 | getJobProgressTracker().updateMasterProgress(MasterProgress.get()); |
690 | 0 | return splits; |
691 | |
} |
692 | |
|
693 | |
@Override |
694 | |
public int createEdgeInputSplits() { |
695 | 0 | int splits = 0; |
696 | 0 | if (getConfiguration().hasEdgeInputFormat()) { |
697 | 0 | EdgeInputFormat<I, E> edgeInputFormat = |
698 | 0 | getConfiguration().createWrappedEdgeInputFormat(); |
699 | 0 | splits = createInputSplits(edgeInputFormat, InputType.EDGE); |
700 | |
} |
701 | 0 | MasterProgress.get().setEdgeInputSplitsCount(splits); |
702 | 0 | getJobProgressTracker().updateMasterProgress(MasterProgress.get()); |
703 | 0 | return splits; |
704 | |
} |
705 | |
|
706 | |
@Override |
707 | |
public List<WorkerInfo> getWorkerInfoList() { |
708 | 0 | return chosenWorkerInfoList; |
709 | |
} |
710 | |
|
711 | |
@Override |
712 | |
public MasterGlobalCommHandler getGlobalCommHandler() { |
713 | 0 | return globalCommHandler; |
714 | |
} |
715 | |
|
716 | |
@Override |
717 | |
public AggregatorToGlobalCommTranslation getAggregatorTranslationHandler() { |
718 | 0 | return aggregatorTranslation; |
719 | |
} |
720 | |
|
721 | |
@Override |
722 | |
public MasterCompute getMasterCompute() { |
723 | 0 | return masterCompute; |
724 | |
} |
725 | |
|
726 | |
|
727 | |
|
728 | |
|
729 | |
|
730 | |
|
731 | |
|
732 | |
|
733 | |
|
734 | |
|
735 | |
|
736 | |
|
737 | |
|
738 | |
|
739 | |
private Collection<PartitionOwner> prepareCheckpointRestart(long superstep) |
740 | |
throws IOException, KeeperException, InterruptedException { |
741 | 0 | List<PartitionOwner> partitionOwners = new ArrayList<>(); |
742 | 0 | FileSystem fs = getFs(); |
743 | 0 | String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) + |
744 | |
CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX; |
745 | 0 | LOG.info("Loading checkpoint from " + finalizedCheckpointPath); |
746 | 0 | DataInputStream finalizedStream = |
747 | 0 | fs.open(new Path(finalizedCheckpointPath)); |
748 | 0 | GlobalStats globalStats = new GlobalStats(); |
749 | 0 | globalStats.readFields(finalizedStream); |
750 | 0 | updateCounters(globalStats); |
751 | 0 | SuperstepClasses superstepClasses = |
752 | 0 | SuperstepClasses.createToRead(getConfiguration()); |
753 | 0 | superstepClasses.readFields(finalizedStream); |
754 | 0 | getConfiguration().updateSuperstepClasses(superstepClasses); |
755 | 0 | int prefixFileCount = finalizedStream.readInt(); |
756 | |
|
757 | 0 | String checkpointFile = |
758 | 0 | finalizedStream.readUTF(); |
759 | 0 | for (int i = 0; i < prefixFileCount; ++i) { |
760 | 0 | int mrTaskId = finalizedStream.readInt(); |
761 | |
|
762 | 0 | DataInputStream metadataStream = fs.open(new Path(checkpointFile + |
763 | |
"." + mrTaskId + CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX)); |
764 | 0 | long partitions = metadataStream.readInt(); |
765 | 0 | WorkerInfo worker = getWorkerInfoById(mrTaskId); |
766 | 0 | for (long p = 0; p < partitions; ++p) { |
767 | 0 | int partitionId = metadataStream.readInt(); |
768 | 0 | PartitionOwner partitionOwner = new BasicPartitionOwner(partitionId, |
769 | |
worker); |
770 | 0 | partitionOwners.add(partitionOwner); |
771 | 0 | LOG.info("prepareCheckpointRestart partitionId=" + partitionId + |
772 | |
" assigned to " + partitionOwner); |
773 | |
} |
774 | 0 | metadataStream.close(); |
775 | |
} |
776 | |
|
777 | |
|
778 | 0 | Collections.sort(partitionOwners, new Comparator<PartitionOwner>() { |
779 | |
@Override |
780 | |
public int compare(PartitionOwner p1, PartitionOwner p2) { |
781 | 0 | return Integer.compare(p1.getPartitionId(), p2.getPartitionId()); |
782 | |
} |
783 | |
}); |
784 | |
|
785 | |
|
786 | 0 | globalCommHandler.getAggregatorHandler().readFields(finalizedStream); |
787 | 0 | aggregatorTranslation.readFields(finalizedStream); |
788 | 0 | masterCompute.readFields(finalizedStream); |
789 | 0 | finalizedStream.close(); |
790 | |
|
791 | 0 | return partitionOwners; |
792 | |
} |
793 | |
|
794 | |
@Override |
795 | |
public void setup() { |
796 | |
|
797 | |
|
798 | |
|
799 | |
|
800 | |
|
801 | 0 | if (getRestartedSuperstep() != UNSET_SUPERSTEP) { |
802 | 0 | GiraphStats.getInstance().getSuperstepCounter(). |
803 | 0 | setValue(getRestartedSuperstep()); |
804 | |
} |
805 | 0 | for (MasterObserver observer : observers) { |
806 | 0 | observer.preApplication(); |
807 | 0 | getContext().progress(); |
808 | |
} |
809 | 0 | } |
810 | |
|
811 | |
@Override |
812 | |
public boolean becomeMaster() { |
813 | |
|
814 | |
|
815 | 0 | String myBid = null; |
816 | |
try { |
817 | 0 | myBid = |
818 | 0 | getZkExt().createExt(masterElectionPath + |
819 | 0 | "/" + getHostnameTaskId(), |
820 | |
null, |
821 | |
Ids.OPEN_ACL_UNSAFE, |
822 | |
CreateMode.EPHEMERAL_SEQUENTIAL, |
823 | |
true); |
824 | 0 | } catch (KeeperException e) { |
825 | 0 | throw new IllegalStateException( |
826 | |
"becomeMaster: KeeperException", e); |
827 | 0 | } catch (InterruptedException e) { |
828 | 0 | throw new IllegalStateException( |
829 | |
"becomeMaster: IllegalStateException", e); |
830 | 0 | } |
831 | |
while (true) { |
832 | 0 | JSONObject jobState = getJobState(); |
833 | |
try { |
834 | 0 | if ((jobState != null) && |
835 | 0 | ApplicationState.valueOf( |
836 | 0 | jobState.getString(JSONOBJ_STATE_KEY)) == |
837 | |
ApplicationState.FINISHED) { |
838 | 0 | LOG.info("becomeMaster: Job is finished, " + |
839 | |
"give up trying to be the master!"); |
840 | 0 | isMaster = false; |
841 | 0 | return isMaster; |
842 | |
} |
843 | 0 | } catch (JSONException e) { |
844 | 0 | throw new IllegalStateException( |
845 | |
"becomeMaster: Couldn't get state from " + jobState, e); |
846 | 0 | } |
847 | |
try { |
848 | 0 | List<String> masterChildArr = |
849 | 0 | getZkExt().getChildrenExt( |
850 | |
masterElectionPath, true, true, true); |
851 | 0 | if (LOG.isInfoEnabled()) { |
852 | 0 | LOG.info("becomeMaster: First child is '" + |
853 | 0 | masterChildArr.get(0) + "' and my bid is '" + |
854 | |
myBid + "'"); |
855 | |
} |
856 | 0 | if (masterChildArr.get(0).equals(myBid)) { |
857 | 0 | GiraphStats.getInstance().getCurrentMasterTaskPartition(). |
858 | 0 | setValue(getTaskId()); |
859 | |
|
860 | 0 | globalCommHandler = new MasterGlobalCommHandler( |
861 | 0 | new MasterAggregatorHandler(getConfiguration(), getContext()), |
862 | |
new MasterInputSplitsHandler( |
863 | 0 | getConfiguration().useInputSplitLocality(), getContext())); |
864 | 0 | aggregatorTranslation = new AggregatorToGlobalCommTranslation( |
865 | 0 | getConfiguration(), globalCommHandler); |
866 | |
|
867 | 0 | globalCommHandler.getAggregatorHandler().initialize(this); |
868 | 0 | masterCompute = getConfiguration().createMasterCompute(); |
869 | 0 | masterCompute.setMasterService(this); |
870 | |
|
871 | 0 | masterInfo = new MasterInfo(); |
872 | 0 | masterServer = |
873 | 0 | new NettyMasterServer(getConfiguration(), this, getContext(), |
874 | 0 | getGraphTaskManager().createUncaughtExceptionHandler()); |
875 | 0 | masterInfo.setInetSocketAddress(masterServer.getMyAddress(), |
876 | 0 | masterServer.getLocalHostOrIp()); |
877 | 0 | masterInfo.setTaskId(getTaskId()); |
878 | 0 | masterClient = |
879 | 0 | new NettyMasterClient(getContext(), getConfiguration(), this, |
880 | 0 | getGraphTaskManager().createUncaughtExceptionHandler()); |
881 | 0 | masterServer.setFlowControl(masterClient.getFlowControl()); |
882 | |
|
883 | 0 | if (LOG.isInfoEnabled()) { |
884 | 0 | LOG.info("becomeMaster: I am now the master!"); |
885 | |
} |
886 | 0 | isMaster = true; |
887 | 0 | return isMaster; |
888 | |
} |
889 | 0 | LOG.info("becomeMaster: Waiting to become the master..."); |
890 | 0 | getMasterElectionChildrenChangedEvent().waitForTimeoutOrFail( |
891 | 0 | GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get( |
892 | 0 | getConfiguration())); |
893 | 0 | getMasterElectionChildrenChangedEvent().reset(); |
894 | 0 | } catch (KeeperException e) { |
895 | 0 | throw new IllegalStateException( |
896 | |
"becomeMaster: KeeperException", e); |
897 | 0 | } catch (InterruptedException e) { |
898 | 0 | throw new IllegalStateException( |
899 | |
"becomeMaster: IllegalStateException", e); |
900 | 0 | } |
901 | 0 | } |
902 | |
} |
903 | |
|
904 | |
@Override |
905 | |
public MasterInfo getMasterInfo() { |
906 | 0 | return masterInfo; |
907 | |
} |
908 | |
|
909 | |
|
910 | |
|
911 | |
|
912 | |
|
913 | |
|
914 | |
|
915 | |
private GlobalStats aggregateWorkerStats(long superstep) { |
916 | 0 | ImmutableClassesGiraphConfiguration conf = getConfiguration(); |
917 | |
|
918 | 0 | GlobalStats globalStats = new GlobalStats(); |
919 | |
|
920 | 0 | String workerFinishedPath = |
921 | 0 | getWorkerMetricsFinishedPath(getApplicationAttempt(), superstep); |
922 | 0 | List<String> workerFinishedPathList = null; |
923 | |
try { |
924 | 0 | workerFinishedPathList = |
925 | 0 | getZkExt().getChildrenExt( |
926 | |
workerFinishedPath, false, false, true); |
927 | 0 | } catch (KeeperException e) { |
928 | 0 | throw new IllegalStateException( |
929 | |
"aggregateWorkerStats: KeeperException", e); |
930 | 0 | } catch (InterruptedException e) { |
931 | 0 | throw new IllegalStateException( |
932 | |
"aggregateWorkerStats: InterruptedException", e); |
933 | 0 | } |
934 | |
|
935 | 0 | AggregatedMetrics aggregatedMetrics = new AggregatedMetrics(); |
936 | |
|
937 | 0 | for (String finishedPath : workerFinishedPathList) { |
938 | 0 | String hostnamePartitionId = FilenameUtils.getName(finishedPath); |
939 | 0 | JSONObject workerFinishedInfoObj = null; |
940 | |
try { |
941 | 0 | byte [] zkData = |
942 | 0 | getZkExt().getData(finishedPath, false, null); |
943 | 0 | workerFinishedInfoObj = new JSONObject(new String(zkData, |
944 | 0 | Charset.defaultCharset())); |
945 | 0 | globalStats.addMessageCount( |
946 | 0 | workerFinishedInfoObj.getLong( |
947 | |
JSONOBJ_NUM_MESSAGES_KEY)); |
948 | 0 | globalStats.addMessageBytesCount( |
949 | 0 | workerFinishedInfoObj.getLong( |
950 | |
JSONOBJ_NUM_MESSAGE_BYTES_KEY)); |
951 | 0 | if (conf.metricsEnabled() && |
952 | 0 | workerFinishedInfoObj.has(JSONOBJ_METRICS_KEY)) { |
953 | 0 | WorkerSuperstepMetrics workerMetrics = new WorkerSuperstepMetrics(); |
954 | 0 | WritableUtils.readFieldsFromByteArray( |
955 | 0 | Base64.decode( |
956 | 0 | workerFinishedInfoObj.getString( |
957 | |
JSONOBJ_METRICS_KEY)), |
958 | |
workerMetrics); |
959 | 0 | globalStats.addOocLoadBytesCount( |
960 | 0 | workerMetrics.getBytesLoadedFromDisk()); |
961 | 0 | globalStats.addOocStoreBytesCount( |
962 | 0 | workerMetrics.getBytesStoredOnDisk()); |
963 | |
|
964 | |
|
965 | 0 | globalStats.setLowestGraphPercentageInMemory( |
966 | 0 | Math.min(globalStats.getLowestGraphPercentageInMemory(), |
967 | 0 | (int) Math.round( |
968 | 0 | workerMetrics.getGraphPercentageInMemory()))); |
969 | 0 | aggregatedMetrics.add(workerMetrics, hostnamePartitionId); |
970 | |
} |
971 | 0 | } catch (JSONException e) { |
972 | 0 | throw new IllegalStateException( |
973 | |
"aggregateWorkerStats: JSONException", e); |
974 | 0 | } catch (KeeperException e) { |
975 | 0 | throw new IllegalStateException( |
976 | |
"aggregateWorkerStats: KeeperException", e); |
977 | 0 | } catch (InterruptedException e) { |
978 | 0 | throw new IllegalStateException( |
979 | |
"aggregateWorkerStats: InterruptedException", e); |
980 | 0 | } catch (IOException e) { |
981 | 0 | throw new IllegalStateException( |
982 | |
"aggregateWorkerStats: IOException", e); |
983 | 0 | } |
984 | 0 | } |
985 | |
|
986 | 0 | allPartitionStatsList.clear(); |
987 | 0 | Iterable<PartitionStats> statsList = globalCommHandler.getAllPartitionStats( |
988 | 0 | workerFinishedPathList.size(), getContext()); |
989 | 0 | for (PartitionStats partitionStats : statsList) { |
990 | 0 | globalStats.addPartitionStats(partitionStats); |
991 | 0 | allPartitionStatsList.add(partitionStats); |
992 | 0 | } |
993 | |
|
994 | 0 | if (conf.metricsEnabled()) { |
995 | 0 | if (GiraphConstants.METRICS_DIRECTORY.isDefaultValue(conf)) { |
996 | 0 | aggregatedMetrics.print(superstep, System.err); |
997 | |
} else { |
998 | 0 | printAggregatedMetricsToHDFS(superstep, aggregatedMetrics); |
999 | |
} |
1000 | 0 | for (MasterObserver observer : observers) { |
1001 | 0 | observer.superstepMetricsUpdate( |
1002 | |
superstep, aggregatedMetrics, allPartitionStatsList); |
1003 | |
} |
1004 | |
} |
1005 | |
|
1006 | 0 | if (LOG.isInfoEnabled()) { |
1007 | 0 | LOG.info("aggregateWorkerStats: Aggregation found " + globalStats + |
1008 | 0 | " on superstep = " + getSuperstep()); |
1009 | |
} |
1010 | 0 | return globalStats; |
1011 | |
} |
1012 | |
|
1013 | |
|
1014 | |
|
1015 | |
|
1016 | |
|
1017 | |
|
1018 | |
private void printAggregatedMetricsToHDFS( |
1019 | |
long superstep, AggregatedMetrics aggregatedMetrics) { |
1020 | 0 | ImmutableClassesGiraphConfiguration conf = getConfiguration(); |
1021 | 0 | PrintStream out = null; |
1022 | 0 | Path dir = new Path(GiraphConstants.METRICS_DIRECTORY.get(conf)); |
1023 | 0 | Path outFile = new Path(GiraphConstants.METRICS_DIRECTORY.get(conf) + |
1024 | |
Path.SEPARATOR_CHAR + "superstep_" + superstep + ".metrics"); |
1025 | |
try { |
1026 | |
FileSystem fs; |
1027 | 0 | fs = FileSystem.get(conf); |
1028 | 0 | if (!fs.exists(dir)) { |
1029 | 0 | fs.mkdirs(dir); |
1030 | |
} |
1031 | 0 | if (fs.exists(outFile)) { |
1032 | 0 | throw new RuntimeException( |
1033 | |
"printAggregatedMetricsToHDFS: metrics file exists"); |
1034 | |
} |
1035 | 0 | out = new PrintStream(fs.create(outFile), false, |
1036 | 0 | Charset.defaultCharset().name()); |
1037 | 0 | aggregatedMetrics.print(superstep, out); |
1038 | 0 | } catch (IOException e) { |
1039 | 0 | throw new RuntimeException( |
1040 | |
"printAggregatedMetricsToHDFS: error creating metrics file", e); |
1041 | |
} finally { |
1042 | 0 | if (out != null) { |
1043 | 0 | out.close(); |
1044 | |
} |
1045 | 0 | } |
1046 | 0 | } |
1047 | |
|
1048 | |
|
1049 | |
|
1050 | |
|
1051 | |
|
1052 | |
|
1053 | |
|
1054 | |
|
1055 | |
|
1056 | |
|
1057 | |
|
1058 | |
|
1059 | |
private void finalizeCheckpoint(long superstep, |
1060 | |
List<WorkerInfo> chosenWorkerInfoList) |
1061 | |
throws IOException, KeeperException, InterruptedException { |
1062 | 0 | Path finalizedCheckpointPath = |
1063 | 0 | new Path(getCheckpointBasePath(superstep) + |
1064 | |
CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX); |
1065 | |
try { |
1066 | 0 | getFs().delete(finalizedCheckpointPath, false); |
1067 | 0 | } catch (IOException e) { |
1068 | 0 | LOG.warn("finalizedValidCheckpointPrefixes: Removed old file " + |
1069 | |
finalizedCheckpointPath); |
1070 | 0 | } |
1071 | |
|
1072 | |
|
1073 | |
|
1074 | |
|
1075 | |
|
1076 | |
|
1077 | |
|
1078 | |
|
1079 | 0 | FSDataOutputStream finalizedOutputStream = |
1080 | 0 | getFs().create(finalizedCheckpointPath); |
1081 | |
|
1082 | 0 | String superstepFinishedNode = |
1083 | 0 | getSuperstepFinishedPath(getApplicationAttempt(), superstep - 1); |
1084 | 0 | finalizedOutputStream.write( |
1085 | 0 | getZkExt().getData(superstepFinishedNode, false, null)); |
1086 | |
|
1087 | 0 | finalizedOutputStream.writeInt(chosenWorkerInfoList.size()); |
1088 | 0 | finalizedOutputStream.writeUTF(getCheckpointBasePath(superstep)); |
1089 | 0 | for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) { |
1090 | 0 | finalizedOutputStream.writeInt(getWorkerId(chosenWorkerInfo)); |
1091 | 0 | } |
1092 | 0 | globalCommHandler.getAggregatorHandler().write(finalizedOutputStream); |
1093 | 0 | aggregatorTranslation.write(finalizedOutputStream); |
1094 | 0 | masterCompute.write(finalizedOutputStream); |
1095 | 0 | finalizedOutputStream.close(); |
1096 | 0 | lastCheckpointedSuperstep = superstep; |
1097 | 0 | GiraphStats.getInstance(). |
1098 | 0 | getLastCheckpointedSuperstep().setValue(superstep); |
1099 | 0 | } |
1100 | |
|
1101 | |
|
1102 | |
|
1103 | |
|
1104 | |
|
1105 | |
|
1106 | |
|
1107 | |
private void assignPartitionOwners() { |
1108 | |
Collection<PartitionOwner> partitionOwners; |
1109 | 0 | if (getSuperstep() == INPUT_SUPERSTEP) { |
1110 | 0 | partitionOwners = |
1111 | 0 | masterGraphPartitioner.createInitialPartitionOwners( |
1112 | |
chosenWorkerInfoList, maxWorkers); |
1113 | 0 | if (partitionOwners.isEmpty()) { |
1114 | 0 | throw new IllegalStateException( |
1115 | |
"assignAndExchangePartitions: No partition owners set"); |
1116 | |
} |
1117 | 0 | } else if (getRestartedSuperstep() == getSuperstep()) { |
1118 | |
|
1119 | |
try { |
1120 | 0 | partitionOwners = prepareCheckpointRestart(getSuperstep()); |
1121 | 0 | } catch (IOException e) { |
1122 | 0 | throw new IllegalStateException( |
1123 | |
"assignPartitionOwners: IOException on preparing", e); |
1124 | 0 | } catch (KeeperException e) { |
1125 | 0 | throw new IllegalStateException( |
1126 | |
"assignPartitionOwners: KeeperException on preparing", e); |
1127 | 0 | } catch (InterruptedException e) { |
1128 | 0 | throw new IllegalStateException( |
1129 | |
"assignPartitionOwners: InteruptedException on preparing", |
1130 | |
e); |
1131 | 0 | } |
1132 | 0 | masterGraphPartitioner.setPartitionOwners(partitionOwners); |
1133 | |
} else { |
1134 | 0 | partitionOwners = |
1135 | 0 | masterGraphPartitioner.generateChangedPartitionOwners( |
1136 | |
allPartitionStatsList, |
1137 | |
chosenWorkerInfoList, |
1138 | |
maxWorkers, |
1139 | 0 | getSuperstep()); |
1140 | |
|
1141 | 0 | PartitionUtils.analyzePartitionStats(partitionOwners, |
1142 | |
allPartitionStatsList); |
1143 | |
} |
1144 | 0 | checkPartitions(masterGraphPartitioner.getCurrentPartitionOwners()); |
1145 | |
|
1146 | |
|
1147 | |
|
1148 | |
|
1149 | 0 | if (!partitionOwners.isEmpty()) { |
1150 | 0 | String vertexExchangePath = |
1151 | 0 | getPartitionExchangePath(getApplicationAttempt(), |
1152 | 0 | getSuperstep()); |
1153 | |
try { |
1154 | 0 | getZkExt().createOnceExt(vertexExchangePath, |
1155 | |
null, |
1156 | |
Ids.OPEN_ACL_UNSAFE, |
1157 | |
CreateMode.PERSISTENT, |
1158 | |
true); |
1159 | 0 | } catch (KeeperException e) { |
1160 | 0 | throw new IllegalStateException( |
1161 | |
"assignPartitionOwners: KeeperException creating " + |
1162 | |
vertexExchangePath); |
1163 | 0 | } catch (InterruptedException e) { |
1164 | 0 | throw new IllegalStateException( |
1165 | |
"assignPartitionOwners: InterruptedException creating " + |
1166 | |
vertexExchangePath); |
1167 | 0 | } |
1168 | |
} |
1169 | |
|
1170 | 0 | AddressesAndPartitionsWritable addressesAndPartitions = |
1171 | |
new AddressesAndPartitionsWritable(masterInfo, chosenWorkerInfoList, |
1172 | |
partitionOwners); |
1173 | |
|
1174 | |
|
1175 | |
|
1176 | 0 | for (WorkerInfo workerInfo : chosenWorkerInfoList) { |
1177 | 0 | masterClient.sendWritableRequest(workerInfo.getTaskId(), |
1178 | |
new AddressesAndPartitionsRequest(addressesAndPartitions)); |
1179 | 0 | } |
1180 | 0 | } |
1181 | |
|
1182 | |
|
1183 | |
|
1184 | |
|
1185 | |
|
1186 | |
|
1187 | |
private void checkPartitions(Collection<PartitionOwner> partitionOwners) { |
1188 | 0 | for (PartitionOwner partitionOwner : partitionOwners) { |
1189 | 0 | int partitionId = partitionOwner.getPartitionId(); |
1190 | 0 | if (partitionId < 0 || partitionId >= partitionOwners.size()) { |
1191 | 0 | throw new IllegalStateException("checkPartitions: " + |
1192 | |
"Invalid partition id " + partitionId + |
1193 | |
" - partition ids must be values from 0 to (numPartitions - 1)"); |
1194 | |
} |
1195 | 0 | } |
1196 | 0 | } |
1197 | |
|
1198 | |
|
1199 | |
|
1200 | |
|
1201 | |
|
1202 | |
|
1203 | |
|
1204 | |
|
1205 | |
|
1206 | |
|
1207 | |
private Collection<WorkerInfo> superstepChosenWorkerAlive( |
1208 | |
String chosenWorkerInfoHealthPath, |
1209 | |
List<WorkerInfo> chosenWorkerInfoList) |
1210 | |
throws KeeperException, InterruptedException { |
1211 | 0 | List<WorkerInfo> chosenWorkerInfoHealthyList = |
1212 | 0 | getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false); |
1213 | 0 | Set<WorkerInfo> chosenWorkerInfoHealthySet = |
1214 | |
new HashSet<WorkerInfo>(chosenWorkerInfoHealthyList); |
1215 | 0 | List<WorkerInfo> deadWorkers = new ArrayList<>(); |
1216 | 0 | for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) { |
1217 | 0 | if (!chosenWorkerInfoHealthySet.contains(chosenWorkerInfo)) { |
1218 | 0 | deadWorkers.add(chosenWorkerInfo); |
1219 | |
} |
1220 | 0 | } |
1221 | 0 | return deadWorkers; |
1222 | |
} |
1223 | |
|
1224 | |
@Override |
1225 | |
public void restartFromCheckpoint(long checkpoint) { |
1226 | |
|
1227 | |
|
1228 | |
|
1229 | 0 | setApplicationAttempt(getApplicationAttempt() + 1); |
1230 | 0 | setCachedSuperstep(checkpoint); |
1231 | 0 | setRestartedSuperstep(checkpoint); |
1232 | 0 | checkpointStatus = CheckpointStatus.NONE; |
1233 | 0 | setJobState(ApplicationState.START_SUPERSTEP, |
1234 | 0 | getApplicationAttempt(), |
1235 | |
checkpoint); |
1236 | 0 | } |
1237 | |
|
1238 | |
|
1239 | |
|
1240 | |
|
1241 | |
|
1242 | |
|
1243 | |
|
1244 | |
private void zkDeleteNode(String path) { |
1245 | |
try { |
1246 | 0 | getZkExt().deleteExt(path, -1, true); |
1247 | 0 | } catch (KeeperException.NoNodeException e) { |
1248 | 0 | LOG.info("zkDeleteNode: node has already been removed " + path); |
1249 | 0 | } catch (InterruptedException e) { |
1250 | 0 | throw new RuntimeException( |
1251 | |
"zkDeleteNode: InterruptedException", e); |
1252 | 0 | } catch (KeeperException e) { |
1253 | 0 | throw new RuntimeException( |
1254 | |
"zkDeleteNode: KeeperException", e); |
1255 | 0 | } |
1256 | 0 | } |
1257 | |
|
1258 | |
@Override |
1259 | |
public long getLastGoodCheckpoint() throws IOException { |
1260 | |
|
1261 | |
|
1262 | 0 | if (lastCheckpointedSuperstep == -1) { |
1263 | |
try { |
1264 | 0 | lastCheckpointedSuperstep = getLastCheckpointedSuperstep(); |
1265 | 0 | } catch (IOException e) { |
1266 | 0 | LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be " + |
1267 | |
"found, killing the job.", e); |
1268 | 0 | failJob(e); |
1269 | 0 | } |
1270 | |
} |
1271 | |
|
1272 | 0 | return lastCheckpointedSuperstep; |
1273 | |
} |
1274 | |
|
1275 | |
|
1276 | |
|
1277 | |
|
1278 | |
|
1279 | |
|
1280 | |
|
1281 | |
|
1282 | |
|
1283 | |
|
1284 | |
|
1285 | |
|
1286 | |
|
1287 | |
|
1288 | |
private boolean barrierOnWorkerList(String finishedWorkerPath, |
1289 | |
List<WorkerInfo> workerInfoList, |
1290 | |
BspEvent event, |
1291 | |
boolean ignoreDeath) { |
1292 | |
try { |
1293 | 0 | getZkExt().createOnceExt(finishedWorkerPath, |
1294 | |
null, |
1295 | |
Ids.OPEN_ACL_UNSAFE, |
1296 | |
CreateMode.PERSISTENT, |
1297 | |
true); |
1298 | 0 | } catch (KeeperException e) { |
1299 | 0 | throw new IllegalStateException( |
1300 | |
"barrierOnWorkerList: KeeperException - Couldn't create " + |
1301 | |
finishedWorkerPath, e); |
1302 | 0 | } catch (InterruptedException e) { |
1303 | 0 | throw new IllegalStateException( |
1304 | |
"barrierOnWorkerList: InterruptedException - Couldn't create " + |
1305 | |
finishedWorkerPath, e); |
1306 | 0 | } |
1307 | 0 | List<String> hostnameIdList = |
1308 | 0 | new ArrayList<String>(workerInfoList.size()); |
1309 | 0 | for (WorkerInfo workerInfo : workerInfoList) { |
1310 | 0 | hostnameIdList.add(workerInfo.getHostnameId()); |
1311 | 0 | } |
1312 | 0 | String workerInfoHealthyPath = |
1313 | 0 | getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep()); |
1314 | 0 | List<String> finishedHostnameIdList = new ArrayList<>(); |
1315 | |
List<String> tmpFinishedHostnameIdList; |
1316 | 0 | long nextInfoMillis = System.currentTimeMillis(); |
1317 | 0 | final int defaultTaskTimeoutMsec = 10 * 60 * 1000; |
1318 | 0 | final int waitBetweenLogInfoMsec = 30 * 1000; |
1319 | 0 | final int taskTimeoutMsec = getContext().getConfiguration().getInt( |
1320 | |
"mapred.task.timeout", defaultTaskTimeoutMsec) / 2; |
1321 | 0 | long lastRegularRunTimeMsec = 0; |
1322 | 0 | int eventLoopTimeout = Math.min(taskTimeoutMsec, waitBetweenLogInfoMsec); |
1323 | 0 | boolean logInfoOnlyRun = false; |
1324 | 0 | List<WorkerInfo> deadWorkers = new ArrayList<>(); |
1325 | |
while (true) { |
1326 | 0 | if (! logInfoOnlyRun) { |
1327 | |
try { |
1328 | 0 | tmpFinishedHostnameIdList = |
1329 | 0 | getZkExt().getChildrenExt(finishedWorkerPath, |
1330 | |
true, |
1331 | |
false, |
1332 | |
false); |
1333 | 0 | } catch (KeeperException e) { |
1334 | 0 | throw new IllegalStateException( |
1335 | |
"barrierOnWorkerList: KeeperException - Couldn't get " + |
1336 | |
"children of " + finishedWorkerPath, e); |
1337 | 0 | } catch (InterruptedException e) { |
1338 | 0 | throw new IllegalStateException( |
1339 | |
"barrierOnWorkerList: IllegalException - Couldn't get " + |
1340 | |
"children of " + finishedWorkerPath, e); |
1341 | 0 | } |
1342 | 0 | if (LOG.isDebugEnabled()) { |
1343 | |
|
1344 | 0 | Set<String> newFinishedHostnames = Sets.difference( |
1345 | 0 | Sets.newHashSet(tmpFinishedHostnameIdList), |
1346 | 0 | Sets.newHashSet(finishedHostnameIdList)); |
1347 | 0 | LOG.debug("barrierOnWorkerList: Got new finished worker list = " + |
1348 | |
newFinishedHostnames + ", size = " + |
1349 | 0 | newFinishedHostnames.size() + |
1350 | |
" from " + finishedWorkerPath); |
1351 | |
} |
1352 | 0 | finishedHostnameIdList = tmpFinishedHostnameIdList; |
1353 | |
} |
1354 | |
|
1355 | 0 | if (LOG.isInfoEnabled() && |
1356 | 0 | (System.currentTimeMillis() > nextInfoMillis)) { |
1357 | 0 | nextInfoMillis = System.currentTimeMillis() + waitBetweenLogInfoMsec; |
1358 | 0 | LOG.info("barrierOnWorkerList: " + |
1359 | 0 | finishedHostnameIdList.size() + |
1360 | 0 | " out of " + workerInfoList.size() + |
1361 | |
" workers finished on superstep " + |
1362 | 0 | getSuperstep() + " on path " + finishedWorkerPath); |
1363 | 0 | if (workerInfoList.size() - finishedHostnameIdList.size() < |
1364 | |
MAX_PRINTABLE_REMAINING_WORKERS) { |
1365 | 0 | Set<String> remainingWorkers = Sets.newHashSet(hostnameIdList); |
1366 | 0 | remainingWorkers.removeAll(finishedHostnameIdList); |
1367 | 0 | LOG.info("barrierOnWorkerList: Waiting on " + remainingWorkers); |
1368 | |
} |
1369 | |
} |
1370 | |
|
1371 | 0 | if (! logInfoOnlyRun) { |
1372 | 0 | getContext().setStatus(getGraphTaskManager().getGraphFunctions() + |
1373 | |
" - " + |
1374 | 0 | finishedHostnameIdList.size() + |
1375 | |
" finished out of " + |
1376 | 0 | workerInfoList.size() + |
1377 | 0 | " on superstep " + getSuperstep()); |
1378 | 0 | if (finishedHostnameIdList.containsAll(hostnameIdList)) { |
1379 | 0 | break; |
1380 | |
} |
1381 | |
|
1382 | 0 | for (WorkerInfo deadWorker : deadWorkers) { |
1383 | 0 | if (!finishedHostnameIdList.contains(deadWorker.getHostnameId())) { |
1384 | 0 | LOG.error("barrierOnWorkerList: no results arived from " + |
1385 | |
"worker that was pronounced dead: " + deadWorker + |
1386 | 0 | " on superstep " + getSuperstep()); |
1387 | 0 | return false; |
1388 | |
} |
1389 | 0 | } |
1390 | |
|
1391 | |
|
1392 | 0 | lastRegularRunTimeMsec = System.currentTimeMillis(); |
1393 | |
} |
1394 | |
|
1395 | |
|
1396 | 0 | boolean eventTriggered = event.waitMsecs(eventLoopTimeout); |
1397 | |
|
1398 | |
|
1399 | |
|
1400 | 0 | if (eventTriggered) { |
1401 | 0 | event.reset(); |
1402 | |
} |
1403 | |
|
1404 | 0 | long elapsedTimeSinceRegularRunMsec = System.currentTimeMillis() - |
1405 | |
lastRegularRunTimeMsec; |
1406 | 0 | getContext().progress(); |
1407 | |
|
1408 | 0 | if (eventTriggered || |
1409 | |
taskTimeoutMsec == eventLoopTimeout || |
1410 | |
elapsedTimeSinceRegularRunMsec >= taskTimeoutMsec) { |
1411 | 0 | logInfoOnlyRun = false; |
1412 | |
} else { |
1413 | 0 | logInfoOnlyRun = true; |
1414 | 0 | continue; |
1415 | |
} |
1416 | |
|
1417 | |
|
1418 | |
try { |
1419 | 0 | deadWorkers.addAll(superstepChosenWorkerAlive( |
1420 | |
workerInfoHealthyPath, |
1421 | |
workerInfoList)); |
1422 | 0 | if (!ignoreDeath && deadWorkers.size() > 0) { |
1423 | 0 | String errorMessage = "******* WORKERS " + deadWorkers + |
1424 | |
" FAILED *******"; |
1425 | |
|
1426 | 0 | if (!getConfiguration().useCheckpointing()) { |
1427 | 0 | setJobStateFailed(errorMessage); |
1428 | |
} else { |
1429 | 0 | LOG.error("barrierOnWorkerList: Missing chosen " + |
1430 | |
"workers " + deadWorkers + |
1431 | 0 | " on superstep " + getSuperstep()); |
1432 | |
|
1433 | 0 | getGraphTaskManager().getJobProgressTracker().logInfo(errorMessage); |
1434 | |
} |
1435 | 0 | return false; |
1436 | |
} |
1437 | 0 | } catch (KeeperException e) { |
1438 | 0 | throw new IllegalStateException( |
1439 | |
"barrierOnWorkerList: KeeperException - " + |
1440 | |
"Couldn't get " + workerInfoHealthyPath, e); |
1441 | 0 | } catch (InterruptedException e) { |
1442 | 0 | throw new IllegalStateException( |
1443 | |
"barrierOnWorkerList: InterruptedException - " + |
1444 | |
"Couldn't get " + workerInfoHealthyPath, e); |
1445 | 0 | } |
1446 | 0 | } |
1447 | |
|
1448 | 0 | return true; |
1449 | |
} |
1450 | |
|
1451 | |
|
1452 | |
|
1453 | |
|
1454 | |
|
1455 | |
|
1456 | |
|
1457 | |
private void cleanUpOldSuperstep(long removeableSuperstep) throws |
1458 | |
InterruptedException { |
1459 | 0 | if (KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration()) && |
1460 | |
(removeableSuperstep >= 0)) { |
1461 | 0 | String oldSuperstepPath = |
1462 | 0 | getSuperstepPath(getApplicationAttempt()) + "/" + |
1463 | |
removeableSuperstep; |
1464 | |
try { |
1465 | 0 | if (LOG.isInfoEnabled()) { |
1466 | 0 | LOG.info("coordinateSuperstep: Cleaning up old Superstep " + |
1467 | |
oldSuperstepPath); |
1468 | |
} |
1469 | 0 | getZkExt().deleteExt(oldSuperstepPath, |
1470 | |
-1, |
1471 | |
true); |
1472 | 0 | } catch (KeeperException.NoNodeException e) { |
1473 | 0 | LOG.warn("coordinateBarrier: Already cleaned up " + |
1474 | |
oldSuperstepPath); |
1475 | 0 | } catch (KeeperException e) { |
1476 | 0 | throw new IllegalStateException( |
1477 | |
"coordinateSuperstep: KeeperException on " + |
1478 | |
"finalizing checkpoint", e); |
1479 | 0 | } |
1480 | |
} |
1481 | 0 | } |
1482 | |
|
1483 | |
|
1484 | |
|
1485 | |
|
1486 | |
private void coordinateInputSplits() { |
1487 | |
|
1488 | |
|
1489 | 0 | if (!barrierOnWorkerList(inputSplitsWorkerDonePath, |
1490 | |
chosenWorkerInfoList, |
1491 | 0 | getInputSplitsWorkerDoneEvent(), |
1492 | |
false)) { |
1493 | 0 | throw new IllegalStateException("coordinateInputSplits: Worker failed " + |
1494 | |
"during input split (currently not supported)"); |
1495 | |
} |
1496 | |
try { |
1497 | 0 | getZkExt().createExt(inputSplitsAllDonePath, |
1498 | |
null, |
1499 | |
Ids.OPEN_ACL_UNSAFE, |
1500 | |
CreateMode.PERSISTENT, |
1501 | |
false); |
1502 | 0 | } catch (KeeperException.NodeExistsException e) { |
1503 | 0 | LOG.info("coordinateInputSplits: Node " + |
1504 | |
inputSplitsAllDonePath + " already exists."); |
1505 | 0 | } catch (KeeperException e) { |
1506 | 0 | throw new IllegalStateException( |
1507 | |
"coordinateInputSplits: KeeperException", e); |
1508 | 0 | } catch (InterruptedException e) { |
1509 | 0 | throw new IllegalStateException( |
1510 | |
"coordinateInputSplits: IllegalStateException", e); |
1511 | 0 | } |
1512 | 0 | } |
1513 | |
|
1514 | |
|
1515 | |
|
1516 | |
|
1517 | |
|
1518 | |
|
1519 | |
|
1520 | |
|
1521 | |
|
1522 | |
|
1523 | |
|
1524 | |
|
1525 | |
|
1526 | |
|
1527 | |
|
1528 | |
private void initializeAggregatorInputSuperstep() |
1529 | |
throws InterruptedException { |
1530 | 0 | globalCommHandler.getAggregatorHandler().prepareSuperstep(); |
1531 | |
|
1532 | 0 | prepareMasterCompute(getSuperstep()); |
1533 | |
try { |
1534 | 0 | masterCompute.initialize(); |
1535 | 0 | } catch (InstantiationException e) { |
1536 | 0 | LOG.fatal( |
1537 | |
"initializeAggregatorInputSuperstep: Failed in instantiation", e); |
1538 | 0 | throw new RuntimeException( |
1539 | |
"initializeAggregatorInputSuperstep: Failed in instantiation", e); |
1540 | 0 | } catch (IllegalAccessException e) { |
1541 | 0 | LOG.fatal("initializeAggregatorInputSuperstep: Failed in access", e); |
1542 | 0 | throw new RuntimeException( |
1543 | |
"initializeAggregatorInputSuperstep: Failed in access", e); |
1544 | 0 | } |
1545 | 0 | aggregatorTranslation.postMasterCompute(); |
1546 | 0 | globalCommHandler.getAggregatorHandler().finishSuperstep(); |
1547 | |
|
1548 | 0 | globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient); |
1549 | 0 | } |
1550 | |
|
1551 | |
|
1552 | |
|
1553 | |
|
1554 | |
|
1555 | |
|
1556 | |
|
1557 | |
|
1558 | |
private SuperstepClasses prepareMasterCompute(long superstep) { |
1559 | 0 | GraphState graphState = new GraphState(superstep , |
1560 | 0 | GiraphStats.getInstance().getVertices().getValue(), |
1561 | 0 | GiraphStats.getInstance().getEdges().getValue(), |
1562 | 0 | getContext()); |
1563 | 0 | SuperstepClasses superstepClasses = |
1564 | 0 | SuperstepClasses.createAndExtractTypes(getConfiguration()); |
1565 | 0 | masterCompute.setGraphState(graphState); |
1566 | 0 | masterCompute.setSuperstepClasses(superstepClasses); |
1567 | 0 | return superstepClasses; |
1568 | |
} |
1569 | |
|
1570 | |
@Override |
1571 | |
public SuperstepState coordinateSuperstep() throws |
1572 | |
KeeperException, InterruptedException { |
1573 | |
|
1574 | |
|
1575 | |
|
1576 | |
|
1577 | |
|
1578 | |
|
1579 | |
|
1580 | |
|
1581 | 0 | for (MasterObserver observer : observers) { |
1582 | 0 | observer.preSuperstep(getSuperstep()); |
1583 | 0 | getContext().progress(); |
1584 | |
} |
1585 | |
|
1586 | 0 | chosenWorkerInfoList = checkWorkers(); |
1587 | 0 | if (chosenWorkerInfoList == null) { |
1588 | 0 | setJobStateFailed("coordinateSuperstep: Not enough healthy workers for " + |
1589 | 0 | "superstep " + getSuperstep()); |
1590 | |
} else { |
1591 | |
|
1592 | 0 | Collections.sort(chosenWorkerInfoList, new Comparator<WorkerInfo>() { |
1593 | |
@Override |
1594 | |
public int compare(WorkerInfo wi1, WorkerInfo wi2) { |
1595 | 0 | return Integer.compare(wi1.getTaskId(), wi2.getTaskId()); |
1596 | |
} |
1597 | |
}); |
1598 | 0 | for (WorkerInfo workerInfo : chosenWorkerInfoList) { |
1599 | 0 | String workerInfoHealthyPath = |
1600 | 0 | getWorkerInfoHealthyPath(getApplicationAttempt(), |
1601 | 0 | getSuperstep()) + "/" + |
1602 | 0 | workerInfo.getHostnameId(); |
1603 | 0 | if (getZkExt().exists(workerInfoHealthyPath, true) == null) { |
1604 | 0 | LOG.warn("coordinateSuperstep: Chosen worker " + |
1605 | |
workerInfoHealthyPath + |
1606 | |
" is no longer valid, failing superstep"); |
1607 | |
} |
1608 | 0 | } |
1609 | |
} |
1610 | |
|
1611 | |
|
1612 | 0 | if (getSuperstep() >= 0) { |
1613 | 0 | aggregatorTranslation.postMasterCompute(); |
1614 | 0 | globalCommHandler.getAggregatorHandler().finishSuperstep(); |
1615 | |
} |
1616 | |
|
1617 | 0 | masterClient.openConnections(); |
1618 | |
|
1619 | 0 | GiraphStats.getInstance(). |
1620 | 0 | getCurrentWorkers().setValue(chosenWorkerInfoList.size()); |
1621 | 0 | assignPartitionOwners(); |
1622 | |
|
1623 | |
|
1624 | |
|
1625 | 0 | if (checkpointStatus != CheckpointStatus.NONE) { |
1626 | 0 | String workerWroteCheckpointPath = |
1627 | 0 | getWorkerWroteCheckpointPath(getApplicationAttempt(), |
1628 | 0 | getSuperstep()); |
1629 | |
|
1630 | 0 | if (!barrierOnWorkerList(workerWroteCheckpointPath, |
1631 | |
chosenWorkerInfoList, |
1632 | 0 | getWorkerWroteCheckpointEvent(), |
1633 | |
checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT)) { |
1634 | 0 | return SuperstepState.WORKER_FAILURE; |
1635 | |
} |
1636 | |
try { |
1637 | 0 | finalizeCheckpoint(getSuperstep(), chosenWorkerInfoList); |
1638 | 0 | } catch (IOException e) { |
1639 | 0 | throw new IllegalStateException( |
1640 | |
"coordinateSuperstep: IOException on finalizing checkpoint", |
1641 | |
e); |
1642 | 0 | } |
1643 | 0 | if (checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT) { |
1644 | 0 | return SuperstepState.CHECKPOINT_AND_HALT; |
1645 | |
} |
1646 | |
} |
1647 | |
|
1648 | |
|
1649 | 0 | if (getSuperstep() >= 0) { |
1650 | 0 | globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient); |
1651 | |
} |
1652 | |
|
1653 | 0 | if (getSuperstep() == INPUT_SUPERSTEP) { |
1654 | |
|
1655 | 0 | initializeAggregatorInputSuperstep(); |
1656 | 0 | coordinateInputSplits(); |
1657 | |
} |
1658 | |
|
1659 | 0 | String finishedWorkerPath = |
1660 | 0 | getWorkerMetricsFinishedPath(getApplicationAttempt(), getSuperstep()); |
1661 | 0 | if (!barrierOnWorkerList(finishedWorkerPath, |
1662 | |
chosenWorkerInfoList, |
1663 | 0 | getSuperstepStateChangedEvent(), |
1664 | |
false)) { |
1665 | 0 | return SuperstepState.WORKER_FAILURE; |
1666 | |
} |
1667 | |
|
1668 | |
|
1669 | |
|
1670 | 0 | globalCommHandler.getAggregatorHandler().prepareSuperstep(); |
1671 | 0 | aggregatorTranslation.prepareSuperstep(); |
1672 | |
|
1673 | 0 | SuperstepClasses superstepClasses = |
1674 | 0 | prepareMasterCompute(getSuperstep() + 1); |
1675 | 0 | doMasterCompute(); |
1676 | |
|
1677 | |
|
1678 | |
|
1679 | 0 | GlobalStats globalStats = aggregateWorkerStats(getSuperstep()); |
1680 | 0 | aggregateCountersFromWorkersAndMaster(); |
1681 | 0 | if (masterCompute.isHalted() || |
1682 | 0 | (globalStats.getFinishedVertexCount() == |
1683 | 0 | globalStats.getVertexCount() && |
1684 | 0 | globalStats.getMessageCount() == 0)) { |
1685 | 0 | globalStats.setHaltComputation(true); |
1686 | 0 | } else if (getZkExt().exists(haltComputationPath, false) != null) { |
1687 | 0 | if (LOG.isInfoEnabled()) { |
1688 | 0 | LOG.info("Halting computation because halt zookeeper node was created"); |
1689 | |
} |
1690 | 0 | globalStats.setHaltComputation(true); |
1691 | |
} |
1692 | |
|
1693 | |
|
1694 | |
|
1695 | 0 | if (maxNumberOfSupersteps != |
1696 | 0 | GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.getDefaultValue() && |
1697 | 0 | (getSuperstep() == maxNumberOfSupersteps - 1)) { |
1698 | 0 | if (LOG.isInfoEnabled()) { |
1699 | 0 | LOG.info("coordinateSuperstep: Finished " + maxNumberOfSupersteps + |
1700 | |
" supersteps (max specified by the user), halting"); |
1701 | |
} |
1702 | 0 | globalStats.setHaltComputation(true); |
1703 | |
} |
1704 | |
|
1705 | |
|
1706 | |
|
1707 | |
|
1708 | 0 | if (!globalStats.getHaltComputation()) { |
1709 | 0 | superstepClasses.verifyTypesMatch(getSuperstep() > 0); |
1710 | |
} |
1711 | 0 | getConfiguration().updateSuperstepClasses(superstepClasses); |
1712 | |
|
1713 | |
|
1714 | 0 | checkpointStatus = getCheckpointStatus(getSuperstep() + 1); |
1715 | 0 | globalStats.setCheckpointStatus(checkpointStatus); |
1716 | |
|
1717 | |
|
1718 | 0 | String superstepFinishedNode = |
1719 | 0 | getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep()); |
1720 | |
|
1721 | 0 | WritableUtils.writeToZnode( |
1722 | 0 | getZkExt(), superstepFinishedNode, -1, globalStats, superstepClasses); |
1723 | 0 | updateCounters(globalStats); |
1724 | |
|
1725 | 0 | cleanUpOldSuperstep(getSuperstep() - 1); |
1726 | 0 | incrCachedSuperstep(); |
1727 | |
|
1728 | 0 | if (getSuperstep() > 0) { |
1729 | 0 | GiraphStats.getInstance().getSuperstepCounter().increment(); |
1730 | |
} |
1731 | |
SuperstepState superstepState; |
1732 | 0 | if (globalStats.getHaltComputation()) { |
1733 | 0 | superstepState = SuperstepState.ALL_SUPERSTEPS_DONE; |
1734 | |
} else { |
1735 | 0 | superstepState = SuperstepState.THIS_SUPERSTEP_DONE; |
1736 | |
} |
1737 | 0 | globalCommHandler.getAggregatorHandler().writeAggregators( |
1738 | 0 | getSuperstep(), superstepState); |
1739 | |
|
1740 | 0 | return superstepState; |
1741 | |
} |
1742 | |
|
1743 | |
|
1744 | |
|
1745 | |
|
1746 | |
|
1747 | |
|
1748 | |
|
1749 | |
|
1750 | |
|
1751 | |
private CheckpointStatus getCheckpointStatus(long superstep) { |
1752 | |
try { |
1753 | 0 | if (getZkExt(). |
1754 | 0 | exists(basePath + FORCE_CHECKPOINT_USER_FLAG, false) != null) { |
1755 | 0 | if (isCheckpointingSupported(getConfiguration(), masterCompute)) { |
1756 | 0 | return CheckpointStatus.CHECKPOINT_AND_HALT; |
1757 | |
} else { |
1758 | 0 | LOG.warn("Attempted to manually checkpoint the job that " + |
1759 | |
"does not support checkpoints. Ignoring"); |
1760 | |
} |
1761 | |
} |
1762 | 0 | } catch (KeeperException e) { |
1763 | 0 | throw new IllegalStateException( |
1764 | |
"cleanupZooKeeper: Got KeeperException", e); |
1765 | 0 | } catch (InterruptedException e) { |
1766 | 0 | throw new IllegalStateException( |
1767 | |
"cleanupZooKeeper: Got IllegalStateException", e); |
1768 | 0 | } |
1769 | 0 | if (checkpointFrequency == 0) { |
1770 | 0 | return CheckpointStatus.NONE; |
1771 | |
} |
1772 | 0 | long firstCheckpoint = INPUT_SUPERSTEP + 1; |
1773 | 0 | if (getRestartedSuperstep() != UNSET_SUPERSTEP) { |
1774 | 0 | firstCheckpoint = getRestartedSuperstep() + checkpointFrequency; |
1775 | |
} |
1776 | 0 | if (superstep < firstCheckpoint) { |
1777 | 0 | return CheckpointStatus.NONE; |
1778 | |
} |
1779 | 0 | if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) { |
1780 | 0 | if (isCheckpointingSupported(getConfiguration(), masterCompute)) { |
1781 | 0 | return CheckpointStatus.CHECKPOINT; |
1782 | |
} |
1783 | |
} |
1784 | 0 | return CheckpointStatus.NONE; |
1785 | |
} |
1786 | |
|
1787 | |
|
1788 | |
|
1789 | |
|
1790 | |
|
1791 | |
|
1792 | |
|
1793 | |
|
1794 | |
|
1795 | |
|
1796 | |
private boolean isCheckpointingSupported( |
1797 | |
GiraphConfiguration conf, MasterCompute masterCompute) { |
1798 | 0 | return checkpointSupportedChecker.isCheckpointSupported( |
1799 | |
conf, masterCompute); |
1800 | |
} |
1801 | |
|
1802 | |
|
1803 | |
|
1804 | |
|
1805 | |
|
1806 | |
|
1807 | |
private void doMasterCompute() { |
1808 | 0 | GiraphTimerContext timerContext = masterComputeTimer.time(); |
1809 | 0 | masterCompute.compute(); |
1810 | 0 | timerContext.stop(); |
1811 | 0 | } |
1812 | |
|
1813 | |
|
1814 | |
|
1815 | |
|
1816 | |
|
1817 | |
|
1818 | |
|
1819 | |
|
1820 | |
private void populateCountersFromContext(Mapper.Context context, |
1821 | |
Map<String, Set<String>> counterGroupAndNames, |
1822 | |
Set<CustomCounter> counters) { |
1823 | |
Counter counter; |
1824 | |
for (Map.Entry<String, Set<String>> entry : |
1825 | 0 | counterGroupAndNames.entrySet()) { |
1826 | 0 | String groupName = entry.getKey(); |
1827 | 0 | for (String counterName: entry.getValue()) { |
1828 | 0 | CustomCounter customCounter = new CustomCounter(groupName, counterName, |
1829 | |
CustomCounter.Aggregation.SUM); |
1830 | 0 | counter = context.getCounter(groupName, counterName); |
1831 | 0 | customCounter.setValue(counter.getValue()); |
1832 | 0 | counters.add(customCounter); |
1833 | 0 | } |
1834 | 0 | } |
1835 | 0 | } |
1836 | |
|
1837 | |
|
1838 | |
|
1839 | |
|
1840 | |
|
1841 | |
|
1842 | |
|
1843 | |
|
1844 | |
|
1845 | |
private void aggregateCountersFromWorkersAndMaster() { |
1846 | 0 | CustomCounters customCounters = new CustomCounters(); |
1847 | 0 | long superstep = getSuperstep(); |
1848 | |
|
1849 | 0 | String workerFinishedPath = getWorkerCountersFinishedPath( |
1850 | 0 | getApplicationAttempt(), superstep); |
1851 | |
try { |
1852 | 0 | getZkExt().createOnceExt(workerFinishedPath, |
1853 | |
null, |
1854 | |
Ids.OPEN_ACL_UNSAFE, |
1855 | |
CreateMode.PERSISTENT, |
1856 | |
true); |
1857 | 0 | } catch (KeeperException e) { |
1858 | 0 | LOG.warn("aggregateCounters: KeeperException - " + |
1859 | |
"Couldn't create " + workerFinishedPath, e); |
1860 | 0 | } catch (InterruptedException e) { |
1861 | 0 | LOG.warn("barrierOnWorkerList: InterruptedException - " + |
1862 | |
"Couldn't create " + workerFinishedPath, e); |
1863 | 0 | } |
1864 | 0 | List<String> workerFinishedPathList = new ArrayList<>(); |
1865 | |
long waitForCountersTimeout = |
1866 | 0 | SystemTime.get().getMilliseconds() + maxCounterWaitMsecs; |
1867 | |
|
1868 | 0 | int numWorkers = BspInputFormat.getMaxTasks(getConfiguration()) - 1; |
1869 | 0 | if (numWorkers == 0) { |
1870 | |
|
1871 | |
|
1872 | 0 | numWorkers += 1; |
1873 | |
} |
1874 | |
|
1875 | |
|
1876 | 0 | while (SystemTime.get().getMilliseconds() < waitForCountersTimeout) { |
1877 | |
try { |
1878 | 0 | workerFinishedPathList = getZkExt().getChildrenExt( |
1879 | |
workerFinishedPath, true, |
1880 | |
false, true); |
1881 | 0 | LOG.info(String.format("Fetching counter values from " + |
1882 | |
"workers for superstep %d. Got %d out of %d", |
1883 | 0 | superstep, workerFinishedPathList.size(), numWorkers)); |
1884 | 0 | if (workerFinishedPathList.size() == numWorkers) { |
1885 | 0 | break; |
1886 | |
} |
1887 | 0 | } catch (KeeperException e) { |
1888 | 0 | LOG.warn("Got Keeper exception, but will retry: ", e); |
1889 | 0 | } catch (InterruptedException e) { |
1890 | 0 | LOG.warn("aggregateCounters: InterruptedException", e); |
1891 | 0 | } |
1892 | 0 | getWrittenCountersToZKEvent().waitMsecs(eventWaitMsecs); |
1893 | 0 | getWrittenCountersToZKEvent().reset(); |
1894 | |
} |
1895 | 0 | for (String finishedPath : workerFinishedPathList) { |
1896 | 0 | JSONArray jsonCounters = null; |
1897 | |
try { |
1898 | 0 | byte [] zkData = |
1899 | 0 | getZkExt().getData(finishedPath, false, null); |
1900 | 0 | jsonCounters = new JSONArray(new String(zkData, |
1901 | 0 | Charset.defaultCharset())); |
1902 | 0 | Set<CustomCounter> workerCounters = new HashSet<>(); |
1903 | 0 | for (int i = 0; i < jsonCounters.length(); i++) { |
1904 | 0 | CustomCounter customCounter = new CustomCounter(); |
1905 | 0 | WritableUtils.readFieldsFromByteArray(Base64.decode( |
1906 | 0 | jsonCounters.getString(i)), customCounter); |
1907 | 0 | workerCounters.add(customCounter); |
1908 | |
} |
1909 | 0 | customCounters.mergeCounters(workerCounters); |
1910 | 0 | } catch (JSONException e) { |
1911 | 0 | LOG.warn("aggregateCounters: JSONException", e); |
1912 | 0 | } catch (KeeperException e) { |
1913 | 0 | LOG.warn("aggregateCounters: KeeperException", e); |
1914 | 0 | } catch (InterruptedException e) { |
1915 | 0 | LOG.warn("aggregateCounters: InterruptedException", e); |
1916 | 0 | } catch (IOException e) { |
1917 | 0 | LOG.warn("aggregateCounters: IOException", e); |
1918 | 0 | } |
1919 | 0 | } |
1920 | 0 | Mapper.Context context = getContext(); |
1921 | 0 | Set<CustomCounter> masterCounters = new HashSet<>(); |
1922 | |
|
1923 | 0 | if (numWorkers != 1) { |
1924 | |
|
1925 | |
|
1926 | |
|
1927 | |
Counter counter; |
1928 | |
Set<CustomCounter> masterCounterNames = |
1929 | 0 | CustomCounters.getAndClearCustomCounters(); |
1930 | 0 | for (CustomCounter customCounter : masterCounterNames) { |
1931 | 0 | String groupName = customCounter.getGroupName(); |
1932 | 0 | String counterName = customCounter.getCounterName(); |
1933 | 0 | counter = context.getCounter(groupName, counterName); |
1934 | 0 | customCounter.setValue(counter.getValue()); |
1935 | 0 | masterCounters.add(customCounter); |
1936 | 0 | } |
1937 | |
|
1938 | |
Map<String, Set<String>> nettyCounters = |
1939 | 0 | NettyClient.getCounterGroupsAndNames(); |
1940 | 0 | populateCountersFromContext(context, nettyCounters, masterCounters); |
1941 | |
} |
1942 | |
|
1943 | |
|
1944 | |
|
1945 | |
Map<String, Set<String>> inputSplitCounter = |
1946 | 0 | MasterInputSplitsHandler.getCounterGroupAndNames(); |
1947 | 0 | populateCountersFromContext(context, inputSplitCounter, masterCounters); |
1948 | 0 | customCounters.mergeCounters(masterCounters); |
1949 | |
|
1950 | 0 | List<CustomCounter> allCounters = new ArrayList<>(); |
1951 | 0 | allCounters.addAll(GiraphStats.getInstance().getCounterList()); |
1952 | |
|
1953 | 0 | allCounters.addAll(customCounters.getCounterList()); |
1954 | |
|
1955 | 0 | giraphCountersThriftStruct.setCounters(allCounters); |
1956 | 0 | } |
1957 | |
|
1958 | |
|
1959 | |
|
1960 | |
|
1961 | |
|
1962 | |
|
1963 | |
|
1964 | |
|
1965 | |
|
1966 | |
public void addGiraphTimersAndSendCounters(long superstep) { |
1967 | 0 | List<CustomCounter> giraphCounters = |
1968 | 0 | giraphCountersThriftStruct.getCounters(); |
1969 | 0 | giraphCounters.addAll(GiraphTimers.getInstance().getCounterList(superstep)); |
1970 | 0 | giraphCountersThriftStruct.setCounters(giraphCounters); |
1971 | 0 | getJobProgressTracker().sendMasterCounters(giraphCountersThriftStruct); |
1972 | 0 | } |
1973 | |
|
1974 | |
|
1975 | |
|
1976 | |
|
1977 | |
|
1978 | |
private void waitForWorkersToCleanup() { |
1979 | |
try { |
1980 | 0 | getZkExt().createExt(cleanedUpPath, |
1981 | |
null, |
1982 | |
Ids.OPEN_ACL_UNSAFE, |
1983 | |
CreateMode.PERSISTENT, |
1984 | |
true); |
1985 | 0 | } catch (KeeperException.NodeExistsException e) { |
1986 | 0 | if (LOG.isInfoEnabled()) { |
1987 | 0 | LOG.info("cleanUpZooKeeper: Node " + cleanedUpPath + |
1988 | |
" already exists, no need to create."); |
1989 | |
} |
1990 | 0 | } catch (KeeperException e) { |
1991 | 0 | throw new IllegalStateException( |
1992 | |
"cleanupZooKeeper: Got KeeperException", e); |
1993 | 0 | } catch (InterruptedException e) { |
1994 | 0 | throw new IllegalStateException( |
1995 | |
"cleanupZooKeeper: Got IllegalStateException", e); |
1996 | 0 | } |
1997 | |
|
1998 | 0 | int maxTasks = BspInputFormat.getMaxTasks(getConfiguration()); |
1999 | 0 | if ((getGraphTaskManager().getGraphFunctions() == GraphFunctions.ALL) || |
2000 | 0 | (getGraphTaskManager().getGraphFunctions() == |
2001 | |
GraphFunctions.ALL_EXCEPT_ZOOKEEPER)) { |
2002 | 0 | maxTasks *= 2; |
2003 | |
} |
2004 | 0 | List<String> cleanedUpChildrenList = null; |
2005 | |
while (true) { |
2006 | |
try { |
2007 | 0 | cleanedUpChildrenList = |
2008 | 0 | getZkExt().getChildrenExt( |
2009 | |
cleanedUpPath, true, false, true); |
2010 | 0 | if (LOG.isInfoEnabled()) { |
2011 | 0 | LOG.info("cleanUpZooKeeper: Got " + |
2012 | 0 | cleanedUpChildrenList.size() + " of " + |
2013 | |
maxTasks + " desired children from " + |
2014 | |
cleanedUpPath); |
2015 | |
} |
2016 | 0 | if (cleanedUpChildrenList.size() == maxTasks) { |
2017 | 0 | break; |
2018 | |
} |
2019 | 0 | if (LOG.isInfoEnabled()) { |
2020 | 0 | LOG.info("cleanedUpZooKeeper: Waiting for the " + |
2021 | |
"children of " + cleanedUpPath + |
2022 | |
" to change since only got " + |
2023 | 0 | cleanedUpChildrenList.size() + " nodes."); |
2024 | |
} |
2025 | 0 | } catch (KeeperException e) { |
2026 | |
|
2027 | 0 | LOG.error("cleanUpZooKeeper: Got KeeperException, " + |
2028 | |
"but will continue", e); |
2029 | 0 | return; |
2030 | 0 | } catch (InterruptedException e) { |
2031 | |
|
2032 | 0 | LOG.error("cleanUpZooKeeper: Got InterruptedException, " + |
2033 | |
"but will continue", e); |
2034 | 0 | return; |
2035 | 0 | } |
2036 | |
|
2037 | 0 | getCleanedUpChildrenChangedEvent().waitForTimeoutOrFail( |
2038 | 0 | GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get( |
2039 | 0 | getConfiguration())); |
2040 | 0 | getCleanedUpChildrenChangedEvent().reset(); |
2041 | |
} |
2042 | 0 | } |
2043 | |
|
2044 | |
|
2045 | |
|
2046 | |
|
2047 | |
|
2048 | |
private void cleanUpZooKeeper() { |
2049 | |
|
2050 | |
|
2051 | |
|
2052 | |
try { |
2053 | 0 | if (getConfiguration().isZookeeperExternal() && |
2054 | 0 | KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration())) { |
2055 | 0 | if (LOG.isInfoEnabled()) { |
2056 | 0 | LOG.info("cleanupZooKeeper: Removing the following path " + |
2057 | |
"and all children - " + basePath + " from ZooKeeper list " + |
2058 | 0 | getConfiguration().getZookeeperList()); |
2059 | |
} |
2060 | 0 | getZkExt().deleteExt(basePath, -1, true); |
2061 | |
} |
2062 | 0 | } catch (KeeperException e) { |
2063 | 0 | LOG.error("cleanupZooKeeper: Failed to do cleanup of " + |
2064 | |
basePath + " due to KeeperException", e); |
2065 | 0 | } catch (InterruptedException e) { |
2066 | 0 | LOG.error("cleanupZooKeeper: Failed to do cleanup of " + |
2067 | |
basePath + " due to InterruptedException", e); |
2068 | 0 | } |
2069 | 0 | } |
2070 | |
|
2071 | |
@Override |
2072 | |
public void postApplication() { |
2073 | 0 | for (MasterObserver observer : observers) { |
2074 | 0 | observer.postApplication(); |
2075 | 0 | getContext().progress(); |
2076 | |
} |
2077 | 0 | } |
2078 | |
|
2079 | |
@Override |
2080 | |
public void postSuperstep() { |
2081 | 0 | for (MasterObserver observer : observers) { |
2082 | 0 | observer.postSuperstep(getSuperstep()); |
2083 | 0 | getContext().progress(); |
2084 | |
} |
2085 | 0 | } |
2086 | |
|
2087 | |
@Override |
2088 | |
public void failureCleanup(Exception e) { |
2089 | 0 | for (MasterObserver observer : observers) { |
2090 | |
try { |
2091 | 0 | observer.applicationFailed(e); |
2092 | |
|
2093 | 0 | } catch (RuntimeException re) { |
2094 | |
|
2095 | 0 | LOG.error(re.getClass().getName() + " from observer " + |
2096 | 0 | observer.getClass().getName(), re); |
2097 | 0 | } |
2098 | 0 | getContext().progress(); |
2099 | |
} |
2100 | 0 | } |
2101 | |
|
2102 | |
@Override |
2103 | |
public void cleanup(SuperstepState superstepState) throws IOException { |
2104 | 0 | ImmutableClassesGiraphConfiguration conf = getConfiguration(); |
2105 | |
|
2106 | |
|
2107 | |
|
2108 | |
|
2109 | |
|
2110 | 0 | String masterCleanedUpPath = cleanedUpPath + "/" + |
2111 | 0 | getTaskId() + MASTER_SUFFIX; |
2112 | |
try { |
2113 | 0 | String finalFinishedPath = |
2114 | 0 | getZkExt().createExt(masterCleanedUpPath, |
2115 | |
null, |
2116 | |
Ids.OPEN_ACL_UNSAFE, |
2117 | |
CreateMode.PERSISTENT, |
2118 | |
true); |
2119 | 0 | if (LOG.isInfoEnabled()) { |
2120 | 0 | LOG.info("cleanup: Notifying master its okay to cleanup with " + |
2121 | |
finalFinishedPath); |
2122 | |
} |
2123 | 0 | } catch (KeeperException.NodeExistsException e) { |
2124 | 0 | if (LOG.isInfoEnabled()) { |
2125 | 0 | LOG.info("cleanup: Couldn't create finished node '" + |
2126 | |
masterCleanedUpPath); |
2127 | |
} |
2128 | 0 | } catch (KeeperException e) { |
2129 | 0 | LOG.error("cleanup: Got KeeperException, continuing", e); |
2130 | 0 | } catch (InterruptedException e) { |
2131 | 0 | LOG.error("cleanup: Got InterruptedException, continuing", e); |
2132 | 0 | } |
2133 | |
|
2134 | 0 | if (isMaster) { |
2135 | 0 | getGraphTaskManager().setIsMaster(true); |
2136 | 0 | waitForWorkersToCleanup(); |
2137 | 0 | aggregateCountersFromWorkersAndMaster(); |
2138 | 0 | cleanUpZooKeeper(); |
2139 | |
|
2140 | 0 | if (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE && |
2141 | 0 | GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) { |
2142 | 0 | boolean success = |
2143 | 0 | getFs().delete(new Path(checkpointBasePath), true); |
2144 | 0 | if (LOG.isInfoEnabled()) { |
2145 | 0 | LOG.info("cleanup: Removed HDFS checkpoint directory (" + |
2146 | |
checkpointBasePath + ") with return = " + |
2147 | 0 | success + " since the job " + getContext().getJobName() + |
2148 | |
" succeeded "); |
2149 | |
} |
2150 | |
} |
2151 | 0 | if (superstepState == SuperstepState.CHECKPOINT_AND_HALT) { |
2152 | 0 | getFs().create(CheckpointingUtils.getCheckpointMarkPath(conf, |
2153 | 0 | getJobId()), true); |
2154 | 0 | failJob(new Exception("Checkpoint and halt requested. " + |
2155 | |
"Killing this job.")); |
2156 | |
} |
2157 | 0 | globalCommHandler.getAggregatorHandler().close(); |
2158 | 0 | masterClient.closeConnections(); |
2159 | 0 | masterServer.close(); |
2160 | |
} |
2161 | |
|
2162 | |
try { |
2163 | 0 | getZkExt().close(); |
2164 | 0 | } catch (InterruptedException e) { |
2165 | |
|
2166 | 0 | LOG.error("cleanup: Zookeeper failed to close", e); |
2167 | 0 | } |
2168 | 0 | } |
2169 | |
|
2170 | |
|
2171 | |
|
2172 | |
|
2173 | |
|
2174 | |
|
2175 | |
public final BspEvent getWorkerWroteCheckpointEvent() { |
2176 | 0 | return workerWroteCheckpoint; |
2177 | |
} |
2178 | |
|
2179 | |
|
2180 | |
|
2181 | |
|
2182 | |
|
2183 | |
|
2184 | |
|
2185 | |
public final BspEvent getSuperstepStateChangedEvent() { |
2186 | 0 | return superstepStateChanged; |
2187 | |
} |
2188 | |
|
2189 | |
|
2190 | |
|
2191 | |
|
2192 | |
|
2193 | |
|
2194 | |
private void checkHealthyWorkerFailure(String failedWorkerPath) { |
2195 | 0 | if (getSuperstepFromPath(failedWorkerPath) < getSuperstep()) { |
2196 | 0 | return; |
2197 | |
} |
2198 | |
|
2199 | 0 | Collection<PartitionOwner> partitionOwners = |
2200 | 0 | masterGraphPartitioner.getCurrentPartitionOwners(); |
2201 | 0 | String hostnameId = |
2202 | 0 | getHealthyHostnameIdFromPath(failedWorkerPath); |
2203 | 0 | for (PartitionOwner partitionOwner : partitionOwners) { |
2204 | 0 | WorkerInfo workerInfo = partitionOwner.getWorkerInfo(); |
2205 | 0 | WorkerInfo previousWorkerInfo = |
2206 | 0 | partitionOwner.getPreviousWorkerInfo(); |
2207 | 0 | if (workerInfo.getHostnameId().equals(hostnameId) || |
2208 | |
((previousWorkerInfo != null) && |
2209 | 0 | previousWorkerInfo.getHostnameId().equals(hostnameId))) { |
2210 | 0 | LOG.warn("checkHealthyWorkerFailure: " + |
2211 | |
"at least one healthy worker went down " + |
2212 | 0 | "for superstep " + getSuperstep() + " - " + |
2213 | |
hostnameId + ", will try to restart from " + |
2214 | |
"checkpointed superstep " + |
2215 | |
lastCheckpointedSuperstep); |
2216 | 0 | superstepStateChanged.signal(); |
2217 | |
} |
2218 | 0 | } |
2219 | 0 | } |
2220 | |
|
2221 | |
@Override |
2222 | |
public boolean processEvent(WatchedEvent event) { |
2223 | 0 | boolean foundEvent = false; |
2224 | 0 | if (event.getPath().contains(WORKER_HEALTHY_DIR) && |
2225 | 0 | (event.getType() == EventType.NodeDeleted)) { |
2226 | 0 | if (LOG.isDebugEnabled()) { |
2227 | 0 | LOG.debug("processEvent: Healthy worker died (node deleted) " + |
2228 | 0 | "in " + event.getPath()); |
2229 | |
} |
2230 | 0 | checkHealthyWorkerFailure(event.getPath()); |
2231 | 0 | superstepStateChanged.signal(); |
2232 | 0 | foundEvent = true; |
2233 | 0 | } else if (event.getPath().endsWith(METRICS_DIR) && |
2234 | 0 | event.getType() == EventType.NodeChildrenChanged) { |
2235 | 0 | if (LOG.isDebugEnabled()) { |
2236 | 0 | LOG.debug("processEvent: Worker finished (node change) " + |
2237 | |
"event - superstepStateChanged signaled"); |
2238 | |
} |
2239 | 0 | superstepStateChanged.signal(); |
2240 | 0 | foundEvent = true; |
2241 | 0 | } else if (event.getPath().contains(WORKER_WROTE_CHECKPOINT_DIR) && |
2242 | 0 | event.getType() == EventType.NodeChildrenChanged) { |
2243 | 0 | if (LOG.isDebugEnabled()) { |
2244 | 0 | LOG.debug("processEvent: Worker wrote checkpoint (node change) " + |
2245 | |
"event - workerWroteCheckpoint signaled"); |
2246 | |
} |
2247 | 0 | workerWroteCheckpoint.signal(); |
2248 | 0 | foundEvent = true; |
2249 | |
} |
2250 | |
|
2251 | 0 | return foundEvent; |
2252 | |
} |
2253 | |
|
2254 | |
|
2255 | |
|
2256 | |
|
2257 | |
|
2258 | |
|
2259 | |
private void updateCounters(GlobalStats globalStats) { |
2260 | 0 | GiraphStats gs = GiraphStats.getInstance(); |
2261 | 0 | gs.getVertices().setValue(globalStats.getVertexCount()); |
2262 | 0 | gs.getFinishedVertexes().setValue(globalStats.getFinishedVertexCount()); |
2263 | 0 | gs.getEdges().setValue(globalStats.getEdgeCount()); |
2264 | 0 | gs.getSentMessages().setValue(globalStats.getMessageCount()); |
2265 | 0 | gs.getSentMessageBytes().setValue(globalStats.getMessageBytesCount()); |
2266 | 0 | gs.getAggregateSentMessages().increment(globalStats.getMessageCount()); |
2267 | 0 | gs.getAggregateSentMessageBytes() |
2268 | 0 | .increment(globalStats.getMessageBytesCount()); |
2269 | 0 | gs.getAggregateOOCBytesLoaded() |
2270 | 0 | .increment(globalStats.getOocLoadBytesCount()); |
2271 | 0 | gs.getAggregateOOCBytesStored() |
2272 | 0 | .increment(globalStats.getOocStoreBytesCount()); |
2273 | |
|
2274 | |
|
2275 | 0 | int percentage = (int) gs.getLowestGraphPercentageInMemory().getValue(); |
2276 | 0 | gs.getLowestGraphPercentageInMemory().setValue( |
2277 | 0 | Math.min(percentage, globalStats.getLowestGraphPercentageInMemory())); |
2278 | 0 | } |
2279 | |
} |