1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats.multi;
20
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22 import org.apache.giraph.io.MappingInputFormat;
23 import org.apache.giraph.io.MappingReader;
24 import org.apache.giraph.io.internal.WrappedMappingReader;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28 import org.apache.hadoop.mapreduce.InputSplit;
29 import org.apache.hadoop.mapreduce.JobContext;
30 import org.apache.hadoop.mapreduce.TaskAttemptContext;
31
32 import java.io.DataInput;
33 import java.io.DataOutput;
34 import java.io.IOException;
35 import java.util.List;
36
37
38
39
40
41
42
43
44
45
46
47 public class MultiMappingInputFormat<I extends WritableComparable,
48 V extends Writable, E extends Writable, B extends Writable>
49 extends MappingInputFormat<I, V, E, B> {
50
51
52 private List<MappingInputFormat<I, V, E, B>> mappingInputFormats;
53
54
55
56
57 public MultiMappingInputFormat() {
58 }
59
60 @Override
61 public void setConf(
62 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
63 super.setConf(conf);
64 mappingInputFormats =
65 MappingInputFormatDescription.createMappingInputFormats(getConf());
66 if (mappingInputFormats.isEmpty()) {
67 throw new IllegalStateException("setConf: Using MultiVertexInputFormat " +
68 "without specifying vertex inputs");
69 }
70 }
71
72 @Override
73 public MappingReader createMappingReader(
74 InputSplit inputSplit, TaskAttemptContext context
75 ) throws IOException {
76 if (inputSplit instanceof InputSplitWithInputFormatIndex) {
77
78
79 synchronized (context) {
80 InputSplitWithInputFormatIndex split =
81 (InputSplitWithInputFormatIndex) inputSplit;
82 MappingInputFormat<I, V, E, B> mappingInputFormat =
83 mappingInputFormats.get(split.getInputFormatIndex());
84 MappingReader<I, V, E, B> mappingReader =
85 mappingInputFormat.createMappingReader(split.getSplit(), context);
86 return new WrappedMappingReader<I, V, E, B>(
87 mappingReader, mappingInputFormat.getConf()) {
88 @Override
89 public void initialize(InputSplit inputSplit,
90 TaskAttemptContext context) throws IOException,
91 InterruptedException {
92
93
94 synchronized (context) {
95 super.initialize(inputSplit, context);
96 }
97 }
98 };
99 }
100 } else {
101 throw new IllegalStateException("createVertexReader: Got InputSplit " +
102 "which was not created by this class: " +
103 inputSplit.getClass().getName());
104 }
105 }
106
107 @Override
108 public void checkInputSpecs(Configuration conf) {
109 for (MappingInputFormat mappingInputFormat : mappingInputFormats) {
110 mappingInputFormat.checkInputSpecs(conf);
111 }
112 }
113
114 @Override
115 public List<InputSplit> getSplits(
116 JobContext context, int minSplitCountHint
117 ) throws IOException, InterruptedException {
118 synchronized (context) {
119 return MultiInputUtils.getSplits(
120 context, minSplitCountHint, mappingInputFormats);
121 }
122 }
123
124 @Override
125 public void writeInputSplit(InputSplit inputSplit, DataOutput dataOutput)
126 throws IOException {
127 MultiInputUtils.writeInputSplit(
128 inputSplit, dataOutput, mappingInputFormats);
129 }
130
131 @Override
132 public InputSplit readInputSplit(
133 DataInput dataInput) throws IOException, ClassNotFoundException {
134 return MultiInputUtils.readInputSplit(dataInput, mappingInputFormats);
135 }
136 }