Coverage Report - org.apache.giraph.io.hcatalog.HCatalogVertexInputFormat

Classes in this File                                      | Line Coverage | Branch Coverage | Complexity
HCatalogVertexInputFormat                                 | 0% (0/9)      | N/A             | 1.619
HCatalogVertexInputFormat$HCatalogVertexReader            | 0% (0/12)     | N/A             | 1.619
HCatalogVertexInputFormat$MultiRowHCatalogVertexReader    | 0% (0/35)     | 0% (0/12)       | 1.619
HCatalogVertexInputFormat$SingleRowHCatalogVertexReader   | 0% (0/20)     | 0% (0/4)        | 1.619
 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.io.hcatalog;

import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
import org.apache.giraph.utils.TimedLogger;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.log4j.Logger;

import com.google.common.collect.Lists;

import java.io.IOException;
import java.util.List;
/**
 * Abstract class that users should subclass to load data from a Hive or Pig
 * table. You can easily implement a {@link HCatalogVertexReader} by extending
 * either {@link SingleRowHCatalogVertexReader} or
 * {@link MultiRowHCatalogVertexReader} depending on how data for each vertex is
 * stored in the input table.
 * <p>
 * The desired database and table name to load from can be specified via
 * {@link GiraphHCatInputFormat#setVertexInput(org.apache.hadoop.mapreduce.Job,
 * org.apache.hcatalog.mapreduce.InputJobInfo)}
 * as you set up your vertex input format with {@link
 * org.apache.giraph.conf.GiraphConfiguration#setVertexInputFormatClass(Class)
 * }.
 *
 * @param <I> Vertex id
 * @param <V> Vertex value
 * @param <E> Edge value
 */

@SuppressWarnings("rawtypes")
public abstract class HCatalogVertexInputFormat<
    I extends WritableComparable,
    V extends Writable,
    E extends Writable>
    extends VertexInputFormat<I, V, E> {
  /**
   * HCatalog input format.
   */
  private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();

  @Override
  public final List<InputSplit> getSplits(
      final JobContext context, final int minSplitCountHint)
    throws IOException, InterruptedException {
    return hCatInputFormat.getVertexSplits(context);
  }

  /**
   * Abstract class that users should subclass based on their specific vertex
   * input. HCatRecord can be parsed to get the required data for implementing
   * getCurrentVertex(). If the vertex spans more than one HCatRecord,
   * nextVertex() should be overridden to handle that logic as well.
   */
  protected abstract class HCatalogVertexReader
      extends VertexReader<I, V, E> {
    /** Internal HCatRecordReader. */
    private RecordReader<WritableComparable,
        HCatRecord> hCatRecordReader;
    /** Context passed to initialize. */
    private TaskAttemptContext context;

    /**
     * Initialize with the HCatRecordReader.
     *
     * @param recordReader internal reader
     */
    private void initialize(
        final RecordReader<
            WritableComparable, HCatRecord>
            recordReader) {
      this.hCatRecordReader = recordReader;
    }

    @Override
    public final void initialize(
        final InputSplit inputSplit,
        final TaskAttemptContext ctxt)
      throws IOException, InterruptedException {
      hCatRecordReader.initialize(inputSplit, ctxt);
      this.context = ctxt;
    }

    @Override
    public boolean nextVertex() throws IOException, InterruptedException {
      // Users can override this if desired, e.g. when a vertex spans more
      // than a single row.
      return hCatRecordReader.nextKeyValue();
    }

    @Override
    public final void close() throws IOException {
      hCatRecordReader.close();
    }

    @Override
    public final float getProgress() throws IOException, InterruptedException {
      return hCatRecordReader.getProgress();
    }

    /**
     * Get the record reader.
     * @return Record reader to be used for reading.
     */
    protected final RecordReader<WritableComparable, HCatRecord>
    getRecordReader() {
      return hCatRecordReader;
    }

    /**
     * Get the context.
     *
     * @return Context passed to initialize.
     */
    protected final TaskAttemptContext getContext() {
      return context;
    }
  }

  /**
   * create vertex reader instance.
   * @return HCatalogVertexReader
   */
  protected abstract HCatalogVertexReader createVertexReader();

  @Override
  public final VertexReader<I, V, E>
  createVertexReader(final InputSplit split,
                     final TaskAttemptContext context)
    throws IOException {
    try {
      HCatalogVertexReader reader = createVertexReader();
      reader.initialize(hCatInputFormat.
          createVertexRecordReader(split, context));
      return reader;
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "createVertexReader: " +
              "Interrupted creating reader.", e);
    }
  }

  /**
   * HCatalogVertexReader for tables holding complete vertex info within
   * each row.
   */
  protected abstract class SingleRowHCatalogVertexReader
      extends HCatalogVertexReader {
    /**
     * 1024 const.
     */
    private static final int BYTE_CONST = 1024;
    /**
     * logger
     */
    private final Logger log =
        Logger.getLogger(SingleRowHCatalogVertexReader.class);
    /**
     * record count.
     */
    private int recordCount = 0;
    /**
     * modulus check counter.
     */
    private final int recordModLimit = 1000;
    /**
     * Timed logger to print every 30 seconds
     */
    private final TimedLogger timedLogger = new TimedLogger(30 * 1000,
        log);

    /**
     * get vertex id.
     * @param record hcat record
     * @return I id
     */
    protected abstract I getVertexId(HCatRecord record);

    /**
     * get vertex value.
     * @param record hcat record
     * @return V value
     */
    protected abstract V getVertexValue(HCatRecord record);

    /**
     * get edges.
     * @param record hcat record
     * @return Edges
     */
    protected abstract Iterable<Edge<I, E>> getEdges(HCatRecord record);

    @Override
    public final Vertex<I, V, E> getCurrentVertex()
      throws IOException, InterruptedException {
      HCatRecord record = getRecordReader().getCurrentValue();
      Vertex<I, V, E> vertex = getConf().createVertex();
      vertex.initialize(getVertexId(record), getVertexValue(record),
          getEdges(record));
      ++recordCount;
      if (log.isInfoEnabled() &&
          ((recordCount % recordModLimit) == 0)) {
        // memory usage
        Runtime runtime = Runtime.getRuntime();
        double gb = BYTE_CONST *
            BYTE_CONST *
            BYTE_CONST;
        timedLogger.info(
            "read " + recordCount + " records. Memory: " +
            (runtime.totalMemory() / gb) +
            "GB total = " +
            ((runtime.totalMemory() - runtime.freeMemory()) / gb) +
            "GB used + " + (runtime.freeMemory() / gb) +
            "GB free, " + (runtime.maxMemory() / gb) + "GB max");
      }
      return vertex;
    }
  }
  /**
   * HCatalogVertexReader for tables holding vertex info across multiple
   * rows sorted by vertex id column, so that they appear consecutively to
   * the RecordReader.
   */
  protected abstract class MultiRowHCatalogVertexReader extends
      HCatalogVertexReader {
    /**
     * modulus check counter.
     */
    private static final int RECORD_MOD_LIMIT = 1000;
    /**
     * logger
     */
    private final Logger log =
        Logger.getLogger(MultiRowHCatalogVertexReader.class);
    /**
     * current vertex id.
     */
    private I currentVertexId = null;
    /**
     * current vertex edges.
     */
    private List<Edge<I, E>> currentEdges = Lists.newLinkedList();
    /**
     * records for vertex.
     */
    private List<HCatRecord> recordsForVertex = Lists.newArrayList();
    /**
     * record count.
     */
    private int recordCount = 0;
    /**
     * vertex.
     */
    private Vertex<I, V, E> vertex = null;
    /**
     * Timed logger to print every 30 seconds
     */
    private final TimedLogger timedLogger = new TimedLogger(30 * 1000,
        log);

    /**
     * get vertex id from record.
     *
     * @param record hcat
     * @return I vertex id
     */
    protected abstract I getVertexId(HCatRecord record);

    /**
     * get vertex value from the records belonging to a single vertex.
     * @param records all records for the vertex
     * @return V vertex value
     */
    protected abstract V getVertexValue(
        Iterable<HCatRecord> records);

    /**
     * get target vertex id from record.
     *
     * @param record hcat
     * @return I vertex id of target.
     */
    protected abstract I getTargetVertexId(HCatRecord record);

    /**
     * get edge value from record.
     *
     * @param record hcat.
     * @return E edge value.
     */
    protected abstract E getEdgeValue(HCatRecord record);

    @Override
    public final Vertex<I, V, E>
    getCurrentVertex() throws IOException, InterruptedException {
      return vertex;
    }

    @Override
    public boolean nextVertex() throws IOException, InterruptedException {
      while (getRecordReader().nextKeyValue()) {
        HCatRecord record = getRecordReader().getCurrentValue();
        if (currentVertexId == null) {
          currentVertexId = getVertexId(record);
        }
        if (currentVertexId.equals(getVertexId(record))) {
          currentEdges.add(EdgeFactory.create(getTargetVertexId(record),
              getEdgeValue(record)));
          recordsForVertex.add(record);
        } else {
          createCurrentVertex();
          if (log.isInfoEnabled() && (recordCount % RECORD_MOD_LIMIT) == 0) {
            timedLogger.info("read " + recordCount);
          }
          currentVertexId = getVertexId(record);
          recordsForVertex.add(record);
          return true;
        }
      }

      if (currentEdges.isEmpty()) {
        return false;
      } else {
        createCurrentVertex();
        return true;
      }
    }

    /**
     * create current vertex.
     */
    private void createCurrentVertex() {
      vertex = getConf().createVertex();
      vertex.initialize(currentVertexId, getVertexValue(recordsForVertex),
          currentEdges);
      currentEdges.clear();
      recordsForVertex.clear();
      ++recordCount;
    }
  }
}
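
To make the setup path described in the class javadoc concrete, the sketch below shows one possible concrete subclass built on SingleRowHCatalogVertexReader plus the corresponding job wiring. It is illustrative only and not part of the class above: the table layout (a hypothetical "vertex_table" in the "default" database whose columns are vertex id, vertex value, target id and edge weight, with one complete vertex per row), the names MySingleRowVertexInputFormat and createJob, the org.apache.giraph.job.GiraphJob location, and the three-argument InputJobInfo.create(database, table, filter) factory (some HCatalog releases take additional arguments) are all assumptions. Only setVertexInputFormatClass and GiraphHCatInputFormat.setVertexInput are taken from the javadoc above.

package org.apache.giraph.examples; // hypothetical package for this sketch

import com.google.common.collect.ImmutableList;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.io.hcatalog.GiraphHCatInputFormat;
import org.apache.giraph.io.hcatalog.HCatalogVertexInputFormat;
import org.apache.giraph.job.GiraphJob;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.mapreduce.InputJobInfo;

import java.io.IOException;

/** Sketch: one complete vertex per HCatalog row (id, value, target, weight). */
public class MySingleRowVertexInputFormat extends
    HCatalogVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> {
  @Override
  protected HCatalogVertexReader createVertexReader() {
    return new SingleRowHCatalogVertexReader() {
      @Override
      protected LongWritable getVertexId(HCatRecord record) {
        // Column 0 is assumed to hold the vertex id (BIGINT).
        return new LongWritable((Long) record.get(0));
      }

      @Override
      protected DoubleWritable getVertexValue(HCatRecord record) {
        // Column 1 is assumed to hold the vertex value (DOUBLE).
        return new DoubleWritable((Double) record.get(1));
      }

      @Override
      protected Iterable<Edge<LongWritable, DoubleWritable>> getEdges(
          HCatRecord record) {
        // Columns 2 and 3 are assumed to hold a single out-edge.
        return ImmutableList.of(EdgeFactory.create(
            new LongWritable((Long) record.get(2)),
            new DoubleWritable((Double) record.get(3))));
      }
    };
  }

  /**
   * Hypothetical job wiring following the class javadoc: register the
   * input format and point GiraphHCatInputFormat at the input table.
   */
  public static GiraphJob createJob(GiraphConfiguration conf)
      throws IOException {
    // Computation class, output format, worker settings, etc. would also
    // be configured on conf before submitting the job.
    conf.setVertexInputFormatClass(MySingleRowVertexInputFormat.class);
    GiraphJob job = new GiraphJob(conf, "hcatalog-vertex-input-example");
    GiraphHCatInputFormat.setVertexInput(job.getInternalJob(),
        InputJobInfo.create("default", "vertex_table", null));
    return job;
  }
}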
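
For the multi-row layout that MultiRowHCatalogVertexReader targets (one edge per row, rows sorted by the vertex id column so a vertex's rows reach the reader consecutively), a reader could be sketched as below. The column layout (vertex id, target id, edge weight), the choice of out-degree as the vertex value, and the name MyMultiRowVertexInputFormat are again illustrative assumptions; job wiring is identical to the single-row sketch apart from the class passed to setVertexInputFormatClass.

package org.apache.giraph.examples; // hypothetical package for this sketch

import com.google.common.collect.Iterables;
import org.apache.giraph.io.hcatalog.HCatalogVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hcatalog.data.HCatRecord;

/** Sketch: one edge per HCatalog row, rows pre-sorted by vertex id. */
public class MyMultiRowVertexInputFormat extends
    HCatalogVertexInputFormat<LongWritable, IntWritable, DoubleWritable> {
  @Override
  protected HCatalogVertexReader createVertexReader() {
    return new MultiRowHCatalogVertexReader() {
      @Override
      protected LongWritable getVertexId(HCatRecord record) {
        // Column 0 is assumed to hold the source vertex id.
        return new LongWritable((Long) record.get(0));
      }

      @Override
      protected IntWritable getVertexValue(Iterable<HCatRecord> records) {
        // The vertex value is derived from all rows of the vertex;
        // here, arbitrarily, its out-degree.
        return new IntWritable(Iterables.size(records));
      }

      @Override
      protected LongWritable getTargetVertexId(HCatRecord record) {
        // Column 1 is assumed to hold the target vertex id.
        return new LongWritable((Long) record.get(1));
      }

      @Override
      protected DoubleWritable getEdgeValue(HCatRecord record) {
        // Column 2 is assumed to hold the edge weight.
        return new DoubleWritable((Double) record.get(2));
      }
    };
  }
}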