View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.cache.memcached;
28  
29  import java.io.IOException;
30  import java.net.InetSocketAddress;
31  import java.util.Collection;
32  import java.util.HashMap;
33  import java.util.Map;
34  import java.util.concurrent.ExecutionException;
35  
36  import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
37  import org.apache.hc.client5.http.cache.ResourceIOException;
38  import org.apache.hc.client5.http.impl.Operations;
39  import org.apache.hc.client5.http.impl.cache.AbstractBinaryAsyncCacheStorage;
40  import org.apache.hc.client5.http.impl.cache.ByteArrayCacheEntrySerializer;
41  import org.apache.hc.client5.http.impl.cache.CacheConfig;
42  import org.apache.hc.core5.concurrent.Cancellable;
43  import org.apache.hc.core5.concurrent.FutureCallback;
44  import org.apache.hc.core5.util.Args;
45  
46  import net.spy.memcached.CASResponse;
47  import net.spy.memcached.CASValue;
48  import net.spy.memcached.MemcachedClient;
49  import net.spy.memcached.internal.BulkFuture;
50  import net.spy.memcached.internal.BulkGetCompletionListener;
51  import net.spy.memcached.internal.BulkGetFuture;
52  import net.spy.memcached.internal.GetCompletionListener;
53  import net.spy.memcached.internal.GetFuture;
54  import net.spy.memcached.internal.OperationCompletionListener;
55  import net.spy.memcached.internal.OperationFuture;
56  
57  /**
58   * <p>
59   * This class is a storage backend that uses an external <i>memcached</i>
60   * for storing cached origin responses. This storage option provides a
61   * couple of interesting advantages over the default in-memory storage
62   * backend:
63   * </p>
64   * <ol>
65   * <li>in-memory cached objects can survive an application restart since
66   * they are held in a separate process</li>
67   * <li>it becomes possible for several cooperating applications to share
68   * a large <i>memcached</i> farm together</li>
69   * </ol>
70   * <p>
71   * Note that in a shared memcached pool setting you may wish to make use
72   * of the Ketama consistent hashing algorithm to reduce the number of
73   * cache misses that might result if one of the memcached cluster members
74   * fails (see the <a href="http://dustin.github.com/java-memcached-client/apidocs/net/spy/memcached/KetamaConnectionFactory.html">
75   * KetamaConnectionFactory</a>).
76   * </p>
77   * <p>
78   * Because memcached places limits on the size of its keys, we need to
79   * introduce a key hashing scheme to map the annotated URLs the higher-level
80   * caching HTTP client wants to use as keys onto ones that are suitable
81   * for use with memcached. Please see {@link KeyHashingScheme} if you would
82   * like to use something other than the provided {@link SHA256KeyHashingScheme}.
83   * </p>
84   *
85   * <p>
86   * Please refer to the <a href="http://code.google.com/p/memcached/wiki/NewStart">
87   * memcached documentation</a> and in particular to the documentation for
88   * the <a href="http://code.google.com/p/spymemcached/">spymemcached
89   * documentation</a> for details about how to set up and configure memcached
90   * and the Java client used here, respectively.
91   * </p>
92   *
93   * @since 5.0
94   */
95  public class MemcachedHttpAsyncCacheStorage extends AbstractBinaryAsyncCacheStorage<CASValue<Object>> {
96  
97      private final MemcachedClient client;
98      private final KeyHashingScheme keyHashingScheme;
99  
100     /**
101      * Create a storage backend talking to a <i>memcached</i> instance
102      * listening on the specified host and port. This is useful if you
103      * just have a single local memcached instance running on the same
104      * machine as your application, for example.
105      * @param address where the <i>memcached</i> daemon is running
106      * @throws IOException in case of an error
107      */
108     public MemcachedHttpAsyncCacheStorage(final InetSocketAddress address) throws IOException {
109         this(new MemcachedClient(address));
110     }
111 
112     /**
113      * Create a storage backend using the pre-configured given
114      * <i>memcached</i> client.
115      * @param cache client to use for communicating with <i>memcached</i>
116      */
117     public MemcachedHttpAsyncCacheStorage(final MemcachedClient cache) {
118         this(cache, CacheConfig.DEFAULT, ByteArrayCacheEntrySerializer.INSTANCE, SHA256KeyHashingScheme.INSTANCE);
119     }
120 
121     /**
122      * Create a storage backend using the given <i>memcached</i> client and
123      * applying the given cache configuration, serialization, and hashing
124      * mechanisms.
125      * @param client how to talk to <i>memcached</i>
126      * @param config apply HTTP cache-related options
127      * @param serializer alternative serialization mechanism
128      * @param keyHashingScheme how to map higher-level logical "storage keys"
129      *   onto "cache keys" suitable for use with memcached
130      */
131     public MemcachedHttpAsyncCacheStorage(
132             final MemcachedClient client,
133             final CacheConfig config,
134             final HttpCacheEntrySerializer<byte[]> serializer,
135             final KeyHashingScheme keyHashingScheme) {
136         super((config != null ? config : CacheConfig.DEFAULT).getMaxUpdateRetries(),
137                 serializer != null ? serializer : ByteArrayCacheEntrySerializer.INSTANCE);
138         this.client = Args.notNull(client, "Memcached client");
139         this.keyHashingScheme = keyHashingScheme;
140     }
141 
142     @Override
143     protected String digestToStorageKey(final String key) {
144         return keyHashingScheme.hash(key);
145     }
146 
147     private byte[] castAsByteArray(final Object storageObject) throws ResourceIOException {
148         if (storageObject == null) {
149             return null;
150         }
151         if (storageObject instanceof byte[]) {
152             return (byte[]) storageObject;
153         }
154         throw new ResourceIOException("Unexpected cache content: " + storageObject.getClass());
155     }
156 
157     @Override
158     protected byte[] getStorageObject(final CASValue<Object> casValue) throws ResourceIOException {
159         return castAsByteArray(casValue.getValue());
160     }
161 
162     private <T> Cancellable operation(final OperationFuture<T> operationFuture, final FutureCallback<T> callback) {
163         operationFuture.addListener(new OperationCompletionListener() {
164 
165             @Override
166             public void onComplete(final OperationFuture<?> future) throws Exception {
167                 try {
168                     callback.completed(operationFuture.get());
169                 } catch (final ExecutionException ex) {
170                     if (ex.getCause() instanceof Exception) {
171                         callback.failed((Exception) ex.getCause());
172                     } else {
173                         callback.failed(ex);
174                     }
175                 }
176             }
177 
178         });
179         return Operations.cancellable(operationFuture);
180     }
181 
182     @Override
183     protected Cancellable store(final String storageKey, final byte[] storageObject, final FutureCallback<Boolean> callback) {
184         return operation(client.set(storageKey, 0, storageObject), callback);
185     }
186 
187     @Override
188     protected Cancellable restore(final String storageKey, final FutureCallback<byte[]> callback) {
189         final GetFuture<Object> getFuture = client.asyncGet(storageKey);
190         getFuture.addListener(new GetCompletionListener() {
191 
192             @Override
193             public void onComplete(final GetFuture<?> future) throws Exception {
194                 try {
195                     callback.completed(castAsByteArray(getFuture.get()));
196                 } catch (final ExecutionException ex) {
197                     if (ex.getCause() instanceof Exception) {
198                         callback.failed((Exception) ex.getCause());
199                     } else {
200                         callback.failed(ex);
201                     }
202                 }
203             }
204 
205         });
206         return Operations.cancellable(getFuture);
207     }
208 
209     @Override
210     protected Cancellable getForUpdateCAS(final String storageKey, final FutureCallback<CASValue<Object>> callback) {
211         return operation(client.asyncGets(storageKey), callback);
212     }
213 
214     @Override
215     protected Cancellable updateCAS(
216             final String storageKey, final CASValue<Object> casValue, final byte[] storageObject, final FutureCallback<Boolean> callback) {
217         return operation(client.asyncCAS(storageKey, casValue.getCas(), storageObject), new FutureCallback<CASResponse>() {
218 
219             @Override
220             public void completed(final CASResponse result) {
221                 callback.completed(result == CASResponse.OK);
222             }
223 
224             @Override
225             public void failed(final Exception ex) {
226                 callback.failed(ex);
227             }
228 
229             @Override
230             public void cancelled() {
231                 callback.cancelled();
232             }
233 
234         });
235     }
236 
237     @Override
238     protected Cancellable delete(final String storageKey, final FutureCallback<Boolean> callback) {
239         return operation(client.delete(storageKey), callback);
240     }
241 
242     @Override
243     protected Cancellable bulkRestore(final Collection<String> storageKeys, final FutureCallback<Map<String, byte[]>> callback) {
244         final BulkFuture<Map<String, Object>> future = client.asyncGetBulk(storageKeys);
245         future.addListener(new BulkGetCompletionListener() {
246 
247             @Override
248             public void onComplete(final BulkGetFuture<?> future) throws Exception {
249                 final Map<String, ?> storageObjectMap = future.get();
250                 final Map<String, byte[]> resultMap = new HashMap<>(storageObjectMap.size());
251                 for (final Map.Entry<String, ?> resultEntry: storageObjectMap.entrySet()) {
252                     resultMap.put(resultEntry.getKey(), castAsByteArray(resultEntry.getValue()));
253                 }
254                 callback.completed(resultMap);
255             }
256         });
257         return Operations.cancellable(future);
258     }
259 
260 }