Coverage Report - org.apache.giraph.master.MasterThread
 
Classes in this File | Line Coverage | Branch Coverage | Complexity
MasterThread         | 0% (0/86)     | 0% (0/34)       | 10.5
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.master;
 20  
 
 21  
 import org.apache.giraph.bsp.ApplicationState;
 22  
 import org.apache.giraph.bsp.BspService;
 23  
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 24  
 import org.apache.giraph.bsp.SuperstepState;
 25  
 import org.apache.giraph.counters.GiraphTimers;
 26  
 import org.apache.giraph.graph.Computation;
 27  
 import org.apache.giraph.metrics.GiraphMetrics;
 28  
 import org.apache.hadoop.io.Writable;
 29  
 import org.apache.hadoop.io.WritableComparable;
 30  
 import org.apache.hadoop.mapreduce.Mapper.Context;
 31  
 import org.apache.log4j.Logger;
 32  
 
 33  
 import java.util.Map;
 34  
 import java.util.Map.Entry;
 35  
 import java.util.TreeMap;
 36  
 
 37  
 import static org.apache.giraph.conf.GiraphConstants.SPLIT_MASTER_WORKER;
 38  
 import static org.apache.giraph.conf.GiraphConstants.USE_SUPERSTEP_COUNTERS;
 39  
 
 40  
 /**
 41  
  * Master thread that will coordinate the activities of the tasks.  It runs
 42  
  * on all task processes, however, will only execute its algorithm if it knows
 43  
  * it is the "leader" from ZooKeeper.
 44  
  *
 45  
  * @param <I> Vertex id
 46  
  * @param <V> Vertex value
 47  
  * @param <E> Edge value
 48  
  */
 49  
 @SuppressWarnings("rawtypes")
 50  
 public class MasterThread<I extends WritableComparable, V extends Writable,
 51  
     E extends Writable> extends Thread {
 52  
   /** Counter group name for the Giraph timers */
 53  
   public static final String GIRAPH_TIMERS_COUNTER_GROUP_NAME = "Giraph Timers";
 54  
   /** Class logger */
 55  0
   private static final Logger LOG = Logger.getLogger(MasterThread.class);
 56  
   /** Reference to shared BspService */
 57  0
   private CentralizedServiceMaster<I, V, E> bspServiceMaster = null;
 58  
   /** Context (for counters) */
 59  
   private final Context context;
 60  
   /** Use superstep counters? */
 61  
   private final boolean superstepCounterOn;
 62  
   /** Are master and worker split or not? */
 63  
   private final boolean splitMasterWorker;
 64  
   /** Setup seconds */
 65  0
   private double setupSecs = 0d;
 66  
   /** Superstep timer (in seconds) map */
 67  0
   private final Map<Long, Double> superstepSecsMap =
 68  
       new TreeMap<Long, Double>();
 69  
 
 70  
   /**
 71  
    * Constructor.
 72  
    *
 73  
    * @param bspServiceMaster Master that already exists and setup() has
 74  
    *        been called.
 75  
    * @param context Context from the Mapper.
 76  
    */
 77  
   public MasterThread(CentralizedServiceMaster<I, V, E> bspServiceMaster,
 78  
       Context context) {
 79  0
     super(MasterThread.class.getName());
 80  0
     this.bspServiceMaster = bspServiceMaster;
 81  0
     this.context = context;
 82  0
     GiraphTimers.init(context);
 83  0
     superstepCounterOn = USE_SUPERSTEP_COUNTERS.get(context.getConfiguration());
 84  0
     splitMasterWorker = SPLIT_MASTER_WORKER.get(context.getConfiguration());
 85  0
   }
 86  
 
 87  
   /**
 88  
    * The master algorithm.  The algorithm should be able to withstand
 89  
    * failures and resume as necessary since the master may switch during a
 90  
    * job.
 91  
    */
 92  
   @Override
 93  
   public void run() {
 94  
     // Algorithm:
 95  
     // 1. Become the master
 96  
     // 2. If desired, restart from a manual checkpoint
 97  
     // 3. Run all supersteps until complete
 98  
     try {
 99  0
       long startMillis = System.currentTimeMillis();
 100  0
       long initializeMillis = 0;
 101  0
       long endMillis = 0;
 102  0
       bspServiceMaster.setup();
 103  0
       SuperstepState superstepState = SuperstepState.INITIAL;
 104  
 
 105  0
       if (bspServiceMaster.becomeMaster()) {
 106  
         // First call to checkWorkers waits for all pending resources.
 107  
         // If these resources are still available at subsequent calls it just
 108  
         // reads zookeeper for the list of healthy workers.
 109  0
         bspServiceMaster.checkWorkers();
 110  0
         initializeMillis = System.currentTimeMillis();
 111  0
         GiraphTimers.getInstance().getInitializeMs().increment(
 112  
             initializeMillis - startMillis);
 113  
         // Attempt to create InputSplits if necessary. Bail out if that fails.
 114  0
         if (bspServiceMaster.getRestartedSuperstep() !=
 115  
             BspService.UNSET_SUPERSTEP ||
 116  0
             (bspServiceMaster.createMappingInputSplits() != -1 &&
 117  0
                 bspServiceMaster.createVertexInputSplits() != -1 &&
 118  0
                 bspServiceMaster.createEdgeInputSplits() != -1)) {
 119  0
           long setupMillis = System.currentTimeMillis() - initializeMillis;
 120  0
           GiraphTimers.getInstance().getSetupMs().increment(setupMillis);
 121  0
           setupSecs = setupMillis / 1000.0d;
 122  0
           while (!superstepState.isExecutionComplete()) {
 123  0
             long startSuperstepMillis = System.currentTimeMillis();
 124  0
             long cachedSuperstep = bspServiceMaster.getSuperstep();
 125  
             // If master and worker are running together, worker will call reset
 126  0
             if (splitMasterWorker) {
 127  0
               GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep);
 128  
             }
 129  0
             Class<? extends Computation> computationClass =
 130  0
                 bspServiceMaster.getMasterCompute().getComputation();
 131  0
             superstepState = bspServiceMaster.coordinateSuperstep();
 132  0
             long superstepMillis = System.currentTimeMillis() -
 133  
                 startSuperstepMillis;
 134  0
             superstepSecsMap.put(cachedSuperstep,
 135  0
                 superstepMillis / 1000.0d);
 136  0
             if (LOG.isInfoEnabled()) {
 137  0
               LOG.info("masterThread: Coordination of superstep " +
 138  
                   cachedSuperstep + " took " +
 139  
                   superstepMillis / 1000.0d +
 140  
                   " seconds ended with state " + superstepState +
 141  
                   " and is now on superstep " +
 142  0
                   bspServiceMaster.getSuperstep());
 143  
             }
 144  0
             if (superstepCounterOn) {
 145  0
               String computationName = (computationClass == null) ?
 146  0
                   null : computationClass.getSimpleName();
 147  0
               GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep,
 148  0
                   computationName).increment(superstepMillis);
 149  
             }
 150  0
             bspServiceMaster.addGiraphTimersAndSendCounters(cachedSuperstep);
 151  
 
 152  0
             bspServiceMaster.postSuperstep();
 153  
 
 154  
             // If a worker failed, restart from a known good superstep
 155  0
             if (superstepState == SuperstepState.WORKER_FAILURE) {
 156  0
               bspServiceMaster.restartFromCheckpoint(
 157  0
                   bspServiceMaster.getLastGoodCheckpoint());
 158  
             }
 159  0
             endMillis = System.currentTimeMillis();
 160  0
           }
 161  0
           bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
 162  
         }
 163  
       }
 164  0
       bspServiceMaster.cleanup(superstepState);
 165  0
       if (!superstepSecsMap.isEmpty()) {
 166  0
         GiraphTimers.getInstance().getShutdownMs().
 167  0
           increment(System.currentTimeMillis() - endMillis);
 168  0
         if (LOG.isInfoEnabled()) {
 169  0
           LOG.info("setup: Took " + setupSecs + " seconds.");
 170  
         }
 171  0
         for (Entry<Long, Double> entry : superstepSecsMap.entrySet()) {
 172  0
           if (LOG.isInfoEnabled()) {
 173  0
             if (entry.getKey().longValue() ==
 174  
                 BspService.INPUT_SUPERSTEP) {
 175  0
               LOG.info("input superstep: Took " +
 176  0
                   entry.getValue() + " seconds.");
 177  
             } else {
 178  0
               LOG.info("superstep " + entry.getKey() + ": Took " +
 179  0
                   entry.getValue() + " seconds.");
 180  
             }
 181  
           }
 182  0
           context.progress();
 183  0
         }
 184  0
         if (LOG.isInfoEnabled()) {
 185  0
           LOG.info("shutdown: Took " +
 186  0
               (System.currentTimeMillis() - endMillis) /
 187  
               1000.0d + " seconds.");
 188  0
           LOG.info("total: Took " +
 189  0
               ((System.currentTimeMillis() - initializeMillis) /
 190  
               1000.0d) + " seconds.");
 191  
         }
 192  0
         GiraphTimers.getInstance().getTotalMs().
 193  0
           increment(System.currentTimeMillis() - initializeMillis);
 194  
       }
 195  0
       bspServiceMaster.addGiraphTimersAndSendCounters(
 196  0
               bspServiceMaster.getSuperstep());
 197  0
       bspServiceMaster.postApplication();
 198  
       // CHECKSTYLE: stop IllegalCatchCheck
 199  0
     } catch (Exception e) {
 200  
       // CHECKSTYLE: resume IllegalCatchCheck
 201  0
       LOG.error("masterThread: Master algorithm failed with " +
 202  0
           e.getClass().getSimpleName(), e);
 203  0
       bspServiceMaster.failureCleanup(e);
 204  0
       throw new IllegalStateException(e);
 205  0
     }
 206  0
   }
 207  
 }