1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats.multi;
20
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22 import org.apache.giraph.io.VertexInputFormat;
23 import org.apache.giraph.io.VertexReader;
24 import org.apache.giraph.io.internal.WrappedVertexReader;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28 import org.apache.hadoop.mapreduce.InputSplit;
29 import org.apache.hadoop.mapreduce.JobContext;
30 import org.apache.hadoop.mapreduce.TaskAttemptContext;
31
32 import java.io.DataInput;
33 import java.io.DataOutput;
34 import java.io.IOException;
35 import java.util.List;
36
37
38
39
40
41
42
43
44
45
46 public class MultiVertexInputFormat<I extends WritableComparable,
47 V extends Writable, E extends Writable> extends VertexInputFormat<I, V, E> {
48
49 private List<VertexInputFormat<I, V, E>> vertexInputFormats;
50
51 @Override public void checkInputSpecs(Configuration conf) {
52 for (VertexInputFormat vertexInputFormat : vertexInputFormats) {
53 vertexInputFormat.checkInputSpecs(conf);
54 }
55 }
56
57 @Override
58 public void setConf(
59 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
60 super.setConf(conf);
61 vertexInputFormats =
62 VertexInputFormatDescription.createVertexInputFormats(getConf());
63 if (vertexInputFormats.isEmpty()) {
64 throw new IllegalStateException("setConf: Using MultiVertexInputFormat " +
65 "without specifying vertex inputs");
66 }
67 }
68
69 @Override
70 public VertexReader<I, V, E> createVertexReader(InputSplit inputSplit,
71 TaskAttemptContext context) throws IOException {
72 if (inputSplit instanceof InputSplitWithInputFormatIndex) {
73
74
75 synchronized (context) {
76 InputSplitWithInputFormatIndex split =
77 (InputSplitWithInputFormatIndex) inputSplit;
78 VertexInputFormat<I, V, E> vertexInputFormat =
79 vertexInputFormats.get(split.getInputFormatIndex());
80 VertexReader<I, V, E> vertexReader =
81 vertexInputFormat.createVertexReader(split.getSplit(), context);
82 return new WrappedVertexReader<I, V, E>(
83 vertexReader, vertexInputFormat.getConf()) {
84 @Override
85 public void initialize(InputSplit inputSplit,
86 TaskAttemptContext context) throws IOException,
87 InterruptedException {
88
89
90 synchronized (context) {
91 super.initialize(inputSplit, context);
92 }
93 }
94 };
95 }
96 } else {
97 throw new IllegalStateException("createVertexReader: Got InputSplit " +
98 "which was not created by this class: " +
99 inputSplit.getClass().getName());
100 }
101 }
102
103 @Override
104 public List<InputSplit> getSplits(JobContext context,
105 int minSplitCountHint) throws IOException, InterruptedException {
106
107
108 synchronized (context) {
109 return MultiInputUtils.getSplits(
110 context, minSplitCountHint, vertexInputFormats);
111 }
112 }
113
114 @Override
115 public void writeInputSplit(InputSplit inputSplit,
116 DataOutput dataOutput) throws IOException {
117 MultiInputUtils.writeInputSplit(inputSplit, dataOutput, vertexInputFormats);
118 }
119
120 @Override
121 public InputSplit readInputSplit(
122 DataInput dataInput) throws IOException, ClassNotFoundException {
123 return MultiInputUtils.readInputSplit(dataInput, vertexInputFormats);
124 }
125 }