Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
TextVertexInputFormat |
|
| 1.1851851851851851;1.185 | ||||
TextVertexInputFormat$TextVertexReader |
|
| 1.1851851851851851;1.185 | ||||
TextVertexInputFormat$TextVertexReaderFromEachLine |
|
| 1.1851851851851851;1.185 | ||||
TextVertexInputFormat$TextVertexReaderFromEachLineProcessed |
|
| 1.1851851851851851;1.185 | ||||
TextVertexInputFormat$TextVertexReaderFromEachLineProcessedHandlingExceptions |
|
| 1.1851851851851851;1.185 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | ||
19 | package org.apache.giraph.io.formats; | |
20 | ||
21 | import org.apache.giraph.edge.Edge; | |
22 | import org.apache.giraph.graph.Vertex; | |
23 | import org.apache.giraph.io.VertexInputFormat; | |
24 | import org.apache.giraph.io.VertexReader; | |
25 | import org.apache.hadoop.conf.Configuration; | |
26 | import org.apache.hadoop.io.LongWritable; | |
27 | import org.apache.hadoop.io.Text; | |
28 | import org.apache.hadoop.io.Writable; | |
29 | import org.apache.hadoop.io.WritableComparable; | |
30 | import org.apache.hadoop.mapreduce.InputSplit; | |
31 | import org.apache.hadoop.mapreduce.JobContext; | |
32 | import org.apache.hadoop.mapreduce.RecordReader; | |
33 | import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
34 | ||
35 | import java.io.IOException; | |
36 | import java.util.List; | |
37 | ||
38 | /** | |
39 | * Abstract class that users should subclass to use their own text based | |
40 | * vertex input format. | |
41 | * | |
42 | * @param <I> Vertex index value | |
43 | * @param <V> Vertex value | |
44 | * @param <E> Edge value | |
45 | */ | |
46 | 0 | @SuppressWarnings("rawtypes") |
47 | 0 | public abstract class TextVertexInputFormat<I extends WritableComparable, |
48 | V extends Writable, E extends Writable> | |
49 | extends VertexInputFormat<I, V, E> { | |
50 | /** Uses the GiraphTextInputFormat to do everything */ | |
51 | 0 | protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat(); |
52 | ||
53 | 0 | @Override public void checkInputSpecs(Configuration conf) { } |
54 | ||
55 | @Override | |
56 | public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) | |
57 | throws IOException, InterruptedException { | |
58 | // Ignore the hint of numWorkers here since we are using | |
59 | // GiraphTextInputFormat to do this for us | |
60 | 0 | return textInputFormat.getVertexSplits(context); |
61 | } | |
62 | ||
63 | /** | |
64 | * The factory method which produces the {@link TextVertexReader} used by this | |
65 | * input format. | |
66 | * | |
67 | * @param split | |
68 | * the split to be read | |
69 | * @param context | |
70 | * the information about the task | |
71 | * @return | |
72 | * the text vertex reader to be used | |
73 | */ | |
74 | @Override | |
75 | public abstract TextVertexReader createVertexReader(InputSplit split, | |
76 | TaskAttemptContext context) throws IOException; | |
77 | ||
78 | /** | |
79 | * Abstract class to be implemented by the user based on their specific | |
80 | * vertex input. Easiest to ignore the key value separator and only use | |
81 | * key instead. | |
82 | * | |
83 | * When reading a vertex from each line, extend | |
84 | * {@link TextVertexReaderFromEachLine}. If you need to preprocess each line | |
85 | * first, then extend {@link TextVertexReaderFromEachLineProcessed}. If you | |
86 | * need common exception handling while preprocessing, then extend | |
87 | * {@link TextVertexReaderFromEachLineProcessedHandlingExceptions}. | |
88 | */ | |
89 | 0 | protected abstract class TextVertexReader extends VertexReader<I, V, E> { |
90 | /** Internal line record reader */ | |
91 | private RecordReader<LongWritable, Text> lineRecordReader; | |
92 | /** Context passed to initialize */ | |
93 | private TaskAttemptContext context; | |
94 | ||
95 | @Override | |
96 | public void initialize(InputSplit inputSplit, TaskAttemptContext context) | |
97 | throws IOException, InterruptedException { | |
98 | 0 | this.context = context; |
99 | 0 | lineRecordReader = createLineRecordReader(inputSplit, context); |
100 | 0 | lineRecordReader.initialize(inputSplit, context); |
101 | 0 | } |
102 | ||
103 | /** | |
104 | * Create the line record reader. Override this to use a different | |
105 | * underlying record reader (useful for testing). | |
106 | * | |
107 | * @param inputSplit | |
108 | * the split to read | |
109 | * @param context | |
110 | * the context passed to initialize | |
111 | * @return | |
112 | * the record reader to be used | |
113 | * @throws IOException | |
114 | * exception that can be thrown during creation | |
115 | * @throws InterruptedException | |
116 | * exception that can be thrown during creation | |
117 | */ | |
118 | protected RecordReader<LongWritable, Text> | |
119 | createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context) | |
120 | throws IOException, InterruptedException { | |
121 | 0 | return textInputFormat.createRecordReader(inputSplit, context); |
122 | } | |
123 | ||
124 | @Override | |
125 | public void close() throws IOException { | |
126 | 0 | lineRecordReader.close(); |
127 | 0 | } |
128 | ||
129 | @Override | |
130 | public float getProgress() throws IOException, InterruptedException { | |
131 | 0 | return lineRecordReader.getProgress(); |
132 | } | |
133 | ||
134 | /** | |
135 | * Get the line record reader. | |
136 | * | |
137 | * @return Record reader to be used for reading. | |
138 | */ | |
139 | protected RecordReader<LongWritable, Text> getRecordReader() { | |
140 | 0 | return lineRecordReader; |
141 | } | |
142 | ||
143 | /** | |
144 | * Get the context. | |
145 | * | |
146 | * @return Context passed to initialize. | |
147 | */ | |
148 | protected TaskAttemptContext getContext() { | |
149 | 0 | return context; |
150 | } | |
151 | } | |
152 | ||
153 | /** | |
154 | * Abstract class to be implemented by the user to read a vertex from each | |
155 | * text line. | |
156 | */ | |
157 | 0 | protected abstract class TextVertexReaderFromEachLine extends |
158 | TextVertexReader { | |
159 | ||
160 | @Override | |
161 | public final Vertex<I, V, E> getCurrentVertex() throws IOException, | |
162 | InterruptedException { | |
163 | 0 | Text line = getRecordReader().getCurrentValue(); |
164 | 0 | Vertex<I, V, E> vertex = getConf().createVertex(); |
165 | 0 | vertex.initialize(getId(line), getValue(line), getEdges(line)); |
166 | 0 | return vertex; |
167 | } | |
168 | ||
169 | @Override | |
170 | public final boolean nextVertex() throws IOException, InterruptedException { | |
171 | 0 | return getRecordReader().nextKeyValue(); |
172 | } | |
173 | ||
174 | /** | |
175 | * Reads vertex id from the current line. | |
176 | * | |
177 | * @param line | |
178 | * the current line | |
179 | * @return | |
180 | * the vertex id corresponding to the line | |
181 | * @throws IOException | |
182 | * exception that can be thrown while reading | |
183 | */ | |
184 | protected abstract I getId(Text line) throws IOException; | |
185 | ||
186 | /** | |
187 | * Reads vertex value from the current line. | |
188 | * | |
189 | * @param line | |
190 | * the current line | |
191 | * @return | |
192 | * the vertex value corresponding to the line | |
193 | * @throws IOException | |
194 | * exception that can be thrown while reading | |
195 | */ | |
196 | protected abstract V getValue(Text line) throws IOException; | |
197 | ||
198 | /** | |
199 | * Reads edges value from the current line. | |
200 | * | |
201 | * @param line | |
202 | * the current line | |
203 | * @return | |
204 | * the edges | |
205 | * @throws IOException | |
206 | * exception that can be thrown while reading | |
207 | */ | |
208 | protected abstract Iterable<Edge<I, E>> getEdges(Text line) throws | |
209 | IOException; | |
210 | ||
211 | } | |
212 | ||
213 | /** | |
214 | * Abstract class to be implemented by the user to read a vertex from each | |
215 | * text line after preprocessing it. | |
216 | * | |
217 | * @param <T> | |
218 | * The resulting type of preprocessing. | |
219 | */ | |
220 | 0 | protected abstract class TextVertexReaderFromEachLineProcessed<T> extends |
221 | TextVertexReader { | |
222 | ||
223 | @Override | |
224 | public final boolean nextVertex() throws IOException, InterruptedException { | |
225 | 0 | return getRecordReader().nextKeyValue(); |
226 | } | |
227 | ||
228 | @Override | |
229 | public final Vertex<I, V, E> getCurrentVertex() throws IOException, | |
230 | InterruptedException { | |
231 | 0 | Text line = getRecordReader().getCurrentValue(); |
232 | Vertex<I, V, E> vertex; | |
233 | 0 | T processed = preprocessLine(line); |
234 | 0 | vertex = getConf().createVertex(); |
235 | 0 | vertex.initialize(getId(processed), getValue(processed), |
236 | 0 | getEdges(processed)); |
237 | 0 | return vertex; |
238 | } | |
239 | ||
240 | /** | |
241 | * Preprocess the line so other methods can easily read necessary | |
242 | * information for creating vertex. | |
243 | * | |
244 | * @param line | |
245 | * the current line to be read | |
246 | * @return | |
247 | * the preprocessed object | |
248 | * @throws IOException | |
249 | * exception that can be thrown while reading | |
250 | */ | |
251 | protected abstract T preprocessLine(Text line) throws IOException; | |
252 | ||
253 | /** | |
254 | * Reads vertex id from the preprocessed line. | |
255 | * | |
256 | * @param line | |
257 | * the object obtained by preprocessing the line | |
258 | * @return | |
259 | * the vertex id | |
260 | * @throws IOException | |
261 | * exception that can be thrown while reading | |
262 | */ | |
263 | protected abstract I getId(T line) throws IOException; | |
264 | ||
265 | /** | |
266 | * Reads vertex value from the preprocessed line. | |
267 | * | |
268 | * @param line | |
269 | * the object obtained by preprocessing the line | |
270 | * @return | |
271 | * the vertex value | |
272 | * @throws IOException | |
273 | * exception that can be thrown while reading | |
274 | */ | |
275 | protected abstract V getValue(T line) throws IOException; | |
276 | ||
277 | /** | |
278 | * Reads edges from the preprocessed line. | |
279 | * | |
280 | * | |
281 | * @param line | |
282 | * the object obtained by preprocessing the line | |
283 | * @return | |
284 | * the edges | |
285 | * @throws IOException | |
286 | * exception that can be thrown while reading | |
287 | */ | |
288 | protected abstract Iterable<Edge<I, E>> getEdges(T line) throws IOException; | |
289 | ||
290 | } | |
291 | ||
292 | // CHECKSTYLE: stop RedundantThrows | |
293 | /** | |
294 | * Abstract class to be implemented by the user to read a vertex from each | |
295 | * text line after preprocessing it with exception handling. | |
296 | * | |
297 | * @param <T> | |
298 | * The resulting type of preprocessing. | |
299 | * @param <X> | |
300 | * The exception type that can be thrown due to preprocessing. | |
301 | */ | |
302 | 0 | protected abstract class |
303 | TextVertexReaderFromEachLineProcessedHandlingExceptions<T, X extends | |
304 | Throwable> extends TextVertexReader { | |
305 | ||
306 | @Override | |
307 | public final boolean nextVertex() throws IOException, InterruptedException { | |
308 | 0 | return getRecordReader().nextKeyValue(); |
309 | } | |
310 | ||
311 | @SuppressWarnings("unchecked") | |
312 | @Override | |
313 | public final Vertex<I, V, E> getCurrentVertex() throws IOException, | |
314 | InterruptedException { | |
315 | // Note we are reading from value only since key is the line number | |
316 | 0 | Text line = getRecordReader().getCurrentValue(); |
317 | Vertex<I, V, E> vertex; | |
318 | 0 | T processed = null; |
319 | try { | |
320 | 0 | processed = preprocessLine(line); |
321 | 0 | vertex = getConf().createVertex(); |
322 | 0 | vertex.initialize(getId(processed), getValue(processed), |
323 | 0 | getEdges(processed)); |
324 | 0 | } catch (IOException e) { |
325 | 0 | throw e; |
326 | // CHECKSTYLE: stop IllegalCatch | |
327 | 0 | } catch (Throwable t) { |
328 | 0 | return handleException(line, processed, (X) t); |
329 | // CHECKSTYLE: resume IllegalCatch | |
330 | 0 | } |
331 | 0 | return vertex; |
332 | } | |
333 | ||
334 | /** | |
335 | * Preprocess the line so other methods can easily read necessary | |
336 | * information for creating vertex. | |
337 | * | |
338 | * @param line | |
339 | * the current line to be read | |
340 | * @return | |
341 | * the preprocessed object | |
342 | * @throws X | |
343 | * exception that can be thrown while preprocessing the line | |
344 | * @throws IOException | |
345 | * exception that can be thrown while reading | |
346 | */ | |
347 | protected abstract T preprocessLine(Text line) throws X, IOException; | |
348 | ||
349 | /** | |
350 | * Reads vertex id from the preprocessed line. | |
351 | * | |
352 | * @param line | |
353 | * the object obtained by preprocessing the line | |
354 | * @return | |
355 | * the vertex id | |
356 | * @throws X | |
357 | * exception that can be thrown while reading the preprocessed | |
358 | * object | |
359 | * @throws IOException | |
360 | * exception that can be thrown while reading | |
361 | */ | |
362 | protected abstract I getId(T line) throws X, IOException; | |
363 | ||
364 | /** | |
365 | * Reads vertex value from the preprocessed line. | |
366 | * | |
367 | * @param line | |
368 | * the object obtained by preprocessing the line | |
369 | * @return | |
370 | * the vertex value | |
371 | * @throws X | |
372 | * exception that can be thrown while reading the preprocessed | |
373 | * object | |
374 | * @throws IOException | |
375 | * exception that can be thrown while reading | |
376 | */ | |
377 | protected abstract V getValue(T line) throws X, IOException; | |
378 | ||
379 | /** | |
380 | * Reads edges from the preprocessed line. | |
381 | * | |
382 | * | |
383 | * @param line | |
384 | * the object obtained by preprocessing the line | |
385 | * @return | |
386 | * the edges | |
387 | * @throws X | |
388 | * exception that can be thrown while reading the preprocessed | |
389 | * object | |
390 | * @throws IOException | |
391 | * exception that can be thrown while reading | |
392 | */ | |
393 | protected abstract Iterable<Edge<I, E>> getEdges(T line) throws X, | |
394 | IOException; | |
395 | ||
396 | /** | |
397 | * Handles exceptions while reading vertex from each line. | |
398 | * | |
399 | * @param line | |
400 | * the line that was being read when the exception was thrown | |
401 | * @param processed | |
402 | * the object obtained by preprocessing the line. Can be null if | |
403 | * exception was thrown during preprocessing. | |
404 | * @param e | |
405 | * the exception thrown while reading the line | |
406 | * @return the recovered/alternative vertex to be used | |
407 | */ | |
408 | protected Vertex<I, V, E> handleException(Text line, T processed, X e) { | |
409 | 0 | throw new IllegalArgumentException(e); |
410 | } | |
411 | ||
412 | } | |
413 | // CHECKSTYLE: resume RedundantThrows | |
414 | ||
415 | } |