1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
package org.apache.giraph.io.formats; |
19 | |
|
20 | |
import org.apache.giraph.edge.Edge; |
21 | |
import org.apache.giraph.graph.Vertex; |
22 | |
import org.apache.hadoop.fs.FSDataOutputStream; |
23 | |
import org.apache.hadoop.fs.FileSystem; |
24 | |
import org.apache.hadoop.fs.Path; |
25 | |
import org.apache.hadoop.io.NullWritable; |
26 | |
import org.apache.hadoop.io.Text; |
27 | |
import org.apache.hadoop.io.Writable; |
28 | |
import org.apache.hadoop.io.WritableComparable; |
29 | |
import org.apache.hadoop.mapreduce.JobContext; |
30 | |
import org.apache.hadoop.mapreduce.JobStatus; |
31 | |
import org.apache.hadoop.mapreduce.OutputCommitter; |
32 | |
import org.apache.hadoop.mapreduce.TaskAttemptContext; |
33 | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
34 | |
|
35 | |
import java.io.IOException; |
36 | |
|
37 | |
|
38 | |
|
39 | |
|
40 | |
|
41 | |
|
42 | |
|
43 | 0 | public class GraphvizOutputFormat extends TextVertexOutputFormat< |
44 | |
WritableComparable, Writable, Writable> { |
45 | |
|
46 | |
private static final String NODE_TEXT_COLOR = "blue:orange"; |
47 | |
|
48 | |
@Override |
49 | |
public TextVertexWriter createVertexWriter(TaskAttemptContext context) |
50 | |
throws IOException, InterruptedException { |
51 | 0 | return new VertexWriter(); |
52 | |
} |
53 | |
|
54 | |
@Override |
55 | |
public OutputCommitter getOutputCommitter(TaskAttemptContext context) |
56 | |
throws IOException, InterruptedException { |
57 | 0 | return new GraphvizOutputCommitter(super.getOutputCommitter(context)); |
58 | |
} |
59 | |
|
60 | |
|
61 | |
|
62 | |
|
63 | |
|
64 | |
|
65 | |
|
66 | |
private static Path getOutputDir(JobContext context) { |
67 | 0 | return FileOutputFormat.getOutputPath(context); |
68 | |
} |
69 | |
|
70 | |
|
71 | |
|
72 | |
|
73 | |
|
74 | |
|
75 | |
|
76 | |
private static Path getPathAtBeginning(JobContext context) { |
77 | 0 | return new Path(getOutputDir(context), "____" + System.currentTimeMillis()); |
78 | |
} |
79 | |
|
80 | |
|
81 | |
|
82 | |
|
83 | |
|
84 | |
|
85 | |
private static Path getPathAtEnd(JobContext context) { |
86 | 0 | return new Path(getOutputDir(context), "zzz_" + System.currentTimeMillis()); |
87 | |
} |
88 | |
|
89 | |
|
90 | |
|
91 | |
|
92 | |
|
93 | |
|
94 | |
private static void writeStart(JobContext context) throws IOException { |
95 | 0 | Path path = getPathAtBeginning(context); |
96 | 0 | FileSystem fs = path.getFileSystem(context.getConfiguration()); |
97 | 0 | FSDataOutputStream file = fs.create(path, false); |
98 | 0 | file.writeBytes("digraph g {\n"); |
99 | 0 | file.close(); |
100 | 0 | } |
101 | |
|
102 | |
|
103 | |
|
104 | |
|
105 | |
|
106 | |
|
107 | |
private static void writeEnd(JobContext context) throws IOException { |
108 | 0 | Path path = getPathAtEnd(context); |
109 | 0 | FileSystem fs = path.getFileSystem(context.getConfiguration()); |
110 | 0 | FSDataOutputStream file = fs.create(path, false); |
111 | 0 | file.writeBytes("}\n"); |
112 | 0 | file.close(); |
113 | 0 | } |
114 | |
|
115 | |
|
116 | |
|
117 | |
|
118 | |
|
119 | |
|
120 | |
private static void addNodeInfo( |
121 | |
Vertex<WritableComparable, Writable, Writable> vertex, StringBuilder sb) { |
122 | 0 | sb.append('"').append(vertex.getId()).append('"'); |
123 | 0 | sb.append(" [").append("label=").append('"').append("<id> "); |
124 | 0 | sb.append(vertex.getId()); |
125 | 0 | if (!(vertex.getValue() instanceof NullWritable)) { |
126 | 0 | sb.append("|").append(vertex.getValue()); |
127 | |
} |
128 | 0 | sb.append('"').append(",shape=record,fillcolor=") |
129 | 0 | .append('"').append(NODE_TEXT_COLOR).append('"') |
130 | 0 | .append("];"); |
131 | 0 | } |
132 | |
|
133 | |
|
134 | |
|
135 | |
|
136 | |
|
137 | |
|
138 | |
|
139 | |
private static void addEdge(StringBuilder sb, Writable sourceID, |
140 | |
Edge<WritableComparable, Writable> edge) { |
141 | 0 | sb.append(sourceID).append(":id") |
142 | 0 | .append(" -> ") |
143 | 0 | .append(edge.getTargetVertexId()).append(":id"); |
144 | 0 | addEdgeInfo(sb, edge); |
145 | 0 | sb.append("\n"); |
146 | 0 | } |
147 | |
|
148 | |
|
149 | |
|
150 | |
|
151 | |
|
152 | |
|
153 | |
private static void addEdgeInfo(StringBuilder sb, |
154 | |
Edge<WritableComparable, Writable> edge) { |
155 | 0 | if (!(edge.getValue() instanceof NullWritable)) { |
156 | 0 | sb.append(" [label=").append(edge.getValue()).append(" ];"); |
157 | |
} |
158 | 0 | } |
159 | |
|
160 | |
|
161 | |
|
162 | |
|
163 | 0 | private static class GraphvizOutputCommitter extends OutputCommitter { |
164 | |
|
165 | |
private final OutputCommitter delegate; |
166 | |
|
167 | |
|
168 | |
|
169 | |
|
170 | |
|
171 | 0 | private GraphvizOutputCommitter(OutputCommitter delegate) { |
172 | 0 | this.delegate = delegate; |
173 | 0 | } |
174 | |
|
175 | |
@Override public boolean equals(Object o) { |
176 | 0 | return delegate.equals(o); |
177 | |
} |
178 | |
|
179 | |
@Override public String toString() { |
180 | 0 | return delegate.toString(); |
181 | |
} |
182 | |
|
183 | |
@Override public int hashCode() { |
184 | 0 | return delegate.hashCode(); |
185 | |
} |
186 | |
|
187 | |
@Override public void abortJob(JobContext jobContext, JobStatus.State state) |
188 | |
throws IOException { |
189 | 0 | delegate.abortJob(jobContext, state); |
190 | 0 | } |
191 | |
|
192 | |
@Override public void abortTask(TaskAttemptContext taskContext) |
193 | |
throws IOException { |
194 | 0 | delegate.abortTask(taskContext); |
195 | 0 | } |
196 | |
|
197 | |
@Override @Deprecated public void cleanupJob(JobContext context) |
198 | |
throws IOException { |
199 | 0 | delegate.cleanupJob(context); |
200 | 0 | } |
201 | |
|
202 | |
@Override public void commitJob(JobContext jobContext) throws IOException { |
203 | 0 | writeEnd(jobContext); |
204 | 0 | delegate.commitJob(jobContext); |
205 | 0 | } |
206 | |
|
207 | |
@Override public void commitTask(TaskAttemptContext taskContext) |
208 | |
throws IOException { |
209 | 0 | delegate.commitTask(taskContext); |
210 | 0 | } |
211 | |
|
212 | |
@Override public boolean needsTaskCommit(TaskAttemptContext taskContext) |
213 | |
throws IOException { |
214 | 0 | return delegate.needsTaskCommit(taskContext); |
215 | |
} |
216 | |
|
217 | |
@Override public void setupJob(JobContext jobContext) throws IOException { |
218 | 0 | delegate.setupJob(jobContext); |
219 | 0 | writeStart(jobContext); |
220 | 0 | } |
221 | |
|
222 | |
@Override public void setupTask(TaskAttemptContext taskContext) |
223 | |
throws IOException { |
224 | 0 | delegate.setupTask(taskContext); |
225 | 0 | } |
226 | |
} |
227 | |
|
228 | |
|
229 | |
|
230 | |
|
231 | 0 | private class VertexWriter extends TextVertexWriter { |
232 | |
@Override |
233 | |
public void writeVertex( |
234 | |
Vertex<WritableComparable, Writable, Writable> vertex) |
235 | |
throws IOException, InterruptedException { |
236 | 0 | StringBuilder sb = new StringBuilder(vertex.getNumEdges() * 10); |
237 | 0 | for (Edge<WritableComparable, Writable> edge : vertex.getEdges()) { |
238 | 0 | addEdge(sb, vertex.getId(), edge); |
239 | 0 | } |
240 | 0 | addNodeInfo(vertex, sb); |
241 | 0 | getRecordWriter().write(new Text(sb.toString()), null); |
242 | 0 | } |
243 | |
} |
244 | |
} |