1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.cache.memcached;
28
29 import java.io.IOException;
30 import java.net.InetSocketAddress;
31 import java.util.Collection;
32 import java.util.HashMap;
33 import java.util.Map;
34 import java.util.concurrent.ExecutionException;
35
36 import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
37 import org.apache.hc.client5.http.cache.ResourceIOException;
38 import org.apache.hc.client5.http.impl.Operations;
39 import org.apache.hc.client5.http.impl.cache.AbstractBinaryAsyncCacheStorage;
40 import org.apache.hc.client5.http.impl.cache.ByteArrayCacheEntrySerializer;
41 import org.apache.hc.client5.http.impl.cache.CacheConfig;
42 import org.apache.hc.core5.concurrent.Cancellable;
43 import org.apache.hc.core5.concurrent.FutureCallback;
44 import org.apache.hc.core5.util.Args;
45
46 import net.spy.memcached.CASResponse;
47 import net.spy.memcached.CASValue;
48 import net.spy.memcached.MemcachedClient;
49 import net.spy.memcached.internal.BulkFuture;
50 import net.spy.memcached.internal.BulkGetCompletionListener;
51 import net.spy.memcached.internal.BulkGetFuture;
52 import net.spy.memcached.internal.GetCompletionListener;
53 import net.spy.memcached.internal.GetFuture;
54 import net.spy.memcached.internal.OperationCompletionListener;
55 import net.spy.memcached.internal.OperationFuture;
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 public class MemcachedHttpAsyncCacheStorage extends AbstractBinaryAsyncCacheStorage<CASValue<Object>> {
96
97 private final MemcachedClient client;
98 private final KeyHashingScheme keyHashingScheme;
99
100
101
102
103
104
105
106
107
108 public MemcachedHttpAsyncCacheStorage(final InetSocketAddress address) throws IOException {
109 this(new MemcachedClient(address));
110 }
111
112
113
114
115
116
117 public MemcachedHttpAsyncCacheStorage(final MemcachedClient cache) {
118 this(cache, CacheConfig.DEFAULT, ByteArrayCacheEntrySerializer.INSTANCE, SHA256KeyHashingScheme.INSTANCE);
119 }
120
121
122
123
124
125
126
127
128
129
130
131 public MemcachedHttpAsyncCacheStorage(
132 final MemcachedClient client,
133 final CacheConfig config,
134 final HttpCacheEntrySerializer<byte[]> serializer,
135 final KeyHashingScheme keyHashingScheme) {
136 super((config != null ? config : CacheConfig.DEFAULT).getMaxUpdateRetries(),
137 serializer != null ? serializer : ByteArrayCacheEntrySerializer.INSTANCE);
138 this.client = Args.notNull(client, "Memcached client");
139 this.keyHashingScheme = keyHashingScheme;
140 }
141
142 @Override
143 protected String digestToStorageKey(final String key) {
144 return keyHashingScheme.hash(key);
145 }
146
147 private byte[] castAsByteArray(final Object storageObject) throws ResourceIOException {
148 if (storageObject == null) {
149 return null;
150 }
151 if (storageObject instanceof byte[]) {
152 return (byte[]) storageObject;
153 }
154 throw new ResourceIOException("Unexpected cache content: " + storageObject.getClass());
155 }
156
157 @Override
158 protected byte[] getStorageObject(final CASValue<Object> casValue) throws ResourceIOException {
159 return castAsByteArray(casValue.getValue());
160 }
161
162 private <T> Cancellable operation(final OperationFuture<T> operationFuture, final FutureCallback<T> callback) {
163 operationFuture.addListener(new OperationCompletionListener() {
164
165 @Override
166 public void onComplete(final OperationFuture<?> future) throws Exception {
167 try {
168 callback.completed(operationFuture.get());
169 } catch (final ExecutionException ex) {
170 if (ex.getCause() instanceof Exception) {
171 callback.failed((Exception) ex.getCause());
172 } else {
173 callback.failed(ex);
174 }
175 }
176 }
177
178 });
179 return Operations.cancellable(operationFuture);
180 }
181
182 @Override
183 protected Cancellable store(final String storageKey, final byte[] storageObject, final FutureCallback<Boolean> callback) {
184 return operation(client.set(storageKey, 0, storageObject), callback);
185 }
186
187 @Override
188 protected Cancellable restore(final String storageKey, final FutureCallback<byte[]> callback) {
189 final GetFuture<Object> getFuture = client.asyncGet(storageKey);
190 getFuture.addListener(new GetCompletionListener() {
191
192 @Override
193 public void onComplete(final GetFuture<?> future) throws Exception {
194 try {
195 callback.completed(castAsByteArray(getFuture.get()));
196 } catch (final ExecutionException ex) {
197 if (ex.getCause() instanceof Exception) {
198 callback.failed((Exception) ex.getCause());
199 } else {
200 callback.failed(ex);
201 }
202 }
203 }
204
205 });
206 return Operations.cancellable(getFuture);
207 }
208
209 @Override
210 protected Cancellable getForUpdateCAS(final String storageKey, final FutureCallback<CASValue<Object>> callback) {
211 return operation(client.asyncGets(storageKey), callback);
212 }
213
214 @Override
215 protected Cancellable updateCAS(
216 final String storageKey, final CASValue<Object> casValue, final byte[] storageObject, final FutureCallback<Boolean> callback) {
217 return operation(client.asyncCAS(storageKey, casValue.getCas(), storageObject), new FutureCallback<CASResponse>() {
218
219 @Override
220 public void completed(final CASResponse result) {
221 callback.completed(result == CASResponse.OK);
222 }
223
224 @Override
225 public void failed(final Exception ex) {
226 callback.failed(ex);
227 }
228
229 @Override
230 public void cancelled() {
231 callback.cancelled();
232 }
233
234 });
235 }
236
237 @Override
238 protected Cancellable delete(final String storageKey, final FutureCallback<Boolean> callback) {
239 return operation(client.delete(storageKey), callback);
240 }
241
242 @Override
243 protected Cancellable bulkRestore(final Collection<String> storageKeys, final FutureCallback<Map<String, byte[]>> callback) {
244 final BulkFuture<Map<String, Object>> future = client.asyncGetBulk(storageKeys);
245 future.addListener(new BulkGetCompletionListener() {
246
247 @Override
248 public void onComplete(final BulkGetFuture<?> future) throws Exception {
249 final Map<String, ?> storageObjectMap = future.get();
250 final Map<String, byte[]> resultMap = new HashMap<>(storageObjectMap.size());
251 for (final Map.Entry<String, ?> resultEntry: storageObjectMap.entrySet()) {
252 resultMap.put(resultEntry.getKey(), castAsByteArray(resultEntry.getValue()));
253 }
254 callback.completed(resultMap);
255 }
256 });
257 return Operations.cancellable(future);
258 }
259
260 }