View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.cache.memcached;
28  
29  import java.io.IOException;
30  import java.net.InetSocketAddress;
31  import java.util.Collection;
32  import java.util.HashMap;
33  import java.util.Map;
34  import java.util.concurrent.ExecutionException;
35  
36  import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
37  import org.apache.hc.client5.http.cache.ResourceIOException;
38  import org.apache.hc.client5.http.impl.Operations;
39  import org.apache.hc.client5.http.impl.cache.AbstractBinaryAsyncCacheStorage;
40  import org.apache.hc.client5.http.impl.cache.ByteArrayCacheEntrySerializer;
41  import org.apache.hc.client5.http.impl.cache.CacheConfig;
42  import org.apache.hc.core5.concurrent.Cancellable;
43  import org.apache.hc.core5.concurrent.FutureCallback;
44  import org.apache.hc.core5.util.Args;
45  
46  import net.spy.memcached.CASResponse;
47  import net.spy.memcached.CASValue;
48  import net.spy.memcached.MemcachedClient;
49  import net.spy.memcached.internal.BulkFuture;
50  import net.spy.memcached.internal.GetFuture;
51  import net.spy.memcached.internal.OperationFuture;
52  
53  /**
54   * <p>
55   * This class is a storage backend that uses an external <i>memcached</i>
56   * for storing cached origin responses. This storage option provides a
57   * couple of interesting advantages over the default in-memory storage
58   * backend:
59   * </p>
60   * <ol>
61   * <li>in-memory cached objects can survive an application restart since
62   * they are held in a separate process</li>
63   * <li>it becomes possible for several cooperating applications to share
64   * a large <i>memcached</i> farm together</li>
65   * </ol>
66   * <p>
67   * Note that in a shared memcached pool setting you may wish to make use
68   * of the Ketama consistent hashing algorithm to reduce the number of
69   * cache misses that might result if one of the memcached cluster members
70   * fails (see the <a href="http://dustin.github.com/java-memcached-client/apidocs/net/spy/memcached/KetamaConnectionFactory.html">
71   * KetamaConnectionFactory</a>).
72   * </p>
73   * <p>
74   * Because memcached places limits on the size of its keys, we need to
75   * introduce a key hashing scheme to map the annotated URLs the higher-level
76   * caching HTTP client wants to use as keys onto ones that are suitable
77   * for use with memcached. Please see {@link KeyHashingScheme} if you would
78   * like to use something other than the provided {@link SHA256KeyHashingScheme}.
79   * </p>
80   *
81   * <p>
82   * Please refer to the <a href="http://code.google.com/p/memcached/wiki/NewStart">
83   * memcached documentation</a> and in particular to the documentation for
84   * the <a href="http://code.google.com/p/spymemcached/">spymemcached
85   * documentation</a> for details about how to set up and configure memcached
86   * and the Java client used here, respectively.
87   * </p>
88   *
89   * @since 5.0
90   */
91  public class MemcachedHttpAsyncCacheStorage extends AbstractBinaryAsyncCacheStorage<CASValue<Object>> {
92  
93      private final MemcachedClient client;
94      private final KeyHashingScheme keyHashingScheme;
95  
96      /**
97       * Create a storage backend talking to a <i>memcached</i> instance
98       * listening on the specified host and port. This is useful if you
99       * just have a single local memcached instance running on the same
100      * machine as your application, for example.
101      * @param address where the <i>memcached</i> daemon is running
102      * @throws IOException in case of an error
103      */
104     public MemcachedHttpAsyncCacheStorage(final InetSocketAddress address) throws IOException {
105         this(new MemcachedClient(address));
106     }
107 
108     /**
109      * Create a storage backend using the pre-configured given
110      * <i>memcached</i> client.
111      * @param cache client to use for communicating with <i>memcached</i>
112      */
113     public MemcachedHttpAsyncCacheStorage(final MemcachedClient cache) {
114         this(cache, CacheConfig.DEFAULT, ByteArrayCacheEntrySerializer.INSTANCE, SHA256KeyHashingScheme.INSTANCE);
115     }
116 
117     /**
118      * Create a storage backend using the given <i>memcached</i> client and
119      * applying the given cache configuration, serialization, and hashing
120      * mechanisms.
121      * @param client how to talk to <i>memcached</i>
122      * @param config apply HTTP cache-related options
123      * @param serializer alternative serialization mechanism
124      * @param keyHashingScheme how to map higher-level logical "storage keys"
125      *   onto "cache keys" suitable for use with memcached
126      */
127     public MemcachedHttpAsyncCacheStorage(
128             final MemcachedClient client,
129             final CacheConfig config,
130             final HttpCacheEntrySerializer<byte[]> serializer,
131             final KeyHashingScheme keyHashingScheme) {
132         super((config != null ? config : CacheConfig.DEFAULT).getMaxUpdateRetries(),
133                 serializer != null ? serializer : ByteArrayCacheEntrySerializer.INSTANCE);
134         this.client = Args.notNull(client, "Memcached client");
135         this.keyHashingScheme = keyHashingScheme;
136     }
137 
138     @Override
139     protected String digestToStorageKey(final String key) {
140         return keyHashingScheme.hash(key);
141     }
142 
143     private byte[] castAsByteArray(final Object storageObject) throws ResourceIOException {
144         if (storageObject == null) {
145             return null;
146         }
147         if (storageObject instanceof byte[]) {
148             return (byte[]) storageObject;
149         }
150         throw new ResourceIOException("Unexpected cache content: " + storageObject.getClass());
151     }
152 
153     @Override
154     protected byte[] getStorageObject(final CASValue<Object> casValue) throws ResourceIOException {
155         return castAsByteArray(casValue.getValue());
156     }
157 
158     private <T> Cancellable operation(final OperationFuture<T> operationFuture, final FutureCallback<T> callback) {
159         operationFuture.addListener(future -> {
160             try {
161                 callback.completed(operationFuture.get());
162             } catch (final ExecutionException ex) {
163                 if (ex.getCause() instanceof Exception) {
164                     callback.failed((Exception) ex.getCause());
165                 } else {
166                     callback.failed(ex);
167                 }
168             }
169         });
170         return Operations.cancellable(operationFuture);
171     }
172 
173     @Override
174     protected Cancellable store(final String storageKey, final byte[] storageObject, final FutureCallback<Boolean> callback) {
175         return operation(client.set(storageKey, 0, storageObject), callback);
176     }
177 
178     @Override
179     protected Cancellable restore(final String storageKey, final FutureCallback<byte[]> callback) {
180         final GetFuture<Object> getFuture = client.asyncGet(storageKey);
181         getFuture.addListener(future -> {
182             try {
183                 callback.completed(castAsByteArray(getFuture.get()));
184             } catch (final ExecutionException ex) {
185                 if (ex.getCause() instanceof Exception) {
186                     callback.failed((Exception) ex.getCause());
187                 } else {
188                     callback.failed(ex);
189                 }
190             }
191         });
192         return Operations.cancellable(getFuture);
193     }
194 
195     @Override
196     protected Cancellable getForUpdateCAS(final String storageKey, final FutureCallback<CASValue<Object>> callback) {
197         return operation(client.asyncGets(storageKey), callback);
198     }
199 
200     @Override
201     protected Cancellable updateCAS(
202             final String storageKey, final CASValue<Object> casValue, final byte[] storageObject, final FutureCallback<Boolean> callback) {
203         return operation(client.asyncCAS(storageKey, casValue.getCas(), storageObject), new FutureCallback<CASResponse>() {
204 
205             @Override
206             public void completed(final CASResponse result) {
207                 callback.completed(result == CASResponse.OK);
208             }
209 
210             @Override
211             public void failed(final Exception ex) {
212                 callback.failed(ex);
213             }
214 
215             @Override
216             public void cancelled() {
217                 callback.cancelled();
218             }
219 
220         });
221     }
222 
223     @Override
224     protected Cancellable delete(final String storageKey, final FutureCallback<Boolean> callback) {
225         return operation(client.delete(storageKey), callback);
226     }
227 
228     @Override
229     protected Cancellable bulkRestore(final Collection<String> storageKeys, final FutureCallback<Map<String, byte[]>> callback) {
230         final BulkFuture<Map<String, Object>> future = client.asyncGetBulk(storageKeys);
231         future.addListener(future1 -> {
232             final Map<String, ?> storageObjectMap = future1.get();
233             final Map<String, byte[]> resultMap = new HashMap<>(storageObjectMap.size());
234             for (final Map.Entry<String, ?> resultEntry: storageObjectMap.entrySet()) {
235                 resultMap.put(resultEntry.getKey(), castAsByteArray(resultEntry.getValue()));
236             }
237             callback.completed(resultMap);
238         });
239         return Operations.cancellable(future);
240     }
241 
242 }