1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.utils;
19
20 import org.apache.commons.io.FilenameUtils;
21 import org.apache.hadoop.conf.Configuration;
22 import org.apache.hadoop.filecache.DistributedCache;
23 import org.apache.hadoop.fs.FileSystem;
24 import org.apache.hadoop.fs.Path;
25 import org.apache.log4j.Logger;
26
27 import com.google.common.base.Optional;
28
29 import java.io.IOException;
30
31 import static org.apache.commons.io.FilenameUtils.getBaseName;
32
33
34
35
36 public class DistributedCacheUtils {
37
38 private static final Logger LOG = Logger.getLogger(
39 DistributedCacheUtils.class);
40
41
42 private DistributedCacheUtils() { }
43
44
45
46
47
48
49
50
51 public static Optional<Path>
52 getLocalCacheFile(Configuration conf, String pathToMatch) {
53 String nameToPath = FilenameUtils.getName(pathToMatch);
54 Path[] paths;
55 try {
56 paths = DistributedCache.getLocalCacheFiles(conf);
57 } catch (IOException e) {
58 return Optional.absent();
59 }
60 for (Path path : paths) {
61 if (FilenameUtils.getName(path.toString()).equals(nameToPath)) {
62 return Optional.of(path);
63 }
64 }
65 return Optional.absent();
66 }
67
68
69
70
71
72
73
74
75
76 public static Path copyToHdfs(Path path, Configuration conf) {
77 if (path.toString().startsWith("hdfs://")) {
78
79 return path;
80 }
81
82 FileSystem fs = null;
83 try {
84 fs = FileSystem.get(conf);
85 } catch (IOException e) {
86 throw new IllegalArgumentException("Failed to get HDFS FileSystem", e);
87 }
88 String name = getBaseName(path.toString()) + "-" + System.nanoTime();
89 Path remotePath = new Path("/tmp/giraph", name);
90 LOG.info("copyToHdfsIfNecessary: Copying " + path + " to " +
91 remotePath + " on hdfs " + fs.getUri());
92 try {
93 fs.copyFromLocalFile(false, true, path, remotePath);
94 } catch (IOException e) {
95 throw new IllegalArgumentException(
96 "Failed to copy jython script from local path " + path +
97 " to hdfs path " + remotePath + " on hdfs " + fs.getUri(), e);
98 }
99 return remotePath;
100 }
101
102
103
104
105
106
107
108
109 public static Path copyAndAdd(Path path, Configuration conf) {
110 Path remotePath = copyToHdfs(path, conf);
111 DistributedCache.addCacheFile(remotePath.toUri(), conf);
112 return remotePath;
113 }
114 }