1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.cache;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.util.Date;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import org.apache.hc.client5.http.HttpRoute;
40 import org.apache.hc.client5.http.async.AsyncExecCallback;
41 import org.apache.hc.client5.http.async.AsyncExecChain;
42 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
43 import org.apache.hc.client5.http.async.methods.SimpleBody;
44 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
45 import org.apache.hc.client5.http.cache.CacheResponseStatus;
46 import org.apache.hc.client5.http.cache.HeaderConstants;
47 import org.apache.hc.client5.http.cache.HttpAsyncCacheStorage;
48 import org.apache.hc.client5.http.cache.HttpCacheEntry;
49 import org.apache.hc.client5.http.cache.ResourceFactory;
50 import org.apache.hc.client5.http.cache.ResourceIOException;
51 import org.apache.hc.client5.http.impl.ExecSupport;
52 import org.apache.hc.client5.http.impl.RequestCopier;
53 import org.apache.hc.client5.http.protocol.HttpClientContext;
54 import org.apache.hc.client5.http.schedule.SchedulingStrategy;
55 import org.apache.hc.client5.http.utils.DateUtils;
56 import org.apache.hc.core5.annotation.Contract;
57 import org.apache.hc.core5.annotation.ThreadingBehavior;
58 import org.apache.hc.core5.concurrent.CancellableDependency;
59 import org.apache.hc.core5.concurrent.ComplexFuture;
60 import org.apache.hc.core5.concurrent.FutureCallback;
61 import org.apache.hc.core5.http.ContentType;
62 import org.apache.hc.core5.http.EntityDetails;
63 import org.apache.hc.core5.http.Header;
64 import org.apache.hc.core5.http.HttpException;
65 import org.apache.hc.core5.http.HttpHeaders;
66 import org.apache.hc.core5.http.HttpHost;
67 import org.apache.hc.core5.http.HttpRequest;
68 import org.apache.hc.core5.http.HttpResponse;
69 import org.apache.hc.core5.http.HttpStatus;
70 import org.apache.hc.core5.http.impl.BasicEntityDetails;
71 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
72 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
73 import org.apache.hc.core5.http.nio.CapacityChannel;
74 import org.apache.hc.core5.http.protocol.HttpCoreContext;
75 import org.apache.hc.core5.net.URIAuthority;
76 import org.apache.hc.core5.util.Args;
77 import org.apache.hc.core5.util.ByteArrayBuffer;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
80
81
82
83
84
85
86
87
88
89
90
91
92 @Contract(threading = ThreadingBehavior.SAFE)
93 class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler {
94
/** Logger shared by this executor and its inner callback classes. */
private static final Logger LOG = LoggerFactory.getLogger(AsyncCachingExec.class);
/** Asynchronous cache used to look up, create, update and flush cache entries. */
private final HttpAsyncCache responseCache;
/** Background revalidator; {@code null} when no executor service was supplied, in which case stale entries are revalidated inline. */
private final DefaultAsyncCacheRevalidator cacheRevalidator;
/** Builds the conditional and unconditional requests sent to the backend during revalidation. */
private final ConditionalRequestBuilder<HttpRequest> conditionalRequestBuilder;
99
/**
 * Creates a caching executor backed by the given cache, using a default
 * {@link ConditionalRequestBuilder} based on {@code RequestCopier.INSTANCE}.
 *
 * @param cache            the response cache; must not be {@code null}
 * @param cacheRevalidator background revalidator, may be {@code null}
 * @param config           cache configuration handed to the base class
 */
AsyncCachingExec(final HttpAsyncCache cache, final DefaultAsyncCacheRevalidator cacheRevalidator, final CacheConfig config) {
    super(config);
    // Validate first, then assign; Args.notNull throws when the cache is missing.
    Args.notNull(cache, "Response cache");
    this.responseCache = cache;
    this.conditionalRequestBuilder = new ConditionalRequestBuilder<>(RequestCopier.INSTANCE);
    this.cacheRevalidator = cacheRevalidator;
}
106
/**
 * Creates a caching executor with every collaborator supplied explicitly.
 * The policy and compliance objects are forwarded to the base class; the cache,
 * the revalidator and the conditional request builder are kept by this class.
 * No argument validation is performed here.
 */
AsyncCachingExec(
        final HttpAsyncCache responseCache,
        final CacheValidityPolicy validityPolicy,
        final ResponseCachingPolicy responseCachingPolicy,
        final CachedHttpResponseGenerator responseGenerator,
        final CacheableRequestPolicy cacheableRequestPolicy,
        final CachedResponseSuitabilityChecker suitabilityChecker,
        final ResponseProtocolCompliance responseCompliance,
        final RequestProtocolCompliance requestCompliance,
        final DefaultAsyncCacheRevalidator cacheRevalidator,
        final ConditionalRequestBuilder<HttpRequest> conditionalRequestBuilder,
        final CacheConfig config) {
    super(
            validityPolicy,
            responseCachingPolicy,
            responseGenerator,
            cacheableRequestPolicy,
            suitabilityChecker,
            responseCompliance,
            requestCompliance,
            config);
    this.conditionalRequestBuilder = conditionalRequestBuilder;
    this.cacheRevalidator = cacheRevalidator;
    this.responseCache = responseCache;
}
125
/**
 * Creates a caching executor around the given cache. A
 * {@link DefaultAsyncCacheRevalidator} is created only when an executor service
 * is supplied; otherwise the revalidator passed on is {@code null}.
 *
 * @param cache              the response cache
 * @param executorService    executor for background revalidation, may be {@code null}
 * @param schedulingStrategy strategy handed to the revalidator when one is created
 * @param config             cache configuration
 */
AsyncCachingExec(
        final HttpAsyncCache cache,
        final ScheduledExecutorService executorService,
        final SchedulingStrategy schedulingStrategy,
        final CacheConfig config) {
    this(cache,
            executorService != null ? new DefaultAsyncCacheRevalidator(executorService, schedulingStrategy) : null,
            config);
}
135
/**
 * Creates a caching executor that wraps the given resource factory and storage
 * in a new {@link BasicHttpAsyncCache} and delegates to the
 * cache / executor / strategy / config constructor.
 *
 * @param resourceFactory    factory passed to the {@link BasicHttpAsyncCache}
 * @param storage            storage backend passed to the {@link BasicHttpAsyncCache}
 * @param executorService    executor for background revalidation, may be {@code null}
 * @param schedulingStrategy revalidation scheduling strategy
 * @param config             cache configuration
 */
AsyncCachingExec(
        final ResourceFactory resourceFactory,
        final HttpAsyncCacheStorage storage,
        final ScheduledExecutorService executorService,
        final SchedulingStrategy schedulingStrategy,
        final CacheConfig config) {
    this(new BasicHttpAsyncCache(resourceFactory, storage), executorService, schedulingStrategy, config);
}
144
145 private void triggerResponse(
146 final SimpleHttpResponse cacheResponse,
147 final AsyncExecChain.Scope scope,
148 final AsyncExecCallback asyncExecCallback) {
149 scope.clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, cacheResponse);
150 scope.execRuntime.releaseEndpoint();
151
152 final SimpleBody body = cacheResponse.getBody();
153 final byte[] content = body != null ? body.getBodyBytes() : null;
154 final ContentType contentType = body != null ? body.getContentType() : null;
155 try {
156 final AsyncDataConsumer dataConsumer = asyncExecCallback.handleResponse(
157 cacheResponse,
158 content != null ? new BasicEntityDetails(content.length, contentType) : null);
159 if (dataConsumer != null) {
160 dataConsumer.consume(ByteBuffer.wrap(content));
161 dataConsumer.streamEnd(null);
162 }
163 asyncExecCallback.completed();
164 } catch (final HttpException | IOException ex) {
165 asyncExecCallback.failed(ex);
166 }
167 }
168
169 static class AsyncExecCallbackWrapper implements AsyncExecCallback {
170
171 private final AsyncExecCallback asyncExecCallback;
172 private final Runnable command;
173
174 AsyncExecCallbackWrapper(final AsyncExecCallback asyncExecCallback, final Runnable command) {
175 this.asyncExecCallback = asyncExecCallback;
176 this.command = command;
177 }
178
179 @Override
180 public AsyncDataConsumer handleResponse(
181 final HttpResponse response,
182 final EntityDetails entityDetails) throws HttpException, IOException {
183 return null;
184 }
185
186 @Override
187 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
188 }
189
190 @Override
191 public void completed() {
192 command.run();
193 }
194
195 @Override
196 public void failed(final Exception cause) {
197 asyncExecCallback.failed(cause);
198 }
199
200 }
201
202 @Override
203 public void execute(
204 final HttpRequest request,
205 final AsyncEntityProducer entityProducer,
206 final AsyncExecChain.Scope scope,
207 final AsyncExecChain chain,
208 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
209 Args.notNull(request, "HTTP request");
210 Args.notNull(scope, "Scope");
211
212 final HttpRoute route = scope.route;
213 final CancellableDependency operation = scope.cancellableDependency;
214 final HttpClientContext context = scope.clientContext;
215 context.setAttribute(HttpClientContext.HTTP_ROUTE, route);
216 context.setAttribute(HttpClientContext.HTTP_REQUEST, request);
217
218 final URIAuthority authority = request.getAuthority();
219 final String scheme = request.getScheme();
220 final HttpHost target = authority != null ? new HttpHost(scheme, authority) : route.getTargetHost();
221 final String via = generateViaHeader(request);
222
223
224 setResponseStatus(context, CacheResponseStatus.CACHE_MISS);
225
226 if (clientRequestsOurOptions(request)) {
227 setResponseStatus(context, CacheResponseStatus.CACHE_MODULE_RESPONSE);
228 triggerResponse(SimpleHttpResponse.create(HttpStatus.SC_NOT_IMPLEMENTED), scope, asyncExecCallback);
229 return;
230 }
231
232 final SimpleHttpResponse fatalErrorResponse = getFatallyNoncompliantResponse(request, context);
233 if (fatalErrorResponse != null) {
234 triggerResponse(fatalErrorResponse, scope, asyncExecCallback);
235 return;
236 }
237
238 requestCompliance.makeRequestCompliant(request);
239 request.addHeader("Via",via);
240
241 if (!cacheableRequestPolicy.isServableFromCache(request)) {
242 LOG.debug("Request is not servable from cache");
243 operation.setDependency(responseCache.flushCacheEntriesInvalidatedByRequest(target, request, new FutureCallback<Boolean>() {
244
245 @Override
246 public void completed(final Boolean result) {
247 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
248 }
249
250 @Override
251 public void failed(final Exception cause) {
252 asyncExecCallback.failed(cause);
253 }
254
255 @Override
256 public void cancelled() {
257 asyncExecCallback.failed(new InterruptedIOException());
258 }
259
260 }));
261 } else {
262 operation.setDependency(responseCache.getCacheEntry(target, request, new FutureCallback<HttpCacheEntry>() {
263
264 @Override
265 public void completed(final HttpCacheEntry entry) {
266 if (entry == null) {
267 LOG.debug("Cache miss");
268 handleCacheMiss(target, request, entityProducer, scope, chain, asyncExecCallback);
269 } else {
270 handleCacheHit(target, request, entityProducer, scope, chain, asyncExecCallback, entry);
271 }
272 }
273
274 @Override
275 public void failed(final Exception cause) {
276 asyncExecCallback.failed(cause);
277 }
278
279 @Override
280 public void cancelled() {
281 asyncExecCallback.failed(new InterruptedIOException());
282 }
283
284 }));
285
286 }
287 }
288
289 void chainProceed(
290 final HttpRequest request,
291 final AsyncEntityProducer entityProducer,
292 final AsyncExecChain.Scope scope,
293 final AsyncExecChain chain,
294 final AsyncExecCallback asyncExecCallback) {
295 try {
296 chain.proceed(request, entityProducer, scope, asyncExecCallback);
297 } catch (final HttpException | IOException ex) {
298 asyncExecCallback.failed(ex);
299 }
300 }
301
302 void callBackend(
303 final HttpHost target,
304 final HttpRequest request,
305 final AsyncEntityProducer entityProducer,
306 final AsyncExecChain.Scope scope,
307 final AsyncExecChain chain,
308 final AsyncExecCallback asyncExecCallback) {
309 LOG.debug("Calling the backend");
310 final Date requestDate = getCurrentDate();
311 final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
312 chainProceed(request, entityProducer, scope, chain, new AsyncExecCallback() {
313
314 @Override
315 public AsyncDataConsumer handleResponse(
316 final HttpResponse backendResponse,
317 final EntityDetails entityDetails) throws HttpException, IOException {
318 final Date responseDate = getCurrentDate();
319 backendResponse.addHeader("Via", generateViaHeader(backendResponse));
320
321 final AsyncExecCallback callback = new BackendResponseHandler(target, request, requestDate, responseDate, scope, asyncExecCallback);
322 callbackRef.set(callback);
323 return callback.handleResponse(backendResponse, entityDetails);
324 }
325
326 @Override
327 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
328 final AsyncExecCallback callback = callbackRef.getAndSet(null);
329 if (callback != null) {
330 callback.handleInformationResponse(response);
331 } else {
332 asyncExecCallback.handleInformationResponse(response);
333 }
334 }
335
336 @Override
337 public void completed() {
338 final AsyncExecCallback callback = callbackRef.getAndSet(null);
339 if (callback != null) {
340 callback.completed();
341 } else {
342 asyncExecCallback.completed();
343 }
344 }
345
346 @Override
347 public void failed(final Exception cause) {
348 final AsyncExecCallback callback = callbackRef.getAndSet(null);
349 if (callback != null) {
350 callback.failed(cause);
351 } else {
352 asyncExecCallback.failed(cause);
353 }
354 }
355
356 });
357 }
358
359 class CachingAsyncDataConsumer implements AsyncDataConsumer {
360
361 private final AsyncExecCallback fallback;
362 private final HttpResponse backendResponse;
363 private final EntityDetails entityDetails;
364 private final AtomicBoolean writtenThrough;
365 private final AtomicReference<ByteArrayBuffer> bufferRef;
366 private final AtomicReference<AsyncDataConsumer> dataConsumerRef;
367
368 CachingAsyncDataConsumer(
369 final AsyncExecCallback fallback,
370 final HttpResponse backendResponse,
371 final EntityDetails entityDetails) {
372 this.fallback = fallback;
373 this.backendResponse = backendResponse;
374 this.entityDetails = entityDetails;
375 this.writtenThrough = new AtomicBoolean(false);
376 this.bufferRef = new AtomicReference<>(entityDetails != null ? new ByteArrayBuffer(1024) : null);
377 this.dataConsumerRef = new AtomicReference<>();
378 }
379
380 @Override
381 public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
382 final AsyncDataConsumer dataConsumer = dataConsumerRef.get();
383 if (dataConsumer != null) {
384 dataConsumer.updateCapacity(capacityChannel);
385 } else {
386 capacityChannel.update(Integer.MAX_VALUE);
387 }
388 }
389
390 @Override
391 public final void consume(final ByteBuffer src) throws IOException {
392 final ByteArrayBuffer buffer = bufferRef.get();
393 if (buffer != null) {
394 if (src.hasArray()) {
395 buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining());
396 } else {
397 while (src.hasRemaining()) {
398 buffer.append(src.get());
399 }
400 }
401 if (buffer.length() > cacheConfig.getMaxObjectSize()) {
402 LOG.debug("Backend response content length exceeds maximum");
403
404
405 bufferRef.set(null);
406 try {
407 final AsyncDataConsumer dataConsumer = fallback.handleResponse(backendResponse, entityDetails);
408 if (dataConsumer != null) {
409 dataConsumerRef.set(dataConsumer);
410 writtenThrough.set(true);
411 dataConsumer.consume(ByteBuffer.wrap(buffer.array(), 0, buffer.length()));
412 }
413 } catch (final HttpException ex) {
414 fallback.failed(ex);
415 }
416 }
417 } else {
418 final AsyncDataConsumer dataConsumer = dataConsumerRef.get();
419 if (dataConsumer != null) {
420 dataConsumer.consume(src);
421 }
422 }
423 }
424
425 @Override
426 public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
427 final AsyncDataConsumer dataConsumer = dataConsumerRef.getAndSet(null);
428 if (dataConsumer != null) {
429 dataConsumer.streamEnd(trailers);
430 }
431 }
432
433 @Override
434 public void releaseResources() {
435 final AsyncDataConsumer dataConsumer = dataConsumerRef.getAndSet(null);
436 if (dataConsumer != null) {
437 dataConsumer.releaseResources();
438 }
439 }
440
441 }
442
/**
 * Callback applied to a response coming back from the backend.
 * <p>
 * On the response head it enforces protocol compliance, flushes cache entries
 * invalidated by the exchange and decides whether the response is cacheable.
 * Cacheable responses are buffered by a {@link CachingAsyncDataConsumer} and
 * turned into a cache entry on completion; everything else is passed straight
 * to the wrapped callback.
 */
class BackendResponseHandler implements AsyncExecCallback {

    private final HttpHost target;
    private final HttpRequest request;
    private final Date requestDate;
    private final Date responseDate;
    private final AsyncExecChain.Scope scope;
    private final AsyncExecCallback asyncExecCallback;
    // Set only when the response is being buffered for caching; taken (and cleared) in completed().
    private final AtomicReference<CachingAsyncDataConsumer> cachingConsumerRef;

    BackendResponseHandler(
            final HttpHost target,
            final HttpRequest request,
            final Date requestDate,
            final Date responseDate,
            final AsyncExecChain.Scope scope,
            final AsyncExecCallback asyncExecCallback) {
        this.target = target;
        this.request = request;
        this.requestDate = requestDate;
        this.responseDate = responseDate;
        this.scope = scope;
        this.asyncExecCallback = asyncExecCallback;
        this.cachingConsumerRef = new AtomicReference<>();
    }

    /**
     * Processes the response head. Returns a buffering consumer when the response
     * is cacheable; otherwise flushes the entries for this request and returns
     * whatever consumer the wrapped callback supplies.
     */
    @Override
    public AsyncDataConsumer handleResponse(
            final HttpResponse backendResponse,
            final EntityDetails entityDetails) throws HttpException, IOException {
        responseCompliance.ensureProtocolCompliance(scope.originalRequest, request, backendResponse);
        // The flush outcome is not waited for: a failure is only logged.
        responseCache.flushCacheEntriesInvalidatedByExchange(target, request, backendResponse, new FutureCallback<Boolean>() {

            @Override
            public void completed(final Boolean result) {
            }

            @Override
            public void failed(final Exception ex) {
                LOG.warn("Unable to flush invalidated entries from cache", ex);
            }

            @Override
            public void cancelled() {
            }

        });
        final boolean cacheable = responseCachingPolicy.isResponseCacheable(request, backendResponse);
        if (cacheable) {
            // The wrapped callback serves as the fallback if the content outgrows the cache limit.
            cachingConsumerRef.set(new CachingAsyncDataConsumer(asyncExecCallback, backendResponse, entityDetails));
            storeRequestIfModifiedSinceFor304Response(request, backendResponse);
        } else {
            LOG.debug("Backend response is not cacheable");
            // As above, the flush outcome is not waited for.
            responseCache.flushCacheEntriesFor(target, request, new FutureCallback<Boolean>() {

                @Override
                public void completed(final Boolean result) {
                }

                @Override
                public void failed(final Exception ex) {
                    LOG.warn("Unable to flush invalidated entries from cache", ex);
                }

                @Override
                public void cancelled() {
                }

            });
        }
        final CachingAsyncDataConsumer cachingDataConsumer = cachingConsumerRef.get();
        if (cachingDataConsumer != null) {
            LOG.debug("Caching backend response");
            return cachingDataConsumer;
        }
        return asyncExecCallback.handleResponse(backendResponse, entityDetails);
    }

    /** Informational responses are forwarded unchanged to the wrapped callback. */
    @Override
    public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
        asyncExecCallback.handleInformationResponse(response);
    }

    /**
     * Stores the buffered response as a new cache entry and, once stored, answers
     * the caller with a response generated from that entry. The cache operation is
     * registered as a dependency of the scope's cancellable; cancellation is
     * reported as an {@link InterruptedIOException}.
     */
    void triggerNewCacheEntryResponse(final HttpResponse backendResponse, final Date responseDate, final ByteArrayBuffer buffer) {
        final CancellableDependency operation = scope.cancellableDependency;
        operation.setDependency(responseCache.createCacheEntry(
                target,
                request,
                backendResponse,
                buffer,
                requestDate,
                responseDate,
                new FutureCallback<HttpCacheEntry>() {

                    @Override
                    public void completed(final HttpCacheEntry newEntry) {
                        LOG.debug("Backend response successfully cached");
                        try {
                            final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, newEntry);
                            triggerResponse(cacheResponse, scope, asyncExecCallback);
                        } catch (final ResourceIOException ex) {
                            asyncExecCallback.failed(ex);
                        }
                    }

                    @Override
                    public void failed(final Exception ex) {
                        asyncExecCallback.failed(ex);
                    }

                    @Override
                    public void cancelled() {
                        asyncExecCallback.failed(new InterruptedIOException());
                    }

                }));

    }

    /**
     * Finishes the exchange. A response that was fully buffered (not written
     * through) is stored in the cache and answered from it; when the freshness
     * check is enabled, an existing entry with a later {@code Date} is served
     * instead. In every other case the wrapped callback is simply completed.
     */
    @Override
    public void completed() {
        final CachingAsyncDataConsumer cachingDataConsumer = cachingConsumerRef.getAndSet(null);
        if (cachingDataConsumer != null && !cachingDataConsumer.writtenThrough.get()) {
            // Take ownership of the buffered content; may be null if nothing was buffered.
            final ByteArrayBuffer buffer = cachingDataConsumer.bufferRef.getAndSet(null);
            final HttpResponse backendResponse = cachingDataConsumer.backendResponse;
            if (cacheConfig.isFreshnessCheckEnabled()) {
                final CancellableDependency operation = scope.cancellableDependency;
                operation.setDependency(responseCache.getCacheEntry(target, request, new FutureCallback<HttpCacheEntry>() {

                    @Override
                    public void completed(final HttpCacheEntry existingEntry) {
                        // NOTE(review): existingEntry is presumably null when nothing is cached yet;
                        // this relies on DateUtils.isAfter tolerating a null first argument - confirm.
                        if (DateUtils.isAfter(existingEntry, backendResponse, HttpHeaders.DATE)) {
                            LOG.debug("Backend already contains fresher cache entry");
                            try {
                                final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, existingEntry);
                                triggerResponse(cacheResponse, scope, asyncExecCallback);
                            } catch (final ResourceIOException ex) {
                                asyncExecCallback.failed(ex);
                            }
                        } else {
                            triggerNewCacheEntryResponse(backendResponse, responseDate, buffer);
                        }
                    }

                    @Override
                    public void failed(final Exception cause) {
                        asyncExecCallback.failed(cause);
                    }

                    @Override
                    public void cancelled() {
                        asyncExecCallback.failed(new InterruptedIOException());
                    }

                }));
            } else {
                triggerNewCacheEntryResponse(backendResponse, responseDate, buffer);
            }
        } else {
            // Not cached, or already written through to the wrapped callback.
            asyncExecCallback.completed();
        }
    }

    /** Failures are forwarded unchanged to the wrapped callback. */
    @Override
    public void failed(final Exception cause) {
        asyncExecCallback.failed(cause);
    }

}
612
613 private void handleCacheHit(
614 final HttpHost target,
615 final HttpRequest request,
616 final AsyncEntityProducer entityProducer,
617 final AsyncExecChain.Scope scope,
618 final AsyncExecChain chain,
619 final AsyncExecCallback asyncExecCallback,
620 final HttpCacheEntry entry) {
621 final HttpClientContext context = scope.clientContext;
622 recordCacheHit(target, request);
623 final Date now = getCurrentDate();
624 if (suitabilityChecker.canCachedResponseBeUsed(target, request, entry, now)) {
625 LOG.debug("Cache hit");
626 try {
627 final SimpleHttpResponse cacheResponse = generateCachedResponse(request, context, entry, now);
628 triggerResponse(cacheResponse, scope, asyncExecCallback);
629 } catch (final ResourceIOException ex) {
630 recordCacheFailure(target, request);
631 if (!mayCallBackend(request)) {
632 final SimpleHttpResponse cacheResponse = generateGatewayTimeout(context);
633 triggerResponse(cacheResponse, scope, asyncExecCallback);
634 } else {
635 setResponseStatus(scope.clientContext, CacheResponseStatus.FAILURE);
636 try {
637 chain.proceed(request, entityProducer, scope, asyncExecCallback);
638 } catch (final HttpException | IOException ex2) {
639 asyncExecCallback.failed(ex2);
640 }
641 }
642 }
643 } else if (!mayCallBackend(request)) {
644 LOG.debug("Cache entry not suitable but only-if-cached requested");
645 final SimpleHttpResponse cacheResponse = generateGatewayTimeout(context);
646 triggerResponse(cacheResponse, scope, asyncExecCallback);
647 } else if (!(entry.getStatus() == HttpStatus.SC_NOT_MODIFIED && !suitabilityChecker.isConditional(request))) {
648 LOG.debug("Revalidating cache entry");
649 if (cacheRevalidator != null
650 && !staleResponseNotAllowed(request, entry, now)
651 && validityPolicy.mayReturnStaleWhileRevalidating(entry, now)) {
652 LOG.debug("Serving stale with asynchronous revalidation");
653 try {
654 final SimpleHttpResponse cacheResponse = generateCachedResponse(request, context, entry, now);
655 final String exchangeId = ExecSupport.getNextExchangeId();
656 final AsyncExecChain.Scope fork = new AsyncExecChain.Scope(
657 exchangeId,
658 scope.route,
659 scope.originalRequest,
660 new ComplexFuture<>(null),
661 HttpClientContext.create(),
662 scope.execRuntime.fork());
663 cacheRevalidator.revalidateCacheEntry(
664 responseCache.generateKey(target, request, entry),
665 asyncExecCallback,
666 new DefaultAsyncCacheRevalidator.RevalidationCall() {
667
668 @Override
669 public void execute(final AsyncExecCallback asyncExecCallback) {
670 revalidateCacheEntry(target, request, entityProducer, fork, chain, asyncExecCallback, entry);
671 }
672
673 });
674 triggerResponse(cacheResponse, scope, asyncExecCallback);
675 } catch (final ResourceIOException ex) {
676 asyncExecCallback.failed(ex);
677 }
678 } else {
679 revalidateCacheEntry(target, request, entityProducer, scope, chain, asyncExecCallback, entry);
680 }
681 } else {
682 LOG.debug("Cache entry not usable; calling backend");
683 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
684 }
685 }
686
/**
 * Revalidates a cache entry against the backend.
 * <p>
 * A conditional request is built from {@code scope.originalRequest} and the
 * cache entry and sent down the chain. The callback attached to it chooses a
 * delegate when the response head arrives and parks it in {@code callbackRef};
 * later notifications take the delegate out of the reference (clearing it) and
 * forward to it, or fall back to {@code asyncExecCallback} when none is parked.
 *
 * @param target            target host of the request
 * @param request           the request being served
 * @param entityProducer    request entity producer, may be {@code null}
 * @param scope             execution scope
 * @param chain             next element of the execution chain
 * @param asyncExecCallback callback receiving the final outcome
 * @param cacheEntry        the entry to revalidate
 */
void revalidateCacheEntry(
        final HttpHost target,
        final HttpRequest request,
        final AsyncEntityProducer entityProducer,
        final AsyncExecChain.Scope scope,
        final AsyncExecChain chain,
        final AsyncExecCallback asyncExecCallback,
        final HttpCacheEntry cacheEntry) {
    final Date requestDate = getCurrentDate();
    final HttpRequest conditionalRequest = conditionalRequestBuilder.buildConditionalRequest(scope.originalRequest, cacheEntry);
    chainProceed(conditionalRequest, entityProducer, scope, chain, new AsyncExecCallback() {

        // Delegate chosen in handleResponse; shared with the callback of the unconditional retry below.
        final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();

        /**
         * Updates the cache entry from the backend response, then replies with a
         * 304 if the request is conditional and all its conditionals match the
         * updated entry, or with a full response generated from it otherwise.
         */
        void triggerUpdatedCacheEntryResponse(final HttpResponse backendResponse, final Date responseDate) {
            final CancellableDependency operation = scope.cancellableDependency;
            recordCacheUpdate(scope.clientContext);
            operation.setDependency(responseCache.updateCacheEntry(
                    target,
                    request,
                    cacheEntry,
                    backendResponse,
                    requestDate,
                    responseDate,
                    new FutureCallback<HttpCacheEntry>() {

                        @Override
                        public void completed(final HttpCacheEntry updatedEntry) {
                            if (suitabilityChecker.isConditional(request)
                                    && suitabilityChecker.allConditionalsMatch(request, updatedEntry, new Date())) {
                                final SimpleHttpResponse cacheResponse = responseGenerator.generateNotModifiedResponse(updatedEntry);
                                triggerResponse(cacheResponse, scope, asyncExecCallback);
                            } else {
                                try {
                                    final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, updatedEntry);
                                    triggerResponse(cacheResponse, scope, asyncExecCallback);
                                } catch (final ResourceIOException ex) {
                                    asyncExecCallback.failed(ex);
                                }
                            }
                        }

                        @Override
                        public void failed(final Exception ex) {
                            asyncExecCallback.failed(ex);
                        }

                        @Override
                        public void cancelled() {
                            asyncExecCallback.failed(new InterruptedIOException());
                        }

                    }));
        }

        /** Replies with the existing (stale) entry, tagged with a 110 "Response is stale" warning. */
        void triggerResponseStaleCacheEntry() {
            try {
                final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, cacheEntry);
                cacheResponse.addHeader(HeaderConstants.WARNING, "110 localhost \"Response is stale\"");
                triggerResponse(cacheResponse, scope, asyncExecCallback);
            } catch (final ResourceIOException ex) {
                asyncExecCallback.failed(ex);
            }
        }

        /**
         * Chooses the delegate for a backend response: a wrapper that updates the
         * cache entry on completion for a 304, a wrapper that serves the stale
         * entry when stale-if-error applies, or a {@link BackendResponseHandler}
         * for everything else. Both wrappers ignore the response content.
         */
        AsyncExecCallback evaluateResponse(final HttpResponse backendResponse, final Date responseDate) {
            backendResponse.addHeader(HeaderConstants.VIA, generateViaHeader(backendResponse));

            final int statusCode = backendResponse.getCode();
            if (statusCode == HttpStatus.SC_NOT_MODIFIED || statusCode == HttpStatus.SC_OK) {
                recordCacheUpdate(scope.clientContext);
            }
            if (statusCode == HttpStatus.SC_NOT_MODIFIED) {
                return new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {

                    @Override
                    public void run() {
                        triggerUpdatedCacheEntryResponse(backendResponse, responseDate);
                    }

                });
            }
            if (staleIfErrorAppliesTo(statusCode)
                    && !staleResponseNotAllowed(request, cacheEntry, getCurrentDate())
                    && validityPolicy.mayReturnStaleIfError(request, cacheEntry, responseDate)) {
                return new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {

                    @Override
                    public void run() {
                        triggerResponseStaleCacheEntry();
                    }

                });
            }
            // Note: the handler is keyed on the conditional request, not the caller's request.
            return new BackendResponseHandler(target, conditionalRequest, requestDate, responseDate, scope, asyncExecCallback);
        }

        /**
         * Handles the head of the revalidation response. When
         * {@code revalidationResponseIsTooOld} reports true and the request entity
         * can be re-sent, this response is discarded and an unconditional request
         * is issued once it completes; otherwise the response is evaluated directly.
         */
        @Override
        public AsyncDataConsumer handleResponse(
                final HttpResponse backendResponse1,
                final EntityDetails entityDetails) throws HttpException, IOException {

            final Date responseDate1 = getCurrentDate();

            final AsyncExecCallback callback1;
            if (revalidationResponseIsTooOld(backendResponse1, cacheEntry)
                    && (entityProducer == null || entityProducer.isRepeatable())) {

                final HttpRequest unconditional = conditionalRequestBuilder.buildUnconditionalRequest(
                        scope.originalRequest);

                // The wrapper drops this response; its command runs on completion and retries unconditionally.
                callback1 = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {

                    @Override
                    public void run() {
                        chainProceed(unconditional, entityProducer, scope, chain, new AsyncExecCallback() {

                            @Override
                            public AsyncDataConsumer handleResponse(
                                    final HttpResponse backendResponse2,
                                    final EntityDetails entityDetails) throws HttpException, IOException {
                                final Date responseDate2 = getCurrentDate();
                                final AsyncExecCallback callback2 = evaluateResponse(backendResponse2, responseDate2);
                                callbackRef.set(callback2);
                                return callback2.handleResponse(backendResponse2, entityDetails);
                            }

                            @Override
                            public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
                                final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
                                if (callback2 != null) {
                                    callback2.handleInformationResponse(response);
                                } else {
                                    asyncExecCallback.handleInformationResponse(response);
                                }
                            }

                            @Override
                            public void completed() {
                                final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
                                if (callback2 != null) {
                                    callback2.completed();
                                } else {
                                    asyncExecCallback.completed();
                                }
                            }

                            @Override
                            public void failed(final Exception cause) {
                                final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
                                if (callback2 != null) {
                                    callback2.failed(cause);
                                } else {
                                    asyncExecCallback.failed(cause);
                                }
                            }

                        });

                    }

                });
            } else {
                callback1 = evaluateResponse(backendResponse1, responseDate1);
            }
            callbackRef.set(callback1);
            return callback1.handleResponse(backendResponse1, entityDetails);
        }

        /** Forwards to the parked delegate (clearing it), or to the caller's callback when none is parked. */
        @Override
        public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
            final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
            if (callback1 != null) {
                callback1.handleInformationResponse(response);
            } else {
                asyncExecCallback.handleInformationResponse(response);
            }
        }

        /** Completes the parked delegate (clearing it), or the caller's callback when none is parked. */
        @Override
        public void completed() {
            final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
            if (callback1 != null) {
                callback1.completed();
            } else {
                asyncExecCallback.completed();
            }
        }

        /** Fails the parked delegate (clearing it), or the caller's callback when none is parked. */
        @Override
        public void failed(final Exception cause) {
            final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
            if (callback1 != null) {
                callback1.failed(cause);
            } else {
                asyncExecCallback.failed(cause);
            }
        }

    });

}
889
890 private void handleCacheMiss(
891 final HttpHost target,
892 final HttpRequest request,
893 final AsyncEntityProducer entityProducer,
894 final AsyncExecChain.Scope scope,
895 final AsyncExecChain chain,
896 final AsyncExecCallback asyncExecCallback) {
897 recordCacheMiss(target, request);
898
899 if (mayCallBackend(request)) {
900 final CancellableDependency operation = scope.cancellableDependency;
901 operation.setDependency(responseCache.getVariantCacheEntriesWithEtags(
902 target,
903 request,
904 new FutureCallback<Map<String, Variant>>() {
905
906 @Override
907 public void completed(final Map<String, Variant> variants) {
908 if (variants != null && !variants.isEmpty() && (entityProducer == null || entityProducer.isRepeatable())) {
909 negotiateResponseFromVariants(target, request, entityProducer, scope, chain, asyncExecCallback, variants);
910 } else {
911 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
912 }
913 }
914
915 @Override
916 public void failed(final Exception ex) {
917 asyncExecCallback.failed(ex);
918 }
919
920 @Override
921 public void cancelled() {
922 asyncExecCallback.failed(new InterruptedIOException());
923 }
924
925 }));
926 } else {
927 final SimpleHttpResponse cacheResponse = SimpleHttpResponse.create(HttpStatus.SC_GATEWAY_TIMEOUT, "Gateway Timeout");
928 triggerResponse(cacheResponse, scope, asyncExecCallback);
929 }
930 }
931
932 void negotiateResponseFromVariants(
933 final HttpHost target,
934 final HttpRequest request,
935 final AsyncEntityProducer entityProducer,
936 final AsyncExecChain.Scope scope,
937 final AsyncExecChain chain,
938 final AsyncExecCallback asyncExecCallback,
939 final Map<String, Variant> variants) {
940 final CancellableDependency operation = scope.cancellableDependency;
941 final HttpRequest conditionalRequest = conditionalRequestBuilder.buildConditionalRequestFromVariants(request, variants);
942
943 final Date requestDate = getCurrentDate();
944 chainProceed(conditionalRequest, entityProducer, scope, chain, new AsyncExecCallback() {
945
946 final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
947
948 void updateVariantCacheEntry(final HttpResponse backendResponse, final Date responseDate, final Variant matchingVariant) {
949 recordCacheUpdate(scope.clientContext);
950 operation.setDependency(responseCache.updateVariantCacheEntry(
951 target,
952 conditionalRequest,
953 backendResponse,
954 matchingVariant,
955 requestDate,
956 responseDate,
957 new FutureCallback<HttpCacheEntry>() {
958
959 @Override
960 public void completed(final HttpCacheEntry responseEntry) {
961 if (shouldSendNotModifiedResponse(request, responseEntry)) {
962 final SimpleHttpResponse cacheResponse = responseGenerator.generateNotModifiedResponse(responseEntry);
963 triggerResponse(cacheResponse, scope, asyncExecCallback);
964 } else {
965 try {
966 final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, responseEntry);
967 operation.setDependency(responseCache.reuseVariantEntryFor(
968 target,
969 request,
970 matchingVariant,
971 new FutureCallback<Boolean>() {
972
973 @Override
974 public void completed(final Boolean result) {
975 triggerResponse(cacheResponse, scope, asyncExecCallback);
976 }
977
978 @Override
979 public void failed(final Exception ex) {
980 asyncExecCallback.failed(ex);
981 }
982
983 @Override
984 public void cancelled() {
985 asyncExecCallback.failed(new InterruptedIOException());
986 }
987
988 }));
989 } catch (final ResourceIOException ex) {
990 asyncExecCallback.failed(ex);
991 }
992 }
993 }
994
995 @Override
996 public void failed(final Exception ex) {
997 asyncExecCallback.failed(ex);
998 }
999
1000 @Override
1001 public void cancelled() {
1002 asyncExecCallback.failed(new InterruptedIOException());
1003 }
1004
1005 }));
1006 }
1007
1008 @Override
1009 public AsyncDataConsumer handleResponse(
1010 final HttpResponse backendResponse,
1011 final EntityDetails entityDetails) throws HttpException, IOException {
1012 final Date responseDate = getCurrentDate();
1013 backendResponse.addHeader("Via", generateViaHeader(backendResponse));
1014
1015 final AsyncExecCallback callback;
1016
1017 if (backendResponse.getCode() != HttpStatus.SC_NOT_MODIFIED) {
1018 callback = new BackendResponseHandler(target, request, requestDate, responseDate, scope, asyncExecCallback);
1019 } else {
1020 final Header resultEtagHeader = backendResponse.getFirstHeader(HeaderConstants.ETAG);
1021 if (resultEtagHeader == null) {
1022 LOG.warn("304 response did not contain ETag");
1023 callback = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
1024
1025 @Override
1026 public void run() {
1027 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
1028 }
1029
1030 });
1031 } else {
1032 final String resultEtag = resultEtagHeader.getValue();
1033 final Variant matchingVariant = variants.get(resultEtag);
1034 if (matchingVariant == null) {
1035 LOG.debug("304 response did not contain ETag matching one sent in If-None-Match");
1036 callback = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
1037
1038 @Override
1039 public void run() {
1040 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
1041 }
1042
1043 });
1044 } else {
1045 if (revalidationResponseIsTooOld(backendResponse, matchingVariant.getEntry())) {
1046 final HttpRequest unconditional = conditionalRequestBuilder.buildUnconditionalRequest(request);
1047 scope.clientContext.setAttribute(HttpCoreContext.HTTP_REQUEST, unconditional);
1048 callback = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
1049
1050 @Override
1051 public void run() {
1052 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
1053 }
1054
1055 });
1056 } else {
1057 callback = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
1058
1059 @Override
1060 public void run() {
1061 updateVariantCacheEntry(backendResponse, responseDate, matchingVariant);
1062 }
1063
1064 });
1065 }
1066 }
1067 }
1068 }
1069 callbackRef.set(callback);
1070 return callback.handleResponse(backendResponse, entityDetails);
1071 }
1072
1073 @Override
1074 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
1075 final AsyncExecCallback callback = callbackRef.getAndSet(null);
1076 if (callback != null) {
1077 callback.handleInformationResponse(response);
1078 } else {
1079 asyncExecCallback.handleInformationResponse(response);
1080 }
1081 }
1082
1083 @Override
1084 public void completed() {
1085 final AsyncExecCallback callback = callbackRef.getAndSet(null);
1086 if (callback != null) {
1087 callback.completed();
1088 } else {
1089 asyncExecCallback.completed();
1090 }
1091 }
1092
1093 @Override
1094 public void failed(final Exception cause) {
1095 final AsyncExecCallback callback = callbackRef.getAndSet(null);
1096 if (callback != null) {
1097 callback.failed(cause);
1098 } else {
1099 asyncExecCallback.failed(cause);
1100 }
1101 }
1102
1103 });
1104
1105 }
1106
1107 }