1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.io.formats;
19
20 import org.apache.giraph.edge.Edge;
21 import org.apache.giraph.graph.Vertex;
22 import org.apache.hadoop.io.Text;
23 import org.apache.hadoop.io.Writable;
24 import org.apache.hadoop.io.WritableComparable;
25 import org.apache.hadoop.mapreduce.TaskAttemptContext;
26
27 import java.io.IOException;
28
29
30
31
32
33
34
35
36
37
38
39
40 @SuppressWarnings("rawtypes")
41 public class AdjacencyListTextVertexOutputFormat<I extends WritableComparable,
42 V extends Writable, E extends Writable>
43 extends TextVertexOutputFormat<I, V, E> {
44
45 public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
46
47 public static final String LINE_TOKENIZE_VALUE_DEFAULT =
48 AdjacencyListTextVertexInputFormat.LINE_TOKENIZE_VALUE_DEFAULT;
49
50 @Override
51 public AdjacencyListTextVertexWriter createVertexWriter(
52 TaskAttemptContext context) {
53 return new AdjacencyListTextVertexWriter();
54 }
55
56
57
58
59 protected class AdjacencyListTextVertexWriter extends
60 TextVertexWriterToEachLine {
61
62 private String delimiter;
63
64 @Override
65 public void initialize(TaskAttemptContext context) throws IOException,
66 InterruptedException {
67 super.initialize(context);
68 delimiter =
69 getConf().get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
70 }
71
72 @Override
73 public Text convertVertexToLine(Vertex<I, V, E> vertex)
74 throws IOException {
75 StringBuffer sb = new StringBuffer(vertex.getId().toString());
76 sb.append(delimiter);
77 sb.append(vertex.getValue());
78
79 for (Edge<I, E> edge : vertex.getEdges()) {
80 sb.append(delimiter).append(edge.getTargetVertexId());
81 sb.append(delimiter).append(edge.getValue());
82 }
83
84 return new Text(sb.toString());
85 }
86 }
87
88 }