1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.yarn;
19
20 import com.google.common.collect.Sets;
21 import java.io.FileOutputStream;
22 import java.util.Set;
23 import org.apache.giraph.conf.GiraphConfiguration;
24 import org.apache.giraph.conf.GiraphConstants;
25 import org.apache.hadoop.fs.FileStatus;
26 import org.apache.hadoop.fs.FileSystem;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.mapreduce.MRJobConfig;
29 import org.apache.hadoop.yarn.api.records.ApplicationId;
30 import org.apache.hadoop.yarn.api.records.LocalResource;
31 import org.apache.hadoop.yarn.api.records.LocalResourceType;
32 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
33 import org.apache.hadoop.yarn.conf.YarnConfiguration;
34 import org.apache.hadoop.yarn.util.ConverterUtils;
35 import org.apache.hadoop.yarn.util.Records;
36 import org.apache.hadoop.util.StringUtils;
37 import java.io.File;
38 import java.io.IOException;
39 import java.util.Map;
40 import org.apache.log4j.Logger;
41
42
43
44
45
46 public class YarnUtils {
47
48 private static final Logger LOG = Logger.getLogger(YarnUtils.class);
49
50 private static final String HDFS_RESOURCE_DIR = "giraph_yarn_jar_cache";
51
52
53 private YarnUtils() {
54
55
56
57
58
59
60
61
62
63
64 public static void addFsResourcesToMap(Map<String, LocalResource> map,
65 GiraphConfiguration giraphConf, ApplicationId appId) throws IOException {
66 FileSystem fs = FileSystem.get(giraphConf);
67 Path baseDir = YarnUtils.getFsCachePath(fs, appId);
68 boolean coreJarFound = false;
69 for (String fileName : giraphConf.getYarnLibJars().split(",")) {
70 if (fileName.length() > 0) {
71 Path filePath = new Path(baseDir, fileName);
72 LOG.info("Adding " + fileName + " to LocalResources for export.to " +
73 filePath);
74 if (fileName.contains("giraph-core")) {
75 coreJarFound = true;
76 }
77 addFileToResourceMap(map, fs, filePath);
78 }
79 }
80 if (!coreJarFound) {
81 LOG.warn("Job jars (-yj option) didn't include giraph-core.");
82 }
83 Path confPath = new Path(baseDir, GiraphConstants.GIRAPH_YARN_CONF_FILE);
84 addFileToResourceMap(map, fs, confPath);
85 }
86
87
88
89
90
91
92
93
94 public static Set<Path> getLocalFiles(final Set<String> fileNames) {
95 Set<Path> jarPaths = Sets.newHashSet();
96 String classPath = ".:" + System.getenv("HADOOP_HOME");
97 if (classPath.length() > 2) {
98 classPath += ":";
99 }
100 classPath += System.getenv("CLASSPATH");
101 for (String baseDir : classPath.split(":")) {
102 LOG.info("Class path name " + baseDir);
103 if (baseDir.length() > 0) {
104
105 final int lastFileSep = baseDir.lastIndexOf("/");
106 if (lastFileSep > 0) {
107 String test = baseDir.substring(lastFileSep);
108 if (test.contains("*")) {
109 baseDir = baseDir.substring(0, lastFileSep);
110 }
111 }
112 LOG.info("base path checking " + baseDir);
113 populateJarList(new File(baseDir), jarPaths, fileNames);
114 }
115 if (jarPaths.size() >= fileNames.size()) {
116 break;
117 }
118 }
119 return jarPaths;
120 }
121
122
123
124
125
126
127
128 private static void populateJarList(final File dir,
129 final Set<Path> fileSet, final Set<String> fileNames) {
130 File[] filesInThisDir = dir.listFiles();
131 if (null == filesInThisDir) {
132 return;
133 }
134 for (File f : dir.listFiles()) {
135 if (f.isDirectory()) {
136 populateJarList(f, fileSet, fileNames);
137 } else if (f.isFile() && fileNames.contains(f.getName())) {
138 fileSet.add(new Path(f.getAbsolutePath()));
139 }
140 }
141 }
142
143
144
145
146
147
148
149 public static void addFileToResourceMap(Map<String, LocalResource>
150 localResources, FileSystem fs, Path target)
151 throws IOException {
152 LocalResource resource = Records.newRecord(LocalResource.class);
153 FileStatus destStatus = fs.getFileStatus(target);
154 resource.setResource(ConverterUtils.getYarnUrlFromURI(target.toUri()));
155 resource.setSize(destStatus.getLen());
156 resource.setTimestamp(destStatus.getModificationTime());
157 resource.setType(LocalResourceType.FILE);
158 resource.setVisibility(LocalResourceVisibility.APPLICATION);
159 localResources.put(target.getName(), resource);
160 LOG.info("Registered file in LocalResources :: " + target);
161 }
162
163
164
165
166
167
168
169 public static Path getFsCachePath(final FileSystem fs,
170 final ApplicationId appId) {
171 return new Path(fs.getHomeDirectory(), HDFS_RESOURCE_DIR + "/" + appId);
172 }
173
174
175
176
177
178
179
180
181 public static void addLocalClasspathToEnv(final Map<String, String> env,
182 final GiraphConfiguration giraphConf) {
183 StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
184 for (String cpEntry : giraphConf.getStrings(
185 YarnConfiguration.YARN_APPLICATION_CLASSPATH,
186 YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
187 classPathEnv.append(':').append(cpEntry.trim());
188 }
189 for (String cpEntry : giraphConf.getStrings(
190 MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
191 StringUtils.getStrings(
192 MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH))) {
193 classPathEnv.append(':').append(cpEntry.trim());
194 }
195
196 if (giraphConf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
197 classPathEnv.append(':').append(System.getenv("CLASSPATH"));
198 }
199 env.put("CLASSPATH", classPathEnv.toString());
200 }
201
202
203
204
205
206
207
208 public static void addGiraphConfToLocalResourceMap(GiraphConfiguration
209 giraphConf, ApplicationId appId, Map<String, LocalResource>
210 localResourceMap) throws IOException {
211 FileSystem fs = FileSystem.get(giraphConf);
212 Path hdfsConfPath = new Path(YarnUtils.getFsCachePath(fs, appId),
213 GiraphConstants.GIRAPH_YARN_CONF_FILE);
214 YarnUtils.addFileToResourceMap(localResourceMap, fs, hdfsConfPath);
215 }
216
217
218
219
220
221
222
223 public static void exportGiraphConfiguration(GiraphConfiguration giraphConf,
224 ApplicationId appId) throws IOException {
225 File confFile = new File(System.getProperty("java.io.tmpdir"),
226 GiraphConstants.GIRAPH_YARN_CONF_FILE);
227 if (confFile.exists()) {
228 if (!confFile.delete()) {
229 LOG.warn("Unable to delete file " + confFile);
230 }
231 }
232 String localConfPath = confFile.getAbsolutePath();
233 FileOutputStream fos = null;
234 try {
235 fos = new FileOutputStream(localConfPath);
236 giraphConf.writeXml(fos);
237 FileSystem fs = FileSystem.get(giraphConf);
238 Path hdfsConfPath = new Path(YarnUtils.getFsCachePath(fs, appId),
239 GiraphConstants.GIRAPH_YARN_CONF_FILE);
240 fos.flush();
241 fs.copyFromLocalFile(false, true, new Path(localConfPath), hdfsConfPath);
242 } finally {
243 if (null != fos) {
244 fos.close();
245 }
246 }
247 }
248 }