1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import java.io.DataOutputStream;
22 import java.io.IOException;
23
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.FSDataOutputStream;
26 import org.apache.hadoop.fs.FileSystem;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.io.Text;
29 import org.apache.hadoop.io.compress.CompressionCodec;
30 import org.apache.hadoop.io.compress.GzipCodec;
31 import org.apache.hadoop.mapreduce.RecordWriter;
32 import org.apache.hadoop.mapreduce.TaskAttemptContext;
33 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
34 import org.apache.hadoop.util.ReflectionUtils;
35
36
37
38
39 public abstract class GiraphTextOutputFormat
40 extends TextOutputFormat<Text, Text> {
41
42 @Override
43 public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
44 throws IOException, InterruptedException {
45 String extension = "";
46 CompressionCodec codec = null;
47 Configuration conf = job.getConfiguration();
48 boolean isCompressed = getCompressOutput(job);
49
50 if (isCompressed) {
51 Class<? extends CompressionCodec> codecClass =
52 getOutputCompressorClass(job, GzipCodec.class);
53 codec =
54 (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
55 extension = codec.getDefaultExtension();
56 }
57 Path file = getDefaultWorkFile(job, extension);
58
59
60 FSDataOutputStream fileOut;
61 FileSystem fs = file.getFileSystem(conf);
62 String subdir = getSubdir();
63 if (!subdir.isEmpty()) {
64 Path subdirPath = new Path(subdir);
65 Path subdirAbsPath = new Path(file.getParent(), subdirPath);
66 Path vertexFile = new Path(subdirAbsPath, file.getName());
67 fileOut = fs.create(vertexFile, false);
68 } else {
69 fileOut = fs.create(file, false);
70 }
71
72 String separator = "\t";
73
74 if (!isCompressed) {
75 return new LineRecordWriter<Text, Text>(fileOut, separator);
76 } else {
77 DataOutputStream out =
78 new DataOutputStream(codec.createOutputStream(fileOut));
79 return new LineRecordWriter<Text, Text>(out, separator);
80 }
81 }
82
83
84
85
86
87
88
89 protected abstract String getSubdir();
90 }