1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.cache;
28
29 import java.util.ArrayList;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.concurrent.atomic.AtomicInteger;
35
36 import org.apache.hc.client5.http.cache.HttpAsyncCacheStorage;
37 import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
38 import org.apache.hc.client5.http.cache.HttpCacheEntry;
39 import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
40 import org.apache.hc.client5.http.cache.HttpCacheStorageEntry;
41 import org.apache.hc.client5.http.cache.HttpCacheUpdateException;
42 import org.apache.hc.client5.http.cache.ResourceIOException;
43 import org.apache.hc.client5.http.impl.Operations;
44 import org.apache.hc.core5.concurrent.Cancellable;
45 import org.apache.hc.core5.concurrent.ComplexCancellable;
46 import org.apache.hc.core5.concurrent.FutureCallback;
47 import org.apache.hc.core5.util.Args;
48
49
50
51
52
53
54 public abstract class AbstractSerializingAsyncCacheStorage<T, CAS> implements HttpAsyncCacheStorage {
55
56 private final int maxUpdateRetries;
57 private final HttpCacheEntrySerializer<T> serializer;
58
59 public AbstractSerializingAsyncCacheStorage(final int maxUpdateRetries, final HttpCacheEntrySerializer<T> serializer) {
60 this.maxUpdateRetries = Args.notNegative(maxUpdateRetries, "Max retries");
61 this.serializer = Args.notNull(serializer, "Cache entry serializer");
62 }
63
64 protected abstract String digestToStorageKey(String key);
65
66 protected abstract T getStorageObject(CAS cas) throws ResourceIOException;
67
68 protected abstract Cancellable store(String storageKey, T storageObject, FutureCallback<Boolean> callback);
69
70 protected abstract Cancellable restore(String storageKey, FutureCallback<T> callback);
71
72 protected abstract Cancellable getForUpdateCAS(String storageKey, FutureCallback<CAS> callback);
73
74 protected abstract Cancellable updateCAS(String storageKey, CAS cas, T storageObject, FutureCallback<Boolean> callback);
75
76 protected abstract Cancellable delete(String storageKey, FutureCallback<Boolean> callback);
77
78 protected abstract Cancellable bulkRestore(Collection<String> storageKeys, FutureCallback<Map<String, T>> callback);
79
80 @Override
81 public final Cancellable putEntry(
82 final String key, final HttpCacheEntry entry, final FutureCallback<Boolean> callback) {
83 Args.notNull(key, "Storage key");
84 Args.notNull(callback, "Callback");
85 try {
86 final String storageKey = digestToStorageKey(key);
87 final T storageObject = serializer.serialize(new HttpCacheStorageEntry(key, entry));
88 return store(storageKey, storageObject, callback);
89 } catch (final Exception ex) {
90 callback.failed(ex);
91 return Operations.nonCancellable();
92 }
93 }
94
95 @Override
96 public final Cancellable getEntry(final String key, final FutureCallback<HttpCacheEntry> callback) {
97 Args.notNull(key, "Storage key");
98 Args.notNull(callback, "Callback");
99 try {
100 final String storageKey = digestToStorageKey(key);
101 return restore(storageKey, new FutureCallback<T>() {
102
103 @Override
104 public void completed(final T storageObject) {
105 try {
106 if (storageObject != null) {
107 final HttpCacheStorageEntry entry = serializer.deserialize(storageObject);
108 if (key.equals(entry.getKey())) {
109 callback.completed(entry.getContent());
110 } else {
111 callback.completed(null);
112 }
113 } else {
114 callback.completed(null);
115 }
116 } catch (final Exception ex) {
117 callback.failed(ex);
118 }
119 }
120
121 @Override
122 public void failed(final Exception ex) {
123 callback.failed(ex);
124 }
125
126 @Override
127 public void cancelled() {
128 callback.cancelled();
129 }
130
131 });
132 } catch (final Exception ex) {
133 callback.failed(ex);
134 return Operations.nonCancellable();
135 }
136 }
137
138 @Override
139 public final Cancellable removeEntry(final String key, final FutureCallback<Boolean> callback) {
140 Args.notNull(key, "Storage key");
141 Args.notNull(callback, "Callback");
142 try {
143 final String storageKey = digestToStorageKey(key);
144 return delete(storageKey, callback);
145 } catch (final Exception ex) {
146 callback.failed(ex);
147 return Operations.nonCancellable();
148 }
149 }
150
151 @Override
152 public final Cancellable updateEntry(
153 final String key, final HttpCacheCASOperation casOperation, final FutureCallback<Boolean> callback) {
154 Args.notNull(key, "Storage key");
155 Args.notNull(casOperation, "CAS operation");
156 Args.notNull(callback, "Callback");
157 final ComplexCancellable complexCancellable = new ComplexCancellable();
158 final AtomicInteger count = new AtomicInteger(0);
159 atemmptUpdateEntry(key, casOperation, complexCancellable, count, callback);
160 return complexCancellable;
161 }
162
163 private void atemmptUpdateEntry(
164 final String key,
165 final HttpCacheCASOperation casOperation,
166 final ComplexCancellable complexCancellable,
167 final AtomicInteger count,
168 final FutureCallback<Boolean> callback) {
169 try {
170 final String storageKey = digestToStorageKey(key);
171 complexCancellable.setDependency(getForUpdateCAS(storageKey, new FutureCallback<CAS>() {
172
173 @Override
174 public void completed(final CAS cas) {
175 try {
176 HttpCacheStorageEntry storageEntry = cas != null ? serializer.deserialize(getStorageObject(cas)) : null;
177 if (storageEntry != null && !key.equals(storageEntry.getKey())) {
178 storageEntry = null;
179 }
180 final HttpCacheEntry existingEntry = storageEntry != null ? storageEntry.getContent() : null;
181 final HttpCacheEntry updatedEntry = casOperation.execute(existingEntry);
182 if (existingEntry == null) {
183 putEntry(key, updatedEntry, callback);
184 } else {
185 final T storageObject = serializer.serialize(new HttpCacheStorageEntry(key, updatedEntry));
186 complexCancellable.setDependency(updateCAS(storageKey, cas, storageObject, new FutureCallback<Boolean>() {
187
188 @Override
189 public void completed(final Boolean result) {
190 if (result) {
191 callback.completed(result);
192 } else {
193 if (!complexCancellable.isCancelled()) {
194 final int numRetries = count.incrementAndGet();
195 if (numRetries >= maxUpdateRetries) {
196 callback.failed(new HttpCacheUpdateException("Cache update failed after " + numRetries + " retries"));
197 } else {
198 atemmptUpdateEntry(key, casOperation, complexCancellable, count, callback);
199 }
200 }
201 }
202 }
203
204 @Override
205 public void failed(final Exception ex) {
206 callback.failed(ex);
207 }
208
209 @Override
210 public void cancelled() {
211 callback.cancelled();
212 }
213
214 }));
215 }
216 } catch (final Exception ex) {
217 callback.failed(ex);
218 }
219 }
220
221 @Override
222 public void failed(final Exception ex) {
223 callback.failed(ex);
224 }
225
226 @Override
227 public void cancelled() {
228 callback.cancelled();
229 }
230
231 }));
232 } catch (final Exception ex) {
233 callback.failed(ex);
234 }
235 }
236
237 @Override
238 public final Cancellable getEntries(final Collection<String> keys, final FutureCallback<Map<String, HttpCacheEntry>> callback) {
239 Args.notNull(keys, "Storage keys");
240 Args.notNull(callback, "Callback");
241 try {
242 final List<String> storageKeys = new ArrayList<>(keys.size());
243 for (final String key: keys) {
244 storageKeys.add(digestToStorageKey(key));
245 }
246 return bulkRestore(storageKeys, new FutureCallback<Map<String, T>>() {
247
248 @Override
249 public void completed(final Map<String, T> storageObjectMap) {
250 try {
251 final Map<String, HttpCacheEntry> resultMap = new HashMap<>();
252 for (final String key: keys) {
253 final String storageKey = digestToStorageKey(key);
254 final T storageObject = storageObjectMap.get(storageKey);
255 if (storageObject != null) {
256 final HttpCacheStorageEntry entry = serializer.deserialize(storageObject);
257 if (key.equals(entry.getKey())) {
258 resultMap.put(key, entry.getContent());
259 }
260 }
261 }
262 callback.completed(resultMap);
263 } catch (final Exception ex) {
264 callback.failed(ex);
265 }
266 }
267
268 @Override
269 public void failed(final Exception ex) {
270 callback.failed(ex);
271 }
272
273 @Override
274 public void cancelled() {
275 callback.cancelled();
276 }
277
278 });
279 } catch (final Exception ex) {
280 callback.failed(ex);
281 return Operations.nonCancellable();
282 }
283 }
284
285 }