View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.bootstrap;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.util.List;
33  import java.util.Set;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.atomic.AtomicReference;
36  
37  import org.apache.hc.core5.annotation.Internal;
38  import org.apache.hc.core5.concurrent.BasicFuture;
39  import org.apache.hc.core5.concurrent.ComplexFuture;
40  import org.apache.hc.core5.concurrent.FutureCallback;
41  import org.apache.hc.core5.function.Callback;
42  import org.apache.hc.core5.function.Decorator;
43  import org.apache.hc.core5.http.ConnectionClosedException;
44  import org.apache.hc.core5.http.EntityDetails;
45  import org.apache.hc.core5.http.Header;
46  import org.apache.hc.core5.http.HttpConnection;
47  import org.apache.hc.core5.http.HttpException;
48  import org.apache.hc.core5.http.HttpHost;
49  import org.apache.hc.core5.http.HttpRequest;
50  import org.apache.hc.core5.http.HttpResponse;
51  import org.apache.hc.core5.http.ProtocolException;
52  import org.apache.hc.core5.http.impl.DefaultAddressResolver;
53  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
54  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
55  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
56  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
57  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
58  import org.apache.hc.core5.http.nio.CapacityChannel;
59  import org.apache.hc.core5.http.nio.DataStreamChannel;
60  import org.apache.hc.core5.http.nio.HandlerFactory;
61  import org.apache.hc.core5.http.nio.RequestChannel;
62  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
63  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
64  import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
65  import org.apache.hc.core5.http.protocol.HttpContext;
66  import org.apache.hc.core5.http.protocol.HttpCoreContext;
67  import org.apache.hc.core5.io.CloseMode;
68  import org.apache.hc.core5.net.URIAuthority;
69  import org.apache.hc.core5.pool.ConnPoolControl;
70  import org.apache.hc.core5.pool.ManagedConnPool;
71  import org.apache.hc.core5.pool.PoolEntry;
72  import org.apache.hc.core5.pool.PoolStats;
73  import org.apache.hc.core5.reactor.Command;
74  import org.apache.hc.core5.reactor.IOEventHandler;
75  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
76  import org.apache.hc.core5.reactor.IOReactorConfig;
77  import org.apache.hc.core5.reactor.IOSession;
78  import org.apache.hc.core5.reactor.IOSessionListener;
79  import org.apache.hc.core5.util.Args;
80  import org.apache.hc.core5.util.TimeValue;
81  import org.apache.hc.core5.util.Timeout;
82  
83  /**
84   * HTTP/1.1 client side message exchange initiator.
85   *
86   * @since 5.0
87   */
88  public class HttpAsyncRequester extends AsyncRequester implements ConnPoolControl<HttpHost> {
89  
90      private final ManagedConnPool<HttpHost, IOSession> connPool;
91  
92      /**
93       * Use {@link AsyncRequesterBootstrap} to create instances of this class.
94       */
95      @Internal
96      public HttpAsyncRequester(
97              final IOReactorConfig ioReactorConfig,
98              final IOEventHandlerFactory eventHandlerFactory,
99              final Decorator<IOSession> ioSessionDecorator,
100             final Callback<Exception> exceptionCallback,
101             final IOSessionListener sessionListener,
102             final ManagedConnPool<HttpHost, IOSession> connPool) {
103         super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
104                         ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE);
105         this.connPool = Args.notNull(connPool, "Connection pool");
106     }
107 
108     @Override
109     public PoolStats getTotalStats() {
110         return connPool.getTotalStats();
111     }
112 
113     @Override
114     public PoolStats getStats(final HttpHost route) {
115         return connPool.getStats(route);
116     }
117 
118     @Override
119     public void setMaxTotal(final int max) {
120         connPool.setMaxTotal(max);
121     }
122 
123     @Override
124     public int getMaxTotal() {
125         return connPool.getMaxTotal();
126     }
127 
128     @Override
129     public void setDefaultMaxPerRoute(final int max) {
130         connPool.setDefaultMaxPerRoute(max);
131     }
132 
133     @Override
134     public int getDefaultMaxPerRoute() {
135         return connPool.getDefaultMaxPerRoute();
136     }
137 
138     @Override
139     public void setMaxPerRoute(final HttpHost route, final int max) {
140         connPool.setMaxPerRoute(route, max);
141     }
142 
143     @Override
144     public int getMaxPerRoute(final HttpHost route) {
145         return connPool.getMaxPerRoute(route);
146     }
147 
148     @Override
149     public void closeIdle(final TimeValue idleTime) {
150         connPool.closeIdle(idleTime);
151     }
152 
153     @Override
154     public void closeExpired() {
155         connPool.closeExpired();
156     }
157 
158     @Override
159     public Set<HttpHost> getRoutes() {
160         return connPool.getRoutes();
161     }
162 
163     public Future<AsyncClientEndpoint> connect(
164             final HttpHost host,
165             final Timeout timeout,
166             final Object attachment,
167             final FutureCallback<AsyncClientEndpoint> callback) {
168         return doConnect(host, timeout, attachment, callback);
169     }
170 
171     protected Future<AsyncClientEndpoint> doConnect(
172             final HttpHost host,
173             final Timeout timeout,
174             final Object attachment,
175             final FutureCallback<AsyncClientEndpoint> callback) {
176         Args.notNull(host, "Host");
177         Args.notNull(timeout, "Timeout");
178         final ComplexFuture<AsyncClientEndpoint> resultFuture = new ComplexFuture<>(callback);
179         final Future<PoolEntry<HttpHost, IOSession>> leaseFuture = connPool.lease(
180                 host, null, timeout, new FutureCallback<PoolEntry<HttpHost, IOSession>>() {
181 
182             @Override
183             public void completed(final PoolEntry<HttpHost, IOSession> poolEntry) {
184                 final AsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(poolEntry);
185                 final IOSession ioSession = poolEntry.getConnection();
186                 if (ioSession != null && !ioSession.isOpen()) {
187                     poolEntry.discardConnection(CloseMode.IMMEDIATE);
188                 }
189                 if (poolEntry.hasConnection()) {
190                     resultFuture.completed(endpoint);
191                 } else {
192                     final Future<IOSession> futute = requestSession(host, timeout, attachment, new FutureCallback<IOSession>() {
193 
194                         @Override
195                         public void completed(final IOSession session) {
196                             session.setSocketTimeout(timeout);
197                             poolEntry.assignConnection(session);
198                             resultFuture.completed(endpoint);
199                         }
200 
201                         @Override
202                         public void failed(final Exception cause) {
203                             try {
204                                 resultFuture.failed(cause);
205                             } finally {
206                                 endpoint.releaseAndDiscard();
207                             }
208                         }
209 
210                         @Override
211                         public void cancelled() {
212                             try {
213                                 resultFuture.cancel();
214                             } finally {
215                                 endpoint.releaseAndDiscard();
216                             }
217                         }
218 
219                     });
220                     resultFuture.setDependency(futute);
221                 }
222             }
223 
224             @Override
225             public void failed(final Exception ex) {
226                 resultFuture.failed(ex);
227             }
228 
229             @Override
230             public void cancelled() {
231                 resultFuture.cancel();
232             }
233 
234         });
235         resultFuture.setDependency(leaseFuture);
236         return resultFuture;
237     }
238 
239     public Future<AsyncClientEndpoint> connect(final HttpHost host, final Timeout timeout) {
240         return connect(host, timeout, null, null);
241     }
242 
243     public void execute(
244             final AsyncClientExchangeHandler exchangeHandler,
245             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
246             final Timeout timeout,
247             final HttpContext executeContext) {
248         Args.notNull(exchangeHandler, "Exchange handler");
249         Args.notNull(timeout, "Timeout");
250         Args.notNull(executeContext, "Context");
251         try {
252             exchangeHandler.produceRequest(new RequestChannel() {
253 
254                 @Override
255                 public void sendRequest(
256                         final HttpRequest request,
257                         final EntityDetails entityDetails, final HttpContext requestContext) throws HttpException, IOException {
258                     final String scheme = request.getScheme();
259                     final URIAuthority authority = request.getAuthority();
260                     if (authority == null) {
261                         throw new ProtocolException("Request authority not specified");
262                     }
263                     final HttpHostttp/HttpHost.html#HttpHost">HttpHost target = new HttpHost(scheme, authority);
264                     connect(target, timeout, null, new FutureCallback<AsyncClientEndpoint>() {
265 
266                         @Override
267                         public void completed(final AsyncClientEndpoint endpoint) {
268                             endpoint.execute(new AsyncClientExchangeHandler() {
269 
270                                 @Override
271                                 public void releaseResources() {
272                                     endpoint.releaseAndDiscard();
273                                     exchangeHandler.releaseResources();
274                                 }
275 
276                                 @Override
277                                 public void failed(final Exception cause) {
278                                     endpoint.releaseAndDiscard();
279                                     exchangeHandler.failed(cause);
280                                 }
281 
282                                 @Override
283                                 public void cancel() {
284                                     endpoint.releaseAndDiscard();
285                                     exchangeHandler.cancel();
286                                 }
287 
288                                 @Override
289                                 public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
290                                     channel.sendRequest(request, entityDetails, httpContext);
291                                 }
292 
293                                 @Override
294                                 public int available() {
295                                     return exchangeHandler.available();
296                                 }
297 
298                                 @Override
299                                 public void produce(final DataStreamChannel channel) throws IOException {
300                                     exchangeHandler.produce(channel);
301                                 }
302 
303                                 @Override
304                                 public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
305                                     exchangeHandler.consumeInformation(response, httpContext);
306                                 }
307 
308                                 @Override
309                                 public void consumeResponse(
310                                         final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
311                                     if (entityDetails == null) {
312                                         endpoint.releaseAndReuse();
313                                     }
314                                     exchangeHandler.consumeResponse(response, entityDetails, httpContext);
315                                 }
316 
317                                 @Override
318                                 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
319                                     exchangeHandler.updateCapacity(capacityChannel);
320                                 }
321 
322                                 @Override
323                                 public void consume(final ByteBuffer src) throws IOException {
324                                     exchangeHandler.consume(src);
325                                 }
326 
327                                 @Override
328                                 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
329                                     endpoint.releaseAndReuse();
330                                     exchangeHandler.streamEnd(trailers);
331                                 }
332 
333                             }, pushHandlerFactory, executeContext);
334 
335                         }
336 
337                         @Override
338                         public void failed(final Exception ex) {
339                             exchangeHandler.failed(ex);
340                         }
341 
342                         @Override
343                         public void cancelled() {
344                             exchangeHandler.cancel();
345                         }
346 
347                     });
348 
349                 }
350 
351             }, executeContext);
352 
353         } catch (final IOException | HttpException ex) {
354             exchangeHandler.failed(ex);
355         }
356     }
357 
358     public void execute(
359             final AsyncClientExchangeHandler exchangeHandler,
360             final Timeout timeout,
361             final HttpContext executeContext) {
362         execute(exchangeHandler, null, timeout, executeContext);
363     }
364 
365     public final <T> Future<T> execute(
366             final AsyncRequestProducer requestProducer,
367             final AsyncResponseConsumer<T> responseConsumer,
368             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
369             final Timeout timeout,
370             final HttpContext context,
371             final FutureCallback<T> callback) {
372         Args.notNull(requestProducer, "Request producer");
373         Args.notNull(responseConsumer, "Response consumer");
374         Args.notNull(timeout, "Timeout");
375         final BasicFuture<T> future = new BasicFuture<>(callback);
376         final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(requestProducer, responseConsumer, new FutureCallback<T>() {
377 
378             @Override
379             public void completed(final T result) {
380                 future.completed(result);
381             }
382 
383             @Override
384             public void failed(final Exception ex) {
385                 future.failed(ex);
386             }
387 
388             @Override
389             public void cancelled() {
390                 future.cancel();
391             }
392 
393         });
394         execute(exchangeHandler, pushHandlerFactory, timeout, context != null ? context : HttpCoreContext.create());
395         return future;
396     }
397 
398     public final <T> Future<T> execute(
399             final AsyncRequestProducer requestProducer,
400             final AsyncResponseConsumer<T> responseConsumer,
401             final Timeout timeout,
402             final HttpContext context,
403             final FutureCallback<T> callback) {
404         return execute(requestProducer, responseConsumer, null, timeout, context, callback);
405     }
406 
407     public final <T> Future<T> execute(
408             final AsyncRequestProducer requestProducer,
409             final AsyncResponseConsumer<T> responseConsumer,
410             final Timeout timeout,
411             final FutureCallback<T> callback) {
412         return execute(requestProducer, responseConsumer, null, timeout, null, callback);
413     }
414 
415     private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
416 
417         final AtomicReference<PoolEntry<HttpHost, IOSession>> poolEntryRef;
418 
419         InternalAsyncClientEndpoint(final PoolEntry<HttpHost, IOSession> poolEntry) {
420             this.poolEntryRef = new AtomicReference<>(poolEntry);
421         }
422 
423         @Override
424         public void execute(
425                 final AsyncClientExchangeHandler exchangeHandler,
426                 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
427                 final HttpContext context) {
428             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
429             if (poolEntry == null) {
430                 throw new IllegalStateException("Endpoint has already been released");
431             }
432             final IOSession ioSession = poolEntry.getConnection();
433             if (ioSession == null) {
434                 throw new IllegalStateException("I/O session is invalid");
435             }
436             ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context), Command.Priority.NORMAL);
437             if (!ioSession.isOpen()) {
438                 try {
439                     exchangeHandler.failed(new ConnectionClosedException());
440                 } finally {
441                     exchangeHandler.releaseResources();
442                 }
443             }
444         }
445 
446         @Override
447         public boolean isConnected() {
448             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
449             if (poolEntry != null) {
450                 final IOSession ioSession = poolEntry.getConnection();
451                 if (ioSession == null || !ioSession.isOpen()) {
452                     return false;
453                 }
454                 final IOEventHandler handler = ioSession.getHandler();
455                 return (handler instanceof HttpConnection../../../org/apache/hc/core5/http/HttpConnection.html#HttpConnection">HttpConnection) && ((HttpConnection) handler).isOpen();
456             }
457             return false;
458         }
459 
460         @Override
461         public void releaseAndReuse() {
462             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
463             if (poolEntry != null) {
464                 final IOSession ioSession = poolEntry.getConnection();
465                 connPool.release(poolEntry, ioSession != null && ioSession.isOpen());
466             }
467         }
468 
469         @Override
470         public void releaseAndDiscard() {
471             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
472             if (poolEntry != null) {
473                 poolEntry.discardConnection(CloseMode.GRACEFUL);
474                 connPool.release(poolEntry, false);
475             }
476         }
477 
478     }
479 
480 }