1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.job;
20
21 import com.google.common.collect.ImmutableList;
22 import org.apache.giraph.bsp.BspInputFormat;
23 import org.apache.giraph.conf.GiraphConfiguration;
24 import org.apache.giraph.conf.GiraphConstants;
25 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26 import org.apache.giraph.graph.GraphMapper;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.ipc.Client;
29 import org.apache.hadoop.mapreduce.Job;
30 import org.apache.log4j.Logger;
31
32 import java.io.IOException;
33
34
35
36
37
38
39 public class GiraphJob {
40 static {
41 Configuration.addDefaultResource("giraph-site.xml");
42 }
43
44
45 private static final Logger LOG = Logger.getLogger(GiraphJob.class);
46
47 private final DelegatedJob delegatedJob;
48
49 private String jobName;
50
51 private final GiraphConfiguration giraphConfiguration;
52
53
54
55
56 private class DelegatedJob extends Job {
57
58 private boolean jobInited = false;
59
60
61
62
63
64
65
66 DelegatedJob(Configuration conf) throws IOException {
67 super(conf);
68 }
69
70 @Override
71 public Configuration getConfiguration() {
72 if (jobInited) {
73 return giraphConfiguration;
74 } else {
75 return super.getConfiguration();
76 }
77 }
78 }
79
80
81
82
83
84
85
86 public GiraphJob(String jobName) throws IOException {
87 this(new GiraphConfiguration(), jobName);
88 }
89
90
91
92
93
94
95
96
97 public GiraphJob(Configuration configuration,
98 String jobName) throws IOException {
99 this(new GiraphConfiguration(configuration), jobName);
100 }
101
102
103
104
105
106
107
108
109 public GiraphJob(GiraphConfiguration giraphConfiguration,
110 String jobName) throws IOException {
111 this.jobName = jobName;
112 this.giraphConfiguration = giraphConfiguration;
113 this.delegatedJob = new DelegatedJob(giraphConfiguration);
114 }
115
116 public String getJobName() {
117 return jobName;
118 }
119
120 public void setJobName(String jobName) {
121 this.jobName = jobName;
122 }
123
124
125
126
127
128
129 public GiraphConfiguration getConfiguration() {
130 return giraphConfiguration;
131 }
132
133
134
135
136
137
138
139
140 public Job getInternalJob() {
141 delegatedJob.jobInited = true;
142 return delegatedJob;
143 }
144
145
146
147
148
149
150
151
152 private static void checkLocalJobRunnerConfiguration(
153 ImmutableClassesGiraphConfiguration conf) {
154 String jobTracker = conf.get("mapred.job.tracker", null);
155 if (!jobTracker.equals("local")) {
156
157 return;
158 }
159
160 int maxWorkers = conf.getMaxWorkers();
161 if (maxWorkers != 1) {
162 throw new IllegalArgumentException(
163 "checkLocalJobRunnerConfiguration: When using " +
164 "LocalJobRunner, must have only one worker since " +
165 "only 1 task at a time!");
166 }
167 if (conf.getSplitMasterWorker()) {
168 throw new IllegalArgumentException(
169 "checkLocalJobRunnerConfiguration: When using " +
170 "LocalJobRunner, you cannot run in split master / worker " +
171 "mode since there is only 1 task at a time!");
172 }
173 }
174
175
176
177
178
179
180
181 private void setIntConfIfDefault(String param, int defaultValue) {
182 if (giraphConfiguration.getInt(param, Integer.MIN_VALUE) ==
183 Integer.MIN_VALUE) {
184 giraphConfiguration.setInt(param, defaultValue);
185 }
186 }
187
188
189
190
191
192
193
194
195
196
197 public final boolean run(boolean verbose)
198 throws IOException, InterruptedException, ClassNotFoundException {
199
200 setIntConfIfDefault("mapreduce.job.counters.limit", 512);
201
202
203
204 setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
205 setIntConfIfDefault("mapred.job.reduce.memory.mb", 0);
206
207
208 giraphConfiguration.setBoolean(
209 "mapred.map.tasks.speculative.execution", false);
210
211
212
213 Client.setPingInterval(giraphConfiguration, 60000 * 5);
214
215
216
217 giraphConfiguration.setBoolean("mapreduce.user.classpath.first", true);
218 giraphConfiguration.setBoolean("mapreduce.job.user.classpath.first", true);
219
220
221
222 if (giraphConfiguration.getCheckpointFrequency() == 0) {
223 int oldMaxTaskAttempts = giraphConfiguration.getMaxTaskAttempts();
224 giraphConfiguration.setMaxTaskAttempts(1);
225 if (LOG.isInfoEnabled()) {
226 LOG.info("run: Since checkpointing is disabled (default), " +
227 "do not allow any task retries (setting " +
228 GiraphConstants.MAX_TASK_ATTEMPTS.getKey() + " = 1, " +
229 "old value = " + oldMaxTaskAttempts + ")");
230 }
231 }
232
233
234 ImmutableClassesGiraphConfiguration conf =
235 new ImmutableClassesGiraphConfiguration(giraphConfiguration);
236 checkLocalJobRunnerConfiguration(conf);
237
238 int tryCount = 0;
239 GiraphJobRetryChecker retryChecker = conf.getJobRetryChecker();
240 while (true) {
241 GiraphJobObserver jobObserver = conf.getJobObserver();
242
243 JobProgressTrackerService jobProgressTrackerService =
244 DefaultJobProgressTrackerService.createJobProgressTrackerService(
245 conf, jobObserver);
246 ClientThriftServer clientThriftServer = null;
247 if (jobProgressTrackerService != null) {
248 clientThriftServer = new ClientThriftServer(
249 conf, ImmutableList.of(jobProgressTrackerService));
250 }
251
252 tryCount++;
253 Job submittedJob = new Job(conf, jobName);
254 if (submittedJob.getJar() == null) {
255 submittedJob.setJarByClass(getClass());
256 }
257 submittedJob.setNumReduceTasks(0);
258 submittedJob.setMapperClass(GraphMapper.class);
259 submittedJob.setInputFormatClass(BspInputFormat.class);
260 submittedJob.setOutputFormatClass(
261 GiraphConstants.HADOOP_OUTPUT_FORMAT_CLASS.get(conf));
262 if (jobProgressTrackerService != null) {
263 jobProgressTrackerService.setJob(submittedJob);
264 }
265
266 jobObserver.launchingJob(submittedJob);
267 submittedJob.submit();
268 if (LOG.isInfoEnabled()) {
269 LOG.info("Tracking URL: " + submittedJob.getTrackingURL());
270 LOG.info(
271 "Waiting for resources... Job will start only when it gets all " +
272 (conf.getMinWorkers() + 1) + " mappers");
273 }
274 jobObserver.jobRunning(submittedJob);
275 HaltApplicationUtils.printHaltInfo(submittedJob, conf);
276
277 boolean passed = submittedJob.waitForCompletion(verbose);
278 if (jobProgressTrackerService != null) {
279 jobProgressTrackerService.stop(passed);
280 }
281 if (clientThriftServer != null) {
282 clientThriftServer.stopThriftServer();
283 }
284
285 jobObserver.jobFinished(submittedJob, passed);
286
287 if (!passed) {
288 String restartFrom = retryChecker.shouldRestartCheckpoint(submittedJob);
289 if (restartFrom != null) {
290 GiraphConstants.RESTART_JOB_ID.set(conf, restartFrom);
291 continue;
292 }
293 }
294
295 if (passed || !retryChecker.shouldRetry(submittedJob, tryCount)) {
296 return passed;
297 }
298 if (LOG.isInfoEnabled()) {
299 LOG.info("run: Retrying job, " + tryCount + " try");
300 }
301 }
302 }
303 }