1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import net.iharder.Base64;
22 import org.apache.giraph.edge.Edge;
23 import org.apache.giraph.graph.Vertex;
24 import org.apache.hadoop.io.Text;
25 import org.apache.hadoop.io.Writable;
26 import org.apache.hadoop.io.WritableComparable;
27 import org.apache.hadoop.mapreduce.TaskAttemptContext;
28 import org.json.JSONArray;
29 import org.json.JSONException;
30 import org.json.JSONObject;
31
32 import java.io.ByteArrayOutputStream;
33 import java.io.DataOutput;
34 import java.io.DataOutputStream;
35 import java.io.IOException;
36
37
38
39
40
41
42
43
44
45
46
47 @SuppressWarnings("rawtypes")
48 public class JsonBase64VertexOutputFormat<I extends WritableComparable,
49 V extends Writable, E extends Writable> extends
50 TextVertexOutputFormat<I, V, E> {
51
52 @Override
53 public TextVertexWriter createVertexWriter(TaskAttemptContext context) {
54 return new JsonBase64VertexWriter();
55 }
56
57
58
59
60 protected class JsonBase64VertexWriter extends TextVertexWriterToEachLine {
61
62 @Override
63 protected Text convertVertexToLine(Vertex<I, V, E> vertex)
64 throws IOException {
65 ByteArrayOutputStream outputStream =
66 new ByteArrayOutputStream();
67 DataOutput output = new DataOutputStream(outputStream);
68 JSONObject vertexObject = new JSONObject();
69 vertex.getId().write(output);
70 try {
71 vertexObject.put(
72 JsonBase64VertexFormat.VERTEX_ID_KEY,
73 Base64.encodeBytes(outputStream.toByteArray()));
74 } catch (JSONException e) {
75 throw new IllegalStateException(
76 "writerVertex: Failed to insert vertex id", e);
77 }
78 outputStream.reset();
79 vertex.getValue().write(output);
80 try {
81 vertexObject.put(
82 JsonBase64VertexFormat.VERTEX_VALUE_KEY,
83 Base64.encodeBytes(outputStream.toByteArray()));
84 } catch (JSONException e) {
85 throw new IllegalStateException(
86 "writerVertex: Failed to insert vertex value", e);
87 }
88 JSONArray edgeArray = new JSONArray();
89 for (Edge<I, E> edge : vertex.getEdges()) {
90 outputStream.reset();
91 edge.getTargetVertexId().write(output);
92 edge.getValue().write(output);
93 edgeArray.put(Base64.encodeBytes(outputStream.toByteArray()));
94 }
95 try {
96 vertexObject.put(
97 JsonBase64VertexFormat.EDGE_ARRAY_KEY,
98 edgeArray);
99 } catch (JSONException e) {
100 throw new IllegalStateException(
101 "writerVertex: Failed to insert edge array", e);
102 }
103 return new Text(vertexObject.toString());
104 }
105
106 }
107
108 }