1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.nio;
29
30 import java.net.InetSocketAddress;
31 import java.util.Set;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicLong;
38 import java.util.concurrent.atomic.AtomicReference;
39
40 import org.apache.hc.client5.http.DnsResolver;
41 import org.apache.hc.client5.http.HttpRoute;
42 import org.apache.hc.client5.http.SchemePortResolver;
43 import org.apache.hc.client5.http.impl.ConnPoolSupport;
44 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
45 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
46 import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
47 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
48 import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
49 import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
50 import org.apache.hc.core5.annotation.Contract;
51 import org.apache.hc.core5.annotation.Internal;
52 import org.apache.hc.core5.annotation.ThreadingBehavior;
53 import org.apache.hc.core5.concurrent.BasicFuture;
54 import org.apache.hc.core5.concurrent.ComplexFuture;
55 import org.apache.hc.core5.concurrent.FutureCallback;
56 import org.apache.hc.core5.function.Callback;
57 import org.apache.hc.core5.http.HttpHost;
58 import org.apache.hc.core5.http.HttpVersion;
59 import org.apache.hc.core5.http.ProtocolVersion;
60 import org.apache.hc.core5.http.config.Lookup;
61 import org.apache.hc.core5.http.config.RegistryBuilder;
62 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
63 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
64 import org.apache.hc.core5.http.nio.HandlerFactory;
65 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
66 import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
67 import org.apache.hc.core5.http.protocol.HttpContext;
68 import org.apache.hc.core5.http2.nio.command.PingCommand;
69 import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
70 import org.apache.hc.core5.io.CloseMode;
71 import org.apache.hc.core5.pool.ConnPoolControl;
72 import org.apache.hc.core5.pool.LaxConnPool;
73 import org.apache.hc.core5.pool.ManagedConnPool;
74 import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
75 import org.apache.hc.core5.pool.PoolEntry;
76 import org.apache.hc.core5.pool.PoolReusePolicy;
77 import org.apache.hc.core5.pool.PoolStats;
78 import org.apache.hc.core5.pool.StrictConnPool;
79 import org.apache.hc.core5.reactor.Command;
80 import org.apache.hc.core5.reactor.ConnectionInitiator;
81 import org.apache.hc.core5.util.Args;
82 import org.apache.hc.core5.util.Identifiable;
83 import org.apache.hc.core5.util.TimeValue;
84 import org.apache.hc.core5.util.Timeout;
85 import org.slf4j.Logger;
86 import org.slf4j.LoggerFactory;
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
107 public class PoolingAsyncClientConnectionManager implements AsyncClientConnectionManager, ConnPoolControl<HttpRoute> {
108
109 private static final Logger LOG = LoggerFactory.getLogger(PoolingAsyncClientConnectionManager.class);
110
111 public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
112 public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
113
114 private final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool;
115 private final AsyncClientConnectionOperator connectionOperator;
116 private final AtomicBoolean closed;
117
118 private volatile TimeValue validateAfterInactivity;
119
/**
 * Creates a connection manager whose TLS strategy registry maps the
 * {@code "https"} scheme to {@link DefaultClientTlsStrategy#getDefault()}.
 */
public PoolingAsyncClientConnectionManager() {
    this(RegistryBuilder.<TlsStrategy>create().register("https", DefaultClientTlsStrategy.getDefault()).build());
}
125
/**
 * Creates a connection manager with the given TLS strategies, the
 * {@link PoolConcurrencyPolicy#STRICT} concurrency policy and
 * {@link TimeValue#NEG_ONE_MILLISECOND} as the connection time to live.
 *
 * @param tlsStrategyLookup TLS strategies looked up by scheme.
 */
public PoolingAsyncClientConnectionManager(final Lookup<TlsStrategy> tlsStrategyLookup) {
    this(
            tlsStrategyLookup,
            PoolConcurrencyPolicy.STRICT,
            TimeValue.NEG_ONE_MILLISECOND);
}
129
/**
 * Creates a connection manager with the given TLS strategies, concurrency
 * policy and time to live, using {@link PoolReusePolicy#LIFO} as the pool
 * reuse policy.
 *
 * @param tlsStrategyLookup TLS strategies looked up by scheme.
 * @param poolConcurrencyPolicy concurrency policy of the connection pool.
 * @param timeToLive time to live passed on to the connection pool.
 */
public PoolingAsyncClientConnectionManager(
        final Lookup<TlsStrategy> tlsStrategyLookup,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final TimeValue timeToLive) {
    this(
            tlsStrategyLookup,
            poolConcurrencyPolicy,
            PoolReusePolicy.LIFO,
            timeToLive);
}
136
/**
 * Creates a connection manager with the given TLS strategies and pool
 * settings, passing {@code null} for both the scheme port resolver and the
 * DNS resolver.
 *
 * @param tlsStrategyLookup TLS strategies looked up by scheme.
 * @param poolConcurrencyPolicy concurrency policy of the connection pool.
 * @param poolReusePolicy reuse policy of the connection pool.
 * @param timeToLive time to live passed on to the connection pool.
 */
public PoolingAsyncClientConnectionManager(
        final Lookup<TlsStrategy> tlsStrategyLookup,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive) {
    this(
            tlsStrategyLookup,
            poolConcurrencyPolicy,
            poolReusePolicy,
            timeToLive,
            null,
            null);
}
144
/**
 * Creates a connection manager that connects through a
 * {@link DefaultAsyncClientConnectionOperator} built from the given TLS
 * strategies, scheme port resolver and DNS resolver.
 *
 * @param tlsStrategyLookup TLS strategies looked up by scheme.
 * @param poolConcurrencyPolicy concurrency policy of the connection pool.
 * @param poolReusePolicy reuse policy of the connection pool.
 * @param timeToLive time to live passed on to the connection pool.
 * @param schemePortResolver resolver handed to the connection operator.
 * @param dnsResolver resolver handed to the connection operator.
 */
public PoolingAsyncClientConnectionManager(
        final Lookup<TlsStrategy> tlsStrategyLookup,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive,
        final SchemePortResolver schemePortResolver,
        final DnsResolver dnsResolver) {
    this(
            new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
            poolConcurrencyPolicy,
            poolReusePolicy,
            timeToLive);
}
155
156 @Internal
157 protected PoolingAsyncClientConnectionManager(
158 final AsyncClientConnectionOperator connectionOperator,
159 final PoolConcurrencyPolicy poolConcurrencyPolicy,
160 final PoolReusePolicy poolReusePolicy,
161 final TimeValue timeToLive) {
162 this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
163 switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
164 case STRICT:
165 this.pool = new StrictConnPool<>(
166 DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
167 DEFAULT_MAX_TOTAL_CONNECTIONS,
168 timeToLive,
169 poolReusePolicy,
170 null);
171 break;
172 case LAX:
173 this.pool = new LaxConnPool<>(
174 DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
175 timeToLive,
176 poolReusePolicy,
177 null);
178 break;
179 default:
180 throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
181 }
182 this.closed = new AtomicBoolean(false);
183 }
184
185 @Internal
186 protected PoolingAsyncClientConnectionManager(
187 final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool,
188 final AsyncClientConnectionOperator connectionOperator) {
189 this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
190 this.pool = Args.notNull(pool, "Connection pool");
191 this.closed = new AtomicBoolean(false);
192 }
193
194 @Override
195 public void close() {
196 close(CloseMode.GRACEFUL);
197 }
198
199 @Override
200 public void close(final CloseMode closeMode) {
201 if (this.closed.compareAndSet(false, true)) {
202 if (LOG.isDebugEnabled()) {
203 LOG.debug("Shutdown connection pool {}", closeMode);
204 }
205 this.pool.close(closeMode);
206 LOG.debug("Connection pool shut down");
207 }
208 }
209
210 private InternalConnectionEndpoint cast(final AsyncConnectionEndpoint endpoint) {
211 if (endpoint instanceof InternalConnectionEndpoint) {
212 return (InternalConnectionEndpoint) endpoint;
213 }
214 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
215 }
216
217 @Override
218 public Future<AsyncConnectionEndpoint> lease(
219 final String id,
220 final HttpRoute route,
221 final Object state,
222 final Timeout requestTimeout,
223 final FutureCallback<AsyncConnectionEndpoint> callback) {
224 if (LOG.isDebugEnabled()) {
225 LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
226 }
227 return new Future<AsyncConnectionEndpoint>() {
228
229 final BasicFuture<AsyncConnectionEndpoint> resultFuture = new BasicFuture<>(callback);
230
231 final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
232 route,
233 state,
234 requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
235
236 @Override
237 public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
238 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
239 if (connection != null) {
240 if (connection.isOpen()) {
241 final ProtocolVersion protocolVersion = connection.getProtocolVersion();
242 if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
243 final TimeValue timeValue = PoolingAsyncClientConnectionManager.this.validateAfterInactivity;
244 if (TimeValue.isNonNegative(timeValue) &&
245 poolEntry.getUpdated() + timeValue.toMilliseconds() <= System.currentTimeMillis()) {
246 connection.submitCommand(new PingCommand(new BasicPingHandler(new Callback<Boolean>() {
247
248 @Override
249 public void execute(final Boolean result) {
250 if (result == null || !result) {
251 if (LOG.isDebugEnabled()) {
252 LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(connection));
253 }
254 poolEntry.discardConnection(CloseMode.IMMEDIATE);
255 }
256 leaseCompleted(poolEntry);
257 }
258
259 })), Command.Priority.IMMEDIATE);
260 return;
261 }
262 }
263 } else {
264 if (LOG.isDebugEnabled()) {
265 LOG.debug("{} connection {} is closed", id, ConnPoolSupport.getId(connection));
266 }
267 poolEntry.discardConnection(CloseMode.IMMEDIATE);
268 }
269 }
270 leaseCompleted(poolEntry);
271 }
272
273 void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
274 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
275 if (connection != null) {
276 connection.activate();
277 }
278 if (LOG.isDebugEnabled()) {
279 LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
280 }
281 final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
282 if (LOG.isDebugEnabled()) {
283 LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
284 }
285 resultFuture.completed(endpoint);
286 }
287
288 @Override
289 public void failed(final Exception ex) {
290 if (LOG.isDebugEnabled()) {
291 LOG.debug("{} endpoint lease failed", id);
292 }
293 resultFuture.failed(ex);
294 }
295
296 @Override
297 public void cancelled() {
298 if (LOG.isDebugEnabled()) {
299 LOG.debug("{} endpoint lease cancelled", id);
300 }
301 resultFuture.cancel();
302 }
303
304 });
305
306 @Override
307 public AsyncConnectionEndpoint get() throws InterruptedException, ExecutionException {
308 return resultFuture.get();
309 }
310
311 @Override
312 public AsyncConnectionEndpoint get(
313 final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
314 return resultFuture.get(timeout, unit);
315 }
316
317 @Override
318 public boolean cancel(final boolean mayInterruptIfRunning) {
319 return leaseFuture.cancel(mayInterruptIfRunning);
320 }
321
322 @Override
323 public boolean isDone() {
324 return resultFuture.isDone();
325 }
326
327 @Override
328 public boolean isCancelled() {
329 return resultFuture.isCancelled();
330 }
331
332 };
333 }
334
335 @Override
336 public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
337 Args.notNull(endpoint, "Managed endpoint");
338 Args.notNull(keepAlive, "Keep-alive time");
339 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry = cast(endpoint).detach();
340 if (entry == null) {
341 return;
342 }
343 if (LOG.isDebugEnabled()) {
344 LOG.debug("{} releasing endpoint", ConnPoolSupport.getId(endpoint));
345 }
346 final ManagedAsyncClientConnection connection = entry.getConnection();
347 boolean reusable = connection != null && connection.isOpen();
348 try {
349 if (reusable) {
350 entry.updateState(state);
351 entry.updateExpiry(keepAlive);
352 connection.passivate();
353 if (LOG.isDebugEnabled()) {
354 final String s;
355 if (TimeValue.isPositive(keepAlive)) {
356 s = "for " + keepAlive;
357 } else {
358 s = "indefinitely";
359 }
360 LOG.debug("{} connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection), s);
361 }
362 }
363 } catch (final RuntimeException ex) {
364 reusable = false;
365 throw ex;
366 } finally {
367 pool.release(entry, reusable);
368 if (LOG.isDebugEnabled()) {
369 LOG.debug("{} connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
370 }
371 }
372 }
373
374 @Override
375 public Future<AsyncConnectionEndpoint> connect(
376 final AsyncConnectionEndpoint endpoint,
377 final ConnectionInitiator connectionInitiator,
378 final Timeout connectTimeout,
379 final Object attachment,
380 final HttpContext context,
381 final FutureCallback<AsyncConnectionEndpoint> callback) {
382 Args.notNull(endpoint, "Endpoint");
383 Args.notNull(connectionInitiator, "Connection initiator");
384 Args.notNull(connectTimeout, "Timeout");
385 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
386 final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
387 if (internalEndpoint.isConnected()) {
388 resultFuture.completed(endpoint);
389 return resultFuture;
390 }
391 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getPoolEntry();
392 final HttpRoute route = poolEntry.getRoute();
393 final HttpHost host;
394 if (route.getProxyHost() != null) {
395 host = route.getProxyHost();
396 } else {
397 host = route.getTargetHost();
398 }
399 final InetSocketAddress localAddress = route.getLocalSocketAddress();
400 if (LOG.isDebugEnabled()) {
401 LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
402 }
403 final Future<ManagedAsyncClientConnection> connectFuture = connectionOperator.connect(
404 connectionInitiator, host, localAddress, connectTimeout, attachment, new FutureCallback<ManagedAsyncClientConnection>() {
405
406 @Override
407 public void completed(final ManagedAsyncClientConnection connection) {
408 try {
409 if (LOG.isDebugEnabled()) {
410 LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
411 }
412 poolEntry.assignConnection(connection);
413 resultFuture.completed(internalEndpoint);
414 } catch (final RuntimeException ex) {
415 resultFuture.failed(ex);
416 }
417 }
418
419 @Override
420 public void failed(final Exception ex) {
421 resultFuture.failed(ex);
422 }
423
424 @Override
425 public void cancelled() {
426 resultFuture.cancel();
427 }
428
429 });
430 resultFuture.setDependency(connectFuture);
431 return resultFuture;
432 }
433
434 @Override
435 public void upgrade(
436 final AsyncConnectionEndpoint endpoint,
437 final Object attachment,
438 final HttpContext context) {
439 Args.notNull(endpoint, "Managed endpoint");
440 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
441 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
442 final HttpRoute route = poolEntry.getRoute();
443 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
444 connectionOperator.upgrade(poolEntry.getConnection(), route.getTargetHost(), attachment);
445 if (LOG.isDebugEnabled()) {
446 LOG.debug("{} upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
447 }
448 }
449
450 @Override
451 public Set<HttpRoute> getRoutes() {
452 return pool.getRoutes();
453 }
454
455 @Override
456 public void setMaxTotal(final int max) {
457 pool.setMaxTotal(max);
458 }
459
460 @Override
461 public int getMaxTotal() {
462 return pool.getMaxTotal();
463 }
464
465 @Override
466 public void setDefaultMaxPerRoute(final int max) {
467 pool.setDefaultMaxPerRoute(max);
468 }
469
470 @Override
471 public int getDefaultMaxPerRoute() {
472 return pool.getDefaultMaxPerRoute();
473 }
474
475 @Override
476 public void setMaxPerRoute(final HttpRoute route, final int max) {
477 pool.setMaxPerRoute(route, max);
478 }
479
480 @Override
481 public int getMaxPerRoute(final HttpRoute route) {
482 return pool.getMaxPerRoute(route);
483 }
484
485 @Override
486 public void closeIdle(final TimeValue idletime) {
487 pool.closeIdle(idletime);
488 }
489
490 @Override
491 public void closeExpired() {
492 pool.closeExpired();
493 }
494
495 @Override
496 public PoolStats getTotalStats() {
497 return pool.getTotalStats();
498 }
499
500 @Override
501 public PoolStats getStats(final HttpRoute route) {
502 return pool.getStats(route);
503 }
504
505 public TimeValue getValidateAfterInactivity() {
506 return validateAfterInactivity;
507 }
508
509
510
511
512
513
514
515
516 public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
517 this.validateAfterInactivity = validateAfterInactivity;
518 }
519
520 private static final AtomicLong COUNT = new AtomicLong(0);
521
522 class InternalConnectionEndpoint extends AsyncConnectionEndpoint implements Identifiable {
523
524 private final AtomicReference<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> poolEntryRef;
525 private final String id;
526
527 InternalConnectionEndpoint(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
528 this.poolEntryRef = new AtomicReference<>(poolEntry);
529 this.id = String.format("ep-%010d", COUNT.getAndIncrement());
530 }
531
532 @Override
533 public String getId() {
534 return id;
535 }
536
537 PoolEntry<HttpRoute, ManagedAsyncClientConnection> getPoolEntry() {
538 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
539 if (poolEntry == null) {
540 throw new ConnectionShutdownException();
541 }
542 return poolEntry;
543 }
544
545 PoolEntry<HttpRoute, ManagedAsyncClientConnection> getValidatedPoolEntry() {
546 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = getPoolEntry();
547 if (poolEntry.getConnection() == null) {
548 throw new ConnectionShutdownException();
549 }
550 return poolEntry;
551 }
552
553 PoolEntry<HttpRoute, ManagedAsyncClientConnection> detach() {
554 return poolEntryRef.getAndSet(null);
555 }
556
557 @Override
558 public void close(final CloseMode closeMode) {
559 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
560 if (poolEntry != null) {
561 if (LOG.isDebugEnabled()) {
562 LOG.debug("{} close {}", id, closeMode);
563 }
564 poolEntry.discardConnection(closeMode);
565 }
566 }
567
568 @Override
569 public boolean isConnected() {
570 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
571 if (poolEntry == null) {
572 return false;
573 }
574 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
575 if (connection == null) {
576 return false;
577 }
578 if (!connection.isOpen()) {
579 poolEntry.discardConnection(CloseMode.GRACEFUL);
580 return false;
581 }
582 return true;
583 }
584
585 @Override
586 public void setSocketTimeout(final Timeout timeout) {
587 getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
588 }
589
590 @Override
591 public void execute(
592 final String exchangeId,
593 final AsyncClientExchangeHandler exchangeHandler,
594 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
595 final HttpContext context) {
596 final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
597 if (LOG.isDebugEnabled()) {
598 LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
599 }
600 connection.submitCommand(
601 new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context),
602 Command.Priority.NORMAL);
603 }
604
605 }
606
607 }