View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.async;
29  
30  import java.io.IOException;
31  import java.io.InterruptedIOException;
32  import java.nio.ByteBuffer;
33  import java.util.List;
34  import java.util.concurrent.atomic.AtomicReference;
35  
36  import org.apache.hc.client5.http.AuthenticationStrategy;
37  import org.apache.hc.client5.http.HttpRoute;
38  import org.apache.hc.client5.http.RouteTracker;
39  import org.apache.hc.client5.http.SchemePortResolver;
40  import org.apache.hc.client5.http.async.AsyncExecCallback;
41  import org.apache.hc.client5.http.async.AsyncExecChain;
42  import org.apache.hc.client5.http.async.AsyncExecChainHandler;
43  import org.apache.hc.client5.http.async.AsyncExecRuntime;
44  import org.apache.hc.client5.http.auth.AuthExchange;
45  import org.apache.hc.client5.http.auth.ChallengeType;
46  import org.apache.hc.client5.http.config.RequestConfig;
47  import org.apache.hc.client5.http.impl.TunnelRefusedException;
48  import org.apache.hc.client5.http.impl.auth.AuthCacheKeeper;
49  import org.apache.hc.client5.http.impl.auth.HttpAuthenticator;
50  import org.apache.hc.client5.http.impl.routing.BasicRouteDirector;
51  import org.apache.hc.client5.http.protocol.HttpClientContext;
52  import org.apache.hc.client5.http.routing.HttpRouteDirector;
53  import org.apache.hc.core5.annotation.Contract;
54  import org.apache.hc.core5.annotation.Internal;
55  import org.apache.hc.core5.annotation.ThreadingBehavior;
56  import org.apache.hc.core5.concurrent.CancellableDependency;
57  import org.apache.hc.core5.concurrent.FutureCallback;
58  import org.apache.hc.core5.http.EntityDetails;
59  import org.apache.hc.core5.http.Header;
60  import org.apache.hc.core5.http.HttpException;
61  import org.apache.hc.core5.http.HttpHost;
62  import org.apache.hc.core5.http.HttpRequest;
63  import org.apache.hc.core5.http.HttpResponse;
64  import org.apache.hc.core5.http.HttpStatus;
65  import org.apache.hc.core5.http.HttpVersion;
66  import org.apache.hc.core5.http.Method;
67  import org.apache.hc.core5.http.message.BasicHttpRequest;
68  import org.apache.hc.core5.http.message.StatusLine;
69  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
70  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
71  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
72  import org.apache.hc.core5.http.nio.CapacityChannel;
73  import org.apache.hc.core5.http.nio.DataStreamChannel;
74  import org.apache.hc.core5.http.nio.RequestChannel;
75  import org.apache.hc.core5.http.protocol.HttpContext;
76  import org.apache.hc.core5.http.protocol.HttpCoreContext;
77  import org.apache.hc.core5.http.protocol.HttpProcessor;
78  import org.apache.hc.core5.util.Args;
79  import org.slf4j.Logger;
80  import org.slf4j.LoggerFactory;
81  
82  /**
83   * Request execution handler in the asynchronous request execution chain
84   * that is responsible for establishing connection to the target
85   * origin server as specified by the current connection route.
86   *
87   * @since 5.0
88   */
89  @Contract(threading = ThreadingBehavior.STATELESS)
90  @Internal
91  public final class AsyncConnectExec implements AsyncExecChainHandler {
92  
    /** Logger used for the connection and tunnelling debug messages of this handler. */
    private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectExec.class);

    /** Protocol processor applied to the CONNECT request and to the proxy's response to it. */
    private final HttpProcessor proxyHttpProcessor;
    /** Strategy handed to the authenticator when the proxy authentication state is updated. */
    private final AuthenticationStrategy proxyAuthStrategy;
    /** Detects proxy challenges and adds the authentication response to CONNECT requests. */
    private final HttpAuthenticator authenticator;
    /** Auth cache helper; {@code null} when auth caching has been disabled. */
    private final AuthCacheKeeper authCacheKeeper;
    /** Decides the next step needed to get from the tracked route to the planned route. */
    private final HttpRouteDirector routeDirector;
100 
101     public AsyncConnectExec(
102             final HttpProcessor proxyHttpProcessor,
103             final AuthenticationStrategy proxyAuthStrategy,
104             final SchemePortResolver schemePortResolver,
105             final boolean authCachingDisabled) {
106         Args.notNull(proxyHttpProcessor, "Proxy HTTP processor");
107         Args.notNull(proxyAuthStrategy, "Proxy authentication strategy");
108         this.proxyHttpProcessor = proxyHttpProcessor;
109         this.proxyAuthStrategy  = proxyAuthStrategy;
110         this.authenticator = new HttpAuthenticator();
111         this.authCacheKeeper = authCachingDisabled ? null : new AuthCacheKeeper(schemePortResolver);
112         this.routeDirector = BasicRouteDirector.INSTANCE;
113     }
114 
115     static class State {
116 
117         State(final HttpRoute route) {
118             tracker = new RouteTracker(route);
119         }
120 
121         final RouteTracker tracker;
122 
123         volatile boolean challenged;
124         volatile boolean tunnelRefused;
125 
126     }
127 
128     @Override
129     public void execute(
130             final HttpRequest request,
131             final AsyncEntityProducer entityProducer,
132             final AsyncExecChain.Scope scope,
133             final AsyncExecChain chain,
134             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
135         Args.notNull(request, "HTTP request");
136         Args.notNull(scope, "Scope");
137 
138         final String exchangeId = scope.exchangeId;
139         final HttpRoute route = scope.route;
140         final CancellableDependency cancellableDependency = scope.cancellableDependency;
141         final HttpClientContext clientContext = scope.clientContext;
142         final AsyncExecRuntime execRuntime = scope.execRuntime;
143         final State state = new State(route);
144 
145         if (!execRuntime.isEndpointAcquired()) {
146             final Object userToken = clientContext.getUserToken();
147             if (LOG.isDebugEnabled()) {
148                 LOG.debug("{} acquiring connection with route {}", exchangeId, route);
149             }
150             cancellableDependency.setDependency(execRuntime.acquireEndpoint(
151                     exchangeId, route, userToken, clientContext, new FutureCallback<AsyncExecRuntime>() {
152 
153                         @Override
154                         public void completed(final AsyncExecRuntime execRuntime) {
155                             if (execRuntime.isEndpointConnected()) {
156                                 try {
157                                     chain.proceed(request, entityProducer, scope, asyncExecCallback);
158                                 } catch (final HttpException | IOException ex) {
159                                     asyncExecCallback.failed(ex);
160                                 }
161                             } else {
162                                 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
163                             }
164                         }
165 
166                         @Override
167                         public void failed(final Exception ex) {
168                             asyncExecCallback.failed(ex);
169                         }
170 
171                         @Override
172                         public void cancelled() {
173                             asyncExecCallback.failed(new InterruptedIOException());
174                         }
175 
176                     }));
177         } else {
178             if (execRuntime.isEndpointConnected()) {
179                 try {
180                     chain.proceed(request, entityProducer, scope, asyncExecCallback);
181                 } catch (final HttpException | IOException ex) {
182                     asyncExecCallback.failed(ex);
183                 }
184             } else {
185                 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
186             }
187         }
188 
189     }
190 
    /**
     * Performs the next step required to turn the route tracked so far into the
     * planned route of the scope, then re-invokes itself from the step's
     * completion callback until the route director reports the route as complete.
     * <p>
     * Errors are never thrown to the caller for known steps; they are reported
     * through {@code asyncExecCallback.failed(...)}. Cancellation of an
     * asynchronous step is reported as an {@link InterruptedIOException}.
     * </p>
     *
     * @param state per-execution state holding the route tracker and the
     *              challenged / tunnelRefused flags.
     * @param request the request to pass down the chain once the route is complete.
     * @param entityProducer the request entity producer, passed on unchanged.
     * @param scope the execution scope supplying route, runtime and context.
     * @param chain the remainder of the execution chain.
     * @param asyncExecCallback the callback notified of the outcome.
     * @throws IllegalStateException if the route director returns an unknown step.
     */
    private void proceedToNextHop(
            final State state,
            final HttpRequest request,
            final AsyncEntityProducer entityProducer,
            final AsyncExecChain.Scope scope,
            final AsyncExecChain chain,
            final AsyncExecCallback asyncExecCallback) {
        final RouteTracker tracker = state.tracker;
        final String exchangeId = scope.exchangeId;
        final HttpRoute route = scope.route;
        final AsyncExecRuntime execRuntime = scope.execRuntime;
        final CancellableDependency operation = scope.cancellableDependency;
        final HttpClientContext clientContext = scope.clientContext;

        // "fact" is the route as established so far; the director compares it
        // with the planned route to pick the next step.
        final HttpRoute fact = tracker.toRoute();
        final int step = routeDirector.nextStep(route, fact);

        switch (step) {
            case HttpRouteDirector.CONNECT_TARGET:
                // Connect the endpoint, record the step, then re-evaluate the route.
                operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {

                    @Override
                    public void completed(final AsyncExecRuntime execRuntime) {
                        tracker.connectTarget(route.isSecure());
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{} connected to target", exchangeId);
                        }
                        proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
                    }

                    @Override
                    public void failed(final Exception ex) {
                        asyncExecCallback.failed(ex);
                    }

                    @Override
                    public void cancelled() {
                        asyncExecCallback.failed(new InterruptedIOException());
                    }

                }));
                break;

            case HttpRouteDirector.CONNECT_PROXY:
                // Connect the endpoint and record the proxy as the first hop.
                operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {

                    @Override
                    public void completed(final AsyncExecRuntime execRuntime) {
                        final HttpHost proxy  = route.getProxyHost();
                        // Recorded as secure only when the route is secure and not tunnelled.
                        tracker.connectProxy(proxy, route.isSecure() && !route.isTunnelled());
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{} connected to proxy", exchangeId);
                        }
                        proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
                    }

                    @Override
                    public void failed(final Exception ex) {
                        asyncExecCallback.failed(ex);
                    }

                    @Override
                    public void cancelled() {
                        asyncExecCallback.failed(new InterruptedIOException());
                    }

                }));
                break;

                case HttpRouteDirector.TUNNEL_TARGET:
                    // Issue a CONNECT through the proxy; the wrapper callback below
                    // interprets the outcome using the flags set by createTunnel.
                    try {
                        final HttpHost proxy = route.getProxyHost();
                        final HttpHost target = route.getTargetHost();
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{} create tunnel", exchangeId);
                        }
                        createTunnel(state, proxy, target, scope, new AsyncExecCallback() {

                            // Response handling is delegated unchanged to the caller's callback.
                            @Override
                            public AsyncDataConsumer handleResponse(
                                    final HttpResponse response,
                                    final EntityDetails entityDetails) throws HttpException, IOException {
                                return asyncExecCallback.handleResponse(response, entityDetails);
                            }

                            @Override
                            public void handleInformationResponse(
                                    final HttpResponse response) throws HttpException, IOException {
                                asyncExecCallback.handleInformationResponse(response);
                            }

                            @Override
                            public void completed() {
                                if (!execRuntime.isEndpointConnected()) {
                                    // Remote endpoint disconnected. Need to start over
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug("{} proxy disconnected", exchangeId);
                                    }
                                    state.tracker.reset();
                                }
                                if (state.challenged) {
                                    // Re-evaluate the route so the CONNECT is retried
                                    // (after reconnecting if the tracker was reset above).
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug("{} proxy authentication required", exchangeId);
                                    }
                                    proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
                                } else {
                                    if (state.tunnelRefused) {
                                        if (LOG.isDebugEnabled()) {
                                            LOG.debug("{} tunnel refused", exchangeId);
                                        }
                                        asyncExecCallback.failed(new TunnelRefusedException("Tunnel refused", null));
                                    } else {
                                        if (LOG.isDebugEnabled()) {
                                            LOG.debug("{} tunnel to target created", exchangeId);
                                        }
                                        // Tunnel recorded as not secure here; presumably TLS is
                                        // added by a later LAYER_PROTOCOL step - confirm.
                                        tracker.tunnelTarget(false);
                                        proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
                                    }
                                }
                            }

                        @Override
                        public void failed(final Exception cause) {
                            // A failed CONNECT exchange makes the connection non-reusable.
                            execRuntime.markConnectionNonReusable();
                            asyncExecCallback.failed(cause);
                        }

                    });
                } catch (final HttpException | IOException ex) {
                    asyncExecCallback.failed(ex);
                }
                break;

            case HttpRouteDirector.TUNNEL_PROXY:
                // The most simple example for this case is a proxy chain
                // of two proxies, where P1 must be tunnelled to P2.
                // route: Source -> P1 -> P2 -> Target (3 hops)
                // fact:  Source -> P1 -> Target       (2 hops)
                asyncExecCallback.failed(new HttpException("Proxy chains are not supported"));
                break;

            case HttpRouteDirector.LAYER_PROTOCOL:
                // Upgrade the connection to TLS, record it, then re-evaluate the route.
                execRuntime.upgradeTls(clientContext, new FutureCallback<AsyncExecRuntime>() {

                    @Override
                    public void completed(final AsyncExecRuntime asyncExecRuntime) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{} upgraded to TLS", exchangeId);
                        }
                        tracker.layerProtocol(route.isSecure());
                        proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
                    }

                    @Override
                    public void failed(final Exception ex) {
                        asyncExecCallback.failed(ex);
                    }

                    @Override
                    public void cancelled() {
                        asyncExecCallback.failed(new InterruptedIOException());
                    }

                });
                break;

            case HttpRouteDirector.UNREACHABLE:
                // The planned route cannot be reached from the current one.
                asyncExecCallback.failed(new HttpException("Unable to establish route: " +
                        "planned = " + route + "; current = " + fact));
                break;

            case HttpRouteDirector.COMPLETE:
                // Route is in place: hand the request to the rest of the chain.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} route fully established", exchangeId);
                }
                try {
                    chain.proceed(request, entityProducer, scope, asyncExecCallback);
                } catch (final HttpException | IOException ex) {
                    asyncExecCallback.failed(ex);
                }
                break;

            default:
                throw new IllegalStateException("Unknown step indicator "  + step + " from RouteDirector.");
        }
    }
377 
    /**
     * Sends an HTTP/1.1 CONNECT request for {@code nextHop} over the endpoint
     * of the scope and records the outcome in {@code state}.
     * <p>
     * The response is classified as follows: a proxy authentication challenge
     * that could be processed sets {@code state.challenged}; any other status
     * of 300 or above sets {@code state.tunnelRefused} and hands the response to
     * {@code asyncExecCallback.handleResponse}; status 200 completes the
     * callback immediately; every other status raises an {@link HttpException}.
     * </p>
     *
     * @param state per-execution state whose challenged / tunnelRefused flags are updated.
     * @param proxy the proxy the CONNECT is authenticated against; when {@code null}
     *              a fresh auth exchange is used instead of the one kept in the context.
     * @param nextHop the host the tunnel is requested for.
     * @param scope the execution scope supplying runtime, context and exchange id.
     * @param asyncExecCallback the callback notified of completion, failure and
     *                          refused-tunnel responses.
     */
    private void createTunnel(
            final State state,
            final HttpHost proxy,
            final HttpHost nextHop,
            final AsyncExecChain.Scope scope,
            final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {

        final CancellableDependency operation = scope.cancellableDependency;
        final HttpClientContext clientContext = scope.clientContext;
        final AsyncExecRuntime execRuntime = scope.execRuntime;
        final String exchangeId = scope.exchangeId;

        final AuthExchange proxyAuthExchange = proxy != null ? clientContext.getAuthExchange(proxy) : new AuthExchange();

        if (authCacheKeeper != null) {
            authCacheKeeper.loadPreemptively(proxy, null, proxyAuthExchange, clientContext);
        }

        final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {

            // Consumer for the body of a refused-tunnel response; stays null otherwise.
            private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();

            @Override
            public void releaseResources() {
                final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
                if (entityConsumer != null) {
                    entityConsumer.releaseResources();
                }
            }

            @Override
            public void failed(final Exception cause) {
                final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
                if (entityConsumer != null) {
                    entityConsumer.releaseResources();
                }
                asyncExecCallback.failed(cause);
            }

            @Override
            public void cancel() {
                // Cancellation is surfaced as a failure with InterruptedIOException.
                failed(new InterruptedIOException());
            }

            @Override
            public void produceRequest(final RequestChannel requestChannel,
                                       final HttpContext httpContext) throws HttpException, IOException {
                final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, nextHop, nextHop.toHostString());
                connect.setVersion(HttpVersion.HTTP_1_1);

                proxyHttpProcessor.process(connect, null, clientContext);
                authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);

                // CONNECT is sent without an entity.
                requestChannel.sendRequest(connect, null, clientContext);
            }

            @Override
            public void produce(final DataStreamChannel dataStreamChannel) throws IOException {
                // Nothing to produce: the request has no entity.
            }

            @Override
            public int available() {
                return 0;
            }

            @Override
            public void consumeInformation(final HttpResponse httpResponse,
                                           final HttpContext httpContext) throws HttpException, IOException {
                // Informational responses are ignored here.
            }

            @Override
            public void consumeResponse(final HttpResponse response,
                                        final EntityDetails entityDetails,
                                        final HttpContext httpContext) throws HttpException, IOException {
                clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
                proxyHttpProcessor.process(response, entityDetails, clientContext);

                final int status = response.getCode();
                if (status < HttpStatus.SC_SUCCESS) {
                    throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
                }

                if (needAuthentication(proxyAuthExchange, proxy, response, clientContext)) {
                    // NOTE(review): nothing on this path signals completion; it appears to
                    // rely on streamEnd() being invoked later. Confirm that holds for a
                    // challenge response that carries no entity.
                    state.challenged = true;
                } else {
                    state.challenged = false;
                    if (status >= HttpStatus.SC_REDIRECTION) {
                        // Tunnel refused: let the caller consume the proxy's response;
                        // completion is signalled from streamEnd().
                        state.tunnelRefused = true;
                        entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
                    } else if (status == HttpStatus.SC_OK) {
                        asyncExecCallback.completed();
                    } else {
                        // Any 2xx other than 200 is not accepted for CONNECT.
                        throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
                    }
                }
            }

            @Override
            public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
                final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
                if (entityConsumer != null) {
                    entityConsumer.updateCapacity(capacityChannel);
                } else {
                    // No consumer: incoming data is dropped by consume(), so advertise
                    // the maximum capacity.
                    capacityChannel.update(Integer.MAX_VALUE);
                }
            }

            @Override
            public void consume(final ByteBuffer src) throws IOException {
                final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
                if (entityConsumer != null) {
                    entityConsumer.consume(src);
                }
            }

            @Override
            public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
                final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
                if (entityConsumer != null) {
                    entityConsumer.streamEnd(trailers);
                }
                asyncExecCallback.completed();
            }

        };

        // With debug logging enabled the handler is wrapped in a logging decorator.
        if (LOG.isDebugEnabled()) {
            operation.setDependency(execRuntime.execute(
                    exchangeId,
                    new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
                    clientContext));
        } else {
            operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
        }

    }
514 
515     private boolean needAuthentication(
516             final AuthExchange proxyAuthExchange,
517             final HttpHost proxy,
518             final HttpResponse response,
519             final HttpClientContext context) {
520         final RequestConfig config = context.getRequestConfig();
521         if (config.isAuthenticationEnabled()) {
522             final boolean proxyAuthRequested = authenticator.isChallenged(proxy, ChallengeType.PROXY, response, proxyAuthExchange, context);
523 
524             if (authCacheKeeper != null) {
525                 if (proxyAuthRequested) {
526                     authCacheKeeper.updateOnChallenge(proxy, null, proxyAuthExchange, context);
527                 } else {
528                     authCacheKeeper.updateOnNoChallenge(proxy, null, proxyAuthExchange, context);
529                 }
530             }
531 
532             if (proxyAuthRequested) {
533                 final boolean updated = authenticator.updateAuthState(proxy, ChallengeType.PROXY, response,
534                         proxyAuthStrategy, proxyAuthExchange, context);
535 
536                 if (authCacheKeeper != null) {
537                     authCacheKeeper.updateOnResponse(proxy, null, proxyAuthExchange, context);
538                 }
539 
540                 return updated;
541             }
542         }
543         return false;
544     }
545 
546 }