1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.master.input;
20
21 import org.apache.giraph.worker.WorkerInfo;
22
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28
29
30
31
32
33 public class MappingInputSplitsMasterOrganizer
34 implements InputSplitsMasterOrganizer {
35
36 private final List<byte[]> splits;
37
38 private final Map<Integer, AtomicInteger>
39 workerTaskIdToNextSplitIndexMap;
40
41
42
43
44
45
46
47 public MappingInputSplitsMasterOrganizer(List<byte[]> serializedSplits,
48 List<WorkerInfo> workers) {
49 this.splits = serializedSplits;
50 workerTaskIdToNextSplitIndexMap = new HashMap<>();
51 for (WorkerInfo worker : workers) {
52 workerTaskIdToNextSplitIndexMap.put(
53 worker.getTaskId(), new AtomicInteger(0));
54 }
55 }
56
57 @Override
58 public byte[] getSerializedSplitFor(int workerTaskId) {
59 AtomicInteger nextSplitIndex =
60 workerTaskIdToNextSplitIndexMap.get(workerTaskId);
61 int splitIndex = nextSplitIndex.getAndIncrement();
62 return splitIndex < splits.size() ? splits.get(splitIndex) : null;
63 }
64 }