1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.Closeable;
30 import java.io.IOException;
31 import java.util.List;
32 import java.util.Set;
33 import java.util.concurrent.CancellationException;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicInteger;
41
42 import org.apache.hc.client5.http.HttpRoute;
43 import org.apache.hc.client5.http.async.AsyncExecCallback;
44 import org.apache.hc.client5.http.async.AsyncExecChain;
45 import org.apache.hc.client5.http.async.AsyncExecRuntime;
46 import org.apache.hc.client5.http.auth.AuthSchemeFactory;
47 import org.apache.hc.client5.http.auth.CredentialsProvider;
48 import org.apache.hc.client5.http.config.Configurable;
49 import org.apache.hc.client5.http.config.RequestConfig;
50 import org.apache.hc.client5.http.cookie.CookieSpecFactory;
51 import org.apache.hc.client5.http.cookie.CookieStore;
52 import org.apache.hc.client5.http.impl.ExecSupport;
53 import org.apache.hc.client5.http.protocol.HttpClientContext;
54 import org.apache.hc.client5.http.routing.RoutingSupport;
55 import org.apache.hc.core5.concurrent.Cancellable;
56 import org.apache.hc.core5.concurrent.ComplexFuture;
57 import org.apache.hc.core5.concurrent.DefaultThreadFactory;
58 import org.apache.hc.core5.concurrent.FutureCallback;
59 import org.apache.hc.core5.http.EntityDetails;
60 import org.apache.hc.core5.http.HttpException;
61 import org.apache.hc.core5.http.HttpHost;
62 import org.apache.hc.core5.http.HttpRequest;
63 import org.apache.hc.core5.http.HttpResponse;
64 import org.apache.hc.core5.http.HttpStatus;
65 import org.apache.hc.core5.http.config.Lookup;
66 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
67 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
68 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
69 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
70 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
71 import org.apache.hc.core5.http.nio.DataStreamChannel;
72 import org.apache.hc.core5.http.nio.HandlerFactory;
73 import org.apache.hc.core5.http.nio.RequestChannel;
74 import org.apache.hc.core5.http.protocol.HttpContext;
75 import org.apache.hc.core5.http.support.BasicRequestBuilder;
76 import org.apache.hc.core5.io.CloseMode;
77 import org.apache.hc.core5.io.ModalCloseable;
78 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
79 import org.apache.hc.core5.util.TimeValue;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
82
83 abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
84
85 private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor");
86
87 private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
88 private final AsyncExecChainElement execChain;
89 private final Lookup<CookieSpecFactory> cookieSpecRegistry;
90 private final Lookup<AuthSchemeFactory> authSchemeRegistry;
91 private final CookieStore cookieStore;
92 private final CredentialsProvider credentialsProvider;
93 private final RequestConfig defaultConfig;
94 private final ConcurrentLinkedQueue<Closeable> closeables;
95 private final ScheduledExecutorService scheduledExecutorService;
96
97 InternalAbstractHttpAsyncClient(
98 final DefaultConnectingIOReactor ioReactor,
99 final AsyncPushConsumerRegistry pushConsumerRegistry,
100 final ThreadFactory threadFactory,
101 final AsyncExecChainElement execChain,
102 final Lookup<CookieSpecFactory> cookieSpecRegistry,
103 final Lookup<AuthSchemeFactory> authSchemeRegistry,
104 final CookieStore cookieStore,
105 final CredentialsProvider credentialsProvider,
106 final RequestConfig defaultConfig,
107 final List<Closeable> closeables) {
108 super(ioReactor, pushConsumerRegistry, threadFactory);
109 this.execChain = execChain;
110 this.cookieSpecRegistry = cookieSpecRegistry;
111 this.authSchemeRegistry = authSchemeRegistry;
112 this.cookieStore = cookieStore;
113 this.credentialsProvider = credentialsProvider;
114 this.defaultConfig = defaultConfig;
115 this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
116 this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);
117 }
118
119 @Override
120 void internalClose(final CloseMode closeMode) {
121 if (this.closeables != null) {
122 Closeable closeable;
123 while ((closeable = this.closeables.poll()) != null) {
124 try {
125 if (closeable instanceof ModalCloseable) {
126 ((ModalCloseable) closeable).close(closeMode);
127 } else {
128 closeable.close();
129 }
130 } catch (final IOException ex) {
131 LOG.error(ex.getMessage(), ex);
132 }
133 }
134 }
135 final List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
136 for (final Runnable runnable: runnables) {
137 if (runnable instanceof Cancellable) {
138 ((Cancellable) runnable).cancel();
139 }
140 }
141 }
142
143 private void setupContext(final HttpClientContext context) {
144 if (context.getAttribute(HttpClientContext.AUTHSCHEME_REGISTRY) == null) {
145 context.setAttribute(HttpClientContext.AUTHSCHEME_REGISTRY, authSchemeRegistry);
146 }
147 if (context.getAttribute(HttpClientContext.COOKIESPEC_REGISTRY) == null) {
148 context.setAttribute(HttpClientContext.COOKIESPEC_REGISTRY, cookieSpecRegistry);
149 }
150 if (context.getAttribute(HttpClientContext.COOKIE_STORE) == null) {
151 context.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
152 }
153 if (context.getAttribute(HttpClientContext.CREDS_PROVIDER) == null) {
154 context.setAttribute(HttpClientContext.CREDS_PROVIDER, credentialsProvider);
155 }
156 if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
157 context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
158 }
159 }
160
161 abstract AsyncExecRuntime createAsyncExecRuntime(
162 HandlerFactory<AsyncPushConsumer> pushHandlerFactory, HttpRoute route);
163
164 abstract HttpRoute determineRoute(HttpHost httpHost, HttpClientContext clientContext) throws HttpException;
165
166 @Override
167 protected <T> Future<T> doExecute(
168 final HttpHost httpHost,
169 final AsyncRequestProducer requestProducer,
170 final AsyncResponseConsumer<T> responseConsumer,
171 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
172 final HttpContext context,
173 final FutureCallback<T> callback) {
174 final ComplexFuture<T> future = new ComplexFuture<>(callback);
175 try {
176 if (!isRunning()) {
177 throw new CancellationException("Request execution cancelled");
178 }
179 final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
180 requestProducer.sendRequest(new RequestChannel() {
181
182 @Override
183 public void sendRequest(
184 final HttpRequest request,
185 final EntityDetails entityDetails,
186 final HttpContext context) throws HttpException, IOException {
187
188 RequestConfig requestConfig = null;
189 if (request instanceof Configurable) {
190 requestConfig = ((Configurable) request).getConfig();
191 }
192 if (requestConfig != null) {
193 clientContext.setRequestConfig(requestConfig);
194 }
195 final HttpRoute route = determineRoute(
196 httpHost != null ? httpHost : RoutingSupport.determineHost(request),
197 clientContext);
198 final String exchangeId = ExecSupport.getNextExchangeId();
199 clientContext.setExchangeId(exchangeId);
200 if (LOG.isDebugEnabled()) {
201 LOG.debug("{} preparing request execution", exchangeId);
202 }
203 final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory, route);
204
205 setupContext(clientContext);
206
207 final AsyncExecChain.Scheduler scheduler = new AsyncExecChain.Scheduler() {
208
209 @Override
210 public void scheduleExecution(final HttpRequest request,
211 final AsyncEntityProducer entityProducer,
212 final AsyncExecChain.Scope scope,
213 final AsyncExecCallback asyncExecCallback,
214 final TimeValue delay) {
215 executeScheduled(request, entityProducer, scope, asyncExecCallback, delay);
216 }
217
218 };
219
220 final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
221 clientContext, execRuntime, scheduler, new AtomicInteger(1));
222 final AtomicBoolean outputTerminated = new AtomicBoolean(false);
223 executeImmediate(
224 BasicRequestBuilder.copy(request).build(),
225 entityDetails != null ? new AsyncEntityProducer() {
226
227 @Override
228 public void releaseResources() {
229 requestProducer.releaseResources();
230 }
231
232 @Override
233 public void failed(final Exception cause) {
234 requestProducer.failed(cause);
235 }
236
237 @Override
238 public boolean isRepeatable() {
239 return requestProducer.isRepeatable();
240 }
241
242 @Override
243 public long getContentLength() {
244 return entityDetails.getContentLength();
245 }
246
247 @Override
248 public String getContentType() {
249 return entityDetails.getContentType();
250 }
251
252 @Override
253 public String getContentEncoding() {
254 return entityDetails.getContentEncoding();
255 }
256
257 @Override
258 public boolean isChunked() {
259 return entityDetails.isChunked();
260 }
261
262 @Override
263 public Set<String> getTrailerNames() {
264 return entityDetails.getTrailerNames();
265 }
266
267 @Override
268 public int available() {
269 return requestProducer.available();
270 }
271
272 @Override
273 public void produce(final DataStreamChannel channel) throws IOException {
274 if (outputTerminated.get()) {
275 channel.endStream();
276 return;
277 }
278 requestProducer.produce(channel);
279 }
280
281 } : null,
282 scope,
283 new AsyncExecCallback() {
284
285 @Override
286 public AsyncDataConsumer handleResponse(
287 final HttpResponse response,
288 final EntityDetails entityDetails) throws HttpException, IOException {
289 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
290 outputTerminated.set(true);
291 requestProducer.releaseResources();
292 }
293 responseConsumer.consumeResponse(response, entityDetails, context,
294 new FutureCallback<T>() {
295
296 @Override
297 public void completed(final T result) {
298 future.completed(result);
299 }
300
301 @Override
302 public void failed(final Exception ex) {
303 future.failed(ex);
304 }
305
306 @Override
307 public void cancelled() {
308 future.cancel();
309 }
310
311 });
312 return entityDetails != null ? responseConsumer : null;
313 }
314
315 @Override
316 public void handleInformationResponse(
317 final HttpResponse response) throws HttpException, IOException {
318 responseConsumer.informationResponse(response, context);
319 }
320
321 @Override
322 public void completed() {
323 if (LOG.isDebugEnabled()) {
324 LOG.debug("{} message exchange successfully completed", exchangeId);
325 }
326 try {
327 execRuntime.releaseEndpoint();
328 } finally {
329 responseConsumer.releaseResources();
330 requestProducer.releaseResources();
331 }
332 }
333
334 @Override
335 public void failed(final Exception cause) {
336 if (LOG.isDebugEnabled()) {
337 LOG.debug("{} request failed: {}", exchangeId, cause.getMessage());
338 }
339 try {
340 execRuntime.discardEndpoint();
341 responseConsumer.failed(cause);
342 } finally {
343 try {
344 future.failed(cause);
345 } finally {
346 responseConsumer.releaseResources();
347 requestProducer.releaseResources();
348 }
349 }
350 }
351
352 });
353 }
354
355 }, context);
356 } catch (final HttpException | IOException | IllegalStateException ex) {
357 future.failed(ex);
358 }
359 return future;
360 }
361
362 void executeImmediate(
363 final HttpRequest request,
364 final AsyncEntityProducer entityProducer,
365 final AsyncExecChain.Scope scope,
366 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
367 execChain.execute(request, entityProducer, scope, asyncExecCallback);
368 }
369
370 void executeScheduled(
371 final HttpRequest request,
372 final AsyncEntityProducer entityProducer,
373 final AsyncExecChain.Scope scope,
374 final AsyncExecCallback asyncExecCallback,
375 final TimeValue delay) {
376 final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution(
377 request, entityProducer, scope, asyncExecCallback, delay);
378 if (TimeValue.isPositive(delay)) {
379 scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit());
380 } else {
381 scheduledExecutorService.execute(scheduledTask);
382 }
383 }
384
385 class ScheduledRequestExecution implements Runnable, Cancellable {
386
387 final HttpRequest request;
388 final AsyncEntityProducer entityProducer;
389 final AsyncExecChain.Scope scope;
390 final AsyncExecCallback asyncExecCallback;
391 final TimeValue delay;
392
393 ScheduledRequestExecution(final HttpRequest request,
394 final AsyncEntityProducer entityProducer,
395 final AsyncExecChain.Scope scope,
396 final AsyncExecCallback asyncExecCallback,
397 final TimeValue delay) {
398 this.request = request;
399 this.entityProducer = entityProducer;
400 this.scope = scope;
401 this.asyncExecCallback = asyncExecCallback;
402 this.delay = delay;
403 }
404
405 @Override
406 public void run() {
407 try {
408 execChain.execute(request, entityProducer, scope, asyncExecCallback);
409 } catch (final Exception ex) {
410 asyncExecCallback.failed(ex);
411 }
412 }
413
414 @Override
415 public boolean cancel() {
416 asyncExecCallback.failed(new CancellationException("Request execution cancelled"));
417 return true;
418 }
419
420 }
421
422 }