Coverage Report - org.apache.giraph.io.formats.TextVertexInputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
TextVertexInputFormat
0%
0/5
N/A
1.185
TextVertexInputFormat$TextVertexReader
0%
0/11
N/A
1.185
TextVertexInputFormat$TextVertexReaderFromEachLine
0%
0/6
N/A
1.185
TextVertexInputFormat$TextVertexReaderFromEachLineProcessed
0%
0/8
N/A
1.185
TextVertexInputFormat$TextVertexReaderFromEachLineProcessedHandlingExceptions
0%
0/15
N/A
1.185
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.io.formats;
 20  
 
 21  
 import org.apache.giraph.edge.Edge;
 22  
 import org.apache.giraph.graph.Vertex;
 23  
 import org.apache.giraph.io.VertexInputFormat;
 24  
 import org.apache.giraph.io.VertexReader;
 25  
 import org.apache.hadoop.conf.Configuration;
 26  
 import org.apache.hadoop.io.LongWritable;
 27  
 import org.apache.hadoop.io.Text;
 28  
 import org.apache.hadoop.io.Writable;
 29  
 import org.apache.hadoop.io.WritableComparable;
 30  
 import org.apache.hadoop.mapreduce.InputSplit;
 31  
 import org.apache.hadoop.mapreduce.JobContext;
 32  
 import org.apache.hadoop.mapreduce.RecordReader;
 33  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 34  
 
 35  
 import java.io.IOException;
 36  
 import java.util.List;
 37  
 
 38  
 /**
 39  
  * Abstract class that users should subclass to use their own text-based
 40  
  * vertex input format.
 41  
  *
 42  
  * @param <I> Vertex id
 43  
  * @param <V> Vertex value
 44  
  * @param <E> Edge value
 45  
  */
 46  0
 @SuppressWarnings("rawtypes")
 47  0
 public abstract class TextVertexInputFormat<I extends WritableComparable,
 48  
     V extends Writable, E extends Writable>
 49  
     extends VertexInputFormat<I, V, E> {
 50  
   /** Uses the GiraphTextInputFormat to do everything */
 51  0
   protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
 52  
 
 53  0
   @Override public void checkInputSpecs(Configuration conf) { }
 54  
 
 55  
   @Override
 56  
   public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
 57  
     throws IOException, InterruptedException {
 58  
     // Ignore minSplitCountHint here since we are using
 59  
     // GiraphTextInputFormat to do this for us
 60  0
     return textInputFormat.getVertexSplits(context);
 61  
   }
 62  
 
 63  
   /**
 64  
    * The factory method which produces the {@link TextVertexReader} used by this
 65  
    * input format.
 66  
    *
 67  
    * @param split
 68  
    *          the split to be read
 69  
    * @param context
 70  
    *          the information about the task
 71  
    * @return
 72  
    *         the text vertex reader to be used
 73  
    */
 74  
   @Override
 75  
   public abstract TextVertexReader createVertexReader(InputSplit split,
 76  
       TaskAttemptContext context) throws IOException;
 77  
 
 78  
   /**
 79  
    * Abstract class to be implemented by the user based on their specific
 80  
    * vertex input. Easiest to ignore the key value separator and only use
 81  
    * key instead.
 82  
    *
 83  
    * When reading a vertex from each line, extend
 84  
    * {@link TextVertexReaderFromEachLine}. If you need to preprocess each line
 85  
    * first, then extend {@link TextVertexReaderFromEachLineProcessed}. If you
 86  
    * need common exception handling while preprocessing, then extend
 87  
    * {@link TextVertexReaderFromEachLineProcessedHandlingExceptions}.
 88  
    */
 89  0
   protected abstract class TextVertexReader extends VertexReader<I, V, E> {
 90  
     /** Internal line record reader */
 91  
     private RecordReader<LongWritable, Text> lineRecordReader;
 92  
     /** Context passed to initialize */
 93  
     private TaskAttemptContext context;
 94  
 
 95  
     @Override
 96  
     public void initialize(InputSplit inputSplit, TaskAttemptContext context)
 97  
       throws IOException, InterruptedException {
 98  0
       this.context = context;
 99  0
       lineRecordReader = createLineRecordReader(inputSplit, context);
 100  0
       lineRecordReader.initialize(inputSplit, context);
 101  0
     }
 102  
 
 103  
     /**
 104  
      * Create the line record reader. Override this to use a different
 105  
      * underlying record reader (useful for testing).
 106  
      *
 107  
      * @param inputSplit
 108  
      *          the split to read
 109  
      * @param context
 110  
      *          the context passed to initialize
 111  
      * @return
 112  
      *         the record reader to be used
 113  
      * @throws IOException
 114  
      *           exception that can be thrown during creation
 115  
      * @throws InterruptedException
 116  
      *           exception that can be thrown during creation
 117  
      */
 118  
     protected RecordReader<LongWritable, Text>
 119  
     createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
 120  
       throws IOException, InterruptedException {
 121  0
       return textInputFormat.createRecordReader(inputSplit, context);
 122  
     }
 123  
 
 124  
     @Override
 125  
     public void close() throws IOException {
 126  0
       lineRecordReader.close();
 127  0
     }
 128  
 
 129  
     @Override
 130  
     public float getProgress() throws IOException, InterruptedException {
 131  0
       return lineRecordReader.getProgress();
 132  
     }
 133  
 
 134  
     /**
 135  
      * Get the line record reader.
 136  
      *
 137  
      * @return Record reader to be used for reading.
 138  
      */
 139  
     protected RecordReader<LongWritable, Text> getRecordReader() {
 140  0
       return lineRecordReader;
 141  
     }
 142  
 
 143  
     /**
 144  
      * Get the context.
 145  
      *
 146  
      * @return Context passed to initialize.
 147  
      */
 148  
     protected TaskAttemptContext getContext() {
 149  0
       return context;
 150  
     }
 151  
   }
 152  
 
 153  
   /**
 154  
    * Abstract class to be implemented by the user to read a vertex from each
 155  
    * text line.
 156  
    */
 157  0
   protected abstract class TextVertexReaderFromEachLine extends
 158  
     TextVertexReader {
 159  
 
 160  
     @Override
 161  
     public final Vertex<I, V, E> getCurrentVertex() throws IOException,
 162  
     InterruptedException {
 163  0
       Text line = getRecordReader().getCurrentValue();
 164  0
       Vertex<I, V, E> vertex = getConf().createVertex();
 165  0
       vertex.initialize(getId(line), getValue(line), getEdges(line));
 166  0
       return vertex;
 167  
     }
 168  
 
 169  
     @Override
 170  
     public final boolean nextVertex() throws IOException, InterruptedException {
 171  0
       return getRecordReader().nextKeyValue();
 172  
     }
 173  
 
 174  
     /**
 175  
      * Reads vertex id from the current line.
 176  
      *
 177  
      * @param line
 178  
      *          the current line
 179  
      * @return
 180  
      *         the vertex id corresponding to the line
 181  
      * @throws IOException
 182  
      *           exception that can be thrown while reading
 183  
      */
 184  
     protected abstract I getId(Text line) throws IOException;
 185  
 
 186  
     /**
 187  
      * Reads vertex value from the current line.
 188  
      *
 189  
      * @param line
 190  
      *          the current line
 191  
      * @return
 192  
      *         the vertex value corresponding to the line
 193  
      * @throws IOException
 194  
      *           exception that can be thrown while reading
 195  
      */
 196  
     protected abstract V getValue(Text line) throws IOException;
 197  
 
 198  
     /**
 199  
      * Reads edges from the current line.
 200  
      *
 201  
      * @param line
 202  
      *          the current line
 203  
      * @return
 204  
      *         the edges
 205  
      * @throws IOException
 206  
      *           exception that can be thrown while reading
 207  
      */
 208  
     protected abstract Iterable<Edge<I, E>> getEdges(Text line) throws
 209  
         IOException;
 210  
 
 211  
   }
 212  
 
 213  
   /**
 214  
    * Abstract class to be implemented by the user to read a vertex from each
 215  
    * text line after preprocessing it.
 216  
    *
 217  
    * @param <T>
 218  
    *          The resulting type of preprocessing.
 219  
    */
 220  0
   protected abstract class TextVertexReaderFromEachLineProcessed<T> extends
 221  
       TextVertexReader {
 222  
 
 223  
     @Override
 224  
     public final boolean nextVertex() throws IOException, InterruptedException {
 225  0
       return getRecordReader().nextKeyValue();
 226  
     }
 227  
 
 228  
     @Override
 229  
     public final Vertex<I, V, E> getCurrentVertex() throws IOException,
 230  
     InterruptedException {
 231  0
       Text line = getRecordReader().getCurrentValue();
 232  
       Vertex<I, V, E> vertex;
 233  0
       T processed = preprocessLine(line);
 234  0
       vertex = getConf().createVertex();
 235  0
       vertex.initialize(getId(processed), getValue(processed),
 236  0
           getEdges(processed));
 237  0
       return vertex;
 238  
     }
 239  
 
 240  
     /**
 241  
      * Preprocess the line so other methods can easily read necessary
 242  
      * information for creating vertex.
 243  
      *
 244  
      * @param line
 245  
      *          the current line to be read
 246  
      * @return
 247  
      *         the preprocessed object
 248  
      * @throws IOException
 249  
      *           exception that can be thrown while reading
 250  
      */
 251  
     protected abstract T preprocessLine(Text line) throws IOException;
 252  
 
 253  
     /**
 254  
      * Reads vertex id from the preprocessed line.
 255  
      *
 256  
      * @param line
 257  
      *          the object obtained by preprocessing the line
 258  
      * @return
 259  
      *         the vertex id
 260  
      * @throws IOException
 261  
      *           exception that can be thrown while reading
 262  
      */
 263  
     protected abstract I getId(T line) throws IOException;
 264  
 
 265  
     /**
 266  
      * Reads vertex value from the preprocessed line.
 267  
      *
 268  
      * @param line
 269  
      *          the object obtained by preprocessing the line
 270  
      * @return
 271  
      *         the vertex value
 272  
      * @throws IOException
 273  
      *           exception that can be thrown while reading
 274  
      */
 275  
     protected abstract V getValue(T line) throws IOException;
 276  
 
 277  
     /**
 278  
      * Reads edges from the preprocessed line.
 279  
      *
 280  
      *
 281  
      * @param line
 282  
      *          the object obtained by preprocessing the line
 283  
      * @return
 284  
      *         the edges
 285  
      * @throws IOException
 286  
      *           exception that can be thrown while reading
 287  
      */
 288  
     protected abstract Iterable<Edge<I, E>> getEdges(T line) throws IOException;
 289  
 
 290  
   }
 291  
 
 292  
   // CHECKSTYLE: stop RedundantThrows
 293  
   /**
 294  
    * Abstract class to be implemented by the user to read a vertex from each
 295  
    * text line after preprocessing it with exception handling.
 296  
    *
 297  
    * @param <T>
 298  
    *          The resulting type of preprocessing.
 299  
    * @param <X>
 300  
    *          The exception type that can be thrown due to preprocessing.
 301  
    */
 302  0
   protected abstract class
 303  
   TextVertexReaderFromEachLineProcessedHandlingExceptions<T, X extends
 304  
     Throwable> extends TextVertexReader {
 305  
 
 306  
     @Override
 307  
     public final boolean nextVertex() throws IOException, InterruptedException {
 308  0
       return getRecordReader().nextKeyValue();
 309  
     }
 310  
 
 311  
     @SuppressWarnings("unchecked")
 312  
     @Override
 313  
     public final Vertex<I, V, E> getCurrentVertex() throws IOException,
 314  
         InterruptedException {
 315  
       // Only the value (the line's text) is read; the key is not needed here
 316  0
       Text line = getRecordReader().getCurrentValue();
 317  
       Vertex<I, V, E> vertex;
 318  0
       T processed = null;
 319  
       try {
 320  0
         processed = preprocessLine(line);
 321  0
         vertex = getConf().createVertex();
 322  0
         vertex.initialize(getId(processed), getValue(processed),
 323  0
             getEdges(processed));
 324  0
       } catch (IOException e) {
 325  0
         throw e;
 326  
       // CHECKSTYLE: stop IllegalCatch
 327  0
       } catch (Throwable t) {
 328  0
         return handleException(line, processed, (X) t);
 329  
       // CHECKSTYLE: resume IllegalCatch
 330  0
       }
 331  0
       return vertex;
 332  
     }
 333  
 
 334  
     /**
 335  
      * Preprocess the line so other methods can easily read necessary
 336  
      * information for creating vertex.
 337  
      *
 338  
      * @param line
 339  
      *          the current line to be read
 340  
      * @return
 341  
      *         the preprocessed object
 342  
      * @throws X
 343  
      *           exception that can be thrown while preprocessing the line
 344  
      * @throws IOException
 345  
      *           exception that can be thrown while reading
 346  
      */
 347  
     protected abstract T preprocessLine(Text line) throws X, IOException;
 348  
 
 349  
     /**
 350  
      * Reads vertex id from the preprocessed line.
 351  
      *
 352  
      * @param line
 353  
      *          the object obtained by preprocessing the line
 354  
      * @return
 355  
      *         the vertex id
 356  
      * @throws X
 357  
      *           exception that can be thrown while reading the preprocessed
 358  
      *           object
 359  
      * @throws IOException
 360  
      *           exception that can be thrown while reading
 361  
      */
 362  
     protected abstract I getId(T line) throws X, IOException;
 363  
 
 364  
     /**
 365  
      * Reads vertex value from the preprocessed line.
 366  
      *
 367  
      * @param line
 368  
      *          the object obtained by preprocessing the line
 369  
      * @return
 370  
      *         the vertex value
 371  
      * @throws X
 372  
      *           exception that can be thrown while reading the preprocessed
 373  
      *           object
 374  
      * @throws IOException
 375  
      *           exception that can be thrown while reading
 376  
      */
 377  
     protected abstract V getValue(T line) throws X, IOException;
 378  
 
 379  
     /**
 380  
      * Reads edges from the preprocessed line.
 381  
      *
 382  
      *
 383  
      * @param line
 384  
      *          the object obtained by preprocessing the line
 385  
      * @return
 386  
      *         the edges
 387  
      * @throws X
 388  
      *           exception that can be thrown while reading the preprocessed
 389  
      *           object
 390  
      * @throws IOException
 391  
      *           exception that can be thrown while reading
 392  
      */
 393  
     protected abstract Iterable<Edge<I, E>> getEdges(T line) throws X,
 394  
         IOException;
 395  
 
 396  
     /**
 397  
      * Handles exceptions while reading vertex from each line.
 398  
      *
 399  
      * @param line
 400  
      *          the line that was being read when the exception was thrown
 401  
      * @param processed
 402  
      *          the object obtained by preprocessing the line. Can be null if
 403  
      *          exception was thrown during preprocessing.
 404  
      * @param e
 405  
      *          the exception thrown while reading the line
 406  
      * @return the recovered/alternative vertex to be used
 407  
      */
 408  
     protected Vertex<I, V, E> handleException(Text line, T processed, X e) {
 409  0
       throw new IllegalArgumentException(e);
 410  
     }
 411  
 
 412  
   }
 413  
   // CHECKSTYLE: resume RedundantThrows
 414  
 
 415  
 }