1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.yarn;
19
20 import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
21
22 import com.google.common.collect.ImmutableList;
23 import com.google.common.collect.Maps;
24
25 import com.google.common.collect.Sets;
26 import java.util.Set;
27 import org.apache.giraph.conf.GiraphConfiguration;
28 import org.apache.giraph.conf.GiraphConstants;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileStatus;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.io.DataOutputBuffer;
34 import org.apache.hadoop.security.Credentials;
35 import org.apache.hadoop.security.UserGroupInformation;
36 import org.apache.hadoop.security.token.Token;
37 import org.apache.hadoop.yarn.api.ApplicationConstants;
38 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
39 import org.apache.hadoop.yarn.api.records.ApplicationId;
40 import org.apache.hadoop.yarn.api.records.ApplicationReport;
41 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
42 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
43 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
44 import org.apache.hadoop.yarn.api.records.LocalResource;
45 import org.apache.hadoop.yarn.api.records.NodeReport;
46 import org.apache.hadoop.yarn.api.records.NodeState;
47 import org.apache.hadoop.yarn.api.records.Resource;
48 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
49 import org.apache.hadoop.yarn.client.api.YarnClient;
50 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
51 import org.apache.hadoop.yarn.conf.YarnConfiguration;
52 import org.apache.hadoop.yarn.exceptions.YarnException;
53 import org.apache.hadoop.yarn.util.Records;
54
55 import org.apache.log4j.Logger;
56
57 import java.io.IOException;
58 import java.util.List;
59 import java.util.Map;
60 import java.nio.ByteBuffer;
61
62
63
64
65
66
67
68 public class GiraphYarnClient {
69 static {
70 Configuration.addDefaultResource("giraph-site.xml");
71 }
72
73 private static final Logger LOG = Logger.getLogger(GiraphYarnClient.class);
74
75 private static final int JOB_STATUS_INTERVAL_MSECS = 800;
76
77 private static final int YARN_APP_MASTER_MEMORY_MB = 512;
78
79
80 private final String jobName;
81
82 private final GiraphConfiguration giraphConf;
83
84 private ApplicationId appId;
85
86 private int reportCounter;
87
88 private YarnClient yarnClient;
89
90
91
92
93
94
95
/**
 * Creates a client for a single Giraph job. Checks that the configured
 * output directory does not already exist, then creates and initializes
 * (but does not start) the YARN client.
 *
 * @param giraphConf configuration of the job to submit
 * @param jobName name used for the YARN application and in log messages
 * @throws IOException declared as part of the constructor's contract
 */
public GiraphYarnClient(GiraphConfiguration giraphConf, String jobName)
  throws IOException {
  this.giraphConf = giraphConf;
  this.jobName = jobName;
  this.appId = null;
  this.reportCounter = 0;
  // Reads this.giraphConf, so it has to run after the assignment above.
  verifyOutputDirDoesNotExist();
  this.yarnClient = YarnClient.createYarnClient();
  this.yarnClient.init(giraphConf);
}
106
107
108
109
110
111
112
113
114
115 public boolean run(final boolean verbose) throws YarnException, IOException {
116
117 LOG.info("Running Client");
118 yarnClient.start();
119
120
121 YarnClientApplication app = yarnClient.createApplication();
122 GetNewApplicationResponse getNewAppResponse = app.
123 getNewApplicationResponse();
124 checkPerNodeResourcesAvailable(getNewAppResponse);
125
126 ApplicationSubmissionContext appContext = app.
127 getApplicationSubmissionContext();
128 appId = appContext.getApplicationId();
129
130 appContext.setApplicationId(appId);
131 appContext.setApplicationName(jobName);
132 LOG.info("Obtained new Application ID: " + appId);
133
134 applyConfigsForYarnGiraphJob();
135
136 ContainerLaunchContext containerContext = buildContainerLaunchContext();
137 appContext.setResource(buildContainerMemory());
138 appContext.setAMContainerSpec(containerContext);
139 LOG.info("ApplicationSumbissionContext for GiraphApplicationMaster " +
140 "launch container is populated.");
141
142
143
144
145
146
147
148
149
150
151
152 try {
153 LOG.info("Submitting application to ASM");
154
155 appId = yarnClient.submitApplication(appContext);
156 LOG.info("Got new appId after submission :" + appId);
157 } catch (YarnException yre) {
158
159
160
161 throw new RuntimeException("submitApplication(appContext) FAILED.", yre);
162 }
163 LOG.info("GiraphApplicationMaster container request was submitted to " +
164 "ResourceManager for job: " + jobName);
165 return awaitGiraphJobCompletion();
166 }
167
168
169
170
171 private void verifyOutputDirDoesNotExist() {
172 Path outDir = null;
173 try {
174 FileSystem fs = FileSystem.get(giraphConf);
175 String errorMsg = "__ERROR_NO_OUTPUT_DIR_SET__";
176 outDir =
177 new Path(fs.getHomeDirectory(), giraphConf.get(OUTDIR, errorMsg));
178 FileStatus outStatus = fs.getFileStatus(outDir);
179 if (outStatus.isDirectory() || outStatus.isFile() ||
180 outStatus.isSymlink()) {
181 throw new IllegalStateException("Path " + outDir + " already exists.");
182 }
183 } catch (IOException ioe) {
184 LOG.info("Final output path is: " + outDir);
185 }
186 }
187
188
189
190
191
192
193 private void applyConfigsForYarnGiraphJob() {
194 GiraphConstants.IS_PURE_YARN_JOB.set(giraphConf, true);
195 GiraphConstants.SPLIT_MASTER_WORKER.set(giraphConf, true);
196 giraphConf.set("mapred.job.id", "giraph_yarn_" + appId);
197 }
198
199
200
201
202
203
204 private void checkPerNodeResourcesAvailable(
205 final GetNewApplicationResponse cluster) throws YarnException, IOException {
206
207 List<NodeReport> nodes = null;
208 long totalAvailable = 0;
209 try {
210 nodes = yarnClient.getNodeReports(NodeState.RUNNING);
211 } catch (YarnException yre) {
212 throw new RuntimeException("GiraphYarnClient could not connect with " +
213 "the YARN ResourceManager to determine the number of available " +
214 "application containers.", yre);
215 }
216 for (NodeReport node : nodes) {
217 LOG.info("Got node report from ASM for" +
218 ", nodeId=" + node.getNodeId() +
219 ", nodeAddress " + node.getHttpAddress() +
220 ", nodeRackName " + node.getRackName() +
221 ", nodeNumContainers " + node.getNumContainers());
222 totalAvailable += node.getCapability().getMemory();
223 }
224
225 final int workers = giraphConf.getMaxWorkers() + 1;
226 checkAndAdjustPerTaskHeapSize(cluster);
227 final long totalAsk =
228 giraphConf.getYarnTaskHeapMb() * workers;
229 if (totalAsk > totalAvailable) {
230 throw new IllegalStateException("Giraph's estimated cluster heap " +
231 totalAsk + "MB ask is greater than the current available cluster " +
232 "heap of " + totalAvailable + "MB. Aborting Job.");
233 }
234 }
235
236
237
238
239
240
241
242 private void checkAndAdjustPerTaskHeapSize(
243 final GetNewApplicationResponse gnar) {
244
245
246
247 final int maxCapacity = gnar.getMaximumResourceCapability().getMemory();
248
249 int giraphMem = giraphConf.getYarnTaskHeapMb();
250 if (giraphMem == GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB_DEFAULT) {
251 LOG.info("Defaulting per-task heap size to " + giraphMem + "MB.");
252 }
253 if (giraphMem > maxCapacity) {
254 LOG.info("Giraph's request of heap MB per-task is more than the " +
255 "maximum; downgrading Giraph to" + maxCapacity + "MB.");
256 giraphMem = maxCapacity;
257 }
258
259
260
261
262
263 giraphConf.setYarnTaskHeapMb(giraphMem);
264 }
265
266
267
268
269
270
271 private boolean awaitGiraphJobCompletion() throws YarnException, IOException {
272 boolean done;
273 ApplicationReport report = null;
274 try {
275 do {
276 try {
277 Thread.sleep(JOB_STATUS_INTERVAL_MSECS);
278 } catch (InterruptedException ir) {
279 LOG.info("Progress reporter's sleep was interrupted!", ir);
280 }
281 report = yarnClient.getApplicationReport(appId);
282 done = checkProgress(report);
283 } while (!done);
284 if (!giraphConf.metricsEnabled()) {
285 cleanupJarCache();
286 }
287 } catch (IOException ex) {
288 final String diagnostics = (null == report) ? "" :
289 "Diagnostics: " + report.getDiagnostics();
290 LOG.error("Fatal fault encountered, failing " + jobName + ". " +
291 diagnostics, ex);
292 try {
293 LOG.error("FORCIBLY KILLING Application from AppMaster.");
294 yarnClient.killApplication(appId);
295 } catch (YarnException yre) {
296 LOG.error("Exception raised in attempt to kill application.", yre);
297 }
298 return false;
299 }
300 return printFinalJobReport();
301 }
302
303
304
305
306
307
308 private void cleanupJarCache() throws IOException {
309 FileSystem fs = FileSystem.get(giraphConf);
310 Path baseCacheDir = YarnUtils.getFsCachePath(fs, appId);
311 if (fs.exists(baseCacheDir)) {
312 LOG.info("Cleaning up HDFS distributed cache directory for Giraph job.");
313 fs.delete(baseCacheDir, true);
314 fs.delete(baseCacheDir, false);
315 }
316 }
317
318
319
320
321
322 private boolean printFinalJobReport() throws YarnException, IOException {
323 ApplicationReport report;
324 try {
325 report = yarnClient.getApplicationReport(appId);
326 FinalApplicationStatus finalAppStatus =
327 report.getFinalApplicationStatus();
328 final long secs =
329 (report.getFinishTime() - report.getStartTime()) / 1000L;
330 final String time = String.format("%d minutes, %d seconds.",
331 secs / 60L, secs % 60L);
332 LOG.info("Completed " + jobName + ": " +
333 finalAppStatus.name() + ", total running time: " + time);
334 } catch (YarnException yre) {
335 LOG.error("Exception encountered while attempting to request " +
336 "a final job report for " + jobName , yre);
337 return false;
338 }
339 return true;
340 }
341
342
343
344
345
346 private ContainerLaunchContext buildContainerLaunchContext()
347 throws IOException {
348 ContainerLaunchContext appMasterContainer =
349 Records.newRecord(ContainerLaunchContext.class);
350 appMasterContainer.setEnvironment(buildEnvironment());
351 appMasterContainer.setLocalResources(buildLocalResourceMap());
352 appMasterContainer.setCommands(buildAppMasterExecCommand());
353
354
355 setToken(appMasterContainer);
356 return appMasterContainer;
357 }
358
359
360
361
362
363
364 private void setToken(ContainerLaunchContext amContainer) throws IOException {
365
366 if (UserGroupInformation.isSecurityEnabled()) {
367 Credentials credentials = new Credentials();
368 String tokenRenewer = giraphConf.get(YarnConfiguration.RM_PRINCIPAL);
369 if (tokenRenewer == null || tokenRenewer.length() == 0) {
370 throw new IOException(
371 "Can't get Master Kerberos principal for the RM to use as renewer");
372 }
373 FileSystem fs = FileSystem.get(giraphConf);
374
375 final Token<?> [] tokens =
376 fs.addDelegationTokens(tokenRenewer, credentials);
377 if (tokens != null) {
378 for (Token<?> token : tokens) {
379 LOG.info("Got dt for " + fs.getUri() + "; " + token);
380 }
381 }
382 DataOutputBuffer dob = new DataOutputBuffer();
383 credentials.writeTokenStorageToStream(dob);
384 ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
385 amContainer.setTokens(fsTokens);
386 }
387 }
388
389
390
391
392
393
394
395 private boolean checkProgress(final ApplicationReport report) {
396 YarnApplicationState jobState = report.getYarnApplicationState();
397 if (jobState == YarnApplicationState.FINISHED ||
398 jobState == YarnApplicationState.KILLED) {
399 return true;
400 } else if (jobState == YarnApplicationState.FAILED) {
401 LOG.error(jobName + " reports FAILED state, diagnostics show: " +
402 report.getDiagnostics());
403 return true;
404 } else {
405 if (reportCounter++ % 5 == 0) {
406 displayJobReport(report);
407 }
408 }
409 return false;
410 }
411
412
413
414
415
416 private void displayJobReport(final ApplicationReport report) {
417 if (null == report) {
418 throw new IllegalStateException("[*] Latest ApplicationReport for job " +
419 jobName + " was not received by the local client.");
420 }
421 final float elapsed =
422 (System.currentTimeMillis() - report.getStartTime()) / 1000.0f;
423 LOG.info(jobName + ", Elapsed: " + String.format("%.2f secs", elapsed));
424 LOG.info(report.getCurrentApplicationAttemptId() + ", State: " +
425 report.getYarnApplicationState().name() + ", Containers used: " +
426 report.getApplicationResourceUsageReport().getNumUsedContainers());
427 }
428
429
430
431
432
433
434 private List<String> buildAppMasterExecCommand() {
435
436 return ImmutableList.of("${JAVA_HOME}/bin/java " +
437 "-Xmx" + YARN_APP_MASTER_MEMORY_MB + "M " +
438 "-Xms" + YARN_APP_MASTER_MEMORY_MB + "M " +
439
440 "-cp .:${CLASSPATH} org.apache.giraph.yarn.GiraphApplicationMaster " +
441 "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/gam-stdout.log " +
442 "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/gam-stderr.log "
443 );
444 }
445
446
447
448
449
450
451 private void addLocalJarsToResourceMap(Map<String, LocalResource> map)
452 throws IOException {
453 Set<String> jars = Sets.newHashSet();
454 LOG.info("LIB JARS :" + giraphConf.getYarnLibJars());
455 String[] libJars = giraphConf.getYarnLibJars().split(",");
456 for (String libJar : libJars) {
457 jars.add(libJar);
458 }
459 FileSystem fs = FileSystem.get(giraphConf);
460 Path baseDir = YarnUtils.getFsCachePath(fs, appId);
461 for (Path jar : YarnUtils.getLocalFiles(jars)) {
462 Path dest = new Path(baseDir, jar.getName());
463 LOG.info("Made local resource for :" + jar + " to " + dest);
464 fs.copyFromLocalFile(false, true, jar, dest);
465 YarnUtils.addFileToResourceMap(map, fs, dest);
466 }
467 }
468
469
470
471
472
473 private Resource buildContainerMemory() {
474 Resource capability = Records.newRecord(Resource.class);
475 capability.setMemory(YARN_APP_MASTER_MEMORY_MB);
476 return capability;
477 }
478
479
480
481
482
483
484 private Map<String, String> buildEnvironment() {
485 Map<String, String> environment =
486 Maps.<String, String>newHashMap();
487 LOG.info("Set the environment for the application master");
488 YarnUtils.addLocalClasspathToEnv(environment, giraphConf);
489
490 LOG.info("Environment for AM :" + environment);
491 return environment;
492 }
493
494
495
496
497
498
499
500 private Map<String, LocalResource> buildLocalResourceMap() {
501
502
503
504
505 Map<String, LocalResource> localResources =
506 Maps.<String, LocalResource>newHashMap();
507 LOG.info("buildLocalResourceMap ....");
508 try {
509
510
511 YarnUtils.exportGiraphConfiguration(giraphConf, appId);
512 YarnUtils.addGiraphConfToLocalResourceMap(
513 giraphConf, appId, localResources);
514
515 addLocalJarsToResourceMap(localResources);
516
517 return localResources;
518 } catch (IOException ioe) {
519 throw new IllegalStateException("Failed to build LocalResouce map.", ioe);
520 }
521 }
522
523 }