1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import org.apache.giraph.bsp.ImmutableOutputCommitter;
22 import org.apache.giraph.conf.GiraphConfiguration;
23 import org.apache.giraph.graph.Vertex;
24 import org.apache.giraph.io.VertexOutputFormat;
25 import org.apache.giraph.io.VertexWriter;
26 import org.apache.giraph.utils.TestGraph;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.hadoop.io.WritableComparable;
29 import org.apache.hadoop.mapreduce.JobContext;
30 import org.apache.hadoop.mapreduce.OutputCommitter;
31 import org.apache.hadoop.mapreduce.TaskAttemptContext;
32
33 import java.io.IOException;
34
35
36
37
38
39
40
41
42 public class InMemoryVertexOutputFormat<I extends WritableComparable,
43 V extends Writable, E extends Writable> extends
44 VertexOutputFormat<I, V, E> {
45
46 private static TestGraph OUTPUT_GRAPH;
47
48
49
50
51
52
53
54 public static void initializeOutputGraph(GiraphConfiguration conf) {
55 OUTPUT_GRAPH = new TestGraph(conf);
56 }
57
58
59
60
61
62
63
64
65
66 public static <I extends WritableComparable, V extends Writable,
67 E extends Writable> TestGraph<I, V, E> getOutputGraph() {
68 return OUTPUT_GRAPH;
69 }
70
71 @Override
72 public VertexWriter<I, V, E> createVertexWriter(
73 TaskAttemptContext context) throws IOException, InterruptedException {
74 return new VertexWriter<I, V, E>() {
75 @Override
76 public void initialize(
77 TaskAttemptContext context) throws IOException, InterruptedException {
78 }
79
80 @Override
81 public void close(
82 TaskAttemptContext context) throws IOException, InterruptedException {
83 }
84
85 @Override
86 public void writeVertex(
87 Vertex<I, V, E> vertex) throws IOException, InterruptedException {
88 synchronized (OUTPUT_GRAPH) {
89 OUTPUT_GRAPH.addVertex(vertex);
90 }
91 }
92 };
93 }
94
95 @Override
96 public void checkOutputSpecs(
97 JobContext context) throws IOException, InterruptedException {
98 }
99
100 @Override
101 public OutputCommitter getOutputCommitter(
102 TaskAttemptContext context) throws IOException, InterruptedException {
103 return new ImmutableOutputCommitter();
104 }
105 }