1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc.command;
20
21 import org.apache.giraph.ooc.OutOfCoreEngine;
22 import org.apache.giraph.ooc.data.DiskBackedMessageStore;
23
24 import java.io.IOException;
25
26 import static com.google.common.base.Preconditions.checkState;
27
28
29
30
31 public class StoreIncomingMessageIOCommand extends IOCommand {
32
33
34
35
36
37
38 public StoreIncomingMessageIOCommand(OutOfCoreEngine oocEngine,
39 int partitionId) {
40 super(oocEngine, partitionId);
41 }
42
43 @Override
44 public boolean execute() throws IOException {
45 boolean executed = false;
46 if (oocEngine.getMetaPartitionManager()
47 .startOffloadingMessages(partitionId)) {
48 DiskBackedMessageStore messageStore =
49 (DiskBackedMessageStore)
50 oocEngine.getServerData().getIncomingMessageStore();
51 checkState(messageStore != null);
52 numBytesTransferred +=
53 messageStore.offloadPartitionData(partitionId);
54 oocEngine.getMetaPartitionManager().doneOffloadingMessages(partitionId);
55 executed = true;
56 }
57 return executed;
58 }
59
60 @Override
61 public IOCommandType getType() {
62 return IOCommandType.STORE_MESSAGE;
63 }
64
65 @Override
66 public String toString() {
67 return "StoreIncomingMessageIOCommand: (partitionId = " + partitionId + ")";
68 }
69 }