1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats.multi;
20
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22 import org.apache.giraph.io.EdgeInputFormat;
23 import org.apache.giraph.io.EdgeReader;
24 import org.apache.giraph.io.internal.WrappedEdgeReader;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28 import org.apache.hadoop.mapreduce.InputSplit;
29 import org.apache.hadoop.mapreduce.JobContext;
30 import org.apache.hadoop.mapreduce.TaskAttemptContext;
31
32 import java.io.DataInput;
33 import java.io.DataOutput;
34 import java.io.IOException;
35 import java.util.List;
36
37
38
39
40
41
42
43
44
45 public class MultiEdgeInputFormat<I extends WritableComparable,
46 E extends Writable> extends EdgeInputFormat<I, E> {
47
48 private List<EdgeInputFormat<I, E>> edgeInputFormats;
49
50 @Override public void checkInputSpecs(Configuration conf) {
51 for (EdgeInputFormat edgeInputFormat : edgeInputFormats) {
52 edgeInputFormat.checkInputSpecs(conf);
53 }
54 }
55
56 @Override
57 public void setConf(
58 ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
59 super.setConf(conf);
60 edgeInputFormats =
61 EdgeInputFormatDescription.createEdgeInputFormats(getConf());
62 if (edgeInputFormats.isEmpty()) {
63 throw new IllegalStateException("setConf: Using MultiEdgeInputFormat " +
64 "without specifying edge inputs");
65 }
66 }
67
68 @Override
69 public EdgeReader<I, E> createEdgeReader(InputSplit inputSplit,
70 TaskAttemptContext context) throws IOException {
71 if (inputSplit instanceof InputSplitWithInputFormatIndex) {
72
73
74 synchronized (context) {
75 InputSplitWithInputFormatIndex split =
76 (InputSplitWithInputFormatIndex) inputSplit;
77 EdgeInputFormat<I, E> edgeInputFormat =
78 edgeInputFormats.get(split.getInputFormatIndex());
79 EdgeReader<I, E> edgeReader =
80 edgeInputFormat.createEdgeReader(split.getSplit(), context);
81 return new WrappedEdgeReader<I, E>(
82 edgeReader, edgeInputFormat.getConf()) {
83 @Override
84 public void initialize(InputSplit inputSplit,
85 TaskAttemptContext context) throws IOException,
86 InterruptedException {
87
88
89 synchronized (context) {
90 super.initialize(inputSplit, context);
91 }
92 }
93 };
94 }
95 } else {
96 throw new IllegalStateException("createEdgeReader: Got InputSplit which" +
97 " was not created by this class: " + inputSplit.getClass().getName());
98 }
99 }
100
101 @Override
102 public List<InputSplit> getSplits(JobContext context,
103 int minSplitCountHint) throws IOException, InterruptedException {
104
105
106 synchronized (context) {
107 return MultiInputUtils.getSplits(
108 context, minSplitCountHint, edgeInputFormats);
109 }
110 }
111
112 @Override
113 public void writeInputSplit(InputSplit inputSplit,
114 DataOutput dataOutput) throws IOException {
115 MultiInputUtils.writeInputSplit(inputSplit, dataOutput, edgeInputFormats);
116 }
117
118 @Override
119 public InputSplit readInputSplit(
120 DataInput dataInput) throws IOException, ClassNotFoundException {
121 return MultiInputUtils.readInputSplit(dataInput, edgeInputFormats);
122 }
123 }