1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.edge;
20
21 import com.google.common.collect.ArrayListMultimap;
22 import com.google.common.collect.UnmodifiableIterator;
23
24 import org.apache.giraph.utils.EdgeIterables;
25 import org.apache.giraph.utils.Trimmable;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28
29 import java.io.DataInput;
30 import java.io.DataOutput;
31 import java.io.IOException;
32 import java.util.Iterator;
33 import java.util.Map;
34
35
36
37
38
39
40
41
42
43
44 public class HashMultimapEdges<I extends WritableComparable, E extends Writable>
45 extends ConfigurableOutEdges<I, E>
46 implements MultiRandomAccessOutEdges<I, E>, Trimmable {
47
48 private ArrayListMultimap<I, E> edgeMultimap;
49
50 @Override
51 public void initialize(Iterable<Edge<I, E>> edges) {
52 EdgeIterables.initialize(this, edges);
53 }
54
55
56
57
58
59
60
61
62 public void initialize(int expectedNeighbors, int expectedEdgesPerNeighbor) {
63 edgeMultimap = ArrayListMultimap.create(expectedNeighbors,
64 expectedEdgesPerNeighbor);
65 }
66
67 @Override
68 public void initialize(int capacity) {
69
70
71 initialize(capacity, 1);
72 }
73
74 @Override
75 public void initialize() {
76 edgeMultimap = ArrayListMultimap.create();
77 }
78
79 @Override
80 public void add(Edge<I, E> edge) {
81 edgeMultimap.put(edge.getTargetVertexId(), edge.getValue());
82 }
83
84 @Override
85 public void remove(I targetVertexId) {
86 edgeMultimap.removeAll(targetVertexId);
87 }
88
89 @Override
90 public Iterable<E> getAllEdgeValues(I targetVertexId) {
91 return edgeMultimap.get(targetVertexId);
92 }
93
94 @Override
95 public int size() {
96 return edgeMultimap.size();
97 }
98
99 @Override
100 public Iterator<Edge<I, E>> iterator() {
101
102 return new UnmodifiableIterator<Edge<I, E>>() {
103
104 private Iterator<Map.Entry<I, E>> mapIterator =
105 edgeMultimap.entries().iterator();
106
107 private ReusableEdge<I, E> representativeEdge =
108 getConf().createReusableEdge();
109
110 @Override
111 public boolean hasNext() {
112 return mapIterator.hasNext();
113 }
114
115 @Override
116 public Edge<I, E> next() {
117 Map.Entry<I, E> nextEntry = mapIterator.next();
118 representativeEdge.setTargetVertexId(nextEntry.getKey());
119 representativeEdge.setValue(nextEntry.getValue());
120 return representativeEdge;
121 }
122 };
123 }
124
125 @Override
126 public void write(DataOutput out) throws IOException {
127
128
129 out.writeInt(edgeMultimap.size());
130 out.writeInt(edgeMultimap.keys().size());
131 for (Map.Entry<I, E> edge : edgeMultimap.entries()) {
132 edge.getKey().write(out);
133 edge.getValue().write(out);
134 }
135 }
136
137 @Override
138 public void readFields(DataInput in) throws IOException {
139
140
141 int numEdges = in.readInt();
142 int numNeighbors = in.readInt();
143 initialize(numEdges, numNeighbors == 0 ? 0 : numEdges / numNeighbors);
144 for (int i = 0; i < numEdges; ++i) {
145 I targetVertexId = getConf().createVertexId();
146 targetVertexId.readFields(in);
147 E edgeValue = getConf().createEdgeValue();
148 edgeValue.readFields(in);
149 edgeMultimap.put(targetVertexId, edgeValue);
150 }
151 }
152
153 @Override
154 public void trim() {
155 edgeMultimap.trimToSize();
156 }
157 }