Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
JythonComputation |
|
| 1.0;1 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | package org.apache.giraph.jython; | |
19 | ||
20 | import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; | |
21 | import org.apache.giraph.edge.Edge; | |
22 | import org.apache.giraph.edge.OutEdges; | |
23 | import org.apache.giraph.graph.GraphType; | |
24 | import org.apache.giraph.graph.Vertex; | |
25 | import org.apache.giraph.worker.WorkerContext; | |
26 | import org.apache.hadoop.io.Writable; | |
27 | import org.apache.hadoop.io.WritableComparable; | |
28 | import org.apache.hadoop.mapreduce.Mapper; | |
29 | ||
30 | import java.io.IOException; | |
31 | import java.util.Iterator; | |
32 | ||
33 | /** | |
34 | * Base class for writing computations in Jython. | |
35 | * | |
36 | * Note that this class DOES NOT implement | |
37 | * {@link org.apache.giraph.graph.Computation}. | |
38 | * This is because we want to support passing in pure Jython types, | |
39 | * and implementing the {@link org.apache.giraph.graph.Computation} | |
40 | * requires passing in {@link Writable}s. | |
41 | * Calling such methods from Jython would throw errors. So, instead, | |
42 | * we have recreated the methods with the same name here. In each method | |
43 | * we check if the type is a pure Jython value, and if so wrap it in | |
44 | * the necessary | |
45 | * {@link org.apache.giraph.jython.wrappers.JythonWritableWrapper}. | |
46 | * | |
47 | * This class works together with {@link JythonGiraphComputation} which takes | |
48 | * care of the {@link org.apache.giraph.graph.Computation} | |
49 | * Giraph infrastructure side of things. | |
50 | */ | |
51 | 0 | public abstract class JythonComputation extends |
52 | DefaultImmutableClassesGiraphConfigurable { | |
53 | /** The computation to callback to */ | |
54 | private JythonGiraphComputation giraphCompute; | |
55 | ||
56 | public void setGiraphCompute(JythonGiraphComputation giraphCompute) { | |
57 | 0 | this.giraphCompute = giraphCompute; |
58 | 0 | } |
59 | ||
60 | /** | |
61 | * User's computation function | |
62 | * | |
63 | * @param vertex the Vertex to compute on | |
64 | * @param messages iterable of messages | |
65 | */ | |
66 | public abstract void compute(Object vertex, Iterable messages); | |
67 | ||
68 | /** | |
69 | * Prepare for computation. This method is executed exactly once prior to | |
70 | * {@link #compute(Object, Iterable)} being called for any of the vertices | |
71 | * in the partition. | |
72 | */ | |
73 | 0 | public void preSuperstep() { } |
74 | ||
75 | /** | |
76 | * Finish computation. This method is executed exactly once after computation | |
77 | * for all vertices in the partition is complete. | |
78 | */ | |
79 | 0 | public void postSuperstep() { } |
80 | ||
81 | /** | |
82 | * Retrieves the current superstep. | |
83 | * | |
84 | * @return Current superstep | |
85 | */ | |
86 | public long getSuperstep() { | |
87 | 0 | return giraphCompute.getSuperstep(); |
88 | } | |
89 | ||
90 | /** | |
91 | * Get the total (all workers) number of vertices that | |
92 | * existed in the previous superstep. | |
93 | * | |
94 | * @return Total number of vertices (-1 if first superstep) | |
95 | */ | |
96 | public long getTotalNumVertices() { | |
97 | 0 | return giraphCompute.getTotalNumVertices(); |
98 | } | |
99 | ||
100 | /** | |
101 | * Get the total (all workers) number of edges that | |
102 | * existed in the previous superstep. | |
103 | * | |
104 | * @return Total number of edges (-1 if first superstep) | |
105 | */ | |
106 | public long getTotalNumEdges() { | |
107 | 0 | return giraphCompute.getTotalNumEdges(); |
108 | } | |
109 | ||
110 | /** | |
111 | * Send a message to a vertex id. | |
112 | * | |
113 | * @param id Vertex id to send the message to | |
114 | * @param message Message data to send | |
115 | */ | |
116 | public void sendMessage(Object id, Object message) { | |
117 | 0 | WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id); |
118 | 0 | Writable wrappedMessage = giraphCompute.wrapIfNecessary(message, |
119 | GraphType.OUTGOING_MESSAGE_VALUE); | |
120 | 0 | giraphCompute.sendMessage(wrappedId, wrappedMessage); |
121 | 0 | } |
122 | ||
123 | /** | |
124 | * Send a message to all edges. | |
125 | * | |
126 | * @param vertex Vertex whose edges to send the message to. | |
127 | * @param message Message sent to all edges. | |
128 | */ | |
129 | public void sendMessageToAllEdges(Vertex vertex, Object message) { | |
130 | 0 | Writable wrappedMessage = giraphCompute.wrapIfNecessary(message, |
131 | GraphType.OUTGOING_MESSAGE_VALUE); | |
132 | 0 | giraphCompute.sendMessageToAllEdges(vertex, wrappedMessage); |
133 | 0 | } |
134 | ||
135 | /** | |
136 | * Send a message to multiple target vertex ids in the iterator. | |
137 | * | |
138 | * @param vertexIdIterator An iterator to multiple target vertex ids. | |
139 | * @param message Message sent to all targets in the iterator. | |
140 | */ | |
141 | public void sendMessageToMultipleEdges(Iterator vertexIdIterator, | |
142 | Object message) { | |
143 | 0 | Writable wrappedMessage = giraphCompute.wrapIfNecessary(message, |
144 | GraphType.OUTGOING_MESSAGE_VALUE); | |
145 | 0 | giraphCompute.sendMessageToMultipleEdges(vertexIdIterator, wrappedMessage); |
146 | 0 | } |
147 | ||
148 | /** | |
149 | * Sends a request to create a vertex that will be available during the | |
150 | * next superstep. | |
151 | * | |
152 | * @param id Vertex id | |
153 | * @param vertexValue Vertex value | |
154 | * @param edges Initial edges | |
155 | */ | |
156 | public void addVertexRequest(Object id, Object vertexValue, | |
157 | OutEdges edges) throws IOException { | |
158 | 0 | WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id); |
159 | 0 | Writable wrappedValue = giraphCompute.wrapIfNecessary(vertexValue, |
160 | GraphType.VERTEX_VALUE); | |
161 | 0 | giraphCompute.addVertexRequest(wrappedId, wrappedValue, edges); |
162 | 0 | } |
163 | ||
164 | /** | |
165 | * Sends a request to create a vertex that will be available during the | |
166 | * next superstep. | |
167 | * | |
168 | * @param id Vertex id | |
169 | * @param vertexValue Vertex value | |
170 | */ | |
171 | public void addVertexRequest(Object id, Object vertexValue) | |
172 | throws IOException { | |
173 | 0 | WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id); |
174 | 0 | Writable wrappedVertexValue = giraphCompute.wrapIfNecessary(vertexValue, |
175 | GraphType.VERTEX_VALUE); | |
176 | 0 | giraphCompute.addVertexRequest(wrappedId, wrappedVertexValue); |
177 | 0 | } |
178 | ||
179 | /** | |
180 | * Request to remove a vertex from the graph | |
181 | * (applied just prior to the next superstep). | |
182 | * | |
183 | * @param id Id of the vertex to be removed. | |
184 | */ | |
185 | public void removeVertexRequest(Object id) throws IOException { | |
186 | 0 | WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id); |
187 | 0 | giraphCompute.removeVertexRequest(wrappedId); |
188 | 0 | } |
189 | ||
190 | /** | |
191 | * Request to add an edge of a vertex in the graph | |
192 | * (processed just prior to the next superstep) | |
193 | * | |
194 | * @param sourceVertexId Source vertex id of edge | |
195 | * @param edge Edge to add | |
196 | */ | |
197 | public void addEdgeRequest(Object sourceVertexId, Edge edge) | |
198 | throws IOException { | |
199 | 0 | WritableComparable wrappedSourceId = |
200 | 0 | giraphCompute.wrapIdIfNecessary(sourceVertexId); |
201 | 0 | giraphCompute.addEdgeRequest(wrappedSourceId, edge); |
202 | 0 | } |
203 | ||
204 | /** | |
205 | * Request to remove all edges from a given source vertex to a given target | |
206 | * vertex (processed just prior to the next superstep). | |
207 | * | |
208 | * @param sourceVertexId Source vertex id | |
209 | * @param targetVertexId Target vertex id | |
210 | */ | |
211 | public void removeEdgesRequest(Object sourceVertexId, Object targetVertexId) | |
212 | throws IOException { | |
213 | 0 | WritableComparable wrappedSourceVertexId = |
214 | 0 | giraphCompute.wrapIdIfNecessary(sourceVertexId); |
215 | 0 | WritableComparable wrappedTargetVertexId = |
216 | 0 | giraphCompute.wrapIdIfNecessary(targetVertexId); |
217 | 0 | giraphCompute.removeEdgesRequest(wrappedSourceVertexId, |
218 | wrappedTargetVertexId); | |
219 | 0 | } |
220 | ||
221 | /** | |
222 | * Get the mapper context | |
223 | * | |
224 | * @return Mapper context | |
225 | */ | |
226 | public Mapper.Context getContext() { | |
227 | 0 | return giraphCompute.getContext(); |
228 | } | |
229 | ||
230 | /** | |
231 | * Get the worker context | |
232 | * | |
233 | * @param <W> WorkerContext class | |
234 | * @return WorkerContext context | |
235 | */ | |
236 | @SuppressWarnings("unchecked") | |
237 | public <W extends WorkerContext> W getWorkerContext() { | |
238 | 0 | return (W) giraphCompute.getWorkerContext(); |
239 | } | |
240 | } | |
241 |