1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.testing.async;
29
30 import org.apache.hc.core5.function.Resolver;
31 import org.apache.hc.core5.http.EntityDetails;
32 import org.apache.hc.core5.http.Header;
33 import org.apache.hc.core5.http.HttpException;
34 import org.apache.hc.core5.http.HttpHeaders;
35 import org.apache.hc.core5.http.HttpRequest;
36 import org.apache.hc.core5.http.HttpResponse;
37 import org.apache.hc.core5.http.HttpStatus;
38 import org.apache.hc.core5.http.HttpVersion;
39 import org.apache.hc.core5.http.ProtocolVersion;
40 import org.apache.hc.core5.http.message.BasicHttpResponse;
41 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
42 import org.apache.hc.core5.http.nio.CapacityChannel;
43 import org.apache.hc.core5.http.nio.DataStreamChannel;
44 import org.apache.hc.core5.http.nio.ResponseChannel;
45 import org.apache.hc.core5.http.protocol.HttpContext;
46 import org.apache.hc.core5.util.Args;
47 import org.apache.hc.core5.util.TimeValue;
48
49 import java.io.IOException;
50 import java.nio.ByteBuffer;
51 import java.util.List;
52 import java.util.concurrent.atomic.AtomicBoolean;
53
54 public class ServiceUnavailableAsyncDecorator implements AsyncServerExchangeHandler {
55
56 private final AsyncServerExchangeHandler exchangeHandler;
57 private final Resolver<HttpRequest, TimeValue> serviceAvailabilityResolver;
58 private final AtomicBoolean serviceUnavailable;
59
60 public ServiceUnavailableAsyncDecorator(final AsyncServerExchangeHandler exchangeHandler,
61 final Resolver<HttpRequest, TimeValue> serviceAvailabilityResolver) {
62 this.exchangeHandler = Args.notNull(exchangeHandler, "Exchange handler");
63 this.serviceAvailabilityResolver = Args.notNull(serviceAvailabilityResolver, "Service availability resolver");
64 this.serviceUnavailable = new AtomicBoolean();
65 }
66
67 @Override
68 public void handleRequest(final HttpRequest request,
69 final EntityDetails entityDetails,
70 final ResponseChannel responseChannel,
71 final HttpContext context) throws HttpException, IOException {
72 final TimeValue retryAfter = serviceAvailabilityResolver.resolve(request);
73 serviceUnavailable.set(TimeValue.isPositive(retryAfter));
74 if (serviceUnavailable.get()) {
75 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_SERVICE_UNAVAILABLE);
76 response.addHeader(HttpHeaders.RETRY_AFTER, Long.toString(retryAfter.toSeconds()));
77 final ProtocolVersion version = request.getVersion();
78 if (version != null && version.compareToVersion(HttpVersion.HTTP_2) < 0) {
79 response.addHeader(HttpHeaders.CONNECTION, "Close");
80 }
81 responseChannel.sendResponse(response, null, context);
82 } else {
83 exchangeHandler.handleRequest(request, entityDetails, responseChannel, context);
84 }
85 }
86
87 @Override
88 public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
89 if (!serviceUnavailable.get()) {
90 exchangeHandler.updateCapacity(capacityChannel);
91 } else {
92 capacityChannel.update(Integer.MAX_VALUE);
93 }
94 }
95
96 @Override
97 public final void consume(final ByteBuffer src) throws IOException {
98 if (!serviceUnavailable.get()) {
99 exchangeHandler.consume(src);
100 }
101 }
102
103 @Override
104 public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
105 if (!serviceUnavailable.get()) {
106 exchangeHandler.streamEnd(trailers);
107 }
108 }
109
110 @Override
111 public int available() {
112 if (!serviceUnavailable.get()) {
113 return exchangeHandler.available();
114 } else {
115 return 0;
116 }
117 }
118
119 @Override
120 public void produce(final DataStreamChannel channel) throws IOException {
121 if (!serviceUnavailable.get()) {
122 exchangeHandler.produce(channel);
123 }
124 }
125
126 @Override
127 public void failed(final Exception cause) {
128 if (!serviceUnavailable.get()) {
129 exchangeHandler.failed(cause);
130 }
131 }
132
133 @Override
134 public void releaseResources() {
135 exchangeHandler.releaseResources();
136 }
137
138 }