Coverage Report - org.apache.giraph.io.formats.TextEdgeInputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
TextEdgeInputFormat
0%
0/4
N/A
1.045
TextEdgeInputFormat$TextEdgeReader
0%
0/11
N/A
1.045
TextEdgeInputFormat$TextEdgeReaderFromEachLine
0%
0/8
N/A
1.045
TextEdgeInputFormat$TextEdgeReaderFromEachLineProcessed
0%
0/13
0%
0/2
1.045
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.io.formats;
 20  
 
 21  
 import java.io.IOException;
 22  
 import java.util.List;
 23  
 import org.apache.giraph.edge.Edge;
 24  
 import org.apache.giraph.edge.EdgeFactory;
 25  
 import org.apache.giraph.io.EdgeInputFormat;
 26  
 import org.apache.giraph.io.EdgeReader;
 27  
 import org.apache.hadoop.conf.Configuration;
 28  
 import org.apache.hadoop.io.LongWritable;
 29  
 import org.apache.hadoop.io.Text;
 30  
 import org.apache.hadoop.io.Writable;
 31  
 import org.apache.hadoop.io.WritableComparable;
 32  
 import org.apache.hadoop.mapreduce.InputSplit;
 33  
 import org.apache.hadoop.mapreduce.JobContext;
 34  
 import org.apache.hadoop.mapreduce.RecordReader;
 35  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 36  
 
 37  
 /**
 38  
  * Abstract class that users should subclass to use their own text based
 39  
  * edge output format.
 40  
  *
 41  
  * @param <I> Vertex id
 42  
  * @param <E> Edge data
 43  
  */
 44  
 @SuppressWarnings("rawtypes")
 45  0
 public abstract class TextEdgeInputFormat<I extends WritableComparable,
 46  
     E extends Writable> extends EdgeInputFormat<I, E> {
 47  
   /** Underlying GiraphTextInputFormat. */
 48  0
   protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
 49  
 
 50  0
   @Override public void checkInputSpecs(Configuration conf) { }
 51  
 
 52  
   @Override
 53  
   public List<InputSplit> getSplits(
 54  
       JobContext context, int minSplitCountHint) throws IOException,
 55  
       InterruptedException {
 56  
     // Ignore the hint of numWorkers here since we are using
 57  
     // GiraphTextInputFormat to do this for us
 58  0
     return textInputFormat.getEdgeSplits(context);
 59  
   }
 60  
 
 61  
   /**
 62  
    * {@link EdgeReader} for {@link TextEdgeInputFormat}.
 63  
    */
 64  0
   protected abstract class TextEdgeReader extends EdgeReader<I, E> {
 65  
     /** Internal line record reader */
 66  
     private RecordReader<LongWritable, Text> lineRecordReader;
 67  
     /** Context passed to initialize */
 68  
     private TaskAttemptContext context;
 69  
 
 70  
     @Override
 71  
     public void initialize(InputSplit inputSplit, TaskAttemptContext context)
 72  
       throws IOException, InterruptedException {
 73  0
       this.context = context;
 74  0
       lineRecordReader = createLineRecordReader(inputSplit, context);
 75  0
       lineRecordReader.initialize(inputSplit, context);
 76  0
     }
 77  
 
 78  
     /**
 79  
      * Create the line record reader. Override this to use a different
 80  
      * underlying record reader (useful for testing).
 81  
      *
 82  
      * @param inputSplit
 83  
      *          the split to read
 84  
      * @param context
 85  
      *          the context passed to initialize
 86  
      * @return
 87  
      *         the record reader to be used
 88  
      * @throws IOException
 89  
      *           exception that can be thrown during creation
 90  
      * @throws InterruptedException
 91  
      *           exception that can be thrown during creation
 92  
      */
 93  
     protected RecordReader<LongWritable, Text>
 94  
     createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
 95  
       throws IOException, InterruptedException {
 96  0
       return textInputFormat.createRecordReader(inputSplit, context);
 97  
     }
 98  
 
 99  
     @Override
 100  
     public void close() throws IOException {
 101  0
       lineRecordReader.close();
 102  0
     }
 103  
 
 104  
     @Override
 105  
     public float getProgress() throws IOException, InterruptedException {
 106  0
       return lineRecordReader.getProgress();
 107  
     }
 108  
 
 109  
     /**
 110  
      * Get the line record reader.
 111  
      *
 112  
      * @return Record reader to be used for reading.
 113  
      */
 114  
     protected RecordReader<LongWritable, Text> getRecordReader() {
 115  0
       return lineRecordReader;
 116  
     }
 117  
 
 118  
     /**
 119  
      * Get the context.
 120  
      *
 121  
      * @return Context passed to initialize.
 122  
      */
 123  
     protected TaskAttemptContext getContext() {
 124  0
       return context;
 125  
     }
 126  
   }
 127  
 
 128  
   /**
 129  
    * Abstract class to be implemented by the user to read an edge from each
 130  
    * text line.
 131  
    */
 132  0
   protected abstract class TextEdgeReaderFromEachLine extends TextEdgeReader {
 133  
     @Override
 134  
     public final I getCurrentSourceId() throws IOException,
 135  
         InterruptedException {
 136  0
       Text line = getRecordReader().getCurrentValue();
 137  0
       return getSourceVertexId(line);
 138  
     }
 139  
 
 140  
     @Override
 141  
     public final Edge<I, E> getCurrentEdge() throws IOException,
 142  
         InterruptedException {
 143  0
       Text line = getRecordReader().getCurrentValue();
 144  0
       I targetVertexId = getTargetVertexId(line);
 145  0
       E edgeValue = getValue(line);
 146  0
       return EdgeFactory.create(targetVertexId, edgeValue);
 147  
     }
 148  
 
 149  
     @Override
 150  
     public final boolean nextEdge() throws IOException, InterruptedException {
 151  0
       return getRecordReader().nextKeyValue();
 152  
     }
 153  
 
 154  
     /**
 155  
      * Reads source vertex id from the current line.
 156  
      *
 157  
      * @param line
 158  
      *          the current line
 159  
      * @return
 160  
      *         the source vertex id corresponding to the line
 161  
      * @throws IOException
 162  
      *           exception that can be thrown while reading
 163  
      */
 164  
     protected abstract I getSourceVertexId(Text line) throws IOException;
 165  
 
 166  
 
 167  
     /**
 168  
      * Reads target vertex id from the current line.
 169  
      *
 170  
      * @param line
 171  
      *          the current line
 172  
      * @return
 173  
      *         the target vertex id corresponding to the line
 174  
      * @throws IOException
 175  
      *           exception that can be thrown while reading
 176  
      */
 177  
     protected abstract I getTargetVertexId(Text line) throws IOException;
 178  
 
 179  
     /**
 180  
      * Reads edge value from the current line.
 181  
      *
 182  
      * @param line
 183  
      *          the current line
 184  
      * @return
 185  
      *         the edge value corresponding to the line
 186  
      * @throws IOException
 187  
      *           exception that can be thrown while reading
 188  
      */
 189  
     protected abstract E getValue(Text line) throws IOException;
 190  
   }
 191  
 
 192  
   /**
 193  
    * Abstract class to be implemented by the user to read an edge from each
 194  
    * text line after preprocessing it.
 195  
    *
 196  
    * @param <T>
 197  
    *          The resulting type of preprocessing.
 198  
    */
 199  0
   protected abstract class TextEdgeReaderFromEachLineProcessed<T> extends
 200  
       TextEdgeReader {
 201  
     /** Generic type holding processed line */
 202  
     private T processedLine;
 203  
 
 204  
     @Override
 205  
     public I getCurrentSourceId() throws IOException, InterruptedException {
 206  0
       T processed = processCurrentLine();
 207  0
       return getSourceVertexId(processed);
 208  
     }
 209  
 
 210  
     @Override
 211  
     public final Edge<I, E> getCurrentEdge() throws IOException,
 212  
         InterruptedException {
 213  0
       T processed = processCurrentLine();
 214  0
       I targetVertexId = getTargetVertexId(processed);
 215  0
       E edgeValue = getValue(processed);
 216  0
       return EdgeFactory.create(targetVertexId, edgeValue);
 217  
     }
 218  
 
 219  
     /**
 220  
      * Process the current line to the user's type.
 221  
      *
 222  
      * @return T processed line
 223  
      * @throws IOException on I/O error
 224  
      * @throws InterruptedException on interruption
 225  
      */
 226  
     private T processCurrentLine() throws IOException, InterruptedException {
 227  0
       if (processedLine == null) {
 228  0
         Text line = getRecordReader().getCurrentValue();
 229  0
         processedLine = preprocessLine(line);
 230  
       }
 231  0
       return processedLine;
 232  
     }
 233  
 
 234  
     @Override
 235  
     public final boolean nextEdge() throws IOException, InterruptedException {
 236  0
       processedLine = null;
 237  0
       return getRecordReader().nextKeyValue();
 238  
     }
 239  
 
 240  
     /**
 241  
      * Preprocess the line so other methods can easily read necessary
 242  
      * information for creating edge
 243  
      *
 244  
      * @param line
 245  
      *          the current line to be read
 246  
      * @return
 247  
      *         the preprocessed object
 248  
      * @throws IOException
 249  
      *           exception that can be thrown while reading
 250  
      */
 251  
     protected abstract T preprocessLine(Text line) throws IOException;
 252  
 
 253  
     /**
 254  
      * Reads target vertex id from the preprocessed line.
 255  
      *
 256  
      * @param line
 257  
      *          the object obtained by preprocessing the line
 258  
      * @return
 259  
      *         the target vertex id
 260  
      * @throws IOException
 261  
      *           exception that can be thrown while reading
 262  
      */
 263  
     protected abstract I getTargetVertexId(T line) throws IOException;
 264  
 
 265  
     /**
 266  
      * Reads source vertex id from the preprocessed line.
 267  
      *
 268  
      * @param line
 269  
      *          the object obtained by preprocessing the line
 270  
      * @return
 271  
      *         the source vertex id
 272  
      * @throws IOException
 273  
      *           exception that can be thrown while reading
 274  
      */
 275  
     protected abstract I getSourceVertexId(T line) throws IOException;
 276  
 
 277  
     /**
 278  
      * Reads edge value from the preprocessed line.
 279  
      *
 280  
      * @param line
 281  
      *          the object obtained by preprocessing the line
 282  
      * @return
 283  
      *         the edge value
 284  
      * @throws IOException
 285  
      *           exception that can be thrown while reading
 286  
      */
 287  
     protected abstract E getValue(T line) throws IOException;
 288  
   }
 289  
 }