Coverage Report - org.apache.giraph.io.hcatalog.HCatalogEdgeInputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
HCatalogEdgeInputFormat
0%
0/9
N/A
1.136
HCatalogEdgeInputFormat$HCatalogEdgeReader
0%
0/14
N/A
1.136
HCatalogEdgeInputFormat$SingleRowHCatalogEdgeNoValueReader
0%
0/6
N/A
1.136
HCatalogEdgeInputFormat$SingleRowHCatalogEdgeReader
0%
0/7
N/A
1.136
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.io.hcatalog;
 20  
 
 21  
 import java.io.IOException;
 22  
 import java.util.List;
 23  
 import org.apache.giraph.edge.Edge;
 24  
 import org.apache.giraph.edge.EdgeFactory;
 25  
 import org.apache.giraph.io.EdgeInputFormat;
 26  
 import org.apache.giraph.io.EdgeReader;
 27  
 import org.apache.hadoop.io.NullWritable;
 28  
 import org.apache.hadoop.io.Writable;
 29  
 import org.apache.hadoop.io.WritableComparable;
 30  
 import org.apache.hadoop.mapreduce.InputSplit;
 31  
 import org.apache.hadoop.mapreduce.JobContext;
 32  
 import org.apache.hadoop.mapreduce.RecordReader;
 33  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 34  
 import org.apache.hcatalog.data.HCatRecord;
 35  
 
 36  
 /**
 37  
  * HCatalog {@link EdgeInputFormat} for reading edges from Hive/Pig.
 38  
  *
 39  
  * @param <I> Vertex id
 40  
  * @param <E> Edge value
 41  
  */
 42  0
 public abstract class HCatalogEdgeInputFormat<
 43  
     I extends WritableComparable,
 44  
     E extends Writable>
 45  
     extends EdgeInputFormat<I, E> {
 46  
   /**
 47  
    * HCatalog input format.
 48  
    */
 49  0
   private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
 50  
 
 51  
   @Override
 52  
   public final List<InputSplit> getSplits(JobContext context,
 53  
                                           int minSplitCountHint)
 54  
     throws IOException, InterruptedException {
 55  0
     return hCatInputFormat.getEdgeSplits(context);
 56  
   }
 57  
 
 58  
   /**
 59  
    * Get underlying HCatalog input format. Used for creating readers.
 60  
    *
 61  
    * @return GiraphHCatInputFormat stored.
 62  
    */
 63  
   protected GiraphHCatInputFormat getHCatInputFormat() {
 64  0
     return hCatInputFormat;
 65  
   }
 66  
 
 67  
   /**
 68  
    * {@link EdgeReader} for {@link HCatalogEdgeInputFormat}.
 69  
    */
 70  
   protected abstract static class HCatalogEdgeReader<
 71  
       I extends WritableComparable, E extends Writable>
 72  
       extends EdgeReader<I, E> {
 73  
     /** HCatalog input format to use */
 74  
     private final GiraphHCatInputFormat hCatInputFormat;
 75  
     /** Internal {@link RecordReader}. */
 76  
     private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
 77  
     /** Context passed to initialize. */
 78  
     private TaskAttemptContext context;
 79  
 
 80  
     /**
 81  
      * Constructor taking hcat input format to use.
 82  
      *
 83  
      * @param hCatInputFormat HCatalog input format
 84  
      */
 85  0
     public HCatalogEdgeReader(GiraphHCatInputFormat hCatInputFormat) {
 86  0
       this.hCatInputFormat = hCatInputFormat;
 87  0
     }
 88  
 
 89  
     @Override
 90  
     public final void initialize(InputSplit inputSplit,
 91  
                                  TaskAttemptContext context)
 92  
       throws IOException, InterruptedException {
 93  0
       hCatRecordReader =
 94  0
           hCatInputFormat.createEdgeRecordReader(inputSplit, context);
 95  0
       hCatRecordReader.initialize(inputSplit, context);
 96  0
       this.context = context;
 97  0
     }
 98  
 
 99  
     @Override
 100  
     public boolean nextEdge() throws IOException, InterruptedException {
 101  0
       return hCatRecordReader.nextKeyValue();
 102  
     }
 103  
 
 104  
     @Override
 105  
     public final void close() throws IOException {
 106  0
       hCatRecordReader.close();
 107  0
     }
 108  
 
 109  
     @Override
 110  
     public final float getProgress() throws IOException, InterruptedException {
 111  0
       return hCatRecordReader.getProgress();
 112  
     }
 113  
 
 114  
     /**
 115  
      * Get the record reader.
 116  
      *
 117  
      * @return Record reader to be used for reading.
 118  
      */
 119  
     protected final RecordReader<WritableComparable, HCatRecord>
 120  
     getRecordReader() {
 121  0
       return hCatRecordReader;
 122  
     }
 123  
 
 124  
     /**
 125  
      * Get the context.
 126  
      *
 127  
      * @return Context passed to initialize.
 128  
      */
 129  
     protected final TaskAttemptContext getContext() {
 130  0
       return context;
 131  
     }
 132  
   }
 133  
 
 134  
   /**
 135  
    * Create {@link EdgeReader}.
 136  
    *
 137  
    * @return {@link HCatalogEdgeReader} instance.
 138  
    */
 139  
   protected abstract HCatalogEdgeReader<I, E> createEdgeReader();
 140  
 
 141  
   @Override
 142  
   public EdgeReader<I, E>
 143  
   createEdgeReader(InputSplit split, TaskAttemptContext context)
 144  
     throws IOException {
 145  
     try {
 146  0
       HCatalogEdgeReader reader = createEdgeReader();
 147  0
       reader.initialize(split, context);
 148  0
       return reader;
 149  0
     } catch (InterruptedException e) {
 150  0
       throw new IllegalStateException(
 151  
           "createEdgeReader: Interrupted creating reader.", e);
 152  
     }
 153  
   }
 154  
 
 155  
   /**
 156  
    * {@link HCatalogEdgeReader} for tables holding a complete edge
 157  
    * in each row.
 158  
    */
 159  
   protected abstract static class SingleRowHCatalogEdgeReader<
 160  
       I extends WritableComparable, E extends Writable>
 161  
       extends HCatalogEdgeReader<I, E> {
 162  
     /**
 163  
      * Constructor
 164  
      * @param hCatInputFormat giraph input format to use
 165  
      */
 166  
     public SingleRowHCatalogEdgeReader(GiraphHCatInputFormat hCatInputFormat) {
 167  0
       super(hCatInputFormat);
 168  0
     }
 169  
 
 170  
     /**
 171  
      * Get source vertex id from a record.
 172  
      *
 173  
      * @param record Input record
 174  
      * @return I Source vertex id
 175  
      */
 176  
     protected abstract I getSourceVertexId(HCatRecord record);
 177  
 
 178  
     /**
 179  
      * Get target vertex id from a record.
 180  
      *
 181  
      * @param record Input record
 182  
      * @return I Target vertex id
 183  
      */
 184  
     protected abstract I getTargetVertexId(HCatRecord record);
 185  
 
 186  
     /**
 187  
      * Get edge value from a record.
 188  
      *
 189  
      * @param record Input record
 190  
      * @return E Edge value
 191  
      */
 192  
     protected abstract E getEdgeValue(HCatRecord record);
 193  
 
 194  
     @Override
 195  
     public I getCurrentSourceId() throws IOException, InterruptedException {
 196  0
       HCatRecord record = getRecordReader().getCurrentValue();
 197  0
       return getSourceVertexId(record);
 198  
     }
 199  
 
 200  
     @Override
 201  
     public Edge<I, E> getCurrentEdge() throws IOException,
 202  
         InterruptedException {
 203  0
       HCatRecord record = getRecordReader().getCurrentValue();
 204  0
       return EdgeFactory.create(getTargetVertexId(record),
 205  0
           getEdgeValue(record));
 206  
     }
 207  
   }
 208  
 
 209  
   /**
 210  
    * {@link HCatalogEdgeReader} for tables holding a complete edge
 211  
    * in each row where the edges contain no data other than IDs they point to.
 212  
    */
 213  
   protected abstract static class SingleRowHCatalogEdgeNoValueReader<
 214  
       I extends WritableComparable>
 215  
       extends HCatalogEdgeReader<I, NullWritable> {
 216  
     /**
 217  
      * Constructor
 218  
      * @param hCatInputFormat giraph input format to use
 219  
      */
 220  
     public SingleRowHCatalogEdgeNoValueReader(
 221  
         GiraphHCatInputFormat hCatInputFormat) {
 222  0
       super(hCatInputFormat);
 223  0
     }
 224  
 
 225  
     /**
 226  
      * Get source vertex id from a record.
 227  
      *
 228  
      * @param record Input record
 229  
      * @return I Source vertex id
 230  
      */
 231  
     protected abstract I getSourceVertexId(HCatRecord record);
 232  
 
 233  
     /**
 234  
      * Get target vertex id from a record.
 235  
      *
 236  
      * @param record Input record
 237  
      * @return I Target vertex id
 238  
      */
 239  
     protected abstract I getTargetVertexId(HCatRecord record);
 240  
 
 241  
     @Override
 242  
     public I getCurrentSourceId() throws IOException, InterruptedException {
 243  0
       HCatRecord record = getRecordReader().getCurrentValue();
 244  0
       return getSourceVertexId(record);
 245  
     }
 246  
 
 247  
     @Override
 248  
     public Edge<I, NullWritable> getCurrentEdge() throws IOException,
 249  
         InterruptedException {
 250  0
       HCatRecord record = getRecordReader().getCurrentValue();
 251  0
       return EdgeFactory.create(getTargetVertexId(record));
 252  
     }
 253  
   }
 254  
 }