1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import org.apache.giraph.io.VertexValueInputFormat;
22 import org.apache.giraph.io.VertexValueReader;
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.io.LongWritable;
25 import org.apache.hadoop.io.Text;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28 import org.apache.hadoop.mapreduce.InputSplit;
29 import org.apache.hadoop.mapreduce.JobContext;
30 import org.apache.hadoop.mapreduce.RecordReader;
31 import org.apache.hadoop.mapreduce.TaskAttemptContext;
32
33 import java.io.IOException;
34 import java.util.List;
35
36
37
38
39
40
41
42
43
44 @SuppressWarnings("rawtypes")
45 public abstract class TextVertexValueInputFormat<I extends WritableComparable,
46 V extends Writable, E extends Writable>
47 extends VertexValueInputFormat<I, V> {
48
49 protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
50
51 @Override public void checkInputSpecs(Configuration conf) { }
52
53 @Override
54 public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
55 throws IOException, InterruptedException {
56
57
58 return textInputFormat.getVertexSplits(context);
59 }
60
61 @Override
62 public abstract TextVertexValueReader createVertexValueReader(
63 InputSplit split, TaskAttemptContext context) throws IOException;
64
65
66
67
68 protected abstract class TextVertexValueReader extends
69 VertexValueReader<I, V> {
70
71 private RecordReader<LongWritable, Text> lineRecordReader;
72
73 private TaskAttemptContext context;
74
75 @Override
76 public void initialize(InputSplit inputSplit, TaskAttemptContext context)
77 throws IOException, InterruptedException {
78 super.initialize(inputSplit, context);
79 this.context = context;
80 lineRecordReader = createLineRecordReader(inputSplit, context);
81 lineRecordReader.initialize(inputSplit, context);
82 }
83
84
85
86
87
88
89
90
91
92
93
94 protected RecordReader<LongWritable, Text>
95 createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
96 throws IOException, InterruptedException {
97 return textInputFormat.createRecordReader(inputSplit, context);
98 }
99
100 @Override
101 public void close() throws IOException {
102 lineRecordReader.close();
103 }
104
105 @Override
106 public float getProgress() throws IOException, InterruptedException {
107 return lineRecordReader.getProgress();
108 }
109
110
111
112
113
114
115 protected RecordReader<LongWritable, Text> getRecordReader() {
116 return lineRecordReader;
117 }
118
119
120
121
122
123
124 protected TaskAttemptContext getContext() {
125 return context;
126 }
127 }
128
129
130
131
132
133 protected abstract class TextVertexValueReaderFromEachLine extends
134 TextVertexValueReader {
135 @Override
136 public final I getCurrentVertexId() throws IOException,
137 InterruptedException {
138 return getId(getRecordReader().getCurrentValue());
139 }
140
141 @Override
142 public final V getCurrentVertexValue() throws IOException,
143 InterruptedException {
144 return getValue(getRecordReader().getCurrentValue());
145 }
146
147 @Override
148 public final boolean nextVertex() throws IOException, InterruptedException {
149 return getRecordReader().nextKeyValue();
150 }
151
152
153
154
155
156
157
158
159 protected abstract I getId(Text line) throws IOException;
160
161
162
163
164
165
166
167
168
169 protected abstract V getValue(Text line) throws IOException;
170 }
171
172
173
174
175
176
177
178 protected abstract class TextVertexValueReaderFromEachLineProcessed<T>
179 extends TextVertexValueReader {
180
181 private T processedLine = null;
182
183
184
185
186
187
188
189 private T getProcessedLine() throws IOException, InterruptedException {
190 if (processedLine == null) {
191 processedLine = preprocessLine(getRecordReader().getCurrentValue());
192 }
193 return processedLine;
194 }
195
196 @Override
197 public I getCurrentVertexId() throws IOException,
198 InterruptedException {
199 return getId(getProcessedLine());
200 }
201
202 @Override
203 public V getCurrentVertexValue() throws IOException,
204 InterruptedException {
205 return getValue(getProcessedLine());
206 }
207
208 @Override
209 public final boolean nextVertex() throws IOException, InterruptedException {
210 processedLine = null;
211 return getRecordReader().nextKeyValue();
212 }
213
214
215
216
217
218
219
220
221
222 protected abstract T preprocessLine(Text line) throws IOException;
223
224
225
226
227
228
229
230
231
232 protected abstract I getId(T line) throws IOException;
233
234
235
236
237
238
239
240
241 protected abstract V getValue(T line) throws IOException;
242 }
243 }