1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.cache.memcached;
28
29 import java.io.IOException;
30 import java.net.InetSocketAddress;
31 import java.util.Collection;
32 import java.util.HashMap;
33 import java.util.Map;
34 import java.util.concurrent.ExecutionException;
35
36 import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
37 import org.apache.hc.client5.http.cache.ResourceIOException;
38 import org.apache.hc.client5.http.impl.Operations;
39 import org.apache.hc.client5.http.impl.cache.AbstractBinaryAsyncCacheStorage;
40 import org.apache.hc.client5.http.impl.cache.ByteArrayCacheEntrySerializer;
41 import org.apache.hc.client5.http.impl.cache.CacheConfig;
42 import org.apache.hc.core5.concurrent.Cancellable;
43 import org.apache.hc.core5.concurrent.FutureCallback;
44 import org.apache.hc.core5.util.Args;
45
46 import net.spy.memcached.CASResponse;
47 import net.spy.memcached.CASValue;
48 import net.spy.memcached.MemcachedClient;
49 import net.spy.memcached.internal.BulkFuture;
50 import net.spy.memcached.internal.GetFuture;
51 import net.spy.memcached.internal.OperationFuture;
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91 public class MemcachedHttpAsyncCacheStorage extends AbstractBinaryAsyncCacheStorage<CASValue<Object>> {
92
93 private final MemcachedClient client;
94 private final KeyHashingScheme keyHashingScheme;
95
96
97
98
99
100
101
102
103
104 public MemcachedHttpAsyncCacheStorage(final InetSocketAddress address) throws IOException {
105 this(new MemcachedClient(address));
106 }
107
108
109
110
111
112
113 public MemcachedHttpAsyncCacheStorage(final MemcachedClient cache) {
114 this(cache, CacheConfig.DEFAULT, ByteArrayCacheEntrySerializer.INSTANCE, SHA256KeyHashingScheme.INSTANCE);
115 }
116
117
118
119
120
121
122
123
124
125
126
127 public MemcachedHttpAsyncCacheStorage(
128 final MemcachedClient client,
129 final CacheConfig config,
130 final HttpCacheEntrySerializer<byte[]> serializer,
131 final KeyHashingScheme keyHashingScheme) {
132 super((config != null ? config : CacheConfig.DEFAULT).getMaxUpdateRetries(),
133 serializer != null ? serializer : ByteArrayCacheEntrySerializer.INSTANCE);
134 this.client = Args.notNull(client, "Memcached client");
135 this.keyHashingScheme = keyHashingScheme;
136 }
137
138 @Override
139 protected String digestToStorageKey(final String key) {
140 return keyHashingScheme.hash(key);
141 }
142
143 private byte[] castAsByteArray(final Object storageObject) throws ResourceIOException {
144 if (storageObject == null) {
145 return null;
146 }
147 if (storageObject instanceof byte[]) {
148 return (byte[]) storageObject;
149 }
150 throw new ResourceIOException("Unexpected cache content: " + storageObject.getClass());
151 }
152
153 @Override
154 protected byte[] getStorageObject(final CASValue<Object> casValue) throws ResourceIOException {
155 return castAsByteArray(casValue.getValue());
156 }
157
158 private <T> Cancellable operation(final OperationFuture<T> operationFuture, final FutureCallback<T> callback) {
159 operationFuture.addListener(future -> {
160 try {
161 callback.completed(operationFuture.get());
162 } catch (final ExecutionException ex) {
163 if (ex.getCause() instanceof Exception) {
164 callback.failed((Exception) ex.getCause());
165 } else {
166 callback.failed(ex);
167 }
168 }
169 });
170 return Operations.cancellable(operationFuture);
171 }
172
173 @Override
174 protected Cancellable store(final String storageKey, final byte[] storageObject, final FutureCallback<Boolean> callback) {
175 return operation(client.set(storageKey, 0, storageObject), callback);
176 }
177
178 @Override
179 protected Cancellable restore(final String storageKey, final FutureCallback<byte[]> callback) {
180 final GetFuture<Object> getFuture = client.asyncGet(storageKey);
181 getFuture.addListener(future -> {
182 try {
183 callback.completed(castAsByteArray(getFuture.get()));
184 } catch (final ExecutionException ex) {
185 if (ex.getCause() instanceof Exception) {
186 callback.failed((Exception) ex.getCause());
187 } else {
188 callback.failed(ex);
189 }
190 }
191 });
192 return Operations.cancellable(getFuture);
193 }
194
195 @Override
196 protected Cancellable getForUpdateCAS(final String storageKey, final FutureCallback<CASValue<Object>> callback) {
197 return operation(client.asyncGets(storageKey), callback);
198 }
199
200 @Override
201 protected Cancellable updateCAS(
202 final String storageKey, final CASValue<Object> casValue, final byte[] storageObject, final FutureCallback<Boolean> callback) {
203 return operation(client.asyncCAS(storageKey, casValue.getCas(), storageObject), new FutureCallback<CASResponse>() {
204
205 @Override
206 public void completed(final CASResponse result) {
207 callback.completed(result == CASResponse.OK);
208 }
209
210 @Override
211 public void failed(final Exception ex) {
212 callback.failed(ex);
213 }
214
215 @Override
216 public void cancelled() {
217 callback.cancelled();
218 }
219
220 });
221 }
222
223 @Override
224 protected Cancellable delete(final String storageKey, final FutureCallback<Boolean> callback) {
225 return operation(client.delete(storageKey), callback);
226 }
227
228 @Override
229 protected Cancellable bulkRestore(final Collection<String> storageKeys, final FutureCallback<Map<String, byte[]>> callback) {
230 final BulkFuture<Map<String, Object>> future = client.asyncGetBulk(storageKeys);
231 future.addListener(future1 -> {
232 final Map<String, ?> storageObjectMap = future1.get();
233 final Map<String, byte[]> resultMap = new HashMap<>(storageObjectMap.size());
234 for (final Map.Entry<String, ?> resultEntry: storageObjectMap.entrySet()) {
235 resultMap.put(resultEntry.getKey(), castAsByteArray(resultEntry.getValue()));
236 }
237 callback.completed(resultMap);
238 });
239 return Operations.cancellable(future);
240 }
241
242 }