View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.async;
29  
30  import java.io.InterruptedIOException;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.hc.client5.http.HttpRoute;
34  import org.apache.hc.client5.http.async.AsyncExecRuntime;
35  import org.apache.hc.client5.http.config.RequestConfig;
36  import org.apache.hc.client5.http.config.TlsConfig;
37  import org.apache.hc.client5.http.impl.ConnPoolSupport;
38  import org.apache.hc.client5.http.impl.Operations;
39  import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
40  import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
41  import org.apache.hc.client5.http.protocol.HttpClientContext;
42  import org.apache.hc.core5.concurrent.CallbackContribution;
43  import org.apache.hc.core5.concurrent.Cancellable;
44  import org.apache.hc.core5.concurrent.FutureCallback;
45  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
46  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
47  import org.apache.hc.core5.http.nio.HandlerFactory;
48  import org.apache.hc.core5.io.CloseMode;
49  import org.apache.hc.core5.reactor.ConnectionInitiator;
50  import org.apache.hc.core5.util.TimeValue;
51  import org.apache.hc.core5.util.Timeout;
52  import org.slf4j.Logger;
53  
54  class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
55  
56      private final Logger log;
57      private final AsyncClientConnectionManager manager;
58      private final ConnectionInitiator connectionInitiator;
59      private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
60      /**
61       * @deprecated TLS should be configured by the connection manager
62       */
63      @Deprecated
64      private final TlsConfig tlsConfig;
65      private final AtomicReference<AsyncConnectionEndpoint> endpointRef;
66      private volatile boolean reusable;
67      private volatile Object state;
68      private volatile TimeValue validDuration;
69  
70      InternalHttpAsyncExecRuntime(
71              final Logger log,
72              final AsyncClientConnectionManager manager,
73              final ConnectionInitiator connectionInitiator,
74              final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
75              final TlsConfig tlsConfig) {
76          super();
77          this.log = log;
78          this.manager = manager;
79          this.connectionInitiator = connectionInitiator;
80          this.pushHandlerFactory = pushHandlerFactory;
81          this.tlsConfig = tlsConfig;
82          this.endpointRef = new AtomicReference<>();
83          this.validDuration = TimeValue.NEG_ONE_MILLISECOND;
84      }
85  
86      @Override
87      public boolean isEndpointAcquired() {
88          return endpointRef.get() != null;
89      }
90  
91      @Override
92      public Cancellable acquireEndpoint(
93              final String id,
94              final HttpRoute route,
95              final Object object,
96              final HttpClientContext context,
97              final FutureCallback<AsyncExecRuntime> callback) {
98          if (endpointRef.get() == null) {
99              state = object;
100             final RequestConfig requestConfig = context.getRequestConfig();
101             final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
102             if (log.isDebugEnabled()) {
103                 log.debug("{} acquiring endpoint ({})", id, connectionRequestTimeout);
104             }
105             return Operations.cancellable(manager.lease(
106                     id,
107                     route,
108                     object,
109                     connectionRequestTimeout,
110                     new FutureCallback<AsyncConnectionEndpoint>() {
111 
112                         @Override
113                         public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
114                             endpointRef.set(connectionEndpoint);
115                             reusable = connectionEndpoint.isConnected();
116                             if (log.isDebugEnabled()) {
117                                 log.debug("{} acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
118                             }
119                             callback.completed(InternalHttpAsyncExecRuntime.this);
120                         }
121 
122                         @Override
123                         public void failed(final Exception ex) {
124                             callback.failed(ex);
125                         }
126 
127                         @Override
128                         public void cancelled() {
129                             callback.cancelled();
130                         }
131                     }));
132         }
133         callback.completed(this);
134         return Operations.nonCancellable();
135     }
136 
137     private void discardEndpoint(final AsyncConnectionEndpoint endpoint) {
138         try {
139             endpoint.close(CloseMode.IMMEDIATE);
140             if (log.isDebugEnabled()) {
141                 log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
142             }
143         } finally {
144             if (log.isDebugEnabled()) {
145                 log.debug("{} discarding endpoint", ConnPoolSupport.getId(endpoint));
146             }
147             manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
148         }
149     }
150 
151     @Override
152     public void releaseEndpoint() {
153         final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
154         if (endpoint != null) {
155             if (reusable) {
156                 if (log.isDebugEnabled()) {
157                     log.debug("{} releasing valid endpoint", ConnPoolSupport.getId(endpoint));
158                 }
159                 manager.release(endpoint, state, validDuration);
160             } else {
161                 discardEndpoint(endpoint);
162             }
163         }
164     }
165 
166     @Override
167     public void discardEndpoint() {
168         final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
169         if (endpoint != null) {
170             discardEndpoint(endpoint);
171         }
172     }
173 
174     @Override
175     public boolean validateConnection() {
176         if (reusable) {
177             final AsyncConnectionEndpoint endpoint = endpointRef.get();
178             return endpoint != null && endpoint.isConnected();
179         }
180         final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
181         if (endpoint != null) {
182             discardEndpoint(endpoint);
183         }
184         return false;
185     }
186 
187     AsyncConnectionEndpoint ensureValid() {
188         final AsyncConnectionEndpoint endpoint = endpointRef.get();
189         if (endpoint == null) {
190             throw new IllegalStateException("Endpoint not acquired / already released");
191         }
192         return endpoint;
193     }
194 
195     @Override
196     public boolean isEndpointConnected() {
197         final AsyncConnectionEndpoint endpoint = endpointRef.get();
198         return endpoint != null && endpoint.isConnected();
199     }
200 
201     @Override
202     public Cancellable connectEndpoint(
203             final HttpClientContext context,
204             final FutureCallback<AsyncExecRuntime> callback) {
205         final AsyncConnectionEndpoint endpoint = ensureValid();
206         if (endpoint.isConnected()) {
207             callback.completed(this);
208             return Operations.nonCancellable();
209         }
210         final RequestConfig requestConfig = context.getRequestConfig();
211         @SuppressWarnings("deprecation")
212         final Timeout connectTimeout = requestConfig.getConnectTimeout();
213         if (log.isDebugEnabled()) {
214             log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
215         }
216         return Operations.cancellable(manager.connect(
217                 endpoint,
218                 connectionInitiator,
219                 connectTimeout,
220                 tlsConfig,
221                 context,
222                 new CallbackContribution<AsyncConnectionEndpoint>(callback) {
223 
224                     @Override
225                     public void completed(final AsyncConnectionEndpoint endpoint) {
226                         if (log.isDebugEnabled()) {
227                             log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
228                         }
229                         if (callback != null) {
230                             callback.completed(InternalHttpAsyncExecRuntime.this);
231                         }
232                     }
233 
234         }));
235 
236     }
237 
238     @Override
239     public void disconnectEndpoint() {
240         final AsyncConnectionEndpoint endpoint = endpointRef.get();
241         if (endpoint != null) {
242             endpoint.close(CloseMode.GRACEFUL);
243         }
244     }
245 
246     @Override
247     public void upgradeTls(final HttpClientContext context) {
248         upgradeTls(context, null);
249     }
250 
251     @Override
252     public void upgradeTls(final HttpClientContext context, final FutureCallback<AsyncExecRuntime> callback) {
253         final AsyncConnectionEndpoint endpoint = ensureValid();
254         if (log.isDebugEnabled()) {
255             log.debug("{} upgrading endpoint", ConnPoolSupport.getId(endpoint));
256         }
257         manager.upgrade(endpoint, tlsConfig, context, new CallbackContribution<AsyncConnectionEndpoint>(callback) {
258 
259             @Override
260             public void completed(final AsyncConnectionEndpoint endpoint) {
261                 if (callback != null) {
262                     callback.completed(InternalHttpAsyncExecRuntime.this);
263                 }
264             }
265 
266         });
267     }
268 
269     @Override
270     public Cancellable execute(
271             final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
272         final AsyncConnectionEndpoint endpoint = ensureValid();
273         if (endpoint.isConnected()) {
274             if (log.isDebugEnabled()) {
275                 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
276             }
277             final RequestConfig requestConfig = context.getRequestConfig();
278             final Timeout responseTimeout = requestConfig.getResponseTimeout();
279             if (responseTimeout != null) {
280                 endpoint.setSocketTimeout(responseTimeout);
281             }
282             endpoint.execute(id, exchangeHandler, context);
283             if (context.getRequestConfig().isHardCancellationEnabled()) {
284                 return () -> {
285                     exchangeHandler.cancel();
286                     return true;
287                 };
288             }
289         } else {
290             connectEndpoint(context, new FutureCallback<AsyncExecRuntime>() {
291 
292                 @Override
293                 public void completed(final AsyncExecRuntime runtime) {
294                     if (log.isDebugEnabled()) {
295                         log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
296                     }
297                     try {
298                         endpoint.execute(id, exchangeHandler, pushHandlerFactory, context);
299                     } catch (final RuntimeException ex) {
300                         failed(ex);
301                     }
302                 }
303 
304                 @Override
305                 public void failed(final Exception ex) {
306                     exchangeHandler.failed(ex);
307                 }
308 
309                 @Override
310                 public void cancelled() {
311                     exchangeHandler.failed(new InterruptedIOException());
312                 }
313 
314             });
315         }
316         return Operations.nonCancellable();
317     }
318 
319     @Override
320     public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
321         reusable = true;
322         state = newState;
323         validDuration = newValidDuration;
324     }
325 
326     @Override
327     public void markConnectionNonReusable() {
328         reusable = false;
329         state = null;
330         validDuration = null;
331     }
332 
333     @Override
334     public AsyncExecRuntime fork() {
335         return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig);
336     }
337 
338 }