Coverage Report - org.apache.giraph.graph.AbstractComputation
 
Classes in this File Line Coverage Branch Coverage Complexity
AbstractComputation
0%
0/41
0%
0/2
1.05
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.graph;
 20  
 
 21  
 import java.io.IOException;
 22  
 import java.util.Iterator;
 23  
 
 24  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 25  
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 26  
 import org.apache.giraph.edge.Edge;
 27  
 import org.apache.giraph.edge.OutEdges;
 28  
 import org.apache.giraph.worker.AllWorkersInfo;
 29  
 import org.apache.giraph.worker.WorkerAggregatorDelegator;
 30  
 import org.apache.giraph.worker.WorkerContext;
 31  
 import org.apache.giraph.worker.WorkerGlobalCommUsage;
 32  
 import org.apache.hadoop.io.Writable;
 33  
 import org.apache.hadoop.io.WritableComparable;
 34  
 import org.apache.hadoop.mapreduce.Mapper;
 35  
 
 36  
 /**
 37  
  * See {@link Computation} for explanation of the interface.
 38  
  *
 39  
  * This is an abstract helper class for users to implement their computations.
 40  
  * It implements all of the methods required by the {@link Computation}
 41  
  * interface except for the {@link #compute(Vertex, Iterable)} which we leave
 42  
  * to the user to define.
 43  
  *
 44  
  * In most cases users should inherit from this class when implementing their
 45  
  * algorithms with Giraph.
 46  
  *
 47  
  * @param <I> Vertex id
 48  
  * @param <V> Vertex data
 49  
  * @param <E> Edge data
 50  
  * @param <M1> Incoming message type
 51  
  * @param <M2> Outgoing message type
 52  
  */
 53  0
 public abstract class AbstractComputation<I extends WritableComparable,
 54  
     V extends Writable, E extends Writable, M1 extends Writable,
 55  
     M2 extends Writable>
 56  
     extends WorkerAggregatorDelegator<I, V, E>
 57  
     implements Computation<I, V, E, M1, M2> {
 58  
   /** Global graph state; source of the superstep, totals and mapper context. */
 59  
   private GraphState graphState;
 60  
   /** Request processor that receives message and graph mutation requests. */
 61  
   private WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor;
 62  
   /** Service worker handed to initialize(); may be null. */
 63  
   private CentralizedServiceWorker<I, V, E> serviceWorker;
 64  
   /** Worker context taken from the service worker; null without one. */
 65  
   private WorkerContext workerContext;
 66  
   /** All workers info built in initialize(); null without a service worker. */
 67  
   private AllWorkersInfo allWorkersInfo;
 68  
 
 69  
   /**
 70  
    * Must be defined by the user to do the computation on a single vertex.
 71  
    *
 72  
    * @param vertex   Vertex
 73  
    * @param messages Messages that were sent to this vertex in the previous
 74  
    *                 superstep.  Each message is only guaranteed to have
 75  
    *                 a life expectancy as long as next() is not called.
    * @throws IOException declared so that implementations may propagate
    *                     I/O failures
 76  
    */
 77  
   @Override
 78  
   public abstract void compute(Vertex<I, V, E> vertex,
 79  
       Iterable<M1> messages) throws IOException;
 80  
 
 81  
   /**
 82  
    * Prepare for computation. This method is executed exactly once prior to
 83  
    * {@link #compute(Vertex, Iterable)} being called for any of the vertices
 84  
    * in the partition.
    *
    * The implementation in this class does nothing; subclasses may override it.
 85  
    */
 86  
   @Override
 87  
   public void preSuperstep() {
 88  0
   }
 89  
 
 90  
   /**
 91  
    * Finish computation. This method is executed exactly once after computation
 92  
    * for all vertices in the partition is complete.
    *
    * The implementation in this class does nothing; subclasses may override it.
 93  
    */
 94  
   @Override
 95  
   public void postSuperstep() {
 96  0
   }
 97  
 
 98  
   /**
 99  
    * Initialize, called by infrastructure before the superstep starts.
 100  
    * Shouldn't be called by user code.
 101  
    *
 102  
    * @param graphState Graph state
 103  
    * @param workerClientRequestProcessor Processor for handling requests
 104  
    * @param serviceWorker Graph-wide BSP Mapper for this Vertex
 105  
    * @param workerGlobalCommUsage Worker global communication usage
 106  
    */
 107  
   @Override
 108  
   public void initialize(
 109  
       GraphState graphState,
 110  
       WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
 111  
       CentralizedServiceWorker<I, V, E> serviceWorker,
 112  
       WorkerGlobalCommUsage workerGlobalCommUsage) {
 113  0
     this.graphState = graphState;
 114  0
     this.workerClientRequestProcessor = workerClientRequestProcessor;
 115  0
     this.setWorkerGlobalCommUsage(workerGlobalCommUsage);
 116  0
     this.serviceWorker = serviceWorker;
 117  0
     if (serviceWorker != null) {
 118  0
       this.workerContext = serviceWorker.getWorkerContext();
 119  0
       this.allWorkersInfo = new AllWorkersInfo(
 120  0
           serviceWorker.getWorkerInfoList(), serviceWorker.getWorkerInfo());
 121  
     } else {
 122  0
       this.workerContext = null;
 123  0
       this.allWorkersInfo = null;
 124  
     }
 125  0
   }
 126  
 
 127  
   /**
 128  
    * Retrieves the current superstep.
 129  
    *
 130  
    * @return Current superstep
 131  
    */
 132  
   @Override
 133  
   public long getSuperstep() {
 134  0
     return graphState.getSuperstep();
 135  
   }
 136  
 
 137  
   /**
 138  
    * Get the total (all workers) number of vertices that
 139  
    * existed in the previous superstep.
 140  
    *
 141  
    * @return Total number of vertices (-1 if first superstep)
 142  
    */
 143  
   @Override
 144  
   public long getTotalNumVertices() {
 145  0
     return graphState.getTotalNumVertices();
 146  
   }
 147  
 
 148  
   /**
 149  
    * Get the total (all workers) number of edges that
 150  
    * existed in the previous superstep.
 151  
    *
 152  
    * @return Total number of edges (-1 if first superstep)
 153  
    */
 154  
   @Override
 155  
   public long getTotalNumEdges() {
 156  0
     return graphState.getTotalNumEdges();
 157  
   }
 158  
 
 159  
   /**
 160  
    * Send a message to a vertex id.
 161  
    *
 162  
    * @param id Vertex id to send the message to
 163  
    * @param message Message data to send
 164  
    */
 165  
   @Override
 166  
   public void sendMessage(I id, M2 message) {
 167  0
     workerClientRequestProcessor.sendMessageRequest(id, message);
 168  0
   }
 169  
 
 170  
   /**
 171  
    * Send a message to all edges.
 172  
    *
 173  
    * @param vertex Vertex whose edges to send the message to.
 174  
    * @param message Message sent to all edges.
 175  
    */
 176  
   @Override
 177  
   public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message) {
 178  0
     workerClientRequestProcessor.sendMessageToAllRequest(vertex, message);
 179  0
   }
 180  
 
 181  
   /**
 182  
    * Send a message to multiple target vertex ids in the iterator.
 183  
    *
 184  
    * @param vertexIdIterator An iterator to multiple target vertex ids.
 185  
    * @param message Message sent to all targets in the iterator.
 186  
    */
 187  
   @Override
 188  
   public void sendMessageToMultipleEdges(
 189  
       Iterator<I> vertexIdIterator, M2 message) {
 190  0
     workerClientRequestProcessor.sendMessageToAllRequest(
 191  
         vertexIdIterator, message);
 192  0
   }
 193  
 
 194  
   /**
 195  
    * Sends a request to create a vertex that will be available during the
 196  
    * next superstep.
 197  
    *
 198  
    * @param id Vertex id
 199  
    * @param value Vertex value
 200  
    * @param edges Initial edges
 201  
    */
 202  
   @Override
 203  
   public void addVertexRequest(I id, V value,
 204  
       OutEdges<I, E> edges) throws IOException {
 205  0
     Vertex<I, V, E> vertex = getConf().createVertex();
 206  0
     vertex.initialize(id, value, edges);
 207  0
     workerClientRequestProcessor.addVertexRequest(vertex);
 208  0
   }
 209  
 
 210  
   /**
 211  
    * Sends a request to create a vertex that will be available during the
 212  
    * next superstep.
 213  
    *
 214  
    * @param id Vertex id
 215  
    * @param value Vertex value
 216  
    */
 217  
   @Override
 218  
   public void addVertexRequest(I id, V value) throws IOException {
 219  0
     addVertexRequest(id, value, getConf().createAndInitializeOutEdges());
 220  0
   }
 221  
 
 222  
   /**
 223  
    * Request to remove a vertex from the graph
 224  
    * (applied just prior to the next superstep).
 225  
    *
 226  
    * @param vertexId Id of the vertex to be removed.
 227  
    */
 228  
   @Override
 229  
   public void removeVertexRequest(I vertexId) throws IOException {
 230  0
     workerClientRequestProcessor.removeVertexRequest(vertexId);
 231  0
   }
 232  
 
 233  
   /**
 234  
    * Request to add an edge of a vertex in the graph
 235  
    * (processed just prior to the next superstep)
 236  
    *
 237  
    * @param sourceVertexId Source vertex id of edge
 238  
    * @param edge Edge to add
 239  
    */
 240  
   @Override
 241  
   public void addEdgeRequest(I sourceVertexId,
 242  
       Edge<I, E> edge) throws IOException {
 243  0
     workerClientRequestProcessor.addEdgeRequest(sourceVertexId, edge);
 244  0
   }
 245  
 
 246  
   /**
 247  
    * Request to remove all edges from a given source vertex to a given target
 248  
    * vertex (processed just prior to the next superstep).
 249  
    *
 250  
    * @param sourceVertexId Source vertex id
 251  
    * @param targetVertexId Target vertex id
 252  
    */
 253  
   @Override
 254  
   public void removeEdgesRequest(I sourceVertexId,
 255  
       I targetVertexId) throws IOException {
 256  0
     workerClientRequestProcessor.removeEdgesRequest(
 257  
         sourceVertexId, targetVertexId);
 258  0
   }
 259  
 
 260  
   /**
 261  
    * Get the mapper context
 262  
    *
 263  
    * @return Mapper context
 264  
    */
 265  
   @Override
 266  
   public Mapper.Context getContext() {
 267  0
     return graphState.getContext();
 268  
   }
 269  
 
 270  
   /**
 271  
    * Get the worker context
 272  
    *
 273  
    * @param <W> WorkerContext class
 274  
    * @return WorkerContext context
 275  
    */
 276  
   @SuppressWarnings("unchecked")
 277  
   @Override
 278  
   public <W extends WorkerContext> W getWorkerContext() {
 279  0
     return (W) workerContext;
 280  
   }
 281  
 
 282  
   @Override
 283  
   public final int getWorkerCount() {
 284  0
     return allWorkersInfo.getWorkerCount();
 285  
   }
 286  
 
 287  
   @Override
 288  
   public final int getMyWorkerIndex() {
 289  0
     return allWorkersInfo.getMyWorkerIndex();
 290  
   }
 291  
 
 292  
   @Override
 293  
   public final int getWorkerForVertex(I vertexId) {
 294  0
     return allWorkersInfo.getWorkerIndex(
 295  0
         serviceWorker.getVertexPartitionOwner(vertexId).getWorkerInfo());
 296  
   }
 297  
 }