1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.examples.scc;
19
20 import java.io.IOException;
21 import java.util.List;
22 import java.util.regex.Pattern;
23
24 import org.apache.giraph.edge.Edge;
25 import org.apache.giraph.edge.EdgeFactory;
26 import org.apache.giraph.io.formats.TextVertexInputFormat;
27 import org.apache.hadoop.io.LongWritable;
28 import org.apache.hadoop.io.NullWritable;
29 import org.apache.hadoop.io.Text;
30 import org.apache.hadoop.mapreduce.InputSplit;
31 import org.apache.hadoop.mapreduce.TaskAttemptContext;
32
33 import com.google.common.collect.Lists;
34
35
36
37
38
39
40
41 public class SccLongLongNullTextInputFormat extends
42 TextVertexInputFormat<LongWritable, SccVertexValue, NullWritable> {
43
44 private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
45
46 @Override
47 public TextVertexReader createVertexReader(InputSplit split,
48 TaskAttemptContext context)
49 throws IOException {
50 return new LongLongNullVertexReader();
51 }
52
53
54
55
56 public class LongLongNullVertexReader extends
57 TextVertexReaderFromEachLineProcessed<String[]> {
58
59 private LongWritable id;
60
61 @Override
62 protected String[] preprocessLine(Text line) throws IOException {
63 String[] tokens = SEPARATOR.split(line.toString());
64 id = new LongWritable(Long.parseLong(tokens[0]));
65 return tokens;
66 }
67
68 @Override
69 protected LongWritable getId(String[] tokens) throws IOException {
70 return id;
71 }
72
73 @Override
74 protected SccVertexValue getValue(String[] tokens) throws IOException {
75 return new SccVertexValue(Long.parseLong(tokens[0]));
76 }
77
78 @Override
79 protected Iterable<Edge<LongWritable, NullWritable>> getEdges(
80 String[] tokens) throws IOException {
81 List<Edge<LongWritable, NullWritable>> edges =
82 Lists.newArrayListWithCapacity(tokens.length - 1);
83 for (int n = 1; n < tokens.length; n++) {
84 edges.add(EdgeFactory.create(
85 new LongWritable(Long.parseLong(tokens[n]))));
86 }
87 return edges;
88 }
89 }
90 }