Coverage Report - org.apache.giraph.io.gora.GoraEdgeInputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
GoraEdgeInputFormat
0%
0/85
0%
0/4
1.286
GoraEdgeInputFormat$GoraEdgeReader
0%
0/26
0%
0/2
1.286
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.io.gora;
 19  
 
 20  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS;
 21  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY;
 22  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS;
 23  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS;
 24  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS;
 25  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY;
 26  
 
 27  
 import java.io.IOException;
 28  
 import java.util.List;
 29  
 
 30  
 import org.apache.giraph.edge.Edge;
 31  
 import org.apache.giraph.io.EdgeInputFormat;
 32  
 import org.apache.giraph.io.EdgeReader;
 33  
 import org.apache.giraph.io.gora.utils.ExtraGoraInputFormat;
 34  
 import org.apache.giraph.io.gora.utils.GoraUtils;
 35  
 import org.apache.giraph.io.gora.utils.KeyFactory;
 36  
 import org.apache.gora.persistency.Persistent;
 37  
 import org.apache.gora.query.Result;
 38  
 import org.apache.gora.query.impl.QueryBase;
 39  
 import org.apache.gora.store.DataStore;
 40  
 import org.apache.gora.util.GoraException;
 41  
 import org.apache.hadoop.conf.Configuration;
 42  
 import org.apache.hadoop.io.Writable;
 43  
 import org.apache.hadoop.io.WritableComparable;
 44  
 import org.apache.hadoop.mapreduce.InputSplit;
 45  
 import org.apache.hadoop.mapreduce.JobContext;
 46  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 47  
 import org.apache.log4j.Logger;
 48  
 
 49  
 /**
 50  
  *  Class which wraps the GoraInputFormat. It's designed
 51  
  *  as an extension point to EdgeInputFormat subclasses who wish
 52  
  *  to read from Gora data sources.
 53  
  *
 54  
  *  Works with
 55  
  *  {@link GoraVertexOutputFormat}
 56  
  *
 57  
  * @param <I> vertex id type
 58  
  * @param <E>  edge type
 59  
  */
 60  0
 public abstract class GoraEdgeInputFormat
 61  
   <I extends WritableComparable, E extends Writable>
 62  
   extends EdgeInputFormat<I, E> {
 63  
 
 64  
   /** Start key for querying Gora data store. */
 65  
   private static Object START_KEY;
 66  
 
 67  
   /** End key for querying Gora data store. */
 68  
   private static Object END_KEY;
 69  
 
 70  
   /** Logger for Gora's edge input format. */
 71  0
   private static final Logger LOG =
 72  0
           Logger.getLogger(GoraEdgeInputFormat.class);
 73  
 
 74  
   /** KeyClass used for getting data. */
 75  
   private static Class<?> KEY_CLASS;
 76  
 
 77  
   /** Persistent class used as the value type inside Gora. */
 78  
   private static Class<? extends Persistent> PERSISTENT_CLASS;
 79  
 
 80  
   /** Data store class to be used as backend. */
 81  
   private static Class<? extends DataStore> DATASTORE_CLASS;
 82  
 
 83  
   /** Class used to transform strings into keys. */
 84  
   private static Class<?> KEY_FACTORY_CLASS;
 85  
 
 86  
   /** Data store used for querying data. */
 87  
   private static DataStore DATA_STORE;
 88  
 
 89  
   /** Counter for input records. */
 90  0
   private static int RECORD_COUNTER = 0;
 91  
 
 92  
   /** Delegate Gora input format */
 93  0
   private static ExtraGoraInputFormat GORA_INPUT_FORMAT =
 94  
          new ExtraGoraInputFormat();
 95  
 
 96  
   /**
 97  
    * @param conf configuration parameters
 98  
    */
 99  
   public void checkInputSpecs(Configuration conf) {
 100  0
     String sDataStoreType =
 101  0
         GIRAPH_GORA_DATASTORE_CLASS.get(getConf());
 102  0
     String sKeyType =
 103  0
         GIRAPH_GORA_KEY_CLASS.get(getConf());
 104  0
     String sPersistentType =
 105  0
         GIRAPH_GORA_PERSISTENT_CLASS.get(getConf());
 106  0
     String sKeyFactoryClass =
 107  0
         GIRAPH_GORA_KEYS_FACTORY_CLASS.get(getConf());
 108  
     try {
 109  0
       Class<?> keyClass = Class.forName(sKeyType);
 110  0
       Class<?> persistentClass = Class.forName(sPersistentType);
 111  0
       Class<?> dataStoreClass = Class.forName(sDataStoreType);
 112  0
       Class<?> keyFactoryClass = Class.forName(sKeyFactoryClass);
 113  0
       setKeyClass(keyClass);
 114  0
       setPersistentClass((Class<? extends Persistent>) persistentClass);
 115  0
       setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
 116  0
       setKeyFactoryClass(keyFactoryClass);
 117  0
       setDataStore(createDataStore(getConf()));
 118  0
       GORA_INPUT_FORMAT.setDataStore(getDataStore());
 119  0
     } catch (ClassNotFoundException e) {
 120  0
       LOG.error("Error while reading Gora Input parameters");
 121  0
       e.printStackTrace();
 122  0
     }
 123  0
   }
 124  
 
 125  
   /**
 126  
    * Gets the splits for a data store.
 127  
    * @param context JobContext
 128  
    * @param minSplitCountHint Hint for a minimum split count
 129  
    * @return A list of splits
 130  
    */
 131  
   @Override
 132  
   public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
 133  
     throws IOException, InterruptedException {
 134  0
     KeyFactory kFact = null;
 135  
     try {
 136  0
       kFact = (KeyFactory) getKeyFactoryClass().newInstance();
 137  0
     } catch (InstantiationException e) {
 138  0
       LOG.error("Key factory was not instantiated. Please verify.");
 139  0
       LOG.error(e.getMessage());
 140  0
       e.printStackTrace();
 141  0
     } catch (IllegalAccessException e) {
 142  0
       LOG.error("Key factory was not instantiated. Please verify.");
 143  0
       LOG.error(e.getMessage());
 144  0
       e.printStackTrace();
 145  0
     }
 146  0
     String sKey = GIRAPH_GORA_START_KEY.get(getConf());
 147  0
     String eKey = GIRAPH_GORA_END_KEY.get(getConf());
 148  0
     if (sKey == null || sKey.isEmpty()) {
 149  0
       LOG.warn("No start key has been defined.");
 150  0
       LOG.warn("Querying all the data store.");
 151  0
       sKey = null;
 152  0
       eKey = null;
 153  
     }
 154  0
     kFact.setDataStore(getDataStore());
 155  0
     setStartKey(kFact.buildKey(sKey));
 156  0
     setEndKey(kFact.buildKey(eKey));
 157  0
     QueryBase tmpQuery = GoraUtils.getQuery(
 158  0
         getDataStore(), getStartKey(), getEndKey());
 159  0
     tmpQuery.setConf(context.getConfiguration());
 160  0
     GORA_INPUT_FORMAT.setQuery(tmpQuery);
 161  0
     List<InputSplit> splits = GORA_INPUT_FORMAT.getSplits(context);
 162  0
     return splits;
 163  
   }
 164  
 
 165  
   @Override
 166  
   public abstract GoraEdgeReader createEdgeReader(InputSplit split,
 167  
       TaskAttemptContext context) throws IOException;
 168  
 
 169  
   /**
 170  
    * Abstract class to be implemented by the user based on their specific
 171  
    * edge input. Easiest to ignore the key value separator and only use
 172  
    * key instead.
 173  
    */
 174  0
   protected abstract class GoraEdgeReader extends EdgeReader<I, E> {
 175  
     /** Current edge obtained from Gora. */
 176  
     private Edge<I, E> edge;
 177  
     /** Results gotten from Gora data store. */
 178  
     private Result readResults;
 179  
 
 180  
     @Override
 181  
     public void initialize(InputSplit inputSplit, TaskAttemptContext context)
 182  
       throws IOException, InterruptedException {
 183  0
       getResults();
 184  0
       RECORD_COUNTER = 0;
 185  0
     }
 186  
 
 187  
     /**
 188  
      * Gets the next edge from Gora data store.
 189  
      * @return true/false depending on the existence of edges.
 190  
      * @throws IOException exceptions passed along.
 191  
      * @throws InterruptedException exceptions passed along.
 192  
      */
 193  
     @Override
 194  
     // CHECKSTYLE: stop IllegalCatch
 195  
     public boolean nextEdge() throws IOException, InterruptedException {
 196  0
       boolean flg = false;
 197  
       try {
 198  0
         flg = this.getReadResults().next();
 199  0
         this.edge = transformEdge(this.getReadResults().get());
 200  0
         RECORD_COUNTER++;
 201  0
       } catch (Exception e) {
 202  0
         LOG.debug("Error transforming vertices.");
 203  0
         flg = false;
 204  0
       }
 205  0
       LOG.debug(RECORD_COUNTER + " were transformed.");
 206  0
       return flg;
 207  
     }
 208  
     // CHECKSTYLE: resume IllegalCatch
 209  
 
 210  
     /**
 211  
      * Gets the progress of reading results from Gora.
 212  
      * @return the progress of reading results from Gora.
 213  
      */
 214  
     @Override
 215  
     public float getProgress() throws IOException, InterruptedException {
 216  0
       float progress = 0.0f;
 217  0
       if (getReadResults() != null) {
 218  0
         progress = getReadResults().getProgress();
 219  
       }
 220  0
       return progress;
 221  
     }
 222  
 
 223  
     /**
 224  
      * Gets current edge.
 225  
      *
 226  
      * @return  The edge object represented by a Gora object
 227  
      */
 228  
     @Override
 229  
     public Edge<I, E> getCurrentEdge()
 230  
       throws IOException, InterruptedException {
 231  0
       return this.edge;
 232  
     }
 233  
 
 234  
     /**
 235  
      * Parser for a single Gora object
 236  
      *
 237  
      * @param   goraObject edge represented as a GoraObject
 238  
      * @return  The edge object represented by a Gora object
 239  
      */
 240  
     protected abstract Edge<I, E> transformEdge(Object goraObject);
 241  
 
 242  
     /**
 243  
      * Performs a range query to a Gora data store.
 244  
      */
 245  
     protected void getResults() {
 246  0
       setReadResults(GoraUtils.getRequest(getDataStore(),
 247  0
           getStartKey(), getEndKey()));
 248  0
     }
 249  
 
 250  
     /**
 251  
      * Finishes the reading process.
 252  
      * @throws IOException
 253  
      */
 254  
     @Override
 255  
     public void close() throws IOException {
 256  0
     }
 257  
 
 258  
     /**
 259  
      * Gets the results read.
 260  
      * @return results read.
 261  
      */
 262  
     Result getReadResults() {
 263  0
       return readResults;
 264  
     }
 265  
 
 266  
     /**
 267  
      * Sets the results read.
 268  
      * @param readResults results read.
 269  
      */
 270  
     void setReadResults(Result readResults) {
 271  0
       this.readResults = readResults;
 272  0
     }
 273  
   }
 274  
 
 275  
   /**
 276  
    * Gets the data store object initialized.
 277  
    * @param conf Configuration
 278  
    * @return DataStore created
 279  
    */
 280  
   public DataStore createDataStore(Configuration conf) {
 281  0
     DataStore dsCreated = null;
 282  
     try {
 283  0
       dsCreated = GoraUtils.createSpecificDataStore(conf, getDatastoreClass(),
 284  0
           getKeyClass(), getPersistentClass());
 285  0
     } catch (GoraException e) {
 286  0
       LOG.error("Error creating data store.");
 287  0
       e.printStackTrace();
 288  0
     }
 289  0
     return dsCreated;
 290  
   }
 291  
 
 292  
   /**
 293  
    * Gets the persistent Class
 294  
    * @return persistentClass used
 295  
    */
 296  
   static Class<? extends Persistent> getPersistentClass() {
 297  0
     return PERSISTENT_CLASS;
 298  
   }
 299  
 
 300  
   /**
 301  
    * Sets the persistent Class
 302  
    * @param persistentClassUsed to be set
 303  
    */
 304  
   static void setPersistentClass
 305  
   (Class<? extends Persistent> persistentClassUsed) {
 306  0
     PERSISTENT_CLASS = persistentClassUsed;
 307  0
   }
 308  
 
 309  
   /**
 310  
    * Gets the key class used.
 311  
    * @return the key class used.
 312  
    */
 313  
   static Class<?> getKeyClass() {
 314  0
     return KEY_CLASS;
 315  
   }
 316  
 
 317  
   /**
 318  
    * Sets the key class used.
 319  
    * @param keyClassUsed key class used.
 320  
    */
 321  
   static void setKeyClass(Class<?> keyClassUsed) {
 322  0
     KEY_CLASS = keyClassUsed;
 323  0
   }
 324  
 
 325  
   /**
 326  
    * @return Class the DATASTORE_CLASS
 327  
    */
 328  
   public static Class<? extends DataStore> getDatastoreClass() {
 329  0
     return DATASTORE_CLASS;
 330  
   }
 331  
 
 332  
   /**
 333  
    * @param dataStoreClass the dataStore class to set
 334  
    */
 335  
   public static void setDatastoreClass(
 336  
       Class<? extends DataStore> dataStoreClass) {
 337  0
     DATASTORE_CLASS = dataStoreClass;
 338  0
   }
 339  
 
 340  
   /**
 341  
    * Gets the start key for querying.
 342  
    * @return the start key.
 343  
    */
 344  
   public Object getStartKey() {
 345  0
     return START_KEY;
 346  
   }
 347  
 
 348  
   /**
 349  
    * Sets the start key for querying.
 350  
    * @param startKey start key.
 351  
    */
 352  
   public static void setStartKey(Object startKey) {
 353  0
     START_KEY = startKey;
 354  0
   }
 355  
 
 356  
   /**
 357  
    * Gets the end key for querying.
 358  
    * @return the end key.
 359  
    */
 360  
   static Object getEndKey() {
 361  0
     return END_KEY;
 362  
   }
 363  
 
 364  
   /**
 365  
    * Sets the end key for querying.
 366  
    * @param pEndKey end key.
 367  
    */
 368  
   static void setEndKey(Object pEndKey) {
 369  0
     END_KEY = pEndKey;
 370  0
   }
 371  
 
 372  
   /**
 373  
    * Gets the key factory class.
 374  
    * @return the key factory class.
 375  
    */
 376  
   static Class<?> getKeyFactoryClass() {
 377  0
     return KEY_FACTORY_CLASS;
 378  
   }
 379  
 
 380  
   /**
 381  
    * Sets the key factory class.
 382  
    * @param keyFactoryClass the keyFactoryClass to set.
 383  
    */
 384  
   static void setKeyFactoryClass(Class<?> keyFactoryClass) {
 385  0
     KEY_FACTORY_CLASS = keyFactoryClass;
 386  0
   }
 387  
 
 388  
   /**
 389  
    * Gets the data store.
 390  
    * @return DataStore
 391  
    */
 392  
   public static DataStore getDataStore() {
 393  0
     return DATA_STORE;
 394  
   }
 395  
 
 396  
   /**
 397  
    * Sets the data store
 398  
    * @param dStore the data store to set
 399  
    */
 400  
   public static void setDataStore(DataStore dStore) {
 401  0
     DATA_STORE = dStore;
 402  0
   }
 403  
 
 404  
   /**
 405  
    * Returns a logger.
 406  
    * @return the logger for the input format.
 407  
    */
 408  
   public static Logger getLogger() {
 409  0
     return LOG;
 410  
   }
 411  
 }