Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
AbstractComputation |
|
| 1.05;1.05 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | ||
19 | package org.apache.giraph.graph; | |
20 | ||
21 | import java.io.IOException; | |
22 | import java.util.Iterator; | |
23 | ||
24 | import org.apache.giraph.bsp.CentralizedServiceWorker; | |
25 | import org.apache.giraph.comm.WorkerClientRequestProcessor; | |
26 | import org.apache.giraph.edge.Edge; | |
27 | import org.apache.giraph.edge.OutEdges; | |
28 | import org.apache.giraph.worker.AllWorkersInfo; | |
29 | import org.apache.giraph.worker.WorkerAggregatorDelegator; | |
30 | import org.apache.giraph.worker.WorkerContext; | |
31 | import org.apache.giraph.worker.WorkerGlobalCommUsage; | |
32 | import org.apache.hadoop.io.Writable; | |
33 | import org.apache.hadoop.io.WritableComparable; | |
34 | import org.apache.hadoop.mapreduce.Mapper; | |
35 | ||
36 | /** | |
37 | * See {@link Computation} for explanation of the interface. | |
38 | * | |
39 | * This is an abstract helper class for users to implement their computations. | |
40 | * It implements all of the methods required by the {@link Computation} | |
41 | * interface except for the {@link #compute(Vertex, Iterable)} which we leave | |
42 | * to the user to define. | |
43 | * | |
44 | * In most cases users should inherit from this class when implementing their | |
45 | * algorithms with Giraph. | |
46 | * | |
47 | * @param <I> Vertex id | |
48 | * @param <V> Vertex data | |
49 | * @param <E> Edge data | |
50 | * @param <M1> Incoming message type | |
51 | * @param <M2> Outgoing message type | |
52 | */ | |
53 | 0 | public abstract class AbstractComputation<I extends WritableComparable, |
54 | V extends Writable, E extends Writable, M1 extends Writable, | |
55 | M2 extends Writable> | |
56 | extends WorkerAggregatorDelegator<I, V, E> | |
57 | implements Computation<I, V, E, M1, M2> { | |
58 | /** Global graph state */ | |
59 | private GraphState graphState; | |
60 | /** Handles requests */ | |
61 | private WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor; | |
62 | /** Service worker */ | |
63 | private CentralizedServiceWorker<I, V, E> serviceWorker; | |
64 | /** Worker context */ | |
65 | private WorkerContext workerContext; | |
66 | /** All workers info */ | |
67 | private AllWorkersInfo allWorkersInfo; | |
68 | ||
69 | /** | |
70 | * Must be defined by user to do computation on a single Vertex. | |
71 | * | |
72 | * @param vertex Vertex | |
73 | * @param messages Messages that were sent to this vertex in the previous | |
74 | * superstep. Each message is only guaranteed to remain | |
75 | * valid until the next call to next(). | |
76 | */ | |
77 | @Override | |
78 | public abstract void compute(Vertex<I, V, E> vertex, | |
79 | Iterable<M1> messages) throws IOException; | |
80 | ||
81 | /** | |
82 | * Prepare for computation. This method is executed exactly once prior to | |
83 | * {@link #compute(Vertex, Iterable)} being called for any of the vertices | |
84 | * in the partition. | |
85 | */ | |
86 | @Override | |
87 | public void preSuperstep() { | |
88 | 0 | } |
89 | ||
90 | /** | |
91 | * Finish computation. This method is executed exactly once after computation | |
92 | * for all vertices in the partition is complete. | |
93 | */ | |
94 | @Override | |
95 | public void postSuperstep() { | |
96 | 0 | } |
97 | ||
98 | /** | |
99 | * Initialize, called by infrastructure before the superstep starts. | |
100 | * Shouldn't be called by user code. | |
101 | * | |
102 | * @param graphState Graph state | |
103 | * @param workerClientRequestProcessor Processor for handling requests | |
104 | * @param serviceWorker Centralized service worker; may be null, in which case the worker context and all-workers info are set to null | |
105 | * @param workerGlobalCommUsage Worker global communication usage | |
106 | */ | |
107 | @Override | |
108 | public void initialize( | |
109 | GraphState graphState, | |
110 | WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor, | |
111 | CentralizedServiceWorker<I, V, E> serviceWorker, | |
112 | WorkerGlobalCommUsage workerGlobalCommUsage) { | |
113 | 0 | this.graphState = graphState; |
114 | 0 | this.workerClientRequestProcessor = workerClientRequestProcessor; |
115 | 0 | this.setWorkerGlobalCommUsage(workerGlobalCommUsage); |
116 | 0 | this.serviceWorker = serviceWorker; |
117 | 0 | if (serviceWorker != null) { |
118 | 0 | this.workerContext = serviceWorker.getWorkerContext(); |
119 | 0 | this.allWorkersInfo = new AllWorkersInfo( |
120 | 0 | serviceWorker.getWorkerInfoList(), serviceWorker.getWorkerInfo()); |
121 | } else { | |
122 | 0 | this.workerContext = null; |
123 | 0 | this.allWorkersInfo = null; |
124 | } | |
125 | 0 | } |
126 | ||
127 | /** | |
128 | * Retrieves the current superstep. | |
129 | * | |
130 | * @return Current superstep | |
131 | */ | |
132 | @Override | |
133 | public long getSuperstep() { | |
134 | 0 | return graphState.getSuperstep(); |
135 | } | |
136 | ||
137 | /** | |
138 | * Get the total (all workers) number of vertices that | |
139 | * existed in the previous superstep. | |
140 | * | |
141 | * @return Total number of vertices (-1 if first superstep) | |
142 | */ | |
143 | @Override | |
144 | public long getTotalNumVertices() { | |
145 | 0 | return graphState.getTotalNumVertices(); |
146 | } | |
147 | ||
148 | /** | |
149 | * Get the total (all workers) number of edges that | |
150 | * existed in the previous superstep. | |
151 | * | |
152 | * @return Total number of edges (-1 if first superstep) | |
153 | */ | |
154 | @Override | |
155 | public long getTotalNumEdges() { | |
156 | 0 | return graphState.getTotalNumEdges(); |
157 | } | |
158 | ||
159 | /** | |
160 | * Send a message to a vertex id. | |
161 | * | |
162 | * @param id Vertex id to send the message to | |
163 | * @param message Message data to send | |
164 | */ | |
165 | @Override | |
166 | public void sendMessage(I id, M2 message) { | |
167 | 0 | workerClientRequestProcessor.sendMessageRequest(id, message); |
168 | 0 | } |
169 | ||
170 | /** | |
171 | * Send a message to all edges. | |
172 | * | |
173 | * @param vertex Vertex whose edges to send the message to. | |
174 | * @param message Message sent to all edges. | |
175 | */ | |
176 | @Override | |
177 | public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message) { | |
178 | 0 | workerClientRequestProcessor.sendMessageToAllRequest(vertex, message); |
179 | 0 | } |
180 | ||
181 | /** | |
182 | * Send a message to multiple target vertex ids in the iterator. | |
183 | * | |
184 | * @param vertexIdIterator An iterator to multiple target vertex ids. | |
185 | * @param message Message sent to all targets in the iterator. | |
186 | */ | |
187 | @Override | |
188 | public void sendMessageToMultipleEdges( | |
189 | Iterator<I> vertexIdIterator, M2 message) { | |
190 | 0 | workerClientRequestProcessor.sendMessageToAllRequest( |
191 | vertexIdIterator, message); | |
192 | 0 | } |
193 | ||
194 | /** | |
195 | * Sends a request to create a vertex that will be available during the | |
196 | * next superstep. | |
197 | * | |
198 | * @param id Vertex id | |
199 | * @param value Vertex value | |
200 | * @param edges Initial edges | |
201 | */ | |
202 | @Override | |
203 | public void addVertexRequest(I id, V value, | |
204 | OutEdges<I, E> edges) throws IOException { | |
205 | 0 | Vertex<I, V, E> vertex = getConf().createVertex(); |
206 | 0 | vertex.initialize(id, value, edges); |
207 | 0 | workerClientRequestProcessor.addVertexRequest(vertex); |
208 | 0 | } |
209 | ||
210 | /** | |
211 | * Sends a request to create a vertex that will be available during the | |
212 | * next superstep. | |
213 | * | |
214 | * @param id Vertex id | |
215 | * @param value Vertex value | |
216 | */ | |
217 | @Override | |
218 | public void addVertexRequest(I id, V value) throws IOException { | |
219 | 0 | addVertexRequest(id, value, getConf().createAndInitializeOutEdges()); |
220 | 0 | } |
221 | ||
222 | /** | |
223 | * Request to remove a vertex from the graph | |
224 | * (applied just prior to the next superstep). | |
225 | * | |
226 | * @param vertexId Id of the vertex to be removed. | |
227 | */ | |
228 | @Override | |
229 | public void removeVertexRequest(I vertexId) throws IOException { | |
230 | 0 | workerClientRequestProcessor.removeVertexRequest(vertexId); |
231 | 0 | } |
232 | ||
233 | /** | |
234 | * Request to add an edge to a vertex in the graph | |
235 | * (processed just prior to the next superstep). | |
236 | * | |
237 | * @param sourceVertexId Source vertex id of edge | |
238 | * @param edge Edge to add | |
239 | */ | |
240 | @Override | |
241 | public void addEdgeRequest(I sourceVertexId, | |
242 | Edge<I, E> edge) throws IOException { | |
243 | 0 | workerClientRequestProcessor.addEdgeRequest(sourceVertexId, edge); |
244 | 0 | } |
245 | ||
246 | /** | |
247 | * Request to remove all edges from a given source vertex to a given target | |
248 | * vertex (processed just prior to the next superstep). | |
249 | * | |
250 | * @param sourceVertexId Source vertex id | |
251 | * @param targetVertexId Target vertex id | |
252 | */ | |
253 | @Override | |
254 | public void removeEdgesRequest(I sourceVertexId, | |
255 | I targetVertexId) throws IOException { | |
256 | 0 | workerClientRequestProcessor.removeEdgesRequest( |
257 | sourceVertexId, targetVertexId); | |
258 | 0 | } |
259 | ||
260 | /** | |
261 | * Get the mapper context | |
262 | * | |
263 | * @return Mapper context | |
264 | */ | |
265 | @Override | |
266 | public Mapper.Context getContext() { | |
267 | 0 | return graphState.getContext(); |
268 | } | |
269 | ||
270 | /** | |
271 | * Get the worker context | |
272 | * | |
273 | * @param <W> WorkerContext class | |
274 | * @return Worker context, cast to the type expected by the caller | |
275 | */ | |
276 | @SuppressWarnings("unchecked") | |
277 | @Override | |
278 | public <W extends WorkerContext> W getWorkerContext() { | |
279 | 0 | return (W) workerContext; |
280 | } | |
281 | ||
282 | @Override | |
283 | public final int getWorkerCount() { | |
284 | 0 | return allWorkersInfo.getWorkerCount(); |
285 | } | |
286 | ||
287 | @Override | |
288 | public final int getMyWorkerIndex() { | |
289 | 0 | return allWorkersInfo.getMyWorkerIndex(); |
290 | } | |
291 | ||
292 | @Override | |
293 | public final int getWorkerForVertex(I vertexId) { | |
294 | 0 | return allWorkersInfo.getWorkerIndex( |
295 | 0 | serviceWorker.getVertexPartitionOwner(vertexId).getWorkerInfo()); |
296 | } | |
297 | } |