1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import com.google.common.collect.Lists;
22 import org.apache.giraph.edge.Edge;
23 import org.apache.giraph.edge.EdgeFactory;
24 import org.apache.hadoop.io.IntWritable;
25 import org.apache.hadoop.io.NullWritable;
26 import org.apache.hadoop.io.Text;
27 import org.apache.hadoop.mapreduce.InputSplit;
28 import org.apache.hadoop.mapreduce.TaskAttemptContext;
29
30 import java.io.IOException;
31 import java.util.List;
32 import java.util.regex.Pattern;
33
34
35
36
37
38
39
40 public class IntIntNullTextVertexInputFormat
41 extends
42 TextVertexInputFormat<IntWritable, IntWritable, NullWritable> {
43
44 private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
45
46 @Override
47 public TextVertexReader createVertexReader(InputSplit split,
48 TaskAttemptContext context)
49 throws IOException {
50 return new IntIntNullVertexReader();
51 }
52
53
54
55
56
57 public class IntIntNullVertexReader extends
58 TextVertexReaderFromEachLineProcessed<String[]> {
59
60 private IntWritable id;
61
62 private IntWritable value;
63
64 @Override
65 protected String[] preprocessLine(Text line) throws IOException {
66 String[] tokens = SEPARATOR.split(line.toString());
67 id = new IntWritable(Integer.parseInt(tokens[0]));
68 value = new IntWritable(Integer.parseInt(tokens[1]));
69 return tokens;
70 }
71
72 @Override
73 protected IntWritable getId(String[] tokens) throws IOException {
74 return id;
75 }
76
77 @Override
78 protected IntWritable getValue(String[] tokens) throws IOException {
79 return value;
80 }
81
82 @Override
83 protected Iterable<Edge<IntWritable, NullWritable>> getEdges(
84 String[] tokens) throws IOException {
85 List<Edge<IntWritable, NullWritable>> edges =
86 Lists.newArrayListWithCapacity(tokens.length - 2);
87 for (int n = 2; n < tokens.length; n++) {
88 edges.add(EdgeFactory.create(
89 new IntWritable(Integer.parseInt(tokens[n]))));
90 }
91 return edges;
92 }
93 }
94 }