1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.cache;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.time.Instant;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import org.apache.hc.client5.http.HttpRoute;
40 import org.apache.hc.client5.http.async.AsyncExecCallback;
41 import org.apache.hc.client5.http.async.AsyncExecChain;
42 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
43 import org.apache.hc.client5.http.async.methods.SimpleBody;
44 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
45 import org.apache.hc.client5.http.cache.CacheResponseStatus;
46 import org.apache.hc.client5.http.cache.HeaderConstants;
47 import org.apache.hc.client5.http.cache.HttpAsyncCacheStorage;
48 import org.apache.hc.client5.http.cache.HttpCacheEntry;
49 import org.apache.hc.client5.http.cache.ResourceFactory;
50 import org.apache.hc.client5.http.cache.ResourceIOException;
51 import org.apache.hc.client5.http.impl.ExecSupport;
52 import org.apache.hc.client5.http.protocol.HttpClientContext;
53 import org.apache.hc.client5.http.schedule.SchedulingStrategy;
54 import org.apache.hc.core5.annotation.Contract;
55 import org.apache.hc.core5.annotation.ThreadingBehavior;
56 import org.apache.hc.core5.concurrent.CancellableDependency;
57 import org.apache.hc.core5.concurrent.ComplexFuture;
58 import org.apache.hc.core5.concurrent.FutureCallback;
59 import org.apache.hc.core5.http.ContentType;
60 import org.apache.hc.core5.http.EntityDetails;
61 import org.apache.hc.core5.http.Header;
62 import org.apache.hc.core5.http.HttpException;
63 import org.apache.hc.core5.http.HttpHeaders;
64 import org.apache.hc.core5.http.HttpHost;
65 import org.apache.hc.core5.http.HttpRequest;
66 import org.apache.hc.core5.http.HttpResponse;
67 import org.apache.hc.core5.http.HttpStatus;
68 import org.apache.hc.core5.http.impl.BasicEntityDetails;
69 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
70 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
71 import org.apache.hc.core5.http.nio.CapacityChannel;
72 import org.apache.hc.core5.http.protocol.HttpCoreContext;
73 import org.apache.hc.core5.http.support.BasicRequestBuilder;
74 import org.apache.hc.core5.net.URIAuthority;
75 import org.apache.hc.core5.util.Args;
76 import org.apache.hc.core5.util.ByteArrayBuffer;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
79
80
81
82
83
84
85
86
87
88
89
90
91 @Contract(threading = ThreadingBehavior.SAFE)
92 class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler {
93
// Class-wide logger used for cache diagnostics.
private static final Logger LOG = LoggerFactory.getLogger(AsyncCachingExec.class);
// Asynchronous cache used for entry lookup, creation, update and invalidation.
private final HttpAsyncCache responseCache;
// Background revalidator; may be null, in which case stale entries are always revalidated in-line.
private final DefaultAsyncCacheRevalidator cacheRevalidator;
// Builds the conditional / unconditional requests sent to the backend when validating cache entries.
private final ConditionalRequestBuilder<HttpRequest> conditionalRequestBuilder;
98
/**
 * Creates a caching handler on top of the given cache, with a conditional
 * request builder that always works on a copy of the request it is given.
 *
 * @param cache the response cache; checked with {@code Args.notNull}
 * @param cacheRevalidator revalidator used for background revalidation; stored as given
 * @param config cache configuration passed on to the base class
 */
AsyncCachingExec(final HttpAsyncCache cache, final DefaultAsyncCacheRevalidator cacheRevalidator, final CacheConfig config) {
    super(config);
    this.responseCache = Args.notNull(cache, "Response cache");
    this.cacheRevalidator = cacheRevalidator;
    this.conditionalRequestBuilder = new ConditionalRequestBuilder<>(
            original -> BasicRequestBuilder.copy(original).build());
}
106
/**
 * Creates a caching handler with every collaborator supplied by the caller.
 * The policy and compliance objects are handed to the base class; the cache,
 * the revalidator and the conditional request builder are stored as given,
 * without any validation.
 */
AsyncCachingExec(
        final HttpAsyncCache responseCache,
        final CacheValidityPolicy validityPolicy,
        final ResponseCachingPolicy responseCachingPolicy,
        final CachedHttpResponseGenerator responseGenerator,
        final CacheableRequestPolicy cacheableRequestPolicy,
        final CachedResponseSuitabilityChecker suitabilityChecker,
        final ResponseProtocolCompliance responseCompliance,
        final RequestProtocolCompliance requestCompliance,
        final DefaultAsyncCacheRevalidator cacheRevalidator,
        final ConditionalRequestBuilder<HttpRequest> conditionalRequestBuilder,
        final CacheConfig config) {
    super(
            validityPolicy,
            responseCachingPolicy,
            responseGenerator,
            cacheableRequestPolicy,
            suitabilityChecker,
            responseCompliance,
            requestCompliance,
            config);
    this.conditionalRequestBuilder = conditionalRequestBuilder;
    this.cacheRevalidator = cacheRevalidator;
    this.responseCache = responseCache;
}
125
/**
 * Creates a caching handler whose background revalidator, if any, runs on the
 * given executor.
 *
 * @param cache the response cache
 * @param executorService executor for background revalidation; when {@code null}
 *                        no revalidator is created
 * @param schedulingStrategy strategy passed to the revalidator
 * @param config cache configuration
 */
AsyncCachingExec(
        final HttpAsyncCache cache,
        final ScheduledExecutorService executorService,
        final SchedulingStrategy schedulingStrategy,
        final CacheConfig config) {
    this(
            cache,
            executorService == null
                    ? null
                    : new DefaultAsyncCacheRevalidator(executorService, schedulingStrategy),
            config);
}
135
/**
 * Creates a caching handler backed by a new {@code BasicHttpAsyncCache} built
 * from the given resource factory and storage; all other arguments are passed
 * on unchanged to the executor-based constructor.
 */
AsyncCachingExec(
        final ResourceFactory resourceFactory,
        final HttpAsyncCacheStorage storage,
        final ScheduledExecutorService executorService,
        final SchedulingStrategy schedulingStrategy,
        final CacheConfig config) {
    this(new BasicHttpAsyncCache(resourceFactory, storage), executorService, schedulingStrategy, config);
}
144
145 private void triggerResponse(
146 final SimpleHttpResponse cacheResponse,
147 final AsyncExecChain.Scope scope,
148 final AsyncExecCallback asyncExecCallback) {
149 scope.clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, cacheResponse);
150 scope.execRuntime.releaseEndpoint();
151
152 final SimpleBody body = cacheResponse.getBody();
153 final byte[] content = body != null ? body.getBodyBytes() : null;
154 final ContentType contentType = body != null ? body.getContentType() : null;
155 try {
156 final AsyncDataConsumer dataConsumer = asyncExecCallback.handleResponse(
157 cacheResponse,
158 content != null ? new BasicEntityDetails(content.length, contentType) : null);
159 if (dataConsumer != null) {
160 if (content != null) {
161 dataConsumer.consume(ByteBuffer.wrap(content));
162 }
163 dataConsumer.streamEnd(null);
164 }
165 asyncExecCallback.completed();
166 } catch (final HttpException | IOException ex) {
167 asyncExecCallback.failed(ex);
168 }
169 }
170
171 static class AsyncExecCallbackWrapper implements AsyncExecCallback {
172
173 private final AsyncExecCallback asyncExecCallback;
174 private final Runnable command;
175
176 AsyncExecCallbackWrapper(final AsyncExecCallback asyncExecCallback, final Runnable command) {
177 this.asyncExecCallback = asyncExecCallback;
178 this.command = command;
179 }
180
181 @Override
182 public AsyncDataConsumer handleResponse(
183 final HttpResponse response,
184 final EntityDetails entityDetails) throws HttpException, IOException {
185 return null;
186 }
187
188 @Override
189 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
190 }
191
192 @Override
193 public void completed() {
194 command.run();
195 }
196
197 @Override
198 public void failed(final Exception cause) {
199 asyncExecCallback.failed(cause);
200 }
201
202 }
203
204 @Override
205 public void execute(
206 final HttpRequest request,
207 final AsyncEntityProducer entityProducer,
208 final AsyncExecChain.Scope scope,
209 final AsyncExecChain chain,
210 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
211 Args.notNull(request, "HTTP request");
212 Args.notNull(scope, "Scope");
213
214 final HttpRoute route = scope.route;
215 final CancellableDependency operation = scope.cancellableDependency;
216 final HttpClientContext context = scope.clientContext;
217 context.setAttribute(HttpClientContext.HTTP_ROUTE, route);
218 context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
219
220 final URIAuthority authority = request.getAuthority();
221 final String scheme = request.getScheme();
222 final HttpHost target = authority != null ? new HttpHost(scheme, authority) : route.getTargetHost();
223 final String via = generateViaHeader(request);
224
225
226 setResponseStatus(context, CacheResponseStatus.CACHE_MISS);
227
228 if (clientRequestsOurOptions(request)) {
229 setResponseStatus(context, CacheResponseStatus.CACHE_MODULE_RESPONSE);
230 triggerResponse(SimpleHttpResponse.create(HttpStatus.SC_NOT_IMPLEMENTED), scope, asyncExecCallback);
231 return;
232 }
233
234 final SimpleHttpResponse fatalErrorResponse = getFatallyNonCompliantResponse(request, context);
235 if (fatalErrorResponse != null) {
236 triggerResponse(fatalErrorResponse, scope, asyncExecCallback);
237 return;
238 }
239
240 requestCompliance.makeRequestCompliant(request);
241 request.addHeader("Via",via);
242
243 if (!cacheableRequestPolicy.isServableFromCache(request)) {
244 LOG.debug("Request is not servable from cache");
245 operation.setDependency(responseCache.flushCacheEntriesInvalidatedByRequest(target, request, new FutureCallback<Boolean>() {
246
247 @Override
248 public void completed(final Boolean result) {
249 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
250 }
251
252 @Override
253 public void failed(final Exception cause) {
254 asyncExecCallback.failed(cause);
255 }
256
257 @Override
258 public void cancelled() {
259 asyncExecCallback.failed(new InterruptedIOException());
260 }
261
262 }));
263 } else {
264 operation.setDependency(responseCache.getCacheEntry(target, request, new FutureCallback<HttpCacheEntry>() {
265
266 @Override
267 public void completed(final HttpCacheEntry entry) {
268 if (entry == null) {
269 LOG.debug("Cache miss");
270 handleCacheMiss(target, request, entityProducer, scope, chain, asyncExecCallback);
271 } else {
272 handleCacheHit(target, request, entityProducer, scope, chain, asyncExecCallback, entry);
273 }
274 }
275
276 @Override
277 public void failed(final Exception cause) {
278 asyncExecCallback.failed(cause);
279 }
280
281 @Override
282 public void cancelled() {
283 asyncExecCallback.failed(new InterruptedIOException());
284 }
285
286 }));
287
288 }
289 }
290
291 void chainProceed(
292 final HttpRequest request,
293 final AsyncEntityProducer entityProducer,
294 final AsyncExecChain.Scope scope,
295 final AsyncExecChain chain,
296 final AsyncExecCallback asyncExecCallback) {
297 try {
298 chain.proceed(request, entityProducer, scope, asyncExecCallback);
299 } catch (final HttpException | IOException ex) {
300 asyncExecCallback.failed(ex);
301 }
302 }
303
304 void callBackend(
305 final HttpHost target,
306 final HttpRequest request,
307 final AsyncEntityProducer entityProducer,
308 final AsyncExecChain.Scope scope,
309 final AsyncExecChain chain,
310 final AsyncExecCallback asyncExecCallback) {
311 LOG.debug("Calling the backend");
312 final Instant requestDate = getCurrentDate();
313 final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
314 chainProceed(request, entityProducer, scope, chain, new AsyncExecCallback() {
315
316 @Override
317 public AsyncDataConsumer handleResponse(
318 final HttpResponse backendResponse,
319 final EntityDetails entityDetails) throws HttpException, IOException {
320 final Instant responseDate = getCurrentDate();
321 backendResponse.addHeader("Via", generateViaHeader(backendResponse));
322
323 final AsyncExecCallback callback = new BackendResponseHandler(target, request, requestDate, responseDate, scope, asyncExecCallback);
324 callbackRef.set(callback);
325 return callback.handleResponse(backendResponse, entityDetails);
326 }
327
328 @Override
329 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
330 final AsyncExecCallback callback = callbackRef.getAndSet(null);
331 if (callback != null) {
332 callback.handleInformationResponse(response);
333 } else {
334 asyncExecCallback.handleInformationResponse(response);
335 }
336 }
337
338 @Override
339 public void completed() {
340 final AsyncExecCallback callback = callbackRef.getAndSet(null);
341 if (callback != null) {
342 callback.completed();
343 } else {
344 asyncExecCallback.completed();
345 }
346 }
347
348 @Override
349 public void failed(final Exception cause) {
350 final AsyncExecCallback callback = callbackRef.getAndSet(null);
351 if (callback != null) {
352 callback.failed(cause);
353 } else {
354 asyncExecCallback.failed(cause);
355 }
356 }
357
358 });
359 }
360
361 class CachingAsyncDataConsumer implements AsyncDataConsumer {
362
363 private final AsyncExecCallback fallback;
364 private final HttpResponse backendResponse;
365 private final EntityDetails entityDetails;
366 private final AtomicBoolean writtenThrough;
367 private final AtomicReference<ByteArrayBuffer> bufferRef;
368 private final AtomicReference<AsyncDataConsumer> dataConsumerRef;
369
370 CachingAsyncDataConsumer(
371 final AsyncExecCallback fallback,
372 final HttpResponse backendResponse,
373 final EntityDetails entityDetails) {
374 this.fallback = fallback;
375 this.backendResponse = backendResponse;
376 this.entityDetails = entityDetails;
377 this.writtenThrough = new AtomicBoolean(false);
378 this.bufferRef = new AtomicReference<>(entityDetails != null ? new ByteArrayBuffer(1024) : null);
379 this.dataConsumerRef = new AtomicReference<>();
380 }
381
382 @Override
383 public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
384 final AsyncDataConsumer dataConsumer = dataConsumerRef.get();
385 if (dataConsumer != null) {
386 dataConsumer.updateCapacity(capacityChannel);
387 } else {
388 capacityChannel.update(Integer.MAX_VALUE);
389 }
390 }
391
392 @Override
393 public final void consume(final ByteBuffer src) throws IOException {
394 final ByteArrayBuffer buffer = bufferRef.get();
395 if (buffer != null) {
396 if (src.hasArray()) {
397 buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining());
398 } else {
399 while (src.hasRemaining()) {
400 buffer.append(src.get());
401 }
402 }
403 if (buffer.length() > cacheConfig.getMaxObjectSize()) {
404 LOG.debug("Backend response content length exceeds maximum");
405
406
407 bufferRef.set(null);
408 try {
409 final AsyncDataConsumer dataConsumer = fallback.handleResponse(backendResponse, entityDetails);
410 if (dataConsumer != null) {
411 dataConsumerRef.set(dataConsumer);
412 writtenThrough.set(true);
413 dataConsumer.consume(ByteBuffer.wrap(buffer.array(), 0, buffer.length()));
414 }
415 } catch (final HttpException ex) {
416 fallback.failed(ex);
417 }
418 }
419 } else {
420 final AsyncDataConsumer dataConsumer = dataConsumerRef.get();
421 if (dataConsumer != null) {
422 dataConsumer.consume(src);
423 }
424 }
425 }
426
427 @Override
428 public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
429 final AsyncDataConsumer dataConsumer = dataConsumerRef.getAndSet(null);
430 if (dataConsumer != null) {
431 dataConsumer.streamEnd(trailers);
432 }
433 }
434
435 @Override
436 public void releaseResources() {
437 final AsyncDataConsumer dataConsumer = dataConsumerRef.getAndSet(null);
438 if (dataConsumer != null) {
439 dataConsumer.releaseResources();
440 }
441 }
442
443 }
444
/**
 * Callback that processes a response received from the backend: it applies
 * response protocol compliance, triggers cache invalidation, and, when the
 * response is cacheable, buffers the body so that a cache entry can be
 * created on completion. Non-cacheable responses are passed straight to the
 * caller's callback.
 */
class BackendResponseHandler implements AsyncExecCallback {

    private final HttpHost target;
    private final HttpRequest request;
    private final Instant requestDate;
    private final Instant responseDate;
    private final AsyncExecChain.Scope scope;
    private final AsyncExecCallback asyncExecCallback;
    // Set in handleResponse when the response is cacheable; taken (and cleared) in completed().
    private final AtomicReference<CachingAsyncDataConsumer> cachingConsumerRef;

    /**
     * @param target the target host, passed to the cache operations
     * @param request the request that was sent to the backend
     * @param requestDate the time the request was issued
     * @param responseDate the time the response head was received
     * @param scope the execution scope of the exchange
     * @param asyncExecCallback the caller's callback
     */
    BackendResponseHandler(
            final HttpHost target,
            final HttpRequest request,
            final Instant requestDate,
            final Instant responseDate,
            final AsyncExecChain.Scope scope,
            final AsyncExecCallback asyncExecCallback) {
        this.target = target;
        this.request = request;
        this.requestDate = requestDate;
        this.responseDate = responseDate;
        this.scope = scope;
        this.asyncExecCallback = asyncExecCallback;
        this.cachingConsumerRef = new AtomicReference<>();
    }

    /**
     * Returns a caching data consumer when the response is cacheable; otherwise
     * flushes the cache entries for the request and returns whatever consumer
     * the caller's callback supplies.
     */
    @Override
    public AsyncDataConsumer handleResponse(
            final HttpResponse backendResponse,
            final EntityDetails entityDetails) throws HttpException, IOException {
        responseCompliance.ensureProtocolCompliance(scope.originalRequest, request, backendResponse);
        // Fire-and-forget: the returned handle is not tracked and a failure is only logged.
        responseCache.flushCacheEntriesInvalidatedByExchange(target, request, backendResponse, new FutureCallback<Boolean>() {

            @Override
            public void completed(final Boolean result) {
            }

            @Override
            public void failed(final Exception ex) {
                LOG.warn("Unable to flush invalidated entries from cache", ex);
            }

            @Override
            public void cancelled() {
            }

        });
        final boolean cacheable = responseCachingPolicy.isResponseCacheable(request, backendResponse);
        if (cacheable) {
            // The caller's callback is the fallback should the body prove too large to cache.
            cachingConsumerRef.set(new CachingAsyncDataConsumer(asyncExecCallback, backendResponse, entityDetails));
            storeRequestIfModifiedSinceFor304Response(request, backendResponse);
        } else {
            LOG.debug("Backend response is not cacheable");
            // Fire-and-forget as above; a failure is only logged.
            responseCache.flushCacheEntriesFor(target, request, new FutureCallback<Boolean>() {

                @Override
                public void completed(final Boolean result) {
                }

                @Override
                public void failed(final Exception ex) {
                    LOG.warn("Unable to flush invalidated entries from cache", ex);
                }

                @Override
                public void cancelled() {
                }

            });
        }
        final CachingAsyncDataConsumer cachingDataConsumer = cachingConsumerRef.get();
        if (cachingDataConsumer != null) {
            LOG.debug("Caching backend response");
            return cachingDataConsumer;
        }
        return asyncExecCallback.handleResponse(backendResponse, entityDetails);
    }

    /** Informational responses are passed to the caller unchanged. */
    @Override
    public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
        asyncExecCallback.handleInformationResponse(response);
    }

    /**
     * Creates a cache entry from the backend response and the buffered body and,
     * once stored, delivers a response generated from the new entry to the
     * caller. The cache operation is registered as a dependency of the scope's
     * cancellable operation.
     * Note: the {@code responseDate} parameter shadows the field of the same name.
     */
    void triggerNewCacheEntryResponse(final HttpResponse backendResponse, final Instant responseDate, final ByteArrayBuffer buffer) {
        final CancellableDependency operation = scope.cancellableDependency;
        operation.setDependency(responseCache.createCacheEntry(
                target,
                request,
                backendResponse,
                buffer,
                requestDate,
                responseDate,
                new FutureCallback<HttpCacheEntry>() {

                    @Override
                    public void completed(final HttpCacheEntry newEntry) {
                        LOG.debug("Backend response successfully cached");
                        try {
                            final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, newEntry);
                            triggerResponse(cacheResponse, scope, asyncExecCallback);
                        } catch (final ResourceIOException ex) {
                            asyncExecCallback.failed(ex);
                        }
                    }

                    @Override
                    public void failed(final Exception ex) {
                        asyncExecCallback.failed(ex);
                    }

                    @Override
                    public void cancelled() {
                        // Cancellation is surfaced to the caller as an I/O interruption.
                        asyncExecCallback.failed(new InterruptedIOException());
                    }

                }));

    }

    /**
     * When the body was buffered (and not written through), stores it as a new
     * cache entry and answers from the cache; otherwise just signals completion
     * to the caller, which has already received the response.
     */
    @Override
    public void completed() {
        final CachingAsyncDataConsumer cachingDataConsumer = cachingConsumerRef.getAndSet(null);
        if (cachingDataConsumer != null && !cachingDataConsumer.writtenThrough.get()) {
            final ByteArrayBuffer buffer = cachingDataConsumer.bufferRef.getAndSet(null);
            final HttpResponse backendResponse = cachingDataConsumer.backendResponse;
            if (cacheConfig.isFreshnessCheckEnabled()) {
                final CancellableDependency operation = scope.cancellableDependency;
                operation.setDependency(responseCache.getCacheEntry(target, request, new FutureCallback<HttpCacheEntry>() {

                    @Override
                    public void completed(final HttpCacheEntry existingEntry) {
                        // Presumably true when the existing entry carries a later Date header than
                        // the backend response - confirm against DateSupport.isAfter.
                        if (DateSupport.isAfter(existingEntry, backendResponse, HttpHeaders.DATE)) {
                            LOG.debug("Backend already contains fresher cache entry");
                            try {
                                final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, existingEntry);
                                triggerResponse(cacheResponse, scope, asyncExecCallback);
                            } catch (final ResourceIOException ex) {
                                asyncExecCallback.failed(ex);
                            }
                        } else {
                            triggerNewCacheEntryResponse(backendResponse, responseDate, buffer);
                        }
                    }

                    @Override
                    public void failed(final Exception cause) {
                        asyncExecCallback.failed(cause);
                    }

                    @Override
                    public void cancelled() {
                        asyncExecCallback.failed(new InterruptedIOException());
                    }

                }));
            } else {
                triggerNewCacheEntryResponse(backendResponse, responseDate, buffer);
            }
        } else {
            asyncExecCallback.completed();
        }
    }

    /** Failures are passed to the caller unchanged. */
    @Override
    public void failed(final Exception cause) {
        asyncExecCallback.failed(cause);
    }

}
614
615 private void handleCacheHit(
616 final HttpHost target,
617 final HttpRequest request,
618 final AsyncEntityProducer entityProducer,
619 final AsyncExecChain.Scope scope,
620 final AsyncExecChain chain,
621 final AsyncExecCallback asyncExecCallback,
622 final HttpCacheEntry entry) {
623 final HttpClientContext context = scope.clientContext;
624 recordCacheHit(target, request);
625 final Instant now = getCurrentDate();
626 if (suitabilityChecker.canCachedResponseBeUsed(target, request, entry, now)) {
627 LOG.debug("Cache hit");
628 try {
629 final SimpleHttpResponse cacheResponse = generateCachedResponse(request, context, entry, now);
630 triggerResponse(cacheResponse, scope, asyncExecCallback);
631 } catch (final ResourceIOException ex) {
632 recordCacheFailure(target, request);
633 if (!mayCallBackend(request)) {
634 final SimpleHttpResponse cacheResponse = generateGatewayTimeout(context);
635 triggerResponse(cacheResponse, scope, asyncExecCallback);
636 } else {
637 setResponseStatus(scope.clientContext, CacheResponseStatus.FAILURE);
638 try {
639 chain.proceed(request, entityProducer, scope, asyncExecCallback);
640 } catch (final HttpException | IOException ex2) {
641 asyncExecCallback.failed(ex2);
642 }
643 }
644 }
645 } else if (!mayCallBackend(request)) {
646 LOG.debug("Cache entry not suitable but only-if-cached requested");
647 final SimpleHttpResponse cacheResponse = generateGatewayTimeout(context);
648 triggerResponse(cacheResponse, scope, asyncExecCallback);
649 } else if (!(entry.getStatus() == HttpStatus.SC_NOT_MODIFIED && !suitabilityChecker.isConditional(request))) {
650 LOG.debug("Revalidating cache entry");
651 if (cacheRevalidator != null
652 && !staleResponseNotAllowed(request, entry, now)
653 && validityPolicy.mayReturnStaleWhileRevalidating(entry, now)) {
654 LOG.debug("Serving stale with asynchronous revalidation");
655 try {
656 final SimpleHttpResponse cacheResponse = generateCachedResponse(request, context, entry, now);
657 final String exchangeId = ExecSupport.getNextExchangeId();
658 context.setExchangeId(exchangeId);
659 final AsyncExecChain.Scope fork = new AsyncExecChain.Scope(
660 exchangeId,
661 scope.route,
662 scope.originalRequest,
663 new ComplexFuture<>(null),
664 HttpClientContext.create(),
665 scope.execRuntime.fork(),
666 scope.scheduler,
667 scope.execCount);
668 cacheRevalidator.revalidateCacheEntry(
669 responseCache.generateKey(target, request, entry),
670 asyncExecCallback,
671 asyncExecCallback1 -> revalidateCacheEntry(target, request, entityProducer, fork, chain, asyncExecCallback1, entry));
672 triggerResponse(cacheResponse, scope, asyncExecCallback);
673 } catch (final ResourceIOException ex) {
674 asyncExecCallback.failed(ex);
675 }
676 } else {
677 revalidateCacheEntry(target, request, entityProducer, scope, chain, asyncExecCallback, entry);
678 }
679 } else {
680 LOG.debug("Cache entry not usable; calling backend");
681 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
682 }
683 }
684
/**
 * Revalidates a cache entry by sending a conditional request, built from a
 * copy of the scope's original request, to the backend.
 * <p>
 * Outcome handling, as implemented by the anonymous callback below:
 * <ul>
 * <li>304: the cache entry is updated and the response is generated from it;</li>
 * <li>an error status for which stale-if-error applies and stale content is
 *     permitted: the stale entry is served with a 110 warning;</li>
 * <li>anything else: the response is handled by a {@code BackendResponseHandler};</li>
 * <li>if {@code revalidationResponseIsTooOld} holds for the response and the
 *     request entity can be re-sent (absent or repeatable), the response is
 *     discarded and an unconditional request is sent instead, whose response
 *     is evaluated by the same rules.</li>
 * </ul>
 *
 * @param target the target host
 * @param request the request being served
 * @param entityProducer the request entity producer; may be {@code null}
 * @param scope the execution scope
 * @param chain the execution chain
 * @param asyncExecCallback the caller's callback
 * @param cacheEntry the entry to revalidate
 */
void revalidateCacheEntry(
        final HttpHost target,
        final HttpRequest request,
        final AsyncEntityProducer entityProducer,
        final AsyncExecChain.Scope scope,
        final AsyncExecChain chain,
        final AsyncExecCallback asyncExecCallback,
        final HttpCacheEntry cacheEntry) {
    final Instant requestDate = getCurrentDate();
    final HttpRequest conditionalRequest = conditionalRequestBuilder.buildConditionalRequest(
            BasicRequestBuilder.copy(scope.originalRequest).build(),
            cacheEntry);
    chainProceed(conditionalRequest, entityProducer, scope, chain, new AsyncExecCallback() {

        // Handler chosen for the current response; shared between the conditional exchange
        // and the follow-up unconditional exchange, and cleared by whichever event takes it.
        final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();

        // Updates the cache entry with the backend response and answers the caller from the
        // updated entry: with a 304 when the request is conditional and all its conditionals
        // match, with the full cached response otherwise.
        void triggerUpdatedCacheEntryResponse(final HttpResponse backendResponse, final Instant responseDate) {
            final CancellableDependency operation = scope.cancellableDependency;
            recordCacheUpdate(scope.clientContext);
            operation.setDependency(responseCache.updateCacheEntry(
                    target,
                    request,
                    cacheEntry,
                    backendResponse,
                    requestDate,
                    responseDate,
                    new FutureCallback<HttpCacheEntry>() {

                        @Override
                        public void completed(final HttpCacheEntry updatedEntry) {
                            if (suitabilityChecker.isConditional(request)
                                    && suitabilityChecker.allConditionalsMatch(request, updatedEntry, Instant.now())) {
                                final SimpleHttpResponse cacheResponse = responseGenerator.generateNotModifiedResponse(updatedEntry);
                                triggerResponse(cacheResponse, scope, asyncExecCallback);
                            } else {
                                try {
                                    final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, updatedEntry);
                                    triggerResponse(cacheResponse, scope, asyncExecCallback);
                                } catch (final ResourceIOException ex) {
                                    asyncExecCallback.failed(ex);
                                }
                            }
                        }

                        @Override
                        public void failed(final Exception ex) {
                            asyncExecCallback.failed(ex);
                        }

                        @Override
                        public void cancelled() {
                            // Cancellation is surfaced to the caller as an I/O interruption.
                            asyncExecCallback.failed(new InterruptedIOException());
                        }

                    }));
        }

        // Serves the existing (stale) entry, flagged with a 110 "Response is stale" warning.
        void triggerResponseStaleCacheEntry() {
            try {
                final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, cacheEntry);
                cacheResponse.addHeader(HeaderConstants.WARNING, "110 localhost \"Response is stale\"");
                triggerResponse(cacheResponse, scope, asyncExecCallback);
            } catch (final ResourceIOException ex) {
                asyncExecCallback.failed(ex);
            }
        }

        // Picks the handler for a backend response based on its status code (see method Javadoc).
        AsyncExecCallback evaluateResponse(final HttpResponse backendResponse, final Instant responseDate) {
            backendResponse.addHeader(HeaderConstants.VIA, generateViaHeader(backendResponse));

            final int statusCode = backendResponse.getCode();
            if (statusCode == HttpStatus.SC_NOT_MODIFIED || statusCode == HttpStatus.SC_OK) {
                recordCacheUpdate(scope.clientContext);
            }
            if (statusCode == HttpStatus.SC_NOT_MODIFIED) {
                // The 304 itself is discarded; the answer comes from the updated cache entry on completion.
                return new AsyncExecCallbackWrapper(asyncExecCallback, () -> triggerUpdatedCacheEntryResponse(backendResponse, responseDate));
            }
            if (staleIfErrorAppliesTo(statusCode)
                    && !staleResponseNotAllowed(request, cacheEntry, getCurrentDate())
                    && validityPolicy.mayReturnStaleIfError(request, cacheEntry, responseDate)) {
                // The error response is discarded; the stale entry is served on completion.
                return new AsyncExecCallbackWrapper(asyncExecCallback, this::triggerResponseStaleCacheEntry);
            }
            return new BackendResponseHandler(target, conditionalRequest, requestDate, responseDate, scope, asyncExecCallback);
        }

        @Override
        public AsyncDataConsumer handleResponse(
                final HttpResponse backendResponse1,
                final EntityDetails entityDetails) throws HttpException, IOException {

            final Instant responseDate1 = getCurrentDate();

            final AsyncExecCallback callback1;
            // A retry is only possible when there is no request entity or it can be produced again.
            if (revalidationResponseIsTooOld(backendResponse1, cacheEntry)
                    && (entityProducer == null || entityProducer.isRepeatable())) {

                final HttpRequest unconditional = conditionalRequestBuilder.buildUnconditionalRequest(
                        BasicRequestBuilder.copy(scope.originalRequest).build());

                // Discard this response and, once it has completed, re-issue the request unconditionally.
                callback1 = new AsyncExecCallbackWrapper(asyncExecCallback, () -> chainProceed(unconditional, entityProducer, scope, chain, new AsyncExecCallback() {

                    @Override
                    public AsyncDataConsumer handleResponse(
                            final HttpResponse backendResponse2,
                            final EntityDetails entityDetails1) throws HttpException, IOException {
                        final Instant responseDate2 = getCurrentDate();
                        final AsyncExecCallback callback2 = evaluateResponse(backendResponse2, responseDate2);
                        callbackRef.set(callback2);
                        return callback2.handleResponse(backendResponse2, entityDetails1);
                    }

                    @Override
                    public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
                        final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
                        if (callback2 != null) {
                            callback2.handleInformationResponse(response);
                        } else {
                            asyncExecCallback.handleInformationResponse(response);
                        }
                    }

                    @Override
                    public void completed() {
                        final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
                        if (callback2 != null) {
                            callback2.completed();
                        } else {
                            asyncExecCallback.completed();
                        }
                    }

                    @Override
                    public void failed(final Exception cause) {
                        final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
                        if (callback2 != null) {
                            callback2.failed(cause);
                        } else {
                            asyncExecCallback.failed(cause);
                        }
                    }

                }));
            } else {
                callback1 = evaluateResponse(backendResponse1, responseDate1);
            }
            callbackRef.set(callback1);
            return callback1.handleResponse(backendResponse1, entityDetails);
        }

        // The three event methods below go to the selected handler when one is set (clearing it),
        // and to the caller's callback otherwise.
        @Override
        public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
            final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
            if (callback1 != null) {
                callback1.handleInformationResponse(response);
            } else {
                asyncExecCallback.handleInformationResponse(response);
            }
        }

        @Override
        public void completed() {
            final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
            if (callback1 != null) {
                callback1.completed();
            } else {
                asyncExecCallback.completed();
            }
        }

        @Override
        public void failed(final Exception cause) {
            final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
            if (callback1 != null) {
                callback1.failed(cause);
            } else {
                asyncExecCallback.failed(cause);
            }
        }

    });

}
867
868 private void handleCacheMiss(
869 final HttpHost target,
870 final HttpRequest request,
871 final AsyncEntityProducer entityProducer,
872 final AsyncExecChain.Scope scope,
873 final AsyncExecChain chain,
874 final AsyncExecCallback asyncExecCallback) {
875 recordCacheMiss(target, request);
876
877 if (mayCallBackend(request)) {
878 final CancellableDependency operation = scope.cancellableDependency;
879 operation.setDependency(responseCache.getVariantCacheEntriesWithEtags(
880 target,
881 request,
882 new FutureCallback<Map<String, Variant>>() {
883
884 @Override
885 public void completed(final Map<String, Variant> variants) {
886 if (variants != null && !variants.isEmpty() && (entityProducer == null || entityProducer.isRepeatable())) {
887 negotiateResponseFromVariants(target, request, entityProducer, scope, chain, asyncExecCallback, variants);
888 } else {
889 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
890 }
891 }
892
893 @Override
894 public void failed(final Exception ex) {
895 asyncExecCallback.failed(ex);
896 }
897
898 @Override
899 public void cancelled() {
900 asyncExecCallback.failed(new InterruptedIOException());
901 }
902
903 }));
904 } else {
905 final SimpleHttpResponse cacheResponse = SimpleHttpResponse.create(HttpStatus.SC_GATEWAY_TIMEOUT, "Gateway Timeout");
906 triggerResponse(cacheResponse, scope, asyncExecCallback);
907 }
908 }
909
910 void negotiateResponseFromVariants(
911 final HttpHost target,
912 final HttpRequest request,
913 final AsyncEntityProducer entityProducer,
914 final AsyncExecChain.Scope scope,
915 final AsyncExecChain chain,
916 final AsyncExecCallback asyncExecCallback,
917 final Map<String, Variant> variants) {
918 final CancellableDependency operation = scope.cancellableDependency;
919 final HttpRequest conditionalRequest = conditionalRequestBuilder.buildConditionalRequestFromVariants(
920 BasicRequestBuilder.copy(request).build(),
921 variants);
922
923 final Instant requestDate = getCurrentDate();
924 chainProceed(conditionalRequest, entityProducer, scope, chain, new AsyncExecCallback() {
925
926 final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
927
928 void updateVariantCacheEntry(final HttpResponse backendResponse, final Instant responseDate, final Variant matchingVariant) {
929 recordCacheUpdate(scope.clientContext);
930 operation.setDependency(responseCache.updateVariantCacheEntry(
931 target,
932 conditionalRequest,
933 backendResponse,
934 matchingVariant,
935 requestDate,
936 responseDate,
937 new FutureCallback<HttpCacheEntry>() {
938
939 @Override
940 public void completed(final HttpCacheEntry responseEntry) {
941 if (shouldSendNotModifiedResponse(request, responseEntry)) {
942 final SimpleHttpResponse cacheResponse = responseGenerator.generateNotModifiedResponse(responseEntry);
943 triggerResponse(cacheResponse, scope, asyncExecCallback);
944 } else {
945 try {
946 final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, responseEntry);
947 operation.setDependency(responseCache.reuseVariantEntryFor(
948 target,
949 request,
950 matchingVariant,
951 new FutureCallback<Boolean>() {
952
953 @Override
954 public void completed(final Boolean result) {
955 triggerResponse(cacheResponse, scope, asyncExecCallback);
956 }
957
958 @Override
959 public void failed(final Exception ex) {
960 asyncExecCallback.failed(ex);
961 }
962
963 @Override
964 public void cancelled() {
965 asyncExecCallback.failed(new InterruptedIOException());
966 }
967
968 }));
969 } catch (final ResourceIOException ex) {
970 asyncExecCallback.failed(ex);
971 }
972 }
973 }
974
975 @Override
976 public void failed(final Exception ex) {
977 asyncExecCallback.failed(ex);
978 }
979
980 @Override
981 public void cancelled() {
982 asyncExecCallback.failed(new InterruptedIOException());
983 }
984
985 }));
986 }
987
988 @Override
989 public AsyncDataConsumer handleResponse(
990 final HttpResponse backendResponse,
991 final EntityDetails entityDetails) throws HttpException, IOException {
992 final Instant responseDate = getCurrentDate();
993 backendResponse.addHeader("Via", generateViaHeader(backendResponse));
994
995 final AsyncExecCallback callback;
996
997 if (backendResponse.getCode() != HttpStatus.SC_NOT_MODIFIED) {
998 callback = new BackendResponseHandler(target, request, requestDate, responseDate, scope, asyncExecCallback);
999 } else {
1000 final Header resultEtagHeader = backendResponse.getFirstHeader(HeaderConstants.ETAG);
1001 if (resultEtagHeader == null) {
1002 LOG.warn("304 response did not contain ETag");
1003 callback = new AsyncExecCallbackWrapper(asyncExecCallback, () -> callBackend(target, request, entityProducer, scope, chain, asyncExecCallback));
1004 } else {
1005 final String resultEtag = resultEtagHeader.getValue();
1006 final Variant matchingVariant = variants.get(resultEtag);
1007 if (matchingVariant == null) {
1008 LOG.debug("304 response did not contain ETag matching one sent in If-None-Match");
1009 callback = new AsyncExecCallbackWrapper(asyncExecCallback, () -> callBackend(target, request, entityProducer, scope, chain, asyncExecCallback));
1010 } else {
1011 if (revalidationResponseIsTooOld(backendResponse, matchingVariant.getEntry())) {
1012 final HttpRequest unconditional = conditionalRequestBuilder.buildUnconditionalRequest(
1013 BasicRequestBuilder.copy(request).build());
1014 scope.clientContext.setAttribute(HttpCoreContext.HTTP_REQUEST, unconditional);
1015 callback = new AsyncExecCallbackWrapper(asyncExecCallback, () -> callBackend(target, request, entityProducer, scope, chain, asyncExecCallback));
1016 } else {
1017 callback = new AsyncExecCallbackWrapper(asyncExecCallback, () -> updateVariantCacheEntry(backendResponse, responseDate, matchingVariant));
1018 }
1019 }
1020 }
1021 }
1022 callbackRef.set(callback);
1023 return callback.handleResponse(backendResponse, entityDetails);
1024 }
1025
1026 @Override
1027 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
1028 final AsyncExecCallback callback = callbackRef.getAndSet(null);
1029 if (callback != null) {
1030 callback.handleInformationResponse(response);
1031 } else {
1032 asyncExecCallback.handleInformationResponse(response);
1033 }
1034 }
1035
1036 @Override
1037 public void completed() {
1038 final AsyncExecCallback callback = callbackRef.getAndSet(null);
1039 if (callback != null) {
1040 callback.completed();
1041 } else {
1042 asyncExecCallback.completed();
1043 }
1044 }
1045
1046 @Override
1047 public void failed(final Exception cause) {
1048 final AsyncExecCallback callback = callbackRef.getAndSet(null);
1049 if (callback != null) {
1050 callback.failed(cause);
1051 } else {
1052 asyncExecCallback.failed(cause);
1053 }
1054 }
1055
1056 });
1057
1058 }
1059
1060 }