1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
package org.apache.giraph.scripting; |
19 | |
|
20 | |
import org.apache.giraph.conf.JsonStringConfOption; |
21 | |
import org.apache.giraph.graph.Language; |
22 | |
import org.apache.giraph.jython.JythonUtils; |
23 | |
import org.apache.hadoop.conf.Configuration; |
24 | |
import org.apache.hadoop.fs.Path; |
25 | |
import org.apache.log4j.Logger; |
26 | |
import org.codehaus.jackson.type.TypeReference; |
27 | |
|
28 | |
import com.google.common.base.Optional; |
29 | |
import com.google.common.collect.Lists; |
30 | |
import com.google.common.io.Closeables; |
31 | |
|
32 | |
import java.io.BufferedInputStream; |
33 | |
import java.io.FileInputStream; |
34 | |
import java.io.IOException; |
35 | |
import java.io.InputStream; |
36 | |
import java.util.List; |
37 | |
|
38 | |
import static org.apache.giraph.utils.DistributedCacheUtils.getLocalCacheFile; |
39 | |
|
40 | |
|
41 | |
|
42 | |
|
43 | |
public class ScriptLoader { |
44 | |
|
45 | 0 | public static final JsonStringConfOption SCRIPTS_TO_LOAD = |
46 | |
new JsonStringConfOption("giraph.scripts.to.load", |
47 | |
"Scripts to load on workers"); |
48 | |
|
49 | |
|
50 | |
private static final List<DeployedScript> LOADED_SCRIPTS = |
51 | 0 | Lists.newArrayList(); |
52 | |
|
53 | |
|
54 | 0 | private static final Logger LOG = Logger.getLogger(ScriptLoader.class); |
55 | |
|
56 | |
|
57 | 0 | private ScriptLoader() { } |
58 | |
|
59 | |
|
60 | |
|
61 | |
|
62 | |
|
63 | |
|
64 | |
|
65 | |
|
66 | |
|
67 | |
public static void setScriptsToLoad(Configuration conf, |
68 | |
String scriptPath, DeployType deployType, Language language) { |
69 | 0 | DeployedScript deployedScript = new DeployedScript(scriptPath, |
70 | |
deployType, language); |
71 | 0 | setScriptsToLoad(conf, deployedScript); |
72 | 0 | } |
73 | |
|
74 | |
|
75 | |
|
76 | |
|
77 | |
|
78 | |
|
79 | |
|
80 | |
|
81 | |
|
82 | |
|
83 | |
|
84 | |
|
85 | |
public static void setScriptsToLoad(Configuration conf, |
86 | |
String script1, DeployType deployType1, Language language1, |
87 | |
String script2, DeployType deployType2, Language language2) { |
88 | 0 | DeployedScript deployedScript1 = new DeployedScript(script1, |
89 | |
deployType1, language1); |
90 | 0 | DeployedScript deployedScript2 = new DeployedScript(script2, |
91 | |
deployType2, language2); |
92 | 0 | setScriptsToLoad(conf, deployedScript1, deployedScript2); |
93 | 0 | } |
94 | |
|
95 | |
|
96 | |
|
97 | |
|
98 | |
|
99 | |
|
100 | |
|
101 | |
public static void setScriptsToLoad(Configuration conf, |
102 | |
DeployedScript... scripts) { |
103 | 0 | List<DeployedScript> scriptsToLoad = Lists.newArrayList(scripts); |
104 | 0 | SCRIPTS_TO_LOAD.set(conf, scriptsToLoad); |
105 | 0 | } |
106 | |
|
107 | |
|
108 | |
|
109 | |
|
110 | |
|
111 | |
|
112 | |
|
113 | |
|
114 | |
|
115 | |
public static void addScriptToLoad(Configuration conf, |
116 | |
String script, DeployType deployType, Language language) { |
117 | 0 | addScriptToLoad(conf, new DeployedScript(script, deployType, language)); |
118 | 0 | } |
119 | |
|
120 | |
|
121 | |
|
122 | |
|
123 | |
|
124 | |
|
125 | |
|
126 | |
public static void addScriptToLoad(Configuration conf, |
127 | |
DeployedScript script) { |
128 | 0 | List<DeployedScript> scriptsToLoad = getScriptsToLoad(conf); |
129 | 0 | if (scriptsToLoad == null) { |
130 | 0 | scriptsToLoad = Lists.<DeployedScript>newArrayList(); |
131 | |
} |
132 | 0 | scriptsToLoad.add(script); |
133 | 0 | SCRIPTS_TO_LOAD.set(conf, scriptsToLoad); |
134 | 0 | } |
135 | |
|
136 | |
|
137 | |
|
138 | |
|
139 | |
|
140 | |
|
141 | |
|
142 | |
public static List<DeployedScript> getScriptsToLoad(Configuration conf) { |
143 | 0 | TypeReference<List<DeployedScript>> jsonType = |
144 | 0 | new TypeReference<List<DeployedScript>>() { }; |
145 | 0 | return SCRIPTS_TO_LOAD.get(conf, jsonType); |
146 | |
} |
147 | |
|
148 | |
|
149 | |
|
150 | |
|
151 | |
|
152 | |
|
153 | |
|
154 | |
public static void loadScripts(Configuration conf) throws IOException { |
155 | 0 | List<DeployedScript> deployedScripts = getScriptsToLoad(conf); |
156 | 0 | if (deployedScripts == null) { |
157 | 0 | return; |
158 | |
} |
159 | 0 | for (DeployedScript deployedScript : deployedScripts) { |
160 | 0 | loadScript(conf, deployedScript); |
161 | 0 | } |
162 | 0 | } |
163 | |
|
164 | |
|
165 | |
|
166 | |
|
167 | |
|
168 | |
|
169 | |
|
170 | |
|
171 | |
public static void loadScript(Configuration conf, |
172 | |
DeployedScript deployedScript) throws IOException { |
173 | 0 | InputStream stream = openScriptInputStream(conf, deployedScript); |
174 | 0 | switch (deployedScript.getLanguage()) { |
175 | |
case JYTHON: |
176 | 0 | loadJythonScript(stream); |
177 | 0 | break; |
178 | |
default: |
179 | 0 | LOG.fatal("Don't know how to load script " + deployedScript); |
180 | 0 | throw new IllegalStateException("Don't know how to load script " + |
181 | |
deployedScript); |
182 | |
} |
183 | |
|
184 | 0 | LOADED_SCRIPTS.add(deployedScript); |
185 | 0 | Closeables.close(stream, true); |
186 | 0 | } |
187 | |
|
188 | |
|
189 | |
|
190 | |
|
191 | |
|
192 | |
|
193 | |
private static void loadJythonScript(InputStream stream) { |
194 | 0 | JythonUtils.getInterpreter().execfile(stream); |
195 | 0 | } |
196 | |
|
197 | |
|
198 | |
|
199 | |
|
200 | |
|
201 | |
|
202 | |
public static List<DeployedScript> getLoadedScripts() { |
203 | 0 | return LOADED_SCRIPTS; |
204 | |
} |
205 | |
|
206 | |
|
207 | |
|
208 | |
|
209 | |
|
210 | |
|
211 | |
|
212 | |
|
213 | |
private static InputStream openScriptInputStream(Configuration conf, |
214 | |
DeployedScript deployedScript) { |
215 | 0 | DeployType deployType = deployedScript.getDeployType(); |
216 | 0 | String path = deployedScript.getPath(); |
217 | |
|
218 | |
InputStream stream; |
219 | 0 | switch (deployType) { |
220 | |
case RESOURCE: |
221 | 0 | if (LOG.isInfoEnabled()) { |
222 | 0 | LOG.info("getScriptStream: Reading script from resource at " + |
223 | 0 | deployedScript.getPath()); |
224 | |
} |
225 | 0 | stream = ScriptLoader.class.getClassLoader().getResourceAsStream(path); |
226 | 0 | if (stream == null) { |
227 | 0 | throw new IllegalStateException("getScriptStream: Failed to " + |
228 | |
"open script from resource at " + path); |
229 | |
} |
230 | |
break; |
231 | |
case DISTRIBUTED_CACHE: |
232 | 0 | if (LOG.isInfoEnabled()) { |
233 | 0 | LOG.info("getScriptStream: Reading script from DistributedCache at " + |
234 | |
path); |
235 | |
} |
236 | 0 | Optional<Path> localPath = getLocalCacheFile(conf, path); |
237 | 0 | if (!localPath.isPresent()) { |
238 | 0 | throw new IllegalStateException("getScriptStream: Failed to " + |
239 | |
"find script in local DistributedCache matching " + path); |
240 | |
} |
241 | 0 | String pathStr = localPath.get().toString(); |
242 | |
try { |
243 | 0 | stream = new BufferedInputStream(new FileInputStream(pathStr)); |
244 | 0 | } catch (IOException e) { |
245 | 0 | throw new IllegalStateException("getScriptStream: Failed open " + |
246 | |
"script from DistributedCache at " + localPath); |
247 | 0 | } |
248 | |
break; |
249 | |
default: |
250 | 0 | throw new IllegalArgumentException("getScriptStream: Unknown " + |
251 | |
"script deployment type: " + deployType); |
252 | |
} |
253 | 0 | return stream; |
254 | |
} |
255 | |
} |