Coverage Report - org.apache.giraph.worker.BspServiceWorker
 
Classes in this File     Line Coverage    Branch Coverage    Complexity
BspServiceWorker$1       0% (0/2)         N/A                0
BspServiceWorker$1$1     0% (0/35)        0% (0/10)          0
BspServiceWorker$2       0% (0/2)         N/A                0
BspServiceWorker$2$1     0% (0/33)        0% (0/10)          0
BspServiceWorker$3       0% (0/2)         N/A                0
BspServiceWorker$3$1     0% (0/17)        0% (0/4)           0
BspServiceWorker$4       0% (0/2)         N/A                0
BspServiceWorker$4$1     0% (0/18)        0% (0/6)           0
 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.worker;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

import net.iharder.Base64;

import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.WorkerServer;
import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerClient;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerServer;
import org.apache.giraph.comm.requests.PartitionStatsRequest;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.counters.CustomCounter;
import org.apache.giraph.counters.CustomCounters;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.AddressesAndPartitionsWritable;
import org.apache.giraph.graph.FinishedSuperstepStats;
import org.apache.giraph.graph.GlobalStats;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.EdgeOutputFormat;
import org.apache.giraph.io.EdgeWriter;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.VertexWriter;
import org.apache.giraph.io.superstep_output.SuperstepOutput;
import org.apache.giraph.mapping.translate.TranslateEdge;
import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.master.SuperstepClasses;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphTimer;
import org.apache.giraph.metrics.GiraphTimerContext;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.metrics.WorkerSuperstepMetrics;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionExchange;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.partition.WorkerGraphPartitioner;
import org.apache.giraph.utils.BlockingElementsSet;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.CheckpointingUtils;
import org.apache.giraph.utils.JMapHistoDumper;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ReactiveJMapHistoDumper;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

import com.google.common.collect.Lists;

import static org.apache.giraph.graph.GraphTaskManager.isConnectionResetByPeer;

/**
 * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
 *
 * @param <I> Vertex id
 * @param <V> Vertex data
 * @param <E> Edge data
 */
@SuppressWarnings("rawtypes")
public class BspServiceWorker<I extends WritableComparable,
    V extends Writable, E extends Writable>
    extends BspService<I, V, E>
    implements CentralizedServiceWorker<I, V, E>,
    ResetSuperstepMetricsObserver {
  /** Name of gauge for time spent waiting on other workers */
  public static final String TIMER_WAIT_REQUESTS = "wait-requests-us";
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
  /** My process health znode */
  private String myHealthZnode;
  /** Worker info */
  private final WorkerInfo workerInfo;
  /** Worker graph partitioner */
  private final WorkerGraphPartitioner<I, V, E> workerGraphPartitioner;
  /** Local Data for each worker */
  private final LocalData<I, V, E, ? extends Writable> localData;
  /** Used to translate Edges during vertex input phase based on localData */
  private final TranslateEdge<I, E> translateEdge;
  /** IPC Client */
  private final WorkerClient<I, V, E> workerClient;
  /** IPC Server */
  private final WorkerServer<I, V, E> workerServer;
  /** Request processor for aggregator requests */
  private final WorkerAggregatorRequestProcessor
  workerAggregatorRequestProcessor;
  /** Master info */
  private MasterInfo masterInfo = new MasterInfo();
  /** List of workers */
  private List<WorkerInfo> workerInfoList = Lists.newArrayList();
  /** Have the partition exchange children (workers) changed? */
  private final BspEvent partitionExchangeChildrenChanged;

  /** Addresses and partitions transfer */
  private BlockingElementsSet<AddressesAndPartitionsWritable>
      addressesAndPartitionsHolder = new BlockingElementsSet<>();

  /** Worker Context */
  private final WorkerContext workerContext;

  /** Handler for aggregators */
  private final WorkerAggregatorHandler globalCommHandler;

  /** Superstep output */
  private final SuperstepOutput<I, V, E> superstepOutput;

  /** array of observers to call back to */
  private final WorkerObserver[] observers;
  /** Writer for worker progress */
  private final WorkerProgressWriter workerProgressWriter;

  // Per-Superstep Metrics
  /** Timer for WorkerContext#postSuperstep */
  private GiraphTimer wcPostSuperstepTimer;
  /** Time spent waiting on requests to finish */
  private GiraphTimer waitRequestsTimer;

  /** InputSplit handlers used in INPUT_SUPERSTEP */
  private final WorkerInputSplitsHandler inputSplitsHandler;

  /** Memory observer */
  private final MemoryObserver memoryObserver;

  /**
   * Constructor for setting up the worker.
   *
   * @param context Mapper context
   * @param graphTaskManager GraphTaskManager for this compute node
   * @throws IOException
   * @throws InterruptedException
   */
  public BspServiceWorker(
    Mapper<?, ?, ?, ?>.Context context,
    GraphTaskManager<I, V, E> graphTaskManager)
    throws IOException, InterruptedException {
    super(context, graphTaskManager);
    ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
    localData = new LocalData<>(conf);
    translateEdge = getConfiguration().edgeTranslationInstance();
    if (translateEdge != null) {
      translateEdge.initialize(this);
    }
    partitionExchangeChildrenChanged = new PredicateLock(context);
    registerBspEvent(partitionExchangeChildrenChanged);
    workerGraphPartitioner =
        getGraphPartitionerFactory().createWorkerGraphPartitioner();
    workerInfo = new WorkerInfo();
    workerServer = new NettyWorkerServer<I, V, E>(conf, this, context,
        graphTaskManager.createUncaughtExceptionHandler(
          (thread, throwable) -> {
            // If the connection was closed by the client, then we just log
            // the error, we do not fail the job, since the client will
            // attempt to reconnect.
            return !isConnectionResetByPeer(throwable);
          }
        )
    );
    workerInfo.setInetSocketAddress(workerServer.getMyAddress(),
        workerServer.getLocalHostOrIp());
    workerInfo.setTaskId(getTaskId());
    workerClient = new NettyWorkerClient<I, V, E>(context, conf, this,
        graphTaskManager.createUncaughtExceptionHandler());
    workerServer.setFlowControl(workerClient.getFlowControl());
    OutOfCoreEngine oocEngine = workerServer.getServerData().getOocEngine();
    if (oocEngine != null) {
      oocEngine.setFlowControl(workerClient.getFlowControl());
    }

    workerAggregatorRequestProcessor =
        new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);

    globalCommHandler = new WorkerAggregatorHandler(this, conf, context);

    workerContext = conf.createWorkerContext();
    workerContext.setWorkerGlobalCommUsage(globalCommHandler);

    superstepOutput = conf.createSuperstepOutput(context);

    if (conf.isJMapHistogramDumpEnabled()) {
      conf.addWorkerObserverClass(JMapHistoDumper.class);
    }
    if (conf.isReactiveJmapHistogramDumpEnabled()) {
      conf.addWorkerObserverClass(ReactiveJMapHistoDumper.class);
    }
    observers = conf.createWorkerObservers(context);

    WorkerProgress.get().setTaskId(getTaskId());
    workerProgressWriter = conf.trackJobProgressOnClient() ?
        new WorkerProgressWriter(graphTaskManager.getJobProgressTracker()) :
        null;

    GiraphMetrics.get().addSuperstepResetObserver(this);

    inputSplitsHandler = new WorkerInputSplitsHandler(
        workerInfo, masterInfo.getTaskId(), workerClient);

    memoryObserver = new MemoryObserver(getZkExt(), memoryObserverPath, conf);
  }
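
  // Editor's note: a brief, hypothetical sketch (not part of this file) of
  // the predicate-filtered uncaught-exception handling wired up above. The
  // lambda passed to createUncaughtExceptionHandler answers "should this
  // error kill the worker?"; returning false (as for connection-reset-by-peer
  // errors) downgrades the failure to a log entry, since the client will
  // simply reconnect:
  //
  //   Thread.UncaughtExceptionHandler handler = (thread, throwable) -> {
  //     if (shouldFail.test(thread, throwable)) {      // shouldFail: the lambda
  //       LOG.fatal("Uncaught exception", throwable);  // fatal path
  //       System.exit(-1);                             // master restarts the task
  //     } else {
  //       LOG.warn("Ignoring benign error", throwable);
  //     }
  //   };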

  @Override
  public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
    waitRequestsTimer = new GiraphTimer(superstepMetrics,
        TIMER_WAIT_REQUESTS, TimeUnit.MICROSECONDS);
    wcPostSuperstepTimer = new GiraphTimer(superstepMetrics,
        "worker-context-post-superstep", TimeUnit.MICROSECONDS);
  }

  @Override
  public WorkerContext getWorkerContext() {
    return workerContext;
  }

  @Override
  public WorkerObserver[] getWorkerObservers() {
    return observers;
  }

  @Override
  public WorkerClient<I, V, E> getWorkerClient() {
    return workerClient;
  }

  public LocalData<I, V, E, ? extends Writable> getLocalData() {
    return localData;
  }

  public TranslateEdge<I, E> getTranslateEdge() {
    return translateEdge;
  }

  /**
   * Intended to check the health of the node.  For instance, can it ssh,
   * dmesg, etc. For now, does nothing.
   * TODO: Make this check configurable by the user (i.e. search dmesg for
   * problems).
   *
   * @return True if healthy (always in this case).
   */
  public boolean isHealthy() {
    return true;
  }

  /**
   * Load the vertices/edges from input splits. Do this until all the
   * InputSplits have been processed.
   * All workers will try to do as many InputSplits as they can.  The master
   * will monitor progress and stop this once all the InputSplits have been
   * loaded and check-pointed.  Keep track of the last input split path to
   * ensure the input split cache is flushed prior to marking the last input
   * split complete.
   *
   * Use one or more threads to do the loading.
   *
   * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
   * @return Statistics of the vertices and edges loaded
   * @throws InterruptedException
   * @throws KeeperException
   */
  private VertexEdgeCount loadInputSplits(
      CallableFactory<VertexEdgeCount> inputSplitsCallableFactory)
    throws KeeperException, InterruptedException {
    VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
    int numThreads = getConfiguration().getNumInputSplitsThreads();
    if (LOG.isInfoEnabled()) {
      LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
          "originally " + getConfiguration().getNumInputSplitsThreads() +
          " thread(s)");
    }

    List<VertexEdgeCount> results =
        ProgressableUtils.getResultsWithNCallables(inputSplitsCallableFactory,
            numThreads, "load-%d", getContext());
    for (VertexEdgeCount result : results) {
      vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(result);
    }

    workerClient.waitAllRequests();
    return vertexEdgeCount;
  }
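
  // Editor's note: the fan-out pattern above is worth spelling out. A
  // CallableFactory produces one Callable per thread, and
  // ProgressableUtils.getResultsWithNCallables runs them on numThreads named
  // threads ("load-0", "load-1", ...) while reporting progress, returning
  // one result per callable. A minimal hypothetical use of the same utility
  // (doSomeWork is an assumed helper, not a real API):
  //
  //   CallableFactory<Long> factory = new CallableFactory<Long>() {
  //     @Override
  //     public Callable<Long> newCallable(int callableId) {
  //       return () -> doSomeWork(callableId);
  //     }
  //   };
  //   List<Long> perThreadResults = ProgressableUtils.getResultsWithNCallables(
  //       factory, numThreads, "work-%d", getContext());
  //
  // loadInputSplits() then reduces the per-thread VertexEdgeCounts into one.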

  /**
   * Load the mapping entries from the user-defined
   * {@link org.apache.giraph.io.MappingReader}
   *
   * @return Count of mapping entries loaded
   */
  private long loadMapping() throws KeeperException,
    InterruptedException {
    MappingInputSplitsCallableFactory<I, V, E, ? extends Writable>
        inputSplitsCallableFactory =
        new MappingInputSplitsCallableFactory<>(
            getConfiguration().createWrappedMappingInputFormat(),
            getContext(),
            getConfiguration(),
            this,
            inputSplitsHandler);

    long mappingsLoaded =
        loadInputSplits(inputSplitsCallableFactory).getMappingCount();

    // after all threads finish loading - call postFilling
    localData.getMappingStore().postFilling();
    return mappingsLoaded;
  }

  /**
   * Load the vertices from the user-defined
   * {@link org.apache.giraph.io.VertexReader}
   *
   * @return Count of vertices and edges loaded
   */
  private VertexEdgeCount loadVertices() throws KeeperException,
      InterruptedException {
    VertexInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
        new VertexInputSplitsCallableFactory<I, V, E>(
            getConfiguration().createWrappedVertexInputFormat(),
            getContext(),
            getConfiguration(),
            this,
            inputSplitsHandler);

    return loadInputSplits(inputSplitsCallableFactory);
  }

  /**
   * Load the edges from the user-defined
   * {@link org.apache.giraph.io.EdgeReader}.
   *
   * @return Number of edges loaded
   */
  private long loadEdges() throws KeeperException, InterruptedException {
    EdgeInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
        new EdgeInputSplitsCallableFactory<I, V, E>(
            getConfiguration().createWrappedEdgeInputFormat(),
            getContext(),
            getConfiguration(),
            this,
            inputSplitsHandler);

    return loadInputSplits(inputSplitsCallableFactory).getEdgeCount();
  }

  @Override
  public MasterInfo getMasterInfo() {
    return masterInfo;
  }

  @Override
  public List<WorkerInfo> getWorkerInfoList() {
    return workerInfoList;
  }

  /**
   * Mark current worker as done and then wait for all workers
   * to finish processing input splits.
   */
  private void markCurrentWorkerDoneReadingThenWaitForOthers() {
    String workerInputSplitsDonePath =
        inputSplitsWorkerDonePath + "/" + getWorkerInfo().getHostnameId();
    try {
      getZkExt().createExt(workerInputSplitsDonePath,
          null,
          Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT,
          true);
    } catch (KeeperException e) {
      throw new IllegalStateException(
          "markCurrentWorkerDoneReadingThenWaitForOthers: " +
              "KeeperException creating worker done splits", e);
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "markCurrentWorkerDoneReadingThenWaitForOthers: " +
              "InterruptedException creating worker done splits", e);
    }
    while (true) {
      Stat inputSplitsDoneStat;
      try {
        inputSplitsDoneStat =
            getZkExt().exists(inputSplitsAllDonePath, true);
      } catch (KeeperException e) {
        throw new IllegalStateException(
            "markCurrentWorkerDoneReadingThenWaitForOthers: " +
                "KeeperException waiting on worker done splits", e);
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "markCurrentWorkerDoneReadingThenWaitForOthers: " +
                "InterruptedException waiting on worker done splits", e);
      }
      if (inputSplitsDoneStat != null) {
        break;
      }
      getInputSplitsAllDoneEvent().waitForTimeoutOrFail(
          GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
              getConfiguration()));
      getInputSplitsAllDoneEvent().reset();
    }
  }
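
  // Editor's note: the method above is a standard ZooKeeper barrier,
  // schematically (worker side shown; the master creates the "all done" node
  // once every worker has reported):
  //
  //   create(inputSplitsWorkerDonePath + "/" + hostnameId);   // "I am done"
  //   while (exists(inputSplitsAllDonePath, /* watch */ true) == null) {
  //     event.waitForTimeoutOrFail(timeoutMsec);              // block
  //     event.reset();
  //   }
  //
  // The exists() call registers a watch, so the waiting event fires as soon
  // as the master creates the "all done" znode; the timeout is only a safety
  // net against lost events.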

  @Override
  public FinishedSuperstepStats setup() {
    // Unless doing a restart, prepare for computation:
    // 1. Start superstep INPUT_SUPERSTEP (no computation)
    // 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
    // 3. Process input splits until there are no more.
    // 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
    // 5. Process any mutations deriving from add edge requests
    // 6. Wait for superstep INPUT_SUPERSTEP to complete.
    if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
      setCachedSuperstep(getRestartedSuperstep());
      return new FinishedSuperstepStats(0, false, 0, 0, true,
          CheckpointStatus.NONE);
    }

    JSONObject jobState = getJobState();
    if (jobState != null) {
      try {
        if ((ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY)) ==
            ApplicationState.START_SUPERSTEP) &&
            jobState.getLong(JSONOBJ_SUPERSTEP_KEY) ==
            getSuperstep()) {
          if (LOG.isInfoEnabled()) {
            LOG.info("setup: Restarting from an automated " +
                "checkpointed superstep " +
                getSuperstep() + ", attempt " +
                getApplicationAttempt());
          }
          setRestartedSuperstep(getSuperstep());
          return new FinishedSuperstepStats(0, false, 0, 0, true,
              CheckpointStatus.NONE);
        }
      } catch (JSONException e) {
        throw new RuntimeException(
            "setup: Failed to get key-values from " +
                jobState.toString(), e);
      }
    }

    // Add the partitions that this worker owns
    Collection<? extends PartitionOwner> masterSetPartitionOwners =
        startSuperstep();
    workerGraphPartitioner.updatePartitionOwners(
        getWorkerInfo(), masterSetPartitionOwners);
    getPartitionStore().initialize();

/*if[HADOOP_NON_SECURE]
    workerClient.setup();
else[HADOOP_NON_SECURE]*/
    workerClient.setup(getConfiguration().authenticate());
/*end[HADOOP_NON_SECURE]*/

    // Initialize aggregator at worker side during setup.
    // Do this just before vertex and edge loading.
    globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);

    VertexEdgeCount vertexEdgeCount;
    long entriesLoaded;

    if (getConfiguration().hasMappingInputFormat()) {
      getContext().progress();
      try {
        entriesLoaded = loadMapping();
        // successfully loaded mapping
        // now initialize graphPartitionerFactory with this data
        getGraphPartitionerFactory().initialize(localData);
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "setup: loadMapping failed with InterruptedException", e);
      } catch (KeeperException e) {
        throw new IllegalStateException(
            "setup: loadMapping failed with KeeperException", e);
      }
      getContext().progress();
      if (LOG.isInfoEnabled()) {
        LOG.info("setup: Finally loaded a total of " +
            entriesLoaded + " entries from inputSplits");
      }

      // Print stats for data stored in localData once mapping is fully
      // loaded on all the workers
      localData.printStats();
    }

    if (getConfiguration().hasVertexInputFormat()) {
      getContext().progress();
      try {
        vertexEdgeCount = loadVertices();
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "setup: loadVertices failed with InterruptedException", e);
      } catch (KeeperException e) {
        throw new IllegalStateException(
            "setup: loadVertices failed with KeeperException", e);
      }
      getContext().progress();
    } else {
      vertexEdgeCount = new VertexEdgeCount();
    }
    WorkerProgress.get().finishLoadingVertices();

    if (getConfiguration().hasEdgeInputFormat()) {
      getContext().progress();
      try {
        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "setup: loadEdges failed with InterruptedException", e);
      } catch (KeeperException e) {
        throw new IllegalStateException(
            "setup: loadEdges failed with KeeperException", e);
      }
      getContext().progress();
    }
    WorkerProgress.get().finishLoadingEdges();

    if (LOG.isInfoEnabled()) {
      LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
    }

    markCurrentWorkerDoneReadingThenWaitForOthers();

    // Create remaining partitions owned by this worker.
    for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
          !getPartitionStore().hasPartition(
              partitionOwner.getPartitionId())) {
        Partition<I, V, E> partition =
            getConfiguration().createPartition(
                partitionOwner.getPartitionId(), getContext());
        getPartitionStore().addPartition(partition);
      }
    }

    // remove mapping store if possible
    localData.removeMappingStoreIfPossible();

    if (getConfiguration().hasEdgeInputFormat()) {
      // Move edges from temporary storage to their source vertices.
      getServerData().getEdgeStore().moveEdgesToVertices();
    }

    // Generate the partition stats for the input superstep and process
    // if necessary
    List<PartitionStats> partitionStatsList =
        new ArrayList<PartitionStats>();
    PartitionStore<I, V, E> partitionStore = getPartitionStore();
    for (Integer partitionId : partitionStore.getPartitionIds()) {
      PartitionStats partitionStats =
          new PartitionStats(partitionId,
              partitionStore.getPartitionVertexCount(partitionId),
              0,
              partitionStore.getPartitionEdgeCount(partitionId),
              0,
              0,
              workerInfo.getHostnameId());
      partitionStatsList.add(partitionStats);
    }
    workerGraphPartitioner.finalizePartitionStats(
        partitionStatsList, getPartitionStore());

    return finishSuperstep(partitionStatsList, null);
  }

  /**
   * Register the health of this worker for a given superstep
   */
  private void registerHealth() {

    String myHealthPath = null;
    if (isHealthy()) {
      myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
          getSuperstep());
    } else {
      myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
          getSuperstep());
    }
    myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
    try {
      myHealthZnode = getZkExt().createExt(
          myHealthPath,
          WritableUtils.writeToByteArray(workerInfo),
          Ids.OPEN_ACL_UNSAFE,
          CreateMode.EPHEMERAL,
          true);
    } catch (KeeperException.NodeExistsException e) {
      LOG.warn("registerHealth: myHealthPath already exists (likely " +
          "from previous failure): " + myHealthPath +
          ".  Waiting for change in attempts " +
          "to re-join the application");
      getApplicationAttemptChangedEvent().waitForTimeoutOrFail(
          GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
              getConfiguration()));
      if (LOG.isInfoEnabled()) {
        LOG.info("registerHealth: Got application " +
            "attempt changed event, killing self");
      }
      throw new IllegalStateException(
          "registerHealth: Trying " +
              "to get the new application attempt by killing self", e);
    } catch (KeeperException e) {
      throw new IllegalStateException("Creating " + myHealthPath +
          " failed with KeeperException", e);
    } catch (InterruptedException e) {
      throw new IllegalStateException("Creating " + myHealthPath +
          " failed with InterruptedException", e);
    }
    if (LOG.isInfoEnabled()) {
      LOG.info("registerHealth: Created my health node for attempt=" +
          getApplicationAttempt() + ", superstep=" +
          getSuperstep() + " with " + myHealthZnode +
          " and workerInfo= " + workerInfo);
    }
  }

  /**
   * Do this to help notify the master more quickly that this worker has
   * failed.
   */
  private void unregisterHealth() {
    LOG.error("unregisterHealth: Got failure, unregistering health on " +
        myHealthZnode + " on superstep " + getSuperstep());
    try {
      getZkExt().deleteExt(myHealthZnode, -1, false);
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "unregisterHealth: InterruptedException - Couldn't delete " +
              myHealthZnode, e);
    } catch (KeeperException e) {
      throw new IllegalStateException(
          "unregisterHealth: KeeperException - Couldn't delete " +
              myHealthZnode, e);
    }
  }
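
  // Editor's note: registerHealth() deliberately uses CreateMode.EPHEMERAL,
  // while the "done"/cleanup markers elsewhere in this class use PERSISTENT:
  //
  //   EPHEMERAL  -> ZooKeeper deletes the znode when the worker's session
  //                 dies, so a crashed worker disappears without any cleanup.
  //   PERSISTENT -> the znode survives the session; suitable for durable
  //                 progress markers.
  //
  // unregisterHealth() just accelerates what session expiry would eventually
  // do, letting the master notice the failure sooner.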

  @Override
  public void failureCleanup() {
    unregisterHealth();
  }

  @Override
  public Collection<? extends PartitionOwner> startSuperstep() {
    // Algorithm:
    // 1. Communication service will combine messages from the previous
    //    superstep
    // 2. Register my health for the next superstep.
    // 3. Wait until the partition assignment is complete and get it
    // 4. Get the aggregator values from the previous superstep
    if (getSuperstep() != INPUT_SUPERSTEP) {
      workerServer.prepareSuperstep();
    }

    registerHealth();

    AddressesAndPartitionsWritable addressesAndPartitions =
        addressesAndPartitionsHolder.getElement(getContext());

    workerInfoList.clear();
    workerInfoList = addressesAndPartitions.getWorkerInfos();
    masterInfo = addressesAndPartitions.getMasterInfo();
    workerServer.resetBytesReceivedPerSuperstep();

    if (LOG.isInfoEnabled()) {
      LOG.info("startSuperstep: " + masterInfo);
    }

    getContext().setStatus("startSuperstep: " +
        getGraphTaskManager().getGraphFunctions().toString() +
        " - Attempt=" + getApplicationAttempt() +
        ", Superstep=" + getSuperstep());

    if (LOG.isDebugEnabled()) {
      LOG.debug("startSuperstep: addressesAndPartitions " +
          addressesAndPartitions.getWorkerInfos());
      for (PartitionOwner partitionOwner : addressesAndPartitions
          .getPartitionOwners()) {
        LOG.debug(partitionOwner.getPartitionId() + " " +
            partitionOwner.getWorkerInfo());
      }
    }

    return addressesAndPartitions.getPartitionOwners();
  }

  @Override
  public FinishedSuperstepStats finishSuperstep(
      List<PartitionStats> partitionStatsList,
      GiraphTimerContext superstepTimerContext) {
    // This barrier blocks until success (or the master signals it to
    // restart).
    //
    // Master will coordinate the barriers and aggregate "doneness" of all
    // the vertices.  Each worker will:
    // 1. Ensure that the requests are complete
    // 2. Execute user postSuperstep() if necessary.
    // 3. Save aggregator values that are in use.
    // 4. Report the statistics (vertices, edges, messages, etc.)
    //    of this worker
    // 5. Let the master know it is finished.
    // 6. Wait for the master's superstep info, and check if done
    waitForRequestsToFinish();

    getGraphTaskManager().notifyFinishedCommunication();

    long workerSentMessages = 0;
    long workerSentMessageBytes = 0;
    long localVertices = 0;
    for (PartitionStats partitionStats : partitionStatsList) {
      workerSentMessages += partitionStats.getMessagesSentCount();
      workerSentMessageBytes += partitionStats.getMessageBytesSentCount();
      localVertices += partitionStats.getVertexCount();
    }

    if (getSuperstep() != INPUT_SUPERSTEP) {
      postSuperstepCallbacks();
    }

    globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor);

    MessageStore<I, Writable> incomingMessageStore =
        getServerData().getIncomingMessageStore();
    if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
      ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("finishSuperstep: Superstep " + getSuperstep() +
          ", messages = " + workerSentMessages +
          ", message bytes = " + workerSentMessageBytes + ", " +
          MemoryUtils.getRuntimeMemoryStats());
    }

    if (superstepTimerContext != null) {
      superstepTimerContext.stop();
    }
    writeFinishedSuperstepInfoToZK(partitionStatsList,
      workerSentMessages, workerSentMessageBytes);
    // Store the counters up to the end of the current superstep in
    // ZooKeeper, as a best effort, so that the master has some counter
    // values in case of a job failure
    storeCountersInZooKeeper(false);

    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
        "finishSuperstep: (waiting for rest " +
            "of workers) " +
            getGraphTaskManager().getGraphFunctions().toString() +
            " - Attempt=" + getApplicationAttempt() +
            ", Superstep=" + getSuperstep());

    String superstepFinishedNode =
        getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());

    waitForOtherWorkers(superstepFinishedNode);

    GlobalStats globalStats = new GlobalStats();
    SuperstepClasses superstepClasses = SuperstepClasses.createToRead(
        getConfiguration());
    WritableUtils.readFieldsFromZnode(
        getZkExt(), superstepFinishedNode, false, null, globalStats,
        superstepClasses);
    if (LOG.isInfoEnabled()) {
      LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
          " with global stats " + globalStats + " and classes " +
          superstepClasses);
    }
    getContext().setStatus("finishSuperstep: (all workers done) " +
        getGraphTaskManager().getGraphFunctions().toString() +
        " - Attempt=" + getApplicationAttempt() +
        ", Superstep=" + getSuperstep());
    incrCachedSuperstep();
    getConfiguration().updateSuperstepClasses(superstepClasses);

    return new FinishedSuperstepStats(
        localVertices,
        globalStats.getHaltComputation(),
        globalStats.getVertexCount(),
        globalStats.getEdgeCount(),
        false,
        globalStats.getCheckpointStatus());
  }
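
  // Editor's sketch (hypothetical): a rough outline of how a driver such as
  // GraphTaskManager strings these calls into the BSP loop (simplified; the
  // real driver also handles checkpointing and restarts):
  //
  //   FinishedSuperstepStats stats = worker.setup();        // INPUT_SUPERSTEP
  //   while (!stats.getHaltComputation()) {
  //     Collection<? extends PartitionOwner> owners = worker.startSuperstep();
  //     // ... run the user computation over all local partitions,
  //     //     producing partitionStatsList ...
  //     stats = worker.finishSuperstep(partitionStatsList, timerContext);
  //   }
  //   worker.cleanup(stats);
  //
  // Because finishSuperstep() blocks on the master's barrier, each loop
  // iteration is one globally synchronized superstep.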

  /**
   * Handle post-superstep callbacks
   */
  private void postSuperstepCallbacks() {
    GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
    getWorkerContext().postSuperstep();
    timerContext.stop();
    getContext().progress();

    for (WorkerObserver obs : getWorkerObservers()) {
      obs.postSuperstep(getSuperstep());
      getContext().progress();
    }
  }

  /**
   * Wait for all the requests to finish.
   */
  private void waitForRequestsToFinish() {
    if (LOG.isInfoEnabled()) {
      LOG.info("finishSuperstep: Waiting on all requests, superstep " +
          getSuperstep() + " " +
          MemoryUtils.getRuntimeMemoryStats());
    }
    GiraphTimerContext timerContext = waitRequestsTimer.time();
    workerClient.waitAllRequests();
    timerContext.stop();
  }

  /**
   * Wait for all the other Workers to finish the superstep.
   *
   * @param superstepFinishedNode ZooKeeper path to wait on.
   */
  private void waitForOtherWorkers(String superstepFinishedNode) {
    try {
      while (getZkExt().exists(superstepFinishedNode, true) == null) {
        getSuperstepFinishedEvent().waitForTimeoutOrFail(
            GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
                getConfiguration()));
        getSuperstepFinishedEvent().reset();
      }
    } catch (KeeperException e) {
      throw new IllegalStateException(
          "finishSuperstep: Failed while waiting for master to " +
              "signal completion of superstep " + getSuperstep(), e);
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "finishSuperstep: Failed while waiting for master to " +
              "signal completion of superstep " + getSuperstep(), e);
    }
  }

  /**
   * Write finished superstep info to ZooKeeper.
   *
   * @param partitionStatsList List of partition stats from superstep.
   * @param workerSentMessages Number of messages sent in superstep.
   * @param workerSentMessageBytes Number of message bytes sent
   *                               in superstep.
   */
  private void writeFinishedSuperstepInfoToZK(
      List<PartitionStats> partitionStatsList, long workerSentMessages,
      long workerSentMessageBytes) {
    Collection<PartitionStats> finalizedPartitionStats =
        workerGraphPartitioner.finalizePartitionStats(
            partitionStatsList, getPartitionStore());
    workerClient.sendWritableRequest(masterInfo.getTaskId(),
        new PartitionStatsRequest(finalizedPartitionStats));
    WorkerSuperstepMetrics metrics = new WorkerSuperstepMetrics();
    metrics.readFromRegistry();
    byte[] metricsBytes = WritableUtils.writeToByteArray(metrics);

    JSONObject workerFinishedInfoObj = new JSONObject();
    try {
      workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY, workerSentMessages);
      workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGE_BYTES_KEY,
        workerSentMessageBytes);
      workerFinishedInfoObj.put(JSONOBJ_METRICS_KEY,
          Base64.encodeBytes(metricsBytes));
    } catch (JSONException e) {
      throw new RuntimeException(e);
    }

    String finishedWorkerPath =
        getWorkerMetricsFinishedPath(getApplicationAttempt(), getSuperstep()) +
        "/" + workerInfo.getHostnameId();
    try {
      getZkExt().createExt(finishedWorkerPath,
          workerFinishedInfoObj.toString().getBytes(Charset.defaultCharset()),
          Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT,
          true);
    } catch (KeeperException.NodeExistsException e) {
      LOG.warn("finishSuperstep: finished worker path " +
          finishedWorkerPath + " already exists!");
    } catch (KeeperException e) {
      throw new IllegalStateException("Creating " + finishedWorkerPath +
          " failed with KeeperException", e);
    } catch (InterruptedException e) {
      throw new IllegalStateException("Creating " + finishedWorkerPath +
          " failed with InterruptedException", e);
    }
  }
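
  // Editor's note: znodes store raw bytes, so the method above tunnels a
  // Writable through JSON by Base64-encoding its serialized form.
  // Schematically (the decoding side lives on the master and is shown here
  // only for illustration):
  //
  //   byte[] bytes = WritableUtils.writeToByteArray(metrics);   // Writable -> bytes
  //   json.put(JSONOBJ_METRICS_KEY, Base64.encodeBytes(bytes)); // bytes -> text
  //   ...
  //   byte[] back = Base64.decode(json.getString(JSONOBJ_METRICS_KEY));
  //   WritableUtils.readFieldsFromByteArray(back, metrics);     // text -> Writable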

  /**
   * Save the vertices using the user-defined VertexOutputFormat from our
   * vertexArray based on the split.
   *
   * @param numLocalVertices Number of local vertices
   * @throws InterruptedException
   */
  private void saveVertices(long numLocalVertices) throws IOException,
      InterruptedException {
    ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();

    if (conf.getVertexOutputFormatClass() == null) {
      LOG.warn("saveVertices: " +
          GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS +
          " not specified -- there will be no saved output");
      return;
    }
    if (conf.doOutputDuringComputation()) {
      if (LOG.isInfoEnabled()) {
        LOG.info("saveVertices: The option for doing output during " +
            "computation is selected, so there will be no saving of the " +
            "output at the end of the application");
      }
      return;
    }

    final int numPartitions = getPartitionStore().getNumPartitions();
    int numThreads = Math.min(getConfiguration().getNumOutputThreads(),
        numPartitions);
    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
        "saveVertices: Starting to save " + numLocalVertices + " vertices " +
            "using " + numThreads + " threads");
    final VertexOutputFormat<I, V, E> vertexOutputFormat =
        getConfiguration().createWrappedVertexOutputFormat();
    vertexOutputFormat.preWriting(getContext());

    getPartitionStore().startIteration();

    long verticesToStore = 0;
    PartitionStore<I, V, E> partitionStore = getPartitionStore();
    for (int partitionId : partitionStore.getPartitionIds()) {
      verticesToStore += partitionStore.getPartitionVertexCount(partitionId);
    }
    WorkerProgress.get().startStoring(
        verticesToStore, getPartitionStore().getNumPartitions());

    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
      @Override
      public Callable<Void> newCallable(int callableId) {
        return new Callable<Void>() {
          /** How often to update WorkerProgress */
          private static final long VERTICES_TO_UPDATE_PROGRESS = 100000;

          @Override
          public Void call() throws Exception {
            VertexWriter<I, V, E> vertexWriter =
                vertexOutputFormat.createVertexWriter(getContext());
            vertexWriter.setConf(getConfiguration());
            vertexWriter.initialize(getContext());
            long nextPrintVertices = 0;
            long nextUpdateProgressVertices = VERTICES_TO_UPDATE_PROGRESS;
            long nextPrintMsecs = System.currentTimeMillis() + 15000;
            int partitionIndex = 0;
            int numPartitions = getPartitionStore().getNumPartitions();
            while (true) {
              Partition<I, V, E> partition =
                  getPartitionStore().getNextPartition();
              if (partition == null) {
                break;
              }

              long verticesWritten = 0;
              for (Vertex<I, V, E> vertex : partition) {
                vertexWriter.writeVertex(vertex);
                ++verticesWritten;

                // Update status at most every 250k vertices or 15 seconds
                if (verticesWritten > nextPrintVertices &&
                    System.currentTimeMillis() > nextPrintMsecs) {
                  LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
                      "saveVertices: Saved " + verticesWritten + " out of " +
                          partition.getVertexCount() +
                          " partition vertices, " +
                          "on partition " + partitionIndex +
                          " out of " + numPartitions);
                  nextPrintMsecs = System.currentTimeMillis() + 15000;
                  nextPrintVertices = verticesWritten + 250000;
                }

                if (verticesWritten >= nextUpdateProgressVertices) {
                  WorkerProgress.get().addVerticesStored(
                      VERTICES_TO_UPDATE_PROGRESS);
                  nextUpdateProgressVertices += VERTICES_TO_UPDATE_PROGRESS;
                }
              }
              getPartitionStore().putPartition(partition);
              ++partitionIndex;
              WorkerProgress.get().addVerticesStored(
                  verticesWritten % VERTICES_TO_UPDATE_PROGRESS);
              WorkerProgress.get().incrementPartitionsStored();
            }
            vertexWriter.close(getContext()); // the temp results are saved now
            return null;
          }
        };
      }
    };
    ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
        "save-vertices-%d", getContext());

    vertexOutputFormat.postWriting(getContext());

    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
      "saveVertices: Done saving vertices.");
    // YARN: must complete the commit of the "task" output ourselves,
    // since the Hadoop MapReduce framework isn't there to do it.
    if (getConfiguration().isPureYarnJob() &&
      getConfiguration().getVertexOutputFormatClass() != null) {
      try {
        OutputCommitter outputCommitter =
          vertexOutputFormat.getOutputCommitter(getContext());
        if (outputCommitter.needsTaskCommit(getContext())) {
          LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
            "OutputCommitter: committing task output.");
          // transfer from temp dirs to "task commit" dirs to prep for
          // the master's OutputCommitter#commitJob(context) call to finish.
          outputCommitter.commitTask(getContext());
        }
      } catch (InterruptedException ie) {
        LOG.error("Interrupted while attempting to obtain " +
          "OutputCommitter.", ie);
      } catch (IOException ioe) {
        LOG.error("Master task's attempt to commit output has " +
          "FAILED.", ioe);
      }
    }
  }
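
  // Editor's note: saveVertices() above and saveEdges() below share the same
  // self-scheduling iteration over the partition store:
  //
  //   getPartitionStore().startIteration();           // once, up front
  //   // then, in each of the N output threads:
  //   while ((partition = getPartitionStore().getNextPartition()) != null) {
  //     // ... write the partition ...
  //     getPartitionStore().putPartition(partition);  // hand it back
  //   }
  //
  // getNextPartition() gives each partition to exactly one thread, so no
  // explicit partition-to-thread assignment is needed, and threads that draw
  // small partitions simply come back for more.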

  /**
   * Save the edges using the user-defined EdgeOutputFormat from our
   * vertexArray based on the split.
   *
   * @throws InterruptedException
   */
  private void saveEdges() throws IOException, InterruptedException {
    final ImmutableClassesGiraphConfiguration<I, V, E> conf =
      getConfiguration();

    if (conf.getEdgeOutputFormatClass() == null) {
      LOG.warn("saveEdges: " +
               GiraphConstants.EDGE_OUTPUT_FORMAT_CLASS +
               " not specified -- make sure that the EdgeOutputFormat " +
               "is not required.");
      return;
    }

    final int numPartitions = getPartitionStore().getNumPartitions();
    int numThreads = Math.min(conf.getNumOutputThreads(),
        numPartitions);
    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
        "saveEdges: Starting to save the edges using " +
        numThreads + " threads");
    final EdgeOutputFormat<I, V, E> edgeOutputFormat =
        conf.createWrappedEdgeOutputFormat();
    edgeOutputFormat.preWriting(getContext());

    getPartitionStore().startIteration();

    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
      @Override
      public Callable<Void> newCallable(int callableId) {
        return new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            EdgeWriter<I, V, E> edgeWriter =
                edgeOutputFormat.createEdgeWriter(getContext());
            edgeWriter.setConf(conf);
            edgeWriter.initialize(getContext());

            long nextPrintVertices = 0;
            long nextPrintMsecs = System.currentTimeMillis() + 15000;
            int partitionIndex = 0;
            int numPartitions = getPartitionStore().getNumPartitions();
            while (true) {
              Partition<I, V, E> partition =
                  getPartitionStore().getNextPartition();
              if (partition == null) {
                break;
              }

              long vertices = 0;
              long edges = 0;
              long partitionEdgeCount = partition.getEdgeCount();
              for (Vertex<I, V, E> vertex : partition) {
                for (Edge<I, E> edge : vertex.getEdges()) {
                  edgeWriter.writeEdge(vertex.getId(), vertex.getValue(),
                      edge);
                  ++edges;
                }
                ++vertices;

                // Update status at most every 250k vertices or 15 seconds
                if (vertices > nextPrintVertices &&
                    System.currentTimeMillis() > nextPrintMsecs) {
                  LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
                      "saveEdges: Saved " + edges +
                      " edges out of " + partitionEdgeCount +
                      " partition edges, on partition " + partitionIndex +
                      " out of " + numPartitions);
                  nextPrintMsecs = System.currentTimeMillis() + 15000;
                  nextPrintVertices = vertices + 250000;
                }
              }
              getPartitionStore().putPartition(partition);
              ++partitionIndex;
            }
            edgeWriter.close(getContext()); // the temp results are saved now
            return null;
          }
        };
      }
    };
    ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
        "save-edges-%d", getContext());

    edgeOutputFormat.postWriting(getContext());

    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
      "saveEdges: Done saving edges.");
    // YARN: must complete the commit of the "task" output ourselves,
    // since the Hadoop MapReduce framework isn't there to do it.
    if (conf.isPureYarnJob() &&
      conf.getEdgeOutputFormatClass() != null) {
      try {
        OutputCommitter outputCommitter =
          edgeOutputFormat.getOutputCommitter(getContext());
        if (outputCommitter.needsTaskCommit(getContext())) {
          LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
            "OutputCommitter: committing task output.");
          // transfer from temp dirs to "task commit" dirs to prep for
          // the master's OutputCommitter#commitJob(context) call to finish.
          outputCommitter.commitTask(getContext());
        }
      } catch (InterruptedException ie) {
        LOG.error("Interrupted while attempting to obtain " +
          "OutputCommitter.", ie);
      } catch (IOException ioe) {
        LOG.error("Master task's attempt to commit output has " +
          "FAILED.", ioe);
      }
    }
  }
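
  // Editor's note: on pure-YARN deployments the two save methods above invoke
  // the OutputCommitter themselves because no MapReduce framework surrounds
  // the task. The commit is two-phase:
  //
  //   worker:  needsTaskCommit() ? commitTask()  // temp dir -> task-commit dir
  //   master:  commitJob()                       // task-commit dirs -> final output
  //
  // Under classic Hadoop MapReduce, the framework issues both calls instead.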
 1204  
 
 1205  
   @Override
 1206  
   public void cleanup(FinishedSuperstepStats finishedSuperstepStats)
 1207  
     throws IOException, InterruptedException {
 1208  
     workerClient.closeConnections();
 1209  
     setCachedSuperstep(getSuperstep() - 1);
 1210  
     if (finishedSuperstepStats.getCheckpointStatus() !=
 1211  
         CheckpointStatus.CHECKPOINT_AND_HALT) {
 1212  
       saveVertices(finishedSuperstepStats.getLocalVertexCount());
 1213  
       saveEdges();
 1214  
     }
 1215  
     WorkerProgress.get().finishStoring();
 1216  
     if (workerProgressWriter != null) {
 1217  
       workerProgressWriter.stop();
 1218  
     }
 1219  
     getPartitionStore().shutdown();
 1220  
     // All worker processes should denote they are done by adding special
 1221  
     // znode.  Once the number of znodes equals the number of partitions
 1222  
     // for workers and masters, the master will clean up the ZooKeeper
 1223  
     // znodes associated with this job.
 1224  
    String workerCleanedUpPath = cleanedUpPath + "/" +
 1225  
         getTaskId() + WORKER_SUFFIX;
 1226  
     try {
 1227  
       String finalFinishedPath =
 1228  
           getZkExt().createExt(workerCleanedUpPath,
 1229  
               null,
 1230  
               Ids.OPEN_ACL_UNSAFE,
 1231  
               CreateMode.PERSISTENT,
 1232  
               true);
 1233  
       if (LOG.isInfoEnabled()) {
 1234  
         LOG.info("cleanup: Notifying master its okay to cleanup with " +
 1235  
             finalFinishedPath);
 1236  
       }
 1237  
     } catch (KeeperException.NodeExistsException e) {
 1238  
       if (LOG.isInfoEnabled()) {
 1239  
         LOG.info("cleanup: Couldn't create finished node '" +
 1240  
            workerCleanedUpPath + "'");
 1241  
       }
 1242  
     } catch (KeeperException e) {
 1243  
       // Cleaning up, it's okay to fail after cleanup is successful
 1244  
       LOG.error("cleanup: Got KeeperException on notification " +
 1245  
           "to master about cleanup", e);
 1246  
     } catch (InterruptedException e) {
 1247  
       // Cleaning up, it's okay to fail after cleanup is successful
 1248  
       LOG.error("cleanup: Got InterruptedException on notification " +
 1249  
           "to master about cleanup", e);
 1250  
     }
 1251  
   }
 1252  
 
 1253  
   /**
 1254  
   * Store the counter values in ZooKeeper. This is called at the end of
  1255  
   * each superstep and once more after all the supersteps have finished.
  1256  
   *
  1257  
   * @param allSuperstepsDone Whether the job has finished all supersteps.
  1258  
   *        Needed to ensure the superstep number is the same for master
  1259  
   *        and worker when all supersteps are finished.
  1260  
   *
  1261  
   */
 1262  
   public void storeCountersInZooKeeper(boolean allSuperstepsDone) {
 1263  
     Set<CustomCounter> additionalCounters =
 1264  
             CustomCounters.getAndClearCustomCounters();
 1265  
 
 1266  
     JSONArray jsonCounters = new JSONArray();
 1267  
     Mapper.Context context = getContext();
 1268  
     Counter counter;
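      // Each counter is serialized as a Writable, then Base64-encoded so the
      // bytes can travel as a plain JSON string element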
 1269  
     for (CustomCounter customCounter : additionalCounters) {
 1270  
       String groupName = customCounter.getGroupName();
 1271  
       String counterName = customCounter.getCounterName();
 1272  
       counter = context.getCounter(groupName, counterName);
 1273  
       customCounter.setValue(counter.getValue());
 1274  
       jsonCounters.put(Base64.encodeBytes(
 1275  
               WritableUtils.writeToByteArray(customCounter)));
 1276  
     }
 1277  
     // Add the Netty counters
 1278  
     Map<String, Set<String>> nettyCounters =
 1279  
             NettyClient.getCounterGroupsAndNames();
 1280  
     for (Map.Entry<String, Set<String>> entry : nettyCounters.entrySet()) {
 1281  
       String groupName = entry.getKey();
 1282  
      for (String counterName : entry.getValue()) {
 1283  
         CustomCounter customCounter = new CustomCounter(groupName, counterName,
 1284  
                 CustomCounter.Aggregation.SUM);
 1285  
         counter = context.getCounter(groupName, counterName);
 1286  
         customCounter.setValue(counter.getValue());
 1287  
         jsonCounters.put(Base64.encodeBytes(
 1288  
                 WritableUtils.writeToByteArray(customCounter)));
 1289  
       }
 1290  
     }
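      // Per the javadoc above, once the job is done the master's superstep
      // counter has already moved on, so bump the superstep here by one to
      // keep the counter znode path in sync between master and worker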
 1291  
     long superStep = getSuperstep() + (allSuperstepsDone ? 1 : 0);
 1292  
     String finishedWorkerPath =
 1293  
             getWorkerCountersFinishedPath(getApplicationAttempt(), superStep) +
 1294  
                     "/" + workerInfo.getHostnameId();
 1295  
     LOG.info(String.format("Writing counters to zookeeper for superstep: %d",
 1296  
             superStep));
 1297  
     try {
 1298  
       getZkExt().createExt(finishedWorkerPath,
 1299  
               jsonCounters.toString().getBytes(
 1300  
                       Charset.defaultCharset()),
 1301  
               Ids.OPEN_ACL_UNSAFE,
 1302  
               CreateMode.PERSISTENT,
 1303  
               true);
 1304  
     } catch (KeeperException.NodeExistsException e) {
 1305  
       LOG.warn("storeCountersInZookeeper: finished worker path " +
 1306  
               finishedWorkerPath + " already exists!");
 1307  
     } catch (KeeperException e) {
 1308  
       LOG.warn("Creating " + finishedWorkerPath +
 1309  
               " failed with KeeperException", e);
 1310  
     } catch (InterruptedException e) {
 1311  
       LOG.warn("Creating " + finishedWorkerPath +
 1312  
               " failed with InterruptedException", e);
 1313  
     }
 1314  
   }
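  // A minimal sketch (not part of this class) of how one entry written above
  // could be decoded back, assuming the same CustomCounter and Giraph
  // WritableUtils types:
  //
  //   byte[] data = Base64.decode(jsonCounters.getString(i));
  //   CustomCounter decoded = new CustomCounter();
  //   WritableUtils.readFieldsFromByteArray(data, decoded);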
 1315  
 
 1316  
   /**
 1317  
   * Close the ZooKeeper connection, after the worker has sent
  1318  
   * the counters to the master.
 1319  
    *
 1320  
    */
 1321  
   public void closeZooKeeper() {
 1322  
     try {
 1323  
       getZkExt().close();
 1324  
     } catch (InterruptedException e) {
 1325  
       // cleanup phase -- just log the error
 1326  
       LOG.error("cleanup: Zookeeper failed to close with " + e);
 1327  
     }
 1328  
 
 1329  
     if (getConfiguration().metricsEnabled()) {
 1330  
       GiraphMetrics.get().dumpToStream(System.err);
 1331  
     }
 1332  
 
 1333  
    // Preferably we would shut down the service only after
  1334  
    // all clients have disconnected (or the exceptions on the
  1335  
    // client side are ignored).
 1336  
     workerServer.close();
 1337  
   }
 1338  
 
 1339  
   @Override
 1340  
   public void storeCheckpoint() throws IOException {
 1341  
     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
 1342  
         "storeCheckpoint: Starting checkpoint " +
 1343  
             getGraphTaskManager().getGraphFunctions().toString() +
 1344  
             " - Attempt=" + getApplicationAttempt() +
 1345  
             ", Superstep=" + getSuperstep());
 1346  
 
 1347  
     // Algorithm:
 1348  
     // For each partition, dump vertices and messages
 1349  
     Path metadataFilePath = createCheckpointFilePathSafe(
 1350  
         CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX);
 1351  
     Path validFilePath = createCheckpointFilePathSafe(
 1352  
         CheckpointingUtils.CHECKPOINT_VALID_POSTFIX);
 1353  
     Path checkpointFilePath = createCheckpointFilePathSafe(
 1354  
         CheckpointingUtils.CHECKPOINT_DATA_POSTFIX);
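    // Three per-worker files make up a checkpoint: metadata (partition ids),
    // the data file (worker context plus messages), and an empty "valid"
    // marker written last; vertices go into separate per-partition files in
    // storeCheckpointVertices()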
 1355  
 
 1356  
 
 1357  
    // Metadata is small: the number of partitions this worker owns,
  1358  
    // followed by the id of each of those partitions
 1359  
     FSDataOutputStream metadataOutputStream =
 1360  
         getFs().create(metadataFilePath);
 1361  
     metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());
 1362  
 
 1363  
     for (Integer partitionId : getPartitionStore().getPartitionIds()) {
 1364  
       metadataOutputStream.writeInt(partitionId);
 1365  
     }
 1366  
     metadataOutputStream.close();
 1367  
 
 1368  
     storeCheckpointVertices();
 1369  
 
 1370  
     FSDataOutputStream checkpointOutputStream =
 1371  
         getFs().create(checkpointFilePath);
 1372  
     workerContext.write(checkpointOutputStream);
 1373  
     getContext().progress();
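    // Data file layout from here on: for each partition, its id followed by
    // that partition's pending messages, then the worker-to-worker messages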
 1374  
 
 1375  
     // TODO: checkpointing messages along with vertices to avoid multiple loads
 1376  
     //       of a partition when out-of-core is enabled.
 1377  
     for (Integer partitionId : getPartitionStore().getPartitionIds()) {
 1378  
       // write messages
 1379  
       checkpointOutputStream.writeInt(partitionId);
 1380  
       getServerData().getCurrentMessageStore()
 1381  
           .writePartition(checkpointOutputStream, partitionId);
 1382  
       getContext().progress();
 1383  
 
 1384  
     }
 1385  
 
 1386  
     List<Writable> w2wMessages =
 1387  
         getServerData().getCurrentWorkerToWorkerMessages();
 1388  
     WritableUtils.writeList(w2wMessages, checkpointOutputStream);
 1389  
 
 1390  
     checkpointOutputStream.close();
 1391  
 
 1392  
     getFs().createNewFile(validFilePath);
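    // The empty "valid" file marks this worker's checkpoint as complete;
    // presumably a restart treats checkpoints missing it as unusable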
 1393  
 
 1394  
     // Notify master that checkpoint is stored
 1395  
     String workerWroteCheckpoint =
 1396  
         getWorkerWroteCheckpointPath(getApplicationAttempt(),
 1397  
             getSuperstep()) + "/" + workerInfo.getHostnameId();
 1398  
     try {
 1399  
       getZkExt().createExt(workerWroteCheckpoint,
 1400  
           new byte[0],
 1401  
           Ids.OPEN_ACL_UNSAFE,
 1402  
           CreateMode.PERSISTENT,
 1403  
           true);
 1404  
     } catch (KeeperException.NodeExistsException e) {
 1405  
       LOG.warn("storeCheckpoint: wrote checkpoint worker path " +
 1406  
           workerWroteCheckpoint + " already exists!");
 1407  
     } catch (KeeperException e) {
 1408  
       throw new IllegalStateException("Creating " + workerWroteCheckpoint +
 1409  
           " failed with KeeperException", e);
 1410  
     } catch (InterruptedException e) {
 1411  
       throw new IllegalStateException("Creating " +
 1412  
           workerWroteCheckpoint +
 1413  
           " failed with InterruptedException", e);
 1414  
     }
 1415  
   }
 1416  
 
 1417  
   /**
 1418  
   * Create a checkpoint file safely. If the file already exists, remove it
   * first.
 1419  
    * @param name file extension
 1420  
    * @return full file path to newly created file
 1421  
    * @throws IOException
 1422  
    */
 1423  
   private Path createCheckpointFilePathSafe(String name) throws IOException {
 1424  
     Path validFilePath = new Path(getCheckpointBasePath(getSuperstep()) + '.' +
 1425  
         getWorkerId(workerInfo) + name);
 1426  
    // Remove the file if it already exists (it shouldn't, unless this
  1427  
    // worker previously failed)
 1428  
     if (getFs().delete(validFilePath, false)) {
 1429  
       LOG.warn("storeCheckpoint: Removed " + name + " file " +
 1430  
           validFilePath);
 1431  
     }
 1432  
     return validFilePath;
 1433  
   }
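  // For example, the metadata file for superstep S and worker W lands at
  // getCheckpointBasePath(S) + "." + W + CHECKPOINT_METADATA_POSTFIX
  // (illustrative; the base path comes from the checkpoint configuration)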
 1434  
 
 1435  
   /**
 1436  
    * Returns path to saved checkpoint.
 1437  
    * Doesn't check if file actually exists.
 1438  
    * @param superstep saved superstep.
 1439  
    * @param name extension name
 1440  
   * @return full file path to checkpoint file
 1441  
    */
 1442  
   private Path getSavedCheckpoint(long superstep, String name) {
 1443  
     return new Path(getSavedCheckpointBasePath(superstep) + '.' +
 1444  
         getWorkerId(workerInfo) + name);
 1445  
   }
 1446  
 
 1447  
   /**
 1448  
   * Save partitions. To speed up this operation,
  1449  
   * it runs in multiple threads.
 1450  
    */
 1451  
   private void storeCheckpointVertices() {
 1452  
     final int numPartitions = getPartitionStore().getNumPartitions();
 1453  
     int numThreads = Math.min(
 1454  
         GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()),
 1455  
         numPartitions);
 1456  
 
 1457  
     getPartitionStore().startIteration();
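    // Writer threads pull partitions off this shared iteration, so each
    // partition is checkpointed exactly once and each thread writes to its
    // own file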
 1458  
 
 1459  
     final CompressionCodec codec =
 1460  
         new CompressionCodecFactory(getConfiguration())
 1461  
             .getCodec(new Path(
 1462  
                 GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
 1463  
                     .get(getConfiguration())));
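    // getCodec() returns null when no codec matches the configured name;
    // the null case falls back to the uncompressed stream below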
 1464  
 
 1465  
     long t0 = System.currentTimeMillis();
 1466  
 
 1467  0
     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
 1468  
       @Override
 1469  
       public Callable<Void> newCallable(int callableId) {
 1470  0
         return new Callable<Void>() {
 1471  
 
 1472  
           @Override
 1473  
           public Void call() throws Exception {
 1474  
             while (true) {
 1475  0
               Partition<I, V, E> partition =
 1476  0
                   getPartitionStore().getNextPartition();
 1477  0
               if (partition == null) {
 1478  0
                 break;
 1479  
               }
 1480  0
               Path path =
 1481  0
                   createCheckpointFilePathSafe("_" + partition.getId() +
 1482  
                       CheckpointingUtils.CHECKPOINT_VERTICES_POSTFIX);
 1483  
 
 1484  0
               FSDataOutputStream uncompressedStream =
 1485  0
                   getFs().create(path);
 1486  
 
 1487  
 
 1488  0
               DataOutputStream stream = codec == null ? uncompressedStream :
 1489  
                   new DataOutputStream(
 1490  0
                       codec.createOutputStream(uncompressedStream));
 1491  
 
 1492  
 
 1493  0
               partition.write(stream);
 1494  
 
 1495  0
               getPartitionStore().putPartition(partition);
 1496  
 
 1497  0
               stream.close();
 1498  0
               uncompressedStream.close();
 1499  0
             }
 1500  0
             return null;
 1501  
           }
 1502  
 
 1503  
 
 1504  
         };
 1505  
       }
 1506  
     };
 1507  
 
 1508  
     ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
 1509  
         "checkpoint-vertices-%d", getContext());
 1510  
 
 1511  
     LOG.info("Save checkpoint in " + (System.currentTimeMillis() - t0) +
 1512  
         " ms, using " + numThreads + " threads");
 1513  
   }
 1514  
 
 1515  
   /**
 1516  
    * Load saved partitions in multiple threads.
 1517  
    * @param superstep superstep to load
 1518  
    * @param partitions list of partitions to load
 1519  
    */
 1520  
   private void loadCheckpointVertices(final long superstep,
 1521  
                                       List<Integer> partitions) {
 1522  
     int numThreads = Math.min(
 1523  
         GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()),
 1524  
         partitions.size());
 1525  
 
 1526  
     final Queue<Integer> partitionIdQueue =
 1527  
         new ConcurrentLinkedQueue<>(partitions);
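    // Loader threads poll partition ids from this queue until it drains;
    // ConcurrentLinkedQueue makes the hand-off safe without extra locking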
 1528  
 
 1529  
     final CompressionCodec codec =
 1530  
         new CompressionCodecFactory(getConfiguration())
 1531  
             .getCodec(new Path(
 1532  
                 GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
 1533  
                     .get(getConfiguration())));
 1534  
 
 1535  
     long t0 = System.currentTimeMillis();
 1536  
 
 1537  0
     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
 1538  
       @Override
 1539  
       public Callable<Void> newCallable(int callableId) {
 1540  0
         return new Callable<Void>() {
 1541  
 
 1542  
           @Override
 1543  
           public Void call() throws Exception {
 1544  0
             while (!partitionIdQueue.isEmpty()) {
 1545  0
               Integer partitionId = partitionIdQueue.poll();
 1546  0
               if (partitionId == null) {
 1547  0
                 break;
 1548  
               }
 1549  0
               Path path =
 1550  0
                   getSavedCheckpoint(superstep, "_" + partitionId +
 1551  
                       CheckpointingUtils.CHECKPOINT_VERTICES_POSTFIX);
 1552  
 
 1553  0
               FSDataInputStream compressedStream =
 1554  0
                   getFs().open(path);
 1555  
 
 1556  0
               DataInputStream stream = codec == null ? compressedStream :
 1557  
                   new DataInputStream(
 1558  0
                       codec.createInputStream(compressedStream));
 1559  
 
 1560  0
               Partition<I, V, E> partition =
 1561  0
                   getConfiguration().createPartition(partitionId, getContext());
 1562  
 
 1563  0
               partition.readFields(stream);
 1564  
 
 1565  0
               getPartitionStore().addPartition(partition);
 1566  
 
 1567  0
               stream.close();
 1568  0
             }
 1569  0
             return null;
 1570  
           }
 1571  
 
 1572  
         };
 1573  
       }
 1574  
     };
 1575  
 
 1576  
     ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
 1577  
         "load-vertices-%d", getContext());
 1578  
 
 1579  
     LOG.info("Loaded checkpoint in " + (System.currentTimeMillis() - t0) +
 1580  
         " ms, using " + numThreads + " threads");
 1581  
   }
 1582  
 
 1583  
   @Override
 1584  
   public VertexEdgeCount loadCheckpoint(long superstep) {
 1585  
     Path metadataFilePath = getSavedCheckpoint(
 1586  
         superstep, CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX);
 1587  
 
 1588  
     Path checkpointFilePath = getSavedCheckpoint(
 1589  
         superstep, CheckpointingUtils.CHECKPOINT_DATA_POSTFIX);
 1590  
     // Algorithm:
 1591  
     // Examine all the partition owners and load the ones
 1592  
     // that match my hostname and id from the master designated checkpoint
 1593  
     // prefixes.
 1594  
     try {
 1595  
       DataInputStream metadataStream =
 1596  
           getFs().open(metadataFilePath);
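      // Read back the layout written by storeCheckpoint(): the partition
      // count followed by one id per partition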
 1597  
 
 1598  
       int partitions = metadataStream.readInt();
 1599  
       List<Integer> partitionIds = new ArrayList<>(partitions);
 1600  
       for (int i = 0; i < partitions; i++) {
 1601  
         int partitionId = metadataStream.readInt();
 1602  
         partitionIds.add(partitionId);
 1603  
       }
 1604  
 
 1605  
       loadCheckpointVertices(superstep, partitionIds);
 1606  
 
 1607  
       getContext().progress();
 1608  
 
 1609  
       metadataStream.close();
 1610  
 
 1611  
       DataInputStream checkpointStream =
 1612  
           getFs().open(checkpointFilePath);
 1613  
       workerContext.readFields(checkpointStream);
 1614  
 
 1615  
       // Load global stats and superstep classes
 1616  
       GlobalStats globalStats = new GlobalStats();
 1617  
       SuperstepClasses superstepClasses = SuperstepClasses.createToRead(
 1618  
           getConfiguration());
 1619  
       String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) +
 1620  
           CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
 1621  
       DataInputStream finalizedStream =
 1622  
           getFs().open(new Path(finalizedCheckpointPath));
 1623  
       globalStats.readFields(finalizedStream);
 1624  
       superstepClasses.readFields(finalizedStream);
 1625  
       getConfiguration().updateSuperstepClasses(superstepClasses);
 1626  
       getServerData().resetMessageStores();
 1627  
 
 1628  
       // TODO: checkpointing messages along with vertices to avoid multiple
 1629  
       //       loads of a partition when out-of-core is enabled.
 1630  
       for (int i = 0; i < partitions; i++) {
 1631  
         int partitionId = checkpointStream.readInt();
 1632  
         getServerData().getCurrentMessageStore()
 1633  
             .readFieldsForPartition(checkpointStream, partitionId);
 1634  
       }
 1635  
 
 1636  
       List<Writable> w2wMessages = (List<Writable>) WritableUtils.readList(
 1637  
           checkpointStream);
 1638  
       getServerData().getCurrentWorkerToWorkerMessages().addAll(w2wMessages);
 1639  
 
 1640  
       checkpointStream.close();
 1641  
 
 1642  
       if (LOG.isInfoEnabled()) {
 1643  
         LOG.info("loadCheckpoint: Loaded " +
 1644  
             workerGraphPartitioner.getPartitionOwners().size() +
 1645  
             " total.");
 1646  
       }
 1647  
 
 1648  
       // Communication service needs to setup the connections prior to
 1649  
       // processing vertices
 1650  
 /*if[HADOOP_NON_SECURE]
 1651  
       workerClient.setup();
 1652  
 else[HADOOP_NON_SECURE]*/
 1653  
       workerClient.setup(getConfiguration().authenticate());
 1654  
 /*end[HADOOP_NON_SECURE]*/
 1655  
       return new VertexEdgeCount(globalStats.getVertexCount(),
 1656  
           globalStats.getEdgeCount(), 0);
 1657  
 
 1658  
     } catch (IOException e) {
 1659  
       throw new RuntimeException(
 1660  
           "loadCheckpoint: Failed for superstep=" + superstep, e);
 1661  
     }
 1662  
   }
 1663  
 
 1664  
   /**
 1665  
    * Send the worker partitions to their destination workers
 1666  
    *
 1667  
    * @param workerPartitionMap Map of worker info to the partitions stored
 1668  
    *        on this worker to be sent
 1669  
    */
 1670  
   private void sendWorkerPartitions(
 1671  
       Map<WorkerInfo, List<Integer>> workerPartitionMap) {
 1672  
     List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
 1673  
         new ArrayList<Entry<WorkerInfo, List<Integer>>>(
 1674  
             workerPartitionMap.entrySet());
 1675  
     Collections.shuffle(randomEntryList);
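    // Shuffle the send order so that, presumably, all senders don't start
    // by targeting the same destination worker at once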
 1676  
     WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
 1677  
         new NettyWorkerClientRequestProcessor<I, V, E>(getContext(),
 1678  
             getConfiguration(), this,
 1679  
             false /* useOneMessageToManyIdsEncoding */);
 1680  
     for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
 1681  
       randomEntryList) {
 1682  
       for (Integer partitionId : workerPartitionList.getValue()) {
 1683  
         Partition<I, V, E> partition =
 1684  
             getPartitionStore().removePartition(partitionId);
 1685  
         if (partition == null) {
 1686  
           throw new IllegalStateException(
 1687  
               "sendWorkerPartitions: Couldn't find partition " +
 1688  
                   partitionId + " to send to " +
 1689  
                   workerPartitionList.getKey());
 1690  
         }
 1691  
         if (LOG.isInfoEnabled()) {
 1692  
           LOG.info("sendWorkerPartitions: Sending worker " +
 1693  
               workerPartitionList.getKey() + " partition " +
 1694  
               partitionId);
 1695  
         }
 1696  
         workerClientRequestProcessor.sendPartitionRequest(
 1697  
             workerPartitionList.getKey(),
 1698  
             partition);
 1699  
       }
 1700  
     }
 1701  
 
 1702  
     try {
 1703  
       workerClientRequestProcessor.flush();
 1704  
       workerClient.waitAllRequests();
 1705  
     } catch (IOException e) {
 1706  
       throw new IllegalStateException("sendWorkerPartitions: Flush failed", e);
 1707  
     }
 1708  
     String myPartitionExchangeDonePath =
 1709  
         getPartitionExchangeWorkerPath(
 1710  
             getApplicationAttempt(), getSuperstep(), getWorkerInfo());
 1711  
     try {
 1712  
       getZkExt().createExt(myPartitionExchangeDonePath,
 1713  
           null,
 1714  
           Ids.OPEN_ACL_UNSAFE,
 1715  
           CreateMode.PERSISTENT,
 1716  
           true);
 1717  
     } catch (KeeperException e) {
 1718  
       throw new IllegalStateException(
 1719  
           "sendWorkerPartitions: KeeperException to create " +
 1720  
               myPartitionExchangeDonePath, e);
 1721  
     } catch (InterruptedException e) {
 1722  
       throw new IllegalStateException(
 1723  
           "sendWorkerPartitions: InterruptedException to create " +
 1724  
               myPartitionExchangeDonePath, e);
 1725  
     }
 1726  
     if (LOG.isInfoEnabled()) {
 1727  
       LOG.info("sendWorkerPartitions: Done sending all my partitions.");
 1728  
     }
 1729  
   }
 1730  
 
 1731  
   @Override
 1732  
   public final void exchangeVertexPartitions(
 1733  
       Collection<? extends PartitionOwner> masterSetPartitionOwners) {
 1734  
     // 1. Fix the addresses of the partition ids if they have changed.
 1735  
     // 2. Send all the partitions to their destination workers in a random
 1736  
     //    fashion.
 1737  
     // 3. Notify completion with a ZooKeeper stamp
 1738  
     // 4. Wait for all my dependencies to be done (if any)
 1739  
     // 5. Add the partitions to myself.
 1740  
     PartitionExchange partitionExchange =
 1741  
         workerGraphPartitioner.updatePartitionOwners(
 1742  
             getWorkerInfo(), masterSetPartitionOwners);
 1743  
     workerClient.openConnections();
 1744  
 
 1745  
     Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
 1746  
         partitionExchange.getSendWorkerPartitionMap();
 1747  
     if (!getPartitionStore().isEmpty()) {
 1748  
       sendWorkerPartitions(sendWorkerPartitionMap);
 1749  
     }
 1750  
 
 1751  
     Set<WorkerInfo> myDependencyWorkerSet =
 1752  
         partitionExchange.getMyDependencyWorkerSet();
 1753  
     Set<String> workerIdSet = new HashSet<String>();
 1754  
     for (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
 1755  
       if (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
 1756  
         throw new IllegalStateException(
 1757  
             "exchangeVertexPartitions: Duplicate entry " + tmpWorkerInfo);
 1758  
       }
 1759  
     }
 1760  
     if (myDependencyWorkerSet.isEmpty() && getPartitionStore().isEmpty()) {
 1761  
       if (LOG.isInfoEnabled()) {
 1762  
         LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
 1763  
             "exiting early");
 1764  
       }
 1765  
       return;
 1766  
     }
 1767  
 
 1768  
     String vertexExchangePath =
 1769  
         getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
 1770  
     List<String> workerDoneList;
 1771  
     try {
 1772  
       while (true) {
 1773  
         workerDoneList = getZkExt().getChildrenExt(
 1774  
             vertexExchangePath, true, false, false);
 1775  
         workerIdSet.removeAll(workerDoneList);
 1776  
         if (workerIdSet.isEmpty()) {
 1777  
           break;
 1778  
         }
 1779  
         if (LOG.isInfoEnabled()) {
 1780  
           LOG.info("exchangeVertexPartitions: Waiting for workers " +
 1781  
               workerIdSet);
 1782  
         }
 1783  
         getPartitionExchangeChildrenChangedEvent().waitForTimeoutOrFail(
 1784  
             GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
 1785  
                 getConfiguration()));
 1786  
         getPartitionExchangeChildrenChangedEvent().reset();
 1787  
       }
 1788  
     } catch (KeeperException | InterruptedException e) {
 1789  
       throw new RuntimeException(
 1790  
           "exchangeVertexPartitions: Got runtime exception", e);
 1791  
     }
 1792  
 
 1793  
     if (LOG.isInfoEnabled()) {
 1794  
       LOG.info("exchangeVertexPartitions: Done with exchange.");
 1795  
     }
 1796  
   }
 1797  
 
 1798  
   /**
 1799  
    * Get event when the state of a partition exchange has changed.
 1800  
    *
 1801  
    * @return Event to check.
 1802  
    */
 1803  
   public final BspEvent getPartitionExchangeChildrenChangedEvent() {
 1804  
     return partitionExchangeChildrenChanged;
 1805  
   }
 1806  
 
 1807  
   @Override
 1808  
   protected boolean processEvent(WatchedEvent event) {
 1809  
     boolean foundEvent = false;
 1810  
     if (event.getPath().startsWith(masterJobStatePath) &&
 1811  
         (event.getType() == EventType.NodeChildrenChanged)) {
 1812  
       if (LOG.isInfoEnabled()) {
 1813  
         LOG.info("processEvent: Job state changed, checking " +
 1814  
             "to see if it needs to restart");
 1815  
       }
 1816  
       JSONObject jsonObj = getJobState();
 1817  
       // in YARN, we have to manually commit our own output in 2 stages that we
 1818  
       // do not have to do in Hadoop-based Giraph. So jsonObj can be null.
 1819  
       if (getConfiguration().isPureYarnJob() && null == jsonObj) {
 1820  
         LOG.error("BspServiceWorker#getJobState() came back NULL.");
 1821  
         return false; // the event has been processed.
 1822  
       }
 1823  
       try {
 1824  
         if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
 1825  
             ApplicationState.START_SUPERSTEP) &&
 1826  
             jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
 1827  
             getApplicationAttempt()) {
 1828  
           LOG.fatal("processEvent: Worker will restart " +
 1829  
               "from command - " + jsonObj.toString());
 1830  
           System.exit(-1);
 1831  
         }
 1832  
       } catch (JSONException e) {
 1833  
         throw new RuntimeException(
 1834  
             "processEvent: Couldn't properly get job state from " +
 1835  
                jsonObj.toString(), e);
 1836  
       }
 1837  
       foundEvent = true;
 1838  
     } else if (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
 1839  
         event.getType() == EventType.NodeChildrenChanged) {
 1840  
       if (LOG.isInfoEnabled()) {
 1841  
         LOG.info("processEvent : partitionExchangeChildrenChanged " +
 1842  
             "(at least one worker is done sending partitions)");
 1843  
       }
 1844  
       partitionExchangeChildrenChanged.signal();
 1845  
       foundEvent = true;
 1846  
     } else if (event.getPath().contains(MEMORY_OBSERVER_DIR) &&
 1847  
         event.getType() == EventType.NodeChildrenChanged) {
 1848  
       memoryObserver.callGc();
 1849  
       foundEvent = true;
 1850  
     }
 1851  
 
 1852  
     return foundEvent;
 1853  
   }
 1854  
 
 1855  
   @Override
 1856  
   public WorkerInfo getWorkerInfo() {
 1857  
     return workerInfo;
 1858  
   }
 1859  
 
 1860  
   @Override
 1861  
   public PartitionStore<I, V, E> getPartitionStore() {
 1862  
     return getServerData().getPartitionStore();
 1863  
   }
 1864  
 
 1865  
   @Override
 1866  
   public PartitionOwner getVertexPartitionOwner(I vertexId) {
 1867  
     return workerGraphPartitioner.getPartitionOwner(vertexId);
 1868  
   }
 1869  
 
 1870  
   @Override
 1871  
   public Iterable<? extends PartitionOwner> getPartitionOwners() {
 1872  
     return workerGraphPartitioner.getPartitionOwners();
 1873  
   }
 1874  
 
 1875  
   @Override
 1876  
   public int getPartitionId(I vertexId) {
 1877  
     PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
 1878  
     return partitionOwner.getPartitionId();
 1879  
   }
 1880  
 
 1881  
   @Override
 1882  
   public boolean hasPartition(Integer partitionId) {
 1883  
     return getPartitionStore().hasPartition(partitionId);
 1884  
   }
 1885  
 
 1886  
   @Override
 1887  
   public Iterable<Integer> getPartitionIds() {
 1888  
     return getPartitionStore().getPartitionIds();
 1889  
   }
 1890  
 
 1891  
   @Override
 1892  
   public long getPartitionVertexCount(Integer partitionId) {
 1893  
     return getPartitionStore().getPartitionVertexCount(partitionId);
 1894  
   }
 1895  
 
 1896  
   @Override
 1897  
   public void startIteration() {
 1898  
     getPartitionStore().startIteration();
 1899  
   }
 1900  
 
 1901  
   @Override
 1902  
   public Partition getNextPartition() {
 1903  
     return getPartitionStore().getNextPartition();
 1904  
   }
 1905  
 
 1906  
   @Override
 1907  
   public void putPartition(Partition partition) {
 1908  
     getPartitionStore().putPartition(partition);
 1909  
   }
 1910  
 
 1911  
   @Override
 1912  
   public ServerData<I, V, E> getServerData() {
 1913  
     return workerServer.getServerData();
 1914  
   }
 1915  
 
 1916  
 
 1917  
   @Override
 1918  
   public WorkerAggregatorHandler getAggregatorHandler() {
 1919  
     return globalCommHandler;
 1920  
   }
 1921  
 
 1922  
   @Override
 1923  
   public void prepareSuperstep() {
 1924  
     if (getSuperstep() != INPUT_SUPERSTEP) {
 1925  
       globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);
 1926  
     }
 1927  
   }
 1928  
 
 1929  
   @Override
 1930  
   public SuperstepOutput<I, V, E> getSuperstepOutput() {
 1931  
     return superstepOutput;
 1932  
   }
 1933  
 
 1934  
   @Override
 1935  
   public GlobalStats getGlobalStats() {
 1936  
     GlobalStats globalStats = new GlobalStats();
 1937  
     if (getSuperstep() > Math.max(INPUT_SUPERSTEP, getRestartedSuperstep())) {
 1938  
       String superstepFinishedNode =
 1939  
           getSuperstepFinishedPath(getApplicationAttempt(),
 1940  
               getSuperstep() - 1);
 1941  
       WritableUtils.readFieldsFromZnode(
 1942  
           getZkExt(), superstepFinishedNode, false, null,
 1943  
           globalStats);
 1944  
     }
 1945  
     return globalStats;
 1946  
   }
 1947  
 
 1948  
   @Override
 1949  
   public WorkerInputSplitsHandler getInputSplitsHandler() {
 1950  
     return inputSplitsHandler;
 1951  
   }
 1952  
 
 1953  
   @Override
 1954  
   public void addressesAndPartitionsReceived(
 1955  
       AddressesAndPartitionsWritable addressesAndPartitions) {
 1956  
     addressesAndPartitionsHolder.offer(addressesAndPartitions);
 1957  
   }
 1958  
 }