Coverage Report - org.apache.giraph.bsp.CentralizedServiceWorker
 
Classes in this File     | Line Coverage | Branch Coverage | Complexity
CentralizedServiceWorker | N/A           | N/A             | 1
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 18  
 
 19  
 package org.apache.giraph.bsp;
 20  
 
 21  
 import java.io.IOException;
 22  
 import java.util.Collection;
 23  
 import java.util.List;
 24  
 
 25  
 import org.apache.giraph.comm.ServerData;
 26  
 import org.apache.giraph.comm.WorkerClient;
 27  
 import org.apache.giraph.comm.messages.PartitionSplitInfo;
 28  
 import org.apache.giraph.graph.AddressesAndPartitionsWritable;
 29  
 import org.apache.giraph.graph.FinishedSuperstepStats;
 30  
 import org.apache.giraph.graph.GlobalStats;
 31  
 import org.apache.giraph.graph.GraphTaskManager;
 32  
 import org.apache.giraph.graph.VertexEdgeCount;
 33  
 import org.apache.giraph.io.superstep_output.SuperstepOutput;
 34  
 import org.apache.giraph.metrics.GiraphTimerContext;
 35  
 import org.apache.giraph.partition.PartitionOwner;
 36  
 import org.apache.giraph.partition.PartitionStats;
 37  
 import org.apache.giraph.partition.PartitionStore;
 38  
 import org.apache.giraph.worker.WorkerAggregatorHandler;
 39  
 import org.apache.giraph.worker.WorkerContext;
 40  
 import org.apache.giraph.worker.WorkerInfo;
 41  
 import org.apache.giraph.worker.WorkerInputSplitsHandler;
 42  
 import org.apache.giraph.worker.WorkerObserver;
 43  
 import org.apache.hadoop.io.Writable;
 44  
 import org.apache.hadoop.io.WritableComparable;
 45  
 
 46  
 /**
 47  
  * All workers should have access to this centralized service to
 48  
  * execute the following methods.
 49  
  *
 50  
  * @param <I> Vertex id
 51  
  * @param <V> Vertex value
 52  
  * @param <E> Edge value
 53  
  */
 54  
 @SuppressWarnings("rawtypes")
 55  
 public interface CentralizedServiceWorker<I extends WritableComparable,
 56  
   V extends Writable, E extends Writable>
 57  
   extends CentralizedService<I, V, E>, PartitionSplitInfo<I> {
 58  
   /**
 59  
    * Setup (must be called prior to any other function)
 60  
    *
 61  
    * @return Finished superstep stats for the input superstep
 62  
    */
 63  
   FinishedSuperstepStats setup();
 64  
 
 65  
   /**
 66  
    * Get the worker information
 67  
    *
 68  
    * @return Worker information
 69  
    */
 70  
   WorkerInfo getWorkerInfo();
 71  
 
 72  
   /**
 73  
    * Get the worker client (for instantiating WorkerClientRequestProcessor
 74  
    * instances.
 75  
    *
 76  
    * @return Worker client
 77  
    */
 78  
   WorkerClient<I, V, E> getWorkerClient();
 79  
 
 80  
   /**
 81  
    * Get the worker context.
 82  
    *
 83  
    * @return worker's WorkerContext
 84  
    */
 85  
   WorkerContext getWorkerContext();
 86  
 
 87  
   /**
 88  
    * Get the observers for this Worker.
 89  
    *
 90  
    * @return array of WorkerObservers.
 91  
    */
 92  
   WorkerObserver[] getWorkerObservers();
 93  
 
 94  
   /**
 95  
    * Get the partition store for this worker.
 96  
    * The partitions contain the vertices for
 97  
    * this worker and can be used to run compute() for the vertices or do
 98  
    * checkpointing.
 99  
    *
 100  
    * @return The partition store for this worker.
 101  
    */
 102  
   PartitionStore<I, V, E> getPartitionStore();
 103  
 
 104  
   /**
 105  
    *  Both the vertices and the messages need to be checkpointed in order
 106  
    *  for them to be used.  This is done after all messages have been
 107  
    *  delivered, but prior to a superstep starting.
 108  
    */
 109  
   void storeCheckpoint() throws IOException;
 110  
 
 111  
   /**
 112  
    * Load the vertices, edges, messages from the beginning of a superstep.
 113  
    * Will load the vertex partitions as designated by the master and set the
 114  
    * appropriate superstep.
 115  
    *
 116  
    * @param superstep which checkpoint to use
 117  
    * @return Graph-wide vertex and edge counts
 118  
    * @throws IOException
 119  
    */
 120  
   VertexEdgeCount loadCheckpoint(long superstep) throws IOException;
 121  
 
 122  
   /**
 123  
    * Take all steps prior to actually beginning the computation of a
 124  
    * superstep.
 125  
    *
 126  
    * @return Collection of all the partition owners from the master for this
 127  
    *         superstep.
 128  
    */
 129  
   Collection<? extends PartitionOwner> startSuperstep();
 130  
 
 131  
   /**
 132  
    * Worker is done with its portion of the superstep.  Report the
 133  
    * worker level statistics after the computation.
 134  
    *
 135  
    * @param partitionStatsList All the partition stats for this worker
 136  
    * @param superstepTimerContext superstep timer context only given when the
 137  
    *      function needs to stop the timer, otherwise null.
 138  
    * @return Stats of the superstep completion
 139  
    */
 140  
   FinishedSuperstepStats finishSuperstep(
 141  
       List<PartitionStats> partitionStatsList,
 142  
       GiraphTimerContext superstepTimerContext);
 143  
 
 144  
   /**
 145  
    * Get the partition id that a vertex id would belong to.
 146  
    *
 147  
    * @param vertexId Vertex id
 148  
    * @return Partition id
 149  
    */
 150  
   @Override
 151  
   int getPartitionId(I vertexId);
 152  
 
 153  
   /**
 154  
    * Whether a partition with given id exists on this worker.
 155  
    *
 156  
    * @param partitionId Partition id
 157  
    * @return True iff this worker has the specified partition
 158  
    */
 159  
   boolean hasPartition(Integer partitionId);
 160  
 
 161  
   /**
 162  
    * Every client will need to get a partition owner from a vertex id so that
 163  
    * they know which worker to sent the request to.
 164  
    *
 165  
    * @param vertexId Vertex index to look for
 166  
    * @return PartitionOnwer that should contain this vertex if it exists
 167  
    */
 168  
   PartitionOwner getVertexPartitionOwner(I vertexId);
 169  
 
 170  
   /**
 171  
    * Get all partition owners.
 172  
    *
 173  
    * @return Iterable through partition owners
 174  
    */
 175  
   Iterable<? extends PartitionOwner> getPartitionOwners();
 176  
 
 177  
   /**
 178  
    * If desired by the user, vertex partitions are redistributed among
 179  
    * workers according to the chosen WorkerGraphPartitioner.
 180  
    *
 181  
    * @param masterSetPartitionOwners Partition owner info passed from the
 182  
    *        master.
 183  
    */
 184  
   void exchangeVertexPartitions(
 185  
       Collection<? extends PartitionOwner> masterSetPartitionOwners);
 186  
 
 187  
   /**
 188  
    * Get the GraphTaskManager that this service is using.  Vertices need to know
 189  
    * this.
 190  
    *
 191  
    * @return the GraphTaskManager instance for this compute node
 192  
    */
 193  
   GraphTaskManager<I, V, E> getGraphTaskManager();
 194  
 
 195  
   /**
 196  
    * Operations that will be called if there is a failure by a worker.
 197  
    */
 198  
   void failureCleanup();
 199  
 
 200  
   /**
 201  
    * Get server data
 202  
    *
 203  
    * @return Server data
 204  
    */
 205  
   ServerData<I, V, E> getServerData();
 206  
 
 207  
   /**
 208  
    * Get worker aggregator handler
 209  
    *
 210  
    * @return Worker aggregator handler
 211  
    */
 212  
   WorkerAggregatorHandler getAggregatorHandler();
 213  
 
 214  
   /**
 215  
    * Final preparation for superstep, called after startSuperstep and
 216  
    * potential loading from checkpoint, right before the computation started
 217  
    * TODO how to avoid this additional function
 218  
    */
 219  
   void prepareSuperstep();
 220  
 
 221  
   /**
 222  
    * Get the superstep output class
 223  
    *
 224  
    * @return SuperstepOutput
 225  
    */
 226  
   SuperstepOutput<I, V, E> getSuperstepOutput();
 227  
 
 228  
   /**
 229  
    * Clean up the service (no calls may be issued after this)
 230  
    *
 231  
    * @param finishedSuperstepStats Finished supestep stats
 232  
    * @throws IOException
 233  
    * @throws InterruptedException
 234  
    */
 235  
   void cleanup(FinishedSuperstepStats finishedSuperstepStats)
 236  
     throws IOException, InterruptedException;
 237  
 
 238  
   /**
 239  
    * Loads Global stats from zookeeper.
 240  
    * @return global stats stored in zookeeper for
 241  
    * previous superstep.
 242  
    */
 243  
   GlobalStats getGlobalStats();
 244  
 
 245  
   /**
 246  
    * Get input splits handler used during input
 247  
    *
 248  
    * @return Input splits handler
 249  
    */
 250  
   WorkerInputSplitsHandler getInputSplitsHandler();
 251  
 
 252  
   /**
 253  
    * Received addresses and partitions assignments from master.
 254  
    *
 255  
    * @param addressesAndPartitions Addresses and partitions assignment
 256  
    */
 257  
   void addressesAndPartitionsReceived(
 258  
       AddressesAndPartitionsWritable addressesAndPartitions);
 259  
 
 260  
   /**
 261  
    * Store the counter values in the zookeeper after every superstep
 262  
    * and also after all supersteps are done. This is called before closing
 263  
    * the zookeeper. We need to call this method after calling cleanup on the
 264  
    * worker, since some counters are updated during cleanup
 265  
    * @param allSuperstepsDone boolean value whether all the supersteps
 266  
    *                          are completed
 267  
    */
 268  
   void storeCountersInZooKeeper(boolean allSuperstepsDone);
 269  
 
 270  
   /**
 271  
    * Close zookeeper
 272  
    */
 273  
   void closeZooKeeper();
 274  
 }