Coverage Report - org.apache.giraph.worker.WorkerProgressWriter
 
Classes in this File Line Coverage Branch Coverage Complexity
WorkerProgressWriter
0%
0/16
N/A
1.5
WorkerProgressWriter$1
0%
0/8
0%
0/4
1.5
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.worker;
 20  
 
 21  
 import org.apache.giraph.job.JobProgressTracker;
 22  
 import org.apache.giraph.utils.ThreadUtils;
 23  
 import org.apache.log4j.Logger;
 24  
 
 25  
 /**
 26  
  * Class which periodically writes worker's progress to zookeeper
 27  
  */
 28  0
 public class WorkerProgressWriter {
 29  
   /** Class logger */
 30  0
   private static final Logger LOG =
 31  0
       Logger.getLogger(WorkerProgressWriter.class);
 32  
   /** How often to update worker's progress */
 33  
   private static final int WRITE_UPDATE_PERIOD_MILLISECONDS = 10 * 1000;
 34  
 
 35  
   /** Job progress tracker */
 36  
   private final JobProgressTracker jobProgressTracker;
 37  
   /** Thread which writes worker's progress */
 38  
   private final Thread writerThread;
 39  
   /** Whether worker finished application */
 40  0
   private volatile boolean finished = false;
 41  
 
 42  
   /**
 43  
    * Constructor, starts separate thread to periodically update worker's
 44  
    * progress
 45  
    *
 46  
    * @param jobProgressTracker JobProgressTracker to report job progress to
 47  
    */
 48  0
   public WorkerProgressWriter(JobProgressTracker jobProgressTracker) {
 49  0
     this.jobProgressTracker = jobProgressTracker;
 50  0
     writerThread = ThreadUtils.startThread(new Runnable() {
 51  
       @Override
 52  
       public void run() {
 53  0
         while (!finished) {
 54  0
           updateAndSendProgress();
 55  0
           double factor = 1 + Math.random();
 56  0
           if (!ThreadUtils.trySleep(
 57  
               (long) (WRITE_UPDATE_PERIOD_MILLISECONDS * factor))) {
 58  0
             break;
 59  
           }
 60  0
         }
 61  0
       }
 62  
     }, "workerProgressThread");
 63  0
   }
 64  
 
 65  
   /**
 66  
    * Update worker progress and send it
 67  
    */
 68  
   private void updateAndSendProgress() {
 69  0
     WorkerProgress.get().updateMemory();
 70  0
     jobProgressTracker.updateProgress(WorkerProgress.get());
 71  0
   }
 72  
 
 73  
   /**
 74  
    * Stop the thread which writes worker's progress
 75  
    */
 76  
   public void stop() throws InterruptedException {
 77  0
     finished = true;
 78  0
     writerThread.interrupt();
 79  0
     writerThread.join();
 80  0
     updateAndSendProgress();
 81  0
   }
 82  
 }