1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import static org.apache.giraph.conf.GiraphConstants.EDGE_OUTPUT_FORMAT_SUBDIR;
22
23 import java.io.IOException;
24
25 import org.apache.giraph.edge.Edge;
26 import org.apache.giraph.io.EdgeOutputFormat;
27 import org.apache.giraph.io.EdgeWriter;
28 import org.apache.hadoop.io.Text;
29 import org.apache.hadoop.io.Writable;
30 import org.apache.hadoop.io.WritableComparable;
31 import org.apache.hadoop.mapreduce.JobContext;
32 import org.apache.hadoop.mapreduce.OutputCommitter;
33 import org.apache.hadoop.mapreduce.RecordWriter;
34 import org.apache.hadoop.mapreduce.TaskAttemptContext;
35
36
37
38
39
40
41
42
43
44 @SuppressWarnings("rawtypes")
45 public abstract class TextEdgeOutputFormat<I extends WritableComparable,
46 V extends Writable, E extends Writable>
47 extends EdgeOutputFormat<I, V, E> {
48
49 protected GiraphTextOutputFormat textOutputFormat =
50 new GiraphTextOutputFormat() {
51 @Override
52 protected String getSubdir() {
53 return EDGE_OUTPUT_FORMAT_SUBDIR.get(getConf());
54 }
55 };
56
57 @Override
58 public void checkOutputSpecs(JobContext context)
59 throws IOException, InterruptedException {
60 textOutputFormat.checkOutputSpecs(context);
61 }
62
63 @Override
64 public OutputCommitter getOutputCommitter(TaskAttemptContext context)
65 throws IOException, InterruptedException {
66 return textOutputFormat.getOutputCommitter(context);
67 }
68
69
70
71
72
73
74
75
76 @Override
77 public abstract TextEdgeWriter createEdgeWriter(TaskAttemptContext
78 context) throws IOException, InterruptedException;
79
80
81
82
83
84
85 protected abstract class TextEdgeWriter<I extends WritableComparable,
86 V extends Writable, E extends Writable>
87 extends EdgeWriter<I, V, E> {
88
89 private RecordWriter<Text, Text> lineRecordWriter;
90
91 private TaskAttemptContext context;
92
93 @Override
94 public void initialize(TaskAttemptContext context) throws IOException,
95 InterruptedException {
96 lineRecordWriter = createLineRecordWriter(context);
97 this.context = context;
98 }
99
100
101
102
103
104
105
106
107
108
109 protected RecordWriter<Text, Text> createLineRecordWriter(
110 TaskAttemptContext context) throws IOException, InterruptedException {
111 return textOutputFormat.getRecordWriter(context);
112 }
113
114 @Override
115 public void close(TaskAttemptContext context) throws IOException,
116 InterruptedException {
117 lineRecordWriter.close(context);
118 }
119
120
121
122
123
124
125 public RecordWriter<Text, Text> getRecordWriter() {
126 return lineRecordWriter;
127 }
128
129
130
131
132
133
134 public TaskAttemptContext getContext() {
135 return context;
136 }
137 }
138
139
140
141
142
143 protected abstract class TextEdgeWriterToEachLine<
144 I extends WritableComparable, V extends Writable, E extends Writable>
145 extends TextEdgeWriter<I, V, E> {
146
147 @Override
148 public final void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge)
149 throws IOException, InterruptedException {
150
151
152 getRecordWriter().write(
153 convertEdgeToLine(sourceId, sourceValue, edge), null);
154 }
155
156
157
158
159
160
161
162
163
164
165 protected abstract Text convertEdgeToLine(I sourceId,
166 V sourceValue, Edge<I, E> edge) throws IOException;
167 }
168 }