Coverage Report - org.apache.giraph.zk.ZooKeeperExt
 
Classes in this File Line Coverage Branch Coverage Complexity
ZooKeeperExt
0%
0/127
0%
0/46
4.562
ZooKeeperExt$1
0%
0/10
0%
0/4
4.562
ZooKeeperExt$PathStat
0%
0/6
N/A
4.562
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.zk;
 20  
 
 21  
 import java.io.IOException;
 22  
 
 23  
 import org.apache.hadoop.util.Progressable;
 24  
 import org.apache.log4j.Logger;
 25  
 import org.apache.zookeeper.KeeperException;
 26  
 import org.apache.zookeeper.CreateMode;
 27  
 import org.apache.zookeeper.data.ACL;
 28  
 import org.apache.zookeeper.data.Stat;
 29  
 
 30  
 import java.util.ArrayList;
 31  
 import java.util.Collections;
 32  
 import java.util.Comparator;
 33  
 import java.util.List;
 34  
 
 35  
 import org.apache.zookeeper.Watcher;
 36  
 import org.apache.zookeeper.ZooKeeper;
 37  
 
 38  
 /**
 39  
  * ZooKeeper provides only atomic operations.  ZooKeeperExt provides additional
 40  
  * non-atomic operations that are useful.  It also provides wrappers to
 41  
  * deal with ConnectionLossException.  All methods of this class
 42  
  * should be thread-safe.
 43  
  */
 44  
 public class ZooKeeperExt {
 45  
   /** Length of the ZK sequence number */
 46  
   public static final int SEQUENCE_NUMBER_LENGTH = 10;
 47  
   /** Internal logger */
 48  0
   private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class);
 49  
   /** Internal ZooKeeper */
 50  
   private final ZooKeeper zooKeeper;
 51  
   /** Ensure we have progress */
 52  
   private final Progressable progressable;
 53  
   /** Number of max attempts to retry when failing due to connection loss */
 54  
   private final int maxRetryAttempts;
 55  
   /** Milliseconds to wait before trying again due to connection loss */
 56  
   private final int retryWaitMsecs;
 57  
 
 58  
   /**
 59  
    * Constructor to connect to ZooKeeper, does not make progress
 60  
    *
 61  
    * @param connectString Comma separated host:port pairs, each corresponding
 62  
    *        to a zk server. e.g.
 63  
    *        "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional
 64  
    *        chroot suffix is used the example would look
 65  
    *        like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
 66  
    *        where the client would be rooted at "/app/a" and all paths
 67  
    *        would be relative to this root - ie getting/setting/etc...
 68  
    *        "/foo/bar" would result in operations being run on
 69  
    *        "/app/a/foo/bar" (from the server perspective).
 70  
    * @param sessionTimeout Session timeout in milliseconds
 71  
    * @param maxRetryAttempts Max retry attempts during connection loss
 72  
    * @param retryWaitMsecs Msecs to wait when retrying due to connection
 73  
    *        loss
 74  
    * @param watcher A watcher object which will be notified of state changes,
 75  
    *        may also be notified for node events
 76  
    * @throws IOException
 77  
    */
 78  
   public ZooKeeperExt(String connectString,
 79  
                       int sessionTimeout,
 80  
                       int maxRetryAttempts,
 81  
                       int retryWaitMsecs,
 82  
                       Watcher watcher) throws IOException {
 83  0
     this(connectString, sessionTimeout, maxRetryAttempts,
 84  
         retryWaitMsecs, watcher, null);
 85  0
   }
 86  
 
 87  
   /**
 88  
    * Constructor to connect to ZooKeeper, make progress
 89  
    *
 90  
    * @param connectString Comma separated host:port pairs, each corresponding
 91  
    *        to a zk server. e.g.
 92  
    *        "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional
 93  
    *        chroot suffix is used the example would look
 94  
    *        like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
 95  
    *        where the client would be rooted at "/app/a" and all paths
 96  
    *        would be relative to this root - ie getting/setting/etc...
 97  
    *        "/foo/bar" would result in operations being run on
 98  
    *        "/app/a/foo/bar" (from the server perspective).
 99  
    * @param sessionTimeout Session timeout in milliseconds
 100  
    * @param maxRetryAttempts Max retry attempts during connection loss
 101  
    * @param retryWaitMsecs Msecs to wait when retrying due to connection
 102  
    *        loss
 103  
    * @param watcher A watcher object which will be notified of state changes,
 104  
    *        may also be notified for node events
 105  
    * @param progressable Makes progress for longer operations
 106  
    * @throws IOException
 107  
    */
 108  
   public ZooKeeperExt(String connectString,
 109  
       int sessionTimeout,
 110  
       int maxRetryAttempts,
 111  
       int retryWaitMsecs,
 112  
       Watcher watcher,
 113  0
       Progressable progressable) throws IOException {
 114  0
     this.zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
 115  0
     this.progressable = progressable;
 116  0
     this.maxRetryAttempts = maxRetryAttempts;
 117  0
     this.retryWaitMsecs = retryWaitMsecs;
 118  0
   }
 119  
 
 120  
   /**
 121  
    * Provides a possibility of a creating a path consisting of more than one
 122  
    * znode (not atomic).  If recursive is false, operates exactly the
 123  
    * same as create().
 124  
    *
 125  
    * @param path path to create
 126  
    * @param data data to set on the final znode
 127  
    * @param acl acls on each znode created
 128  
    * @param createMode only affects the final znode
 129  
    * @param recursive if true, creates all ancestors
 130  
    * @return Actual created path
 131  
    * @throws KeeperException
 132  
    * @throws InterruptedException
 133  
    */
 134  
   public String createExt(
 135  
       final String path,
 136  
       byte[] data,
 137  
       List<ACL> acl,
 138  
       CreateMode createMode,
 139  
       boolean recursive) throws KeeperException, InterruptedException {
 140  0
     if (LOG.isDebugEnabled()) {
 141  0
       LOG.debug("createExt: Creating path " + path);
 142  
     }
 143  
 
 144  0
     int attempt = 0;
 145  0
     while (attempt < maxRetryAttempts) {
 146  
       try {
 147  0
         if (!recursive) {
 148  0
           return zooKeeper.create(path, data, acl, createMode);
 149  
         }
 150  
 
 151  
         try {
 152  0
           return zooKeeper.create(path, data, acl, createMode);
 153  0
         } catch (KeeperException.NoNodeException e) {
 154  0
           if (LOG.isDebugEnabled()) {
 155  0
             LOG.debug("createExt: Cannot directly create node " + path);
 156  
           }
 157  
         }
 158  
 
 159  0
         int pos = path.indexOf("/", 1);
 160  0
         for (; pos != -1; pos = path.indexOf("/", pos + 1)) {
 161  
           try {
 162  0
             if (progressable != null) {
 163  0
               progressable.progress();
 164  
             }
 165  0
             String filePath = path.substring(0, pos);
 166  0
             if (zooKeeper.exists(filePath, false) == null) {
 167  0
               zooKeeper.create(
 168  
                   filePath, null, acl, CreateMode.PERSISTENT);
 169  
             }
 170  0
           } catch (KeeperException.NodeExistsException e) {
 171  0
             if (LOG.isDebugEnabled()) {
 172  0
               LOG.debug("createExt: Znode " + path.substring(0, pos) +
 173  
                   " already exists");
 174  
             }
 175  0
           }
 176  
         }
 177  0
         return zooKeeper.create(path, data, acl, createMode);
 178  0
       } catch (KeeperException.ConnectionLossException e) {
 179  0
         LOG.warn("createExt: Connection loss on attempt " + attempt + ", " +
 180  
             "waiting " + retryWaitMsecs + " msecs before retrying.", e);
 181  
       }
 182  0
       ++attempt;
 183  0
       Thread.sleep(retryWaitMsecs);
 184  
     }
 185  0
     throw new IllegalStateException("createExt: Failed to create " + path  +
 186  
         " after " + attempt + " tries!");
 187  
   }
 188  
 
 189  
   /**
 190  
    * Data structure for handling the output of createOrSet()
 191  
    */
 192  
   public static class PathStat {
 193  
     /** Path to created znode (if any) */
 194  
     private String path;
 195  
     /** Stat from set znode (if any) */
 196  
     private Stat stat;
 197  
 
 198  
     /**
 199  
      * Put in results from createOrSet()
 200  
      *
 201  
      * @param path Path to created znode (or null)
 202  
      * @param stat Stat from set znode (if set)
 203  
      */
 204  0
     public PathStat(String path, Stat stat) {
 205  0
       this.path = path;
 206  0
       this.stat = stat;
 207  0
     }
 208  
 
 209  
     /**
 210  
      * Get the path of the created znode if it was created.
 211  
      *
 212  
      * @return Path of created znode or null if not created
 213  
      */
 214  
     public String getPath() {
 215  0
       return path;
 216  
     }
 217  
 
 218  
     /**
 219  
      * Get the stat of the set znode if set
 220  
      *
 221  
      * @return Stat of set znode or null if not set
 222  
      */
 223  
     public Stat getStat() {
 224  0
       return stat;
 225  
     }
 226  
   }
 227  
 
 228  
   /**
 229  
    * Create a znode.  Set the znode if the created znode already exists.
 230  
    *
 231  
    * @param path path to create
 232  
    * @param data data to set on the final znode
 233  
    * @param acl acls on each znode created
 234  
    * @param createMode only affects the final znode
 235  
    * @param recursive if true, creates all ancestors
 236  
    * @param version Version to set if setting
 237  
    * @return Path of created znode or Stat of set znode
 238  
    * @throws InterruptedException
 239  
    * @throws KeeperException
 240  
    */
 241  
   public PathStat createOrSetExt(final String path,
 242  
       byte[] data,
 243  
       List<ACL> acl,
 244  
       CreateMode createMode,
 245  
       boolean recursive,
 246  
       int version) throws KeeperException, InterruptedException {
 247  0
     String createdPath = null;
 248  0
     Stat setStat = null;
 249  
     try {
 250  0
       createdPath = createExt(path, data, acl, createMode, recursive);
 251  0
     } catch (KeeperException.NodeExistsException e) {
 252  0
       if (LOG.isDebugEnabled()) {
 253  0
         LOG.debug("createOrSet: Node exists on path " + path);
 254  
       }
 255  0
       setStat = zooKeeper.setData(path, data, version);
 256  0
     }
 257  0
     return new PathStat(createdPath, setStat);
 258  
   }
 259  
 
 260  
   /**
 261  
    * Create a znode if there is no other znode there
 262  
    *
 263  
    * @param path path to create
 264  
    * @param data data to set on the final znode
 265  
    * @param acl acls on each znode created
 266  
    * @param createMode only affects the final znode
 267  
    * @param recursive if true, creates all ancestors
 268  
    * @return Path of created znode or Stat of set znode
 269  
    * @throws InterruptedException
 270  
    * @throws KeeperException
 271  
    */
 272  
   public PathStat createOnceExt(final String path,
 273  
       byte[] data,
 274  
       List<ACL> acl,
 275  
       CreateMode createMode,
 276  
       boolean recursive) throws KeeperException, InterruptedException {
 277  0
     String createdPath = null;
 278  0
     Stat setStat = null;
 279  
     try {
 280  0
       createdPath = createExt(path, data, acl, createMode, recursive);
 281  0
     } catch (KeeperException.NodeExistsException e) {
 282  0
       if (LOG.isDebugEnabled()) {
 283  0
         LOG.debug("createOnceExt: Node already exists on path " + path);
 284  
       }
 285  0
     }
 286  0
     return new PathStat(createdPath, setStat);
 287  
   }
 288  
 
 289  
   /**
 290  
    * Delete a path recursively.  When the deletion is recursive, it is a
 291  
    * non-atomic operation, hence, not part of ZooKeeper.
 292  
    * @param path path to remove (i.e. /tmp will remove /tmp/1 and /tmp/2)
 293  
    * @param version expected version (-1 for all)
 294  
    * @param recursive if true, remove all children, otherwise behave like
 295  
    *        remove()
 296  
    * @throws InterruptedException
 297  
    * @throws KeeperException
 298  
    */
 299  
   public void deleteExt(final String path, int version, boolean recursive)
 300  
     throws InterruptedException, KeeperException {
 301  0
     int attempt = 0;
 302  0
     while (attempt < maxRetryAttempts) {
 303  
       try {
 304  0
         if (!recursive) {
 305  0
           zooKeeper.delete(path, version);
 306  0
           return;
 307  
         }
 308  
 
 309  
         try {
 310  0
           zooKeeper.delete(path, version);
 311  0
           return;
 312  0
         } catch (KeeperException.NotEmptyException e) {
 313  0
           if (LOG.isDebugEnabled()) {
 314  0
             LOG.debug("deleteExt: Cannot directly remove node " + path);
 315  
           }
 316  
         }
 317  
 
 318  0
         List<String> childList = zooKeeper.getChildren(path, false);
 319  0
         for (String child : childList) {
 320  0
           if (progressable != null) {
 321  0
             progressable.progress();
 322  
           }
 323  0
           deleteExt(path + "/" + child, -1, true);
 324  0
         }
 325  
 
 326  0
         zooKeeper.delete(path, version);
 327  0
         return;
 328  0
       } catch (KeeperException.ConnectionLossException e) {
 329  0
         LOG.warn("deleteExt: Connection loss on attempt " +
 330  
             attempt + ", waiting " + retryWaitMsecs +
 331  
             " msecs before retrying.", e);
 332  
       }
 333  0
       ++attempt;
 334  0
       Thread.sleep(retryWaitMsecs);
 335  
     }
 336  0
     throw new IllegalStateException("deleteExt: Failed to delete " + path  +
 337  
         " after " + attempt + " tries!");
 338  
   }
 339  
 
 340  
   /**
 341  
    * Return the stat of the node of the given path. Return null if no such a
 342  
    * node exists.
 343  
    * <p>
 344  
    * If the watch is true and the call is successful (no exception is thrown),
 345  
    * a watch will be left on the node with the given path. The watch will be
 346  
    * triggered by a successful operation that creates/delete the node or sets
 347  
    * the data on the node.
 348  
    *
 349  
    * @param path
 350  
    *                the node path
 351  
    * @param watch
 352  
    *                whether need to watch this node
 353  
    * @return the stat of the node of the given path; return null if no such a
 354  
    *         node exists.
 355  
    * @throws KeeperException If the server signals an error
 356  
    * @throws InterruptedException If the server transaction is interrupted.
 357  
    */
 358  
   public Stat exists(String path, boolean watch) throws KeeperException,
 359  
       InterruptedException {
 360  0
     int attempt = 0;
 361  0
     while (attempt < maxRetryAttempts) {
 362  
       try {
 363  0
         return zooKeeper.exists(path, watch);
 364  0
       } catch (KeeperException.ConnectionLossException e) {
 365  0
         LOG.warn("exists: Connection loss on attempt " +
 366  
             attempt + ", waiting " + retryWaitMsecs +
 367  
             " msecs before retrying.", e);
 368  
       }
 369  0
       ++attempt;
 370  0
       Thread.sleep(retryWaitMsecs);
 371  
     }
 372  0
     throw new IllegalStateException("exists: Failed to check " + path  +
 373  
         " after " + attempt + " tries!");
 374  
   }
 375  
 
 376  
   /**
 377  
    * Return the stat of the node of the given path. Return null if no such a
 378  
    * node exists.
 379  
    * <p>
 380  
    * If the watch is non-null and the call is successful (no exception is
 381  
    * thrown), a watch will be left on the node with the given path. The
 382  
    * watch will be triggered by a successful operation that
 383  
    * creates/delete the node or sets the data on the node.
 384  
    *
 385  
    * @param path the node path
 386  
    * @param watcher explicit watcher
 387  
    * @return the stat of the node of the given path; return null if no such a
 388  
    *         node exists.
 389  
    * @throws KeeperException If the server signals an error
 390  
    * @throws InterruptedException If the server transaction is interrupted.
 391  
    * @throws IllegalArgumentException if an invalid path is specified
 392  
    */
 393  
   public Stat exists(final String path, Watcher watcher)
 394  
     throws KeeperException, InterruptedException {
 395  0
     int attempt = 0;
 396  0
     while (attempt < maxRetryAttempts) {
 397  
       try {
 398  0
         return zooKeeper.exists(path, watcher);
 399  0
       } catch (KeeperException.ConnectionLossException e) {
 400  0
         LOG.warn("exists: Connection loss on attempt " +
 401  
             attempt + ", waiting " + retryWaitMsecs +
 402  
             " msecs before retrying.", e);
 403  
       }
 404  0
       ++attempt;
 405  0
       Thread.sleep(retryWaitMsecs);
 406  
     }
 407  0
     throw new IllegalStateException("exists: Failed to check " + path  +
 408  
         " after " + attempt + " tries!");
 409  
   }
 410  
 
 411  
   /**
 412  
    * Return the data and the stat of the node of the given path.
 413  
    * <p>
 414  
    * If the watch is non-null and the call is successful (no exception is
 415  
    * thrown), a watch will be left on the node with the given path. The watch
 416  
    * will be triggered by a successful operation that sets data on the node, or
 417  
    * deletes the node.
 418  
    * <p>
 419  
    * A KeeperException with error code KeeperException.NoNode will be thrown
 420  
    * if no node with the given path exists.
 421  
    *
 422  
    * @param path the given path
 423  
    * @param watcher explicit watcher
 424  
    * @param stat the stat of the node
 425  
    * @return the data of the node
 426  
    * @throws KeeperException If the server signals an error with a non-zero
 427  
    *         error code
 428  
    * @throws InterruptedException If the server transaction is interrupted.
 429  
    * @throws IllegalArgumentException if an invalid path is specified
 430  
    */
 431  
   public byte[] getData(final String path, Watcher watcher, Stat stat)
 432  
     throws KeeperException, InterruptedException {
 433  0
     int attempt = 0;
 434  0
     while (attempt < maxRetryAttempts) {
 435  
       try {
 436  0
         return zooKeeper.getData(path, watcher, stat);
 437  0
       } catch (KeeperException.ConnectionLossException e) {
 438  0
         LOG.warn("getData: Connection loss on attempt " +
 439  
             attempt + ", waiting " + retryWaitMsecs +
 440  
             " msecs before retrying.", e);
 441  
       }
 442  0
       ++attempt;
 443  0
       Thread.sleep(retryWaitMsecs);
 444  
     }
 445  0
     throw new IllegalStateException("getData: Failed to get " + path  +
 446  
         " after " + attempt + " tries!");
 447  
   }
 448  
 
 449  
   /**
 450  
    * Return the data and the stat of the node of the given path.
 451  
    * <p>
 452  
    * If the watch is true and the call is successful (no exception is
 453  
    * thrown), a watch will be left on the node with the given path. The watch
 454  
    * will be triggered by a successful operation that sets data on the node, or
 455  
    * deletes the node.
 456  
    * <p>
 457  
    * A KeeperException with error code KeeperException.NoNode will be thrown
 458  
    * if no node with the given path exists.
 459  
    *
 460  
    * @param path the given path
 461  
    * @param watch whether need to watch this node
 462  
    * @param stat the stat of the node
 463  
    * @return the data of the node
 464  
    * @throws KeeperException If the server signals an error with a non-zero
 465  
    *         error code
 466  
    * @throws InterruptedException If the server transaction is interrupted.
 467  
    */
 468  
   public byte[] getData(String path, boolean watch, Stat stat)
 469  
     throws KeeperException, InterruptedException {
 470  0
     int attempt = 0;
 471  0
     while (attempt < maxRetryAttempts) {
 472  
       try {
 473  0
         return zooKeeper.getData(path, watch, stat);
 474  0
       } catch (KeeperException.ConnectionLossException e) {
 475  0
         LOG.warn("getData: Connection loss on attempt " +
 476  
             attempt + ", waiting " + retryWaitMsecs +
 477  
             " msecs before retrying.", e);
 478  
       }
 479  0
       ++attempt;
 480  0
       Thread.sleep(retryWaitMsecs);
 481  
     }
 482  0
     throw new IllegalStateException("getData: Failed to get " + path  +
 483  
         " after " + attempt + " tries!");
 484  
   }
 485  
 
 486  
   /**
 487  
    * Get the children of the path with extensions.
 488  
    * Extension 1: Sort the children based on sequence number
 489  
    * Extension 2: Get the full path instead of relative path
 490  
    *
 491  
    * @param path path to znode
 492  
    * @param watch set the watch?
 493  
    * @param sequenceSorted sort by the sequence number
 494  
    * @param fullPath if true, get the fully znode path back
 495  
    * @return list of children
 496  
    * @throws InterruptedException
 497  
    * @throws KeeperException
 498  
    */
 499  
   public List<String> getChildrenExt(
 500  
       final String path,
 501  
       boolean watch,
 502  
       boolean sequenceSorted,
 503  
       boolean fullPath) throws KeeperException, InterruptedException {
 504  0
     int attempt = 0;
 505  0
     while (attempt < maxRetryAttempts) {
 506  
       try {
 507  0
         List<String> childList = zooKeeper.getChildren(path, watch);
 508  
         /* Sort children according to the sequence number, if desired */
 509  0
         if (sequenceSorted) {
 510  0
           Collections.sort(childList, new Comparator<String>() {
 511  
             public int compare(String s1, String s2) {
 512  0
               if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) ||
 513  0
                   (s2.length() <= SEQUENCE_NUMBER_LENGTH)) {
 514  0
                 throw new RuntimeException(
 515  
                     "getChildrenExt: Invalid length for sequence " +
 516  
                         " sorting > " +
 517  
                         SEQUENCE_NUMBER_LENGTH +
 518  
                         " for s1 (" +
 519  0
                         s1.length() + ") or s2 (" + s2.length() + ")");
 520  
               }
 521  0
               int s1sequenceNumber = Integer.parseInt(
 522  0
                   s1.substring(s1.length() -
 523  
                       SEQUENCE_NUMBER_LENGTH));
 524  0
               int s2sequenceNumber = Integer.parseInt(
 525  0
                   s2.substring(s2.length() -
 526  
                       SEQUENCE_NUMBER_LENGTH));
 527  0
               return s1sequenceNumber - s2sequenceNumber;
 528  
             }
 529  
           });
 530  
         }
 531  0
         if (fullPath) {
 532  0
           List<String> fullChildList = new ArrayList<String>();
 533  0
           for (String child : childList) {
 534  0
             fullChildList.add(path + "/" + child);
 535  0
           }
 536  0
           return fullChildList;
 537  
         }
 538  0
         return childList;
 539  0
       } catch (KeeperException.ConnectionLossException e) {
 540  0
         LOG.warn("getChildrenExt: Connection loss on attempt " +
 541  
             attempt + ", waiting " + retryWaitMsecs +
 542  
             " msecs before retrying.", e);
 543  
       }
 544  0
       ++attempt;
 545  0
       Thread.sleep(retryWaitMsecs);
 546  
     }
 547  0
     throw new IllegalStateException("createExt: Failed to create " + path  +
 548  
         " after " + attempt + " tries!");
 549  
   }
 550  
 
 551  
   /**
 552  
    * Close this client object. Once the client is closed, its session becomes
 553  
    * invalid. All the ephemeral nodes in the ZooKeeper server associated with
 554  
    * the session will be removed. The watches left on those nodes (and on
 555  
    * their parents) will be triggered.
 556  
    *
 557  
    * @throws InterruptedException
 558  
    */
 559  
   public void close() throws InterruptedException {
 560  0
     zooKeeper.close();
 561  0
   }
 562  
 }