1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.partition;
20
21 import org.apache.giraph.worker.WorkerInfo;
22 import org.apache.hadoop.conf.Configuration;
23 import org.apache.log4j.Logger;
24
25 import com.google.common.base.Objects;
26
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.Comparator;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.PriorityQueue;
36 import java.util.Set;
37
38
39
40
41 public class PartitionBalancer {
42
43 public static final String PARTITION_BALANCE_ALGORITHM =
44 "hash.partitionBalanceAlgorithm";
45
46 public static final String STATIC_BALANCE_ALGORITHM =
47 "static";
48
49 public static final String EGDE_BALANCE_ALGORITHM =
50 "edges";
51
52 public static final String VERTICES_BALANCE_ALGORITHM =
53 "vertices";
54
55 private static Logger LOG = Logger.getLogger(PartitionBalancer.class);
56
57
58
59
60 private enum BalanceValue {
61
62 UNSET,
63
64 EDGES,
65
66 VERTICES
67 }
68
69
70
71
72 private PartitionBalancer() { }
73
74
75
76
77
78
79
80
81 private static long getBalanceValue(PartitionStats partitionStat,
82 BalanceValue balanceValue) {
83 switch (balanceValue) {
84 case EDGES:
85 return partitionStat.getEdgeCount();
86 case VERTICES:
87 return partitionStat.getVertexCount();
88 default:
89 throw new IllegalArgumentException(
90 "getBalanceValue: Illegal balance value " + balanceValue);
91 }
92 }
93
94
95
96
97 private static class PartitionOwnerComparator implements
98 Comparator<PartitionOwner> {
99
100 private final Map<PartitionOwner, PartitionStats> ownerStatMap;
101
102 private final BalanceValue balanceValue;
103
104
105
106
107
108
109
110
111 public PartitionOwnerComparator(
112 Map<PartitionOwner, PartitionStats> ownerStatMap,
113 BalanceValue balanceValue) {
114 this.ownerStatMap = ownerStatMap;
115 this.balanceValue = balanceValue;
116 }
117
118 @Override
119 public int compare(PartitionOwner owner1, PartitionOwner owner2) {
120 return (int)
121 (getBalanceValue(ownerStatMap.get(owner1), balanceValue) -
122 getBalanceValue(ownerStatMap.get(owner2), balanceValue));
123 }
124 }
125
126
127
128
129
130 private static class WorkerInfoAssignments implements
131 Comparable<WorkerInfoAssignments> {
132
133 private final WorkerInfo workerInfo;
134
135 private final BalanceValue balanceValue;
136
137 private final Map<PartitionOwner, PartitionStats> ownerStatsMap;
138
139 private long value = 0;
140
141
142
143
144
145
146
147
148 public WorkerInfoAssignments(
149 WorkerInfo workerInfo,
150 BalanceValue balanceValue,
151 Map<PartitionOwner, PartitionStats> ownerStatsMap) {
152 this.workerInfo = workerInfo;
153 this.balanceValue = balanceValue;
154 this.ownerStatsMap = ownerStatsMap;
155 }
156
157
158
159
160
161
162 public long getValue() {
163 return value;
164 }
165
166
167
168
169
170
171 public void assignPartitionOwner(
172 PartitionOwner partitionOwner) {
173 value += getBalanceValue(ownerStatsMap.get(partitionOwner),
174 balanceValue);
175 if (!partitionOwner.getWorkerInfo().equals(workerInfo)) {
176 partitionOwner.setPreviousWorkerInfo(
177 partitionOwner.getWorkerInfo());
178 partitionOwner.setWorkerInfo(workerInfo);
179 } else {
180 partitionOwner.setPreviousWorkerInfo(null);
181 }
182 }
183
184 @Override
185 public int compareTo(WorkerInfoAssignments other) {
186 return (int)
187 (getValue() - ((WorkerInfoAssignments) other).getValue());
188 }
189
190 @Override
191 public boolean equals(Object obj) {
192 return obj instanceof WorkerInfoAssignments &&
193 compareTo((WorkerInfoAssignments) obj) == 0;
194 }
195
196 @Override
197 public int hashCode() {
198 return Objects.hashCode(value);
199 }
200 }
201
202
203
204
205
206
207
208
209
210
211 public static Collection<PartitionOwner> balancePartitionsAcrossWorkers(
212 Configuration conf,
213 Collection<PartitionOwner> partitionOwners,
214 Collection<PartitionStats> allPartitionStats,
215 Collection<WorkerInfo> availableWorkerInfos) {
216
217 String balanceAlgorithm =
218 conf.get(PARTITION_BALANCE_ALGORITHM, STATIC_BALANCE_ALGORITHM);
219 if (LOG.isInfoEnabled()) {
220 LOG.info("balancePartitionsAcrossWorkers: Using algorithm " +
221 balanceAlgorithm);
222 }
223 BalanceValue balanceValue = BalanceValue.UNSET;
224 if (balanceAlgorithm.equals(STATIC_BALANCE_ALGORITHM)) {
225 return partitionOwners;
226 } else if (balanceAlgorithm.equals(EGDE_BALANCE_ALGORITHM)) {
227 balanceValue = BalanceValue.EDGES;
228 } else if (balanceAlgorithm.equals(VERTICES_BALANCE_ALGORITHM)) {
229 balanceValue = BalanceValue.VERTICES;
230 } else {
231 throw new IllegalArgumentException(
232 "balancePartitionsAcrossWorkers: Illegal balance " +
233 "algorithm - " + balanceAlgorithm);
234 }
235
236
237 Map<Integer, PartitionStats> idStatMap =
238 new HashMap<Integer, PartitionStats>();
239 for (PartitionStats partitionStats : allPartitionStats) {
240 if (idStatMap.put(partitionStats.getPartitionId(), partitionStats) !=
241 null) {
242 throw new IllegalStateException(
243 "balancePartitionsAcrossWorkers: Duplicate partition id " +
244 "for " + partitionStats);
245 }
246 }
247 Map<PartitionOwner, PartitionStats> ownerStatsMap =
248 new HashMap<PartitionOwner, PartitionStats>();
249 for (PartitionOwner partitionOwner : partitionOwners) {
250 PartitionStats partitionStats =
251 idStatMap.get(partitionOwner.getPartitionId());
252 if (partitionStats == null) {
253 throw new IllegalStateException(
254 "balancePartitionsAcrossWorkers: Missing partition " +
255 "stats for " + partitionOwner);
256 }
257 if (ownerStatsMap.put(partitionOwner, partitionStats) != null) {
258 throw new IllegalStateException(
259 "balancePartitionsAcrossWorkers: Duplicate partition " +
260 "owner " + partitionOwner);
261 }
262 }
263 if (ownerStatsMap.size() != partitionOwners.size()) {
264 throw new IllegalStateException(
265 "balancePartitionsAcrossWorkers: ownerStats count = " +
266 ownerStatsMap.size() + ", partitionOwners count = " +
267 partitionOwners.size() + " and should match.");
268 }
269
270 List<WorkerInfoAssignments> workerInfoAssignmentsList =
271 new ArrayList<WorkerInfoAssignments>(availableWorkerInfos.size());
272 for (WorkerInfo workerInfo : availableWorkerInfos) {
273 workerInfoAssignmentsList.add(
274 new WorkerInfoAssignments(
275 workerInfo, balanceValue, ownerStatsMap));
276 }
277
278
279
280
281
282
283
284
285
286
287 List<PartitionOwner> partitionOwnerList =
288 new ArrayList<PartitionOwner>(partitionOwners);
289 Collections.sort(partitionOwnerList,
290 Collections.reverseOrder(
291 new PartitionOwnerComparator(ownerStatsMap, balanceValue)));
292 PriorityQueue<WorkerInfoAssignments> minQueue =
293 new PriorityQueue<WorkerInfoAssignments>(workerInfoAssignmentsList);
294 for (PartitionOwner partitionOwner : partitionOwnerList) {
295 WorkerInfoAssignments chosenWorker = minQueue.remove();
296 chosenWorker.assignPartitionOwner(partitionOwner);
297 minQueue.add(chosenWorker);
298 }
299
300 return partitionOwnerList;
301 }
302
303
304
305
306
307
308
309
310
311
312
313
314 public static PartitionExchange updatePartitionOwners(
315 List<PartitionOwner> partitionOwnerList,
316 WorkerInfo myWorkerInfo,
317 Collection<? extends PartitionOwner> masterSetPartitionOwners) {
318 partitionOwnerList.clear();
319 partitionOwnerList.addAll(masterSetPartitionOwners);
320
321 Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
322 Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
323 new HashMap<WorkerInfo, List<Integer>>();
324 for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
325 if (partitionOwner.getPreviousWorkerInfo() == null) {
326 continue;
327 } else if (partitionOwner.getWorkerInfo().equals(
328 myWorkerInfo) &&
329 partitionOwner.getPreviousWorkerInfo().equals(
330 myWorkerInfo)) {
331 throw new IllegalStateException(
332 "updatePartitionOwners: Impossible to have the same " +
333 "previous and current worker info " + partitionOwner +
334 " as me " + myWorkerInfo);
335 } else if (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) {
336 dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo());
337 } else if (partitionOwner.getPreviousWorkerInfo().equals(
338 myWorkerInfo)) {
339 if (workerPartitionOwnerMap.containsKey(
340 partitionOwner.getWorkerInfo())) {
341 workerPartitionOwnerMap.get(
342 partitionOwner.getWorkerInfo()).add(
343 partitionOwner.getPartitionId());
344 } else {
345 List<Integer> tmpPartitionOwnerList = new ArrayList<Integer>();
346 tmpPartitionOwnerList.add(partitionOwner.getPartitionId());
347 workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(),
348 tmpPartitionOwnerList);
349 }
350 }
351 }
352
353 return new PartitionExchange(dependentWorkerSet,
354 workerPartitionOwnerMap);
355 }
356 }
357