Coverage Report - org.apache.giraph.io.gora.utils.ExtraGoraInputFormat

Classes in this File    Line Coverage    Branch Coverage    Complexity
ExtraGoraInputFormat    0% (0/26)        0% (0/2)           1.111
 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.io.gora.utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.gora.mapreduce.GoraInputSplit;
import org.apache.gora.mapreduce.GoraMapReduceUtils;
import org.apache.gora.mapreduce.GoraRecordReader;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.impl.PartitionQueryImpl;
import org.apache.gora.store.DataStore;
import org.apache.gora.util.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
/**
 * InputFormat to fetch the input from Gora data stores. The query that
 * fetches the items from the data store should be prepared and set via
 * setQuery(Configuration, Query) before submitting the job.
 *
 * Hadoop jobs can be configured either through the static
 * <code>setInput()</code> methods or from GoraMapper.
 * @param <K> Key class.
 * @param <T> Persistent class.
 */
public class ExtraGoraInputFormat<K, T extends PersistentBase>
  extends InputFormat<K, T> {
 
  /**
   * String used to map the partitioned query into the configuration object.
   */
  public static final String QUERY_KEY = "gora.inputformat.query";

  /**
   * Data store to be used.
   */
  private DataStore<K, T> dataStore;

  /**
   * Query to be performed.
   */
  private Query<K, T> query;
 
  /**
   * @param split InputSplit to be used.
   * @param context TaskAttemptContext to be used.
   * @return the record reader used inside the Hadoop job.
   */
  @Override
  @SuppressWarnings("unchecked")
  public RecordReader<K, T> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {

    PartitionQuery<K, T> partitionQuery = (PartitionQuery<K, T>)
        ((GoraInputSplit) split).getQuery();

    //setInputPath(partitionQuery, context);
    return new GoraRecordReader<K, T>(partitionQuery, context);
  }
 
  /**
   * Gets the splits for the job.
   * @param context JobContext for the job.
   * @return the splits found, one per partition query.
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException,
      InterruptedException {
    List<PartitionQuery<K, T>> queries =
        getDataStore().getPartitions(getQuery());
    List<InputSplit> splits = new ArrayList<InputSplit>(queries.size());
    for (PartitionQuery<K, T> partQuery : queries) {
      ((PartitionQueryImpl) partQuery).setConf(context.getConfiguration());
      splits.add(new GoraInputSplit(context.getConfiguration(), partQuery));
    }
    return splits;
  }
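  // NOTE: getSplits() above reads the instance-level dataStore and query
  // fields, so both must be populated through the setters below before the
  // splits are requested.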
 
  /**
   * @return the dataStore
   */
  public DataStore<K, T> getDataStore() {
    return dataStore;
  }

  /**
   * @param datStore the dataStore to set
   */
  public void setDataStore(DataStore<K, T> datStore) {
    this.dataStore = datStore;
  }

  /**
   * @return the query
   */
  public Query<K, T> getQuery() {
    return query;
  }

  /**
   * @param query the query to set
   */
  public void setQuery(Query<K, T> query) {
    this.query = query;
  }
 
  /**
   * Sets the partitioned query inside the configuration object.
   * @param conf Configuration used.
   * @param query Query to be executed.
   * @param <K> Key class
   * @param <T> Persistent class
   * @throws IOException Exception that might be thrown.
   */
  public static <K, T extends Persistent> void setQuery(Configuration conf,
      Query<K, T> query) throws IOException {
    IOUtils.storeToConf(query, conf, QUERY_KEY);
  }

  /**
   * Gets the partitioned query from the configuration object passed.
   * @param conf Configuration object.
   * @return the query stored inside the configuration object
   * @throws IOException Exception that might be thrown.
   */
  public Query<K, T> getQuery(Configuration conf) throws IOException {
    return IOUtils.loadFromConf(conf, QUERY_KEY);
  }
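  // NOTE: the static setQuery(Configuration, Query) and getQuery(Configuration)
  // above are symmetric; the query is serialized into the configuration under
  // QUERY_KEY by IOUtils.storeToConf() and read back by IOUtils.loadFromConf().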
 
  /**
   * Sets the input parameters for the job.
   * @param job the job to set the properties for
   * @param query the query to get the inputs from
   * @param dataStore the datastore as the input
   * @param reuseObjects whether to reuse objects in serialization
   * @param <K> Key class
   * @param <V> Persistent class
   * @throws IOException Exception that might be thrown.
   */
  public static <K, V extends Persistent> void setInput(Job job,
      Query<K, V> query, DataStore<K, V> dataStore, boolean reuseObjects)
      throws IOException {

    Configuration conf = job.getConfiguration();

    GoraMapReduceUtils.setIOSerializations(conf, reuseObjects);

    job.setInputFormatClass(ExtraGoraInputFormat.class);
    ExtraGoraInputFormat.setQuery(job.getConfiguration(), query);
  }
}
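For reference, below is a minimal usage sketch of the workflow described in the
class Javadoc: build a query against a data store and hand both to the static
setInput() method before submitting the job. The VertexData persistent class,
the String key type, and the use of Gora's DataStoreFactory are illustrative
assumptions, not part of this file, and exact signatures may vary between Gora
versions.

    // Usage sketch; VertexData is a hypothetical Gora persistent class keyed
    // by String, and DataStoreFactory comes from org.apache.gora.store.
    public static Job buildJob(Configuration conf) throws Exception {
      Job job = Job.getInstance(conf, "gora-input-example");

      // Obtain the data store and build an (unrestricted) query over it.
      DataStore<String, VertexData> store =
          DataStoreFactory.getDataStore(String.class, VertexData.class, conf);
      Query<String, VertexData> query = store.newQuery();

      // Registers Gora's IO serializations, sets ExtraGoraInputFormat as the
      // job's input format and serializes the query into the configuration
      // under QUERY_KEY.
      ExtraGoraInputFormat.setInput(job, query, store, true);
      return job;
    }

Because getSplits() reads the instance-level dataStore and query fields, callers
are also expected to populate those through setDataStore() and setQuery() on the
input format instance before splits are computed.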