Coverage Report - org.apache.giraph.bsp.BspService
 
Classes in this File Line Coverage Branch Coverage Complexity
BspService
0%
0/305
0%
0/110
3.298
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.bsp;
 20  
 
 21  
 import org.apache.giraph.conf.GiraphConstants;
 22  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 23  
 import org.apache.giraph.graph.GraphTaskManager;
 24  
 import org.apache.giraph.job.JobProgressTracker;
 25  
 import org.apache.giraph.partition.GraphPartitionerFactory;
 26  
 import org.apache.giraph.utils.CheckpointingUtils;
 27  
 import org.apache.giraph.worker.WorkerInfo;
 28  
 import org.apache.giraph.writable.kryo.GiraphClassResolver;
 29  
 import org.apache.giraph.zk.BspEvent;
 30  
 import org.apache.giraph.zk.PredicateLock;
 31  
 import org.apache.giraph.zk.ZooKeeperExt;
 32  
 import org.apache.giraph.zk.ZooKeeperManager;
 33  
 import org.apache.hadoop.fs.FileSystem;
 34  
 import org.apache.hadoop.io.Writable;
 35  
 import org.apache.hadoop.io.WritableComparable;
 36  
 import org.apache.hadoop.mapreduce.Mapper;
 37  
 import org.apache.log4j.Logger;
 38  
 import org.apache.zookeeper.CreateMode;
 39  
 import org.apache.zookeeper.KeeperException;
 40  
 import org.apache.zookeeper.WatchedEvent;
 41  
 import org.apache.zookeeper.Watcher;
 42  
 import org.apache.zookeeper.Watcher.Event.EventType;
 43  
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 44  
 import org.apache.zookeeper.ZooDefs.Ids;
 45  
 import org.json.JSONException;
 46  
 import org.json.JSONObject;
 47  
 
 48  
 import java.io.IOException;
 49  
 import java.net.UnknownHostException;
 50  
 import java.nio.charset.Charset;
 51  
 import java.util.ArrayList;
 52  
 import java.util.Collections;
 53  
 import java.util.List;
 54  
 
 55  
 import static org.apache.giraph.conf.GiraphConstants.RESTART_JOB_ID;
 56  
 
 57  
 /**
 58  
  * Zookeeper-based implementation of {@link CentralizedService}.
 59  
  *
 60  
  * @param <I> Vertex id
 61  
  * @param <V> Vertex data
 62  
  * @param <E> Edge data
 63  
  */
 64  
 @SuppressWarnings("rawtypes")
 65  
 public abstract class BspService<I extends WritableComparable,
 66  
     V extends Writable, E extends Writable>
 67  
     implements Watcher, CentralizedService<I, V, E> {
 68  
   /** Unset superstep */
 69  
   public static final long UNSET_SUPERSTEP = Long.MIN_VALUE;
 70  
   /** Input superstep (superstep when loading the vertices happens) */
 71  
   public static final long INPUT_SUPERSTEP = -1;
 72  
   /** Unset application attempt */
 73  
   public static final long UNSET_APPLICATION_ATTEMPT = Long.MIN_VALUE;
 74  
   /** Base ZooKeeper directory */
 75  
   public static final String BASE_DIR = "/_hadoopBsp";
 76  
   /** Master job state znode above base dir */
 77  
   public static final String MASTER_JOB_STATE_NODE = "/_masterJobState";
 78  
 
 79  
   /** Input splits worker done directory */
 80  
   public static final String INPUT_SPLITS_WORKER_DONE_DIR =
 81  
       "/_inputSplitsWorkerDoneDir";
 82  
   /** Input splits all done node*/
 83  
   public static final String INPUT_SPLITS_ALL_DONE_NODE =
 84  
       "/_inputSplitsAllDone";
 85  
   /** Directory to store kryo className-ID assignment */
 86  
   public static final String KRYO_REGISTERED_CLASS_DIR =
 87  
           "/_kryo";
 88  
   /** Directory of attempts of this application */
 89  
   public static final String APPLICATION_ATTEMPTS_DIR =
 90  
       "/_applicationAttemptsDir";
 91  
   /** Where the master election happens */
 92  
   public static final String MASTER_ELECTION_DIR = "/_masterElectionDir";
 93  
   /** Superstep scope */
 94  
   public static final String SUPERSTEP_DIR = "/_superstepDir";
 95  
   /** Counter sub directory */
 96  
   public static final String COUNTERS_DIR = "/_counters";
 97  
   /** Metrics sub directory */
 98  
   public static final String METRICS_DIR = "/_metrics";
 99  
   /** Healthy workers register here. */
 100  
   public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir";
 101  
   /** Unhealthy workers register here. */
 102  
   public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir";
 103  
   /** Workers which wrote checkpoint notify here */
 104  
   public static final String WORKER_WROTE_CHECKPOINT_DIR =
 105  
       "/_workerWroteCheckpointDir";
 106  
   /** Finished workers notify here */
 107  
   public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir";
 108  
   /** Helps coordinate the partition exchanges */
 109  
   public static final String PARTITION_EXCHANGE_DIR =
 110  
       "/_partitionExchangeDir";
 111  
   /** Denotes that the superstep is done */
 112  
   public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
 113  
   /** Denotes that computation should be halted */
 114  
   public static final String HALT_COMPUTATION_NODE = "/_haltComputation";
 115  
   /** Memory observer dir */
 116  
   public static final String MEMORY_OBSERVER_DIR = "/_memoryObserver";
 117  
   /** User sets this flag to checkpoint and stop the job */
 118  
   public static final String FORCE_CHECKPOINT_USER_FLAG = "/_checkpointAndStop";
 119  
   /** Denotes which workers have been cleaned up */
 120  
   public static final String CLEANED_UP_DIR = "/_cleanedUpDir";
 121  
   /** JSON message count key */
 122  
   public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey";
 123  
   /** JSON message bytes count key */
 124  
   public static final String JSONOBJ_NUM_MESSAGE_BYTES_KEY = "_numMsgBytesKey";
 125  
   /** JSON metrics key */
 126  
   public static final String JSONOBJ_METRICS_KEY = "_metricsKey";
 127  
 
 128  
   /** JSON state key */
 129  
   public static final String JSONOBJ_STATE_KEY = "_stateKey";
 130  
   /** JSON application attempt key */
 131  
   public static final String JSONOBJ_APPLICATION_ATTEMPT_KEY =
 132  
       "_applicationAttemptKey";
 133  
   /** JSON superstep key */
 134  
   public static final String JSONOBJ_SUPERSTEP_KEY =
 135  
       "_superstepKey";
 136  
   /** Suffix denotes a worker */
 137  
   public static final String WORKER_SUFFIX = "_worker";
 138  
   /** Suffix denotes a master */
 139  
   public static final String MASTER_SUFFIX = "_master";
 140  
 
 141  
   /** Class logger */
 142  0
   private static final Logger LOG = Logger.getLogger(BspService.class);
 143  
   /** Path to the job's root */
 144  
   protected final String basePath;
 145  
   /** Path to the job state determined by the master (informative only) */
 146  
   protected final String masterJobStatePath;
 147  
   /** Input splits worker done directory */
 148  
   protected final String inputSplitsWorkerDonePath;
 149  
   /** Input splits all done node */
 150  
   protected final String inputSplitsAllDonePath;
 151  
   /** Path to the application attempts */
 152  
   protected final String applicationAttemptsPath;
 153  
   /** Path to the cleaned up notifications */
 154  
   protected final String cleanedUpPath;
 155  
   /** Path to the checkpoint's root (including job id) */
 156  
   protected final String checkpointBasePath;
 157  
   /** Old checkpoint in case we want to restart some job */
 158  
   protected final String savedCheckpointBasePath;
 159  
   /** Path to the master election path */
 160  
   protected final String masterElectionPath;
 161  
   /** If this path exists computation will be halted */
 162  
   protected final String haltComputationPath;
 163  
   /** Path where memory observer stores data */
 164  
   protected final String memoryObserverPath;
 165  
   /** Kryo className-ID mapping directory */
 166  
   protected final String kryoRegisteredClassPath;
 167  
   /** Private ZooKeeper instance that implements the service */
 168  
   private final ZooKeeperExt zk;
 169  
   /** Has the Connection occurred? */
 170  
   private final BspEvent connectedEvent;
 171  
   /** Has worker registration changed (either healthy or unhealthy) */
 172  
   private final BspEvent workerHealthRegistrationChanged;
 173  
   /** Application attempt changed */
 174  
   private final BspEvent applicationAttemptChanged;
 175  
   /** Input splits worker done */
 176  
   private final BspEvent inputSplitsWorkerDoneEvent;
 177  
   /** Input splits all done */
 178  
   private final BspEvent inputSplitsAllDoneEvent;
 179  
   /** Superstep finished synchronization */
 180  
   private final BspEvent superstepFinished;
 181  
   /** Master election changed for any waited on attempt */
 182  
   private final BspEvent masterElectionChildrenChanged;
 183  
   /** Cleaned up directory children changed*/
 184  
   private final BspEvent cleanedUpChildrenChanged;
 185  
   /** Event to synchronize when workers have written their counters to the
 186  
    * zookeeper*/
 187  
   private final BspEvent writtenCountersToZK;
 188  
   /** Registered list of BspEvents */
 189  0
   private final List<BspEvent> registeredBspEvents =
 190  
       new ArrayList<BspEvent>();
 191  
   /** Immutable configuration of the job*/
 192  
   private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
 193  
   /** Job context (mainly for progress) */
 194  
   private final Mapper<?, ?, ?, ?>.Context context;
 195  
   /** Cached superstep (from ZooKeeper) */
 196  0
   private long cachedSuperstep = UNSET_SUPERSTEP;
 197  
   /** Restarted from a checkpoint (manual or automatic) */
 198  0
   private long restartedSuperstep = UNSET_SUPERSTEP;
 199  
   /** Cached application attempt (from ZooKeeper) */
 200  0
   private long cachedApplicationAttempt = UNSET_APPLICATION_ATTEMPT;
 201  
   /** Job id, to ensure uniqueness */
 202  
   private final String jobId;
 203  
   /** Task id, from partition and application attempt to ensure uniqueness */
 204  
   private final int taskId;
 205  
   /** My hostname */
 206  
   private final String hostname;
 207  
   /** Combination of hostname '_' task (unique id) */
 208  
   private final String hostnameTaskId;
 209  
   /** Graph partitioner */
 210  
   private final GraphPartitionerFactory<I, V, E> graphPartitionerFactory;
 211  
   /** Mapper that will do the graph computation */
 212  
   private final GraphTaskManager<I, V, E> graphTaskManager;
 213  
   /** File system */
 214  
   private final FileSystem fs;
 215  
 
 216  
   /**
 217  
    * Constructor.
 218  
    *
 219  
    * @param context Mapper context
 220  
    * @param graphTaskManager GraphTaskManager for this compute node
 221  
    */
 222  
   public BspService(
 223  
       Mapper<?, ?, ?, ?>.Context context,
 224  0
       GraphTaskManager<I, V, E> graphTaskManager) {
 225  0
     this.connectedEvent = new PredicateLock(context);
 226  0
     this.workerHealthRegistrationChanged = new PredicateLock(context);
 227  0
     this.applicationAttemptChanged = new PredicateLock(context);
 228  0
     this.inputSplitsWorkerDoneEvent = new PredicateLock(context);
 229  0
     this.inputSplitsAllDoneEvent = new PredicateLock(context);
 230  0
     this.superstepFinished = new PredicateLock(context);
 231  0
     this.masterElectionChildrenChanged = new PredicateLock(context);
 232  0
     this.cleanedUpChildrenChanged = new PredicateLock(context);
 233  0
     this.writtenCountersToZK = new PredicateLock(context);
 234  
 
 235  0
     registerBspEvent(connectedEvent);
 236  0
     registerBspEvent(workerHealthRegistrationChanged);
 237  0
     registerBspEvent(inputSplitsWorkerDoneEvent);
 238  0
     registerBspEvent(inputSplitsAllDoneEvent);
 239  0
     registerBspEvent(applicationAttemptChanged);
 240  0
     registerBspEvent(superstepFinished);
 241  0
     registerBspEvent(masterElectionChildrenChanged);
 242  0
     registerBspEvent(cleanedUpChildrenChanged);
 243  0
     registerBspEvent(writtenCountersToZK);
 244  
 
 245  0
     this.context = context;
 246  0
     this.graphTaskManager = graphTaskManager;
 247  0
     this.conf = graphTaskManager.getConf();
 248  
 
 249  0
     this.jobId = conf.getJobId();
 250  0
     this.restartedSuperstep = conf.getLong(
 251  
         GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP);
 252  
     try {
 253  0
       this.hostname = conf.getLocalHostname();
 254  0
     } catch (UnknownHostException e) {
 255  0
       throw new RuntimeException(e);
 256  0
     }
 257  0
     this.graphPartitionerFactory = conf.createGraphPartitioner();
 258  
 
 259  0
     basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
 260  0
     if (LOG.isInfoEnabled()) {
 261  0
       LOG.info(String.format("%s: %s",
 262  
               GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP, basePath));
 263  
     }
 264  0
     masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
 265  0
     inputSplitsWorkerDonePath = basePath + INPUT_SPLITS_WORKER_DONE_DIR;
 266  0
     inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
 267  0
     applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
 268  0
     cleanedUpPath = basePath + CLEANED_UP_DIR;
 269  0
     kryoRegisteredClassPath = basePath + KRYO_REGISTERED_CLASS_DIR;
 270  
 
 271  
 
 272  0
     String restartJobId = RESTART_JOB_ID.get(conf);
 273  
 
 274  0
     savedCheckpointBasePath =
 275  0
         CheckpointingUtils.getCheckpointBasePath(getConfiguration(),
 276  0
             restartJobId == null ? getJobId() : restartJobId);
 277  
 
 278  0
     checkpointBasePath = CheckpointingUtils.
 279  0
         getCheckpointBasePath(getConfiguration(), getJobId());
 280  
 
 281  0
     masterElectionPath = basePath + MASTER_ELECTION_DIR;
 282  0
     String serverPortList = graphTaskManager.getZookeeperList();
 283  0
     haltComputationPath = basePath + HALT_COMPUTATION_NODE;
 284  0
     memoryObserverPath = basePath + MEMORY_OBSERVER_DIR;
 285  0
     getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP,
 286  
         haltComputationPath);
 287  0
     if (LOG.isInfoEnabled()) {
 288  0
       LOG.info("BspService: Path to create to halt is " + haltComputationPath);
 289  
     }
 290  0
     if (LOG.isInfoEnabled()) {
 291  0
       LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
 292  0
           ", partition " + conf.getTaskPartition() + " on " + serverPortList);
 293  
     }
 294  
     try {
 295  0
       this.zk = new ZooKeeperExt(serverPortList,
 296  0
                                  conf.getZooKeeperSessionTimeout(),
 297  0
                                  conf.getZookeeperOpsMaxAttempts(),
 298  0
                                  conf.getZookeeperOpsRetryWaitMsecs(),
 299  
                                  this,
 300  
                                  context);
 301  0
       connectedEvent.waitForTimeoutOrFail(
 302  0
           GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(conf));
 303  0
       this.fs = FileSystem.get(getConfiguration());
 304  0
     } catch (IOException e) {
 305  0
       throw new RuntimeException(e);
 306  0
     }
 307  
 
 308  0
     boolean disableGiraphResolver =
 309  0
             GiraphConstants.DISABLE_GIRAPH_CLASS_RESOLVER.get(conf);
 310  0
     if (!disableGiraphResolver) {
 311  0
       GiraphClassResolver.setZookeeperInfo(zk, kryoRegisteredClassPath);
 312  
     }
 313  0
     this.taskId = (int) getApplicationAttempt() * conf.getMaxWorkers() +
 314  0
             conf.getTaskPartition();
 315  0
     this.hostnameTaskId = hostname + "_" + getTaskId();
 316  
 
 317  
     //Trying to restart from the latest superstep
 318  0
     if (restartJobId != null &&
 319  
         restartedSuperstep == UNSET_SUPERSTEP) {
 320  
       try {
 321  0
         restartedSuperstep = getLastCheckpointedSuperstep();
 322  0
       } catch (IOException e) {
 323  0
         throw new RuntimeException(e);
 324  0
       }
 325  
     }
 326  0
     this.cachedSuperstep = restartedSuperstep;
 327  0
     if ((restartedSuperstep != UNSET_SUPERSTEP) &&
 328  
         (restartedSuperstep < 0)) {
 329  0
       throw new IllegalArgumentException(
 330  
           "BspService: Invalid superstep to restart - " +
 331  
               restartedSuperstep);
 332  
     }
 333  0
   }
 334  
 
 335  
   /**
 336  
    * Get the superstep from a ZooKeeper path
 337  
    *
 338  
    * @param path Path to parse for the superstep
 339  
    * @return Superstep from the path.
 340  
    */
 341  
   public static long getSuperstepFromPath(String path) {
 342  0
     int foundSuperstepStart = path.indexOf(SUPERSTEP_DIR);
 343  0
     if (foundSuperstepStart == -1) {
 344  0
       throw new IllegalArgumentException(
 345  
           "getSuperstepFromPath: Cannot find " + SUPERSTEP_DIR +
 346  
           "from " + path);
 347  
     }
 348  0
     foundSuperstepStart += SUPERSTEP_DIR.length() + 1;
 349  0
     int endIndex = foundSuperstepStart +
 350  0
         path.substring(foundSuperstepStart).indexOf("/");
 351  0
     if (endIndex == -1) {
 352  0
       throw new IllegalArgumentException(
 353  
           "getSuperstepFromPath: Cannot find end of superstep from " +
 354  
               path);
 355  
     }
 356  0
     if (LOG.isTraceEnabled()) {
 357  0
       LOG.trace("getSuperstepFromPath: Got path=" + path +
 358  
           ", start=" + foundSuperstepStart + ", end=" + endIndex);
 359  
     }
 360  0
     return Long.parseLong(path.substring(foundSuperstepStart, endIndex));
 361  
   }
 362  
 
 363  
   /**
 364  
    * Get the hostname and id from a "healthy" worker path
 365  
    *
 366  
    * @param path Path to check
 367  
    * @return Hostname and id from path
 368  
    */
 369  
   public static String getHealthyHostnameIdFromPath(String path) {
 370  0
     int foundWorkerHealthyStart = path.indexOf(WORKER_HEALTHY_DIR);
 371  0
     if (foundWorkerHealthyStart == -1) {
 372  0
       throw new IllegalArgumentException(
 373  
           "getHealthyHostnameidFromPath: Couldn't find " +
 374  
               WORKER_HEALTHY_DIR + " from " + path);
 375  
     }
 376  0
     foundWorkerHealthyStart += WORKER_HEALTHY_DIR.length();
 377  0
     return path.substring(foundWorkerHealthyStart);
 378  
   }
 379  
 
 380  
   /**
 381  
    * Generate the base superstep directory path for a given application
 382  
    * attempt
 383  
    *
 384  
    * @param attempt application attempt number
 385  
    * @return directory path based on the an attempt
 386  
    */
 387  
   public final String getSuperstepPath(long attempt) {
 388  0
     return applicationAttemptsPath + "/" + attempt + SUPERSTEP_DIR;
 389  
   }
 390  
 
 391  
   /**
 392  
    * Generate the worker information "healthy" directory path for a
 393  
    * superstep
 394  
    *
 395  
    * @param attempt application attempt number
 396  
    * @param superstep superstep to use
 397  
    * @return directory path based on the a superstep
 398  
    */
 399  
   public final String getWorkerInfoHealthyPath(long attempt,
 400  
       long superstep) {
 401  0
     return applicationAttemptsPath + "/" + attempt +
 402  
         SUPERSTEP_DIR + "/" + superstep + WORKER_HEALTHY_DIR;
 403  
   }
 404  
 
 405  
   /**
 406  
    * Generate the worker information "unhealthy" directory path for a
 407  
    * superstep
 408  
    *
 409  
    * @param attempt application attempt number
 410  
    * @param superstep superstep to use
 411  
    * @return directory path based on the a superstep
 412  
    */
 413  
   public final String getWorkerInfoUnhealthyPath(long attempt,
 414  
       long superstep) {
 415  0
     return applicationAttemptsPath + "/" + attempt +
 416  
         SUPERSTEP_DIR + "/" + superstep + WORKER_UNHEALTHY_DIR;
 417  
   }
 418  
 
 419  
   /**
 420  
    * Generate the worker "wrote checkpoint" directory path for a
 421  
    * superstep
 422  
    *
 423  
    * @param attempt application attempt number
 424  
    * @param superstep superstep to use
 425  
    * @return directory path based on the a superstep
 426  
    */
 427  
   public final String getWorkerWroteCheckpointPath(long attempt,
 428  
       long superstep) {
 429  0
     return applicationAttemptsPath + "/" + attempt +
 430  
         SUPERSTEP_DIR + "/" + superstep + WORKER_WROTE_CHECKPOINT_DIR;
 431  
   }
 432  
 
 433  
   /**
 434  
    * Generate the worker "finished" directory path for a
 435  
    * superstep, for storing the superstep-related metrics
 436  
    *
 437  
    * @param attempt application attempt number
 438  
    * @param superstep superstep to use
 439  
    * @return directory path based on the a superstep
 440  
    */
 441  
   public final String getWorkerMetricsFinishedPath(
 442  
           long attempt, long superstep) {
 443  0
     return applicationAttemptsPath + "/" + attempt +
 444  
             SUPERSTEP_DIR + "/" + superstep + WORKER_FINISHED_DIR + METRICS_DIR;
 445  
   }
 446  
 
 447  
   /**
 448  
    * Generate the worker "finished" directory path for a
 449  
    * superstep, for storing the superstep-related counters
 450  
    *
 451  
    * @param attempt application attempt number
 452  
    * @param superstep superstep to use
 453  
    * @return directory path based on the a superstep
 454  
    */
 455  
   public final String getWorkerCountersFinishedPath(
 456  
           long attempt, long superstep) {
 457  0
     return applicationAttemptsPath + "/" + attempt +
 458  
             SUPERSTEP_DIR + "/" + superstep +
 459  
             WORKER_FINISHED_DIR + COUNTERS_DIR;
 460  
   }
 461  
 
 462  
   /**
 463  
    * Generate the "partition exchange" directory path for a superstep
 464  
    *
 465  
    * @param attempt application attempt number
 466  
    * @param superstep superstep to use
 467  
    * @return directory path based on the a superstep
 468  
    */
 469  
   public final String getPartitionExchangePath(long attempt,
 470  
       long superstep) {
 471  0
     return applicationAttemptsPath + "/" + attempt +
 472  
         SUPERSTEP_DIR + "/" + superstep + PARTITION_EXCHANGE_DIR;
 473  
   }
 474  
 
 475  
   /**
 476  
    * Based on the superstep, worker info, and attempt, get the appropriate
 477  
    * worker path for the exchange.
 478  
    *
 479  
    * @param attempt Application attempt
 480  
    * @param superstep Superstep
 481  
    * @param workerInfo Worker info of the exchange.
 482  
    * @return Path of the desired worker
 483  
    */
 484  
   public final String getPartitionExchangeWorkerPath(long attempt,
 485  
       long superstep,
 486  
       WorkerInfo workerInfo) {
 487  0
     return getPartitionExchangePath(attempt, superstep) +
 488  0
         "/" + workerInfo.getHostnameId();
 489  
   }
 490  
 
 491  
   /**
 492  
    * Generate the "superstep finished" directory path for a superstep
 493  
    *
 494  
    * @param attempt application attempt number
 495  
    * @param superstep superstep to use
 496  
    * @return directory path based on the a superstep
 497  
    */
 498  
   public final String getSuperstepFinishedPath(long attempt, long superstep) {
 499  0
     return applicationAttemptsPath + "/" + attempt +
 500  
         SUPERSTEP_DIR + "/" + superstep + SUPERSTEP_FINISHED_NODE;
 501  
   }
 502  
 
 503  
   /**
 504  
    * Generate the base superstep directory path for a given application
 505  
    * attempt
 506  
    *
 507  
    * @param superstep Superstep to use
 508  
    * @return Directory path based on the a superstep
 509  
    */
 510  
   public final String getCheckpointBasePath(long superstep) {
 511  0
     return checkpointBasePath + "/" + superstep;
 512  
   }
 513  
 
 514  
   /**
 515  
    * In case when we restart another job this will give us a path
 516  
    * to saved checkpoint.
 517  
    * @param superstep superstep to use
 518  
    * @return Direcory path for restarted job based on the superstep
 519  
    */
 520  
   public final String getSavedCheckpointBasePath(long superstep) {
 521  0
     return savedCheckpointBasePath + "/" + superstep;
 522  
   }
 523  
 
 524  
 
 525  
   /**
 526  
    * Get the ZooKeeperExt instance.
 527  
    *
 528  
    * @return ZooKeeperExt instance.
 529  
    */
 530  
   public final ZooKeeperExt getZkExt() {
 531  0
     return zk;
 532  
   }
 533  
 
 534  
   @Override
 535  
   public final long getRestartedSuperstep() {
 536  0
     return restartedSuperstep;
 537  
   }
 538  
 
 539  
   /**
 540  
    * Set the restarted superstep
 541  
    *
 542  
    * @param superstep Set the manually restarted superstep
 543  
    */
 544  
   public final void setRestartedSuperstep(long superstep) {
 545  0
     if (superstep < INPUT_SUPERSTEP) {
 546  0
       throw new IllegalArgumentException(
 547  
           "setRestartedSuperstep: Bad argument " + superstep);
 548  
     }
 549  0
     restartedSuperstep = superstep;
 550  0
   }
 551  
 
 552  
   /**
 553  
    * Get the file system
 554  
    *
 555  
    * @return file system
 556  
    */
 557  
   public final FileSystem getFs() {
 558  0
     return fs;
 559  
   }
 560  
 
 561  
   public final ImmutableClassesGiraphConfiguration<I, V, E>
 562  
   getConfiguration() {
 563  0
     return conf;
 564  
   }
 565  
 
 566  
   public final Mapper<?, ?, ?, ?>.Context getContext() {
 567  0
     return context;
 568  
   }
 569  
 
 570  
   public final String getHostname() {
 571  0
     return hostname;
 572  
   }
 573  
 
 574  
   public final String getHostnameTaskId() {
 575  0
     return hostnameTaskId;
 576  
   }
 577  
 
 578  
   public final int getTaskId() {
 579  0
     return taskId;
 580  
   }
 581  
 
 582  
   public final GraphTaskManager<I, V, E> getGraphTaskManager() {
 583  0
     return graphTaskManager;
 584  
   }
 585  
 
 586  
   public final BspEvent getWorkerHealthRegistrationChangedEvent() {
 587  0
     return workerHealthRegistrationChanged;
 588  
   }
 589  
 
 590  
   public final BspEvent getApplicationAttemptChangedEvent() {
 591  0
     return applicationAttemptChanged;
 592  
   }
 593  
 
 594  
   public final BspEvent getInputSplitsWorkerDoneEvent() {
 595  0
     return inputSplitsWorkerDoneEvent;
 596  
   }
 597  
 
 598  
   public final BspEvent getInputSplitsAllDoneEvent() {
 599  0
     return inputSplitsAllDoneEvent;
 600  
   }
 601  
 
 602  
   public final BspEvent getSuperstepFinishedEvent() {
 603  0
     return superstepFinished;
 604  
   }
 605  
 
 606  
 
 607  
   public final BspEvent getMasterElectionChildrenChangedEvent() {
 608  0
     return masterElectionChildrenChanged;
 609  
   }
 610  
 
 611  
   public final BspEvent getCleanedUpChildrenChangedEvent() {
 612  0
     return cleanedUpChildrenChanged;
 613  
   }
 614  
 
 615  
   public final BspEvent getWrittenCountersToZKEvent() {
 616  0
     return writtenCountersToZK;
 617  
   }
 618  
 
 619  
   /**
 620  
    * Get the master commanded job state as a JSONObject.  Also sets the
 621  
    * watches to see if the master commanded job state changes.
 622  
    *
 623  
    * @return Last job state or null if none
 624  
    */
 625  
   public final JSONObject getJobState() {
 626  
     try {
 627  0
       getZkExt().createExt(masterJobStatePath,
 628  
           null,
 629  
           Ids.OPEN_ACL_UNSAFE,
 630  
           CreateMode.PERSISTENT,
 631  
           true);
 632  0
     } catch (KeeperException.NodeExistsException e) {
 633  0
       LOG.info("getJobState: Job state already exists (" +
 634  
           masterJobStatePath + ")");
 635  0
     } catch (KeeperException e) {
 636  0
       throw new IllegalStateException("Failed to create job state path " +
 637  
           "due to KeeperException", e);
 638  0
     } catch (InterruptedException e) {
 639  0
       throw new IllegalStateException("Failed to create job state path " +
 640  
           "due to InterruptedException", e);
 641  0
     }
 642  0
     String jobState = null;
 643  
     try {
 644  0
       List<String> childList =
 645  0
           getZkExt().getChildrenExt(
 646  
               masterJobStatePath, true, true, true);
 647  0
       if (childList.isEmpty()) {
 648  0
         return null;
 649  
       }
 650  0
       jobState =
 651  0
           new String(getZkExt().getData(childList.get(childList.size() - 1),
 652  0
               true, null), Charset.defaultCharset());
 653  0
     } catch (KeeperException.NoNodeException e) {
 654  0
       LOG.info("getJobState: Job state path is empty! - " +
 655  
           masterJobStatePath);
 656  0
     } catch (KeeperException e) {
 657  0
       throw new IllegalStateException("Failed to get job state path " +
 658  
           "children due to KeeperException", e);
 659  0
     } catch (InterruptedException e) {
 660  0
       throw new IllegalStateException("Failed to get job state path " +
 661  
           "children due to InterruptedException", e);
 662  0
     }
 663  
     try {
 664  0
       return new JSONObject(jobState);
 665  0
     } catch (JSONException e) {
 666  0
       throw new RuntimeException(
 667  
           "getJobState: Failed to parse job state " + jobState);
 668  
     }
 669  
   }
 670  
 
 671  
   /**
 672  
    * Get the job id
 673  
    *
 674  
    * @return job id
 675  
    */
 676  
   public final String getJobId() {
 677  0
     return jobId;
 678  
   }
 679  
 
 680  
   /**
 681  
    * Get the latest application attempt and cache it.
 682  
    *
 683  
    * @return the latest application attempt
 684  
    */
 685  
   public final long getApplicationAttempt() {
 686  0
     if (cachedApplicationAttempt != UNSET_APPLICATION_ATTEMPT) {
 687  0
       return cachedApplicationAttempt;
 688  
     }
 689  
     try {
 690  0
       getZkExt().createExt(applicationAttemptsPath,
 691  
           null,
 692  
           Ids.OPEN_ACL_UNSAFE,
 693  
           CreateMode.PERSISTENT,
 694  
           true);
 695  0
     } catch (KeeperException.NodeExistsException e) {
 696  0
       LOG.info("getApplicationAttempt: Node " +
 697  
           applicationAttemptsPath + " already exists!");
 698  0
     } catch (KeeperException e) {
 699  0
       throw new IllegalStateException("Couldn't create application " +
 700  
           "attempts path due to KeeperException", e);
 701  0
     } catch (InterruptedException e) {
 702  0
       throw new IllegalStateException("Couldn't create application " +
 703  
           "attempts path due to InterruptedException", e);
 704  0
     }
 705  
     try {
 706  0
       List<String> attemptList =
 707  0
           getZkExt().getChildrenExt(
 708  
               applicationAttemptsPath, true, false, false);
 709  0
       if (attemptList.isEmpty()) {
 710  0
         cachedApplicationAttempt = 0;
 711  
       } else {
 712  0
         cachedApplicationAttempt =
 713  0
             Long.parseLong(Collections.max(attemptList));
 714  
       }
 715  0
     } catch (KeeperException e) {
 716  0
       throw new IllegalStateException("Couldn't get application " +
 717  
           "attempts to KeeperException", e);
 718  0
     } catch (InterruptedException e) {
 719  0
       throw new IllegalStateException("Couldn't get application " +
 720  
           "attempts to InterruptedException", e);
 721  0
     }
 722  
 
 723  0
     return cachedApplicationAttempt;
 724  
   }
 725  
 
 726  
   /**
 727  
    * Get the latest superstep and cache it.
 728  
    *
 729  
    * @return the latest superstep
 730  
    */
 731  
   public final long getSuperstep() {
 732  0
     if (cachedSuperstep != UNSET_SUPERSTEP) {
 733  0
       return cachedSuperstep;
 734  
     }
 735  0
     String superstepPath = getSuperstepPath(getApplicationAttempt());
 736  
     try {
 737  0
       getZkExt().createExt(superstepPath,
 738  
           null,
 739  
           Ids.OPEN_ACL_UNSAFE,
 740  
           CreateMode.PERSISTENT,
 741  
           true);
 742  0
     } catch (KeeperException.NodeExistsException e) {
 743  0
       if (LOG.isInfoEnabled()) {
 744  0
         LOG.info("getApplicationAttempt: Node " +
 745  
             applicationAttemptsPath + " already exists!");
 746  
       }
 747  0
     } catch (KeeperException e) {
 748  0
       throw new IllegalStateException(
 749  
           "getSuperstep: KeeperException", e);
 750  0
     } catch (InterruptedException e) {
 751  0
       throw new IllegalStateException(
 752  
           "getSuperstep: InterruptedException", e);
 753  0
     }
 754  
 
 755  
     List<String> superstepList;
 756  
     try {
 757  0
       superstepList =
 758  0
           getZkExt().getChildrenExt(superstepPath, true, false, false);
 759  0
     } catch (KeeperException e) {
 760  0
       throw new IllegalStateException(
 761  
           "getSuperstep: KeeperException", e);
 762  0
     } catch (InterruptedException e) {
 763  0
       throw new IllegalStateException(
 764  
           "getSuperstep: InterruptedException", e);
 765  0
     }
 766  0
     if (superstepList.isEmpty()) {
 767  0
       cachedSuperstep = INPUT_SUPERSTEP;
 768  
     } else {
 769  0
       cachedSuperstep =
 770  0
           Long.parseLong(Collections.max(superstepList));
 771  
     }
 772  
 
 773  0
     return cachedSuperstep;
 774  
   }
 775  
 
 776  
   /**
 777  
    * Increment the cached superstep.  Shouldn't be the initial value anymore.
 778  
    */
 779  
   public final void incrCachedSuperstep() {
 780  0
     if (cachedSuperstep == UNSET_SUPERSTEP) {
 781  0
       throw new IllegalStateException(
 782  
           "incrSuperstep: Invalid unset cached superstep " +
 783  
               UNSET_SUPERSTEP);
 784  
     }
 785  0
     ++cachedSuperstep;
 786  0
   }
 787  
 
 788  
   /**
 789  
    * Set the cached superstep (should only be used for loading checkpoints
 790  
    * or recovering from failure).
 791  
    *
 792  
    * @param superstep will be used as the next superstep iteration
 793  
    */
 794  
   public final void setCachedSuperstep(long superstep) {
 795  0
     cachedSuperstep = superstep;
 796  0
   }
 797  
 
 798  
   /**
 799  
    * Set the cached application attempt (should only be used for restart from
 800  
    * failure by the master)
 801  
    *
 802  
    * @param applicationAttempt Will denote the new application attempt
 803  
    */
 804  
   public final void setApplicationAttempt(long applicationAttempt) {
 805  0
     cachedApplicationAttempt = applicationAttempt;
 806  0
     String superstepPath = getSuperstepPath(cachedApplicationAttempt);
 807  
     try {
 808  0
       getZkExt().createExt(superstepPath,
 809  
           null,
 810  
           Ids.OPEN_ACL_UNSAFE,
 811  
           CreateMode.PERSISTENT,
 812  
           true);
 813  0
     } catch (KeeperException.NodeExistsException e) {
 814  0
       throw new IllegalArgumentException(
 815  
           "setApplicationAttempt: Attempt already exists! - " +
 816  
               superstepPath, e);
 817  0
     } catch (KeeperException e) {
 818  0
       throw new RuntimeException(
 819  
           "setApplicationAttempt: KeeperException - " +
 820  
               superstepPath, e);
 821  0
     } catch (InterruptedException e) {
 822  0
       throw new RuntimeException(
 823  
           "setApplicationAttempt: InterruptedException - " +
 824  
               superstepPath, e);
 825  0
     }
 826  0
   }
 827  
 
 828  
   /**
 829  
    * Register a BspEvent.  Ensure that it will be signaled
 830  
    * by catastrophic failure so that threads waiting on an event signal
 831  
    * will be unblocked.
 832  
    *
 833  
    * @param event Event to be registered.
 834  
    */
 835  
   public void registerBspEvent(BspEvent event) {
 836  0
     registeredBspEvents.add(event);
 837  0
   }
 838  
 
 839  
   /**
 840  
    * Subclasses can use this to instantiate their respective partitioners
 841  
    *
 842  
    * @return Instantiated graph partitioner factory
 843  
    */
 844  
   protected GraphPartitionerFactory<I, V, E> getGraphPartitionerFactory() {
 845  0
     return graphPartitionerFactory;
 846  
   }
 847  
 
 848  
   /**
 849  
    * Derived classes that want additional ZooKeeper events to take action
 850  
    * should override this.
 851  
    *
 852  
    * @param event Event that occurred
 853  
    * @return true if the event was processed here, false otherwise
 854  
    */
 855  
   protected boolean processEvent(WatchedEvent event) {
 856  0
     return false;
 857  
   }
 858  
 
 859  
   @Override
 860  
   public final void process(WatchedEvent event) {
 861  
     // 1. Process all shared events
 862  
     // 2. Process specific derived class events
 863  0
     if (LOG.isDebugEnabled()) {
 864  0
       LOG.debug("process: Got a new event, path = " + event.getPath() +
 865  0
           ", type = " + event.getType() + ", state = " +
 866  0
           event.getState());
 867  
     }
 868  
 
 869  0
     if ((event.getPath() == null) && (event.getType() == EventType.None)) {
 870  0
       if (event.getState() == KeeperState.Disconnected) {
 871  
         // Watches may not be triggered for some time, so signal all BspEvents
 872  0
         for (BspEvent bspEvent : registeredBspEvents) {
 873  0
           bspEvent.signal();
 874  0
         }
 875  0
         LOG.warn("process: Disconnected from ZooKeeper (will automatically " +
 876  
             "try to recover) " + event);
 877  0
       } else if (event.getState() == KeeperState.SyncConnected) {
 878  0
         if (LOG.isInfoEnabled()) {
 879  0
           LOG.info("process: Asynchronous connection complete.");
 880  
         }
 881  0
         connectedEvent.signal();
 882  
       } else {
 883  0
         LOG.warn("process: Got unknown null path event " + event);
 884  
       }
 885  0
       return;
 886  
     }
 887  
 
 888  0
     boolean eventProcessed = false;
 889  0
     if (event.getPath().startsWith(masterJobStatePath)) {
 890  
       // This will cause all becomeMaster() MasterThreads to notice the
 891  
       // change in job state and quit trying to become the master.
 892  0
       masterElectionChildrenChanged.signal();
 893  0
       eventProcessed = true;
 894  0
     } else if ((event.getPath().contains(WORKER_HEALTHY_DIR) ||
 895  0
         event.getPath().contains(WORKER_UNHEALTHY_DIR)) &&
 896  0
         (event.getType() == EventType.NodeChildrenChanged)) {
 897  0
       if (LOG.isDebugEnabled()) {
 898  0
         LOG.debug("process: workerHealthRegistrationChanged " +
 899  
             "(worker health reported - healthy/unhealthy )");
 900  
       }
 901  0
       workerHealthRegistrationChanged.signal();
 902  0
       eventProcessed = true;
 903  0
     } else if (event.getPath().contains(INPUT_SPLITS_ALL_DONE_NODE) &&
 904  0
         event.getType() == EventType.NodeCreated) {
 905  0
       if (LOG.isInfoEnabled()) {
 906  0
         LOG.info("process: all input splits done");
 907  
       }
 908  0
       inputSplitsAllDoneEvent.signal();
 909  0
       eventProcessed = true;
 910  0
     } else if (event.getPath().contains(INPUT_SPLITS_WORKER_DONE_DIR) &&
 911  0
         event.getType() == EventType.NodeChildrenChanged) {
 912  0
       if (LOG.isDebugEnabled()) {
 913  0
         LOG.debug("process: worker done reading input splits");
 914  
       }
 915  0
       inputSplitsWorkerDoneEvent.signal();
 916  0
       eventProcessed = true;
 917  0
     } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
 918  0
         event.getType() == EventType.NodeCreated) {
 919  0
       if (LOG.isInfoEnabled()) {
 920  0
         LOG.info("process: superstepFinished signaled");
 921  
       }
 922  0
       superstepFinished.signal();
 923  0
       eventProcessed = true;
 924  0
     } else if (event.getPath().endsWith(applicationAttemptsPath) &&
 925  0
         event.getType() == EventType.NodeChildrenChanged) {
 926  0
       if (LOG.isInfoEnabled()) {
 927  0
         LOG.info("process: applicationAttemptChanged signaled");
 928  
       }
 929  0
       applicationAttemptChanged.signal();
 930  0
       eventProcessed = true;
 931  0
     } else if (event.getPath().contains(MASTER_ELECTION_DIR) &&
 932  0
         event.getType() == EventType.NodeChildrenChanged) {
 933  0
       if (LOG.isInfoEnabled()) {
 934  0
         LOG.info("process: masterElectionChildrenChanged signaled");
 935  
       }
 936  0
       masterElectionChildrenChanged.signal();
 937  0
       eventProcessed = true;
 938  0
     } else if (event.getPath().equals(cleanedUpPath) &&
 939  0
         event.getType() == EventType.NodeChildrenChanged) {
 940  0
       if (LOG.isInfoEnabled()) {
 941  0
         LOG.info("process: cleanedUpChildrenChanged signaled");
 942  
       }
 943  0
       cleanedUpChildrenChanged.signal();
 944  0
       eventProcessed = true;
 945  0
     } else if (event.getPath().endsWith(COUNTERS_DIR) &&
 946  0
             event.getType() == EventType.NodeChildrenChanged) {
 947  0
       LOG.info("process: writtenCountersToZK signaled");
 948  0
       getWrittenCountersToZKEvent().signal();
 949  0
       eventProcessed = true;
 950  
     }
 951  
 
 952  0
     if (!(processEvent(event)) && (!eventProcessed)) {
 953  0
       LOG.warn("process: Unknown and unprocessed event (path=" +
 954  0
           event.getPath() + ", type=" + event.getType() +
 955  0
           ", state=" + event.getState() + ")");
 956  
     }
 957  0
   }
 958  
 
 959  
   /**
 960  
    * Get the last saved superstep.
 961  
    *
 962  
    * @return Last good superstep number
 963  
    * @throws IOException
 964  
    */
 965  
   protected long getLastCheckpointedSuperstep() throws IOException {
 966  0
     return CheckpointingUtils.getLastCheckpointedSuperstep(getFs(),
 967  
         savedCheckpointBasePath);
 968  
   }
 969  
 
 970  
   @Override
 971  
   public JobProgressTracker getJobProgressTracker() {
 972  0
     return getGraphTaskManager().getJobProgressTracker();
 973  
   }
 974  
 
 975  
 
 976  
   /**
 977  
    * For every worker this method returns unique number
 978  
    * between 0 and N, where N is the total number of workers.
 979  
    * This number stays the same throughout the computation.
 980  
    * TaskID may be different from this number and task ID
 981  
    * is not necessarily continuous
 982  
    * @param workerInfo worker info object
 983  
    * @return worker number
 984  
    */
 985  
   protected int getWorkerId(WorkerInfo workerInfo) {
 986  0
     return getWorkerInfoList().indexOf(workerInfo);
 987  
   }
 988  
 
 989  
   /**
 990  
    * Returns worker info corresponding to specified worker id.
 991  
    * @param id unique worker id
 992  
    * @return WorkerInfo
 993  
    */
 994  
   protected WorkerInfo getWorkerInfoById(int id) {
 995  0
     return getWorkerInfoList().get(id);
 996  
   }
 997  
 }