1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.io;
28
29 import java.io.IOException;
30 import java.util.Set;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeoutException;
34 import java.util.concurrent.atomic.AtomicBoolean;
35 import java.util.concurrent.atomic.AtomicLong;
36 import java.util.concurrent.atomic.AtomicReference;
37
38 import org.apache.hc.client5.http.DnsResolver;
39 import org.apache.hc.client5.http.HttpRoute;
40 import org.apache.hc.client5.http.SchemePortResolver;
41 import org.apache.hc.client5.http.impl.ConnPoolSupport;
42 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
43 import org.apache.hc.client5.http.io.ConnectionEndpoint;
44 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
45 import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
46 import org.apache.hc.client5.http.io.LeaseRequest;
47 import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
48 import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
49 import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
50 import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
51 import org.apache.hc.core5.annotation.Contract;
52 import org.apache.hc.core5.annotation.Internal;
53 import org.apache.hc.core5.annotation.ThreadingBehavior;
54 import org.apache.hc.core5.http.ClassicHttpRequest;
55 import org.apache.hc.core5.http.ClassicHttpResponse;
56 import org.apache.hc.core5.http.HttpException;
57 import org.apache.hc.core5.http.HttpHost;
58 import org.apache.hc.core5.http.URIScheme;
59 import org.apache.hc.core5.http.config.Registry;
60 import org.apache.hc.core5.http.config.RegistryBuilder;
61 import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
62 import org.apache.hc.core5.http.io.HttpConnectionFactory;
63 import org.apache.hc.core5.http.io.SocketConfig;
64 import org.apache.hc.core5.http.protocol.HttpContext;
65 import org.apache.hc.core5.io.CloseMode;
66 import org.apache.hc.core5.pool.ConnPoolControl;
67 import org.apache.hc.core5.pool.LaxConnPool;
68 import org.apache.hc.core5.pool.ManagedConnPool;
69 import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
70 import org.apache.hc.core5.pool.PoolEntry;
71 import org.apache.hc.core5.pool.PoolReusePolicy;
72 import org.apache.hc.core5.pool.PoolStats;
73 import org.apache.hc.core5.pool.StrictConnPool;
74 import org.apache.hc.core5.util.Args;
75 import org.apache.hc.core5.util.Asserts;
76 import org.apache.hc.core5.util.Identifiable;
77 import org.apache.hc.core5.util.TimeValue;
78 import org.apache.hc.core5.util.Timeout;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
101 public class PoolingHttpClientConnectionManager
102 implements HttpClientConnectionManager, ConnPoolControl<HttpRoute> {
103
104 private static final Logger LOG = LoggerFactory.getLogger(PoolingHttpClientConnectionManager.class);
105
106 public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
107 public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
108
109 private final HttpClientConnectionOperator connectionOperator;
110 private final ManagedConnPool<HttpRoute, ManagedHttpClientConnection> pool;
111 private final HttpConnectionFactory<ManagedHttpClientConnection> connFactory;
112 private final AtomicBoolean closed;
113
114 private volatile SocketConfig defaultSocketConfig;
115 private volatile TimeValue validateAfterInactivity;
116
/**
 * Creates a connection manager whose socket factory registry maps the
 * {@code http} scheme to the plain socket factory and the {@code https}
 * scheme to the SSL socket factory.
 */
public PoolingHttpClientConnectionManager() {
    this(
            RegistryBuilder.<ConnectionSocketFactory>create()
                    .register(URIScheme.HTTP.id, PlainConnectionSocketFactory.getSocketFactory())
                    .register(URIScheme.HTTPS.id, SSLConnectionSocketFactory.getSocketFactory())
                    .build());
}
123
/**
 * Creates a connection manager with the given socket factory registry and
 * no explicit connection factory (the default factory is selected by the
 * constructor this one delegates to).
 *
 * @param socketFactoryRegistry socket factories keyed by scheme id
 */
public PoolingHttpClientConnectionManager(final Registry<ConnectionSocketFactory> socketFactoryRegistry) {
    this(socketFactoryRegistry, null);
}
128
/**
 * Creates a connection manager using the {@code STRICT} pool concurrency
 * policy and {@code TimeValue.NEG_ONE_MILLISECOND} as the time-to-live
 * value passed to the pool.
 *
 * @param socketFactoryRegistry socket factories keyed by scheme id
 * @param connFactory connection factory; may be {@code null}
 */
public PoolingHttpClientConnectionManager(
        final Registry<ConnectionSocketFactory> socketFactoryRegistry,
        final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
    this(
            socketFactoryRegistry,
            PoolConcurrencyPolicy.STRICT,
            TimeValue.NEG_ONE_MILLISECOND,
            connFactory);
}
134
/**
 * Creates a connection manager using the {@code LIFO} pool reuse policy.
 *
 * @param socketFactoryRegistry socket factories keyed by scheme id
 * @param poolConcurrencyPolicy pool concurrency policy
 * @param timeToLive time-to-live value passed to the pool
 * @param connFactory connection factory; may be {@code null}
 */
public PoolingHttpClientConnectionManager(
        final Registry<ConnectionSocketFactory> socketFactoryRegistry,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final TimeValue timeToLive,
        final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
    this(
            socketFactoryRegistry,
            poolConcurrencyPolicy,
            PoolReusePolicy.LIFO,
            timeToLive,
            connFactory);
}
142
/**
 * Creates a connection manager with explicit pool policies and no explicit
 * connection factory.
 *
 * @param socketFactoryRegistry socket factories keyed by scheme id
 * @param poolConcurrencyPolicy pool concurrency policy
 * @param poolReusePolicy pool reuse policy
 * @param timeToLive time-to-live value passed to the pool
 */
public PoolingHttpClientConnectionManager(
        final Registry<ConnectionSocketFactory> socketFactoryRegistry,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive) {
    this(
            socketFactoryRegistry,
            poolConcurrencyPolicy,
            poolReusePolicy,
            timeToLive,
            null);
}
150
/**
 * Creates a connection manager with explicit pool policies, passing
 * {@code null} for both the scheme port resolver and the DNS resolver.
 *
 * @param socketFactoryRegistry socket factories keyed by scheme id
 * @param poolConcurrencyPolicy pool concurrency policy
 * @param poolReusePolicy pool reuse policy
 * @param timeToLive time-to-live value passed to the pool
 * @param connFactory connection factory; may be {@code null}
 */
public PoolingHttpClientConnectionManager(
        final Registry<ConnectionSocketFactory> socketFactoryRegistry,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive,
        final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
    this(
            socketFactoryRegistry,
            poolConcurrencyPolicy,
            poolReusePolicy,
            timeToLive,
            null,
            null,
            connFactory);
}
159
/**
 * Creates a connection manager backed by a
 * {@code DefaultHttpClientConnectionOperator} built from the given socket
 * factory registry, scheme port resolver and DNS resolver.
 *
 * @param socketFactoryRegistry socket factories keyed by scheme id
 * @param poolConcurrencyPolicy pool concurrency policy
 * @param poolReusePolicy pool reuse policy
 * @param timeToLive time-to-live value passed to the pool
 * @param schemePortResolver scheme port resolver handed to the operator
 * @param dnsResolver DNS resolver handed to the operator
 * @param connFactory connection factory; may be {@code null}
 */
public PoolingHttpClientConnectionManager(
        final Registry<ConnectionSocketFactory> socketFactoryRegistry,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive,
        final SchemePortResolver schemePortResolver,
        final DnsResolver dnsResolver,
        final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
    this(
            new DefaultHttpClientConnectionOperator(socketFactoryRegistry, schemePortResolver, dnsResolver),
            poolConcurrencyPolicy, poolReusePolicy, timeToLive, connFactory);
}
174
175 @Internal
176 protected PoolingHttpClientConnectionManager(
177 final HttpClientConnectionOperator httpClientConnectionOperator,
178 final PoolConcurrencyPolicy poolConcurrencyPolicy,
179 final PoolReusePolicy poolReusePolicy,
180 final TimeValue timeToLive,
181 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
182 super();
183 this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
184 switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
185 case STRICT:
186 this.pool = new StrictConnPool<>(
187 DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
188 DEFAULT_MAX_TOTAL_CONNECTIONS,
189 timeToLive,
190 poolReusePolicy,
191 null);
192 break;
193 case LAX:
194 this.pool = new LaxConnPool<>(
195 DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
196 timeToLive,
197 poolReusePolicy,
198 null);
199 break;
200 default:
201 throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
202 }
203 this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
204 this.closed = new AtomicBoolean(false);
205 this.validateAfterInactivity = TimeValue.ofSeconds(2L);
206 }
207
/**
 * Creates a connection manager around the given connection operator and an
 * externally supplied pool.
 * <p>
 * A {@code null} connection factory is replaced with
 * {@code ManagedHttpClientConnectionFactory.INSTANCE}. The
 * validate-after-inactivity period is initialised to 2 seconds, the same
 * default the policy-based constructor applies, so that stale-connection
 * validation on lease does not depend on which constructor was used.
 *
 * @param httpClientConnectionOperator connection operator; must not be {@code null}
 * @param pool connection pool; must not be {@code null}
 * @param connFactory connection factory; may be {@code null}
 */
@Internal
protected PoolingHttpClientConnectionManager(
        final HttpClientConnectionOperator httpClientConnectionOperator,
        final ManagedConnPool<HttpRoute, ManagedHttpClientConnection> pool,
        final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
    super();
    this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
    this.pool = Args.notNull(pool, "Connection pool");
    this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
    this.closed = new AtomicBoolean(false);
    // Previously left null here, which silently disabled validation on lease.
    this.validateAfterInactivity = TimeValue.ofSeconds(2L);
}
219
220 @Override
221 public void close() {
222 close(CloseMode.GRACEFUL);
223 }
224
225 @Override
226 public void close(final CloseMode closeMode) {
227 if (this.closed.compareAndSet(false, true)) {
228 if (LOG.isDebugEnabled()) {
229 LOG.debug("Shutdown connection pool {}", closeMode);
230 }
231 this.pool.close(closeMode);
232 LOG.debug("Connection pool shut down");
233 }
234 }
235
236 private InternalConnectionEndpoint cast(final ConnectionEndpoint endpoint) {
237 if (endpoint instanceof InternalConnectionEndpoint) {
238 return (InternalConnectionEndpoint) endpoint;
239 }
240 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
241 }
242
243 public LeaseRequest lease(final String id, final HttpRoute route, final Object state) {
244 return lease(id, route, Timeout.DISABLED, state);
245 }
246
247 @Override
248 public LeaseRequest lease(
249 final String id,
250 final HttpRoute route,
251 final Timeout requestTimeout,
252 final Object state) {
253 Args.notNull(route, "HTTP route");
254 if (LOG.isDebugEnabled()) {
255 LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
256 }
257 final Future<PoolEntry<HttpRoute, ManagedHttpClientConnection>> leaseFuture = this.pool.lease(route, state, requestTimeout, null);
258 return new LeaseRequest() {
259
260 private volatile ConnectionEndpoint endpoint;
261
262 @Override
263 public synchronized ConnectionEndpoint get(
264 final Timeout timeout) throws InterruptedException, ExecutionException, TimeoutException {
265 Args.notNull(timeout, "Operation timeout");
266 if (this.endpoint != null) {
267 return this.endpoint;
268 }
269 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry;
270 try {
271 poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit());
272 } catch (final TimeoutException ex) {
273 leaseFuture.cancel(true);
274 throw ex;
275 }
276 if (LOG.isDebugEnabled()) {
277 LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
278 }
279 try {
280 final TimeValue validateAfterInactivitySnapshot = validateAfterInactivity;
281 if (TimeValue.isNonNegative(validateAfterInactivitySnapshot)) {
282 final ManagedHttpClientConnection conn = poolEntry.getConnection();
283 if (conn != null
284 && poolEntry.getUpdated() + validateAfterInactivitySnapshot.toMilliseconds() <= System.currentTimeMillis()) {
285 boolean stale;
286 try {
287 stale = conn.isStale();
288 } catch (final IOException ignore) {
289 stale = true;
290 }
291 if (stale) {
292 if (LOG.isDebugEnabled()) {
293 LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(conn));
294 }
295 poolEntry.discardConnection(CloseMode.IMMEDIATE);
296 }
297 }
298 }
299 final ManagedHttpClientConnection conn = poolEntry.getConnection();
300 if (conn != null) {
301 conn.activate();
302 } else {
303 poolEntry.assignConnection(connFactory.createConnection(null));
304 }
305 this.endpoint = new InternalConnectionEndpoint(poolEntry);
306 if (LOG.isDebugEnabled()) {
307 LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
308 }
309 return this.endpoint;
310 } catch (final Exception ex) {
311 if (LOG.isDebugEnabled()) {
312 LOG.debug("{} endpoint lease failed", id);
313 }
314 pool.release(poolEntry, false);
315 throw new ExecutionException(ex.getMessage(), ex);
316 }
317 }
318
319 @Override
320 public boolean cancel() {
321 return leaseFuture.cancel(true);
322 }
323
324 };
325
326 }
327
328 @Override
329 public void release(final ConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
330 Args.notNull(endpoint, "Managed endpoint");
331 final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = cast(endpoint).detach();
332 if (entry == null) {
333 return;
334 }
335 if (LOG.isDebugEnabled()) {
336 LOG.debug("{} releasing endpoint", ConnPoolSupport.getId(endpoint));
337 }
338 final ManagedHttpClientConnection conn = entry.getConnection();
339 if (conn != null && keepAlive == null) {
340 conn.close(CloseMode.GRACEFUL);
341 }
342 boolean reusable = conn != null && conn.isOpen() && conn.isConsistent();
343 try {
344 if (reusable) {
345 entry.updateState(state);
346 entry.updateExpiry(keepAlive);
347 conn.passivate();
348 if (LOG.isDebugEnabled()) {
349 final String s;
350 if (TimeValue.isPositive(keepAlive)) {
351 s = "for " + keepAlive;
352 } else {
353 s = "indefinitely";
354 }
355 LOG.debug("{} connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn), s);
356 }
357 } else {
358 if (LOG.isDebugEnabled()) {
359 LOG.debug("{} connection is not kept alive", ConnPoolSupport.getId(endpoint));
360 }
361 }
362 } catch (final RuntimeException ex) {
363 reusable = false;
364 throw ex;
365 } finally {
366 this.pool.release(entry, reusable);
367 if (LOG.isDebugEnabled()) {
368 LOG.debug("{} connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
369 }
370 }
371 }
372
373 @Override
374 public void connect(final ConnectionEndpoint endpoint, final TimeValue connectTimeout, final HttpContext context) throws IOException {
375 Args.notNull(endpoint, "Managed endpoint");
376 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
377 if (internalEndpoint.isConnected()) {
378 return;
379 }
380 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = internalEndpoint.getPoolEntry();
381 if (!poolEntry.hasConnection()) {
382 poolEntry.assignConnection(connFactory.createConnection(null));
383 }
384 final HttpRoute route = poolEntry.getRoute();
385 final HttpHost host;
386 if (route.getProxyHost() != null) {
387 host = route.getProxyHost();
388 } else {
389 host = route.getTargetHost();
390 }
391 if (LOG.isDebugEnabled()) {
392 LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
393 }
394 final ManagedHttpClientConnection conn = poolEntry.getConnection();
395 final SocketConfig defaultSocketConfigSnapshot = defaultSocketConfig;
396 this.connectionOperator.connect(
397 conn,
398 host,
399 route.getLocalSocketAddress(),
400 connectTimeout,
401 defaultSocketConfigSnapshot != null ? defaultSocketConfigSnapshot : SocketConfig.DEFAULT,
402 context);
403 if (LOG.isDebugEnabled()) {
404 LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn));
405 }
406 }
407
408 @Override
409 public void upgrade(final ConnectionEndpoint endpoint, final HttpContext context) throws IOException {
410 Args.notNull(endpoint, "Managed endpoint");
411 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
412 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
413 final HttpRoute route = poolEntry.getRoute();
414 this.connectionOperator.upgrade(poolEntry.getConnection(), route.getTargetHost(), context);
415 }
416
417 @Override
418 public void closeIdle(final TimeValue idleTime) {
419 Args.notNull(idleTime, "Idle time");
420 if (LOG.isDebugEnabled()) {
421 LOG.debug("Closing connections idle longer than {}", idleTime);
422 }
423 this.pool.closeIdle(idleTime);
424 }
425
426 @Override
427 public void closeExpired() {
428 LOG.debug("Closing expired connections");
429 this.pool.closeExpired();
430 }
431
432 @Override
433 public Set<HttpRoute> getRoutes() {
434 return this.pool.getRoutes();
435 }
436
437 @Override
438 public int getMaxTotal() {
439 return this.pool.getMaxTotal();
440 }
441
442 @Override
443 public void setMaxTotal(final int max) {
444 this.pool.setMaxTotal(max);
445 }
446
447 @Override
448 public int getDefaultMaxPerRoute() {
449 return this.pool.getDefaultMaxPerRoute();
450 }
451
452 @Override
453 public void setDefaultMaxPerRoute(final int max) {
454 this.pool.setDefaultMaxPerRoute(max);
455 }
456
457 @Override
458 public int getMaxPerRoute(final HttpRoute route) {
459 return this.pool.getMaxPerRoute(route);
460 }
461
462 @Override
463 public void setMaxPerRoute(final HttpRoute route, final int max) {
464 this.pool.setMaxPerRoute(route, max);
465 }
466
467 @Override
468 public PoolStats getTotalStats() {
469 return this.pool.getTotalStats();
470 }
471
472 @Override
473 public PoolStats getStats(final HttpRoute route) {
474 return this.pool.getStats(route);
475 }
476
477 public SocketConfig getDefaultSocketConfig() {
478 return this.defaultSocketConfig;
479 }
480
481 public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
482 this.defaultSocketConfig = defaultSocketConfig;
483 }
484
485
486
487
488
489
490 public TimeValue getValidateAfterInactivity() {
491 return validateAfterInactivity;
492 }
493
494
495
496
497
498
499
500
501
502 public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
503 this.validateAfterInactivity = validateAfterInactivity;
504 }
505
506 private static final AtomicLong COUNT = new AtomicLong(0);
507
508 class InternalConnectionEndpoint extends ConnectionEndpoint implements Identifiable {
509
510 private final AtomicReference<PoolEntry<HttpRoute, ManagedHttpClientConnection>> poolEntryRef;
511 private final String id;
512
513 InternalConnectionEndpoint(
514 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry) {
515 this.poolEntryRef = new AtomicReference<>(poolEntry);
516 this.id = String.format("ep-%010d", COUNT.getAndIncrement());
517 }
518
519 @Override
520 public String getId() {
521 return id;
522 }
523
524 PoolEntry<HttpRoute, ManagedHttpClientConnection> getPoolEntry() {
525 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = poolEntryRef.get();
526 if (poolEntry == null) {
527 throw new ConnectionShutdownException();
528 }
529 return poolEntry;
530 }
531
532 PoolEntry<HttpRoute, ManagedHttpClientConnection> getValidatedPoolEntry() {
533 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = getPoolEntry();
534 final ManagedHttpClientConnection connection = poolEntry.getConnection();
535 Asserts.check(connection != null && connection.isOpen(), "Endpoint is not connected");
536 return poolEntry;
537 }
538
539 PoolEntry<HttpRoute, ManagedHttpClientConnection> detach() {
540 return poolEntryRef.getAndSet(null);
541 }
542
543 @Override
544 public void close(final CloseMode closeMode) {
545 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = poolEntryRef.get();
546 if (poolEntry != null) {
547 poolEntry.discardConnection(closeMode);
548 }
549 }
550
551 @Override
552 public void close() throws IOException {
553 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = poolEntryRef.get();
554 if (poolEntry != null) {
555 poolEntry.discardConnection(CloseMode.GRACEFUL);
556 }
557 }
558
559 @Override
560 public boolean isConnected() {
561 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = getPoolEntry();
562 final ManagedHttpClientConnection connection = poolEntry.getConnection();
563 return connection != null && connection.isOpen();
564 }
565
566 @Override
567 public void setSocketTimeout(final Timeout timeout) {
568 getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
569 }
570
571 @Override
572 public ClassicHttpResponse execute(
573 final String exchangeId,
574 final ClassicHttpRequest request,
575 final HttpRequestExecutor requestExecutor,
576 final HttpContext context) throws IOException, HttpException {
577 Args.notNull(request, "HTTP request");
578 Args.notNull(requestExecutor, "Request executor");
579 final ManagedHttpClientConnection connection = getValidatedPoolEntry().getConnection();
580 if (LOG.isDebugEnabled()) {
581 LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
582 }
583 return requestExecutor.execute(request, connection, context);
584 }
585
586 }
587
588 }