1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.bsp;
20
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22 import org.apache.hadoop.io.Text;
23 import org.apache.hadoop.mapreduce.JobContext;
24 import org.apache.hadoop.mapreduce.OutputCommitter;
25 import org.apache.hadoop.mapreduce.OutputFormat;
26 import org.apache.hadoop.mapreduce.RecordWriter;
27 import org.apache.hadoop.mapreduce.TaskAttemptContext;
28 import org.apache.log4j.Logger;
29
30 import java.io.IOException;
31
32
33
34
35
36 public class BspOutputFormat extends OutputFormat<Text, Text> {
37
38 private static Logger LOG = Logger.getLogger(BspOutputFormat.class);
39
40 @Override
41 public void checkOutputSpecs(JobContext context)
42 throws IOException, InterruptedException {
43 ImmutableClassesGiraphConfiguration conf =
44 new ImmutableClassesGiraphConfiguration(context.getConfiguration());
45 if (!conf.hasVertexOutputFormat() && !conf.hasEdgeOutputFormat()) {
46 LOG.warn("checkOutputSpecs: ImmutableOutputCommiter" +
47 " will not check anything");
48 return;
49 }
50
51 if (conf.hasVertexOutputFormat()) {
52 conf.createWrappedVertexOutputFormat().checkOutputSpecs(context);
53 }
54 if (conf.hasEdgeOutputFormat()) {
55 conf.createWrappedEdgeOutputFormat().checkOutputSpecs(context);
56 }
57 }
58
59 @Override
60 public OutputCommitter getOutputCommitter(TaskAttemptContext context)
61 throws IOException, InterruptedException {
62 ImmutableClassesGiraphConfiguration conf =
63 new ImmutableClassesGiraphConfiguration(context.getConfiguration());
64 if (!conf.hasVertexOutputFormat() && !conf.hasEdgeOutputFormat()) {
65 LOG.warn("getOutputCommitter: Returning " +
66 "ImmutableOutputCommiter (does nothing).");
67 return new ImmutableOutputCommitter();
68 }
69
70 if (conf.hasVertexOutputFormat()) {
71 return conf.createWrappedVertexOutputFormat().getOutputCommitter(context);
72 } else {
73 return conf.createWrappedEdgeOutputFormat().getOutputCommitter(context);
74 }
75 }
76
77 @Override
78 public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
79 throws IOException, InterruptedException {
80 return new BspRecordWriter();
81 }
82 }