1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.async;
29
30 import java.io.IOException;
31 import java.io.InterruptedIOException;
32
33 import org.apache.hc.client5.http.AuthenticationStrategy;
34 import org.apache.hc.client5.http.HttpRoute;
35 import org.apache.hc.client5.http.RouteTracker;
36 import org.apache.hc.client5.http.async.AsyncExecCallback;
37 import org.apache.hc.client5.http.async.AsyncExecChain;
38 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
39 import org.apache.hc.client5.http.async.AsyncExecRuntime;
40 import org.apache.hc.client5.http.auth.AuthExchange;
41 import org.apache.hc.client5.http.auth.ChallengeType;
42 import org.apache.hc.client5.http.config.RequestConfig;
43 import org.apache.hc.client5.http.impl.TunnelRefusedException;
44 import org.apache.hc.client5.http.impl.auth.HttpAuthenticator;
45 import org.apache.hc.client5.http.impl.routing.BasicRouteDirector;
46 import org.apache.hc.client5.http.protocol.HttpClientContext;
47 import org.apache.hc.client5.http.routing.HttpRouteDirector;
48 import org.apache.hc.core5.annotation.Contract;
49 import org.apache.hc.core5.annotation.Internal;
50 import org.apache.hc.core5.annotation.ThreadingBehavior;
51 import org.apache.hc.core5.concurrent.CancellableDependency;
52 import org.apache.hc.core5.concurrent.FutureCallback;
53 import org.apache.hc.core5.http.EntityDetails;
54 import org.apache.hc.core5.http.HttpException;
55 import org.apache.hc.core5.http.HttpHost;
56 import org.apache.hc.core5.http.HttpRequest;
57 import org.apache.hc.core5.http.HttpResponse;
58 import org.apache.hc.core5.http.HttpStatus;
59 import org.apache.hc.core5.http.HttpVersion;
60 import org.apache.hc.core5.http.Method;
61 import org.apache.hc.core5.http.message.BasicHttpRequest;
62 import org.apache.hc.core5.http.message.StatusLine;
63 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
64 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
65 import org.apache.hc.core5.http.protocol.HttpCoreContext;
66 import org.apache.hc.core5.http.protocol.HttpProcessor;
67 import org.apache.hc.core5.util.Args;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70
71
72
73
74
75
76
77
78 @Contract(threading = ThreadingBehavior.STATELESS)
79 @Internal
80 public final class AsyncConnectExec implements AsyncExecChainHandler {
81
82 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectExec.class);
83
84 private final HttpProcessor proxyHttpProcessor;
85 private final AuthenticationStrategy proxyAuthStrategy;
86 private final HttpAuthenticator authenticator;
87 private final HttpRouteDirector routeDirector;
88
89 public AsyncConnectExec(
90 final HttpProcessor proxyHttpProcessor,
91 final AuthenticationStrategy proxyAuthStrategy) {
92 Args.notNull(proxyHttpProcessor, "Proxy HTTP processor");
93 Args.notNull(proxyAuthStrategy, "Proxy authentication strategy");
94 this.proxyHttpProcessor = proxyHttpProcessor;
95 this.proxyAuthStrategy = proxyAuthStrategy;
96 this.authenticator = new HttpAuthenticator(LOG);
97 this.routeDirector = new BasicRouteDirector();
98 }
99
100 static class State {
101
102 State(final HttpRoute route) {
103 tracker = new RouteTracker(route);
104 }
105
106 final RouteTracker tracker;
107
108 volatile boolean challenged;
109 volatile boolean tunnelRefused;
110
111 }
112
113 @Override
114 public void execute(
115 final HttpRequest request,
116 final AsyncEntityProducer entityProducer,
117 final AsyncExecChain.Scope scope,
118 final AsyncExecChain chain,
119 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
120 Args.notNull(request, "HTTP request");
121 Args.notNull(scope, "Scope");
122
123 final String exchangeId = scope.exchangeId;
124 final HttpRoute route = scope.route;
125 final CancellableDependency cancellableDependency = scope.cancellableDependency;
126 final HttpClientContext clientContext = scope.clientContext;
127 final AsyncExecRuntime execRuntime = scope.execRuntime;
128 final State state = new State(route);
129
130 if (!execRuntime.isEndpointAcquired()) {
131 final Object userToken = clientContext.getUserToken();
132 if (LOG.isDebugEnabled()) {
133 LOG.debug("{} acquiring connection with route {}", exchangeId, route);
134 }
135 cancellableDependency.setDependency(execRuntime.acquireEndpoint(
136 exchangeId, route, userToken, clientContext, new FutureCallback<AsyncExecRuntime>() {
137
138 @Override
139 public void completed(final AsyncExecRuntime execRuntime) {
140 if (execRuntime.isEndpointConnected()) {
141 try {
142 chain.proceed(request, entityProducer, scope, asyncExecCallback);
143 } catch (final HttpException | IOException ex) {
144 asyncExecCallback.failed(ex);
145 }
146 } else {
147 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
148 }
149 }
150
151 @Override
152 public void failed(final Exception ex) {
153 asyncExecCallback.failed(ex);
154 }
155
156 @Override
157 public void cancelled() {
158 asyncExecCallback.failed(new InterruptedIOException());
159 }
160
161 }));
162 } else {
163 if (execRuntime.isEndpointConnected()) {
164 try {
165 chain.proceed(request, entityProducer, scope, asyncExecCallback);
166 } catch (final HttpException | IOException ex) {
167 asyncExecCallback.failed(ex);
168 }
169 } else {
170 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
171 }
172 }
173
174 }
175
176 private void proceedToNextHop(
177 final State state,
178 final HttpRequest request,
179 final AsyncEntityProducer entityProducer,
180 final AsyncExecChain.Scope scope,
181 final AsyncExecChain chain,
182 final AsyncExecCallback asyncExecCallback) {
183 final RouteTracker tracker = state.tracker;
184 final String exchangeId = scope.exchangeId;
185 final HttpRoute route = scope.route;
186 final AsyncExecRuntime execRuntime = scope.execRuntime;
187 final CancellableDependency operation = scope.cancellableDependency;
188 final HttpClientContext clientContext = scope.clientContext;
189
190 int step;
191 do {
192 final HttpRoute fact = tracker.toRoute();
193 step = routeDirector.nextStep(route, fact);
194 switch (step) {
195 case HttpRouteDirector.CONNECT_TARGET:
196 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
197
198 @Override
199 public void completed(final AsyncExecRuntime execRuntime) {
200 tracker.connectTarget(route.isSecure());
201 if (LOG.isDebugEnabled()) {
202 LOG.debug("{} connected to target", exchangeId);
203 }
204 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
205 }
206
207 @Override
208 public void failed(final Exception ex) {
209 asyncExecCallback.failed(ex);
210 }
211
212 @Override
213 public void cancelled() {
214 asyncExecCallback.failed(new InterruptedIOException());
215 }
216
217 }));
218 return;
219
220 case HttpRouteDirector.CONNECT_PROXY:
221 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
222
223 @Override
224 public void completed(final AsyncExecRuntime execRuntime) {
225 final HttpHost proxy = route.getProxyHost();
226 tracker.connectProxy(proxy, route.isSecure() && !route.isTunnelled());
227 if (LOG.isDebugEnabled()) {
228 LOG.debug("{} connected to proxy", exchangeId);
229 }
230 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
231 }
232
233 @Override
234 public void failed(final Exception ex) {
235 asyncExecCallback.failed(ex);
236 }
237
238 @Override
239 public void cancelled() {
240 asyncExecCallback.failed(new InterruptedIOException());
241 }
242
243 }));
244 return;
245
246 case HttpRouteDirector.TUNNEL_TARGET:
247 try {
248 final HttpHost proxy = route.getProxyHost();
249 final HttpHost target = route.getTargetHost();
250 if (LOG.isDebugEnabled()) {
251 LOG.debug("{} create tunnel", exchangeId);
252 }
253 createTunnel(state, proxy, target, scope, chain, new AsyncExecCallback() {
254
255 @Override
256 public AsyncDataConsumer handleResponse(
257 final HttpResponse response,
258 final EntityDetails entityDetails) throws HttpException, IOException {
259 return asyncExecCallback.handleResponse(response, entityDetails);
260 }
261
262 @Override
263 public void handleInformationResponse(
264 final HttpResponse response) throws HttpException, IOException {
265 asyncExecCallback.handleInformationResponse(response);
266 }
267
268 @Override
269 public void completed() {
270 if (!execRuntime.isEndpointConnected()) {
271
272 if (LOG.isDebugEnabled()) {
273 LOG.debug("{} proxy disconnected", exchangeId);
274 }
275 state.tracker.reset();
276 }
277 if (state.challenged) {
278 if (LOG.isDebugEnabled()) {
279 LOG.debug("{} proxy authentication required", exchangeId);
280 }
281 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
282 } else {
283 if (state.tunnelRefused) {
284 if (LOG.isDebugEnabled()) {
285 LOG.debug("{} tunnel refused", exchangeId);
286 }
287 asyncExecCallback.failed(new TunnelRefusedException("Tunnel refused", null));
288 } else {
289 if (LOG.isDebugEnabled()) {
290 LOG.debug("{} tunnel to target created", exchangeId);
291 }
292 tracker.tunnelTarget(false);
293 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
294 }
295 }
296 }
297
298 @Override
299 public void failed(final Exception cause) {
300 asyncExecCallback.failed(cause);
301 }
302
303 });
304 } catch (final HttpException | IOException ex) {
305 asyncExecCallback.failed(ex);
306 }
307 return;
308
309 case HttpRouteDirector.TUNNEL_PROXY:
310
311
312
313
314 asyncExecCallback.failed(new HttpException("Proxy chains are not supported"));
315 return;
316
317 case HttpRouteDirector.LAYER_PROTOCOL:
318 execRuntime.upgradeTls(clientContext);
319 if (LOG.isDebugEnabled()) {
320 LOG.debug("{} upgraded to TLS", exchangeId);
321 }
322 tracker.layerProtocol(route.isSecure());
323 break;
324
325 case HttpRouteDirector.UNREACHABLE:
326 asyncExecCallback.failed(new HttpException("Unable to establish route: " +
327 "planned = " + route + "; current = " + fact));
328 return;
329
330 case HttpRouteDirector.COMPLETE:
331 if (LOG.isDebugEnabled()) {
332 LOG.debug("{} route fully established", exchangeId);
333 }
334 try {
335 chain.proceed(request, entityProducer, scope, asyncExecCallback);
336 } catch (final HttpException | IOException ex) {
337 asyncExecCallback.failed(ex);
338 }
339 break;
340
341 default:
342 throw new IllegalStateException("Unknown step indicator " + step + " from RouteDirector.");
343 }
344 } while (step > HttpRouteDirector.COMPLETE);
345 }
346
347 private void createTunnel(
348 final State state,
349 final HttpHost proxy,
350 final HttpHost nextHop,
351 final AsyncExecChain.Scope scope,
352 final AsyncExecChain chain,
353 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
354
355 final AsyncExecRuntime execRuntime = scope.execRuntime;
356 final HttpClientContext clientContext = scope.clientContext;
357
358 final AuthExchange proxyAuthExchange = proxy != AuthExchange_keyword">null ? clientContext.getAuthExchange(proxy) : new AuthExchange();
359
360 final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, nextHop, nextHop.toHostString());
361 connect.setVersion(HttpVersion.HTTP_1_1);
362
363 proxyHttpProcessor.process(connect, null, clientContext);
364 authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);
365
366 chain.proceed(connect, null, scope, new AsyncExecCallback() {
367
368 @Override
369 public AsyncDataConsumer handleResponse(
370 final HttpResponse response,
371 final EntityDetails entityDetails) throws HttpException, IOException {
372
373 clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
374 proxyHttpProcessor.process(response, entityDetails, clientContext);
375
376 final int status = response.getCode();
377 if (status < HttpStatus.SC_SUCCESS) {
378 throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
379 }
380
381 if (needAuthentication(proxyAuthExchange, proxy, response, clientContext)) {
382 state.challenged = true;
383 return null;
384 }
385 state.challenged = false;
386 if (status >= HttpStatus.SC_REDIRECTION) {
387 state.tunnelRefused = true;
388 return asyncExecCallback.handleResponse(response, entityDetails);
389 }
390 return null;
391 }
392
393 @Override
394 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
395 }
396
397 @Override
398 public void completed() {
399 asyncExecCallback.completed();
400 }
401
402 @Override
403 public void failed(final Exception cause) {
404 asyncExecCallback.failed(cause);
405 }
406
407 });
408
409 }
410
411 private boolean needAuthentication(
412 final AuthExchange proxyAuthExchange,
413 final HttpHost proxy,
414 final HttpResponse response,
415 final HttpClientContext context) {
416 final RequestConfig config = context.getRequestConfig();
417 if (config.isAuthenticationEnabled()) {
418 final boolean proxyAuthRequested = authenticator.isChallenged(proxy, ChallengeType.PROXY, response, proxyAuthExchange, context);
419 if (proxyAuthRequested) {
420 return authenticator.updateAuthState(proxy, ChallengeType.PROXY, response,
421 proxyAuthStrategy, proxyAuthExchange, context);
422 }
423 }
424 return false;
425 }
426
427 }