1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.impl.nio.client;
28
29 import java.io.IOException;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicLong;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.http.ConnectionClosedException;
38 import org.apache.http.ConnectionReuseStrategy;
39 import org.apache.http.HttpHost;
40 import org.apache.http.HttpResponse;
41 import org.apache.http.client.config.RequestConfig;
42 import org.apache.http.client.methods.HttpRequestWrapper;
43 import org.apache.http.client.protocol.HttpClientContext;
44 import org.apache.http.concurrent.FutureCallback;
45 import org.apache.http.conn.ConnectionKeepAliveStrategy;
46 import org.apache.http.conn.routing.HttpRoute;
47 import org.apache.http.conn.routing.RouteTracker;
48 import org.apache.http.nio.NHttpClientConnection;
49 import org.apache.http.nio.conn.NHttpClientConnectionManager;
50 import org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler;
51 import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
52 import org.apache.http.protocol.HttpContext;
53 import org.apache.http.util.Asserts;
54
55
56
57
58
59
60
61
62 abstract class AbstractClientExchangeHandler implements HttpAsyncClientExchangeHandler {
63
64 private static final AtomicLong COUNTER = new AtomicLong(1);
65
66 protected final Log log;
67
68 private final long id;
69 private final HttpClientContext localContext;
70 private final NHttpClientConnectionManager connmgr;
71 private final ConnectionReuseStrategy connReuseStrategy;
72 private final ConnectionKeepAliveStrategy keepaliveStrategy;
73 private final AtomicReference<Future<NHttpClientConnection>> connectionFutureRef;
74 private final AtomicReference<NHttpClientConnection> managedConnRef;
75 private final AtomicReference<HttpRoute> routeRef;
76 private final AtomicReference<RouteTracker> routeTrackerRef;
77 private final AtomicBoolean routeEstablished;
78 private final AtomicReference<Long> validDurationRef;
79 private final AtomicReference<HttpRequestWrapper> requestRef;
80 private final AtomicReference<HttpResponse> responseRef;
81 private final AtomicBoolean completed;
82 private final AtomicBoolean closed;
83
84 AbstractClientExchangeHandler(
85 final Log log,
86 final HttpClientContext localContext,
87 final NHttpClientConnectionManager connmgr,
88 final ConnectionReuseStrategy connReuseStrategy,
89 final ConnectionKeepAliveStrategy keepaliveStrategy) {
90 super();
91 this.log = log;
92 this.id = COUNTER.getAndIncrement();
93 this.localContext = localContext;
94 this.connmgr = connmgr;
95 this.connReuseStrategy = connReuseStrategy;
96 this.keepaliveStrategy = keepaliveStrategy;
97 this.connectionFutureRef = new AtomicReference<Future<NHttpClientConnection>>(null);
98 this.managedConnRef = new AtomicReference<NHttpClientConnection>(null);
99 this.routeRef = new AtomicReference<HttpRoute>(null);
100 this.routeTrackerRef = new AtomicReference<RouteTracker>(null);
101 this.routeEstablished = new AtomicBoolean(false);
102 this.validDurationRef = new AtomicReference<Long>(null);
103 this.requestRef = new AtomicReference<HttpRequestWrapper>(null);
104 this.responseRef = new AtomicReference<HttpResponse>(null);
105 this.completed = new AtomicBoolean(false);
106 this.closed = new AtomicBoolean(false);
107 }
108
109 final long getId() {
110 return this.id;
111 }
112
113 final boolean isCompleted() {
114 return this.completed.get();
115 }
116
117 final void markCompleted() {
118 this.completed.set(true);
119 }
120
121 final void markConnectionNonReusable() {
122 this.validDurationRef.set(null);
123 }
124
125 final boolean isRouteEstablished() {
126 return this.routeEstablished.get();
127 }
128
129 final HttpRoute getRoute() {
130 return this.routeRef.get();
131 }
132
133 final void setRoute(final HttpRoute route) {
134 this.routeRef.set(route);
135 }
136
137 final HttpRequestWrapper getCurrentRequest() {
138 return this.requestRef.get();
139 }
140
141 final void setCurrentRequest(final HttpRequestWrapper request) {
142 this.requestRef.set(request);
143 }
144
145 final HttpResponse getCurrentResponse() {
146 return this.responseRef.get();
147 }
148
149 final void setCurrentResponse(final HttpResponse response) {
150 this.responseRef.set(response);
151 }
152
153 final HttpRoute getActualRoute() {
154 final RouteTracker routeTracker = this.routeTrackerRef.get();
155 return routeTracker != null ? routeTracker.toRoute() : null;
156 }
157
158 final void verifytRoute() {
159 if (!this.routeEstablished.get() && this.routeTrackerRef.get() == null) {
160 final NHttpClientConnection managedConn = this.managedConnRef.get();
161 Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
162 final boolean routeComplete = this.connmgr.isRouteComplete(managedConn);
163 this.routeEstablished.set(routeComplete);
164 if (!routeComplete) {
165 if (this.log.isDebugEnabled()) {
166 this.log.debug("[exchange: " + this.id + "] Start connection routing");
167 }
168 final HttpRoute route = this.routeRef.get();
169 this.routeTrackerRef.set(new RouteTracker(route));
170 } else {
171 if (this.log.isDebugEnabled()) {
172 this.log.debug("[exchange: " + this.id + "] Connection route already established");
173 }
174 }
175 }
176 }
177
178 final void onRouteToTarget() throws IOException {
179 final NHttpClientConnection managedConn = this.managedConnRef.get();
180 Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
181 final HttpRoute route = this.routeRef.get();
182 Asserts.check(route != null, "Inconsistent state: HTTP route is null");
183 final RouteTracker routeTracker = this.routeTrackerRef.get();
184 Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
185 this.connmgr.startRoute(managedConn, route, this.localContext);
186 routeTracker.connectTarget(route.isSecure());
187 }
188
189 final void onRouteToProxy() throws IOException {
190 final NHttpClientConnection managedConn = this.managedConnRef.get();
191 Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
192 final HttpRoute route = this.routeRef.get();
193 Asserts.check(route != null, "Inconsistent state: HTTP route is null");
194 final RouteTracker routeTracker = this.routeTrackerRef.get();
195 Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
196 this.connmgr.startRoute(managedConn, route, this.localContext);
197 final HttpHost proxy = route.getProxyHost();
198 routeTracker.connectProxy(proxy, false);
199 }
200
201 final void onRouteUpgrade() throws IOException {
202 final NHttpClientConnection managedConn = this.managedConnRef.get();
203 Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
204 final HttpRoute route = this.routeRef.get();
205 Asserts.check(route != null, "Inconsistent state: HTTP route is null");
206 final RouteTracker routeTracker = this.routeTrackerRef.get();
207 Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
208 this.connmgr.upgrade(managedConn, route, this.localContext);
209 routeTracker.layerProtocol(route.isSecure());
210 }
211
212 final void onRouteTunnelToTarget() {
213 final RouteTracker routeTracker = this.routeTrackerRef.get();
214 Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
215 routeTracker.tunnelTarget(false);
216 }
217
218 final void onRouteComplete() {
219 final NHttpClientConnection managedConn = this.managedConnRef.get();
220 Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
221 final HttpRoute route = this.routeRef.get();
222 Asserts.check(route != null, "Inconsistent state: HTTP route is null");
223 if (this.log.isDebugEnabled()) {
224 this.log.debug("[exchange: " + this.id + "] route completed");
225 }
226 this.connmgr.routeComplete(managedConn, route, this.localContext);
227 this.routeEstablished.set(true);
228 this.routeTrackerRef.set(null);
229 }
230
231 final NHttpClientConnection getConnection() {
232 return this.managedConnRef.get();
233 }
234
235 final void releaseConnection() {
236 final NHttpClientConnection localConn = this.managedConnRef.getAndSet(null);
237 if (localConn != null) {
238 if (this.log.isDebugEnabled()) {
239 this.log.debug("[exchange: " + this.id + "] releasing connection");
240 }
241 localConn.getContext().removeAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER);
242 final Long validDuration = this.validDurationRef.get();
243 if (validDuration != null) {
244 final Object userToken = this.localContext.getUserToken();
245 this.connmgr.releaseConnection(localConn, userToken, validDuration, TimeUnit.MILLISECONDS);
246 } else {
247 try {
248 localConn.close();
249 if (this.log.isDebugEnabled()) {
250 this.log.debug("[exchange: " + this.id + "] connection discarded");
251 }
252 } catch (final IOException ex) {
253 if (this.log.isDebugEnabled()) {
254 this.log.debug("[exchange: " + this.id + "] " + ex.getMessage(), ex);
255 }
256 } finally {
257 this.connmgr.releaseConnection(localConn, null, 0, TimeUnit.MILLISECONDS);
258 }
259 }
260 }
261 }
262
263 final void discardConnection() {
264 final NHttpClientConnection localConn = this.managedConnRef.getAndSet(null);
265 if (localConn != null) {
266 try {
267 localConn.shutdown();
268 if (this.log.isDebugEnabled()) {
269 this.log.debug("[exchange: " + this.id + "] connection aborted");
270 }
271 } catch (final IOException ex) {
272 if (this.log.isDebugEnabled()) {
273 this.log.debug("[exchange: " + this.id + "] " + ex.getMessage(), ex);
274 }
275 } finally {
276 this.connmgr.releaseConnection(localConn, null, 0, TimeUnit.MILLISECONDS);
277 }
278 }
279 }
280
281 final boolean manageConnectionPersistence() {
282 final HttpResponse response = this.responseRef.get();
283 Asserts.check(response != null, "Inconsistent state: HTTP response");
284 final NHttpClientConnection managedConn = this.managedConnRef.get();
285 Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
286 final boolean keepAlive = managedConn.isOpen() &&
287 this.connReuseStrategy.keepAlive(response, this.localContext);
288 if (keepAlive) {
289 final long validDuration = this.keepaliveStrategy.getKeepAliveDuration(
290 response, this.localContext);
291 if (this.log.isDebugEnabled()) {
292 final String s;
293 if (validDuration > 0) {
294 s = "for " + validDuration + " " + TimeUnit.MILLISECONDS;
295 } else {
296 s = "indefinitely";
297 }
298 this.log.debug("[exchange: " + this.id + "] Connection can be kept alive " + s);
299 }
300 this.validDurationRef.set(validDuration);
301 } else {
302 if (this.log.isDebugEnabled()) {
303 this.log.debug("[exchange: " + this.id + "] Connection cannot be kept alive");
304 }
305 this.validDurationRef.set(null);
306 }
307 return keepAlive;
308 }
309
310 private void connectionAllocated(final NHttpClientConnection managedConn) {
311 try {
312 if (this.log.isDebugEnabled()) {
313 this.log.debug("[exchange: " + this.id + "] Connection allocated: " + managedConn);
314 }
315 this.connectionFutureRef.set(null);
316 this.managedConnRef.set(managedConn);
317
318 if (this.closed.get()) {
319 discardConnection();
320 return;
321 }
322
323 if (this.connmgr.isRouteComplete(managedConn)) {
324 this.routeEstablished.set(true);
325 this.routeTrackerRef.set(null);
326 }
327
328 final HttpContext context = managedConn.getContext();
329 synchronized (context) {
330 context.setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, this);
331 if (managedConn.isStale()) {
332 failed(new ConnectionClosedException("Connection closed"));
333 } else {
334 managedConn.requestOutput();
335 }
336 }
337 } catch (final RuntimeException runex) {
338 failed(runex);
339 throw runex;
340 }
341 }
342
343 private void connectionRequestFailed(final Exception ex) {
344 if (this.log.isDebugEnabled()) {
345 this.log.debug("[exchange: " + this.id + "] connection request failed");
346 }
347 this.connectionFutureRef.set(null);
348 failed(ex);
349 }
350
351 private void connectionRequestCancelled() {
352 if (this.log.isDebugEnabled()) {
353 this.log.debug("[exchange: " + this.id + "] Connection request cancelled");
354 }
355 this.connectionFutureRef.set(null);
356 try {
357 executionCancelled();
358 } finally {
359 close();
360 }
361 }
362
363 final void requestConnection() {
364 final HttpRoute route = this.routeRef.get();
365 if (this.log.isDebugEnabled()) {
366 this.log.debug("[exchange: " + this.id + "] Request connection for " + route);
367 }
368
369 discardConnection();
370
371 this.validDurationRef.set(null);
372 this.routeTrackerRef.set(null);
373 this.routeEstablished.set(false);
374
375 final Object userToken = this.localContext.getUserToken();
376 final RequestConfig config = this.localContext.getRequestConfig();
377 this.connectionFutureRef.set(this.connmgr.requestConnection(
378 route,
379 userToken,
380 config.getConnectTimeout(),
381 config.getConnectionRequestTimeout(),
382 TimeUnit.MILLISECONDS,
383 new FutureCallback<NHttpClientConnection>() {
384
385 @Override
386 public void completed(final NHttpClientConnection managedConn) {
387 connectionAllocated(managedConn);
388 }
389
390 @Override
391 public void failed(final Exception ex) {
392 connectionRequestFailed(ex);
393 }
394
395 @Override
396 public void cancelled() {
397 connectionRequestCancelled();
398 }
399
400 }));
401 }
402
403 abstract void releaseResources();
404
405 abstract void executionFailed(final Exception ex);
406
407 abstract boolean executionCancelled();
408
409 @Override
410 public final void close() {
411 if (this.closed.compareAndSet(false, true)) {
412 discardConnection();
413 releaseResources();
414 }
415 }
416
417 @Override
418 public final boolean isDone() {
419 return this.completed.get();
420 }
421
422 @Override
423 public final void failed(final Exception ex) {
424 if (this.closed.compareAndSet(false, true)) {
425 try {
426 executionFailed(ex);
427 } finally {
428 discardConnection();
429 releaseResources();
430 }
431 }
432 }
433
434 @Override
435 public final boolean cancel() {
436 if (this.log.isDebugEnabled()) {
437 this.log.debug("[exchange: " + this.id + "] Cancelled");
438 }
439 if (this.closed.compareAndSet(false, true)) {
440 try {
441 final Future<NHttpClientConnection> connectionFuture = this.connectionFutureRef.getAndSet(null);
442 if (connectionFuture != null) {
443 connectionFuture.cancel(true);
444 }
445 return executionCancelled();
446 } finally {
447 discardConnection();
448 releaseResources();
449 }
450 }
451 return false;
452 }
453
454 }