1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.async;
29
30 import java.io.IOException;
31 import java.io.InterruptedIOException;
32
33 import org.apache.hc.client5.http.AuthenticationStrategy;
34 import org.apache.hc.client5.http.HttpRoute;
35 import org.apache.hc.client5.http.RouteTracker;
36 import org.apache.hc.client5.http.async.AsyncExecCallback;
37 import org.apache.hc.client5.http.async.AsyncExecChain;
38 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
39 import org.apache.hc.client5.http.async.AsyncExecRuntime;
40 import org.apache.hc.client5.http.auth.AuthExchange;
41 import org.apache.hc.client5.http.auth.ChallengeType;
42 import org.apache.hc.client5.http.config.RequestConfig;
43 import org.apache.hc.client5.http.impl.TunnelRefusedException;
44 import org.apache.hc.client5.http.impl.auth.HttpAuthenticator;
45 import org.apache.hc.client5.http.impl.routing.BasicRouteDirector;
46 import org.apache.hc.client5.http.protocol.HttpClientContext;
47 import org.apache.hc.client5.http.routing.HttpRouteDirector;
48 import org.apache.hc.core5.annotation.Contract;
49 import org.apache.hc.core5.annotation.Internal;
50 import org.apache.hc.core5.annotation.ThreadingBehavior;
51 import org.apache.hc.core5.concurrent.CancellableDependency;
52 import org.apache.hc.core5.concurrent.FutureCallback;
53 import org.apache.hc.core5.http.EntityDetails;
54 import org.apache.hc.core5.http.HttpException;
55 import org.apache.hc.core5.http.HttpHost;
56 import org.apache.hc.core5.http.HttpRequest;
57 import org.apache.hc.core5.http.HttpResponse;
58 import org.apache.hc.core5.http.HttpStatus;
59 import org.apache.hc.core5.http.HttpVersion;
60 import org.apache.hc.core5.http.message.BasicHttpRequest;
61 import org.apache.hc.core5.http.message.StatusLine;
62 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
63 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
64 import org.apache.hc.core5.http.protocol.HttpCoreContext;
65 import org.apache.hc.core5.http.protocol.HttpProcessor;
66 import org.apache.hc.core5.util.Args;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70
71
72
73
74
75
76
77 @Contract(threading = ThreadingBehavior.STATELESS)
78 @Internal
79 public final class AsyncConnectExec implements AsyncExecChainHandler {
80
81 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectExec.class);
82
83 private final HttpProcessor proxyHttpProcessor;
84 private final AuthenticationStrategy proxyAuthStrategy;
85 private final HttpAuthenticator authenticator;
86 private final HttpRouteDirector routeDirector;
87
88 public AsyncConnectExec(
89 final HttpProcessor proxyHttpProcessor,
90 final AuthenticationStrategy proxyAuthStrategy) {
91 Args.notNull(proxyHttpProcessor, "Proxy HTTP processor");
92 Args.notNull(proxyAuthStrategy, "Proxy authentication strategy");
93 this.proxyHttpProcessor = proxyHttpProcessor;
94 this.proxyAuthStrategy = proxyAuthStrategy;
95 this.authenticator = new HttpAuthenticator(LOG);
96 this.routeDirector = new BasicRouteDirector();
97 }
98
99 static class State {
100
101 State(final HttpRoute route) {
102 tracker = new RouteTracker(route);
103 }
104
105 final RouteTracker tracker;
106
107 volatile boolean challenged;
108 volatile boolean tunnelRefused;
109
110 }
111
112 @Override
113 public void execute(
114 final HttpRequest request,
115 final AsyncEntityProducer entityProducer,
116 final AsyncExecChain.Scope scope,
117 final AsyncExecChain chain,
118 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
119 Args.notNull(request, "HTTP request");
120 Args.notNull(scope, "Scope");
121
122 final String exchangeId = scope.exchangeId;
123 final HttpRoute route = scope.route;
124 final CancellableDependency cancellableDependency = scope.cancellableDependency;
125 final HttpClientContext clientContext = scope.clientContext;
126 final AsyncExecRuntime execRuntime = scope.execRuntime;
127 final State state = new State(route);
128
129 if (!execRuntime.isEndpointAcquired()) {
130 final Object userToken = clientContext.getUserToken();
131 if (LOG.isDebugEnabled()) {
132 LOG.debug("{}: acquiring connection with route {}", exchangeId, route);
133 }
134 cancellableDependency.setDependency(execRuntime.acquireEndpoint(
135 exchangeId, route, userToken, clientContext, new FutureCallback<AsyncExecRuntime>() {
136
137 @Override
138 public void completed(final AsyncExecRuntime execRuntime) {
139 if (execRuntime.isEndpointConnected()) {
140 try {
141 chain.proceed(request, entityProducer, scope, asyncExecCallback);
142 } catch (final HttpException | IOException ex) {
143 asyncExecCallback.failed(ex);
144 }
145 } else {
146 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
147 }
148 }
149
150 @Override
151 public void failed(final Exception ex) {
152 asyncExecCallback.failed(ex);
153 }
154
155 @Override
156 public void cancelled() {
157 asyncExecCallback.failed(new InterruptedIOException());
158 }
159
160 }));
161 } else {
162 if (execRuntime.isEndpointConnected()) {
163 try {
164 chain.proceed(request, entityProducer, scope, asyncExecCallback);
165 } catch (final HttpException | IOException ex) {
166 asyncExecCallback.failed(ex);
167 }
168 } else {
169 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
170 }
171 }
172
173 }
174
175 private void proceedToNextHop(
176 final State state,
177 final HttpRequest request,
178 final AsyncEntityProducer entityProducer,
179 final AsyncExecChain.Scope scope,
180 final AsyncExecChain chain,
181 final AsyncExecCallback asyncExecCallback) {
182 final RouteTracker tracker = state.tracker;
183 final String exchangeId = scope.exchangeId;
184 final HttpRoute route = scope.route;
185 final AsyncExecRuntime execRuntime = scope.execRuntime;
186 final CancellableDependency operation = scope.cancellableDependency;
187 final HttpClientContext clientContext = scope.clientContext;
188
189 int step;
190 do {
191 final HttpRoute fact = tracker.toRoute();
192 step = routeDirector.nextStep(route, fact);
193 switch (step) {
194 case HttpRouteDirector.CONNECT_TARGET:
195 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
196
197 @Override
198 public void completed(final AsyncExecRuntime execRuntime) {
199 tracker.connectTarget(route.isSecure());
200 if (LOG.isDebugEnabled()) {
201 LOG.debug("{}: connected to target", exchangeId);
202 }
203 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
204 }
205
206 @Override
207 public void failed(final Exception ex) {
208 asyncExecCallback.failed(ex);
209 }
210
211 @Override
212 public void cancelled() {
213 asyncExecCallback.failed(new InterruptedIOException());
214 }
215
216 }));
217 return;
218
219 case HttpRouteDirector.CONNECT_PROXY:
220 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
221
222 @Override
223 public void completed(final AsyncExecRuntime execRuntime) {
224 final HttpHost proxy = route.getProxyHost();
225 tracker.connectProxy(proxy, route.isSecure() && !route.isTunnelled());
226 if (LOG.isDebugEnabled()) {
227 LOG.debug("{}: connected to proxy", exchangeId);
228 }
229 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
230 }
231
232 @Override
233 public void failed(final Exception ex) {
234 asyncExecCallback.failed(ex);
235 }
236
237 @Override
238 public void cancelled() {
239 asyncExecCallback.failed(new InterruptedIOException());
240 }
241
242 }));
243 return;
244
245 case HttpRouteDirector.TUNNEL_TARGET:
246 try {
247 final HttpHost proxy = route.getProxyHost();
248 final HttpHost target = route.getTargetHost();
249 createTunnel(state, proxy ,target, scope, chain, new AsyncExecCallback() {
250
251 @Override
252 public AsyncDataConsumer handleResponse(
253 final HttpResponse response,
254 final EntityDetails entityDetails) throws HttpException, IOException {
255 return asyncExecCallback.handleResponse(response, entityDetails);
256 }
257
258 @Override
259 public void handleInformationResponse(
260 final HttpResponse response) throws HttpException, IOException {
261 asyncExecCallback.handleInformationResponse(response);
262 }
263
264 @Override
265 public void completed() {
266 if (LOG.isDebugEnabled()) {
267 LOG.debug("{}: tunnel to target created", exchangeId);
268 }
269 tracker.tunnelTarget(false);
270 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
271 }
272
273 @Override
274 public void failed(final Exception cause) {
275 asyncExecCallback.failed(cause);
276 }
277
278 });
279 } catch (final HttpException | IOException ex) {
280 asyncExecCallback.failed(ex);
281 }
282 return;
283
284 case HttpRouteDirector.TUNNEL_PROXY:
285
286
287
288
289 asyncExecCallback.failed(new HttpException("Proxy chains are not supported"));
290 return;
291
292 case HttpRouteDirector.LAYER_PROTOCOL:
293 execRuntime.upgradeTls(clientContext);
294 if (LOG.isDebugEnabled()) {
295 LOG.debug("{}: upgraded to TLS", exchangeId);
296 }
297 tracker.layerProtocol(route.isSecure());
298 break;
299
300 case HttpRouteDirector.UNREACHABLE:
301 asyncExecCallback.failed(new HttpException("Unable to establish route: " +
302 "planned = " + route + "; current = " + fact));
303 return;
304
305 case HttpRouteDirector.COMPLETE:
306 if (LOG.isDebugEnabled()) {
307 LOG.debug("{}: route fully established", exchangeId);
308 }
309 try {
310 chain.proceed(request, entityProducer, scope, asyncExecCallback);
311 } catch (final HttpException | IOException ex) {
312 asyncExecCallback.failed(ex);
313 }
314 break;
315
316 default:
317 throw new IllegalStateException("Unknown step indicator " + step + " from RouteDirector.");
318 }
319 } while (step > HttpRouteDirector.COMPLETE);
320 }
321
322 private void createTunnel(
323 final State state,
324 final HttpHost proxy,
325 final HttpHost nextHop,
326 final AsyncExecChain.Scope scope,
327 final AsyncExecChain chain,
328 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
329
330 final AsyncExecRuntime execRuntime = scope.execRuntime;
331 final HttpClientContext clientContext = scope.clientContext;
332
333 final AuthExchange proxyAuthExchange = proxy != AuthExchange_keyword">null ? clientContext.getAuthExchange(proxy) : new AuthExchange();
334
335 final HttpRequest connect = new BasicHttpRequest("CONNECT", nextHop, nextHop.toHostString());
336 connect.setVersion(HttpVersion.HTTP_1_1);
337
338 proxyHttpProcessor.process(connect, null, clientContext);
339 authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);
340
341 chain.proceed(connect, null, scope, new AsyncExecCallback() {
342
343 @Override
344 public AsyncDataConsumer handleResponse(
345 final HttpResponse response,
346 final EntityDetails entityDetails) throws HttpException, IOException {
347
348 clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
349 proxyHttpProcessor.process(response, entityDetails, clientContext);
350
351 final int status = response.getCode();
352 if (status < HttpStatus.SC_SUCCESS) {
353 throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
354 }
355
356 if (needAuthentication(proxyAuthExchange, proxy, response, clientContext)) {
357 state.challenged = true;
358 return null;
359 }
360 state.challenged = false;
361 if (status >= HttpStatus.SC_REDIRECTION) {
362 state.tunnelRefused = true;
363 return asyncExecCallback.handleResponse(response, entityDetails);
364 }
365 return null;
366 }
367
368 @Override
369 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
370 }
371
372 @Override
373 public void completed() {
374 if (!execRuntime.isEndpointConnected()) {
375 state.tracker.reset();
376 }
377 if (state.challenged) {
378 try {
379 createTunnel(state, proxy, nextHop, scope, chain, asyncExecCallback);
380 } catch (final HttpException | IOException ex) {
381 asyncExecCallback.failed(ex);
382 }
383 } else {
384 if (state.tunnelRefused) {
385 asyncExecCallback.failed(new TunnelRefusedException("Tunnel refused", null));
386 } else {
387 asyncExecCallback.completed();
388 }
389 }
390 }
391
392 @Override
393 public void failed(final Exception cause) {
394 asyncExecCallback.failed(cause);
395 }
396
397 });
398
399 }
400
401 private boolean needAuthentication(
402 final AuthExchange proxyAuthExchange,
403 final HttpHost proxy,
404 final HttpResponse response,
405 final HttpClientContext context) {
406 final RequestConfig config = context.getRequestConfig();
407 if (config.isAuthenticationEnabled()) {
408 final boolean proxyAuthRequested = authenticator.isChallenged(proxy, ChallengeType.PROXY, response, proxyAuthExchange, context);
409 if (proxyAuthRequested) {
410 return authenticator.updateAuthState(proxy, ChallengeType.PROXY, response,
411 proxyAuthStrategy, proxyAuthExchange, context);
412 }
413 }
414 return false;
415 }
416
417 }