View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.nio;
29  
30  import java.net.InetSocketAddress;
31  import java.util.Set;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.atomic.AtomicReference;
36  
37  import org.apache.hc.client5.http.DnsResolver;
38  import org.apache.hc.client5.http.HttpRoute;
39  import org.apache.hc.client5.http.SchemePortResolver;
40  import org.apache.hc.client5.http.impl.ConnPoolSupport;
41  import org.apache.hc.client5.http.impl.ConnectionShutdownException;
42  import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
43  import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
44  import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
45  import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
46  import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
47  import org.apache.hc.core5.annotation.Contract;
48  import org.apache.hc.core5.annotation.Internal;
49  import org.apache.hc.core5.annotation.ThreadingBehavior;
50  import org.apache.hc.core5.concurrent.ComplexFuture;
51  import org.apache.hc.core5.concurrent.FutureCallback;
52  import org.apache.hc.core5.function.Callback;
53  import org.apache.hc.core5.http.HttpHost;
54  import org.apache.hc.core5.http.HttpVersion;
55  import org.apache.hc.core5.http.ProtocolVersion;
56  import org.apache.hc.core5.http.config.Lookup;
57  import org.apache.hc.core5.http.config.RegistryBuilder;
58  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
59  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
60  import org.apache.hc.core5.http.nio.HandlerFactory;
61  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
62  import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
63  import org.apache.hc.core5.http.protocol.HttpContext;
64  import org.apache.hc.core5.http2.nio.command.PingCommand;
65  import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
66  import org.apache.hc.core5.io.CloseMode;
67  import org.apache.hc.core5.pool.ConnPoolControl;
68  import org.apache.hc.core5.pool.LaxConnPool;
69  import org.apache.hc.core5.pool.ManagedConnPool;
70  import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
71  import org.apache.hc.core5.pool.PoolEntry;
72  import org.apache.hc.core5.pool.PoolReusePolicy;
73  import org.apache.hc.core5.pool.PoolStats;
74  import org.apache.hc.core5.pool.StrictConnPool;
75  import org.apache.hc.core5.reactor.Command;
76  import org.apache.hc.core5.reactor.ConnectionInitiator;
77  import org.apache.hc.core5.util.Args;
78  import org.apache.hc.core5.util.Asserts;
79  import org.apache.hc.core5.util.Identifiable;
80  import org.apache.hc.core5.util.TimeValue;
81  import org.apache.hc.core5.util.Timeout;
82  import org.slf4j.Logger;
83  import org.slf4j.LoggerFactory;
84  
85  /**
86   * {@code PoolingAsyncClientConnectionManager} maintains a pool of non-blocking
87   * {@link org.apache.hc.core5.http.HttpConnection}s and is able to service
88   * connection requests from multiple execution threads. Connections are pooled
 * on a per route basis. A request for a route for which the manager already
 * has persistent connections available in the pool will be serviced by leasing
 * a connection from the pool rather than creating a new connection.
 * <p>
 * {@code PoolingAsyncClientConnectionManager} maintains a maximum limit
 * of connections on a per route basis and in total. Connection limits
 * can be adjusted using {@link ConnPoolControl} methods.
96   * <p>
97   * Total time to live (TTL) set at construction time defines maximum life span
98   * of persistent connections regardless of their expiration setting. No persistent
99   * connection will be re-used past its TTL value.
100  *
101  * @since 5.0
102  */
103 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
104 public class PoolingAsyncClientConnectionManager implements AsyncClientConnectionManager, ConnPoolControl<HttpRoute> {
105 
    /** Logger shared by the manager and the endpoints it hands out. */
    private static final Logger LOG = LoggerFactory.getLogger(PoolingAsyncClientConnectionManager.class);

    /** Total connection limit passed to the strict pool built by the operator-based constructor. */
    public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
    /** Per-route connection limit passed to the pools built by the operator-based constructor. */
    public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;

    /** Pool that lease, release, limit and statistics calls are delegated to. */
    private final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool;
    /** Operator used to connect and upgrade the connections held by pool entries. */
    private final AsyncClientConnectionOperator connectionOperator;
    /** Flipped by the first close call so that the pool is shut down only once. */
    private final AtomicBoolean closed;

    /** Inactivity period consulted when leasing; not assigned by any constructor, so {@code null} until set. */
    private volatile TimeValue validateAfterInactivity;
116 
117     public PoolingAsyncClientConnectionManager() {
118         this(RegistryBuilder.<TlsStrategy>create()
119                 .register("https", DefaultClientTlsStrategy.getDefault())
120                 .build());
121     }
122 
123     public PoolingAsyncClientConnectionManager(final Lookup<TlsStrategy> tlsStrategyLookup) {
124         this(tlsStrategyLookup, PoolConcurrencyPolicy.STRICT, TimeValue.NEG_ONE_MILLISECOND);
125     }
126 
127     public PoolingAsyncClientConnectionManager(
128             final Lookup<TlsStrategy> tlsStrategyLookup,
129             final PoolConcurrencyPolicy poolConcurrencyPolicy,
130             final TimeValue timeToLive) {
131         this(tlsStrategyLookup, poolConcurrencyPolicy, PoolReusePolicy.LIFO, timeToLive);
132     }
133 
134     public PoolingAsyncClientConnectionManager(
135             final Lookup<TlsStrategy> tlsStrategyLookup,
136             final PoolConcurrencyPolicy poolConcurrencyPolicy,
137             final PoolReusePolicy poolReusePolicy,
138             final TimeValue timeToLive) {
139         this(tlsStrategyLookup, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null, null);
140     }
141 
142     public PoolingAsyncClientConnectionManager(
143             final Lookup<TlsStrategy> tlsStrategyLookup,
144             final PoolConcurrencyPolicy poolConcurrencyPolicy,
145             final PoolReusePolicy poolReusePolicy,
146             final TimeValue timeToLive,
147             final SchemePortResolver schemePortResolver,
148             final DnsResolver dnsResolver) {
149         this(new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
150                 poolConcurrencyPolicy, poolReusePolicy, timeToLive);
151     }
152 
153     @Internal
154     protected PoolingAsyncClientConnectionManager(
155             final AsyncClientConnectionOperator connectionOperator,
156             final PoolConcurrencyPolicy poolConcurrencyPolicy,
157             final PoolReusePolicy poolReusePolicy,
158             final TimeValue timeToLive) {
159         this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
160         switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
161             case STRICT:
162                 this.pool = new StrictConnPool<>(
163                         DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
164                         DEFAULT_MAX_TOTAL_CONNECTIONS,
165                         timeToLive,
166                         poolReusePolicy,
167                         null);
168                 break;
169             case LAX:
170                 this.pool = new LaxConnPool<>(
171                         DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
172                         timeToLive,
173                         poolReusePolicy,
174                         null);
175                 break;
176             default:
177                 throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
178         }
179         this.closed = new AtomicBoolean(false);
180     }
181 
182     @Internal
183     protected PoolingAsyncClientConnectionManager(
184             final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool,
185             final AsyncClientConnectionOperator connectionOperator) {
186         this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
187         this.pool = Args.notNull(pool, "Connection pool");
188         this.closed = new AtomicBoolean(false);
189     }
190 
191     @Override
192     public void close() {
193         close(CloseMode.GRACEFUL);
194     }
195 
196     @Override
197     public void close(final CloseMode closeMode) {
198         if (this.closed.compareAndSet(false, true)) {
199             if (LOG.isDebugEnabled()) {
200                 LOG.debug("Shutdown connection pool {}", closeMode);
201             }
202             this.pool.close(closeMode);
203             LOG.debug("Connection pool shut down");
204         }
205     }
206 
207     private InternalConnectionEndpoint cast(final AsyncConnectionEndpoint endpoint) {
208         if (endpoint instanceof InternalConnectionEndpoint) {
209             return (InternalConnectionEndpoint) endpoint;
210         }
211         throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
212     }
213 
214     @Override
215     public Future<AsyncConnectionEndpoint> lease(
216             final String id,
217             final HttpRoute route,
218             final Object state,
219             final Timeout requestTimeout,
220             final FutureCallback<AsyncConnectionEndpoint> callback) {
221         if (LOG.isDebugEnabled()) {
222             LOG.debug("{}: endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
223         }
224         final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
225         final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
226                 route, state, requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
227 
228                     void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
229                         final ManagedAsyncClientConnection connection = poolEntry.getConnection();
230                         if (connection != null) {
231                             connection.activate();
232                         }
233                         if (LOG.isDebugEnabled()) {
234                             LOG.debug("{}: endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
235                         }
236                         final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
237                         if (LOG.isDebugEnabled()) {
238                             LOG.debug("{}: acquired {}", id, ConnPoolSupport.getId(endpoint));
239                         }
240                         resultFuture.completed(endpoint);
241                     }
242 
243                     @Override
244                     public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
245                         final ManagedAsyncClientConnection connection = poolEntry.getConnection();
246                         if (connection != null) {
247                             if (connection.isOpen()) {
248                                 final ProtocolVersion protocolVersion = connection.getProtocolVersion();
249                                 if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
250                                     final TimeValue timeValue = PoolingAsyncClientConnectionManager.this.validateAfterInactivity;
251                                     if (TimeValue.isNonNegative(timeValue) &&
252                                             poolEntry.getUpdated() + timeValue.toMilliseconds() <= System.currentTimeMillis()) {
253                                         connection.submitCommand(new PingCommand(new BasicPingHandler(new Callback<Boolean>() {
254 
255                                             @Override
256                                             public void execute(final Boolean result) {
257                                                 if (result == null || !result) {
258                                                     if (LOG.isDebugEnabled()) {
259                                                         LOG.debug("{}: connection {} is stale", id, ConnPoolSupport.getId(connection));
260                                                     }
261                                                     poolEntry.discardConnection(CloseMode.IMMEDIATE);
262                                                 }
263                                                 leaseCompleted(poolEntry);
264                                             }
265 
266                                         })), Command.Priority.IMMEDIATE);
267                                         return;
268                                     }
269                                 }
270                             } else {
271                                 if (LOG.isDebugEnabled()) {
272                                     LOG.debug("{}: connection {} is closed", id, ConnPoolSupport.getId(connection));
273                                 }
274                                 poolEntry.discardConnection(CloseMode.IMMEDIATE);
275                             }
276                         }
277                         leaseCompleted(poolEntry);
278                     }
279 
280                     @Override
281                     public void failed(final Exception ex) {
282                         if (LOG.isDebugEnabled()) {
283                             LOG.debug("{}: endpoint lease failed", id);
284                         }
285                         resultFuture.failed(ex);
286                     }
287 
288                     @Override
289                     public void cancelled() {
290                         if (LOG.isDebugEnabled()) {
291                             LOG.debug("{}: endpoint lease cancelled", id);
292                         }
293                         resultFuture.cancel();
294                     }
295 
296                 });
297 
298         resultFuture.setDependency(leaseFuture);
299         return resultFuture;
300     }
301 
302     @Override
303     public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
304         Args.notNull(endpoint, "Managed endpoint");
305         Args.notNull(keepAlive, "Keep-alive time");
306         final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry = cast(endpoint).detach();
307         if (entry == null) {
308             return;
309         }
310         if (LOG.isDebugEnabled()) {
311             LOG.debug("{}: releasing endpoint", ConnPoolSupport.getId(endpoint));
312         }
313         final ManagedAsyncClientConnection connection = entry.getConnection();
314         boolean reusable = connection != null && connection.isOpen();
315         try {
316             if (reusable) {
317                 entry.updateState(state);
318                 entry.updateExpiry(keepAlive);
319                 connection.passivate();
320                 if (LOG.isDebugEnabled()) {
321                     final String s;
322                     if (TimeValue.isPositive(keepAlive)) {
323                         s = "for " + keepAlive;
324                     } else {
325                         s = "indefinitely";
326                     }
327                     LOG.debug("{}: connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection), s);
328                 }
329             }
330         } catch (final RuntimeException ex) {
331             reusable = false;
332             throw ex;
333         } finally {
334             pool.release(entry, reusable);
335             if (LOG.isDebugEnabled()) {
336                 LOG.debug("{}: connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
337             }
338         }
339     }
340 
341     @Override
342     public Future<AsyncConnectionEndpoint> connect(
343             final AsyncConnectionEndpoint endpoint,
344             final ConnectionInitiator connectionInitiator,
345             final Timeout connectTimeout,
346             final Object attachment,
347             final HttpContext context,
348             final FutureCallback<AsyncConnectionEndpoint> callback) {
349         Args.notNull(endpoint, "Endpoint");
350         Args.notNull(connectionInitiator, "Connection initiator");
351         Args.notNull(connectTimeout, "Timeout");
352         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
353         final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
354         if (internalEndpoint.isConnected()) {
355             resultFuture.completed(endpoint);
356             return resultFuture;
357         }
358         final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getPoolEntry();
359         final HttpRoute route = poolEntry.getRoute();
360         final HttpHost host;
361         if (route.getProxyHost() != null) {
362             host = route.getProxyHost();
363         } else {
364             host = route.getTargetHost();
365         }
366         final InetSocketAddress localAddress = route.getLocalSocketAddress();
367         if (LOG.isDebugEnabled()) {
368             LOG.debug("{}: connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
369         }
370         final Future<ManagedAsyncClientConnection> connectFuture = connectionOperator.connect(
371                 connectionInitiator, host, localAddress, connectTimeout, attachment, new FutureCallback<ManagedAsyncClientConnection>() {
372 
373                     @Override
374                     public void completed(final ManagedAsyncClientConnection connection) {
375                         try {
376                             if (LOG.isDebugEnabled()) {
377                                 LOG.debug("{}: connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
378                             }
379                             poolEntry.assignConnection(connection);
380                             resultFuture.completed(internalEndpoint);
381                         } catch (final RuntimeException ex) {
382                             resultFuture.failed(ex);
383                         }
384                     }
385 
386                     @Override
387                     public void failed(final Exception ex) {
388                         resultFuture.failed(ex);
389                     }
390 
391                     @Override
392                     public void cancelled() {
393                         resultFuture.cancel();
394                     }
395 
396                 });
397         resultFuture.setDependency(connectFuture);
398         return resultFuture;
399     }
400 
401     @Override
402     public void upgrade(
403             final AsyncConnectionEndpoint endpoint,
404             final Object attachment,
405             final HttpContext context) {
406         Args.notNull(endpoint, "Managed endpoint");
407         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
408         final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
409         final HttpRoute route = poolEntry.getRoute();
410         final ManagedAsyncClientConnection connection = poolEntry.getConnection();
411         connectionOperator.upgrade(poolEntry.getConnection(), route.getTargetHost(), attachment);
412         if (LOG.isDebugEnabled()) {
413             LOG.debug("{}: upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
414         }
415     }
416 
417     @Override
418     public Set<HttpRoute> getRoutes() {
419         return pool.getRoutes();
420     }
421 
422     @Override
423     public void setMaxTotal(final int max) {
424         pool.setMaxTotal(max);
425     }
426 
427     @Override
428     public int getMaxTotal() {
429         return pool.getMaxTotal();
430     }
431 
432     @Override
433     public void setDefaultMaxPerRoute(final int max) {
434         pool.setDefaultMaxPerRoute(max);
435     }
436 
437     @Override
438     public int getDefaultMaxPerRoute() {
439         return pool.getDefaultMaxPerRoute();
440     }
441 
442     @Override
443     public void setMaxPerRoute(final HttpRoute route, final int max) {
444         pool.setMaxPerRoute(route, max);
445     }
446 
447     @Override
448     public int getMaxPerRoute(final HttpRoute route) {
449         return pool.getMaxPerRoute(route);
450     }
451 
452     @Override
453     public void closeIdle(final TimeValue idletime) {
454         pool.closeIdle(idletime);
455     }
456 
457     @Override
458     public void closeExpired() {
459         pool.closeExpired();
460     }
461 
462     @Override
463     public PoolStats getTotalStats() {
464         return pool.getTotalStats();
465     }
466 
467     @Override
468     public PoolStats getStats(final HttpRoute route) {
469         return pool.getStats(route);
470     }
471 
472     public TimeValue getValidateAfterInactivity() {
473         return validateAfterInactivity;
474     }
475 
476     /**
477      * Defines period of inactivity after which persistent connections must
478      * be re-validated prior to being {@link #lease(String, HttpRoute, Object, Timeout,
479      * FutureCallback)} leased} to the consumer. Negative values passed
480      * to this method disable connection validation. This check helps detect connections
481      * that have become stale (half-closed) while kept inactive in the pool.
482      */
483     public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
484         this.validateAfterInactivity = validateAfterInactivity;
485     }
486 
487     private static final AtomicLong COUNT = new AtomicLong(0);
488 
489     class InternalConnectionEndpoint extends AsyncConnectionEndpoint implements Identifiable {
490 
491         private final AtomicReference<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> poolEntryRef;
492         private final String id;
493 
494         InternalConnectionEndpoint(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
495             this.poolEntryRef = new AtomicReference<>(poolEntry);
496             this.id = String.format("ep-%08X", COUNT.getAndIncrement());
497         }
498 
499         @Override
500         public String getId() {
501             return id;
502         }
503 
504         PoolEntry<HttpRoute, ManagedAsyncClientConnection> getPoolEntry() {
505             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
506             if (poolEntry == null) {
507                 throw new ConnectionShutdownException();
508             }
509             return poolEntry;
510         }
511 
512         PoolEntry<HttpRoute, ManagedAsyncClientConnection> getValidatedPoolEntry() {
513             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = getPoolEntry();
514             final ManagedAsyncClientConnection connection = poolEntry.getConnection();
515             Asserts.check(connection != null && connection.isOpen(), "Endpoint is not connected");
516             return poolEntry;
517         }
518 
519         PoolEntry<HttpRoute, ManagedAsyncClientConnection> detach() {
520             return poolEntryRef.getAndSet(null);
521         }
522 
523         @Override
524         public void close(final CloseMode closeMode) {
525             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
526             if (poolEntry != null) {
527                 if (LOG.isDebugEnabled()) {
528                     LOG.debug("{}: close {}", id, closeMode);
529                 }
530                 poolEntry.discardConnection(closeMode);
531             }
532         }
533 
534         @Override
535         public boolean isConnected() {
536             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
537             if (poolEntry == null) {
538                 return false;
539             }
540             final ManagedAsyncClientConnection connection = poolEntry.getConnection();
541             if (connection == null) {
542                 return false;
543             }
544             if (!connection.isOpen()) {
545                 poolEntry.discardConnection(CloseMode.IMMEDIATE);
546                 return false;
547             }
548             return true;
549         }
550 
551         @Override
552         public void setSocketTimeout(final Timeout timeout) {
553             getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
554         }
555 
556         @Override
557         public void execute(
558                 final String exchangeId,
559                 final AsyncClientExchangeHandler exchangeHandler,
560                 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
561                 final HttpContext context) {
562             final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
563             if (LOG.isDebugEnabled()) {
564                 LOG.debug("{}: executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
565             }
566             connection.submitCommand(
567                     new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context),
568                     Command.Priority.NORMAL);
569         }
570 
571     }
572 
573 }