1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21
22 import java.io.IOException;
23 import org.apache.giraph.graph.Vertex;
24 import org.apache.hadoop.io.Text;
25 import org.apache.hadoop.io.Writable;
26 import org.apache.hadoop.io.WritableComparable;
27 import org.apache.hadoop.mapreduce.TaskAttemptContext;
28
29
30
31
32
33
34
35
36
37
38
39 @SuppressWarnings("rawtypes")
40 public class IdWithValueTextOutputFormat<I extends WritableComparable,
41 V extends Writable, E extends Writable>
42 extends TextVertexOutputFormat<I, V, E> {
43
44
45 public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
46
47 public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
48
49 public static final String REVERSE_ID_AND_VALUE = "reverse.id.and.value";
50
51 public static final boolean REVERSE_ID_AND_VALUE_DEFAULT = false;
52
53 @Override
54 public TextVertexWriter createVertexWriter(TaskAttemptContext context) {
55 return new IdWithValueVertexWriter();
56 }
57
58
59
60
61 protected class IdWithValueVertexWriter extends TextVertexWriterToEachLine {
62
63 private String delimiter;
64
65 private boolean reverseOutput;
66
67 @Override
68 public void initialize(TaskAttemptContext context) throws IOException,
69 InterruptedException {
70 super.initialize(context);
71 delimiter = getConf().get(
72 LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
73 reverseOutput = getConf().getBoolean(
74 REVERSE_ID_AND_VALUE, REVERSE_ID_AND_VALUE_DEFAULT);
75 }
76
77 @Override
78 protected Text convertVertexToLine(Vertex<I, V, E> vertex)
79 throws IOException {
80
81 StringBuilder str = new StringBuilder();
82 if (reverseOutput) {
83 str.append(vertex.getValue().toString());
84 str.append(delimiter);
85 str.append(vertex.getId().toString());
86 } else {
87 str.append(vertex.getId().toString());
88 str.append(delimiter);
89 str.append(vertex.getValue().toString());
90 }
91 return new Text(str.toString());
92 }
93 }
94 }