Coverage Report - org.apache.giraph.io.gora.GoraVertexInputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
GoraVertexInputFormat
0%
0/84
0%
0/4
1.296
GoraVertexInputFormat$GoraVertexReader
0%
0/27
0%
0/2
1.296
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.io.gora;
 19  
 
 20  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS;
 21  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY;
 22  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS;
 23  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS;
 24  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS;
 25  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY;
 26  
 
 27  
 import java.io.IOException;
 28  
 import java.util.List;
 29  
 
 30  
 import org.apache.giraph.graph.Vertex;
 31  
 import org.apache.giraph.io.VertexInputFormat;
 32  
 import org.apache.giraph.io.VertexReader;
 33  
 import org.apache.giraph.io.gora.utils.ExtraGoraInputFormat;
 34  
 import org.apache.giraph.io.gora.utils.GoraUtils;
 35  
 import org.apache.giraph.io.gora.utils.KeyFactory;
 36  
 import org.apache.gora.persistency.Persistent;
 37  
 import org.apache.gora.query.Result;
 38  
 import org.apache.gora.query.impl.QueryBase;
 39  
 import org.apache.gora.store.DataStore;
 40  
 import org.apache.gora.util.GoraException;
 41  
 import org.apache.hadoop.conf.Configuration;
 42  
 import org.apache.hadoop.io.Writable;
 43  
 import org.apache.hadoop.io.WritableComparable;
 44  
 import org.apache.hadoop.mapreduce.InputSplit;
 45  
 import org.apache.hadoop.mapreduce.JobContext;
 46  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 47  
 import org.apache.log4j.Logger;
 48  
 
 49  
 /**
 50  
  *  Class which wraps the GoraInputFormat. It's designed
 51  
  *  as an extension point to VertexInputFormat subclasses who wish
 52  
  *  to read from Gora data sources.
 53  
  *
 54  
  *  Works with
 55  
  *  {@link GoraVertexOutputFormat}
 56  
  *
 57  
  * @param <I> vertex id type
 58  
  * @param <V>  vertex value type
 59  
  * @param <E>  edge type
 60  
  */
 61  0
 public abstract class GoraVertexInputFormat<
 62  
         I extends WritableComparable,
 63  
         V extends Writable,
 64  
         E extends Writable>
 65  
         extends VertexInputFormat<I, V, E> {
 66  
 
 67  
   /** Start key for querying Gora data store. */
 68  
   private static Object START_KEY;
 69  
 
 70  
   /** End key for querying Gora data store. */
 71  
   private static Object END_KEY;
 72  
 
 73  
   /** Logger for Gora's vertex input format. */
 74  0
   private static final Logger LOG =
 75  0
           Logger.getLogger(GoraVertexInputFormat.class);
 76  
 
 77  
   /** KeyClass used for getting data. */
 78  
   private static Class<?> KEY_CLASS;
 79  
 
 80  
   /** The vertex itself will be used as a value inside Gora. */
 81  
   private static Class<? extends Persistent> PERSISTENT_CLASS;
 82  
 
 83  
   /** Data store class to be used as backend. */
 84  
   private static Class<? extends DataStore> DATASTORE_CLASS;
 85  
 
 86  
   /** Class used to transform strings into Keys */
 87  
   private static Class<?> KEY_FACTORY_CLASS;
 88  
 
 89  
   /** Data store used for querying data. */
 90  
   private static DataStore DATA_STORE;
 91  
 
 92  
   /** counter for iinput records */
 93  0
   private static int RECORD_COUNTER = 0;
 94  
 
 95  
   /** Delegate Gora input format */
 96  0
   private static ExtraGoraInputFormat GORA_INPUT_FORMAT =
 97  
          new ExtraGoraInputFormat();
 98  
 
 99  
   /** @param conf configuration parameters */
 100  
   public void checkInputSpecs(Configuration conf) {
 101  0
     String sDataStoreType =
 102  0
         GIRAPH_GORA_DATASTORE_CLASS.get(getConf());
 103  0
     String sKeyType =
 104  0
         GIRAPH_GORA_KEY_CLASS.get(getConf());
 105  0
     String sPersistentType =
 106  0
         GIRAPH_GORA_PERSISTENT_CLASS.get(getConf());
 107  0
     String sKeyFactoryClass =
 108  0
         GIRAPH_GORA_KEYS_FACTORY_CLASS.get(getConf());
 109  
     try {
 110  0
       Class<?> keyClass = Class.forName(sKeyType);
 111  0
       Class<?> persistentClass = Class.forName(sPersistentType);
 112  0
       Class<?> dataStoreClass = Class.forName(sDataStoreType);
 113  0
       Class<?> keyFactoryClass = Class.forName(sKeyFactoryClass);
 114  0
       setKeyClass(keyClass);
 115  0
       setPersistentClass((Class<? extends Persistent>) persistentClass);
 116  0
       setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
 117  0
       setKeyFactoryClass(keyFactoryClass);
 118  0
       setDataStore(createDataStore(conf));
 119  0
       GORA_INPUT_FORMAT.setDataStore(getDataStore());
 120  0
     } catch (ClassNotFoundException e) {
 121  0
       LOG.error("Error while reading Gora Input parameters");
 122  0
       e.printStackTrace();
 123  0
     }
 124  0
   }
 125  
 
 126  
   /**
 127  
    * Create a vertex reader for a given split. Guaranteed to have been
 128  
    * configured with setConf() prior to use. The framework will also call
 129  
    * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
 130  
    * the split is used.
 131  
    *
 132  
    * @param split the split to be read
 133  
    * @param context the information about the task
 134  
    * @return a new record reader
 135  
    * @throws IOException
 136  
    */
 137  
   public abstract GoraVertexReader createVertexReader(InputSplit split,
 138  
     TaskAttemptContext context) throws IOException;
 139  
 
 140  
   /**
 141  
    * Gets the splits for a data store.
 142  
    * @param context JobContext
 143  
    * @param minSplitCountHint Hint for a minimum split count
 144  
    * @return A list of splits
 145  
    */
 146  
   @Override
 147  
   public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
 148  
     throws IOException, InterruptedException {
 149  0
     KeyFactory kFact = null;
 150  
     try {
 151  0
       kFact = (KeyFactory) getKeyFactoryClass().newInstance();
 152  0
       kFact.setDataStore(getDataStore());
 153  0
     } catch (InstantiationException e) {
 154  0
       LOG.error("Key factory was not instantiated. Please verify.");
 155  0
       LOG.error(e.getMessage());
 156  0
       e.printStackTrace();
 157  0
     } catch (IllegalAccessException e) {
 158  0
       LOG.error("Key factory was not instantiated. Please verify.");
 159  0
       LOG.error(e.getMessage());
 160  0
       e.printStackTrace();
 161  0
     }
 162  0
     String sKey = GIRAPH_GORA_START_KEY.get(getConf());
 163  0
     String eKey = GIRAPH_GORA_END_KEY.get(getConf());
 164  0
     if (sKey == null || sKey.isEmpty()) {
 165  0
       LOG.warn("No start key has been defined.");
 166  0
       LOG.warn("Querying all the data store.");
 167  0
       sKey = null;
 168  0
       eKey = null;
 169  
     } else {
 170  0
       setStartKey(kFact.buildKey(sKey));
 171  0
       setEndKey(kFact.buildKey(eKey));
 172  
     }
 173  0
     QueryBase tmpQuery = GoraUtils.getQuery(
 174  0
         getDataStore(), getStartKey(), getEndKey());
 175  0
     tmpQuery.setConf(getConf());
 176  0
     GORA_INPUT_FORMAT.setQuery(tmpQuery);
 177  0
     List<InputSplit> splits = GORA_INPUT_FORMAT.getSplits(context);
 178  0
     return splits;
 179  
   }
 180  
 
 181  
   /**
 182  
    * Gets the data store object initialized.
 183  
    * @param conf Configuration
 184  
    * @return DataStore created
 185  
    */
 186  
   public DataStore createDataStore(Configuration conf) {
 187  0
     DataStore dsCreated = null;
 188  
     try {
 189  0
       dsCreated = GoraUtils.createSpecificDataStore(conf, getDatastoreClass(),
 190  0
           getKeyClass(), getPersistentClass());
 191  0
     } catch (GoraException e) {
 192  0
       LOG.error("Error creating data store.");
 193  0
       e.printStackTrace();
 194  0
     }
 195  0
     return dsCreated;
 196  
   }
 197  
 
 198  
   /**
 199  
    * Abstract class to be implemented by the user based on their specific
 200  
    * vertex input. Easiest to ignore the key value separator and only use
 201  
    * key instead.
 202  
    */
 203  0
   protected abstract class GoraVertexReader extends VertexReader<I, V, E> {
 204  
     /** Current vertex */
 205  
     private Vertex<I, V, E> vertex;
 206  
     /** Results gotten from Gora data store. */
 207  
     private Result readResults;
 208  
 
 209  
     @Override
 210  
     public void initialize(InputSplit inputSplit, TaskAttemptContext context)
 211  
       throws IOException, InterruptedException {
 212  0
       getResults();
 213  0
       RECORD_COUNTER = 0;
 214  0
     }
 215  
 
 216  
     /**
 217  
      * Gets the next vertex from Gora data store.
 218  
      * @return true/false depending on the existence of vertices.
 219  
      * @throws IOException exceptions passed along.
 220  
      * @throws InterruptedException exceptions passed along.
 221  
      */
 222  
     @Override
 223  
     // CHECKSTYLE: stop IllegalCatch
 224  
     public boolean nextVertex() throws IOException, InterruptedException {
 225  0
       boolean flg = false;
 226  
       try {
 227  0
         flg = this.getReadResults().next();
 228  0
         this.vertex = transformVertex(this.getReadResults().get());
 229  0
         RECORD_COUNTER++;
 230  0
       } catch (Exception e) {
 231  0
         LOG.error("Error transforming vertices.");
 232  0
         LOG.error(e.getMessage());
 233  0
         flg = false;
 234  0
       }
 235  0
       LOG.debug(RECORD_COUNTER + " were transformed.");
 236  0
       return flg;
 237  
     }
 238  
     // CHECKSTYLE: resume IllegalCatch
 239  
 
 240  
     /**
 241  
      * Gets the progress of reading results from Gora.
 242  
      * @return the progress of reading results from Gora.
 243  
      */
 244  
     @Override
 245  
     public float getProgress() throws IOException, InterruptedException {
 246  0
       float progress = 0.0f;
 247  0
       if (getReadResults() != null) {
 248  0
         progress = getReadResults().getProgress();
 249  
       }
 250  0
       return progress;
 251  
     }
 252  
 
 253  
     /**
 254  
      * Gets current vertex.
 255  
      *
 256  
      * @return  The vertex object represented by a Gora object
 257  
      */
 258  
     @Override
 259  
     public Vertex<I, V, E> getCurrentVertex()
 260  
       throws IOException, InterruptedException {
 261  0
       return this.vertex;
 262  
     }
 263  
 
 264  
     /**
 265  
      * Parser for a single Gora object
 266  
      *
 267  
      * @param   goraObject vertex represented as a GoraObject
 268  
      * @return  The vertex object represented by a Gora object
 269  
      */
 270  
     protected abstract Vertex<I, V, E> transformVertex(Object goraObject);
 271  
 
 272  
     /**
 273  
      * Performs a range query to a Gora data store.
 274  
      */
 275  
     protected void getResults() {
 276  0
       setReadResults(GoraUtils.getRequest(getDataStore(),
 277  0
           getStartKey(), getEndKey()));
 278  0
     }
 279  
 
 280  
     /**
 281  
      * Finishes the reading process.
 282  
      * @throws IOException
 283  
      */
 284  
     @Override
 285  
     public void close() throws IOException {
 286  0
     }
 287  
 
 288  
     /**
 289  
      * Gets the results read.
 290  
      * @return results read.
 291  
      */
 292  
     Result getReadResults() {
 293  0
       return readResults;
 294  
     }
 295  
 
 296  
     /**
 297  
      * Sets the results read.
 298  
      * @param readResults results read.
 299  
      */
 300  
     void setReadResults(Result readResults) {
 301  0
       this.readResults = readResults;
 302  0
     }
 303  
   }
 304  
 
 305  
   /**
 306  
    * Gets the persistent Class
 307  
    * @return persistentClass used
 308  
    */
 309  
   static Class<? extends Persistent> getPersistentClass() {
 310  0
     return PERSISTENT_CLASS;
 311  
   }
 312  
 
 313  
   /**
 314  
    * Sets the persistent Class
 315  
    * @param persistentClassUsed to be set
 316  
    */
 317  
   static void setPersistentClass
 318  
   (Class<? extends Persistent> persistentClassUsed) {
 319  0
     PERSISTENT_CLASS = persistentClassUsed;
 320  0
   }
 321  
 
 322  
   /**
 323  
    * Gets the key class used.
 324  
    * @return the key class used.
 325  
    */
 326  
   static Class<?> getKeyClass() {
 327  0
     return KEY_CLASS;
 328  
   }
 329  
 
 330  
   /**
 331  
    * Sets the key class used.
 332  
    * @param keyClassUsed key class used.
 333  
    */
 334  
   static void setKeyClass(Class<?> keyClassUsed) {
 335  0
     KEY_CLASS = keyClassUsed;
 336  0
   }
 337  
 
 338  
   /**
 339  
    * @return Class the DATASTORE_CLASS
 340  
    */
 341  
   public static Class<? extends DataStore> getDatastoreClass() {
 342  0
     return DATASTORE_CLASS;
 343  
   }
 344  
 
 345  
   /**
 346  
    * @param dataStoreClass the dataStore class to set
 347  
    */
 348  
   public static void setDatastoreClass(
 349  
       Class<? extends DataStore> dataStoreClass) {
 350  0
     DATASTORE_CLASS = dataStoreClass;
 351  0
   }
 352  
 
 353  
   /**
 354  
    * Gets the start key for querying.
 355  
    * @return the start key.
 356  
    */
 357  
   public Object getStartKey() {
 358  0
     return START_KEY;
 359  
   }
 360  
 
 361  
   /**
 362  
    * Gets the start key for querying.
 363  
    * @param startKey start key.
 364  
    */
 365  
   public static void setStartKey(Object startKey) {
 366  0
     START_KEY = startKey;
 367  0
   }
 368  
 
 369  
   /**
 370  
    * Gets the end key for querying.
 371  
    * @return the end key.
 372  
    */
 373  
   static Object getEndKey() {
 374  0
     return END_KEY;
 375  
   }
 376  
 
 377  
   /**
 378  
    * Sets the end key for querying.
 379  
    * @param pEndKey start key.
 380  
    */
 381  
   static void setEndKey(Object pEndKey) {
 382  0
     END_KEY = pEndKey;
 383  0
   }
 384  
 
 385  
   /**
 386  
    * Gets the key factory class.
 387  
    * @return the kEY_FACTORY_CLASS
 388  
    */
 389  
   static Class<?> getKeyFactoryClass() {
 390  0
     return KEY_FACTORY_CLASS;
 391  
   }
 392  
 
 393  
   /**
 394  
    * Sets the key factory class.
 395  
    * @param keyFactoryClass the keyFactoryClass to set.
 396  
    */
 397  
   static void setKeyFactoryClass(Class<?> keyFactoryClass) {
 398  0
     KEY_FACTORY_CLASS = keyFactoryClass;
 399  0
   }
 400  
 
 401  
   /**
 402  
    * Gets the data store.
 403  
    * @return DataStore
 404  
    */
 405  
   public static DataStore getDataStore() {
 406  0
     return DATA_STORE;
 407  
   }
 408  
 
 409  
   /**
 410  
    * Sets the data store
 411  
    * @param dStore the dATA_STORE to set
 412  
    */
 413  
   public static void setDataStore(DataStore dStore) {
 414  0
     DATA_STORE = dStore;
 415  0
   }
 416  
 }