Coverage Report - org.apache.giraph.io.formats.GiraphTextOutputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
GiraphTextOutputFormat
0%
0/27
0%
0/6
3
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.io.formats;
 20  
 
 21  
 import java.io.DataOutputStream;
 22  
 import java.io.IOException;
 23  
 
 24  
 import org.apache.hadoop.conf.Configuration;
 25  
 import org.apache.hadoop.fs.FSDataOutputStream;
 26  
 import org.apache.hadoop.fs.FileSystem;
 27  
 import org.apache.hadoop.fs.Path;
 28  
 import org.apache.hadoop.io.Text;
 29  
 import org.apache.hadoop.io.compress.CompressionCodec;
 30  
 import org.apache.hadoop.io.compress.GzipCodec;
 31  
 import org.apache.hadoop.mapreduce.RecordWriter;
 32  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 33  
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 34  
 import org.apache.hadoop.util.ReflectionUtils;
 35  
 
 36  
 /**
 37  
  * The text output format used for Giraph text writing.
 38  
  */
 39  0
 public abstract class GiraphTextOutputFormat
 40  
   extends TextOutputFormat<Text, Text> {
 41  
 
 42  
   @Override
 43  
   public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
 44  
     throws IOException, InterruptedException {
 45  0
     String extension = "";
 46  0
     CompressionCodec codec = null;
 47  0
     Configuration conf = job.getConfiguration();
 48  0
     boolean isCompressed = getCompressOutput(job);
 49  
 
 50  0
     if (isCompressed) {
 51  0
       Class<? extends CompressionCodec> codecClass =
 52  0
         getOutputCompressorClass(job, GzipCodec.class);
 53  0
       codec =
 54  0
         (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
 55  0
       extension = codec.getDefaultExtension();
 56  
     }
 57  0
     Path file = getDefaultWorkFile(job, extension);
 58  
 
 59  
     /* adjust the path */
 60  
     FSDataOutputStream fileOut;
 61  0
     FileSystem fs = file.getFileSystem(conf);
 62  0
     String subdir = getSubdir();
 63  0
     if (!subdir.isEmpty()) {
 64  0
       Path subdirPath = new Path(subdir);
 65  0
       Path subdirAbsPath = new Path(file.getParent(), subdirPath);
 66  0
       Path vertexFile = new Path(subdirAbsPath, file.getName());
 67  0
       fileOut = fs.create(vertexFile, false);
 68  0
     } else {
 69  0
       fileOut = fs.create(file, false);
 70  
     }
 71  
 
 72  0
     String separator = "\t";
 73  
 
 74  0
     if (!isCompressed) {
 75  0
       return new LineRecordWriter<Text, Text>(fileOut, separator);
 76  
     } else {
 77  0
       DataOutputStream out =
 78  0
         new DataOutputStream(codec.createOutputStream(fileOut));
 79  0
       return new LineRecordWriter<Text, Text>(out, separator);
 80  
     }
 81  
   }
 82  
 
 83  
   /**
 84  
    * This function is used to provide an additional path level to keep
 85  
    * different text outputs into different directories.
 86  
    *
 87  
    * @return  the subdirectory to be created under the output path
 88  
    */
 89  
   protected abstract String getSubdir();
 90  
 }