1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.partition;
20
21 import org.apache.giraph.worker.LocalData;
22 import org.apache.hadoop.io.LongWritable;
23 import org.apache.hadoop.io.Writable;
24 import org.apache.log4j.Logger;
25
26
27
28
29
30
31
32 @SuppressWarnings("unchecked")
33 public class LongMappingStorePartitionerFactory<V extends Writable,
34 E extends Writable> extends GraphPartitionerFactory<LongWritable, V, E> {
35
36 private static final Logger LOG = Logger.getLogger(
37 LongMappingStorePartitionerFactory.class);
38
39 protected LocalData<LongWritable, V, E, ? extends Writable> localData = null;
40
41 @Override
42 public void initialize(LocalData<LongWritable, V, E,
43 ? extends Writable> localData) {
44 this.localData = localData;
45 LOG.info("Initializing LongMappingStorePartitionerFactory with localData");
46 }
47
48 @Override
49 public int getPartition(LongWritable id, int partitionCount,
50 int workerCount) {
51 return localData.getMappingStoreOps().getPartition(id,
52 partitionCount, workerCount);
53 }
54
55 @Override
56 public int getWorker(int partition, int partitionCount, int workerCount) {
57 int numRows = partitionCount / workerCount;
58 numRows = (numRows * workerCount == partitionCount) ? numRows : numRows + 1;
59 return partition / numRows;
60 }
61 }