1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.master.input;
20
21 import org.apache.giraph.worker.WorkerInfo;
22 import org.apache.hadoop.mapreduce.InputSplit;
23
24 import java.io.IOException;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32
33
34
35
36 public class LocalityAwareInputSplitsMasterOrganizer
37 implements InputSplitsMasterOrganizer {
38
39 private final AtomicInteger listPointer = new AtomicInteger();
40
41 private final List<byte[]> serializedSplits;
42
43 private final AtomicBoolean[] splitsTaken;
44
45
46 private final Map<Integer, ConcurrentLinkedQueue<Integer>>
47 workerToPreferredSplitsMap;
48
49
50
51
52
53
54
55
56
57 public LocalityAwareInputSplitsMasterOrganizer(List<byte[]> serializedSplits,
58 List<InputSplit> splits, List<WorkerInfo> workers) {
59 this.serializedSplits = serializedSplits;
60 splitsTaken = new AtomicBoolean[serializedSplits.size()];
61
62 for (int i = 0; i < serializedSplits.size(); i++) {
63 splitsTaken[i] = new AtomicBoolean(false);
64 }
65
66 workerToPreferredSplitsMap = new HashMap<>();
67 for (WorkerInfo worker : workers) {
68 workerToPreferredSplitsMap.put(worker.getTaskId(),
69 new ConcurrentLinkedQueue<Integer>());
70 }
71
72 for (int i = 0; i < splits.size(); i++) {
73 try {
74 String[] locations = splits.get(i).getLocations();
75
76 for (WorkerInfo worker : workers) {
77
78 for (String location : locations) {
79
80 if (location.contains(worker.getHostname())) {
81 workerToPreferredSplitsMap.get(worker.getTaskId()).add(i);
82 break;
83 }
84 }
85 }
86 } catch (IOException | InterruptedException e) {
87 throw new IllegalStateException(
88 "Exception occurred while getting splits locations", e);
89 }
90 }
91 }
92
93 @Override
94 public byte[] getSerializedSplitFor(int workerTaskId) {
95 ConcurrentLinkedQueue<Integer> preferredSplits =
96 workerToPreferredSplitsMap.get(workerTaskId);
97
98 while (true) {
99
100 Integer splitIndex = preferredSplits.poll();
101
102 if (splitIndex == null) {
103 break;
104 }
105
106 if (splitsTaken[splitIndex].compareAndSet(false, true)) {
107 return serializedSplits.get(splitIndex);
108 }
109 }
110
111
112 while (true) {
113
114 int splitIndex = listPointer.getAndIncrement();
115
116 if (splitIndex >= serializedSplits.size()) {
117 return null;
118 }
119
120 if (splitsTaken[splitIndex].compareAndSet(false, true)) {
121 return serializedSplits.get(splitIndex);
122 }
123 }
124 }
125 }