1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.nio;
29
30 import java.net.InetSocketAddress;
31 import java.util.Set;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicLong;
35 import java.util.concurrent.atomic.AtomicReference;
36
37 import org.apache.hc.client5.http.DnsResolver;
38 import org.apache.hc.client5.http.HttpRoute;
39 import org.apache.hc.client5.http.SchemePortResolver;
40 import org.apache.hc.client5.http.impl.ConnPoolSupport;
41 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
42 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
43 import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
44 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
45 import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
46 import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
47 import org.apache.hc.core5.annotation.Contract;
48 import org.apache.hc.core5.annotation.Internal;
49 import org.apache.hc.core5.annotation.ThreadingBehavior;
50 import org.apache.hc.core5.concurrent.ComplexFuture;
51 import org.apache.hc.core5.concurrent.FutureCallback;
52 import org.apache.hc.core5.function.Callback;
53 import org.apache.hc.core5.http.HttpHost;
54 import org.apache.hc.core5.http.HttpVersion;
55 import org.apache.hc.core5.http.ProtocolVersion;
56 import org.apache.hc.core5.http.config.Lookup;
57 import org.apache.hc.core5.http.config.RegistryBuilder;
58 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
59 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
60 import org.apache.hc.core5.http.nio.HandlerFactory;
61 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
62 import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
63 import org.apache.hc.core5.http.protocol.HttpContext;
64 import org.apache.hc.core5.http2.nio.command.PingCommand;
65 import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
66 import org.apache.hc.core5.io.CloseMode;
67 import org.apache.hc.core5.pool.ConnPoolControl;
68 import org.apache.hc.core5.pool.LaxConnPool;
69 import org.apache.hc.core5.pool.ManagedConnPool;
70 import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
71 import org.apache.hc.core5.pool.PoolEntry;
72 import org.apache.hc.core5.pool.PoolReusePolicy;
73 import org.apache.hc.core5.pool.PoolStats;
74 import org.apache.hc.core5.pool.StrictConnPool;
75 import org.apache.hc.core5.reactor.Command;
76 import org.apache.hc.core5.reactor.ConnectionInitiator;
77 import org.apache.hc.core5.util.Args;
78 import org.apache.hc.core5.util.Asserts;
79 import org.apache.hc.core5.util.Identifiable;
80 import org.apache.hc.core5.util.TimeValue;
81 import org.apache.hc.core5.util.Timeout;
82 import org.slf4j.Logger;
83 import org.slf4j.LoggerFactory;
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
104 public class PoolingAsyncClientConnectionManager implements AsyncClientConnectionManager, ConnPoolControl<HttpRoute> {
105
106 private static final Logger LOG = LoggerFactory.getLogger(PoolingAsyncClientConnectionManager.class);
107
108 public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
109 public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
110
111 private final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool;
112 private final AsyncClientConnectionOperator connectionOperator;
113 private final AtomicBoolean closed;
114
115 private volatile TimeValue validateAfterInactivity;
116
117 public PoolingAsyncClientConnectionManager() {
118 this(RegistryBuilder.<TlsStrategy>create()
119 .register("https", DefaultClientTlsStrategy.getDefault())
120 .build());
121 }
122
123 public PoolingAsyncClientConnectionManager(final Lookup<TlsStrategy> tlsStrategyLookup) {
124 this(tlsStrategyLookup, PoolConcurrencyPolicy.STRICT, TimeValue.NEG_ONE_MILLISECOND);
125 }
126
127 public PoolingAsyncClientConnectionManager(
128 final Lookup<TlsStrategy> tlsStrategyLookup,
129 final PoolConcurrencyPolicy poolConcurrencyPolicy,
130 final TimeValue timeToLive) {
131 this(tlsStrategyLookup, poolConcurrencyPolicy, PoolReusePolicy.LIFO, timeToLive);
132 }
133
134 public PoolingAsyncClientConnectionManager(
135 final Lookup<TlsStrategy> tlsStrategyLookup,
136 final PoolConcurrencyPolicy poolConcurrencyPolicy,
137 final PoolReusePolicy poolReusePolicy,
138 final TimeValue timeToLive) {
139 this(tlsStrategyLookup, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null, null);
140 }
141
142 public PoolingAsyncClientConnectionManager(
143 final Lookup<TlsStrategy> tlsStrategyLookup,
144 final PoolConcurrencyPolicy poolConcurrencyPolicy,
145 final PoolReusePolicy poolReusePolicy,
146 final TimeValue timeToLive,
147 final SchemePortResolver schemePortResolver,
148 final DnsResolver dnsResolver) {
149 this(new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
150 poolConcurrencyPolicy, poolReusePolicy, timeToLive);
151 }
152
153 @Internal
154 protected PoolingAsyncClientConnectionManager(
155 final AsyncClientConnectionOperator connectionOperator,
156 final PoolConcurrencyPolicy poolConcurrencyPolicy,
157 final PoolReusePolicy poolReusePolicy,
158 final TimeValue timeToLive) {
159 this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
160 switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
161 case STRICT:
162 this.pool = new StrictConnPool<>(
163 DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
164 DEFAULT_MAX_TOTAL_CONNECTIONS,
165 timeToLive,
166 poolReusePolicy,
167 null);
168 break;
169 case LAX:
170 this.pool = new LaxConnPool<>(
171 DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
172 timeToLive,
173 poolReusePolicy,
174 null);
175 break;
176 default:
177 throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
178 }
179 this.closed = new AtomicBoolean(false);
180 }
181
182 @Internal
183 protected PoolingAsyncClientConnectionManager(
184 final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool,
185 final AsyncClientConnectionOperator connectionOperator) {
186 this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
187 this.pool = Args.notNull(pool, "Connection pool");
188 this.closed = new AtomicBoolean(false);
189 }
190
191 @Override
192 public void close() {
193 close(CloseMode.GRACEFUL);
194 }
195
196 @Override
197 public void close(final CloseMode closeMode) {
198 if (this.closed.compareAndSet(false, true)) {
199 if (LOG.isDebugEnabled()) {
200 LOG.debug("Shutdown connection pool {}", closeMode);
201 }
202 this.pool.close(closeMode);
203 LOG.debug("Connection pool shut down");
204 }
205 }
206
207 private InternalConnectionEndpoint cast(final AsyncConnectionEndpoint endpoint) {
208 if (endpoint instanceof InternalConnectionEndpoint) {
209 return (InternalConnectionEndpoint) endpoint;
210 }
211 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
212 }
213
214 @Override
215 public Future<AsyncConnectionEndpoint> lease(
216 final String id,
217 final HttpRoute route,
218 final Object state,
219 final Timeout requestTimeout,
220 final FutureCallback<AsyncConnectionEndpoint> callback) {
221 if (LOG.isDebugEnabled()) {
222 LOG.debug("{}: endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
223 }
224 final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
225 final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
226 route, state, requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
227
228 void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
229 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
230 if (connection != null) {
231 connection.activate();
232 }
233 if (LOG.isDebugEnabled()) {
234 LOG.debug("{}: endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
235 }
236 final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
237 if (LOG.isDebugEnabled()) {
238 LOG.debug("{}: acquired {}", id, ConnPoolSupport.getId(endpoint));
239 }
240 resultFuture.completed(endpoint);
241 }
242
243 @Override
244 public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
245 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
246 final TimeValue timeValue = PoolingAsyncClientConnectionManager.this.validateAfterInactivity;
247 if (TimeValue.isNonNegative(timeValue) && connection != null &&
248 poolEntry.getUpdated() + timeValue.toMilliseconds() <= System.currentTimeMillis()) {
249 final ProtocolVersion protocolVersion = connection.getProtocolVersion();
250 if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
251 connection.submitCommand(new PingCommand(new BasicPingHandler(new Callback<Boolean>() {
252
253 @Override
254 public void execute(final Boolean result) {
255 if (result == null || !result) {
256 if (LOG.isDebugEnabled()) {
257 LOG.debug("{}: connection {} is stale", id, ConnPoolSupport.getId(connection));
258 }
259 poolEntry.discardConnection(CloseMode.IMMEDIATE);
260 }
261 leaseCompleted(poolEntry);
262 }
263
264 })), Command.Priority.IMMEDIATE);
265 } else {
266 if (!connection.isOpen()) {
267 if (LOG.isDebugEnabled()) {
268 LOG.debug("{}: connection {} is closed", id, ConnPoolSupport.getId(connection));
269 }
270 poolEntry.discardConnection(CloseMode.IMMEDIATE);
271 }
272 leaseCompleted(poolEntry);
273 }
274 } else {
275 leaseCompleted(poolEntry);
276 }
277 }
278
279 @Override
280 public void failed(final Exception ex) {
281 if (LOG.isDebugEnabled()) {
282 LOG.debug("{}: endpoint lease failed", id);
283 }
284 resultFuture.failed(ex);
285 }
286
287 @Override
288 public void cancelled() {
289 if (LOG.isDebugEnabled()) {
290 LOG.debug("{}: endpoint lease cancelled", id);
291 }
292 resultFuture.cancel();
293 }
294
295 });
296
297 resultFuture.setDependency(leaseFuture);
298 return resultFuture;
299 }
300
301 @Override
302 public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
303 Args.notNull(endpoint, "Managed endpoint");
304 Args.notNull(keepAlive, "Keep-alive time");
305 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry = cast(endpoint).detach();
306 if (entry == null) {
307 return;
308 }
309 if (LOG.isDebugEnabled()) {
310 LOG.debug("{}: releasing endpoint", ConnPoolSupport.getId(endpoint));
311 }
312 final ManagedAsyncClientConnection connection = entry.getConnection();
313 boolean reusable = connection != null && connection.isOpen();
314 try {
315 if (reusable) {
316 entry.updateState(state);
317 entry.updateExpiry(keepAlive);
318 connection.passivate();
319 if (LOG.isDebugEnabled()) {
320 final String s;
321 if (TimeValue.isPositive(keepAlive)) {
322 s = "for " + keepAlive;
323 } else {
324 s = "indefinitely";
325 }
326 LOG.debug("{}: connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection), s);
327 }
328 }
329 } catch (final RuntimeException ex) {
330 reusable = false;
331 throw ex;
332 } finally {
333 pool.release(entry, reusable);
334 if (LOG.isDebugEnabled()) {
335 LOG.debug("{}: connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
336 }
337 }
338 }
339
340 @Override
341 public Future<AsyncConnectionEndpoint> connect(
342 final AsyncConnectionEndpoint endpoint,
343 final ConnectionInitiator connectionInitiator,
344 final Timeout connectTimeout,
345 final Object attachment,
346 final HttpContext context,
347 final FutureCallback<AsyncConnectionEndpoint> callback) {
348 Args.notNull(endpoint, "Endpoint");
349 Args.notNull(connectionInitiator, "Connection initiator");
350 Args.notNull(connectTimeout, "Timeout");
351 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
352 final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
353 if (internalEndpoint.isConnected()) {
354 resultFuture.completed(endpoint);
355 return resultFuture;
356 }
357 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getPoolEntry();
358 final HttpRoute route = poolEntry.getRoute();
359 final HttpHost host;
360 if (route.getProxyHost() != null) {
361 host = route.getProxyHost();
362 } else {
363 host = route.getTargetHost();
364 }
365 final InetSocketAddress localAddress = route.getLocalSocketAddress();
366 if (LOG.isDebugEnabled()) {
367 LOG.debug("{}: connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
368 }
369 final Future<ManagedAsyncClientConnection> connectFuture = connectionOperator.connect(
370 connectionInitiator, host, localAddress, connectTimeout, attachment, new FutureCallback<ManagedAsyncClientConnection>() {
371
372 @Override
373 public void completed(final ManagedAsyncClientConnection connection) {
374 try {
375 if (LOG.isDebugEnabled()) {
376 LOG.debug("{}: connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
377 }
378 poolEntry.assignConnection(connection);
379 resultFuture.completed(internalEndpoint);
380 } catch (final RuntimeException ex) {
381 resultFuture.failed(ex);
382 }
383 }
384
385 @Override
386 public void failed(final Exception ex) {
387 resultFuture.failed(ex);
388 }
389
390 @Override
391 public void cancelled() {
392 resultFuture.cancel();
393 }
394
395 });
396 resultFuture.setDependency(connectFuture);
397 return resultFuture;
398 }
399
400 @Override
401 public void upgrade(
402 final AsyncConnectionEndpoint endpoint,
403 final Object attachment,
404 final HttpContext context) {
405 Args.notNull(endpoint, "Managed endpoint");
406 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
407 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
408 final HttpRoute route = poolEntry.getRoute();
409 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
410 connectionOperator.upgrade(poolEntry.getConnection(), route.getTargetHost(), attachment);
411 if (LOG.isDebugEnabled()) {
412 LOG.debug("{}: upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
413 }
414 }
415
416 @Override
417 public Set<HttpRoute> getRoutes() {
418 return pool.getRoutes();
419 }
420
421 @Override
422 public void setMaxTotal(final int max) {
423 pool.setMaxTotal(max);
424 }
425
426 @Override
427 public int getMaxTotal() {
428 return pool.getMaxTotal();
429 }
430
431 @Override
432 public void setDefaultMaxPerRoute(final int max) {
433 pool.setDefaultMaxPerRoute(max);
434 }
435
436 @Override
437 public int getDefaultMaxPerRoute() {
438 return pool.getDefaultMaxPerRoute();
439 }
440
441 @Override
442 public void setMaxPerRoute(final HttpRoute route, final int max) {
443 pool.setMaxPerRoute(route, max);
444 }
445
446 @Override
447 public int getMaxPerRoute(final HttpRoute route) {
448 return pool.getMaxPerRoute(route);
449 }
450
451 @Override
452 public void closeIdle(final TimeValue idletime) {
453 pool.closeIdle(idletime);
454 }
455
456 @Override
457 public void closeExpired() {
458 pool.closeExpired();
459 }
460
461 @Override
462 public PoolStats getTotalStats() {
463 return pool.getTotalStats();
464 }
465
466 @Override
467 public PoolStats getStats(final HttpRoute route) {
468 return pool.getStats(route);
469 }
470
471 public TimeValue getValidateAfterInactivity() {
472 return validateAfterInactivity;
473 }
474
475
476
477
478
479
480
481
482 public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
483 this.validateAfterInactivity = validateAfterInactivity;
484 }
485
486 private static final AtomicLong COUNT = new AtomicLong(0);
487
488 class InternalConnectionEndpoint extends AsyncConnectionEndpoint implements Identifiable {
489
490 private final AtomicReference<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> poolEntryRef;
491 private final String id;
492
493 InternalConnectionEndpoint(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
494 this.poolEntryRef = new AtomicReference<>(poolEntry);
495 this.id = String.format("ep-%08X", COUNT.getAndIncrement());
496 }
497
498 @Override
499 public String getId() {
500 return id;
501 }
502
503 PoolEntry<HttpRoute, ManagedAsyncClientConnection> getPoolEntry() {
504 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
505 if (poolEntry == null) {
506 throw new ConnectionShutdownException();
507 }
508 return poolEntry;
509 }
510
511 PoolEntry<HttpRoute, ManagedAsyncClientConnection> getValidatedPoolEntry() {
512 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = getPoolEntry();
513 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
514 Asserts.check(connection != null && connection.isOpen(), "Endpoint is not connected");
515 return poolEntry;
516 }
517
518 PoolEntry<HttpRoute, ManagedAsyncClientConnection> detach() {
519 return poolEntryRef.getAndSet(null);
520 }
521
522 @Override
523 public void close(final CloseMode closeMode) {
524 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
525 if (poolEntry != null) {
526 if (LOG.isDebugEnabled()) {
527 LOG.debug("{}: close {}", id, closeMode);
528 }
529 poolEntry.discardConnection(closeMode);
530 }
531 }
532
533 @Override
534 public boolean isConnected() {
535 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
536 if (poolEntry == null) {
537 return false;
538 }
539 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
540 if (connection == null) {
541 return false;
542 }
543 if (!connection.isOpen()) {
544 poolEntry.discardConnection(CloseMode.IMMEDIATE);
545 return false;
546 }
547 return true;
548 }
549
550 @Override
551 public void setSocketTimeout(final Timeout timeout) {
552 getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
553 }
554
555 @Override
556 public void execute(
557 final String exchangeId,
558 final AsyncClientExchangeHandler exchangeHandler,
559 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
560 final HttpContext context) {
561 final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
562 if (LOG.isDebugEnabled()) {
563 LOG.debug("{}: executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
564 }
565 connection.submitCommand(
566 new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context),
567 Command.Priority.NORMAL);
568 }
569
570 }
571
572 }