Coverage Report - org.apache.giraph.comm.flow_control.StaticFlowControl
 
Classes in this File Line Coverage Branch Coverage Complexity
StaticFlowControl
0%
0/47
0%
0/10
1.7
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.flow_control;
 20  
 
 21  
 import com.yammer.metrics.core.Counter;
 22  
 import org.apache.giraph.comm.netty.NettyClient;
 23  
 import org.apache.giraph.comm.netty.handler.AckSignalFlag;
 24  
 import org.apache.giraph.comm.requests.WritableRequest;
 25  
 import org.apache.giraph.conf.FloatConfOption;
 26  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 27  
 import org.apache.giraph.conf.IntConfOption;
 28  
 import org.apache.giraph.metrics.GiraphMetrics;
 29  
 import org.apache.giraph.metrics.MetricNames;
 30  
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 31  
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 32  
 import org.apache.log4j.Logger;
 33  
 
 34  
 import java.util.concurrent.atomic.AtomicInteger;
 35  
 
 36  
 import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
 37  
 
 38  
 /**
 39  
  * Representation of a flow control that limits the aggregate number of open
 40  
  * requests to all other workers to a constant user-defined value
 41  
  */
 42  
 public class StaticFlowControl implements
 43  
     FlowControl, ResetSuperstepMetricsObserver {
 44  
   /** Maximum number of requests without confirmation we should have */
 45  0
   public static final IntConfOption MAX_NUMBER_OF_OPEN_REQUESTS =
 46  
       new IntConfOption("giraph.maxNumberOfOpenRequests", 10000,
 47  
           "Maximum number of requests without confirmation we should have");
 48  
   /**
 49  
    * After pausing a thread due to too large number of open requests,
 50  
    * which fraction of these requests need to be closed before we continue
 51  
    */
 52  
   public static final FloatConfOption
 53  0
       FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING =
 54  
       new FloatConfOption("giraph.fractionOfRequestsToCloseBeforeProceeding",
 55  
           0.2f, "Fraction of requests to close before proceeding");
 56  
   /** Class logger */
 57  0
   private static final Logger LOG = Logger.getLogger(StaticFlowControl.class);
 58  
 
 59  
   /** Maximum number of requests without confirmation we can have */
 60  
   private final int maxNumberOfOpenRequests;
 61  
   /**
 62  
    * Maximum number of requests that can be open after the pause in order to
 63  
    * proceed
 64  
    */
 65  
   private final int numberOfRequestsToProceed;
 66  
   /** Netty client used for sending requests */
 67  
   private final NettyClient nettyClient;
 68  
   /** Waiting interval for checking outstanding requests msecs */
 69  
   private final int waitingRequestMsecs;
 70  
   /** Dummy object to wait on until enough open requests get completed */
 71  0
   private final Object requestSpotAvailable = new Object();
 72  
   /** Counter for time spent waiting on too many open requests */
 73  
   private Counter timeWaitingOnOpenRequests;
 74  
   /** Number of threads waiting on too many open requests */
 75  0
   private final AtomicInteger numWaitingThreads = new AtomicInteger(0);
 76  
 
 77  
   /**
 78  
    * Constructor
 79  
    *
 80  
    * @param conf configuration
 81  
    * @param nettyClient netty client
 82  
    */
 83  
   public StaticFlowControl(ImmutableClassesGiraphConfiguration conf,
 84  0
                            NettyClient nettyClient) {
 85  0
     this.nettyClient = nettyClient;
 86  0
     maxNumberOfOpenRequests = MAX_NUMBER_OF_OPEN_REQUESTS.get(conf);
 87  0
     numberOfRequestsToProceed = (int) (maxNumberOfOpenRequests *
 88  0
         (1 - FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING.get(conf)));
 89  0
     if (LOG.isInfoEnabled()) {
 90  0
       LOG.info("StaticFlowControl: Limit number of open requests to " +
 91  
           maxNumberOfOpenRequests + " and proceed when <= " +
 92  
           numberOfRequestsToProceed);
 93  
     }
 94  0
     waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
 95  0
     GiraphMetrics.get().addSuperstepResetObserver(this);
 96  0
   }
 97  
 
 98  
   @Override
 99  
   public void newSuperstep(SuperstepMetricsRegistry metrics) {
 100  0
     timeWaitingOnOpenRequests = metrics.getCounter(
 101  
         MetricNames.TIME_SPENT_WAITING_ON_TOO_MANY_OPEN_REQUESTS_MS);
 102  0
   }
 103  
 
 104  
   @Override
 105  
   public void sendRequest(int destTaskId, WritableRequest request) {
 106  0
     nettyClient.doSend(destTaskId, request);
 107  0
     if (nettyClient.getNumberOfOpenRequests() > maxNumberOfOpenRequests) {
 108  0
       long startTime = System.currentTimeMillis();
 109  0
       waitSomeRequests();
 110  0
       timeWaitingOnOpenRequests.inc(System.currentTimeMillis() - startTime);
 111  
     }
 112  0
   }
 113  
 
 114  
   /**
 115  
    * Ensure that at most numberOfRequestsToProceed are not complete.
 116  
    * Periodically, check the state of every request.  If we find the connection
 117  
    * failed, re-establish it and re-send the request.
 118  
    */
 119  
   private void waitSomeRequests() {
 120  0
     numWaitingThreads.getAndIncrement();
 121  0
     while (nettyClient.getNumberOfOpenRequests() > numberOfRequestsToProceed) {
 122  
       // Wait for requests to complete for some time
 123  0
       synchronized (requestSpotAvailable) {
 124  0
         if (nettyClient.getNumberOfOpenRequests() <=
 125  
             numberOfRequestsToProceed) {
 126  0
           break;
 127  
         }
 128  
         try {
 129  0
           requestSpotAvailable.wait(waitingRequestMsecs);
 130  0
         } catch (InterruptedException e) {
 131  0
           throw new IllegalStateException("waitSomeRequests: Got unexpected " +
 132  
               "InterruptedException", e);
 133  0
         }
 134  0
       }
 135  0
       nettyClient.logAndSanityCheck();
 136  
     }
 137  0
     numWaitingThreads.getAndDecrement();
 138  0
   }
 139  
 
 140  
   @Override
 141  
   public void messageAckReceived(int taskId, long requestId, int response) {
 142  0
     synchronized (requestSpotAvailable) {
 143  0
       requestSpotAvailable.notifyAll();
 144  0
     }
 145  0
   }
 146  
 
 147  
   @Override
 148  
   public AckSignalFlag getAckSignalFlag(int response) {
 149  0
     return AckSignalFlag.values()[response];
 150  
   }
 151  
 
 152  
   @Override
 153  
   public int calculateResponse(AckSignalFlag alreadyDone, int clientId) {
 154  0
     return alreadyDone.ordinal();
 155  
   }
 156  
 
 157  
   @Override
 158  
   public void logInfo() {
 159  0
     if (LOG.isInfoEnabled()) {
 160  0
       LOG.info("logInfo: " + numWaitingThreads.get() + " threads waiting " +
 161  
           "until number of open requests falls below " +
 162  
           numberOfRequestsToProceed);
 163  
     }
 164  0
   }
 165  
 
 166  
   @Override
 167  
   public void waitAllRequests() {
 168  
     // This flow control policy does not keep any unsent request. All the open
 169  
     // requests are in possession of the network client, so the network client
 170  
     // will wait for them to complete. Thus, this flow control policy does not
 171  
     // need to do anything regarding flushing the remaining requests.
 172  0
   }
 173  
 
 174  
   @Override
 175  
   public int getNumberOfUnsentRequests() {
 176  0
     return 0;
 177  
   }
 178  
 }