1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc.policy;
20
21 import com.sun.management.GarbageCollectionNotificationInfo;
22 import org.apache.giraph.conf.GiraphConstants;
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.ooc.OutOfCoreEngine;
25 import org.apache.giraph.ooc.command.IOCommand;
26 import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
27 import org.apache.giraph.ooc.command.StorePartitionIOCommand;
28 import org.apache.log4j.Logger;
29
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import static com.google.common.base.Preconditions.checkState;
33
34
35 public class FixedPartitionsOracle implements OutOfCoreOracle {
36
37 private static final Logger LOG =
38 Logger.getLogger(FixedPartitionsOracle.class);
39
40 private final int maxPartitionsInMemory;
41
42
43
44
45
46 private final AtomicInteger deltaNumPartitionsInMemory =
47 new AtomicInteger(0);
48
49 private final OutOfCoreEngine oocEngine;
50
51
52
53
54
55
56
57 public FixedPartitionsOracle(ImmutableClassesGiraphConfiguration conf,
58 OutOfCoreEngine oocEngine) {
59 this.maxPartitionsInMemory =
60 GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
61 this.oocEngine = oocEngine;
62 }
63
64 @Override
65 public IOAction[] getNextIOActions() {
66 int numPartitionsInMemory =
67 oocEngine.getMetaPartitionManager().getNumInMemoryPartitions();
68 int numPartialPartitionsInMemory =
69 oocEngine.getMetaPartitionManager().getNumPartiallyInMemoryPartitions();
70 if (LOG.isDebugEnabled()) {
71 LOG.debug("getNextIOActions: calling with " + numPartitionsInMemory +
72 " partitions entirely in memory and " + numPartialPartitionsInMemory +
73 " partitions partially in memory, " +
74 deltaNumPartitionsInMemory.get() + " to be loaded");
75 }
76 checkState(numPartitionsInMemory >= 0);
77 checkState(numPartialPartitionsInMemory >= 0);
78 int numPartitions =
79 numPartitionsInMemory + deltaNumPartitionsInMemory.get();
80
81
82
83
84
85
86
87
88
89
90
91
92
93 if (numPartitions < maxPartitionsInMemory) {
94 return new IOAction[]{
95 IOAction.LOAD_PARTITION,
96 IOAction.STORE_MESSAGES_AND_BUFFERS};
97 } else if (numPartitions > maxPartitionsInMemory) {
98 if (LOG.isDebugEnabled()) {
99 LOG.debug("getNextIOActions: number of partitions in memory passed " +
100 "the specified threshold!");
101 }
102 return new IOAction[]{
103 IOAction.STORE_PARTITION,
104 IOAction.STORE_MESSAGES_AND_BUFFERS};
105 } else {
106 return new IOAction[]{
107 IOAction.STORE_MESSAGES_AND_BUFFERS,
108 IOAction.LOAD_TO_SWAP_PARTITION};
109 }
110 }
111
112 @Override
113 public boolean approve(IOCommand command) {
114 int numPartitionsInMemory = oocEngine.getMetaPartitionManager()
115 .getNumInMemoryPartitions();
116
117
118
119
120 if (command instanceof LoadPartitionIOCommand &&
121 numPartitionsInMemory + deltaNumPartitionsInMemory.getAndIncrement() >
122 maxPartitionsInMemory) {
123 deltaNumPartitionsInMemory.getAndDecrement();
124 return false;
125
126 } else if (command instanceof StorePartitionIOCommand &&
127 numPartitionsInMemory + deltaNumPartitionsInMemory.getAndDecrement() <
128 maxPartitionsInMemory) {
129 deltaNumPartitionsInMemory.getAndIncrement();
130 return false;
131 }
132 return true;
133 }
134
135 @Override
136 public void commandCompleted(IOCommand command) {
137 if (command instanceof LoadPartitionIOCommand) {
138 deltaNumPartitionsInMemory.getAndDecrement();
139 } else if (command instanceof StorePartitionIOCommand) {
140 deltaNumPartitionsInMemory.getAndIncrement();
141 }
142 }
143
144 @Override
145 public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { }
146
147 @Override
148 public void startIteration() {
149 }
150 }