Coverage Report - org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor
 
Classes in this File | Line Coverage | Branch Coverage | Complexity
NettyWorkerAggregatorRequestProcessor | 0% (0/42) | 0% (0/16) | 2.143
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.netty;
 20  
 
 21  
 import java.io.IOException;
 22  
 
 23  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 24  
 import org.apache.giraph.comm.GlobalCommType;
 25  
 import org.apache.giraph.comm.WorkerClient;
 26  
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
 27  
 import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
 28  
 import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
 29  
 import org.apache.giraph.comm.requests.SendAggregatorsToWorkerRequest;
 30  
 import org.apache.giraph.comm.requests.SendReducedToMasterRequest;
 31  
 import org.apache.giraph.comm.requests.SendWorkerAggregatorsRequest;
 32  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 33  
 import org.apache.giraph.worker.WorkerInfo;
 34  
 import org.apache.hadoop.io.Writable;
 35  
 import org.apache.hadoop.util.Progressable;
 36  
 
 37  
 /**
 38  
  * Netty implementation of {@link WorkerAggregatorRequestProcessor}
 39  
  */
 40  
 public class NettyWorkerAggregatorRequestProcessor
 41  
     implements WorkerAggregatorRequestProcessor {
 42  
   /** Progressable used to report progress */
 43  
   private final Progressable progressable;
 44  
   /** Worker client used to send requests; may be shared among instances */
 45  
   private final WorkerClient<?, ?, ?> workerClient;
 46  
   /** Service worker */
 47  
   private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
 48  
   /** Cache of serialized reduced values, keyed by destination task id */
 49  0
   private final SendGlobalCommCache sendReducedValuesCache =
 50  
       new SendGlobalCommCache(false);
 51  
   /** Cached size at which the values for one worker are sent as a request */
 52  
   private final int maxBytesPerAggregatorRequest;
 53  
 
 54  
   /**
 55  
    * Constructor.
 56  
    *
 57  
    * @param progressable  Progressable used to report progress
 58  
    * @param configuration Configuration to read the request size limit from
 59  
    * @param serviceWorker Service worker
 60  
    */
 61  
   public NettyWorkerAggregatorRequestProcessor(
 62  
       Progressable progressable,
 63  
       ImmutableClassesGiraphConfiguration<?, ?, ?> configuration,
 64  0
       CentralizedServiceWorker<?, ?, ?> serviceWorker) {
 65  0
     this.serviceWorker = serviceWorker;
 66  0
     this.workerClient = serviceWorker.getWorkerClient();
 67  0
     this.progressable = progressable;
 68  0
     maxBytesPerAggregatorRequest = configuration.getInt(
 69  
         AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
 70  
         AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
 71  
 
 72  0
   }
 73  
 
 74  
   @Override
 75  
   public boolean sendReducedValue(String name,
 76  
       Writable reducedValue) throws IOException {
 77  0
     WorkerInfo owner =
 78  0
         AggregatorUtils.getOwner(name,
 79  0
             serviceWorker.getWorkerInfoList());
 80  0
     if (isThisWorker(owner)) {
 81  0
       return false;
 82  
     } else {
 83  0
       int currentSize = sendReducedValuesCache.addValue(owner.getTaskId(),
 84  
           name, GlobalCommType.REDUCED_VALUE, reducedValue);
 85  0
       if (currentSize >= maxBytesPerAggregatorRequest) {
 86  0
         flushAggregatorsToWorker(owner);
 87  
       }
 88  0
       return true;
 89  
     }
 90  
   }
 91  
 
 92  
   @Override
 93  
   public void flush() throws IOException {
 94  0
     for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
 95  0
       if (!isThisWorker(workerInfo)) {
 96  0
         sendReducedValuesCache.addSpecialCount(workerInfo.getTaskId());
 97  0
         flushAggregatorsToWorker(workerInfo);
 98  0
         progressable.progress();
 99  
       }
 100  0
     }
 101  0
     sendReducedValuesCache.reset();
 102  0
   }
 103  
 
 104  
   /**
 105  
    * Remove the cached serialized values for a worker and send them to it.
 106  
    *
 107  
    * @param worker Worker which we want to send aggregators to
 108  
    */
 109  
   private void flushAggregatorsToWorker(WorkerInfo worker) {
 110  0
     byte[] data =
 111  0
         sendReducedValuesCache.removeSerialized(worker.getTaskId());
 112  0
     workerClient.sendWritableRequest(worker.getTaskId(),
 113  
         new SendWorkerAggregatorsRequest(data,
 114  0
             serviceWorker.getWorkerInfo().getTaskId()));
 115  0
   }
 116  
 
 117  
   @Override
 118  
   public void sendReducedValuesToMaster(byte[] data) throws IOException {
 119  0
     workerClient.sendWritableRequest(serviceWorker.getMasterInfo().getTaskId(),
 120  
         new SendReducedToMasterRequest(data));
 121  0
   }
 122  
 
 123  
   @Override
 124  
   public void distributeReducedValues(
 125  
       Iterable<byte[]> aggregatorDataList) throws IOException {
 126  0
     for (byte[] aggregatorData : aggregatorDataList) {
 127  0
       for (WorkerInfo worker : serviceWorker.getWorkerInfoList()) {
 128  0
         if (!isThisWorker(worker)) {
 129  0
           SendAggregatorsToWorkerRequest request =
 130  
               new SendAggregatorsToWorkerRequest(aggregatorData,
 131  0
                   serviceWorker.getWorkerInfo().getTaskId());
 132  0
           workerClient.sendWritableRequest(worker.getTaskId(), request);
 133  
         }
 134  0
         progressable.progress();
 135  0
       }
 136  0
     }
 137  0
   }
 138  
 
 139  
   /**
 140  
    * Check if workerInfo describes current worker.
 141  
    *
 142  
    * @param workerInfo Worker to check
 143  
    * @return True iff workerInfo has the same task id as the current worker.
 144  
    */
 145  
   private boolean isThisWorker(WorkerInfo workerInfo) {
 146  0
     return serviceWorker.getWorkerInfo().getTaskId() == workerInfo.getTaskId();
 147  
   }
 148  
 }