1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.ooc.data; |
20 | |
|
21 | |
import com.google.common.collect.Maps; |
22 | |
import org.apache.giraph.bsp.BspService; |
23 | |
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; |
24 | |
import org.apache.giraph.edge.OutEdges; |
25 | |
import org.apache.giraph.graph.Vertex; |
26 | |
import org.apache.giraph.ooc.OutOfCoreEngine; |
27 | |
import org.apache.giraph.ooc.persistence.DataIndex; |
28 | |
import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor; |
29 | |
import org.apache.giraph.partition.Partition; |
30 | |
import org.apache.giraph.partition.PartitionStore; |
31 | |
import org.apache.giraph.utils.ExtendedDataOutput; |
32 | |
import org.apache.giraph.utils.WritableUtils; |
33 | |
import org.apache.giraph.worker.BspServiceWorker; |
34 | |
import org.apache.hadoop.io.Writable; |
35 | |
import org.apache.hadoop.io.WritableComparable; |
36 | |
import org.apache.hadoop.mapreduce.Mapper; |
37 | |
import org.apache.log4j.Logger; |
38 | |
|
39 | |
import java.io.DataInput; |
40 | |
import java.io.DataOutput; |
41 | |
import java.io.IOException; |
42 | |
import java.util.Map; |
43 | |
|
44 | |
import static com.google.common.base.Preconditions.checkNotNull; |
45 | |
|
46 | |
|
47 | |
|
48 | |
|
49 | |
|
50 | |
|
51 | |
|
52 | |
|
53 | |
|
54 | |
|
55 | |
|
56 | 0 | public class DiskBackedPartitionStore<I extends WritableComparable, |
57 | |
V extends Writable, E extends Writable> |
58 | |
extends DiskBackedDataStore<ExtendedDataOutput> |
59 | |
implements PartitionStore<I, V, E> { |
60 | |
|
61 | 0 | private static final Logger LOG = |
62 | 0 | Logger.getLogger(DiskBackedPartitionStore.class); |
63 | |
|
64 | |
private final ImmutableClassesGiraphConfiguration<I, V, E> conf; |
65 | |
|
66 | |
private final Mapper<?, ?, ?, ?>.Context context; |
67 | |
|
68 | |
private final PartitionStore<I, V, E> partitionStore; |
69 | |
|
70 | |
|
71 | |
|
72 | |
|
73 | |
|
74 | 0 | private final Map<Integer, Long> partitionVertexCount = |
75 | 0 | Maps.newConcurrentMap(); |
76 | |
|
77 | |
|
78 | |
|
79 | |
|
80 | |
|
81 | 0 | private final Map<Integer, Long> partitionEdgeCount = |
82 | 0 | Maps.newConcurrentMap(); |
83 | |
|
84 | |
|
85 | |
|
86 | |
|
87 | |
|
88 | |
|
89 | |
|
90 | |
|
91 | |
|
92 | |
|
93 | |
public DiskBackedPartitionStore( |
94 | |
PartitionStore<I, V, E> partitionStore, |
95 | |
ImmutableClassesGiraphConfiguration<I, V, E> conf, |
96 | |
Mapper<?, ?, ?, ?>.Context context, |
97 | |
OutOfCoreEngine oocEngine) { |
98 | 0 | super(conf, oocEngine); |
99 | 0 | this.partitionStore = partitionStore; |
100 | 0 | this.conf = conf; |
101 | 0 | this.context = context; |
102 | 0 | } |
103 | |
|
104 | |
@Override |
105 | |
public boolean addPartition(Partition<I, V, E> partition) { |
106 | 0 | boolean added = partitionStore.addPartition(partition); |
107 | 0 | if (added) { |
108 | 0 | oocEngine.getMetaPartitionManager() |
109 | 0 | .addPartition(partition.getId()); |
110 | |
} |
111 | 0 | return added; |
112 | |
} |
113 | |
|
114 | |
@Override |
115 | |
public Partition<I, V, E> removePartition(Integer partitionId) { |
116 | |
|
117 | |
|
118 | 0 | oocEngine.getMetaPartitionManager().markPartitionAsInProcess(partitionId); |
119 | 0 | oocEngine.retrievePartition(partitionId); |
120 | 0 | Partition<I, V, E> partition = partitionStore.removePartition(partitionId); |
121 | 0 | checkNotNull(partition, "removePartition: partition " + partitionId + |
122 | |
" is not in memory for removal!"); |
123 | 0 | oocEngine.getMetaPartitionManager().removePartition(partitionId); |
124 | 0 | return partition; |
125 | |
} |
126 | |
|
127 | |
@Override |
128 | |
public boolean hasPartition(Integer partitionId) { |
129 | 0 | return oocEngine.getMetaPartitionManager().hasPartition(partitionId); |
130 | |
} |
131 | |
|
132 | |
@Override |
133 | |
public Iterable<Integer> getPartitionIds() { |
134 | 0 | return oocEngine.getMetaPartitionManager().getPartitionIds(); |
135 | |
} |
136 | |
|
137 | |
@Override |
138 | |
public int getNumPartitions() { |
139 | 0 | return oocEngine.getMetaPartitionManager().getNumPartitions(); |
140 | |
} |
141 | |
|
142 | |
@Override |
143 | |
public long getPartitionVertexCount(Integer partitionId) { |
144 | 0 | if (partitionStore.hasPartition(partitionId)) { |
145 | 0 | return partitionStore.getPartitionVertexCount(partitionId); |
146 | |
} else { |
147 | 0 | return partitionVertexCount.get(partitionId); |
148 | |
} |
149 | |
} |
150 | |
|
151 | |
@Override |
152 | |
public long getPartitionEdgeCount(Integer partitionId) { |
153 | 0 | if (partitionStore.hasPartition(partitionId)) { |
154 | 0 | return partitionStore.getPartitionEdgeCount(partitionId); |
155 | |
} else { |
156 | 0 | return partitionEdgeCount.get(partitionId); |
157 | |
} |
158 | |
} |
159 | |
|
160 | |
@Override |
161 | |
public boolean isEmpty() { |
162 | 0 | return getNumPartitions() == 0; |
163 | |
} |
164 | |
|
165 | |
@Override |
166 | |
public void startIteration() { |
167 | 0 | oocEngine.startIteration(); |
168 | 0 | } |
169 | |
|
170 | |
@Override |
171 | |
public Partition<I, V, E> getNextPartition() { |
172 | 0 | Integer partitionId = oocEngine.getNextPartition(); |
173 | 0 | if (partitionId == null) { |
174 | 0 | return null; |
175 | |
} |
176 | 0 | Partition<I, V, E> partition = partitionStore.removePartition(partitionId); |
177 | 0 | if (partition == null) { |
178 | 0 | if (LOG.isInfoEnabled()) { |
179 | 0 | LOG.info("getNextPartition: partition " + partitionId + " is not in " + |
180 | |
"the partition store. Creating an empty partition for it."); |
181 | |
} |
182 | 0 | partition = conf.createPartition(partitionId, context); |
183 | |
} |
184 | 0 | partitionStore.addPartition(partition); |
185 | 0 | return partition; |
186 | |
} |
187 | |
|
188 | |
@Override |
189 | |
public void putPartition(Partition<I, V, E> partition) { |
190 | 0 | oocEngine.doneProcessingPartition(partition.getId()); |
191 | 0 | } |
192 | |
|
193 | |
@Override |
194 | |
public void addPartitionVertices(Integer partitionId, |
195 | |
ExtendedDataOutput extendedDataOutput) { |
196 | 0 | addEntry(partitionId, extendedDataOutput); |
197 | 0 | } |
198 | |
|
199 | |
@Override |
200 | |
public void shutdown() { |
201 | 0 | oocEngine.shutdown(); |
202 | 0 | } |
203 | |
|
204 | |
@Override |
205 | |
public void initialize() { |
206 | 0 | oocEngine.initialize(); |
207 | 0 | } |
208 | |
|
209 | |
|
210 | |
|
211 | |
|
212 | |
|
213 | |
|
214 | |
|
215 | |
|
216 | |
private void readVertexData(DataInput in, Vertex<I, V, E> vertex) |
217 | |
throws IOException { |
218 | 0 | I id = conf.createVertexId(); |
219 | 0 | id.readFields(in); |
220 | 0 | V value = null; |
221 | 0 | boolean hasNullValue = in.readBoolean(); |
222 | 0 | if (!hasNullValue) { |
223 | 0 | value = conf.createVertexValue(); |
224 | 0 | value.readFields(in); |
225 | |
} |
226 | 0 | OutEdges<I, E> edges = conf.createAndInitializeOutEdges(0); |
227 | 0 | vertex.initialize(id, value, edges); |
228 | 0 | if (in.readBoolean()) { |
229 | 0 | vertex.voteToHalt(); |
230 | |
} else { |
231 | 0 | vertex.wakeUp(); |
232 | |
} |
233 | 0 | } |
234 | |
|
235 | |
|
236 | |
|
237 | |
|
238 | |
|
239 | |
|
240 | |
|
241 | |
|
242 | |
private void readOutEdges(DataInput in, Partition<I, V, E> partition) |
243 | |
throws IOException { |
244 | 0 | I id = conf.createVertexId(); |
245 | 0 | id.readFields(in); |
246 | 0 | Vertex<I, V, E> v = partition.getVertex(id); |
247 | 0 | if (v == null) { |
248 | 0 | throw new IllegalStateException("Vertex with ID " + id + |
249 | 0 | " not found in partition " + partition.getId() + |
250 | 0 | " which has " + partition.getVertexCount() + " vertices and " + |
251 | 0 | partition.getEdgeCount() + " edges."); |
252 | |
} |
253 | 0 | OutEdges<I, E> edges = (OutEdges<I, E>) v.getEdges(); |
254 | 0 | edges.readFields(in); |
255 | 0 | partition.saveVertex(v); |
256 | 0 | } |
257 | |
|
258 | |
@Override |
259 | |
protected long loadInMemoryPartitionData(int partitionId, int ioThreadId, |
260 | |
DataIndex index) throws IOException { |
261 | 0 | long numBytes = 0; |
262 | |
|
263 | 0 | if (hasPartitionDataOnFile.remove(partitionId)) { |
264 | 0 | Partition<I, V, E> partition = conf.createPartition(partitionId, context); |
265 | 0 | OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor(); |
266 | 0 | index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES); |
267 | 0 | OutOfCoreDataAccessor.DataInputWrapper inputWrapper = |
268 | 0 | dataAccessor.prepareInput(ioThreadId, index.copy()); |
269 | 0 | DataInput dataInput = inputWrapper.getDataInput(); |
270 | 0 | long numVertices = dataInput.readLong(); |
271 | 0 | for (long i = 0; i < numVertices; ++i) { |
272 | 0 | Vertex<I, V, E> vertex = conf.createVertex(); |
273 | 0 | readVertexData(dataInput, vertex); |
274 | 0 | partition.putVertex(vertex); |
275 | |
} |
276 | 0 | numBytes += inputWrapper.finalizeInput(true); |
277 | |
|
278 | |
|
279 | 0 | index.removeLastIndex() |
280 | 0 | .addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES); |
281 | 0 | inputWrapper = dataAccessor.prepareInput(ioThreadId, index.copy()); |
282 | 0 | dataInput = inputWrapper.getDataInput(); |
283 | 0 | for (int i = 0; i < numVertices; ++i) { |
284 | 0 | readOutEdges(dataInput, partition); |
285 | |
} |
286 | |
|
287 | |
|
288 | 0 | boolean shouldDeleteEdges = false; |
289 | 0 | if (!conf.isStaticGraph() || |
290 | 0 | oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) { |
291 | 0 | shouldDeleteEdges = true; |
292 | |
} |
293 | 0 | numBytes += inputWrapper.finalizeInput(shouldDeleteEdges); |
294 | 0 | index.removeLastIndex(); |
295 | 0 | partitionStore.addPartition(partition); |
296 | |
} |
297 | 0 | return numBytes; |
298 | |
} |
299 | |
|
300 | |
@Override |
301 | |
protected ExtendedDataOutput readNextEntry(DataInput in) throws IOException { |
302 | 0 | return WritableUtils.readExtendedDataOutput(in, conf); |
303 | |
} |
304 | |
|
305 | |
@Override |
306 | |
protected void addEntryToInMemoryPartitionData(int partitionId, |
307 | |
ExtendedDataOutput vertices) { |
308 | 0 | if (!partitionStore.hasPartition(partitionId)) { |
309 | 0 | oocEngine.getMetaPartitionManager().addPartition(partitionId); |
310 | |
} |
311 | 0 | partitionStore.addPartitionVertices(partitionId, vertices); |
312 | 0 | } |
313 | |
|
314 | |
@Override |
315 | |
public long loadPartitionData(int partitionId) |
316 | |
throws IOException { |
317 | 0 | return loadPartitionDataProxy(partitionId, |
318 | 0 | new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION)); |
319 | |
} |
320 | |
|
321 | |
@Override |
322 | |
public long offloadPartitionData(int partitionId) |
323 | |
throws IOException { |
324 | 0 | return offloadPartitionDataProxy(partitionId, |
325 | 0 | new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION)); |
326 | |
} |
327 | |
|
328 | |
|
329 | |
|
330 | |
|
331 | |
|
332 | |
|
333 | |
|
334 | |
|
335 | |
private void writeVertexData(DataOutput output, Vertex<I, V, E> vertex) |
336 | |
throws IOException { |
337 | 0 | vertex.getId().write(output); |
338 | 0 | V value = vertex.getValue(); |
339 | 0 | if (value != null) { |
340 | 0 | output.writeBoolean(false); |
341 | 0 | value.write(output); |
342 | |
} else { |
343 | 0 | output.writeBoolean(true); |
344 | |
} |
345 | 0 | output.writeBoolean(vertex.isHalted()); |
346 | 0 | } |
347 | |
|
348 | |
|
349 | |
|
350 | |
|
351 | |
|
352 | |
|
353 | |
|
354 | |
|
355 | |
private void writeOutEdges(DataOutput output, Vertex<I, V, E> vertex) |
356 | |
throws IOException { |
357 | 0 | vertex.getId().write(output); |
358 | 0 | OutEdges<I, E> edges = (OutEdges<I, E>) vertex.getEdges(); |
359 | 0 | edges.write(output); |
360 | 0 | } |
361 | |
|
362 | |
@Override |
363 | |
protected long offloadInMemoryPartitionData( |
364 | |
int partitionId, int ioThreadId, DataIndex index) throws IOException { |
365 | 0 | long numBytes = 0; |
366 | 0 | if (partitionStore.hasPartition(partitionId)) { |
367 | 0 | OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor(); |
368 | 0 | partitionVertexCount.put(partitionId, |
369 | 0 | partitionStore.getPartitionVertexCount(partitionId)); |
370 | 0 | partitionEdgeCount.put(partitionId, |
371 | 0 | partitionStore.getPartitionEdgeCount(partitionId)); |
372 | 0 | Partition<I, V, E> partition = |
373 | 0 | partitionStore.removePartition(partitionId); |
374 | 0 | LOG.debug( |
375 | |
"Offloading partition " + partition + " DataIndex[" + index + "]"); |
376 | 0 | index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES); |
377 | 0 | OutOfCoreDataAccessor.DataOutputWrapper outputWrapper = |
378 | 0 | dataAccessor.prepareOutput(ioThreadId, index.copy(), false); |
379 | 0 | DataOutput dataOutput = outputWrapper.getDataOutput(); |
380 | 0 | dataOutput.writeLong(partition.getVertexCount()); |
381 | 0 | for (Vertex<I, V, E> vertex : partition) { |
382 | 0 | writeVertexData(dataOutput, vertex); |
383 | 0 | } |
384 | 0 | numBytes += outputWrapper.finalizeOutput(); |
385 | 0 | index.removeLastIndex(); |
386 | |
|
387 | |
|
388 | |
|
389 | |
|
390 | 0 | index.addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES); |
391 | 0 | if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP || |
392 | 0 | !conf.isStaticGraph() || |
393 | 0 | !dataAccessor.dataExist(ioThreadId, index)) { |
394 | 0 | outputWrapper = dataAccessor.prepareOutput(ioThreadId, index.copy(), |
395 | |
false); |
396 | 0 | for (Vertex<I, V, E> vertex : partition) { |
397 | 0 | writeOutEdges(outputWrapper.getDataOutput(), vertex); |
398 | 0 | } |
399 | 0 | numBytes += outputWrapper.finalizeOutput(); |
400 | |
} |
401 | 0 | index.removeLastIndex(); |
402 | 0 | hasPartitionDataOnFile.add(partitionId); |
403 | |
} |
404 | 0 | return numBytes; |
405 | |
} |
406 | |
|
407 | |
@Override |
408 | |
protected void writeEntry(ExtendedDataOutput vertices, DataOutput out) |
409 | |
throws IOException { |
410 | 0 | WritableUtils.writeExtendedDataOutput(vertices, out); |
411 | 0 | } |
412 | |
|
413 | |
@Override |
414 | |
public long offloadBuffers(int partitionId) |
415 | |
throws IOException { |
416 | 0 | return offloadBuffersProxy(partitionId, |
417 | 0 | new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION)); |
418 | |
} |
419 | |
|
420 | |
@Override |
421 | |
protected int entrySerializedSize(ExtendedDataOutput vertices) { |
422 | 0 | return vertices.getPos(); |
423 | |
} |
424 | |
} |