1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.Closeable;
30 import java.io.IOException;
31 import java.util.List;
32 import java.util.Set;
33 import java.util.concurrent.CancellationException;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicInteger;
41
42 import org.apache.hc.client5.http.HttpRoute;
43 import org.apache.hc.client5.http.async.AsyncExecCallback;
44 import org.apache.hc.client5.http.async.AsyncExecChain;
45 import org.apache.hc.client5.http.async.AsyncExecRuntime;
46 import org.apache.hc.client5.http.auth.AuthSchemeFactory;
47 import org.apache.hc.client5.http.auth.CredentialsProvider;
48 import org.apache.hc.client5.http.config.Configurable;
49 import org.apache.hc.client5.http.config.RequestConfig;
50 import org.apache.hc.client5.http.cookie.CookieSpecFactory;
51 import org.apache.hc.client5.http.cookie.CookieStore;
52 import org.apache.hc.client5.http.impl.ExecSupport;
53 import org.apache.hc.client5.http.protocol.HttpClientContext;
54 import org.apache.hc.client5.http.routing.RoutingSupport;
55 import org.apache.hc.core5.concurrent.Cancellable;
56 import org.apache.hc.core5.concurrent.ComplexFuture;
57 import org.apache.hc.core5.concurrent.DefaultThreadFactory;
58 import org.apache.hc.core5.concurrent.FutureCallback;
59 import org.apache.hc.core5.http.EntityDetails;
60 import org.apache.hc.core5.http.HttpException;
61 import org.apache.hc.core5.http.HttpHost;
62 import org.apache.hc.core5.http.HttpRequest;
63 import org.apache.hc.core5.http.HttpResponse;
64 import org.apache.hc.core5.http.HttpStatus;
65 import org.apache.hc.core5.http.config.Lookup;
66 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
67 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
68 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
69 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
70 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
71 import org.apache.hc.core5.http.nio.DataStreamChannel;
72 import org.apache.hc.core5.http.nio.HandlerFactory;
73 import org.apache.hc.core5.http.nio.RequestChannel;
74 import org.apache.hc.core5.http.protocol.HttpContext;
75 import org.apache.hc.core5.http.support.BasicRequestBuilder;
76 import org.apache.hc.core5.io.CloseMode;
77 import org.apache.hc.core5.io.ModalCloseable;
78 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
79 import org.apache.hc.core5.util.TimeValue;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
82
83 abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
84
85 private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor", true);
86
87 private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
88 private final AsyncExecChainElement execChain;
89 private final Lookup<CookieSpecFactory> cookieSpecRegistry;
90 private final Lookup<AuthSchemeFactory> authSchemeRegistry;
91 private final CookieStore cookieStore;
92 private final CredentialsProvider credentialsProvider;
93 private final RequestConfig defaultConfig;
94 private final ConcurrentLinkedQueue<Closeable> closeables;
95 private final ScheduledExecutorService scheduledExecutorService;
96
97 InternalAbstractHttpAsyncClient(
98 final DefaultConnectingIOReactor ioReactor,
99 final AsyncPushConsumerRegistry pushConsumerRegistry,
100 final ThreadFactory threadFactory,
101 final AsyncExecChainElement execChain,
102 final Lookup<CookieSpecFactory> cookieSpecRegistry,
103 final Lookup<AuthSchemeFactory> authSchemeRegistry,
104 final CookieStore cookieStore,
105 final CredentialsProvider credentialsProvider,
106 final RequestConfig defaultConfig,
107 final List<Closeable> closeables) {
108 super(ioReactor, pushConsumerRegistry, threadFactory);
109 this.execChain = execChain;
110 this.cookieSpecRegistry = cookieSpecRegistry;
111 this.authSchemeRegistry = authSchemeRegistry;
112 this.cookieStore = cookieStore;
113 this.credentialsProvider = credentialsProvider;
114 this.defaultConfig = defaultConfig;
115 this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
116 this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);
117 }
118
119 @Override
120 void internalClose(final CloseMode closeMode) {
121 if (this.closeables != null) {
122 Closeable closeable;
123 while ((closeable = this.closeables.poll()) != null) {
124 try {
125 if (closeable instanceof ModalCloseable) {
126 ((ModalCloseable) closeable).close(closeMode);
127 } else {
128 closeable.close();
129 }
130 } catch (final IOException ex) {
131 LOG.error(ex.getMessage(), ex);
132 }
133 }
134 }
135 final List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
136 for (final Runnable runnable: runnables) {
137 if (runnable instanceof Cancellable) {
138 ((Cancellable) runnable).cancel();
139 }
140 }
141 }
142
143 private void setupContext(final HttpClientContext context) {
144 if (context.getAttribute(HttpClientContext.AUTHSCHEME_REGISTRY) == null) {
145 context.setAttribute(HttpClientContext.AUTHSCHEME_REGISTRY, authSchemeRegistry);
146 }
147 if (context.getAttribute(HttpClientContext.COOKIESPEC_REGISTRY) == null) {
148 context.setAttribute(HttpClientContext.COOKIESPEC_REGISTRY, cookieSpecRegistry);
149 }
150 if (context.getAttribute(HttpClientContext.COOKIE_STORE) == null) {
151 context.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
152 }
153 if (context.getAttribute(HttpClientContext.CREDS_PROVIDER) == null) {
154 context.setAttribute(HttpClientContext.CREDS_PROVIDER, credentialsProvider);
155 }
156 if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
157 context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
158 }
159 }
160
161 abstract AsyncExecRuntime createAsyncExecRuntime(
162 HandlerFactory<AsyncPushConsumer> pushHandlerFactory, HttpRoute route);
163
164 abstract HttpRoute determineRoute(HttpHost httpHost, HttpClientContext clientContext) throws HttpException;
165
166 @Override
167 protected <T> Future<T> doExecute(
168 final HttpHost httpHost,
169 final AsyncRequestProducer requestProducer,
170 final AsyncResponseConsumer<T> responseConsumer,
171 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
172 final HttpContext context,
173 final FutureCallback<T> callback) {
174 final ComplexFuture<T> future = new ComplexFuture<>(callback);
175 try {
176 if (!isRunning()) {
177 throw new CancellationException("Request execution cancelled");
178 }
179 final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
180 requestProducer.sendRequest(new RequestChannel() {
181
182 @Override
183 public void sendRequest(
184 final HttpRequest request,
185 final EntityDetails entityDetails,
186 final HttpContext context) throws HttpException, IOException {
187
188 RequestConfig requestConfig = null;
189 if (request instanceof Configurable) {
190 requestConfig = ((Configurable) request).getConfig();
191 }
192 if (requestConfig != null) {
193 clientContext.setRequestConfig(requestConfig);
194 }
195
196 setupContext(clientContext);
197
198 final HttpRoute route = determineRoute(
199 httpHost != null ? httpHost : RoutingSupport.determineHost(request),
200 clientContext);
201 final String exchangeId = ExecSupport.getNextExchangeId();
202 clientContext.setExchangeId(exchangeId);
203 if (LOG.isDebugEnabled()) {
204 LOG.debug("{} preparing request execution", exchangeId);
205 }
206 final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory, route);
207
208 final AsyncExecChain.Scheduler scheduler = new AsyncExecChain.Scheduler() {
209
210 @Override
211 public void scheduleExecution(final HttpRequest request,
212 final AsyncEntityProducer entityProducer,
213 final AsyncExecChain.Scope scope,
214 final AsyncExecCallback asyncExecCallback,
215 final TimeValue delay) {
216 executeScheduled(request, entityProducer, scope, asyncExecCallback, delay);
217 }
218
219 };
220
221 final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
222 clientContext, execRuntime, scheduler, new AtomicInteger(1));
223 final AtomicBoolean outputTerminated = new AtomicBoolean(false);
224 executeImmediate(
225 BasicRequestBuilder.copy(request).build(),
226 entityDetails != null ? new AsyncEntityProducer() {
227
228 @Override
229 public void releaseResources() {
230 requestProducer.releaseResources();
231 }
232
233 @Override
234 public void failed(final Exception cause) {
235 requestProducer.failed(cause);
236 }
237
238 @Override
239 public boolean isRepeatable() {
240 return requestProducer.isRepeatable();
241 }
242
243 @Override
244 public long getContentLength() {
245 return entityDetails.getContentLength();
246 }
247
248 @Override
249 public String getContentType() {
250 return entityDetails.getContentType();
251 }
252
253 @Override
254 public String getContentEncoding() {
255 return entityDetails.getContentEncoding();
256 }
257
258 @Override
259 public boolean isChunked() {
260 return entityDetails.isChunked();
261 }
262
263 @Override
264 public Set<String> getTrailerNames() {
265 return entityDetails.getTrailerNames();
266 }
267
268 @Override
269 public int available() {
270 return requestProducer.available();
271 }
272
273 @Override
274 public void produce(final DataStreamChannel channel) throws IOException {
275 if (outputTerminated.get()) {
276 channel.endStream();
277 return;
278 }
279 requestProducer.produce(channel);
280 }
281
282 } : null,
283 scope,
284 new AsyncExecCallback() {
285
286 @Override
287 public AsyncDataConsumer handleResponse(
288 final HttpResponse response,
289 final EntityDetails entityDetails) throws HttpException, IOException {
290 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
291 outputTerminated.set(true);
292 requestProducer.releaseResources();
293 }
294 responseConsumer.consumeResponse(response, entityDetails, context,
295 new FutureCallback<T>() {
296
297 @Override
298 public void completed(final T result) {
299 future.completed(result);
300 }
301
302 @Override
303 public void failed(final Exception ex) {
304 future.failed(ex);
305 }
306
307 @Override
308 public void cancelled() {
309 future.cancel();
310 }
311
312 });
313 return entityDetails != null ? responseConsumer : null;
314 }
315
316 @Override
317 public void handleInformationResponse(
318 final HttpResponse response) throws HttpException, IOException {
319 responseConsumer.informationResponse(response, context);
320 }
321
322 @Override
323 public void completed() {
324 if (LOG.isDebugEnabled()) {
325 LOG.debug("{} message exchange successfully completed", exchangeId);
326 }
327 try {
328 execRuntime.releaseEndpoint();
329 } finally {
330 responseConsumer.releaseResources();
331 requestProducer.releaseResources();
332 }
333 }
334
335 @Override
336 public void failed(final Exception cause) {
337 if (LOG.isDebugEnabled()) {
338 LOG.debug("{} request failed: {}", exchangeId, cause.getMessage());
339 }
340 try {
341 execRuntime.discardEndpoint();
342 responseConsumer.failed(cause);
343 } finally {
344 try {
345 future.failed(cause);
346 } finally {
347 responseConsumer.releaseResources();
348 requestProducer.releaseResources();
349 }
350 }
351 }
352
353 });
354 }
355
356 }, context);
357 } catch (final HttpException | IOException | IllegalStateException ex) {
358 future.failed(ex);
359 }
360 return future;
361 }
362
363 void executeImmediate(
364 final HttpRequest request,
365 final AsyncEntityProducer entityProducer,
366 final AsyncExecChain.Scope scope,
367 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
368 execChain.execute(request, entityProducer, scope, asyncExecCallback);
369 }
370
371 void executeScheduled(
372 final HttpRequest request,
373 final AsyncEntityProducer entityProducer,
374 final AsyncExecChain.Scope scope,
375 final AsyncExecCallback asyncExecCallback,
376 final TimeValue delay) {
377 final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution(
378 request, entityProducer, scope, asyncExecCallback, delay);
379 if (TimeValue.isPositive(delay)) {
380 scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit());
381 } else {
382 scheduledExecutorService.execute(scheduledTask);
383 }
384 }
385
386 class ScheduledRequestExecution implements Runnable, Cancellable {
387
388 final HttpRequest request;
389 final AsyncEntityProducer entityProducer;
390 final AsyncExecChain.Scope scope;
391 final AsyncExecCallback asyncExecCallback;
392 final TimeValue delay;
393
394 ScheduledRequestExecution(final HttpRequest request,
395 final AsyncEntityProducer entityProducer,
396 final AsyncExecChain.Scope scope,
397 final AsyncExecCallback asyncExecCallback,
398 final TimeValue delay) {
399 this.request = request;
400 this.entityProducer = entityProducer;
401 this.scope = scope;
402 this.asyncExecCallback = asyncExecCallback;
403 this.delay = delay;
404 }
405
406 @Override
407 public void run() {
408 try {
409 execChain.execute(request, entityProducer, scope, asyncExecCallback);
410 } catch (final Exception ex) {
411 asyncExecCallback.failed(ex);
412 }
413 }
414
415 @Override
416 public boolean cancel() {
417 asyncExecCallback.failed(new CancellationException("Request execution cancelled"));
418 return true;
419 }
420
421 }
422
423 }