Coverage Report - org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor

Classes in this File                  Line Coverage   Branch Coverage   Complexity
NettyWorkerClientRequestProcessor     0%  (0/174)     0%  (0/38)        2
NettyWorkerClientRequestProcessor$1   0%  (0/2)       N/A               2
NettyWorkerClientRequestProcessor$2   0%  (0/3)       N/A               2
 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.comm.netty;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.SendEdgeCache;
import org.apache.giraph.comm.SendMessageCache;
import org.apache.giraph.comm.SendMutationsCache;
import org.apache.giraph.comm.SendOneMessageToManyCache;
import org.apache.giraph.comm.SendPartitionCache;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
import org.apache.giraph.comm.requests.SendVertexRequest;
import org.apache.giraph.comm.requests.SendWorkerEdgesRequest;
import org.apache.giraph.comm.requests.SendWorkerVerticesRequest;
import org.apache.giraph.comm.requests.WorkerRequest;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.PairList;
import org.apache.giraph.utils.VertexIdEdges;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.util.PercentGauge;

/**
 * Aggregates requests and sends them to the thread-safe NettyClient.  This
 * class is not thread-safe and is expected to be used and then thrown away
 * after a phase of communication has completed.
 *
 * @param <I> Vertex id
 * @param <V> Vertex data
 * @param <E> Edge data
 */
@SuppressWarnings("unchecked")
public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
    V extends Writable, E extends Writable> implements
    WorkerClientRequestProcessor<I, V, E> {
  /** Class logger */
  private static final Logger LOG =
      Logger.getLogger(NettyWorkerClientRequestProcessor.class);
  /** Cached partitions of vertices to send */
  private final SendPartitionCache<I, V, E> sendPartitionCache;
  /** Cached map of partitions to vertex indices to messages */
  private final SendMessageCache<I, Writable> sendMessageCache;
  /** Cache of edges to be sent. */
  private final SendEdgeCache<I, E> sendEdgeCache;
  /** Cached map of partitions to vertex indices to mutations */
  private final SendMutationsCache<I, V, E> sendMutationsCache =
      new SendMutationsCache<I, V, E>();
  /** NettyClient that could be shared among one or more instances */
  private final WorkerClient<I, V, E> workerClient;
  /** Maximum size of messages per remote worker to cache before sending */
  private final int maxMessagesSizePerWorker;
  /** Maximum size of vertices per remote worker to cache before sending. */
  private final int maxVerticesSizePerWorker;
  /** Maximum size of edges per remote worker to cache before sending. */
  private final int maxEdgesSizePerWorker;
  /** Maximum number of mutations per partition before sending */
  private final int maxMutationsPerPartition;
  /** Giraph configuration */
  private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
  /** Service worker */
  private final CentralizedServiceWorker<I, V, E> serviceWorker;
  /** Server data (used for local requests) */
  private final ServerData<I, V, E> serverData;

  // Per-Superstep Metrics
  /** Number of requests that were handled locally */
  private final Counter localRequests;
  /** Number of requests that went on the wire */
  private final Counter remoteRequests;
  /** Cached message value factory */
  private final MessageValueFactory messageValueFactory;

  /**
   * Constructor.
   *
   * @param context Context
   * @param conf Configuration
   * @param serviceWorker Service worker
   * @param useOneMessageToManyIdsEncoding whether to use the
   *        one-message-to-many-ids encoding when sending messages
   */
  public NettyWorkerClientRequestProcessor(
      Mapper<?, ?, ?, ?>.Context context,
      ImmutableClassesGiraphConfiguration<I, V, E> conf,
      CentralizedServiceWorker<I, V, E> serviceWorker,
      boolean useOneMessageToManyIdsEncoding) {
    this.workerClient = serviceWorker.getWorkerClient();
    this.configuration = conf;

    sendPartitionCache =
        new SendPartitionCache<I, V, E>(conf, serviceWorker);
    sendEdgeCache = new SendEdgeCache<I, E>(conf, serviceWorker);
    maxMessagesSizePerWorker =
        GiraphConfiguration.MAX_MSG_REQUEST_SIZE.get(conf);
    maxVerticesSizePerWorker =
        GiraphConfiguration.MAX_VERTEX_REQUEST_SIZE.get(conf);
    if (useOneMessageToManyIdsEncoding) {
      sendMessageCache =
        new SendOneMessageToManyCache<I, Writable>(conf, serviceWorker,
          this, maxMessagesSizePerWorker);
    } else {
      sendMessageCache =
        new SendMessageCache<I, Writable>(conf, serviceWorker,
          this, maxMessagesSizePerWorker);
    }
    maxEdgesSizePerWorker =
        GiraphConfiguration.MAX_EDGE_REQUEST_SIZE.get(conf);
    maxMutationsPerPartition =
        GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST.get(conf);
    this.serviceWorker = serviceWorker;
    this.serverData = serviceWorker.getServerData();

    // Per-Superstep Metrics.
    // Since this object is not long lived we just initialize the metrics here.
    SuperstepMetricsRegistry smr = GiraphMetrics.get().perSuperstep();
    localRequests = smr.getCounter(MetricNames.LOCAL_REQUESTS);
    remoteRequests = smr.getCounter(MetricNames.REMOTE_REQUESTS);
    setupGauges(smr, localRequests, remoteRequests);
    messageValueFactory = configuration.createOutgoingMessageValueFactory();
  }

  @Override
  public void sendMessageRequest(I destVertexId, Writable message) {
    this.sendMessageCache.sendMessageRequest(destVertexId, message);
  }

  @Override
  public void sendMessageToAllRequest(
    Vertex<I, V, E> vertex, Writable message) {
    this.sendMessageCache.sendMessageToAllRequest(vertex, message);
  }

  @Override
  public void sendMessageToAllRequest(
    Iterator<I> vertexIdIterator, Writable message) {
    this.sendMessageCache.sendMessageToAllRequest(vertexIdIterator, message);
  }

  @Override
  public void sendPartitionRequest(WorkerInfo workerInfo,
                                   Partition<I, V, E> partition) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("sendPartitionRequest: Sending to " + workerInfo +
          ", with partition " + partition);
    }

    WritableRequest vertexRequest = new SendVertexRequest<I, V, E>(partition);
    doRequest(workerInfo, vertexRequest);

    // Messages are stored separately
    if (serviceWorker.getSuperstep() != BspService.INPUT_SUPERSTEP) {
      sendPartitionMessages(workerInfo, partition);
      ConcurrentMap<I, VertexMutations<I, V, E>> vertexMutationMap =
          serverData.getPartitionMutations().remove(partition.getId());
      WritableRequest partitionMutationsRequest =
          new SendPartitionMutationsRequest<I, V, E>(partition.getId(),
              vertexMutationMap);
      doRequest(workerInfo, partitionMutationsRequest);
    }
  }

  /**
   * Send all messages for a partition to another worker.
   *
   * @param workerInfo Worker to send the partition messages to
   * @param partition Partition whose messages to send
   */
  private void sendPartitionMessages(WorkerInfo workerInfo,
                                     Partition<I, V, E> partition) {
    final int partitionId = partition.getId();
    MessageStore<I, Writable> messageStore =
        serverData.getCurrentMessageStore();
    ByteArrayVertexIdMessages<I, Writable> vertexIdMessages =
        new ByteArrayVertexIdMessages<I, Writable>(
            messageValueFactory);
    vertexIdMessages.setConf(configuration);
    vertexIdMessages.initialize();
    for (I vertexId :
        messageStore.getPartitionDestinationVertices(partitionId)) {
      // Messages cannot be re-used from this iterable, but add()
      // serializes the message, making this safe
      Iterable<Writable> messages = messageStore.getVertexMessages(vertexId);
      for (Writable message : messages) {
        vertexIdMessages.add(vertexId, message);
      }
      if (vertexIdMessages.getSize() > maxMessagesSizePerWorker) {
        WritableRequest messagesRequest =
            new SendPartitionCurrentMessagesRequest<I, V, E, Writable>(
            partitionId, vertexIdMessages);
        doRequest(workerInfo, messagesRequest);
        vertexIdMessages =
            new ByteArrayVertexIdMessages<I, Writable>(
                messageValueFactory);
        vertexIdMessages.setConf(configuration);
        vertexIdMessages.initialize();
      }
    }
    if (!vertexIdMessages.isEmpty()) {
      WritableRequest messagesRequest = new
          SendPartitionCurrentMessagesRequest<I, V, E, Writable>(
          partitionId, vertexIdMessages);
      doRequest(workerInfo, messagesRequest);
    }
    messageStore.clearPartition(partitionId);
  }

  @Override
  public boolean sendVertexRequest(PartitionOwner partitionOwner,
      Vertex<I, V, E> vertex) {
    // Add the vertex to the cache
    int workerMessageSize = sendPartitionCache.addVertex(
        partitionOwner, vertex);

    // Send a request if the cache of outgoing vertices to
    // the remote worker is full enough to be flushed
    if (workerMessageSize >= maxVerticesSizePerWorker) {
      PairList<Integer, ExtendedDataOutput>
          workerPartitionVertices =
          sendPartitionCache.removeWorkerData(partitionOwner.getWorkerInfo());
      WritableRequest writableRequest =
          new SendWorkerVerticesRequest<I, V, E>(
              configuration, workerPartitionVertices);
      doRequest(partitionOwner.getWorkerInfo(), writableRequest);
      return true;
    }

    return false;
  }

  @Override
  public void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws
      IOException {
    PartitionOwner partitionOwner =
        serviceWorker.getVertexPartitionOwner(vertexIndex);
    int partitionId = partitionOwner.getPartitionId();
    if (LOG.isTraceEnabled()) {
      LOG.trace("addEdgeRequest: Sending edge " + edge + " for index " +
          vertexIndex + " with partition " + partitionId);
    }

    // Add the edge mutation to the cache
    int partitionMutationCount =
        sendMutationsCache.addEdgeMutation(partitionId, vertexIndex, edge);

    sendMutationsRequestIfFull(
        partitionId, partitionOwner, partitionMutationCount);
  }

  @Override
  public boolean sendEdgeRequest(I sourceVertexId, Edge<I, E> edge)
    throws IOException {
    PartitionOwner owner =
        serviceWorker.getVertexPartitionOwner(sourceVertexId);
    WorkerInfo workerInfo = owner.getWorkerInfo();
    final int partitionId = owner.getPartitionId();
    if (LOG.isTraceEnabled()) {
      LOG.trace("sendEdgeRequest: Sending edge (" + edge + ") for source " +
          "vertex " + sourceVertexId + " to worker " + workerInfo);
    }

    // Add the edge to the cache
    int workerEdgesSize = sendEdgeCache.addEdge(
        workerInfo, partitionId, sourceVertexId, edge);

    // Send a request if the cache of outgoing edges to the remote worker is
    // full
    if (workerEdgesSize >= maxEdgesSizePerWorker) {
      PairList<Integer, VertexIdEdges<I, E>> workerEdges =
          sendEdgeCache.removeWorkerEdges(workerInfo);
      WritableRequest writableRequest =
          new SendWorkerEdgesRequest<I, E>(workerEdges);
      doRequest(workerInfo, writableRequest);
      return true;
    }

    return false;
  }

  /**
   * Send a mutations request if the maximum number of mutations per partition
   * was met.
   *
   * @param partitionId Partition id
   * @param partitionOwner Owner of the partition
   * @param partitionMutationCount Number of mutations for this partition
   */
  private void sendMutationsRequestIfFull(
      int partitionId, PartitionOwner partitionOwner,
      int partitionMutationCount) {
    // Send a request if enough mutations are there for a partition
    if (partitionMutationCount >= maxMutationsPerPartition) {
      Map<I, VertexMutations<I, V, E>> partitionMutations =
          sendMutationsCache.removePartitionMutations(partitionId);
      WritableRequest writableRequest =
          new SendPartitionMutationsRequest<I, V, E>(
              partitionId, partitionMutations);
      doRequest(partitionOwner.getWorkerInfo(), writableRequest);
    }
  }

  @Override
  public void removeEdgesRequest(I vertexIndex,
                                 I destinationVertexIndex) throws IOException {
    PartitionOwner partitionOwner =
        serviceWorker.getVertexPartitionOwner(vertexIndex);
    int partitionId = partitionOwner.getPartitionId();
    if (LOG.isTraceEnabled()) {
      LOG.trace("removeEdgesRequest: Removing edge " +
          destinationVertexIndex +
          " for index " + vertexIndex + " with partition " + partitionId);
    }

    // Add the mutation to the cache
    int partitionMutationCount =
        sendMutationsCache.removeEdgeMutation(
            partitionId, vertexIndex, destinationVertexIndex);

    sendMutationsRequestIfFull(
        partitionId, partitionOwner, partitionMutationCount);
  }

  @Override
  public void addVertexRequest(Vertex<I, V, E> vertex) throws IOException {
    PartitionOwner partitionOwner =
        serviceWorker.getVertexPartitionOwner(vertex.getId());
    int partitionId = partitionOwner.getPartitionId();
    if (LOG.isTraceEnabled()) {
      LOG.trace("addVertexRequest: Sending vertex " + vertex +
          " to partition " + partitionId);
    }

    // Add the mutation to the cache
    int partitionMutationCount =
        sendMutationsCache.addVertexMutation(partitionId, vertex);

    sendMutationsRequestIfFull(
        partitionId, partitionOwner, partitionMutationCount);
  }

  @Override
  public void removeVertexRequest(I vertexIndex) throws IOException {
    PartitionOwner partitionOwner =
        serviceWorker.getVertexPartitionOwner(vertexIndex);
    int partitionId = partitionOwner.getPartitionId();
    if (LOG.isTraceEnabled()) {
      LOG.trace("removeVertexRequest: Removing vertex index " +
          vertexIndex + " from partition " + partitionId);
    }

    // Add the mutation to the cache
    int partitionMutationCount =
        sendMutationsCache.removeVertexMutation(partitionId, vertexIndex);

    sendMutationsRequestIfFull(
        partitionId, partitionOwner, partitionMutationCount);
  }

  @Override
  public void flush() throws IOException {
    // Send the remaining cached messages (if any),
    // including individual and compact messages.
    sendMessageCache.flush();

    // Send the remaining cached vertices (if any)
    PairList<WorkerInfo, PairList<Integer, ExtendedDataOutput>>
        remainingVertexCache = sendPartitionCache.removeAllData();
    PairList<WorkerInfo,
        PairList<Integer, ExtendedDataOutput>>.Iterator
        vertexIterator = remainingVertexCache.getIterator();
    while (vertexIterator.hasNext()) {
      vertexIterator.next();
      WritableRequest writableRequest =
          new SendWorkerVerticesRequest(
              configuration, vertexIterator.getCurrentSecond());
      doRequest(vertexIterator.getCurrentFirst(), writableRequest);
    }

    // Send the remaining cached edges (if any)
    PairList<WorkerInfo, PairList<Integer,
        VertexIdEdges<I, E>>>
        remainingEdgeCache = sendEdgeCache.removeAllEdges();
    PairList<WorkerInfo,
        PairList<Integer, VertexIdEdges<I, E>>>.Iterator
        edgeIterator = remainingEdgeCache.getIterator();
    while (edgeIterator.hasNext()) {
      edgeIterator.next();
      WritableRequest writableRequest =
          new SendWorkerEdgesRequest<I, E>(
              edgeIterator.getCurrentSecond());
      doRequest(edgeIterator.getCurrentFirst(), writableRequest);
    }

    // Send the remaining cached mutations (if any)
    Map<Integer, Map<I, VertexMutations<I, V, E>>> remainingMutationsCache =
        sendMutationsCache.removeAllPartitionMutations();
    for (Map.Entry<Integer, Map<I, VertexMutations<I, V, E>>> entry :
        remainingMutationsCache.entrySet()) {
      WritableRequest writableRequest =
          new SendPartitionMutationsRequest<I, V, E>(
              entry.getKey(), entry.getValue());
      PartitionOwner partitionOwner =
          serviceWorker.getVertexPartitionOwner(
              entry.getValue().keySet().iterator().next());
      doRequest(partitionOwner.getWorkerInfo(), writableRequest);
    }
  }

  @Override
  public long resetMessageCount() {
    return this.sendMessageCache.resetMessageCount();
  }

  @Override
  public long resetMessageBytesCount() {
    return this.sendMessageCache.resetMessageBytesCount();
  }

  /**
   * When doing the request, short circuit if it is local
   *
   * @param workerInfo Worker info
   * @param writableRequest Request to either submit or run locally
   */
  public void doRequest(WorkerInfo workerInfo,
                        WritableRequest writableRequest) {
    // If this is local, execute locally
    if (serviceWorker.getWorkerInfo().getTaskId() ==
        workerInfo.getTaskId()) {
      ((WorkerRequest) writableRequest).doRequest(serverData);
      localRequests.inc();
    } else {
      workerClient.sendWritableRequest(
          workerInfo.getTaskId(), writableRequest);
      remoteRequests.inc();
    }
  }

  /**
   * Sets up gauges for superstep metrics.
   * This has to be static so that internal objects created here don't
   * hold references to this$0. Otherwise we would leak
   * NettyWorkerClientRequestProcessor objects.
   *
   * @param smr metric registry for current superstep
   * @param localRequests counter for local requests
   * @param remoteRequests counter for remote requests
   */
  private static void setupGauges(SuperstepMetricsRegistry smr,
                                  final Counter localRequests,
                                  final Counter remoteRequests) {
    final Gauge<Long> totalRequests = smr.getGauge(MetricNames.TOTAL_REQUESTS,
        new Gauge<Long>() {
          @Override
          public Long value() {
            return localRequests.count() + remoteRequests.count();
          }
        }
    );
    smr.getGauge(MetricNames.PERCENT_LOCAL_REQUESTS, new PercentGauge() {
      @Override protected double getNumerator() {
        return localRequests.count();
      }

      @Override protected double getDenominator() {
        return totalRequests.value();
      }
    });
  }
}
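
For orientation, here is a minimal usage sketch of the lifecycle the class javadoc describes: construct one processor per communication phase, let the send* calls batch requests per destination, and end the phase with flush() before discarding the object. The context, conf, serviceWorker, and destVertexId variables and the LongWritable/DoubleWritable/FloatWritable type choices below are placeholder assumptions standing in for the surrounding worker setup; they are not taken from this file.

// Usage sketch only; context, conf, serviceWorker and destVertexId are
// hypothetical placeholders assumed to come from the enclosing worker code.
WorkerClientRequestProcessor<LongWritable, DoubleWritable, FloatWritable>
    processor = new NettyWorkerClientRequestProcessor<LongWritable,
        DoubleWritable, FloatWritable>(context, conf, serviceWorker,
        false /* useOneMessageToManyIdsEncoding */);

// Each call adds to a per-destination cache; a request only goes on the
// wire (or runs locally through doRequest()) once a cache fills up.
processor.sendMessageRequest(destVertexId, new DoubleWritable(1.0));

// The processor is not thread-safe and is thrown away after the phase,
// so the phase must end by flushing everything still cached.
processor.flush(); // flush() throws IOException

The same batching pattern runs through every send* method above: add the item to the matching cache, compare the cached size against its MAX_*_REQUEST_SIZE (or MAX_MUTATIONS_PER_REQUEST) threshold, and hand a WritableRequest to doRequest(), which short-circuits to local execution when the destination task id matches the current worker.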