1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io;
20
21 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
22 import org.apache.hadoop.conf.Configuration;
23 import org.apache.hadoop.io.Text;
24 import org.apache.hadoop.io.Writable;
25 import org.apache.hadoop.io.WritableComparable;
26 import org.apache.hadoop.mapreduce.InputSplit;
27 import org.apache.hadoop.mapreduce.JobContext;
28 import org.apache.hadoop.util.ReflectionUtils;
29
30 import java.io.DataInput;
31 import java.io.DataOutput;
32 import java.io.IOException;
33 import java.util.List;
34
35
36
37
38
39
40
41
42 public abstract class GiraphInputFormat<I extends WritableComparable,
43 V extends Writable, E extends Writable> extends
44 DefaultImmutableClassesGiraphConfigurable<I, V, E> {
45
46
47
48
49
50 public abstract void checkInputSpecs(Configuration conf);
51
52
53
54
55
56
57
58
59
60
61 public abstract List<InputSplit> getSplits(JobContext context,
62 int minSplitCountHint) throws IOException, InterruptedException;
63
64
65
66
67
68
69
70 public void writeInputSplit(InputSplit inputSplit,
71 DataOutput dataOutput) throws IOException {
72 Text.writeString(dataOutput, inputSplit.getClass().getName());
73 ((Writable) inputSplit).write(dataOutput);
74 }
75
76
77
78
79
80
81
82 public InputSplit readInputSplit(DataInput dataInput) throws IOException,
83 ClassNotFoundException {
84 String inputSplitClass = Text.readString(dataInput);
85 InputSplit inputSplit = (InputSplit) ReflectionUtils.newInstance(
86 getConf().getClassByName(inputSplitClass), getConf());
87 ((Writable) inputSplit).readFields(dataInput);
88 return inputSplit;
89 }
90 }