Coverage Report - org.apache.giraph.io.hcatalog.HCatalogVertexValueInputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
HCatalogVertexValueInputFormat
0%
0/8
N/A
1.231
HCatalogVertexValueInputFormat$HCatalogVertexValueReader
0%
0/13
N/A
1.231
HCatalogVertexValueInputFormat$SingleRowHCatalogVertexValueReader
0%
0/3
N/A
1.231
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.io.hcatalog;
 20  
 
 21  
 import org.apache.giraph.io.VertexValueInputFormat;
 22  
 import org.apache.giraph.io.VertexValueReader;
 23  
 import org.apache.hadoop.io.Writable;
 24  
 import org.apache.hadoop.io.WritableComparable;
 25  
 import org.apache.hadoop.mapreduce.InputSplit;
 26  
 import org.apache.hadoop.mapreduce.JobContext;
 27  
 import org.apache.hadoop.mapreduce.RecordReader;
 28  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 29  
 import org.apache.hcatalog.data.HCatRecord;
 30  
 
 31  
 import java.io.IOException;
 32  
 import java.util.List;
 33  
 
 34  
 /**
 35  
  * HCatalog {@link VertexValueInputFormat} for reading vertex values from
 36  
  * Hive/Pig.
 37  
  *
 38  
  * @param <I> Vertex id
 39  
  * @param <V> Vertex value
 40  
  */
 41  0
 public abstract class HCatalogVertexValueInputFormat<I extends
 42  
     WritableComparable, V extends Writable>
 43  
     extends VertexValueInputFormat<I, V> {
 44  
   /**
 45  
    * HCatalog input format.
 46  
    */
 47  0
   private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
 48  
 
 49  
   @Override
 50  
   public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
 51  
     throws IOException, InterruptedException {
 52  0
     return hCatInputFormat.getVertexSplits(context);
 53  
   }
 54  
 
 55  
   /**
 56  
    * {@link VertexValueReader} for {@link HCatalogVertexValueInputFormat}.
 57  
    */
 58  0
   protected abstract class HCatalogVertexValueReader
 59  
       extends VertexValueReader<I, V> {
 60  
     /** Internal {@link RecordReader}. */
 61  
     private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
 62  
     /** Context passed to initialize. */
 63  
     private TaskAttemptContext context;
 64  
 
 65  
     @Override
 66  
     public final void initialize(InputSplit inputSplit,
 67  
                                  TaskAttemptContext context)
 68  
       throws IOException, InterruptedException {
 69  0
       super.initialize(inputSplit, context);
 70  0
       hCatRecordReader =
 71  0
           hCatInputFormat.createVertexRecordReader(inputSplit, context);
 72  0
       hCatRecordReader.initialize(inputSplit, context);
 73  0
       this.context = context;
 74  0
     }
 75  
 
 76  
     @Override
 77  
     public boolean nextVertex() throws IOException, InterruptedException {
 78  0
       return hCatRecordReader.nextKeyValue();
 79  
     }
 80  
 
 81  
     @Override
 82  
     public final void close() throws IOException {
 83  0
       hCatRecordReader.close();
 84  0
     }
 85  
 
 86  
     @Override
 87  
     public final float getProgress() throws IOException, InterruptedException {
 88  0
       return hCatRecordReader.getProgress();
 89  
     }
 90  
 
 91  
     /**
 92  
      * Get the record reader.
 93  
      *
 94  
      * @return Record reader to be used for reading.
 95  
      */
 96  
     protected final RecordReader<WritableComparable, HCatRecord>
 97  
     getRecordReader() {
 98  0
       return hCatRecordReader;
 99  
     }
 100  
 
 101  
     /**
 102  
      * Get the context.
 103  
      *
 104  
      * @return Context passed to initialize.
 105  
      */
 106  
     protected final TaskAttemptContext getContext() {
 107  0
       return context;
 108  
     }
 109  
   }
 110  
 
 111  
   /**
 112  
    * Create {@link VertexValueReader}.
 113  
 
 114  
    * @return {@link HCatalogVertexValueReader} instance.
 115  
    */
 116  
   protected abstract HCatalogVertexValueReader createVertexValueReader();
 117  
 
 118  
   @Override
 119  
   public final VertexValueReader<I, V>
 120  
   createVertexValueReader(InputSplit split, TaskAttemptContext context)
 121  
     throws IOException {
 122  
     try {
 123  0
       HCatalogVertexValueReader reader = createVertexValueReader();
 124  0
       reader.initialize(split, context);
 125  0
       return reader;
 126  0
     } catch (InterruptedException e) {
 127  0
       throw new IllegalStateException(
 128  
           "createVertexValueReader: Interrupted creating reader.", e);
 129  
     }
 130  
   }
 131  
 
 132  
   /**
 133  
    * {@link HCatalogVertexValueReader} for tables holding a complete vertex
 134  
    * value in each row.
 135  
    */
 136  0
   protected abstract class SingleRowHCatalogVertexValueReader
 137  
       extends HCatalogVertexValueReader {
 138  
     /**
 139  
      * Get vertex id from a record.
 140  
      *
 141  
      * @param record Input record
 142  
      * @return I Vertex id
 143  
      */
 144  
     protected abstract I getVertexId(HCatRecord record);
 145  
 
 146  
     /**
 147  
      * Get vertex value from a record.
 148  
      *
 149  
      * @param record Input record
 150  
      * @return V Vertex value
 151  
      */
 152  
     protected abstract V getVertexValue(HCatRecord record);
 153  
 
 154  
     @Override
 155  
     public final I getCurrentVertexId() throws IOException,
 156  
         InterruptedException {
 157  0
       return getVertexId(getRecordReader().getCurrentValue());
 158  
     }
 159  
 
 160  
     @Override
 161  
     public final V getCurrentVertexValue() throws IOException,
 162  
         InterruptedException {
 163  0
       return getVertexValue(getRecordReader().getCurrentValue());
 164  
     }
 165  
   }
 166  
 }