Coverage Report - org.apache.giraph.io.gora.GoraEdgeOutputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
GoraEdgeOutputFormat
0%
0/26
N/A
1.13
GoraEdgeOutputFormat$1
N/A
N/A
1.13
GoraEdgeOutputFormat$GoraEdgeWriter
0%
0/29
0%
0/2
1.13
GoraEdgeOutputFormat$NullOutputCommitter
0%
0/6
N/A
1.13
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.io.gora;
 19  
 
 20  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_DATASTORE_CLASS;
 21  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_KEY_CLASS;
 22  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS;
 23  
 
 24  
 import java.io.IOException;
 25  
 
 26  
 import org.apache.giraph.edge.Edge;
 27  
 import org.apache.giraph.io.EdgeOutputFormat;
 28  
 import org.apache.giraph.io.EdgeWriter;
 29  
 import org.apache.giraph.io.gora.utils.GoraUtils;
 30  
 import org.apache.gora.persistency.Persistent;
 31  
 import org.apache.gora.store.DataStore;
 32  
 import org.apache.gora.util.GoraException;
 33  
 import org.apache.hadoop.conf.Configuration;
 34  
 import org.apache.hadoop.io.Writable;
 35  
 import org.apache.hadoop.io.WritableComparable;
 36  
 import org.apache.hadoop.mapreduce.JobContext;
 37  
 import org.apache.hadoop.mapreduce.OutputCommitter;
 38  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 39  
 import org.apache.log4j.Logger;
 40  
 
 41  
 /**
 42  
  *  Class which writes edges to a Gora data store. It's designed
 43  
  *  as an extension point to EdgeOutputFormat subclasses who wish
 44  
  *  to write to Gora data sources.
 45  
  *
 46  
  *  Works with
 47  
  *  {@link GoraEdgeInputFormat}
 48  
  *
 49  
  * @param <I> vertex id type
 50  
  * @param <V>  vertex value type
 51  
  * @param <E>  edge value type
 52  
  */
 53  0
 public abstract class GoraEdgeOutputFormat<I extends WritableComparable,
 54  
   V extends Writable, E extends Writable>
 55  
   extends EdgeOutputFormat<I, V, E> {
 56  
 
 57  
   /** Logger for Gora's edge output format. */
 58  0
   private static final Logger LOG =
 59  0
           Logger.getLogger(GoraEdgeOutputFormat.class);
 60  
 
 61  
   /** KeyClass used for getting data. */
 62  
   private static Class<?> KEY_CLASS;
 63  
 
 64  
   /** The edge itself will be used as a value inside Gora. */
 65  
   private static Class<? extends Persistent> PERSISTENT_CLASS;
 66  
 
 67  
   /** Data store class to be used as backend. */
 68  
   private static Class<? extends DataStore> DATASTORE_CLASS;
 69  
 
 70  
   /** Data store used for writing data. */
 71  
   private static DataStore DATA_STORE;
 72  
 
 73  
   /**
 74  
    * checkOutputSpecs
 75  
    *
 76  
    * @param context information about the job
 77  
    * @throws IOException
 78  
    * @throws InterruptedException
 79  
    */
 80  
   @Override
 81  
   public void checkOutputSpecs(JobContext context)
 82  
     throws IOException, InterruptedException {
 83  0
   }
 84  
 
 85  
   /**
 86  
    * Gets the data store object initialized.
 87  
    * @param conf Configuration
 88  
    * @return DataStore created
 89  
    */
 90  
   public DataStore createDataStore(Configuration conf) {
 91  0
     DataStore dsCreated = null;
 92  
     try {
 93  0
       dsCreated = GoraUtils.createSpecificDataStore(conf, getDatastoreClass(),
 94  0
           getKeyClass(), getPersistentClass());
 95  0
     } catch (GoraException e) {
 96  0
       getLogger().error("Error creating data store.");
 97  0
       e.printStackTrace();
 98  0
     }
 99  0
     return dsCreated;
 100  
   }
 101  
 
 102  
   @Override
 103  
   public abstract GoraEdgeWriter
 104  
   createEdgeWriter(TaskAttemptContext context)
 105  
     throws IOException, InterruptedException;
 106  
 
 107  
   /**
 108  
    * getOutputCommitter
 109  
    *
 110  
    * @param context the task context
 111  
    * @return OutputCommitter
 112  
    * @throws IOException
 113  
    * @throws InterruptedException
 114  
    */
 115  
   @Override
 116  
   public OutputCommitter getOutputCommitter(TaskAttemptContext context)
 117  
     throws IOException, InterruptedException {
 118  0
     return new NullOutputCommitter();
 119  
   }
 120  
 
 121  
   /**
 122  
    * Empty output committer for Hadoop.
 123  
    */
 124  0
   private static class NullOutputCommitter extends OutputCommitter {
 125  
     @Override
 126  0
     public void abortTask(TaskAttemptContext arg0) throws IOException {    }
 127  
 
 128  
     @Override
 129  0
     public void commitTask(TaskAttemptContext arg0) throws IOException {    }
 130  
 
 131  
     @Override
 132  
     public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
 133  0
       return false;
 134  
     }
 135  
 
 136  
     @Override
 137  0
     public void setupJob(JobContext arg0) throws IOException {    }
 138  
 
 139  
     @Override
 140  0
     public void setupTask(TaskAttemptContext arg0) throws IOException {    }
 141  
   }
 142  
 
 143  
   /**
 144  
    * Abstract class to be implemented by the user based on their specific
 145  
    * vertex/edges output.
 146  
    */
 147  0
   protected abstract class GoraEdgeWriter extends EdgeWriter<I, V, E> {
 148  
     @Override
 149  
     public void initialize(TaskAttemptContext context) throws IOException,
 150  
       InterruptedException {
 151  0
       String sDataStoreType =
 152  0
           GIRAPH_GORA_OUTPUT_DATASTORE_CLASS.get(getConf());
 153  0
       String sKeyType =
 154  0
           GIRAPH_GORA_OUTPUT_KEY_CLASS.get(getConf());
 155  0
       String sPersistentType =
 156  0
           GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS.get(getConf());
 157  
       try {
 158  0
         Class<?> keyClass = Class.forName(sKeyType);
 159  0
         Class<?> persistentClass = Class.forName(sPersistentType);
 160  0
         Class<?> dataStoreClass = Class.forName(sDataStoreType);
 161  0
         setKeyClass(keyClass);
 162  0
         setPersistentClass((Class<? extends Persistent>) persistentClass);
 163  0
         setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
 164  0
         setDataStore(createDataStore(context.getConfiguration()));
 165  0
         if (getDataStore() != null) {
 166  0
           getLogger().debug("The data store has been created.");
 167  
         }
 168  0
       } catch (ClassNotFoundException e) {
 169  0
         getLogger().error("Error while reading Gora Output parameters");
 170  0
         e.printStackTrace();
 171  0
       }
 172  0
     }
 173  
 
 174  
     @Override
 175  
     public void close(TaskAttemptContext context)
 176  
       throws IOException, InterruptedException {
 177  0
       getDataStore().flush();
 178  0
       getDataStore().close();
 179  0
     }
 180  
 
 181  
     @Override
 182  
     public void writeEdge(I srcId, V srcValue, Edge<I, E> edge)
 183  
       throws IOException, InterruptedException {
 184  0
       Persistent goraEdge = null;
 185  0
       Object goraKey = getGoraKey(srcId, srcValue, edge);
 186  0
       goraEdge = getGoraEdge(srcId, srcValue, edge);
 187  0
       getDataStore().put(goraKey, goraEdge);
 188  0
     }
 189  
 
 190  
     /**
 191  
      * Each edge needs to be transformed into a Gora object to be sent to
 192  
      * a specific data store.
 193  
      *
 194  
      * @param  edge   edge to be transformed into a Gora object
 195  
      * @param  srcId  source vertex id
 196  
      * @param  srcValue  source vertex value
 197  
      * @return          Gora representation of the edge
 198  
      */
 199  
     protected abstract Persistent getGoraEdge
 200  
       (I srcId, V srcValue, Edge<I, E> edge);
 201  
 
 202  
     /**
 203  
      * Gets the correct key for a given edge.
 204  
      * @param edge  edge to extract the key from.
 205  
      * @param  srcId  source vertex id
 206  
      * @param  srcValue  source vertex value
 207  
      * @return      The key representing such edge.
 208  
      */
 209  
     protected abstract Object getGoraKey(I srcId, V srcValue, Edge<I, E> edge);
 210  
   }
 211  
 
 212  
   /**
 213  
    * Gets the data store.
 214  
    * @return DataStore
 215  
    */
 216  
   public static DataStore getDataStore() {
 217  0
     return DATA_STORE;
 218  
   }
 219  
 
 220  
   /**
 221  
    * Sets the data store
 222  
    * @param dStore the data store to set
 223  
    */
 224  
   public static void setDataStore(DataStore dStore) {
 225  0
     DATA_STORE = dStore;
 226  0
   }
 227  
 
 228  
   /**
 229  
    * Gets the persistent Class
 230  
    * @return persistentClass used
 231  
    */
 232  
   static Class<? extends Persistent> getPersistentClass() {
 233  0
     return PERSISTENT_CLASS;
 234  
   }
 235  
 
 236  
   /**
 237  
    * Sets the persistent Class
 238  
    * @param persistentClassUsed to be set
 239  
    */
 240  
   static void setPersistentClass
 241  
   (Class<? extends Persistent> persistentClassUsed) {
 242  0
     PERSISTENT_CLASS = persistentClassUsed;
 243  0
   }
 244  
 
 245  
   /**
 246  
    * Gets the key class used.
 247  
    * @return the key class used.
 248  
    */
 249  
   static Class<?> getKeyClass() {
 250  0
     return KEY_CLASS;
 251  
   }
 252  
 
 253  
   /**
 254  
    * Sets the key class used.
 255  
    * @param keyClassUsed key class used.
 256  
    */
 257  
   static void setKeyClass(Class<?> keyClassUsed) {
 258  0
     KEY_CLASS = keyClassUsed;
 259  0
   }
 260  
 
 261  
   /**
 262  
    * @return Class the DATASTORE_CLASS
 263  
    */
 264  
   public static Class<? extends DataStore> getDatastoreClass() {
 265  0
     return DATASTORE_CLASS;
 266  
   }
 267  
 
 268  
   /**
 269  
    * @param dataStoreClass the dataStore class to set
 270  
    */
 271  
   public static void setDatastoreClass(
 272  
       Class<? extends DataStore> dataStoreClass) {
 273  0
     DATASTORE_CLASS = dataStoreClass;
 274  0
   }
 275  
 
 276  
   /**
 277  
    * Gets the logger for the class.
 278  
    * @return the log of the class.
 279  
    */
 280  
   public static Logger getLogger() {
 281  0
     return LOG;
 282  
   }
 283  
 }