1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.graph;
20
21 import java.io.IOException;
22 import java.util.Iterator;
23
24 import org.apache.giraph.bsp.CentralizedServiceWorker;
25 import org.apache.giraph.comm.WorkerClientRequestProcessor;
26 import org.apache.giraph.edge.Edge;
27 import org.apache.giraph.edge.OutEdges;
28 import org.apache.giraph.worker.AllWorkersInfo;
29 import org.apache.giraph.worker.WorkerAggregatorDelegator;
30 import org.apache.giraph.worker.WorkerContext;
31 import org.apache.giraph.worker.WorkerGlobalCommUsage;
32 import org.apache.hadoop.io.Writable;
33 import org.apache.hadoop.io.WritableComparable;
34 import org.apache.hadoop.mapreduce.Mapper;
35
36 /**
37 * See {@link Computation} for explanation of the interface.
38 *
39 * This is a abstract class helper for users to implement their computations.
40 * It implements all of the methods required by the {@link Computation}
41 * interface except for the {@link #compute(Vertex, Iterable)} which we leave
42 * to the user to define.
43 *
44 * In most cases users should inherit from this class when implementing their
45 * algorithms with Giraph.
46 *
47 * @param <I> Vertex id
48 * @param <V> Vertex data
49 * @param <E> Edge data
50 * @param <M1> Incoming message type
51 * @param <M2> Outgoing message type
52 */
53 public abstract class AbstractComputation<I extends WritableComparable,
54 V extends Writable, E extends Writable, M1 extends Writable,
55 M2 extends Writable>
56 extends WorkerAggregatorDelegator<I, V, E>
57 implements Computation<I, V, E, M1, M2> {
58 /** Global graph state **/
59 private GraphState graphState;
60 /** Handles requests */
61 private WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor;
62 /** Service worker */
63 private CentralizedServiceWorker<I, V, E> serviceWorker;
64 /** Worker context */
65 private WorkerContext workerContext;
66 /** All workers info */
67 private AllWorkersInfo allWorkersInfo;
68
69 /**
70 * Must be defined by user to do computation on a single Vertex.
71 *
72 * @param vertex Vertex
73 * @param messages Messages that were sent to this vertex in the previous
74 * superstep. Each message is only guaranteed to have
75 * a life expectancy as long as next() is not called.
76 */
77 @Override
78 public abstract void compute(Vertex<I, V, E> vertex,
79 Iterable<M1> messages) throws IOException;
80
81 /**
82 * Prepare for computation. This method is executed exactly once prior to
83 * {@link #compute(Vertex, Iterable)} being called for any of the vertices
84 * in the partition.
85 */
86 @Override
87 public void preSuperstep() {
88 }
89
90 /**
91 * Finish computation. This method is executed exactly once after computation
92 * for all vertices in the partition is complete.
93 */
94 @Override
95 public void postSuperstep() {
96 }
97
98 /**
99 * Initialize, called by infrastructure before the superstep starts.
100 * Shouldn't be called by user code.
101 *
102 * @param graphState Graph state
103 * @param workerClientRequestProcessor Processor for handling requests
104 * @param serviceWorker Graph-wide BSP Mapper for this Vertex
105 * @param workerGlobalCommUsage Worker global communication usage
106 */
107 @Override
108 public void initialize(
109 GraphState graphState,
110 WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
111 CentralizedServiceWorker<I, V, E> serviceWorker,
112 WorkerGlobalCommUsage workerGlobalCommUsage) {
113 this.graphState = graphState;
114 this.workerClientRequestProcessor = workerClientRequestProcessor;
115 this.setWorkerGlobalCommUsage(workerGlobalCommUsage);
116 this.serviceWorker = serviceWorker;
117 if (serviceWorker != null) {
118 this.workerContext = serviceWorker.getWorkerContext();
119 this.allWorkersInfo = new AllWorkersInfo(
120 serviceWorker.getWorkerInfoList(), serviceWorker.getWorkerInfo());
121 } else {
122 this.workerContext = null;
123 this.allWorkersInfo = null;
124 }
125 }
126
127 /**
128 * Retrieves the current superstep.
129 *
130 * @return Current superstep
131 */
132 @Override
133 public long getSuperstep() {
134 return graphState.getSuperstep();
135 }
136
137 /**
138 * Get the total (all workers) number of vertices that
139 * existed in the previous superstep.
140 *
141 * @return Total number of vertices (-1 if first superstep)
142 */
143 @Override
144 public long getTotalNumVertices() {
145 return graphState.getTotalNumVertices();
146 }
147
148 /**
149 * Get the total (all workers) number of edges that
150 * existed in the previous superstep.
151 *
152 * @return Total number of edges (-1 if first superstep)
153 */
154 @Override
155 public long getTotalNumEdges() {
156 return graphState.getTotalNumEdges();
157 }
158
159 /**
160 * Send a message to a vertex id.
161 *
162 * @param id Vertex id to send the message to
163 * @param message Message data to send
164 */
165 @Override
166 public void sendMessage(I id, M2 message) {
167 workerClientRequestProcessor.sendMessageRequest(id, message);
168 }
169
170 /**
171 * Send a message to all edges.
172 *
173 * @param vertex Vertex whose edges to send the message to.
174 * @param message Message sent to all edges.
175 */
176 @Override
177 public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message) {
178 workerClientRequestProcessor.sendMessageToAllRequest(vertex, message);
179 }
180
181 /**
182 * Send a message to multiple target vertex ids in the iterator.
183 *
184 * @param vertexIdIterator An iterator to multiple target vertex ids.
185 * @param message Message sent to all targets in the iterator.
186 */
187 @Override
188 public void sendMessageToMultipleEdges(
189 Iterator<I> vertexIdIterator, M2 message) {
190 workerClientRequestProcessor.sendMessageToAllRequest(
191 vertexIdIterator, message);
192 }
193
194 /**
195 * Sends a request to create a vertex that will be available during the
196 * next superstep.
197 *
198 * @param id Vertex id
199 * @param value Vertex value
200 * @param edges Initial edges
201 */
202 @Override
203 public void addVertexRequest(I id, V value,
204 OutEdges<I, E> edges) throws IOException {
205 Vertex<I, V, E> vertex = getConf().createVertex();
206 vertex.initialize(id, value, edges);
207 workerClientRequestProcessor.addVertexRequest(vertex);
208 }
209
210 /**
211 * Sends a request to create a vertex that will be available during the
212 * next superstep.
213 *
214 * @param id Vertex id
215 * @param value Vertex value
216 */
217 @Override
218 public void addVertexRequest(I id, V value) throws IOException {
219 addVertexRequest(id, value, getConf().createAndInitializeOutEdges());
220 }
221
222 /**
223 * Request to remove a vertex from the graph
224 * (applied just prior to the next superstep).
225 *
226 * @param vertexId Id of the vertex to be removed.
227 */
228 @Override
229 public void removeVertexRequest(I vertexId) throws IOException {
230 workerClientRequestProcessor.removeVertexRequest(vertexId);
231 }
232
233 /**
234 * Request to add an edge of a vertex in the graph
235 * (processed just prior to the next superstep)
236 *
237 * @param sourceVertexId Source vertex id of edge
238 * @param edge Edge to add
239 */
240 @Override
241 public void addEdgeRequest(I sourceVertexId,
242 Edge<I, E> edge) throws IOException {
243 workerClientRequestProcessor.addEdgeRequest(sourceVertexId, edge);
244 }
245
246 /**
247 * Request to remove all edges from a given source vertex to a given target
248 * vertex (processed just prior to the next superstep).
249 *
250 * @param sourceVertexId Source vertex id
251 * @param targetVertexId Target vertex id
252 */
253 @Override
254 public void removeEdgesRequest(I sourceVertexId,
255 I targetVertexId) throws IOException {
256 workerClientRequestProcessor.removeEdgesRequest(
257 sourceVertexId, targetVertexId);
258 }
259
260 /**
261 * Get the mapper context
262 *
263 * @return Mapper context
264 */
265 @Override
266 public Mapper.Context getContext() {
267 return graphState.getContext();
268 }
269
270 /**
271 * Get the worker context
272 *
273 * @param <W> WorkerContext class
274 * @return WorkerContext context
275 */
276 @SuppressWarnings("unchecked")
277 @Override
278 public <W extends WorkerContext> W getWorkerContext() {
279 return (W) workerContext;
280 }
281
282 @Override
283 public final int getWorkerCount() {
284 return allWorkersInfo.getWorkerCount();
285 }
286
287 @Override
288 public final int getMyWorkerIndex() {
289 return allWorkersInfo.getMyWorkerIndex();
290 }
291
292 @Override
293 public final int getWorkerForVertex(I vertexId) {
294 return allWorkersInfo.getWorkerIndex(
295 serviceWorker.getVertexPartitionOwner(vertexId).getWorkerInfo());
296 }
297 }