1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.bsp; |
20 | |
|
21 | |
import org.apache.giraph.conf.GiraphConstants; |
22 | |
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; |
23 | |
import org.apache.giraph.graph.GraphTaskManager; |
24 | |
import org.apache.giraph.job.JobProgressTracker; |
25 | |
import org.apache.giraph.partition.GraphPartitionerFactory; |
26 | |
import org.apache.giraph.utils.CheckpointingUtils; |
27 | |
import org.apache.giraph.worker.WorkerInfo; |
28 | |
import org.apache.giraph.writable.kryo.GiraphClassResolver; |
29 | |
import org.apache.giraph.zk.BspEvent; |
30 | |
import org.apache.giraph.zk.PredicateLock; |
31 | |
import org.apache.giraph.zk.ZooKeeperExt; |
32 | |
import org.apache.giraph.zk.ZooKeeperManager; |
33 | |
import org.apache.hadoop.fs.FileSystem; |
34 | |
import org.apache.hadoop.io.Writable; |
35 | |
import org.apache.hadoop.io.WritableComparable; |
36 | |
import org.apache.hadoop.mapreduce.Mapper; |
37 | |
import org.apache.log4j.Logger; |
38 | |
import org.apache.zookeeper.CreateMode; |
39 | |
import org.apache.zookeeper.KeeperException; |
40 | |
import org.apache.zookeeper.WatchedEvent; |
41 | |
import org.apache.zookeeper.Watcher; |
42 | |
import org.apache.zookeeper.Watcher.Event.EventType; |
43 | |
import org.apache.zookeeper.Watcher.Event.KeeperState; |
44 | |
import org.apache.zookeeper.ZooDefs.Ids; |
45 | |
import org.json.JSONException; |
46 | |
import org.json.JSONObject; |
47 | |
|
48 | |
import java.io.IOException; |
49 | |
import java.net.UnknownHostException; |
50 | |
import java.nio.charset.Charset; |
51 | |
import java.util.ArrayList; |
52 | |
import java.util.Collections; |
53 | |
import java.util.List; |
54 | |
|
55 | |
import static org.apache.giraph.conf.GiraphConstants.RESTART_JOB_ID; |
56 | |
|
57 | |
|
58 | |
|
59 | |
|
60 | |
|
61 | |
|
62 | |
|
63 | |
|
64 | |
@SuppressWarnings("rawtypes") |
65 | |
public abstract class BspService<I extends WritableComparable, |
66 | |
V extends Writable, E extends Writable> |
67 | |
implements Watcher, CentralizedService<I, V, E> { |
68 | |
|
69 | |
public static final long UNSET_SUPERSTEP = Long.MIN_VALUE; |
70 | |
|
71 | |
public static final long INPUT_SUPERSTEP = -1; |
72 | |
|
73 | |
public static final long UNSET_APPLICATION_ATTEMPT = Long.MIN_VALUE; |
74 | |
|
75 | |
public static final String BASE_DIR = "/_hadoopBsp"; |
76 | |
|
77 | |
public static final String MASTER_JOB_STATE_NODE = "/_masterJobState"; |
78 | |
|
79 | |
|
80 | |
public static final String INPUT_SPLITS_WORKER_DONE_DIR = |
81 | |
"/_inputSplitsWorkerDoneDir"; |
82 | |
|
83 | |
public static final String INPUT_SPLITS_ALL_DONE_NODE = |
84 | |
"/_inputSplitsAllDone"; |
85 | |
|
86 | |
public static final String KRYO_REGISTERED_CLASS_DIR = |
87 | |
"/_kryo"; |
88 | |
|
89 | |
public static final String APPLICATION_ATTEMPTS_DIR = |
90 | |
"/_applicationAttemptsDir"; |
91 | |
|
92 | |
public static final String MASTER_ELECTION_DIR = "/_masterElectionDir"; |
93 | |
|
94 | |
public static final String SUPERSTEP_DIR = "/_superstepDir"; |
95 | |
|
96 | |
public static final String COUNTERS_DIR = "/_counters"; |
97 | |
|
98 | |
public static final String METRICS_DIR = "/_metrics"; |
99 | |
|
100 | |
public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir"; |
101 | |
|
102 | |
public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir"; |
103 | |
|
104 | |
public static final String WORKER_WROTE_CHECKPOINT_DIR = |
105 | |
"/_workerWroteCheckpointDir"; |
106 | |
|
107 | |
public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir"; |
108 | |
|
109 | |
public static final String PARTITION_EXCHANGE_DIR = |
110 | |
"/_partitionExchangeDir"; |
111 | |
|
112 | |
public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished"; |
113 | |
|
114 | |
public static final String HALT_COMPUTATION_NODE = "/_haltComputation"; |
115 | |
|
116 | |
public static final String MEMORY_OBSERVER_DIR = "/_memoryObserver"; |
117 | |
|
118 | |
public static final String FORCE_CHECKPOINT_USER_FLAG = "/_checkpointAndStop"; |
119 | |
|
120 | |
public static final String CLEANED_UP_DIR = "/_cleanedUpDir"; |
121 | |
|
122 | |
public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey"; |
123 | |
|
124 | |
public static final String JSONOBJ_NUM_MESSAGE_BYTES_KEY = "_numMsgBytesKey"; |
125 | |
|
126 | |
public static final String JSONOBJ_METRICS_KEY = "_metricsKey"; |
127 | |
|
128 | |
|
129 | |
public static final String JSONOBJ_STATE_KEY = "_stateKey"; |
130 | |
|
131 | |
public static final String JSONOBJ_APPLICATION_ATTEMPT_KEY = |
132 | |
"_applicationAttemptKey"; |
133 | |
|
134 | |
public static final String JSONOBJ_SUPERSTEP_KEY = |
135 | |
"_superstepKey"; |
136 | |
|
137 | |
public static final String WORKER_SUFFIX = "_worker"; |
138 | |
|
139 | |
public static final String MASTER_SUFFIX = "_master"; |
140 | |
|
141 | |
|
142 | 0 | private static final Logger LOG = Logger.getLogger(BspService.class); |
143 | |
|
144 | |
protected final String basePath; |
145 | |
|
146 | |
protected final String masterJobStatePath; |
147 | |
|
148 | |
protected final String inputSplitsWorkerDonePath; |
149 | |
|
150 | |
protected final String inputSplitsAllDonePath; |
151 | |
|
152 | |
protected final String applicationAttemptsPath; |
153 | |
|
154 | |
protected final String cleanedUpPath; |
155 | |
|
156 | |
protected final String checkpointBasePath; |
157 | |
|
158 | |
protected final String savedCheckpointBasePath; |
159 | |
|
160 | |
protected final String masterElectionPath; |
161 | |
|
162 | |
protected final String haltComputationPath; |
163 | |
|
164 | |
protected final String memoryObserverPath; |
165 | |
|
166 | |
protected final String kryoRegisteredClassPath; |
167 | |
|
168 | |
private final ZooKeeperExt zk; |
169 | |
|
170 | |
private final BspEvent connectedEvent; |
171 | |
|
172 | |
private final BspEvent workerHealthRegistrationChanged; |
173 | |
|
174 | |
private final BspEvent applicationAttemptChanged; |
175 | |
|
176 | |
private final BspEvent inputSplitsWorkerDoneEvent; |
177 | |
|
178 | |
private final BspEvent inputSplitsAllDoneEvent; |
179 | |
|
180 | |
private final BspEvent superstepFinished; |
181 | |
|
182 | |
private final BspEvent masterElectionChildrenChanged; |
183 | |
|
184 | |
private final BspEvent cleanedUpChildrenChanged; |
185 | |
|
186 | |
|
187 | |
private final BspEvent writtenCountersToZK; |
188 | |
|
189 | 0 | private final List<BspEvent> registeredBspEvents = |
190 | |
new ArrayList<BspEvent>(); |
191 | |
|
192 | |
private final ImmutableClassesGiraphConfiguration<I, V, E> conf; |
193 | |
|
194 | |
private final Mapper<?, ?, ?, ?>.Context context; |
195 | |
|
196 | 0 | private long cachedSuperstep = UNSET_SUPERSTEP; |
197 | |
|
198 | 0 | private long restartedSuperstep = UNSET_SUPERSTEP; |
199 | |
|
200 | 0 | private long cachedApplicationAttempt = UNSET_APPLICATION_ATTEMPT; |
201 | |
|
202 | |
private final String jobId; |
203 | |
|
204 | |
private final int taskId; |
205 | |
|
206 | |
private final String hostname; |
207 | |
|
208 | |
private final String hostnameTaskId; |
209 | |
|
210 | |
private final GraphPartitionerFactory<I, V, E> graphPartitionerFactory; |
211 | |
|
212 | |
private final GraphTaskManager<I, V, E> graphTaskManager; |
213 | |
|
214 | |
private final FileSystem fs; |
215 | |
|
216 | |
|
217 | |
|
218 | |
|
219 | |
|
220 | |
|
221 | |
|
222 | |
public BspService( |
223 | |
Mapper<?, ?, ?, ?>.Context context, |
224 | 0 | GraphTaskManager<I, V, E> graphTaskManager) { |
225 | 0 | this.connectedEvent = new PredicateLock(context); |
226 | 0 | this.workerHealthRegistrationChanged = new PredicateLock(context); |
227 | 0 | this.applicationAttemptChanged = new PredicateLock(context); |
228 | 0 | this.inputSplitsWorkerDoneEvent = new PredicateLock(context); |
229 | 0 | this.inputSplitsAllDoneEvent = new PredicateLock(context); |
230 | 0 | this.superstepFinished = new PredicateLock(context); |
231 | 0 | this.masterElectionChildrenChanged = new PredicateLock(context); |
232 | 0 | this.cleanedUpChildrenChanged = new PredicateLock(context); |
233 | 0 | this.writtenCountersToZK = new PredicateLock(context); |
234 | |
|
235 | 0 | registerBspEvent(connectedEvent); |
236 | 0 | registerBspEvent(workerHealthRegistrationChanged); |
237 | 0 | registerBspEvent(inputSplitsWorkerDoneEvent); |
238 | 0 | registerBspEvent(inputSplitsAllDoneEvent); |
239 | 0 | registerBspEvent(applicationAttemptChanged); |
240 | 0 | registerBspEvent(superstepFinished); |
241 | 0 | registerBspEvent(masterElectionChildrenChanged); |
242 | 0 | registerBspEvent(cleanedUpChildrenChanged); |
243 | 0 | registerBspEvent(writtenCountersToZK); |
244 | |
|
245 | 0 | this.context = context; |
246 | 0 | this.graphTaskManager = graphTaskManager; |
247 | 0 | this.conf = graphTaskManager.getConf(); |
248 | |
|
249 | 0 | this.jobId = conf.getJobId(); |
250 | 0 | this.restartedSuperstep = conf.getLong( |
251 | |
GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP); |
252 | |
try { |
253 | 0 | this.hostname = conf.getLocalHostname(); |
254 | 0 | } catch (UnknownHostException e) { |
255 | 0 | throw new RuntimeException(e); |
256 | 0 | } |
257 | 0 | this.graphPartitionerFactory = conf.createGraphPartitioner(); |
258 | |
|
259 | 0 | basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId; |
260 | 0 | if (LOG.isInfoEnabled()) { |
261 | 0 | LOG.info(String.format("%s: %s", |
262 | |
GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP, basePath)); |
263 | |
} |
264 | 0 | masterJobStatePath = basePath + MASTER_JOB_STATE_NODE; |
265 | 0 | inputSplitsWorkerDonePath = basePath + INPUT_SPLITS_WORKER_DONE_DIR; |
266 | 0 | inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE; |
267 | 0 | applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR; |
268 | 0 | cleanedUpPath = basePath + CLEANED_UP_DIR; |
269 | 0 | kryoRegisteredClassPath = basePath + KRYO_REGISTERED_CLASS_DIR; |
270 | |
|
271 | |
|
272 | 0 | String restartJobId = RESTART_JOB_ID.get(conf); |
273 | |
|
274 | 0 | savedCheckpointBasePath = |
275 | 0 | CheckpointingUtils.getCheckpointBasePath(getConfiguration(), |
276 | 0 | restartJobId == null ? getJobId() : restartJobId); |
277 | |
|
278 | 0 | checkpointBasePath = CheckpointingUtils. |
279 | 0 | getCheckpointBasePath(getConfiguration(), getJobId()); |
280 | |
|
281 | 0 | masterElectionPath = basePath + MASTER_ELECTION_DIR; |
282 | 0 | String serverPortList = graphTaskManager.getZookeeperList(); |
283 | 0 | haltComputationPath = basePath + HALT_COMPUTATION_NODE; |
284 | 0 | memoryObserverPath = basePath + MEMORY_OBSERVER_DIR; |
285 | 0 | getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP, |
286 | |
haltComputationPath); |
287 | 0 | if (LOG.isInfoEnabled()) { |
288 | 0 | LOG.info("BspService: Path to create to halt is " + haltComputationPath); |
289 | |
} |
290 | 0 | if (LOG.isInfoEnabled()) { |
291 | 0 | LOG.info("BspService: Connecting to ZooKeeper with job " + jobId + |
292 | 0 | ", partition " + conf.getTaskPartition() + " on " + serverPortList); |
293 | |
} |
294 | |
try { |
295 | 0 | this.zk = new ZooKeeperExt(serverPortList, |
296 | 0 | conf.getZooKeeperSessionTimeout(), |
297 | 0 | conf.getZookeeperOpsMaxAttempts(), |
298 | 0 | conf.getZookeeperOpsRetryWaitMsecs(), |
299 | |
this, |
300 | |
context); |
301 | 0 | connectedEvent.waitForTimeoutOrFail( |
302 | 0 | GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(conf)); |
303 | 0 | this.fs = FileSystem.get(getConfiguration()); |
304 | 0 | } catch (IOException e) { |
305 | 0 | throw new RuntimeException(e); |
306 | 0 | } |
307 | |
|
308 | 0 | boolean disableGiraphResolver = |
309 | 0 | GiraphConstants.DISABLE_GIRAPH_CLASS_RESOLVER.get(conf); |
310 | 0 | if (!disableGiraphResolver) { |
311 | 0 | GiraphClassResolver.setZookeeperInfo(zk, kryoRegisteredClassPath); |
312 | |
} |
313 | 0 | this.taskId = (int) getApplicationAttempt() * conf.getMaxWorkers() + |
314 | 0 | conf.getTaskPartition(); |
315 | 0 | this.hostnameTaskId = hostname + "_" + getTaskId(); |
316 | |
|
317 | |
|
318 | 0 | if (restartJobId != null && |
319 | |
restartedSuperstep == UNSET_SUPERSTEP) { |
320 | |
try { |
321 | 0 | restartedSuperstep = getLastCheckpointedSuperstep(); |
322 | 0 | } catch (IOException e) { |
323 | 0 | throw new RuntimeException(e); |
324 | 0 | } |
325 | |
} |
326 | 0 | this.cachedSuperstep = restartedSuperstep; |
327 | 0 | if ((restartedSuperstep != UNSET_SUPERSTEP) && |
328 | |
(restartedSuperstep < 0)) { |
329 | 0 | throw new IllegalArgumentException( |
330 | |
"BspService: Invalid superstep to restart - " + |
331 | |
restartedSuperstep); |
332 | |
} |
333 | 0 | } |
334 | |
|
335 | |
|
336 | |
|
337 | |
|
338 | |
|
339 | |
|
340 | |
|
341 | |
public static long getSuperstepFromPath(String path) { |
342 | 0 | int foundSuperstepStart = path.indexOf(SUPERSTEP_DIR); |
343 | 0 | if (foundSuperstepStart == -1) { |
344 | 0 | throw new IllegalArgumentException( |
345 | |
"getSuperstepFromPath: Cannot find " + SUPERSTEP_DIR + |
346 | |
"from " + path); |
347 | |
} |
348 | 0 | foundSuperstepStart += SUPERSTEP_DIR.length() + 1; |
349 | 0 | int endIndex = foundSuperstepStart + |
350 | 0 | path.substring(foundSuperstepStart).indexOf("/"); |
351 | 0 | if (endIndex == -1) { |
352 | 0 | throw new IllegalArgumentException( |
353 | |
"getSuperstepFromPath: Cannot find end of superstep from " + |
354 | |
path); |
355 | |
} |
356 | 0 | if (LOG.isTraceEnabled()) { |
357 | 0 | LOG.trace("getSuperstepFromPath: Got path=" + path + |
358 | |
", start=" + foundSuperstepStart + ", end=" + endIndex); |
359 | |
} |
360 | 0 | return Long.parseLong(path.substring(foundSuperstepStart, endIndex)); |
361 | |
} |
362 | |
|
363 | |
|
364 | |
|
365 | |
|
366 | |
|
367 | |
|
368 | |
|
369 | |
public static String getHealthyHostnameIdFromPath(String path) { |
370 | 0 | int foundWorkerHealthyStart = path.indexOf(WORKER_HEALTHY_DIR); |
371 | 0 | if (foundWorkerHealthyStart == -1) { |
372 | 0 | throw new IllegalArgumentException( |
373 | |
"getHealthyHostnameidFromPath: Couldn't find " + |
374 | |
WORKER_HEALTHY_DIR + " from " + path); |
375 | |
} |
376 | 0 | foundWorkerHealthyStart += WORKER_HEALTHY_DIR.length(); |
377 | 0 | return path.substring(foundWorkerHealthyStart); |
378 | |
} |
379 | |
|
380 | |
|
381 | |
|
382 | |
|
383 | |
|
384 | |
|
385 | |
|
386 | |
|
387 | |
public final String getSuperstepPath(long attempt) { |
388 | 0 | return applicationAttemptsPath + "/" + attempt + SUPERSTEP_DIR; |
389 | |
} |
390 | |
|
391 | |
|
392 | |
|
393 | |
|
394 | |
|
395 | |
|
396 | |
|
397 | |
|
398 | |
|
399 | |
public final String getWorkerInfoHealthyPath(long attempt, |
400 | |
long superstep) { |
401 | 0 | return applicationAttemptsPath + "/" + attempt + |
402 | |
SUPERSTEP_DIR + "/" + superstep + WORKER_HEALTHY_DIR; |
403 | |
} |
404 | |
|
405 | |
|
406 | |
|
407 | |
|
408 | |
|
409 | |
|
410 | |
|
411 | |
|
412 | |
|
413 | |
public final String getWorkerInfoUnhealthyPath(long attempt, |
414 | |
long superstep) { |
415 | 0 | return applicationAttemptsPath + "/" + attempt + |
416 | |
SUPERSTEP_DIR + "/" + superstep + WORKER_UNHEALTHY_DIR; |
417 | |
} |
418 | |
|
419 | |
|
420 | |
|
421 | |
|
422 | |
|
423 | |
|
424 | |
|
425 | |
|
426 | |
|
427 | |
public final String getWorkerWroteCheckpointPath(long attempt, |
428 | |
long superstep) { |
429 | 0 | return applicationAttemptsPath + "/" + attempt + |
430 | |
SUPERSTEP_DIR + "/" + superstep + WORKER_WROTE_CHECKPOINT_DIR; |
431 | |
} |
432 | |
|
433 | |
|
434 | |
|
435 | |
|
436 | |
|
437 | |
|
438 | |
|
439 | |
|
440 | |
|
441 | |
public final String getWorkerMetricsFinishedPath( |
442 | |
long attempt, long superstep) { |
443 | 0 | return applicationAttemptsPath + "/" + attempt + |
444 | |
SUPERSTEP_DIR + "/" + superstep + WORKER_FINISHED_DIR + METRICS_DIR; |
445 | |
} |
446 | |
|
447 | |
|
448 | |
|
449 | |
|
450 | |
|
451 | |
|
452 | |
|
453 | |
|
454 | |
|
455 | |
public final String getWorkerCountersFinishedPath( |
456 | |
long attempt, long superstep) { |
457 | 0 | return applicationAttemptsPath + "/" + attempt + |
458 | |
SUPERSTEP_DIR + "/" + superstep + |
459 | |
WORKER_FINISHED_DIR + COUNTERS_DIR; |
460 | |
} |
461 | |
|
462 | |
|
463 | |
|
464 | |
|
465 | |
|
466 | |
|
467 | |
|
468 | |
|
469 | |
public final String getPartitionExchangePath(long attempt, |
470 | |
long superstep) { |
471 | 0 | return applicationAttemptsPath + "/" + attempt + |
472 | |
SUPERSTEP_DIR + "/" + superstep + PARTITION_EXCHANGE_DIR; |
473 | |
} |
474 | |
|
475 | |
|
476 | |
|
477 | |
|
478 | |
|
479 | |
|
480 | |
|
481 | |
|
482 | |
|
483 | |
|
484 | |
public final String getPartitionExchangeWorkerPath(long attempt, |
485 | |
long superstep, |
486 | |
WorkerInfo workerInfo) { |
487 | 0 | return getPartitionExchangePath(attempt, superstep) + |
488 | 0 | "/" + workerInfo.getHostnameId(); |
489 | |
} |
490 | |
|
491 | |
|
492 | |
|
493 | |
|
494 | |
|
495 | |
|
496 | |
|
497 | |
|
498 | |
public final String getSuperstepFinishedPath(long attempt, long superstep) { |
499 | 0 | return applicationAttemptsPath + "/" + attempt + |
500 | |
SUPERSTEP_DIR + "/" + superstep + SUPERSTEP_FINISHED_NODE; |
501 | |
} |
502 | |
|
503 | |
|
504 | |
|
505 | |
|
506 | |
|
507 | |
|
508 | |
|
509 | |
|
510 | |
public final String getCheckpointBasePath(long superstep) { |
511 | 0 | return checkpointBasePath + "/" + superstep; |
512 | |
} |
513 | |
|
514 | |
|
515 | |
|
516 | |
|
517 | |
|
518 | |
|
519 | |
|
520 | |
public final String getSavedCheckpointBasePath(long superstep) { |
521 | 0 | return savedCheckpointBasePath + "/" + superstep; |
522 | |
} |
523 | |
|
524 | |
|
525 | |
|
526 | |
|
527 | |
|
528 | |
|
529 | |
|
530 | |
public final ZooKeeperExt getZkExt() { |
531 | 0 | return zk; |
532 | |
} |
533 | |
|
534 | |
@Override |
535 | |
public final long getRestartedSuperstep() { |
536 | 0 | return restartedSuperstep; |
537 | |
} |
538 | |
|
539 | |
|
540 | |
|
541 | |
|
542 | |
|
543 | |
|
544 | |
public final void setRestartedSuperstep(long superstep) { |
545 | 0 | if (superstep < INPUT_SUPERSTEP) { |
546 | 0 | throw new IllegalArgumentException( |
547 | |
"setRestartedSuperstep: Bad argument " + superstep); |
548 | |
} |
549 | 0 | restartedSuperstep = superstep; |
550 | 0 | } |
551 | |
|
552 | |
|
553 | |
|
554 | |
|
555 | |
|
556 | |
|
557 | |
public final FileSystem getFs() { |
558 | 0 | return fs; |
559 | |
} |
560 | |
|
561 | |
public final ImmutableClassesGiraphConfiguration<I, V, E> |
562 | |
getConfiguration() { |
563 | 0 | return conf; |
564 | |
} |
565 | |
|
566 | |
public final Mapper<?, ?, ?, ?>.Context getContext() { |
567 | 0 | return context; |
568 | |
} |
569 | |
|
570 | |
public final String getHostname() { |
571 | 0 | return hostname; |
572 | |
} |
573 | |
|
574 | |
public final String getHostnameTaskId() { |
575 | 0 | return hostnameTaskId; |
576 | |
} |
577 | |
|
578 | |
public final int getTaskId() { |
579 | 0 | return taskId; |
580 | |
} |
581 | |
|
582 | |
public final GraphTaskManager<I, V, E> getGraphTaskManager() { |
583 | 0 | return graphTaskManager; |
584 | |
} |
585 | |
|
586 | |
public final BspEvent getWorkerHealthRegistrationChangedEvent() { |
587 | 0 | return workerHealthRegistrationChanged; |
588 | |
} |
589 | |
|
590 | |
public final BspEvent getApplicationAttemptChangedEvent() { |
591 | 0 | return applicationAttemptChanged; |
592 | |
} |
593 | |
|
594 | |
public final BspEvent getInputSplitsWorkerDoneEvent() { |
595 | 0 | return inputSplitsWorkerDoneEvent; |
596 | |
} |
597 | |
|
598 | |
public final BspEvent getInputSplitsAllDoneEvent() { |
599 | 0 | return inputSplitsAllDoneEvent; |
600 | |
} |
601 | |
|
602 | |
public final BspEvent getSuperstepFinishedEvent() { |
603 | 0 | return superstepFinished; |
604 | |
} |
605 | |
|
606 | |
|
607 | |
public final BspEvent getMasterElectionChildrenChangedEvent() { |
608 | 0 | return masterElectionChildrenChanged; |
609 | |
} |
610 | |
|
611 | |
public final BspEvent getCleanedUpChildrenChangedEvent() { |
612 | 0 | return cleanedUpChildrenChanged; |
613 | |
} |
614 | |
|
615 | |
public final BspEvent getWrittenCountersToZKEvent() { |
616 | 0 | return writtenCountersToZK; |
617 | |
} |
618 | |
|
619 | |
|
620 | |
|
621 | |
|
622 | |
|
623 | |
|
624 | |
|
625 | |
public final JSONObject getJobState() { |
626 | |
try { |
627 | 0 | getZkExt().createExt(masterJobStatePath, |
628 | |
null, |
629 | |
Ids.OPEN_ACL_UNSAFE, |
630 | |
CreateMode.PERSISTENT, |
631 | |
true); |
632 | 0 | } catch (KeeperException.NodeExistsException e) { |
633 | 0 | LOG.info("getJobState: Job state already exists (" + |
634 | |
masterJobStatePath + ")"); |
635 | 0 | } catch (KeeperException e) { |
636 | 0 | throw new IllegalStateException("Failed to create job state path " + |
637 | |
"due to KeeperException", e); |
638 | 0 | } catch (InterruptedException e) { |
639 | 0 | throw new IllegalStateException("Failed to create job state path " + |
640 | |
"due to InterruptedException", e); |
641 | 0 | } |
642 | 0 | String jobState = null; |
643 | |
try { |
644 | 0 | List<String> childList = |
645 | 0 | getZkExt().getChildrenExt( |
646 | |
masterJobStatePath, true, true, true); |
647 | 0 | if (childList.isEmpty()) { |
648 | 0 | return null; |
649 | |
} |
650 | 0 | jobState = |
651 | 0 | new String(getZkExt().getData(childList.get(childList.size() - 1), |
652 | 0 | true, null), Charset.defaultCharset()); |
653 | 0 | } catch (KeeperException.NoNodeException e) { |
654 | 0 | LOG.info("getJobState: Job state path is empty! - " + |
655 | |
masterJobStatePath); |
656 | 0 | } catch (KeeperException e) { |
657 | 0 | throw new IllegalStateException("Failed to get job state path " + |
658 | |
"children due to KeeperException", e); |
659 | 0 | } catch (InterruptedException e) { |
660 | 0 | throw new IllegalStateException("Failed to get job state path " + |
661 | |
"children due to InterruptedException", e); |
662 | 0 | } |
663 | |
try { |
664 | 0 | return new JSONObject(jobState); |
665 | 0 | } catch (JSONException e) { |
666 | 0 | throw new RuntimeException( |
667 | |
"getJobState: Failed to parse job state " + jobState); |
668 | |
} |
669 | |
} |
670 | |
|
671 | |
|
672 | |
|
673 | |
|
674 | |
|
675 | |
|
676 | |
public final String getJobId() { |
677 | 0 | return jobId; |
678 | |
} |
679 | |
|
680 | |
|
681 | |
|
682 | |
|
683 | |
|
684 | |
|
685 | |
public final long getApplicationAttempt() { |
686 | 0 | if (cachedApplicationAttempt != UNSET_APPLICATION_ATTEMPT) { |
687 | 0 | return cachedApplicationAttempt; |
688 | |
} |
689 | |
try { |
690 | 0 | getZkExt().createExt(applicationAttemptsPath, |
691 | |
null, |
692 | |
Ids.OPEN_ACL_UNSAFE, |
693 | |
CreateMode.PERSISTENT, |
694 | |
true); |
695 | 0 | } catch (KeeperException.NodeExistsException e) { |
696 | 0 | LOG.info("getApplicationAttempt: Node " + |
697 | |
applicationAttemptsPath + " already exists!"); |
698 | 0 | } catch (KeeperException e) { |
699 | 0 | throw new IllegalStateException("Couldn't create application " + |
700 | |
"attempts path due to KeeperException", e); |
701 | 0 | } catch (InterruptedException e) { |
702 | 0 | throw new IllegalStateException("Couldn't create application " + |
703 | |
"attempts path due to InterruptedException", e); |
704 | 0 | } |
705 | |
try { |
706 | 0 | List<String> attemptList = |
707 | 0 | getZkExt().getChildrenExt( |
708 | |
applicationAttemptsPath, true, false, false); |
709 | 0 | if (attemptList.isEmpty()) { |
710 | 0 | cachedApplicationAttempt = 0; |
711 | |
} else { |
712 | 0 | cachedApplicationAttempt = |
713 | 0 | Long.parseLong(Collections.max(attemptList)); |
714 | |
} |
715 | 0 | } catch (KeeperException e) { |
716 | 0 | throw new IllegalStateException("Couldn't get application " + |
717 | |
"attempts to KeeperException", e); |
718 | 0 | } catch (InterruptedException e) { |
719 | 0 | throw new IllegalStateException("Couldn't get application " + |
720 | |
"attempts to InterruptedException", e); |
721 | 0 | } |
722 | |
|
723 | 0 | return cachedApplicationAttempt; |
724 | |
} |
725 | |
|
726 | |
|
727 | |
|
728 | |
|
729 | |
|
730 | |
|
731 | |
public final long getSuperstep() { |
732 | 0 | if (cachedSuperstep != UNSET_SUPERSTEP) { |
733 | 0 | return cachedSuperstep; |
734 | |
} |
735 | 0 | String superstepPath = getSuperstepPath(getApplicationAttempt()); |
736 | |
try { |
737 | 0 | getZkExt().createExt(superstepPath, |
738 | |
null, |
739 | |
Ids.OPEN_ACL_UNSAFE, |
740 | |
CreateMode.PERSISTENT, |
741 | |
true); |
742 | 0 | } catch (KeeperException.NodeExistsException e) { |
743 | 0 | if (LOG.isInfoEnabled()) { |
744 | 0 | LOG.info("getApplicationAttempt: Node " + |
745 | |
applicationAttemptsPath + " already exists!"); |
746 | |
} |
747 | 0 | } catch (KeeperException e) { |
748 | 0 | throw new IllegalStateException( |
749 | |
"getSuperstep: KeeperException", e); |
750 | 0 | } catch (InterruptedException e) { |
751 | 0 | throw new IllegalStateException( |
752 | |
"getSuperstep: InterruptedException", e); |
753 | 0 | } |
754 | |
|
755 | |
List<String> superstepList; |
756 | |
try { |
757 | 0 | superstepList = |
758 | 0 | getZkExt().getChildrenExt(superstepPath, true, false, false); |
759 | 0 | } catch (KeeperException e) { |
760 | 0 | throw new IllegalStateException( |
761 | |
"getSuperstep: KeeperException", e); |
762 | 0 | } catch (InterruptedException e) { |
763 | 0 | throw new IllegalStateException( |
764 | |
"getSuperstep: InterruptedException", e); |
765 | 0 | } |
766 | 0 | if (superstepList.isEmpty()) { |
767 | 0 | cachedSuperstep = INPUT_SUPERSTEP; |
768 | |
} else { |
769 | 0 | cachedSuperstep = |
770 | 0 | Long.parseLong(Collections.max(superstepList)); |
771 | |
} |
772 | |
|
773 | 0 | return cachedSuperstep; |
774 | |
} |
775 | |
|
776 | |
|
777 | |
|
778 | |
|
779 | |
public final void incrCachedSuperstep() { |
780 | 0 | if (cachedSuperstep == UNSET_SUPERSTEP) { |
781 | 0 | throw new IllegalStateException( |
782 | |
"incrSuperstep: Invalid unset cached superstep " + |
783 | |
UNSET_SUPERSTEP); |
784 | |
} |
785 | 0 | ++cachedSuperstep; |
786 | 0 | } |
787 | |
|
788 | |
|
789 | |
|
790 | |
|
791 | |
|
792 | |
|
793 | |
|
794 | |
public final void setCachedSuperstep(long superstep) { |
795 | 0 | cachedSuperstep = superstep; |
796 | 0 | } |
797 | |
|
798 | |
|
799 | |
|
800 | |
|
801 | |
|
802 | |
|
803 | |
|
804 | |
public final void setApplicationAttempt(long applicationAttempt) { |
805 | 0 | cachedApplicationAttempt = applicationAttempt; |
806 | 0 | String superstepPath = getSuperstepPath(cachedApplicationAttempt); |
807 | |
try { |
808 | 0 | getZkExt().createExt(superstepPath, |
809 | |
null, |
810 | |
Ids.OPEN_ACL_UNSAFE, |
811 | |
CreateMode.PERSISTENT, |
812 | |
true); |
813 | 0 | } catch (KeeperException.NodeExistsException e) { |
814 | 0 | throw new IllegalArgumentException( |
815 | |
"setApplicationAttempt: Attempt already exists! - " + |
816 | |
superstepPath, e); |
817 | 0 | } catch (KeeperException e) { |
818 | 0 | throw new RuntimeException( |
819 | |
"setApplicationAttempt: KeeperException - " + |
820 | |
superstepPath, e); |
821 | 0 | } catch (InterruptedException e) { |
822 | 0 | throw new RuntimeException( |
823 | |
"setApplicationAttempt: InterruptedException - " + |
824 | |
superstepPath, e); |
825 | 0 | } |
826 | 0 | } |
827 | |
|
828 | |
|
829 | |
|
830 | |
|
831 | |
|
832 | |
|
833 | |
|
834 | |
|
835 | |
public void registerBspEvent(BspEvent event) { |
836 | 0 | registeredBspEvents.add(event); |
837 | 0 | } |
838 | |
|
839 | |
|
840 | |
|
841 | |
|
842 | |
|
843 | |
|
844 | |
protected GraphPartitionerFactory<I, V, E> getGraphPartitionerFactory() { |
845 | 0 | return graphPartitionerFactory; |
846 | |
} |
847 | |
|
848 | |
|
849 | |
|
850 | |
|
851 | |
|
852 | |
|
853 | |
|
854 | |
|
855 | |
protected boolean processEvent(WatchedEvent event) { |
856 | 0 | return false; |
857 | |
} |
858 | |
|
859 | |
@Override |
860 | |
public final void process(WatchedEvent event) { |
861 | |
|
862 | |
|
863 | 0 | if (LOG.isDebugEnabled()) { |
864 | 0 | LOG.debug("process: Got a new event, path = " + event.getPath() + |
865 | 0 | ", type = " + event.getType() + ", state = " + |
866 | 0 | event.getState()); |
867 | |
} |
868 | |
|
869 | 0 | if ((event.getPath() == null) && (event.getType() == EventType.None)) { |
870 | 0 | if (event.getState() == KeeperState.Disconnected) { |
871 | |
|
872 | 0 | for (BspEvent bspEvent : registeredBspEvents) { |
873 | 0 | bspEvent.signal(); |
874 | 0 | } |
875 | 0 | LOG.warn("process: Disconnected from ZooKeeper (will automatically " + |
876 | |
"try to recover) " + event); |
877 | 0 | } else if (event.getState() == KeeperState.SyncConnected) { |
878 | 0 | if (LOG.isInfoEnabled()) { |
879 | 0 | LOG.info("process: Asynchronous connection complete."); |
880 | |
} |
881 | 0 | connectedEvent.signal(); |
882 | |
} else { |
883 | 0 | LOG.warn("process: Got unknown null path event " + event); |
884 | |
} |
885 | 0 | return; |
886 | |
} |
887 | |
|
888 | 0 | boolean eventProcessed = false; |
889 | 0 | if (event.getPath().startsWith(masterJobStatePath)) { |
890 | |
|
891 | |
|
892 | 0 | masterElectionChildrenChanged.signal(); |
893 | 0 | eventProcessed = true; |
894 | 0 | } else if ((event.getPath().contains(WORKER_HEALTHY_DIR) || |
895 | 0 | event.getPath().contains(WORKER_UNHEALTHY_DIR)) && |
896 | 0 | (event.getType() == EventType.NodeChildrenChanged)) { |
897 | 0 | if (LOG.isDebugEnabled()) { |
898 | 0 | LOG.debug("process: workerHealthRegistrationChanged " + |
899 | |
"(worker health reported - healthy/unhealthy )"); |
900 | |
} |
901 | 0 | workerHealthRegistrationChanged.signal(); |
902 | 0 | eventProcessed = true; |
903 | 0 | } else if (event.getPath().contains(INPUT_SPLITS_ALL_DONE_NODE) && |
904 | 0 | event.getType() == EventType.NodeCreated) { |
905 | 0 | if (LOG.isInfoEnabled()) { |
906 | 0 | LOG.info("process: all input splits done"); |
907 | |
} |
908 | 0 | inputSplitsAllDoneEvent.signal(); |
909 | 0 | eventProcessed = true; |
910 | 0 | } else if (event.getPath().contains(INPUT_SPLITS_WORKER_DONE_DIR) && |
911 | 0 | event.getType() == EventType.NodeChildrenChanged) { |
912 | 0 | if (LOG.isDebugEnabled()) { |
913 | 0 | LOG.debug("process: worker done reading input splits"); |
914 | |
} |
915 | 0 | inputSplitsWorkerDoneEvent.signal(); |
916 | 0 | eventProcessed = true; |
917 | 0 | } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) && |
918 | 0 | event.getType() == EventType.NodeCreated) { |
919 | 0 | if (LOG.isInfoEnabled()) { |
920 | 0 | LOG.info("process: superstepFinished signaled"); |
921 | |
} |
922 | 0 | superstepFinished.signal(); |
923 | 0 | eventProcessed = true; |
924 | 0 | } else if (event.getPath().endsWith(applicationAttemptsPath) && |
925 | 0 | event.getType() == EventType.NodeChildrenChanged) { |
926 | 0 | if (LOG.isInfoEnabled()) { |
927 | 0 | LOG.info("process: applicationAttemptChanged signaled"); |
928 | |
} |
929 | 0 | applicationAttemptChanged.signal(); |
930 | 0 | eventProcessed = true; |
931 | 0 | } else if (event.getPath().contains(MASTER_ELECTION_DIR) && |
932 | 0 | event.getType() == EventType.NodeChildrenChanged) { |
933 | 0 | if (LOG.isInfoEnabled()) { |
934 | 0 | LOG.info("process: masterElectionChildrenChanged signaled"); |
935 | |
} |
936 | 0 | masterElectionChildrenChanged.signal(); |
937 | 0 | eventProcessed = true; |
938 | 0 | } else if (event.getPath().equals(cleanedUpPath) && |
939 | 0 | event.getType() == EventType.NodeChildrenChanged) { |
940 | 0 | if (LOG.isInfoEnabled()) { |
941 | 0 | LOG.info("process: cleanedUpChildrenChanged signaled"); |
942 | |
} |
943 | 0 | cleanedUpChildrenChanged.signal(); |
944 | 0 | eventProcessed = true; |
945 | 0 | } else if (event.getPath().endsWith(COUNTERS_DIR) && |
946 | 0 | event.getType() == EventType.NodeChildrenChanged) { |
947 | 0 | LOG.info("process: writtenCountersToZK signaled"); |
948 | 0 | getWrittenCountersToZKEvent().signal(); |
949 | 0 | eventProcessed = true; |
950 | |
} |
951 | |
|
952 | 0 | if (!(processEvent(event)) && (!eventProcessed)) { |
953 | 0 | LOG.warn("process: Unknown and unprocessed event (path=" + |
954 | 0 | event.getPath() + ", type=" + event.getType() + |
955 | 0 | ", state=" + event.getState() + ")"); |
956 | |
} |
957 | 0 | } |
958 | |
|
959 | |
|
960 | |
|
961 | |
|
962 | |
|
963 | |
|
964 | |
|
965 | |
protected long getLastCheckpointedSuperstep() throws IOException { |
966 | 0 | return CheckpointingUtils.getLastCheckpointedSuperstep(getFs(), |
967 | |
savedCheckpointBasePath); |
968 | |
} |
969 | |
|
970 | |
@Override |
971 | |
public JobProgressTracker getJobProgressTracker() { |
972 | 0 | return getGraphTaskManager().getJobProgressTracker(); |
973 | |
} |
974 | |
|
975 | |
|
976 | |
|
977 | |
|
978 | |
|
979 | |
|
980 | |
|
981 | |
|
982 | |
|
983 | |
|
984 | |
|
985 | |
protected int getWorkerId(WorkerInfo workerInfo) { |
986 | 0 | return getWorkerInfoList().indexOf(workerInfo); |
987 | |
} |
988 | |
|
989 | |
|
990 | |
|
991 | |
|
992 | |
|
993 | |
|
994 | |
protected WorkerInfo getWorkerInfoById(int id) { |
995 | 0 | return getWorkerInfoList().get(id); |
996 | |
} |
997 | |
} |