1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.impl.bootstrap;
29
30 import java.io.IOException;
31 import java.nio.ByteBuffer;
32 import java.util.List;
33 import java.util.Set;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.atomic.AtomicReference;
36
37 import org.apache.hc.core5.annotation.Internal;
38 import org.apache.hc.core5.concurrent.BasicFuture;
39 import org.apache.hc.core5.concurrent.CallbackContribution;
40 import org.apache.hc.core5.concurrent.ComplexFuture;
41 import org.apache.hc.core5.concurrent.FutureCallback;
42 import org.apache.hc.core5.concurrent.FutureContribution;
43 import org.apache.hc.core5.function.Callback;
44 import org.apache.hc.core5.function.Decorator;
45 import org.apache.hc.core5.http.ConnectionClosedException;
46 import org.apache.hc.core5.http.EntityDetails;
47 import org.apache.hc.core5.http.Header;
48 import org.apache.hc.core5.http.HttpConnection;
49 import org.apache.hc.core5.http.HttpException;
50 import org.apache.hc.core5.http.HttpHost;
51 import org.apache.hc.core5.http.HttpResponse;
52 import org.apache.hc.core5.http.impl.DefaultAddressResolver;
53 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
54 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
55 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
56 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
57 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
58 import org.apache.hc.core5.http.nio.CapacityChannel;
59 import org.apache.hc.core5.http.nio.DataStreamChannel;
60 import org.apache.hc.core5.http.nio.HandlerFactory;
61 import org.apache.hc.core5.http.nio.RequestChannel;
62 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
63 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
64 import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
65 import org.apache.hc.core5.http.nio.ssl.TlsUpgradeCapable;
66 import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
67 import org.apache.hc.core5.http.protocol.HttpContext;
68 import org.apache.hc.core5.http.protocol.HttpCoreContext;
69 import org.apache.hc.core5.io.CloseMode;
70 import org.apache.hc.core5.net.NamedEndpoint;
71 import org.apache.hc.core5.net.URIAuthority;
72 import org.apache.hc.core5.pool.ConnPoolControl;
73 import org.apache.hc.core5.pool.ManagedConnPool;
74 import org.apache.hc.core5.pool.PoolEntry;
75 import org.apache.hc.core5.pool.PoolStats;
76 import org.apache.hc.core5.reactor.Command;
77 import org.apache.hc.core5.reactor.EndpointParameters;
78 import org.apache.hc.core5.reactor.IOEventHandler;
79 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
80 import org.apache.hc.core5.reactor.IOReactorConfig;
81 import org.apache.hc.core5.reactor.IOSession;
82 import org.apache.hc.core5.reactor.IOSessionListener;
83 import org.apache.hc.core5.reactor.ProtocolIOSession;
84 import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
85 import org.apache.hc.core5.util.Args;
86 import org.apache.hc.core5.util.TimeValue;
87 import org.apache.hc.core5.util.Timeout;
88
89
90
91
92
93
94 public class HttpAsyncRequester extends AsyncRequester implements ConnPoolControl<HttpHost> {
95
// Pool that connections are leased from and released back to.
private final ManagedConnPool<HttpHost, IOSession> connPool;
// Strategy used by doTlsUpgrade(); when null, TLS upgrade is rejected with IllegalStateException.
private final TlsStrategy tlsStrategy;
// Handshake timeout handed to the TLS strategy on upgrade; null when the shorter constructor is used.
private final Timeout handshakeTimeout;
99
100
101
102
103
104
/**
 * Creates a requester backed by the given connection pool.
 * <p>
 * The reactor related arguments are passed to the super class constructor together with
 * {@code ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK} and {@code DefaultAddressResolver.INSTANCE}.
 *
 * @param ioReactorConfig I/O reactor configuration, passed to the super class.
 * @param eventHandlerFactory I/O event handler factory, passed to the super class.
 * @param ioSessionDecorator I/O session decorator, passed to the super class.
 * @param exceptionCallback exception callback, passed to the super class.
 * @param sessionListener I/O session listener, passed to the super class.
 * @param connPool connection pool used to lease and release connections;
 *                 validated with {@code Args.notNull}.
 * @param tlsStrategy strategy used for TLS upgrades; when {@code null}
 *                    {@code doTlsUpgrade} throws {@link IllegalStateException}.
 * @param handshakeTimeout timeout handed to the TLS strategy on upgrade.
 */
@Internal
public HttpAsyncRequester(
        final IOReactorConfig ioReactorConfig,
        final IOEventHandlerFactory eventHandlerFactory,
        final Decorator<IOSession> ioSessionDecorator,
        final Callback<Exception> exceptionCallback,
        final IOSessionListener sessionListener,
        final ManagedConnPool<HttpHost, IOSession> connPool,
        final TlsStrategy tlsStrategy,
        final Timeout handshakeTimeout) {
    super(
            eventHandlerFactory,
            ioReactorConfig,
            ioSessionDecorator,
            exceptionCallback,
            sessionListener,
            ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK,
            DefaultAddressResolver.INSTANCE);
    this.handshakeTimeout = handshakeTimeout;
    this.tlsStrategy = tlsStrategy;
    this.connPool = Args.notNull(connPool, "Connection pool");
}
121
122
123
124
/**
 * Creates a requester without a TLS strategy and without a handshake timeout
 * (both are passed as {@code null} to the full constructor).
 *
 * @param ioReactorConfig I/O reactor configuration.
 * @param eventHandlerFactory I/O event handler factory.
 * @param ioSessionDecorator I/O session decorator.
 * @param exceptionCallback exception callback.
 * @param sessionListener I/O session listener.
 * @param connPool connection pool used to lease and release connections.
 */
@Internal
public HttpAsyncRequester(
        final IOReactorConfig ioReactorConfig,
        final IOEventHandlerFactory eventHandlerFactory,
        final Decorator<IOSession> ioSessionDecorator,
        final Callback<Exception> exceptionCallback,
        final IOSessionListener sessionListener,
        final ManagedConnPool<HttpHost, IOSession> connPool) {
    this(
            ioReactorConfig,
            eventHandlerFactory,
            ioSessionDecorator,
            exceptionCallback,
            sessionListener,
            connPool,
            null,
            null);
}
136
137 @Override
138 public PoolStats getTotalStats() {
139 return connPool.getTotalStats();
140 }
141
142 @Override
143 public PoolStats getStats(final HttpHost route) {
144 return connPool.getStats(route);
145 }
146
147 @Override
148 public void setMaxTotal(final int max) {
149 connPool.setMaxTotal(max);
150 }
151
152 @Override
153 public int getMaxTotal() {
154 return connPool.getMaxTotal();
155 }
156
157 @Override
158 public void setDefaultMaxPerRoute(final int max) {
159 connPool.setDefaultMaxPerRoute(max);
160 }
161
162 @Override
163 public int getDefaultMaxPerRoute() {
164 return connPool.getDefaultMaxPerRoute();
165 }
166
167 @Override
168 public void setMaxPerRoute(final HttpHost route, final int max) {
169 connPool.setMaxPerRoute(route, max);
170 }
171
172 @Override
173 public int getMaxPerRoute(final HttpHost route) {
174 return connPool.getMaxPerRoute(route);
175 }
176
177 @Override
178 public void closeIdle(final TimeValue idleTime) {
179 connPool.closeIdle(idleTime);
180 }
181
182 @Override
183 public void closeExpired() {
184 connPool.closeExpired();
185 }
186
187 @Override
188 public Set<HttpHost> getRoutes() {
189 return connPool.getRoutes();
190 }
191
192 public Future<AsyncClientEndpoint> connect(
193 final HttpHost host,
194 final Timeout timeout,
195 final Object attachment,
196 final FutureCallback<AsyncClientEndpoint> callback) {
197 return doConnect(host, timeout, attachment, callback);
198 }
199
200 protected Future<AsyncClientEndpoint> doConnect(
201 final HttpHost host,
202 final Timeout timeout,
203 final Object attachment,
204 final FutureCallback<AsyncClientEndpoint> callback) {
205 Args.notNull(host, "Host");
206 Args.notNull(timeout, "Timeout");
207 final ComplexFuture<AsyncClientEndpoint> resultFuture = new ComplexFuture<>(callback);
208 final Future<PoolEntry<HttpHost, IOSession>> leaseFuture = connPool.lease(
209 host, null, timeout, new FutureCallback<PoolEntry<HttpHost, IOSession>>() {
210
211 @Override
212 public void completed(final PoolEntry<HttpHost, IOSession> poolEntry) {
213 final AsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(poolEntry);
214 final IOSession ioSession = poolEntry.getConnection();
215 if (ioSession != null && !ioSession.isOpen()) {
216 poolEntry.discardConnection(CloseMode.IMMEDIATE);
217 }
218 if (poolEntry.hasConnection()) {
219 resultFuture.completed(endpoint);
220 } else {
221 final Future<IOSession> future = requestSession(
222 host,
223 timeout,
224 new EndpointParameters(host, attachment),
225 new FutureCallback<IOSession>() {
226
227 @Override
228 public void completed(final IOSession session) {
229 session.setSocketTimeout(timeout);
230 poolEntry.assignConnection(session);
231 resultFuture.completed(endpoint);
232 }
233
234 @Override
235 public void failed(final Exception cause) {
236 try {
237 resultFuture.failed(cause);
238 } finally {
239 endpoint.releaseAndDiscard();
240 }
241 }
242
243 @Override
244 public void cancelled() {
245 try {
246 resultFuture.cancel();
247 } finally {
248 endpoint.releaseAndDiscard();
249 }
250 }
251
252 });
253 resultFuture.setDependency(future);
254 }
255 }
256
257 @Override
258 public void failed(final Exception ex) {
259 resultFuture.failed(ex);
260 }
261
262 @Override
263 public void cancelled() {
264 resultFuture.cancel();
265 }
266
267 });
268 resultFuture.setDependency(leaseFuture);
269 return resultFuture;
270 }
271
272 public Future<AsyncClientEndpoint> connect(final HttpHost host, final Timeout timeout) {
273 return connect(host, timeout, null, null);
274 }
275
276
277
278
279 public void execute(
280 final HttpHost target,
281 final AsyncClientExchangeHandler exchangeHandler,
282 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
283 final Timeout timeout,
284 final HttpContext executeContext) {
285 Args.notNull(exchangeHandler, "Exchange handler");
286 Args.notNull(timeout, "Timeout");
287 Args.notNull(executeContext, "Context");
288 try {
289 exchangeHandler.produceRequest((request, entityDetails, requestContext) -> {
290 final HttpHost host = target != null ? target : defaultTarget(request);
291 if (request.getAuthority() == null) {
292 request.setAuthority(new URIAuthority(host.getHostName(), host.getPort()));
293 }
294 connect(host, timeout, null, new FutureCallback<AsyncClientEndpoint>() {
295
296 @Override
297 public void completed(final AsyncClientEndpoint endpoint) {
298 endpoint.execute(new AsyncClientExchangeHandler() {
299
300 @Override
301 public void releaseResources() {
302 endpoint.releaseAndDiscard();
303 exchangeHandler.releaseResources();
304 }
305
306 @Override
307 public void failed(final Exception cause) {
308 endpoint.releaseAndDiscard();
309 exchangeHandler.failed(cause);
310 }
311
312 @Override
313 public void cancel() {
314 endpoint.releaseAndDiscard();
315 exchangeHandler.cancel();
316 }
317
318 @Override
319 public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
320 channel.sendRequest(request, entityDetails, httpContext);
321 }
322
323 @Override
324 public int available() {
325 return exchangeHandler.available();
326 }
327
328 @Override
329 public void produce(final DataStreamChannel channel) throws IOException {
330 exchangeHandler.produce(channel);
331 }
332
333 @Override
334 public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
335 exchangeHandler.consumeInformation(response, httpContext);
336 }
337
338 @Override
339 public void consumeResponse(
340 final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
341 if (entityDetails == null) {
342 endpoint.releaseAndReuse();
343 }
344 exchangeHandler.consumeResponse(response, entityDetails, httpContext);
345 }
346
347 @Override
348 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
349 exchangeHandler.updateCapacity(capacityChannel);
350 }
351
352 @Override
353 public void consume(final ByteBuffer src) throws IOException {
354 exchangeHandler.consume(src);
355 }
356
357 @Override
358 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
359 endpoint.releaseAndReuse();
360 exchangeHandler.streamEnd(trailers);
361 }
362
363 }, pushHandlerFactory, executeContext);
364
365 }
366
367 @Override
368 public void failed(final Exception ex) {
369 exchangeHandler.failed(ex);
370 }
371
372 @Override
373 public void cancelled() {
374 exchangeHandler.cancel();
375 }
376
377 });
378
379 }, executeContext);
380
381 } catch (final IOException | HttpException ex) {
382 exchangeHandler.failed(ex);
383 }
384 }
385
386 public void execute(
387 final AsyncClientExchangeHandler exchangeHandler,
388 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
389 final Timeout timeout,
390 final HttpContext executeContext) {
391 execute(null, exchangeHandler, pushHandlerFactory, timeout, executeContext);
392 }
393
394 public void execute(
395 final AsyncClientExchangeHandler exchangeHandler,
396 final Timeout timeout,
397 final HttpContext executeContext) {
398 execute(exchangeHandler, null, timeout, executeContext);
399 }
400
401
402
403
404 public final <T> Future<T> execute(
405 final HttpHost target,
406 final AsyncRequestProducer requestProducer,
407 final AsyncResponseConsumer<T> responseConsumer,
408 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
409 final Timeout timeout,
410 final HttpContext context,
411 final FutureCallback<T> callback) {
412 Args.notNull(requestProducer, "Request producer");
413 Args.notNull(responseConsumer, "Response consumer");
414 Args.notNull(timeout, "Timeout");
415 final BasicFuture<T> future = new BasicFuture<>(callback);
416 final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(
417 requestProducer,
418 responseConsumer,
419 new FutureContribution<T>(future) {
420
421 @Override
422 public void completed(final T result) {
423 future.completed(result);
424 }
425
426 });
427 execute(target, exchangeHandler, pushHandlerFactory, timeout, context != null ? context : HttpCoreContext.create());
428 return future;
429 }
430
431 public final <T> Future<T> execute(
432 final AsyncRequestProducer requestProducer,
433 final AsyncResponseConsumer<T> responseConsumer,
434 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
435 final Timeout timeout,
436 final HttpContext context,
437 final FutureCallback<T> callback) {
438 return execute(null, requestProducer, responseConsumer, pushHandlerFactory, timeout, context, callback);
439 }
440
441 public final <T> Future<T> execute(
442 final AsyncRequestProducer requestProducer,
443 final AsyncResponseConsumer<T> responseConsumer,
444 final Timeout timeout,
445 final HttpContext context,
446 final FutureCallback<T> callback) {
447 return execute(requestProducer, responseConsumer, null, timeout, context, callback);
448 }
449
450
451
452
453 public final <T> Future<T> execute(
454 final HttpHost target,
455 final AsyncRequestProducer requestProducer,
456 final AsyncResponseConsumer<T> responseConsumer,
457 final Timeout timeout,
458 final FutureCallback<T> callback) {
459 return execute(target, requestProducer, responseConsumer, null, timeout, null, callback);
460 }
461
462 public final <T> Future<T> execute(
463 final AsyncRequestProducer requestProducer,
464 final AsyncResponseConsumer<T> responseConsumer,
465 final Timeout timeout,
466 final FutureCallback<T> callback) {
467 return execute(requestProducer, responseConsumer, null, timeout, null, callback);
468 }
469
470 protected void doTlsUpgrade(
471 final ProtocolIOSession ioSession,
472 final NamedEndpoint endpoint,
473 final FutureCallback<ProtocolIOSession> callback) {
474 if (tlsStrategy != null) {
475 tlsStrategy.upgrade(ioSession,
476 endpoint,
477 null,
478 handshakeTimeout,
479 new CallbackContribution<TransportSecurityLayer>(callback) {
480
481 @Override
482 public void completed(final TransportSecurityLayer transportSecurityLayer) {
483 if (callback != null) {
484 callback.completed(ioSession);
485 }
486 }
487
488 });
489 } else {
490 throw new IllegalStateException("TLS upgrade not supported");
491 }
492 }
493
494 private class InternalAsyncClientEndpoint extends AsyncClientEndpoint implements TlsUpgradeCapable {
495
496 final AtomicReference<PoolEntry<HttpHost, IOSession>> poolEntryRef;
497
498 InternalAsyncClientEndpoint(final PoolEntry<HttpHost, IOSession> poolEntry) {
499 this.poolEntryRef = new AtomicReference<>(poolEntry);
500 }
501
502 private IOSession getIOSession() {
503 final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
504 if (poolEntry == null) {
505 throw new IllegalStateException("Endpoint has already been released");
506 }
507 final IOSession ioSession = poolEntry.getConnection();
508 if (ioSession == null) {
509 throw new IllegalStateException("I/O session is invalid");
510 }
511 return ioSession;
512 }
513
514 @Override
515 public void execute(
516 final AsyncClientExchangeHandler exchangeHandler,
517 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
518 final HttpContext context) {
519 final IOSession ioSession = getIOSession();
520 ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context), Command.Priority.NORMAL);
521 if (!ioSession.isOpen()) {
522 try {
523 exchangeHandler.failed(new ConnectionClosedException());
524 } finally {
525 exchangeHandler.releaseResources();
526 }
527 }
528 }
529
530 @Override
531 public boolean isConnected() {
532 final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
533 if (poolEntry != null) {
534 final IOSession ioSession = poolEntry.getConnection();
535 if (ioSession == null || !ioSession.isOpen()) {
536 return false;
537 }
538 final IOEventHandler handler = ioSession.getHandler();
539 return (handler instanceof HttpConnection) && ((HttpConnection) handler).isOpen();
540 }
541 return false;
542 }
543
544 @Override
545 public void releaseAndReuse() {
546 final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
547 if (poolEntry != null) {
548 final IOSession ioSession = poolEntry.getConnection();
549 connPool.release(poolEntry, ioSession != null && ioSession.isOpen());
550 }
551 }
552
553 @Override
554 public void releaseAndDiscard() {
555 final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
556 if (poolEntry != null) {
557 poolEntry.discardConnection(CloseMode.GRACEFUL);
558 connPool.release(poolEntry, false);
559 }
560 }
561
562 @Override
563 public void tlsUpgrade(final NamedEndpoint endpoint, final FutureCallback<ProtocolIOSession> callback) {
564 final IOSession ioSession = getIOSession();
565 if (ioSession instanceof ProtocolIOSession) {
566 doTlsUpgrade((ProtocolIOSession) ioSession, endpoint, callback);
567 } else {
568 throw new IllegalStateException("TLS upgrade not supported");
569 }
570 }
571 }
572
573 }