1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import org.apache.giraph.edge.Edge;
22 import org.apache.giraph.edge.EdgeFactory;
23 import org.apache.hadoop.io.IntWritable;
24 import org.apache.hadoop.io.NullWritable;
25 import org.apache.hadoop.io.Text;
26 import org.apache.hadoop.mapreduce.InputSplit;
27 import org.apache.hadoop.mapreduce.TaskAttemptContext;
28
29 import com.google.common.collect.Lists;
30
31 import java.io.IOException;
32 import java.util.List;
33 import java.util.regex.Pattern;
34
35
36
37
38
39
40
41 public class IntIntNullTextInputFormat extends
42 TextVertexInputFormat<IntWritable, IntWritable, NullWritable> {
43
44 private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
45
46 @Override
47 public TextVertexReader createVertexReader(InputSplit split,
48 TaskAttemptContext context)
49 throws IOException {
50 return new IntIntNullVertexReader();
51 }
52
53
54
55
56 public class IntIntNullVertexReader extends
57 TextVertexReaderFromEachLineProcessed<String[]> {
58
59
60
61 private IntWritable id;
62
63 @Override
64 protected String[] preprocessLine(Text line) throws IOException {
65 String[] tokens = SEPARATOR.split(line.toString());
66 id = new IntWritable(Integer.parseInt(tokens[0]));
67 return tokens;
68 }
69
70 @Override
71 protected IntWritable getId(String[] tokens) throws IOException {
72 return id;
73 }
74
75 @Override
76 protected IntWritable getValue(String[] tokens) throws IOException {
77 return id;
78 }
79
80 @Override
81 protected Iterable<Edge<IntWritable, NullWritable>> getEdges(
82 String[] tokens) throws IOException {
83 List<Edge<IntWritable, NullWritable>> edges =
84 Lists.newArrayListWithCapacity(tokens.length - 1);
85 for (int n = 1; n < tokens.length; n++) {
86 edges.add(EdgeFactory.create(
87 new IntWritable(Integer.parseInt(tokens[n]))));
88 }
89 return edges;
90 }
91 }
92 }