1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.cache;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.util.Date;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import org.apache.hc.client5.http.HttpRoute;
40 import org.apache.hc.client5.http.async.AsyncExecCallback;
41 import org.apache.hc.client5.http.async.AsyncExecChain;
42 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
43 import org.apache.hc.client5.http.async.methods.SimpleBody;
44 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
45 import org.apache.hc.client5.http.cache.CacheResponseStatus;
46 import org.apache.hc.client5.http.cache.HeaderConstants;
47 import org.apache.hc.client5.http.cache.HttpAsyncCacheStorage;
48 import org.apache.hc.client5.http.cache.HttpCacheEntry;
49 import org.apache.hc.client5.http.cache.ResourceFactory;
50 import org.apache.hc.client5.http.cache.ResourceIOException;
51 import org.apache.hc.client5.http.impl.ExecSupport;
52 import org.apache.hc.client5.http.impl.RequestCopier;
53 import org.apache.hc.client5.http.protocol.HttpClientContext;
54 import org.apache.hc.client5.http.schedule.SchedulingStrategy;
55 import org.apache.hc.client5.http.utils.DateUtils;
56 import org.apache.hc.core5.annotation.Contract;
57 import org.apache.hc.core5.annotation.ThreadingBehavior;
58 import org.apache.hc.core5.concurrent.CancellableDependency;
59 import org.apache.hc.core5.concurrent.ComplexFuture;
60 import org.apache.hc.core5.concurrent.FutureCallback;
61 import org.apache.hc.core5.http.ContentType;
62 import org.apache.hc.core5.http.EntityDetails;
63 import org.apache.hc.core5.http.Header;
64 import org.apache.hc.core5.http.HttpException;
65 import org.apache.hc.core5.http.HttpHeaders;
66 import org.apache.hc.core5.http.HttpHost;
67 import org.apache.hc.core5.http.HttpRequest;
68 import org.apache.hc.core5.http.HttpResponse;
69 import org.apache.hc.core5.http.HttpStatus;
70 import org.apache.hc.core5.http.impl.BasicEntityDetails;
71 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
72 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
73 import org.apache.hc.core5.http.nio.CapacityChannel;
74 import org.apache.hc.core5.http.protocol.HttpCoreContext;
75 import org.apache.hc.core5.net.URIAuthority;
76 import org.apache.hc.core5.util.Args;
77 import org.apache.hc.core5.util.ByteArrayBuffer;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
80
81
82
83
84
85
86
87
88
89
90
91
92 @Contract(threading = ThreadingBehavior.SAFE)
93 class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler {
94
/** Logger shared by this handler and its nested helper classes. */
private static final Logger LOG = LoggerFactory.getLogger(AsyncCachingExec.class);

/** Cache that entries are looked up in, stored to, updated in and flushed from. */
private final HttpAsyncCache responseCache;

/** Background revalidator; when {@code null} stale entries are never served while revalidating. */
private final DefaultAsyncCacheRevalidator cacheRevalidator;

/** Builder of conditional / unconditional requests used for revalidation. */
private final ConditionalRequestBuilder<HttpRequest> conditionalRequestBuilder;

/**
 * Creates a handler on top of the given cache using a default conditional request builder.
 *
 * @param cache            the response cache; must not be {@code null}
 * @param cacheRevalidator asynchronous revalidator, may be {@code null}
 * @param config           cache configuration handed to the base class
 */
AsyncCachingExec(final HttpAsyncCache cache, final DefaultAsyncCacheRevalidator cacheRevalidator, final CacheConfig config) {
    super(config);
    // Validate first so that nothing else is initialised for an unusable instance.
    Args.notNull(cache, "Response cache");
    this.responseCache = cache;
    this.cacheRevalidator = cacheRevalidator;
    this.conditionalRequestBuilder = new ConditionalRequestBuilder<>(RequestCopier.INSTANCE);
}
106
/**
 * Creates a handler from explicitly supplied collaborators. No argument is validated here;
 * every policy object is passed to the base class unchanged and the remaining three are
 * stored as given.
 *
 * @param responseCache             cache used for all entry operations
 * @param validityPolicy            passed to the base class
 * @param responseCachingPolicy     passed to the base class
 * @param responseGenerator         passed to the base class
 * @param cacheableRequestPolicy    passed to the base class
 * @param suitabilityChecker        passed to the base class
 * @param responseCompliance        passed to the base class
 * @param requestCompliance         passed to the base class
 * @param cacheRevalidator          asynchronous revalidator, may be {@code null}
 * @param conditionalRequestBuilder builder of revalidation requests
 * @param config                    cache configuration
 */
AsyncCachingExec(
        final HttpAsyncCache responseCache,
        final CacheValidityPolicy validityPolicy,
        final ResponseCachingPolicy responseCachingPolicy,
        final CachedHttpResponseGenerator responseGenerator,
        final CacheableRequestPolicy cacheableRequestPolicy,
        final CachedResponseSuitabilityChecker suitabilityChecker,
        final ResponseProtocolCompliance responseCompliance,
        final RequestProtocolCompliance requestCompliance,
        final DefaultAsyncCacheRevalidator cacheRevalidator,
        final ConditionalRequestBuilder<HttpRequest> conditionalRequestBuilder,
        final CacheConfig config) {
    super(
            validityPolicy,
            responseCachingPolicy,
            responseGenerator,
            cacheableRequestPolicy,
            suitabilityChecker,
            responseCompliance,
            requestCompliance,
            config);
    this.conditionalRequestBuilder = conditionalRequestBuilder;
    this.cacheRevalidator = cacheRevalidator;
    this.responseCache = responseCache;
}
125
/**
 * Creates a handler whose revalidator is derived from the given executor.
 *
 * @param cache              the response cache
 * @param executorService    executor for background revalidation; when {@code null}
 *                           no revalidator is created
 * @param schedulingStrategy scheduling strategy handed to the revalidator
 * @param config             cache configuration
 */
AsyncCachingExec(
        final HttpAsyncCache cache,
        final ScheduledExecutorService executorService,
        final SchedulingStrategy schedulingStrategy,
        final CacheConfig config) {
    this(
            cache,
            executorService == null ? null : new DefaultAsyncCacheRevalidator(executorService, schedulingStrategy),
            config);
}
135
/**
 * Creates a handler backed by a {@link BasicHttpAsyncCache} built from the given
 * resource factory and storage.
 *
 * @param resourceFactory    factory handed to the cache
 * @param storage            storage handed to the cache
 * @param executorService    executor for background revalidation, may be {@code null}
 * @param schedulingStrategy scheduling strategy for revalidation
 * @param config             cache configuration
 */
AsyncCachingExec(
        final ResourceFactory resourceFactory,
        final HttpAsyncCacheStorage storage,
        final ScheduledExecutorService executorService,
        final SchedulingStrategy schedulingStrategy,
        final CacheConfig config) {
    this(
            new BasicHttpAsyncCache(resourceFactory, storage),
            executorService,
            schedulingStrategy,
            config);
}
144
145 private void triggerResponse(
146 final SimpleHttpResponse cacheResponse,
147 final AsyncExecChain.Scope scope,
148 final AsyncExecCallback asyncExecCallback) {
149 scope.clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, cacheResponse);
150 scope.execRuntime.releaseEndpoint();
151
152 final SimpleBody body = cacheResponse.getBody();
153 final byte[] content = body != null ? body.getBodyBytes() : null;
154 final ContentType contentType = body != null ? body.getContentType() : null;
155 try {
156 final AsyncDataConsumer dataConsumer = asyncExecCallback.handleResponse(
157 cacheResponse,
158 content != null ? new BasicEntityDetails(content.length, contentType) : null);
159 if (dataConsumer != null) {
160 if (content != null) {
161 dataConsumer.consume(ByteBuffer.wrap(content));
162 }
163 dataConsumer.streamEnd(null);
164 }
165 asyncExecCallback.completed();
166 } catch (final HttpException | IOException ex) {
167 asyncExecCallback.failed(ex);
168 }
169 }
170
171 static class AsyncExecCallbackWrapper implements AsyncExecCallback {
172
173 private final AsyncExecCallback asyncExecCallback;
174 private final Runnable command;
175
176 AsyncExecCallbackWrapper(final AsyncExecCallback asyncExecCallback, final Runnable command) {
177 this.asyncExecCallback = asyncExecCallback;
178 this.command = command;
179 }
180
181 @Override
182 public AsyncDataConsumer handleResponse(
183 final HttpResponse response,
184 final EntityDetails entityDetails) throws HttpException, IOException {
185 return null;
186 }
187
188 @Override
189 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
190 }
191
192 @Override
193 public void completed() {
194 command.run();
195 }
196
197 @Override
198 public void failed(final Exception cause) {
199 asyncExecCallback.failed(cause);
200 }
201
202 }
203
204 @Override
205 public void execute(
206 final HttpRequest request,
207 final AsyncEntityProducer entityProducer,
208 final AsyncExecChain.Scope scope,
209 final AsyncExecChain chain,
210 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
211 Args.notNull(request, "HTTP request");
212 Args.notNull(scope, "Scope");
213
214 final HttpRoute route = scope.route;
215 final CancellableDependency operation = scope.cancellableDependency;
216 final HttpClientContext context = scope.clientContext;
217 context.setAttribute(HttpClientContext.HTTP_ROUTE, route);
218 context.setAttribute(HttpClientContext.HTTP_REQUEST, request);
219
220 final URIAuthority authority = request.getAuthority();
221 final String scheme = request.getScheme();
222 final HttpHost target = authority != null ? new HttpHost(scheme, authority) : route.getTargetHost();
223 final String via = generateViaHeader(request);
224
225
226 setResponseStatus(context, CacheResponseStatus.CACHE_MISS);
227
228 if (clientRequestsOurOptions(request)) {
229 setResponseStatus(context, CacheResponseStatus.CACHE_MODULE_RESPONSE);
230 triggerResponse(SimpleHttpResponse.create(HttpStatus.SC_NOT_IMPLEMENTED), scope, asyncExecCallback);
231 return;
232 }
233
234 final SimpleHttpResponse fatalErrorResponse = getFatallyNoncompliantResponse(request, context);
235 if (fatalErrorResponse != null) {
236 triggerResponse(fatalErrorResponse, scope, asyncExecCallback);
237 return;
238 }
239
240 requestCompliance.makeRequestCompliant(request);
241 request.addHeader("Via",via);
242
243 if (!cacheableRequestPolicy.isServableFromCache(request)) {
244 LOG.debug("Request is not servable from cache");
245 operation.setDependency(responseCache.flushCacheEntriesInvalidatedByRequest(target, request, new FutureCallback<Boolean>() {
246
247 @Override
248 public void completed(final Boolean result) {
249 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
250 }
251
252 @Override
253 public void failed(final Exception cause) {
254 asyncExecCallback.failed(cause);
255 }
256
257 @Override
258 public void cancelled() {
259 asyncExecCallback.failed(new InterruptedIOException());
260 }
261
262 }));
263 } else {
264 operation.setDependency(responseCache.getCacheEntry(target, request, new FutureCallback<HttpCacheEntry>() {
265
266 @Override
267 public void completed(final HttpCacheEntry entry) {
268 if (entry == null) {
269 LOG.debug("Cache miss");
270 handleCacheMiss(target, request, entityProducer, scope, chain, asyncExecCallback);
271 } else {
272 handleCacheHit(target, request, entityProducer, scope, chain, asyncExecCallback, entry);
273 }
274 }
275
276 @Override
277 public void failed(final Exception cause) {
278 asyncExecCallback.failed(cause);
279 }
280
281 @Override
282 public void cancelled() {
283 asyncExecCallback.failed(new InterruptedIOException());
284 }
285
286 }));
287
288 }
289 }
290
291 void chainProceed(
292 final HttpRequest request,
293 final AsyncEntityProducer entityProducer,
294 final AsyncExecChain.Scope scope,
295 final AsyncExecChain chain,
296 final AsyncExecCallback asyncExecCallback) {
297 try {
298 chain.proceed(request, entityProducer, scope, asyncExecCallback);
299 } catch (final HttpException | IOException ex) {
300 asyncExecCallback.failed(ex);
301 }
302 }
303
304 void callBackend(
305 final HttpHost target,
306 final HttpRequest request,
307 final AsyncEntityProducer entityProducer,
308 final AsyncExecChain.Scope scope,
309 final AsyncExecChain chain,
310 final AsyncExecCallback asyncExecCallback) {
311 LOG.debug("Calling the backend");
312 final Date requestDate = getCurrentDate();
313 final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
314 chainProceed(request, entityProducer, scope, chain, new AsyncExecCallback() {
315
316 @Override
317 public AsyncDataConsumer handleResponse(
318 final HttpResponse backendResponse,
319 final EntityDetails entityDetails) throws HttpException, IOException {
320 final Date responseDate = getCurrentDate();
321 backendResponse.addHeader("Via", generateViaHeader(backendResponse));
322
323 final AsyncExecCallback callback = new BackendResponseHandler(target, request, requestDate, responseDate, scope, asyncExecCallback);
324 callbackRef.set(callback);
325 return callback.handleResponse(backendResponse, entityDetails);
326 }
327
328 @Override
329 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
330 final AsyncExecCallback callback = callbackRef.getAndSet(null);
331 if (callback != null) {
332 callback.handleInformationResponse(response);
333 } else {
334 asyncExecCallback.handleInformationResponse(response);
335 }
336 }
337
338 @Override
339 public void completed() {
340 final AsyncExecCallback callback = callbackRef.getAndSet(null);
341 if (callback != null) {
342 callback.completed();
343 } else {
344 asyncExecCallback.completed();
345 }
346 }
347
348 @Override
349 public void failed(final Exception cause) {
350 final AsyncExecCallback callback = callbackRef.getAndSet(null);
351 if (callback != null) {
352 callback.failed(cause);
353 } else {
354 asyncExecCallback.failed(cause);
355 }
356 }
357
358 });
359 }
360
361 class CachingAsyncDataConsumer implements AsyncDataConsumer {
362
363 private final AsyncExecCallback fallback;
364 private final HttpResponse backendResponse;
365 private final EntityDetails entityDetails;
366 private final AtomicBoolean writtenThrough;
367 private final AtomicReference<ByteArrayBuffer> bufferRef;
368 private final AtomicReference<AsyncDataConsumer> dataConsumerRef;
369
370 CachingAsyncDataConsumer(
371 final AsyncExecCallback fallback,
372 final HttpResponse backendResponse,
373 final EntityDetails entityDetails) {
374 this.fallback = fallback;
375 this.backendResponse = backendResponse;
376 this.entityDetails = entityDetails;
377 this.writtenThrough = new AtomicBoolean(false);
378 this.bufferRef = new AtomicReference<>(entityDetails != null ? new ByteArrayBuffer(1024) : null);
379 this.dataConsumerRef = new AtomicReference<>();
380 }
381
382 @Override
383 public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
384 final AsyncDataConsumer dataConsumer = dataConsumerRef.get();
385 if (dataConsumer != null) {
386 dataConsumer.updateCapacity(capacityChannel);
387 } else {
388 capacityChannel.update(Integer.MAX_VALUE);
389 }
390 }
391
392 @Override
393 public final void consume(final ByteBuffer src) throws IOException {
394 final ByteArrayBuffer buffer = bufferRef.get();
395 if (buffer != null) {
396 if (src.hasArray()) {
397 buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining());
398 } else {
399 while (src.hasRemaining()) {
400 buffer.append(src.get());
401 }
402 }
403 if (buffer.length() > cacheConfig.getMaxObjectSize()) {
404 LOG.debug("Backend response content length exceeds maximum");
405
406
407 bufferRef.set(null);
408 try {
409 final AsyncDataConsumer dataConsumer = fallback.handleResponse(backendResponse, entityDetails);
410 if (dataConsumer != null) {
411 dataConsumerRef.set(dataConsumer);
412 writtenThrough.set(true);
413 dataConsumer.consume(ByteBuffer.wrap(buffer.array(), 0, buffer.length()));
414 }
415 } catch (final HttpException ex) {
416 fallback.failed(ex);
417 }
418 }
419 } else {
420 final AsyncDataConsumer dataConsumer = dataConsumerRef.get();
421 if (dataConsumer != null) {
422 dataConsumer.consume(src);
423 }
424 }
425 }
426
427 @Override
428 public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
429 final AsyncDataConsumer dataConsumer = dataConsumerRef.getAndSet(null);
430 if (dataConsumer != null) {
431 dataConsumer.streamEnd(trailers);
432 }
433 }
434
435 @Override
436 public void releaseResources() {
437 final AsyncDataConsumer dataConsumer = dataConsumerRef.getAndSet(null);
438 if (dataConsumer != null) {
439 dataConsumer.releaseResources();
440 }
441 }
442
443 }
444
445 class BackendResponseHandler implements AsyncExecCallback {
446
447 private final HttpHost target;
448 private final HttpRequest request;
449 private final Date requestDate;
450 private final Date responseDate;
451 private final AsyncExecChain.Scope scope;
452 private final AsyncExecCallback asyncExecCallback;
453 private final AtomicReference<CachingAsyncDataConsumer> cachingConsumerRef;
454
455 BackendResponseHandler(
456 final HttpHost target,
457 final HttpRequest request,
458 final Date requestDate,
459 final Date responseDate,
460 final AsyncExecChain.Scope scope,
461 final AsyncExecCallback asyncExecCallback) {
462 this.target = target;
463 this.request = request;
464 this.requestDate = requestDate;
465 this.responseDate = responseDate;
466 this.scope = scope;
467 this.asyncExecCallback = asyncExecCallback;
468 this.cachingConsumerRef = new AtomicReference<>();
469 }
470
471 @Override
472 public AsyncDataConsumer handleResponse(
473 final HttpResponse backendResponse,
474 final EntityDetails entityDetails) throws HttpException, IOException {
475 responseCompliance.ensureProtocolCompliance(scope.originalRequest, request, backendResponse);
476 responseCache.flushCacheEntriesInvalidatedByExchange(target, request, backendResponse, new FutureCallback<Boolean>() {
477
478 @Override
479 public void completed(final Boolean result) {
480 }
481
482 @Override
483 public void failed(final Exception ex) {
484 LOG.warn("Unable to flush invalidated entries from cache", ex);
485 }
486
487 @Override
488 public void cancelled() {
489 }
490
491 });
492 final boolean cacheable = responseCachingPolicy.isResponseCacheable(request, backendResponse);
493 if (cacheable) {
494 cachingConsumerRef.set(new CachingAsyncDataConsumer(asyncExecCallback, backendResponse, entityDetails));
495 storeRequestIfModifiedSinceFor304Response(request, backendResponse);
496 } else {
497 LOG.debug("Backend response is not cacheable");
498 responseCache.flushCacheEntriesFor(target, request, new FutureCallback<Boolean>() {
499
500 @Override
501 public void completed(final Boolean result) {
502 }
503
504 @Override
505 public void failed(final Exception ex) {
506 LOG.warn("Unable to flush invalidated entries from cache", ex);
507 }
508
509 @Override
510 public void cancelled() {
511 }
512
513 });
514 }
515 final CachingAsyncDataConsumer cachingDataConsumer = cachingConsumerRef.get();
516 if (cachingDataConsumer != null) {
517 LOG.debug("Caching backend response");
518 return cachingDataConsumer;
519 }
520 return asyncExecCallback.handleResponse(backendResponse, entityDetails);
521 }
522
523 @Override
524 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
525 asyncExecCallback.handleInformationResponse(response);
526 }
527
528 void triggerNewCacheEntryResponse(final HttpResponse backendResponse, final Date responseDate, final ByteArrayBuffer buffer) {
529 final CancellableDependency operation = scope.cancellableDependency;
530 operation.setDependency(responseCache.createCacheEntry(
531 target,
532 request,
533 backendResponse,
534 buffer,
535 requestDate,
536 responseDate,
537 new FutureCallback<HttpCacheEntry>() {
538
539 @Override
540 public void completed(final HttpCacheEntry newEntry) {
541 LOG.debug("Backend response successfully cached");
542 try {
543 final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, newEntry);
544 triggerResponse(cacheResponse, scope, asyncExecCallback);
545 } catch (final ResourceIOException ex) {
546 asyncExecCallback.failed(ex);
547 }
548 }
549
550 @Override
551 public void failed(final Exception ex) {
552 asyncExecCallback.failed(ex);
553 }
554
555 @Override
556 public void cancelled() {
557 asyncExecCallback.failed(new InterruptedIOException());
558 }
559
560 }));
561
562 }
563
564 @Override
565 public void completed() {
566 final CachingAsyncDataConsumer cachingDataConsumer = cachingConsumerRef.getAndSet(null);
567 if (cachingDataConsumer != null && !cachingDataConsumer.writtenThrough.get()) {
568 final ByteArrayBuffer buffer = cachingDataConsumer.bufferRef.getAndSet(null);
569 final HttpResponse backendResponse = cachingDataConsumer.backendResponse;
570 if (cacheConfig.isFreshnessCheckEnabled()) {
571 final CancellableDependency operation = scope.cancellableDependency;
572 operation.setDependency(responseCache.getCacheEntry(target, request, new FutureCallback<HttpCacheEntry>() {
573
574 @Override
575 public void completed(final HttpCacheEntry existingEntry) {
576 if (DateUtils.isAfter(existingEntry, backendResponse, HttpHeaders.DATE)) {
577 LOG.debug("Backend already contains fresher cache entry");
578 try {
579 final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, existingEntry);
580 triggerResponse(cacheResponse, scope, asyncExecCallback);
581 } catch (final ResourceIOException ex) {
582 asyncExecCallback.failed(ex);
583 }
584 } else {
585 triggerNewCacheEntryResponse(backendResponse, responseDate, buffer);
586 }
587 }
588
589 @Override
590 public void failed(final Exception cause) {
591 asyncExecCallback.failed(cause);
592 }
593
594 @Override
595 public void cancelled() {
596 asyncExecCallback.failed(new InterruptedIOException());
597 }
598
599 }));
600 } else {
601 triggerNewCacheEntryResponse(backendResponse, responseDate, buffer);
602 }
603 } else {
604 asyncExecCallback.completed();
605 }
606 }
607
608 @Override
609 public void failed(final Exception cause) {
610 asyncExecCallback.failed(cause);
611 }
612
613 }
614
615 private void handleCacheHit(
616 final HttpHost target,
617 final HttpRequest request,
618 final AsyncEntityProducer entityProducer,
619 final AsyncExecChain.Scope scope,
620 final AsyncExecChain chain,
621 final AsyncExecCallback asyncExecCallback,
622 final HttpCacheEntry entry) {
623 final HttpClientContext context = scope.clientContext;
624 recordCacheHit(target, request);
625 final Date now = getCurrentDate();
626 if (suitabilityChecker.canCachedResponseBeUsed(target, request, entry, now)) {
627 LOG.debug("Cache hit");
628 try {
629 final SimpleHttpResponse cacheResponse = generateCachedResponse(request, context, entry, now);
630 triggerResponse(cacheResponse, scope, asyncExecCallback);
631 } catch (final ResourceIOException ex) {
632 recordCacheFailure(target, request);
633 if (!mayCallBackend(request)) {
634 final SimpleHttpResponse cacheResponse = generateGatewayTimeout(context);
635 triggerResponse(cacheResponse, scope, asyncExecCallback);
636 } else {
637 setResponseStatus(scope.clientContext, CacheResponseStatus.FAILURE);
638 try {
639 chain.proceed(request, entityProducer, scope, asyncExecCallback);
640 } catch (final HttpException | IOException ex2) {
641 asyncExecCallback.failed(ex2);
642 }
643 }
644 }
645 } else if (!mayCallBackend(request)) {
646 LOG.debug("Cache entry not suitable but only-if-cached requested");
647 final SimpleHttpResponse cacheResponse = generateGatewayTimeout(context);
648 triggerResponse(cacheResponse, scope, asyncExecCallback);
649 } else if (!(entry.getStatus() == HttpStatus.SC_NOT_MODIFIED && !suitabilityChecker.isConditional(request))) {
650 LOG.debug("Revalidating cache entry");
651 if (cacheRevalidator != null
652 && !staleResponseNotAllowed(request, entry, now)
653 && validityPolicy.mayReturnStaleWhileRevalidating(entry, now)) {
654 LOG.debug("Serving stale with asynchronous revalidation");
655 try {
656 final SimpleHttpResponse cacheResponse = generateCachedResponse(request, context, entry, now);
657 final String exchangeId = ExecSupport.getNextExchangeId();
658 final AsyncExecChain.Scope fork = new AsyncExecChain.Scope(
659 exchangeId,
660 scope.route,
661 scope.originalRequest,
662 new ComplexFuture<>(null),
663 HttpClientContext.create(),
664 scope.execRuntime.fork());
665 cacheRevalidator.revalidateCacheEntry(
666 responseCache.generateKey(target, request, entry),
667 asyncExecCallback,
668 new DefaultAsyncCacheRevalidator.RevalidationCall() {
669
670 @Override
671 public void execute(final AsyncExecCallback asyncExecCallback) {
672 revalidateCacheEntry(target, request, entityProducer, fork, chain, asyncExecCallback, entry);
673 }
674
675 });
676 triggerResponse(cacheResponse, scope, asyncExecCallback);
677 } catch (final ResourceIOException ex) {
678 asyncExecCallback.failed(ex);
679 }
680 } else {
681 revalidateCacheEntry(target, request, entityProducer, scope, chain, asyncExecCallback, entry);
682 }
683 } else {
684 LOG.debug("Cache entry not usable; calling backend");
685 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
686 }
687 }
688
689 void revalidateCacheEntry(
690 final HttpHost target,
691 final HttpRequest request,
692 final AsyncEntityProducer entityProducer,
693 final AsyncExecChain.Scope scope,
694 final AsyncExecChain chain,
695 final AsyncExecCallback asyncExecCallback,
696 final HttpCacheEntry cacheEntry) {
697 final Date requestDate = getCurrentDate();
698 final HttpRequest conditionalRequest = conditionalRequestBuilder.buildConditionalRequest(scope.originalRequest, cacheEntry);
699 chainProceed(conditionalRequest, entityProducer, scope, chain, new AsyncExecCallback() {
700
701 final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
702
703 void triggerUpdatedCacheEntryResponse(final HttpResponse backendResponse, final Date responseDate) {
704 final CancellableDependency operation = scope.cancellableDependency;
705 recordCacheUpdate(scope.clientContext);
706 operation.setDependency(responseCache.updateCacheEntry(
707 target,
708 request,
709 cacheEntry,
710 backendResponse,
711 requestDate,
712 responseDate,
713 new FutureCallback<HttpCacheEntry>() {
714
715 @Override
716 public void completed(final HttpCacheEntry updatedEntry) {
717 if (suitabilityChecker.isConditional(request)
718 && suitabilityChecker.allConditionalsMatch(request, updatedEntry, new Date())) {
719 final SimpleHttpResponse cacheResponse = responseGenerator.generateNotModifiedResponse(updatedEntry);
720 triggerResponse(cacheResponse, scope, asyncExecCallback);
721 } else {
722 try {
723 final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, updatedEntry);
724 triggerResponse(cacheResponse, scope, asyncExecCallback);
725 } catch (final ResourceIOException ex) {
726 asyncExecCallback.failed(ex);
727 }
728 }
729 }
730
731 @Override
732 public void failed(final Exception ex) {
733 asyncExecCallback.failed(ex);
734 }
735
736 @Override
737 public void cancelled() {
738 asyncExecCallback.failed(new InterruptedIOException());
739 }
740
741 }));
742 }
743
744 void triggerResponseStaleCacheEntry() {
745 try {
746 final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, cacheEntry);
747 cacheResponse.addHeader(HeaderConstants.WARNING, "110 localhost \"Response is stale\"");
748 triggerResponse(cacheResponse, scope, asyncExecCallback);
749 } catch (final ResourceIOException ex) {
750 asyncExecCallback.failed(ex);
751 }
752 }
753
754 AsyncExecCallback evaluateResponse(final HttpResponse backendResponse, final Date responseDate) {
755 backendResponse.addHeader(HeaderConstants.VIA, generateViaHeader(backendResponse));
756
757 final int statusCode = backendResponse.getCode();
758 if (statusCode == HttpStatus.SC_NOT_MODIFIED || statusCode == HttpStatus.SC_OK) {
759 recordCacheUpdate(scope.clientContext);
760 }
761 if (statusCode == HttpStatus.SC_NOT_MODIFIED) {
762 return new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
763
764 @Override
765 public void run() {
766 triggerUpdatedCacheEntryResponse(backendResponse, responseDate);
767 }
768
769 });
770 }
771 if (staleIfErrorAppliesTo(statusCode)
772 && !staleResponseNotAllowed(request, cacheEntry, getCurrentDate())
773 && validityPolicy.mayReturnStaleIfError(request, cacheEntry, responseDate)) {
774 return new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
775
776 @Override
777 public void run() {
778 triggerResponseStaleCacheEntry();
779 }
780
781 });
782 }
783 return new BackendResponseHandler(target, conditionalRequest, requestDate, responseDate, scope, asyncExecCallback);
784 }
785
786 @Override
787 public AsyncDataConsumer handleResponse(
788 final HttpResponse backendResponse1,
789 final EntityDetails entityDetails) throws HttpException, IOException {
790
791 final Date responseDate1 = getCurrentDate();
792
793 final AsyncExecCallback callback1;
794 if (revalidationResponseIsTooOld(backendResponse1, cacheEntry)
795 && (entityProducer == null || entityProducer.isRepeatable())) {
796
797 final HttpRequest unconditional = conditionalRequestBuilder.buildUnconditionalRequest(
798 scope.originalRequest);
799
800 callback1 = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
801
802 @Override
803 public void run() {
804 chainProceed(unconditional, entityProducer, scope, chain, new AsyncExecCallback() {
805
806 @Override
807 public AsyncDataConsumer handleResponse(
808 final HttpResponse backendResponse2,
809 final EntityDetails entityDetails) throws HttpException, IOException {
810 final Date responseDate2 = getCurrentDate();
811 final AsyncExecCallback callback2 = evaluateResponse(backendResponse2, responseDate2);
812 callbackRef.set(callback2);
813 return callback2.handleResponse(backendResponse2, entityDetails);
814 }
815
816 @Override
817 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
818 final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
819 if (callback2 != null) {
820 callback2.handleInformationResponse(response);
821 } else {
822 asyncExecCallback.handleInformationResponse(response);
823 }
824 }
825
826 @Override
827 public void completed() {
828 final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
829 if (callback2 != null) {
830 callback2.completed();
831 } else {
832 asyncExecCallback.completed();
833 }
834 }
835
836 @Override
837 public void failed(final Exception cause) {
838 final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
839 if (callback2 != null) {
840 callback2.failed(cause);
841 } else {
842 asyncExecCallback.failed(cause);
843 }
844 }
845
846 });
847
848 }
849
850 });
851 } else {
852 callback1 = evaluateResponse(backendResponse1, responseDate1);
853 }
854 callbackRef.set(callback1);
855 return callback1.handleResponse(backendResponse1, entityDetails);
856 }
857
858 @Override
859 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
860 final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
861 if (callback1 != null) {
862 callback1.handleInformationResponse(response);
863 } else {
864 asyncExecCallback.handleInformationResponse(response);
865 }
866 }
867
868 @Override
869 public void completed() {
870 final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
871 if (callback1 != null) {
872 callback1.completed();
873 } else {
874 asyncExecCallback.completed();
875 }
876 }
877
878 @Override
879 public void failed(final Exception cause) {
880 final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
881 if (callback1 != null) {
882 callback1.failed(cause);
883 } else {
884 asyncExecCallback.failed(cause);
885 }
886 }
887
888 });
889
890 }
891
892 private void handleCacheMiss(
893 final HttpHost target,
894 final HttpRequest request,
895 final AsyncEntityProducer entityProducer,
896 final AsyncExecChain.Scope scope,
897 final AsyncExecChain chain,
898 final AsyncExecCallback asyncExecCallback) {
899 recordCacheMiss(target, request);
900
901 if (mayCallBackend(request)) {
902 final CancellableDependency operation = scope.cancellableDependency;
903 operation.setDependency(responseCache.getVariantCacheEntriesWithEtags(
904 target,
905 request,
906 new FutureCallback<Map<String, Variant>>() {
907
908 @Override
909 public void completed(final Map<String, Variant> variants) {
910 if (variants != null && !variants.isEmpty() && (entityProducer == null || entityProducer.isRepeatable())) {
911 negotiateResponseFromVariants(target, request, entityProducer, scope, chain, asyncExecCallback, variants);
912 } else {
913 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
914 }
915 }
916
917 @Override
918 public void failed(final Exception ex) {
919 asyncExecCallback.failed(ex);
920 }
921
922 @Override
923 public void cancelled() {
924 asyncExecCallback.failed(new InterruptedIOException());
925 }
926
927 }));
928 } else {
929 final SimpleHttpResponse cacheResponse = SimpleHttpResponse.create(HttpStatus.SC_GATEWAY_TIMEOUT, "Gateway Timeout");
930 triggerResponse(cacheResponse, scope, asyncExecCallback);
931 }
932 }
933
934 void negotiateResponseFromVariants(
935 final HttpHost target,
936 final HttpRequest request,
937 final AsyncEntityProducer entityProducer,
938 final AsyncExecChain.Scope scope,
939 final AsyncExecChain chain,
940 final AsyncExecCallback asyncExecCallback,
941 final Map<String, Variant> variants) {
942 final CancellableDependency operation = scope.cancellableDependency;
943 final HttpRequest conditionalRequest = conditionalRequestBuilder.buildConditionalRequestFromVariants(request, variants);
944
945 final Date requestDate = getCurrentDate();
946 chainProceed(conditionalRequest, entityProducer, scope, chain, new AsyncExecCallback() {
947
948 final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
949
950 void updateVariantCacheEntry(final HttpResponse backendResponse, final Date responseDate, final Variant matchingVariant) {
951 recordCacheUpdate(scope.clientContext);
952 operation.setDependency(responseCache.updateVariantCacheEntry(
953 target,
954 conditionalRequest,
955 backendResponse,
956 matchingVariant,
957 requestDate,
958 responseDate,
959 new FutureCallback<HttpCacheEntry>() {
960
961 @Override
962 public void completed(final HttpCacheEntry responseEntry) {
963 if (shouldSendNotModifiedResponse(request, responseEntry)) {
964 final SimpleHttpResponse cacheResponse = responseGenerator.generateNotModifiedResponse(responseEntry);
965 triggerResponse(cacheResponse, scope, asyncExecCallback);
966 } else {
967 try {
968 final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, responseEntry);
969 operation.setDependency(responseCache.reuseVariantEntryFor(
970 target,
971 request,
972 matchingVariant,
973 new FutureCallback<Boolean>() {
974
975 @Override
976 public void completed(final Boolean result) {
977 triggerResponse(cacheResponse, scope, asyncExecCallback);
978 }
979
980 @Override
981 public void failed(final Exception ex) {
982 asyncExecCallback.failed(ex);
983 }
984
985 @Override
986 public void cancelled() {
987 asyncExecCallback.failed(new InterruptedIOException());
988 }
989
990 }));
991 } catch (final ResourceIOException ex) {
992 asyncExecCallback.failed(ex);
993 }
994 }
995 }
996
997 @Override
998 public void failed(final Exception ex) {
999 asyncExecCallback.failed(ex);
1000 }
1001
1002 @Override
1003 public void cancelled() {
1004 asyncExecCallback.failed(new InterruptedIOException());
1005 }
1006
1007 }));
1008 }
1009
1010 @Override
1011 public AsyncDataConsumer handleResponse(
1012 final HttpResponse backendResponse,
1013 final EntityDetails entityDetails) throws HttpException, IOException {
1014 final Date responseDate = getCurrentDate();
1015 backendResponse.addHeader("Via", generateViaHeader(backendResponse));
1016
1017 final AsyncExecCallback callback;
1018
1019 if (backendResponse.getCode() != HttpStatus.SC_NOT_MODIFIED) {
1020 callback = new BackendResponseHandler(target, request, requestDate, responseDate, scope, asyncExecCallback);
1021 } else {
1022 final Header resultEtagHeader = backendResponse.getFirstHeader(HeaderConstants.ETAG);
1023 if (resultEtagHeader == null) {
1024 LOG.warn("304 response did not contain ETag");
1025 callback = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
1026
1027 @Override
1028 public void run() {
1029 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
1030 }
1031
1032 });
1033 } else {
1034 final String resultEtag = resultEtagHeader.getValue();
1035 final Variant matchingVariant = variants.get(resultEtag);
1036 if (matchingVariant == null) {
1037 LOG.debug("304 response did not contain ETag matching one sent in If-None-Match");
1038 callback = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
1039
1040 @Override
1041 public void run() {
1042 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
1043 }
1044
1045 });
1046 } else {
1047 if (revalidationResponseIsTooOld(backendResponse, matchingVariant.getEntry())) {
1048 final HttpRequest unconditional = conditionalRequestBuilder.buildUnconditionalRequest(request);
1049 scope.clientContext.setAttribute(HttpCoreContext.HTTP_REQUEST, unconditional);
1050 callback = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
1051
1052 @Override
1053 public void run() {
1054 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
1055 }
1056
1057 });
1058 } else {
1059 callback = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
1060
1061 @Override
1062 public void run() {
1063 updateVariantCacheEntry(backendResponse, responseDate, matchingVariant);
1064 }
1065
1066 });
1067 }
1068 }
1069 }
1070 }
1071 callbackRef.set(callback);
1072 return callback.handleResponse(backendResponse, entityDetails);
1073 }
1074
1075 @Override
1076 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
1077 final AsyncExecCallback callback = callbackRef.getAndSet(null);
1078 if (callback != null) {
1079 callback.handleInformationResponse(response);
1080 } else {
1081 asyncExecCallback.handleInformationResponse(response);
1082 }
1083 }
1084
1085 @Override
1086 public void completed() {
1087 final AsyncExecCallback callback = callbackRef.getAndSet(null);
1088 if (callback != null) {
1089 callback.completed();
1090 } else {
1091 asyncExecCallback.completed();
1092 }
1093 }
1094
1095 @Override
1096 public void failed(final Exception cause) {
1097 final AsyncExecCallback callback = callbackRef.getAndSet(null);
1098 if (callback != null) {
1099 callback.failed(cause);
1100 } else {
1101 asyncExecCallback.failed(cause);
1102 }
1103 }
1104
1105 });
1106
1107 }
1108
1109 }