1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc.data;
20
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
23 import org.apache.commons.lang3.tuple.MutablePair;
24 import org.apache.commons.lang3.tuple.Pair;
25 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26 import org.apache.giraph.conf.IntConfOption;
27 import org.apache.giraph.ooc.OutOfCoreEngine;
28 import org.apache.giraph.ooc.persistence.DataIndex;
29 import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
30 import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
31 import org.apache.log4j.Logger;
32
33 import java.io.DataInput;
34 import java.io.DataOutput;
35 import java.io.IOException;
36 import java.util.ArrayList;
37 import java.util.HashSet;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Set;
41 import java.util.concurrent.ConcurrentMap;
42 import java.util.concurrent.locks.ReadWriteLock;
43 import java.util.concurrent.locks.ReentrantReadWriteLock;
44
45 import static com.google.common.base.Preconditions.checkNotNull;
46 import static com.google.common.base.Preconditions.checkState;
47 import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public abstract class DiskBackedDataStore<T> {
72
73
74
75
76 public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH =
77 new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB,
78 "Minimum size of a buffer (in bytes) to flush to disk.");
79
80
81 private static final Logger LOG = Logger.getLogger(
82 DiskBackedDataStore.class);
83
84 protected final OutOfCoreEngine oocEngine;
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104 protected final Set<Integer> hasPartitionDataOnFile =
105 Sets.newConcurrentHashSet();
106
107 private final int minBufferSizeToOffload;
108
109 private final Set<Integer> hasPartitionDataOnDisk =
110 Sets.newConcurrentHashSet();
111
112
113
114
115
116
117
118 private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers =
119 Maps.newConcurrentMap();
120
121
122
123
124
125
126
127 private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk =
128 Maps.newConcurrentMap();
129
130
131
132
133 private final ConcurrentMap<Integer, ReadWriteLock> locks =
134 Maps.newConcurrentMap();
135
136
137
138
139
140
141
142 DiskBackedDataStore(ImmutableClassesGiraphConfiguration conf,
143 OutOfCoreEngine oocEngine) {
144 this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf);
145 this.oocEngine = oocEngine;
146 }
147
148
149
150
151
152
153
154
155 private ReadWriteLock getPartitionLock(int partitionId) {
156 ReadWriteLock readWriteLock = locks.get(partitionId);
157 if (readWriteLock == null) {
158 readWriteLock = new ReentrantReadWriteLock();
159 ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock);
160 if (temp != null) {
161 readWriteLock = temp;
162 }
163 }
164 return readWriteLock;
165 }
166
167
168
169
170
171
172
173
174
175 protected void addEntry(int partitionId, T entry) {
176
177
178
179
180
181
182 ReadWriteLock rwLock = getPartitionLock(partitionId);
183 rwLock.readLock().lock();
184 if (hasPartitionDataOnDisk.contains(partitionId)) {
185 List<T> entryList = new ArrayList<>();
186 entryList.add(entry);
187 int entrySize = entrySerializedSize(entry);
188 MutablePair<Integer, List<T>> newPair =
189 new MutablePair<>(entrySize, entryList);
190 Pair<Integer, List<T>> oldPair =
191 dataBuffers.putIfAbsent(partitionId, newPair);
192 if (oldPair != null) {
193 synchronized (oldPair) {
194 newPair = (MutablePair<Integer, List<T>>) oldPair;
195 newPair.setLeft(oldPair.getLeft() + entrySize);
196 newPair.getRight().add(entry);
197 }
198 }
199 } else {
200 addEntryToInMemoryPartitionData(partitionId, entry);
201 }
202 rwLock.readLock().unlock();
203 }
204
205
206
207
208
209
210
211
212
213
214 public abstract long loadPartitionData(int partitionId) throws IOException;
215
216
217
218
219
220
221
222
223
224
225 protected long loadPartitionDataProxy(int partitionId, DataIndex index)
226 throws IOException {
227 long numBytes = 0;
228 ReadWriteLock rwLock = getPartitionLock(partitionId);
229 rwLock.writeLock().lock();
230 if (hasPartitionDataOnDisk.contains(partitionId)) {
231 int ioThreadId =
232 oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
233 numBytes += loadInMemoryPartitionData(partitionId, ioThreadId,
234 index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)));
235 hasPartitionDataOnDisk.remove(partitionId);
236
237
238 Integer numBuffers = numDataBuffersOnDisk.remove(partitionId);
239 if (numBuffers != null) {
240 checkState(numBuffers > 0);
241 index.addIndex(DataIndex.TypeIndexEntry.BUFFER);
242 OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
243 oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
244 DataInput dataInput = inputWrapper.getDataInput();
245 for (int i = 0; i < numBuffers; ++i) {
246 T entry = readNextEntry(dataInput);
247 addEntryToInMemoryPartitionData(partitionId, entry);
248 }
249 numBytes += inputWrapper.finalizeInput(true);
250 index.removeLastIndex();
251 }
252 index.removeLastIndex();
253
254 Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId);
255 if (pair != null) {
256 for (T entry : pair.getValue()) {
257 addEntryToInMemoryPartitionData(partitionId, entry);
258 }
259 }
260 }
261 rwLock.writeLock().unlock();
262 return numBytes;
263 }
264
265
266
267
268
269
270
271
272
273 public abstract long offloadPartitionData(int partitionId) throws IOException;
274
275
276
277
278
279
280
281
282
283
284 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
285 "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
286 protected long offloadPartitionDataProxy(
287 int partitionId, DataIndex index) throws IOException {
288 ReadWriteLock rwLock = getPartitionLock(partitionId);
289 rwLock.writeLock().lock();
290 hasPartitionDataOnDisk.add(partitionId);
291 rwLock.writeLock().unlock();
292 int ioThreadId =
293 oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
294 long numBytes = offloadInMemoryPartitionData(partitionId, ioThreadId,
295 index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)));
296 index.removeLastIndex();
297 return numBytes;
298 }
299
300
301
302
303
304
305
306
307
308 public abstract long offloadBuffers(int partitionId) throws IOException;
309
310
311
312
313
314
315
316
317
318
319 protected long offloadBuffersProxy(int partitionId, DataIndex index)
320 throws IOException {
321 Pair<Integer, List<T>> pair = dataBuffers.get(partitionId);
322 if (pair == null || pair.getLeft() < minBufferSizeToOffload) {
323 return 0;
324 }
325 ReadWriteLock rwLock = getPartitionLock(partitionId);
326 rwLock.writeLock().lock();
327 pair = dataBuffers.remove(partitionId);
328 rwLock.writeLock().unlock();
329 checkNotNull(pair);
330 checkState(!pair.getRight().isEmpty());
331 int ioThreadId =
332 oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
333 index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId))
334 .addIndex(DataIndex.TypeIndexEntry.BUFFER);
335 OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
336 oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
337 true);
338 for (T entry : pair.getRight()) {
339 writeEntry(entry, outputWrapper.getDataOutput());
340 }
341 long numBytes = outputWrapper.finalizeOutput();
342 index.removeLastIndex().removeLastIndex();
343 int numBuffers = pair.getRight().size();
344 Integer oldNumBuffersOnDisk =
345 numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
346 if (oldNumBuffersOnDisk != null) {
347 numDataBuffersOnDisk.replace(partitionId,
348 oldNumBuffersOnDisk + numBuffers);
349 }
350 return numBytes;
351 }
352
353
354
355
356
357
358
359
360
361
362
363 public Set<Integer> getCandidateBuffersToOffload(int ioThreadId) {
364 Set<Integer> result = new HashSet<>();
365 for (Map.Entry<Integer, Pair<Integer, List<T>>> entry :
366 dataBuffers.entrySet()) {
367 int partitionId = entry.getKey();
368 long aggregateBufferSize = entry.getValue().getLeft();
369 if (aggregateBufferSize > minBufferSizeToOffload &&
370 oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId) ==
371 ioThreadId) {
372 result.add(partitionId);
373 }
374 }
375 return result;
376 }
377
378
379
380
381
382
383
384
385 protected abstract void writeEntry(T entry, DataOutput out)
386 throws IOException;
387
388
389
390
391
392
393
394
395 protected abstract T readNextEntry(DataInput in) throws IOException;
396
397
398
399
400
401
402
403
404
405
406 protected abstract long loadInMemoryPartitionData(
407 int partitionId, int ioThreadId, DataIndex index) throws IOException;
408
409
410
411
412
413
414
415
416
417
418
419 protected abstract long offloadInMemoryPartitionData(
420 int partitionId, int ioThreadId, DataIndex index) throws IOException;
421
422
423
424
425
426
427
428 protected abstract int entrySerializedSize(T entry);
429
430
431
432
433
434
435
436 protected abstract void addEntryToInMemoryPartitionData(int partitionId,
437 T entry);
438 }