View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.util.List;
32  import java.util.Set;
33  import java.util.concurrent.CancellationException;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.Executors;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.ScheduledExecutorService;
38  import java.util.concurrent.ThreadFactory;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.atomic.AtomicInteger;
41  
42  import org.apache.hc.client5.http.HttpRoute;
43  import org.apache.hc.client5.http.async.AsyncExecCallback;
44  import org.apache.hc.client5.http.async.AsyncExecChain;
45  import org.apache.hc.client5.http.async.AsyncExecRuntime;
46  import org.apache.hc.client5.http.auth.AuthSchemeFactory;
47  import org.apache.hc.client5.http.auth.CredentialsProvider;
48  import org.apache.hc.client5.http.config.Configurable;
49  import org.apache.hc.client5.http.config.RequestConfig;
50  import org.apache.hc.client5.http.cookie.CookieSpecFactory;
51  import org.apache.hc.client5.http.cookie.CookieStore;
52  import org.apache.hc.client5.http.impl.ExecSupport;
53  import org.apache.hc.client5.http.protocol.HttpClientContext;
54  import org.apache.hc.client5.http.routing.RoutingSupport;
55  import org.apache.hc.core5.concurrent.Cancellable;
56  import org.apache.hc.core5.concurrent.ComplexFuture;
57  import org.apache.hc.core5.concurrent.DefaultThreadFactory;
58  import org.apache.hc.core5.concurrent.FutureCallback;
59  import org.apache.hc.core5.http.EntityDetails;
60  import org.apache.hc.core5.http.HttpException;
61  import org.apache.hc.core5.http.HttpHost;
62  import org.apache.hc.core5.http.HttpRequest;
63  import org.apache.hc.core5.http.HttpResponse;
64  import org.apache.hc.core5.http.HttpStatus;
65  import org.apache.hc.core5.http.config.Lookup;
66  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
67  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
68  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
69  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
70  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
71  import org.apache.hc.core5.http.nio.DataStreamChannel;
72  import org.apache.hc.core5.http.nio.HandlerFactory;
73  import org.apache.hc.core5.http.protocol.HttpContext;
74  import org.apache.hc.core5.http.support.BasicRequestBuilder;
75  import org.apache.hc.core5.io.CloseMode;
76  import org.apache.hc.core5.io.ModalCloseable;
77  import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
78  import org.apache.hc.core5.util.TimeValue;
79  import org.slf4j.Logger;
80  import org.slf4j.LoggerFactory;
81  
82  abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
83  
84      private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor", true);
85  
86      private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
87      private final AsyncExecChainElement execChain;
88      private final Lookup<CookieSpecFactory> cookieSpecRegistry;
89      private final Lookup<AuthSchemeFactory> authSchemeRegistry;
90      private final CookieStore cookieStore;
91      private final CredentialsProvider credentialsProvider;
92      private final RequestConfig defaultConfig;
93      private final ConcurrentLinkedQueue<Closeable> closeables;
94      private final ScheduledExecutorService scheduledExecutorService;
95      private final AsyncExecChain.Scheduler scheduler;
96  
97      InternalAbstractHttpAsyncClient(
98              final DefaultConnectingIOReactor ioReactor,
99              final AsyncPushConsumerRegistry pushConsumerRegistry,
100             final ThreadFactory threadFactory,
101             final AsyncExecChainElement execChain,
102             final Lookup<CookieSpecFactory> cookieSpecRegistry,
103             final Lookup<AuthSchemeFactory> authSchemeRegistry,
104             final CookieStore cookieStore,
105             final CredentialsProvider credentialsProvider,
106             final RequestConfig defaultConfig,
107             final List<Closeable> closeables) {
108         super(ioReactor, pushConsumerRegistry, threadFactory);
109         this.execChain = execChain;
110         this.cookieSpecRegistry = cookieSpecRegistry;
111         this.authSchemeRegistry = authSchemeRegistry;
112         this.cookieStore = cookieStore;
113         this.credentialsProvider = credentialsProvider;
114         this.defaultConfig = defaultConfig;
115         this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
116         this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);
117         this.scheduler = new AsyncExecChain.Scheduler() {
118 
119             @Override
120             public void scheduleExecution(
121                     final HttpRequest request,
122                     final AsyncEntityProducer entityProducer,
123                     final AsyncExecChain.Scope scope,
124                     final AsyncExecCallback asyncExecCallback,
125                     final TimeValue delay) {
126                 executeScheduled(request, entityProducer, scope, execChain::execute, asyncExecCallback, delay);
127             }
128 
129             @Override
130             public void scheduleExecution(
131                     final HttpRequest request,
132                     final AsyncEntityProducer entityProducer,
133                     final AsyncExecChain.Scope scope,
134                     final AsyncExecChain chain,
135                     final AsyncExecCallback asyncExecCallback,
136                     final TimeValue delay) {
137                 executeScheduled(request, entityProducer, scope, chain, asyncExecCallback, delay);
138             }
139         };
140 
141     }
142 
143     @Override
144     void internalClose(final CloseMode closeMode) {
145         if (this.closeables != null) {
146             Closeable closeable;
147             while ((closeable = this.closeables.poll()) != null) {
148                 try {
149                     if (closeable instanceof ModalCloseable) {
150                         ((ModalCloseable) closeable).close(closeMode);
151                     } else {
152                         closeable.close();
153                     }
154                 } catch (final IOException ex) {
155                     LOG.error(ex.getMessage(), ex);
156                 }
157             }
158         }
159         final List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
160         for (final Runnable runnable: runnables) {
161             if (runnable instanceof Cancellable) {
162                 ((Cancellable) runnable).cancel();
163             }
164         }
165     }
166 
167     private void setupContext(final HttpClientContext context) {
168         if (context.getAttribute(HttpClientContext.AUTHSCHEME_REGISTRY) == null) {
169             context.setAttribute(HttpClientContext.AUTHSCHEME_REGISTRY, authSchemeRegistry);
170         }
171         if (context.getAttribute(HttpClientContext.COOKIESPEC_REGISTRY) == null) {
172             context.setAttribute(HttpClientContext.COOKIESPEC_REGISTRY, cookieSpecRegistry);
173         }
174         if (context.getAttribute(HttpClientContext.COOKIE_STORE) == null) {
175             context.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
176         }
177         if (context.getAttribute(HttpClientContext.CREDS_PROVIDER) == null) {
178             context.setAttribute(HttpClientContext.CREDS_PROVIDER, credentialsProvider);
179         }
180         if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
181             context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
182         }
183     }
184 
185     abstract AsyncExecRuntime createAsyncExecRuntime(HandlerFactory<AsyncPushConsumer> pushHandlerFactory);
186 
187     abstract HttpRoute determineRoute(HttpHost httpHost, HttpClientContext clientContext) throws HttpException;
188 
189     @Override
190     protected <T> Future<T> doExecute(
191             final HttpHost httpHost,
192             final AsyncRequestProducer requestProducer,
193             final AsyncResponseConsumer<T> responseConsumer,
194             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
195             final HttpContext context,
196             final FutureCallback<T> callback) {
197         final ComplexFuture<T> future = new ComplexFuture<>(callback);
198         try {
199             if (!isRunning()) {
200                 throw new CancellationException("Request execution cancelled");
201             }
202             final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
203             requestProducer.sendRequest((request, entityDetails, c) -> {
204 
205                 RequestConfig requestConfig = null;
206                 if (request instanceof Configurable) {
207                     requestConfig = ((Configurable) request).getConfig();
208                 }
209                 if (requestConfig != null) {
210                     clientContext.setRequestConfig(requestConfig);
211                 }
212 
213                 setupContext(clientContext);
214 
215                 final HttpRoute route = determineRoute(
216                         httpHost != null ? httpHost : RoutingSupport.determineHost(request),
217                         clientContext);
218                 final String exchangeId = ExecSupport.getNextExchangeId();
219                 clientContext.setExchangeId(exchangeId);
220                 if (LOG.isDebugEnabled()) {
221                     LOG.debug("{} preparing request execution", exchangeId);
222                 }
223                 final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory);
224 
225                 final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
226                         clientContext, execRuntime, scheduler, new AtomicInteger(1));
227                 final AtomicBoolean outputTerminated = new AtomicBoolean(false);
228                 executeImmediate(
229                         BasicRequestBuilder.copy(request).build(),
230                         entityDetails != null ? new AsyncEntityProducer() {
231 
232                             @Override
233                             public void releaseResources() {
234                                 requestProducer.releaseResources();
235                             }
236 
237                             @Override
238                             public void failed(final Exception cause) {
239                                 requestProducer.failed(cause);
240                             }
241 
242                             @Override
243                             public boolean isRepeatable() {
244                                 return requestProducer.isRepeatable();
245                             }
246 
247                             @Override
248                             public long getContentLength() {
249                                 return entityDetails.getContentLength();
250                             }
251 
252                             @Override
253                             public String getContentType() {
254                                 return entityDetails.getContentType();
255                             }
256 
257                             @Override
258                             public String getContentEncoding() {
259                                 return entityDetails.getContentEncoding();
260                             }
261 
262                             @Override
263                             public boolean isChunked() {
264                                 return entityDetails.isChunked();
265                             }
266 
267                             @Override
268                             public Set<String> getTrailerNames() {
269                                 return entityDetails.getTrailerNames();
270                             }
271 
272                             @Override
273                             public int available() {
274                                 return requestProducer.available();
275                             }
276 
277                             @Override
278                             public void produce(final DataStreamChannel channel) throws IOException {
279                                 if (outputTerminated.get()) {
280                                     channel.endStream();
281                                     return;
282                                 }
283                                 requestProducer.produce(channel);
284                             }
285 
286                         } : null,
287                         scope,
288                         execChain::execute,
289                         new AsyncExecCallback() {
290 
291                             @Override
292                             public AsyncDataConsumer handleResponse(
293                                     final HttpResponse response,
294                                     final EntityDetails entityDetails) throws HttpException, IOException {
295                                 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
296                                     outputTerminated.set(true);
297                                     requestProducer.releaseResources();
298                                 }
299                                 responseConsumer.consumeResponse(response, entityDetails, c,
300                                         new FutureCallback<T>() {
301 
302                                             @Override
303                                             public void completed(final T result) {
304                                                 future.completed(result);
305                                             }
306 
307                                             @Override
308                                             public void failed(final Exception ex) {
309                                                 future.failed(ex);
310                                             }
311 
312                                             @Override
313                                             public void cancelled() {
314                                                 future.cancel();
315                                             }
316 
317                                         });
318                                 return entityDetails != null ? responseConsumer : null;
319                             }
320 
321                             @Override
322                             public void handleInformationResponse(
323                                     final HttpResponse response) throws HttpException, IOException {
324                                 responseConsumer.informationResponse(response, c);
325                             }
326 
327                             @Override
328                             public void completed() {
329                                 if (LOG.isDebugEnabled()) {
330                                     LOG.debug("{} message exchange successfully completed", exchangeId);
331                                 }
332                                 try {
333                                     execRuntime.releaseEndpoint();
334                                 } finally {
335                                     responseConsumer.releaseResources();
336                                     requestProducer.releaseResources();
337                                 }
338                             }
339 
340                             @Override
341                             public void failed(final Exception cause) {
342                                 if (LOG.isDebugEnabled()) {
343                                     LOG.debug("{} request failed: {}", exchangeId, cause.getMessage());
344                                 }
345                                 try {
346                                     execRuntime.discardEndpoint();
347                                     responseConsumer.failed(cause);
348                                 } finally {
349                                     try {
350                                         future.failed(cause);
351                                     } finally {
352                                         responseConsumer.releaseResources();
353                                         requestProducer.releaseResources();
354                                     }
355                                 }
356                             }
357 
358                         });
359             }, context);
360         } catch (final HttpException | IOException | IllegalStateException ex) {
361             future.failed(ex);
362         }
363         return future;
364     }
365 
366     void executeImmediate(
367             final HttpRequest request,
368             final AsyncEntityProducer entityProducer,
369             final AsyncExecChain.Scope scope,
370             final AsyncExecChain chain,
371             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
372         chain.proceed(request, entityProducer, scope, asyncExecCallback);
373     }
374 
375     void executeScheduled(
376             final HttpRequest request,
377             final AsyncEntityProducer entityProducer,
378             final AsyncExecChain.Scope scope,
379             final AsyncExecChain chain,
380             final AsyncExecCallback asyncExecCallback,
381             final TimeValue delay) {
382         final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution(
383                 request, entityProducer, scope, chain, asyncExecCallback, delay);
384         if (TimeValue.isPositive(delay)) {
385             scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit());
386         } else {
387             scheduledExecutorService.execute(scheduledTask);
388         }
389     }
390 
391     class ScheduledRequestExecution implements Runnable, Cancellable {
392 
393         final HttpRequest request;
394         final AsyncEntityProducer entityProducer;
395         final AsyncExecChain.Scope scope;
396         final AsyncExecChain chain;
397         final AsyncExecCallback asyncExecCallback;
398         final TimeValue delay;
399 
400         ScheduledRequestExecution(final HttpRequest request,
401                                   final AsyncEntityProducer entityProducer,
402                                   final AsyncExecChain.Scope scope,
403                                   final AsyncExecChain chain,
404                                   final AsyncExecCallback asyncExecCallback,
405                                   final TimeValue delay) {
406             this.request = request;
407             this.entityProducer = entityProducer;
408             this.scope = scope;
409             this.chain = chain;
410             this.asyncExecCallback = asyncExecCallback;
411             this.delay = delay;
412         }
413 
414         @Override
415         public void run() {
416             try {
417                 chain.proceed(request, entityProducer, scope, asyncExecCallback);
418             } catch (final Exception ex) {
419                 asyncExecCallback.failed(ex);
420             }
421         }
422 
423         @Override
424         public boolean cancel() {
425             asyncExecCallback.failed(new CancellationException("Request execution cancelled"));
426             return true;
427         }
428 
429     }
430 
431 }