1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io;
20
21 import java.io.IOException;
22 import org.apache.giraph.edge.Edge;
23 import org.apache.giraph.edge.EdgeFactory;
24 import org.apache.hadoop.io.Writable;
25 import org.apache.hadoop.io.WritableComparable;
26 import org.apache.hadoop.mapreduce.InputSplit;
27 import org.apache.hadoop.mapreduce.TaskAttemptContext;
28
29
30
31
32
33
34
35
36
37 public class ReverseEdgeDuplicator<I extends WritableComparable,
38 E extends Writable> extends EdgeReader<I, E> {
39
40 private final EdgeReader<I, E> baseReader;
41
42
43 private boolean haveReverseEdge = true;
44
45 private Edge<I, E> reverseEdge;
46
47 private I reverseSourceId;
48
49
50
51
52
53 public ReverseEdgeDuplicator(EdgeReader<I, E> baseReader) {
54 this.baseReader = baseReader;
55 }
56
57
58
59
60
61 public EdgeReader<I, E> getBaseReader() {
62 return baseReader;
63 }
64
65 @Override
66 public void initialize(InputSplit inputSplit, TaskAttemptContext context)
67 throws IOException, InterruptedException {
68 baseReader.initialize(inputSplit, context);
69 haveReverseEdge = true;
70 }
71
72 @Override
73 public boolean nextEdge() throws IOException, InterruptedException {
74 boolean result = true;
75 if (haveReverseEdge) {
76 result = baseReader.nextEdge();
77 haveReverseEdge = false;
78 } else {
79 Edge<I, E> currentEdge = baseReader.getCurrentEdge();
80 reverseSourceId = currentEdge.getTargetVertexId();
81 reverseEdge = EdgeFactory.create(baseReader.getCurrentSourceId(),
82 currentEdge.getValue());
83 haveReverseEdge = true;
84 }
85 return result;
86 }
87
88 @Override
89 public I getCurrentSourceId() throws IOException, InterruptedException {
90 if (haveReverseEdge) {
91 return reverseSourceId;
92 } else {
93 return baseReader.getCurrentSourceId();
94 }
95 }
96
97 @Override
98 public Edge<I, E> getCurrentEdge() throws IOException, InterruptedException {
99 if (haveReverseEdge) {
100 return reverseEdge;
101 } else {
102 return baseReader.getCurrentEdge();
103 }
104 }
105
106 @Override
107 public void close() throws IOException {
108 baseReader.close();
109 }
110
111 @Override
112 public float getProgress() throws IOException, InterruptedException {
113 return baseReader.getProgress();
114 }
115 }