1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.io.formats;
19
20 import org.apache.giraph.edge.Edge;
21 import org.apache.giraph.edge.EdgeFactory;
22 import org.apache.hadoop.io.LongWritable;
23 import org.apache.hadoop.io.NullWritable;
24 import org.apache.hadoop.io.Text;
25 import org.apache.hadoop.mapreduce.InputSplit;
26 import org.apache.hadoop.mapreduce.TaskAttemptContext;
27
28 import com.google.common.collect.Lists;
29
30 import java.io.IOException;
31 import java.util.List;
32 import java.util.regex.Pattern;
33
34
35
36
37
38
39
40 public class LongLongNullTextInputFormat extends
41 TextVertexInputFormat<LongWritable, LongWritable, NullWritable> {
42
43 private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
44
45 @Override
46 public TextVertexReader createVertexReader(InputSplit split,
47 TaskAttemptContext context)
48 throws IOException {
49 return new LongLongNullVertexReader();
50 }
51
52
53
54
55 public class LongLongNullVertexReader extends
56 TextVertexReaderFromEachLineProcessed<String[]> {
57
58 private LongWritable id;
59
60 @Override
61 protected String[] preprocessLine(Text line) throws IOException {
62 String[] tokens = SEPARATOR.split(line.toString());
63 id = new LongWritable(Long.parseLong(tokens[0]));
64 return tokens;
65 }
66
67 @Override
68 protected LongWritable getId(String[] tokens) throws IOException {
69 return id;
70 }
71
72 @Override
73 protected LongWritable getValue(String[] tokens) throws IOException {
74 return id;
75 }
76
77 @Override
78 protected Iterable<Edge<LongWritable, NullWritable>> getEdges(
79 String[] tokens) throws IOException {
80 List<Edge<LongWritable, NullWritable>> edges =
81 Lists.newArrayListWithCapacity(tokens.length - 1);
82 for (int n = 1; n < tokens.length; n++) {
83 edges.add(EdgeFactory.create(
84 new LongWritable(Long.parseLong(tokens[n]))));
85 }
86 return edges;
87 }
88 }
89 }