1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.nio;
29
30 import java.net.InetSocketAddress;
31 import java.util.Set;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicLong;
35 import java.util.concurrent.atomic.AtomicReference;
36
37 import org.apache.hc.client5.http.DnsResolver;
38 import org.apache.hc.client5.http.HttpRoute;
39 import org.apache.hc.client5.http.SchemePortResolver;
40 import org.apache.hc.client5.http.impl.ConnPoolSupport;
41 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
42 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
43 import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
44 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
45 import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
46 import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
47 import org.apache.hc.core5.annotation.Contract;
48 import org.apache.hc.core5.annotation.Internal;
49 import org.apache.hc.core5.annotation.ThreadingBehavior;
50 import org.apache.hc.core5.concurrent.ComplexFuture;
51 import org.apache.hc.core5.concurrent.FutureCallback;
52 import org.apache.hc.core5.function.Callback;
53 import org.apache.hc.core5.http.HttpHost;
54 import org.apache.hc.core5.http.HttpVersion;
55 import org.apache.hc.core5.http.ProtocolVersion;
56 import org.apache.hc.core5.http.config.Lookup;
57 import org.apache.hc.core5.http.config.RegistryBuilder;
58 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
59 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
60 import org.apache.hc.core5.http.nio.HandlerFactory;
61 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
62 import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
63 import org.apache.hc.core5.http.protocol.HttpContext;
64 import org.apache.hc.core5.http2.nio.command.PingCommand;
65 import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
66 import org.apache.hc.core5.io.CloseMode;
67 import org.apache.hc.core5.pool.ConnPoolControl;
68 import org.apache.hc.core5.pool.LaxConnPool;
69 import org.apache.hc.core5.pool.ManagedConnPool;
70 import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
71 import org.apache.hc.core5.pool.PoolEntry;
72 import org.apache.hc.core5.pool.PoolReusePolicy;
73 import org.apache.hc.core5.pool.PoolStats;
74 import org.apache.hc.core5.pool.StrictConnPool;
75 import org.apache.hc.core5.reactor.Command;
76 import org.apache.hc.core5.reactor.ConnectionInitiator;
77 import org.apache.hc.core5.util.Args;
78 import org.apache.hc.core5.util.Asserts;
79 import org.apache.hc.core5.util.Identifiable;
80 import org.apache.hc.core5.util.TimeValue;
81 import org.apache.hc.core5.util.Timeout;
82 import org.slf4j.Logger;
83 import org.slf4j.LoggerFactory;
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
104 public class PoolingAsyncClientConnectionManager implements AsyncClientConnectionManager, ConnPoolControl<HttpRoute> {
105
106 private static final Logger LOG = LoggerFactory.getLogger(PoolingAsyncClientConnectionManager.class);
107
108 public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
109 public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
110
111 private final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool;
112 private final AsyncClientConnectionOperator connectionOperator;
113 private final AtomicBoolean closed;
114
115 private volatile TimeValue validateAfterInactivity;
116
117 public PoolingAsyncClientConnectionManager() {
118 this(RegistryBuilder.<TlsStrategy>create()
119 .register("https", DefaultClientTlsStrategy.getDefault())
120 .build());
121 }
122
123 public PoolingAsyncClientConnectionManager(final Lookup<TlsStrategy> tlsStrategyLookup) {
124 this(tlsStrategyLookup, PoolConcurrencyPolicy.STRICT, TimeValue.NEG_ONE_MILLISECOND);
125 }
126
127 public PoolingAsyncClientConnectionManager(
128 final Lookup<TlsStrategy> tlsStrategyLookup,
129 final PoolConcurrencyPolicy poolConcurrencyPolicy,
130 final TimeValue timeToLive) {
131 this(tlsStrategyLookup, poolConcurrencyPolicy, PoolReusePolicy.LIFO, timeToLive);
132 }
133
134 public PoolingAsyncClientConnectionManager(
135 final Lookup<TlsStrategy> tlsStrategyLookup,
136 final PoolConcurrencyPolicy poolConcurrencyPolicy,
137 final PoolReusePolicy poolReusePolicy,
138 final TimeValue timeToLive) {
139 this(tlsStrategyLookup, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null, null);
140 }
141
142 public PoolingAsyncClientConnectionManager(
143 final Lookup<TlsStrategy> tlsStrategyLookup,
144 final PoolConcurrencyPolicy poolConcurrencyPolicy,
145 final PoolReusePolicy poolReusePolicy,
146 final TimeValue timeToLive,
147 final SchemePortResolver schemePortResolver,
148 final DnsResolver dnsResolver) {
149 this(new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
150 poolConcurrencyPolicy, poolReusePolicy, timeToLive);
151 }
152
153 @Internal
154 protected PoolingAsyncClientConnectionManager(
155 final AsyncClientConnectionOperator connectionOperator,
156 final PoolConcurrencyPolicy poolConcurrencyPolicy,
157 final PoolReusePolicy poolReusePolicy,
158 final TimeValue timeToLive) {
159 this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
160 switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
161 case STRICT:
162 this.pool = new StrictConnPool<>(
163 DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
164 DEFAULT_MAX_TOTAL_CONNECTIONS,
165 timeToLive,
166 poolReusePolicy,
167 null);
168 break;
169 case LAX:
170 this.pool = new LaxConnPool<>(
171 DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
172 timeToLive,
173 poolReusePolicy,
174 null);
175 break;
176 default:
177 throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
178 }
179 this.closed = new AtomicBoolean(false);
180 }
181
182 @Internal
183 protected PoolingAsyncClientConnectionManager(
184 final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool,
185 final AsyncClientConnectionOperator connectionOperator) {
186 this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
187 this.pool = Args.notNull(pool, "Connection pool");
188 this.closed = new AtomicBoolean(false);
189 }
190
191 @Override
192 public void close() {
193 close(CloseMode.GRACEFUL);
194 }
195
196 @Override
197 public void close(final CloseMode closeMode) {
198 if (this.closed.compareAndSet(false, true)) {
199 if (LOG.isDebugEnabled()) {
200 LOG.debug("Shutdown connection pool {}", closeMode);
201 }
202 this.pool.close(closeMode);
203 LOG.debug("Connection pool shut down");
204 }
205 }
206
207 private InternalConnectionEndpoint cast(final AsyncConnectionEndpoint endpoint) {
208 if (endpoint instanceof InternalConnectionEndpoint) {
209 return (InternalConnectionEndpoint) endpoint;
210 }
211 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
212 }
213
214 @Override
215 public Future<AsyncConnectionEndpoint> lease(
216 final String id,
217 final HttpRoute route,
218 final Object state,
219 final Timeout requestTimeout,
220 final FutureCallback<AsyncConnectionEndpoint> callback) {
221 if (LOG.isDebugEnabled()) {
222 LOG.debug("{}: endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
223 }
224 final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
225 final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
226 route, state, requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
227
228 void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
229 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
230 if (connection != null) {
231 connection.activate();
232 }
233 if (LOG.isDebugEnabled()) {
234 LOG.debug("{}: endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
235 }
236 final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
237 if (LOG.isDebugEnabled()) {
238 LOG.debug("{}: acquired {}", id, ConnPoolSupport.getId(endpoint));
239 }
240 resultFuture.completed(endpoint);
241 }
242
243 @Override
244 public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
245 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
246 if (connection != null) {
247 if (connection.isOpen()) {
248 final ProtocolVersion protocolVersion = connection.getProtocolVersion();
249 if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
250 final TimeValue timeValue = PoolingAsyncClientConnectionManager.this.validateAfterInactivity;
251 if (TimeValue.isNonNegative(timeValue) &&
252 poolEntry.getUpdated() + timeValue.toMilliseconds() <= System.currentTimeMillis()) {
253 connection.submitCommand(new PingCommand(new BasicPingHandler(new Callback<Boolean>() {
254
255 @Override
256 public void execute(final Boolean result) {
257 if (result == null || !result) {
258 if (LOG.isDebugEnabled()) {
259 LOG.debug("{}: connection {} is stale", id, ConnPoolSupport.getId(connection));
260 }
261 poolEntry.discardConnection(CloseMode.IMMEDIATE);
262 }
263 leaseCompleted(poolEntry);
264 }
265
266 })), Command.Priority.IMMEDIATE);
267 return;
268 }
269 }
270 } else {
271 if (LOG.isDebugEnabled()) {
272 LOG.debug("{}: connection {} is closed", id, ConnPoolSupport.getId(connection));
273 }
274 poolEntry.discardConnection(CloseMode.IMMEDIATE);
275 }
276 }
277 leaseCompleted(poolEntry);
278 }
279
280 @Override
281 public void failed(final Exception ex) {
282 if (LOG.isDebugEnabled()) {
283 LOG.debug("{}: endpoint lease failed", id);
284 }
285 resultFuture.failed(ex);
286 }
287
288 @Override
289 public void cancelled() {
290 if (LOG.isDebugEnabled()) {
291 LOG.debug("{}: endpoint lease cancelled", id);
292 }
293 resultFuture.cancel();
294 }
295
296 });
297
298 resultFuture.setDependency(leaseFuture);
299 return resultFuture;
300 }
301
302 @Override
303 public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
304 Args.notNull(endpoint, "Managed endpoint");
305 Args.notNull(keepAlive, "Keep-alive time");
306 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry = cast(endpoint).detach();
307 if (entry == null) {
308 return;
309 }
310 if (LOG.isDebugEnabled()) {
311 LOG.debug("{}: releasing endpoint", ConnPoolSupport.getId(endpoint));
312 }
313 final ManagedAsyncClientConnection connection = entry.getConnection();
314 boolean reusable = connection != null && connection.isOpen();
315 try {
316 if (reusable) {
317 entry.updateState(state);
318 entry.updateExpiry(keepAlive);
319 connection.passivate();
320 if (LOG.isDebugEnabled()) {
321 final String s;
322 if (TimeValue.isPositive(keepAlive)) {
323 s = "for " + keepAlive;
324 } else {
325 s = "indefinitely";
326 }
327 LOG.debug("{}: connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection), s);
328 }
329 }
330 } catch (final RuntimeException ex) {
331 reusable = false;
332 throw ex;
333 } finally {
334 pool.release(entry, reusable);
335 if (LOG.isDebugEnabled()) {
336 LOG.debug("{}: connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
337 }
338 }
339 }
340
341 @Override
342 public Future<AsyncConnectionEndpoint> connect(
343 final AsyncConnectionEndpoint endpoint,
344 final ConnectionInitiator connectionInitiator,
345 final Timeout connectTimeout,
346 final Object attachment,
347 final HttpContext context,
348 final FutureCallback<AsyncConnectionEndpoint> callback) {
349 Args.notNull(endpoint, "Endpoint");
350 Args.notNull(connectionInitiator, "Connection initiator");
351 Args.notNull(connectTimeout, "Timeout");
352 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
353 final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
354 if (internalEndpoint.isConnected()) {
355 resultFuture.completed(endpoint);
356 return resultFuture;
357 }
358 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getPoolEntry();
359 final HttpRoute route = poolEntry.getRoute();
360 final HttpHost host;
361 if (route.getProxyHost() != null) {
362 host = route.getProxyHost();
363 } else {
364 host = route.getTargetHost();
365 }
366 final InetSocketAddress localAddress = route.getLocalSocketAddress();
367 if (LOG.isDebugEnabled()) {
368 LOG.debug("{}: connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
369 }
370 final Future<ManagedAsyncClientConnection> connectFuture = connectionOperator.connect(
371 connectionInitiator, host, localAddress, connectTimeout, attachment, new FutureCallback<ManagedAsyncClientConnection>() {
372
373 @Override
374 public void completed(final ManagedAsyncClientConnection connection) {
375 try {
376 if (LOG.isDebugEnabled()) {
377 LOG.debug("{}: connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
378 }
379 poolEntry.assignConnection(connection);
380 resultFuture.completed(internalEndpoint);
381 } catch (final RuntimeException ex) {
382 resultFuture.failed(ex);
383 }
384 }
385
386 @Override
387 public void failed(final Exception ex) {
388 resultFuture.failed(ex);
389 }
390
391 @Override
392 public void cancelled() {
393 resultFuture.cancel();
394 }
395
396 });
397 resultFuture.setDependency(connectFuture);
398 return resultFuture;
399 }
400
401 @Override
402 public void upgrade(
403 final AsyncConnectionEndpoint endpoint,
404 final Object attachment,
405 final HttpContext context) {
406 Args.notNull(endpoint, "Managed endpoint");
407 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
408 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
409 final HttpRoute route = poolEntry.getRoute();
410 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
411 connectionOperator.upgrade(poolEntry.getConnection(), route.getTargetHost(), attachment);
412 if (LOG.isDebugEnabled()) {
413 LOG.debug("{}: upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
414 }
415 }
416
417 @Override
418 public Set<HttpRoute> getRoutes() {
419 return pool.getRoutes();
420 }
421
422 @Override
423 public void setMaxTotal(final int max) {
424 pool.setMaxTotal(max);
425 }
426
427 @Override
428 public int getMaxTotal() {
429 return pool.getMaxTotal();
430 }
431
432 @Override
433 public void setDefaultMaxPerRoute(final int max) {
434 pool.setDefaultMaxPerRoute(max);
435 }
436
437 @Override
438 public int getDefaultMaxPerRoute() {
439 return pool.getDefaultMaxPerRoute();
440 }
441
442 @Override
443 public void setMaxPerRoute(final HttpRoute route, final int max) {
444 pool.setMaxPerRoute(route, max);
445 }
446
447 @Override
448 public int getMaxPerRoute(final HttpRoute route) {
449 return pool.getMaxPerRoute(route);
450 }
451
452 @Override
453 public void closeIdle(final TimeValue idletime) {
454 pool.closeIdle(idletime);
455 }
456
457 @Override
458 public void closeExpired() {
459 pool.closeExpired();
460 }
461
462 @Override
463 public PoolStats getTotalStats() {
464 return pool.getTotalStats();
465 }
466
467 @Override
468 public PoolStats getStats(final HttpRoute route) {
469 return pool.getStats(route);
470 }
471
472 public TimeValue getValidateAfterInactivity() {
473 return validateAfterInactivity;
474 }
475
476
477
478
479
480
481
482
483 public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
484 this.validateAfterInactivity = validateAfterInactivity;
485 }
486
487 private static final AtomicLong COUNT = new AtomicLong(0);
488
489 class InternalConnectionEndpoint extends AsyncConnectionEndpoint implements Identifiable {
490
491 private final AtomicReference<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> poolEntryRef;
492 private final String id;
493
494 InternalConnectionEndpoint(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
495 this.poolEntryRef = new AtomicReference<>(poolEntry);
496 this.id = String.format("ep-%08X", COUNT.getAndIncrement());
497 }
498
499 @Override
500 public String getId() {
501 return id;
502 }
503
504 PoolEntry<HttpRoute, ManagedAsyncClientConnection> getPoolEntry() {
505 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
506 if (poolEntry == null) {
507 throw new ConnectionShutdownException();
508 }
509 return poolEntry;
510 }
511
512 PoolEntry<HttpRoute, ManagedAsyncClientConnection> getValidatedPoolEntry() {
513 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = getPoolEntry();
514 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
515 Asserts.check(connection != null && connection.isOpen(), "Endpoint is not connected");
516 return poolEntry;
517 }
518
519 PoolEntry<HttpRoute, ManagedAsyncClientConnection> detach() {
520 return poolEntryRef.getAndSet(null);
521 }
522
523 @Override
524 public void close(final CloseMode closeMode) {
525 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
526 if (poolEntry != null) {
527 if (LOG.isDebugEnabled()) {
528 LOG.debug("{}: close {}", id, closeMode);
529 }
530 poolEntry.discardConnection(closeMode);
531 }
532 }
533
534 @Override
535 public boolean isConnected() {
536 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
537 if (poolEntry == null) {
538 return false;
539 }
540 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
541 if (connection == null) {
542 return false;
543 }
544 if (!connection.isOpen()) {
545 poolEntry.discardConnection(CloseMode.IMMEDIATE);
546 return false;
547 }
548 return true;
549 }
550
551 @Override
552 public void setSocketTimeout(final Timeout timeout) {
553 getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
554 }
555
556 @Override
557 public void execute(
558 final String exchangeId,
559 final AsyncClientExchangeHandler exchangeHandler,
560 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
561 final HttpContext context) {
562 final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
563 if (LOG.isDebugEnabled()) {
564 LOG.debug("{}: executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
565 }
566 connection.submitCommand(
567 new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context),
568 Command.Priority.NORMAL);
569 }
570
571 }
572
573 }