Coverage Report - org.apache.giraph.job.DefaultJobProgressTrackerService
 
Classes in this File Line Coverage Branch Coverage Complexity
DefaultJobProgressTrackerService
0%
0/75
0%
0/24
0
DefaultJobProgressTrackerService$1
0%
0/29
0%
0/20
0
DefaultJobProgressTrackerService$2
0%
0/4
0%
0/2
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.job;
 20  
 
 21  
 import org.apache.giraph.conf.GiraphConfiguration;
 22  
 import org.apache.giraph.conf.GiraphConstants;
 23  
 import org.apache.giraph.conf.IntConfOption;
 24  
 import org.apache.giraph.counters.CustomCounter;
 25  
 import org.apache.giraph.counters.GiraphCountersThriftStruct;
 26  
 import org.apache.giraph.master.MasterProgress;
 27  
 import org.apache.giraph.utils.ThreadUtils;
 28  
 import org.apache.giraph.worker.WorkerProgress;
 29  
 import org.apache.hadoop.mapreduce.Job;
 30  
 import org.apache.log4j.Logger;
 31  
 
 32  
 import java.io.IOException;
 33  
 import java.util.Collections;
 34  
 import java.util.List;
 35  
 import java.util.Map;
 36  
 import java.util.concurrent.ConcurrentHashMap;
 37  
 import java.util.concurrent.atomic.AtomicReference;
 38  
 
 39  
 /**
 40  
  * Default implementation of JobProgressTrackerService
 41  
  */
 42  0
 public class DefaultJobProgressTrackerService
 43  
     implements JobProgressTrackerService {
 44  
   /** Max time job is allowed to not make progress before getting killed */
 45  0
   public static final IntConfOption MAX_ALLOWED_TIME_WITHOUT_PROGRESS_MS =
 46  
       new IntConfOption(
 47  
           "giraph.maxAllowedTimeWithoutProgressMs",
 48  
           3 * 60 * 60 * 1000, // Allow 3h
 49  
           "Max time job is allowed to not make progress before getting killed");
 50  
   /** Class logger */
 51  0
   private static final Logger LOG =
 52  0
       Logger.getLogger(JobProgressTrackerService.class);
 53  
   /** How often to print job's progress */
 54  
   private static final int UPDATE_MILLISECONDS = 10 * 1000;
 55  
 
 56  
   /** Configuration */
 57  
   private GiraphConfiguration conf;
 58  
   /** Giraph job callback */
 59  
   private GiraphJobObserver jobObserver;
 60  
   /** Thread which periodically writes job's progress */
 61  
   private Thread writerThread;
 62  
   /** Whether application is finished */
 63  0
   private volatile boolean finished = false;
 64  
   /** Number of mappers which the job got */
 65  
   private int mappersStarted;
 66  
   /** Last time number of mappers started was logged */
 67  
   private long lastTimeMappersStartedLogged;
 68  
   /** Map of worker progresses */
 69  0
   private final Map<Integer, WorkerProgress> workerProgresses =
 70  
       new ConcurrentHashMap<>();
 71  
   /** Master progress */
 72  0
   private final AtomicReference<MasterProgress> masterProgress =
 73  
       new AtomicReference<>(new MasterProgress());
 74  
   /** Job */
 75  
   private Job job;
 76  
 
 77  
   @Override
 78  
   public void init(GiraphConfiguration conf, GiraphJobObserver jobObserver) {
 79  0
     this.conf = conf;
 80  0
     this.jobObserver = jobObserver;
 81  
 
 82  0
     if (LOG.isInfoEnabled()) {
 83  0
       LOG.info("Waiting for job to start... (this may take a minute)");
 84  
     }
 85  0
     startWriterThread();
 86  0
   }
 87  
 
 88  
   /**
 89  
    * Start the thread which writes progress periodically
 90  
    */
 91  
   private void startWriterThread() {
 92  0
     writerThread = ThreadUtils.startThread(new Runnable() {
 93  
       @Override
 94  
       public void run() {
 95  0
         long lastTimeProgressChanged = -1;
 96  0
         long maxAllowedTimeWithoutProgress =
 97  0
             MAX_ALLOWED_TIME_WITHOUT_PROGRESS_MS.get(conf);
 98  0
         CombinedWorkerProgress lastProgress = null;
 99  0
         while (!finished) {
 100  0
           if (mappersStarted == conf.getMaxMappers() &&
 101  0
               !workerProgresses.isEmpty()) {
 102  
             // Combine and log
 103  0
             CombinedWorkerProgress combinedWorkerProgress =
 104  0
                 new CombinedWorkerProgress(workerProgresses.values(),
 105  0
                     masterProgress.get(), conf);
 106  0
             if (LOG.isInfoEnabled()) {
 107  0
               LOG.info(combinedWorkerProgress.toString());
 108  
             }
 109  
             // Check if application is done
 110  0
             if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
 111  0
               break;
 112  
             }
 113  
 
 114  0
             if (!canFinishInTime(conf, job, combinedWorkerProgress)) {
 115  0
               killJobWithMessage("Killing the job because it won't " +
 116  
                 "complete in max allotted time: " +
 117  0
                 GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf) / 1000 +
 118  
                 "s");
 119  
             }
 120  
 
 121  0
             if (lastProgress == null ||
 122  0
                 combinedWorkerProgress.madeProgressFrom(lastProgress)) {
 123  0
               lastProgress = combinedWorkerProgress;
 124  0
               lastTimeProgressChanged = System.currentTimeMillis();
 125  0
             } else if (lastTimeProgressChanged +
 126  0
                 maxAllowedTimeWithoutProgress < System.currentTimeMillis()) {
 127  
               // Job didn't make progress in too long, killing it
 128  0
               killJobWithMessage(
 129  
                   "Killing the job because it didn't make progress for " +
 130  
                       maxAllowedTimeWithoutProgress / 1000 + "s");
 131  0
               break;
 132  
             }
 133  
           }
 134  0
           if (!ThreadUtils.trySleep(UPDATE_MILLISECONDS)) {
 135  0
             break;
 136  
           }
 137  
         }
 138  0
       }
 139  
     }, "progress-writer");
 140  0
   }
 141  
 
 142  
   /**
 143  
    * Determine if the job will finish in allotted time
 144  
    * @param conf Giraph configuration
 145  
    * @param job Job
 146  
    * @param progress Combined worker progress
 147  
    * @return true it the job can finish in allotted time, false otherwise
 148  
    */
 149  
   protected boolean canFinishInTime(GiraphConfiguration conf, Job job,
 150  
       CombinedWorkerProgress progress) {
 151  
     // No defaut implementation.
 152  0
     return true;
 153  
   }
 154  
 
 155  
   /**
 156  
    * Kill job with message describing why it's being killed
 157  
    *
 158  
    * @param message Message describing why job is being killed
 159  
    * @return True iff job was killed successfully, false if job was already
 160  
    * done or kill failed
 161  
    */
 162  
   protected boolean killJobWithMessage(String message) {
 163  
     try {
 164  0
       if (job.isComplete()) {
 165  0
         LOG.info("Job " + job.getJobID() + " is already done");
 166  0
         return false;
 167  
       } else {
 168  0
         LOG.error(message);
 169  0
         job.killJob();
 170  0
         return true;
 171  
       }
 172  0
     } catch (IOException e) {
 173  0
       LOG.error("Failed to kill the job", e);
 174  0
       return false;
 175  
     }
 176  
   }
 177  
 
 178  
   @Override
 179  
   public void setJob(Job job) {
 180  0
     this.job = job;
 181  0
   }
 182  
 
 183  
   /**
 184  
    * Called when job got all mappers, used to check MAX_ALLOWED_JOB_TIME_MS
 185  
    * and potentially start a thread which will kill the job after this time
 186  
    */
 187  
   protected void jobGotAllMappers() {
 188  0
     jobObserver.jobGotAllMappers(job);
 189  0
     final long maxAllowedJobTimeMs =
 190  0
         GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf);
 191  0
     if (maxAllowedJobTimeMs > 0) {
 192  
       // Start a thread which will kill the job if running for too long
 193  0
       ThreadUtils.startThread(new Runnable() {
 194  
         @Override
 195  
         public void run() {
 196  0
           if (ThreadUtils.trySleep(maxAllowedJobTimeMs)) {
 197  0
             killJobWithMessage("Killing the job because it took longer than " +
 198  
                 maxAllowedJobTimeMs + " milliseconds");
 199  
           }
 200  0
         }
 201  
       }, "job-runtime-observer");
 202  
     }
 203  0
   }
 204  
 
 205  
   @Override
 206  
   public synchronized void mapperStarted() {
 207  0
     mappersStarted++;
 208  0
     if (LOG.isInfoEnabled()) {
 209  0
       if (mappersStarted == conf.getMaxMappers()) {
 210  0
         LOG.info("Got all " + mappersStarted + " mappers");
 211  0
         jobGotAllMappers();
 212  
       } else {
 213  0
         if (System.currentTimeMillis() - lastTimeMappersStartedLogged >
 214  
             UPDATE_MILLISECONDS) {
 215  0
           lastTimeMappersStartedLogged = System.currentTimeMillis();
 216  0
           LOG.info("Got " + mappersStarted + " but needs " +
 217  0
               conf.getMaxMappers() + " mappers");
 218  
         }
 219  
       }
 220  
     }
 221  0
   }
 222  
 
 223  
   @Override
 224  
   public void logInfo(String logLine) {
 225  0
     if (LOG.isInfoEnabled()) {
 226  0
       LOG.info(logLine);
 227  
     }
 228  0
   }
 229  
 
 230  
   @Override
 231  
   public void
 232  
   logError(String logLine, byte [] exByteArray) {
 233  0
     LOG.error(logLine);
 234  0
   }
 235  
 
 236  
   @Override
 237  
   public void logFailure(String reason) {
 238  0
     LOG.fatal(reason);
 239  0
     finished = true;
 240  0
     writerThread.interrupt();
 241  0
   }
 242  
 
 243  
   @Override
 244  
   public void updateProgress(WorkerProgress workerProgress) {
 245  0
     workerProgresses.put(workerProgress.getTaskId(), workerProgress);
 246  0
   }
 247  
 
 248  
   @Override
 249  
   public void updateMasterProgress(MasterProgress masterProgress) {
 250  0
     this.masterProgress.set(masterProgress);
 251  0
   }
 252  
 
 253  
   @Override
 254  
   public void sendMasterCounters(GiraphCountersThriftStruct giraphCounters) {
 255  0
     if (LOG.isInfoEnabled()) {
 256  0
       List<CustomCounter> counterList = giraphCounters.getCounters();
 257  0
       Collections.sort(counterList);
 258  0
       for (CustomCounter customCounter : counterList) {
 259  0
         LOG.info(String.format("%s: %s: %d%n", customCounter.getGroupName(),
 260  0
               customCounter.getCounterName(), customCounter.getValue()));
 261  0
       }
 262  
     }
 263  0
   }
 264  
 
 265  
   @Override
 266  
   public void stop(boolean succeeded) {
 267  0
     finished = true;
 268  0
     writerThread.interrupt();
 269  0
     if (LOG.isInfoEnabled()) {
 270  0
       LOG.info("Job " + (succeeded ? "finished successfully" : "failed") +
 271  
           ", cleaning up...");
 272  
     }
 273  0
   }
 274  
 
 275  
   /**
 276  
    * Create job progress server on job client if enabled in configuration.
 277  
    *
 278  
    * @param conf        Configuration
 279  
    * @param jobObserver Giraph job callbacks
 280  
    * @return JobProgressTrackerService
 281  
    */
 282  
   public static JobProgressTrackerService createJobProgressTrackerService(
 283  
       GiraphConfiguration conf, GiraphJobObserver jobObserver) {
 284  0
     if (!conf.trackJobProgressOnClient()) {
 285  0
       return null;
 286  
     }
 287  
 
 288  0
     JobProgressTrackerService jobProgressTrackerService =
 289  0
         GiraphConstants.JOB_PROGRESS_TRACKER_SERVICE_CLASS.newInstance(conf);
 290  0
     jobProgressTrackerService.init(conf, jobObserver);
 291  0
     return jobProgressTrackerService;
 292  
   }
 293  
 }