1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.ThreadFactory;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import org.apache.hc.client5.http.HttpRoute;
39 import org.apache.hc.client5.http.SchemePortResolver;
40 import org.apache.hc.client5.http.config.Configurable;
41 import org.apache.hc.client5.http.config.RequestConfig;
42 import org.apache.hc.client5.http.impl.ConnPoolSupport;
43 import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
44 import org.apache.hc.client5.http.impl.ExecSupport;
45 import org.apache.hc.client5.http.impl.classic.RequestFailedException;
46 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
47 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
48 import org.apache.hc.client5.http.protocol.HttpClientContext;
49 import org.apache.hc.client5.http.routing.RoutingSupport;
50 import org.apache.hc.core5.annotation.Contract;
51 import org.apache.hc.core5.annotation.ThreadingBehavior;
52 import org.apache.hc.core5.concurrent.BasicFuture;
53 import org.apache.hc.core5.concurrent.Cancellable;
54 import org.apache.hc.core5.concurrent.ComplexCancellable;
55 import org.apache.hc.core5.concurrent.ComplexFuture;
56 import org.apache.hc.core5.concurrent.FutureCallback;
57 import org.apache.hc.core5.function.Callback;
58 import org.apache.hc.core5.http.EntityDetails;
59 import org.apache.hc.core5.http.Header;
60 import org.apache.hc.core5.http.HttpException;
61 import org.apache.hc.core5.http.HttpHost;
62 import org.apache.hc.core5.http.HttpRequest;
63 import org.apache.hc.core5.http.HttpResponse;
64 import org.apache.hc.core5.http.HttpStatus;
65 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
66 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
67 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
68 import org.apache.hc.core5.http.nio.CapacityChannel;
69 import org.apache.hc.core5.http.nio.DataStreamChannel;
70 import org.apache.hc.core5.http.nio.HandlerFactory;
71 import org.apache.hc.core5.http.nio.RequestChannel;
72 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
73 import org.apache.hc.core5.http.protocol.HttpContext;
74 import org.apache.hc.core5.http2.HttpVersionPolicy;
75 import org.apache.hc.core5.io.CloseMode;
76 import org.apache.hc.core5.io.Closer;
77 import org.apache.hc.core5.reactor.Command;
78 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
79 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
80 import org.apache.hc.core5.reactor.IOReactorConfig;
81 import org.apache.hc.core5.reactor.IOSession;
82 import org.apache.hc.core5.util.Args;
83 import org.apache.hc.core5.util.Asserts;
84 import org.apache.hc.core5.util.TimeValue;
85 import org.apache.hc.core5.util.Timeout;
86 import org.slf4j.Logger;
87 import org.slf4j.LoggerFactory;
88
89
90
91
92
93
94
95
96
97
98
99
100
101 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
102 public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClientBase {
103
104 private static final Logger LOG = LoggerFactory.getLogger(MinimalHttpAsyncClient.class);
105 private final AsyncClientConnectionManager manager;
106 private final SchemePortResolver schemePortResolver;
107 private final HttpVersionPolicy versionPolicy;
108
109 MinimalHttpAsyncClient(
110 final IOEventHandlerFactory eventHandlerFactory,
111 final AsyncPushConsumerRegistry pushConsumerRegistry,
112 final HttpVersionPolicy versionPolicy,
113 final IOReactorConfig reactorConfig,
114 final ThreadFactory threadFactory,
115 final ThreadFactory workerThreadFactory,
116 final AsyncClientConnectionManager manager,
117 final SchemePortResolver schemePortResolver) {
118 super(new DefaultConnectingIOReactor(
119 eventHandlerFactory,
120 reactorConfig,
121 workerThreadFactory,
122 LoggingIOSessionDecorator.INSTANCE,
123 LoggingExceptionCallback.INSTANCE,
124 null,
125 new Callback<IOSession>() {
126
127 @Override
128 public void execute(final IOSession ioSession) {
129 ioSession.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.NORMAL);
130 }
131
132 }),
133 pushConsumerRegistry,
134 threadFactory);
135 this.manager = manager;
136 this.schemePortResolver = schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE;
137 this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
138 }
139
140 private Future<AsyncConnectionEndpoint> leaseEndpoint(
141 final HttpHost host,
142 final Timeout connectionRequestTimeout,
143 final Timeout connectTimeout,
144 final HttpClientContext clientContext,
145 final FutureCallback<AsyncConnectionEndpoint> callback) {
146 final HttpRoute/http/HttpRoute.html#HttpRoute">HttpRoute route = new HttpRoute(RoutingSupport.normalize(host, schemePortResolver));
147 final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
148 final String exchangeId = ExecSupport.getNextExchangeId();
149 clientContext.setExchangeId(exchangeId);
150 final Future<AsyncConnectionEndpoint> leaseFuture = manager.lease(
151 exchangeId,
152 route,
153 null,
154 connectionRequestTimeout,
155 new FutureCallback<AsyncConnectionEndpoint>() {
156
157 @Override
158 public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
159 if (connectionEndpoint.isConnected()) {
160 resultFuture.completed(connectionEndpoint);
161 } else {
162 final Future<AsyncConnectionEndpoint> connectFuture = manager.connect(
163 connectionEndpoint,
164 getConnectionInitiator(),
165 connectTimeout,
166 versionPolicy,
167 clientContext,
168 new FutureCallback<AsyncConnectionEndpoint>() {
169
170 @Override
171 public void completed(final AsyncConnectionEndpoint result) {
172 resultFuture.completed(result);
173 }
174
175 @Override
176 public void failed(final Exception ex) {
177 try {
178 Closer.closeQuietly(connectionEndpoint);
179 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
180 } finally {
181 resultFuture.failed(ex);
182 }
183 }
184
185 @Override
186 public void cancelled() {
187 try {
188 Closer.closeQuietly(connectionEndpoint);
189 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
190 } finally {
191 resultFuture.cancel(true);
192 }
193 }
194
195 });
196 resultFuture.setDependency(connectFuture);
197 }
198 }
199
200 @Override
201 public void failed(final Exception ex) {
202 callback.failed(ex);
203 }
204
205 @Override
206 public void cancelled() {
207 callback.cancelled();
208 }
209
210 });
211 resultFuture.setDependency(leaseFuture);
212 return resultFuture;
213 }
214
215 public Future<AsyncClientEndpoint> lease(
216 final HttpHost host,
217 final FutureCallback<AsyncClientEndpoint> callback) {
218 return lease(host, HttpClientContext.create(), callback);
219 }
220
221 public Future<AsyncClientEndpoint> lease(
222 final HttpHost host,
223 final HttpContext context,
224 final FutureCallback<AsyncClientEndpoint> callback) {
225 Args.notNull(host, "Host");
226 Args.notNull(context, "HTTP context");
227 final BasicFuture<AsyncClientEndpoint> future = new BasicFuture<>(callback);
228 if (!isRunning()) {
229 future.failed(new CancellationException("Connection lease cancelled"));
230 return future;
231 }
232 final HttpClientContext clientContext = HttpClientContext.adapt(context);
233 final RequestConfig requestConfig = clientContext.getRequestConfig();
234 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
235 final Timeout connectTimeout = requestConfig.getConnectTimeout();
236 leaseEndpoint(
237 host,
238 connectionRequestTimeout,
239 connectTimeout,
240 clientContext,
241 new FutureCallback<AsyncConnectionEndpoint>() {
242
243 @Override
244 public void completed(final AsyncConnectionEndpoint result) {
245 future.completed(new InternalAsyncClientEndpoint(result));
246 }
247
248 @Override
249 public void failed(final Exception ex) {
250 future.failed(ex);
251 }
252
253 @Override
254 public void cancelled() {
255 future.cancel(true);
256 }
257
258 });
259 return future;
260 }
261
262 @Override
263 public Cancellable execute(
264 final AsyncClientExchangeHandler exchangeHandler,
265 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
266 final HttpContext context) {
267 final ComplexCancellable cancellable = new ComplexCancellable();
268 try {
269 if (!isRunning()) {
270 throw new CancellationException("Request execution cancelled");
271 }
272 final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
273 exchangeHandler.produceRequest(new RequestChannel() {
274
275 @Override
276 public void sendRequest(
277 final HttpRequest request,
278 final EntityDetails entityDetails,
279 final HttpContext context) throws HttpException, IOException {
280 RequestConfig requestConfig = null;
281 if (request instanceof Configurable) {
282 requestConfig = ((Configurable) request).getConfig();
283 }
284 if (requestConfig != null) {
285 clientContext.setRequestConfig(requestConfig);
286 } else {
287 requestConfig = clientContext.getRequestConfig();
288 }
289 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
290 final Timeout connectTimeout = requestConfig.getConnectTimeout();
291 final Timeout responseTimeout = requestConfig.getResponseTimeout();
292 final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());
293
294 final Future<AsyncConnectionEndpoint> leaseFuture = leaseEndpoint(
295 target,
296 connectionRequestTimeout,
297 connectTimeout,
298 clientContext,
299 new FutureCallback<AsyncConnectionEndpoint>() {
300
301 @Override
302 public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
303 final InternalAsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(connectionEndpoint);
304 final AtomicInteger messageCountDown = new AtomicInteger(2);
305 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
306
307 @Override
308 public void releaseResources() {
309 try {
310 exchangeHandler.releaseResources();
311 } finally {
312 endpoint.releaseAndDiscard();
313 }
314 }
315
316 @Override
317 public void failed(final Exception cause) {
318 try {
319 exchangeHandler.failed(cause);
320 } finally {
321 endpoint.releaseAndDiscard();
322 }
323 }
324
325 @Override
326 public void cancel() {
327 failed(new RequestFailedException("Request aborted"));
328 }
329
330 @Override
331 public void produceRequest(
332 final RequestChannel channel,
333 final HttpContext context) throws HttpException, IOException {
334 channel.sendRequest(request, entityDetails, context);
335 if (entityDetails == null) {
336 messageCountDown.decrementAndGet();
337 }
338 }
339
340 @Override
341 public int available() {
342 return exchangeHandler.available();
343 }
344
345 @Override
346 public void produce(final DataStreamChannel channel) throws IOException {
347 exchangeHandler.produce(new DataStreamChannel() {
348
349 @Override
350 public void requestOutput() {
351 channel.requestOutput();
352 }
353
354 @Override
355 public int write(final ByteBuffer src) throws IOException {
356 return channel.write(src);
357 }
358
359 @Override
360 public void endStream(final List<? extends Header> trailers) throws IOException {
361 channel.endStream(trailers);
362 if (messageCountDown.decrementAndGet() <= 0) {
363 endpoint.releaseAndReuse();
364 }
365 }
366
367 @Override
368 public void endStream() throws IOException {
369 channel.endStream();
370 if (messageCountDown.decrementAndGet() <= 0) {
371 endpoint.releaseAndReuse();
372 }
373 }
374
375 });
376 }
377
378 @Override
379 public void consumeInformation(
380 final HttpResponse response,
381 final HttpContext context) throws HttpException, IOException {
382 exchangeHandler.consumeInformation(response, context);
383 }
384
385 @Override
386 public void consumeResponse(
387 final HttpResponse response,
388 final EntityDetails entityDetails,
389 final HttpContext context) throws HttpException, IOException {
390 exchangeHandler.consumeResponse(response, entityDetails, context);
391 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
392 messageCountDown.decrementAndGet();
393 }
394 if (entityDetails == null) {
395 if (messageCountDown.decrementAndGet() <= 0) {
396 endpoint.releaseAndReuse();
397 }
398 }
399 }
400
401 @Override
402 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
403 exchangeHandler.updateCapacity(capacityChannel);
404 }
405
406 @Override
407 public void consume(final ByteBuffer src) throws IOException {
408 exchangeHandler.consume(src);
409 }
410
411 @Override
412 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
413 if (messageCountDown.decrementAndGet() <= 0) {
414 endpoint.releaseAndReuse();
415 }
416 exchangeHandler.streamEnd(trailers);
417 }
418
419 };
420 if (responseTimeout != null) {
421 endpoint.setSocketTimeout(responseTimeout);
422 }
423 endpoint.execute(internalExchangeHandler, pushHandlerFactory, clientContext);
424 }
425
426 @Override
427 public void failed(final Exception ex) {
428 exchangeHandler.failed(ex);
429 }
430
431 @Override
432 public void cancelled() {
433 exchangeHandler.cancel();
434 }
435
436 });
437
438 cancellable.setDependency(new Cancellable() {
439
440 @Override
441 public boolean cancel() {
442 return leaseFuture.cancel(true);
443 }
444
445 });
446 }
447 }, context);
448
449 } catch (final HttpException | IOException | IllegalStateException ex) {
450 exchangeHandler.failed(ex);
451 }
452 return cancellable;
453 }
454
455 private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
456
457 private final AsyncConnectionEndpoint connectionEndpoint;
458 private final AtomicBoolean released;
459
460 InternalAsyncClientEndpoint(final AsyncConnectionEndpoint connectionEndpoint) {
461 this.connectionEndpoint = connectionEndpoint;
462 this.released = new AtomicBoolean(false);
463 }
464
465 boolean isReleased() {
466 return released.get();
467 }
468
469 @Override
470 public boolean isConnected() {
471 return !isReleased() && connectionEndpoint.isConnected();
472 }
473
474 @Override
475 public void execute(
476 final AsyncClientExchangeHandler exchangeHandler,
477 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
478 final HttpContext context) {
479 Asserts.check(!released.get(), "Endpoint has already been released");
480
481 final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
482 final String exchangeId = ExecSupport.getNextExchangeId();
483 clientContext.setExchangeId(exchangeId);
484 if (LOG.isDebugEnabled()) {
485 LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(connectionEndpoint));
486 connectionEndpoint.execute(
487 exchangeId,
488 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, exchangeHandler),
489 pushHandlerFactory,
490 clientContext);
491 } else {
492 connectionEndpoint.execute(exchangeId, exchangeHandler, clientContext);
493 }
494 }
495
496 public void setSocketTimeout(final Timeout timeout) {
497 connectionEndpoint.setSocketTimeout(timeout);
498 }
499
500 @Override
501 public void releaseAndReuse() {
502 if (released.compareAndSet(false, true)) {
503 manager.release(connectionEndpoint, null, TimeValue.NEG_ONE_MILLISECOND);
504 }
505 }
506
507 @Override
508 public void releaseAndDiscard() {
509 if (released.compareAndSet(false, true)) {
510 Closer.closeQuietly(connectionEndpoint);
511 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
512 }
513 }
514
515 }
516
517 }