Coverage Report - org.apache.giraph.scripting.ScriptLoader
 
Classes in this File Line Coverage Branch Coverage Complexity
ScriptLoader
0%
0/64
0%
0/19
2.417
ScriptLoader$1
0%
0/1
N/A
2.417
ScriptLoader$2
0%
0/2
N/A
2.417
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.scripting;
 19  
 
 20  
 import org.apache.giraph.conf.JsonStringConfOption;
 21  
 import org.apache.giraph.graph.Language;
 22  
 import org.apache.giraph.jython.JythonUtils;
 23  
 import org.apache.hadoop.conf.Configuration;
 24  
 import org.apache.hadoop.fs.Path;
 25  
 import org.apache.log4j.Logger;
 26  
 import org.codehaus.jackson.type.TypeReference;
 27  
 
 28  
 import com.google.common.base.Optional;
 29  
 import com.google.common.collect.Lists;
 30  
 import com.google.common.io.Closeables;
 31  
 
 32  
 import java.io.BufferedInputStream;
 33  
 import java.io.FileInputStream;
 34  
 import java.io.IOException;
 35  
 import java.io.InputStream;
 36  
 import java.util.List;
 37  
 
 38  
 import static org.apache.giraph.utils.DistributedCacheUtils.getLocalCacheFile;
 39  
 
 40  
 /**
 41  
  * Loads scripts written by user in other languages, for example Jython.
 42  
  */
 43  
 public class ScriptLoader {
 44  
   /** Option for scripts to load on workers */
 45  0
   public static final JsonStringConfOption SCRIPTS_TO_LOAD =
 46  
       new JsonStringConfOption("giraph.scripts.to.load",
 47  
           "Scripts to load on workers");
 48  
 
 49  
   /** Scripts that were loaded */
 50  
   private static final List<DeployedScript> LOADED_SCRIPTS =
 51  0
       Lists.newArrayList();
 52  
 
 53  
   /** Logger */
 54  0
   private static final Logger LOG = Logger.getLogger(ScriptLoader.class);
 55  
 
 56  
   /** Don't construct */
 57  0
   private ScriptLoader() { }
 58  
 
 59  
   /**
 60  
    * Deploy a script
 61  
    *
 62  
    * @param conf {@link Configuration}
 63  
    * @param scriptPath Path to script
 64  
    * @param deployType type of deployment
 65  
    * @param language programming language
 66  
    */
 67  
   public static void setScriptsToLoad(Configuration conf,
 68  
       String scriptPath, DeployType deployType, Language language) {
 69  0
     DeployedScript deployedScript = new DeployedScript(scriptPath,
 70  
         deployType, language);
 71  0
     setScriptsToLoad(conf, deployedScript);
 72  0
   }
 73  
 
 74  
   /**
 75  
    * Deploy pair of scripts
 76  
    *
 77  
    * @param conf {@link Configuration}
 78  
    * @param script1 Path to script
 79  
    * @param deployType1 type of deployment
 80  
    * @param language1 programming language
 81  
    * @param script2 Path to script
 82  
    * @param deployType2 type of deployment
 83  
    * @param language2 programming language
 84  
    */
 85  
   public static void setScriptsToLoad(Configuration conf,
 86  
       String script1, DeployType deployType1, Language language1,
 87  
       String script2, DeployType deployType2, Language language2) {
 88  0
     DeployedScript deployedScript1 = new DeployedScript(script1,
 89  
         deployType1, language1);
 90  0
     DeployedScript deployedScript2 = new DeployedScript(script2,
 91  
         deployType2, language2);
 92  0
     setScriptsToLoad(conf, deployedScript1, deployedScript2);
 93  0
   }
 94  
 
 95  
   /**
 96  
    * Deploy scripts
 97  
    *
 98  
    * @param conf Configuration
 99  
    * @param scripts the scripts to deploy
 100  
    */
 101  
   public static void setScriptsToLoad(Configuration conf,
 102  
       DeployedScript... scripts) {
 103  0
     List<DeployedScript> scriptsToLoad = Lists.newArrayList(scripts);
 104  0
     SCRIPTS_TO_LOAD.set(conf, scriptsToLoad);
 105  0
   }
 106  
 
 107  
   /**
 108  
    * Add a script to load on workers
 109  
    *
 110  
    * @param conf {@link Configuration}
 111  
    * @param script  Path to script
 112  
    * @param deployType type of deployment
 113  
    * @param language programming language
 114  
    */
 115  
   public static void addScriptToLoad(Configuration conf,
 116  
       String script, DeployType deployType, Language language) {
 117  0
     addScriptToLoad(conf, new DeployedScript(script, deployType, language));
 118  0
   }
 119  
 
 120  
   /**
 121  
    * Add a script to load on workers
 122  
    *
 123  
    * @param conf {@link Configuration}
 124  
    * @param script the script to load
 125  
    */
 126  
   public static void addScriptToLoad(Configuration conf,
 127  
       DeployedScript script) {
 128  0
     List<DeployedScript> scriptsToLoad = getScriptsToLoad(conf);
 129  0
     if (scriptsToLoad == null) {
 130  0
       scriptsToLoad = Lists.<DeployedScript>newArrayList();
 131  
     }
 132  0
     scriptsToLoad.add(script);
 133  0
     SCRIPTS_TO_LOAD.set(conf, scriptsToLoad);
 134  0
   }
 135  
 
 136  
   /**
 137  
    * Get the list of scripts to load on workers
 138  
    *
 139  
    * @param conf {@link Configuration}
 140  
    * @return list of {@link DeployedScript}s
 141  
    */
 142  
   public static List<DeployedScript> getScriptsToLoad(Configuration conf) {
 143  0
     TypeReference<List<DeployedScript>> jsonType =
 144  0
         new TypeReference<List<DeployedScript>>() { };
 145  0
     return SCRIPTS_TO_LOAD.get(conf, jsonType);
 146  
   }
 147  
 
 148  
   /**
 149  
    * Load all the scripts deployed in Configuration
 150  
    *
 151  
    * @param conf Configuration
 152  
    * @throws IOException
 153  
    */
 154  
   public static void loadScripts(Configuration conf) throws IOException {
 155  0
     List<DeployedScript> deployedScripts = getScriptsToLoad(conf);
 156  0
     if (deployedScripts == null) {
 157  0
       return;
 158  
     }
 159  0
     for (DeployedScript deployedScript : deployedScripts) {
 160  0
       loadScript(conf, deployedScript);
 161  0
     }
 162  0
   }
 163  
 
 164  
   /**
 165  
    * Load a single deployed script
 166  
    *
 167  
    * @param conf Configuration
 168  
    * @param deployedScript the deployed script
 169  
    * @throws IOException
 170  
    */
 171  
   public static void loadScript(Configuration conf,
 172  
       DeployedScript deployedScript) throws IOException {
 173  0
     InputStream stream = openScriptInputStream(conf, deployedScript);
 174  0
     switch (deployedScript.getLanguage()) {
 175  
     case JYTHON:
 176  0
       loadJythonScript(stream);
 177  0
       break;
 178  
     default:
 179  0
       LOG.fatal("Don't know how to load script " + deployedScript);
 180  0
       throw new IllegalStateException("Don't know how to load script " +
 181  
           deployedScript);
 182  
     }
 183  
 
 184  0
     LOADED_SCRIPTS.add(deployedScript);
 185  0
     Closeables.close(stream, true);
 186  0
   }
 187  
 
 188  
   /**
 189  
    * Load a Jython deployed script
 190  
    *
 191  
    * @param stream InputStream with Jython code to load
 192  
    */
 193  
   private static void loadJythonScript(InputStream stream) {
 194  0
     JythonUtils.getInterpreter().execfile(stream);
 195  0
   }
 196  
 
 197  
   /**
 198  
    * Get list of scripts already loaded.
 199  
    *
 200  
    * @return list of loaded scripts
 201  
    */
 202  
   public static List<DeployedScript> getLoadedScripts() {
 203  0
     return LOADED_SCRIPTS;
 204  
   }
 205  
 
 206  
   /**
 207  
    * Get an {@link java.io.InputStream} for the deployed script.
 208  
    *
 209  
    * @param conf Configuration
 210  
    * @param deployedScript the deployed script
 211  
    * @return {@link java.io.InputStream} for reading script
 212  
    */
 213  
   private static InputStream openScriptInputStream(Configuration conf,
 214  
       DeployedScript deployedScript) {
 215  0
     DeployType deployType = deployedScript.getDeployType();
 216  0
     String path = deployedScript.getPath();
 217  
 
 218  
     InputStream stream;
 219  0
     switch (deployType) {
 220  
     case RESOURCE:
 221  0
       if (LOG.isInfoEnabled()) {
 222  0
         LOG.info("getScriptStream: Reading script from resource at " +
 223  0
             deployedScript.getPath());
 224  
       }
 225  0
       stream = ScriptLoader.class.getClassLoader().getResourceAsStream(path);
 226  0
       if (stream == null) {
 227  0
         throw new IllegalStateException("getScriptStream: Failed to " +
 228  
             "open script from resource at " + path);
 229  
       }
 230  
       break;
 231  
     case DISTRIBUTED_CACHE:
 232  0
       if (LOG.isInfoEnabled()) {
 233  0
         LOG.info("getScriptStream: Reading script from DistributedCache at " +
 234  
             path);
 235  
       }
 236  0
       Optional<Path> localPath = getLocalCacheFile(conf, path);
 237  0
       if (!localPath.isPresent()) {
 238  0
         throw new IllegalStateException("getScriptStream: Failed to " +
 239  
             "find script in local DistributedCache matching " + path);
 240  
       }
 241  0
       String pathStr = localPath.get().toString();
 242  
       try {
 243  0
         stream = new BufferedInputStream(new FileInputStream(pathStr));
 244  0
       } catch (IOException e) {
 245  0
         throw new IllegalStateException("getScriptStream: Failed open " +
 246  
             "script from DistributedCache at " + localPath);
 247  0
       }
 248  
       break;
 249  
     default:
 250  0
       throw new IllegalArgumentException("getScriptStream: Unknown " +
 251  
           "script deployment type: " + deployType);
 252  
     }
 253  0
     return stream;
 254  
   }
 255  
 }