1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.cache;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.util.Date;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import org.apache.hc.client5.http.HttpRoute;
40 import org.apache.hc.client5.http.async.AsyncExecCallback;
41 import org.apache.hc.client5.http.async.AsyncExecChain;
42 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
43 import org.apache.hc.client5.http.async.methods.SimpleBody;
44 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
45 import org.apache.hc.client5.http.cache.CacheResponseStatus;
46 import org.apache.hc.client5.http.cache.HeaderConstants;
47 import org.apache.hc.client5.http.cache.HttpAsyncCacheStorage;
48 import org.apache.hc.client5.http.cache.HttpCacheEntry;
49 import org.apache.hc.client5.http.cache.ResourceFactory;
50 import org.apache.hc.client5.http.cache.ResourceIOException;
51 import org.apache.hc.client5.http.impl.ExecSupport;
52 import org.apache.hc.client5.http.protocol.HttpClientContext;
53 import org.apache.hc.client5.http.schedule.SchedulingStrategy;
54 import org.apache.hc.client5.http.utils.DateUtils;
55 import org.apache.hc.core5.annotation.Contract;
56 import org.apache.hc.core5.annotation.ThreadingBehavior;
57 import org.apache.hc.core5.concurrent.CancellableDependency;
58 import org.apache.hc.core5.concurrent.ComplexFuture;
59 import org.apache.hc.core5.concurrent.FutureCallback;
60 import org.apache.hc.core5.function.Factory;
61 import org.apache.hc.core5.http.ContentType;
62 import org.apache.hc.core5.http.EntityDetails;
63 import org.apache.hc.core5.http.Header;
64 import org.apache.hc.core5.http.HttpException;
65 import org.apache.hc.core5.http.HttpHeaders;
66 import org.apache.hc.core5.http.HttpHost;
67 import org.apache.hc.core5.http.HttpRequest;
68 import org.apache.hc.core5.http.HttpResponse;
69 import org.apache.hc.core5.http.HttpStatus;
70 import org.apache.hc.core5.http.impl.BasicEntityDetails;
71 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
72 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
73 import org.apache.hc.core5.http.nio.CapacityChannel;
74 import org.apache.hc.core5.http.protocol.HttpCoreContext;
75 import org.apache.hc.core5.http.support.BasicRequestBuilder;
76 import org.apache.hc.core5.net.URIAuthority;
77 import org.apache.hc.core5.util.Args;
78 import org.apache.hc.core5.util.ByteArrayBuffer;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82
83
84
85
86
87
88
89
90
91
92
93 @Contract(threading = ThreadingBehavior.SAFE)
94 class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler {
95
96 private static final Logger LOG = LoggerFactory.getLogger(AsyncCachingExec.class);
97 private final HttpAsyncCache responseCache;
98 private final DefaultAsyncCacheRevalidator cacheRevalidator;
99 private final ConditionalRequestBuilder<HttpRequest> conditionalRequestBuilder;
100
/**
 * Creates a caching exec handler around the given cache.
 * <p>
 * {@code config} is handed to the superclass constructor. The conditional request
 * builder created here copies requests via {@link BasicRequestBuilder#copy}.
 *
 * @param cache the response cache; must not be {@code null}.
 * @param cacheRevalidator revalidator used for serving stale entries while revalidating;
 *                         may be {@code null} (callers in this class check for that).
 * @param config cache configuration passed to the superclass.
 */
AsyncCachingExec(
        final HttpAsyncCache cache,
        final DefaultAsyncCacheRevalidator cacheRevalidator,
        final CacheConfig config) {
    super(config);
    this.responseCache = Args.notNull(cache, "Response cache");
    this.cacheRevalidator = cacheRevalidator;

    // Factory producing an independent copy of a request for conditional re-issue.
    final Factory<HttpRequest, HttpRequest> requestCopier = new Factory<HttpRequest, HttpRequest>() {

        @Override
        public HttpRequest create(final HttpRequest original) {
            return BasicRequestBuilder.copy(original).build();
        }

    };
    this.conditionalRequestBuilder = new ConditionalRequestBuilder<>(requestCopier);
}
114
/**
 * Fully injected constructor: every collaborator is supplied by the caller.
 * <p>
 * The policy and compliance objects are forwarded to the superclass constructor; the
 * cache, revalidator and conditional request builder are stored as given. No argument
 * is checked for {@code null} here.
 */
AsyncCachingExec(
        final HttpAsyncCache responseCache,
        final CacheValidityPolicy validityPolicy,
        final ResponseCachingPolicy responseCachingPolicy,
        final CachedHttpResponseGenerator responseGenerator,
        final CacheableRequestPolicy cacheableRequestPolicy,
        final CachedResponseSuitabilityChecker suitabilityChecker,
        final ResponseProtocolCompliance responseCompliance,
        final RequestProtocolCompliance requestCompliance,
        final DefaultAsyncCacheRevalidator cacheRevalidator,
        final ConditionalRequestBuilder<HttpRequest> conditionalRequestBuilder,
        final CacheConfig config) {
    super(
            validityPolicy,
            responseCachingPolicy,
            responseGenerator,
            cacheableRequestPolicy,
            suitabilityChecker,
            responseCompliance,
            requestCompliance,
            config);
    this.conditionalRequestBuilder = conditionalRequestBuilder;
    this.cacheRevalidator = cacheRevalidator;
    this.responseCache = responseCache;
}
133
/**
 * Creates a caching exec handler whose revalidator is built from the given executor.
 *
 * @param cache the response cache.
 * @param executorService executor for background revalidation; when {@code null}
 *                        no revalidator is created.
 * @param schedulingStrategy scheduling strategy handed to the revalidator.
 * @param config cache configuration.
 */
AsyncCachingExec(
        final HttpAsyncCache cache,
        final ScheduledExecutorService executorService,
        final SchedulingStrategy schedulingStrategy,
        final CacheConfig config) {
    this(cache, newRevalidatorOrNull(executorService, schedulingStrategy), config);
}

/**
 * Builds a revalidator for the executor, or returns {@code null} when there is no executor.
 */
private static DefaultAsyncCacheRevalidator newRevalidatorOrNull(
        final ScheduledExecutorService executorService,
        final SchedulingStrategy schedulingStrategy) {
    if (executorService == null) {
        return null;
    }
    return new DefaultAsyncCacheRevalidator(executorService, schedulingStrategy);
}
143
/**
 * Creates a caching exec handler using a {@link BasicHttpAsyncCache} built from the
 * given resource factory and storage; the remaining arguments are passed on to the
 * executor-based constructor unchanged.
 */
AsyncCachingExec(
        final ResourceFactory resourceFactory,
        final HttpAsyncCacheStorage storage,
        final ScheduledExecutorService executorService,
        final SchedulingStrategy schedulingStrategy,
        final CacheConfig config) {
    this(
            new BasicHttpAsyncCache(resourceFactory, storage),
            executorService,
            schedulingStrategy,
            config);
}
152
153 private void triggerResponse(
154 final SimpleHttpResponse cacheResponse,
155 final AsyncExecChain.Scope scope,
156 final AsyncExecCallback asyncExecCallback) {
157 scope.clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, cacheResponse);
158 scope.execRuntime.releaseEndpoint();
159
160 final SimpleBody body = cacheResponse.getBody();
161 final byte[] content = body != null ? body.getBodyBytes() : null;
162 final ContentType contentType = body != null ? body.getContentType() : null;
163 try {
164 final AsyncDataConsumer dataConsumer = asyncExecCallback.handleResponse(
165 cacheResponse,
166 content != null ? new BasicEntityDetails(content.length, contentType) : null);
167 if (dataConsumer != null) {
168 if (content != null) {
169 dataConsumer.consume(ByteBuffer.wrap(content));
170 }
171 dataConsumer.streamEnd(null);
172 }
173 asyncExecCallback.completed();
174 } catch (final HttpException | IOException ex) {
175 asyncExecCallback.failed(ex);
176 }
177 }
178
179 static class AsyncExecCallbackWrapper implements AsyncExecCallback {
180
181 private final AsyncExecCallback asyncExecCallback;
182 private final Runnable command;
183
184 AsyncExecCallbackWrapper(final AsyncExecCallback asyncExecCallback, final Runnable command) {
185 this.asyncExecCallback = asyncExecCallback;
186 this.command = command;
187 }
188
189 @Override
190 public AsyncDataConsumer handleResponse(
191 final HttpResponse response,
192 final EntityDetails entityDetails) throws HttpException, IOException {
193 return null;
194 }
195
196 @Override
197 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
198 }
199
200 @Override
201 public void completed() {
202 command.run();
203 }
204
205 @Override
206 public void failed(final Exception cause) {
207 asyncExecCallback.failed(cause);
208 }
209
210 }
211
212 @Override
213 public void execute(
214 final HttpRequest request,
215 final AsyncEntityProducer entityProducer,
216 final AsyncExecChain.Scope scope,
217 final AsyncExecChain chain,
218 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
219 Args.notNull(request, "HTTP request");
220 Args.notNull(scope, "Scope");
221
222 final HttpRoute route = scope.route;
223 final CancellableDependency operation = scope.cancellableDependency;
224 final HttpClientContext context = scope.clientContext;
225 context.setAttribute(HttpClientContext.HTTP_ROUTE, route);
226 context.setAttribute(HttpClientContext.HTTP_REQUEST, request);
227
228 final URIAuthority authority = request.getAuthority();
229 final String scheme = request.getScheme();
230 final HttpHost target = authority != null ? new HttpHost(scheme, authority) : route.getTargetHost();
231 final String via = generateViaHeader(request);
232
233
234 setResponseStatus(context, CacheResponseStatus.CACHE_MISS);
235
236 if (clientRequestsOurOptions(request)) {
237 setResponseStatus(context, CacheResponseStatus.CACHE_MODULE_RESPONSE);
238 triggerResponse(SimpleHttpResponse.create(HttpStatus.SC_NOT_IMPLEMENTED), scope, asyncExecCallback);
239 return;
240 }
241
242 final SimpleHttpResponse fatalErrorResponse = getFatallyNoncompliantResponse(request, context);
243 if (fatalErrorResponse != null) {
244 triggerResponse(fatalErrorResponse, scope, asyncExecCallback);
245 return;
246 }
247
248 requestCompliance.makeRequestCompliant(request);
249 request.addHeader("Via",via);
250
251 if (!cacheableRequestPolicy.isServableFromCache(request)) {
252 LOG.debug("Request is not servable from cache");
253 operation.setDependency(responseCache.flushCacheEntriesInvalidatedByRequest(target, request, new FutureCallback<Boolean>() {
254
255 @Override
256 public void completed(final Boolean result) {
257 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
258 }
259
260 @Override
261 public void failed(final Exception cause) {
262 asyncExecCallback.failed(cause);
263 }
264
265 @Override
266 public void cancelled() {
267 asyncExecCallback.failed(new InterruptedIOException());
268 }
269
270 }));
271 } else {
272 operation.setDependency(responseCache.getCacheEntry(target, request, new FutureCallback<HttpCacheEntry>() {
273
274 @Override
275 public void completed(final HttpCacheEntry entry) {
276 if (entry == null) {
277 LOG.debug("Cache miss");
278 handleCacheMiss(target, request, entityProducer, scope, chain, asyncExecCallback);
279 } else {
280 handleCacheHit(target, request, entityProducer, scope, chain, asyncExecCallback, entry);
281 }
282 }
283
284 @Override
285 public void failed(final Exception cause) {
286 asyncExecCallback.failed(cause);
287 }
288
289 @Override
290 public void cancelled() {
291 asyncExecCallback.failed(new InterruptedIOException());
292 }
293
294 }));
295
296 }
297 }
298
299 void chainProceed(
300 final HttpRequest request,
301 final AsyncEntityProducer entityProducer,
302 final AsyncExecChain.Scope scope,
303 final AsyncExecChain chain,
304 final AsyncExecCallback asyncExecCallback) {
305 try {
306 chain.proceed(request, entityProducer, scope, asyncExecCallback);
307 } catch (final HttpException | IOException ex) {
308 asyncExecCallback.failed(ex);
309 }
310 }
311
312 void callBackend(
313 final HttpHost target,
314 final HttpRequest request,
315 final AsyncEntityProducer entityProducer,
316 final AsyncExecChain.Scope scope,
317 final AsyncExecChain chain,
318 final AsyncExecCallback asyncExecCallback) {
319 LOG.debug("Calling the backend");
320 final Date requestDate = getCurrentDate();
321 final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
322 chainProceed(request, entityProducer, scope, chain, new AsyncExecCallback() {
323
324 @Override
325 public AsyncDataConsumer handleResponse(
326 final HttpResponse backendResponse,
327 final EntityDetails entityDetails) throws HttpException, IOException {
328 final Date responseDate = getCurrentDate();
329 backendResponse.addHeader("Via", generateViaHeader(backendResponse));
330
331 final AsyncExecCallback callback = new BackendResponseHandler(target, request, requestDate, responseDate, scope, asyncExecCallback);
332 callbackRef.set(callback);
333 return callback.handleResponse(backendResponse, entityDetails);
334 }
335
336 @Override
337 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
338 final AsyncExecCallback callback = callbackRef.getAndSet(null);
339 if (callback != null) {
340 callback.handleInformationResponse(response);
341 } else {
342 asyncExecCallback.handleInformationResponse(response);
343 }
344 }
345
346 @Override
347 public void completed() {
348 final AsyncExecCallback callback = callbackRef.getAndSet(null);
349 if (callback != null) {
350 callback.completed();
351 } else {
352 asyncExecCallback.completed();
353 }
354 }
355
356 @Override
357 public void failed(final Exception cause) {
358 final AsyncExecCallback callback = callbackRef.getAndSet(null);
359 if (callback != null) {
360 callback.failed(cause);
361 } else {
362 asyncExecCallback.failed(cause);
363 }
364 }
365
366 });
367 }
368
369 class CachingAsyncDataConsumer implements AsyncDataConsumer {
370
371 private final AsyncExecCallback fallback;
372 private final HttpResponse backendResponse;
373 private final EntityDetails entityDetails;
374 private final AtomicBoolean writtenThrough;
375 private final AtomicReference<ByteArrayBuffer> bufferRef;
376 private final AtomicReference<AsyncDataConsumer> dataConsumerRef;
377
378 CachingAsyncDataConsumer(
379 final AsyncExecCallback fallback,
380 final HttpResponse backendResponse,
381 final EntityDetails entityDetails) {
382 this.fallback = fallback;
383 this.backendResponse = backendResponse;
384 this.entityDetails = entityDetails;
385 this.writtenThrough = new AtomicBoolean(false);
386 this.bufferRef = new AtomicReference<>(entityDetails != null ? new ByteArrayBuffer(1024) : null);
387 this.dataConsumerRef = new AtomicReference<>();
388 }
389
390 @Override
391 public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
392 final AsyncDataConsumer dataConsumer = dataConsumerRef.get();
393 if (dataConsumer != null) {
394 dataConsumer.updateCapacity(capacityChannel);
395 } else {
396 capacityChannel.update(Integer.MAX_VALUE);
397 }
398 }
399
400 @Override
401 public final void consume(final ByteBuffer src) throws IOException {
402 final ByteArrayBuffer buffer = bufferRef.get();
403 if (buffer != null) {
404 if (src.hasArray()) {
405 buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining());
406 } else {
407 while (src.hasRemaining()) {
408 buffer.append(src.get());
409 }
410 }
411 if (buffer.length() > cacheConfig.getMaxObjectSize()) {
412 LOG.debug("Backend response content length exceeds maximum");
413
414
415 bufferRef.set(null);
416 try {
417 final AsyncDataConsumer dataConsumer = fallback.handleResponse(backendResponse, entityDetails);
418 if (dataConsumer != null) {
419 dataConsumerRef.set(dataConsumer);
420 writtenThrough.set(true);
421 dataConsumer.consume(ByteBuffer.wrap(buffer.array(), 0, buffer.length()));
422 }
423 } catch (final HttpException ex) {
424 fallback.failed(ex);
425 }
426 }
427 } else {
428 final AsyncDataConsumer dataConsumer = dataConsumerRef.get();
429 if (dataConsumer != null) {
430 dataConsumer.consume(src);
431 }
432 }
433 }
434
435 @Override
436 public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
437 final AsyncDataConsumer dataConsumer = dataConsumerRef.getAndSet(null);
438 if (dataConsumer != null) {
439 dataConsumer.streamEnd(trailers);
440 }
441 }
442
443 @Override
444 public void releaseResources() {
445 final AsyncDataConsumer dataConsumer = dataConsumerRef.getAndSet(null);
446 if (dataConsumer != null) {
447 dataConsumer.releaseResources();
448 }
449 }
450
451 }
452
453 class BackendResponseHandler implements AsyncExecCallback {
454
455 private final HttpHost target;
456 private final HttpRequest request;
457 private final Date requestDate;
458 private final Date responseDate;
459 private final AsyncExecChain.Scope scope;
460 private final AsyncExecCallback asyncExecCallback;
461 private final AtomicReference<CachingAsyncDataConsumer> cachingConsumerRef;
462
463 BackendResponseHandler(
464 final HttpHost target,
465 final HttpRequest request,
466 final Date requestDate,
467 final Date responseDate,
468 final AsyncExecChain.Scope scope,
469 final AsyncExecCallback asyncExecCallback) {
470 this.target = target;
471 this.request = request;
472 this.requestDate = requestDate;
473 this.responseDate = responseDate;
474 this.scope = scope;
475 this.asyncExecCallback = asyncExecCallback;
476 this.cachingConsumerRef = new AtomicReference<>();
477 }
478
479 @Override
480 public AsyncDataConsumer handleResponse(
481 final HttpResponse backendResponse,
482 final EntityDetails entityDetails) throws HttpException, IOException {
483 responseCompliance.ensureProtocolCompliance(scope.originalRequest, request, backendResponse);
484 responseCache.flushCacheEntriesInvalidatedByExchange(target, request, backendResponse, new FutureCallback<Boolean>() {
485
486 @Override
487 public void completed(final Boolean result) {
488 }
489
490 @Override
491 public void failed(final Exception ex) {
492 LOG.warn("Unable to flush invalidated entries from cache", ex);
493 }
494
495 @Override
496 public void cancelled() {
497 }
498
499 });
500 final boolean cacheable = responseCachingPolicy.isResponseCacheable(request, backendResponse);
501 if (cacheable) {
502 cachingConsumerRef.set(new CachingAsyncDataConsumer(asyncExecCallback, backendResponse, entityDetails));
503 storeRequestIfModifiedSinceFor304Response(request, backendResponse);
504 } else {
505 LOG.debug("Backend response is not cacheable");
506 responseCache.flushCacheEntriesFor(target, request, new FutureCallback<Boolean>() {
507
508 @Override
509 public void completed(final Boolean result) {
510 }
511
512 @Override
513 public void failed(final Exception ex) {
514 LOG.warn("Unable to flush invalidated entries from cache", ex);
515 }
516
517 @Override
518 public void cancelled() {
519 }
520
521 });
522 }
523 final CachingAsyncDataConsumer cachingDataConsumer = cachingConsumerRef.get();
524 if (cachingDataConsumer != null) {
525 LOG.debug("Caching backend response");
526 return cachingDataConsumer;
527 }
528 return asyncExecCallback.handleResponse(backendResponse, entityDetails);
529 }
530
531 @Override
532 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
533 asyncExecCallback.handleInformationResponse(response);
534 }
535
536 void triggerNewCacheEntryResponse(final HttpResponse backendResponse, final Date responseDate, final ByteArrayBuffer buffer) {
537 final CancellableDependency operation = scope.cancellableDependency;
538 operation.setDependency(responseCache.createCacheEntry(
539 target,
540 request,
541 backendResponse,
542 buffer,
543 requestDate,
544 responseDate,
545 new FutureCallback<HttpCacheEntry>() {
546
547 @Override
548 public void completed(final HttpCacheEntry newEntry) {
549 LOG.debug("Backend response successfully cached");
550 try {
551 final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, newEntry);
552 triggerResponse(cacheResponse, scope, asyncExecCallback);
553 } catch (final ResourceIOException ex) {
554 asyncExecCallback.failed(ex);
555 }
556 }
557
558 @Override
559 public void failed(final Exception ex) {
560 asyncExecCallback.failed(ex);
561 }
562
563 @Override
564 public void cancelled() {
565 asyncExecCallback.failed(new InterruptedIOException());
566 }
567
568 }));
569
570 }
571
572 @Override
573 public void completed() {
574 final CachingAsyncDataConsumer cachingDataConsumer = cachingConsumerRef.getAndSet(null);
575 if (cachingDataConsumer != null && !cachingDataConsumer.writtenThrough.get()) {
576 final ByteArrayBuffer buffer = cachingDataConsumer.bufferRef.getAndSet(null);
577 final HttpResponse backendResponse = cachingDataConsumer.backendResponse;
578 if (cacheConfig.isFreshnessCheckEnabled()) {
579 final CancellableDependency operation = scope.cancellableDependency;
580 operation.setDependency(responseCache.getCacheEntry(target, request, new FutureCallback<HttpCacheEntry>() {
581
582 @Override
583 public void completed(final HttpCacheEntry existingEntry) {
584 if (DateUtils.isAfter(existingEntry, backendResponse, HttpHeaders.DATE)) {
585 LOG.debug("Backend already contains fresher cache entry");
586 try {
587 final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, existingEntry);
588 triggerResponse(cacheResponse, scope, asyncExecCallback);
589 } catch (final ResourceIOException ex) {
590 asyncExecCallback.failed(ex);
591 }
592 } else {
593 triggerNewCacheEntryResponse(backendResponse, responseDate, buffer);
594 }
595 }
596
597 @Override
598 public void failed(final Exception cause) {
599 asyncExecCallback.failed(cause);
600 }
601
602 @Override
603 public void cancelled() {
604 asyncExecCallback.failed(new InterruptedIOException());
605 }
606
607 }));
608 } else {
609 triggerNewCacheEntryResponse(backendResponse, responseDate, buffer);
610 }
611 } else {
612 asyncExecCallback.completed();
613 }
614 }
615
616 @Override
617 public void failed(final Exception cause) {
618 asyncExecCallback.failed(cause);
619 }
620
621 }
622
623 private void handleCacheHit(
624 final HttpHost target,
625 final HttpRequest request,
626 final AsyncEntityProducer entityProducer,
627 final AsyncExecChain.Scope scope,
628 final AsyncExecChain chain,
629 final AsyncExecCallback asyncExecCallback,
630 final HttpCacheEntry entry) {
631 final HttpClientContext context = scope.clientContext;
632 recordCacheHit(target, request);
633 final Date now = getCurrentDate();
634 if (suitabilityChecker.canCachedResponseBeUsed(target, request, entry, now)) {
635 LOG.debug("Cache hit");
636 try {
637 final SimpleHttpResponse cacheResponse = generateCachedResponse(request, context, entry, now);
638 triggerResponse(cacheResponse, scope, asyncExecCallback);
639 } catch (final ResourceIOException ex) {
640 recordCacheFailure(target, request);
641 if (!mayCallBackend(request)) {
642 final SimpleHttpResponse cacheResponse = generateGatewayTimeout(context);
643 triggerResponse(cacheResponse, scope, asyncExecCallback);
644 } else {
645 setResponseStatus(scope.clientContext, CacheResponseStatus.FAILURE);
646 try {
647 chain.proceed(request, entityProducer, scope, asyncExecCallback);
648 } catch (final HttpException | IOException ex2) {
649 asyncExecCallback.failed(ex2);
650 }
651 }
652 }
653 } else if (!mayCallBackend(request)) {
654 LOG.debug("Cache entry not suitable but only-if-cached requested");
655 final SimpleHttpResponse cacheResponse = generateGatewayTimeout(context);
656 triggerResponse(cacheResponse, scope, asyncExecCallback);
657 } else if (!(entry.getStatus() == HttpStatus.SC_NOT_MODIFIED && !suitabilityChecker.isConditional(request))) {
658 LOG.debug("Revalidating cache entry");
659 if (cacheRevalidator != null
660 && !staleResponseNotAllowed(request, entry, now)
661 && validityPolicy.mayReturnStaleWhileRevalidating(entry, now)) {
662 LOG.debug("Serving stale with asynchronous revalidation");
663 try {
664 final SimpleHttpResponse cacheResponse = generateCachedResponse(request, context, entry, now);
665 final String exchangeId = ExecSupport.getNextExchangeId();
666 context.setExchangeId(exchangeId);
667 final AsyncExecChain.Scope fork = new AsyncExecChain.Scope(
668 exchangeId,
669 scope.route,
670 scope.originalRequest,
671 new ComplexFuture<>(null),
672 HttpClientContext.create(),
673 scope.execRuntime.fork(),
674 scope.scheduler,
675 scope.execCount);
676 cacheRevalidator.revalidateCacheEntry(
677 responseCache.generateKey(target, request, entry),
678 asyncExecCallback,
679 new DefaultAsyncCacheRevalidator.RevalidationCall() {
680
681 @Override
682 public void execute(final AsyncExecCallback asyncExecCallback) {
683 revalidateCacheEntry(target, request, entityProducer, fork, chain, asyncExecCallback, entry);
684 }
685
686 });
687 triggerResponse(cacheResponse, scope, asyncExecCallback);
688 } catch (final ResourceIOException ex) {
689 asyncExecCallback.failed(ex);
690 }
691 } else {
692 revalidateCacheEntry(target, request, entityProducer, scope, chain, asyncExecCallback, entry);
693 }
694 } else {
695 LOG.debug("Cache entry not usable; calling backend");
696 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
697 }
698 }
699
700 void revalidateCacheEntry(
701 final HttpHost target,
702 final HttpRequest request,
703 final AsyncEntityProducer entityProducer,
704 final AsyncExecChain.Scope scope,
705 final AsyncExecChain chain,
706 final AsyncExecCallback asyncExecCallback,
707 final HttpCacheEntry cacheEntry) {
708 final Date requestDate = getCurrentDate();
709 final HttpRequest conditionalRequest = conditionalRequestBuilder.buildConditionalRequest(
710 BasicRequestBuilder.copy(scope.originalRequest).build(),
711 cacheEntry);
712 chainProceed(conditionalRequest, entityProducer, scope, chain, new AsyncExecCallback() {
713
714 final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
715
716 void triggerUpdatedCacheEntryResponse(final HttpResponse backendResponse, final Date responseDate) {
717 final CancellableDependency operation = scope.cancellableDependency;
718 recordCacheUpdate(scope.clientContext);
719 operation.setDependency(responseCache.updateCacheEntry(
720 target,
721 request,
722 cacheEntry,
723 backendResponse,
724 requestDate,
725 responseDate,
726 new FutureCallback<HttpCacheEntry>() {
727
728 @Override
729 public void completed(final HttpCacheEntry updatedEntry) {
730 if (suitabilityChecker.isConditional(request)
731 && suitabilityChecker.allConditionalsMatch(request, updatedEntry, new Date())) {
732 final SimpleHttpResponse cacheResponse = responseGenerator.generateNotModifiedResponse(updatedEntry);
733 triggerResponse(cacheResponse, scope, asyncExecCallback);
734 } else {
735 try {
736 final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, updatedEntry);
737 triggerResponse(cacheResponse, scope, asyncExecCallback);
738 } catch (final ResourceIOException ex) {
739 asyncExecCallback.failed(ex);
740 }
741 }
742 }
743
744 @Override
745 public void failed(final Exception ex) {
746 asyncExecCallback.failed(ex);
747 }
748
749 @Override
750 public void cancelled() {
751 asyncExecCallback.failed(new InterruptedIOException());
752 }
753
754 }));
755 }
756
757 void triggerResponseStaleCacheEntry() {
758 try {
759 final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, cacheEntry);
760 cacheResponse.addHeader(HeaderConstants.WARNING, "110 localhost \"Response is stale\"");
761 triggerResponse(cacheResponse, scope, asyncExecCallback);
762 } catch (final ResourceIOException ex) {
763 asyncExecCallback.failed(ex);
764 }
765 }
766
767 AsyncExecCallback evaluateResponse(final HttpResponse backendResponse, final Date responseDate) {
768 backendResponse.addHeader(HeaderConstants.VIA, generateViaHeader(backendResponse));
769
770 final int statusCode = backendResponse.getCode();
771 if (statusCode == HttpStatus.SC_NOT_MODIFIED || statusCode == HttpStatus.SC_OK) {
772 recordCacheUpdate(scope.clientContext);
773 }
774 if (statusCode == HttpStatus.SC_NOT_MODIFIED) {
775 return new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
776
777 @Override
778 public void run() {
779 triggerUpdatedCacheEntryResponse(backendResponse, responseDate);
780 }
781
782 });
783 }
784 if (staleIfErrorAppliesTo(statusCode)
785 && !staleResponseNotAllowed(request, cacheEntry, getCurrentDate())
786 && validityPolicy.mayReturnStaleIfError(request, cacheEntry, responseDate)) {
787 return new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
788
789 @Override
790 public void run() {
791 triggerResponseStaleCacheEntry();
792 }
793
794 });
795 }
796 return new BackendResponseHandler(target, conditionalRequest, requestDate, responseDate, scope, asyncExecCallback);
797 }
798
799 @Override
800 public AsyncDataConsumer handleResponse(
801 final HttpResponse backendResponse1,
802 final EntityDetails entityDetails) throws HttpException, IOException {
803
804 final Date responseDate1 = getCurrentDate();
805
806 final AsyncExecCallback callback1;
807 if (revalidationResponseIsTooOld(backendResponse1, cacheEntry)
808 && (entityProducer == null || entityProducer.isRepeatable())) {
809
810 final HttpRequest unconditional = conditionalRequestBuilder.buildUnconditionalRequest(
811 BasicRequestBuilder.copy(scope.originalRequest).build());
812
813 callback1 = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
814
815 @Override
816 public void run() {
817 chainProceed(unconditional, entityProducer, scope, chain, new AsyncExecCallback() {
818
819 @Override
820 public AsyncDataConsumer handleResponse(
821 final HttpResponse backendResponse2,
822 final EntityDetails entityDetails) throws HttpException, IOException {
823 final Date responseDate2 = getCurrentDate();
824 final AsyncExecCallback callback2 = evaluateResponse(backendResponse2, responseDate2);
825 callbackRef.set(callback2);
826 return callback2.handleResponse(backendResponse2, entityDetails);
827 }
828
829 @Override
830 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
831 final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
832 if (callback2 != null) {
833 callback2.handleInformationResponse(response);
834 } else {
835 asyncExecCallback.handleInformationResponse(response);
836 }
837 }
838
839 @Override
840 public void completed() {
841 final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
842 if (callback2 != null) {
843 callback2.completed();
844 } else {
845 asyncExecCallback.completed();
846 }
847 }
848
849 @Override
850 public void failed(final Exception cause) {
851 final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
852 if (callback2 != null) {
853 callback2.failed(cause);
854 } else {
855 asyncExecCallback.failed(cause);
856 }
857 }
858
859 });
860
861 }
862
863 });
864 } else {
865 callback1 = evaluateResponse(backendResponse1, responseDate1);
866 }
867 callbackRef.set(callback1);
868 return callback1.handleResponse(backendResponse1, entityDetails);
869 }
870
871 @Override
872 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
873 final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
874 if (callback1 != null) {
875 callback1.handleInformationResponse(response);
876 } else {
877 asyncExecCallback.handleInformationResponse(response);
878 }
879 }
880
881 @Override
882 public void completed() {
883 final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
884 if (callback1 != null) {
885 callback1.completed();
886 } else {
887 asyncExecCallback.completed();
888 }
889 }
890
891 @Override
892 public void failed(final Exception cause) {
893 final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
894 if (callback1 != null) {
895 callback1.failed(cause);
896 } else {
897 asyncExecCallback.failed(cause);
898 }
899 }
900
901 });
902
903 }
904
905 private void handleCacheMiss(
906 final HttpHost target,
907 final HttpRequest request,
908 final AsyncEntityProducer entityProducer,
909 final AsyncExecChain.Scope scope,
910 final AsyncExecChain chain,
911 final AsyncExecCallback asyncExecCallback) {
912 recordCacheMiss(target, request);
913
914 if (mayCallBackend(request)) {
915 final CancellableDependency operation = scope.cancellableDependency;
916 operation.setDependency(responseCache.getVariantCacheEntriesWithEtags(
917 target,
918 request,
919 new FutureCallback<Map<String, Variant>>() {
920
921 @Override
922 public void completed(final Map<String, Variant> variants) {
923 if (variants != null && !variants.isEmpty() && (entityProducer == null || entityProducer.isRepeatable())) {
924 negotiateResponseFromVariants(target, request, entityProducer, scope, chain, asyncExecCallback, variants);
925 } else {
926 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
927 }
928 }
929
930 @Override
931 public void failed(final Exception ex) {
932 asyncExecCallback.failed(ex);
933 }
934
935 @Override
936 public void cancelled() {
937 asyncExecCallback.failed(new InterruptedIOException());
938 }
939
940 }));
941 } else {
942 final SimpleHttpResponse cacheResponse = SimpleHttpResponse.create(HttpStatus.SC_GATEWAY_TIMEOUT, "Gateway Timeout");
943 triggerResponse(cacheResponse, scope, asyncExecCallback);
944 }
945 }
946
947 void negotiateResponseFromVariants(
948 final HttpHost target,
949 final HttpRequest request,
950 final AsyncEntityProducer entityProducer,
951 final AsyncExecChain.Scope scope,
952 final AsyncExecChain chain,
953 final AsyncExecCallback asyncExecCallback,
954 final Map<String, Variant> variants) {
955 final CancellableDependency operation = scope.cancellableDependency;
956 final HttpRequest conditionalRequest = conditionalRequestBuilder.buildConditionalRequestFromVariants(
957 BasicRequestBuilder.copy(request).build(),
958 variants);
959
960 final Date requestDate = getCurrentDate();
961 chainProceed(conditionalRequest, entityProducer, scope, chain, new AsyncExecCallback() {
962
963 final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
964
965 void updateVariantCacheEntry(final HttpResponse backendResponse, final Date responseDate, final Variant matchingVariant) {
966 recordCacheUpdate(scope.clientContext);
967 operation.setDependency(responseCache.updateVariantCacheEntry(
968 target,
969 conditionalRequest,
970 backendResponse,
971 matchingVariant,
972 requestDate,
973 responseDate,
974 new FutureCallback<HttpCacheEntry>() {
975
976 @Override
977 public void completed(final HttpCacheEntry responseEntry) {
978 if (shouldSendNotModifiedResponse(request, responseEntry)) {
979 final SimpleHttpResponse cacheResponse = responseGenerator.generateNotModifiedResponse(responseEntry);
980 triggerResponse(cacheResponse, scope, asyncExecCallback);
981 } else {
982 try {
983 final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, responseEntry);
984 operation.setDependency(responseCache.reuseVariantEntryFor(
985 target,
986 request,
987 matchingVariant,
988 new FutureCallback<Boolean>() {
989
990 @Override
991 public void completed(final Boolean result) {
992 triggerResponse(cacheResponse, scope, asyncExecCallback);
993 }
994
995 @Override
996 public void failed(final Exception ex) {
997 asyncExecCallback.failed(ex);
998 }
999
1000 @Override
1001 public void cancelled() {
1002 asyncExecCallback.failed(new InterruptedIOException());
1003 }
1004
1005 }));
1006 } catch (final ResourceIOException ex) {
1007 asyncExecCallback.failed(ex);
1008 }
1009 }
1010 }
1011
1012 @Override
1013 public void failed(final Exception ex) {
1014 asyncExecCallback.failed(ex);
1015 }
1016
1017 @Override
1018 public void cancelled() {
1019 asyncExecCallback.failed(new InterruptedIOException());
1020 }
1021
1022 }));
1023 }
1024
1025 @Override
1026 public AsyncDataConsumer handleResponse(
1027 final HttpResponse backendResponse,
1028 final EntityDetails entityDetails) throws HttpException, IOException {
1029 final Date responseDate = getCurrentDate();
1030 backendResponse.addHeader("Via", generateViaHeader(backendResponse));
1031
1032 final AsyncExecCallback callback;
1033
1034 if (backendResponse.getCode() != HttpStatus.SC_NOT_MODIFIED) {
1035 callback = new BackendResponseHandler(target, request, requestDate, responseDate, scope, asyncExecCallback);
1036 } else {
1037 final Header resultEtagHeader = backendResponse.getFirstHeader(HeaderConstants.ETAG);
1038 if (resultEtagHeader == null) {
1039 LOG.warn("304 response did not contain ETag");
1040 callback = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
1041
1042 @Override
1043 public void run() {
1044 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
1045 }
1046
1047 });
1048 } else {
1049 final String resultEtag = resultEtagHeader.getValue();
1050 final Variant matchingVariant = variants.get(resultEtag);
1051 if (matchingVariant == null) {
1052 LOG.debug("304 response did not contain ETag matching one sent in If-None-Match");
1053 callback = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
1054
1055 @Override
1056 public void run() {
1057 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
1058 }
1059
1060 });
1061 } else {
1062 if (revalidationResponseIsTooOld(backendResponse, matchingVariant.getEntry())) {
1063 final HttpRequest unconditional = conditionalRequestBuilder.buildUnconditionalRequest(
1064 BasicRequestBuilder.copy(request).build());
1065 scope.clientContext.setAttribute(HttpCoreContext.HTTP_REQUEST, unconditional);
1066 callback = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
1067
1068 @Override
1069 public void run() {
1070 callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
1071 }
1072
1073 });
1074 } else {
1075 callback = new AsyncExecCallbackWrapper(asyncExecCallback, new Runnable() {
1076
1077 @Override
1078 public void run() {
1079 updateVariantCacheEntry(backendResponse, responseDate, matchingVariant);
1080 }
1081
1082 });
1083 }
1084 }
1085 }
1086 }
1087 callbackRef.set(callback);
1088 return callback.handleResponse(backendResponse, entityDetails);
1089 }
1090
1091 @Override
1092 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
1093 final AsyncExecCallback callback = callbackRef.getAndSet(null);
1094 if (callback != null) {
1095 callback.handleInformationResponse(response);
1096 } else {
1097 asyncExecCallback.handleInformationResponse(response);
1098 }
1099 }
1100
1101 @Override
1102 public void completed() {
1103 final AsyncExecCallback callback = callbackRef.getAndSet(null);
1104 if (callback != null) {
1105 callback.completed();
1106 } else {
1107 asyncExecCallback.completed();
1108 }
1109 }
1110
1111 @Override
1112 public void failed(final Exception cause) {
1113 final AsyncExecCallback callback = callbackRef.getAndSet(null);
1114 if (callback != null) {
1115 callback.failed(cause);
1116 } else {
1117 asyncExecCallback.failed(cause);
1118 }
1119 }
1120
1121 });
1122
1123 }
1124
1125 }