Coverage Report - org.apache.giraph.io.formats.TextEdgeOutputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
TextEdgeOutputFormat
0%
0/6
N/A
1
TextEdgeOutputFormat$1
0%
0/2
N/A
1
TextEdgeOutputFormat$TextEdgeWriter
0%
0/9
N/A
1
TextEdgeOutputFormat$TextEdgeWriterToEachLine
0%
0/4
N/A
1
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.io.formats;
 20  
 
 21  
 import static org.apache.giraph.conf.GiraphConstants.EDGE_OUTPUT_FORMAT_SUBDIR;
 22  
 
 23  
 import java.io.IOException;
 24  
 
 25  
 import org.apache.giraph.edge.Edge;
 26  
 import org.apache.giraph.io.EdgeOutputFormat;
 27  
 import org.apache.giraph.io.EdgeWriter;
 28  
 import org.apache.hadoop.io.Text;
 29  
 import org.apache.hadoop.io.Writable;
 30  
 import org.apache.hadoop.io.WritableComparable;
 31  
 import org.apache.hadoop.mapreduce.JobContext;
 32  
 import org.apache.hadoop.mapreduce.OutputCommitter;
 33  
 import org.apache.hadoop.mapreduce.RecordWriter;
 34  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 35  
 
 36  
 /**
 37  
  * Abstract class that users should subclass to use their own text based
 38  
  * edge output format.
 39  
  *
 40  
  * @param <I> Vertex index value
 41  
  * @param <V> Vertex value
 42  
  * @param <E> Edge value
 43  
  */
 44  0
 @SuppressWarnings("rawtypes")
 45  0
 public abstract class TextEdgeOutputFormat<I extends WritableComparable,
 46  
     V extends Writable, E extends Writable>
 47  
     extends EdgeOutputFormat<I, V, E> {
 48  
   /** Uses the TextOutputFormat to do everything */
 49  0
   protected GiraphTextOutputFormat textOutputFormat =
 50  0
     new GiraphTextOutputFormat() {
 51  
       @Override
 52  
       protected String getSubdir() {
 53  0
         return EDGE_OUTPUT_FORMAT_SUBDIR.get(getConf());
 54  
       }
 55  
     };
 56  
 
 57  
   @Override
 58  
   public void checkOutputSpecs(JobContext context)
 59  
     throws IOException, InterruptedException {
 60  0
     textOutputFormat.checkOutputSpecs(context);
 61  0
   }
 62  
 
 63  
   @Override
 64  
   public OutputCommitter getOutputCommitter(TaskAttemptContext context)
 65  
     throws IOException, InterruptedException {
 66  0
     return textOutputFormat.getOutputCommitter(context);
 67  
   }
 68  
 
 69  
   /**
 70  
    * The factory method which produces the {@link TextEdgeWriter} used by this
 71  
    * output format.
 72  
    *
 73  
    * @param context  the information about the task
 74  
    * @return         the text edge writer to be used
 75  
    */
 76  
   @Override
 77  
   public abstract TextEdgeWriter createEdgeWriter(TaskAttemptContext
 78  
       context) throws IOException, InterruptedException;
 79  
 
 80  
   /**
 81  
    * Abstract class to be implemented by the user based on their specific
 82  
    * edge output.  Easiest to ignore the key value separator and only use
 83  
    * key instead.
 84  
    */
 85  0
   protected abstract class TextEdgeWriter<I extends WritableComparable,
 86  
     V extends Writable, E extends Writable>
 87  
       extends EdgeWriter<I, V, E> {
 88  
     /** Internal line record writer */
 89  
     private RecordWriter<Text, Text> lineRecordWriter;
 90  
     /** Context passed to initialize */
 91  
     private TaskAttemptContext context;
 92  
 
 93  
     @Override
 94  
     public void initialize(TaskAttemptContext context) throws IOException,
 95  
            InterruptedException {
 96  0
       lineRecordWriter = createLineRecordWriter(context);
 97  0
       this.context = context;
 98  0
     }
 99  
 
 100  
     /**
 101  
      * Create the line record writer. Override this to use a different
 102  
      * underlying record writer (useful for testing).
 103  
      *
 104  
      * @param  context the context passed to initialize
 105  
      * @return the record writer to be used
 106  
      * @throws IOException          exception that can be thrown during creation
 107  
      * @throws InterruptedException exception that can be thrown during creation
 108  
      */
 109  
     protected RecordWriter<Text, Text> createLineRecordWriter(
 110  
         TaskAttemptContext context) throws IOException, InterruptedException {
 111  0
       return textOutputFormat.getRecordWriter(context);
 112  
     }
 113  
 
 114  
     @Override
 115  
     public void close(TaskAttemptContext context) throws IOException,
 116  
         InterruptedException {
 117  0
       lineRecordWriter.close(context);
 118  0
     }
 119  
 
 120  
     /**
 121  
      * Get the line record writer.
 122  
      *
 123  
      * @return Record writer to be used for writing.
 124  
      */
 125  
     public RecordWriter<Text, Text> getRecordWriter() {
 126  0
       return lineRecordWriter;
 127  
     }
 128  
 
 129  
     /**
 130  
      * Get the context.
 131  
      *
 132  
      * @return Context passed to initialize.
 133  
      */
 134  
     public TaskAttemptContext getContext() {
 135  0
       return context;
 136  
     }
 137  
   }
 138  
 
 139  
   /**
 140  
    * Abstract class to be implemented by the user to write a line for each
 141  
    * edge.
 142  
    */
 143  0
   protected abstract class TextEdgeWriterToEachLine<
 144  
     I extends WritableComparable, V extends Writable, E extends Writable>
 145  
     extends TextEdgeWriter<I, V, E> {
 146  
 
 147  
     @Override
 148  
     public final void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge)
 149  
       throws IOException, InterruptedException {
 150  
 
 151  
       // Note we are writing line as key with null value
 152  0
       getRecordWriter().write(
 153  0
         convertEdgeToLine(sourceId, sourceValue, edge), null);
 154  0
     }
 155  
 
 156  
     /**
 157  
      * Writes a line for the given edge.
 158  
      *
 159  
      * @param sourceId    the current id of the source vertex
 160  
      * @param sourceValue the current value of the source vertex
 161  
      * @param edge        the current vertex for writing
 162  
      * @return the text line to be written
 163  
      * @throws IOException exception that can be thrown while writing
 164  
      */
 165  
     protected abstract Text convertEdgeToLine(I sourceId,
 166  
       V sourceValue, Edge<I, E> edge) throws IOException;
 167  
   }
 168  
 }