Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
DiskBackedDataStore |
|
| 0.0;0 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | ||
19 | package org.apache.giraph.ooc.data; | |
20 | ||
21 | import com.google.common.collect.Maps; | |
22 | import com.google.common.collect.Sets; | |
23 | import org.apache.commons.lang3.tuple.MutablePair; | |
24 | import org.apache.commons.lang3.tuple.Pair; | |
25 | import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; | |
26 | import org.apache.giraph.conf.IntConfOption; | |
27 | import org.apache.giraph.ooc.OutOfCoreEngine; | |
28 | import org.apache.giraph.ooc.persistence.DataIndex; | |
29 | import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry; | |
30 | import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor; | |
31 | import org.apache.log4j.Logger; | |
32 | ||
33 | import java.io.DataInput; | |
34 | import java.io.DataOutput; | |
35 | import java.io.IOException; | |
36 | import java.util.ArrayList; | |
37 | import java.util.HashSet; | |
38 | import java.util.List; | |
39 | import java.util.Map; | |
40 | import java.util.Set; | |
41 | import java.util.concurrent.ConcurrentMap; | |
42 | import java.util.concurrent.locks.ReadWriteLock; | |
43 | import java.util.concurrent.locks.ReentrantReadWriteLock; | |
44 | ||
45 | import static com.google.common.base.Preconditions.checkNotNull; | |
46 | import static com.google.common.base.Preconditions.checkState; | |
47 | import static org.apache.giraph.conf.GiraphConstants.ONE_MB; | |
48 | ||
49 | /** | |
50 | * This class provides basic operations for data structures that have to | |
51 | * participate in out-of-core mechanism. Essential subclasses of this class are: | |
52 | * - DiskBackedPartitionStore (for partition data) | |
53 | * - DiskBackedMessageStore (for messages) | |
54 | * - DiskBackedEdgeStore (for edges read in INPUT_SUPERSTEP) | |
55 | * Basically, any data structure that may cause OOM to happen can be implemented | |
56 | * as a subclass of this class. | |
57 | * | |
58 | * There are two different terms used in the rest of this class: | |
59 | * - "data store" refers to in-memory representation of data. Usually this is | |
60 | * stored per-partition in in-memory implementations of data structures. For | |
61 | * instance, "data store" of a DiskBackedPartitionStore would collection of | |
62 | * all partitions kept in the in-memory partition store within the | |
63 | * DiskBackedPartitionStore. | |
64 | * - "raw data buffer" refers to raw data which were supposed to be | |
65 | * de-serialized and added to the data store, but they remain 'as is' in the | |
66 | * memory because their corresponding partition is offloaded to disk and is | |
67 | * not available in the data store. | |
68 | * | |
69 | * @param <T> raw data format of the data store subclassing this class | |
70 | */ | |
71 | public abstract class DiskBackedDataStore<T> { | |
72 | /** | |
73 | * Minimum size of a buffer (in bytes) to flush to disk. This is used to | |
74 | * decide whether vertex/edge buffers are large enough to flush to disk. | |
75 | */ | |
76 | 0 | public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH = |
77 | new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB, | |
78 | "Minimum size of a buffer (in bytes) to flush to disk."); | |
79 | ||
80 | /** Class logger. */ | |
81 | 0 | private static final Logger LOG = Logger.getLogger( |
82 | DiskBackedDataStore.class); | |
83 | /** Out-of-core engine */ | |
84 | protected final OutOfCoreEngine oocEngine; | |
85 | /** | |
86 | * Set containing ids of all partitions where the partition data is in some | |
87 | * file on disk. | |
88 | * Note that the out-of-core mechanism may decide to put the data for a | |
89 | * partition on disk, while the partition data is empty. For instance, at the | |
90 | * beginning of a superstep, out-of-core mechanism may decide to put incoming | |
91 | * messages of a partition on disk, while the partition has not received any | |
92 | * messages. In such scenarios, the "out-of-core mechanism" thinks that the | |
93 | * partition data is on disk, while disk-backed data stores may want to | |
94 | * optimize for IO/metadata accesses and decide not to create/write anything | |
95 | * on files on disk. | |
96 | * In summary, there is a subtle difference between this field and | |
97 | * `hasPartitionOnDisk` field. Basically, this field is used for optimizing | |
98 | * IO (mainly metadata) accesses by disk-backed stores, while | |
99 | * `hasPartitionDataOnDisk` is the view that out-of-core mechanism has | |
100 | * regarding partition storage statuses. Since out-of-core mechanism does not | |
101 | * know about the actual data for a partition, these two fields have to be | |
102 | * separate. | |
103 | */ | |
104 | 0 | protected final Set<Integer> hasPartitionDataOnFile = |
105 | 0 | Sets.newConcurrentHashSet(); |
106 | /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */ | |
107 | private final int minBufferSizeToOffload; | |
108 | /** Set containing ids of all out-of-core partitions */ | |
109 | 0 | private final Set<Integer> hasPartitionDataOnDisk = |
110 | 0 | Sets.newConcurrentHashSet(); |
111 | /** | |
112 | * Map of partition ids to list of raw data buffers. The map will have entries | |
113 | * only for partitions that their in-memory data structures are currently | |
114 | * offloaded to disk. We keep the aggregate size of buffers for each partition | |
115 | * as part of the values in the map to estimate how much memory we can free up | |
116 | * if we offload data buffers of a particular partition to disk. | |
117 | */ | |
118 | 0 | private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers = |
119 | 0 | Maps.newConcurrentMap(); |
120 | /** | |
121 | * Map of partition ids to number of raw data buffers offloaded to disk for | |
122 | * each partition. The map will have entries only for partitions that their | |
123 | * in-memory data structures are currently out of core. It is necessary to | |
124 | * know the number of data buffers on disk for a particular partition when we | |
125 | * are loading all these buffers back in memory. | |
126 | */ | |
127 | 0 | private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk = |
128 | 0 | Maps.newConcurrentMap(); |
129 | /** | |
130 | * Lock to avoid overlapping of read and write on data associated with each | |
131 | * partition. | |
132 | * */ | |
133 | 0 | private final ConcurrentMap<Integer, ReadWriteLock> locks = |
134 | 0 | Maps.newConcurrentMap(); |
135 | ||
136 | /** | |
137 | * Constructor. | |
138 | * | |
139 | * @param conf Configuration | |
140 | * @param oocEngine Out-of-core engine | |
141 | */ | |
142 | DiskBackedDataStore(ImmutableClassesGiraphConfiguration conf, | |
143 | 0 | OutOfCoreEngine oocEngine) { |
144 | 0 | this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf); |
145 | 0 | this.oocEngine = oocEngine; |
146 | 0 | } |
147 | ||
148 | /** | |
149 | * Retrieves a lock for a given partition. If the lock for the given partition | |
150 | * does not exist, creates a new lock. | |
151 | * | |
152 | * @param partitionId id of the partition the lock is needed for | |
153 | * @return lock for a given partition | |
154 | */ | |
155 | private ReadWriteLock getPartitionLock(int partitionId) { | |
156 | 0 | ReadWriteLock readWriteLock = locks.get(partitionId); |
157 | 0 | if (readWriteLock == null) { |
158 | 0 | readWriteLock = new ReentrantReadWriteLock(); |
159 | 0 | ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock); |
160 | 0 | if (temp != null) { |
161 | 0 | readWriteLock = temp; |
162 | } | |
163 | } | |
164 | 0 | return readWriteLock; |
165 | } | |
166 | ||
167 | /** | |
168 | * Adds a data entry for a given partition to the current data store. If data | |
169 | * of a given partition in data store is already offloaded to disk, adds the | |
170 | * data entry to appropriate raw data buffer list. | |
171 | * | |
172 | * @param partitionId id of the partition to add the data entry to | |
173 | * @param entry data entry to add | |
174 | */ | |
175 | protected void addEntry(int partitionId, T entry) { | |
176 | // Addition of data entries to a data store is much more common than | |
177 | // out-of-core operations. Besides, in-memory data store implementations | |
178 | // existing in the code base already account for parallel addition to data | |
179 | // stores. Therefore, using read lock would optimize for parallel addition | |
180 | // to data stores, specially for cases where the addition should happen for | |
181 | // partitions that are entirely in memory. | |
182 | 0 | ReadWriteLock rwLock = getPartitionLock(partitionId); |
183 | 0 | rwLock.readLock().lock(); |
184 | 0 | if (hasPartitionDataOnDisk.contains(partitionId)) { |
185 | 0 | List<T> entryList = new ArrayList<>(); |
186 | 0 | entryList.add(entry); |
187 | 0 | int entrySize = entrySerializedSize(entry); |
188 | 0 | MutablePair<Integer, List<T>> newPair = |
189 | 0 | new MutablePair<>(entrySize, entryList); |
190 | 0 | Pair<Integer, List<T>> oldPair = |
191 | 0 | dataBuffers.putIfAbsent(partitionId, newPair); |
192 | 0 | if (oldPair != null) { |
193 | 0 | synchronized (oldPair) { |
194 | 0 | newPair = (MutablePair<Integer, List<T>>) oldPair; |
195 | 0 | newPair.setLeft(oldPair.getLeft() + entrySize); |
196 | 0 | newPair.getRight().add(entry); |
197 | 0 | } |
198 | } | |
199 | 0 | } else { |
200 | 0 | addEntryToInMemoryPartitionData(partitionId, entry); |
201 | } | |
202 | 0 | rwLock.readLock().unlock(); |
203 | 0 | } |
204 | ||
205 | /** | |
206 | * Loads and assembles all data for a given partition, and put it into the | |
207 | * data store. Returns the number of bytes transferred from disk to memory in | |
208 | * the loading process. | |
209 | * | |
210 | * @param partitionId id of the partition to load and assemble all data for | |
211 | * @return number of bytes loaded from disk to memory | |
212 | * @throws IOException | |
213 | */ | |
214 | public abstract long loadPartitionData(int partitionId) throws IOException; | |
215 | ||
216 | /** | |
217 | * The proxy method that does the actual operation for `loadPartitionData`, | |
218 | * but uses the data index given by the caller. | |
219 | * | |
220 | * @param partitionId id of the partition to load and assemble all data for | |
221 | * @param index data index chain for the data to load | |
222 | * @return number of bytes loaded from disk to memory | |
223 | * @throws IOException | |
224 | */ | |
225 | protected long loadPartitionDataProxy(int partitionId, DataIndex index) | |
226 | throws IOException { | |
227 | 0 | long numBytes = 0; |
228 | 0 | ReadWriteLock rwLock = getPartitionLock(partitionId); |
229 | 0 | rwLock.writeLock().lock(); |
230 | 0 | if (hasPartitionDataOnDisk.contains(partitionId)) { |
231 | 0 | int ioThreadId = |
232 | 0 | oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId); |
233 | 0 | numBytes += loadInMemoryPartitionData(partitionId, ioThreadId, |
234 | 0 | index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId))); |
235 | 0 | hasPartitionDataOnDisk.remove(partitionId); |
236 | // Loading raw data buffers from disk if there is any and applying those | |
237 | // to already loaded in-memory data. | |
238 | 0 | Integer numBuffers = numDataBuffersOnDisk.remove(partitionId); |
239 | 0 | if (numBuffers != null) { |
240 | 0 | checkState(numBuffers > 0); |
241 | 0 | index.addIndex(DataIndex.TypeIndexEntry.BUFFER); |
242 | 0 | OutOfCoreDataAccessor.DataInputWrapper inputWrapper = |
243 | 0 | oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy()); |
244 | 0 | DataInput dataInput = inputWrapper.getDataInput(); |
245 | 0 | for (int i = 0; i < numBuffers; ++i) { |
246 | 0 | T entry = readNextEntry(dataInput); |
247 | 0 | addEntryToInMemoryPartitionData(partitionId, entry); |
248 | } | |
249 | 0 | numBytes += inputWrapper.finalizeInput(true); |
250 | 0 | index.removeLastIndex(); |
251 | } | |
252 | 0 | index.removeLastIndex(); |
253 | // Applying in-memory raw data buffers to in-memory partition data. | |
254 | 0 | Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId); |
255 | 0 | if (pair != null) { |
256 | 0 | for (T entry : pair.getValue()) { |
257 | 0 | addEntryToInMemoryPartitionData(partitionId, entry); |
258 | 0 | } |
259 | } | |
260 | } | |
261 | 0 | rwLock.writeLock().unlock(); |
262 | 0 | return numBytes; |
263 | } | |
264 | ||
265 | /** | |
266 | * Offloads partition data of a given partition in the data store to disk, and | |
267 | * returns the number of bytes offloaded from memory to disk. | |
268 | * | |
269 | * @param partitionId id of the partition to offload its data | |
270 | * @return number of bytes offloaded from memory to disk | |
271 | * @throws IOException | |
272 | */ | |
273 | public abstract long offloadPartitionData(int partitionId) throws IOException; | |
274 | ||
275 | /** | |
276 | * The proxy method that does the actual operation for `offloadPartitionData`, | |
277 | * but uses the data index given by the caller. | |
278 | * | |
279 | * @param partitionId id of the partition to offload its data | |
280 | * @param index data index chain for the data to offload | |
281 | * @return number of bytes offloaded from memory to disk | |
282 | * @throws IOException | |
283 | */ | |
284 | @edu.umd.cs.findbugs.annotations.SuppressWarnings( | |
285 | "UL_UNRELEASED_LOCK_EXCEPTION_PATH") | |
286 | protected long offloadPartitionDataProxy( | |
287 | int partitionId, DataIndex index) throws IOException { | |
288 | 0 | ReadWriteLock rwLock = getPartitionLock(partitionId); |
289 | 0 | rwLock.writeLock().lock(); |
290 | 0 | hasPartitionDataOnDisk.add(partitionId); |
291 | 0 | rwLock.writeLock().unlock(); |
292 | 0 | int ioThreadId = |
293 | 0 | oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId); |
294 | 0 | long numBytes = offloadInMemoryPartitionData(partitionId, ioThreadId, |
295 | 0 | index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId))); |
296 | 0 | index.removeLastIndex(); |
297 | 0 | return numBytes; |
298 | } | |
299 | ||
300 | /** | |
301 | * Offloads raw data buffers of a given partition to disk, and returns the | |
302 | * number of bytes offloaded from memory to disk. | |
303 | * | |
304 | * @param partitionId id of the partition to offload its raw data buffers | |
305 | * @return number of bytes offloaded from memory to disk | |
306 | * @throws IOException | |
307 | */ | |
308 | public abstract long offloadBuffers(int partitionId) throws IOException; | |
309 | ||
310 | /** | |
311 | * The proxy method that does the actual operation for `offloadBuffers`, | |
312 | * but uses the data index given by the caller. | |
313 | * | |
314 | * @param partitionId id of the partition to offload its raw data buffers | |
315 | * @param index data index chain for the data to offload its buffers | |
316 | * @return number of bytes offloaded from memory to disk | |
317 | * @throws IOException | |
318 | */ | |
319 | protected long offloadBuffersProxy(int partitionId, DataIndex index) | |
320 | throws IOException { | |
321 | 0 | Pair<Integer, List<T>> pair = dataBuffers.get(partitionId); |
322 | 0 | if (pair == null || pair.getLeft() < minBufferSizeToOffload) { |
323 | 0 | return 0; |
324 | } | |
325 | 0 | ReadWriteLock rwLock = getPartitionLock(partitionId); |
326 | 0 | rwLock.writeLock().lock(); |
327 | 0 | pair = dataBuffers.remove(partitionId); |
328 | 0 | rwLock.writeLock().unlock(); |
329 | 0 | checkNotNull(pair); |
330 | 0 | checkState(!pair.getRight().isEmpty()); |
331 | 0 | int ioThreadId = |
332 | 0 | oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId); |
333 | 0 | index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)) |
334 | 0 | .addIndex(DataIndex.TypeIndexEntry.BUFFER); |
335 | 0 | OutOfCoreDataAccessor.DataOutputWrapper outputWrapper = |
336 | 0 | oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(), |
337 | true); | |
338 | 0 | for (T entry : pair.getRight()) { |
339 | 0 | writeEntry(entry, outputWrapper.getDataOutput()); |
340 | 0 | } |
341 | 0 | long numBytes = outputWrapper.finalizeOutput(); |
342 | 0 | index.removeLastIndex().removeLastIndex(); |
343 | 0 | int numBuffers = pair.getRight().size(); |
344 | 0 | Integer oldNumBuffersOnDisk = |
345 | 0 | numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers); |
346 | 0 | if (oldNumBuffersOnDisk != null) { |
347 | 0 | numDataBuffersOnDisk.replace(partitionId, |
348 | 0 | oldNumBuffersOnDisk + numBuffers); |
349 | } | |
350 | 0 | return numBytes; |
351 | } | |
352 | ||
353 | /** | |
354 | * Looks through all partitions that their data is not in the data store (is | |
355 | * offloaded to disk), and sees if any of them has enough raw data buffer in | |
356 | * memory. If so, puts that partition in a list to return. | |
357 | * | |
358 | * @param ioThreadId Id of the IO thread who would offload the buffers | |
359 | * @return Set of partition ids of all partition raw buffers where the | |
360 | * aggregate size of buffers are large enough and it is worth flushing | |
361 | * those buffers to disk | |
362 | */ | |
363 | public Set<Integer> getCandidateBuffersToOffload(int ioThreadId) { | |
364 | 0 | Set<Integer> result = new HashSet<>(); |
365 | for (Map.Entry<Integer, Pair<Integer, List<T>>> entry : | |
366 | 0 | dataBuffers.entrySet()) { |
367 | 0 | int partitionId = entry.getKey(); |
368 | 0 | long aggregateBufferSize = entry.getValue().getLeft(); |
369 | 0 | if (aggregateBufferSize > minBufferSizeToOffload && |
370 | 0 | oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId) == |
371 | ioThreadId) { | |
372 | 0 | result.add(partitionId); |
373 | } | |
374 | 0 | } |
375 | 0 | return result; |
376 | } | |
377 | ||
378 | /** | |
379 | * Writes a single raw entry to a given output stream. | |
380 | * | |
381 | * @param entry entry to write to output | |
382 | * @param out output stream to write the entry to | |
383 | * @throws IOException | |
384 | */ | |
385 | protected abstract void writeEntry(T entry, DataOutput out) | |
386 | throws IOException; | |
387 | ||
388 | /** | |
389 | * Reads the next available raw entry from a given input stream. | |
390 | * | |
391 | * @param in input stream to read the entry from | |
392 | * @return entry read from an input stream | |
393 | * @throws IOException | |
394 | */ | |
395 | protected abstract T readNextEntry(DataInput in) throws IOException; | |
396 | ||
397 | /** | |
398 | * Loads data of a partition into data store. Returns number of bytes loaded. | |
399 | * | |
400 | * @param partitionId id of the partition to load its data | |
401 | * @param ioThreadId id of the IO thread performing the load | |
402 | * @param index data index chain for the data to load | |
403 | * @return number of bytes loaded from disk to memory | |
404 | * @throws IOException | |
405 | */ | |
406 | protected abstract long loadInMemoryPartitionData( | |
407 | int partitionId, int ioThreadId, DataIndex index) throws IOException; | |
408 | ||
409 | /** | |
410 | * Offloads data of a partition in data store to disk. Returns the number of | |
411 | * bytes offloaded to disk | |
412 | * | |
413 | * @param partitionId id of the partition to offload to disk | |
414 | * @param ioThreadId id of the IO thread performing the offload | |
415 | * @param index data index chain for the data to offload | |
416 | * @return number of bytes offloaded from memory to disk | |
417 | * @throws IOException | |
418 | */ | |
419 | protected abstract long offloadInMemoryPartitionData( | |
420 | int partitionId, int ioThreadId, DataIndex index) throws IOException; | |
421 | ||
422 | /** | |
423 | * Gets the size of a given entry in bytes. | |
424 | * | |
425 | * @param entry input entry to find its size | |
426 | * @return size of given input entry in bytes | |
427 | */ | |
428 | protected abstract int entrySerializedSize(T entry); | |
429 | ||
430 | /** | |
431 | * Adds a single entry for a given partition to the in-memory data store. | |
432 | * | |
433 | * @param partitionId id of the partition to add the data to | |
434 | * @param entry input entry to add to the data store | |
435 | */ | |
436 | protected abstract void addEntryToInMemoryPartitionData(int partitionId, | |
437 | T entry); | |
438 | } |