1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc.command;
20
21 import org.apache.giraph.bsp.BspService;
22 import org.apache.giraph.comm.messages.MessageStore;
23 import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
24 import org.apache.giraph.ooc.data.DiskBackedMessageStore;
25 import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
26 import org.apache.giraph.ooc.OutOfCoreEngine;
27
28 import java.io.IOException;
29
30
31
32
33
34 public class StorePartitionIOCommand extends IOCommand {
35
36
37
38
39
40
41 public StorePartitionIOCommand(OutOfCoreEngine oocEngine,
42 int partitionId) {
43 super(oocEngine, partitionId);
44 }
45
46 @Override
47 public boolean execute() throws IOException {
48 boolean executed = false;
49 if (oocEngine.getMetaPartitionManager()
50 .startOffloadingPartition(partitionId)) {
51 DiskBackedPartitionStore partitionStore =
52 (DiskBackedPartitionStore)
53 oocEngine.getServerData().getPartitionStore();
54 numBytesTransferred +=
55 partitionStore.offloadPartitionData(partitionId);
56 if (oocEngine.getSuperstep() != BspService.INPUT_SUPERSTEP) {
57 MessageStore messageStore =
58 oocEngine.getServerData().getCurrentMessageStore();
59 if (messageStore != null) {
60 numBytesTransferred += ((DiskBackedMessageStore) messageStore)
61 .offloadPartitionData(partitionId);
62 }
63 } else {
64 DiskBackedEdgeStore edgeStore =
65 (DiskBackedEdgeStore)
66 oocEngine.getServerData().getEdgeStore();
67 numBytesTransferred +=
68 edgeStore.offloadPartitionData(partitionId);
69 }
70 oocEngine.getMetaPartitionManager().doneOffloadingPartition(partitionId);
71 executed = true;
72 }
73 return executed;
74 }
75
76 @Override
77 public IOCommandType getType() {
78 return IOCommandType.STORE_PARTITION;
79 }
80
81 @Override
82 public String toString() {
83 return "StorePartitionIOCommand: (partitionId = " + partitionId + ")";
84 }
85 }