1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc.persistence;
20
21 import org.apache.giraph.conf.GiraphConstants;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.conf.IntConfOption;
24 import org.apache.giraph.utils.ExtendedDataOutput;
25 import org.apache.giraph.utils.io.BigDataInput;
26 import org.apache.giraph.utils.io.BigDataOutput;
27
28 import java.io.DataInput;
29 import java.io.DataOutput;
30 import java.io.IOException;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.LinkedBlockingDeque;
33
34
35
36
37
38 public class InMemoryDataAccessor implements OutOfCoreDataAccessor {
39
40 private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
41
42 private final PooledBigDataOutputFactory outputFactory;
43
44 private final ConcurrentHashMap<
45 DataIndex, PooledBigDataOutputFactory.Output> data;
46
47
48
49
50
51
52 public InMemoryDataAccessor(
53 ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
54 this.conf = conf;
55 outputFactory = new PooledBigDataOutputFactory(conf);
56 data = new ConcurrentHashMap<>();
57 }
58
59 @Override
60 public void initialize() {
61
62 }
63
64 @Override
65 public void shutdown() {
66
67 }
68
69 @Override
70 public int getNumAccessorThreads() {
71 return GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf);
72 }
73
74 @Override
75 public DataInputWrapper prepareInput(int threadId,
76 DataIndex index) throws IOException {
77 return new InMemoryDataInputWrapper(
78 new BigDataInput(data.get(index)), index);
79 }
80
81 @Override
82 public DataOutputWrapper prepareOutput(int threadId,
83 DataIndex index, boolean shouldAppend) throws IOException {
84
85
86 PooledBigDataOutputFactory.Output output = data.get(index);
87 if (output == null || !shouldAppend) {
88 output = outputFactory.createOutput();
89 data.put(index, output);
90 }
91 return new InMemoryDataOutputWrapper(output);
92 }
93
94 @Override
95 public boolean dataExist(int threadId, DataIndex index) {
96 return data.containsKey(index);
97 }
98
99
100
101
102 public static class InMemoryDataOutputWrapper implements DataOutputWrapper {
103
104 private final BigDataOutput output;
105
106 private final long initialSize;
107
108
109
110
111
112
113 public InMemoryDataOutputWrapper(BigDataOutput output) {
114 this.output = output;
115 initialSize = output.getSize();
116 }
117
118 @Override
119 public DataOutput getDataOutput() {
120 return output;
121 }
122
123 @Override
124 public long finalizeOutput() {
125 return output.getSize() - initialSize;
126 }
127 }
128
129
130
131
132 public class InMemoryDataInputWrapper implements DataInputWrapper {
133
134 private final BigDataInput input;
135
136 private final DataIndex index;
137
138
139
140
141
142
143
144 public InMemoryDataInputWrapper(
145 BigDataInput input, DataIndex index) {
146 this.input = input;
147 this.index = index;
148 }
149
150 @Override
151 public DataInput getDataInput() {
152 return input;
153 }
154
155 @Override
156 public long finalizeInput(boolean deleteOnClose) {
157 if (deleteOnClose) {
158 data.remove(index).returnData();
159 }
160 return input.getPos();
161 }
162 }
163
164
165
166
167 private static class PooledBigDataOutputFactory {
168
169 public static final IntConfOption BYTE_ARRAY_POOL_SIZE =
170 new IntConfOption("giraph.inMemoryDataAccessor.poolSize", 1024,
171 "How big pool of byte arrays to keep");
172
173 public static final IntConfOption BYTE_ARRAY_SIZE =
174 new IntConfOption("giraph.inMemoryDataAccessor.byteArraySize", 1 << 21,
175 "How big byte arrays to make");
176
177
178 private final ImmutableClassesGiraphConfiguration conf;
179
180 private final LinkedBlockingDeque<byte[]> byteArrayPool;
181
182 private final int byteArraySize;
183
184
185
186
187
188
189 public PooledBigDataOutputFactory(
190 ImmutableClassesGiraphConfiguration conf) {
191 this.conf = conf;
192 byteArrayPool = new LinkedBlockingDeque<>(BYTE_ARRAY_POOL_SIZE.get(conf));
193 byteArraySize = BYTE_ARRAY_SIZE.get(conf);
194 }
195
196
197
198
199
200
201 public Output createOutput() {
202 return new Output(conf);
203 }
204
205
206
207
208 private class Output extends BigDataOutput {
209
210
211
212
213
214 public Output(ImmutableClassesGiraphConfiguration conf) {
215 super(conf);
216 }
217
218
219
220
221
222 protected void returnData() {
223 if (dataOutputs != null) {
224 for (ExtendedDataOutput dataOutput : dataOutputs) {
225 byteArrayPool.offer(dataOutput.getByteArray());
226 }
227 }
228 byteArrayPool.offer(currentDataOutput.getByteArray());
229 }
230
231 @Override
232 protected ExtendedDataOutput createOutput(int size) {
233 byte[] data = byteArrayPool.pollLast();
234 return conf.createExtendedDataOutput(
235 data == null ? new byte[byteArraySize] : data, 0);
236 }
237
238 @Override
239 protected int getMaxSize() {
240 return byteArraySize;
241 }
242 }
243 }
244 }