1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.cache.memcached;
28
29 import java.io.IOException;
30 import java.net.InetSocketAddress;
31 import java.util.Collection;
32 import java.util.HashMap;
33 import java.util.Map;
34 import java.util.concurrent.CancellationException;
35 import java.util.concurrent.ExecutionException;
36
37 import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
38 import org.apache.hc.client5.http.cache.ResourceIOException;
39 import org.apache.hc.client5.http.impl.Operations;
40 import org.apache.hc.client5.http.impl.cache.AbstractBinaryAsyncCacheStorage;
41 import org.apache.hc.client5.http.impl.cache.CacheConfig;
42 import org.apache.hc.client5.http.impl.cache.HttpByteArrayCacheEntrySerializer;
43 import org.apache.hc.core5.concurrent.Cancellable;
44 import org.apache.hc.core5.concurrent.FutureCallback;
45 import org.apache.hc.core5.util.Args;
46
47 import net.spy.memcached.CASResponse;
48 import net.spy.memcached.CASValue;
49 import net.spy.memcached.MemcachedClient;
50 import net.spy.memcached.internal.BulkFuture;
51 import net.spy.memcached.internal.GetFuture;
52 import net.spy.memcached.internal.OperationFuture;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92 public class MemcachedHttpAsyncCacheStorage extends AbstractBinaryAsyncCacheStorage<CASValue<Object>> {
93
94 private final MemcachedClient client;
95 private final KeyHashingScheme keyHashingScheme;
96
97
98
99
100
101
102
103
104
105 public MemcachedHttpAsyncCacheStorage(final InetSocketAddress address) throws IOException {
106 this(new MemcachedClient(address));
107 }
108
109
110
111
112
113
114 public MemcachedHttpAsyncCacheStorage(final MemcachedClient cache) {
115 this(cache, CacheConfig.DEFAULT, HttpByteArrayCacheEntrySerializer.INSTANCE, SHA256KeyHashingScheme.INSTANCE);
116 }
117
118
119
120
121
122
123
124
125
126
127
128 public MemcachedHttpAsyncCacheStorage(
129 final MemcachedClient client,
130 final CacheConfig config,
131 final HttpCacheEntrySerializer<byte[]> serializer,
132 final KeyHashingScheme keyHashingScheme) {
133 super((config != null ? config : CacheConfig.DEFAULT).getMaxUpdateRetries(),
134 serializer != null ? serializer : HttpByteArrayCacheEntrySerializer.INSTANCE);
135 this.client = Args.notNull(client, "Memcached client");
136 this.keyHashingScheme = keyHashingScheme;
137 }
138
139 @Override
140 protected String digestToStorageKey(final String key) {
141 return keyHashingScheme.hash(key);
142 }
143
144 private byte[] castAsByteArray(final Object storageObject) throws ResourceIOException {
145 if (storageObject == null) {
146 return null;
147 }
148 if (storageObject instanceof byte[]) {
149 return (byte[]) storageObject;
150 }
151 throw new ResourceIOException("Unexpected cache content: " + storageObject.getClass());
152 }
153
154 @Override
155 protected byte[] getStorageObject(final CASValue<Object> casValue) throws ResourceIOException {
156 return castAsByteArray(casValue.getValue());
157 }
158
159 private <T> Cancellable operation(final OperationFuture<T> operationFuture, final FutureCallback<T> callback) {
160 operationFuture.addListener(future -> {
161 try {
162 callback.completed(operationFuture.get());
163 } catch (final ExecutionException ex) {
164 if (ex.getCause() instanceof Exception) {
165 if (ex.getCause() instanceof CancellationException) {
166 callback.failed(new MemcachedOperationCancellationException(ex.getCause()));
167 } else {
168 callback.failed((Exception) ex.getCause());
169 }
170 } else {
171 callback.failed(ex);
172 }
173 }
174 });
175 return Operations.cancellable(operationFuture);
176 }
177
178 @Override
179 protected Cancellable store(final String storageKey, final byte[] storageObject, final FutureCallback<Boolean> callback) {
180 return operation(client.set(storageKey, 0, storageObject), callback);
181 }
182
183 @Override
184 protected Cancellable restore(final String storageKey, final FutureCallback<byte[]> callback) {
185 final GetFuture<Object> getFuture = client.asyncGet(storageKey);
186 getFuture.addListener(future -> {
187 try {
188 callback.completed(castAsByteArray(getFuture.get()));
189 } catch (final ExecutionException ex) {
190 if (ex.getCause() instanceof Exception) {
191 if (ex.getCause() instanceof CancellationException) {
192 callback.failed(new MemcachedOperationCancellationException(ex.getCause()));
193 } else {
194 callback.failed((Exception) ex.getCause());
195 }
196 } else {
197 callback.failed(ex);
198 }
199 }
200 });
201 return Operations.cancellable(getFuture);
202 }
203
204 @Override
205 protected Cancellable getForUpdateCAS(final String storageKey, final FutureCallback<CASValue<Object>> callback) {
206 return operation(client.asyncGets(storageKey), callback);
207 }
208
209 @Override
210 protected Cancellable updateCAS(
211 final String storageKey, final CASValue<Object> casValue, final byte[] storageObject, final FutureCallback<Boolean> callback) {
212 return operation(client.asyncCAS(storageKey, casValue.getCas(), storageObject), new FutureCallback<CASResponse>() {
213
214 @Override
215 public void completed(final CASResponse result) {
216 callback.completed(result == CASResponse.OK);
217 }
218
219 @Override
220 public void failed(final Exception ex) {
221 callback.failed(ex);
222 }
223
224 @Override
225 public void cancelled() {
226 callback.cancelled();
227 }
228
229 });
230 }
231
232 @Override
233 protected Cancellable delete(final String storageKey, final FutureCallback<Boolean> callback) {
234 return operation(client.delete(storageKey), callback);
235 }
236
237 @Override
238 protected Cancellable bulkRestore(final Collection<String> storageKeys, final FutureCallback<Map<String, byte[]>> callback) {
239 final BulkFuture<Map<String, Object>> future = client.asyncGetBulk(storageKeys);
240 future.addListener(future1 -> {
241 final Map<String, ?> storageObjectMap = future1.get();
242 final Map<String, byte[]> resultMap = new HashMap<>(storageObjectMap.size());
243 for (final Map.Entry<String, ?> resultEntry: storageObjectMap.entrySet()) {
244 resultMap.put(resultEntry.getKey(), castAsByteArray(resultEntry.getValue()));
245 }
246 callback.completed(resultMap);
247 });
248 return Operations.cancellable(future);
249 }
250
251 }