View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.util.List;
32  import java.util.Set;
33  import java.util.concurrent.CancellationException;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.Executors;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.ScheduledExecutorService;
38  import java.util.concurrent.ThreadFactory;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.atomic.AtomicInteger;
41  
42  import org.apache.hc.client5.http.HttpRoute;
43  import org.apache.hc.client5.http.async.AsyncExecCallback;
44  import org.apache.hc.client5.http.async.AsyncExecChain;
45  import org.apache.hc.client5.http.async.AsyncExecRuntime;
46  import org.apache.hc.client5.http.auth.AuthSchemeFactory;
47  import org.apache.hc.client5.http.auth.CredentialsProvider;
48  import org.apache.hc.client5.http.config.Configurable;
49  import org.apache.hc.client5.http.config.RequestConfig;
50  import org.apache.hc.client5.http.cookie.CookieSpecFactory;
51  import org.apache.hc.client5.http.cookie.CookieStore;
52  import org.apache.hc.client5.http.impl.ExecSupport;
53  import org.apache.hc.client5.http.protocol.HttpClientContext;
54  import org.apache.hc.client5.http.routing.RoutingSupport;
55  import org.apache.hc.core5.concurrent.Cancellable;
56  import org.apache.hc.core5.concurrent.ComplexFuture;
57  import org.apache.hc.core5.concurrent.DefaultThreadFactory;
58  import org.apache.hc.core5.concurrent.FutureCallback;
59  import org.apache.hc.core5.http.EntityDetails;
60  import org.apache.hc.core5.http.HttpException;
61  import org.apache.hc.core5.http.HttpHost;
62  import org.apache.hc.core5.http.HttpRequest;
63  import org.apache.hc.core5.http.HttpResponse;
64  import org.apache.hc.core5.http.HttpStatus;
65  import org.apache.hc.core5.http.config.Lookup;
66  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
67  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
68  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
69  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
70  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
71  import org.apache.hc.core5.http.nio.DataStreamChannel;
72  import org.apache.hc.core5.http.nio.HandlerFactory;
73  import org.apache.hc.core5.http.nio.RequestChannel;
74  import org.apache.hc.core5.http.protocol.HttpContext;
75  import org.apache.hc.core5.http.support.BasicRequestBuilder;
76  import org.apache.hc.core5.io.CloseMode;
77  import org.apache.hc.core5.io.ModalCloseable;
78  import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
79  import org.apache.hc.core5.util.TimeValue;
80  import org.slf4j.Logger;
81  import org.slf4j.LoggerFactory;
82  
83  abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
84  
85      private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor", true);
86  
87      private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
88      private final AsyncExecChainElement execChain;
89      private final Lookup<CookieSpecFactory> cookieSpecRegistry;
90      private final Lookup<AuthSchemeFactory> authSchemeRegistry;
91      private final CookieStore cookieStore;
92      private final CredentialsProvider credentialsProvider;
93      private final RequestConfig defaultConfig;
94      private final ConcurrentLinkedQueue<Closeable> closeables;
95      private final ScheduledExecutorService scheduledExecutorService;
96  
97      InternalAbstractHttpAsyncClient(
98              final DefaultConnectingIOReactor ioReactor,
99              final AsyncPushConsumerRegistry pushConsumerRegistry,
100             final ThreadFactory threadFactory,
101             final AsyncExecChainElement execChain,
102             final Lookup<CookieSpecFactory> cookieSpecRegistry,
103             final Lookup<AuthSchemeFactory> authSchemeRegistry,
104             final CookieStore cookieStore,
105             final CredentialsProvider credentialsProvider,
106             final RequestConfig defaultConfig,
107             final List<Closeable> closeables) {
108         super(ioReactor, pushConsumerRegistry, threadFactory);
109         this.execChain = execChain;
110         this.cookieSpecRegistry = cookieSpecRegistry;
111         this.authSchemeRegistry = authSchemeRegistry;
112         this.cookieStore = cookieStore;
113         this.credentialsProvider = credentialsProvider;
114         this.defaultConfig = defaultConfig;
115         this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
116         this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);
117     }
118 
119     @Override
120     void internalClose(final CloseMode closeMode) {
121         if (this.closeables != null) {
122             Closeable closeable;
123             while ((closeable = this.closeables.poll()) != null) {
124                 try {
125                     if (closeable instanceof ModalCloseable) {
126                         ((ModalCloseable) closeable).close(closeMode);
127                     } else {
128                         closeable.close();
129                     }
130                 } catch (final IOException ex) {
131                     LOG.error(ex.getMessage(), ex);
132                 }
133             }
134         }
135         final List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
136         for (final Runnable runnable: runnables) {
137             if (runnable instanceof Cancellable) {
138                 ((Cancellable) runnable).cancel();
139             }
140         }
141     }
142 
143     private void setupContext(final HttpClientContext context) {
144         if (context.getAttribute(HttpClientContext.AUTHSCHEME_REGISTRY) == null) {
145             context.setAttribute(HttpClientContext.AUTHSCHEME_REGISTRY, authSchemeRegistry);
146         }
147         if (context.getAttribute(HttpClientContext.COOKIESPEC_REGISTRY) == null) {
148             context.setAttribute(HttpClientContext.COOKIESPEC_REGISTRY, cookieSpecRegistry);
149         }
150         if (context.getAttribute(HttpClientContext.COOKIE_STORE) == null) {
151             context.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
152         }
153         if (context.getAttribute(HttpClientContext.CREDS_PROVIDER) == null) {
154             context.setAttribute(HttpClientContext.CREDS_PROVIDER, credentialsProvider);
155         }
156         if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
157             context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
158         }
159     }
160 
    /**
     * Creates the execution runtime for one message exchange. Called by
     * {@code doExecute} once per request, after the route has been determined.
     *
     * @param pushHandlerFactory the push handler factory passed to {@code doExecute}.
     * @param route              the route determined for the request.
     * @return the runtime placed into the execution scope of the exchange.
     */
    abstract AsyncExecRuntime createAsyncExecRuntime(
            HandlerFactory<AsyncPushConsumer> pushHandlerFactory, HttpRoute route);
163 
    /**
     * Determines the route for a request. Called by {@code doExecute} with the
     * explicit target host or, when none was given, the host derived from the request.
     *
     * @param httpHost      the target host of the request.
     * @param clientContext the context of the exchange, already populated with defaults.
     * @return the route to execute the request over.
     * @throws HttpException if the route cannot be determined.
     */
    abstract HttpRoute determineRoute(HttpHost httpHost, HttpClientContext clientContext) throws HttpException;
165 
166     @Override
167     protected <T> Future<T> doExecute(
168             final HttpHost httpHost,
169             final AsyncRequestProducer requestProducer,
170             final AsyncResponseConsumer<T> responseConsumer,
171             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
172             final HttpContext context,
173             final FutureCallback<T> callback) {
174         final ComplexFuture<T> future = new ComplexFuture<>(callback);
175         try {
176             if (!isRunning()) {
177                 throw new CancellationException("Request execution cancelled");
178             }
179             final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
180             requestProducer.sendRequest(new RequestChannel() {
181 
182                 @Override
183                 public void sendRequest(
184                         final HttpRequest request,
185                         final EntityDetails entityDetails,
186                         final HttpContext context) throws HttpException, IOException {
187 
188                     RequestConfig requestConfig = null;
189                     if (request instanceof Configurable) {
190                         requestConfig = ((Configurable) request).getConfig();
191                     }
192                     if (requestConfig != null) {
193                         clientContext.setRequestConfig(requestConfig);
194                     }
195 
196                     setupContext(clientContext);
197 
198                     final HttpRoute route = determineRoute(
199                             httpHost != null ? httpHost : RoutingSupport.determineHost(request),
200                             clientContext);
201                     final String exchangeId = ExecSupport.getNextExchangeId();
202                     clientContext.setExchangeId(exchangeId);
203                     if (LOG.isDebugEnabled()) {
204                         LOG.debug("{} preparing request execution", exchangeId);
205                     }
206                     final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory, route);
207 
208                     final AsyncExecChain.Scheduler scheduler = new AsyncExecChain.Scheduler() {
209 
210                         @Override
211                         public void scheduleExecution(final HttpRequest request,
212                                                       final AsyncEntityProducer entityProducer,
213                                                       final AsyncExecChain.Scope scope,
214                                                       final AsyncExecCallback asyncExecCallback,
215                                                       final TimeValue delay) {
216                             executeScheduled(request, entityProducer, scope, asyncExecCallback, delay);
217                         }
218 
219                     };
220 
221                     final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
222                             clientContext, execRuntime, scheduler, new AtomicInteger(1));
223                     final AtomicBoolean outputTerminated = new AtomicBoolean(false);
224                     executeImmediate(
225                             BasicRequestBuilder.copy(request).build(),
226                             entityDetails != null ? new AsyncEntityProducer() {
227 
228                                 @Override
229                                 public void releaseResources() {
230                                     requestProducer.releaseResources();
231                                 }
232 
233                                 @Override
234                                 public void failed(final Exception cause) {
235                                     requestProducer.failed(cause);
236                                 }
237 
238                                 @Override
239                                 public boolean isRepeatable() {
240                                     return requestProducer.isRepeatable();
241                                 }
242 
243                                 @Override
244                                 public long getContentLength() {
245                                     return entityDetails.getContentLength();
246                                 }
247 
248                                 @Override
249                                 public String getContentType() {
250                                     return entityDetails.getContentType();
251                                 }
252 
253                                 @Override
254                                 public String getContentEncoding() {
255                                     return entityDetails.getContentEncoding();
256                                 }
257 
258                                 @Override
259                                 public boolean isChunked() {
260                                     return entityDetails.isChunked();
261                                 }
262 
263                                 @Override
264                                 public Set<String> getTrailerNames() {
265                                     return entityDetails.getTrailerNames();
266                                 }
267 
268                                 @Override
269                                 public int available() {
270                                     return requestProducer.available();
271                                 }
272 
273                                 @Override
274                                 public void produce(final DataStreamChannel channel) throws IOException {
275                                     if (outputTerminated.get()) {
276                                         channel.endStream();
277                                         return;
278                                     }
279                                     requestProducer.produce(channel);
280                                 }
281 
282                             } : null,
283                             scope,
284                             new AsyncExecCallback() {
285 
286                                 @Override
287                                 public AsyncDataConsumer handleResponse(
288                                         final HttpResponse response,
289                                         final EntityDetails entityDetails) throws HttpException, IOException {
290                                     if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
291                                         outputTerminated.set(true);
292                                         requestProducer.releaseResources();
293                                     }
294                                     responseConsumer.consumeResponse(response, entityDetails, context,
295                                             new FutureCallback<T>() {
296 
297                                                 @Override
298                                                 public void completed(final T result) {
299                                                     future.completed(result);
300                                                 }
301 
302                                                 @Override
303                                                 public void failed(final Exception ex) {
304                                                     future.failed(ex);
305                                                 }
306 
307                                                 @Override
308                                                 public void cancelled() {
309                                                     future.cancel();
310                                                 }
311 
312                                             });
313                                     return entityDetails != null ? responseConsumer : null;
314                                 }
315 
316                                 @Override
317                                 public void handleInformationResponse(
318                                         final HttpResponse response) throws HttpException, IOException {
319                                     responseConsumer.informationResponse(response, context);
320                                 }
321 
322                                 @Override
323                                 public void completed() {
324                                     if (LOG.isDebugEnabled()) {
325                                         LOG.debug("{} message exchange successfully completed", exchangeId);
326                                     }
327                                     try {
328                                         execRuntime.releaseEndpoint();
329                                     } finally {
330                                         responseConsumer.releaseResources();
331                                         requestProducer.releaseResources();
332                                     }
333                                 }
334 
335                                 @Override
336                                 public void failed(final Exception cause) {
337                                     if (LOG.isDebugEnabled()) {
338                                         LOG.debug("{} request failed: {}", exchangeId, cause.getMessage());
339                                     }
340                                     try {
341                                         execRuntime.discardEndpoint();
342                                         responseConsumer.failed(cause);
343                                     } finally {
344                                         try {
345                                             future.failed(cause);
346                                         } finally {
347                                             responseConsumer.releaseResources();
348                                             requestProducer.releaseResources();
349                                         }
350                                     }
351                                 }
352 
353                             });
354                 }
355 
356             }, context);
357         } catch (final HttpException | IOException | IllegalStateException ex) {
358             future.failed(ex);
359         }
360         return future;
361     }
362 
363     void executeImmediate(
364             final HttpRequest request,
365             final AsyncEntityProducer entityProducer,
366             final AsyncExecChain.Scope scope,
367             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
368         execChain.execute(request, entityProducer, scope, asyncExecCallback);
369     }
370 
371     void executeScheduled(
372             final HttpRequest request,
373             final AsyncEntityProducer entityProducer,
374             final AsyncExecChain.Scope scope,
375             final AsyncExecCallback asyncExecCallback,
376             final TimeValue delay) {
377         final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution(
378                 request, entityProducer, scope, asyncExecCallback, delay);
379         if (TimeValue.isPositive(delay)) {
380             scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit());
381         } else {
382             scheduledExecutorService.execute(scheduledTask);
383         }
384     }
385 
386     class ScheduledRequestExecution implements Runnable, Cancellable {
387 
388         final HttpRequest request;
389         final AsyncEntityProducer entityProducer;
390         final AsyncExecChain.Scope scope;
391         final AsyncExecCallback asyncExecCallback;
392         final TimeValue delay;
393 
394         ScheduledRequestExecution(final HttpRequest request,
395                                   final AsyncEntityProducer entityProducer,
396                                   final AsyncExecChain.Scope scope,
397                                   final AsyncExecCallback asyncExecCallback,
398                                   final TimeValue delay) {
399             this.request = request;
400             this.entityProducer = entityProducer;
401             this.scope = scope;
402             this.asyncExecCallback = asyncExecCallback;
403             this.delay = delay;
404         }
405 
406         @Override
407         public void run() {
408             try {
409                 execChain.execute(request, entityProducer, scope, asyncExecCallback);
410             } catch (final Exception ex) {
411                 asyncExecCallback.failed(ex);
412             }
413         }
414 
415         @Override
416         public boolean cancel() {
417             asyncExecCallback.failed(new CancellationException("Request execution cancelled"));
418             return true;
419         }
420 
421     }
422 
423 }