Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
TextEdgeInputFormat |
|
| 1.0454545454545454;1.045 | ||||
TextEdgeInputFormat$TextEdgeReader |
|
| 1.0454545454545454;1.045 | ||||
TextEdgeInputFormat$TextEdgeReaderFromEachLine |
|
| 1.0454545454545454;1.045 | ||||
TextEdgeInputFormat$TextEdgeReaderFromEachLineProcessed |
|
| 1.0454545454545454;1.045 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | ||
19 | package org.apache.giraph.io.formats; | |
20 | ||
21 | import java.io.IOException; | |
22 | import java.util.List; | |
23 | import org.apache.giraph.edge.Edge; | |
24 | import org.apache.giraph.edge.EdgeFactory; | |
25 | import org.apache.giraph.io.EdgeInputFormat; | |
26 | import org.apache.giraph.io.EdgeReader; | |
27 | import org.apache.hadoop.conf.Configuration; | |
28 | import org.apache.hadoop.io.LongWritable; | |
29 | import org.apache.hadoop.io.Text; | |
30 | import org.apache.hadoop.io.Writable; | |
31 | import org.apache.hadoop.io.WritableComparable; | |
32 | import org.apache.hadoop.mapreduce.InputSplit; | |
33 | import org.apache.hadoop.mapreduce.JobContext; | |
34 | import org.apache.hadoop.mapreduce.RecordReader; | |
35 | import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
36 | ||
37 | /** | |
38 | * Abstract class that users should subclass to use their own text-based | |
39 | * edge input format. | |
40 | * | |
41 | * @param <I> Vertex id | |
42 | * @param <E> Edge data | |
43 | */ | |
44 | @SuppressWarnings("rawtypes") | |
45 | 0 | public abstract class TextEdgeInputFormat<I extends WritableComparable, |
46 | E extends Writable> extends EdgeInputFormat<I, E> { | |
47 | /** Underlying GiraphTextInputFormat. */ | |
48 | 0 | protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat(); |
49 | ||
50 | 0 | @Override public void checkInputSpecs(Configuration conf) { } |
51 | ||
52 | @Override | |
53 | public List<InputSplit> getSplits( | |
54 | JobContext context, int minSplitCountHint) throws IOException, | |
55 | InterruptedException { | |
56 | // Ignore the hint of numWorkers here since we are using | |
57 | // GiraphTextInputFormat to do this for us | |
58 | 0 | return textInputFormat.getEdgeSplits(context); |
59 | } | |
60 | ||
61 | /** | |
62 | * {@link EdgeReader} for {@link TextEdgeInputFormat}. | |
63 | */ | |
64 | 0 | protected abstract class TextEdgeReader extends EdgeReader<I, E> { |
65 | /** Internal line record reader */ | |
66 | private RecordReader<LongWritable, Text> lineRecordReader; | |
67 | /** Context passed to initialize */ | |
68 | private TaskAttemptContext context; | |
69 | ||
70 | @Override | |
71 | public void initialize(InputSplit inputSplit, TaskAttemptContext context) | |
72 | throws IOException, InterruptedException { | |
73 | 0 | this.context = context; |
74 | 0 | lineRecordReader = createLineRecordReader(inputSplit, context); |
75 | 0 | lineRecordReader.initialize(inputSplit, context); |
76 | 0 | } |
77 | ||
78 | /** | |
79 | * Create the line record reader. Override this to use a different | |
80 | * underlying record reader (useful for testing). | |
81 | * | |
82 | * @param inputSplit | |
83 | * the split to read | |
84 | * @param context | |
85 | * the context passed to initialize | |
86 | * @return | |
87 | * the record reader to be used | |
88 | * @throws IOException | |
89 | * exception that can be thrown during creation | |
90 | * @throws InterruptedException | |
91 | * exception that can be thrown during creation | |
92 | */ | |
93 | protected RecordReader<LongWritable, Text> | |
94 | createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context) | |
95 | throws IOException, InterruptedException { | |
96 | 0 | return textInputFormat.createRecordReader(inputSplit, context); |
97 | } | |
98 | ||
99 | @Override | |
100 | public void close() throws IOException { | |
101 | 0 | lineRecordReader.close(); |
102 | 0 | } |
103 | ||
104 | @Override | |
105 | public float getProgress() throws IOException, InterruptedException { | |
106 | 0 | return lineRecordReader.getProgress(); |
107 | } | |
108 | ||
109 | /** | |
110 | * Get the line record reader. | |
111 | * | |
112 | * @return Record reader to be used for reading. | |
113 | */ | |
114 | protected RecordReader<LongWritable, Text> getRecordReader() { | |
115 | 0 | return lineRecordReader; |
116 | } | |
117 | ||
118 | /** | |
119 | * Get the context. | |
120 | * | |
121 | * @return Context passed to initialize. | |
122 | */ | |
123 | protected TaskAttemptContext getContext() { | |
124 | 0 | return context; |
125 | } | |
126 | } | |
127 | ||
128 | /** | |
129 | * Abstract class to be implemented by the user to read an edge from each | |
130 | * text line. | |
131 | */ | |
132 | 0 | protected abstract class TextEdgeReaderFromEachLine extends TextEdgeReader { |
133 | @Override | |
134 | public final I getCurrentSourceId() throws IOException, | |
135 | InterruptedException { | |
136 | 0 | Text line = getRecordReader().getCurrentValue(); |
137 | 0 | return getSourceVertexId(line); |
138 | } | |
139 | ||
140 | @Override | |
141 | public final Edge<I, E> getCurrentEdge() throws IOException, | |
142 | InterruptedException { | |
143 | 0 | Text line = getRecordReader().getCurrentValue(); |
144 | 0 | I targetVertexId = getTargetVertexId(line); |
145 | 0 | E edgeValue = getValue(line); |
146 | 0 | return EdgeFactory.create(targetVertexId, edgeValue); |
147 | } | |
148 | ||
149 | @Override | |
150 | public final boolean nextEdge() throws IOException, InterruptedException { | |
151 | 0 | return getRecordReader().nextKeyValue(); |
152 | } | |
153 | ||
154 | /** | |
155 | * Reads source vertex id from the current line. | |
156 | * | |
157 | * @param line | |
158 | * the current line | |
159 | * @return | |
160 | * the source vertex id corresponding to the line | |
161 | * @throws IOException | |
162 | * exception that can be thrown while reading | |
163 | */ | |
164 | protected abstract I getSourceVertexId(Text line) throws IOException; | |
165 | ||
166 | ||
167 | /** | |
168 | * Reads target vertex id from the current line. | |
169 | * | |
170 | * @param line | |
171 | * the current line | |
172 | * @return | |
173 | * the target vertex id corresponding to the line | |
174 | * @throws IOException | |
175 | * exception that can be thrown while reading | |
176 | */ | |
177 | protected abstract I getTargetVertexId(Text line) throws IOException; | |
178 | ||
179 | /** | |
180 | * Reads edge value from the current line. | |
181 | * | |
182 | * @param line | |
183 | * the current line | |
184 | * @return | |
185 | * the edge value corresponding to the line | |
186 | * @throws IOException | |
187 | * exception that can be thrown while reading | |
188 | */ | |
189 | protected abstract E getValue(Text line) throws IOException; | |
190 | } | |
191 | ||
192 | /** | |
193 | * Abstract class to be implemented by the user to read an edge from each | |
194 | * text line after preprocessing it. | |
195 | * | |
196 | * @param <T> | |
197 | * The resulting type of preprocessing. | |
198 | */ | |
199 | 0 | protected abstract class TextEdgeReaderFromEachLineProcessed<T> extends |
200 | TextEdgeReader { | |
201 | /** Generic type holding processed line */ | |
202 | private T processedLine; | |
203 | ||
204 | @Override | |
205 | public I getCurrentSourceId() throws IOException, InterruptedException { | |
206 | 0 | T processed = processCurrentLine(); |
207 | 0 | return getSourceVertexId(processed); |
208 | } | |
209 | ||
210 | @Override | |
211 | public final Edge<I, E> getCurrentEdge() throws IOException, | |
212 | InterruptedException { | |
213 | 0 | T processed = processCurrentLine(); |
214 | 0 | I targetVertexId = getTargetVertexId(processed); |
215 | 0 | E edgeValue = getValue(processed); |
216 | 0 | return EdgeFactory.create(targetVertexId, edgeValue); |
217 | } | |
218 | ||
219 | /** | |
220 | * Process the current line to the user's type. | |
221 | * | |
222 | * @return T processed line | |
223 | * @throws IOException on I/O error | |
224 | * @throws InterruptedException on interruption | |
225 | */ | |
226 | private T processCurrentLine() throws IOException, InterruptedException { | |
227 | 0 | if (processedLine == null) { |
228 | 0 | Text line = getRecordReader().getCurrentValue(); |
229 | 0 | processedLine = preprocessLine(line); |
230 | } | |
231 | 0 | return processedLine; |
232 | } | |
233 | ||
234 | @Override | |
235 | public final boolean nextEdge() throws IOException, InterruptedException { | |
236 | 0 | processedLine = null; |
237 | 0 | return getRecordReader().nextKeyValue(); |
238 | } | |
239 | ||
240 | /** | |
241 | * Preprocess the line so other methods can easily read the necessary | |
242 | * information for creating an edge. | |
243 | * | |
244 | * @param line | |
245 | * the current line to be read | |
246 | * @return | |
247 | * the preprocessed object | |
248 | * @throws IOException | |
249 | * exception that can be thrown while reading | |
250 | */ | |
251 | protected abstract T preprocessLine(Text line) throws IOException; | |
252 | ||
253 | /** | |
254 | * Reads target vertex id from the preprocessed line. | |
255 | * | |
256 | * @param line | |
257 | * the object obtained by preprocessing the line | |
258 | * @return | |
259 | * the target vertex id | |
260 | * @throws IOException | |
261 | * exception that can be thrown while reading | |
262 | */ | |
263 | protected abstract I getTargetVertexId(T line) throws IOException; | |
264 | ||
265 | /** | |
266 | * Reads source vertex id from the preprocessed line. | |
267 | * | |
268 | * @param line | |
269 | * the object obtained by preprocessing the line | |
270 | * @return | |
271 | * the source vertex id | |
272 | * @throws IOException | |
273 | * exception that can be thrown while reading | |
274 | */ | |
275 | protected abstract I getSourceVertexId(T line) throws IOException; | |
276 | ||
277 | /** | |
278 | * Reads edge value from the preprocessed line. | |
279 | * | |
280 | * @param line | |
281 | * the object obtained by preprocessing the line | |
282 | * @return | |
283 | * the edge value | |
284 | * @throws IOException | |
285 | * exception that can be thrown while reading | |
286 | */ | |
287 | protected abstract E getValue(T line) throws IOException; | |
288 | } | |
289 | } |