/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.io.hcatalog;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatBaseInputFormat;
import org.apache.hcatalog.mapreduce.HCatSplit;
import org.apache.hcatalog.mapreduce.HCatStorageHandler;
import org.apache.hcatalog.mapreduce.HCatUtils;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.PartInfo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

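/**
 * HCatalog input format that supports separate vertex and edge input
 * tables on the same Giraph job, by storing a vertex and an edge
 * {@link InputJobInfo} under distinct configuration keys.
 *
 * <p>A minimal usage sketch, assuming vertex and edge data live in the
 * (hypothetical) Hive tables {@code default.vertices} and
 * {@code default.edges}:
 *
 * <pre>
 * Job job = new Job(conf, "giraph-hcat-example");
 * GiraphHCatInputFormat.setVertexInput(job,
 *     InputJobInfo.create("default", "vertices", null));
 * GiraphHCatInputFormat.setEdgeInput(job,
 *     InputJobInfo.create("default", "edges", null));
 * </pre>
 */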
public class GiraphHCatInputFormat extends HCatBaseInputFormat {
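  /** Configuration key for the serialized vertex input job info. */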
  public static final String VERTEX_INPUT_JOB_INFO =
      "giraph.hcat.vertex.input.job.info";
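  /** Configuration key for the serialized edge input job info. */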
  public static final String EDGE_INPUT_JOB_INFO =
      "giraph.hcat.edge.input.job.info";

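  /**
   * Set vertex input job info in the job configuration.
   *
   * @param job The job
   * @param inputJobInfo Vertex input job info (database, table, filter)
   * @throws IOException
   */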
  public static void setVertexInput(Job job,
                                    InputJobInfo inputJobInfo)
    throws IOException {
    InputJobInfo vertexInputJobInfo = InputJobInfo.create(
        inputJobInfo.getDatabaseName(),
        inputJobInfo.getTableName(),
        inputJobInfo.getFilter());
    vertexInputJobInfo.getProperties().putAll(inputJobInfo.getProperties());
    Configuration conf = job.getConfiguration();
    conf.set(VERTEX_INPUT_JOB_INFO, HCatUtil.serialize(
        HCatUtils.getInputJobInfo(conf, vertexInputJobInfo)));
  }

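  /**
   * Set edge input job info in the job configuration.
   *
   * @param job The job
   * @param inputJobInfo Edge input job info (database, table, filter)
   * @throws IOException
   */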
  public static void setEdgeInput(Job job,
                                  InputJobInfo inputJobInfo)
    throws IOException {
    InputJobInfo edgeInputJobInfo = InputJobInfo.create(
        inputJobInfo.getDatabaseName(),
        inputJobInfo.getTableName(),
        inputJobInfo.getFilter());
    edgeInputJobInfo.getProperties().putAll(inputJobInfo.getProperties());
    Configuration conf = job.getConfiguration();
    conf.set(EDGE_INPUT_JOB_INFO, HCatUtil.serialize(
        HCatUtils.getInputJobInfo(conf, edgeInputJobInfo)));
  }

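  /**
   * Get the complete table schema (data columns followed by partition
   * columns) from the input job info.
   *
   * @param inputJobInfo Input job info
   * @return Table schema
   * @throws IOException
   */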
  private static HCatSchema getTableSchema(InputJobInfo inputJobInfo)
    throws IOException {
    HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
    for (HCatFieldSchema field :
        inputJobInfo.getTableInfo().getDataColumns().getFields()) {
      allCols.append(field);
    }
    for (HCatFieldSchema field :
        inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
      allCols.append(field);
    }
    return allCols;
  }

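  /**
   * Get the vertex input table schema.
   *
   * @param conf Configuration
   * @return Vertex input table schema
   * @throws IOException
   */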
  public static HCatSchema getVertexTableSchema(Configuration conf)
    throws IOException {
    return getTableSchema(getVertexJobInfo(conf));
  }

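  /**
   * Get the edge input table schema.
   *
   * @param conf Configuration
   * @return Edge input table schema
   * @throws IOException
   */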
  public static HCatSchema getEdgeTableSchema(Configuration conf)
    throws IOException {
    return getTableSchema(getEdgeJobInfo(conf));
  }

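  /**
   * Set the input directory for a partition on the job configuration,
   * handling comma-separated locations and curly-brace glob patterns.
   *
   * @param jobConf Job configuration
   * @param location Location of input files
   * @throws IOException
   */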
  private void setInputPath(JobConf jobConf, String location)
    throws IOException {
    int length = location.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    // Split the location string on commas, but ignore commas that appear
    // inside curly-brace glob patterns such as {a,b}.
    for (int i = 0; i < length; i++) {
      char ch = location.charAt(i);
      switch (ch) {
      case '{':
        curlyOpen++;
        if (!globPattern) {
          globPattern = true;
        }
        break;
      case '}':
        curlyOpen--;
        if (curlyOpen == 0 && globPattern) {
          globPattern = false;
        }
        break;
      case ',':
        if (!globPattern) {
          pathStrings.add(location.substring(pathStart, i));
          pathStart = i + 1;
        }
        break;
      default:
      }
    }
    pathStrings.add(location.substring(pathStart, length));

    Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));

    // Qualify each path against the default file system and join them into
    // a single escaped, comma-separated string.
    FileSystem fs = FileSystem.get(jobConf);
    Path path = paths[0].makeQualified(fs);
    StringBuilder str = new StringBuilder(StringUtils.escapeString(
        path.toString()));
    for (int i = 1; i < paths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = paths[i].makeQualified(fs);
      str.append(StringUtils.escapeString(path.toString()));
    }

    jobConf.set("mapred.input.dir", str.toString());
  }

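  /**
   * Get input splits for the table described by the given input job info.
   *
   * @param jobContext Job context
   * @param inputJobInfo Input job info
   * @return Input splits, one per underlying split of each matching partition
   * @throws IOException
   * @throws InterruptedException
   */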
  private List<InputSplit> getSplits(JobContext jobContext,
                                     InputJobInfo inputJobInfo)
    throws IOException, InterruptedException {
    Configuration conf = jobContext.getConfiguration();

    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
    if (partitionInfoList == null) {
      // No partitions match the specified partition filter.
      return splits;
    }

    HCatStorageHandler storageHandler;
    JobConf jobConf;
    // For each matching partition, call getSplits on the underlying
    // InputFormat.
    for (PartInfo partitionInfo : partitionInfoList) {
      jobConf = HCatUtil.getJobConfFromContext(jobContext);
      setInputPath(jobConf, partitionInfo.getLocation());
      Map<String, String> jobProperties = partitionInfo.getJobProperties();

      HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
      for (HCatFieldSchema field :
          inputJobInfo.getTableInfo().getDataColumns().getFields()) {
        allCols.append(field);
      }
      for (HCatFieldSchema field :
          inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
        allCols.append(field);
      }

      HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);

      storageHandler = HCatUtil.getStorageHandler(
          jobConf, partitionInfo);

      // Get the input format for the partition's storage handler.
      Class inputFormatClass = storageHandler.getInputFormatClass();
      org.apache.hadoop.mapred.InputFormat inputFormat =
          getMapRedInputFormat(jobConf, inputFormatClass);

      // Call getSplits on the underlying InputFormat and wrap each returned
      // split in an HCatSplit carrying the partition info and full schema.
      int desiredNumSplits =
          conf.getInt(HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, 0);
      org.apache.hadoop.mapred.InputSplit[] baseSplits =
          inputFormat.getSplits(jobConf, desiredNumSplits);

      for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
        splits.add(new HCatSplit(partitionInfo, split, allCols));
      }
    }

    return splits;
  }

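  /**
   * Get vertex input job info from the configuration.
   *
   * @param conf Configuration
   * @return Vertex input job info
   * @throws IOException if no vertex input job info is set
   */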
  private static InputJobInfo getVertexJobInfo(Configuration conf)
    throws IOException {
    String jobString = conf.get(VERTEX_INPUT_JOB_INFO);
    if (jobString == null) {
      throw new IOException("Vertex job information not found in JobContext." +
          " GiraphHCatInputFormat.setVertexInput() not called?");
    }
    return (InputJobInfo) HCatUtil.deserialize(jobString);
  }

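  /**
   * Get edge input job info from the configuration.
   *
   * @param conf Configuration
   * @return Edge input job info
   * @throws IOException if no edge input job info is set
   */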
  private static InputJobInfo getEdgeJobInfo(Configuration conf)
    throws IOException {
    String jobString = conf.get(EDGE_INPUT_JOB_INFO);
    if (jobString == null) {
      throw new IOException("Edge job information not found in JobContext." +
          " GiraphHCatInputFormat.setEdgeInput() not called?");
    }
    return (InputJobInfo) HCatUtil.deserialize(jobString);
  }

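  /**
   * Get vertex input splits.
   *
   * @param jobContext Job context
   * @return Vertex input splits
   * @throws IOException
   * @throws InterruptedException
   */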
  public List<InputSplit> getVertexSplits(JobContext jobContext)
    throws IOException, InterruptedException {
    return getSplits(jobContext,
        getVertexJobInfo(jobContext.getConfiguration()));
  }

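  /**
   * Get edge input splits.
   *
   * @param jobContext Job context
   * @return Edge input splits
   * @throws IOException
   * @throws InterruptedException
   */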
  public List<InputSplit> getEdgeSplits(JobContext jobContext)
    throws IOException, InterruptedException {
    return getSplits(jobContext,
        getEdgeJobInfo(jobContext.getConfiguration()));
  }

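  /**
   * Create a record reader for the given split and table schema.
   *
   * @param split Input split
   * @param schema Table schema
   * @param taskContext Task attempt context
   * @return Record reader producing {@link HCatRecord}s
   * @throws IOException
   * @throws InterruptedException
   */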
  private RecordReader<WritableComparable, HCatRecord>
  createRecordReader(InputSplit split,
                     HCatSchema schema,
                     TaskAttemptContext taskContext)
    throws IOException, InterruptedException {
    HCatSplit hcatSplit = HCatUtils.castToHCatSplit(split);
    PartInfo partitionInfo = hcatSplit.getPartitionInfo();
    JobContext jobContext = taskContext;
    Configuration conf = jobContext.getConfiguration();

    HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
        conf, partitionInfo);

    JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
    Map<String, String> jobProperties = partitionInfo.getJobProperties();
    HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);

    Map<String, String> valuesNotInDataCols = getColValsNotInDataColumns(
        schema, partitionInfo);

    return HCatUtils.newHCatReader(storageHandler, valuesNotInDataCols);
  }

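  /**
   * Create a record reader for vertex input.
   *
   * @param split Input split
   * @param taskContext Task attempt context
   * @return Vertex record reader
   * @throws IOException
   * @throws InterruptedException
   */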
  public RecordReader<WritableComparable, HCatRecord>
  createVertexRecordReader(InputSplit split, TaskAttemptContext taskContext)
    throws IOException, InterruptedException {
    return createRecordReader(split, getVertexTableSchema(
        taskContext.getConfiguration()), taskContext);
  }

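  /**
   * Create a record reader for edge input.
   *
   * @param split Input split
   * @param taskContext Task attempt context
   * @return Edge record reader
   * @throws IOException
   * @throws InterruptedException
   */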
  public RecordReader<WritableComparable, HCatRecord>
  createEdgeRecordReader(InputSplit split, TaskAttemptContext taskContext)
    throws IOException, InterruptedException {
    return createRecordReader(split, getEdgeTableSchema(
        taskContext.getConfiguration()), taskContext);
  }

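  /**
   * Get values for those fields of the output schema that are not present
   * in the data columns, i.e. the partition-key values.
   *
   * @param outputSchema Output schema to pull values for
   * @param partInfo Partition info
   * @return Map from field name to partition value (null if missing)
   */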
  private static Map<String, String> getColValsNotInDataColumns(
      HCatSchema outputSchema,
      PartInfo partInfo) {
    HCatSchema dataSchema = partInfo.getPartitionSchema();
    Map<String, String> vals = new HashMap<String, String>();
    for (String fieldName : outputSchema.getFieldNames()) {
      if (dataSchema.getPosition(fieldName) == null) {
        // Field is not in the data columns, so it must be a partition
        // column; take its value from the partition-key values.
        if (partInfo.getPartitionValues().containsKey(fieldName)) {
          vals.put(fieldName, partInfo.getPartitionValues().get(fieldName));
        } else {
          vals.put(fieldName, null);
        }
      }
    }
    return vals;
  }
}