1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import java.io.IOException;
22
23 import org.apache.giraph.edge.Edge;
24 import org.apache.hadoop.io.Text;
25 import org.apache.hadoop.io.Writable;
26 import org.apache.hadoop.io.WritableComparable;
27 import org.apache.hadoop.mapreduce.TaskAttemptContext;
28
29 import static org.apache.giraph.conf.GiraphConstants.GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR;
30 import static org.apache.giraph.conf.GiraphConstants.GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE;
31
32
33
34
35
36
37
38
39
40
41
42 @SuppressWarnings("rawtypes")
43 public class SrcIdDstIdEdgeValueTextOutputFormat<I extends WritableComparable,
44 V extends Writable, E extends Writable>
45 extends TextEdgeOutputFormat<I, V, E> {
46
47 @Override
48 public TextEdgeWriter createEdgeWriter(TaskAttemptContext context) {
49 return new SrcIdDstIdEdgeValueEdgeWriter();
50 }
51
52
53
54
55 protected class SrcIdDstIdEdgeValueEdgeWriter<I extends WritableComparable,
56 V extends Writable, E extends Writable>
57 extends TextEdgeWriterToEachLine<I, V, E> {
58
59
60 private String delimiter;
61
62 private boolean reverseOutput;
63
64 @Override
65 public void initialize(TaskAttemptContext context)
66 throws IOException, InterruptedException {
67 super.initialize(context);
68 delimiter = GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR.get(getConf());
69 reverseOutput = GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE.get(getConf());
70 }
71
72 @Override
73 protected Text convertEdgeToLine(I sourceId, V sourceValue, Edge<I, E> edge)
74 throws IOException {
75 StringBuilder msg = new StringBuilder();
76 if (reverseOutput) {
77 msg.append(edge.getValue().toString());
78 msg.append(delimiter);
79 msg.append(edge.getTargetVertexId().toString());
80 msg.append(delimiter);
81 msg.append(sourceId.toString());
82 } else {
83 msg.append(sourceId.toString());
84 msg.append(delimiter);
85 msg.append(edge.getTargetVertexId().toString());
86 msg.append(delimiter);
87 msg.append(edge.getValue().toString());
88 }
89 return new Text(msg.toString());
90 }
91 }
92 }