Coverage Report - org.apache.giraph.master.BspServiceMaster
 
Classes in this File    Line Coverage     Branch Coverage    Complexity
BspServiceMaster        0%  (0/1036)      0%  (0/322)        0
BspServiceMaster$1      0%  (0/2)         N/A                0
BspServiceMaster$2      0%  (0/2)         N/A                0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.master;
 20  
 
 21  
 import com.google.common.collect.Lists;
 22  
 import com.google.common.collect.Sets;
 23  
 import net.iharder.Base64;
 24  
 import org.apache.commons.io.FilenameUtils;
 25  
 import org.apache.giraph.bsp.ApplicationState;
 26  
 import org.apache.giraph.bsp.BspInputFormat;
 27  
 import org.apache.giraph.bsp.BspService;
 28  
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 29  
 import org.apache.giraph.bsp.SuperstepState;
 30  
 import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
 31  
 import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
 32  
 import org.apache.giraph.comm.MasterClient;
 33  
 import org.apache.giraph.comm.MasterServer;
 34  
 import org.apache.giraph.comm.netty.NettyClient;
 35  
 import org.apache.giraph.comm.netty.NettyMasterClient;
 36  
 import org.apache.giraph.comm.netty.NettyMasterServer;
 37  
 import org.apache.giraph.comm.requests.AddressesAndPartitionsRequest;
 38  
 import org.apache.giraph.conf.GiraphConfiguration;
 39  
 import org.apache.giraph.conf.GiraphConstants;
 40  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 41  
 import org.apache.giraph.counters.CustomCounter;
 42  
 import org.apache.giraph.counters.CustomCounters;
 43  
 import org.apache.giraph.counters.GiraphCountersThriftStruct;
 44  
 import org.apache.giraph.counters.GiraphStats;
 45  
 import org.apache.giraph.counters.GiraphTimers;
 46  
 import org.apache.giraph.graph.AddressesAndPartitionsWritable;
 47  
 import org.apache.giraph.graph.GlobalStats;
 48  
 import org.apache.giraph.graph.GraphFunctions;
 49  
 import org.apache.giraph.graph.GraphState;
 50  
 import org.apache.giraph.graph.GraphTaskManager;
 51  
 import org.apache.giraph.io.EdgeInputFormat;
 52  
 import org.apache.giraph.io.GiraphInputFormat;
 53  
 import org.apache.giraph.io.InputType;
 54  
 import org.apache.giraph.io.MappingInputFormat;
 55  
 import org.apache.giraph.io.VertexInputFormat;
 56  
 import org.apache.giraph.master.input.MasterInputSplitsHandler;
 57  
 import org.apache.giraph.metrics.AggregatedMetrics;
 58  
 import org.apache.giraph.metrics.GiraphMetrics;
 59  
 import org.apache.giraph.metrics.GiraphTimer;
 60  
 import org.apache.giraph.metrics.GiraphTimerContext;
 61  
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 62  
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 63  
 import org.apache.giraph.metrics.WorkerSuperstepMetrics;
 64  
 import org.apache.giraph.partition.BasicPartitionOwner;
 65  
 import org.apache.giraph.partition.MasterGraphPartitioner;
 66  
 import org.apache.giraph.partition.PartitionOwner;
 67  
 import org.apache.giraph.partition.PartitionStats;
 68  
 import org.apache.giraph.partition.PartitionUtils;
 69  
 import org.apache.giraph.time.SystemTime;
 70  
 import org.apache.giraph.time.Time;
 71  
 import org.apache.giraph.utils.CheckpointingUtils;
 72  
 import org.apache.giraph.utils.JMapHistoDumper;
 73  
 import org.apache.giraph.utils.ReactiveJMapHistoDumper;
 74  
 import org.apache.giraph.utils.ReflectionUtils;
 75  
 import org.apache.giraph.utils.WritableUtils;
 76  
 import org.apache.giraph.worker.WorkerInfo;
 77  
 import org.apache.giraph.zk.BspEvent;
 78  
 import org.apache.giraph.zk.PredicateLock;
 79  
 import org.apache.hadoop.fs.FSDataOutputStream;
 80  
 import org.apache.hadoop.fs.FileSystem;
 81  
 import org.apache.hadoop.fs.Path;
 82  
 import org.apache.hadoop.io.Writable;
 83  
 import org.apache.hadoop.io.WritableComparable;
 84  
 import org.apache.hadoop.mapred.JobID;
 85  
 import org.apache.hadoop.mapred.RunningJob;
 86  
 import org.apache.hadoop.mapreduce.Counter;
 87  
 import org.apache.hadoop.mapreduce.InputSplit;
 88  
 import org.apache.hadoop.mapreduce.Mapper;
 89  
 import org.apache.log4j.Logger;
 90  
 import org.apache.zookeeper.CreateMode;
 91  
 import org.apache.zookeeper.KeeperException;
 92  
 import org.apache.zookeeper.WatchedEvent;
 93  
 import org.apache.zookeeper.Watcher.Event.EventType;
 94  
 import org.apache.zookeeper.ZooDefs.Ids;
 95  
 import org.json.JSONArray;
 96  
 import org.json.JSONException;
 97  
 import org.json.JSONObject;
 98  
 
 99  
 import java.io.DataInputStream;
 100  
 import java.io.IOException;
 101  
 import java.io.PrintStream;
 102  
 import java.nio.charset.Charset;
 103  
 import java.util.ArrayList;
 104  
 import java.util.Collection;
 105  
 import java.util.Collections;
 106  
 import java.util.Comparator;
 107  
 import java.util.HashSet;
 108  
 import java.util.List;
 109  
 import java.util.Map;
 110  
 import java.util.Set;
 111  
 import java.util.TreeSet;
 112  
 import java.util.concurrent.TimeUnit;
 113  
 
 114  
 import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
 115  
 import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
 116  
 import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
 117  
 
 118  
 /**
 119  
  * ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
 120  
  *
 121  
  * @param <I> Vertex id
 122  
  * @param <V> Vertex data
 123  
  * @param <E> Edge data
 124  
  */
 125  
@SuppressWarnings({"rawtypes", "unchecked"})
 126  
 public class BspServiceMaster<I extends WritableComparable,
 127  
     V extends Writable, E extends Writable>
 128  
     extends BspService<I, V, E>
 129  
     implements CentralizedServiceMaster<I, V, E>,
 130  
     ResetSuperstepMetricsObserver {
 131  
  /** Print worker names only if there are at most 10 workers left */
 132  
   public static final int MAX_PRINTABLE_REMAINING_WORKERS = 10;
 133  
  /** How many threads to use when writing input splits to zookeeper */
 134  
   public static final String NUM_MASTER_ZK_INPUT_SPLIT_THREADS =
 135  
       "giraph.numMasterZkInputSplitThreads";
 136  
   /** Default number of threads to use when writing input splits to zookeeper */
 137  
   public static final int DEFAULT_INPUT_SPLIT_THREAD_COUNT = 1;
 138  
   /** Time instance to use for timing */
 139  0
   private static final Time TIME = SystemTime.get();
 140  
   /** Class logger */
 141  0
   private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
 142  
   /** Am I the master? */
 143  0
   private boolean isMaster = false;
 144  
   /** Max number of workers */
 145  
   private final int maxWorkers;
 146  
   /** Min number of workers */
 147  
   private final int minWorkers;
 148  
   /** Max number of supersteps */
 149  
   private final int maxNumberOfSupersteps;
 150  
   /** Min % responded workers */
 151  
   private final float minPercentResponded;
 152  
   /** Msecs to wait for an event */
 153  
   private final int eventWaitMsecs;
 154  
   /** Max msecs to wait for a superstep to get enough workers */
 155  
   private final int maxSuperstepWaitMsecs;
 156  
   /** Max msecs to wait for the workers to write their counters for a
 157  
   * superstep */
 158  
   private final int maxCounterWaitMsecs;
 159  
   /** Min number of long tails before printing */
 160  
   private final int partitionLongTailMinPrint;
 161  
   /** Last finalized checkpoint */
 162  0
   private long lastCheckpointedSuperstep = -1;
 163  
   /** Worker wrote checkpoint */
 164  
   private final BspEvent workerWroteCheckpoint;
 165  
   /** State of the superstep changed */
 166  
   private final BspEvent superstepStateChanged;
 167  
   /** Master graph partitioner */
 168  
   private final MasterGraphPartitioner<I, V, E> masterGraphPartitioner;
 169  
   /** All the partition stats from the last superstep */
 170  0
   private final List<PartitionStats> allPartitionStatsList =
 171  
       new ArrayList<PartitionStats>();
 172  
   /** Handler for global communication */
 173  
   private MasterGlobalCommHandler globalCommHandler;
 174  
   /** Handler for aggregators to reduce/broadcast translation */
 175  
   private AggregatorToGlobalCommTranslation aggregatorTranslation;
 176  
   /** Master class */
 177  
   private MasterCompute masterCompute;
 178  
   /** IPC Client */
 179  
   private MasterClient masterClient;
 180  
   /** IPC Server */
 181  
   private MasterServer masterServer;
 182  
   /** Master info */
 183  
   private MasterInfo masterInfo;
 184  
   /** List of workers in current superstep, sorted by task id */
 185  0
   private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList();
 186  
   /** Observers over master lifecycle. */
 187  
   private final MasterObserver[] observers;
 188  
 
 189  
   // Per-Superstep Metrics
 190  
   /** MasterCompute time */
 191  
   private GiraphTimer masterComputeTimer;
 192  
 
 193  
   /** Checkpoint frequency */
 194  
   private final int checkpointFrequency;
 195  
   /** Current checkpoint status */
 196  
   private CheckpointStatus checkpointStatus;
 197  
   /** Checks if checkpointing supported */
 198  
   private final CheckpointSupportedChecker checkpointSupportedChecker;
 199  
   /** Thrift struct to store the aggregate counters */
 200  
   private final GiraphCountersThriftStruct giraphCountersThriftStruct;
 201  
 
 202  
   /**
 203  
    * Constructor for setting up the master.
 204  
    *
 205  
    * @param context Mapper context
 206  
    * @param graphTaskManager GraphTaskManager for this compute node
 207  
    */
 208  
   public BspServiceMaster(
 209  
       Mapper<?, ?, ?, ?>.Context context,
 210  
       GraphTaskManager<I, V, E> graphTaskManager) {
 211  0
     super(context, graphTaskManager);
 212  0
     workerWroteCheckpoint = new PredicateLock(context);
 213  0
     registerBspEvent(workerWroteCheckpoint);
 214  0
     superstepStateChanged = new PredicateLock(context);
 215  0
     registerBspEvent(superstepStateChanged);
 216  
 
 217  0
     ImmutableClassesGiraphConfiguration<I, V, E> conf =
 218  0
         getConfiguration();
 219  
 
 220  0
     maxWorkers = conf.getMaxWorkers();
 221  0
     minWorkers = conf.getMinWorkers();
 222  0
     maxNumberOfSupersteps = conf.getMaxNumberOfSupersteps();
 223  0
     minPercentResponded = GiraphConstants.MIN_PERCENT_RESPONDED.get(conf);
 224  0
     eventWaitMsecs = conf.getEventWaitMsecs();
 225  0
     maxSuperstepWaitMsecs = conf.getMaxMasterSuperstepWaitMsecs();
 226  0
     maxCounterWaitMsecs = conf.getMaxCounterWaitMsecs();
 227  0
     partitionLongTailMinPrint = PARTITION_LONG_TAIL_MIN_PRINT.get(conf);
 228  0
     masterGraphPartitioner =
 229  0
         getGraphPartitionerFactory().createMasterGraphPartitioner();
 230  0
     if (conf.isJMapHistogramDumpEnabled()) {
 231  0
       conf.addMasterObserverClass(JMapHistoDumper.class);
 232  
     }
 233  0
     if (conf.isReactiveJmapHistogramDumpEnabled()) {
 234  0
       conf.addMasterObserverClass(ReactiveJMapHistoDumper.class);
 235  
     }
 236  0
     observers = conf.createMasterObservers(context);
 237  
 
 238  0
     this.checkpointFrequency = conf.getCheckpointFrequency();
 239  0
     this.checkpointStatus = CheckpointStatus.NONE;
 240  0
     this.checkpointSupportedChecker =
 241  0
         ReflectionUtils.newInstance(
 242  0
             GiraphConstants.CHECKPOINT_SUPPORTED_CHECKER.get(conf));
 243  0
     this.giraphCountersThriftStruct = new GiraphCountersThriftStruct();
 244  
 
 245  0
     GiraphMetrics.get().addSuperstepResetObserver(this);
 246  0
     GiraphStats.init(context);
 247  0
   }
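
  // A hedged sketch of the driver-side configuration consumed by this
  // constructor: setWorkerConfiguration() is GiraphConfiguration's helper for
  // the min/max worker counts and minimum response percentage read above. The
  // values shown are illustrative assumptions, not defaults.
  private static GiraphConfiguration exampleWorkerRequirements() {
    GiraphConfiguration conf = new GiraphConfiguration();
    // Require exactly 10 workers and a 100% health-registration response rate.
    conf.setWorkerConfiguration(10, 10, 100.0f);
    return conf;
  }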
 248  
 
 249  
   @Override
 250  
   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
 251  0
     masterComputeTimer = new GiraphTimer(superstepMetrics,
 252  
         "master-compute-call", TimeUnit.MILLISECONDS);
 253  0
   }
 254  
 
 255  
   @Override
 256  
   public void setJobState(ApplicationState state,
 257  
       long applicationAttempt,
 258  
       long desiredSuperstep) {
 259  0
     setJobState(state, applicationAttempt, desiredSuperstep, true);
 260  0
   }
 261  
 
 262  
   /**
 263  
    * Set the job state.
 264  
    *
 265  
    * @param state State of the application.
 266  
    * @param applicationAttempt Attempt to start on
 267  
    * @param desiredSuperstep Superstep to restart from (if applicable)
 268  
    * @param killJobOnFailure if true, and the desired state is FAILED,
 269  
    *                         then kill this job.
 270  
    */
 271  
   private void setJobState(ApplicationState state,
 272  
       long applicationAttempt,
 273  
       long desiredSuperstep,
 274  
       boolean killJobOnFailure) {
 275  0
     JSONObject jobState = new JSONObject();
 276  
     try {
 277  0
       jobState.put(JSONOBJ_STATE_KEY, state.toString());
 278  0
       jobState.put(JSONOBJ_APPLICATION_ATTEMPT_KEY, applicationAttempt);
 279  0
       jobState.put(JSONOBJ_SUPERSTEP_KEY, desiredSuperstep);
 280  0
     } catch (JSONException e) {
 281  0
       throw new RuntimeException("setJobState: Couldn't put " +
 282  0
           state.toString());
 283  0
     }
 284  0
     if (LOG.isInfoEnabled()) {
 285  0
       LOG.info("setJobState: " + jobState.toString() + " on superstep " +
 286  0
           getSuperstep());
 287  
     }
 288  
     try {
 289  0
       getZkExt().createExt(masterJobStatePath + "/jobState",
 290  0
           jobState.toString().getBytes(Charset.defaultCharset()),
 291  
           Ids.OPEN_ACL_UNSAFE,
 292  
           CreateMode.PERSISTENT_SEQUENTIAL,
 293  
           true);
 294  0
       LOG.info("setJobState: " + jobState);
 295  0
     } catch (KeeperException.NodeExistsException e) {
 296  0
       throw new IllegalStateException(
 297  
           "setJobState: Imposible that " +
 298  
               masterJobStatePath + " already exists!", e);
 299  0
     } catch (KeeperException e) {
 300  0
       throw new IllegalStateException(
 301  
           "setJobState: Unknown KeeperException for " +
 302  
               masterJobStatePath, e);
 303  0
     } catch (InterruptedException e) {
 304  0
       throw new IllegalStateException(
 305  
           "setJobState: Unknown InterruptedException for " +
 306  
               masterJobStatePath, e);
 307  0
     }
 308  0
     if (state == ApplicationState.FAILED && killJobOnFailure) {
 309  0
       failJob(new IllegalStateException("FAILED"));
 310  
     }
 311  
 
 312  0
   }
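
  // A hedged sketch of how a reader of the jobState znode written above can
  // recover the application state: the payload is the JSON object built in
  // setJobState(), so the same JSONOBJ_* keys apply. The byte[] parameter is an
  // illustrative stand-in for the znode data.
  private static ApplicationState exampleParseJobState(byte[] znodeData)
    throws JSONException {
    JSONObject jobState =
        new JSONObject(new String(znodeData, Charset.defaultCharset()));
    // JSONOBJ_APPLICATION_ATTEMPT_KEY and JSONOBJ_SUPERSTEP_KEY carry the
    // attempt and the superstep to restart from, and are read the same way.
    return ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY));
  }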
 313  
 
 314  
   /**
 315  
    * Set the job state to FAILED. This will kill the job, and log exceptions to
 316  
    * any observers.
 317  
    *
 318  
    * @param reason The reason the job failed
 319  
    */
 320  
   private void setJobStateFailed(String reason) {
 321  0
     getGraphTaskManager().getJobProgressTracker().logFailure(reason);
 322  0
     setJobState(ApplicationState.FAILED, -1, -1, false);
 323  0
     failJob(new IllegalStateException(reason));
 324  0
   }
 325  
 
 326  
   /**
 327  
    * Common method for generating vertex/edge input splits.
 328  
    *
 329  
    * @param inputFormat The vertex/edge input format
 330  
    * @param minSplitCountHint Minimum number of splits to create (hint)
 331  
    * @param inputSplitType Type of input splits (for logging purposes)
 332  
    * @return List of input splits for the given format
 333  
    */
 334  
   private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat,
 335  
                                                int minSplitCountHint,
 336  
                                                InputType inputSplitType) {
 337  0
     String logPrefix = "generate" + inputSplitType + "InputSplits";
 338  
     List<InputSplit> splits;
 339  
     try {
 340  0
       splits = inputFormat.getSplits(getContext(), minSplitCountHint);
 341  0
     } catch (IOException e) {
 342  0
       throw new IllegalStateException(logPrefix + ": Got IOException", e);
 343  0
     } catch (InterruptedException e) {
 344  0
       throw new IllegalStateException(
 345  
           logPrefix + ": Got InterruptedException", e);
 346  0
     }
 347  0
     float samplePercent =
 348  0
         INPUT_SPLIT_SAMPLE_PERCENT.get(getConfiguration());
 349  0
     if (samplePercent != INPUT_SPLIT_SAMPLE_PERCENT.getDefaultValue()) {
 350  0
       int lastIndex = (int) (samplePercent * splits.size() / 100f);
 351  0
       Collections.shuffle(splits);
 352  0
       List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
 353  0
       LOG.warn(logPrefix + ": Using sampling - Processing only " +
 354  0
           sampleSplits.size() + " instead of " + splits.size() +
 355  
           " expected splits.");
 356  0
       return sampleSplits;
 357  
     } else {
 358  0
       if (LOG.isInfoEnabled()) {
 359  0
         LOG.info(logPrefix + ": Got " + splits.size() +
 360  
             " input splits for " + minSplitCountHint + " input threads");
 361  
       }
 362  0
       return splits;
 363  
     }
 364  
   }
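
  // A hedged sketch of the sampling arithmetic above: with
  // INPUT_SPLIT_SAMPLE_PERCENT set to 20 and 200 generated splits, the shuffled
  // list is cut down to (int) (20 * 200 / 100f) = 40 splits. The numbers are
  // illustrative; with the default value no sampling is applied.
  private static int exampleSampledSplitCount(float samplePercent,
      int totalSplits) {
    return (int) (samplePercent * totalSplits / 100f);
  }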
 365  
 
 366  
   /**
 367  
   * When the job cannot be salvaged, fail it.
 368  
    *
 369  
    * @param e Exception to log to observers
 370  
    */
 371  
   private void failJob(Exception e) {
 372  0
     LOG.fatal("failJob: Killing job " + getJobId());
 373  0
     LOG.fatal("failJob: exception " + e.toString());
 374  
     try {
 375  0
       if (getConfiguration().isPureYarnJob()) {
 376  0
         throw new RuntimeException(
 377  
           "BspServiceMaster (YARN profile) is " +
 378  
           "FAILING this task, throwing exception to end job run.", e);
 379  
       } else {
 380  
         @SuppressWarnings("deprecation")
 381  0
         org.apache.hadoop.mapred.JobClient jobClient =
 382  
           new org.apache.hadoop.mapred.JobClient(
 383  
             (org.apache.hadoop.mapred.JobConf)
 384  0
             getContext().getConfiguration());
 385  
         try {
 386  
           @SuppressWarnings("deprecation")
 387  0
           JobID jobId = JobID.forName(getJobId());
 388  0
           RunningJob job = jobClient.getJob(jobId);
 389  0
           if (job != null) {
 390  0
             job.killJob();
 391  
           } else {
 392  0
             LOG.error("Job not found for jobId=" + getJobId());
 393  
           }
 394  0
         } catch (IllegalArgumentException iae) {
 395  0
           LOG.info("This job (" + getJobId() +
 396  
                        ") is not a legacy Hadoop job and will " +
 397  
                        "continue with failure cleanup." +
 398  0
                        e.getMessage(),
 399  
                    e);
 400  0
         }
 401  
       }
 402  0
     } catch (IOException ioe) {
 403  0
       throw new RuntimeException(ioe);
 404  
     } finally {
 405  0
       failureCleanup(e);
 406  0
     }
 407  0
   }
 408  
 
 409  
   /**
 410  
    * Parse the {@link WorkerInfo} objects from a ZooKeeper path
 411  
    * (and children).
 412  
    *
 413  
    * @param workerInfosPath Path where all the workers are children
 414  
    * @param watch Watch or not?
 415  
    * @return List of workers in that path
 416  
    */
 417  
   private List<WorkerInfo> getWorkerInfosFromPath(String workerInfosPath,
 418  
       boolean watch) {
 419  0
     List<WorkerInfo> workerInfoList = new ArrayList<WorkerInfo>();
 420  
     List<String> workerInfoPathList;
 421  
     try {
 422  0
       workerInfoPathList =
 423  0
           getZkExt().getChildrenExt(workerInfosPath, watch, false, true);
 424  0
     } catch (KeeperException e) {
 425  0
       throw new IllegalStateException(
 426  
           "getWorkers: Got KeeperException", e);
 427  0
     } catch (InterruptedException e) {
 428  0
       throw new IllegalStateException(
 429  
           "getWorkers: Got InterruptedStateException", e);
 430  0
     }
 431  0
     for (String workerInfoPath : workerInfoPathList) {
 432  0
       WorkerInfo workerInfo = new WorkerInfo();
 433  
       try {
 434  0
         WritableUtils.readFieldsFromZnode(
 435  0
             getZkExt(), workerInfoPath, true, null, workerInfo);
 436  0
         workerInfoList.add(workerInfo);
 437  0
       } catch (IllegalStateException e) {
 438  0
         LOG.warn("Can't get info from worker, did it die in between? " +
 439  
             "workerInfoPath=" + workerInfoPath, e);
 440  0
       }
 441  0
     }
 442  0
     return workerInfoList;
 443  
   }
 444  
 
 445  
   /**
 446  
    * Get the healthy and unhealthy {@link WorkerInfo} objects for
 447  
    * a superstep
 448  
    *
 449  
    * @param superstep superstep to check
 450  
    * @param healthyWorkerInfoList filled in with current data
 451  
    * @param unhealthyWorkerInfoList filled in with current data
 452  
    */
 453  
   private void getAllWorkerInfos(
 454  
       long superstep,
 455  
       List<WorkerInfo> healthyWorkerInfoList,
 456  
       List<WorkerInfo> unhealthyWorkerInfoList) {
 457  0
     String healthyWorkerInfoPath =
 458  0
         getWorkerInfoHealthyPath(getApplicationAttempt(), superstep);
 459  0
     String unhealthyWorkerInfoPath =
 460  0
         getWorkerInfoUnhealthyPath(getApplicationAttempt(), superstep);
 461  
 
 462  
     try {
 463  0
       getZkExt().createOnceExt(healthyWorkerInfoPath,
 464  
           null,
 465  
           Ids.OPEN_ACL_UNSAFE,
 466  
           CreateMode.PERSISTENT,
 467  
           true);
 468  0
     } catch (KeeperException e) {
 469  0
       throw new IllegalStateException("getWorkers: KeeperException", e);
 470  0
     } catch (InterruptedException e) {
 471  0
       throw new IllegalStateException("getWorkers: InterruptedException", e);
 472  0
     }
 473  
 
 474  
     try {
 475  0
       getZkExt().createOnceExt(unhealthyWorkerInfoPath,
 476  
           null,
 477  
           Ids.OPEN_ACL_UNSAFE,
 478  
           CreateMode.PERSISTENT,
 479  
           true);
 480  0
     } catch (KeeperException e) {
 481  0
       throw new IllegalStateException("getWorkers: KeeperException", e);
 482  0
     } catch (InterruptedException e) {
 483  0
       throw new IllegalStateException("getWorkers: InterruptedException", e);
 484  0
     }
 485  
 
 486  0
     List<WorkerInfo> currentHealthyWorkerInfoList =
 487  0
         getWorkerInfosFromPath(healthyWorkerInfoPath, true);
 488  0
     List<WorkerInfo> currentUnhealthyWorkerInfoList =
 489  0
         getWorkerInfosFromPath(unhealthyWorkerInfoPath, false);
 490  
 
 491  0
     healthyWorkerInfoList.clear();
 492  0
     if (currentHealthyWorkerInfoList != null) {
 493  
       for (WorkerInfo healthyWorkerInfo :
 494  0
         currentHealthyWorkerInfoList) {
 495  0
         healthyWorkerInfoList.add(healthyWorkerInfo);
 496  0
       }
 497  
     }
 498  
 
 499  0
     unhealthyWorkerInfoList.clear();
 500  0
     if (currentUnhealthyWorkerInfoList != null) {
 501  
       for (WorkerInfo unhealthyWorkerInfo :
 502  0
         currentUnhealthyWorkerInfoList) {
 503  0
         unhealthyWorkerInfoList.add(unhealthyWorkerInfo);
 504  0
       }
 505  
     }
 506  0
   }
 507  
 
 508  
   @Override
 509  
   public List<WorkerInfo> checkWorkers() {
 510  0
     boolean failJob = true;
 511  
     long failWorkerCheckMsecs =
 512  0
         SystemTime.get().getMilliseconds() + maxSuperstepWaitMsecs;
 513  0
     List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>();
 514  0
     List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>();
 515  0
     int totalResponses = -1;
 516  0
     while (SystemTime.get().getMilliseconds() < failWorkerCheckMsecs) {
 517  0
       getContext().progress();
 518  0
       getAllWorkerInfos(
 519  0
           getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList);
 520  0
       totalResponses = healthyWorkerInfoList.size() +
 521  0
           unhealthyWorkerInfoList.size();
 522  0
       if ((totalResponses * 100.0f / maxWorkers) >=
 523  
           minPercentResponded) {
 524  0
         failJob = false;
 525  0
         break;
 526  
       }
 527  0
       getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " " +
 528  
           "checkWorkers: Only found " +
 529  
           totalResponses +
 530  
           " responses of " + maxWorkers +
 531  
           " needed to start superstep " +
 532  0
           getSuperstep());
 533  0
       if (getWorkerHealthRegistrationChangedEvent().waitMsecs(
 534  
           eventWaitMsecs)) {
 535  0
         if (LOG.isDebugEnabled()) {
 536  0
           LOG.debug("checkWorkers: Got event that health " +
 537  
               "registration changed, not using poll attempt");
 538  
         }
 539  0
         getWorkerHealthRegistrationChangedEvent().reset();
 540  0
         continue;
 541  
       }
 542  0
       if (LOG.isInfoEnabled()) {
 543  0
         LOG.info("checkWorkers: Only found " + totalResponses +
 544  
             " responses of " + maxWorkers +
 545  
             " needed to start superstep " +
 546  0
             getSuperstep() + ".  Reporting every " +
 547  
             eventWaitMsecs + " msecs, " +
 548  0
             (failWorkerCheckMsecs - SystemTime.get().getMilliseconds()) +
 549  
             " more msecs left before giving up.");
 550  
         // Find the missing workers if there are only a few
 551  0
         if ((maxWorkers - totalResponses) <=
 552  
             partitionLongTailMinPrint) {
 553  0
           logMissingWorkersOnSuperstep(healthyWorkerInfoList,
 554  
               unhealthyWorkerInfoList);
 555  
         }
 556  
       }
 557  
     }
 558  0
     if (failJob) {
 559  0
       LOG.error("checkWorkers: Did not receive enough processes in " +
 560  
           "time (only " + totalResponses + " of " +
 561  
           minWorkers + " required) after waiting " + maxSuperstepWaitMsecs +
 562  
           "msecs).  This occurs if you do not have enough map tasks " +
 563  
           "available simultaneously on your Hadoop instance to fulfill " +
 564  
           "the number of requested workers.");
 565  0
       return null;
 566  
     }
 567  
 
 568  0
     if (healthyWorkerInfoList.size() < minWorkers) {
 569  0
       LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() +
 570  
           " available when " + minWorkers + " are required.");
 571  0
       logMissingWorkersOnSuperstep(healthyWorkerInfoList,
 572  
           unhealthyWorkerInfoList);
 573  0
       return null;
 574  
     }
 575  
 
 576  0
     getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " " +
 577  
         "checkWorkers: Done - Found " + totalResponses +
 578  
         " responses of " + maxWorkers + " needed to start superstep " +
 579  0
         getSuperstep());
 580  
 
 581  0
     return healthyWorkerInfoList;
 582  
   }
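
  // A hedged sketch of the response threshold applied in the wait loop above:
  // with maxWorkers = 200 and minPercentResponded = 95.0f, at least 190 health
  // registrations must be seen before checkWorkers() stops waiting. The
  // parameter values are illustrative.
  private static boolean exampleEnoughWorkersResponded(int totalResponses,
      int maxWorkers, float minPercentResponded) {
    return (totalResponses * 100.0f / maxWorkers) >= minPercentResponded;
  }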
 583  
 
 584  
   /**
 585  
    * Log info level of the missing workers on the superstep
 586  
    *
 587  
    * @param healthyWorkerInfoList Healthy worker list
 588  
    * @param unhealthyWorkerInfoList Unhealthy worker list
 589  
    */
 590  
   private void logMissingWorkersOnSuperstep(
 591  
       List<WorkerInfo> healthyWorkerInfoList,
 592  
       List<WorkerInfo> unhealthyWorkerInfoList) {
 593  0
     if (LOG.isInfoEnabled()) {
 594  0
       Set<Integer> partitionSet = new TreeSet<Integer>();
 595  0
       for (WorkerInfo workerInfo : healthyWorkerInfoList) {
 596  0
         partitionSet.add(workerInfo.getTaskId() % maxWorkers);
 597  0
       }
 598  0
       for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
 599  0
         partitionSet.add(workerInfo.getTaskId() % maxWorkers);
 600  0
       }
 601  0
       for (int i = 1; i <= maxWorkers; ++i) {
 602  0
         if (partitionSet.contains(Integer.valueOf(i))) {
 603  0
           continue;
 604  0
         } else if (i == getTaskId() % maxWorkers) {
 605  0
           continue;
 606  
         } else {
 607  0
           LOG.info("logMissingWorkersOnSuperstep: No response from " +
 608  
               "partition " + i + " (could be master)");
 609  
         }
 610  
       }
 611  
     }
 612  0
   }
 613  
 
 614  
   /**
 615  
    * Common method for creating vertex/edge input splits.
 616  
    *
 617  
    * @param inputFormat The vertex/edge input format
 618  
    * @param inputSplitType Type of input split (for logging purposes)
 619  
    * @return Number of splits. Returns -1 on failure to create
 620  
    *         valid input splits.
 621  
    */
 622  
   private int createInputSplits(GiraphInputFormat inputFormat,
 623  
                                 InputType inputSplitType) {
 624  0
     ImmutableClassesGiraphConfiguration conf = getConfiguration();
 625  0
     String logPrefix = "create" + inputSplitType + "InputSplits";
 626  
     // Only the 'master' should be doing this.  Wait until the number of
 627  
     // processes that have reported health exceeds the minimum percentage.
 628  
     // If the minimum percentage is not met, fail the job.  Otherwise
 629  
     // generate the input splits
 630  0
     List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
 631  0
     if (healthyWorkerInfoList == null) {
 632  0
       setJobStateFailed("Not enough healthy workers to create input splits");
 633  0
       return -1;
 634  
     }
 635  0
     globalCommHandler.getInputSplitsHandler().initialize(masterClient,
 636  
         healthyWorkerInfoList);
 637  
 
 638  
     // Create at least as many splits as the total number of input threads.
 639  0
     int minSplitCountHint = healthyWorkerInfoList.size() *
 640  0
         conf.getNumInputSplitsThreads();
 641  
 
 642  
     // Note that the input splits may only be a sample if
 643  
     // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
 644  0
     List<InputSplit> splitList = generateInputSplits(inputFormat,
 645  
         minSplitCountHint, inputSplitType);
 646  
 
 647  0
     if (splitList.isEmpty() && GiraphConstants.FAIL_ON_EMPTY_INPUT.get(conf)) {
 648  0
       LOG.fatal(logPrefix + ": Failing job due to 0 input splits, " +
 649  0
           "check input of " + inputFormat.getClass().getName() + "!");
 650  0
       getContext().setStatus("Failing job due to 0 input splits, " +
 651  0
           "check input of " + inputFormat.getClass().getName() + "!");
 652  0
       setJobStateFailed("******* PLEASE CHECK YOUR INPUT TABLES - PARTITIONS " +
 653  
           "WHICH YOU SPECIFIED ARE MISSING (for " + inputSplitType +
 654  
           " input). FAILING THE JOB *******");
 655  
     }
 656  0
     if (minSplitCountHint > splitList.size()) {
 657  0
       LOG.warn(logPrefix + ": Number of inputSplits=" +
 658  0
           splitList.size() + " < " +
 659  
           minSplitCountHint +
 660  
           "=total number of input threads, " +
 661  
           "some threads will be not used");
 662  
     }
 663  
 
 664  0
     globalCommHandler.getInputSplitsHandler().addSplits(inputSplitType,
 665  
         splitList, inputFormat);
 666  
 
 667  0
     return splitList.size();
 668  
   }
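
  // A hedged sketch of the split-count hint computed above: 50 healthy workers,
  // each configured with 4 input threads, yield a hint of 200 splits so that no
  // input thread is left without work. The numbers are illustrative.
  private static int exampleMinSplitCountHint(int healthyWorkers,
      int inputThreadsPerWorker) {
    return healthyWorkers * inputThreadsPerWorker;
  }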
 669  
 
 670  
   @Override
 671  
   public int createMappingInputSplits() {
 672  0
     if (!getConfiguration().hasMappingInputFormat()) {
 673  0
       return 0;
 674  
     }
 675  0
     MappingInputFormat<I, V, E, ? extends Writable> mappingInputFormat =
 676  0
       getConfiguration().createWrappedMappingInputFormat();
 677  0
     return createInputSplits(mappingInputFormat, InputType.MAPPING);
 678  
   }
 679  
 
 680  
   @Override
 681  
   public int createVertexInputSplits() {
 682  0
     int splits = 0;
 683  0
     if (getConfiguration().hasVertexInputFormat()) {
 684  0
       VertexInputFormat<I, V, E> vertexInputFormat =
 685  0
           getConfiguration().createWrappedVertexInputFormat();
 686  0
       splits = createInputSplits(vertexInputFormat, InputType.VERTEX);
 687  
     }
 688  0
     MasterProgress.get().setVertexInputSplitCount(splits);
 689  0
     getJobProgressTracker().updateMasterProgress(MasterProgress.get());
 690  0
     return splits;
 691  
   }
 692  
 
 693  
   @Override
 694  
   public int createEdgeInputSplits() {
 695  0
     int splits = 0;
 696  0
     if (getConfiguration().hasEdgeInputFormat()) {
 697  0
       EdgeInputFormat<I, E> edgeInputFormat =
 698  0
           getConfiguration().createWrappedEdgeInputFormat();
 699  0
       splits = createInputSplits(edgeInputFormat, InputType.EDGE);
 700  
     }
 701  0
     MasterProgress.get().setEdgeInputSplitsCount(splits);
 702  0
     getJobProgressTracker().updateMasterProgress(MasterProgress.get());
 703  0
     return splits;
 704  
   }
 705  
 
 706  
   @Override
 707  
   public List<WorkerInfo> getWorkerInfoList() {
 708  0
     return chosenWorkerInfoList;
 709  
   }
 710  
 
 711  
   @Override
 712  
   public MasterGlobalCommHandler getGlobalCommHandler() {
 713  0
     return globalCommHandler;
 714  
   }
 715  
 
 716  
   @Override
 717  
   public AggregatorToGlobalCommTranslation getAggregatorTranslationHandler() {
 718  0
     return aggregatorTranslation;
 719  
   }
 720  
 
 721  
   @Override
 722  
   public MasterCompute getMasterCompute() {
 723  0
     return masterCompute;
 724  
   }
 725  
 
 726  
   /**
 727  
    * Read the finalized checkpoint file and associated metadata files for the
 728  
    * checkpoint.  Modifies the {@link PartitionOwner} objects to get the
 729  
    * checkpoint prefixes.  It is an optimization to prevent all workers from
 730  
    * searching all the files.  Also read in the aggregator data from the
 731  
   * finalized checkpoint file and set it.
 732  
    *
 733  
    * @param superstep Checkpoint set to examine.
 734  
    * @throws IOException
 735  
    * @throws InterruptedException
 736  
    * @throws KeeperException
 737  
    * @return Collection of generated partition owners.
 738  
    */
 739  
   private Collection<PartitionOwner> prepareCheckpointRestart(long superstep)
 740  
     throws IOException, KeeperException, InterruptedException {
 741  0
     List<PartitionOwner> partitionOwners = new ArrayList<>();
 742  0
     FileSystem fs = getFs();
 743  0
     String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) +
 744  
         CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
 745  0
     LOG.info("Loading checkpoint from " + finalizedCheckpointPath);
 746  0
     DataInputStream finalizedStream =
 747  0
         fs.open(new Path(finalizedCheckpointPath));
 748  0
     GlobalStats globalStats = new GlobalStats();
 749  0
     globalStats.readFields(finalizedStream);
 750  0
     updateCounters(globalStats);
 751  0
     SuperstepClasses superstepClasses =
 752  0
         SuperstepClasses.createToRead(getConfiguration());
 753  0
     superstepClasses.readFields(finalizedStream);
 754  0
     getConfiguration().updateSuperstepClasses(superstepClasses);
 755  0
     int prefixFileCount = finalizedStream.readInt();
 756  
 
 757  0
     String checkpointFile =
 758  0
         finalizedStream.readUTF();
 759  0
     for (int i = 0; i < prefixFileCount; ++i) {
 760  0
       int mrTaskId = finalizedStream.readInt();
 761  
 
 762  0
       DataInputStream metadataStream = fs.open(new Path(checkpointFile +
 763  
           "." + mrTaskId + CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX));
 764  0
       long partitions = metadataStream.readInt();
 765  0
       WorkerInfo worker = getWorkerInfoById(mrTaskId);
 766  0
       for (long p = 0; p < partitions; ++p) {
 767  0
         int partitionId = metadataStream.readInt();
 768  0
         PartitionOwner partitionOwner = new BasicPartitionOwner(partitionId,
 769  
             worker);
 770  0
         partitionOwners.add(partitionOwner);
 771  0
         LOG.info("prepareCheckpointRestart partitionId=" + partitionId +
 772  
             " assigned to " + partitionOwner);
 773  
       }
 774  0
       metadataStream.close();
 775  
     }
 776  
    // Ordering is important: right now we rely on this ordering
 777  
    // in WorkerGraphPartitioner.
 778  0
     Collections.sort(partitionOwners, new Comparator<PartitionOwner>() {
 779  
       @Override
 780  
       public int compare(PartitionOwner p1, PartitionOwner p2) {
 781  0
         return Integer.compare(p1.getPartitionId(), p2.getPartitionId());
 782  
       }
 783  
     });
 784  
 
 785  
 
 786  0
     globalCommHandler.getAggregatorHandler().readFields(finalizedStream);
 787  0
     aggregatorTranslation.readFields(finalizedStream);
 788  0
     masterCompute.readFields(finalizedStream);
 789  0
     finalizedStream.close();
 790  
 
 791  0
     return partitionOwners;
 792  
   }
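
  // A hedged sketch of the per-worker metadata file layout consumed above: each
  // "<checkpointFile>.<mrTaskId><CHECKPOINT_METADATA_POSTFIX>" file starts with
  // the partition count, followed by one partition id per partition. The path
  // argument is illustrative.
  private static List<Integer> exampleReadCheckpointMetadata(FileSystem fs,
      Path metadataPath) throws IOException {
    List<Integer> partitionIds = new ArrayList<>();
    DataInputStream metadataStream = fs.open(metadataPath);
    int partitions = metadataStream.readInt();
    for (int p = 0; p < partitions; ++p) {
      partitionIds.add(metadataStream.readInt());
    }
    metadataStream.close();
    return partitionIds;
  }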
 793  
 
 794  
   @Override
 795  
   public void setup() {
 796  
     // Might have to manually load a checkpoint.
 797  
    // In that case, the input splits are not set; they will be faked by
 798  
     // the checkpoint files.  Each checkpoint file will be an input split
 799  
     // and the input split
 800  
 
 801  0
     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
 802  0
       GiraphStats.getInstance().getSuperstepCounter().
 803  0
         setValue(getRestartedSuperstep());
 804  
     }
 805  0
     for (MasterObserver observer : observers) {
 806  0
       observer.preApplication();
 807  0
       getContext().progress();
 808  
     }
 809  0
   }
 810  
 
 811  
   @Override
 812  
   public boolean becomeMaster() {
 813  
     // Create my bid to become the master, then try to become the worker
 814  
     // or return false.
 815  0
     String myBid = null;
 816  
     try {
 817  0
       myBid =
 818  0
           getZkExt().createExt(masterElectionPath +
 819  0
               "/" + getHostnameTaskId(),
 820  
               null,
 821  
               Ids.OPEN_ACL_UNSAFE,
 822  
               CreateMode.EPHEMERAL_SEQUENTIAL,
 823  
               true);
 824  0
     } catch (KeeperException e) {
 825  0
       throw new IllegalStateException(
 826  
           "becomeMaster: KeeperException", e);
 827  0
     } catch (InterruptedException e) {
 828  0
       throw new IllegalStateException(
 829  
           "becomeMaster: IllegalStateException", e);
 830  0
     }
 831  
     while (true) {
 832  0
       JSONObject jobState = getJobState();
 833  
       try {
 834  0
         if ((jobState != null) &&
 835  0
             ApplicationState.valueOf(
 836  0
                 jobState.getString(JSONOBJ_STATE_KEY)) ==
 837  
                 ApplicationState.FINISHED) {
 838  0
           LOG.info("becomeMaster: Job is finished, " +
 839  
               "give up trying to be the master!");
 840  0
           isMaster = false;
 841  0
           return isMaster;
 842  
         }
 843  0
       } catch (JSONException e) {
 844  0
         throw new IllegalStateException(
 845  
             "becomeMaster: Couldn't get state from " + jobState, e);
 846  0
       }
 847  
       try {
 848  0
         List<String> masterChildArr =
 849  0
             getZkExt().getChildrenExt(
 850  
                 masterElectionPath, true, true, true);
 851  0
         if (LOG.isInfoEnabled()) {
 852  0
           LOG.info("becomeMaster: First child is '" +
 853  0
               masterChildArr.get(0) + "' and my bid is '" +
 854  
               myBid + "'");
 855  
         }
 856  0
         if (masterChildArr.get(0).equals(myBid)) {
 857  0
           GiraphStats.getInstance().getCurrentMasterTaskPartition().
 858  0
               setValue(getTaskId());
 859  
 
 860  0
           globalCommHandler = new MasterGlobalCommHandler(
 861  0
               new MasterAggregatorHandler(getConfiguration(), getContext()),
 862  
               new MasterInputSplitsHandler(
 863  0
                   getConfiguration().useInputSplitLocality(), getContext()));
 864  0
           aggregatorTranslation = new AggregatorToGlobalCommTranslation(
 865  0
               getConfiguration(), globalCommHandler);
 866  
 
 867  0
           globalCommHandler.getAggregatorHandler().initialize(this);
 868  0
           masterCompute = getConfiguration().createMasterCompute();
 869  0
           masterCompute.setMasterService(this);
 870  
 
 871  0
           masterInfo = new MasterInfo();
 872  0
           masterServer =
 873  0
               new NettyMasterServer(getConfiguration(), this, getContext(),
 874  0
                   getGraphTaskManager().createUncaughtExceptionHandler());
 875  0
           masterInfo.setInetSocketAddress(masterServer.getMyAddress(),
 876  0
               masterServer.getLocalHostOrIp());
 877  0
           masterInfo.setTaskId(getTaskId());
 878  0
           masterClient =
 879  0
               new NettyMasterClient(getContext(), getConfiguration(), this,
 880  0
                   getGraphTaskManager().createUncaughtExceptionHandler());
 881  0
           masterServer.setFlowControl(masterClient.getFlowControl());
 882  
 
 883  0
           if (LOG.isInfoEnabled()) {
 884  0
             LOG.info("becomeMaster: I am now the master!");
 885  
           }
 886  0
           isMaster = true;
 887  0
           return isMaster;
 888  
         }
 889  0
         LOG.info("becomeMaster: Waiting to become the master...");
 890  0
         getMasterElectionChildrenChangedEvent().waitForTimeoutOrFail(
 891  0
             GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
 892  0
                 getConfiguration()));
 893  0
         getMasterElectionChildrenChangedEvent().reset();
 894  0
       } catch (KeeperException e) {
 895  0
         throw new IllegalStateException(
 896  
             "becomeMaster: KeeperException", e);
 897  0
       } catch (InterruptedException e) {
 898  0
         throw new IllegalStateException(
 899  
             "becomeMaster: IllegalStateException", e);
 900  0
       }
 901  0
     }
 902  
   }
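
  // A hedged, standalone sketch of the election pattern used in becomeMaster():
  // every candidate creates an EPHEMERAL_SEQUENTIAL child under the election
  // path and the creator of the lexicographically smallest child wins. A plain
  // ZooKeeper handle is used here instead of Giraph's ZooKeeperExt, purely for
  // illustration.
  private static boolean exampleIsElected(org.apache.zookeeper.ZooKeeper zk,
      String electionPath, String myBid)
    throws KeeperException, InterruptedException {
    List<String> children = zk.getChildren(electionPath, false);
    Collections.sort(children);
    // create() returns the bid's full path, getChildren() returns child names.
    return myBid.endsWith(children.get(0));
  }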
 903  
 
 904  
   @Override
 905  
   public MasterInfo getMasterInfo() {
 906  0
     return masterInfo;
 907  
   }
 908  
 
 909  
   /**
 910  
    * Collect and aggregate the worker statistics for a particular superstep.
 911  
    *
 912  
    * @param superstep Superstep to aggregate on
 913  
    * @return Global statistics aggregated on all worker statistics
 914  
    */
 915  
   private GlobalStats aggregateWorkerStats(long superstep) {
 916  0
     ImmutableClassesGiraphConfiguration conf = getConfiguration();
 917  
 
 918  0
     GlobalStats globalStats = new GlobalStats();
 919  
    // Get the stats from all the selected worker nodes
 920  0
     String workerFinishedPath =
 921  0
         getWorkerMetricsFinishedPath(getApplicationAttempt(), superstep);
 922  0
     List<String> workerFinishedPathList = null;
 923  
     try {
 924  0
       workerFinishedPathList =
 925  0
           getZkExt().getChildrenExt(
 926  
               workerFinishedPath, false, false, true);
 927  0
     } catch (KeeperException e) {
 928  0
       throw new IllegalStateException(
 929  
           "aggregateWorkerStats: KeeperException", e);
 930  0
     } catch (InterruptedException e) {
 931  0
       throw new IllegalStateException(
 932  
           "aggregateWorkerStats: InterruptedException", e);
 933  0
     }
 934  
 
 935  0
     AggregatedMetrics aggregatedMetrics = new AggregatedMetrics();
 936  
 
 937  0
     for (String finishedPath : workerFinishedPathList) {
 938  0
       String hostnamePartitionId = FilenameUtils.getName(finishedPath);
 939  0
       JSONObject workerFinishedInfoObj = null;
 940  
       try {
 941  0
         byte [] zkData =
 942  0
             getZkExt().getData(finishedPath, false, null);
 943  0
         workerFinishedInfoObj = new JSONObject(new String(zkData,
 944  0
             Charset.defaultCharset()));
 945  0
         globalStats.addMessageCount(
 946  0
             workerFinishedInfoObj.getLong(
 947  
                 JSONOBJ_NUM_MESSAGES_KEY));
 948  0
         globalStats.addMessageBytesCount(
 949  0
           workerFinishedInfoObj.getLong(
 950  
               JSONOBJ_NUM_MESSAGE_BYTES_KEY));
 951  0
         if (conf.metricsEnabled() &&
 952  0
             workerFinishedInfoObj.has(JSONOBJ_METRICS_KEY)) {
 953  0
           WorkerSuperstepMetrics workerMetrics = new WorkerSuperstepMetrics();
 954  0
           WritableUtils.readFieldsFromByteArray(
 955  0
               Base64.decode(
 956  0
                   workerFinishedInfoObj.getString(
 957  
                       JSONOBJ_METRICS_KEY)),
 958  
               workerMetrics);
 959  0
           globalStats.addOocLoadBytesCount(
 960  0
               workerMetrics.getBytesLoadedFromDisk());
 961  0
           globalStats.addOocStoreBytesCount(
 962  0
               workerMetrics.getBytesStoredOnDisk());
 963  
           // Find the lowest percentage of graph in memory across all workers
 964  
           // for one superstep
 965  0
           globalStats.setLowestGraphPercentageInMemory(
 966  0
               Math.min(globalStats.getLowestGraphPercentageInMemory(),
 967  0
                   (int) Math.round(
 968  0
                       workerMetrics.getGraphPercentageInMemory())));
 969  0
           aggregatedMetrics.add(workerMetrics, hostnamePartitionId);
 970  
         }
 971  0
       } catch (JSONException e) {
 972  0
         throw new IllegalStateException(
 973  
             "aggregateWorkerStats: JSONException", e);
 974  0
       } catch (KeeperException e) {
 975  0
         throw new IllegalStateException(
 976  
             "aggregateWorkerStats: KeeperException", e);
 977  0
       } catch (InterruptedException e) {
 978  0
         throw new IllegalStateException(
 979  
             "aggregateWorkerStats: InterruptedException", e);
 980  0
       } catch (IOException e) {
 981  0
         throw new IllegalStateException(
 982  
             "aggregateWorkerStats: IOException", e);
 983  0
       }
 984  0
     }
 985  
 
 986  0
     allPartitionStatsList.clear();
 987  0
     Iterable<PartitionStats> statsList = globalCommHandler.getAllPartitionStats(
 988  0
         workerFinishedPathList.size(), getContext());
 989  0
     for (PartitionStats partitionStats : statsList) {
 990  0
       globalStats.addPartitionStats(partitionStats);
 991  0
       allPartitionStatsList.add(partitionStats);
 992  0
     }
 993  
 
 994  0
     if (conf.metricsEnabled()) {
 995  0
       if (GiraphConstants.METRICS_DIRECTORY.isDefaultValue(conf)) {
 996  0
         aggregatedMetrics.print(superstep, System.err);
 997  
       } else {
 998  0
         printAggregatedMetricsToHDFS(superstep, aggregatedMetrics);
 999  
       }
 1000  0
       for (MasterObserver observer : observers) {
 1001  0
         observer.superstepMetricsUpdate(
 1002  
             superstep, aggregatedMetrics, allPartitionStatsList);
 1003  
       }
 1004  
     }
 1005  
 
 1006  0
     if (LOG.isInfoEnabled()) {
 1007  0
       LOG.info("aggregateWorkerStats: Aggregation found " + globalStats +
 1008  0
           " on superstep = " + getSuperstep());
 1009  
     }
 1010  0
     return globalStats;
 1011  
   }
 1012  
 
 1013  
   /**
 1014  
    * Write superstep metrics to own file in HDFS
 1015  
    * @param superstep the current superstep
 1016  
    * @param aggregatedMetrics the aggregated metrics to write
 1017  
    */
 1018  
   private void printAggregatedMetricsToHDFS(
 1019  
       long superstep, AggregatedMetrics aggregatedMetrics) {
 1020  0
     ImmutableClassesGiraphConfiguration conf = getConfiguration();
 1021  0
     PrintStream out = null;
 1022  0
     Path dir = new Path(GiraphConstants.METRICS_DIRECTORY.get(conf));
 1023  0
     Path outFile = new Path(GiraphConstants.METRICS_DIRECTORY.get(conf) +
 1024  
         Path.SEPARATOR_CHAR + "superstep_" + superstep + ".metrics");
 1025  
     try {
 1026  
       FileSystem fs;
 1027  0
       fs = FileSystem.get(conf);
 1028  0
       if (!fs.exists(dir)) {
 1029  0
         fs.mkdirs(dir);
 1030  
       }
 1031  0
       if (fs.exists(outFile)) {
 1032  0
         throw new RuntimeException(
 1033  
             "printAggregatedMetricsToHDFS: metrics file exists");
 1034  
       }
 1035  0
       out = new PrintStream(fs.create(outFile), false,
 1036  0
           Charset.defaultCharset().name());
 1037  0
       aggregatedMetrics.print(superstep, out);
 1038  0
     } catch (IOException e) {
 1039  0
       throw new RuntimeException(
 1040  
           "printAggregatedMetricsToHDFS: error creating metrics file", e);
 1041  
     } finally {
 1042  0
       if (out != null) {
 1043  0
         out.close();
 1044  
       }
 1045  0
     }
 1046  0
   }
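
  // A hedged sketch of the configuration that routes per-superstep metrics to
  // HDFS instead of stderr: once METRICS_DIRECTORY is set, the method above
  // writes one "superstep_<N>.metrics" file per superstep. The directory shown
  // is an illustrative assumption.
  private static void exampleEnableHdfsMetrics(GiraphConfiguration conf) {
    GiraphConstants.METRICS_ENABLE.set(conf, true);
    GiraphConstants.METRICS_DIRECTORY.set(conf, "/tmp/giraph-metrics");
  }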
 1047  
 
 1048  
   /**
 1049  
    * Finalize the checkpoint file prefixes by taking the chosen workers and
 1050  
    * writing them to a finalized file.  Also write out the master
 1051  
    * aggregated aggregator array from the previous superstep.
 1052  
    *
 1053  
    * @param superstep superstep to finalize
 1054  
    * @param chosenWorkerInfoList list of chosen workers that will be finalized
 1055  
    * @throws IOException
 1056  
    * @throws InterruptedException
 1057  
    * @throws KeeperException
 1058  
    */
 1059  
   private void finalizeCheckpoint(long superstep,
 1060  
     List<WorkerInfo> chosenWorkerInfoList)
 1061  
     throws IOException, KeeperException, InterruptedException {
 1062  0
     Path finalizedCheckpointPath =
 1063  0
         new Path(getCheckpointBasePath(superstep) +
 1064  
             CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX);
 1065  
     try {
 1066  0
       getFs().delete(finalizedCheckpointPath, false);
 1067  0
     } catch (IOException e) {
 1068  0
       LOG.warn("finalizedValidCheckpointPrefixes: Removed old file " +
 1069  
           finalizedCheckpointPath);
 1070  0
     }
 1071  
 
 1072  
     // Format:
 1073  
     // <global statistics>
 1074  
     // <superstep classes>
 1075  
     // <number of files>
 1076  
     // <used file prefix 0><used file prefix 1>...
 1077  
     // <aggregator data>
 1078  
     // <masterCompute data>
 1079  0
     FSDataOutputStream finalizedOutputStream =
 1080  0
         getFs().create(finalizedCheckpointPath);
 1081  
 
 1082  0
     String superstepFinishedNode =
 1083  0
         getSuperstepFinishedPath(getApplicationAttempt(), superstep - 1);
 1084  0
     finalizedOutputStream.write(
 1085  0
         getZkExt().getData(superstepFinishedNode, false, null));
 1086  
 
 1087  0
     finalizedOutputStream.writeInt(chosenWorkerInfoList.size());
 1088  0
     finalizedOutputStream.writeUTF(getCheckpointBasePath(superstep));
 1089  0
     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
 1090  0
       finalizedOutputStream.writeInt(getWorkerId(chosenWorkerInfo));
 1091  0
     }
 1092  0
     globalCommHandler.getAggregatorHandler().write(finalizedOutputStream);
 1093  0
     aggregatorTranslation.write(finalizedOutputStream);
 1094  0
     masterCompute.write(finalizedOutputStream);
 1095  0
     finalizedOutputStream.close();
 1096  0
     lastCheckpointedSuperstep = superstep;
 1097  0
     GiraphStats.getInstance().
 1098  0
         getLastCheckpointedSuperstep().setValue(superstep);
 1099  0
   }
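
  // A hedged sketch of reading back the header of the finalized checkpoint file
  // written above, following the format comment in finalizeCheckpoint(); it
  // mirrors the reads done in prepareCheckpointRestart() and omits the
  // aggregator and masterCompute payloads that follow.
  private static String exampleReadFinalizedCheckpointHeader(FileSystem fs,
      Path finalizedPath, ImmutableClassesGiraphConfiguration conf)
    throws IOException {
    DataInputStream in = fs.open(finalizedPath);
    GlobalStats globalStats = new GlobalStats();
    globalStats.readFields(in);                  // <global statistics>
    SuperstepClasses superstepClasses =
        SuperstepClasses.createToRead(conf);
    superstepClasses.readFields(in);             // <superstep classes>
    int workerCount = in.readInt();              // <number of files>
    String checkpointBasePath = in.readUTF();    // shared checkpoint prefix
    for (int i = 0; i < workerCount; ++i) {
      in.readInt();                              // one task id per chosen worker
    }
    in.close();
    return checkpointBasePath;
  }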
 1100  
 
 1101  
   /**
 1102  
    * Assign the partitions for this superstep.  If there are changes,
 1103  
    * the workers will know how to do the exchange.  If this was a restarted
 1104  
    * superstep, then make sure to provide information on where to find the
 1105  
    * checkpoint file.
 1106  
    */
 1107  
   private void assignPartitionOwners() {
 1108  
     Collection<PartitionOwner> partitionOwners;
 1109  0
     if (getSuperstep() == INPUT_SUPERSTEP) {
 1110  0
       partitionOwners =
 1111  0
           masterGraphPartitioner.createInitialPartitionOwners(
 1112  
               chosenWorkerInfoList, maxWorkers);
 1113  0
       if (partitionOwners.isEmpty()) {
 1114  0
         throw new IllegalStateException(
 1115  
             "assignAndExchangePartitions: No partition owners set");
 1116  
       }
 1117  0
     } else if (getRestartedSuperstep() == getSuperstep()) {
 1118  
       // If restarted, prepare the checkpoint restart
 1119  
       try {
 1120  0
         partitionOwners = prepareCheckpointRestart(getSuperstep());
 1121  0
       } catch (IOException e) {
 1122  0
         throw new IllegalStateException(
 1123  
             "assignPartitionOwners: IOException on preparing", e);
 1124  0
       } catch (KeeperException e) {
 1125  0
         throw new IllegalStateException(
 1126  
             "assignPartitionOwners: KeeperException on preparing", e);
 1127  0
       } catch (InterruptedException e) {
 1128  0
         throw new IllegalStateException(
 1129  
             "assignPartitionOwners: InteruptedException on preparing",
 1130  
             e);
 1131  0
       }
 1132  0
       masterGraphPartitioner.setPartitionOwners(partitionOwners);
 1133  
     } else {
 1134  0
       partitionOwners =
 1135  0
           masterGraphPartitioner.generateChangedPartitionOwners(
 1136  
               allPartitionStatsList,
 1137  
               chosenWorkerInfoList,
 1138  
               maxWorkers,
 1139  0
               getSuperstep());
 1140  
 
 1141  0
       PartitionUtils.analyzePartitionStats(partitionOwners,
 1142  
           allPartitionStatsList);
 1143  
     }
 1144  0
     checkPartitions(masterGraphPartitioner.getCurrentPartitionOwners());
 1145  
 
 1146  
 
 1147  
 
 1148  
     // There will be some exchange of partitions
 1149  0
     if (!partitionOwners.isEmpty()) {
 1150  0
       String vertexExchangePath =
 1151  0
           getPartitionExchangePath(getApplicationAttempt(),
 1152  0
               getSuperstep());
 1153  
       try {
 1154  0
         getZkExt().createOnceExt(vertexExchangePath,
 1155  
             null,
 1156  
             Ids.OPEN_ACL_UNSAFE,
 1157  
             CreateMode.PERSISTENT,
 1158  
             true);
 1159  0
       } catch (KeeperException e) {
 1160  0
         throw new IllegalStateException(
 1161  
             "assignPartitionOwners: KeeperException creating " +
 1162  
                 vertexExchangePath);
 1163  0
       } catch (InterruptedException e) {
 1164  0
         throw new IllegalStateException(
 1165  
             "assignPartitionOwners: InterruptedException creating " +
 1166  
                 vertexExchangePath);
 1167  0
       }
 1168  
     }
 1169  
 
 1170  0
     AddressesAndPartitionsWritable addressesAndPartitions =
 1171  
         new AddressesAndPartitionsWritable(masterInfo, chosenWorkerInfoList,
 1172  
             partitionOwners);
 1173  
     // Send assignments to every worker
 1174  
     // TODO for very large number of partitions we might want to split this
 1175  
     // across multiple requests
 1176  0
     for (WorkerInfo workerInfo : chosenWorkerInfoList) {
 1177  0
       masterClient.sendWritableRequest(workerInfo.getTaskId(),
 1178  
           new AddressesAndPartitionsRequest(addressesAndPartitions));
 1179  0
     }
 1180  0
   }
 1181  
 
 1182  
   /**
 1183  
    * Check if partition ids are valid
 1184  
    *
 1185  
    * @param partitionOwners List of partition ids for current superstep
 1186  
    */
 1187  
   private void checkPartitions(Collection<PartitionOwner> partitionOwners) {
 1188  0
     for (PartitionOwner partitionOwner : partitionOwners) {
 1189  0
       int partitionId = partitionOwner.getPartitionId();
 1190  0
       if (partitionId < 0 || partitionId >= partitionOwners.size()) {
 1191  0
         throw new IllegalStateException("checkPartitions: " +
 1192  
             "Invalid partition id " + partitionId +
 1193  
             " - partition ids must be values from 0 to (numPartitions - 1)");
 1194  
       }
 1195  0
     }
 1196  0
   }
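  /*
   * Editor's sketch - a hypothetical helper, not part of BspServiceMaster
   * and not covered above. It restates the invariant checkPartitions()
   * enforces, using plain integer ids instead of PartitionOwner: with n
   * owners every id must lie in [0, n - 1], so e.g. ids {0, 2, 3} with
   * n = 3 are rejected because 3 >= 3.
   */
  private static void sketchCheckPartitionIds(java.util.List<Integer> ids) {
    int numPartitions = ids.size();
    for (int id : ids) {
      if (id < 0 || id >= numPartitions) {
        throw new IllegalStateException("sketchCheckPartitionIds: invalid " +
            "partition id " + id + " - ids must be 0.." + (numPartitions - 1));
      }
    }
  }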
 1197  
 
 1198  
   /**
 1199  
    * Check whether the workers chosen for this superstep are still alive
 1200  
    *
 1201  
    * @param chosenWorkerInfoHealthPath Path to the healthy workers in ZooKeeper
 1202  
    * @param chosenWorkerInfoList List of the healthy workers
 1203  
    * @return a list of dead workers. Empty list if all workers are alive.
 1204  
    * @throws InterruptedException
 1205  
    * @throws KeeperException
 1206  
    */
 1207  
   private Collection<WorkerInfo> superstepChosenWorkerAlive(
 1208  
     String chosenWorkerInfoHealthPath,
 1209  
     List<WorkerInfo> chosenWorkerInfoList)
 1210  
     throws KeeperException, InterruptedException {
 1211  0
     List<WorkerInfo> chosenWorkerInfoHealthyList =
 1212  0
         getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false);
 1213  0
     Set<WorkerInfo> chosenWorkerInfoHealthySet =
 1214  
         new HashSet<WorkerInfo>(chosenWorkerInfoHealthyList);
 1215  0
     List<WorkerInfo> deadWorkers = new ArrayList<>();
 1216  0
     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
 1217  0
       if (!chosenWorkerInfoHealthySet.contains(chosenWorkerInfo)) {
 1218  0
         deadWorkers.add(chosenWorkerInfo);
 1219  
       }
 1220  0
     }
 1221  0
     return deadWorkers;
 1222  
   }
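  /*
   * Editor's sketch - a hypothetical helper, not part of BspServiceMaster
   * and not covered above. superstepChosenWorkerAlive() boils down to a
   * set-membership test: a chosen worker is considered dead when its id is
   * missing from the ids read back from the healthy-worker znodes. Worker
   * ids are plain Strings here instead of WorkerInfo objects.
   */
  private static java.util.List<String> sketchFindDeadWorkers(
      java.util.List<String> chosenWorkerIds,
      java.util.Collection<String> healthyWorkerIds) {
    java.util.Set<String> healthySet =
        new java.util.HashSet<String>(healthyWorkerIds);
    java.util.List<String> deadWorkers = new java.util.ArrayList<String>();
    for (String workerId : chosenWorkerIds) {
      if (!healthySet.contains(workerId)) {
        deadWorkers.add(workerId);
      }
    }
    // Empty when every chosen worker is still healthy
    return deadWorkers;
  }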
 1223  
 
 1224  
   @Override
 1225  
   public void restartFromCheckpoint(long checkpoint) {
 1226  
     // Process:
 1227  
     // 1. Increase the application attempt and set to the correct checkpoint
 1228  
     // 2. Send command to all workers to restart their tasks
 1229  0
     setApplicationAttempt(getApplicationAttempt() + 1);
 1230  0
     setCachedSuperstep(checkpoint);
 1231  0
     setRestartedSuperstep(checkpoint);
 1232  0
     checkpointStatus = CheckpointStatus.NONE;
 1233  0
     setJobState(ApplicationState.START_SUPERSTEP,
 1234  0
         getApplicationAttempt(),
 1235  
         checkpoint);
 1236  0
   }
 1237  
 
 1238  
   /**
 1239  
    * Safely removes node from zookeeper.
 1240  
   * Ignores the case where the node has already been removed. Throws a
 1241  
   * runtime exception if anything else goes wrong.
 1242  
    * @param path path to the node to be removed.
 1243  
    */
 1244  
   private void zkDeleteNode(String path) {
 1245  
     try {
 1246  0
       getZkExt().deleteExt(path, -1, true);
 1247  0
     } catch (KeeperException.NoNodeException e) {
 1248  0
       LOG.info("zkDeleteNode: node has already been removed " + path);
 1249  0
     } catch (InterruptedException e) {
 1250  0
       throw new RuntimeException(
 1251  
           "zkDeleteNode: InterruptedException", e);
 1252  0
     } catch (KeeperException e) {
 1253  0
       throw new RuntimeException(
 1254  
           "zkDeleteNode: KeeperException", e);
 1255  0
     }
 1256  0
   }
 1257  
 
 1258  
   @Override
 1259  
   public long getLastGoodCheckpoint() throws IOException {
 1260  
      // If this master does not yet know of a checkpoint, find the
 1261  
      // last good checkpointed superstep
 1262  0
     if (lastCheckpointedSuperstep == -1) {
 1263  
       try {
 1264  0
         lastCheckpointedSuperstep = getLastCheckpointedSuperstep();
 1265  0
       } catch (IOException e) {
 1266  0
         LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be " +
 1267  
             "found, killing the job.", e);
 1268  0
         failJob(e);
 1269  0
       }
 1270  
     }
 1271  
 
 1272  0
     return lastCheckpointedSuperstep;
 1273  
   }
 1274  
 
 1275  
   /**
 1276  
    * Wait for a set of workers to signal that they are done with the
 1277  
    * barrier.
 1278  
    *
 1279  
    * @param finishedWorkerPath Path to where the workers will register their
 1280  
    *        hostname and id
 1281  
    * @param workerInfoList List of the workers to wait for
 1282  
    * @param event Event to wait on for a chance to be done.
 1283  
   * @param ignoreDeath If set to true, ignore the death of a worker that
 1284  
   *                    died after making it through the barrier.
 1285  
    * @return True if barrier was successful, false if there was a worker
 1286  
    *         failure
 1287  
    */
 1288  
   private boolean barrierOnWorkerList(String finishedWorkerPath,
 1289  
       List<WorkerInfo> workerInfoList,
 1290  
       BspEvent event,
 1291  
       boolean ignoreDeath) {
 1292  
     try {
 1293  0
       getZkExt().createOnceExt(finishedWorkerPath,
 1294  
           null,
 1295  
           Ids.OPEN_ACL_UNSAFE,
 1296  
           CreateMode.PERSISTENT,
 1297  
           true);
 1298  0
     } catch (KeeperException e) {
 1299  0
       throw new IllegalStateException(
 1300  
           "barrierOnWorkerList: KeeperException - Couldn't create " +
 1301  
               finishedWorkerPath, e);
 1302  0
     } catch (InterruptedException e) {
 1303  0
       throw new IllegalStateException(
 1304  
           "barrierOnWorkerList: InterruptedException - Couldn't create " +
 1305  
               finishedWorkerPath, e);
 1306  0
     }
 1307  0
     List<String> hostnameIdList =
 1308  0
         new ArrayList<String>(workerInfoList.size());
 1309  0
     for (WorkerInfo workerInfo : workerInfoList) {
 1310  0
       hostnameIdList.add(workerInfo.getHostnameId());
 1311  0
     }
 1312  0
     String workerInfoHealthyPath =
 1313  0
         getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep());
 1314  0
     List<String> finishedHostnameIdList = new ArrayList<>();
 1315  
     List<String> tmpFinishedHostnameIdList;
 1316  0
     long nextInfoMillis = System.currentTimeMillis();
 1317  0
     final int defaultTaskTimeoutMsec = 10 * 60 * 1000;  // from TaskTracker
 1318  0
     final int waitBetweenLogInfoMsec = 30 * 1000;
 1319  0
     final int taskTimeoutMsec = getContext().getConfiguration().getInt(
 1320  
         "mapred.task.timeout", defaultTaskTimeoutMsec) / 2;
 1321  0
     long lastRegularRunTimeMsec = 0;
 1322  0
      int eventLoopTimeout = Math.min(taskTimeoutMsec, waitBetweenLogInfoMsec);
 1323  0
     boolean logInfoOnlyRun = false;
 1324  0
     List<WorkerInfo> deadWorkers = new ArrayList<>();
 1325  
     while (true) {
 1326  0
       if (! logInfoOnlyRun) {
 1327  
         try {
 1328  0
           tmpFinishedHostnameIdList =
 1329  0
               getZkExt().getChildrenExt(finishedWorkerPath,
 1330  
                                         true,
 1331  
                                         false,
 1332  
                                         false);
 1333  0
         } catch (KeeperException e) {
 1334  0
           throw new IllegalStateException(
 1335  
               "barrierOnWorkerList: KeeperException - Couldn't get " +
 1336  
                   "children of " + finishedWorkerPath, e);
 1337  0
         } catch (InterruptedException e) {
 1338  0
           throw new IllegalStateException(
 1339  
               "barrierOnWorkerList: IllegalException - Couldn't get " +
 1340  
                   "children of " + finishedWorkerPath, e);
 1341  0
         }
 1342  0
         if (LOG.isDebugEnabled()) {
 1343  
           // Log the names of the new workers that have finished since last time
 1344  0
           Set<String> newFinishedHostnames = Sets.difference(
 1345  0
             Sets.newHashSet(tmpFinishedHostnameIdList),
 1346  0
             Sets.newHashSet(finishedHostnameIdList));
 1347  0
           LOG.debug("barrierOnWorkerList: Got new finished worker list = " +
 1348  
                         newFinishedHostnames + ", size = " +
 1349  0
                         newFinishedHostnames.size() +
 1350  
                         " from " + finishedWorkerPath);
 1351  
         }
 1352  0
         finishedHostnameIdList = tmpFinishedHostnameIdList;
 1353  
       }
 1354  
 
 1355  0
       if (LOG.isInfoEnabled() &&
 1356  0
           (System.currentTimeMillis() > nextInfoMillis)) {
 1357  0
         nextInfoMillis = System.currentTimeMillis() + waitBetweenLogInfoMsec;
 1358  0
         LOG.info("barrierOnWorkerList: " +
 1359  0
             finishedHostnameIdList.size() +
 1360  0
             " out of " + workerInfoList.size() +
 1361  
             " workers finished on superstep " +
 1362  0
             getSuperstep() + " on path " + finishedWorkerPath);
 1363  0
         if (workerInfoList.size() - finishedHostnameIdList.size() <
 1364  
             MAX_PRINTABLE_REMAINING_WORKERS) {
 1365  0
           Set<String> remainingWorkers = Sets.newHashSet(hostnameIdList);
 1366  0
           remainingWorkers.removeAll(finishedHostnameIdList);
 1367  0
           LOG.info("barrierOnWorkerList: Waiting on " + remainingWorkers);
 1368  
         }
 1369  
       }
 1370  
 
 1371  0
       if (! logInfoOnlyRun) {
 1372  0
         getContext().setStatus(getGraphTaskManager().getGraphFunctions() +
 1373  
                                    " - " +
 1374  0
                                    finishedHostnameIdList.size() +
 1375  
                                    " finished out of " +
 1376  0
                                    workerInfoList.size() +
 1377  0
                                    " on superstep " + getSuperstep());
 1378  0
         if (finishedHostnameIdList.containsAll(hostnameIdList)) {
 1379  0
           break;
 1380  
         }
 1381  
 
 1382  0
         for (WorkerInfo deadWorker : deadWorkers) {
 1383  0
           if (!finishedHostnameIdList.contains(deadWorker.getHostnameId())) {
 1384  0
             LOG.error("barrierOnWorkerList: no results arived from " +
 1385  
                           "worker that was pronounced dead: " + deadWorker +
 1386  0
                           " on superstep " + getSuperstep());
 1387  0
             return false;
 1388  
           }
 1389  0
         }
 1390  
 
 1391  
         // wall-clock time skew is ignored
 1392  0
         lastRegularRunTimeMsec = System.currentTimeMillis();
 1393  
       }
 1394  
 
 1395  
       // Wait for a signal or timeout
 1396  0
       boolean eventTriggered = event.waitMsecs(eventLoopTimeout);
 1397  
 
 1398  
       // If the event was triggered, we reset it. In the next loop run, we will
 1399  
       // read ZK to get the new hosts.
 1400  0
       if (eventTriggered) {
 1401  0
         event.reset();
 1402  
       }
 1403  
 
 1404  0
       long elapsedTimeSinceRegularRunMsec = System.currentTimeMillis() -
 1405  
           lastRegularRunTimeMsec;
 1406  0
       getContext().progress();
 1407  
 
 1408  0
       if (eventTriggered ||
 1409  
           taskTimeoutMsec == eventLoopTimeout ||
 1410  
           elapsedTimeSinceRegularRunMsec >= taskTimeoutMsec) {
 1411  0
         logInfoOnlyRun = false;
 1412  
       } else {
 1413  0
         logInfoOnlyRun = true;
 1414  0
         continue;
 1415  
       }
 1416  
 
 1417  
       // Did a worker die?
 1418  
       try {
 1419  0
         deadWorkers.addAll(superstepChosenWorkerAlive(
 1420  
                 workerInfoHealthyPath,
 1421  
                 workerInfoList));
 1422  0
         if (!ignoreDeath && deadWorkers.size() > 0) {
 1423  0
           String errorMessage = "******* WORKERS " + deadWorkers +
 1424  
               " FAILED *******";
 1425  
           // If checkpointing is not used, we should fail the job
 1426  0
           if (!getConfiguration().useCheckpointing()) {
 1427  0
             setJobStateFailed(errorMessage);
 1428  
           } else {
 1429  0
             LOG.error("barrierOnWorkerList: Missing chosen " +
 1430  
                 "workers " + deadWorkers +
 1431  0
                 " on superstep " + getSuperstep());
 1432  
             // Log worker failure to command line
 1433  0
             getGraphTaskManager().getJobProgressTracker().logInfo(errorMessage);
 1434  
           }
 1435  0
           return false;
 1436  
         }
 1437  0
       } catch (KeeperException e) {
 1438  0
         throw new IllegalStateException(
 1439  
             "barrierOnWorkerList: KeeperException - " +
 1440  
                 "Couldn't get " + workerInfoHealthyPath, e);
 1441  0
       } catch (InterruptedException e) {
 1442  0
         throw new IllegalStateException(
 1443  
             "barrierOnWorkerList: InterruptedException - " +
 1444  
                 "Couldn't get " + workerInfoHealthyPath, e);
 1445  0
       }
 1446  0
     }
 1447  
 
 1448  0
     return true;
 1449  
   }
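  /*
   * Editor's sketch - a hypothetical helper, not part of BspServiceMaster
   * and not covered above. It isolates the wake-up interval arithmetic used
   * by barrierOnWorkerList(): the loop wakes at least every
   * min(mapred.task.timeout / 2, 30s) so it can report progress before the
   * framework kills the task, but only re-reads ZooKeeper and re-checks
   * worker liveness on the longer interval (or when an event fires). With
   * the default 10-minute task timeout the loop wakes every 30 seconds and
   * does the expensive checks roughly every 5 minutes.
   */
  private static int sketchBarrierWakeupMsec(int mapredTaskTimeoutMsec) {
    final int waitBetweenLogInfoMsec = 30 * 1000;
    int halfTaskTimeoutMsec = mapredTaskTimeoutMsec / 2;
    return Math.min(halfTaskTimeoutMsec, waitBetweenLogInfoMsec);
  }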
 1450  
 
 1451  
   /**
 1452  
    * Clean up old superstep data from Zookeeper
 1453  
    *
 1454  
   * @param removeableSuperstep Superstep to clean up
 1455  
    * @throws InterruptedException
 1456  
    */
 1457  
   private void cleanUpOldSuperstep(long removeableSuperstep) throws
 1458  
       InterruptedException {
 1459  0
     if (KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration()) &&
 1460  
         (removeableSuperstep >= 0)) {
 1461  0
       String oldSuperstepPath =
 1462  0
           getSuperstepPath(getApplicationAttempt()) + "/" +
 1463  
               removeableSuperstep;
 1464  
       try {
 1465  0
         if (LOG.isInfoEnabled()) {
 1466  0
           LOG.info("coordinateSuperstep: Cleaning up old Superstep " +
 1467  
               oldSuperstepPath);
 1468  
         }
 1469  0
         getZkExt().deleteExt(oldSuperstepPath,
 1470  
             -1,
 1471  
             true);
 1472  0
       } catch (KeeperException.NoNodeException e) {
 1473  0
         LOG.warn("coordinateBarrier: Already cleaned up " +
 1474  
             oldSuperstepPath);
 1475  0
       } catch (KeeperException e) {
 1476  0
         throw new IllegalStateException(
 1477  
             "coordinateSuperstep: KeeperException on " +
 1478  
                 "finalizing checkpoint", e);
 1479  0
       }
 1480  
     }
 1481  0
   }
 1482  
 
 1483  
   /**
 1484  
    * Coordinate the exchange of vertex/edge input splits among workers.
 1485  
    */
 1486  
   private void coordinateInputSplits() {
 1487  
     // Coordinate the workers finishing sending their vertices/edges to the
 1488  
     // correct workers and signal when everything is done.
 1489  0
     if (!barrierOnWorkerList(inputSplitsWorkerDonePath,
 1490  
         chosenWorkerInfoList,
 1491  0
         getInputSplitsWorkerDoneEvent(),
 1492  
         false)) {
 1493  0
       throw new IllegalStateException("coordinateInputSplits: Worker failed " +
 1494  
           "during input split (currently not supported)");
 1495  
     }
 1496  
     try {
 1497  0
       getZkExt().createExt(inputSplitsAllDonePath,
 1498  
           null,
 1499  
           Ids.OPEN_ACL_UNSAFE,
 1500  
           CreateMode.PERSISTENT,
 1501  
           false);
 1502  0
     } catch (KeeperException.NodeExistsException e) {
 1503  0
       LOG.info("coordinateInputSplits: Node " +
 1504  
           inputSplitsAllDonePath + " already exists.");
 1505  0
     } catch (KeeperException e) {
 1506  0
       throw new IllegalStateException(
 1507  
           "coordinateInputSplits: KeeperException", e);
 1508  0
     } catch (InterruptedException e) {
 1509  0
       throw new IllegalStateException(
 1510  
           "coordinateInputSplits: IllegalStateException", e);
 1511  0
     }
 1512  0
   }
 1513  
 
 1514  
   /**
 1515  
    * Initialize aggregator at the master side
 1516  
    * before vertex/edge loading.
 1517  
   * This method cooperates with other code
 1518  
   * to enable aggregation usage at INPUT_SUPERSTEP.
 1519  
   * The other code is:
 1520  
    *  BSPServiceWorker:
 1521  
    *  aggregatorHandler.prepareSuperstep in
 1522  
    *  setup
 1523  
    *  set aggregator usage in vertexReader and
 1524  
    *  edgeReader
 1525  
    *
 1526  
    * @throws InterruptedException
 1527  
    */
 1528  
   private void initializeAggregatorInputSuperstep()
 1529  
     throws InterruptedException {
 1530  0
     globalCommHandler.getAggregatorHandler().prepareSuperstep();
 1531  
 
 1532  0
     prepareMasterCompute(getSuperstep());
 1533  
     try {
 1534  0
       masterCompute.initialize();
 1535  0
     } catch (InstantiationException e) {
 1536  0
       LOG.fatal(
 1537  
         "initializeAggregatorInputSuperstep: Failed in instantiation", e);
 1538  0
       throw new RuntimeException(
 1539  
         "initializeAggregatorInputSuperstep: Failed in instantiation", e);
 1540  0
     } catch (IllegalAccessException e) {
 1541  0
       LOG.fatal("initializeAggregatorInputSuperstep: Failed in access", e);
 1542  0
       throw new RuntimeException(
 1543  
         "initializeAggregatorInputSuperstep: Failed in access", e);
 1544  0
     }
 1545  0
     aggregatorTranslation.postMasterCompute();
 1546  0
     globalCommHandler.getAggregatorHandler().finishSuperstep();
 1547  
 
 1548  0
     globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
 1549  0
   }
 1550  
 
 1551  
   /**
 1552  
    * This is required before initialization
 1553  
    * and run of MasterCompute
 1554  
    *
 1555  
    * @param superstep superstep for which to run masterCompute
 1556  
    * @return Superstep classes set by masterCompute
 1557  
    */
 1558  
   private SuperstepClasses prepareMasterCompute(long superstep) {
 1559  0
    GraphState graphState = new GraphState(superstep,
 1560  0
         GiraphStats.getInstance().getVertices().getValue(),
 1561  0
         GiraphStats.getInstance().getEdges().getValue(),
 1562  0
         getContext());
 1563  0
     SuperstepClasses superstepClasses =
 1564  0
         SuperstepClasses.createAndExtractTypes(getConfiguration());
 1565  0
     masterCompute.setGraphState(graphState);
 1566  0
     masterCompute.setSuperstepClasses(superstepClasses);
 1567  0
     return superstepClasses;
 1568  
   }
 1569  
 
 1570  
   @Override
 1571  
   public SuperstepState coordinateSuperstep() throws
 1572  
   KeeperException, InterruptedException {
 1573  
     // 1. Get chosen workers and set up watches on them.
 1574  
     // 2. Assign partitions to the workers
 1575  
     //    (possibly reloading from a superstep)
 1576  
     // 3. Wait for all workers to complete
 1577  
     // 4. Collect and process aggregators
 1578  
     // 5. Create superstep finished node
 1579  
     // 6. If the checkpoint frequency is met, finalize the checkpoint
 1580  
 
 1581  0
     for (MasterObserver observer : observers) {
 1582  0
       observer.preSuperstep(getSuperstep());
 1583  0
       getContext().progress();
 1584  
     }
 1585  
 
 1586  0
     chosenWorkerInfoList = checkWorkers();
 1587  0
     if (chosenWorkerInfoList == null) {
 1588  0
       setJobStateFailed("coordinateSuperstep: Not enough healthy workers for " +
 1589  0
                     "superstep " + getSuperstep());
 1590  
     } else {
 1591  
       // Sort this list, so order stays the same over supersteps
 1592  0
       Collections.sort(chosenWorkerInfoList, new Comparator<WorkerInfo>() {
 1593  
         @Override
 1594  
         public int compare(WorkerInfo wi1, WorkerInfo wi2) {
 1595  0
           return Integer.compare(wi1.getTaskId(), wi2.getTaskId());
 1596  
         }
 1597  
       });
 1598  0
       for (WorkerInfo workerInfo : chosenWorkerInfoList) {
 1599  0
         String workerInfoHealthyPath =
 1600  0
             getWorkerInfoHealthyPath(getApplicationAttempt(),
 1601  0
                 getSuperstep()) + "/" +
 1602  0
                 workerInfo.getHostnameId();
 1603  0
         if (getZkExt().exists(workerInfoHealthyPath, true) == null) {
 1604  0
           LOG.warn("coordinateSuperstep: Chosen worker " +
 1605  
               workerInfoHealthyPath +
 1606  
               " is no longer valid, failing superstep");
 1607  
         }
 1608  0
       }
 1609  
     }
 1610  
 
 1611  
     // We need to finalize aggregators from previous superstep
 1612  0
     if (getSuperstep() >= 0) {
 1613  0
       aggregatorTranslation.postMasterCompute();
 1614  0
       globalCommHandler.getAggregatorHandler().finishSuperstep();
 1615  
     }
 1616  
 
 1617  0
     masterClient.openConnections();
 1618  
 
 1619  0
     GiraphStats.getInstance().
 1620  0
         getCurrentWorkers().setValue(chosenWorkerInfoList.size());
 1621  0
     assignPartitionOwners();
 1622  
 
 1623  
     // Finalize the valid checkpoint file prefixes and possibly
 1624  
     // the aggregators.
 1625  0
     if (checkpointStatus != CheckpointStatus.NONE) {
 1626  0
       String workerWroteCheckpointPath =
 1627  0
           getWorkerWroteCheckpointPath(getApplicationAttempt(),
 1628  0
               getSuperstep());
 1629  
       // first wait for all the workers to write their checkpoint data
 1630  0
       if (!barrierOnWorkerList(workerWroteCheckpointPath,
 1631  
           chosenWorkerInfoList,
 1632  0
           getWorkerWroteCheckpointEvent(),
 1633  
           checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT)) {
 1634  0
         return SuperstepState.WORKER_FAILURE;
 1635  
       }
 1636  
       try {
 1637  0
         finalizeCheckpoint(getSuperstep(), chosenWorkerInfoList);
 1638  0
       } catch (IOException e) {
 1639  0
         throw new IllegalStateException(
 1640  
             "coordinateSuperstep: IOException on finalizing checkpoint",
 1641  
             e);
 1642  0
       }
 1643  0
       if (checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT) {
 1644  0
         return SuperstepState.CHECKPOINT_AND_HALT;
 1645  
       }
 1646  
     }
 1647  
 
 1648  
     // We need to send aggregators to worker owners after new worker assignments
 1649  0
     if (getSuperstep() >= 0) {
 1650  0
       globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
 1651  
     }
 1652  
 
 1653  0
     if (getSuperstep() == INPUT_SUPERSTEP) {
 1654  
       // Initialize aggregators before coordinating
 1655  0
       initializeAggregatorInputSuperstep();
 1656  0
       coordinateInputSplits();
 1657  
     }
 1658  
 
 1659  0
     String finishedWorkerPath =
 1660  0
         getWorkerMetricsFinishedPath(getApplicationAttempt(), getSuperstep());
 1661  0
     if (!barrierOnWorkerList(finishedWorkerPath,
 1662  
         chosenWorkerInfoList,
 1663  0
         getSuperstepStateChangedEvent(),
 1664  
         false)) {
 1665  0
       return SuperstepState.WORKER_FAILURE;
 1666  
     }
 1667  
 
 1668  
     // Collect aggregator values, then run the master.compute() and
 1669  
     // finally save the aggregator values
 1670  0
     globalCommHandler.getAggregatorHandler().prepareSuperstep();
 1671  0
     aggregatorTranslation.prepareSuperstep();
 1672  
 
 1673  0
     SuperstepClasses superstepClasses =
 1674  0
       prepareMasterCompute(getSuperstep() + 1);
 1675  0
     doMasterCompute();
 1676  
 
 1677  
     // If the master is halted or all the vertices voted to halt and there
 1678  
     // are no more messages in the system, stop the computation
 1679  0
     GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
 1680  0
     aggregateCountersFromWorkersAndMaster();
 1681  0
     if (masterCompute.isHalted() ||
 1682  0
         (globalStats.getFinishedVertexCount() ==
 1683  0
         globalStats.getVertexCount() &&
 1684  0
         globalStats.getMessageCount() == 0)) {
 1685  0
       globalStats.setHaltComputation(true);
 1686  0
     } else if (getZkExt().exists(haltComputationPath, false) != null) {
 1687  0
       if (LOG.isInfoEnabled()) {
 1688  0
         LOG.info("Halting computation because halt zookeeper node was created");
 1689  
       }
 1690  0
       globalStats.setHaltComputation(true);
 1691  
     }
 1692  
 
 1693  
     // If we have completed the maximum number of supersteps, stop
 1694  
     // the computation
 1695  0
     if (maxNumberOfSupersteps !=
 1696  0
         GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.getDefaultValue() &&
 1697  0
         (getSuperstep() == maxNumberOfSupersteps - 1)) {
 1698  0
       if (LOG.isInfoEnabled()) {
 1699  0
         LOG.info("coordinateSuperstep: Finished " + maxNumberOfSupersteps +
 1700  
             " supersteps (max specified by the user), halting");
 1701  
       }
 1702  0
       globalStats.setHaltComputation(true);
 1703  
     }
 1704  
 
 1705  
     // Superstep 0 doesn't need to have matching types (Message types may not
 1706  
     // match) and if the computation is halted, no need to check any of
 1707  
     // the types.
 1708  0
     if (!globalStats.getHaltComputation()) {
 1709  0
       superstepClasses.verifyTypesMatch(getSuperstep() > 0);
 1710  
     }
 1711  0
     getConfiguration().updateSuperstepClasses(superstepClasses);
 1712  
 
 1713  
     //Signal workers that we want to checkpoint
 1714  0
     checkpointStatus = getCheckpointStatus(getSuperstep() + 1);
 1715  0
     globalStats.setCheckpointStatus(checkpointStatus);
 1716  
     // Let everyone know the aggregated application state through the
 1717  
     // superstep finishing znode.
 1718  0
     String superstepFinishedNode =
 1719  0
         getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
 1720  
 
 1721  0
     WritableUtils.writeToZnode(
 1722  0
         getZkExt(), superstepFinishedNode, -1, globalStats, superstepClasses);
 1723  0
     updateCounters(globalStats);
 1724  
 
 1725  0
     cleanUpOldSuperstep(getSuperstep() - 1);
 1726  0
     incrCachedSuperstep();
 1727  
     // Counter starts at zero, so no need to increment
 1728  0
     if (getSuperstep() > 0) {
 1729  0
       GiraphStats.getInstance().getSuperstepCounter().increment();
 1730  
     }
 1731  
     SuperstepState superstepState;
 1732  0
     if (globalStats.getHaltComputation()) {
 1733  0
       superstepState = SuperstepState.ALL_SUPERSTEPS_DONE;
 1734  
     } else {
 1735  0
       superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
 1736  
     }
 1737  0
     globalCommHandler.getAggregatorHandler().writeAggregators(
 1738  0
         getSuperstep(), superstepState);
 1739  
 
 1740  0
     return superstepState;
 1741  
   }
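  /*
   * Editor's sketch - a hypothetical helper, not part of BspServiceMaster
   * and not covered above. It restates the halt decision taken near the end
   * of coordinateSuperstep() over plain values, omitting the halt-znode
   * check and the guard that maxNumberOfSupersteps differs from its default:
   * halt when the MasterCompute asked for it, when every vertex voted to
   * halt with no messages in flight, or when the superstep limit is reached.
   */
  private static boolean sketchShouldHalt(boolean masterComputeHalted,
      long vertexCount, long finishedVertexCount, long messageCount,
      long superstep, long maxNumberOfSupersteps) {
    if (masterComputeHalted) {
      return true;
    }
    if (finishedVertexCount == vertexCount && messageCount == 0) {
      return true;
    }
    // Supersteps are numbered from 0, hence the "- 1"
    return superstep == maxNumberOfSupersteps - 1;
  }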
 1742  
 
 1743  
   /**
 1744  
    * Should checkpoint on this superstep?  If checkpointing, always
 1745  
    * checkpoint the first user superstep.  If restarting, the first
 1746  
    * checkpoint is after the frequency has been met.
 1747  
    *
 1748  
   * @param superstep Decide if checkpointing on this superstep
 1749  
    * @return True if this superstep should be checkpointed, false otherwise
 1750  
    */
 1751  
   private CheckpointStatus getCheckpointStatus(long superstep) {
 1752  
     try {
 1753  0
       if (getZkExt().
 1754  0
           exists(basePath + FORCE_CHECKPOINT_USER_FLAG, false) != null) {
 1755  0
         if (isCheckpointingSupported(getConfiguration(), masterCompute)) {
 1756  0
           return CheckpointStatus.CHECKPOINT_AND_HALT;
 1757  
         } else {
 1758  0
           LOG.warn("Attempted to manually checkpoint the job that " +
 1759  
               "does not support checkpoints. Ignoring");
 1760  
         }
 1761  
       }
 1762  0
     } catch (KeeperException e) {
 1763  0
       throw new IllegalStateException(
 1764  
           "cleanupZooKeeper: Got KeeperException", e);
 1765  0
     } catch (InterruptedException e) {
 1766  0
       throw new IllegalStateException(
 1767  
           "cleanupZooKeeper: Got IllegalStateException", e);
 1768  0
     }
 1769  0
     if (checkpointFrequency == 0) {
 1770  0
       return CheckpointStatus.NONE;
 1771  
     }
 1772  0
     long firstCheckpoint = INPUT_SUPERSTEP + 1;
 1773  0
     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
 1774  0
       firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
 1775  
     }
 1776  0
     if (superstep < firstCheckpoint) {
 1777  0
       return CheckpointStatus.NONE;
 1778  
     }
 1779  0
     if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) {
 1780  0
       if (isCheckpointingSupported(getConfiguration(), masterCompute)) {
 1781  0
         return CheckpointStatus.CHECKPOINT;
 1782  
       }
 1783  
     }
 1784  0
     return CheckpointStatus.NONE;
 1785  
   }
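  /*
   * Editor's sketch - a hypothetical helper, not part of BspServiceMaster
   * and not covered above. It isolates the frequency arithmetic from
   * getCheckpointStatus(). Assuming INPUT_SUPERSTEP is -1 and the job was
   * not restarted, firstCheckpoint is 0 and a frequency of 3 checkpoints
   * supersteps 0, 3, 6, ...; after a restart the sequence begins at
   * restartedSuperstep + checkpointFrequency instead.
   */
  private static boolean sketchIsCheckpointSuperstep(long superstep,
      long firstCheckpoint, int checkpointFrequency) {
    if (checkpointFrequency == 0 || superstep < firstCheckpoint) {
      return false;
    }
    return (superstep - firstCheckpoint) % checkpointFrequency == 0;
  }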
 1786  
 
 1787  
   /**
 1788  
    * Returns false if job doesn't support checkpoints.
 1789  
   * A job may not support checkpointing if it writes output during
 1790  
    * computation, uses static variables to keep data between supersteps,
 1791  
    * starts new threads etc.
 1792  
    * @param conf Immutable configuration of the job
 1793  
    * @param masterCompute instance of master compute
 1794  
    * @return true if it is safe to checkpoint the job
 1795  
    */
 1796  
   private boolean isCheckpointingSupported(
 1797  
       GiraphConfiguration conf, MasterCompute masterCompute) {
 1798  0
     return checkpointSupportedChecker.isCheckpointSupported(
 1799  
         conf, masterCompute);
 1800  
   }
 1801  
 
 1802  
 
 1803  
   /**
 1804  
    * This doMasterCompute is only called
 1805  
    * after masterCompute is initialized
 1806  
    */
 1807  
   private void doMasterCompute() {
 1808  0
     GiraphTimerContext timerContext = masterComputeTimer.time();
 1809  0
     masterCompute.compute();
 1810  0
     timerContext.stop();
 1811  0
   }
 1812  
 
 1813  
   /**
 1814  
    * Use the counterGroupAndNames and context, to get the counter values,
 1815  
    * create a custom counter out of each, and add to the set of counters
 1816  
    * @param context Job context
 1817  
    * @param counterGroupAndNames List of counter names
 1818  
    * @param counters Set of CustomCounter which will be populated
 1819  
    */
 1820  
   private void populateCountersFromContext(Mapper.Context context,
 1821  
            Map<String, Set<String>> counterGroupAndNames,
 1822  
            Set<CustomCounter> counters) {
 1823  
     Counter counter;
 1824  
     for (Map.Entry<String, Set<String>> entry :
 1825  0
             counterGroupAndNames.entrySet()) {
 1826  0
       String groupName = entry.getKey();
 1827  0
       for (String counterName: entry.getValue()) {
 1828  0
         CustomCounter customCounter = new CustomCounter(groupName, counterName,
 1829  
                 CustomCounter.Aggregation.SUM);
 1830  0
         counter = context.getCounter(groupName, counterName);
 1831  0
         customCounter.setValue(counter.getValue());
 1832  0
         counters.add(customCounter);
 1833  0
       }
 1834  0
     }
 1835  0
   }
 1836  
 
 1837  
   /**
 1838  
    * Receive the counters from the workers, and aggregate them with the
 1839  
    * master counters.
 1840  
    * This method is called at the end of each superstep, and after the job
 1841  
    * finishes successfully. In order to ensure best-effort counter values
 1842  
   * in case of a job failure, we call this at the end of every superstep.
 1843  
   * The aggregated counters are stored in a thrift struct.
 1844  
    */
 1845  
   private void aggregateCountersFromWorkersAndMaster() {
 1846  0
     CustomCounters customCounters = new CustomCounters();
 1847  0
     long superstep = getSuperstep();
 1848  
     // Get the stats from the all the worker selected nodes
 1849  0
     String workerFinishedPath = getWorkerCountersFinishedPath(
 1850  0
             getApplicationAttempt(), superstep);
 1851  
     try {
 1852  0
       getZkExt().createOnceExt(workerFinishedPath,
 1853  
               null,
 1854  
               Ids.OPEN_ACL_UNSAFE,
 1855  
               CreateMode.PERSISTENT,
 1856  
               true);
 1857  0
     } catch (KeeperException e) {
 1858  0
       LOG.warn("aggregateCounters: KeeperException - " +
 1859  
               "Couldn't create " + workerFinishedPath, e);
 1860  0
     } catch (InterruptedException e) {
 1861  0
       LOG.warn("barrierOnWorkerList: InterruptedException - " +
 1862  
               "Couldn't create " + workerFinishedPath, e);
 1863  0
     }
 1864  0
     List<String> workerFinishedPathList = new ArrayList<>();
 1865  
     long waitForCountersTimeout =
 1866  0
             SystemTime.get().getMilliseconds() + maxCounterWaitMsecs;
 1867  
     // Subtract 1 for the master
 1868  0
     int numWorkers = BspInputFormat.getMaxTasks(getConfiguration()) - 1;
 1869  0
     if (numWorkers == 0) {
 1870  
       // When the job is run with 1 worker, numWorkers would be 0,
 1871  
       // and thus add 1 to it, to get the worker-related counters
 1872  0
       numWorkers += 1;
 1873  
     }
 1874  
     // Get the counter values from the zookeeper, written by the workers
 1875  
     // We keep retrying until all the workers have written
 1876  0
     while (SystemTime.get().getMilliseconds() < waitForCountersTimeout) {
 1877  
       try {
 1878  0
         workerFinishedPathList = getZkExt().getChildrenExt(
 1879  
                 workerFinishedPath, true,
 1880  
                 false, true);
 1881  0
         LOG.info(String.format("Fetching counter values from " +
 1882  
                         "workers for superstep %d. Got %d out of %d",
 1883  0
                 superstep, workerFinishedPathList.size(), numWorkers));
 1884  0
         if (workerFinishedPathList.size() == numWorkers) {
 1885  0
           break;
 1886  
         }
 1887  0
       } catch (KeeperException e) {
 1888  0
         LOG.warn("Got Keeper exception, but will retry: ", e);
 1889  0
       } catch (InterruptedException e) {
 1890  0
         LOG.warn("aggregateCounters: InterruptedException", e);
 1891  0
       }
 1892  0
       getWrittenCountersToZKEvent().waitMsecs(eventWaitMsecs);
 1893  0
       getWrittenCountersToZKEvent().reset();
 1894  
     }
 1895  0
     for (String finishedPath : workerFinishedPathList) {
 1896  0
       JSONArray jsonCounters = null;
 1897  
       try {
 1898  0
         byte [] zkData =
 1899  0
                 getZkExt().getData(finishedPath, false, null);
 1900  0
         jsonCounters = new JSONArray(new String(zkData,
 1901  0
                 Charset.defaultCharset()));
 1902  0
         Set<CustomCounter> workerCounters = new HashSet<>();
 1903  0
         for (int i = 0; i < jsonCounters.length(); i++) {
 1904  0
           CustomCounter customCounter = new CustomCounter();
 1905  0
           WritableUtils.readFieldsFromByteArray(Base64.decode(
 1906  0
                   jsonCounters.getString(i)), customCounter);
 1907  0
           workerCounters.add(customCounter);
 1908  
         }
 1909  0
         customCounters.mergeCounters(workerCounters);
 1910  0
       } catch (JSONException e) {
 1911  0
         LOG.warn("aggregateCounters: JSONException", e);
 1912  0
       } catch (KeeperException e) {
 1913  0
         LOG.warn("aggregateCounters: KeeperException", e);
 1914  0
       } catch (InterruptedException e) {
 1915  0
         LOG.warn("aggregateCounters: InterruptedException", e);
 1916  0
       } catch (IOException e) {
 1917  0
         LOG.warn("aggregateCounters: IOException", e);
 1918  0
       }
 1919  0
     }
 1920  0
     Mapper.Context context = getContext();
 1921  0
     Set<CustomCounter> masterCounters = new HashSet<>();
 1922  
     // Add master counters too
 1923  0
     if (numWorkers != 1) {
 1924  
       // If numWorkers=1, then the master and worker share the counters
 1925  
       // Since we have already added the counters from the worker,
 1926  
       // we should not add them again here.
 1927  
       Counter counter;
 1928  
       Set<CustomCounter> masterCounterNames =
 1929  0
               CustomCounters.getAndClearCustomCounters();
 1930  0
       for (CustomCounter customCounter : masterCounterNames) {
 1931  0
         String groupName = customCounter.getGroupName();
 1932  0
         String counterName = customCounter.getCounterName();
 1933  0
         counter = context.getCounter(groupName, counterName);
 1934  0
         customCounter.setValue(counter.getValue());
 1935  0
         masterCounters.add(customCounter);
 1936  0
       }
 1937  
       // Adding Netty related counters
 1938  
       Map<String, Set<String>> nettyCounters =
 1939  0
               NettyClient.getCounterGroupsAndNames();
 1940  0
       populateCountersFromContext(context, nettyCounters, masterCounters);
 1941  
     }
 1942  
     // Adding counters from MasterInputSplitsHandler
 1943  
     // Even if number of workers = 1, we still need to add these counters
 1944  
    // explicitly because they exist only on the master
 1945  
     Map<String, Set<String>> inputSplitCounter =
 1946  0
             MasterInputSplitsHandler.getCounterGroupAndNames();
 1947  0
     populateCountersFromContext(context, inputSplitCounter, masterCounters);
 1948  0
     customCounters.mergeCounters(masterCounters);
 1949  
     // Add GiraphStats
 1950  0
     List<CustomCounter> allCounters = new ArrayList<>();
 1951  0
     allCounters.addAll(GiraphStats.getInstance().getCounterList());
 1952  
     // Custom counters
 1953  0
     allCounters.addAll(customCounters.getCounterList());
 1954  
     // Store in Thrift Struct
 1955  0
     giraphCountersThriftStruct.setCounters(allCounters);
 1956  0
   }
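  /*
   * Editor's sketch - a hypothetical helper, not part of BspServiceMaster
   * and not covered above. It shows the shape of the merge performed by
   * aggregateCountersFromWorkersAndMaster() with plain maps keyed by
   * "group:name": for SUM-style counters, values reported by different
   * workers under the same key are added. The real CustomCounters merge
   * logic may differ in detail.
   */
  private static java.util.Map<String, Long> sketchMergeCounters(
      java.util.List<java.util.Map<String, Long>> perWorkerCounters) {
    java.util.Map<String, Long> merged = new java.util.HashMap<String, Long>();
    for (java.util.Map<String, Long> workerCounters : perWorkerCounters) {
      for (java.util.Map.Entry<String, Long> entry :
          workerCounters.entrySet()) {
        merged.merge(entry.getKey(), entry.getValue(), Long::sum);
      }
    }
    return merged;
  }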
 1957  
 
 1958  
   /**
 1959  
    * We add the Giraph Timers separately, because we need to include
 1960  
   * the time required for shutdown and cleanup.
 1961  
    * This will fetch the final Giraph Timers, and send all the counters
 1962  
    * to the job client
 1963  
    * @param superstep superstep for which the GiraphTimer will be sent
 1964  
    *
 1965  
    */
 1966  
   public void addGiraphTimersAndSendCounters(long superstep) {
 1967  0
     List<CustomCounter> giraphCounters =
 1968  0
             giraphCountersThriftStruct.getCounters();
 1969  0
     giraphCounters.addAll(GiraphTimers.getInstance().getCounterList(superstep));
 1970  0
     giraphCountersThriftStruct.setCounters(giraphCounters);
 1971  0
     getJobProgressTracker().sendMasterCounters(giraphCountersThriftStruct);
 1972  0
   }
 1973  
 
 1974  
   /**
 1975  
    * Need to clean up ZooKeeper nicely. This function ensures all the
 1976  
    * masters and workers have reported ending their ZooKeeper connections.
 1977  
    */
 1978  
   private void waitForWorkersToCleanup() {
 1979  
     try {
 1980  0
       getZkExt().createExt(cleanedUpPath,
 1981  
           null,
 1982  
           Ids.OPEN_ACL_UNSAFE,
 1983  
           CreateMode.PERSISTENT,
 1984  
           true);
 1985  0
     } catch (KeeperException.NodeExistsException e) {
 1986  0
       if (LOG.isInfoEnabled()) {
 1987  0
         LOG.info("cleanUpZooKeeper: Node " + cleanedUpPath +
 1988  
             " already exists, no need to create.");
 1989  
       }
 1990  0
     } catch (KeeperException e) {
 1991  0
       throw new IllegalStateException(
 1992  
           "cleanupZooKeeper: Got KeeperException", e);
 1993  0
     } catch (InterruptedException e) {
 1994  0
       throw new IllegalStateException(
 1995  
           "cleanupZooKeeper: Got IllegalStateException", e);
 1996  0
     }
 1997  
     // Need to wait for the number of workers and masters to complete
 1998  0
     int maxTasks = BspInputFormat.getMaxTasks(getConfiguration());
 1999  0
     if ((getGraphTaskManager().getGraphFunctions() == GraphFunctions.ALL) ||
 2000  0
         (getGraphTaskManager().getGraphFunctions() ==
 2001  
         GraphFunctions.ALL_EXCEPT_ZOOKEEPER)) {
 2002  0
       maxTasks *= 2;
 2003  
     }
 2004  0
     List<String> cleanedUpChildrenList = null;
 2005  
     while (true) {
 2006  
       try {
 2007  0
         cleanedUpChildrenList =
 2008  0
             getZkExt().getChildrenExt(
 2009  
                 cleanedUpPath, true, false, true);
 2010  0
         if (LOG.isInfoEnabled()) {
 2011  0
           LOG.info("cleanUpZooKeeper: Got " +
 2012  0
               cleanedUpChildrenList.size() + " of " +
 2013  
               maxTasks  +  " desired children from " +
 2014  
               cleanedUpPath);
 2015  
         }
 2016  0
         if (cleanedUpChildrenList.size() == maxTasks) {
 2017  0
           break;
 2018  
         }
 2019  0
         if (LOG.isInfoEnabled()) {
 2020  0
           LOG.info("cleanedUpZooKeeper: Waiting for the " +
 2021  
               "children of " + cleanedUpPath +
 2022  
               " to change since only got " +
 2023  0
               cleanedUpChildrenList.size() + " nodes.");
 2024  
         }
 2025  0
       } catch (KeeperException e) {
 2026  
         // We are in the cleanup phase -- just log the error
 2027  0
         LOG.error("cleanUpZooKeeper: Got KeeperException, " +
 2028  
             "but will continue", e);
 2029  0
         return;
 2030  0
       } catch (InterruptedException e) {
 2031  
         // We are in the cleanup phase -- just log the error
 2032  0
         LOG.error("cleanUpZooKeeper: Got InterruptedException, " +
 2033  
             "but will continue", e);
 2034  0
         return;
 2035  0
       }
 2036  
 
 2037  0
       getCleanedUpChildrenChangedEvent().waitForTimeoutOrFail(
 2038  0
           GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
 2039  0
               getConfiguration()));
 2040  0
       getCleanedUpChildrenChangedEvent().reset();
 2041  
     }
 2042  0
   }
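  /*
   * Editor's sketch - a hypothetical helper, not part of BspServiceMaster
   * and not covered above. It isolates the expected-znode arithmetic from
   * waitForWorkersToCleanup(): when each task runs both a master and a
   * worker (GraphFunctions.ALL or ALL_EXCEPT_ZOOKEEPER), every task is
   * expected to register two "cleaned up" znodes, so the master waits for
   * 2 * maxTasks children; otherwise it waits for maxTasks.
   */
  private static int sketchExpectedCleanedUpZnodes(int maxTasks,
      boolean masterAndWorkerPerTask) {
    return masterAndWorkerPerTask ? maxTasks * 2 : maxTasks;
  }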
 2043  
 
 2044  
   /**
 2045  
   * This will perform the final cleanup of ZooKeeper, i.e.
 2046  
    * delete the basePath directory
 2047  
    */
 2048  
   private void cleanUpZooKeeper() {
 2049  
     // At this point, all processes have acknowledged the cleanup,
 2050  
     // and the master can do any final cleanup if the ZooKeeper service was
 2051  
     // provided (not dynamically started) and we don't want to keep the data
 2052  
     try {
 2053  0
       if (getConfiguration().isZookeeperExternal() &&
 2054  0
               KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration())) {
 2055  0
         if (LOG.isInfoEnabled()) {
 2056  0
           LOG.info("cleanupZooKeeper: Removing the following path " +
 2057  
                   "and all children - " + basePath + " from ZooKeeper list " +
 2058  0
                   getConfiguration().getZookeeperList());
 2059  
         }
 2060  0
         getZkExt().deleteExt(basePath, -1, true);
 2061  
       }
 2062  0
     } catch (KeeperException e) {
 2063  0
       LOG.error("cleanupZooKeeper: Failed to do cleanup of " +
 2064  
               basePath + " due to KeeperException", e);
 2065  0
     } catch (InterruptedException e) {
 2066  0
       LOG.error("cleanupZooKeeper: Failed to do cleanup of " +
 2067  
               basePath + " due to InterruptedException", e);
 2068  0
     }
 2069  0
   }
 2070  
 
 2071  
   @Override
 2072  
   public void postApplication() {
 2073  0
     for (MasterObserver observer : observers) {
 2074  0
       observer.postApplication();
 2075  0
       getContext().progress();
 2076  
     }
 2077  0
   }
 2078  
 
 2079  
   @Override
 2080  
   public void postSuperstep() {
 2081  0
     for (MasterObserver observer : observers) {
 2082  0
       observer.postSuperstep(getSuperstep());
 2083  0
       getContext().progress();
 2084  
     }
 2085  0
   }
 2086  
 
 2087  
   @Override
 2088  
   public void failureCleanup(Exception e) {
 2089  0
     for (MasterObserver observer : observers) {
 2090  
       try {
 2091  0
         observer.applicationFailed(e);
 2092  
         // CHECKSTYLE: stop IllegalCatchCheck
 2093  0
       } catch (RuntimeException re) {
 2094  
         // CHECKSTYLE: resume IllegalCatchCheck
 2095  0
         LOG.error(re.getClass().getName() + " from observer " +
 2096  0
             observer.getClass().getName(), re);
 2097  0
       }
 2098  0
       getContext().progress();
 2099  
     }
 2100  0
   }
 2101  
 
 2102  
   @Override
 2103  
   public void cleanup(SuperstepState superstepState) throws IOException {
 2104  0
     ImmutableClassesGiraphConfiguration conf = getConfiguration();
 2105  
 
 2106  
     // All master processes should denote they are done by adding special
 2107  
     // znode.  Once the number of znodes equals the number of partitions
 2108  
     // for workers and masters, the master will clean up the ZooKeeper
 2109  
     // znodes associated with this job.
 2110  0
     String masterCleanedUpPath = cleanedUpPath  + "/" +
 2111  0
         getTaskId() + MASTER_SUFFIX;
 2112  
     try {
 2113  0
       String finalFinishedPath =
 2114  0
           getZkExt().createExt(masterCleanedUpPath,
 2115  
               null,
 2116  
               Ids.OPEN_ACL_UNSAFE,
 2117  
               CreateMode.PERSISTENT,
 2118  
               true);
 2119  0
       if (LOG.isInfoEnabled()) {
 2120  0
         LOG.info("cleanup: Notifying master its okay to cleanup with " +
 2121  
             finalFinishedPath);
 2122  
       }
 2123  0
     } catch (KeeperException.NodeExistsException e) {
 2124  0
       if (LOG.isInfoEnabled()) {
 2125  0
         LOG.info("cleanup: Couldn't create finished node '" +
 2126  
              masterCleanedUpPath + "'");
 2127  
       }
 2128  0
     } catch (KeeperException e) {
 2129  0
       LOG.error("cleanup: Got KeeperException, continuing", e);
 2130  0
     } catch (InterruptedException e) {
 2131  0
       LOG.error("cleanup: Got InterruptedException, continuing", e);
 2132  0
     }
 2133  
 
 2134  0
     if (isMaster) {
 2135  0
       getGraphTaskManager().setIsMaster(true);
 2136  0
       waitForWorkersToCleanup();
 2137  0
       aggregateCountersFromWorkersAndMaster();
 2138  0
       cleanUpZooKeeper();
 2139  
       // If desired, cleanup the checkpoint directory
 2140  0
       if (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE &&
 2141  0
           GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) {
 2142  0
         boolean success =
 2143  0
             getFs().delete(new Path(checkpointBasePath), true);
 2144  0
         if (LOG.isInfoEnabled()) {
 2145  0
           LOG.info("cleanup: Removed HDFS checkpoint directory (" +
 2146  
               checkpointBasePath + ") with return = " +
 2147  0
               success + " since the job " + getContext().getJobName() +
 2148  
               " succeeded ");
 2149  
         }
 2150  
       }
 2151  0
       if (superstepState == SuperstepState.CHECKPOINT_AND_HALT) {
 2152  0
         getFs().create(CheckpointingUtils.getCheckpointMarkPath(conf,
 2153  0
             getJobId()), true);
 2154  0
         failJob(new Exception("Checkpoint and halt requested. " +
 2155  
             "Killing this job."));
 2156  
       }
 2157  0
       globalCommHandler.getAggregatorHandler().close();
 2158  0
       masterClient.closeConnections();
 2159  0
       masterServer.close();
 2160  
     }
 2161  
 
 2162  
     try {
 2163  0
       getZkExt().close();
 2164  0
     } catch (InterruptedException e) {
 2165  
       // cleanup phase -- just log the error
 2166  0
       LOG.error("cleanup: Zookeeper failed to close", e);
 2167  0
     }
 2168  0
   }
 2169  
 
 2170  
   /**
 2171  
    * Event that the master watches that denotes when a worker wrote checkpoint
 2172  
    *
 2173  
    * @return Event that denotes when a worker wrote checkpoint
 2174  
    */
 2175  
   public final BspEvent getWorkerWroteCheckpointEvent() {
 2176  0
     return workerWroteCheckpoint;
 2177  
   }
 2178  
 
 2179  
   /**
 2180  
    * Event that the master watches that denotes if a worker has done something
 2181  
    * that changes the state of a superstep (either a worker completed or died)
 2182  
    *
 2183  
    * @return Event that denotes a superstep state change
 2184  
    */
 2185  
   public final BspEvent getSuperstepStateChangedEvent() {
 2186  0
     return superstepStateChanged;
 2187  
   }
 2188  
 
 2189  
   /**
 2190  
    * Should this worker failure cause the current superstep to fail?
 2191  
    *
 2192  
    * @param failedWorkerPath Full path to the failed worker
 2193  
    */
 2194  
   private void checkHealthyWorkerFailure(String failedWorkerPath) {
 2195  0
     if (getSuperstepFromPath(failedWorkerPath) < getSuperstep()) {
 2196  0
       return;
 2197  
     }
 2198  
 
 2199  0
     Collection<PartitionOwner> partitionOwners =
 2200  0
         masterGraphPartitioner.getCurrentPartitionOwners();
 2201  0
     String hostnameId =
 2202  0
         getHealthyHostnameIdFromPath(failedWorkerPath);
 2203  0
     for (PartitionOwner partitionOwner : partitionOwners) {
 2204  0
       WorkerInfo workerInfo = partitionOwner.getWorkerInfo();
 2205  0
       WorkerInfo previousWorkerInfo =
 2206  0
           partitionOwner.getPreviousWorkerInfo();
 2207  0
       if (workerInfo.getHostnameId().equals(hostnameId) ||
 2208  
           ((previousWorkerInfo != null) &&
 2209  0
               previousWorkerInfo.getHostnameId().equals(hostnameId))) {
 2210  0
         LOG.warn("checkHealthyWorkerFailure: " +
 2211  
             "at least one healthy worker went down " +
 2212  0
             "for superstep " + getSuperstep() + " - " +
 2213  
             hostnameId + ", will try to restart from " +
 2214  
             "checkpointed superstep " +
 2215  
             lastCheckpointedSuperstep);
 2216  0
         superstepStateChanged.signal();
 2217  
       }
 2218  0
     }
 2219  0
   }
 2220  
 
 2221  
   @Override
 2222  
   public boolean processEvent(WatchedEvent event) {
 2223  0
     boolean foundEvent = false;
 2224  0
     if (event.getPath().contains(WORKER_HEALTHY_DIR) &&
 2225  0
         (event.getType() == EventType.NodeDeleted)) {
 2226  0
       if (LOG.isDebugEnabled()) {
 2227  0
         LOG.debug("processEvent: Healthy worker died (node deleted) " +
 2228  0
             "in " + event.getPath());
 2229  
       }
 2230  0
       checkHealthyWorkerFailure(event.getPath());
 2231  0
       superstepStateChanged.signal();
 2232  0
       foundEvent = true;
 2233  0
     } else if (event.getPath().endsWith(METRICS_DIR) &&
 2234  0
         event.getType() == EventType.NodeChildrenChanged) {
 2235  0
       if (LOG.isDebugEnabled()) {
 2236  0
         LOG.debug("processEvent: Worker finished (node change) " +
 2237  
             "event - superstepStateChanged signaled");
 2238  
       }
 2239  0
       superstepStateChanged.signal();
 2240  0
       foundEvent = true;
 2241  0
     } else if (event.getPath().contains(WORKER_WROTE_CHECKPOINT_DIR) &&
 2242  0
         event.getType() == EventType.NodeChildrenChanged) {
 2243  0
       if (LOG.isDebugEnabled()) {
 2244  0
         LOG.debug("processEvent: Worker wrote checkpoint (node change) " +
 2245  
             "event - workerWroteCheckpoint signaled");
 2246  
       }
 2247  0
       workerWroteCheckpoint.signal();
 2248  0
       foundEvent = true;
 2249  
     }
 2250  
 
 2251  0
     return foundEvent;
 2252  
   }
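  /*
   * Editor's sketch - a hypothetical helper, not part of BspServiceMaster
   * and not covered above. processEvent() dispatches purely on the znode
   * path and the watch event type; this restates that dispatch with plain
   * Strings. The directory suffixes below are placeholders, not the real
   * Giraph path constants.
   */
  private static String sketchClassifyMasterEvent(String path,
      String eventType) {
    if (path.contains("/_workerHealthyDir") &&
        "NodeDeleted".equals(eventType)) {
      return "healthy worker died";
    } else if (path.endsWith("/_metricsDir") &&
        "NodeChildrenChanged".equals(eventType)) {
      return "worker finished superstep";
    } else if (path.contains("/_workerWroteCheckpointDir") &&
        "NodeChildrenChanged".equals(eventType)) {
      return "worker wrote checkpoint";
    } else {
      return "not handled";
    }
  }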
 2253  
 
 2254  
   /**
 2255  
    * Set values of counters to match the ones from {@link GlobalStats}
 2256  
    *
 2257  
    * @param globalStats Global statistics which holds new counter values
 2258  
    */
 2259  
   private void updateCounters(GlobalStats globalStats) {
 2260  0
     GiraphStats gs = GiraphStats.getInstance();
 2261  0
     gs.getVertices().setValue(globalStats.getVertexCount());
 2262  0
     gs.getFinishedVertexes().setValue(globalStats.getFinishedVertexCount());
 2263  0
     gs.getEdges().setValue(globalStats.getEdgeCount());
 2264  0
     gs.getSentMessages().setValue(globalStats.getMessageCount());
 2265  0
     gs.getSentMessageBytes().setValue(globalStats.getMessageBytesCount());
 2266  0
     gs.getAggregateSentMessages().increment(globalStats.getMessageCount());
 2267  0
     gs.getAggregateSentMessageBytes()
 2268  0
       .increment(globalStats.getMessageBytesCount());
 2269  0
     gs.getAggregateOOCBytesLoaded()
 2270  0
       .increment(globalStats.getOocLoadBytesCount());
 2271  0
     gs.getAggregateOOCBytesStored()
 2272  0
       .increment(globalStats.getOocStoreBytesCount());
 2273  
     // Updating the lowest percentage of graph in memory throughout the
 2274  
     // execution across all the supersteps
 2275  0
     int percentage = (int) gs.getLowestGraphPercentageInMemory().getValue();
 2276  0
     gs.getLowestGraphPercentageInMemory().setValue(
 2277  0
         Math.min(percentage, globalStats.getLowestGraphPercentageInMemory()));
 2278  0
   }
 2279  
 }