1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.partition;
20
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.List;
24
25 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26 import org.apache.giraph.worker.WorkerInfo;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.hadoop.io.WritableComparable;
29
30 import com.google.common.collect.Lists;
31
32
33
34
35
36
37
38
39
40 public abstract class MasterGraphPartitionerImpl<I extends WritableComparable,
41 V extends Writable, E extends Writable>
42 implements MasterGraphPartitioner<I, V, E> {
43
44 private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
45
46 private List<PartitionOwner> partitionOwnerList;
47
48
49
50
51
52
53
54 public MasterGraphPartitionerImpl(
55 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
56 this.conf = conf;
57 }
58
59 @Override
60 public Collection<PartitionOwner> createInitialPartitionOwners(
61 Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
62 int partitionCount = PartitionUtils.computePartitionCount(
63 availableWorkerInfos.size(), conf);
64 ArrayList<WorkerInfo> workerList =
65 new ArrayList<WorkerInfo>(availableWorkerInfos);
66
67 partitionOwnerList = new ArrayList<PartitionOwner>();
68 for (int i = 0; i < partitionCount; i++) {
69 partitionOwnerList.add(new BasicPartitionOwner(i, workerList.get(
70 getWorkerIndex(i, partitionCount, workerList.size()))));
71 }
72
73 return partitionOwnerList;
74 }
75
76 @Override
77 public void setPartitionOwners(Collection<PartitionOwner> partitionOwners) {
78 partitionOwnerList = Lists.newArrayList(partitionOwners);
79 }
80
81 @Override
82 public Collection<PartitionOwner> generateChangedPartitionOwners(
83 Collection<PartitionStats> allPartitionStatsList,
84 Collection<WorkerInfo> availableWorkers,
85 int maxWorkers,
86 long superstep) {
87 return PartitionBalancer.balancePartitionsAcrossWorkers(conf,
88 partitionOwnerList, allPartitionStatsList, availableWorkers);
89 }
90
91 @Override
92 public Collection<PartitionOwner> getCurrentPartitionOwners() {
93 return partitionOwnerList;
94 }
95
96 @Override
97 public PartitionStats createPartitionStats() {
98 return new PartitionStats();
99 }
100
101
102
103
104
105
106
107
108
109 protected abstract int getWorkerIndex(
110 int partition, int partitionCount, int workerCount);
111 }