1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc.command;
20
21 import com.google.common.base.Preconditions;
22 import org.apache.giraph.bsp.BspService;
23 import org.apache.giraph.comm.messages.MessageStore;
24 import org.apache.giraph.ooc.OutOfCoreEngine;
25 import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
26 import org.apache.giraph.ooc.data.DiskBackedMessageStore;
27 import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
28
29 import java.io.IOException;
30
31
32
33
34
35
36 public class LoadPartitionIOCommand extends IOCommand {
37
38
39
40
41 private final long superstep;
42
43
44
45
46
47
48
49
50 public LoadPartitionIOCommand(OutOfCoreEngine oocEngine, int partitionId,
51 long superstep) {
52 super(oocEngine, partitionId);
53 this.superstep = superstep;
54 }
55
56 @Override
57 public boolean execute() throws IOException {
58 boolean executed = false;
59 if (oocEngine.getMetaPartitionManager()
60 .startLoadingPartition(partitionId, superstep)) {
61 long currentSuperstep = oocEngine.getSuperstep();
62 DiskBackedPartitionStore partitionStore =
63 (DiskBackedPartitionStore)
64 oocEngine.getServerData().getPartitionStore();
65 numBytesTransferred +=
66 partitionStore.loadPartitionData(partitionId);
67 if (currentSuperstep == BspService.INPUT_SUPERSTEP &&
68 superstep == currentSuperstep) {
69 DiskBackedEdgeStore edgeStore =
70 (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
71 numBytesTransferred +=
72 edgeStore.loadPartitionData(partitionId);
73 }
74 MessageStore messageStore;
75 if (currentSuperstep == superstep) {
76 messageStore = oocEngine.getServerData().getCurrentMessageStore();
77 } else {
78 Preconditions.checkState(superstep == currentSuperstep + 1);
79 messageStore = oocEngine.getServerData().getIncomingMessageStore();
80 }
81 if (messageStore != null) {
82 numBytesTransferred += ((DiskBackedMessageStore) messageStore)
83 .loadPartitionData(partitionId);
84 }
85 oocEngine.getMetaPartitionManager()
86 .doneLoadingPartition(partitionId, superstep);
87 executed = true;
88 }
89 return executed;
90 }
91
92 @Override
93 public IOCommandType getType() {
94 return IOCommandType.LOAD_PARTITION;
95 }
96
97 @Override
98 public String toString() {
99 return "LoadPartitionIOCommand: (partitionId = " + partitionId + ", " +
100 "superstep = " + superstep + ")";
101 }
102 }