View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.cache;
28  
29  import java.util.ArrayList;
30  import java.util.Collection;
31  import java.util.HashMap;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.concurrent.atomic.AtomicInteger;
35  
36  import org.apache.hc.client5.http.cache.HttpAsyncCacheStorage;
37  import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
38  import org.apache.hc.client5.http.cache.HttpCacheEntry;
39  import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
40  import org.apache.hc.client5.http.cache.HttpCacheStorageEntry;
41  import org.apache.hc.client5.http.cache.HttpCacheUpdateException;
42  import org.apache.hc.client5.http.cache.ResourceIOException;
43  import org.apache.hc.client5.http.impl.Operations;
44  import org.apache.hc.core5.concurrent.Cancellable;
45  import org.apache.hc.core5.concurrent.ComplexCancellable;
46  import org.apache.hc.core5.concurrent.FutureCallback;
47  import org.apache.hc.core5.util.Args;
48  
49  /**
50   * Abstract cache backend for serialized objects capable of CAS (compare-and-swap) updates.
51   *
52   * @since 5.0
53   */
54  public abstract class AbstractSerializingAsyncCacheStorage<T, CAS> implements HttpAsyncCacheStorage {
55  
56      private final int maxUpdateRetries;
57      private final HttpCacheEntrySerializer<T> serializer;
58  
59      public AbstractSerializingAsyncCacheStorage(final int maxUpdateRetries, final HttpCacheEntrySerializer<T> serializer) {
60          this.maxUpdateRetries = Args.notNegative(maxUpdateRetries, "Max retries");
61          this.serializer = Args.notNull(serializer, "Cache entry serializer");
62      }
63  
64      protected abstract String digestToStorageKey(String key);
65  
66      protected abstract T getStorageObject(CAS cas) throws ResourceIOException;
67  
68      protected abstract Cancellable store(String storageKey, T storageObject, FutureCallback<Boolean> callback);
69  
70      protected abstract Cancellable restore(String storageKey, FutureCallback<T> callback);
71  
72      protected abstract Cancellable getForUpdateCAS(String storageKey, FutureCallback<CAS> callback);
73  
74      protected abstract Cancellable updateCAS(String storageKey, CAS cas, T storageObject, FutureCallback<Boolean> callback);
75  
76      protected abstract Cancellable delete(String storageKey, FutureCallback<Boolean> callback);
77  
78      protected abstract Cancellable bulkRestore(Collection<String> storageKeys, FutureCallback<Map<String, T>> callback);
79  
80      @Override
81      public final Cancellable putEntry(
82              final String key, final HttpCacheEntry entry, final FutureCallback<Boolean> callback) {
83          Args.notNull(key, "Storage key");
84          Args.notNull(callback, "Callback");
85          try {
86              final String storageKey = digestToStorageKey(key);
87              final T storageObject = serializer.serialize(new HttpCacheStorageEntry(key, entry));
88              return store(storageKey, storageObject, callback);
89          } catch (final Exception ex) {
90              callback.failed(ex);
91              return Operations.nonCancellable();
92          }
93      }
94  
95      @Override
96      public final Cancellable getEntry(final String key, final FutureCallback<HttpCacheEntry> callback) {
97          Args.notNull(key, "Storage key");
98          Args.notNull(callback, "Callback");
99          try {
100             final String storageKey = digestToStorageKey(key);
101             return restore(storageKey, new FutureCallback<T>() {
102 
103                 @Override
104                 public void completed(final T storageObject) {
105                     try {
106                         if (storageObject != null) {
107                             final HttpCacheStorageEntry entry = serializer.deserialize(storageObject);
108                             if (key.equals(entry.getKey())) {
109                                 callback.completed(entry.getContent());
110                             } else {
111                                 callback.completed(null);
112                             }
113                         } else {
114                             callback.completed(null);
115                         }
116                     } catch (final Exception ex) {
117                         callback.failed(ex);
118                     }
119                 }
120 
121                 @Override
122                 public void failed(final Exception ex) {
123                     callback.failed(ex);
124                 }
125 
126                 @Override
127                 public void cancelled() {
128                     callback.cancelled();
129                 }
130 
131             });
132         } catch (final Exception ex) {
133             callback.failed(ex);
134             return Operations.nonCancellable();
135         }
136     }
137 
138     @Override
139     public final Cancellable removeEntry(final String key, final FutureCallback<Boolean> callback) {
140         Args.notNull(key, "Storage key");
141         Args.notNull(callback, "Callback");
142         try {
143             final String storageKey = digestToStorageKey(key);
144             return delete(storageKey, callback);
145         } catch (final Exception ex) {
146             callback.failed(ex);
147             return Operations.nonCancellable();
148         }
149     }
150 
151     @Override
152     public final Cancellable updateEntry(
153             final String key, final HttpCacheCASOperation casOperation, final FutureCallback<Boolean> callback) {
154         Args.notNull(key, "Storage key");
155         Args.notNull(casOperation, "CAS operation");
156         Args.notNull(callback, "Callback");
157         final ComplexCancellable complexCancellable = new ComplexCancellable();
158         final AtomicInteger count = new AtomicInteger(0);
159         attemptUpdateEntry(key, casOperation, complexCancellable, count, callback);
160         return complexCancellable;
161     }
162 
163     private void attemptUpdateEntry(
164             final String key,
165             final HttpCacheCASOperation casOperation,
166             final ComplexCancellable complexCancellable,
167             final AtomicInteger count,
168             final FutureCallback<Boolean> callback) {
169         try {
170             final String storageKey = digestToStorageKey(key);
171             complexCancellable.setDependency(getForUpdateCAS(storageKey, new FutureCallback<CAS>() {
172 
173                 @Override
174                 public void completed(final CAS cas) {
175                     try {
176                         HttpCacheStorageEntry storageEntry = cas != null ? serializer.deserialize(getStorageObject(cas)) : null;
177                         if (storageEntry != null && !key.equals(storageEntry.getKey())) {
178                             storageEntry = null;
179                         }
180                         final HttpCacheEntry existingEntry = storageEntry != null ? storageEntry.getContent() : null;
181                         final HttpCacheEntry updatedEntry = casOperation.execute(existingEntry);
182                         if (existingEntry == null) {
183                             putEntry(key, updatedEntry, callback);
184                         } else {
185                             final T storageObject = serializer.serialize(new HttpCacheStorageEntry(key, updatedEntry));
186                             complexCancellable.setDependency(updateCAS(storageKey, cas, storageObject, new FutureCallback<Boolean>() {
187 
188                                 @Override
189                                 public void completed(final Boolean result) {
190                                     if (result.booleanValue()) {
191                                         callback.completed(result);
192                                     } else {
193                                         if (!complexCancellable.isCancelled()) {
194                                             final int numRetries = count.incrementAndGet();
195                                             if (numRetries >= maxUpdateRetries) {
196                                                 callback.failed(new HttpCacheUpdateException("Cache update failed after " + numRetries + " retries"));
197                                             } else {
198                                                 attemptUpdateEntry(key, casOperation, complexCancellable, count, callback);
199                                             }
200                                         }
201                                     }
202                                 }
203 
204                                 @Override
205                                 public void failed(final Exception ex) {
206                                     callback.failed(ex);
207                                 }
208 
209                                 @Override
210                                 public void cancelled() {
211                                     callback.cancelled();
212                                 }
213 
214                             }));
215                         }
216                     } catch (final Exception ex) {
217                         callback.failed(ex);
218                     }
219                 }
220 
221                 @Override
222                 public void failed(final Exception ex) {
223                     callback.failed(ex);
224                 }
225 
226                 @Override
227                 public void cancelled() {
228                     callback.cancelled();
229                 }
230 
231             }));
232         } catch (final Exception ex) {
233             callback.failed(ex);
234         }
235     }
236 
237     @Override
238     public final Cancellable getEntries(final Collection<String> keys, final FutureCallback<Map<String, HttpCacheEntry>> callback) {
239         Args.notNull(keys, "Storage keys");
240         Args.notNull(callback, "Callback");
241         try {
242             final List<String> storageKeys = new ArrayList<>(keys.size());
243             for (final String key: keys) {
244                 storageKeys.add(digestToStorageKey(key));
245             }
246             return bulkRestore(storageKeys, new FutureCallback<Map<String, T>>() {
247 
248                 @Override
249                 public void completed(final Map<String, T> storageObjectMap) {
250                     try {
251                         final Map<String, HttpCacheEntry> resultMap = new HashMap<>();
252                         for (final String key: keys) {
253                             final String storageKey = digestToStorageKey(key);
254                             final T storageObject = storageObjectMap.get(storageKey);
255                             if (storageObject != null) {
256                                 final HttpCacheStorageEntry entry = serializer.deserialize(storageObject);
257                                 if (key.equals(entry.getKey())) {
258                                     resultMap.put(key, entry.getContent());
259                                 }
260                             }
261                         }
262                         callback.completed(resultMap);
263                     } catch (final Exception ex) {
264                         callback.failed(ex);
265                     }
266                 }
267 
268                 @Override
269                 public void failed(final Exception ex) {
270                     callback.failed(ex);
271                 }
272 
273                 @Override
274                 public void cancelled() {
275                     callback.cancelled();
276                 }
277 
278             });
279         } catch (final Exception ex) {
280             callback.failed(ex);
281             return Operations.nonCancellable();
282         }
283     }
284 
285 }