1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.partition;
20
21 import java.util.Collection;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Set;
25
26 import org.apache.giraph.worker.WorkerInfo;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.hadoop.io.WritableComparable;
29
30 import com.google.common.collect.Lists;
31 import org.apache.log4j.Logger;
32
33
34
35
36
37
38
39
40
41 public abstract class WorkerGraphPartitionerImpl<I extends WritableComparable,
42 V extends Writable, E extends Writable>
43 implements WorkerGraphPartitioner<I, V, E> {
44
45 private static final Logger LOG = Logger.getLogger(
46 WorkerGraphPartitionerImpl.class);
47
48 private List<PartitionOwner> partitionOwnerList = Lists.newArrayList();
49
50 private Set<WorkerInfo> availableWorkers = new HashSet<>();
51
52 @Override
53 public PartitionOwner createPartitionOwner() {
54 return new BasicPartitionOwner();
55 }
56
57 @Override
58 public PartitionOwner getPartitionOwner(I vertexId) {
59 return partitionOwnerList.get(
60 getPartitionIndex(vertexId, partitionOwnerList.size(),
61 availableWorkers.size()));
62 }
63
64 @Override
65 public Collection<PartitionStats> finalizePartitionStats(
66 Collection<PartitionStats> workerPartitionStats,
67 PartitionStore<I, V, E> partitionStore) {
68
69 return workerPartitionStats;
70 }
71
72 @Override
73 public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo,
74 Collection<? extends PartitionOwner> masterSetPartitionOwners) {
75 PartitionExchange exchange = PartitionBalancer.updatePartitionOwners(
76 partitionOwnerList, myWorkerInfo, masterSetPartitionOwners);
77 extractAvailableWorkers();
78 return exchange;
79 }
80
81 @Override
82 public Collection<? extends PartitionOwner> getPartitionOwners() {
83 return partitionOwnerList;
84 }
85
86
87
88
89 public void extractAvailableWorkers() {
90 availableWorkers.clear();
91 for (PartitionOwner partitionOwner : partitionOwnerList) {
92 availableWorkers.add(partitionOwner.getWorkerInfo());
93 }
94 LOG.info("After updating partitionOwnerList " + availableWorkers.size() +
95 " workers are available");
96 }
97
98
99
100
101
102
103
104
105
106
107 protected abstract int getPartitionIndex(I id, int partitionCount,
108 int workerCount);
109 }