Coverage Report - org.apache.giraph.master.MasterGlobalCommHandler
 
Classes in this File Line Coverage Branch Coverage Complexity
MasterGlobalCommHandler
0%
0/18
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.master;
 20  
 
 21  
 import org.apache.giraph.master.input.MasterInputSplitsHandler;
 22  
 import org.apache.giraph.partition.PartitionStats;
 23  
 import org.apache.giraph.reducers.ReduceOperation;
 24  
 import org.apache.giraph.utils.BlockingElementsSet;
 25  
 import org.apache.hadoop.io.Writable;
 26  
 import org.apache.hadoop.util.Progressable;
 27  
 
 28  
 import com.google.common.collect.Iterables;
 29  
 
 30  
 import java.util.List;
 31  
 
 32  
 /**
 33  
  * Handler for all master communications.
  *
  * Reducer registration, reduced-value lookup and broadcasts are forwarded
  * unchanged to the {@link MasterAggregatorHandler} supplied at construction.
  * The class also hands out the input splits handler it was built with and
  * collects the partition stats lists passed to
  * {@link #receivedPartitionStats(List)}.
 34  
  */
 35  
 public class MasterGlobalCommHandler implements MasterGlobalCommUsage {
 36  
   /** Aggregator handler; target of all reducer and broadcast calls */
 37  
   private final MasterAggregatorHandler aggregatorHandler;
 38  
   /** Input splits handler */
 39  
   private final MasterInputSplitsHandler inputSplitsHandler;
 40  
   /**
    * Partition stats received from workers; every call to
    * receivedPartitionStats offers one list to this set
    */
 41  0
   private final BlockingElementsSet<List<PartitionStats>> partitionStats =
 42  
       new BlockingElementsSet<>();
 43  
 
 44  
   /**
 45  
    * Constructor. Stores both handlers as given; nothing else is done here.
 46  
    *
 47  
    * @param aggregatorHandler Aggregator handler
 48  
    * @param inputSplitsHandler Input splits handler
 49  
    */
 50  
   public MasterGlobalCommHandler(
 51  
       MasterAggregatorHandler aggregatorHandler,
 52  0
       MasterInputSplitsHandler inputSplitsHandler) {
 53  0
     this.aggregatorHandler = aggregatorHandler;
 54  0
     this.inputSplitsHandler = inputSplitsHandler;
 55  0
   }
 56  
 
   /**
    * Get the aggregator handler that was passed to the constructor
    *
    * @return Aggregator handler
    */
 57  
   public MasterAggregatorHandler getAggregatorHandler() {
 58  0
     return aggregatorHandler;
 59  
   }
 60  
 
   /**
    * Get the input splits handler that was passed to the constructor
    *
    * @return Input splits handler
    */
 61  
   public MasterInputSplitsHandler getInputSplitsHandler() {
 62  0
     return inputSplitsHandler;
 63  
   }
 64  
 
   /**
    * Register a reducer by forwarding the call to the aggregator handler
    *
    * @param name Name handed through to the aggregator handler
    * @param reduceOp Reduce operation handed through to the aggregator handler
    * @param <S> First type parameter of the reduce operation
    * @param <R> Second type parameter of the reduce operation, a Writable
    */
 65  
   @Override
 66  
   public <S, R extends Writable> void registerReducer(String name,
 67  
       ReduceOperation<S, R> reduceOp) {
 68  0
     aggregatorHandler.registerReducer(name, reduceOp);
 69  0
   }
 70  
 
   /**
    * Register a reducer together with a global initial value by forwarding
    * the call to the aggregator handler
    *
    * @param name Name handed through to the aggregator handler
    * @param reduceOp Reduce operation handed through to the aggregator handler
    * @param globalInitialValue Initial value handed through to the
    *                           aggregator handler
    * @param <S> First type parameter of the reduce operation
    * @param <R> Second type parameter of the reduce operation, a Writable;
    *            also the type of globalInitialValue
    */
 71  
   @Override
 72  
   public <S, R extends Writable> void registerReducer(String name,
 73  
       ReduceOperation<S, R> reduceOp, R globalInitialValue) {
 74  0
     aggregatorHandler.registerReducer(name, reduceOp, globalInitialValue);
 75  0
   }
 76  
 
   /**
    * Get a reduced value by asking the aggregator handler for it
    *
    * @param name Name handed through to the aggregator handler
    * @param <R> Type of the returned value, a Writable
    * @return Whatever the aggregator handler returns for the given name
    */
 77  
   @Override
 78  
   public <R extends Writable> R getReduced(String name) {
 79  0
     return aggregatorHandler.getReduced(name);
 80  
   }
 81  
 
   /**
    * Broadcast a value by forwarding the call to the aggregator handler
    *
    * @param name Name handed through to the aggregator handler
    * @param value Value handed through to the aggregator handler
    */
 82  
   @Override
 83  
   public void broadcast(String name, Writable value) {
 84  0
     aggregatorHandler.broadcast(name, value);
 85  0
   }
 86  
 
 87  
   /**
 88  
    * Received partition stats from a worker. The list is offered to the
    * internal set that getAllPartitionStats later reads from.
 89  
    *
 90  
    * @param partitionStats Partition stats
 91  
    */
 92  
   public void receivedPartitionStats(List<PartitionStats> partitionStats) {
 93  0
     // "this." is required: the parameter shadows the field of the same name
     this.partitionStats.offer(partitionStats);
 94  0
   }
 95  
 
 96  
   /**
 97  
    * Get all partition stats. Blocks until all workers have sent their stats
    * The lists obtained from the internal set are combined into a single
    * Iterable with Iterables.concat.
 98  
    *
 99  
    * @param numWorkers Number of workers to wait for
 100  
    * @param progressable Progressable to report progress to
 101  
    * @return All partition stats
 102  
    */
 103  
   public Iterable<PartitionStats> getAllPartitionStats(int numWorkers,
 104  
       Progressable progressable) {
 105  0
     return Iterables.concat(
 106  0
         partitionStats.getElements(numWorkers, progressable));
 107  
   }
 108  
 }