1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import java.io.IOException;
22
23 import org.apache.giraph.graph.Vertex;
24 import org.apache.giraph.io.VertexOutputFormat;
25 import org.apache.giraph.io.VertexWriter;
26 import org.apache.hadoop.io.Text;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.hadoop.io.WritableComparable;
29 import org.apache.hadoop.mapreduce.JobContext;
30 import org.apache.hadoop.mapreduce.OutputCommitter;
31 import org.apache.hadoop.mapreduce.RecordWriter;
32 import org.apache.hadoop.mapreduce.TaskAttemptContext;
33
34 import static org.apache.giraph.conf.GiraphConstants.VERTEX_OUTPUT_FORMAT_SUBDIR;
35
36
37
38
39
40
41
42
43
44 @SuppressWarnings("rawtypes")
45 public abstract class TextVertexOutputFormat<I extends WritableComparable,
46 V extends Writable, E extends Writable>
47 extends VertexOutputFormat<I, V, E> {
48
49 protected GiraphTextOutputFormat textOutputFormat =
50 new GiraphTextOutputFormat() {
51 @Override
52 protected String getSubdir() {
53 return VERTEX_OUTPUT_FORMAT_SUBDIR.get(getConf());
54 }
55 };
56
57 @Override
58 public void checkOutputSpecs(JobContext context)
59 throws IOException, InterruptedException {
60 textOutputFormat.checkOutputSpecs(context);
61 }
62
63 @Override
64 public OutputCommitter getOutputCommitter(TaskAttemptContext context)
65 throws IOException, InterruptedException {
66 return textOutputFormat.getOutputCommitter(context);
67 }
68
69
70
71
72
73
74
75
76
77
78 @Override
79 public abstract TextVertexWriter createVertexWriter(TaskAttemptContext
80 context) throws IOException, InterruptedException;
81
82
83
84
85
86
87 protected abstract class TextVertexWriter
88 extends VertexWriter<I, V, E> {
89
90 private RecordWriter<Text, Text> lineRecordWriter;
91
92 private TaskAttemptContext context;
93
94 @Override
95 public void initialize(TaskAttemptContext context) throws IOException,
96 InterruptedException {
97 lineRecordWriter = createLineRecordWriter(context);
98 this.context = context;
99 }
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 protected RecordWriter<Text, Text> createLineRecordWriter(
115 TaskAttemptContext context) throws IOException, InterruptedException {
116 return textOutputFormat.getRecordWriter(context);
117 }
118
119 @Override
120 public void close(TaskAttemptContext context) throws IOException,
121 InterruptedException {
122 lineRecordWriter.close(context);
123 }
124
125
126
127
128
129
130 public RecordWriter<Text, Text> getRecordWriter() {
131 return lineRecordWriter;
132 }
133
134
135
136
137
138
139 public TaskAttemptContext getContext() {
140 return context;
141 }
142 }
143
144
145
146
147
148 protected abstract class TextVertexWriterToEachLine extends TextVertexWriter {
149
150 @SuppressWarnings("unchecked")
151 @Override
152 public final void writeVertex(Vertex vertex) throws
153 IOException, InterruptedException {
154
155 getRecordWriter().write(convertVertexToLine(vertex), null);
156 }
157
158
159
160
161
162
163
164
165
166
167 protected abstract Text convertVertexToLine(Vertex<I, V, E> vertex)
168 throws IOException;
169 }
170 }