Coverage Report - org.apache.giraph.graph.GraphTaskManager
 
Classes in this File Line Coverage Branch Coverage Complexity
GraphTaskManager
0%
0/383
0%
0/152
2.981
GraphTaskManager$1
0%
0/17
0%
0/8
2.981
GraphTaskManager$2
0%
0/4
N/A
2.981
GraphTaskManager$CheckerIfWorkerShouldFailAfterException
N/A
N/A
2.981
GraphTaskManager$FailWithEveryExceptionOccurred
0%
0/2
N/A
2.981
GraphTaskManager$OverrideExceptionHandler
0%
0/17
0%
0/2
2.981
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.graph;
 20  
 
 21  
 import java.io.IOException;
 22  
 import java.lang.management.GarbageCollectorMXBean;
 23  
 import java.lang.management.ManagementFactory;
 24  
 import java.util.ArrayList;
 25  
 import java.util.Collection;
 26  
 import java.util.Enumeration;
 27  
 import java.util.List;
 28  
 import java.util.concurrent.Callable;
 29  
 import java.util.concurrent.TimeUnit;
 30  
 
 31  
 import com.sun.management.GarbageCollectionNotificationInfo;
 32  
 import com.yammer.metrics.core.Counter;
 33  
 
 34  
 import org.apache.commons.lang3.exception.ExceptionUtils;
 35  
 import org.apache.giraph.bsp.BspService;
 36  
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 37  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 38  
 import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
 39  
 import org.apache.giraph.comm.messages.MessageStore;
 40  
 import org.apache.giraph.conf.ClassConfOption;
 41  
 import org.apache.giraph.conf.GiraphConstants;
 42  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 43  
 import org.apache.giraph.job.JobProgressTracker;
 44  
 import org.apache.giraph.master.BspServiceMaster;
 45  
 import org.apache.giraph.master.MasterThread;
 46  
 import org.apache.giraph.metrics.GiraphMetrics;
 47  
 import org.apache.giraph.metrics.GiraphMetricsRegistry;
 48  
 import org.apache.giraph.metrics.GiraphTimer;
 49  
 import org.apache.giraph.metrics.GiraphTimerContext;
 50  
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 51  
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 52  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 53  
 import org.apache.giraph.partition.PartitionOwner;
 54  
 import org.apache.giraph.partition.PartitionStats;
 55  
 import org.apache.giraph.partition.PartitionStore;
 56  
 import org.apache.giraph.scripting.ScriptLoader;
 57  
 import org.apache.giraph.utils.CallableFactory;
 58  
 import org.apache.giraph.utils.GcObserver;
 59  
 import org.apache.giraph.utils.MemoryUtils;
 60  
 import org.apache.giraph.utils.ProgressableUtils;
 61  
 import org.apache.giraph.worker.BspServiceWorker;
 62  
 import org.apache.giraph.worker.InputSplitsCallable;
 63  
 import org.apache.giraph.worker.WorkerContext;
 64  
 import org.apache.giraph.worker.WorkerObserver;
 65  
 import org.apache.giraph.worker.WorkerProgress;
 66  
 import org.apache.giraph.writable.kryo.KryoWritableWrapper;
 67  
 import org.apache.giraph.zk.ZooKeeperManager;
 68  
 import org.apache.hadoop.conf.Configuration;
 69  
 import org.apache.hadoop.fs.Path;
 70  
 import org.apache.hadoop.io.Writable;
 71  
 import org.apache.hadoop.io.WritableComparable;
 72  
 import org.apache.hadoop.mapreduce.Mapper;
 73  
 import org.apache.log4j.Appender;
 74  
 import org.apache.log4j.Level;
 75  
 import org.apache.log4j.LogManager;
 76  
 import org.apache.log4j.Logger;
 77  
 import org.apache.log4j.PatternLayout;
 78  
 
 79  
 import javax.management.Notification;
 80  
 import javax.management.NotificationEmitter;
 81  
 import javax.management.NotificationListener;
 82  
 import javax.management.openmbean.CompositeData;
 83  
 
 84  
 /**
 85  
  * The Giraph-specific business logic for a single BSP
 86  
  * compute node in whatever underlying type of cluster
 87  
  * our Giraph job will run on. Owning object will provide
 88  
  * the glue into the underlying cluster framework
 89  
  * and will call this object to perform Giraph work.
 90  
  *
 91  
  * @param <I> Vertex id
 92  
  * @param <V> Vertex data
 93  
  * @param <E> Edge data
 94  
  */
 95  
 @SuppressWarnings("rawtypes")
 96  0
 public class GraphTaskManager<I extends WritableComparable, V extends Writable,
 97  
   E extends Writable> implements
 98  
   ResetSuperstepMetricsObserver {
 99  
 /*if_not[PURE_YARN]
 100  
   static { // Eliminate this? Even MRv1 tasks should not need it here.
 101  
     Configuration.addDefaultResource("giraph-site.xml");
 102  
   }
 103  
 end[PURE_YARN]*/
 104  
   /**
 105  
    * Class which checks if an exception on some thread should cause worker
 106  
    * to fail
 107  
    */
 108  
   public static final ClassConfOption<CheckerIfWorkerShouldFailAfterException>
 109  0
   CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS = ClassConfOption.create(
 110  
       "giraph.checkerIfWorkerShouldFailAfterExceptionClass",
 111  
       FailWithEveryExceptionOccurred.class,
 112  
       CheckerIfWorkerShouldFailAfterException.class,
 113  
       "Class which checks if an exception on some thread should cause worker " +
 114  
           "to fail, by default all exceptions cause failure");
 115  
   /** Name of metric for superstep time in msec */
 116  
   public static final String TIMER_SUPERSTEP_TIME = "superstep-time-ms";
 117  
   /** Name of metric for compute on all vertices in msec */
 118  
   public static final String TIMER_COMPUTE_ALL = "compute-all-ms";
 119  
   /** Name of metric for time from begin compute to first message sent */
 120  
   public static final String TIMER_TIME_TO_FIRST_MSG =
 121  
       "time-to-first-message-ms";
 122  
   /** Name of metric for time from first message till last message flushed */
 123  
   public static final String TIMER_COMMUNICATION_TIME = "communication-time-ms";
 124  
   /** Name of metric for time spent doing GC per superstep in msec */
 125  
   public static final String TIMER_SUPERSTEP_GC_TIME = "superstep-gc-time-ms";
 126  
 
 127  
   /** Class logger */
 128  0
   private static final Logger LOG = Logger.getLogger(GraphTaskManager.class);
 129  
   /** Coordination service worker */
 130  
   private CentralizedServiceWorker<I, V, E> serviceWorker;
 131  
   /** Coordination service master */
 132  
   private CentralizedServiceMaster<I, V, E> serviceMaster;
 133  
   /** Coordination service master thread */
 134  0
   private Thread masterThread = null;
 135  
   /** The worker should be run exactly once, or else there is a problem. */
 136  0
   private boolean alreadyRun = false;
 137  
   /** Manages the ZooKeeper servers if necessary (dynamic startup) */
 138  
   private ZooKeeperManager zkManager;
 139  
   /** Configuration */
 140  
   private ImmutableClassesGiraphConfiguration<I, V, E> conf;
 141  
   /** Already complete? */
 142  0
   private boolean done = false;
 143  
   /** What kind of functions is this mapper doing? */
 144  0
   private GraphFunctions graphFunctions = GraphFunctions.UNKNOWN;
 145  
   /** Superstep stats */
 146  0
   private FinishedSuperstepStats finishedSuperstepStats =
 147  
       new FinishedSuperstepStats(0, false, 0, 0, false, CheckpointStatus.NONE);
 148  
   /** Job progress tracker */
 149  
   private JobProgressTrackerClient jobProgressTracker;
 150  
 
 151  
   // Per-Job Metrics
 152  
   /** Timer for WorkerContext#preApplication() */
 153  
   private GiraphTimer wcPreAppTimer;
 154  
   /** Timer for WorkerContext#postApplication() */
 155  
   private GiraphTimer wcPostAppTimer;
 156  
 
 157  
   // Per-Superstep Metrics
 158  
   /** Time for how long superstep took */
 159  
   private GiraphTimer superstepTimer;
 160  
   /** Time for all compute() calls in a superstep */
 161  
   private GiraphTimer computeAll;
 162  
   /** Time from starting compute to sending first message */
 163  
   private GiraphTimer timeToFirstMessage;
 164  
   /** Context for timing time to first message above */
 165  
   private GiraphTimerContext timeToFirstMessageTimerContext;
 166  
   /** Time from first sent message till last message flushed. */
 167  
   private GiraphTimer communicationTimer;
 168  
   /** Context for timing communication time above */
 169  
   private GiraphTimerContext communicationTimerContext;
 170  
   /** Timer for WorkerContext#preSuperstep() */
 171  
   private GiraphTimer wcPreSuperstepTimer;
 172  
   /** Counter to keep aggregated time spent in GC in a superstep */
 173  
   private Counter gcTimeMetric;
 174  
   /** The Hadoop Mapper#Context for this job */
 175  
   private final Mapper<?, ?, ?, ?>.Context context;
 176  
   /** is this GraphTaskManager the master? */
 177  
   private boolean isMaster;
 178  
   /** Mapper observers */
 179  
   private MapperObserver[] mapperObservers;
 180  
 
 181  
   /**
 182  
    * Default constructor for GraphTaskManager.
 183  
    * @param context a handle to the underlying cluster framework.
 184  
    *                For Hadoop clusters, this is a Mapper#Context.
 185  
    */
 186  0
   public GraphTaskManager(Mapper<?, ?, ?, ?>.Context context) {
 187  0
     this.context = context;
 188  0
     this.isMaster = false;
 189  0
   }
 190  
 
 191  
   /**
 192  
    * Run the user's input checking code.
 193  
    */
 194  
   private void checkInput() {
 195  0
     if (conf.hasEdgeInputFormat()) {
 196  0
       conf.createWrappedEdgeInputFormat().checkInputSpecs(conf);
 197  
     }
 198  0
     if (conf.hasVertexInputFormat()) {
 199  0
       conf.createWrappedVertexInputFormat().checkInputSpecs(conf);
 200  
     }
 201  0
   }
 202  
 
 203  
   /**
 204  
    * In order for job client to know which ZooKeeper the job is using,
 205  
    * we create a counter with server:port as its name inside of
 206  
    * ZOOKEEPER_SERVER_PORT_COUNTER_GROUP.
 207  
    *
 208  
    * @param serverPortList Server:port list for ZooKeeper used
 209  
    */
 210  
   private void createZooKeeperCounter(String serverPortList) {
 211  
     // Getting the counter will actually create it.
 212  0
     context.getCounter(GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP,
 213  
         serverPortList);
 214  0
   }
 215  
 
 216  
   /**
 217  
    * Called by owner of this GraphTaskManager on each compute node
 218  
    *
 219  
    * @param zkPathList the path to the ZK jars we need to run the job
 220  
    */
 221  
   public void setup(Path[] zkPathList)
 222  
     throws IOException, InterruptedException {
 223  0
     context.setStatus("setup: Beginning worker setup.");
 224  0
     Configuration hadoopConf = context.getConfiguration();
 225  0
     conf = new ImmutableClassesGiraphConfiguration<I, V, E>(hadoopConf);
 226  0
     initializeJobProgressTracker();
 227  
     // Setting the default handler for uncaught exceptions.
 228  0
     Thread.setDefaultUncaughtExceptionHandler(createUncaughtExceptionHandler());
 229  0
     setupMapperObservers();
 230  
     // Write user's graph types (I,V,E,M) back to configuration parameters so
 231  
     // that they are set for quicker access later. These types are often
 232  
     // inferred from the Computation class used.
 233  0
     conf.getGiraphTypes().writeIfUnset(conf);
 234  
     // configure global logging level for Giraph job
 235  0
     initializeAndConfigureLogging();
 236  
     // init the metrics objects
 237  0
     setupAndInitializeGiraphMetrics();
 238  
     // Check input
 239  0
     checkInput();
 240  
     // Load any scripts that were deployed
 241  0
     ScriptLoader.loadScripts(conf);
 242  
     // One time setup for computation factory
 243  0
     conf.createComputationFactory().initialize(conf);
 244  
     // Do some task setup (possibly starting up a Zookeeper service)
 245  0
     context.setStatus("setup: Initializing Zookeeper services.");
 246  0
     String serverPortList = conf.getZookeeperList();
 247  0
     if (serverPortList.isEmpty()) {
 248  0
       if (startZooKeeperManager()) {
 249  0
         return; // ZK connect/startup failed
 250  
       }
 251  
     } else {
 252  0
       createZooKeeperCounter(serverPortList);
 253  
     }
 254  0
     if (zkManager != null && zkManager.runsZooKeeper()) {
 255  0
       if (LOG.isInfoEnabled()) {
 256  0
         LOG.info("setup: Chosen to run ZooKeeper...");
 257  
       }
 258  
     }
 259  0
     context
 260  0
         .setStatus("setup: Connected to Zookeeper service " + serverPortList);
 261  0
     this.graphFunctions = determineGraphFunctions(conf, zkManager);
 262  0
     if (zkManager != null && this.graphFunctions.isMaster()) {
 263  0
       zkManager.cleanupOnExit();
 264  
     }
 265  
     try {
 266  0
       instantiateBspService();
 267  0
     } catch (IOException e) {
 268  0
       LOG.error("setup: Caught exception just before end of setup", e);
 269  0
       if (zkManager != null) {
 270  0
         zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FAILED);
 271  
       }
 272  0
       throw new RuntimeException(
 273  
         "setup: Offlining servers due to exception...", e);
 274  0
     }
 275  0
     context.setStatus(getGraphFunctions().toString() + " starting...");
 276  0
   }
 277  
 
 278  
   /**
 279  
    * Create and connect a client to JobProgressTrackerService,
 280  
    * or no-op implementation if progress shouldn't be tracked or something
 281  
    * goes wrong
 282  
    */
 283  
   private void initializeJobProgressTracker() {
 284  0
     if (!conf.trackJobProgressOnClient()) {
 285  0
       jobProgressTracker = new JobProgressTrackerClientNoOp();
 286  
     } else {
 287  0
       jobProgressTracker =
 288  0
         GiraphConstants.JOB_PROGRESS_TRACKER_CLIENT_CLASS.newInstance(conf);
 289  
       try {
 290  0
         jobProgressTracker.init(conf);
 291  
         // CHECKSTYLE: stop IllegalCatch
 292  0
       } catch (Exception e) {
 293  
         // CHECKSTYLE: resume IllegalCatch
 294  0
         throw new RuntimeException(
 295  
           "Failed to initialize JobProgressTrackerClient", e);
 296  0
       }
 297  
     }
 298  0
     jobProgressTracker.mapperStarted();
 299  0
   }
 300  
 
 301  
   /**
 302  
   * Perform the work assigned to this compute node for this job run.
 303  
   * 1) Run checkpoint per frequency policy.
 304  
   * 2) For every vertex on this mapper, run the compute() function
 305  
   * 3) Wait until all messaging is done.
 306  
   * 4) Check if all vertices are done.  If not goto 2).
 307  
   * 5) Dump output.
 308  
   */
 309  
   public void execute() throws IOException, InterruptedException {
 310  0
     if (checkTaskState()) {
 311  0
       return;
 312  
     }
 313  0
     preLoadOnWorkerObservers();
 314  0
     GiraphTimerContext superstepTimerContext = superstepTimer.time();
 315  0
     finishedSuperstepStats = serviceWorker.setup();
 316  0
     superstepTimerContext.stop();
 317  0
     if (collectInputSuperstepStats(finishedSuperstepStats)) {
 318  0
       return;
 319  
     }
 320  0
     prepareGraphStateAndWorkerContext();
 321  0
     List<PartitionStats> partitionStatsList = new ArrayList<PartitionStats>();
 322  0
     int numComputeThreads = conf.getNumComputeThreads();
 323  
 
 324  
     // main superstep processing loop
 325  0
     while (!finishedSuperstepStats.allVerticesHalted()) {
 326  0
       final long superstep = serviceWorker.getSuperstep();
 327  0
       superstepTimerContext = getTimerForThisSuperstep(superstep);
 328  0
       GraphState graphState = new GraphState(superstep,
 329  0
           finishedSuperstepStats.getVertexCount(),
 330  0
           finishedSuperstepStats.getEdgeCount(),
 331  
           context);
 332  0
       Collection<? extends PartitionOwner> masterAssignedPartitionOwners =
 333  0
         serviceWorker.startSuperstep();
 334  0
       if (LOG.isDebugEnabled()) {
 335  0
         LOG.debug("execute: " + MemoryUtils.getRuntimeMemoryStats());
 336  
       }
 337  0
       context.progress();
 338  0
       serviceWorker.exchangeVertexPartitions(masterAssignedPartitionOwners);
 339  0
       context.progress();
 340  0
       boolean hasBeenRestarted = checkSuperstepRestarted(superstep);
 341  
 
 342  0
       GlobalStats globalStats = serviceWorker.getGlobalStats();
 343  
 
 344  0
       if (hasBeenRestarted) {
 345  0
         graphState = new GraphState(superstep,
 346  0
             finishedSuperstepStats.getVertexCount(),
 347  0
             finishedSuperstepStats.getEdgeCount(),
 348  
             context);
 349  0
       } else if (storeCheckpoint(globalStats.getCheckpointStatus())) {
 350  0
         break;
 351  
       }
 352  0
       serviceWorker.getServerData().prepareResolveMutations();
 353  0
       context.progress();
 354  0
       prepareForSuperstep(graphState);
 355  0
       context.progress();
 356  0
       MessageStore<I, Writable> messageStore =
 357  0
           serviceWorker.getServerData().getCurrentMessageStore();
 358  0
       int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
 359  0
       int numThreads = Math.min(numComputeThreads, numPartitions);
 360  0
       if (LOG.isInfoEnabled()) {
 361  0
         LOG.info("execute: " + numPartitions + " partitions to process with " +
 362  
           numThreads + " compute thread(s), originally " +
 363  
           numComputeThreads + " thread(s) on superstep " + superstep);
 364  
       }
 365  0
       partitionStatsList.clear();
 366  
       // execute the current superstep
 367  0
       if (numPartitions > 0) {
 368  0
         processGraphPartitions(context, partitionStatsList, graphState,
 369  
           messageStore, numThreads);
 370  
       }
 371  0
       finishedSuperstepStats = completeSuperstepAndCollectStats(
 372  
         partitionStatsList, superstepTimerContext);
 373  
 
 374  
       // END of superstep compute loop
 375  0
     }
 376  
 
 377  0
     if (LOG.isInfoEnabled()) {
 378  0
       LOG.info("execute: BSP application done (global vertices marked done)");
 379  
     }
 380  0
     updateSuperstepGraphState();
 381  0
     postApplication();
 382  0
   }
 383  
 
 384  
   /**
 385  
    * Handle post-application callbacks.
 386  
    */
 387  
   private void postApplication() throws IOException, InterruptedException {
 388  0
     GiraphTimerContext postAppTimerContext = wcPostAppTimer.time();
 389  0
     serviceWorker.getWorkerContext().postApplication();
 390  0
     serviceWorker.getSuperstepOutput().postApplication();
 391  0
     postAppTimerContext.stop();
 392  0
     context.progress();
 393  
 
 394  0
     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
 395  0
       obs.postApplication();
 396  0
       context.progress();
 397  
     }
 398  0
   }
 399  
 
 400  
   /**
 401  
    * Sets the "isMaster" flag for final output commit to happen on master.
 402  
    * @param im the boolean input to set isMaster. Applies to "pure YARN" only
 403  
    */
 404  
   public void setIsMaster(final boolean im) {
 405  0
     this.isMaster = im;
 406  0
   }
 407  
 
 408  
   /**
 409  
    * Get "isMaster" status flag -- we need to know if we're the master in the
 410  
    * "finally" block of our GiraphYarnTask#execute() to commit final job output.
 411  
    * @return true if this task IS the master.
 412  
    */
 413  
   public boolean isMaster() {
 414  0
     return isMaster;
 415  
   }
 416  
 
 417  
   /**
 418  
    * Produce a reference to the "start" superstep timer for the current
 419  
    * superstep.
 420  
    * @param superstep the current superstep count
 421  
    * @return a GiraphTimerContext representing the "start" of the superstep
 422  
    */
 423  
   private GiraphTimerContext getTimerForThisSuperstep(long superstep) {
 424  0
     GiraphMetrics.get().resetSuperstepMetrics(superstep);
 425  0
     return superstepTimer.time();
 426  
   }
 427  
 
 428  
   /**
 429  
    * Utility to encapsulate Giraph metrics setup calls
 430  
    */
 431  
   private void setupAndInitializeGiraphMetrics() {
 432  0
     GiraphMetrics.init(conf);
 433  0
     GiraphMetrics.get().addSuperstepResetObserver(this);
 434  0
     initJobMetrics();
 435  0
     MemoryUtils.initMetrics();
 436  0
     InputSplitsCallable.initMetrics();
 437  0
   }
 438  
 
 439  
   /**
 440  
    * Instantiate and configure ZooKeeperManager for this job. This will
 441  
    * result in a Giraph-owned Zookeeper instance, a connection to an
 442  
    * existing quorum as specified in the job configuration, or task failure
 443  
    * @return true if this task should terminate
 444  
    */
 445  
   private boolean startZooKeeperManager()
 446  
     throws IOException, InterruptedException {
 447  0
     zkManager = new ZooKeeperManager(context, conf);
 448  0
     context.setStatus("setup: Setting up Zookeeper manager.");
 449  0
     zkManager.setup();
 450  0
     if (zkManager.computationDone()) {
 451  0
       done = true;
 452  0
       return true;
 453  
     }
 454  0
     zkManager.onlineZooKeeperServer();
 455  0
     String serverPortList = zkManager.getZooKeeperServerPortString();
 456  0
     conf.setZookeeperList(serverPortList);
 457  0
     createZooKeeperCounter(serverPortList);
 458  0
     return false;
 459  
   }
 460  
 
 461  
   /**
 462  
    * Utility to place a new, updated GraphState object into the serviceWorker.
 463  
    */
 464  
   private void updateSuperstepGraphState() {
 465  0
     serviceWorker.getWorkerContext().setGraphState(
 466  0
         new GraphState(serviceWorker.getSuperstep(),
 467  0
             finishedSuperstepStats.getVertexCount(),
 468  0
             finishedSuperstepStats.getEdgeCount(), context));
 469  0
   }
 470  
 
 471  
   /**
 472  
    * Utility function for boilerplate updates and cleanup done at the
 473  
    * end of each superstep processing loop in the <code>execute</code> method.
 474  
    * @param partitionStatsList list of stats for each superstep to append to
 475  
    * @param superstepTimerContext for job metrics
 476  
    * @return the collected stats at the close of the current superstep.
 477  
    */
 478  
   private FinishedSuperstepStats completeSuperstepAndCollectStats(
 479  
     List<PartitionStats> partitionStatsList,
 480  
     GiraphTimerContext superstepTimerContext) {
 481  
 
 482  
     // the superstep timer is stopped inside the finishSuperstep function
 483  
     // (otherwise metrics are not available at the end of the computation
 484  
     //  using giraph.metrics.enable=true).
 485  0
     finishedSuperstepStats =
 486  0
       serviceWorker.finishSuperstep(partitionStatsList, superstepTimerContext);
 487  0
     if (conf.metricsEnabled()) {
 488  0
       GiraphMetrics.get().perSuperstep().printSummary(System.err);
 489  
     }
 490  0
     return finishedSuperstepStats;
 491  
   }
 492  
 
 493  
   /**
 494  
    * Utility function to prepare various objects managing BSP superstep
 495  
    * operations for the next superstep.
 496  
    * @param graphState graph state metadata object
 497  
    */
 498  
   private void prepareForSuperstep(GraphState graphState) {
 499  0
     serviceWorker.prepareSuperstep();
 500  
 
 501  0
     serviceWorker.getWorkerContext().setGraphState(graphState);
 502  0
     serviceWorker.getWorkerContext().setupSuperstep(serviceWorker);
 503  0
     GiraphTimerContext preSuperstepTimer = wcPreSuperstepTimer.time();
 504  0
     serviceWorker.getWorkerContext().preSuperstep();
 505  0
     preSuperstepTimer.stop();
 506  0
     context.progress();
 507  
 
 508  0
     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
 509  0
       obs.preSuperstep(graphState.getSuperstep());
 510  0
       context.progress();
 511  
     }
 512  0
   }
 513  
 
 514  
   /**
 515  
    * Prepare graph state and worker context for superstep cycles.
 516  
    */
 517  
   private void prepareGraphStateAndWorkerContext() {
 518  0
     updateSuperstepGraphState();
 519  0
     workerContextPreApp();
 520  0
   }
 521  
 
 522  
   /**
 523  
     * Get the worker function enum.
 524  
     *
 525  
     * @return an enum detailing the roles assigned to this
 526  
     *         compute node for this Giraph job.
 527  
     */
 528  
   public GraphFunctions getGraphFunctions() {
 529  0
     return graphFunctions;
 530  
   }
 531  
 
 532  
   public final WorkerContext getWorkerContext() {
 533  0
     return serviceWorker.getWorkerContext();
 534  
   }
 535  
 
 536  
   public JobProgressTracker getJobProgressTracker() {
 537  0
     return jobProgressTracker;
 538  
   }
 539  
 
 540  
   /**
 541  
    * Figure out what roles this BSP compute node should take on in the job.
 542  
    * Basic logic is as follows:
 543  
    * 1) If not split master, everyone does everything and/or runs
 544  
    *    ZooKeeper.
 545  
    * 2) If split master/worker, masters also run ZooKeeper
 546  
    *
 547  
    * 3) If split master/worker == true and <code>giraph.zkList</code> is
 548  
    *    externally provided, the master will not instantiate a ZK instance, but
 549  
    *    will assume a quorum is already active on the cluster for Giraph to use.
 550  
    *
 551  
    * @param conf Configuration to use
 552  
    * @param zkManager ZooKeeper manager to help determine whether to run
 553  
    *        ZooKeeper.
 554  
    * @return Functions that this mapper should do.
 555  
    */
 556  
   private static GraphFunctions determineGraphFunctions(
 557  
       ImmutableClassesGiraphConfiguration conf,
 558  
       ZooKeeperManager zkManager) {
 559  0
     boolean splitMasterWorker = conf.getSplitMasterWorker();
 560  0
     int taskPartition = conf.getTaskPartition();
 561  0
     boolean zkAlreadyProvided = conf.isZookeeperExternal();
 562  0
     GraphFunctions functions = GraphFunctions.UNKNOWN;
 563  
     // What functions should this mapper do?
 564  0
     if (!splitMasterWorker) {
 565  0
       if ((zkManager != null) && zkManager.runsZooKeeper()) {
 566  0
         functions = GraphFunctions.ALL;
 567  
       } else {
 568  0
         functions = GraphFunctions.ALL_EXCEPT_ZOOKEEPER;
 569  
       }
 570  
     } else {
 571  0
       if (zkAlreadyProvided) {
 572  0
         if (taskPartition == 0) {
 573  0
           functions = GraphFunctions.MASTER_ONLY;
 574  
         } else {
 575  0
           functions = GraphFunctions.WORKER_ONLY;
 576  
         }
 577  
       } else {
 578  0
         if ((zkManager != null) && zkManager.runsZooKeeper()) {
 579  0
           functions = GraphFunctions.MASTER_ZOOKEEPER_ONLY;
 580  
         } else {
 581  0
           functions = GraphFunctions.WORKER_ONLY;
 582  
         }
 583  
       }
 584  
     }
 585  0
     return functions;
 586  
   }
 587  
 
 588  
   /**
 589  
    * Instantiate the appropriate BspService object (Master or Worker)
 590  
    * for this compute node.
 591  
    */
 592  
   private void instantiateBspService()
 593  
     throws IOException, InterruptedException {
 594  0
     if (graphFunctions.isMaster()) {
 595  0
       if (LOG.isInfoEnabled()) {
 596  0
         LOG.info("setup: Starting up BspServiceMaster " +
 597  
           "(master thread)...");
 598  
       }
 599  0
       serviceMaster = new BspServiceMaster<I, V, E>(context, this);
 600  0
       masterThread = new MasterThread<I, V, E>(serviceMaster, context);
 601  0
       masterThread.setUncaughtExceptionHandler(
 602  0
           createUncaughtExceptionHandler());
 603  0
       masterThread.start();
 604  
     }
 605  0
     if (graphFunctions.isWorker()) {
 606  0
       if (LOG.isInfoEnabled()) {
 607  0
         LOG.info("setup: Starting up BspServiceWorker...");
 608  
       }
 609  0
       serviceWorker = new BspServiceWorker<I, V, E>(context, this);
 610  0
       installGCMonitoring();
 611  0
       if (LOG.isInfoEnabled()) {
 612  0
         LOG.info("setup: Registering health of this worker...");
 613  
       }
 614  
     }
 615  0
   }
 616  
 
 617  
   /**
 618  
    * Install GC monitoring. This method intercepts every GC, logs it, and
 619  
    * notifies an out-of-core engine (if any is used) about the GC.
 620  
    */
 621  
   private void installGCMonitoring() {
 622  0
     final GcObserver[] gcObservers = conf.createGcObservers(context);
 623  
     List<GarbageCollectorMXBean> mxBeans = ManagementFactory
 624  0
         .getGarbageCollectorMXBeans();
 625  0
     final OutOfCoreEngine oocEngine =
 626  0
         serviceWorker.getServerData().getOocEngine();
 627  0
     for (GarbageCollectorMXBean gcBean : mxBeans) {
 628  0
       NotificationEmitter emitter = (NotificationEmitter) gcBean;
 629  0
       NotificationListener listener = new NotificationListener() {
 630  
         @Override
 631  
         public void handleNotification(Notification notification,
 632  
                                        Object handle) {
 633  0
           if (notification.getType().equals(GarbageCollectionNotificationInfo
 634  
               .GARBAGE_COLLECTION_NOTIFICATION)) {
 635  0
             GarbageCollectionNotificationInfo info =
 636  0
                 GarbageCollectionNotificationInfo.from(
 637  0
                     (CompositeData) notification.getUserData());
 638  
 
 639  0
             if (LOG.isInfoEnabled()) {
 640  0
               LOG.info("installGCMonitoring: name = " + info.getGcName() +
 641  0
                   ", action = " + info.getGcAction() + ", cause = " +
 642  0
                   info.getGcCause() + ", duration = " +
 643  0
                   info.getGcInfo().getDuration() + "ms");
 644  
             }
 645  0
             gcTimeMetric.inc(info.getGcInfo().getDuration());
 646  0
             GiraphMetrics.get().getGcTracker().gcOccurred(info.getGcInfo());
 647  0
             for (GcObserver gcObserver : gcObservers) {
 648  0
               gcObserver.gcOccurred(info);
 649  
             }
 650  0
             if (oocEngine != null) {
 651  0
               oocEngine.gcCompleted(info);
 652  
             }
 653  
           }
 654  0
         }
 655  
       };
 656  
       //Add the listener
 657  0
       emitter.addNotificationListener(listener, null, null);
 658  0
     }
 659  0
   }
 660  
 
 661  
   /**
 662  
    * Initialize the root logger and appender to the settings in conf.
 663  
    */
 664  
   private void initializeAndConfigureLogging() {
 665  
     // Set the log level
 666  0
     String logLevel = conf.getLocalLevel();
 667  0
     if (!Logger.getRootLogger().getLevel().equals(Level.toLevel(logLevel))) {
 668  0
       Logger.getRootLogger().setLevel(Level.toLevel(logLevel));
 669  0
       if (LOG.isInfoEnabled()) {
 670  0
         LOG.info("setup: Set log level to " + logLevel);
 671  
       }
 672  
     } else {
 673  0
       if (LOG.isInfoEnabled()) {
 674  0
         LOG.info("setup: Log level remains at " + logLevel);
 675  
       }
 676  
     }
 677  
     // Sets pattern layout for all appenders
 678  0
     if (conf.useLogThreadLayout()) {
 679  0
       PatternLayout layout =
 680  
         new PatternLayout("%-7p %d [%t] %c %x - %m%n");
 681  
       Enumeration<Appender> appenderEnum =
 682  0
         Logger.getRootLogger().getAllAppenders();
 683  0
       while (appenderEnum.hasMoreElements()) {
 684  0
         appenderEnum.nextElement().setLayout(layout);
 685  
       }
 686  
     }
 687  
     // Change ZooKeeper logging level to error (info is quite verbose) for
 688  
     // testing only
 689  0
     if (conf.getLocalTestMode()) {
 690  0
       LogManager.getLogger(org.apache.zookeeper.server.PrepRequestProcessor.
 691  0
           class.getName()).setLevel(Level.ERROR);
 692  
     }
 693  0
   }
 694  
 
 695  
   /**
 696  
    * Initialize job-level metrics used by this class.
 697  
    */
 698  
   private void initJobMetrics() {
 699  0
     GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJobOptional();
 700  0
     wcPreAppTimer = new GiraphTimer(jobMetrics, "worker-context-pre-app",
 701  
         TimeUnit.MILLISECONDS);
 702  0
     wcPostAppTimer = new GiraphTimer(jobMetrics, "worker-context-post-app",
 703  
         TimeUnit.MILLISECONDS);
 704  0
   }
 705  
 
 706  
   @Override
 707  
   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
 708  0
     superstepTimer = new GiraphTimer(superstepMetrics,
 709  
         TIMER_SUPERSTEP_TIME, TimeUnit.MILLISECONDS);
 710  0
     computeAll = new GiraphTimer(superstepMetrics,
 711  
         TIMER_COMPUTE_ALL, TimeUnit.MILLISECONDS);
 712  0
     timeToFirstMessage = new GiraphTimer(superstepMetrics,
 713  
         TIMER_TIME_TO_FIRST_MSG, TimeUnit.MICROSECONDS);
 714  0
     communicationTimer = new GiraphTimer(superstepMetrics,
 715  
         TIMER_COMMUNICATION_TIME, TimeUnit.MILLISECONDS);
 716  0
     gcTimeMetric = superstepMetrics.getCounter(TIMER_SUPERSTEP_GC_TIME);
 717  0
     wcPreSuperstepTimer = new GiraphTimer(superstepMetrics,
 718  
         "worker-context-pre-superstep", TimeUnit.MILLISECONDS);
 719  0
   }
 720  
 
 721  
   /**
 722  
    * Notification from Vertex that a message has been sent.
 723  
    */
 724  
   public void notifySentMessages() {
 725  
     // We are tracking the time between when the compute started and the first
 726  
     // message get sent. We use null to flag that we have already recorded it.
 727  0
     GiraphTimerContext tmp = timeToFirstMessageTimerContext;
 728  0
     if (tmp != null) {
 729  0
       synchronized (timeToFirstMessage) {
 730  0
         if (timeToFirstMessageTimerContext != null) {
 731  0
           timeToFirstMessageTimerContext.stop();
 732  0
           timeToFirstMessageTimerContext = null;
 733  0
           communicationTimerContext = communicationTimer.time();
 734  
         }
 735  0
       }
 736  
     }
 737  0
   }
 738  
 
 739  
   /**
 740  
    * Notification of last message flushed. Comes when we finish the superstep
 741  
    * and are done waiting for all messages to send.
 742  
    */
 743  
   public void notifyFinishedCommunication() {
 744  0
     GiraphTimerContext tmp = communicationTimerContext;
 745  0
     if (tmp != null) {
 746  0
       synchronized (communicationTimer) {
 747  0
         if (communicationTimerContext != null) {
 748  0
           communicationTimerContext.stop();
 749  0
           communicationTimerContext = null;
 750  
         }
 751  0
       }
 752  
     }
 753  0
   }
 754  
 
 755  
   /**
 756  
    * Process graph data partitions active in this superstep.
 757  
    * @param context handle to the underlying cluster framework
 758  
    * @param partitionStatsList to pick up this superstep's processing stats
 759  
    * @param graphState the BSP graph state
 760  
    * @param messageStore the messages to be processed in this superstep
 761  
    * @param numThreads number of concurrent threads to do processing
 762  
    */
 763  
   private void processGraphPartitions(final Mapper<?, ?, ?, ?>.Context context,
 764  
       List<PartitionStats> partitionStatsList,
 765  
       final GraphState graphState,
 766  
       final MessageStore<I, Writable> messageStore,
 767  
       int numThreads) {
 768  0
     PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore();
 769  0
     long verticesToCompute = 0;
 770  0
     for (Integer partitionId : partitionStore.getPartitionIds()) {
 771  0
       verticesToCompute += partitionStore.getPartitionVertexCount(partitionId);
 772  0
     }
 773  0
     WorkerProgress.get().startSuperstep(
 774  0
         serviceWorker.getSuperstep(), verticesToCompute,
 775  0
         serviceWorker.getPartitionStore().getNumPartitions());
 776  0
     partitionStore.startIteration();
 777  
 
 778  0
     GiraphTimerContext computeAllTimerContext = computeAll.time();
 779  0
     timeToFirstMessageTimerContext = timeToFirstMessage.time();
 780  
 
 781  0
     CallableFactory<Collection<PartitionStats>> callableFactory =
 782  0
       new CallableFactory<Collection<PartitionStats>>() {
 783  
         @Override
 784  
         public Callable<Collection<PartitionStats>> newCallable(
 785  
             int callableId) {
 786  0
           return new ComputeCallable<I, V, E, Writable, Writable>(
 787  
               context,
 788  
               graphState,
 789  
               messageStore,
 790  0
               conf,
 791  0
               serviceWorker);
 792  
         }
 793  
       };
 794  0
     List<Collection<PartitionStats>> results =
 795  0
         ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
 796  
             "compute-%d", context);
 797  
 
 798  0
     for (Collection<PartitionStats> result : results) {
 799  0
       partitionStatsList.addAll(result);
 800  0
     }
 801  
 
 802  0
     computeAllTimerContext.stop();
 803  0
   }
 804  
 
 805  
   /**
 806  
    * Handle the event that this superstep is a restart of a failed one.
 807  
    * @param superstep current superstep
 808  
    * @return the graph state, updated if this is a restart superstep
 809  
    */
 810  
   private boolean checkSuperstepRestarted(long superstep) throws IOException {
 811  
     // Might need to restart from another superstep
 812  
     // (manually or automatic), or store a checkpoint
 813  0
     if (serviceWorker.getRestartedSuperstep() == superstep) {
 814  0
       if (LOG.isInfoEnabled()) {
 815  0
         LOG.info("execute: Loading from checkpoint " + superstep);
 816  
       }
 817  0
       VertexEdgeCount vertexEdgeCount = serviceWorker.loadCheckpoint(
 818  0
         serviceWorker.getRestartedSuperstep());
 819  0
       finishedSuperstepStats = new FinishedSuperstepStats(0, false,
 820  0
           vertexEdgeCount.getVertexCount(), vertexEdgeCount.getEdgeCount(),
 821  
           false, CheckpointStatus.NONE);
 822  0
       return true;
 823  
     }
 824  0
     return false;
 825  
   }
 826  
 
 827  
   /**
 828  
    * Check if it's time to checkpoint and actually does checkpointing
 829  
    * if it is.
 830  
    * @param checkpointStatus master's decision
 831  
    * @return true if we need to stop computation after checkpoint
 832  
    * @throws IOException
 833  
    */
 834  
   private boolean storeCheckpoint(CheckpointStatus checkpointStatus)
 835  
     throws IOException {
 836  0
     if (checkpointStatus != CheckpointStatus.NONE) {
 837  0
       serviceWorker.storeCheckpoint();
 838  
     }
 839  0
     return checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT;
 840  
   }
 841  
 
 842  
   /**
 843  
    * Attempt to collect the final statistics on the graph data
 844  
    * processed in this superstep by this compute node
 845  
    * @param inputSuperstepStats the final graph data stats object for the
 846  
    *                            input superstep
 847  
    * @return true if the graph data has no vertices (error?) and
 848  
    *         this node should terminate
 849  
    */
 850  
   private boolean collectInputSuperstepStats(
 851  
     FinishedSuperstepStats inputSuperstepStats) {
 852  0
     if (inputSuperstepStats.getVertexCount() == 0 &&
 853  0
         !inputSuperstepStats.mustLoadCheckpoint()) {
 854  0
       LOG.warn("map: No vertices in the graph, exiting.");
 855  0
       return true;
 856  
     }
 857  0
     if (conf.metricsEnabled()) {
 858  0
       GiraphMetrics.get().perSuperstep().printSummary(System.err);
 859  
     }
 860  0
     return false;
 861  
   }
 862  
 
 863  
   /**
 864  
    * Did the state of this compute node change?
 865  
    * @return true if the processing of supersteps should terminate.
 866  
    */
 867  
   private boolean checkTaskState() {
 868  0
     if (done) {
 869  0
       return true;
 870  
     }
 871  0
     GiraphMetrics.get().resetSuperstepMetrics(BspService.INPUT_SUPERSTEP);
 872  0
     if (graphFunctions.isNotAWorker()) {
 873  0
       if (LOG.isInfoEnabled()) {
 874  0
         LOG.info("map: No need to do anything when not a worker");
 875  
       }
 876  0
       return true;
 877  
     }
 878  0
     if (alreadyRun) {
 879  0
       throw new RuntimeException("map: In BSP, map should have only been" +
 880  
         " run exactly once, (already run)");
 881  
     }
 882  0
     alreadyRun = true;
 883  0
     return false;
 884  
   }
 885  
 
 886  
   /**
 887  
    * Call to the WorkerContext before application begins.
 888  
    */
 889  
   private void workerContextPreApp() {
 890  0
     GiraphTimerContext preAppTimerContext = wcPreAppTimer.time();
 891  
     try {
 892  0
       serviceWorker.getWorkerContext().preApplication();
 893  0
     } catch (InstantiationException e) {
 894  0
       LOG.fatal("execute: preApplication failed in instantiation", e);
 895  0
       throw new RuntimeException(
 896  
           "execute: preApplication failed in instantiation", e);
 897  0
     } catch (IllegalAccessException e) {
 898  0
       LOG.fatal("execute: preApplication failed in access", e);
 899  0
       throw new RuntimeException(
 900  
           "execute: preApplication failed in access", e);
 901  0
     }
 902  0
     preAppTimerContext.stop();
 903  0
     context.progress();
 904  
 
 905  0
     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
 906  0
       obs.preApplication();
 907  0
       context.progress();
 908  
     }
 909  0
   }
 910  
 
 911  
   /**
 912  
    * Setup mapper observers
 913  
    */
 914  
   public void setupMapperObservers() {
 915  0
     mapperObservers = conf.createMapperObservers(context);
 916  0
     for (MapperObserver mapperObserver : mapperObservers) {
 917  0
       mapperObserver.setup();
 918  
     }
 919  0
   }
 920  
 
 921  
   /**
 922  
    * Executes preLoad() on worker observers.
 923  
    */
 924  
   private void preLoadOnWorkerObservers() {
 925  0
     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
 926  0
       obs.preLoad();
 927  0
       context.progress();
 928  
     }
 929  0
   }
 930  
 
 931  
   /**
 932  
    * Executes postSave() on worker observers.
 933  
    */
 934  
   private void postSaveOnWorkerObservers() {
 935  0
     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
 936  0
       obs.postSave();
 937  0
       context.progress();
 938  
     }
 939  0
   }
 940  
 
 941  
   /**
 942  
    * Called by owner of this GraphTaskManager object on each compute node
 943  
    */
 944  
   public void cleanup()
 945  
     throws IOException, InterruptedException {
 946  0
     if (LOG.isInfoEnabled()) {
 947  0
       LOG.info("cleanup: Starting for " + getGraphFunctions());
 948  
     }
 949  0
     jobProgressTracker.cleanup();
 950  0
     if (done) {
 951  0
       return;
 952  
     }
 953  
 
 954  0
     if (serviceWorker != null) {
 955  0
       serviceWorker.cleanup(finishedSuperstepStats);
 956  
     }
 957  0
   }
 958  
 
 959  
   /**
 960  
    * Method to send the counter values from the worker to the master,
 961  
    * after all supersteps are done, and finish cleanup
 962  
    */
 963  
   public void sendWorkerCountersAndFinishCleanup() {
 964  0
     if (serviceWorker != null) {
 965  0
       postSaveOnWorkerObservers();
 966  0
       serviceWorker.storeCountersInZooKeeper(true);
 967  0
       serviceWorker.closeZooKeeper();
 968  
     }
 969  
     try {
 970  0
       if (masterThread != null) {
 971  0
         masterThread.join();
 972  0
         LOG.info("cleanup: Joined with master thread");
 973  
       }
 974  0
     } catch (InterruptedException e) {
 975  
       // cleanup phase -- just log the error
 976  0
       LOG.error("cleanup: Master thread couldn't join");
 977  0
     }
 978  0
     if (zkManager != null) {
 979  0
       LOG.info("cleanup: Offlining ZooKeeper servers");
 980  
       try {
 981  0
         zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FINISHED);
 982  
         // We need this here cause apparently exceptions are eaten by Hadoop
 983  
         // when they come from the cleanup lifecycle and it's useful to know
 984  
         // if something is wrong.
 985  
         //
 986  
         // And since it's cleanup nothing too bad should happen if we don't
 987  
         // propagate and just allow the job to finish normally.
 988  
         // CHECKSTYLE: stop IllegalCatch
 989  0
       } catch (Throwable e) {
 990  
         // CHECKSTYLE: resume IllegalCatch
 991  0
         LOG.error("cleanup: Error offlining zookeeper", e);
 992  0
       }
 993  
     }
 994  
 
 995  
     // Stop tracking metrics
 996  0
     GiraphMetrics.get().shutdown();
 997  0
   }
 998  
 
 999  
   /**
 1000  
    * Cleanup a ZooKeeper instance managed by this
 1001  
    * GiraphWorker upon job run failure.
 1002  
    */
 1003  
   public void zooKeeperCleanup() {
 1004  0
     if (graphFunctions.isZooKeeper()) {
 1005  
       // ZooKeeper may have had an issue
 1006  0
       if (zkManager != null) {
 1007  0
         zkManager.cleanup();
 1008  
       }
 1009  
     }
 1010  0
   }
 1011  
 
 1012  
   /**
 1013  
    * Cleanup all of Giraph's framework-agnostic resources
 1014  
    * regardless of which type of cluster Giraph is running on.
 1015  
    */
 1016  
   public void workerFailureCleanup() {
 1017  
     try {
 1018  0
       if (graphFunctions.isWorker()) {
 1019  0
         serviceWorker.failureCleanup();
 1020  
       }
 1021  
       // Stop tracking metrics
 1022  0
       GiraphMetrics.get().shutdown();
 1023  
     // Checkstyle exception due to needing to get the original
 1024  
     // exception on failure
 1025  
     // CHECKSTYLE: stop IllegalCatch
 1026  0
     } catch (RuntimeException e1) {
 1027  
     // CHECKSTYLE: resume IllegalCatch
 1028  0
       LOG.error("run: Worker failure failed on another RuntimeException, " +
 1029  
           "original expection will be rethrown", e1);
 1030  0
     }
 1031  0
   }
 1032  
 
 1033  
   /**
 1034  
    * Creates exception handler that will terminate process gracefully in case
 1035  
    * of any uncaught exception.
 1036  
    * @return new exception handler object.
 1037  
    */
 1038  
   public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler() {
 1039  0
     return new OverrideExceptionHandler(
 1040  0
         CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS.newInstance(
 1041  0
             getConf()), getJobProgressTracker());
 1042  
   }
 1043  
 
 1044  
   /**
 1045  
    * Creates exception handler with the passed implementation of
 1046  
    * {@link CheckerIfWorkerShouldFailAfterException}.
 1047  
    *
 1048  
    * @param checker Instance that checks whether the job should fail.
 1049  
    * @return Exception handler.
 1050  
    */
 1051  
   public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler(
 1052  
     CheckerIfWorkerShouldFailAfterException checker) {
 1053  0
     return new OverrideExceptionHandler(checker, getJobProgressTracker());
 1054  
   }
 1055  
 
 1056  
   public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
 1057  0
     return conf;
 1058  
   }
 1059  
 
 1060  
   /**
 1061  
    * @return Time spent in GC recorder by the GC listener
 1062  
    */
 1063  
   public long getSuperstepGCTime() {
 1064  0
     return (gcTimeMetric == null) ? 0 : gcTimeMetric.count();
 1065  
   }
 1066  
 
 1067  
   /**
 1068  
    * Returns a list of zookeeper servers to connect to.
 1069  
    * If the port is set to 0 and Giraph is starting a single
 1070  
    * ZooKeeper server, then Zookeeper will pick its own port.
 1071  
    * Otherwise, the ZooKeeper port set by the user will be used.
 1072  
    * @return host:port,host:port for each zookeeper
 1073  
    */
 1074  
   public String getZookeeperList() {
 1075  0
     if (zkManager != null) {
 1076  0
       return zkManager.getZooKeeperServerPortString();
 1077  
     } else {
 1078  0
       return conf.getZookeeperList();
 1079  
     }
 1080  
   }
 1081  
 
 1082  
   /**
 1083  
    * Default handler for uncaught exceptions.
 1084  
    * It will do the best to clean up and then will terminate current giraph job.
 1085  
    */
 1086  
   class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
 1087  
     /** Checker if worker should fail after a thread gets an exception */
 1088  
     private final CheckerIfWorkerShouldFailAfterException checker;
 1089  
     /** JobProgressTracker to log problems to */
 1090  
     private final JobProgressTracker jobProgressTracker;
 1091  
 
 1092  
     /**
 1093  
      * Constructor
 1094  
      *
 1095  
      * @param checker Checker if worker should fail after a thread gets an
 1096  
      *                exception
 1097  
      * @param jobProgressTracker JobProgressTracker to log problems to
 1098  
      */
 1099  
     public OverrideExceptionHandler(
 1100  
         CheckerIfWorkerShouldFailAfterException checker,
 1101  0
         JobProgressTracker jobProgressTracker) {
 1102  0
       this.checker = checker;
 1103  0
       this.jobProgressTracker = jobProgressTracker;
 1104  0
     }
 1105  
 
 1106  
     @Override
 1107  
     public void uncaughtException(final Thread t, final Throwable e) {
 1108  0
       if (!checker.checkIfWorkerShouldFail(t, e)) {
 1109  0
         LOG.error(
 1110  
           "uncaughtException: OverrideExceptionHandler on thread " +
 1111  0
             t.getName() + ", msg = " +  e.getMessage(), e);
 1112  0
         return;
 1113  
       }
 1114  
       try {
 1115  0
         LOG.fatal(
 1116  
             "uncaughtException: OverrideExceptionHandler on thread " +
 1117  0
                 t.getName() + ", msg = " +  e.getMessage() + ", exiting...", e);
 1118  0
         byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e);
 1119  0
         jobProgressTracker.logError(ExceptionUtils.getStackTrace(e),
 1120  
                 exByteArray);
 1121  0
         zooKeeperCleanup();
 1122  0
         workerFailureCleanup();
 1123  
       } finally {
 1124  0
         System.exit(1);
 1125  0
       }
 1126  0
     }
 1127  
   }
 1128  
 
 1129  
   /**
 1130  
    * Interface to check if worker should fail after a thread gets an exception
 1131  
    */
 1132  
   public interface CheckerIfWorkerShouldFailAfterException {
 1133  
     /**
 1134  
      * Check if worker should fail after a thread gets an exception
 1135  
      *
 1136  
      * @param thread Thread which raised the exception
 1137  
      * @param exception Exception which occurred
 1138  
      * @return True iff worker should fail after this exception
 1139  
      */
 1140  
     boolean checkIfWorkerShouldFail(Thread thread, Throwable exception);
 1141  
   }
 1142  
 
 1143  
   /**
 1144  
    * Class to use by default, where each exception causes job failure
 1145  
    */
 1146  0
   public static class FailWithEveryExceptionOccurred
 1147  
       implements CheckerIfWorkerShouldFailAfterException {
 1148  
     @Override
 1149  
     public boolean checkIfWorkerShouldFail(Thread thread, Throwable exception) {
 1150  0
       return true;
 1151  
     }
 1152  
   }
 1153  
 
 1154  
   /**
 1155  
    * Checks the message of a throwable, and checks whether it is a
 1156  
    * "connection reset by peer" type of exception.
 1157  
    *
 1158  
    * @param throwable Throwable
 1159  
    * @return True if the throwable is a "connection reset by peer",
 1160  
    * false otherwise.
 1161  
    */
 1162  
   public static boolean isConnectionResetByPeer(Throwable throwable) {
 1163  0
     return throwable.getMessage().startsWith(
 1164  
       "Connection reset by peer") ? true : false;
 1165  
   }
 1166  
 }