View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.classic;
29  
30  import java.io.IOException;
31  import java.util.concurrent.ExecutionException;
32  import java.util.concurrent.TimeoutException;
33  import java.util.concurrent.atomic.AtomicReference;
34  
35  import org.apache.hc.client5.http.HttpRoute;
36  import org.apache.hc.client5.http.classic.ExecRuntime;
37  import org.apache.hc.client5.http.config.RequestConfig;
38  import org.apache.hc.client5.http.impl.ConnPoolSupport;
39  import org.apache.hc.client5.http.io.ConnectionEndpoint;
40  import org.apache.hc.client5.http.io.HttpClientConnectionManager;
41  import org.apache.hc.client5.http.io.LeaseRequest;
42  import org.apache.hc.client5.http.protocol.HttpClientContext;
43  import org.apache.hc.core5.concurrent.Cancellable;
44  import org.apache.hc.core5.concurrent.CancellableDependency;
45  import org.apache.hc.core5.http.ClassicHttpRequest;
46  import org.apache.hc.core5.http.ClassicHttpResponse;
47  import org.apache.hc.core5.http.ConnectionRequestTimeoutException;
48  import org.apache.hc.core5.http.HttpException;
49  import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
50  import org.apache.hc.core5.io.CloseMode;
51  import org.apache.hc.core5.util.Args;
52  import org.apache.hc.core5.util.TimeValue;
53  import org.apache.hc.core5.util.Timeout;
54  import org.slf4j.Logger;
55  
56  class InternalExecRuntime implements ExecRuntime, Cancellable {
57  
58      private final Logger log;
59  
60      private final HttpClientConnectionManager manager;
61      private final HttpRequestExecutor requestExecutor;
62      private final CancellableDependency cancellableDependency;
63      private final AtomicReference<ConnectionEndpoint> endpointRef;
64  
65      private volatile boolean reusable;
66      private volatile Object state;
67      private volatile TimeValue validDuration;
68  
69      InternalExecRuntime(
70              final Logger log,
71              final HttpClientConnectionManager manager,
72              final HttpRequestExecutor requestExecutor,
73              final CancellableDependency cancellableDependency) {
74          super();
75          this.log = log;
76          this.manager = manager;
77          this.requestExecutor = requestExecutor;
78          this.cancellableDependency = cancellableDependency;
79          this.endpointRef = new AtomicReference<>(null);
80          this.validDuration = TimeValue.NEG_ONE_MILLISECOND;
81      }
82  
83      @Override
84      public boolean isExecutionAborted() {
85          return cancellableDependency != null && cancellableDependency.isCancelled();
86      }
87  
88      @Override
89      public boolean isEndpointAcquired() {
90          return endpointRef.get() != null;
91      }
92  
93      @Override
94      public void acquireEndpoint(
95              final String id, final HttpRoute route, final Object object, final HttpClientContext context) throws IOException {
96          Args.notNull(route, "Route");
97          if (endpointRef.get() == null) {
98              final RequestConfig requestConfig = context.getRequestConfig();
99              final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
100             if (log.isDebugEnabled()) {
101                 log.debug("{}: acquiring endpoint ({})", id, connectionRequestTimeout);
102             }
103             final LeaseRequest connRequest = manager.lease(id, route, connectionRequestTimeout, object);
104             state = object;
105             if (cancellableDependency != null) {
106                 cancellableDependency.setDependency(connRequest);
107                 if (cancellableDependency.isCancelled()) {
108                     connRequest.cancel();
109                     throw new RequestFailedException("Request aborted");
110                 }
111             }
112             try {
113                 final ConnectionEndpoint connectionEndpoint = connRequest.get(connectionRequestTimeout);
114                 endpointRef.set(connectionEndpoint);
115                 reusable = connectionEndpoint.isConnected();
116                 if (cancellableDependency != null) {
117                     cancellableDependency.setDependency(this);
118                     if (cancellableDependency.isCancelled()) {
119                         cancel();
120                         throw new RequestFailedException("Request aborted");
121                     }
122                 }
123                 if (log.isDebugEnabled()) {
124                     log.debug("{}: acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
125                 }
126             } catch(final TimeoutException ex) {
127                 throw new ConnectionRequestTimeoutException(ex.getMessage());
128             } catch(final InterruptedException interrupted) {
129                 Thread.currentThread().interrupt();
130                 throw new RequestFailedException("Request aborted", interrupted);
131             } catch(final ExecutionException ex) {
132                 Throwable cause = ex.getCause();
133                 if (cause == null) {
134                     cause = ex;
135                 }
136                 throw new RequestFailedException("Request execution failed", cause);
137             }
138         } else {
139             throw new IllegalStateException("Endpoint already acquired");
140         }
141     }
142 
143     ConnectionEndpoint ensureValid() {
144         final ConnectionEndpoint endpoint = endpointRef.get();
145         if (endpoint == null) {
146             throw new IllegalStateException("Endpoint not acquired / already released");
147         }
148         return endpoint;
149     }
150 
151     @Override
152     public boolean isEndpointConnected() {
153         final ConnectionEndpoint endpoint = endpointRef.get();
154         return endpoint != null && endpoint.isConnected();
155     }
156 
157     private void connectEndpoint(final ConnectionEndpoint endpoint, final HttpClientContext context) throws IOException {
158         if (cancellableDependency != null) {
159             if (cancellableDependency.isCancelled()) {
160                 throw new RequestFailedException("Request aborted");
161             }
162         }
163         final RequestConfig requestConfig = context.getRequestConfig();
164         final Timeout connectTimeout = requestConfig.getConnectTimeout();
165         if (log.isDebugEnabled()) {
166             log.debug("{}: connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
167         }
168         manager.connect(endpoint, connectTimeout, context);
169         if (log.isDebugEnabled()) {
170             log.debug("{}: endpoint connected", ConnPoolSupport.getId(endpoint));
171         }
172     }
173 
174     @Override
175     public void connectEndpoint(final HttpClientContext context) throws IOException {
176         final ConnectionEndpoint endpoint = ensureValid();
177         if (!endpoint.isConnected()) {
178             connectEndpoint(endpoint, context);
179         }
180     }
181 
182     @Override
183     public void disconnectEndpoint() throws IOException {
184         final ConnectionEndpoint endpoint = endpointRef.get();
185         if (endpoint != null) {
186             endpoint.close();
187             if (log.isDebugEnabled()) {
188                 log.debug("{}: endpoint closed", ConnPoolSupport.getId(endpoint));
189             }
190         }
191     }
192 
193     @Override
194     public void upgradeTls(final HttpClientContext context) throws IOException {
195         final ConnectionEndpoint endpoint = ensureValid();
196         if (log.isDebugEnabled()) {
197             log.debug("{}: upgrading endpoint", ConnPoolSupport.getId(endpoint));
198         }
199         manager.upgrade(endpoint, context);
200     }
201 
202     @Override
203     public ClassicHttpResponse execute(
204             final String id,
205             final ClassicHttpRequest request,
206             final HttpClientContext context) throws IOException, HttpException {
207         final ConnectionEndpoint endpoint = ensureValid();
208         if (!endpoint.isConnected()) {
209             connectEndpoint(endpoint, context);
210         }
211         final RequestConfig requestConfig = context.getRequestConfig();
212         final Timeout responseTimeout = requestConfig.getResponseTimeout();
213         if (responseTimeout != null) {
214             endpoint.setSocketTimeout(responseTimeout);
215         }
216         if (log.isDebugEnabled()) {
217             log.debug("{}: start execution {}", ConnPoolSupport.getId(endpoint), id);
218         }
219         return endpoint.execute(id, request, requestExecutor, context);
220     }
221 
222     @Override
223     public boolean isConnectionReusable() {
224         return reusable;
225     }
226 
227     @Override
228     public void markConnectionReusable(final Object state, final TimeValue validDuration) {
229         this.reusable = true;
230         this.state = state;
231         this.validDuration = validDuration;
232     }
233 
234     @Override
235     public void markConnectionNonReusable() {
236         reusable = false;
237     }
238 
239     private void discardEndpoint(final ConnectionEndpoint endpoint) {
240         try {
241             endpoint.close(CloseMode.IMMEDIATE);
242             if (log.isDebugEnabled()) {
243                 log.debug("{}: endpoint closed", ConnPoolSupport.getId(endpoint));
244             }
245         } finally {
246             if (log.isDebugEnabled()) {
247                 log.debug("{}: discarding endpoint", ConnPoolSupport.getId(endpoint));
248             }
249             manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
250         }
251     }
252 
253     @Override
254     public void releaseEndpoint() {
255         final ConnectionEndpoint endpoint = endpointRef.getAndSet(null);
256         if (endpoint != null) {
257             if (reusable) {
258                 if (log.isDebugEnabled()) {
259                     log.debug("{}: releasing valid endpoint", ConnPoolSupport.getId(endpoint));
260                 }
261                 manager.release(endpoint, state, validDuration);
262             } else {
263                 discardEndpoint(endpoint);
264             }
265         }
266     }
267 
268     @Override
269     public void discardEndpoint() {
270         final ConnectionEndpoint endpoint = endpointRef.getAndSet(null);
271         if (endpoint != null) {
272             discardEndpoint(endpoint);
273         }
274     }
275 
276     @Override
277     public boolean cancel() {
278         final boolean alreadyReleased = endpointRef.get() == null;
279         final ConnectionEndpoint endpoint = endpointRef.getAndSet(null);
280         if (endpoint != null) {
281             if (log.isDebugEnabled()) {
282                 log.debug("{}: cancel", ConnPoolSupport.getId(endpoint));
283             }
284             discardEndpoint(endpoint);
285         }
286         return !alreadyReleased;
287     }
288 
289     @Override
290     public ExecRuntime fork(final CancellableDependency cancellableDependency) {
291         return new InternalExecRuntime(log, manager, requestExecutor, cancellableDependency);
292     }
293 
294 }