View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.bootstrap;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.util.List;
33  import java.util.Set;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.atomic.AtomicReference;
36  
37  import org.apache.hc.core5.annotation.Internal;
38  import org.apache.hc.core5.concurrent.BasicFuture;
39  import org.apache.hc.core5.concurrent.CallbackContribution;
40  import org.apache.hc.core5.concurrent.ComplexFuture;
41  import org.apache.hc.core5.concurrent.FutureCallback;
42  import org.apache.hc.core5.concurrent.FutureContribution;
43  import org.apache.hc.core5.function.Callback;
44  import org.apache.hc.core5.function.Decorator;
45  import org.apache.hc.core5.http.ConnectionClosedException;
46  import org.apache.hc.core5.http.EntityDetails;
47  import org.apache.hc.core5.http.Header;
48  import org.apache.hc.core5.http.HttpConnection;
49  import org.apache.hc.core5.http.HttpException;
50  import org.apache.hc.core5.http.HttpHost;
51  import org.apache.hc.core5.http.HttpResponse;
52  import org.apache.hc.core5.http.impl.DefaultAddressResolver;
53  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
54  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
55  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
56  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
57  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
58  import org.apache.hc.core5.http.nio.CapacityChannel;
59  import org.apache.hc.core5.http.nio.DataStreamChannel;
60  import org.apache.hc.core5.http.nio.HandlerFactory;
61  import org.apache.hc.core5.http.nio.RequestChannel;
62  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
63  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
64  import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
65  import org.apache.hc.core5.http.nio.ssl.TlsUpgradeCapable;
66  import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
67  import org.apache.hc.core5.http.protocol.HttpContext;
68  import org.apache.hc.core5.http.protocol.HttpCoreContext;
69  import org.apache.hc.core5.io.CloseMode;
70  import org.apache.hc.core5.net.NamedEndpoint;
71  import org.apache.hc.core5.net.URIAuthority;
72  import org.apache.hc.core5.pool.ConnPoolControl;
73  import org.apache.hc.core5.pool.ManagedConnPool;
74  import org.apache.hc.core5.pool.PoolEntry;
75  import org.apache.hc.core5.pool.PoolStats;
76  import org.apache.hc.core5.reactor.Command;
77  import org.apache.hc.core5.reactor.EndpointParameters;
78  import org.apache.hc.core5.reactor.IOEventHandler;
79  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
80  import org.apache.hc.core5.reactor.IOReactorConfig;
81  import org.apache.hc.core5.reactor.IOSession;
82  import org.apache.hc.core5.reactor.IOSessionListener;
83  import org.apache.hc.core5.reactor.ProtocolIOSession;
84  import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
85  import org.apache.hc.core5.util.Args;
86  import org.apache.hc.core5.util.TimeValue;
87  import org.apache.hc.core5.util.Timeout;
88  
89  /**
90   * HTTP/1.1 client side message exchange initiator.
91   *
92   * @since 5.0
93   */
94  public class HttpAsyncRequester extends AsyncRequester implements ConnPoolControl<HttpHost> {
95  
96      private final ManagedConnPool<HttpHost, IOSession> connPool;
97      private final TlsStrategy tlsStrategy;
98      private final Timeout handshakeTimeout;
99  
100     /**
101      * Use {@link AsyncRequesterBootstrap} to create instances of this class.
102      *
103      * @since 5.2
104      */
105     @Internal
106     public HttpAsyncRequester(
107             final IOReactorConfig ioReactorConfig,
108             final IOEventHandlerFactory eventHandlerFactory,
109             final Decorator<IOSession> ioSessionDecorator,
110             final Callback<Exception> exceptionCallback,
111             final IOSessionListener sessionListener,
112             final ManagedConnPool<HttpHost, IOSession> connPool,
113             final TlsStrategy tlsStrategy,
114             final Timeout handshakeTimeout) {
115         super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
116                 ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE);
117         this.connPool = Args.notNull(connPool, "Connection pool");
118         this.tlsStrategy = tlsStrategy;
119         this.handshakeTimeout = handshakeTimeout;
120     }
121 
122     /**
123      * Use {@link AsyncRequesterBootstrap} to create instances of this class.
124      */
125     @Internal
126     public HttpAsyncRequester(
127             final IOReactorConfig ioReactorConfig,
128             final IOEventHandlerFactory eventHandlerFactory,
129             final Decorator<IOSession> ioSessionDecorator,
130             final Callback<Exception> exceptionCallback,
131             final IOSessionListener sessionListener,
132             final ManagedConnPool<HttpHost, IOSession> connPool) {
133         this(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool,
134                 null, null);
135     }
136 
137     @Override
138     public PoolStats getTotalStats() {
139         return connPool.getTotalStats();
140     }
141 
142     @Override
143     public PoolStats getStats(final HttpHost route) {
144         return connPool.getStats(route);
145     }
146 
147     @Override
148     public void setMaxTotal(final int max) {
149         connPool.setMaxTotal(max);
150     }
151 
152     @Override
153     public int getMaxTotal() {
154         return connPool.getMaxTotal();
155     }
156 
157     @Override
158     public void setDefaultMaxPerRoute(final int max) {
159         connPool.setDefaultMaxPerRoute(max);
160     }
161 
162     @Override
163     public int getDefaultMaxPerRoute() {
164         return connPool.getDefaultMaxPerRoute();
165     }
166 
167     @Override
168     public void setMaxPerRoute(final HttpHost route, final int max) {
169         connPool.setMaxPerRoute(route, max);
170     }
171 
172     @Override
173     public int getMaxPerRoute(final HttpHost route) {
174         return connPool.getMaxPerRoute(route);
175     }
176 
177     @Override
178     public void closeIdle(final TimeValue idleTime) {
179         connPool.closeIdle(idleTime);
180     }
181 
182     @Override
183     public void closeExpired() {
184         connPool.closeExpired();
185     }
186 
187     @Override
188     public Set<HttpHost> getRoutes() {
189         return connPool.getRoutes();
190     }
191 
192     public Future<AsyncClientEndpoint> connect(
193             final HttpHost host,
194             final Timeout timeout,
195             final Object attachment,
196             final FutureCallback<AsyncClientEndpoint> callback) {
197         return doConnect(host, timeout, attachment, callback);
198     }
199 
200     protected Future<AsyncClientEndpoint> doConnect(
201             final HttpHost host,
202             final Timeout timeout,
203             final Object attachment,
204             final FutureCallback<AsyncClientEndpoint> callback) {
205         Args.notNull(host, "Host");
206         Args.notNull(timeout, "Timeout");
207         final ComplexFuture<AsyncClientEndpoint> resultFuture = new ComplexFuture<>(callback);
208         final Future<PoolEntry<HttpHost, IOSession>> leaseFuture = connPool.lease(
209                 host, null, timeout, new FutureCallback<PoolEntry<HttpHost, IOSession>>() {
210 
211                     @Override
212                     public void completed(final PoolEntry<HttpHost, IOSession> poolEntry) {
213                         final AsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(poolEntry);
214                         final IOSession ioSession = poolEntry.getConnection();
215                         if (ioSession != null && !ioSession.isOpen()) {
216                             poolEntry.discardConnection(CloseMode.IMMEDIATE);
217                         }
218                         if (poolEntry.hasConnection()) {
219                             resultFuture.completed(endpoint);
220                         } else {
221                             final Future<IOSession> future = requestSession(
222                                     host,
223                                     timeout,
224                                     new EndpointParameters(host, attachment),
225                                     new FutureCallback<IOSession>() {
226 
227                                         @Override
228                                         public void completed(final IOSession session) {
229                                             session.setSocketTimeout(timeout);
230                                             poolEntry.assignConnection(session);
231                                             resultFuture.completed(endpoint);
232                                         }
233 
234                                         @Override
235                                         public void failed(final Exception cause) {
236                                             try {
237                                                 resultFuture.failed(cause);
238                                             } finally {
239                                                 endpoint.releaseAndDiscard();
240                                             }
241                                         }
242 
243                                         @Override
244                                         public void cancelled() {
245                                             try {
246                                                 resultFuture.cancel();
247                                             } finally {
248                                                 endpoint.releaseAndDiscard();
249                                             }
250                                         }
251 
252                                     });
253                             resultFuture.setDependency(future);
254                         }
255                     }
256 
257                     @Override
258                     public void failed(final Exception ex) {
259                         resultFuture.failed(ex);
260                     }
261 
262                     @Override
263                     public void cancelled() {
264                         resultFuture.cancel();
265                     }
266 
267                 });
268         resultFuture.setDependency(leaseFuture);
269         return resultFuture;
270     }
271 
272     public Future<AsyncClientEndpoint> connect(final HttpHost host, final Timeout timeout) {
273         return connect(host, timeout, null, null);
274     }
275 
276     /**
277      * @since 5.3
278      */
279     public void execute(
280             final HttpHost target,
281             final AsyncClientExchangeHandler exchangeHandler,
282             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
283             final Timeout timeout,
284             final HttpContext executeContext) {
285         Args.notNull(exchangeHandler, "Exchange handler");
286         Args.notNull(timeout, "Timeout");
287         Args.notNull(executeContext, "Context");
288         try {
289             exchangeHandler.produceRequest((request, entityDetails, requestContext) -> {
290                 final HttpHost host = target != null ? target : defaultTarget(request);
291                 if (request.getAuthority() == null) {
292                     request.setAuthority(new URIAuthority(host.getHostName(), host.getPort()));
293                 }
294                 connect(host, timeout, null, new FutureCallback<AsyncClientEndpoint>() {
295 
296                     @Override
297                     public void completed(final AsyncClientEndpoint endpoint) {
298                         endpoint.execute(new AsyncClientExchangeHandler() {
299 
300                             @Override
301                             public void releaseResources() {
302                                 endpoint.releaseAndDiscard();
303                                 exchangeHandler.releaseResources();
304                             }
305 
306                             @Override
307                             public void failed(final Exception cause) {
308                                 endpoint.releaseAndDiscard();
309                                 exchangeHandler.failed(cause);
310                             }
311 
312                             @Override
313                             public void cancel() {
314                                 endpoint.releaseAndDiscard();
315                                 exchangeHandler.cancel();
316                             }
317 
318                             @Override
319                             public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
320                                 channel.sendRequest(request, entityDetails, httpContext);
321                             }
322 
323                             @Override
324                             public int available() {
325                                 return exchangeHandler.available();
326                             }
327 
328                             @Override
329                             public void produce(final DataStreamChannel channel) throws IOException {
330                                 exchangeHandler.produce(channel);
331                             }
332 
333                             @Override
334                             public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
335                                 exchangeHandler.consumeInformation(response, httpContext);
336                             }
337 
338                             @Override
339                             public void consumeResponse(
340                                     final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
341                                 if (entityDetails == null) {
342                                     endpoint.releaseAndReuse();
343                                 }
344                                 exchangeHandler.consumeResponse(response, entityDetails, httpContext);
345                             }
346 
347                             @Override
348                             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
349                                 exchangeHandler.updateCapacity(capacityChannel);
350                             }
351 
352                             @Override
353                             public void consume(final ByteBuffer src) throws IOException {
354                                 exchangeHandler.consume(src);
355                             }
356 
357                             @Override
358                             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
359                                 endpoint.releaseAndReuse();
360                                 exchangeHandler.streamEnd(trailers);
361                             }
362 
363                         }, pushHandlerFactory, executeContext);
364 
365                     }
366 
367                     @Override
368                     public void failed(final Exception ex) {
369                         exchangeHandler.failed(ex);
370                     }
371 
372                     @Override
373                     public void cancelled() {
374                         exchangeHandler.cancel();
375                     }
376 
377                 });
378 
379             }, executeContext);
380 
381         } catch (final IOException | HttpException ex) {
382             exchangeHandler.failed(ex);
383         }
384     }
385 
386     public void execute(
387             final AsyncClientExchangeHandler exchangeHandler,
388             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
389             final Timeout timeout,
390             final HttpContext executeContext) {
391         execute(null, exchangeHandler, pushHandlerFactory, timeout, executeContext);
392     }
393 
394     public void execute(
395             final AsyncClientExchangeHandler exchangeHandler,
396             final Timeout timeout,
397             final HttpContext executeContext) {
398         execute(exchangeHandler, null, timeout, executeContext);
399     }
400 
401     /**
402      * @since 5.3
403      */
404     public final <T> Future<T> execute(
405             final HttpHost target,
406             final AsyncRequestProducer requestProducer,
407             final AsyncResponseConsumer<T> responseConsumer,
408             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
409             final Timeout timeout,
410             final HttpContext context,
411             final FutureCallback<T> callback) {
412         Args.notNull(requestProducer, "Request producer");
413         Args.notNull(responseConsumer, "Response consumer");
414         Args.notNull(timeout, "Timeout");
415         final BasicFuture<T> future = new BasicFuture<>(callback);
416         final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(
417                 requestProducer,
418                 responseConsumer,
419                 new FutureContribution<T>(future) {
420 
421                     @Override
422                     public void completed(final T result) {
423                         future.completed(result);
424                     }
425 
426                 });
427         execute(target, exchangeHandler, pushHandlerFactory, timeout, context != null ? context : HttpCoreContext.create());
428         return future;
429     }
430 
431     public final <T> Future<T> execute(
432             final AsyncRequestProducer requestProducer,
433             final AsyncResponseConsumer<T> responseConsumer,
434             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
435             final Timeout timeout,
436             final HttpContext context,
437             final FutureCallback<T> callback) {
438         return execute(null, requestProducer, responseConsumer, pushHandlerFactory, timeout, context, callback);
439     }
440 
441     public final <T> Future<T> execute(
442             final AsyncRequestProducer requestProducer,
443             final AsyncResponseConsumer<T> responseConsumer,
444             final Timeout timeout,
445             final HttpContext context,
446             final FutureCallback<T> callback) {
447         return execute(requestProducer, responseConsumer, null, timeout, context, callback);
448     }
449 
450     /**
451      * @since 5.3
452      */
453     public final <T> Future<T> execute(
454             final HttpHost target,
455             final AsyncRequestProducer requestProducer,
456             final AsyncResponseConsumer<T> responseConsumer,
457             final Timeout timeout,
458             final FutureCallback<T> callback) {
459         return execute(target, requestProducer, responseConsumer, null, timeout, null, callback);
460     }
461 
462     public final <T> Future<T> execute(
463             final AsyncRequestProducer requestProducer,
464             final AsyncResponseConsumer<T> responseConsumer,
465             final Timeout timeout,
466             final FutureCallback<T> callback) {
467         return execute(requestProducer, responseConsumer, null, timeout, null, callback);
468     }
469 
470     protected void doTlsUpgrade(
471             final ProtocolIOSession ioSession,
472             final NamedEndpoint endpoint,
473             final FutureCallback<ProtocolIOSession> callback) {
474         if (tlsStrategy != null) {
475             tlsStrategy.upgrade(ioSession,
476                     endpoint,
477                     null,
478                     handshakeTimeout,
479                     new CallbackContribution<TransportSecurityLayer>(callback) {
480 
481                         @Override
482                         public void completed(final TransportSecurityLayer transportSecurityLayer) {
483                             if (callback != null) {
484                                 callback.completed(ioSession);
485                             }
486                         }
487 
488                     });
489         } else {
490             throw new IllegalStateException("TLS upgrade not supported");
491         }
492     }
493 
494     private class InternalAsyncClientEndpoint extends AsyncClientEndpoint implements TlsUpgradeCapable {
495 
496         final AtomicReference<PoolEntry<HttpHost, IOSession>> poolEntryRef;
497 
498         InternalAsyncClientEndpoint(final PoolEntry<HttpHost, IOSession> poolEntry) {
499             this.poolEntryRef = new AtomicReference<>(poolEntry);
500         }
501 
502         private IOSession getIOSession() {
503             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
504             if (poolEntry == null) {
505                 throw new IllegalStateException("Endpoint has already been released");
506             }
507             final IOSession ioSession = poolEntry.getConnection();
508             if (ioSession == null) {
509                 throw new IllegalStateException("I/O session is invalid");
510             }
511             return ioSession;
512         }
513 
514         @Override
515         public void execute(
516                 final AsyncClientExchangeHandler exchangeHandler,
517                 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
518                 final HttpContext context) {
519             final IOSession ioSession = getIOSession();
520             ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context), Command.Priority.NORMAL);
521             if (!ioSession.isOpen()) {
522                 try {
523                     exchangeHandler.failed(new ConnectionClosedException());
524                 } finally {
525                     exchangeHandler.releaseResources();
526                 }
527             }
528         }
529 
530         @Override
531         public boolean isConnected() {
532             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
533             if (poolEntry != null) {
534                 final IOSession ioSession = poolEntry.getConnection();
535                 if (ioSession == null || !ioSession.isOpen()) {
536                     return false;
537                 }
538                 final IOEventHandler handler = ioSession.getHandler();
539                 return (handler instanceof HttpConnection) && ((HttpConnection) handler).isOpen();
540             }
541             return false;
542         }
543 
544         @Override
545         public void releaseAndReuse() {
546             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
547             if (poolEntry != null) {
548                 final IOSession ioSession = poolEntry.getConnection();
549                 connPool.release(poolEntry, ioSession != null && ioSession.isOpen());
550             }
551         }
552 
553         @Override
554         public void releaseAndDiscard() {
555             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
556             if (poolEntry != null) {
557                 poolEntry.discardConnection(CloseMode.GRACEFUL);
558                 connPool.release(poolEntry, false);
559             }
560         }
561 
562         @Override
563         public void tlsUpgrade(final NamedEndpoint endpoint, final FutureCallback<ProtocolIOSession> callback) {
564             final IOSession ioSession = getIOSession();
565             if (ioSession instanceof ProtocolIOSession) {
566                 doTlsUpgrade((ProtocolIOSession) ioSession, endpoint, callback);
567             } else {
568                 throw new IllegalStateException("TLS upgrade not supported");
569             }
570         }
571     }
572 
573 }