View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.util.List;
32  import java.util.Set;
33  import java.util.concurrent.CancellationException;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.Future;
36  import java.util.concurrent.ThreadFactory;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  
39  import org.apache.hc.client5.http.HttpRoute;
40  import org.apache.hc.client5.http.async.AsyncExecCallback;
41  import org.apache.hc.client5.http.async.AsyncExecChain;
42  import org.apache.hc.client5.http.async.AsyncExecRuntime;
43  import org.apache.hc.client5.http.auth.AuthSchemeFactory;
44  import org.apache.hc.client5.http.auth.CredentialsProvider;
45  import org.apache.hc.client5.http.config.Configurable;
46  import org.apache.hc.client5.http.config.RequestConfig;
47  import org.apache.hc.client5.http.cookie.CookieSpecFactory;
48  import org.apache.hc.client5.http.cookie.CookieStore;
49  import org.apache.hc.client5.http.impl.ExecSupport;
50  import org.apache.hc.client5.http.impl.RequestCopier;
51  import org.apache.hc.client5.http.protocol.HttpClientContext;
52  import org.apache.hc.client5.http.routing.RoutingSupport;
53  import org.apache.hc.core5.concurrent.ComplexFuture;
54  import org.apache.hc.core5.concurrent.FutureCallback;
55  import org.apache.hc.core5.http.EntityDetails;
56  import org.apache.hc.core5.http.HttpException;
57  import org.apache.hc.core5.http.HttpHost;
58  import org.apache.hc.core5.http.HttpRequest;
59  import org.apache.hc.core5.http.HttpResponse;
60  import org.apache.hc.core5.http.HttpStatus;
61  import org.apache.hc.core5.http.config.Lookup;
62  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
63  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
64  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
65  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
66  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
67  import org.apache.hc.core5.http.nio.DataStreamChannel;
68  import org.apache.hc.core5.http.nio.HandlerFactory;
69  import org.apache.hc.core5.http.nio.RequestChannel;
70  import org.apache.hc.core5.http.protocol.HttpContext;
71  import org.apache.hc.core5.io.CloseMode;
72  import org.apache.hc.core5.io.ModalCloseable;
73  import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
74  import org.slf4j.Logger;
75  import org.slf4j.LoggerFactory;
76  
77  abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
78  
79      private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
80      private final AsyncExecChainElement execChain;
81      private final Lookup<CookieSpecFactory> cookieSpecRegistry;
82      private final Lookup<AuthSchemeFactory> authSchemeRegistry;
83      private final CookieStore cookieStore;
84      private final CredentialsProvider credentialsProvider;
85      private final RequestConfig defaultConfig;
86      private final ConcurrentLinkedQueue<Closeable> closeables;
87  
88      InternalAbstractHttpAsyncClient(
89              final DefaultConnectingIOReactor ioReactor,
90              final AsyncPushConsumerRegistry pushConsumerRegistry,
91              final ThreadFactory threadFactory,
92              final AsyncExecChainElement execChain,
93              final Lookup<CookieSpecFactory> cookieSpecRegistry,
94              final Lookup<AuthSchemeFactory> authSchemeRegistry,
95              final CookieStore cookieStore,
96              final CredentialsProvider credentialsProvider,
97              final RequestConfig defaultConfig,
98              final List<Closeable> closeables) {
99          super(ioReactor, pushConsumerRegistry, threadFactory);
100         this.execChain = execChain;
101         this.cookieSpecRegistry = cookieSpecRegistry;
102         this.authSchemeRegistry = authSchemeRegistry;
103         this.cookieStore = cookieStore;
104         this.credentialsProvider = credentialsProvider;
105         this.defaultConfig = defaultConfig;
106         this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
107     }
108 
109     @Override
110     void internalClose(final CloseMode closeMode) {
111         if (this.closeables != null) {
112             Closeable closeable;
113             while ((closeable = this.closeables.poll()) != null) {
114                 try {
115                     if (closeable instanceof ModalCloseable) {
116                         ((ModalCloseable) closeable).close(closeMode);
117                     } else {
118                         closeable.close();
119                     }
120                 } catch (final IOException ex) {
121                     LOG.error(ex.getMessage(), ex);
122                 }
123             }
124         }
125     }
126 
127     private void setupContext(final HttpClientContext context) {
128         if (context.getAttribute(HttpClientContext.AUTHSCHEME_REGISTRY) == null) {
129             context.setAttribute(HttpClientContext.AUTHSCHEME_REGISTRY, authSchemeRegistry);
130         }
131         if (context.getAttribute(HttpClientContext.COOKIESPEC_REGISTRY) == null) {
132             context.setAttribute(HttpClientContext.COOKIESPEC_REGISTRY, cookieSpecRegistry);
133         }
134         if (context.getAttribute(HttpClientContext.COOKIE_STORE) == null) {
135             context.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
136         }
137         if (context.getAttribute(HttpClientContext.CREDS_PROVIDER) == null) {
138             context.setAttribute(HttpClientContext.CREDS_PROVIDER, credentialsProvider);
139         }
140         if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
141             context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
142         }
143     }
144 
145     abstract AsyncExecRuntime createAsyncExecRuntime(HandlerFactory<AsyncPushConsumer> pushHandlerFactory);
146 
147     abstract HttpRoute determineRoute(HttpHost httpHost, HttpClientContext clientContext) throws HttpException;
148 
149     @Override
150     protected <T> Future<T> doExecute(
151             final HttpHost httpHost,
152             final AsyncRequestProducer requestProducer,
153             final AsyncResponseConsumer<T> responseConsumer,
154             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
155             final HttpContext context,
156             final FutureCallback<T> callback) {
157         final ComplexFuture<T> future = new ComplexFuture<>(callback);
158         try {
159             if (!isRunning()) {
160                 throw new CancellationException("Request execution cancelled");
161             }
162             final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
163             requestProducer.sendRequest(new RequestChannel() {
164 
165                 @Override
166                 public void sendRequest(
167                         final HttpRequest request,
168                         final EntityDetails entityDetails,
169                         final HttpContext context) throws HttpException, IOException {
170 
171                     RequestConfig requestConfig = null;
172                     if (request instanceof Configurable) {
173                         requestConfig = ((Configurable) request).getConfig();
174                     }
175                     if (requestConfig != null) {
176                         clientContext.setRequestConfig(requestConfig);
177                     }
178                     final HttpRoute route = determineRoute(
179                             httpHost != null ? httpHost : RoutingSupport.determineHost(request),
180                             clientContext);
181                     final String exchangeId = ExecSupport.getNextExchangeId();
182                     if (LOG.isDebugEnabled()) {
183                         LOG.debug("{}: preparing request execution", exchangeId);
184                     }
185                     final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory);
186 
187                     setupContext(clientContext);
188 
189                     final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
190                             clientContext, execRuntime);
191                     final AtomicBoolean outputTerminated = new AtomicBoolean(false);
192                     execChain.execute(
193                             RequestCopier.INSTANCE.copy(request),
194                             entityDetails != null ? new AsyncEntityProducer() {
195 
196                                 @Override
197                                 public void releaseResources() {
198                                     requestProducer.releaseResources();
199                                 }
200 
201                                 @Override
202                                 public void failed(final Exception cause) {
203                                     requestProducer.failed(cause);
204                                 }
205 
206                                 @Override
207                                 public boolean isRepeatable() {
208                                     return requestProducer.isRepeatable();
209                                 }
210 
211                                 @Override
212                                 public long getContentLength() {
213                                     return entityDetails.getContentLength();
214                                 }
215 
216                                 @Override
217                                 public String getContentType() {
218                                     return entityDetails.getContentType();
219                                 }
220 
221                                 @Override
222                                 public String getContentEncoding() {
223                                     return entityDetails.getContentEncoding();
224                                 }
225 
226                                 @Override
227                                 public boolean isChunked() {
228                                     return entityDetails.isChunked();
229                                 }
230 
231                                 @Override
232                                 public Set<String> getTrailerNames() {
233                                     return entityDetails.getTrailerNames();
234                                 }
235 
236                                 @Override
237                                 public int available() {
238                                     return requestProducer.available();
239                                 }
240 
241                                 @Override
242                                 public void produce(final DataStreamChannel channel) throws IOException {
243                                     if (outputTerminated.get()) {
244                                         channel.endStream();
245                                         return;
246                                     }
247                                     requestProducer.produce(channel);
248                                 }
249 
250                             } : null,
251                             scope,
252                             new AsyncExecCallback() {
253 
254                                 @Override
255                                 public AsyncDataConsumer handleResponse(
256                                         final HttpResponse response,
257                                         final EntityDetails entityDetails) throws HttpException, IOException {
258                                     if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
259                                         outputTerminated.set(true);
260                                         requestProducer.releaseResources();
261                                     }
262                                     responseConsumer.consumeResponse(response, entityDetails, context,
263                                             new FutureCallback<T>() {
264 
265                                                 @Override
266                                                 public void completed(final T result) {
267                                                     future.completed(result);
268                                                 }
269 
270                                                 @Override
271                                                 public void failed(final Exception ex) {
272                                                     future.failed(ex);
273                                                 }
274 
275                                                 @Override
276                                                 public void cancelled() {
277                                                     future.cancel();
278                                                 }
279 
280                                             });
281                                     return responseConsumer;
282                                 }
283 
284                                 @Override
285                                 public void handleInformationResponse(
286                                         final HttpResponse response) throws HttpException, IOException {
287                                     responseConsumer.informationResponse(response, context);
288                                 }
289 
290                                 @Override
291                                 public void completed() {
292                                     if (LOG.isDebugEnabled()) {
293                                         LOG.debug("{}: message exchange successfully completed", exchangeId);
294                                     }
295                                     try {
296                                         execRuntime.releaseEndpoint();
297                                     } finally {
298                                         responseConsumer.releaseResources();
299                                         requestProducer.releaseResources();
300                                     }
301                                 }
302 
303                                 @Override
304                                 public void failed(final Exception cause) {
305                                     if (LOG.isDebugEnabled()) {
306                                         LOG.debug("{}: request failed: {}", exchangeId, cause.getMessage());
307                                     }
308                                     try {
309                                         execRuntime.discardEndpoint();
310                                         responseConsumer.failed(cause);
311                                     } finally {
312                                         try {
313                                             future.failed(cause);
314                                         } finally {
315                                             responseConsumer.releaseResources();
316                                             requestProducer.releaseResources();
317                                         }
318                                     }
319                                 }
320 
321                             });
322                 }
323 
324             }, context);
325         } catch (final HttpException | IOException | IllegalStateException ex) {
326             future.failed(ex);
327         }
328         return future;
329     }
330 
331 }