1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.comm;
19
20 import org.apache.giraph.edge.Edge;
21 import org.apache.giraph.graph.Vertex;
22 import org.apache.giraph.graph.VertexMutations;
23 import org.apache.hadoop.io.Writable;
24 import org.apache.hadoop.io.WritableComparable;
25
26 import java.util.HashMap;
27 import java.util.Map;
28
29
30
31
32
33
34
35
36
37 @SuppressWarnings("rawtypes")
38 public class SendMutationsCache<I extends WritableComparable,
39 V extends Writable, E extends Writable> {
40
41 private Map<Integer, Map<I, VertexMutations<I, V, E>>> mutationCache =
42 new HashMap<Integer, Map<I, VertexMutations<I, V, E>>>();
43
44 private final Map<Integer, Integer> mutationCountMap =
45 new HashMap<Integer, Integer>();
46
47
48
49
50
51
52
53
54
55 private VertexMutations<I, V, E> getVertexMutations(
56 Integer partitionId, I destVertexId) {
57 Map<I, VertexMutations<I, V, E>> idMutations =
58 mutationCache.get(partitionId);
59 if (idMutations == null) {
60 idMutations = new HashMap<I, VertexMutations<I, V, E>>();
61 mutationCache.put(partitionId, idMutations);
62 }
63 VertexMutations<I, V, E> mutations = idMutations.get(destVertexId);
64 if (mutations == null) {
65 mutations = new VertexMutations<I, V, E>();
66 idMutations.put(destVertexId, mutations);
67 }
68 return mutations;
69 }
70
71
72
73
74
75
76
77 private int incrementPartitionMutationCount(int partitionId) {
78 Integer currentPartitionMutationCount = mutationCountMap.get(partitionId);
79 if (currentPartitionMutationCount == null) {
80 currentPartitionMutationCount = 0;
81 }
82 Integer updatedPartitionMutationCount =
83 currentPartitionMutationCount + 1;
84 mutationCountMap.put(partitionId, updatedPartitionMutationCount);
85 return updatedPartitionMutationCount;
86 }
87
88
89
90
91
92
93
94
95
96 public int addEdgeMutation(
97 Integer partitionId, I destVertexId, Edge<I, E> edge) {
98
99 VertexMutations<I, V, E> mutations =
100 getVertexMutations(partitionId, destVertexId);
101
102
103 mutations.addEdge(edge);
104
105
106 return incrementPartitionMutationCount(partitionId);
107 }
108
109
110
111
112
113
114
115
116
117 public int removeEdgeMutation(
118 Integer partitionId, I vertexIndex, I destinationVertexIndex) {
119
120 VertexMutations<I, V, E> mutations =
121 getVertexMutations(partitionId, vertexIndex);
122
123
124 mutations.removeEdge(destinationVertexIndex);
125
126
127 return incrementPartitionMutationCount(partitionId);
128 }
129
130
131
132
133
134
135
136
137 public int addVertexMutation(
138 Integer partitionId, Vertex<I, V, E> vertex) {
139
140 VertexMutations<I, V, E> mutations =
141 getVertexMutations(partitionId, vertex.getId());
142
143
144 mutations.addVertex(vertex);
145
146
147 return incrementPartitionMutationCount(partitionId);
148 }
149
150
151
152
153
154
155
156
157 public int removeVertexMutation(
158 Integer partitionId, I destVertexId) {
159
160 VertexMutations<I, V, E> mutations =
161 getVertexMutations(partitionId, destVertexId);
162
163
164 mutations.removeVertex();
165
166
167 return incrementPartitionMutationCount(partitionId);
168 }
169
170
171
172
173
174
175
176 public Map<I, VertexMutations<I, V, E>> removePartitionMutations(
177 int partitionId) {
178 Map<I, VertexMutations<I, V, E>> idMutations =
179 mutationCache.remove(partitionId);
180 mutationCountMap.put(partitionId, 0);
181 return idMutations;
182 }
183
184
185
186
187
188
189 public Map<Integer, Map<I, VertexMutations<I, V, E>>>
190 removeAllPartitionMutations() {
191 Map<Integer, Map<I, VertexMutations<I, V, E>>> allMutations =
192 mutationCache;
193 mutationCache =
194 new HashMap<Integer, Map<I, VertexMutations<I, V, E>>>();
195 mutationCountMap.clear();
196 return allMutations;
197 }
198 }