Coverage Report - org.apache.giraph.master.MasterCompute
 
Classes in this File Line Coverage Branch Coverage Complexity
MasterCompute
0%
0/49
0%
0/4
1.143
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.master;
 20  
 
 21  
 import java.util.List;
 22  
 
 23  
 import org.apache.giraph.aggregators.Aggregator;
 24  
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 25  
 import org.apache.giraph.combiner.MessageCombiner;
 26  
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 27  
 import org.apache.giraph.conf.MessageClasses;
 28  
 import org.apache.giraph.graph.Computation;
 29  
 import org.apache.giraph.graph.GraphState;
 30  
 import org.apache.giraph.reducers.ReduceOperation;
 31  
 import org.apache.giraph.worker.WorkerInfo;
 32  
 import org.apache.hadoop.io.Writable;
 33  
 import org.apache.hadoop.io.WritableComparable;
 34  
 import org.apache.hadoop.mapreduce.Mapper;
 35  
 
 36  
 /**
 37  
  * Interface for defining a master vertex that can perform centralized
 38  
  * computation between supersteps. This class will be instantiated on the
 39  
  * master node and will run every superstep before the workers do.
 40  
  *
 41  
  * Communication with the workers should be performed via aggregators. The
 42  
  * values of the aggregators are broadcast to the workers before
 43  
  * vertex.compute() is called and collected by the master before
 44  
  * master.compute() is called. This means aggregator values used by the workers
 45  
  * are consistent with aggregator values from the master from the same
 46  
  * superstep and aggregator values used by the master are consistent with aggregator
 47  
  * values from the workers from the previous superstep.
 48  
  */
 49  0
 public abstract class MasterCompute
 50  
     extends DefaultImmutableClassesGiraphConfigurable
 51  
     implements MasterAggregatorUsage, MasterGlobalCommUsage, Writable {
 52  
   /** If true, do not do anymore computation on this vertex. */
 53  0
   private boolean halt = false;
 54  
   /** Master aggregator usage */
 55  
   private CentralizedServiceMaster serviceMaster;
 56  
   /** Graph state */
 57  
   private GraphState graphState;
 58  
   /**
 59  
    * Computation and MessageCombiner classes used, which can be
 60  
    * switched by master
 61  
    */
 62  
   private SuperstepClasses superstepClasses;
 63  
 
 64  
   /**
 65  
    * Must be defined by user to specify what the master has to do.
 66  
    */
 67  
   public abstract void compute();
 68  
 
 69  
   /**
 70  
    * Initialize the MasterCompute class, this is the place to register
 71  
    * aggregators.
 72  
    *
 73  
    * @throws InstantiationException
 74  
    * @throws IllegalAccessException
 75  
    */
 76  
   public abstract void initialize() throws InstantiationException,
 77  
     IllegalAccessException;
 78  
 
 79  
   /**
 80  
    * Retrieves the current superstep.
 81  
    *
 82  
    * @return Current superstep
 83  
    */
 84  
   public final long getSuperstep() {
 85  0
     return graphState.getSuperstep();
 86  
   }
 87  
 
 88  
   /**
 89  
    * Get the total (all workers) number of vertices that
 90  
    * existed in the previous superstep.
 91  
    *
 92  
    * @return Total number of vertices (-1 if first superstep)
 93  
    */
 94  
   public final long getTotalNumVertices() {
 95  0
     return graphState.getTotalNumVertices();
 96  
   }
 97  
 
 98  
   /**
 99  
    * Get the total (all workers) number of edges that
 100  
    * existed in the previous superstep.
 101  
    *
 102  
    * @return Total number of edges (-1 if first superstep)
 103  
    */
 104  
   public final long getTotalNumEdges() {
 105  0
     return graphState.getTotalNumEdges();
 106  
   }
 107  
 
 108  
   /**
 109  
    * After this is called, the computation will stop, even if there are
 110  
    * still messages in the system or vertices that have not voted to halt.
 111  
    */
 112  
   public final void haltComputation() {
 113  0
     halt = true;
 114  0
   }
 115  
 
 116  
   /**
 117  
    * Has the master halted?
 118  
    *
 119  
    * @return True if halted, false otherwise.
 120  
    */
 121  
   public final boolean isHalted() {
 122  0
     return halt;
 123  
   }
 124  
 
 125  
   /**
 126  
    * Get the mapper context
 127  
    *
 128  
    * @return Mapper context
 129  
    */
 130  
   public final Mapper.Context getContext() {
 131  0
     return graphState.getContext();
 132  
   }
 133  
 
 134  
   /**
 135  
    * Get list of workers
 136  
    *
 137  
    * @return List of workers
 138  
    */
 139  
   public final List<WorkerInfo> getWorkerInfoList() {
 140  0
     return serviceMaster.getWorkerInfoList();
 141  
   }
 142  
 
 143  
   /**
 144  
    * Set Computation class to be used
 145  
    *
 146  
    * @param computationClass Computation class
 147  
    */
 148  
   public final void setComputation(
 149  
       Class<? extends Computation> computationClass) {
 150  0
     superstepClasses.setComputationClass(computationClass);
 151  0
   }
 152  
 
 153  
   /**
 154  
    * Get Computation class to be used
 155  
    *
 156  
    * @return Computation class
 157  
    */
 158  
   public final Class<? extends Computation> getComputation() {
 159  
     // Might be called prior to classes being set, do not return NPE
 160  0
     if (superstepClasses == null) {
 161  0
       return null;
 162  
     }
 163  
 
 164  0
     return superstepClasses.getComputationClass();
 165  
   }
 166  
 
 167  
   /**
 168  
    * Set MessageCombiner class to be used
 169  
    *
 170  
    * @param combinerClass MessageCombiner class
 171  
    */
 172  
   public final void setMessageCombiner(
 173  
       Class<? extends MessageCombiner> combinerClass) {
 174  0
     superstepClasses.setMessageCombinerClass(combinerClass);
 175  0
   }
 176  
 
 177  
   /**
 178  
    * Get MessageCombiner class to be used
 179  
    *
 180  
    * @return MessageCombiner class
 181  
    */
 182  
   public final Class<? extends MessageCombiner> getMessageCombiner() {
 183  
     // Might be called prior to classes being set, do not return NPE
 184  0
     if (superstepClasses == null) {
 185  0
       return null;
 186  
     }
 187  
 
 188  0
     return superstepClasses.getMessageCombinerClass();
 189  
   }
 190  
 
 191  
   /**
 192  
    * Set incoming message class to be used
 193  
    *
 194  
    * @param incomingMessageClass incoming message class
 195  
    */
 196  
   @Deprecated
 197  
   public final void setIncomingMessage(
 198  
       Class<? extends Writable> incomingMessageClass) {
 199  0
     superstepClasses.setIncomingMessageClass(incomingMessageClass);
 200  0
   }
 201  
 
 202  
   /**
 203  
    * Set outgoing message class to be used
 204  
    *
 205  
    * @param outgoingMessageClass outgoing message class
 206  
    */
 207  
   public final void setOutgoingMessage(
 208  
       Class<? extends Writable> outgoingMessageClass) {
 209  0
     superstepClasses.setOutgoingMessageClass(outgoingMessageClass);
 210  0
   }
 211  
 
 212  
   /**
 213  
    * Set outgoing message classes to be used
 214  
    *
 215  
    * @param outgoingMessageClasses outgoing message classes
 216  
    */
 217  
   public void setOutgoingMessageClasses(
 218  
       MessageClasses<? extends WritableComparable, ? extends Writable>
 219  
         outgoingMessageClasses) {
 220  0
     superstepClasses.setOutgoingMessageClasses(outgoingMessageClasses);
 221  0
   }
 222  
 
 223  
   @Override
 224  
   public final <S, R extends Writable> void registerReducer(
 225  
       String name, ReduceOperation<S, R> reduceOp) {
 226  0
     serviceMaster.getGlobalCommHandler().registerReducer(name, reduceOp);
 227  0
   }
 228  
 
 229  
   @Override
 230  
   public final <S, R extends Writable> void registerReducer(
 231  
       String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) {
 232  0
     serviceMaster.getGlobalCommHandler().registerReducer(
 233  
         name, reduceOp, globalInitialValue);
 234  0
   }
 235  
 
 236  
   @Override
 237  
   public final <T extends Writable> T getReduced(String name) {
 238  0
     return serviceMaster.getGlobalCommHandler().getReduced(name);
 239  
   }
 240  
 
 241  
   @Override
 242  
   public final void broadcast(String name, Writable object) {
 243  0
     serviceMaster.getGlobalCommHandler().broadcast(name, object);
 244  0
   }
 245  
 
 246  
   @Override
 247  
   public final <A extends Writable> boolean registerAggregator(
 248  
     String name, Class<? extends Aggregator<A>> aggregatorClass)
 249  
     throws InstantiationException, IllegalAccessException {
 250  0
     return serviceMaster.getAggregatorTranslationHandler().registerAggregator(
 251  
         name, aggregatorClass);
 252  
   }
 253  
 
 254  
   @Override
 255  
   public final <A extends Writable> boolean registerPersistentAggregator(
 256  
       String name,
 257  
       Class<? extends Aggregator<A>> aggregatorClass) throws
 258  
       InstantiationException, IllegalAccessException {
 259  0
     return serviceMaster.getAggregatorTranslationHandler()
 260  0
         .registerPersistentAggregator(name, aggregatorClass);
 261  
   }
 262  
 
 263  
   @Override
 264  
   public final <A extends Writable> A getAggregatedValue(String name) {
 265  0
     return serviceMaster.getAggregatorTranslationHandler()
 266  0
         .<A>getAggregatedValue(name);
 267  
   }
 268  
 
 269  
   @Override
 270  
   public final <A extends Writable> void setAggregatedValue(
 271  
       String name, A value) {
 272  0
     serviceMaster.getAggregatorTranslationHandler()
 273  0
         .setAggregatedValue(name, value);
 274  0
   }
 275  
 
 276  
   /**
 277  
    * Call this to log a line to command line of the job. Use in moderation -
 278  
    * it's a synchronous call to Job client
 279  
    *
 280  
    * @param line Line to print
 281  
    */
 282  
   public void logToCommandLine(String line) {
 283  0
     serviceMaster.getJobProgressTracker().logInfo(line);
 284  0
   }
 285  
 
 286  
   public final void setGraphState(GraphState graphState) {
 287  0
     this.graphState = graphState;
 288  0
   }
 289  
 
 290  
   public final void setMasterService(CentralizedServiceMaster serviceMaster) {
 291  0
     this.serviceMaster = serviceMaster;
 292  0
   }
 293  
 
 294  
   public final void setSuperstepClasses(SuperstepClasses superstepClasses) {
 295  0
     this.superstepClasses = superstepClasses;
 296  0
   }
 297  
 }