View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.cache.memcached;
28  
29  import java.io.IOException;
30  import java.net.InetSocketAddress;
31  import java.util.Collection;
32  import java.util.HashMap;
33  import java.util.Map;
34  import java.util.concurrent.CancellationException;
35  import java.util.concurrent.ExecutionException;
36  
37  import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
38  import org.apache.hc.client5.http.cache.ResourceIOException;
39  import org.apache.hc.client5.http.impl.Operations;
40  import org.apache.hc.client5.http.impl.cache.AbstractBinaryAsyncCacheStorage;
41  import org.apache.hc.client5.http.impl.cache.CacheConfig;
42  import org.apache.hc.client5.http.impl.cache.HttpByteArrayCacheEntrySerializer;
43  import org.apache.hc.core5.concurrent.Cancellable;
44  import org.apache.hc.core5.concurrent.FutureCallback;
45  import org.apache.hc.core5.util.Args;
46  
47  import net.spy.memcached.CASResponse;
48  import net.spy.memcached.CASValue;
49  import net.spy.memcached.MemcachedClient;
50  import net.spy.memcached.internal.BulkFuture;
51  import net.spy.memcached.internal.GetFuture;
52  import net.spy.memcached.internal.OperationFuture;
53  
54  /**
55   * <p>
56   * This class is a storage backend that uses an external <i>memcached</i>
57   * for storing cached origin responses. This storage option provides a
58   * couple of interesting advantages over the default in-memory storage
59   * backend:
60   * </p>
61   * <ol>
62   * <li>in-memory cached objects can survive an application restart since
63   * they are held in a separate process</li>
64   * <li>it becomes possible for several cooperating applications to share
65   * a large <i>memcached</i> farm together</li>
66   * </ol>
67   * <p>
68   * Note that in a shared memcached pool setting you may wish to make use
69   * of the Ketama consistent hashing algorithm to reduce the number of
70   * cache misses that might result if one of the memcached cluster members
71   * fails (see the <a href="http://dustin.github.com/java-memcached-client/apidocs/net/spy/memcached/KetamaConnectionFactory.html">
72   * KetamaConnectionFactory</a>).
73   * </p>
74   * <p>
75   * Because memcached places limits on the size of its keys, we need to
76   * introduce a key hashing scheme to map the annotated URLs the higher-level
77   * caching HTTP client wants to use as keys onto ones that are suitable
78   * for use with memcached. Please see {@link KeyHashingScheme} if you would
79   * like to use something other than the provided {@link SHA256KeyHashingScheme}.
80   * </p>
81   *
82   * <p>
83   * Please refer to the <a href="http://code.google.com/p/memcached/wiki/NewStart">
84   * memcached documentation</a> and in particular to the documentation for
85   * the <a href="http://code.google.com/p/spymemcached/">spymemcached
86   * documentation</a> for details about how to set up and configure memcached
87   * and the Java client used here, respectively.
88   * </p>
89   *
90   * @since 5.0
91   */
92  public class MemcachedHttpAsyncCacheStorage extends AbstractBinaryAsyncCacheStorage<CASValue<Object>> {
93  
94      private final MemcachedClient client;
95      private final KeyHashingScheme keyHashingScheme;
96  
97      /**
98       * Create a storage backend talking to a <i>memcached</i> instance
99       * listening on the specified host and port. This is useful if you
100      * just have a single local memcached instance running on the same
101      * machine as your application, for example.
102      * @param address where the <i>memcached</i> daemon is running
103      * @throws IOException in case of an error
104      */
105     public MemcachedHttpAsyncCacheStorage(final InetSocketAddress address) throws IOException {
106         this(new MemcachedClient(address));
107     }
108 
109     /**
110      * Create a storage backend using the pre-configured given
111      * <i>memcached</i> client.
112      * @param cache client to use for communicating with <i>memcached</i>
113      */
114     public MemcachedHttpAsyncCacheStorage(final MemcachedClient cache) {
115         this(cache, CacheConfig.DEFAULT, HttpByteArrayCacheEntrySerializer.INSTANCE, SHA256KeyHashingScheme.INSTANCE);
116     }
117 
118     /**
119      * Create a storage backend using the given <i>memcached</i> client and
120      * applying the given cache configuration, serialization, and hashing
121      * mechanisms.
122      * @param client how to talk to <i>memcached</i>
123      * @param config apply HTTP cache-related options
124      * @param serializer alternative serialization mechanism
125      * @param keyHashingScheme how to map higher-level logical "storage keys"
126      *   onto "cache keys" suitable for use with memcached
127      */
128     public MemcachedHttpAsyncCacheStorage(
129             final MemcachedClient client,
130             final CacheConfig config,
131             final HttpCacheEntrySerializer<byte[]> serializer,
132             final KeyHashingScheme keyHashingScheme) {
133         super((config != null ? config : CacheConfig.DEFAULT).getMaxUpdateRetries(),
134                 serializer != null ? serializer : HttpByteArrayCacheEntrySerializer.INSTANCE);
135         this.client = Args.notNull(client, "Memcached client");
136         this.keyHashingScheme = keyHashingScheme;
137     }
138 
139     @Override
140     protected String digestToStorageKey(final String key) {
141         return keyHashingScheme.hash(key);
142     }
143 
144     private byte[] castAsByteArray(final Object storageObject) throws ResourceIOException {
145         if (storageObject == null) {
146             return null;
147         }
148         if (storageObject instanceof byte[]) {
149             return (byte[]) storageObject;
150         }
151         throw new ResourceIOException("Unexpected cache content: " + storageObject.getClass());
152     }
153 
154     @Override
155     protected byte[] getStorageObject(final CASValue<Object> casValue) throws ResourceIOException {
156         return castAsByteArray(casValue.getValue());
157     }
158 
159     private <T> Cancellable operation(final OperationFuture<T> operationFuture, final FutureCallback<T> callback) {
160         operationFuture.addListener(future -> {
161             try {
162                 callback.completed(operationFuture.get());
163             } catch (final ExecutionException ex) {
164                 if (ex.getCause() instanceof Exception) {
165                     if (ex.getCause() instanceof CancellationException) {
166                         callback.failed(new MemcachedOperationCancellationException(ex.getCause()));
167                     } else {
168                         callback.failed((Exception) ex.getCause());
169                     }
170                 } else {
171                     callback.failed(ex);
172                 }
173             }
174         });
175         return Operations.cancellable(operationFuture);
176     }
177 
178     @Override
179     protected Cancellable store(final String storageKey, final byte[] storageObject, final FutureCallback<Boolean> callback) {
180         return operation(client.set(storageKey, 0, storageObject), callback);
181     }
182 
183     @Override
184     protected Cancellable restore(final String storageKey, final FutureCallback<byte[]> callback) {
185         final GetFuture<Object> getFuture = client.asyncGet(storageKey);
186         getFuture.addListener(future -> {
187             try {
188                 callback.completed(castAsByteArray(getFuture.get()));
189             } catch (final ExecutionException ex) {
190                 if (ex.getCause() instanceof Exception) {
191                     if (ex.getCause() instanceof CancellationException) {
192                         callback.failed(new MemcachedOperationCancellationException(ex.getCause()));
193                     } else {
194                         callback.failed((Exception) ex.getCause());
195                     }
196                 } else {
197                     callback.failed(ex);
198                 }
199             }
200         });
201         return Operations.cancellable(getFuture);
202     }
203 
204     @Override
205     protected Cancellable getForUpdateCAS(final String storageKey, final FutureCallback<CASValue<Object>> callback) {
206         return operation(client.asyncGets(storageKey), callback);
207     }
208 
209     @Override
210     protected Cancellable updateCAS(
211             final String storageKey, final CASValue<Object> casValue, final byte[] storageObject, final FutureCallback<Boolean> callback) {
212         return operation(client.asyncCAS(storageKey, casValue.getCas(), storageObject), new FutureCallback<CASResponse>() {
213 
214             @Override
215             public void completed(final CASResponse result) {
216                 callback.completed(result == CASResponse.OK);
217             }
218 
219             @Override
220             public void failed(final Exception ex) {
221                 callback.failed(ex);
222             }
223 
224             @Override
225             public void cancelled() {
226                 callback.cancelled();
227             }
228 
229         });
230     }
231 
232     @Override
233     protected Cancellable delete(final String storageKey, final FutureCallback<Boolean> callback) {
234         return operation(client.delete(storageKey), callback);
235     }
236 
237     @Override
238     protected Cancellable bulkRestore(final Collection<String> storageKeys, final FutureCallback<Map<String, byte[]>> callback) {
239         final BulkFuture<Map<String, Object>> future = client.asyncGetBulk(storageKeys);
240         future.addListener(future1 -> {
241             final Map<String, ?> storageObjectMap = future1.get();
242             final Map<String, byte[]> resultMap = new HashMap<>(storageObjectMap.size());
243             for (final Map.Entry<String, ?> resultEntry: storageObjectMap.entrySet()) {
244                 resultMap.put(resultEntry.getKey(), castAsByteArray(resultEntry.getValue()));
245             }
246             callback.completed(resultMap);
247         });
248         return Operations.cancellable(future);
249     }
250 
251 }