Coverage Report - org.apache.giraph.graph.RetryableJobProgressTrackerClient
 
Classes in this File Line Coverage Branch Coverage Complexity
RetryableJobProgressTrackerClient
0%
0/76
0%
0/12
0
RetryableJobProgressTrackerClient$1
0%
0/3
N/A
0
RetryableJobProgressTrackerClient$2
0%
0/3
N/A
0
RetryableJobProgressTrackerClient$3
0%
0/3
N/A
0
RetryableJobProgressTrackerClient$4
0%
0/3
N/A
0
RetryableJobProgressTrackerClient$5
0%
0/3
N/A
0
RetryableJobProgressTrackerClient$6
0%
0/3
N/A
0
RetryableJobProgressTrackerClient$7
0%
0/3
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.graph;
 20  
 
 21  
 import org.apache.giraph.conf.GiraphConfiguration;
 22  
 import org.apache.giraph.conf.IntConfOption;
 23  
 import org.apache.giraph.counters.GiraphCountersThriftStruct;
 24  
 import org.apache.giraph.job.ClientThriftServer;
 25  
 import org.apache.giraph.job.JobProgressTracker;
 26  
 import org.apache.giraph.master.MasterProgress;
 27  
 import org.apache.giraph.utils.ThreadUtils;
 28  
 import org.apache.giraph.worker.WorkerProgress;
 29  
 import org.apache.log4j.Logger;
 30  
 
 31  
 import com.facebook.nifty.client.FramedClientConnector;
 32  
 import com.facebook.nifty.client.NettyClientConfigBuilder;
 33  
 import com.facebook.nifty.client.NiftyClient;
 34  
 import com.facebook.swift.codec.ThriftCodec;
 35  
 import com.facebook.swift.codec.ThriftCodecManager;
 36  
 import com.facebook.swift.service.RuntimeTTransportException;
 37  
 import com.facebook.swift.service.ThriftClientEventHandler;
 38  
 import com.facebook.swift.service.ThriftClientManager;
 39  
 import com.google.common.collect.ImmutableSet;
 40  
 import com.google.common.io.Closeables;
 41  
 
 42  
 import java.io.IOException;
 43  
 import java.net.InetSocketAddress;
 44  
 import java.util.concurrent.ExecutionException;
 45  
 import java.util.concurrent.RejectedExecutionException;
 46  
 
 47  
 /**
 48  
  * Wrapper around JobProgressTracker which retries to connect and swallows
 49  
  * exceptions so app wouldn't crash if something goes wrong with progress
 50  
  * reports.
 51  
  */
 52  0
 public class RetryableJobProgressTrackerClient
 53  
     implements JobProgressTrackerClient {
 54  
   /** Conf option for number of retries */
 55  0
   public static final IntConfOption RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES =
 56  
     new IntConfOption("giraph.job.progress.client.num.retries", 1,
 57  
       "Number of times to retry a failed operation");
 58  
   /** Conf option for wait time between retries */
 59  
   public static final IntConfOption
 60  0
     RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS =
 61  
     new IntConfOption("giraph.job.progress.client.retries.wait", 1000,
 62  
       "Time (msec) to wait between retries");
 63  
   /** Class logger */
 64  0
   private static final Logger LOG =
 65  0
       Logger.getLogger(RetryableJobProgressTrackerClient.class);
 66  
   /** Configuration */
 67  
   private GiraphConfiguration conf;
 68  
   /** Thrift client manager to use to connect to job progress tracker */
 69  
   private ThriftClientManager clientManager;
 70  
   /** Job progress tracker */
 71  
   private JobProgressTracker jobProgressTracker;
 72  
   /** Cached value for number of retries */
 73  
   private int numRetries;
 74  
   /** Cached value for wait time between retries */
 75  
   private int retryWaitMsec;
 76  
 
 77  
   /**
 78  
    * Default constructor. Typically once an instance is created it should be
 79  
    * initialized by calling {@link #init(GiraphConfiguration)}.
 80  
    */
 81  0
   public RetryableJobProgressTrackerClient() {
 82  0
   }
 83  
 
 84  
   /**
 85  
    * Constructor
 86  
    *
 87  
    * @param conf Giraph configuration
 88  
    */
 89  
   public RetryableJobProgressTrackerClient(GiraphConfiguration conf) throws
 90  0
       ExecutionException, InterruptedException {
 91  0
     this.conf = conf;
 92  0
     numRetries = RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES.get(conf);
 93  0
     retryWaitMsec = RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS.get(conf);
 94  0
     resetConnection();
 95  0
   }
 96  
 
 97  
   @Override
 98  
   public void init(GiraphConfiguration conf) throws Exception {
 99  0
     this.conf = conf;
 100  0
     numRetries = RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES.get(conf);
 101  0
     retryWaitMsec = RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS.get(conf);
 102  0
     resetConnection();
 103  0
   }
 104  
 
 105  
   /**
 106  
    * Try to establish new connection to JobProgressTracker
 107  
    */
 108  
   private void resetConnection() throws ExecutionException,
 109  
       InterruptedException {
 110  0
     clientManager = new ThriftClientManager(
 111  
         new ThriftCodecManager(new ThriftCodec[0]),
 112  
         new NiftyClient(
 113  0
             new NettyClientConfigBuilder().setWorkerThreadCount(2).build()),
 114  0
         ImmutableSet.<ThriftClientEventHandler>of());
 115  0
     FramedClientConnector connector =
 116  
         new FramedClientConnector(new InetSocketAddress(
 117  0
             ClientThriftServer.CLIENT_THRIFT_SERVER_HOST.get(conf),
 118  0
             ClientThriftServer.CLIENT_THRIFT_SERVER_PORT.get(conf)));
 119  0
     jobProgressTracker =
 120  0
         clientManager.createClient(connector, JobProgressTracker.class).get();
 121  
 
 122  0
   }
 123  
 
 124  
   @Override
 125  
   public synchronized void cleanup() throws IOException {
 126  0
     Closeables.close(clientManager, true);
 127  
     try {
 128  0
       clientManager.close();
 129  
       // CHECKSTYLE: stop IllegalCatch
 130  0
     } catch (Exception e) {
 131  
       // CHECKSTYLE: resume IllegalCatch
 132  0
       if (LOG.isDebugEnabled()) {
 133  0
         LOG.debug(
 134  
             "Exception occurred while trying to close JobProgressTracker", e);
 135  
       }
 136  0
     }
 137  0
   }
 138  
 
 139  
   @Override
 140  
   public synchronized void mapperStarted() {
 141  0
     executeWithRetry(new Runnable() {
 142  
       @Override
 143  
       public void run() {
 144  0
         jobProgressTracker.mapperStarted();
 145  0
       }
 146  
     }, numRetries);
 147  0
   }
 148  
 
 149  
   @Override
 150  
   public synchronized void logInfo(final String logLine) {
 151  0
     executeWithRetry(new Runnable() {
 152  
       @Override
 153  
       public void run() {
 154  0
         jobProgressTracker.logInfo(logLine);
 155  0
       }
 156  
     }, numRetries);
 157  0
   }
 158  
 
 159  
   @Override
 160  
   public synchronized void logError(final String logLine,
 161  
                                     final byte [] exByteArray) {
 162  0
     executeWithRetry(new Runnable() {
 163  
       @Override
 164  
       public void run() {
 165  0
         jobProgressTracker.logError(logLine, exByteArray);
 166  0
       }
 167  
     }, numRetries);
 168  0
   }
 169  
 
 170  
   @Override
 171  
   public synchronized void logFailure(final String reason) {
 172  0
     executeWithRetry(new Runnable() {
 173  
       @Override
 174  
       public void run() {
 175  0
         jobProgressTracker.logFailure(reason);
 176  0
       }
 177  
     }, numRetries);
 178  0
   }
 179  
 
 180  
   @Override
 181  
   public synchronized void updateProgress(final WorkerProgress workerProgress) {
 182  0
     executeWithRetry(new Runnable() {
 183  
       @Override
 184  
       public void run() {
 185  0
         jobProgressTracker.updateProgress(workerProgress);
 186  0
       }
 187  
     }, numRetries);
 188  0
   }
 189  
 
 190  
   @Override
 191  
   public void updateMasterProgress(final MasterProgress masterProgress) {
 192  0
     executeWithRetry(new Runnable() {
 193  
       @Override
 194  
       public void run() {
 195  0
         jobProgressTracker.updateMasterProgress(masterProgress);
 196  0
       }
 197  
     }, numRetries);
 198  0
   }
 199  
 
 200  
   @Override
 201  
   public void sendMasterCounters(GiraphCountersThriftStruct giraphCounters) {
 202  0
     executeWithRetry(new Runnable() {
 203  
       @Override
 204  
       public void run() {
 205  0
         jobProgressTracker.sendMasterCounters(giraphCounters);
 206  0
       }
 207  
     }, numRetries);
 208  0
   }
 209  
 
 210  
   /**
 211  
    * Execute Runnable, if disconnected try to connect again and retry
 212  
    *
 213  
    * @param runnable Runnable to execute
 214  
    * @param numRetries Number of retries
 215  
    */
 216  
   private void executeWithRetry(Runnable runnable, int numRetries) {
 217  
     try {
 218  0
       runnable.run();
 219  0
     } catch (RuntimeTTransportException | RejectedExecutionException te) {
 220  0
       if (LOG.isDebugEnabled()) {
 221  0
         LOG.debug(te.getClass() + " occurred while talking to " +
 222  
           "JobProgressTracker server, trying to reconnect", te);
 223  
       }
 224  0
       for (int i = 1; i <= numRetries; i++) {
 225  
         try {
 226  0
           ThreadUtils.trySleep(retryWaitMsec);
 227  0
           retry(runnable);
 228  0
           break; // If the retry succeeded, we simply break from the loop
 229  
 
 230  0
         } catch (RuntimeTTransportException | RejectedExecutionException e) {
 231  
           // If a RuntimeTTTransportException happened, then we will retry
 232  0
           if (LOG.isInfoEnabled()) {
 233  0
             LOG.info("Exception occurred while talking to " +
 234  
               "JobProgressTracker server after retry " + i +
 235  
               " of " + numRetries, e);
 236  
           }
 237  
           // CHECKSTYLE: stop IllegalCatch
 238  0
         } catch (Exception e) {
 239  
           // CHECKSTYLE: resume IllegalCatch
 240  
           // If any other exception happened (e.g. application-specific),
 241  
           // then we stop.
 242  0
           LOG.info("Exception occurred while talking to " +
 243  
             "JobProgressTracker server after retry " + i +
 244  
             " of " + numRetries + ", giving up", e);
 245  0
           break;
 246  0
         }
 247  
       }
 248  
       // CHECKSTYLE: stop IllegalCatch
 249  0
     } catch (Exception e) {
 250  
       // CHECKSTYLE: resume IllegalCatch
 251  0
       if (LOG.isInfoEnabled()) {
 252  0
         LOG.info("Exception occurred while talking to " +
 253  
           "JobProgressTracker server, giving up", e);
 254  
       }
 255  0
     }
 256  0
   }
 257  
 
 258  
   /**
 259  
    * Executes a single retry by closing the existing {@link #clientManager}
 260  
    * connection, re-initializing it, and then executing the passed instance
 261  
    * of {@link Runnable}.
 262  
    *
 263  
    * @param runnable Instance of {@link Runnable} to execute.
 264  
    * @throws ExecutionException
 265  
    * @throws InterruptedException
 266  
    */
 267  
   private void retry(Runnable runnable) throws ExecutionException,
 268  
     InterruptedException {
 269  
     try {
 270  0
       clientManager.close();
 271  
       // CHECKSTYLE: stop IllegalCatch
 272  0
     } catch (Exception e) {
 273  
       // CHECKSTYLE: resume IllegalCatch
 274  0
       if (LOG.isDebugEnabled()) {
 275  0
         LOG.debug(
 276  
           "Exception occurred while trying to close client manager", e);
 277  
       }
 278  0
     }
 279  0
     resetConnection();
 280  0
     runnable.run();
 281  0
   }
 282  
 }