1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import org.apache.giraph.io.EdgeReader;
22 import org.apache.giraph.utils.IntPair;
23 import org.apache.hadoop.io.IntWritable;
24 import org.apache.hadoop.io.NullWritable;
25 import org.apache.hadoop.io.Text;
26 import org.apache.hadoop.mapreduce.InputSplit;
27 import org.apache.hadoop.mapreduce.TaskAttemptContext;
28
29 import java.io.IOException;
30 import java.util.regex.Pattern;
31
32
33
34
35
36
37
38 public class IntNullTextEdgeInputFormat extends
39 TextEdgeInputFormat<IntWritable, NullWritable> {
40
41 private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
42
43 @Override
44 public EdgeReader<IntWritable, NullWritable> createEdgeReader(
45 InputSplit split, TaskAttemptContext context) throws IOException {
46 return new IntNullTextEdgeReader();
47 }
48
49
50
51
52
53 public class IntNullTextEdgeReader extends
54 TextEdgeReaderFromEachLineProcessed<IntPair> {
55 @Override
56 protected IntPair preprocessLine(Text line) throws IOException {
57 String[] tokens = SEPARATOR.split(line.toString());
58 return new IntPair(Integer.parseInt(tokens[0]),
59 Integer.parseInt(tokens[1]));
60 }
61
62 @Override
63 protected IntWritable getSourceVertexId(IntPair endpoints)
64 throws IOException {
65 return new IntWritable(endpoints.getFirst());
66 }
67
68 @Override
69 protected IntWritable getTargetVertexId(IntPair endpoints)
70 throws IOException {
71 return new IntWritable(endpoints.getSecond());
72 }
73
74 @Override
75 protected NullWritable getValue(IntPair endpoints) throws IOException {
76 return NullWritable.get();
77 }
78 }
79 }