1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.io.formats;
19
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.giraph.edge.Edge;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
29
30
31
32
33
34
35
36
37
38
39
40
41
42 @SuppressWarnings("rawtypes")
43 public abstract class AdjacencyListTextVertexInputFormat<I extends
44 WritableComparable, V extends Writable, E extends Writable> extends
45 TextVertexInputFormat<I, V, E> {
46
47 public static final String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter";
48
49 public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
50
51
52
53
54 public interface LineSanitizer {
55
56
57
58
59
60
61 String sanitize(String s);
62 }
63
64 @Override
65 public abstract AdjacencyListTextVertexReader createVertexReader(
66 InputSplit split, TaskAttemptContext context);
67
68
69
70
71 protected abstract class AdjacencyListTextVertexReader extends
72 TextVertexReaderFromEachLineProcessed<String[]> {
73
74 private String splitValue = null;
75
76 private final LineSanitizer sanitizer;
77
78
79
80
81 public AdjacencyListTextVertexReader() {
82 this(null);
83 }
84
85
86
87
88
89
90 public AdjacencyListTextVertexReader(LineSanitizer sanitizer) {
91 this.sanitizer = sanitizer;
92 }
93
94 @Override
95 public void initialize(InputSplit inputSplit, TaskAttemptContext context)
96 throws IOException, InterruptedException {
97 super.initialize(inputSplit, context);
98 splitValue =
99 getConf().get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
100 }
101
102 @Override
103 protected String[] preprocessLine(Text line) throws IOException {
104 String sanitizedLine;
105 if (sanitizer != null) {
106 sanitizedLine = sanitizer.sanitize(line.toString());
107 } else {
108 sanitizedLine = line.toString();
109 }
110 String [] values = sanitizedLine.split(splitValue);
111 if ((values.length < 2) || (values.length % 2 != 0)) {
112 throw new IllegalArgumentException(
113 "Line did not split correctly: " + line);
114 }
115 return values;
116 }
117
118 @Override
119 protected I getId(String[] values) throws IOException {
120 return decodeId(values[0]);
121 }
122
123
124
125
126
127
128
129 public abstract I decodeId(String s);
130
131 @Override
132 protected V getValue(String[] values) throws IOException {
133 return decodeValue(values[1]);
134 }
135
136
137
138
139
140
141
142
143 public abstract V decodeValue(String s);
144
145 @Override
146 protected Iterable<Edge<I, E>> getEdges(String[] values) throws
147 IOException {
148 int i = 2;
149 List<Edge<I, E>> edges = Lists.newLinkedList();
150 while (i < values.length) {
151 edges.add(decodeEdge(values[i], values[i + 1]));
152 i += 2;
153 }
154 return edges;
155 }
156
157
158
159
160
161
162
163
164 public abstract Edge<I, E> decodeEdge(String id, String value);
165
166 }
167 }