1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm;
20
21 import com.google.common.collect.Lists;
22 import com.google.common.collect.Maps;
23 import org.apache.giraph.bsp.CentralizedServiceWorker;
24 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25 import org.apache.giraph.partition.PartitionOwner;
26 import org.apache.giraph.utils.PairList;
27 import org.apache.giraph.worker.WorkerInfo;
28
29 import javax.annotation.concurrent.NotThreadSafe;
30 import java.util.List;
31 import java.util.Map;
32
33
34
35
36
37
38
39 @NotThreadSafe
40 @SuppressWarnings("unchecked")
41 public abstract class SendDataCache<D> {
42
43
44
45
46 private final D[] dataCache;
47
48 private final int[] initialBufferSizes;
49
50 private final CentralizedServiceWorker serviceWorker;
51
52 private final int[] dataSizes;
53
54 private final int numWorkers;
55
56 private final Map<WorkerInfo, List<Integer>> workerPartitions =
57 Maps.newHashMap();
58
59 private final ImmutableClassesGiraphConfiguration conf;
60
61
62
63
64
65
66
67
68
69
70 public SendDataCache(ImmutableClassesGiraphConfiguration conf,
71 CentralizedServiceWorker<?, ?, ?> serviceWorker,
72 int maxRequestSize,
73 float additionalRequestSize) {
74 this.conf = conf;
75 this.serviceWorker = serviceWorker;
76 int maxPartition = 0;
77 for (PartitionOwner partitionOwner : serviceWorker.getPartitionOwners()) {
78 List<Integer> workerPartitionIds =
79 workerPartitions.get(partitionOwner.getWorkerInfo());
80 if (workerPartitionIds == null) {
81 workerPartitionIds = Lists.newArrayList();
82 workerPartitions.put(partitionOwner.getWorkerInfo(),
83 workerPartitionIds);
84 }
85 workerPartitionIds.add(partitionOwner.getPartitionId());
86 maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
87 }
88 dataCache = (D[]) new Object[maxPartition + 1];
89
90 int maxWorker = 0;
91 for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
92 maxWorker = Math.max(maxWorker, workerInfo.getTaskId());
93 }
94 dataSizes = new int[maxWorker + 1];
95
96 int initialRequestSize =
97 (int) (maxRequestSize * (1 + additionalRequestSize));
98 initialBufferSizes = new int[maxWorker + 1];
99 for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
100 initialBufferSizes[workerInfo.getTaskId()] =
101 initialRequestSize / workerPartitions.get(workerInfo).size();
102 }
103 numWorkers = maxWorker + 1;
104 }
105
106
107
108
109
110
111
112
113
114 public PairList<Integer, D>
115 removeWorkerData(WorkerInfo workerInfo) {
116 PairList<Integer, D> workerData = new PairList<Integer, D>();
117 List<Integer> partitions = workerPartitions.get(workerInfo);
118 workerData.initialize(partitions.size());
119 for (Integer partitionId : partitions) {
120 if (dataCache[partitionId] != null) {
121 workerData.add(partitionId, (D) dataCache[partitionId]);
122 dataCache[partitionId] = null;
123 }
124 }
125 dataSizes[workerInfo.getTaskId()] = 0;
126 return workerData;
127 }
128
129
130
131
132
133
134 public PairList<WorkerInfo, PairList<Integer, D>> removeAllData() {
135 PairList<WorkerInfo, PairList<Integer, D>> allData =
136 new PairList<WorkerInfo, PairList<Integer, D>>();
137 allData.initialize(dataSizes.length);
138 for (WorkerInfo workerInfo : workerPartitions.keySet()) {
139 PairList<Integer, D> workerData = removeWorkerData(workerInfo);
140 if (!workerData.isEmpty()) {
141 allData.add(workerInfo, workerData);
142 }
143 dataSizes[workerInfo.getTaskId()] = 0;
144 }
145 return allData;
146 }
147
148
149
150
151
152
153
154 public D getData(int partitionId) {
155 return dataCache[partitionId];
156 }
157
158
159
160
161
162
163
164 public void setData(int partitionId, D data) {
165 dataCache[partitionId] = data;
166 }
167
168
169
170
171
172
173
174 public int getInitialBufferSize(int partitionId) {
175 return initialBufferSizes[partitionId];
176 }
177
178
179
180
181
182
183
184
185 public int incrDataSize(int partitionId, int size) {
186 dataSizes[partitionId] += size;
187 return dataSizes[partitionId];
188 }
189
190 public ImmutableClassesGiraphConfiguration getConf() {
191 return conf;
192 }
193
194
195
196
197
198
199 protected CentralizedServiceWorker getServiceWorker() {
200 return serviceWorker;
201 }
202
203
204
205
206
207
208
209 protected int getSendWorkerInitialBufferSize(int taskId) {
210 return initialBufferSizes[taskId];
211 }
212
213 protected int getNumWorkers() {
214 return this.numWorkers;
215 }
216
217 protected Map<WorkerInfo, List<Integer>> getWorkerPartitions() {
218 return workerPartitions;
219 }
220 }