1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import java.io.IOException;
22 import java.util.List;
23 import org.apache.giraph.edge.Edge;
24 import org.apache.giraph.edge.EdgeFactory;
25 import org.apache.giraph.io.EdgeInputFormat;
26 import org.apache.giraph.io.EdgeReader;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.io.LongWritable;
29 import org.apache.hadoop.io.Text;
30 import org.apache.hadoop.io.Writable;
31 import org.apache.hadoop.io.WritableComparable;
32 import org.apache.hadoop.mapreduce.InputSplit;
33 import org.apache.hadoop.mapreduce.JobContext;
34 import org.apache.hadoop.mapreduce.RecordReader;
35 import org.apache.hadoop.mapreduce.TaskAttemptContext;
36
37
38
39
40
41
42
43
44 @SuppressWarnings("rawtypes")
45 public abstract class TextEdgeInputFormat<I extends WritableComparable,
46 E extends Writable> extends EdgeInputFormat<I, E> {
47
48 protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
49
50 @Override public void checkInputSpecs(Configuration conf) { }
51
52 @Override
53 public List<InputSplit> getSplits(
54 JobContext context, int minSplitCountHint) throws IOException,
55 InterruptedException {
56
57
58 return textInputFormat.getEdgeSplits(context);
59 }
60
61
62
63
64 protected abstract class TextEdgeReader extends EdgeReader<I, E> {
65
66 private RecordReader<LongWritable, Text> lineRecordReader;
67
68 private TaskAttemptContext context;
69
70 @Override
71 public void initialize(InputSplit inputSplit, TaskAttemptContext context)
72 throws IOException, InterruptedException {
73 this.context = context;
74 lineRecordReader = createLineRecordReader(inputSplit, context);
75 lineRecordReader.initialize(inputSplit, context);
76 }
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93 protected RecordReader<LongWritable, Text>
94 createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
95 throws IOException, InterruptedException {
96 return textInputFormat.createRecordReader(inputSplit, context);
97 }
98
99 @Override
100 public void close() throws IOException {
101 lineRecordReader.close();
102 }
103
104 @Override
105 public float getProgress() throws IOException, InterruptedException {
106 return lineRecordReader.getProgress();
107 }
108
109
110
111
112
113
114 protected RecordReader<LongWritable, Text> getRecordReader() {
115 return lineRecordReader;
116 }
117
118
119
120
121
122
123 protected TaskAttemptContext getContext() {
124 return context;
125 }
126 }
127
128
129
130
131
132 protected abstract class TextEdgeReaderFromEachLine extends TextEdgeReader {
133 @Override
134 public final I getCurrentSourceId() throws IOException,
135 InterruptedException {
136 Text line = getRecordReader().getCurrentValue();
137 return getSourceVertexId(line);
138 }
139
140 @Override
141 public final Edge<I, E> getCurrentEdge() throws IOException,
142 InterruptedException {
143 Text line = getRecordReader().getCurrentValue();
144 I targetVertexId = getTargetVertexId(line);
145 E edgeValue = getValue(line);
146 return EdgeFactory.create(targetVertexId, edgeValue);
147 }
148
149 @Override
150 public final boolean nextEdge() throws IOException, InterruptedException {
151 return getRecordReader().nextKeyValue();
152 }
153
154
155
156
157
158
159
160
161
162
163
164 protected abstract I getSourceVertexId(Text line) throws IOException;
165
166
167
168
169
170
171
172
173
174
175
176
177 protected abstract I getTargetVertexId(Text line) throws IOException;
178
179
180
181
182
183
184
185
186
187
188
189 protected abstract E getValue(Text line) throws IOException;
190 }
191
192
193
194
195
196
197
198
199 protected abstract class TextEdgeReaderFromEachLineProcessed<T> extends
200 TextEdgeReader {
201
202 private T processedLine;
203
204 @Override
205 public I getCurrentSourceId() throws IOException, InterruptedException {
206 T processed = processCurrentLine();
207 return getSourceVertexId(processed);
208 }
209
210 @Override
211 public final Edge<I, E> getCurrentEdge() throws IOException,
212 InterruptedException {
213 T processed = processCurrentLine();
214 I targetVertexId = getTargetVertexId(processed);
215 E edgeValue = getValue(processed);
216 return EdgeFactory.create(targetVertexId, edgeValue);
217 }
218
219
220
221
222
223
224
225
226 private T processCurrentLine() throws IOException, InterruptedException {
227 if (processedLine == null) {
228 Text line = getRecordReader().getCurrentValue();
229 processedLine = preprocessLine(line);
230 }
231 return processedLine;
232 }
233
234 @Override
235 public final boolean nextEdge() throws IOException, InterruptedException {
236 processedLine = null;
237 return getRecordReader().nextKeyValue();
238 }
239
240
241
242
243
244
245
246
247
248
249
250
251 protected abstract T preprocessLine(Text line) throws IOException;
252
253
254
255
256
257
258
259
260
261
262
263 protected abstract I getTargetVertexId(T line) throws IOException;
264
265
266
267
268
269
270
271
272
273
274
275 protected abstract I getSourceVertexId(T line) throws IOException;
276
277
278
279
280
281
282
283
284
285
286
287 protected abstract E getValue(T line) throws IOException;
288 }
289 }