Coverage Report - org.apache.giraph.io.gora.GoraVertexOutputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
GoraVertexOutputFormat
0%
0/26
N/A
1.217
GoraVertexOutputFormat$1
N/A
N/A
1.217
GoraVertexOutputFormat$GoraVertexWriter
0%
0/38
0%
0/6
1.217
GoraVertexOutputFormat$NullOutputCommitter
0%
0/6
N/A
1.217
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.io.gora;
 19  
 
 20  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_DATASTORE_CLASS;
 21  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_KEY_CLASS;
 22  
 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS;
 23  
 
 24  
 import java.io.IOException;
 25  
 
 26  
 import org.apache.giraph.graph.Vertex;
 27  
 import org.apache.giraph.io.VertexOutputFormat;
 28  
 import org.apache.giraph.io.VertexWriter;
 29  
 import org.apache.giraph.io.gora.utils.GoraUtils;
 30  
 import org.apache.gora.persistency.Persistent;
 31  
 import org.apache.gora.store.DataStore;
 32  
 import org.apache.gora.util.GoraException;
 33  
 import org.apache.hadoop.conf.Configuration;
 34  
 import org.apache.hadoop.io.Writable;
 35  
 import org.apache.hadoop.io.WritableComparable;
 36  
 import org.apache.hadoop.mapreduce.JobContext;
 37  
 import org.apache.hadoop.mapreduce.OutputCommitter;
 38  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 39  
 import org.apache.log4j.Logger;
 40  
 import org.apache.zookeeper.WatchedEvent;
 41  
 import org.apache.zookeeper.Watcher;
 42  
 import org.apache.zookeeper.Watcher.Event.EventType;
 43  
 /**
 44  
  *
 45  
  *  Class which wraps the GoraOutputFormat. It's designed
 46  
  *  as an extension point to VertexOutputFormat subclasses who wish
 47  
  *  to write vertices back to a Gora data store.
 48  
  *
 49  
  *  Works with
 50  
  *  {@link GoraVertexInputFormat}
 51  
  *
 52  
  *
 53  
  * @param <I> vertex id type
 54  
  * @param <V>  vertex value type
 55  
  * @param <E>  edge type
 56  
  */
 57  0
 public abstract class GoraVertexOutputFormat<
 58  
         I extends WritableComparable,
 59  
         V extends Writable,
 60  
         E extends Writable>
 61  
         extends VertexOutputFormat<I, V, E> {
 62  
 
 63  
   /** Logger for Gora's vertex output format. */
 64  0
   private static final Logger LOG =
 65  0
         Logger.getLogger(GoraVertexOutputFormat.class);
 66  
 
 67  
   /**
    * Key class passed to the data store factory by createDataStore.
    * Static, so a single value is shared by every instance of this class.
    */
 68  
   private static Class<?> KEY_CLASS;
 69  
 
 70  
   /**
    * Persistent (value) class passed to the data store factory by
    * createDataStore. Static, so shared by every instance of this class.
    */
 71  
   private static Class<? extends Persistent> PERSISTENT_CLASS;
 72  
 
 73  
   /**
    * Data store class to be used as backend. Static, so shared by every
    * instance of this class.
    */
 74  
   private static Class<? extends DataStore> DATASTORE_CLASS;
 75  
 
 76  
   /**
    * Data store the vertex writer puts vertices into, then flushes and
    * closes. Stays null until setDataStore is called with a created store.
    */
 77  
   private static DataStore DATA_STORE;
 78  
 
 79  
   /**
 80  
    * checkOutputSpecs
 81  
    *
 82  
    * @param context information about the job
 83  
    * @throws IOException
 84  
    * @throws InterruptedException
 85  
    */
 86  
   @Override
 87  
   public void checkOutputSpecs(JobContext context)
 88  
     throws IOException, InterruptedException {
 89  0
   }
 90  
 
 91  
   /**
 92  
    * Gets the data store object initialized.
 93  
    * @param conf Configuration.
 94  
    * @return DataStore created
 95  
    */
 96  
   public DataStore createDataStore(Configuration conf) {
 97  0
     DataStore dsCreated = null;
 98  
     try {
 99  0
       dsCreated = GoraUtils.createSpecificDataStore(conf, getDatastoreClass(),
 100  0
           getKeyClass(), getPersistentClass());
 101  0
     } catch (GoraException e) {
 102  0
       getLogger().error("Error creating data store.");
 103  0
       e.printStackTrace();
 104  0
     }
 105  0
     return dsCreated;
 106  
   }
 107  
 
 108  
   /**
 109  
    * getOutputCommitter
 110  
    *
 111  
    * @param context the task context
 112  
    * @return OutputCommitter
 113  
    * @throws IOException
 114  
    * @throws InterruptedException
 115  
    */
 116  
   @Override
 117  
   public OutputCommitter getOutputCommitter(TaskAttemptContext context)
 118  
     throws IOException, InterruptedException {
 119  0
     return new NullOutputCommitter();
 120  
   }
 121  
 
 122  
   /**
 123  
    * Empty output commiter for hadoop.
 124  
    */
 125  0
   private static class NullOutputCommitter extends OutputCommitter {
 126  
     @Override
 127  0
     public void abortTask(TaskAttemptContext arg0) throws IOException {    }
 128  
 
 129  
     @Override
 130  0
     public void commitTask(TaskAttemptContext arg0) throws IOException {    }
 131  
 
 132  
     @Override
 133  
     public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
 134  0
       return false;
 135  
     }
 136  
 
 137  
     @Override
 138  0
     public void setupJob(JobContext arg0) throws IOException {    }
 139  
 
 140  
     @Override
 141  0
     public void setupTask(TaskAttemptContext arg0) throws IOException {    }
 142  
   }
 143  
 
 144  
   /**
 145  
    * Abstract class to be implemented by the user based on their specific
 146  
    * vertex/edges output. Easiest to ignore the key value separator and only
 147  
    * use key instead.
 148  
    */
 149  0
   protected abstract class GoraVertexWriter
 150  
     extends VertexWriter<I, V, E>
 151  
     implements Watcher {
 152  
     /** lock for management of the barrier */
 153  0
     private final Object lock = new Object();
 154  
 
 155  
     @Override
 156  
     public void initialize(TaskAttemptContext context)
 157  
       throws IOException, InterruptedException {
 158  0
       String sDataStoreType =
 159  0
         GIRAPH_GORA_OUTPUT_DATASTORE_CLASS.get(getConf());
 160  0
       String sKeyType =
 161  0
         GIRAPH_GORA_OUTPUT_KEY_CLASS.get(getConf());
 162  0
       String sPersistentType =
 163  0
         GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS.get(getConf());
 164  
       try {
 165  0
         Class<?> keyClass = Class.forName(sKeyType);
 166  0
         Class<?> persistentClass = Class.forName(sPersistentType);
 167  0
         Class<?> dataStoreClass = Class.forName(sDataStoreType);
 168  0
         setKeyClass(keyClass);
 169  0
         setPersistentClass((Class<? extends Persistent>) persistentClass);
 170  0
         setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
 171  0
         setDataStore(createDataStore(context.getConfiguration()));
 172  0
         if (getDataStore() != null) {
 173  0
           getLogger().info("The output data store has been created.");
 174  
         }
 175  0
       } catch (ClassNotFoundException e) {
 176  0
         getLogger().error("Error while reading Gora Output parameters");
 177  0
         e.printStackTrace();
 178  0
       }
 179  0
     }
 180  
 
 181  
     @Override
 182  
     public void close(TaskAttemptContext context)
 183  
       throws IOException, InterruptedException {
 184  0
       getDataStore().flush();
 185  0
       getDataStore().close();
 186  0
     }
 187  
 
 188  
     @Override
 189  
     public void writeVertex(Vertex<I, V, E> vertex)
 190  
       throws IOException, InterruptedException {
 191  0
       Persistent goraVertex = null;
 192  0
       Object goraKey = getGoraKey(vertex);
 193  0
       goraVertex = getGoraVertex(vertex);
 194  0
       getDataStore().put(goraKey, goraVertex);
 195  0
     }
 196  
 
 197  
     @Override
 198  
     public void process(WatchedEvent event) {
 199  0
       EventType type = event.getType();
 200  0
       if (type == EventType.NodeChildrenChanged) {
 201  0
         if (getLogger().isDebugEnabled()) {
 202  0
           getLogger().debug("signal: number of children changed.");
 203  
         }
 204  0
         synchronized (lock) {
 205  0
           lock.notify();
 206  0
         }
 207  
       }
 208  0
     }
 209  
 
 210  
     /**
 211  
      * Each vertex needs to be transformed into a Gora object to be sent to
 212  
      * a specific data store.
 213  
      *
 214  
      * @param  vertex   vertex to be transformed into a Gora object
 215  
      * @return          Gora representation of the vertex
 216  
      */
 217  
     protected abstract Persistent getGoraVertex(Vertex<I, V, E> vertex);
 218  
 
 219  
     /**
 220  
      * Gets the correct key from a computed vertex.
 221  
      * @param vertex  vertex to extract the key from.
 222  
      * @return        The key representing such vertex
 223  
      */
 224  
     protected abstract Object getGoraKey(Vertex<I, V, E> vertex);
 225  
 
 226  
   }
 227  
 
 228  
   /**
 229  
    * Gets the data store.
 230  
    * @return DataStore
 231  
    */
 232  
   public static DataStore getDataStore() {
 233  0
     return DATA_STORE;
 234  
   }
 235  
 
 236  
   /**
 237  
    * Sets the data store
 238  
    * @param dStore the dATA_STORE to set
 239  
    */
 240  
   public static void setDataStore(DataStore dStore) {
 241  0
     DATA_STORE = dStore;
 242  0
   }
 243  
 
 244  
   /**
 245  
    * Gets the persistent Class
 246  
    * @return persistentClass used
 247  
    */
 248  
   static Class<? extends Persistent> getPersistentClass() {
 249  0
     return PERSISTENT_CLASS;
 250  
   }
 251  
 
 252  
   /**
 253  
    * Sets the persistent Class
 254  
    * @param persistentClassUsed to be set
 255  
    */
 256  
   static void setPersistentClass
 257  
   (Class<? extends Persistent> persistentClassUsed) {
 258  0
     PERSISTENT_CLASS = persistentClassUsed;
 259  0
   }
 260  
 
 261  
   /**
 262  
    * Gets the key class used.
 263  
    * @return the key class used.
 264  
    */
 265  
   static Class<?> getKeyClass() {
 266  0
     return KEY_CLASS;
 267  
   }
 268  
 
 269  
   /**
 270  
    * Sets the key class used.
 271  
    * @param keyClassUsed key class used.
 272  
    */
 273  
   static void setKeyClass(Class<?> keyClassUsed) {
 274  0
     KEY_CLASS = keyClassUsed;
 275  0
   }
 276  
 
 277  
   /**
 278  
    * @return Class the DATASTORE_CLASS
 279  
    */
 280  
   public static Class<? extends DataStore> getDatastoreClass() {
 281  0
     return DATASTORE_CLASS;
 282  
   }
 283  
 
 284  
   /**
 285  
    * @param dataStoreClass the dataStore class to set
 286  
    */
 287  
   public static void setDatastoreClass(
 288  
       Class<? extends DataStore> dataStoreClass) {
 289  0
     DATASTORE_CLASS = dataStoreClass;
 290  0
   }
 291  
 
 292  
   /**
 293  
    * Returns a logger.
 294  
    * @return the log for the output format.
 295  
    */
 296  
   public static Logger getLogger() {
 297  0
     return LOG;
 298  
   }
 299  
 }