1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.impl.nio.client;
28
29 import java.io.IOException;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicLong;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.http.ConnectionClosedException;
38 import org.apache.http.ConnectionReuseStrategy;
39 import org.apache.http.HttpException;
40 import org.apache.http.HttpHost;
41 import org.apache.http.HttpResponse;
42 import org.apache.http.client.config.RequestConfig;
43 import org.apache.http.client.methods.HttpRequestWrapper;
44 import org.apache.http.client.protocol.HttpClientContext;
45 import org.apache.http.concurrent.FutureCallback;
46 import org.apache.http.conn.ConnectionKeepAliveStrategy;
47 import org.apache.http.conn.routing.HttpRoute;
48 import org.apache.http.conn.routing.RouteTracker;
49 import org.apache.http.impl.conn.ConnectionShutdownException;
50 import org.apache.http.nio.NHttpClientConnection;
51 import org.apache.http.nio.conn.NHttpClientConnectionManager;
52 import org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler;
53 import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
54 import org.apache.http.protocol.HttpContext;
55 import org.apache.http.util.Asserts;
56
57
58
59
60
61
62
63
64 abstract class AbstractClientExchangeHandler implements HttpAsyncClientExchangeHandler {
65
66 private static final AtomicLong COUNTER = new AtomicLong(1);
67
68 protected final Log log;
69
70 private final long id;
71 private final HttpClientContext localContext;
72 private final NHttpClientConnectionManager connmgr;
73 private final ConnectionReuseStrategy connReuseStrategy;
74 private final ConnectionKeepAliveStrategy keepaliveStrategy;
75 private final AtomicReference<Future<NHttpClientConnection>> connectionFutureRef;
76 private final AtomicReference<NHttpClientConnection> managedConnRef;
77 private final AtomicReference<HttpRoute> routeRef;
78 private final AtomicReference<RouteTracker> routeTrackerRef;
79 private final AtomicBoolean routeEstablished;
80 private final AtomicReference<Long> validDurationRef;
81 private final AtomicReference<HttpRequestWrapper> requestRef;
82 private final AtomicReference<HttpResponse> responseRef;
83 private final AtomicBoolean completed;
84 private final AtomicBoolean closed;
85
86 AbstractClientExchangeHandler(
87 final Log log,
88 final HttpClientContext localContext,
89 final NHttpClientConnectionManager connmgr,
90 final ConnectionReuseStrategy connReuseStrategy,
91 final ConnectionKeepAliveStrategy keepaliveStrategy) {
92 super();
93 this.log = log;
94 this.id = COUNTER.getAndIncrement();
95 this.localContext = localContext;
96 this.connmgr = connmgr;
97 this.connReuseStrategy = connReuseStrategy;
98 this.keepaliveStrategy = keepaliveStrategy;
99 this.connectionFutureRef = new AtomicReference<Future<NHttpClientConnection>>(null);
100 this.managedConnRef = new AtomicReference<NHttpClientConnection>(null);
101 this.routeRef = new AtomicReference<HttpRoute>(null);
102 this.routeTrackerRef = new AtomicReference<RouteTracker>(null);
103 this.routeEstablished = new AtomicBoolean(false);
104 this.validDurationRef = new AtomicReference<Long>(null);
105 this.requestRef = new AtomicReference<HttpRequestWrapper>(null);
106 this.responseRef = new AtomicReference<HttpResponse>(null);
107 this.completed = new AtomicBoolean(false);
108 this.closed = new AtomicBoolean(false);
109 }
110
111 final long getId() {
112 return this.id;
113 }
114
115 final boolean isCompleted() {
116 return this.completed.get();
117 }
118
119 final void markCompleted() {
120 this.completed.set(true);
121 }
122
123 final void markConnectionNonReusable() {
124 this.validDurationRef.set(null);
125 }
126
127 final boolean isRouteEstablished() {
128 return this.routeEstablished.get();
129 }
130
131 final HttpRoute getRoute() {
132 return this.routeRef.get();
133 }
134
135 final void setRoute(final HttpRoute route) {
136 this.routeRef.set(route);
137 }
138
139 final HttpRequestWrapper getCurrentRequest() {
140 return this.requestRef.get();
141 }
142
143 final void setCurrentRequest(final HttpRequestWrapper request) {
144 this.requestRef.set(request);
145 }
146
147 final HttpResponse getCurrentResponse() {
148 return this.responseRef.get();
149 }
150
151 final void setCurrentResponse(final HttpResponse response) {
152 this.responseRef.set(response);
153 }
154
155 final HttpRoute getActualRoute() {
156 final RouteTracker routeTracker = this.routeTrackerRef.get();
157 return routeTracker != null ? routeTracker.toRoute() : null;
158 }
159
160 final void verifytRoute() {
161 if (!this.routeEstablished.get() && this.routeTrackerRef.get() == null) {
162 final NHttpClientConnection managedConn = this.managedConnRef.get();
163 Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
164 final boolean routeComplete = this.connmgr.isRouteComplete(managedConn);
165 this.routeEstablished.set(routeComplete);
166 if (!routeComplete) {
167 if (this.log.isDebugEnabled()) {
168 this.log.debug("[exchange: " + this.id + "] Start connection routing");
169 }
170 final HttpRoute route = this.routeRef.get();
171 this.routeTrackerRef.set(new RouteTracker(route));
172 } else {
173 if (this.log.isDebugEnabled()) {
174 this.log.debug("[exchange: " + this.id + "] Connection route already established");
175 }
176 }
177 }
178 }
179
180 final void onRouteToTarget() throws IOException {
181 final NHttpClientConnection managedConn = this.managedConnRef.get();
182 Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
183 final HttpRoute route = this.routeRef.get();
184 Asserts.check(route != null, "Inconsistent state: HTTP route is null");
185 final RouteTracker routeTracker = this.routeTrackerRef.get();
186 Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
187 this.connmgr.startRoute(managedConn, route, this.localContext);
188 routeTracker.connectTarget(route.isSecure());
189 }
190
191 final void onRouteToProxy() throws IOException {
192 final NHttpClientConnection managedConn = this.managedConnRef.get();
193 Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
194 final HttpRoute route = this.routeRef.get();
195 Asserts.check(route != null, "Inconsistent state: HTTP route is null");
196 final RouteTracker routeTracker = this.routeTrackerRef.get();
197 Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
198 this.connmgr.startRoute(managedConn, route, this.localContext);
199 final HttpHost proxy = route.getProxyHost();
200 routeTracker.connectProxy(proxy, false);
201 }
202
203 final void onRouteUpgrade() throws IOException {
204 final NHttpClientConnection managedConn = this.managedConnRef.get();
205 Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
206 final HttpRoute route = this.routeRef.get();
207 Asserts.check(route != null, "Inconsistent state: HTTP route is null");
208 final RouteTracker routeTracker = this.routeTrackerRef.get();
209 Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
210 this.connmgr.upgrade(managedConn, route, this.localContext);
211 routeTracker.layerProtocol(route.isSecure());
212 }
213
214 final void onRouteTunnelToTarget() {
215 final RouteTracker routeTracker = this.routeTrackerRef.get();
216 Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
217 routeTracker.tunnelTarget(false);
218 }
219
220 final void onRouteComplete() {
221 final NHttpClientConnection managedConn = this.managedConnRef.get();
222 Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
223 final HttpRoute route = this.routeRef.get();
224 Asserts.check(route != null, "Inconsistent state: HTTP route is null");
225 if (this.log.isDebugEnabled()) {
226 this.log.debug("[exchange: " + this.id + "] route completed");
227 }
228 this.connmgr.routeComplete(managedConn, route, this.localContext);
229 this.routeEstablished.set(true);
230 this.routeTrackerRef.set(null);
231 }
232
233 final NHttpClientConnection getConnection() {
234 return this.managedConnRef.get();
235 }
236
237 final void releaseConnection() {
238 final NHttpClientConnection localConn = this.managedConnRef.getAndSet(null);
239 if (localConn != null) {
240 if (this.log.isDebugEnabled()) {
241 this.log.debug("[exchange: " + this.id + "] releasing connection");
242 }
243 localConn.getContext().removeAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER);
244 final Long validDuration = this.validDurationRef.get();
245 if (validDuration != null) {
246 final Object userToken = this.localContext.getUserToken();
247 this.connmgr.releaseConnection(localConn, userToken, validDuration, TimeUnit.MILLISECONDS);
248 } else {
249 try {
250 localConn.close();
251 if (this.log.isDebugEnabled()) {
252 this.log.debug("[exchange: " + this.id + "] connection discarded");
253 }
254 } catch (final IOException ex) {
255 if (this.log.isDebugEnabled()) {
256 this.log.debug("[exchange: " + this.id + "] " + ex.getMessage(), ex);
257 }
258 } finally {
259 this.connmgr.releaseConnection(localConn, null, 0, TimeUnit.MILLISECONDS);
260 }
261 }
262 }
263 }
264
265 final void discardConnection() {
266 final NHttpClientConnection localConn = this.managedConnRef.getAndSet(null);
267 if (localConn != null) {
268 try {
269 localConn.shutdown();
270 if (this.log.isDebugEnabled()) {
271 this.log.debug("[exchange: " + this.id + "] connection aborted");
272 }
273 } catch (final IOException ex) {
274 if (this.log.isDebugEnabled()) {
275 this.log.debug("[exchange: " + this.id + "] " + ex.getMessage(), ex);
276 }
277 } finally {
278 this.connmgr.releaseConnection(localConn, null, 0, TimeUnit.MILLISECONDS);
279 }
280 }
281 }
282
283 final boolean manageConnectionPersistence() {
284 final HttpResponse response = this.responseRef.get();
285 Asserts.check(response != null, "Inconsistent state: HTTP response");
286 final NHttpClientConnection managedConn = this.managedConnRef.get();
287 Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
288 final boolean keepAlive = managedConn.isOpen() &&
289 this.connReuseStrategy.keepAlive(response, this.localContext);
290 if (keepAlive) {
291 final long validDuration = this.keepaliveStrategy.getKeepAliveDuration(
292 response, this.localContext);
293 if (this.log.isDebugEnabled()) {
294 final String s;
295 if (validDuration > 0) {
296 s = "for " + validDuration + " " + TimeUnit.MILLISECONDS;
297 } else {
298 s = "indefinitely";
299 }
300 this.log.debug("[exchange: " + this.id + "] Connection can be kept alive " + s);
301 }
302 this.validDurationRef.set(validDuration);
303 } else {
304 if (this.log.isDebugEnabled()) {
305 this.log.debug("[exchange: " + this.id + "] Connection cannot be kept alive");
306 }
307 this.validDurationRef.set(null);
308 }
309 return keepAlive;
310 }
311
312 private void connectionAllocated(final NHttpClientConnection managedConn) {
313 try {
314 if (this.log.isDebugEnabled()) {
315 this.log.debug("[exchange: " + this.id + "] Connection allocated: " + managedConn);
316 }
317 this.connectionFutureRef.set(null);
318 this.managedConnRef.set(managedConn);
319
320 if (this.closed.get()) {
321 discardConnection();
322 return;
323 }
324
325 if (this.connmgr.isRouteComplete(managedConn)) {
326 this.routeEstablished.set(true);
327 this.routeTrackerRef.set(null);
328 }
329
330 final HttpContext context = managedConn.getContext();
331 synchronized (context) {
332 context.setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, this);
333 if (managedConn.isStale()) {
334 failed(new ConnectionClosedException("Connection closed"));
335 } else {
336 managedConn.requestOutput();
337 }
338 }
339 } catch (final ConnectionShutdownException runex) {
340 failed(runex);
341 } catch (final RuntimeException runex) {
342 failed(runex);
343 throw runex;
344 }
345 }
346
347 private void connectionRequestFailed(final Exception ex) {
348 if (this.log.isDebugEnabled()) {
349 this.log.debug("[exchange: " + this.id + "] connection request failed");
350 }
351 this.connectionFutureRef.set(null);
352 failed(ex);
353 }
354
355 private void connectionRequestCancelled() {
356 if (this.log.isDebugEnabled()) {
357 this.log.debug("[exchange: " + this.id + "] Connection request cancelled");
358 }
359 this.connectionFutureRef.set(null);
360 try {
361 executionCancelled();
362 } finally {
363 close();
364 }
365 }
366
367 final void requestConnection() {
368 final HttpRoute route = this.routeRef.get();
369 if (this.log.isDebugEnabled()) {
370 this.log.debug("[exchange: " + this.id + "] Request connection for " + route);
371 }
372
373 discardConnection();
374
375 this.validDurationRef.set(null);
376 this.routeTrackerRef.set(null);
377 this.routeEstablished.set(false);
378
379 final Object userToken = this.localContext.getUserToken();
380 final RequestConfig config = this.localContext.getRequestConfig();
381 this.connectionFutureRef.set(this.connmgr.requestConnection(
382 route,
383 userToken,
384 config.getConnectTimeout(),
385 config.getConnectionRequestTimeout(),
386 TimeUnit.MILLISECONDS,
387 new FutureCallback<NHttpClientConnection>() {
388
389 @Override
390 public void completed(final NHttpClientConnection managedConn) {
391 connectionAllocated(managedConn);
392 }
393
394 @Override
395 public void failed(final Exception ex) {
396 connectionRequestFailed(ex);
397 }
398
399 @Override
400 public void cancelled() {
401 connectionRequestCancelled();
402 }
403
404 }));
405 }
406
407 abstract void start() throws HttpException, IOException;
408
409 abstract void releaseResources();
410
411 abstract void executionFailed(final Exception ex);
412
413 abstract boolean executionCancelled();
414
415 @Override
416 public final void close() {
417 if (this.closed.compareAndSet(false, true)) {
418 discardConnection();
419 releaseResources();
420 }
421 }
422
423 @Override
424 public final boolean isDone() {
425 return this.completed.get();
426 }
427
428 @Override
429 public final void failed(final Exception ex) {
430 if (this.closed.compareAndSet(false, true)) {
431 try {
432 executionFailed(ex);
433 } finally {
434 discardConnection();
435 releaseResources();
436 }
437 }
438 }
439
440 @Override
441 public final boolean cancel() {
442 if (this.log.isDebugEnabled()) {
443 this.log.debug("[exchange: " + this.id + "] Cancelled");
444 }
445 if (this.closed.compareAndSet(false, true)) {
446 try {
447 final Future<NHttpClientConnection> connectionFuture = this.connectionFutureRef.getAndSet(null);
448 if (connectionFuture != null) {
449 connectionFuture.cancel(true);
450 }
451 return executionCancelled();
452 } finally {
453 discardConnection();
454 releaseResources();
455 }
456 }
457 return false;
458 }
459
460 }