1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.io.formats;
19
20 import java.io.IOException;
21 import org.apache.giraph.graph.Vertex;
22 import org.apache.giraph.io.VertexOutputFormat;
23 import org.apache.giraph.io.VertexWriter;
24 import org.apache.hadoop.io.Writable;
25 import org.apache.hadoop.io.WritableComparable;
26 import org.apache.hadoop.mapreduce.JobContext;
27 import org.apache.hadoop.mapreduce.OutputCommitter;
28 import org.apache.hadoop.mapreduce.RecordWriter;
29 import org.apache.hadoop.mapreduce.TaskAttemptContext;
30 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
31
32
33
34
35
36
37
38
39
40
41
42
43
44 public abstract class SequenceFileVertexOutputFormat<
45 I extends WritableComparable,
46 V extends Writable,
47 E extends Writable,
48 OK extends Writable,
49 OV extends Writable>
50 extends VertexOutputFormat<I, V, E> {
51
52
53
54
55 private SequenceFileOutputFormat<OK, OV> sequenceFileOutputFormat =
56 new SequenceFileOutputFormat<OK, OV>();
57
58 @Override
59 public void checkOutputSpecs(JobContext context)
60 throws IOException, InterruptedException {
61 sequenceFileOutputFormat.checkOutputSpecs(context);
62 }
63
64 @Override
65 public OutputCommitter getOutputCommitter(TaskAttemptContext context)
66 throws IOException, InterruptedException {
67 return sequenceFileOutputFormat.getOutputCommitter(context);
68 }
69
70 @Override
71 public VertexWriter createVertexWriter(TaskAttemptContext
72 context) throws IOException, InterruptedException {
73 return new SequenceFileVertexWriter();
74 }
75
76
77
78
79
80
81 protected abstract OK convertToSequenceFileKey(I vertexId);
82
83
84
85
86
87
88 protected abstract OV convertToSequenceFileValue(V vertexValue);
89
90
91
92
93
94 private class SequenceFileVertexWriter extends VertexWriter<I, V, E> {
95
96
97
98
99 private RecordWriter<OK, OV> recordWriter;
100
101 @Override
102 public void initialize(TaskAttemptContext context) throws IOException,
103 InterruptedException {
104 recordWriter = sequenceFileOutputFormat.getRecordWriter(context);
105 }
106
107 @Override
108 public final void writeVertex(Vertex<I, V, E> vertex) throws
109 IOException, InterruptedException {
110
111 OK outKey = convertToSequenceFileKey(vertex.getId());
112
113 OV outValue = convertToSequenceFileValue(vertex.getValue());
114 recordWriter.write(outKey, outValue);
115 }
116
117 @Override
118 public void close(TaskAttemptContext context) throws IOException,
119 InterruptedException {
120 recordWriter.close(context);
121 }
122 }
123 }