View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.impl.nio.client;
28  
29  import java.io.IOException;
30  import java.util.concurrent.Future;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  import java.util.concurrent.atomic.AtomicLong;
34  import java.util.concurrent.atomic.AtomicReference;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.http.ConnectionClosedException;
38  import org.apache.http.ConnectionReuseStrategy;
39  import org.apache.http.HttpException;
40  import org.apache.http.HttpHost;
41  import org.apache.http.HttpResponse;
42  import org.apache.http.client.config.RequestConfig;
43  import org.apache.http.client.methods.HttpRequestWrapper;
44  import org.apache.http.client.protocol.HttpClientContext;
45  import org.apache.http.concurrent.FutureCallback;
46  import org.apache.http.conn.ConnectionKeepAliveStrategy;
47  import org.apache.http.conn.routing.HttpRoute;
48  import org.apache.http.conn.routing.RouteTracker;
49  import org.apache.http.impl.conn.ConnectionShutdownException;
50  import org.apache.http.nio.NHttpClientConnection;
51  import org.apache.http.nio.conn.NHttpClientConnectionManager;
52  import org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler;
53  import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
54  import org.apache.http.protocol.HttpContext;
55  import org.apache.http.util.Asserts;
56  
57  /**
58   * Abstract {@link org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler} class
59   * that implements connection management aspects shared by all HTTP exchange handlers.
60   * <p>
61   * Instances of this class are expected to be accessed by one thread at a time only.
62   * The {@link #cancel()} method can be called concurrently by multiple threads.
63   */
64  abstract class AbstractClientExchangeHandler implements HttpAsyncClientExchangeHandler {
65  
66      private static final AtomicLong COUNTER = new AtomicLong(1);
67  
68      protected final Log log;
69  
70      private final long id;
71      private final HttpClientContext localContext;
72      private final NHttpClientConnectionManager connmgr;
73      private final ConnectionReuseStrategy connReuseStrategy;
74      private final ConnectionKeepAliveStrategy keepaliveStrategy;
75      private final AtomicReference<Future<NHttpClientConnection>> connectionFutureRef;
76      private final AtomicReference<NHttpClientConnection> managedConnRef;
77      private final AtomicReference<HttpRoute> routeRef;
78      private final AtomicReference<RouteTracker> routeTrackerRef;
79      private final AtomicBoolean routeEstablished;
80      private final AtomicReference<Long> validDurationRef;
81      private final AtomicReference<HttpRequestWrapper> requestRef;
82      private final AtomicReference<HttpResponse> responseRef;
83      private final AtomicBoolean completed;
84      private final AtomicBoolean closed;
85  
86      AbstractClientExchangeHandler(
87              final Log log,
88              final HttpClientContext localContext,
89              final NHttpClientConnectionManager connmgr,
90              final ConnectionReuseStrategy connReuseStrategy,
91              final ConnectionKeepAliveStrategy keepaliveStrategy) {
92          super();
93          this.log = log;
94          this.id = COUNTER.getAndIncrement();
95          this.localContext = localContext;
96          this.connmgr = connmgr;
97          this.connReuseStrategy = connReuseStrategy;
98          this.keepaliveStrategy = keepaliveStrategy;
99          this.connectionFutureRef = new AtomicReference<Future<NHttpClientConnection>>(null);
100         this.managedConnRef = new AtomicReference<NHttpClientConnection>(null);
101         this.routeRef = new AtomicReference<HttpRoute>(null);
102         this.routeTrackerRef = new AtomicReference<RouteTracker>(null);
103         this.routeEstablished = new AtomicBoolean(false);
104         this.validDurationRef = new AtomicReference<Long>(null);
105         this.requestRef = new AtomicReference<HttpRequestWrapper>(null);
106         this.responseRef = new AtomicReference<HttpResponse>(null);
107         this.completed = new AtomicBoolean(false);
108         this.closed = new AtomicBoolean(false);
109     }
110 
111     final long getId() {
112         return this.id;
113     }
114 
115     final boolean isCompleted() {
116         return this.completed.get();
117     }
118 
119     final void markCompleted() {
120         this.completed.set(true);
121     }
122 
123     final void markConnectionNonReusable() {
124         this.validDurationRef.set(null);
125     }
126 
127     final boolean isRouteEstablished() {
128         return this.routeEstablished.get();
129     }
130 
131     final HttpRoute getRoute() {
132         return this.routeRef.get();
133     }
134 
135     final void setRoute(final HttpRoute route) {
136         this.routeRef.set(route);
137     }
138 
139     final HttpRequestWrapper getCurrentRequest() {
140         return this.requestRef.get();
141     }
142 
143     final void setCurrentRequest(final HttpRequestWrapper request) {
144         this.requestRef.set(request);
145     }
146 
147     final HttpResponse getCurrentResponse() {
148         return this.responseRef.get();
149     }
150 
151     final void setCurrentResponse(final HttpResponse response) {
152         this.responseRef.set(response);
153     }
154 
155     final HttpRoute getActualRoute() {
156         final RouteTracker routeTracker = this.routeTrackerRef.get();
157         return routeTracker != null ? routeTracker.toRoute() : null;
158     }
159 
160     final void verifytRoute() {
161         if (!this.routeEstablished.get() && this.routeTrackerRef.get() == null) {
162             final NHttpClientConnection managedConn = this.managedConnRef.get();
163             Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
164             final boolean routeComplete = this.connmgr.isRouteComplete(managedConn);
165             this.routeEstablished.set(routeComplete);
166             if (!routeComplete) {
167                 if (this.log.isDebugEnabled()) {
168                     this.log.debug("[exchange: " + this.id + "] Start connection routing");
169                 }
170                 final HttpRoute route = this.routeRef.get();
171                 this.routeTrackerRef.set(new RouteTracker(route));
172             } else {
173                 if (this.log.isDebugEnabled()) {
174                     this.log.debug("[exchange: " + this.id + "] Connection route already established");
175                 }
176             }
177         }
178     }
179 
180     final void onRouteToTarget() throws IOException {
181         final NHttpClientConnection managedConn = this.managedConnRef.get();
182         Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
183         final HttpRoute route = this.routeRef.get();
184         Asserts.check(route != null, "Inconsistent state: HTTP route is null");
185         final RouteTracker routeTracker = this.routeTrackerRef.get();
186         Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
187         this.connmgr.startRoute(managedConn, route, this.localContext);
188         routeTracker.connectTarget(route.isSecure());
189     }
190 
191     final void onRouteToProxy() throws IOException {
192         final NHttpClientConnection managedConn = this.managedConnRef.get();
193         Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
194         final HttpRoute route = this.routeRef.get();
195         Asserts.check(route != null, "Inconsistent state: HTTP route is null");
196         final RouteTracker routeTracker = this.routeTrackerRef.get();
197         Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
198         this.connmgr.startRoute(managedConn, route, this.localContext);
199         final HttpHost proxy  = route.getProxyHost();
200         routeTracker.connectProxy(proxy, false);
201     }
202 
203     final void onRouteUpgrade() throws IOException {
204         final NHttpClientConnection managedConn = this.managedConnRef.get();
205         Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
206         final HttpRoute route = this.routeRef.get();
207         Asserts.check(route != null, "Inconsistent state: HTTP route is null");
208         final RouteTracker routeTracker = this.routeTrackerRef.get();
209         Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
210         this.connmgr.upgrade(managedConn, route, this.localContext);
211         routeTracker.layerProtocol(route.isSecure());
212     }
213 
214     final void onRouteTunnelToTarget() {
215         final RouteTracker routeTracker = this.routeTrackerRef.get();
216         Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
217         routeTracker.tunnelTarget(false);
218     }
219 
220     final void onRouteComplete() {
221         final NHttpClientConnection managedConn = this.managedConnRef.get();
222         Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
223         final HttpRoute route = this.routeRef.get();
224         Asserts.check(route != null, "Inconsistent state: HTTP route is null");
225         if (this.log.isDebugEnabled()) {
226             this.log.debug("[exchange: " + this.id + "] route completed");
227         }
228         this.connmgr.routeComplete(managedConn, route, this.localContext);
229         this.routeEstablished.set(true);
230         this.routeTrackerRef.set(null);
231     }
232 
233     final NHttpClientConnection getConnection() {
234         return this.managedConnRef.get();
235     }
236 
237     final void releaseConnection() {
238         final NHttpClientConnection localConn = this.managedConnRef.getAndSet(null);
239         if (localConn != null) {
240             if (this.log.isDebugEnabled()) {
241                 this.log.debug("[exchange: " + this.id + "] releasing connection");
242             }
243             localConn.getContext().removeAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER);
244             final Long validDuration = this.validDurationRef.get();
245             if (validDuration != null) {
246                 final Object userToken = this.localContext.getUserToken();
247                 this.connmgr.releaseConnection(localConn, userToken, validDuration, TimeUnit.MILLISECONDS);
248             } else {
249                 try {
250                     localConn.close();
251                     if (this.log.isDebugEnabled()) {
252                         this.log.debug("[exchange: " + this.id + "] connection discarded");
253                     }
254                 } catch (final IOException ex) {
255                     if (this.log.isDebugEnabled()) {
256                         this.log.debug("[exchange: " + this.id + "] " + ex.getMessage(), ex);
257                     }
258                 } finally {
259                     this.connmgr.releaseConnection(localConn, null, 0, TimeUnit.MILLISECONDS);
260                 }
261             }
262         }
263     }
264 
265     final void discardConnection() {
266         final NHttpClientConnection localConn = this.managedConnRef.getAndSet(null);
267         if (localConn != null) {
268             try {
269                 localConn.shutdown();
270                 if (this.log.isDebugEnabled()) {
271                     this.log.debug("[exchange: " + this.id + "] connection aborted");
272                 }
273             } catch (final IOException ex) {
274                 if (this.log.isDebugEnabled()) {
275                     this.log.debug("[exchange: " + this.id + "] " + ex.getMessage(), ex);
276                 }
277             } finally {
278                 this.connmgr.releaseConnection(localConn, null, 0, TimeUnit.MILLISECONDS);
279             }
280         }
281     }
282 
283     final boolean manageConnectionPersistence() {
284         final HttpResponse response = this.responseRef.get();
285         Asserts.check(response != null, "Inconsistent state: HTTP response");
286         final NHttpClientConnection managedConn = this.managedConnRef.get();
287         Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
288         final boolean keepAlive = managedConn.isOpen() &&
289                 this.connReuseStrategy.keepAlive(response, this.localContext);
290         if (keepAlive) {
291             final long validDuration = this.keepaliveStrategy.getKeepAliveDuration(
292                     response, this.localContext);
293             if (this.log.isDebugEnabled()) {
294                 final String s;
295                 if (validDuration > 0) {
296                     s = "for " + validDuration + " " + TimeUnit.MILLISECONDS;
297                 } else {
298                     s = "indefinitely";
299                 }
300                 this.log.debug("[exchange: " + this.id + "] Connection can be kept alive " + s);
301             }
302             this.validDurationRef.set(validDuration);
303         } else {
304             if (this.log.isDebugEnabled()) {
305                 this.log.debug("[exchange: " + this.id + "] Connection cannot be kept alive");
306             }
307             this.validDurationRef.set(null);
308         }
309         return keepAlive;
310     }
311 
312     private void connectionAllocated(final NHttpClientConnection managedConn) {
313         try {
314             if (this.log.isDebugEnabled()) {
315                 this.log.debug("[exchange: " + this.id + "] Connection allocated: " + managedConn);
316             }
317             this.connectionFutureRef.set(null);
318             this.managedConnRef.set(managedConn);
319 
320             if (this.closed.get()) {
321                 discardConnection();
322                 return;
323             }
324 
325             if (this.connmgr.isRouteComplete(managedConn)) {
326                 this.routeEstablished.set(true);
327                 this.routeTrackerRef.set(null);
328             }
329 
330             final HttpContext context = managedConn.getContext();
331             synchronized (context) {
332                 context.setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, this);
333                 if (managedConn.isStale()) {
334                     failed(new ConnectionClosedException("Connection closed"));
335                 } else {
336                     managedConn.requestOutput();
337                 }
338             }
339         } catch (final ConnectionShutdownException runex) {
340             failed(runex);
341         } catch (final RuntimeException runex) {
342             failed(runex);
343             throw runex;
344         }
345     }
346 
347     private void connectionRequestFailed(final Exception ex) {
348         if (this.log.isDebugEnabled()) {
349             this.log.debug("[exchange: " + this.id + "] connection request failed");
350         }
351         this.connectionFutureRef.set(null);
352         failed(ex);
353     }
354 
355     private void connectionRequestCancelled() {
356         if (this.log.isDebugEnabled()) {
357             this.log.debug("[exchange: " + this.id + "] Connection request cancelled");
358         }
359         this.connectionFutureRef.set(null);
360         try {
361             executionCancelled();
362         } finally {
363             close();
364         }
365     }
366 
367     final void requestConnection() {
368         final HttpRoute route = this.routeRef.get();
369         if (this.log.isDebugEnabled()) {
370             this.log.debug("[exchange: " + this.id + "] Request connection for " + route);
371         }
372 
373         discardConnection();
374 
375         this.validDurationRef.set(null);
376         this.routeTrackerRef.set(null);
377         this.routeEstablished.set(false);
378 
379         final Object userToken = this.localContext.getUserToken();
380         final RequestConfig config = this.localContext.getRequestConfig();
381         this.connectionFutureRef.set(this.connmgr.requestConnection(
382                 route,
383                 userToken,
384                 config.getConnectTimeout(),
385                 config.getConnectionRequestTimeout(),
386                 TimeUnit.MILLISECONDS,
387                 new FutureCallback<NHttpClientConnection>() {
388 
389                     @Override
390                     public void completed(final NHttpClientConnection managedConn) {
391                         connectionAllocated(managedConn);
392                     }
393 
394                     @Override
395                     public void failed(final Exception ex) {
396                         connectionRequestFailed(ex);
397                     }
398 
399                     @Override
400                     public void cancelled() {
401                         connectionRequestCancelled();
402                     }
403 
404                 }));
405     }
406 
407     abstract void start() throws HttpException, IOException;
408 
409     abstract void releaseResources();
410 
411     abstract void executionFailed(final Exception ex);
412 
413     abstract boolean executionCancelled();
414 
415     @Override
416     public final void close() {
417         if (this.closed.compareAndSet(false, true)) {
418             discardConnection();
419             releaseResources();
420         }
421     }
422 
423     @Override
424     public final boolean isDone() {
425         return this.completed.get();
426     }
427 
428     @Override
429     public final void failed(final Exception ex) {
430         if (this.closed.compareAndSet(false, true)) {
431             try {
432                 executionFailed(ex);
433             } finally {
434                 discardConnection();
435                 releaseResources();
436             }
437         }
438     }
439 
440     @Override
441     public final boolean cancel() {
442         if (this.log.isDebugEnabled()) {
443             this.log.debug("[exchange: " + this.id + "] Cancelled");
444         }
445         if (this.closed.compareAndSet(false, true)) {
446             try {
447                 final Future<NHttpClientConnection> connectionFuture = this.connectionFutureRef.getAndSet(null);
448                 if (connectionFuture != null) {
449                     connectionFuture.cancel(true);
450                 }
451                 return executionCancelled();
452             } finally {
453                 discardConnection();
454                 releaseResources();
455             }
456         }
457         return false;
458     }
459 
460 }