Coverage Report - org.apache.giraph.zk.ZooKeeperManager
 
Classes in this File      | Line Coverage | Branch Coverage | Complexity
ZooKeeperManager          | 0% (0/300)    | 0% (0/98)       | 4.857
ZooKeeperManager$State    | 0% (0/3)      | N/A             | 4.857
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.zk;
 20  
 
 21  
 import com.google.common.util.concurrent.Uninterruptibles;
 22  
 import org.apache.commons.io.FileUtils;
 23  
 import org.apache.giraph.conf.GiraphConstants;
 24  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 25  
 import org.apache.giraph.time.SystemTime;
 26  
 import org.apache.giraph.time.Time;
 27  
 import org.apache.hadoop.conf.Configuration;
 28  
 import org.apache.hadoop.fs.FileStatus;
 29  
 import org.apache.hadoop.fs.FileSystem;
 30  
 import org.apache.hadoop.fs.Path;
 31  
 import org.apache.hadoop.mapreduce.Mapper;
 32  
 import org.apache.log4j.Logger;
 33  
 
 34  
 import java.io.File;
 35  
 import java.io.IOException;
 36  
 import java.net.ConnectException;
 37  
 import java.net.InetSocketAddress;
 38  
 import java.net.Socket;
 39  
 import java.net.SocketTimeoutException;
 40  
 import java.util.Arrays;
 41  
 import java.util.concurrent.TimeUnit;
 42  
 
 43  
 import static com.google.common.base.Preconditions.checkState;
 44  
 import static org.apache.giraph.conf.GiraphConstants.BASE_ZNODE_KEY;
 45  
 import static org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY;
 46  
 
 47  
 
 48  
 /**
 49  
  * Manages the election of ZooKeeper servers, starting/stopping the services,
 50  
  * etc.
 51  
  */
 52  
 public class ZooKeeperManager {
 53  
   /** Class logger */
 54  0
   private static final Logger LOG = Logger.getLogger(ZooKeeperManager.class);
 55  
   /** Separates the hostname and task in the candidate stamp */
 56  
   private static final String HOSTNAME_TASK_SEPARATOR = " ";
 57  
   /** The ZooKeeperString filename prefix */
 58  
   private static final String ZOOKEEPER_SERVER_LIST_FILE_PREFIX =
 59  
       "zkServerList_";
 60  
   /** Job context (mainly for progress) */
 61  
   private Mapper<?, ?, ?, ?>.Context context;
 62  
   /** Hadoop configuration */
 63  
   private final ImmutableClassesGiraphConfiguration conf;
 64  
   /** Task partition, to ensure uniqueness */
 65  
   private final int taskPartition;
 66  
   /** HDFS base directory for all file-based coordination */
 67  
   private final Path baseDirectory;
 68  
   /**
 69  
    * HDFS task ZooKeeper candidate/completed
 70  
    * directory for all file-based coordination
 71  
    */
 72  
   private final Path taskDirectory;
 73  
   /**
 74  
    * HDFS ZooKeeper server ready/done directory
 75  
    * for all file-based coordination
 76  
    */
 77  
   private final Path serverDirectory;
 78  
   /** HDFS path to whether the task is done */
 79  
   private final Path myClosedPath;
 80  
   /** Polling msecs timeout */
 81  
   private final int pollMsecs;
 82  
   /** File system */
 83  
   private final FileSystem fs;
 84  
   /** Zookeeper wrapper */
 85  
   private ZooKeeperRunner zkRunner;
 86  
   /** ZooKeeper local file system directory */
 87  
   private final String zkDir;
 88  
   /** ZooKeeper config file path */
 89  
   private final ZookeeperConfig config;
 90  
   /** ZooKeeper server host */
 91  
   private String zkServerHost;
 92  
   /** ZooKeeper server task */
 93  
   private int zkServerTask;
 94  
   /** ZooKeeper base port */
 95  
   private int zkBasePort;
 96  
   /** Final ZooKeeper server port list (for clients) */
 97  
   private String zkServerPortString;
 98  
   /** My hostname */
 99  0
   private String myHostname = null;
 100  
   /** Job id, to ensure uniqueness */
 101  
   private final String jobId;
 102  
   /** Time object for tracking timeouts */
 103  0
   private final Time time = SystemTime.get();
 104  
 
 105  
   /** State of the application */
 106  0
   public enum State {
 107  
     /** Failure occurred */
 108  0
     FAILED,
 109  
     /** Application finished */
 110  0
     FINISHED
 111  
   }
 112  
 
 113  
   /**
 114  
    * Constructor with context.
 115  
    *
 116  
    * @param context Context to be stored internally
 117  
    * @param configuration Configuration
 118  
    * @throws IOException
 119  
    */
 120  
   public ZooKeeperManager(Mapper<?, ?, ?, ?>.Context context,
 121  
                           ImmutableClassesGiraphConfiguration configuration)
 122  0
     throws IOException {
 123  0
     this.context = context;
 124  0
     this.conf = configuration;
 125  0
     taskPartition = conf.getTaskPartition();
 126  0
     jobId = conf.getJobId();
 127  0
     baseDirectory =
 128  0
         new Path(ZOOKEEPER_MANAGER_DIRECTORY.getWithDefault(conf,
 129  0
             getFinalZooKeeperPath()));
 130  0
     taskDirectory = new Path(baseDirectory,
 131  
         "_task");
 132  0
     serverDirectory = new Path(baseDirectory,
 133  
         "_zkServer");
 134  0
     myClosedPath = new Path(taskDirectory,
 135  0
         (new ComputationDoneName(taskPartition)).getName());
 136  0
     pollMsecs = GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.get(conf);
 137  0
     String jobLocalDir = conf.get("job.local.dir");
 138  
     String zkDirDefault;
 139  0
     if (jobLocalDir != null) { // for non-local jobs
 140  0
       zkDirDefault = jobLocalDir +
 141  
           "/_bspZooKeeper";
 142  
     } else {
 143  0
       zkDirDefault = System.getProperty("user.dir") + "/" +
 144  0
               ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue();
 145  
     }
 146  0
     zkDir = conf.get(GiraphConstants.ZOOKEEPER_DIR, zkDirDefault);
 147  0
     config = new ZookeeperConfig();
 148  0
     zkBasePort = GiraphConstants.ZOOKEEPER_SERVER_PORT.get(conf);
 149  
 
 150  0
     myHostname = conf.getLocalHostname();
 151  0
     fs = FileSystem.get(conf);
 152  0
   }
 153  
 
 154  
   /**
 155  
    * Generate the final ZooKeeper coordination directory on HDFS
 156  
    *
 157  
    * @return directory path with job id
 158  
    */
 159  
   private String getFinalZooKeeperPath() {
 160  0
     return ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue() + "/" + jobId;
 161  
   }
 162  
 
 163  
   /**
 164  
    * Return the base ZooKeeper ZNode from which all other ZNodes Giraph creates
 165  
    * should be sited, for instance in a multi-tenant ZooKeeper, the znode
 166  
    * reserved for Giraph
 167  
    *
 168  
    * @param conf  Necessary to access user-provided values
 169  
    * @return  String of path without trailing slash
 170  
    */
 171  
   public static String getBasePath(Configuration conf) {
 172  0
     String result = conf.get(BASE_ZNODE_KEY, "");
 173  0
     if (!result.equals("") && !result.startsWith("/")) {
 174  0
       throw new IllegalArgumentException("Value for " +
 175  
           BASE_ZNODE_KEY + " must start with /: " + result);
 176  
     }
 177  
 
 178  0
     return result;
 179  
   }
 180  
 
 181  
   /**
 182  
    * Create the candidate stamps and decide on the servers to start if
 183  
    * you are partition 0.
 184  
    *
 185  
    * @throws IOException
 186  
    * @throws InterruptedException
 187  
    */
 188  
   public void setup() throws IOException, InterruptedException {
 189  0
     createCandidateStamp();
 190  0
     getZooKeeperServerList();
 191  0
   }
 192  
 
 193  
   /**
 194  
    * Create a HDFS stamp for this task.  If another task already
 195  
    * created it, then this one will fail, which is fine.
 196  
    */
 197  
   public void createCandidateStamp() {
 198  
     try {
 199  0
       fs.mkdirs(baseDirectory);
 200  0
       LOG.info("createCandidateStamp: Made the directory " +
 201  
           baseDirectory);
 202  0
     } catch (IOException e) {
 203  0
       LOG.error("createCandidateStamp: Failed to mkdirs " +
 204  
           baseDirectory);
 205  0
     }
 206  
     try {
 207  0
       fs.mkdirs(serverDirectory);
 208  0
       LOG.info("createCandidateStamp: Made the directory " +
 209  
           serverDirectory);
 210  0
     } catch (IOException e) {
 211  0
       LOG.error("createCandidateStamp: Failed to mkdirs " +
 212  
           serverDirectory);
 213  0
     }
 214  
     // Check that the base directory exists and is a directory
 215  
     try {
 216  0
       if (!fs.getFileStatus(baseDirectory).isDir()) {
 217  0
         throw new IllegalArgumentException(
 218  
             "createCandidateStamp: " + baseDirectory +
 219  
             " is not a directory, but should be.");
 220  
       }
 221  0
     } catch (IOException e) {
 222  0
       throw new IllegalArgumentException(
 223  
           "createCandidateStamp: Couldn't get file status " +
 224  
               "for base directory " + baseDirectory + ".  If there is an " +
 225  
               "issue with this directory, please set an accesible " +
 226  
               "base directory with the Hadoop configuration option " +
 227  0
               ZOOKEEPER_MANAGER_DIRECTORY.getKey(), e);
 228  0
     }
 229  
 
 230  0
     Path myCandidacyPath = new Path(
 231  
         taskDirectory, myHostname +
 232  
         HOSTNAME_TASK_SEPARATOR + taskPartition);
 233  
     try {
 234  0
       if (LOG.isInfoEnabled()) {
 235  0
         LOG.info("createCandidateStamp: Creating my filestamp " +
 236  
             myCandidacyPath);
 237  
       }
 238  0
       fs.createNewFile(myCandidacyPath);
 239  0
     } catch (IOException e) {
 240  0
       LOG.error("createCandidateStamp: Failed (maybe previous task " +
 241  
           "failed) to create filestamp " + myCandidacyPath, e);
 242  0
     }
 243  0
   }
 244  
 
 245  
   /**
 246  
    * Create a new file with retries if it fails.
 247  
    *
 248  
    * @param fs File system where the new file is created
 249  
    * @param path Path of the new file
 250  
    * @param maxAttempts Maximum number of attempts
 251  
    * @param retryWaitMsecs Milliseconds to wait before retrying
 252  
    */
 253  
   private static void createNewFileWithRetries(
 254  
       FileSystem fs, Path path, int maxAttempts, int retryWaitMsecs) {
 255  0
     int attempt = 0;
 256  0
     while (attempt < maxAttempts) {
 257  
       try {
 258  0
         fs.createNewFile(path);
 259  0
         return;
 260  0
       } catch (IOException e) {
 261  0
         LOG.warn("createNewFileWithRetries: Failed to create file at path " +
 262  
             path + " on attempt " + attempt + " of " + maxAttempts + ".", e);
 263  
       }
 264  0
       ++attempt;
 265  0
       Uninterruptibles.sleepUninterruptibly(
 266  
           retryWaitMsecs, TimeUnit.MILLISECONDS);
 267  
     }
 268  0
     throw new IllegalStateException(
 269  
         "createNewFileWithRetries: Failed to create file at path " +
 270  
             path + " after " + attempt + " attempts");
 271  
   }
 272  
 
 273  
   /**
 274  
    * Every task must create a stamp to let the ZooKeeper servers know that
 275  
    * they can shutdown.  This also lets the task know that it was already
 276  
    * completed.
 277  
    */
 278  
   private void createZooKeeperClosedStamp() {
 279  0
     LOG.info("createZooKeeperClosedStamp: Creating my filestamp " +
 280  
         myClosedPath);
 281  0
     createNewFileWithRetries(fs, myClosedPath,
 282  0
         conf.getHdfsFileCreationRetries(),
 283  0
         conf.getHdfsFileCreationRetryWaitMs());
 284  0
   }
 285  
 
 286  
   /**
 287  
    * Check if all the computation is done.
 288  
    * @return true if all computation is done.
 289  
    */
 290  
   public boolean computationDone() {
 291  
     try {
 292  0
       return fs.exists(myClosedPath);
 293  0
     } catch (IOException e) {
 294  0
       throw new RuntimeException(e);
 295  
     }
 296  
   }
 297  
 
 298  
   /**
 299  
    * Task 0 will call this to create the ZooKeeper server list.  The result is
 300  
    * a file that describes the ZooKeeper servers through the filename.
 301  
    *
 302  
    * @throws IOException
 303  
    * @throws InterruptedException
 304  
    */
 305  
   private void createZooKeeperServerList() throws IOException,
 306  
       InterruptedException {
 307  
     String host;
 308  
     String task;
 309  
     while (true) {
 310  0
       FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
 311  0
       if (fileStatusArray.length > 0) {
 312  0
         FileStatus fileStatus = fileStatusArray[0];
 313  0
         String[] hostnameTaskArray =
 314  0
             fileStatus.getPath().getName().split(
 315  
                 HOSTNAME_TASK_SEPARATOR);
 316  0
         checkState(hostnameTaskArray.length == 2,
 317  
             "createZooKeeperServerList: Task 0 failed " +
 318  0
             "to parse " + fileStatus.getPath().getName());
 319  0
         host = hostnameTaskArray[0];
 320  0
         task = hostnameTaskArray[1];
 321  0
         break;
 322  
       }
 323  0
       Thread.sleep(pollMsecs);
 324  0
     }
 325  0
     String serverListFile =
 326  
         ZOOKEEPER_SERVER_LIST_FILE_PREFIX + host +
 327  
         HOSTNAME_TASK_SEPARATOR + task;
 328  0
     Path serverListPath =
 329  
         new Path(baseDirectory, serverListFile);
 330  0
     if (LOG.isInfoEnabled()) {
 331  0
       LOG.info("createZooKeeperServerList: Creating the final " +
 332  
           "ZooKeeper file '" + serverListPath + "'");
 333  
     }
 334  0
     fs.createNewFile(serverListPath);
 335  0
   }
 336  
 
 337  
   /**
 338  
    * Make an attempt to get the server list file by looking for a file in
 339  
    * the appropriate directory with the prefix
 340  
    * ZOOKEEPER_SERVER_LIST_FILE_PREFIX.
 341  
    * @return null if not found or the filename if found
 342  
    * @throws IOException
 343  
    */
 344  
   private String getServerListFile() throws IOException {
 345  0
     String serverListFile = null;
 346  0
     FileStatus [] fileStatusArray = fs.listStatus(baseDirectory);
 347  0
     for (FileStatus fileStatus : fileStatusArray) {
 348  0
       if (fileStatus.getPath().getName().startsWith(
 349  
           ZOOKEEPER_SERVER_LIST_FILE_PREFIX)) {
 350  0
         serverListFile = fileStatus.getPath().getName();
 351  0
         break;
 352  
       }
 353  
     }
 354  0
     return serverListFile;
 355  
   }
 356  
 
 357  
   /**
 358  
    * Task 0 is the designated master and will generate the server list
 359  
    * (unless it has already done so).  Other
 360  
    * tasks will consume the file after it is created (just the filename).
 361  
    * @throws IOException
 362  
    * @throws InterruptedException
 363  
    */
 364  
   private void getZooKeeperServerList() throws IOException,
 365  
       InterruptedException {
 366  
     String serverListFile;
 367  
 
 368  0
     if (taskPartition == 0) {
 369  0
       serverListFile = getServerListFile();
 370  0
       if (serverListFile == null) {
 371  0
         createZooKeeperServerList();
 372  
       }
 373  
     }
 374  
 
 375  
     while (true) {
 376  0
       serverListFile = getServerListFile();
 377  0
       if (LOG.isInfoEnabled()) {
 378  0
         LOG.info("getZooKeeperServerList: For task " + taskPartition +
 379  
             ", got file '" + serverListFile +
 380  
             "' (polling period is " +
 381  
             pollMsecs + ")");
 382  
       }
 383  0
       if (serverListFile != null) {
 384  0
         break;
 385  
       }
 386  
       try {
 387  0
         Thread.sleep(pollMsecs);
 388  0
       } catch (InterruptedException e) {
 389  0
         LOG.warn("getZooKeeperServerList: Strange interrupted " +
 390  0
             "exception " + e.getMessage());
 391  0
       }
 392  
 
 393  
     }
 394  
 
 395  0
     String[] serverHostList = serverListFile.substring(
 396  0
         ZOOKEEPER_SERVER_LIST_FILE_PREFIX.length()).split(
 397  
             HOSTNAME_TASK_SEPARATOR);
 398  0
     if (LOG.isInfoEnabled()) {
 399  0
       LOG.info("getZooKeeperServerList: Found " +
 400  0
           Arrays.toString(serverHostList) +
 401  
           " hosts in filename '" + serverListFile + "'");
 402  
     }
 403  
 
 404  0
     zkServerHost = serverHostList[0];
 405  0
     zkServerTask = Integer.parseInt(serverHostList[1]);
 406  0
     updateZkPortString();
 407  0
   }
 408  
 
 409  
   /**
 410  
    * Update zookeeper host:port string.
 411  
    */
 412  
   private void updateZkPortString() {
 413  0
     zkServerPortString = zkServerHost + ":" + zkBasePort;
 414  0
   }
 415  
 
 416  
   /**
 417  
    * Users can get the server port string to connect to ZooKeeper
 418  
    * @return server port string - comma separated
 419  
    */
 420  
   public String getZooKeeperServerPortString() {
 421  0
     return zkServerPortString;
 422  
   }
 423  
 
 424  
   /**
 425  
    * Whoever is elected to be a ZooKeeper server must generate a config file
 426  
    * locally.
 427  
    *
 428  
    */
 429  
   private void generateZooKeeperConfig() {
 430  0
     if (LOG.isInfoEnabled()) {
 431  0
       LOG.info("generateZooKeeperConfig: with base port " +
 432  
           zkBasePort);
 433  
     }
 434  0
     File zkDirFile = new File(this.zkDir);
 435  0
     boolean mkDirRet = zkDirFile.mkdirs();
 436  0
     if (LOG.isInfoEnabled()) {
 437  0
       LOG.info("generateZooKeeperConfigFile: Make directory of " +
 438  0
           zkDirFile.getName() + " = " + mkDirRet);
 439  
     }
 440  
     /** Set zookeeper system properties */
 441  0
     System.setProperty("zookeeper.snapCount",
 442  0
         Integer.toString(GiraphConstants.DEFAULT_ZOOKEEPER_SNAP_COUNT));
 443  0
     System.setProperty("zookeeper.forceSync",
 444  0
         GiraphConstants.ZOOKEEPER_FORCE_SYNC.get(conf) ? "yes" : "no");
 445  0
     System.setProperty("zookeeper.skipACL",
 446  0
         GiraphConstants.ZOOKEEPER_SKIP_ACL.get(conf) ? "yes" : "no");
 447  
 
 448  0
     config.setDataDir(zkDir);
 449  0
     config.setDataLogDir(zkDir);
 450  0
     config.setClientPortAddress(new InetSocketAddress(zkBasePort));
 451  0
     config.setMinSessionTimeout(conf.getZooKeeperMinSessionTimeout());
 452  0
     config.setMaxSessionTimeout(conf.getZooKeeperMaxSessionTimeout());
 453  
 
 454  0
   }
 455  
 
 456  
   /**
 457  
    * If this task has been selected, online a ZooKeeper server.  Otherwise,
 458  
    * wait until this task knows that the ZooKeeper servers have been onlined.
 459  
    *
 460  
    * @throws IOException
 461  
    */
 462  
   public void onlineZooKeeperServer() throws IOException {
 463  0
     if (zkServerTask == taskPartition) {
 464  0
       File zkDirFile = new File(this.zkDir);
 465  
       try {
 466  0
         if (LOG.isInfoEnabled()) {
 467  0
           LOG.info("onlineZooKeeperServers: Trying to delete old " +
 468  
               "directory " + this.zkDir);
 469  
         }
 470  0
         FileUtils.deleteDirectory(zkDirFile);
 471  0
       } catch (IOException e) {
 472  0
         LOG.warn("onlineZooKeeperServers: Failed to delete " +
 473  
             "directory " + this.zkDir, e);
 474  0
       }
 475  0
       generateZooKeeperConfig();
 476  0
       synchronized (this) {
 477  0
         zkRunner = createRunner();
 478  0
         int port = zkRunner.start(zkDir, config);
 479  0
         if (port > 0) {
 480  0
           zkBasePort = port;
 481  0
           updateZkPortString();
 482  
         }
 483  0
       }
 484  
 
 485  
       // Once the server is up and running, notify that this server is up
 486  
       // and running by dropping a ready stamp.
 487  0
       int connectAttempts = 0;
 488  0
       final int maxConnectAttempts =
 489  0
           conf.getZookeeperConnectionAttempts();
 490  0
       while (connectAttempts < maxConnectAttempts) {
 491  
         try {
 492  0
           if (LOG.isInfoEnabled()) {
 493  0
             LOG.info("onlineZooKeeperServers: Connect attempt " +
 494  
                 connectAttempts + " of " +
 495  
                 maxConnectAttempts +
 496  
                 " max trying to connect to " +
 497  
                 myHostname + ":" + zkBasePort +
 498  
                 " with poll msecs = " + pollMsecs);
 499  
           }
 500  0
           InetSocketAddress zkServerAddress =
 501  
               new InetSocketAddress(myHostname, zkBasePort);
 502  0
           Socket testServerSock = new Socket();
 503  0
           testServerSock.connect(zkServerAddress, 5000);
 504  0
           if (LOG.isInfoEnabled()) {
 505  0
             LOG.info("onlineZooKeeperServers: Connected to " +
 506  
                 zkServerAddress + "!");
 507  
           }
 508  0
           break;
 509  0
         } catch (SocketTimeoutException e) {
 510  0
           LOG.warn("onlineZooKeeperServers: Got " +
 511  
               "SocketTimeoutException", e);
 512  0
         } catch (ConnectException e) {
 513  0
           LOG.warn("onlineZooKeeperServers: Got " +
 514  
               "ConnectException", e);
 515  0
         } catch (IOException e) {
 516  0
           LOG.warn("onlineZooKeeperServers: Got " +
 517  
               "IOException", e);
 518  0
         }
 519  
 
 520  0
         ++connectAttempts;
 521  
         try {
 522  0
           Thread.sleep(pollMsecs);
 523  0
         } catch (InterruptedException e) {
 524  0
           LOG.warn("onlineZooKeeperServers: Sleep of " + pollMsecs +
 525  0
               " interrupted - " + e.getMessage());
 526  0
         }
 527  
       }
 528  0
       if (connectAttempts == maxConnectAttempts) {
 529  0
         throw new IllegalStateException(
 530  
             "onlineZooKeeperServers: Failed to connect in " +
 531  
                 connectAttempts + " tries!");
 532  
       }
 533  0
       Path myReadyPath = new Path(
 534  
           serverDirectory, myHostname +
 535  
           HOSTNAME_TASK_SEPARATOR + taskPartition +
 536  
           HOSTNAME_TASK_SEPARATOR + zkBasePort);
 537  
       try {
 538  0
         if (LOG.isInfoEnabled()) {
 539  0
           LOG.info("onlineZooKeeperServers: Creating my filestamp " +
 540  
               myReadyPath);
 541  
         }
 542  0
         fs.createNewFile(myReadyPath);
 543  0
       } catch (IOException e) {
 544  0
         LOG.error("onlineZooKeeperServers: Failed (maybe previous " +
 545  
             "task failed) to create filestamp " + myReadyPath, e);
 546  0
       }
 547  0
     } else {
 548  0
       int readyRetrievalAttempt = 0;
 549  0
       String foundServer = null;
 550  
       while (true) {
 551  
         try {
 552  0
           FileStatus [] fileStatusArray =
 553  0
               fs.listStatus(serverDirectory);
 554  0
           if ((fileStatusArray != null) &&
 555  
               (fileStatusArray.length > 0)) {
 556  0
             for (int i = 0; i < fileStatusArray.length; ++i) {
 557  0
               String[] hostnameTaskArray =
 558  0
                   fileStatusArray[i].getPath().getName().split(
 559  
                       HOSTNAME_TASK_SEPARATOR);
 560  0
               if (hostnameTaskArray.length != 3) {
 561  0
                 throw new RuntimeException(
 562  
                     "getZooKeeperServerList: Task 0 failed " +
 563  
                         "to parse " +
 564  0
                         fileStatusArray[i].getPath().getName());
 565  
               }
 566  0
               foundServer = hostnameTaskArray[0];
 567  0
               zkBasePort = Integer.parseInt(hostnameTaskArray[2]);
 568  0
               updateZkPortString();
 569  
             }
 570  0
             if (LOG.isInfoEnabled()) {
 571  0
               LOG.info("onlineZooKeeperServers: Got " +
 572  
                   foundServer + " on port " +
 573  
                   zkBasePort +
 574  
                   " (polling period is " +
 575  
                   pollMsecs + ") on attempt " +
 576  
                   readyRetrievalAttempt);
 577  
             }
 578  0
             if (zkServerHost.equals(foundServer)) {
 579  0
               break;
 580  
             }
 581  
           } else {
 582  0
             if (LOG.isInfoEnabled()) {
 583  0
               LOG.info("onlineZooKeeperServers: Empty " +
 584  
                   "directory " + serverDirectory +
 585  
                   ", waiting " + pollMsecs + " msecs.");
 586  
             }
 587  
           }
 588  0
           Thread.sleep(pollMsecs);
 589  0
           ++readyRetrievalAttempt;
 590  0
         } catch (IOException e) {
 591  0
           throw new RuntimeException(e);
 592  0
         } catch (InterruptedException e) {
 593  0
           LOG.warn("onlineZooKeeperServers: Strange interrupt from " +
 594  0
               e.getMessage(), e);
 595  0
         }
 596  
       }
 597  
     }
 598  0
   }
 599  
 
 600  
   /**
 601  
    * Wait for all workers to signal completion.  Will wait up to
 602  
    * WAIT_TASK_DONE_TIMEOUT_MS milliseconds for this to complete before
 603  
    * reporting an error.
 604  
    *
 605  
    * @param totalWorkers Number of workers to wait for
 606  
    */
 607  
   private void waitUntilAllTasksDone(int totalWorkers) {
 608  0
     int attempt = 0;
 609  0
     long maxMs = time.getMilliseconds() +
 610  0
         conf.getWaitTaskDoneTimeoutMs();
 611  
     while (true) {
 612  0
       boolean[] taskDoneArray = new boolean[totalWorkers];
 613  
       try {
 614  0
         FileStatus [] fileStatusArray =
 615  0
             fs.listStatus(taskDirectory);
 616  0
         int totalDone = 0;
 617  0
         if (fileStatusArray.length > 0) {
 618  0
           for (FileStatus fileStatus : fileStatusArray) {
 619  0
             String name = fileStatus.getPath().getName();
 620  0
             if (ComputationDoneName.isName(name)) {
 621  0
               ++totalDone;
 622  0
               taskDoneArray[ComputationDoneName.fromName(
 623  0
                   name).getWorkerId()] = true;
 624  
             }
 625  
           }
 626  
         }
 627  0
         if (LOG.isInfoEnabled()) {
 628  0
           LOG.info("waitUntilAllTasksDone: Got " + totalDone +
 629  
               " and " + totalWorkers +
 630  
               " desired (polling period is " +
 631  
               pollMsecs + ") on attempt " +
 632  
               attempt);
 633  
         }
 634  0
         if (totalDone >= totalWorkers) {
 635  0
           break;
 636  
         } else {
 637  0
           StringBuilder sb = new StringBuilder();
 638  0
           for (int i = 0; i < taskDoneArray.length; ++i) {
 639  0
             if (!taskDoneArray[i]) {
 640  0
               sb.append(i).append(", ");
 641  
             }
 642  
           }
 643  0
           LOG.info("waitUntilAllTasksDone: Still waiting on tasks " +
 644  0
               sb.toString());
 645  
         }
 646  0
         ++attempt;
 647  0
         Thread.sleep(pollMsecs);
 648  0
         context.progress();
 649  0
       } catch (IOException e) {
 650  0
         LOG.warn("waitUntilAllTasksDone: Got IOException.", e);
 651  0
       } catch (InterruptedException e) {
 652  0
         LOG.warn("waitUntilAllTasksDone: Got InterruptedException", e);
 653  0
       }
 654  
 
 655  0
       if (time.getMilliseconds() > maxMs) {
 656  0
         throw new IllegalStateException("waitUntilAllTasksDone: Tasks " +
 657  
             "did not finish by the maximum time of " +
 658  0
             conf.getWaitTaskDoneTimeoutMs() + " milliseconds");
 659  
       }
 660  0
     }
 661  0
   }
 662  
 
 663  
   /**
 664  
    * Notify the ZooKeeper servers that this partition is done with all
 665  
    * ZooKeeper communication.  If this task is running a ZooKeeper server,
 666  
    * kill it when all partitions are done and wait for
 667  
    * completion.  Clean up the ZooKeeper local directory as well.
 668  
    *
 669  
    * @param state State of the application
 670  
    */
 671  
   public void offlineZooKeeperServers(State state) {
 672  0
     if (state == State.FINISHED) {
 673  0
       createZooKeeperClosedStamp();
 674  
     }
 675  0
     synchronized (this) {
 676  0
       if (zkRunner != null) {
 677  0
         boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf);
 678  0
         int totalWorkers = conf.getMapTasks();
 679  
         // A Yarn job always spawns MAX_WORKERS + 1 containers
 680  0
         if (isYarnJob) {
 681  0
           totalWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0) + 1;
 682  
         }
 683  0
         LOG.info("offlineZooKeeperServers: Will wait for " +
 684  
             totalWorkers + " tasks");
 685  0
         waitUntilAllTasksDone(totalWorkers);
 686  0
         zkRunner.stop();
 687  
         File zkDirFile;
 688  
         try {
 689  0
           zkDirFile = new File(zkDir);
 690  0
           FileUtils.deleteDirectory(zkDirFile);
 691  0
         } catch (IOException e) {
 692  0
           LOG.warn("offlineZooKeeperSevers: " +
 693  
                   "IOException, but continuing",
 694  
               e);
 695  0
         }
 696  0
         if (LOG.isInfoEnabled()) {
 697  0
           LOG.info("offlineZooKeeperServers: deleted directory " + zkDir);
 698  
         }
 699  0
         zkRunner = null;
 700  
       }
 701  0
     }
 702  0
   }
 703  
 
 704  
   /**
 705  
    * Create appropriate zookeeper wrapper depending on configuration.
 706  
    * Zookeeper can run in master process or outside as a separate
 707  
    * java process.
 708  
    *
 709  
    * @return either in process or out of process wrapper.
 710  
    */
 711  
   private ZooKeeperRunner createRunner() {
 712  0
     ZooKeeperRunner runner = new InProcessZooKeeperRunner();
 713  0
     runner.setConf(conf);
 714  0
     return runner;
 715  
   }
 716  
 
 717  
   /**
 718  
    *  Is this task running a ZooKeeper server?  Only could be true if called
 719  
    *  after onlineZooKeeperServers().
 720  
    *
 721  
    *  @return true if running a ZooKeeper server, false otherwise
 722  
    */
 723  
   public boolean runsZooKeeper() {
 724  0
     synchronized (this) {
 725  0
       return zkRunner != null;
 726  0
     }
 727  
   }
 728  
 
 729  
   /**
 730  
    * Mark files zookeeper creates in hdfs to be deleted on exit.
 731  
    * To be called on master, since it's the last one who finishes.
 732  
    */
 733  
   public void cleanupOnExit() {
 734  
     try {
 735  0
       fs.deleteOnExit(baseDirectory);
 736  0
     } catch (IOException e) {
 737  0
       LOG.error("cleanupOnExit: Failed to delete on exit " + baseDirectory);
 738  0
     }
 739  0
   }
 740  
 
 741  
   /**
 742  
    * Do necessary cleanup in zookeeper wrapper.
 743  
    */
 744  
   public void cleanup() {
 745  0
     synchronized (this) {
 746  0
       if (zkRunner != null) {
 747  0
         zkRunner.cleanup();
 748  
       }
 749  0
     }
 750  0
   }
 751  
 }