View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.nio;
29  
30  import java.net.InetSocketAddress;
31  import java.util.Set;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.atomic.AtomicReference;
36  
37  import org.apache.hc.client5.http.DnsResolver;
38  import org.apache.hc.client5.http.HttpRoute;
39  import org.apache.hc.client5.http.SchemePortResolver;
40  import org.apache.hc.client5.http.impl.ConnPoolSupport;
41  import org.apache.hc.client5.http.impl.ConnectionShutdownException;
42  import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
43  import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
44  import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
45  import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
46  import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
47  import org.apache.hc.core5.annotation.Contract;
48  import org.apache.hc.core5.annotation.Internal;
49  import org.apache.hc.core5.annotation.ThreadingBehavior;
50  import org.apache.hc.core5.concurrent.ComplexFuture;
51  import org.apache.hc.core5.concurrent.FutureCallback;
52  import org.apache.hc.core5.function.Callback;
53  import org.apache.hc.core5.http.HttpHost;
54  import org.apache.hc.core5.http.HttpVersion;
55  import org.apache.hc.core5.http.ProtocolVersion;
56  import org.apache.hc.core5.http.config.Lookup;
57  import org.apache.hc.core5.http.config.RegistryBuilder;
58  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
59  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
60  import org.apache.hc.core5.http.nio.HandlerFactory;
61  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
62  import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
63  import org.apache.hc.core5.http.protocol.HttpContext;
64  import org.apache.hc.core5.http2.nio.command.PingCommand;
65  import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
66  import org.apache.hc.core5.io.CloseMode;
67  import org.apache.hc.core5.pool.ConnPoolControl;
68  import org.apache.hc.core5.pool.LaxConnPool;
69  import org.apache.hc.core5.pool.ManagedConnPool;
70  import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
71  import org.apache.hc.core5.pool.PoolEntry;
72  import org.apache.hc.core5.pool.PoolReusePolicy;
73  import org.apache.hc.core5.pool.PoolStats;
74  import org.apache.hc.core5.pool.StrictConnPool;
75  import org.apache.hc.core5.reactor.Command;
76  import org.apache.hc.core5.reactor.ConnectionInitiator;
77  import org.apache.hc.core5.util.Args;
78  import org.apache.hc.core5.util.Asserts;
79  import org.apache.hc.core5.util.Identifiable;
80  import org.apache.hc.core5.util.TimeValue;
81  import org.apache.hc.core5.util.Timeout;
82  import org.slf4j.Logger;
83  import org.slf4j.LoggerFactory;
84  
85  /**
86   * {@code PoolingAsyncClientConnectionManager} maintains a pool of non-blocking
87   * {@link org.apache.hc.core5.http.HttpConnection}s and is able to service
88   * connection requests from multiple execution threads. Connections are pooled
89   * on a per route basis. A request for a route which already the manager has
90   * persistent connections for available in the pool will be services by leasing
91   * a connection from the pool rather than creating a new connection.
92   * <p>
93   * {@code PoolingAsyncClientConnectionManager} maintains a maximum limit
94   * of connection on a per route basis and in total. Connection limits
95   * can be adjusted using {@link ConnPoolControl} methods.
96   * <p>
97   * Total time to live (TTL) set at construction time defines maximum life span
98   * of persistent connections regardless of their expiration setting. No persistent
99   * connection will be re-used past its TTL value.
100  *
101  * @since 5.0
102  */
103 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
104 public class PoolingAsyncClientConnectionManager implements AsyncClientConnectionManager, ConnPoolControl<HttpRoute> {
105 
106     private static final Logger LOG = LoggerFactory.getLogger(PoolingAsyncClientConnectionManager.class);
107 
108     public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
109     public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
110 
111     private final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool;
112     private final AsyncClientConnectionOperator connectionOperator;
113     private final AtomicBoolean closed;
114 
115     private volatile TimeValue validateAfterInactivity;
116 
117     public PoolingAsyncClientConnectionManager() {
118         this(RegistryBuilder.<TlsStrategy>create()
119                 .register("https", DefaultClientTlsStrategy.getDefault())
120                 .build());
121     }
122 
123     public PoolingAsyncClientConnectionManager(final Lookup<TlsStrategy> tlsStrategyLookup) {
124         this(tlsStrategyLookup, PoolConcurrencyPolicy.STRICT, TimeValue.NEG_ONE_MILLISECOND);
125     }
126 
127     public PoolingAsyncClientConnectionManager(
128             final Lookup<TlsStrategy> tlsStrategyLookup,
129             final PoolConcurrencyPolicy poolConcurrencyPolicy,
130             final TimeValue timeToLive) {
131         this(tlsStrategyLookup, poolConcurrencyPolicy, PoolReusePolicy.LIFO, timeToLive);
132     }
133 
134     public PoolingAsyncClientConnectionManager(
135             final Lookup<TlsStrategy> tlsStrategyLookup,
136             final PoolConcurrencyPolicy poolConcurrencyPolicy,
137             final PoolReusePolicy poolReusePolicy,
138             final TimeValue timeToLive) {
139         this(tlsStrategyLookup, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null, null);
140     }
141 
142     public PoolingAsyncClientConnectionManager(
143             final Lookup<TlsStrategy> tlsStrategyLookup,
144             final PoolConcurrencyPolicy poolConcurrencyPolicy,
145             final PoolReusePolicy poolReusePolicy,
146             final TimeValue timeToLive,
147             final SchemePortResolver schemePortResolver,
148             final DnsResolver dnsResolver) {
149         this(new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
150                 poolConcurrencyPolicy, poolReusePolicy, timeToLive);
151     }
152 
153     @Internal
154     protected PoolingAsyncClientConnectionManager(
155             final AsyncClientConnectionOperator connectionOperator,
156             final PoolConcurrencyPolicy poolConcurrencyPolicy,
157             final PoolReusePolicy poolReusePolicy,
158             final TimeValue timeToLive) {
159         this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
160         switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
161             case STRICT:
162                 this.pool = new StrictConnPool<>(
163                         DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
164                         DEFAULT_MAX_TOTAL_CONNECTIONS,
165                         timeToLive,
166                         poolReusePolicy,
167                         null);
168                 break;
169             case LAX:
170                 this.pool = new LaxConnPool<>(
171                         DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
172                         timeToLive,
173                         poolReusePolicy,
174                         null);
175                 break;
176             default:
177                 throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
178         }
179         this.closed = new AtomicBoolean(false);
180     }
181 
182     @Internal
183     protected PoolingAsyncClientConnectionManager(
184             final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool,
185             final AsyncClientConnectionOperator connectionOperator) {
186         this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
187         this.pool = Args.notNull(pool, "Connection pool");
188         this.closed = new AtomicBoolean(false);
189     }
190 
191     @Override
192     public void close() {
193         close(CloseMode.GRACEFUL);
194     }
195 
196     @Override
197     public void close(final CloseMode closeMode) {
198         if (this.closed.compareAndSet(false, true)) {
199             if (LOG.isDebugEnabled()) {
200                 LOG.debug("Shutdown connection pool {}", closeMode);
201             }
202             this.pool.close(closeMode);
203             LOG.debug("Connection pool shut down");
204         }
205     }
206 
207     private InternalConnectionEndpoint cast(final AsyncConnectionEndpoint endpoint) {
208         if (endpoint instanceof InternalConnectionEndpoint) {
209             return (InternalConnectionEndpoint) endpoint;
210         }
211         throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
212     }
213 
214     @Override
215     public Future<AsyncConnectionEndpoint> lease(
216             final String id,
217             final HttpRoute route,
218             final Object state,
219             final Timeout requestTimeout,
220             final FutureCallback<AsyncConnectionEndpoint> callback) {
221         if (LOG.isDebugEnabled()) {
222             LOG.debug("{}: endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
223         }
224         final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
225         final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
226                 route, state, requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
227 
228                     void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
229                         final ManagedAsyncClientConnection connection = poolEntry.getConnection();
230                         if (connection != null) {
231                             connection.activate();
232                         }
233                         if (LOG.isDebugEnabled()) {
234                             LOG.debug("{}: endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
235                         }
236                         final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
237                         if (LOG.isDebugEnabled()) {
238                             LOG.debug("{}: acquired {}", id, ConnPoolSupport.getId(endpoint));
239                         }
240                         resultFuture.completed(endpoint);
241                     }
242 
243                     @Override
244                     public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
245                         final ManagedAsyncClientConnection connection = poolEntry.getConnection();
246                         final TimeValue timeValue = PoolingAsyncClientConnectionManager.this.validateAfterInactivity;
247                         if (TimeValue.isNonNegative(timeValue) && connection != null &&
248                                 poolEntry.getUpdated() + timeValue.toMilliseconds() <= System.currentTimeMillis()) {
249                             final ProtocolVersion protocolVersion = connection.getProtocolVersion();
250                             if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
251                                 connection.submitCommand(new PingCommand(new BasicPingHandler(new Callback<Boolean>() {
252 
253                                     @Override
254                                     public void execute(final Boolean result) {
255                                         if (result == null || !result)  {
256                                             if (LOG.isDebugEnabled()) {
257                                                 LOG.debug("{}: connection {} is stale", id, ConnPoolSupport.getId(connection));
258                                             }
259                                             poolEntry.discardConnection(CloseMode.IMMEDIATE);
260                                         }
261                                         leaseCompleted(poolEntry);
262                                     }
263 
264                                 })), Command.Priority.IMMEDIATE);
265                             } else {
266                                 if (!connection.isOpen()) {
267                                     if (LOG.isDebugEnabled()) {
268                                         LOG.debug("{}: connection {} is closed", id, ConnPoolSupport.getId(connection));
269                                     }
270                                     poolEntry.discardConnection(CloseMode.IMMEDIATE);
271                                 }
272                                 leaseCompleted(poolEntry);
273                             }
274                         } else {
275                             leaseCompleted(poolEntry);
276                         }
277                     }
278 
279                     @Override
280                     public void failed(final Exception ex) {
281                         if (LOG.isDebugEnabled()) {
282                             LOG.debug("{}: endpoint lease failed", id);
283                         }
284                         resultFuture.failed(ex);
285                     }
286 
287                     @Override
288                     public void cancelled() {
289                         if (LOG.isDebugEnabled()) {
290                             LOG.debug("{}: endpoint lease cancelled", id);
291                         }
292                         resultFuture.cancel();
293                     }
294 
295                 });
296 
297         resultFuture.setDependency(leaseFuture);
298         return resultFuture;
299     }
300 
301     @Override
302     public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
303         Args.notNull(endpoint, "Managed endpoint");
304         Args.notNull(keepAlive, "Keep-alive time");
305         final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry = cast(endpoint).detach();
306         if (entry == null) {
307             return;
308         }
309         if (LOG.isDebugEnabled()) {
310             LOG.debug("{}: releasing endpoint", ConnPoolSupport.getId(endpoint));
311         }
312         final ManagedAsyncClientConnection connection = entry.getConnection();
313         boolean reusable = connection != null && connection.isOpen();
314         try {
315             if (reusable) {
316                 entry.updateState(state);
317                 entry.updateExpiry(keepAlive);
318                 connection.passivate();
319                 if (LOG.isDebugEnabled()) {
320                     final String s;
321                     if (TimeValue.isPositive(keepAlive)) {
322                         s = "for " + keepAlive;
323                     } else {
324                         s = "indefinitely";
325                     }
326                     LOG.debug("{}: connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection), s);
327                 }
328             }
329         } catch (final RuntimeException ex) {
330             reusable = false;
331             throw ex;
332         } finally {
333             pool.release(entry, reusable);
334             if (LOG.isDebugEnabled()) {
335                 LOG.debug("{}: connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
336             }
337         }
338     }
339 
340     @Override
341     public Future<AsyncConnectionEndpoint> connect(
342             final AsyncConnectionEndpoint endpoint,
343             final ConnectionInitiator connectionInitiator,
344             final Timeout connectTimeout,
345             final Object attachment,
346             final HttpContext context,
347             final FutureCallback<AsyncConnectionEndpoint> callback) {
348         Args.notNull(endpoint, "Endpoint");
349         Args.notNull(connectionInitiator, "Connection initiator");
350         Args.notNull(connectTimeout, "Timeout");
351         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
352         final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
353         if (internalEndpoint.isConnected()) {
354             resultFuture.completed(endpoint);
355             return resultFuture;
356         }
357         final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getPoolEntry();
358         final HttpRoute route = poolEntry.getRoute();
359         final HttpHost host;
360         if (route.getProxyHost() != null) {
361             host = route.getProxyHost();
362         } else {
363             host = route.getTargetHost();
364         }
365         final InetSocketAddress localAddress = route.getLocalSocketAddress();
366         if (LOG.isDebugEnabled()) {
367             LOG.debug("{}: connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
368         }
369         final Future<ManagedAsyncClientConnection> connectFuture = connectionOperator.connect(
370                 connectionInitiator, host, localAddress, connectTimeout, attachment, new FutureCallback<ManagedAsyncClientConnection>() {
371 
372                     @Override
373                     public void completed(final ManagedAsyncClientConnection connection) {
374                         try {
375                             if (LOG.isDebugEnabled()) {
376                                 LOG.debug("{}: connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
377                             }
378                             poolEntry.assignConnection(connection);
379                             resultFuture.completed(internalEndpoint);
380                         } catch (final RuntimeException ex) {
381                             resultFuture.failed(ex);
382                         }
383                     }
384 
385                     @Override
386                     public void failed(final Exception ex) {
387                         resultFuture.failed(ex);
388                     }
389 
390                     @Override
391                     public void cancelled() {
392                         resultFuture.cancel();
393                     }
394 
395                 });
396         resultFuture.setDependency(connectFuture);
397         return resultFuture;
398     }
399 
400     @Override
401     public void upgrade(
402             final AsyncConnectionEndpoint endpoint,
403             final Object attachment,
404             final HttpContext context) {
405         Args.notNull(endpoint, "Managed endpoint");
406         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
407         final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
408         final HttpRoute route = poolEntry.getRoute();
409         final ManagedAsyncClientConnection connection = poolEntry.getConnection();
410         connectionOperator.upgrade(poolEntry.getConnection(), route.getTargetHost(), attachment);
411         if (LOG.isDebugEnabled()) {
412             LOG.debug("{}: upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
413         }
414     }
415 
416     @Override
417     public Set<HttpRoute> getRoutes() {
418         return pool.getRoutes();
419     }
420 
421     @Override
422     public void setMaxTotal(final int max) {
423         pool.setMaxTotal(max);
424     }
425 
426     @Override
427     public int getMaxTotal() {
428         return pool.getMaxTotal();
429     }
430 
431     @Override
432     public void setDefaultMaxPerRoute(final int max) {
433         pool.setDefaultMaxPerRoute(max);
434     }
435 
436     @Override
437     public int getDefaultMaxPerRoute() {
438         return pool.getDefaultMaxPerRoute();
439     }
440 
441     @Override
442     public void setMaxPerRoute(final HttpRoute route, final int max) {
443         pool.setMaxPerRoute(route, max);
444     }
445 
446     @Override
447     public int getMaxPerRoute(final HttpRoute route) {
448         return pool.getMaxPerRoute(route);
449     }
450 
451     @Override
452     public void closeIdle(final TimeValue idletime) {
453         pool.closeIdle(idletime);
454     }
455 
456     @Override
457     public void closeExpired() {
458         pool.closeExpired();
459     }
460 
461     @Override
462     public PoolStats getTotalStats() {
463         return pool.getTotalStats();
464     }
465 
466     @Override
467     public PoolStats getStats(final HttpRoute route) {
468         return pool.getStats(route);
469     }
470 
471     public TimeValue getValidateAfterInactivity() {
472         return validateAfterInactivity;
473     }
474 
475     /**
476      * Defines period of inactivity after which persistent connections must
477      * be re-validated prior to being {@link #lease(String, HttpRoute, Object, Timeout,
478      * FutureCallback)} leased} to the consumer. Negative values passed
479      * to this method disable connection validation. This check helps detect connections
480      * that have become stale (half-closed) while kept inactive in the pool.
481      */
482     public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
483         this.validateAfterInactivity = validateAfterInactivity;
484     }
485 
486     private static final AtomicLong COUNT = new AtomicLong(0);
487 
488     class InternalConnectionEndpoint extends AsyncConnectionEndpoint implements Identifiable {
489 
490         private final AtomicReference<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> poolEntryRef;
491         private final String id;
492 
493         InternalConnectionEndpoint(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
494             this.poolEntryRef = new AtomicReference<>(poolEntry);
495             this.id = String.format("ep-%08X", COUNT.getAndIncrement());
496         }
497 
498         @Override
499         public String getId() {
500             return id;
501         }
502 
503         PoolEntry<HttpRoute, ManagedAsyncClientConnection> getPoolEntry() {
504             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
505             if (poolEntry == null) {
506                 throw new ConnectionShutdownException();
507             }
508             return poolEntry;
509         }
510 
511         PoolEntry<HttpRoute, ManagedAsyncClientConnection> getValidatedPoolEntry() {
512             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = getPoolEntry();
513             final ManagedAsyncClientConnection connection = poolEntry.getConnection();
514             Asserts.check(connection != null && connection.isOpen(), "Endpoint is not connected");
515             return poolEntry;
516         }
517 
518         PoolEntry<HttpRoute, ManagedAsyncClientConnection> detach() {
519             return poolEntryRef.getAndSet(null);
520         }
521 
522         @Override
523         public void close(final CloseMode closeMode) {
524             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
525             if (poolEntry != null) {
526                 if (LOG.isDebugEnabled()) {
527                     LOG.debug("{}: close {}", id, closeMode);
528                 }
529                 poolEntry.discardConnection(closeMode);
530             }
531         }
532 
533         @Override
534         public boolean isConnected() {
535             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
536             if (poolEntry == null) {
537                 return false;
538             }
539             final ManagedAsyncClientConnection connection = poolEntry.getConnection();
540             if (connection == null) {
541                 return false;
542             }
543             if (!connection.isOpen()) {
544                 poolEntry.discardConnection(CloseMode.IMMEDIATE);
545                 return false;
546             }
547             return true;
548         }
549 
550         @Override
551         public void setSocketTimeout(final Timeout timeout) {
552             getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
553         }
554 
555         @Override
556         public void execute(
557                 final String exchangeId,
558                 final AsyncClientExchangeHandler exchangeHandler,
559                 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
560                 final HttpContext context) {
561             final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
562             if (LOG.isDebugEnabled()) {
563                 LOG.debug("{}: executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
564             }
565             connection.submitCommand(
566                     new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context),
567                     Command.Priority.NORMAL);
568         }
569 
570     }
571 
572 }