1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.worker;
20
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.IOException;
24 import java.util.List;
25
26 import org.apache.giraph.bsp.CentralizedServiceWorker;
27 import org.apache.giraph.comm.requests.SendWorkerToWorkerMessageRequest;
28 import org.apache.giraph.graph.GraphState;
29 import org.apache.hadoop.io.Writable;
30 import org.apache.hadoop.io.WritableComparable;
31 import org.apache.hadoop.mapreduce.Mapper;
32
33
34
35
36
37 @SuppressWarnings("rawtypes")
38 public abstract class WorkerContext
39 extends WorkerAggregatorDelegator<WritableComparable, Writable, Writable>
40 implements Writable, WorkerIndexUsage<WritableComparable> {
41
42 private GraphState graphState;
43
44
45 private CentralizedServiceWorker serviceWorker;
46
47 private AllWorkersInfo allWorkersInfo;
48
49
50
51
52
53
54 public final void setGraphState(GraphState graphState) {
55 this.graphState = graphState;
56 }
57
58
59
60
61
62
63 public final void setupSuperstep(
64 CentralizedServiceWorker<?, ?, ?> serviceWorker) {
65 this.serviceWorker = serviceWorker;
66 allWorkersInfo = new AllWorkersInfo(
67 serviceWorker.getWorkerInfoList(), serviceWorker.getWorkerInfo());
68 }
69
70
71
72
73
74
75
76
77
78 public abstract void preApplication() throws InstantiationException,
79 IllegalAccessException;
80
81
82
83
84
85
86 public abstract void postApplication();
87
88
89
90
91
92
93 public abstract void preSuperstep();
94
95
96
97
98
99
100 @Override
101 public final int getWorkerCount() {
102 return allWorkersInfo.getWorkerCount();
103 }
104
105
106
107
108
109
110 @Override
111 public final int getMyWorkerIndex() {
112 return allWorkersInfo.getMyWorkerIndex();
113 }
114
115 @Override
116 public final int getWorkerForVertex(WritableComparable vertexId) {
117 return allWorkersInfo.getWorkerIndex(
118 serviceWorker.getVertexPartitionOwner(vertexId).getWorkerInfo());
119 }
120
121
122
123
124
125
126
127 public final List<Writable> getAndClearMessagesFromOtherWorkers() {
128 return serviceWorker.getServerData().
129 getAndClearCurrentWorkerToWorkerMessages();
130 }
131
132
133
134
135
136
137
138 public final void sendMessageToWorker(Writable message, int workerIndex) {
139 SendWorkerToWorkerMessageRequest request =
140 new SendWorkerToWorkerMessageRequest(message);
141 if (workerIndex == getMyWorkerIndex()) {
142 request.doRequest(serviceWorker.getServerData());
143 } else {
144 serviceWorker.getWorkerClient().sendWritableRequest(
145 allWorkersInfo.getWorkerList().get(workerIndex).getTaskId(), request);
146 }
147 }
148
149
150
151
152
153
154 public abstract void postSuperstep();
155
156
157
158
159
160
161 public final long getSuperstep() {
162 return graphState.getSuperstep();
163 }
164
165
166
167
168
169
170
171 public final long getTotalNumVertices() {
172 return graphState.getTotalNumVertices();
173 }
174
175
176
177
178
179
180
181 public final long getTotalNumEdges() {
182 return graphState.getTotalNumEdges();
183 }
184
185
186
187
188
189
190 public final Mapper.Context getContext() {
191 return graphState.getContext();
192 }
193
194
195
196
197
198
199
200 public final void logToCommandLine(String line) {
201 serviceWorker.getJobProgressTracker().logInfo(line);
202 }
203
204 @Override
205 public void write(DataOutput dataOutput) throws IOException {
206 }
207
208 @Override
209 public void readFields(DataInput dataInput) throws IOException {
210 }
211 }