Coverage Report - org.apache.giraph.io.formats.TextVertexValueInputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
TextVertexValueInputFormat
0%
0/5
N/A
1.048
TextVertexValueInputFormat$TextVertexValueReader
0%
0/12
N/A
1.048
TextVertexValueInputFormat$TextVertexValueReaderFromEachLine
0%
0/4
N/A
1.048
TextVertexValueInputFormat$TextVertexValueReaderFromEachLineProcessed
0%
0/9
0%
0/2
1.048
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.io.formats;
 20  
 
 21  
 import org.apache.giraph.io.VertexValueInputFormat;
 22  
 import org.apache.giraph.io.VertexValueReader;
 23  
 import org.apache.hadoop.conf.Configuration;
 24  
 import org.apache.hadoop.io.LongWritable;
 25  
 import org.apache.hadoop.io.Text;
 26  
 import org.apache.hadoop.io.Writable;
 27  
 import org.apache.hadoop.io.WritableComparable;
 28  
 import org.apache.hadoop.mapreduce.InputSplit;
 29  
 import org.apache.hadoop.mapreduce.JobContext;
 30  
 import org.apache.hadoop.mapreduce.RecordReader;
 31  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 32  
 
 33  
 import java.io.IOException;
 34  
 import java.util.List;
 35  
 
 36  
 /**
 37  
  * Abstract class that users should subclass to use their own text based
 38  
  * vertex value input format.
 39  
  *
 40  
  * @param <I> Vertex index value
 41  
  * @param <V> Vertex value
 42  
  * @param <E> Edge value
 43  
  */
 44  0
 @SuppressWarnings("rawtypes")
 45  0
 public abstract class TextVertexValueInputFormat<I extends WritableComparable,
 46  
     V extends Writable, E extends Writable>
 47  
     extends VertexValueInputFormat<I, V> {
 48  
   /** Uses the GiraphTextInputFormat to do everything */
 49  0
   protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
 50  
 
 51  0
   @Override public void checkInputSpecs(Configuration conf) { }
 52  
 
 53  
   @Override
 54  
   public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
 55  
     throws IOException, InterruptedException {
 56  
     // Ignore the hint of numWorkers here since we are using
 57  
     // GiraphTextInputFormat to do this for us
 58  0
     return textInputFormat.getVertexSplits(context);
 59  
   }
 60  
 
 61  
   @Override
 62  
   public abstract TextVertexValueReader createVertexValueReader(
 63  
       InputSplit split, TaskAttemptContext context) throws IOException;
 64  
 
 65  
   /**
 66  
    * {@link VertexValueReader} for {@link VertexValueInputFormat}.
 67  
    */
 68  0
   protected abstract class TextVertexValueReader extends
 69  
       VertexValueReader<I, V> {
 70  
     /** Internal line record reader */
 71  
     private RecordReader<LongWritable, Text> lineRecordReader;
 72  
     /** Context passed to initialize */
 73  
     private TaskAttemptContext context;
 74  
 
 75  
     @Override
 76  
     public void initialize(InputSplit inputSplit, TaskAttemptContext context)
 77  
       throws IOException, InterruptedException {
 78  0
       super.initialize(inputSplit, context);
 79  0
       this.context = context;
 80  0
       lineRecordReader = createLineRecordReader(inputSplit, context);
 81  0
       lineRecordReader.initialize(inputSplit, context);
 82  0
     }
 83  
 
 84  
     /**
 85  
      * Create the line record reader. Override this to use a different
 86  
      * underlying record reader (useful for testing).
 87  
      *
 88  
      * @param inputSplit the split to read
 89  
      * @param context the context passed to initialize
 90  
      * @return the record reader to be used
 91  
      * @throws IOException exception that can be thrown during creation
 92  
      * @throws InterruptedException exception that can be thrown during creation
 93  
      */
 94  
     protected RecordReader<LongWritable, Text>
 95  
     createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
 96  
       throws IOException, InterruptedException {
 97  0
       return textInputFormat.createRecordReader(inputSplit, context);
 98  
     }
 99  
 
 100  
     @Override
 101  
     public void close() throws IOException {
 102  0
       lineRecordReader.close();
 103  0
     }
 104  
 
 105  
     @Override
 106  
     public float getProgress() throws IOException, InterruptedException {
 107  0
       return lineRecordReader.getProgress();
 108  
     }
 109  
 
 110  
     /**
 111  
      * Get the line record reader.
 112  
      *
 113  
      * @return Record reader to be used for reading.
 114  
      */
 115  
     protected RecordReader<LongWritable, Text> getRecordReader() {
 116  0
       return lineRecordReader;
 117  
     }
 118  
 
 119  
     /**
 120  
      * Get the context.
 121  
      *
 122  
      * @return Context passed to initialize.
 123  
      */
 124  
     protected TaskAttemptContext getContext() {
 125  0
       return context;
 126  
     }
 127  
   }
 128  
 
 129  
   /**
 130  
    * Abstract class to be implemented by the user to read a vertex value from
 131  
    * each text line.
 132  
    */
 133  0
   protected abstract class TextVertexValueReaderFromEachLine extends
 134  
       TextVertexValueReader {
 135  
     @Override
 136  
     public final I getCurrentVertexId() throws IOException,
 137  
         InterruptedException {
 138  0
       return getId(getRecordReader().getCurrentValue());
 139  
     }
 140  
 
 141  
     @Override
 142  
     public final V getCurrentVertexValue() throws IOException,
 143  
         InterruptedException {
 144  0
       return getValue(getRecordReader().getCurrentValue());
 145  
     }
 146  
 
 147  
     @Override
 148  
     public final boolean nextVertex() throws IOException, InterruptedException {
 149  0
       return getRecordReader().nextKeyValue();
 150  
     }
 151  
 
 152  
     /**
 153  
      * Reads vertex id from the current line.
 154  
      *
 155  
      * @param line the current line
 156  
      * @return the vertex id corresponding to the line
 157  
      * @throws IOException exception that can be thrown while reading
 158  
      */
 159  
     protected abstract I getId(Text line) throws IOException;
 160  
 
 161  
     /**
 162  
      * Reads vertex value from the current line.
 163  
      *
 164  
      * @param line the current line
 165  
      * @return the vertex value corresponding to the line
 166  
      * @throws IOException
 167  
      *           exception that can be thrown while reading
 168  
      */
 169  
     protected abstract V getValue(Text line) throws IOException;
 170  
   }
 171  
 
 172  
   /**
 173  
    * Abstract class to be implemented by the user to read a vertex value from
 174  
    * each text line after preprocessing it.
 175  
    *
 176  
    * @param <T> The resulting type of preprocessing.
 177  
    */
 178  0
   protected abstract class TextVertexValueReaderFromEachLineProcessed<T>
 179  
       extends TextVertexValueReader {
 180  
     /** Last preprocessed line. */
 181  0
     private T processedLine = null;
 182  
 
 183  
     /** Get last preprocessed line. Generate it if missing.
 184  
      *
 185  
      * @return The last preprocessed line
 186  
      * @throws IOException
 187  
      * @throws InterruptedException
 188  
      */
 189  
     private T getProcessedLine() throws IOException, InterruptedException {
 190  0
       if (processedLine == null) {
 191  0
         processedLine = preprocessLine(getRecordReader().getCurrentValue());
 192  
       }
 193  0
       return processedLine;
 194  
     }
 195  
 
 196  
     @Override
 197  
     public I getCurrentVertexId() throws IOException,
 198  
         InterruptedException {
 199  0
       return getId(getProcessedLine());
 200  
     }
 201  
 
 202  
     @Override
 203  
     public V getCurrentVertexValue() throws IOException,
 204  
         InterruptedException {
 205  0
       return getValue(getProcessedLine());
 206  
     }
 207  
 
 208  
     @Override
 209  
     public final boolean nextVertex() throws IOException, InterruptedException {
 210  0
       processedLine = null;
 211  0
       return getRecordReader().nextKeyValue();
 212  
     }
 213  
 
 214  
     /**
 215  
      * Preprocess the line so other methods can easily read necessary
 216  
      * information for creating vertex.
 217  
      *
 218  
      * @param line the current line to be read
 219  
      * @return the preprocessed object
 220  
      * @throws IOException exception that can be thrown while reading
 221  
      */
 222  
     protected abstract T preprocessLine(Text line) throws IOException;
 223  
 
 224  
     /**
 225  
      * Reads vertex id from the preprocessed line.
 226  
      *
 227  
      * @param line
 228  
      *          the object obtained by preprocessing the line
 229  
      * @return the vertex id
 230  
      * @throws IOException exception that can be thrown while reading
 231  
      */
 232  
     protected abstract I getId(T line) throws IOException;
 233  
 
 234  
     /**
 235  
      * Reads vertex value from the preprocessed line.
 236  
      *
 237  
      * @param line the object obtained by preprocessing the line
 238  
      * @return the vertex value
 239  
      * @throws IOException exception that can be thrown while reading
 240  
      */
 241  
     protected abstract V getValue(T line) throws IOException;
 242  
   }
 243  
 }