1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.util.List;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
37 import org.apache.hc.client5.http.HttpRoute;
38 import org.apache.hc.client5.http.UserTokenHandler;
39 import org.apache.hc.client5.http.async.AsyncExecCallback;
40 import org.apache.hc.client5.http.async.AsyncExecChain;
41 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
42 import org.apache.hc.client5.http.async.AsyncExecRuntime;
43 import org.apache.hc.client5.http.protocol.HttpClientContext;
44 import org.apache.hc.core5.annotation.Contract;
45 import org.apache.hc.core5.annotation.Internal;
46 import org.apache.hc.core5.annotation.ThreadingBehavior;
47 import org.apache.hc.core5.concurrent.CancellableDependency;
48 import org.apache.hc.core5.http.EntityDetails;
49 import org.apache.hc.core5.http.Header;
50 import org.apache.hc.core5.http.HttpException;
51 import org.apache.hc.core5.http.HttpRequest;
52 import org.apache.hc.core5.http.HttpResponse;
53 import org.apache.hc.core5.http.HttpStatus;
54 import org.apache.hc.core5.http.message.RequestLine;
55 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
56 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
57 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
58 import org.apache.hc.core5.http.nio.CapacityChannel;
59 import org.apache.hc.core5.http.nio.DataStreamChannel;
60 import org.apache.hc.core5.http.nio.RequestChannel;
61 import org.apache.hc.core5.http.protocol.HttpContext;
62 import org.apache.hc.core5.util.TimeValue;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66
67
68
69
70
71
72
73 @Contract(threading = ThreadingBehavior.STATELESS)
74 @Internal
75 class HttpAsyncMainClientExec implements AsyncExecChainHandler {
76
77 private static final Logger LOG = LoggerFactory.getLogger(HttpAsyncMainClientExec.class);
78
79 private final ConnectionKeepAliveStrategy keepAliveStrategy;
80 private final UserTokenHandler userTokenHandler;
81
82 HttpAsyncMainClientExec(final ConnectionKeepAliveStrategy keepAliveStrategy, final UserTokenHandler userTokenHandler) {
83 this.keepAliveStrategy = keepAliveStrategy;
84 this.userTokenHandler = userTokenHandler;
85 }
86
87 @Override
88 public void execute(
89 final HttpRequest request,
90 final AsyncEntityProducer entityProducer,
91 final AsyncExecChain.Scope scope,
92 final AsyncExecChain chain,
93 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
94 final String exchangeId = scope.exchangeId;
95 final HttpRoute route = scope.route;
96 final CancellableDependency operation = scope.cancellableDependency;
97 final HttpClientContext clientContext = scope.clientContext;
98 final AsyncExecRuntime execRuntime = scope.execRuntime;
99
100 if (LOG.isDebugEnabled()) {
101 LOG.debug("{} executing {}", exchangeId, new RequestLine(request));
102 }
103
104 final AtomicInteger messageCountDown = new AtomicInteger(2);
105 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
106
107 private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();
108
109 @Override
110 public void releaseResources() {
111 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
112 if (entityConsumer != null) {
113 entityConsumer.releaseResources();
114 }
115 }
116
117 @Override
118 public void failed(final Exception cause) {
119 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
120 if (entityConsumer != null) {
121 entityConsumer.releaseResources();
122 }
123 execRuntime.markConnectionNonReusable();
124 asyncExecCallback.failed(cause);
125 }
126
127 @Override
128 public void cancel() {
129 failed(new InterruptedIOException());
130 }
131
132 @Override
133 public void produceRequest(
134 final RequestChannel channel,
135 final HttpContext context) throws HttpException, IOException {
136 channel.sendRequest(request, entityProducer, context);
137 if (entityProducer == null) {
138 messageCountDown.decrementAndGet();
139 }
140 }
141
142 @Override
143 public int available() {
144 return entityProducer.available();
145 }
146
147 @Override
148 public void produce(final DataStreamChannel channel) throws IOException {
149 entityProducer.produce(new DataStreamChannel() {
150
151 @Override
152 public void requestOutput() {
153 channel.requestOutput();
154 }
155
156 @Override
157 public int write(final ByteBuffer src) throws IOException {
158 return channel.write(src);
159 }
160
161 @Override
162 public void endStream(final List<? extends Header> trailers) throws IOException {
163 channel.endStream(trailers);
164 if (messageCountDown.decrementAndGet() <= 0) {
165 asyncExecCallback.completed();
166 }
167 }
168
169 @Override
170 public void endStream() throws IOException {
171 channel.endStream();
172 if (messageCountDown.decrementAndGet() <= 0) {
173 asyncExecCallback.completed();
174 }
175 }
176
177 });
178 }
179
180 @Override
181 public void consumeInformation(
182 final HttpResponse response,
183 final HttpContext context) throws HttpException, IOException {
184 asyncExecCallback.handleInformationResponse(response);
185 }
186
187 @Override
188 public void consumeResponse(
189 final HttpResponse response,
190 final EntityDetails entityDetails,
191 final HttpContext context) throws HttpException, IOException {
192 entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
193 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
194 messageCountDown.decrementAndGet();
195 }
196 final TimeValue keepAliveDuration = keepAliveStrategy.getKeepAliveDuration(response, clientContext);
197 Object userToken = clientContext.getUserToken();
198 if (userToken == null) {
199 userToken = userTokenHandler.getUserToken(route, clientContext);
200 clientContext.setAttribute(HttpClientContext.USER_TOKEN, userToken);
201 }
202 execRuntime.markConnectionReusable(userToken, keepAliveDuration);
203 if (entityDetails == null) {
204 execRuntime.validateConnection();
205 if (messageCountDown.decrementAndGet() <= 0) {
206 asyncExecCallback.completed();
207 }
208 }
209 }
210
211 @Override
212 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
213 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
214 if (entityConsumer != null) {
215 entityConsumer.updateCapacity(capacityChannel);
216 } else {
217 capacityChannel.update(Integer.MAX_VALUE);
218 }
219 }
220
221 @Override
222 public void consume(final ByteBuffer src) throws IOException {
223 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
224 if (entityConsumer != null) {
225 entityConsumer.consume(src);
226 }
227 }
228
229 @Override
230 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
231 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
232 if (entityConsumer != null) {
233 entityConsumer.streamEnd(trailers);
234 } else {
235 execRuntime.validateConnection();
236 }
237 if (messageCountDown.decrementAndGet() <= 0) {
238 asyncExecCallback.completed();
239 }
240 }
241
242 };
243
244 if (LOG.isDebugEnabled()) {
245 operation.setDependency(execRuntime.execute(
246 exchangeId,
247 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
248 clientContext));
249 } else {
250 operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
251 }
252 }
253
254 }