1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.impl.bootstrap;
29
30 import java.io.IOException;
31 import java.nio.ByteBuffer;
32 import java.util.List;
33 import java.util.Set;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.atomic.AtomicReference;
36
37 import org.apache.hc.core5.annotation.Internal;
38 import org.apache.hc.core5.concurrent.BasicFuture;
39 import org.apache.hc.core5.concurrent.ComplexFuture;
40 import org.apache.hc.core5.concurrent.FutureCallback;
41 import org.apache.hc.core5.function.Callback;
42 import org.apache.hc.core5.function.Decorator;
43 import org.apache.hc.core5.http.ConnectionClosedException;
44 import org.apache.hc.core5.http.EntityDetails;
45 import org.apache.hc.core5.http.Header;
46 import org.apache.hc.core5.http.HttpConnection;
47 import org.apache.hc.core5.http.HttpException;
48 import org.apache.hc.core5.http.HttpHost;
49 import org.apache.hc.core5.http.HttpRequest;
50 import org.apache.hc.core5.http.HttpResponse;
51 import org.apache.hc.core5.http.ProtocolException;
52 import org.apache.hc.core5.http.impl.DefaultAddressResolver;
53 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
54 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
55 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
56 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
57 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
58 import org.apache.hc.core5.http.nio.CapacityChannel;
59 import org.apache.hc.core5.http.nio.DataStreamChannel;
60 import org.apache.hc.core5.http.nio.HandlerFactory;
61 import org.apache.hc.core5.http.nio.RequestChannel;
62 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
63 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
64 import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
65 import org.apache.hc.core5.http.protocol.HttpContext;
66 import org.apache.hc.core5.http.protocol.HttpCoreContext;
67 import org.apache.hc.core5.io.CloseMode;
68 import org.apache.hc.core5.net.URIAuthority;
69 import org.apache.hc.core5.pool.ConnPoolControl;
70 import org.apache.hc.core5.pool.ManagedConnPool;
71 import org.apache.hc.core5.pool.PoolEntry;
72 import org.apache.hc.core5.pool.PoolStats;
73 import org.apache.hc.core5.reactor.Command;
74 import org.apache.hc.core5.reactor.IOEventHandler;
75 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
76 import org.apache.hc.core5.reactor.IOReactorConfig;
77 import org.apache.hc.core5.reactor.IOSession;
78 import org.apache.hc.core5.reactor.IOSessionListener;
79 import org.apache.hc.core5.util.Args;
80 import org.apache.hc.core5.util.TimeValue;
81 import org.apache.hc.core5.util.Timeout;
82
83
84
85
86
87
88 public class HttpAsyncRequester extends AsyncRequester implements ConnPoolControl<HttpHost> {
89
90 private final ManagedConnPool<HttpHost, IOSession> connPool;
91
92
93
94
95 @Internal
96 public HttpAsyncRequester(
97 final IOReactorConfig ioReactorConfig,
98 final IOEventHandlerFactory eventHandlerFactory,
99 final Decorator<IOSession> ioSessionDecorator,
100 final Callback<Exception> exceptionCallback,
101 final IOSessionListener sessionListener,
102 final ManagedConnPool<HttpHost, IOSession> connPool) {
103 super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
104 ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE);
105 this.connPool = Args.notNull(connPool, "Connection pool");
106 }
107
108 @Override
109 public PoolStats getTotalStats() {
110 return connPool.getTotalStats();
111 }
112
113 @Override
114 public PoolStats getStats(final HttpHost route) {
115 return connPool.getStats(route);
116 }
117
118 @Override
119 public void setMaxTotal(final int max) {
120 connPool.setMaxTotal(max);
121 }
122
123 @Override
124 public int getMaxTotal() {
125 return connPool.getMaxTotal();
126 }
127
128 @Override
129 public void setDefaultMaxPerRoute(final int max) {
130 connPool.setDefaultMaxPerRoute(max);
131 }
132
133 @Override
134 public int getDefaultMaxPerRoute() {
135 return connPool.getDefaultMaxPerRoute();
136 }
137
138 @Override
139 public void setMaxPerRoute(final HttpHost route, final int max) {
140 connPool.setMaxPerRoute(route, max);
141 }
142
143 @Override
144 public int getMaxPerRoute(final HttpHost route) {
145 return connPool.getMaxPerRoute(route);
146 }
147
148 @Override
149 public void closeIdle(final TimeValue idleTime) {
150 connPool.closeIdle(idleTime);
151 }
152
153 @Override
154 public void closeExpired() {
155 connPool.closeExpired();
156 }
157
158 @Override
159 public Set<HttpHost> getRoutes() {
160 return connPool.getRoutes();
161 }
162
163 public Future<AsyncClientEndpoint> connect(
164 final HttpHost host,
165 final Timeout timeout,
166 final Object attachment,
167 final FutureCallback<AsyncClientEndpoint> callback) {
168 return doConnect(host, timeout, attachment, callback);
169 }
170
171 protected Future<AsyncClientEndpoint> doConnect(
172 final HttpHost host,
173 final Timeout timeout,
174 final Object attachment,
175 final FutureCallback<AsyncClientEndpoint> callback) {
176 Args.notNull(host, "Host");
177 Args.notNull(timeout, "Timeout");
178 final ComplexFuture<AsyncClientEndpoint> resultFuture = new ComplexFuture<>(callback);
179 final Future<PoolEntry<HttpHost, IOSession>> leaseFuture = connPool.lease(
180 host, null, timeout, new FutureCallback<PoolEntry<HttpHost, IOSession>>() {
181
182 @Override
183 public void completed(final PoolEntry<HttpHost, IOSession> poolEntry) {
184 final AsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(poolEntry);
185 final IOSession ioSession = poolEntry.getConnection();
186 if (ioSession != null && !ioSession.isOpen()) {
187 poolEntry.discardConnection(CloseMode.IMMEDIATE);
188 }
189 if (poolEntry.hasConnection()) {
190 resultFuture.completed(endpoint);
191 } else {
192 final Future<IOSession> futute = requestSession(host, timeout, attachment, new FutureCallback<IOSession>() {
193
194 @Override
195 public void completed(final IOSession session) {
196 session.setSocketTimeout(timeout);
197 poolEntry.assignConnection(session);
198 resultFuture.completed(endpoint);
199 }
200
201 @Override
202 public void failed(final Exception cause) {
203 try {
204 resultFuture.failed(cause);
205 } finally {
206 endpoint.releaseAndDiscard();
207 }
208 }
209
210 @Override
211 public void cancelled() {
212 try {
213 resultFuture.cancel();
214 } finally {
215 endpoint.releaseAndDiscard();
216 }
217 }
218
219 });
220 resultFuture.setDependency(futute);
221 }
222 }
223
224 @Override
225 public void failed(final Exception ex) {
226 resultFuture.failed(ex);
227 }
228
229 @Override
230 public void cancelled() {
231 resultFuture.cancel();
232 }
233
234 });
235 resultFuture.setDependency(leaseFuture);
236 return resultFuture;
237 }
238
239 public Future<AsyncClientEndpoint> connect(final HttpHost host, final Timeout timeout) {
240 return connect(host, timeout, null, null);
241 }
242
243 public void execute(
244 final AsyncClientExchangeHandler exchangeHandler,
245 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
246 final Timeout timeout,
247 final HttpContext executeContext) {
248 Args.notNull(exchangeHandler, "Exchange handler");
249 Args.notNull(timeout, "Timeout");
250 Args.notNull(executeContext, "Context");
251 try {
252 exchangeHandler.produceRequest(new RequestChannel() {
253
254 @Override
255 public void sendRequest(
256 final HttpRequest request,
257 final EntityDetails entityDetails, final HttpContext requestContext) throws HttpException, IOException {
258 final String scheme = request.getScheme();
259 final URIAuthority authority = request.getAuthority();
260 if (authority == null) {
261 throw new ProtocolException("Request authority not specified");
262 }
263 final HttpHostttp/HttpHost.html#HttpHost">HttpHost target = new HttpHost(scheme, authority);
264 connect(target, timeout, null, new FutureCallback<AsyncClientEndpoint>() {
265
266 @Override
267 public void completed(final AsyncClientEndpoint endpoint) {
268 endpoint.execute(new AsyncClientExchangeHandler() {
269
270 @Override
271 public void releaseResources() {
272 endpoint.releaseAndDiscard();
273 exchangeHandler.releaseResources();
274 }
275
276 @Override
277 public void failed(final Exception cause) {
278 endpoint.releaseAndDiscard();
279 exchangeHandler.failed(cause);
280 }
281
282 @Override
283 public void cancel() {
284 endpoint.releaseAndDiscard();
285 exchangeHandler.cancel();
286 }
287
288 @Override
289 public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
290 channel.sendRequest(request, entityDetails, httpContext);
291 }
292
293 @Override
294 public int available() {
295 return exchangeHandler.available();
296 }
297
298 @Override
299 public void produce(final DataStreamChannel channel) throws IOException {
300 exchangeHandler.produce(channel);
301 }
302
303 @Override
304 public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
305 exchangeHandler.consumeInformation(response, httpContext);
306 }
307
308 @Override
309 public void consumeResponse(
310 final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
311 if (entityDetails == null) {
312 endpoint.releaseAndReuse();
313 }
314 exchangeHandler.consumeResponse(response, entityDetails, httpContext);
315 }
316
317 @Override
318 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
319 exchangeHandler.updateCapacity(capacityChannel);
320 }
321
322 @Override
323 public void consume(final ByteBuffer src) throws IOException {
324 exchangeHandler.consume(src);
325 }
326
327 @Override
328 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
329 endpoint.releaseAndReuse();
330 exchangeHandler.streamEnd(trailers);
331 }
332
333 }, pushHandlerFactory, executeContext);
334
335 }
336
337 @Override
338 public void failed(final Exception ex) {
339 exchangeHandler.failed(ex);
340 }
341
342 @Override
343 public void cancelled() {
344 exchangeHandler.cancel();
345 }
346
347 });
348
349 }
350
351 }, executeContext);
352
353 } catch (final IOException | HttpException ex) {
354 exchangeHandler.failed(ex);
355 }
356 }
357
358 public void execute(
359 final AsyncClientExchangeHandler exchangeHandler,
360 final Timeout timeout,
361 final HttpContext executeContext) {
362 execute(exchangeHandler, null, timeout, executeContext);
363 }
364
365 public final <T> Future<T> execute(
366 final AsyncRequestProducer requestProducer,
367 final AsyncResponseConsumer<T> responseConsumer,
368 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
369 final Timeout timeout,
370 final HttpContext context,
371 final FutureCallback<T> callback) {
372 Args.notNull(requestProducer, "Request producer");
373 Args.notNull(responseConsumer, "Response consumer");
374 Args.notNull(timeout, "Timeout");
375 final BasicFuture<T> future = new BasicFuture<>(callback);
376 final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(requestProducer, responseConsumer, new FutureCallback<T>() {
377
378 @Override
379 public void completed(final T result) {
380 future.completed(result);
381 }
382
383 @Override
384 public void failed(final Exception ex) {
385 future.failed(ex);
386 }
387
388 @Override
389 public void cancelled() {
390 future.cancel();
391 }
392
393 });
394 execute(exchangeHandler, pushHandlerFactory, timeout, context != null ? context : HttpCoreContext.create());
395 return future;
396 }
397
398 public final <T> Future<T> execute(
399 final AsyncRequestProducer requestProducer,
400 final AsyncResponseConsumer<T> responseConsumer,
401 final Timeout timeout,
402 final HttpContext context,
403 final FutureCallback<T> callback) {
404 return execute(requestProducer, responseConsumer, null, timeout, context, callback);
405 }
406
407 public final <T> Future<T> execute(
408 final AsyncRequestProducer requestProducer,
409 final AsyncResponseConsumer<T> responseConsumer,
410 final Timeout timeout,
411 final FutureCallback<T> callback) {
412 return execute(requestProducer, responseConsumer, null, timeout, null, callback);
413 }
414
415 private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
416
417 final AtomicReference<PoolEntry<HttpHost, IOSession>> poolEntryRef;
418
419 InternalAsyncClientEndpoint(final PoolEntry<HttpHost, IOSession> poolEntry) {
420 this.poolEntryRef = new AtomicReference<>(poolEntry);
421 }
422
423 @Override
424 public void execute(
425 final AsyncClientExchangeHandler exchangeHandler,
426 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
427 final HttpContext context) {
428 final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
429 if (poolEntry == null) {
430 throw new IllegalStateException("Endpoint has already been released");
431 }
432 final IOSession ioSession = poolEntry.getConnection();
433 if (ioSession == null) {
434 throw new IllegalStateException("I/O session is invalid");
435 }
436 ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context), Command.Priority.NORMAL);
437 if (!ioSession.isOpen()) {
438 try {
439 exchangeHandler.failed(new ConnectionClosedException());
440 } finally {
441 exchangeHandler.releaseResources();
442 }
443 }
444 }
445
446 @Override
447 public boolean isConnected() {
448 final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
449 if (poolEntry != null) {
450 final IOSession ioSession = poolEntry.getConnection();
451 if (ioSession == null || !ioSession.isOpen()) {
452 return false;
453 }
454 final IOEventHandler handler = ioSession.getHandler();
455 return (handler instanceof HttpConnection../../../org/apache/hc/core5/http/HttpConnection.html#HttpConnection">HttpConnection) && ((HttpConnection) handler).isOpen();
456 }
457 return false;
458 }
459
460 @Override
461 public void releaseAndReuse() {
462 final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
463 if (poolEntry != null) {
464 final IOSession ioSession = poolEntry.getConnection();
465 connPool.release(poolEntry, ioSession != null && ioSession.isOpen());
466 }
467 }
468
469 @Override
470 public void releaseAndDiscard() {
471 final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
472 if (poolEntry != null) {
473 poolEntry.discardConnection(CloseMode.GRACEFUL);
474 connPool.release(poolEntry, false);
475 }
476 }
477
478 }
479
480 }