Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
WorkerContext |
|
| 1.0555555555555556;1.056 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | ||
19 | package org.apache.giraph.worker; | |
20 | ||
21 | import java.io.DataInput; | |
22 | import java.io.DataOutput; | |
23 | import java.io.IOException; | |
24 | import java.util.List; | |
25 | ||
26 | import org.apache.giraph.bsp.CentralizedServiceWorker; | |
27 | import org.apache.giraph.comm.requests.SendWorkerToWorkerMessageRequest; | |
28 | import org.apache.giraph.graph.GraphState; | |
29 | import org.apache.hadoop.io.Writable; | |
30 | import org.apache.hadoop.io.WritableComparable; | |
31 | import org.apache.hadoop.mapreduce.Mapper; | |
32 | ||
33 | /** | |
34 | * WorkerContext allows for the execution of user code on a per-worker | |
35 | * basis, through the pre/post application and superstep hooks declared here. There's one WorkerContext per worker. | |
36 | */ | |
37 | 0 | @SuppressWarnings("rawtypes") |
38 | 0 | public abstract class WorkerContext |
39 | extends WorkerAggregatorDelegator<WritableComparable, Writable, Writable> | |
40 | implements Writable, WorkerIndexUsage<WritableComparable> { | |
41 | /** Global graph state, assigned through setGraphState */ | |
42 | private GraphState graphState; | |
43 | ||
44 | /** Service worker, assigned in setupSuperstep */ | |
45 | private CentralizedServiceWorker serviceWorker; | |
46 | /** All workers info, rebuilt in setupSuperstep */ | |
47 | private AllWorkersInfo allWorkersInfo; | |
48 | ||
49 | /** | |
50 | * Set the graph state that the superstep, vertex/edge total and context getters read from. | |
51 | * | |
52 | * @param graphState Graph state to store | |
53 | */ | |
54 | public final void setGraphState(GraphState graphState) { | |
55 | 0 | this.graphState = graphState; |
56 | 0 | } |
57 | ||
58 | /** | |
59 | * Setup superstep: store the service worker and rebuild the all workers info from it. | |
60 | * | |
61 | * @param serviceWorker Service worker providing the worker list and this worker's info | |
62 | */ | |
63 | public final void setupSuperstep( | |
64 | CentralizedServiceWorker<?, ?, ?> serviceWorker) { | |
65 | 0 | this.serviceWorker = serviceWorker; |
66 | 0 | allWorkersInfo = new AllWorkersInfo( |
67 | 0 | serviceWorker.getWorkerInfoList(), serviceWorker.getWorkerInfo()); |
68 | 0 | } |
69 | ||
70 | /** | |
71 | * Initialize the WorkerContext. | |
72 | * This method is executed once on each Worker before the first | |
73 | * superstep starts. | |
74 | * | |
75 | * @throws IllegalAccessException May be thrown by implementations, e.g. on reflective class access | |
76 | * @throws InstantiationException May be thrown by implementations that instantiate objects here | |
77 | */ | |
78 | public abstract void preApplication() throws InstantiationException, | |
79 | IllegalAccessException; | |
80 | ||
81 | /** | |
82 | * Finalize the WorkerContext. | |
83 | * This method is executed once on each Worker after the last | |
84 | * superstep ends. | |
85 | */ | |
86 | public abstract void postApplication(); | |
87 | ||
88 | /** | |
89 | * Execute user code. | |
90 | * This method is executed once on each Worker before each | |
91 | * superstep starts. | |
92 | */ | |
93 | public abstract void preSuperstep(); | |
94 | ||
95 | /** | |
96 | * Get number of workers. Only valid after setupSuperstep has been called. | |
97 | * | |
98 | * @return Number of workers | |
99 | */ | |
100 | @Override | |
101 | public final int getWorkerCount() { | |
102 | 0 | return allWorkersInfo.getWorkerCount(); |
103 | } | |
104 | ||
105 | /** | |
106 | * Get index for this worker. Only valid after setupSuperstep has been called. | |
107 | * | |
108 | * @return Index of this worker | |
109 | */ | |
110 | @Override | |
111 | public final int getMyWorkerIndex() { | |
112 | 0 | return allWorkersInfo.getMyWorkerIndex(); |
113 | } | |
114 | ||
115 | @Override | |
116 | public final int getWorkerForVertex(WritableComparable vertexId) { | |
117 | 0 | return allWorkersInfo.getWorkerIndex( |
118 | 0 | serviceWorker.getVertexPartitionOwner(vertexId).getWorkerInfo()); |
119 | } | |
120 | ||
121 | /** | |
122 | * Get messages which other workers sent to this worker and clear them (can | |
123 | * be called once per superstep) | |
124 | * | |
125 | * @return Messages received | |
126 | */ | |
127 | public final List<Writable> getAndClearMessagesFromOtherWorkers() { | |
128 | 0 | return serviceWorker.getServerData(). |
129 | 0 | getAndClearCurrentWorkerToWorkerMessages(); |
130 | } | |
131 | ||
132 | /** | |
133 | * Send message to another worker. If workerIndex is this worker's own index, the request is applied to the local server data instead of going through the worker client. | |
134 | * | |
135 | * @param message Message to send | |
136 | * @param workerIndex Index of the worker to send the message to | |
137 | */ | |
138 | public final void sendMessageToWorker(Writable message, int workerIndex) { | |
139 | 0 | SendWorkerToWorkerMessageRequest request = |
140 | new SendWorkerToWorkerMessageRequest(message); | |
141 | 0 | if (workerIndex == getMyWorkerIndex()) { |
142 | 0 | request.doRequest(serviceWorker.getServerData()); |
143 | } else { | |
144 | 0 | serviceWorker.getWorkerClient().sendWritableRequest( |
145 | 0 | allWorkersInfo.getWorkerList().get(workerIndex).getTaskId(), request); |
146 | } | |
147 | 0 | } |
148 | ||
149 | /** | |
150 | * Execute user code. | |
151 | * This method is executed once on each Worker after each | |
152 | * superstep ends. | |
153 | */ | |
154 | public abstract void postSuperstep(); | |
155 | ||
156 | /** | |
157 | * Retrieves the current superstep from the graph state. | |
158 | * | |
159 | * @return Current superstep | |
160 | */ | |
161 | public final long getSuperstep() { | |
162 | 0 | return graphState.getSuperstep(); |
163 | } | |
164 | ||
165 | /** | |
166 | * Get the total (all workers) number of vertices that | |
167 | * existed in the previous superstep. | |
168 | * | |
169 | * @return Total number of vertices (-1 if first superstep) | |
170 | */ | |
171 | public final long getTotalNumVertices() { | |
172 | 0 | return graphState.getTotalNumVertices(); |
173 | } | |
174 | ||
175 | /** | |
176 | * Get the total (all workers) number of edges that | |
177 | * existed in the previous superstep. | |
178 | * | |
179 | * @return Total number of edges (-1 if first superstep) | |
180 | */ | |
181 | public final long getTotalNumEdges() { | |
182 | 0 | return graphState.getTotalNumEdges(); |
183 | } | |
184 | ||
185 | /** | |
186 | * Get the mapper context held by the graph state. | |
187 | * | |
188 | * @return Mapper context | |
189 | */ | |
190 | public final Mapper.Context getContext() { | |
191 | 0 | return graphState.getContext(); |
192 | } | |
193 | ||
194 | /** | |
195 | * Call this to log a line to the command line of the job. Use in moderation - | |
196 | * it's a synchronous call to Job client | |
197 | * | |
198 | * @param line Line to print | |
199 | */ | |
200 | public final void logToCommandLine(String line) { | |
201 | 0 | serviceWorker.getJobProgressTracker().logInfo(line); |
202 | 0 | } |
203 | ||
204 | @Override | |
205 | public void write(DataOutput dataOutput) throws IOException { | |
206 | 0 | } |
207 | ||
208 | @Override | |
209 | public void readFields(DataInput dataInput) throws IOException { | |
210 | 0 | } |
211 | } | |