Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
SendMutationsCache |
|
| 1.375;1.375 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | package org.apache.giraph.comm; | |
19 | ||
20 | import org.apache.giraph.edge.Edge; | |
21 | import org.apache.giraph.graph.Vertex; | |
22 | import org.apache.giraph.graph.VertexMutations; | |
23 | import org.apache.hadoop.io.Writable; | |
24 | import org.apache.hadoop.io.WritableComparable; | |
25 | ||
26 | import java.util.HashMap; | |
27 | import java.util.Map; | |
28 | ||
29 | /** | |
30 | * Aggregates the mutations to be sent to partitions so they can be sent in | |
31 | * bulk. Not thread-safe. | |
32 | * | |
33 | * @param <I> Vertex id | |
34 | * @param <V> Vertex data | |
35 | * @param <E> Edge data | |
36 | */ | |
37 | @SuppressWarnings("rawtypes") | |
38 | 0 | public class SendMutationsCache<I extends WritableComparable, |
39 | V extends Writable, E extends Writable> { | |
40 | /** Internal cache */ | |
41 | 0 | private Map<Integer, Map<I, VertexMutations<I, V, E>>> mutationCache = |
42 | new HashMap<Integer, Map<I, VertexMutations<I, V, E>>>(); | |
43 | /** Number of mutations in each partition */ | |
44 | 0 | private final Map<Integer, Integer> mutationCountMap = |
45 | new HashMap<Integer, Integer>(); | |
46 | ||
47 | /** | |
48 | * Get the mutations for a partition and destination vertex (creating if | |
49 | * it doesn't exist). | |
50 | * | |
51 | * @param partitionId Partition id | |
52 | * @param destVertexId Destination vertex id | |
53 | * @return Mutations for the vertex | |
54 | */ | |
55 | private VertexMutations<I, V, E> getVertexMutations( | |
56 | Integer partitionId, I destVertexId) { | |
57 | 0 | Map<I, VertexMutations<I, V, E>> idMutations = |
58 | 0 | mutationCache.get(partitionId); |
59 | 0 | if (idMutations == null) { |
60 | 0 | idMutations = new HashMap<I, VertexMutations<I, V, E>>(); |
61 | 0 | mutationCache.put(partitionId, idMutations); |
62 | } | |
63 | 0 | VertexMutations<I, V, E> mutations = idMutations.get(destVertexId); |
64 | 0 | if (mutations == null) { |
65 | 0 | mutations = new VertexMutations<I, V, E>(); |
66 | 0 | idMutations.put(destVertexId, mutations); |
67 | } | |
68 | 0 | return mutations; |
69 | } | |
70 | ||
71 | /** | |
72 | * Increment the number of mutations in a partition. | |
73 | * | |
74 | * @param partitionId Partition id | |
75 | * @return Number of mutations in a partition after the increment | |
76 | */ | |
77 | private int incrementPartitionMutationCount(int partitionId) { | |
78 | 0 | Integer currentPartitionMutationCount = mutationCountMap.get(partitionId); |
79 | 0 | if (currentPartitionMutationCount == null) { |
80 | 0 | currentPartitionMutationCount = 0; |
81 | } | |
82 | 0 | Integer updatedPartitionMutationCount = |
83 | 0 | currentPartitionMutationCount + 1; |
84 | 0 | mutationCountMap.put(partitionId, updatedPartitionMutationCount); |
85 | 0 | return updatedPartitionMutationCount; |
86 | } | |
87 | ||
88 | /** | |
89 | * Add an add edge mutation to the cache. | |
90 | * | |
91 | * @param partitionId Partition id | |
92 | * @param destVertexId Destination vertex id | |
93 | * @param edge Edge to be added | |
94 | * @return Number of mutations in the partition. | |
95 | */ | |
96 | public int addEdgeMutation( | |
97 | Integer partitionId, I destVertexId, Edge<I, E> edge) { | |
98 | // Get the mutations for this partition | |
99 | 0 | VertexMutations<I, V, E> mutations = |
100 | 0 | getVertexMutations(partitionId, destVertexId); |
101 | ||
102 | // Add the edge | |
103 | 0 | mutations.addEdge(edge); |
104 | ||
105 | // Update the number of mutations per partition | |
106 | 0 | return incrementPartitionMutationCount(partitionId); |
107 | } | |
108 | ||
109 | /** | |
110 | * Add a remove edge mutation to the cache. | |
111 | * | |
112 | * @param partitionId Partition id | |
113 | * @param vertexIndex Destination vertex id | |
114 | * @param destinationVertexIndex Edge vertex index to be removed | |
115 | * @return Number of mutations in the partition. | |
116 | */ | |
117 | public int removeEdgeMutation( | |
118 | Integer partitionId, I vertexIndex, I destinationVertexIndex) { | |
119 | // Get the mutations for this partition | |
120 | 0 | VertexMutations<I, V, E> mutations = |
121 | 0 | getVertexMutations(partitionId, vertexIndex); |
122 | ||
123 | // Remove the edge | |
124 | 0 | mutations.removeEdge(destinationVertexIndex); |
125 | ||
126 | // Update the number of mutations per partition | |
127 | 0 | return incrementPartitionMutationCount(partitionId); |
128 | } | |
129 | ||
130 | /** | |
131 | * Add a add vertex mutation to the cache. | |
132 | * | |
133 | * @param partitionId Partition id | |
134 | * @param vertex Vertex to be added | |
135 | * @return Number of mutations in the partition. | |
136 | */ | |
137 | public int addVertexMutation( | |
138 | Integer partitionId, Vertex<I, V, E> vertex) { | |
139 | // Get the mutations for this partition | |
140 | 0 | VertexMutations<I, V, E> mutations = |
141 | 0 | getVertexMutations(partitionId, vertex.getId()); |
142 | ||
143 | // Add the vertex | |
144 | 0 | mutations.addVertex(vertex); |
145 | ||
146 | // Update the number of mutations per partition | |
147 | 0 | return incrementPartitionMutationCount(partitionId); |
148 | } | |
149 | ||
150 | /** | |
151 | * Add a remove vertex mutation to the cache. | |
152 | * | |
153 | * @param partitionId Partition id | |
154 | * @param destVertexId Vertex index to be removed | |
155 | * @return Number of mutations in the partition. | |
156 | */ | |
157 | public int removeVertexMutation( | |
158 | Integer partitionId, I destVertexId) { | |
159 | // Get the mutations for this partition | |
160 | 0 | VertexMutations<I, V, E> mutations = |
161 | 0 | getVertexMutations(partitionId, destVertexId); |
162 | ||
163 | // Remove the vertex | |
164 | 0 | mutations.removeVertex(); |
165 | ||
166 | // Update the number of mutations per partition | |
167 | 0 | return incrementPartitionMutationCount(partitionId); |
168 | } | |
169 | ||
170 | /** | |
171 | * Gets the mutations for a partition and removes it from the cache. | |
172 | * | |
173 | * @param partitionId Partition id | |
174 | * @return Removed partition mutations | |
175 | */ | |
176 | public Map<I, VertexMutations<I, V, E>> removePartitionMutations( | |
177 | int partitionId) { | |
178 | 0 | Map<I, VertexMutations<I, V, E>> idMutations = |
179 | 0 | mutationCache.remove(partitionId); |
180 | 0 | mutationCountMap.put(partitionId, 0); |
181 | 0 | return idMutations; |
182 | } | |
183 | ||
184 | /** | |
185 | * Gets all the mutations and removes them from the cache. | |
186 | * | |
187 | * @return All vertex mutations for all partitions | |
188 | */ | |
189 | public Map<Integer, Map<I, VertexMutations<I, V, E>>> | |
190 | removeAllPartitionMutations() { | |
191 | 0 | Map<Integer, Map<I, VertexMutations<I, V, E>>> allMutations = |
192 | mutationCache; | |
193 | 0 | mutationCache = |
194 | new HashMap<Integer, Map<I, VertexMutations<I, V, E>>>(); | |
195 | 0 | mutationCountMap.clear(); |
196 | 0 | return allMutations; |
197 | } | |
198 | } |