1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import org.apache.giraph.edge.Edge;
22 import org.apache.giraph.graph.Vertex;
23 import org.apache.giraph.io.VertexInputFormat;
24 import org.apache.giraph.io.VertexReader;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.io.LongWritable;
27 import org.apache.hadoop.io.Text;
28 import org.apache.hadoop.io.Writable;
29 import org.apache.hadoop.io.WritableComparable;
30 import org.apache.hadoop.mapreduce.InputSplit;
31 import org.apache.hadoop.mapreduce.JobContext;
32 import org.apache.hadoop.mapreduce.RecordReader;
33 import org.apache.hadoop.mapreduce.TaskAttemptContext;
34
35 import java.io.IOException;
36 import java.util.List;
37
38
39
40
41
42
43
44
45
46 @SuppressWarnings("rawtypes")
47 public abstract class TextVertexInputFormat<I extends WritableComparable,
48 V extends Writable, E extends Writable>
49 extends VertexInputFormat<I, V, E> {
50
51 protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
52
53 @Override public void checkInputSpecs(Configuration conf) { }
54
55 @Override
56 public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
57 throws IOException, InterruptedException {
58
59
60 return textInputFormat.getVertexSplits(context);
61 }
62
63
64
65
66
67
68
69
70
71
72
73
74 @Override
75 public abstract TextVertexReader createVertexReader(InputSplit split,
76 TaskAttemptContext context) throws IOException;
77
78
79
80
81
82
83
84
85
86
87
88
89 protected abstract class TextVertexReader extends VertexReader<I, V, E> {
90
91 private RecordReader<LongWritable, Text> lineRecordReader;
92
93 private TaskAttemptContext context;
94
95 @Override
96 public void initialize(InputSplit inputSplit, TaskAttemptContext context)
97 throws IOException, InterruptedException {
98 this.context = context;
99 lineRecordReader = createLineRecordReader(inputSplit, context);
100 lineRecordReader.initialize(inputSplit, context);
101 }
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118 protected RecordReader<LongWritable, Text>
119 createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
120 throws IOException, InterruptedException {
121 return textInputFormat.createRecordReader(inputSplit, context);
122 }
123
124 @Override
125 public void close() throws IOException {
126 lineRecordReader.close();
127 }
128
129 @Override
130 public float getProgress() throws IOException, InterruptedException {
131 return lineRecordReader.getProgress();
132 }
133
134
135
136
137
138
139 protected RecordReader<LongWritable, Text> getRecordReader() {
140 return lineRecordReader;
141 }
142
143
144
145
146
147
148 protected TaskAttemptContext getContext() {
149 return context;
150 }
151 }
152
153
154
155
156
157 protected abstract class TextVertexReaderFromEachLine extends
158 TextVertexReader {
159
160 @Override
161 public final Vertex<I, V, E> getCurrentVertex() throws IOException,
162 InterruptedException {
163 Text line = getRecordReader().getCurrentValue();
164 Vertex<I, V, E> vertex = getConf().createVertex();
165 vertex.initialize(getId(line), getValue(line), getEdges(line));
166 return vertex;
167 }
168
169 @Override
170 public final boolean nextVertex() throws IOException, InterruptedException {
171 return getRecordReader().nextKeyValue();
172 }
173
174
175
176
177
178
179
180
181
182
183
184 protected abstract I getId(Text line) throws IOException;
185
186
187
188
189
190
191
192
193
194
195
196 protected abstract V getValue(Text line) throws IOException;
197
198
199
200
201
202
203
204
205
206
207
208 protected abstract Iterable<Edge<I, E>> getEdges(Text line) throws
209 IOException;
210
211 }
212
213
214
215
216
217
218
219
220 protected abstract class TextVertexReaderFromEachLineProcessed<T> extends
221 TextVertexReader {
222
223 @Override
224 public final boolean nextVertex() throws IOException, InterruptedException {
225 return getRecordReader().nextKeyValue();
226 }
227
228 @Override
229 public final Vertex<I, V, E> getCurrentVertex() throws IOException,
230 InterruptedException {
231 Text line = getRecordReader().getCurrentValue();
232 Vertex<I, V, E> vertex;
233 T processed = preprocessLine(line);
234 vertex = getConf().createVertex();
235 vertex.initialize(getId(processed), getValue(processed),
236 getEdges(processed));
237 return vertex;
238 }
239
240
241
242
243
244
245
246
247
248
249
250
251 protected abstract T preprocessLine(Text line) throws IOException;
252
253
254
255
256
257
258
259
260
261
262
263 protected abstract I getId(T line) throws IOException;
264
265
266
267
268
269
270
271
272
273
274
275 protected abstract V getValue(T line) throws IOException;
276
277
278
279
280
281
282
283
284
285
286
287
288 protected abstract Iterable<Edge<I, E>> getEdges(T line) throws IOException;
289
290 }
291
292
293
294
295
296
297
298
299
300
301
302 protected abstract class
303 TextVertexReaderFromEachLineProcessedHandlingExceptions<T, X extends
304 Throwable> extends TextVertexReader {
305
306 @Override
307 public final boolean nextVertex() throws IOException, InterruptedException {
308 return getRecordReader().nextKeyValue();
309 }
310
311 @SuppressWarnings("unchecked")
312 @Override
313 public final Vertex<I, V, E> getCurrentVertex() throws IOException,
314 InterruptedException {
315
316 Text line = getRecordReader().getCurrentValue();
317 Vertex<I, V, E> vertex;
318 T processed = null;
319 try {
320 processed = preprocessLine(line);
321 vertex = getConf().createVertex();
322 vertex.initialize(getId(processed), getValue(processed),
323 getEdges(processed));
324 } catch (IOException e) {
325 throw e;
326
327 } catch (Throwable t) {
328 return handleException(line, processed, (X) t);
329
330 }
331 return vertex;
332 }
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347 protected abstract T preprocessLine(Text line) throws X, IOException;
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362 protected abstract I getId(T line) throws X, IOException;
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377 protected abstract V getValue(T line) throws X, IOException;
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393 protected abstract Iterable<Edge<I, E>> getEdges(T line) throws X,
394 IOException;
395
396
397
398
399
400
401
402
403
404
405
406
407
408 protected Vertex<I, V, E> handleException(Text line, T processed, X e) {
409 throw new IllegalArgumentException(e);
410 }
411
412 }
413
414
415 }