Coverage Report - org.apache.giraph.jython.JythonComputation
 
Classes in this File Line Coverage Branch Coverage Complexity
JythonComputation
0%
0/41
N/A
1
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.jython;
 19  
 
 20  
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 21  
 import org.apache.giraph.edge.Edge;
 22  
 import org.apache.giraph.edge.OutEdges;
 23  
 import org.apache.giraph.graph.GraphType;
 24  
 import org.apache.giraph.graph.Vertex;
 25  
 import org.apache.giraph.worker.WorkerContext;
 26  
 import org.apache.hadoop.io.Writable;
 27  
 import org.apache.hadoop.io.WritableComparable;
 28  
 import org.apache.hadoop.mapreduce.Mapper;
 29  
 
 30  
 import java.io.IOException;
 31  
 import java.util.Iterator;
 32  
 
 33  
 /**
 34  
  * Base class for writing computations in Jython.
 35  
  *
 36  
  * Note that this class DOES NOT implement
 37  
  * {@link org.apache.giraph.graph.Computation}.
 38  
  * This is because we want to support passing in pure Jython types,
 39  
  * and implementing the {@link org.apache.giraph.graph.Computation}
 40  
  * requires passing in {@link Writable}s.
 41  
  * Calling such methods from Jython would throw errors. So, instead,
 42  
  * we have recreated the methods with the same name here. In each method
 43  
  * we check if the type is a pure Jython value, and if so wrap it in
 44  
  * the necessary
 45  
  * {@link org.apache.giraph.jython.wrappers.JythonWritableWrapper}.
 46  
  *
 47  
  * This class works together with {@link JythonGiraphComputation} which takes
 48  
  * care of the {@link org.apache.giraph.graph.Computation}
 49  
  * Giraph infrastructure side of things.
 50  
  */
 51  0
 public abstract class JythonComputation extends
 52  
     DefaultImmutableClassesGiraphConfigurable {
 53  
   /** The computation to call back to */
 54  
   private JythonGiraphComputation giraphCompute;
 55  
 
 56  
   /**
    * Sets the {@link JythonGiraphComputation} that this class forwards to.
    *
    * Every non-abstract method of this class other than
    * {@link #preSuperstep()} and {@link #postSuperstep()} dereferences this
    * field, and nothing else in this class assigns it, so it has to be set
    * before any of those methods is called.
    *
    * @param giraphCompute computation to forward calls to
    */
   public void setGiraphCompute(JythonGiraphComputation giraphCompute) {
 57  0
     this.giraphCompute = giraphCompute;
 58  0
   }
 59  
 
 60  
   /**
 61  
    * User's computation function. It is abstract, so every concrete
    * subclass has to provide it.
 62  
    *
 63  
    * @param vertex the Vertex to compute on (declared as plain Object)
 64  
    * @param messages iterable of messages (declared as a raw Iterable)
 65  
    */
 66  
   public abstract void compute(Object vertex, Iterable messages);
 67  
 
 68  
   /**
 69  
    * Prepare for computation. This method is executed exactly once prior to
 70  
    * {@link #compute(Object, Iterable)} being called for any of the vertices
 71  
    * in the partition.
    *
    * The implementation here is empty; subclasses may override it.
 72  
    */
 73  0
   public void preSuperstep() { }
 74  
 
 75  
   /**
 76  
    * Finish computation. This method is executed exactly once after computation
 77  
    * for all vertices in the partition is complete.
    *
    * The implementation here is empty; subclasses may override it.
 78  
    */
 79  0
   public void postSuperstep() { }
 80  
 
 81  
   /**
 82  
    * Retrieves the current superstep.
    * The value is read from {@code giraphCompute}, which must have been set.
 83  
    *
 84  
    * @return Current superstep
 85  
    */
 86  
   public long getSuperstep() {
 87  0
     return giraphCompute.getSuperstep();
 88  
   }
 89  
 
 90  
   /**
 91  
    * Get the total (all workers) number of vertices that
 92  
    * existed in the previous superstep.
    * The value is read from {@code giraphCompute}, which must have been set.
 93  
    *
 94  
    * @return Total number of vertices (-1 if first superstep)
 95  
    */
 96  
   public long getTotalNumVertices() {
 97  0
     return giraphCompute.getTotalNumVertices();
 98  
   }
 99  
 
 100  
   /**
 101  
    * Get the total (all workers) number of edges that
 102  
    * existed in the previous superstep.
    * The value is read from {@code giraphCompute}, which must have been set.
 103  
    *
 104  
    * @return Total number of edges (-1 if first superstep)
 105  
    */
 106  
   public long getTotalNumEdges() {
 107  0
     return giraphCompute.getTotalNumEdges();
 108  
   }
 109  
 
 110  
   /**
 111  
    * Send a message to a vertex id.
    *
    * The id goes through {@code wrapIdIfNecessary} and the message through
    * {@code wrapIfNecessary} (as {@code GraphType.OUTGOING_MESSAGE_VALUE});
    * the two results are then handed to {@code giraphCompute.sendMessage}.
 112  
    *
 113  
    * @param id Vertex id to send the message to
 114  
    * @param message Message data to send
 115  
    */
 116  
   public void sendMessage(Object id, Object message) {
 117  0
     WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id);
 118  0
     Writable wrappedMessage = giraphCompute.wrapIfNecessary(message,
 119  
         GraphType.OUTGOING_MESSAGE_VALUE);
 120  0
     giraphCompute.sendMessage(wrappedId, wrappedMessage);
 121  0
   }
 122  
 
 123  
   /**
 124  
    * Send a message to all edges.
    *
    * Only the message is passed through {@code wrapIfNecessary} (as
    * {@code GraphType.OUTGOING_MESSAGE_VALUE}); the vertex is forwarded to
    * {@code giraphCompute.sendMessageToAllEdges} unchanged.
 125  
    *
 126  
    * @param vertex Vertex whose edges to send the message to.
 127  
    * @param message Message sent to all edges.
 128  
    */
 129  
   public void sendMessageToAllEdges(Vertex vertex, Object message) {
 130  0
     Writable wrappedMessage = giraphCompute.wrapIfNecessary(message,
 131  
         GraphType.OUTGOING_MESSAGE_VALUE);
 132  0
     giraphCompute.sendMessageToAllEdges(vertex, wrappedMessage);
 133  0
   }
 134  
 
 135  
   /**
 136  
    * Send a message to multiple target vertex ids in the iterator.
    *
    * Only the message is passed through {@code wrapIfNecessary} (as
    * {@code GraphType.OUTGOING_MESSAGE_VALUE}).
    * NOTE(review): the iterator itself is forwarded as-is, so the ids it
    * yields are not run through {@code wrapIdIfNecessary} here, unlike in
    * {@link #sendMessage(Object, Object)} - confirm that callers supply ids
    * the delegate accepts.
 137  
    *
 138  
    * @param vertexIdIterator An iterator to multiple target vertex ids.
 139  
    * @param message Message sent to all targets in the iterator.
 140  
    */
 141  
   public void sendMessageToMultipleEdges(Iterator vertexIdIterator,
 142  
       Object message) {
 143  0
     Writable wrappedMessage = giraphCompute.wrapIfNecessary(message,
 144  
         GraphType.OUTGOING_MESSAGE_VALUE);
 145  0
     giraphCompute.sendMessageToMultipleEdges(vertexIdIterator, wrappedMessage);
 146  0
   }
 147  
 
 148  
   /**
 149  
    * Sends a request to create a vertex that will be available during the
 150  
    * next superstep.
    *
    * The id goes through {@code wrapIdIfNecessary} and the value through
    * {@code wrapIfNecessary} (as {@code GraphType.VERTEX_VALUE}); the edges
    * are forwarded to {@code giraphCompute.addVertexRequest} unchanged.
 151  
    *
 152  
    * @param id Vertex id
 153  
    * @param vertexValue Vertex value
 154  
    * @param edges Initial edges
    * @throws IOException if one of the calls on {@code giraphCompute}
    *         throws it
 155  
    */
 156  
   public void addVertexRequest(Object id, Object vertexValue,
 157  
       OutEdges edges) throws IOException {
 158  0
     WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id);
 159  0
     Writable wrappedValue = giraphCompute.wrapIfNecessary(vertexValue,
 160  
         GraphType.VERTEX_VALUE);
 161  0
     giraphCompute.addVertexRequest(wrappedId, wrappedValue, edges);
 162  0
   }
 163  
 
 164  
   /**
 165  
    * Sends a request to create a vertex that will be available during the
 166  
    * next superstep.
    *
    * The id goes through {@code wrapIdIfNecessary} and the value through
    * {@code wrapIfNecessary} (as {@code GraphType.VERTEX_VALUE}) before both
    * are handed to the two-argument {@code giraphCompute.addVertexRequest}.
 167  
    *
 168  
    * @param id Vertex id
 169  
    * @param vertexValue Vertex value
    * @throws IOException if one of the calls on {@code giraphCompute}
    *         throws it
 170  
    */
 171  
   public void addVertexRequest(Object id, Object vertexValue)
 172  
     throws IOException {
 173  0
     WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id);
 174  0
     Writable wrappedVertexValue = giraphCompute.wrapIfNecessary(vertexValue,
 175  
         GraphType.VERTEX_VALUE);
 176  0
     giraphCompute.addVertexRequest(wrappedId, wrappedVertexValue);
 177  0
   }
 178  
 
 179  
   /**
 180  
    * Request to remove a vertex from the graph
 181  
    * (applied just prior to the next superstep).
    *
    * The id goes through {@code wrapIdIfNecessary} before it is handed to
    * {@code giraphCompute.removeVertexRequest}.
 182  
    *
 183  
    * @param id Id of the vertex to be removed.
    * @throws IOException if one of the calls on {@code giraphCompute}
    *         throws it
 184  
    */
 185  
   public void removeVertexRequest(Object id) throws IOException {
 186  0
     WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id);
 187  0
     giraphCompute.removeVertexRequest(wrappedId);
 188  0
   }
 189  
 
 190  
   /**
 191  
    * Request to add an edge of a vertex in the graph
 192  
    * (processed just prior to the next superstep).
    *
    * Only the source id goes through {@code wrapIdIfNecessary}; the edge is
    * forwarded to {@code giraphCompute.addEdgeRequest} unchanged.
 193  
    *
 194  
    * @param sourceVertexId Source vertex id of edge
 195  
    * @param edge Edge to add
    * @throws IOException if one of the calls on {@code giraphCompute}
    *         throws it
 196  
    */
 197  
   public void addEdgeRequest(Object sourceVertexId, Edge edge)
 198  
     throws IOException {
 199  0
     WritableComparable wrappedSourceId =
 200  0
         giraphCompute.wrapIdIfNecessary(sourceVertexId);
 201  0
     giraphCompute.addEdgeRequest(wrappedSourceId, edge);
 202  0
   }
 203  
 
 204  
   /**
 205  
    * Request to remove all edges from a given source vertex to a given target
 206  
    * vertex (processed just prior to the next superstep).
    *
    * Both ids go through {@code wrapIdIfNecessary} before they are handed to
    * {@code giraphCompute.removeEdgesRequest}.
 207  
    *
 208  
    * @param sourceVertexId Source vertex id
 209  
    * @param targetVertexId Target vertex id
    * @throws IOException if one of the calls on {@code giraphCompute}
    *         throws it
 210  
    */
 211  
   public void removeEdgesRequest(Object sourceVertexId, Object targetVertexId)
 212  
     throws IOException {
 213  0
     WritableComparable wrappedSourceVertexId =
 214  0
         giraphCompute.wrapIdIfNecessary(sourceVertexId);
 215  0
     WritableComparable wrappedTargetVertexId =
 216  0
         giraphCompute.wrapIdIfNecessary(targetVertexId);
 217  0
     giraphCompute.removeEdgesRequest(wrappedSourceVertexId,
 218  
         wrappedTargetVertexId);
 219  0
   }
 220  
 
 221  
   /**
 222  
    * Get the mapper context.
    * The value is read from {@code giraphCompute}, which must have been set,
    * and is returned as a raw {@code Mapper.Context}.
 223  
    *
 224  
    * @return Mapper context
 225  
    */
 226  
   public Mapper.Context getContext() {
 227  0
     return giraphCompute.getContext();
 228  
   }
 229  
 
 230  
   /**
 231  
    * Get the worker context.
    * The value is read from {@code giraphCompute}, which must have been set,
    * and is cast to the caller-chosen type {@code W}.
 232  
    *
 233  
    * @param <W> WorkerContext class
 234  
    * @return WorkerContext context
 235  
    */
 236  
   // The cast to W below is unchecked: nothing in this method verifies
   // that the returned context really is a W, hence the suppression.
   @SuppressWarnings("unchecked")
 237  
   public <W extends WorkerContext> W getWorkerContext() {
 238  0
     return (W) giraphCompute.getWorkerContext();
 239  
   }
 240  
 }
 241