1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.zk;
20
21 import java.io.IOException;
22
23 import org.apache.hadoop.util.Progressable;
24 import org.apache.log4j.Logger;
25 import org.apache.zookeeper.KeeperException;
26 import org.apache.zookeeper.CreateMode;
27 import org.apache.zookeeper.data.ACL;
28 import org.apache.zookeeper.data.Stat;
29
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.Comparator;
33 import java.util.List;
34
35 import org.apache.zookeeper.Watcher;
36 import org.apache.zookeeper.ZooKeeper;
37
38
39
40
41
42
43
44 public class ZooKeeperExt {
45
46 public static final int SEQUENCE_NUMBER_LENGTH = 10;
47
48 private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class);
49
50 private final ZooKeeper zooKeeper;
51
52 private final Progressable progressable;
53
54 private final int maxRetryAttempts;
55
56 private final int retryWaitMsecs;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 public ZooKeeperExt(String connectString,
79 int sessionTimeout,
80 int maxRetryAttempts,
81 int retryWaitMsecs,
82 Watcher watcher) throws IOException {
83 this(connectString, sessionTimeout, maxRetryAttempts,
84 retryWaitMsecs, watcher, null);
85 }
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108 public ZooKeeperExt(String connectString,
109 int sessionTimeout,
110 int maxRetryAttempts,
111 int retryWaitMsecs,
112 Watcher watcher,
113 Progressable progressable) throws IOException {
114 this.zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
115 this.progressable = progressable;
116 this.maxRetryAttempts = maxRetryAttempts;
117 this.retryWaitMsecs = retryWaitMsecs;
118 }
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134 public String createExt(
135 final String path,
136 byte[] data,
137 List<ACL> acl,
138 CreateMode createMode,
139 boolean recursive) throws KeeperException, InterruptedException {
140 if (LOG.isDebugEnabled()) {
141 LOG.debug("createExt: Creating path " + path);
142 }
143
144 int attempt = 0;
145 while (attempt < maxRetryAttempts) {
146 try {
147 if (!recursive) {
148 return zooKeeper.create(path, data, acl, createMode);
149 }
150
151 try {
152 return zooKeeper.create(path, data, acl, createMode);
153 } catch (KeeperException.NoNodeException e) {
154 if (LOG.isDebugEnabled()) {
155 LOG.debug("createExt: Cannot directly create node " + path);
156 }
157 }
158
159 int pos = path.indexOf("/", 1);
160 for (; pos != -1; pos = path.indexOf("/", pos + 1)) {
161 try {
162 if (progressable != null) {
163 progressable.progress();
164 }
165 String filePath = path.substring(0, pos);
166 if (zooKeeper.exists(filePath, false) == null) {
167 zooKeeper.create(
168 filePath, null, acl, CreateMode.PERSISTENT);
169 }
170 } catch (KeeperException.NodeExistsException e) {
171 if (LOG.isDebugEnabled()) {
172 LOG.debug("createExt: Znode " + path.substring(0, pos) +
173 " already exists");
174 }
175 }
176 }
177 return zooKeeper.create(path, data, acl, createMode);
178 } catch (KeeperException.ConnectionLossException e) {
179 LOG.warn("createExt: Connection loss on attempt " + attempt + ", " +
180 "waiting " + retryWaitMsecs + " msecs before retrying.", e);
181 }
182 ++attempt;
183 Thread.sleep(retryWaitMsecs);
184 }
185 throw new IllegalStateException("createExt: Failed to create " + path +
186 " after " + attempt + " tries!");
187 }
188
189
190
191
192 public static class PathStat {
193
194 private String path;
195
196 private Stat stat;
197
198
199
200
201
202
203
204 public PathStat(String path, Stat stat) {
205 this.path = path;
206 this.stat = stat;
207 }
208
209
210
211
212
213
214 public String getPath() {
215 return path;
216 }
217
218
219
220
221
222
223 public Stat getStat() {
224 return stat;
225 }
226 }
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241 public PathStat createOrSetExt(final String path,
242 byte[] data,
243 List<ACL> acl,
244 CreateMode createMode,
245 boolean recursive,
246 int version) throws KeeperException, InterruptedException {
247 String createdPath = null;
248 Stat setStat = null;
249 try {
250 createdPath = createExt(path, data, acl, createMode, recursive);
251 } catch (KeeperException.NodeExistsException e) {
252 if (LOG.isDebugEnabled()) {
253 LOG.debug("createOrSet: Node exists on path " + path);
254 }
255 setStat = zooKeeper.setData(path, data, version);
256 }
257 return new PathStat(createdPath, setStat);
258 }
259
260
261
262
263
264
265
266
267
268
269
270
271
272 public PathStat createOnceExt(final String path,
273 byte[] data,
274 List<ACL> acl,
275 CreateMode createMode,
276 boolean recursive) throws KeeperException, InterruptedException {
277 String createdPath = null;
278 Stat setStat = null;
279 try {
280 createdPath = createExt(path, data, acl, createMode, recursive);
281 } catch (KeeperException.NodeExistsException e) {
282 if (LOG.isDebugEnabled()) {
283 LOG.debug("createOnceExt: Node already exists on path " + path);
284 }
285 }
286 return new PathStat(createdPath, setStat);
287 }
288
289
290
291
292
293
294
295
296
297
298
299 public void deleteExt(final String path, int version, boolean recursive)
300 throws InterruptedException, KeeperException {
301 int attempt = 0;
302 while (attempt < maxRetryAttempts) {
303 try {
304 if (!recursive) {
305 zooKeeper.delete(path, version);
306 return;
307 }
308
309 try {
310 zooKeeper.delete(path, version);
311 return;
312 } catch (KeeperException.NotEmptyException e) {
313 if (LOG.isDebugEnabled()) {
314 LOG.debug("deleteExt: Cannot directly remove node " + path);
315 }
316 }
317
318 List<String> childList = zooKeeper.getChildren(path, false);
319 for (String child : childList) {
320 if (progressable != null) {
321 progressable.progress();
322 }
323 deleteExt(path + "/" + child, -1, true);
324 }
325
326 zooKeeper.delete(path, version);
327 return;
328 } catch (KeeperException.ConnectionLossException e) {
329 LOG.warn("deleteExt: Connection loss on attempt " +
330 attempt + ", waiting " + retryWaitMsecs +
331 " msecs before retrying.", e);
332 }
333 ++attempt;
334 Thread.sleep(retryWaitMsecs);
335 }
336 throw new IllegalStateException("deleteExt: Failed to delete " + path +
337 " after " + attempt + " tries!");
338 }
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358 public Stat exists(String path, boolean watch) throws KeeperException,
359 InterruptedException {
360 int attempt = 0;
361 while (attempt < maxRetryAttempts) {
362 try {
363 return zooKeeper.exists(path, watch);
364 } catch (KeeperException.ConnectionLossException e) {
365 LOG.warn("exists: Connection loss on attempt " +
366 attempt + ", waiting " + retryWaitMsecs +
367 " msecs before retrying.", e);
368 }
369 ++attempt;
370 Thread.sleep(retryWaitMsecs);
371 }
372 throw new IllegalStateException("exists: Failed to check " + path +
373 " after " + attempt + " tries!");
374 }
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393 public Stat exists(final String path, Watcher watcher)
394 throws KeeperException, InterruptedException {
395 int attempt = 0;
396 while (attempt < maxRetryAttempts) {
397 try {
398 return zooKeeper.exists(path, watcher);
399 } catch (KeeperException.ConnectionLossException e) {
400 LOG.warn("exists: Connection loss on attempt " +
401 attempt + ", waiting " + retryWaitMsecs +
402 " msecs before retrying.", e);
403 }
404 ++attempt;
405 Thread.sleep(retryWaitMsecs);
406 }
407 throw new IllegalStateException("exists: Failed to check " + path +
408 " after " + attempt + " tries!");
409 }
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431 public byte[] getData(final String path, Watcher watcher, Stat stat)
432 throws KeeperException, InterruptedException {
433 int attempt = 0;
434 while (attempt < maxRetryAttempts) {
435 try {
436 return zooKeeper.getData(path, watcher, stat);
437 } catch (KeeperException.ConnectionLossException e) {
438 LOG.warn("getData: Connection loss on attempt " +
439 attempt + ", waiting " + retryWaitMsecs +
440 " msecs before retrying.", e);
441 }
442 ++attempt;
443 Thread.sleep(retryWaitMsecs);
444 }
445 throw new IllegalStateException("getData: Failed to get " + path +
446 " after " + attempt + " tries!");
447 }
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468 public byte[] getData(String path, boolean watch, Stat stat)
469 throws KeeperException, InterruptedException {
470 int attempt = 0;
471 while (attempt < maxRetryAttempts) {
472 try {
473 return zooKeeper.getData(path, watch, stat);
474 } catch (KeeperException.ConnectionLossException e) {
475 LOG.warn("getData: Connection loss on attempt " +
476 attempt + ", waiting " + retryWaitMsecs +
477 " msecs before retrying.", e);
478 }
479 ++attempt;
480 Thread.sleep(retryWaitMsecs);
481 }
482 throw new IllegalStateException("getData: Failed to get " + path +
483 " after " + attempt + " tries!");
484 }
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499 public List<String> getChildrenExt(
500 final String path,
501 boolean watch,
502 boolean sequenceSorted,
503 boolean fullPath) throws KeeperException, InterruptedException {
504 int attempt = 0;
505 while (attempt < maxRetryAttempts) {
506 try {
507 List<String> childList = zooKeeper.getChildren(path, watch);
508
509 if (sequenceSorted) {
510 Collections.sort(childList, new Comparator<String>() {
511 public int compare(String s1, String s2) {
512 if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) ||
513 (s2.length() <= SEQUENCE_NUMBER_LENGTH)) {
514 throw new RuntimeException(
515 "getChildrenExt: Invalid length for sequence " +
516 " sorting > " +
517 SEQUENCE_NUMBER_LENGTH +
518 " for s1 (" +
519 s1.length() + ") or s2 (" + s2.length() + ")");
520 }
521 int s1sequenceNumber = Integer.parseInt(
522 s1.substring(s1.length() -
523 SEQUENCE_NUMBER_LENGTH));
524 int s2sequenceNumber = Integer.parseInt(
525 s2.substring(s2.length() -
526 SEQUENCE_NUMBER_LENGTH));
527 return s1sequenceNumber - s2sequenceNumber;
528 }
529 });
530 }
531 if (fullPath) {
532 List<String> fullChildList = new ArrayList<String>();
533 for (String child : childList) {
534 fullChildList.add(path + "/" + child);
535 }
536 return fullChildList;
537 }
538 return childList;
539 } catch (KeeperException.ConnectionLossException e) {
540 LOG.warn("getChildrenExt: Connection loss on attempt " +
541 attempt + ", waiting " + retryWaitMsecs +
542 " msecs before retrying.", e);
543 }
544 ++attempt;
545 Thread.sleep(retryWaitMsecs);
546 }
547 throw new IllegalStateException("createExt: Failed to create " + path +
548 " after " + attempt + " tries!");
549 }
550
551
552
553
554
555
556
557
558
559 public void close() throws InterruptedException {
560 zooKeeper.close();
561 }
562 }