View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.CancellationException;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.ThreadFactory;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.atomic.AtomicInteger;
37  
38  import org.apache.hc.client5.http.HttpRoute;
39  import org.apache.hc.client5.http.SchemePortResolver;
40  import org.apache.hc.client5.http.config.Configurable;
41  import org.apache.hc.client5.http.config.RequestConfig;
42  import org.apache.hc.client5.http.impl.ConnPoolSupport;
43  import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
44  import org.apache.hc.client5.http.impl.ExecSupport;
45  import org.apache.hc.client5.http.impl.classic.RequestFailedException;
46  import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
47  import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
48  import org.apache.hc.client5.http.protocol.HttpClientContext;
49  import org.apache.hc.client5.http.routing.RoutingSupport;
50  import org.apache.hc.core5.annotation.Contract;
51  import org.apache.hc.core5.annotation.ThreadingBehavior;
52  import org.apache.hc.core5.concurrent.BasicFuture;
53  import org.apache.hc.core5.concurrent.Cancellable;
54  import org.apache.hc.core5.concurrent.ComplexCancellable;
55  import org.apache.hc.core5.concurrent.ComplexFuture;
56  import org.apache.hc.core5.concurrent.FutureCallback;
57  import org.apache.hc.core5.function.Callback;
58  import org.apache.hc.core5.http.EntityDetails;
59  import org.apache.hc.core5.http.Header;
60  import org.apache.hc.core5.http.HttpException;
61  import org.apache.hc.core5.http.HttpHost;
62  import org.apache.hc.core5.http.HttpRequest;
63  import org.apache.hc.core5.http.HttpResponse;
64  import org.apache.hc.core5.http.HttpStatus;
65  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
66  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
67  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
68  import org.apache.hc.core5.http.nio.CapacityChannel;
69  import org.apache.hc.core5.http.nio.DataStreamChannel;
70  import org.apache.hc.core5.http.nio.HandlerFactory;
71  import org.apache.hc.core5.http.nio.RequestChannel;
72  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
73  import org.apache.hc.core5.http.protocol.HttpContext;
74  import org.apache.hc.core5.http2.HttpVersionPolicy;
75  import org.apache.hc.core5.io.CloseMode;
76  import org.apache.hc.core5.io.Closer;
77  import org.apache.hc.core5.reactor.Command;
78  import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
79  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
80  import org.apache.hc.core5.reactor.IOReactorConfig;
81  import org.apache.hc.core5.reactor.IOSession;
82  import org.apache.hc.core5.util.Args;
83  import org.apache.hc.core5.util.Asserts;
84  import org.apache.hc.core5.util.TimeValue;
85  import org.apache.hc.core5.util.Timeout;
86  import org.slf4j.Logger;
87  import org.slf4j.LoggerFactory;
88  
89  /**
90   * Minimal implementation of {@link CloseableHttpAsyncClient}. This client is
91   * optimized for HTTP/1.1 and HTTP/2 message transport and does not support
92   * advanced HTTP protocol functionality such as request execution via a proxy,
93   * state management, authentication and request redirects.
94   * <p>
95   * Concurrent message exchanges executed by this client will get assigned to
96   * separate connections leased from the connection pool.
97   * </p>
98   *
99   * @since 5.0
100  */
101 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
102 public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClientBase {
103 
104     private static final Logger LOG = LoggerFactory.getLogger(MinimalHttpAsyncClient.class);
105     private final AsyncClientConnectionManager manager;
106     private final SchemePortResolver schemePortResolver;
107     private final HttpVersionPolicy versionPolicy;
108 
109     MinimalHttpAsyncClient(
110             final IOEventHandlerFactory eventHandlerFactory,
111             final AsyncPushConsumerRegistry pushConsumerRegistry,
112             final HttpVersionPolicy versionPolicy,
113             final IOReactorConfig reactorConfig,
114             final ThreadFactory threadFactory,
115             final ThreadFactory workerThreadFactory,
116             final AsyncClientConnectionManager manager,
117             final SchemePortResolver schemePortResolver) {
118         super(new DefaultConnectingIOReactor(
119                 eventHandlerFactory,
120                 reactorConfig,
121                 workerThreadFactory,
122                 LoggingIOSessionDecorator.INSTANCE,
123                 LoggingExceptionCallback.INSTANCE,
124                 null,
125                 new Callback<IOSession>() {
126 
127                     @Override
128                     public void execute(final IOSession ioSession) {
129                         ioSession.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.NORMAL);
130                     }
131 
132                 }),
133                 pushConsumerRegistry,
134                 threadFactory);
135         this.manager = manager;
136         this.schemePortResolver = schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE;
137         this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
138     }
139 
140     private Future<AsyncConnectionEndpoint> leaseEndpoint(
141             final HttpHost host,
142             final Timeout connectionRequestTimeout,
143             final Timeout connectTimeout,
144             final HttpClientContext clientContext,
145             final FutureCallback<AsyncConnectionEndpoint> callback) {
146         final HttpRoute/http/HttpRoute.html#HttpRoute">HttpRoute route = new HttpRoute(RoutingSupport.normalize(host, schemePortResolver));
147         final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
148         final String exchangeId = ExecSupport.getNextExchangeId();
149         final Future<AsyncConnectionEndpoint> leaseFuture = manager.lease(
150                 exchangeId,
151                 route,
152                 null,
153                 connectionRequestTimeout,
154                 new FutureCallback<AsyncConnectionEndpoint>() {
155 
156                     @Override
157                     public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
158                         if (connectionEndpoint.isConnected()) {
159                             resultFuture.completed(connectionEndpoint);
160                         } else {
161                             final Future<AsyncConnectionEndpoint> connectFuture = manager.connect(
162                                     connectionEndpoint,
163                                     getConnectionInitiator(),
164                                     connectTimeout,
165                                     versionPolicy,
166                                     clientContext,
167                                     new FutureCallback<AsyncConnectionEndpoint>() {
168 
169                                         @Override
170                                         public void completed(final AsyncConnectionEndpoint result) {
171                                             resultFuture.completed(result);
172                                         }
173 
174                                         @Override
175                                         public void failed(final Exception ex) {
176                                             resultFuture.failed(ex);
177                                         }
178 
179                                         @Override
180                                         public void cancelled() {
181                                             resultFuture.cancel(true);
182                                         }
183 
184                                     });
185                             resultFuture.setDependency(connectFuture);
186                         }
187                     }
188 
189                     @Override
190                     public void failed(final Exception ex) {
191                         callback.failed(ex);
192                     }
193 
194                     @Override
195                     public void cancelled() {
196                         callback.cancelled();
197                     }
198 
199                 });
200         resultFuture.setDependency(leaseFuture);
201         return resultFuture;
202     }
203 
204     public final Future<AsyncClientEndpoint> lease(
205             final HttpHost host,
206             final FutureCallback<AsyncClientEndpoint> callback) {
207         return lease(host, HttpClientContext.create(), callback);
208     }
209 
210     public Future<AsyncClientEndpoint> lease(
211             final HttpHost host,
212             final HttpContext context,
213             final FutureCallback<AsyncClientEndpoint> callback) {
214         Args.notNull(host, "Host");
215         Args.notNull(context, "HTTP context");
216         final BasicFuture<AsyncClientEndpoint> future = new BasicFuture<>(callback);
217         if (!isRunning()) {
218             future.failed(new CancellationException("Connection lease cancelled"));
219             return future;
220         }
221         final HttpClientContext clientContext = HttpClientContext.adapt(context);
222         final RequestConfig requestConfig = clientContext.getRequestConfig();
223         final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
224         final Timeout connectTimeout = requestConfig.getConnectTimeout();
225         leaseEndpoint(
226                 host,
227                 connectionRequestTimeout,
228                 connectTimeout,
229                 clientContext,
230                 new FutureCallback<AsyncConnectionEndpoint>() {
231 
232                     @Override
233                     public void completed(final AsyncConnectionEndpoint result) {
234                         future.completed(new InternalAsyncClientEndpoint(result));
235                     }
236 
237                     @Override
238                     public void failed(final Exception ex) {
239                         future.failed(ex);
240                     }
241 
242                     @Override
243                     public void cancelled() {
244                         future.cancel(true);
245                     }
246 
247                 });
248         return future;
249     }
250 
251     @Override
252     public Cancellable execute(
253             final AsyncClientExchangeHandler exchangeHandler,
254             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
255             final HttpContext context) {
256         final ComplexCancellable cancellable = new ComplexCancellable();
257         try {
258             if (!isRunning()) {
259                 throw new CancellationException("Request execution cancelled");
260             }
261             final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
262             exchangeHandler.produceRequest(new RequestChannel() {
263 
264                 @Override
265                 public void sendRequest(
266                         final HttpRequest request,
267                         final EntityDetails entityDetails,
268                         final HttpContext context) throws HttpException, IOException {
269                     RequestConfig requestConfig = null;
270                     if (request instanceof Configurable) {
271                         requestConfig = ((Configurable) request).getConfig();
272                     }
273                     if (requestConfig != null) {
274                         clientContext.setRequestConfig(requestConfig);
275                     } else {
276                         requestConfig = clientContext.getRequestConfig();
277                     }
278                     final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
279                     final Timeout connectTimeout = requestConfig.getConnectTimeout();
280                     final Timeout responseTimeout = requestConfig.getResponseTimeout();
281                     final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());
282 
283                     final Future<AsyncConnectionEndpoint> leaseFuture = leaseEndpoint(
284                             target,
285                             connectionRequestTimeout,
286                             connectTimeout,
287                             clientContext,
288                             new FutureCallback<AsyncConnectionEndpoint>() {
289 
290                                 @Override
291                                 public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
292                                     final InternalAsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(connectionEndpoint);
293                                     final AtomicInteger messageCountDown = new AtomicInteger(2);
294                                     final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
295 
296                                         @Override
297                                         public void releaseResources() {
298                                             try {
299                                                 exchangeHandler.releaseResources();
300                                             } finally {
301                                                 endpoint.releaseAndDiscard();
302                                             }
303                                         }
304 
305                                         @Override
306                                         public void failed(final Exception cause) {
307                                             try {
308                                                 exchangeHandler.failed(cause);
309                                             } finally {
310                                                 endpoint.releaseAndDiscard();
311                                             }
312                                         }
313 
314                                         @Override
315                                         public void cancel() {
316                                             failed(new RequestFailedException("Request aborted"));
317                                         }
318 
319                                         @Override
320                                         public void produceRequest(
321                                                 final RequestChannel channel,
322                                                 final HttpContext context) throws HttpException, IOException {
323                                             channel.sendRequest(request, entityDetails, context);
324                                             if (entityDetails == null) {
325                                                 messageCountDown.decrementAndGet();
326                                             }
327                                         }
328 
329                                         @Override
330                                         public int available() {
331                                             return exchangeHandler.available();
332                                         }
333 
334                                         @Override
335                                         public void produce(final DataStreamChannel channel) throws IOException {
336                                             exchangeHandler.produce(new DataStreamChannel() {
337 
338                                                 @Override
339                                                 public void requestOutput() {
340                                                     channel.requestOutput();
341                                                 }
342 
343                                                 @Override
344                                                 public int write(final ByteBuffer src) throws IOException {
345                                                     return channel.write(src);
346                                                 }
347 
348                                                 @Override
349                                                 public void endStream(final List<? extends Header> trailers) throws IOException {
350                                                     channel.endStream(trailers);
351                                                     if (messageCountDown.decrementAndGet() <= 0) {
352                                                         endpoint.releaseAndReuse();
353                                                     }
354                                                 }
355 
356                                                 @Override
357                                                 public void endStream() throws IOException {
358                                                     channel.endStream();
359                                                     if (messageCountDown.decrementAndGet() <= 0) {
360                                                         endpoint.releaseAndReuse();
361                                                     }
362                                                 }
363 
364                                             });
365                                         }
366 
367                                         @Override
368                                         public void consumeInformation(
369                                                 final HttpResponse response,
370                                                 final HttpContext context) throws HttpException, IOException {
371                                             exchangeHandler.consumeInformation(response, context);
372                                         }
373 
374                                         @Override
375                                         public void consumeResponse(
376                                                 final HttpResponse response,
377                                                 final EntityDetails entityDetails,
378                                                 final HttpContext context) throws HttpException, IOException {
379                                             exchangeHandler.consumeResponse(response, entityDetails, context);
380                                             if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
381                                                 messageCountDown.decrementAndGet();
382                                             }
383                                             if (entityDetails == null) {
384                                                 if (messageCountDown.decrementAndGet() <= 0) {
385                                                     endpoint.releaseAndReuse();
386                                                 }
387                                             }
388                                         }
389 
390                                         @Override
391                                         public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
392                                             exchangeHandler.updateCapacity(capacityChannel);
393                                         }
394 
395                                         @Override
396                                         public void consume(final ByteBuffer src) throws IOException {
397                                             exchangeHandler.consume(src);
398                                         }
399 
400                                         @Override
401                                         public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
402                                             if (messageCountDown.decrementAndGet() <= 0) {
403                                                 endpoint.releaseAndReuse();
404                                             }
405                                             exchangeHandler.streamEnd(trailers);
406                                         }
407 
408                                     };
409                                     if (responseTimeout != null) {
410                                         endpoint.setSocketTimeout(responseTimeout);
411                                     }
412                                     endpoint.execute(internalExchangeHandler, pushHandlerFactory, clientContext);
413                                 }
414 
415                                 @Override
416                                 public void failed(final Exception ex) {
417                                     exchangeHandler.failed(ex);
418                                 }
419 
420                                 @Override
421                                 public void cancelled() {
422                                     exchangeHandler.cancel();
423                                 }
424 
425                             });
426 
427                     cancellable.setDependency(new Cancellable() {
428 
429                         @Override
430                         public boolean cancel() {
431                             return leaseFuture.cancel(true);
432                         }
433 
434                     });
435                 }
436             }, context);
437 
438         } catch (final HttpException | IOException | IllegalStateException ex) {
439             exchangeHandler.failed(ex);
440         }
441         return cancellable;
442     }
443 
444     private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
445 
446         private final AsyncConnectionEndpoint connectionEndpoint;
447         private final AtomicBoolean released;
448 
449         InternalAsyncClientEndpoint(final AsyncConnectionEndpoint connectionEndpoint) {
450             this.connectionEndpoint = connectionEndpoint;
451             this.released = new AtomicBoolean(false);
452         }
453 
454         boolean isReleased() {
455             return released.get();
456         }
457 
458         @Override
459         public boolean isConnected() {
460             return !isReleased() && connectionEndpoint.isConnected();
461         }
462 
463         @Override
464         public void execute(
465                 final AsyncClientExchangeHandler exchangeHandler,
466                 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
467                 final HttpContext context) {
468             Asserts.check(!released.get(), "Endpoint has already been released");
469 
470             final String exchangeId = ExecSupport.getNextExchangeId();
471             if (LOG.isDebugEnabled()) {
472                 LOG.debug("{}: executing message exchange {}", ConnPoolSupport.getId(connectionEndpoint), exchangeId);
473                 connectionEndpoint.execute(
474                         exchangeId,
475                         new LoggingAsyncClientExchangeHandler(LOG, exchangeId, exchangeHandler),
476                         pushHandlerFactory,
477                         context);
478             } else {
479                 connectionEndpoint.execute(exchangeId, exchangeHandler, context);
480             }
481         }
482 
483         public void setSocketTimeout(final Timeout timeout) {
484             connectionEndpoint.setSocketTimeout(timeout);
485         }
486 
487         @Override
488         public void releaseAndReuse() {
489             if (released.compareAndSet(false, true)) {
490                 manager.release(connectionEndpoint, null, TimeValue.NEG_ONE_MILLISECOND);
491             }
492         }
493 
494         @Override
495         public void releaseAndDiscard() {
496             if (released.compareAndSet(false, true)) {
497                 Closer.closeQuietly(connectionEndpoint);
498                 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
499             }
500         }
501 
502     }
503 
504 }