1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.client.integration;
29
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.net.SocketException;
34 import java.util.Collections;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
39
40 import org.apache.http.HttpClientConnection;
41 import org.apache.http.HttpHost;
42 import org.apache.http.HttpRequest;
43 import org.apache.http.HttpResponse;
44 import org.apache.http.HttpStatus;
45 import org.apache.http.HttpVersion;
46 import org.apache.http.config.Registry;
47 import org.apache.http.config.RegistryBuilder;
48 import org.apache.http.conn.ConnectTimeoutException;
49 import org.apache.http.conn.ConnectionPoolTimeoutException;
50 import org.apache.http.conn.ConnectionRequest;
51 import org.apache.http.conn.HttpClientConnectionManager;
52 import org.apache.http.conn.routing.HttpRoute;
53 import org.apache.http.conn.socket.ConnectionSocketFactory;
54 import org.apache.http.conn.socket.PlainConnectionSocketFactory;
55 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
56 import org.apache.http.localserver.LocalServerTestBase;
57 import org.apache.http.message.BasicHttpRequest;
58 import org.apache.http.protocol.BasicHttpContext;
59 import org.apache.http.protocol.HttpContext;
60 import org.apache.http.protocol.HttpCoreContext;
61 import org.apache.http.protocol.HttpProcessor;
62 import org.apache.http.protocol.HttpRequestExecutor;
63 import org.apache.http.protocol.ImmutableHttpProcessor;
64 import org.apache.http.protocol.RequestConnControl;
65 import org.apache.http.protocol.RequestContent;
66 import org.apache.http.util.EntityUtils;
67 import org.junit.Assert;
68 import org.junit.Test;
69
70
71
72
73
74 public class TestConnectionManagement extends LocalServerTestBase {
75
76 private static HttpClientConnection getConnection(
77 final HttpClientConnectionManager mgr,
78 final HttpRoute route,
79 final long timeout,
80 final TimeUnit unit) throws ConnectionPoolTimeoutException, ExecutionException, InterruptedException {
81 final ConnectionRequest connRequest = mgr.requestConnection(route, null);
82 return connRequest.get(timeout, unit);
83 }
84
85 private static HttpClientConnection getConnection(
86 final HttpClientConnectionManager mgr,
87 final HttpRoute route) throws ConnectionPoolTimeoutException, ExecutionException, InterruptedException {
88 final ConnectionRequest connRequest = mgr.requestConnection(route, null);
89 return connRequest.get(0, TimeUnit.MILLISECONDS);
90 }
91
92
93
94
95 @Test
96 public void testReleaseConnection() throws Exception {
97
98 this.connManager.setMaxTotal(1);
99
100 final HttpHost target = start();
101 final HttpRoute route = new HttpRoute(target, null, false);
102 final int rsplen = 8;
103 final String uri = "/random/" + rsplen;
104
105 final HttpRequest request = new BasicHttpRequest("GET", uri, HttpVersion.HTTP_1_1);
106 final HttpContext context = new BasicHttpContext();
107
108 HttpClientConnection conn = getConnection(this.connManager, route);
109 this.connManager.connect(conn, route, 0, context);
110 this.connManager.routeComplete(conn, route, context);
111
112 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
113 context.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
114
115 final HttpProcessor httpProcessor = new ImmutableHttpProcessor(
116 new RequestContent(), new RequestConnControl());
117
118 final HttpRequestExecutor exec = new HttpRequestExecutor();
119 exec.preProcess(request, httpProcessor, context);
120 HttpResponse response = exec.execute(request, conn, context);
121
122 Assert.assertEquals("wrong status in first response",
123 HttpStatus.SC_OK,
124 response.getStatusLine().getStatusCode());
125 byte[] data = EntityUtils.toByteArray(response.getEntity());
126 Assert.assertEquals("wrong length of first response entity",
127 rsplen, data.length);
128
129
130
131 try {
132
133 getConnection(this.connManager, route, 10L, TimeUnit.MILLISECONDS);
134 Assert.fail("ConnectionPoolTimeoutException should have been thrown");
135 } catch (final ConnectionPoolTimeoutException e) {
136
137 }
138
139 conn.close();
140 this.connManager.releaseConnection(conn, null, -1, null);
141 conn = getConnection(this.connManager, route);
142 Assert.assertFalse("connection should have been closed", conn.isOpen());
143
144 this.connManager.connect(conn, route, 0, context);
145 this.connManager.routeComplete(conn, route, context);
146
147
148 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
149 response = exec.execute(request, conn, context);
150
151 Assert.assertEquals("wrong status in second response",
152 HttpStatus.SC_OK,
153 response.getStatusLine().getStatusCode());
154 data = EntityUtils.toByteArray(response.getEntity());
155 Assert.assertEquals("wrong length of second response entity",
156 rsplen, data.length);
157
158
159
160
161 this.connManager.releaseConnection(conn, null, -1, null);
162 conn = getConnection(this.connManager, route);
163 Assert.assertTrue("connection should have been open", conn.isOpen());
164
165
166 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
167 response = exec.execute(request, conn, context);
168
169 Assert.assertEquals("wrong status in third response",
170 HttpStatus.SC_OK,
171 response.getStatusLine().getStatusCode());
172 data = EntityUtils.toByteArray(response.getEntity());
173 Assert.assertEquals("wrong length of third response entity",
174 rsplen, data.length);
175
176
177 this.connManager.releaseConnection(conn, null, -1, null);
178 this.connManager.shutdown();
179 }
180
181
182
183
184 @Test
185 public void testReleaseConnectionWithTimeLimits() throws Exception {
186
187 this.connManager.setMaxTotal(1);
188
189 final HttpHost target = start();
190 final HttpRoute route = new HttpRoute(target, null, false);
191 final int rsplen = 8;
192 final String uri = "/random/" + rsplen;
193
194 final HttpRequest request = new BasicHttpRequest("GET", uri, HttpVersion.HTTP_1_1);
195 final HttpContext context = new BasicHttpContext();
196
197 HttpClientConnection conn = getConnection(this.connManager, route);
198 this.connManager.connect(conn, route, 0, context);
199 this.connManager.routeComplete(conn, route, context);
200
201 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
202 context.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
203
204 final HttpProcessor httpProcessor = new ImmutableHttpProcessor(
205 new RequestContent(), new RequestConnControl());
206
207 final HttpRequestExecutor exec = new HttpRequestExecutor();
208 exec.preProcess(request, httpProcessor, context);
209 HttpResponse response = exec.execute(request, conn, context);
210
211 Assert.assertEquals("wrong status in first response",
212 HttpStatus.SC_OK,
213 response.getStatusLine().getStatusCode());
214 byte[] data = EntityUtils.toByteArray(response.getEntity());
215 Assert.assertEquals("wrong length of first response entity",
216 rsplen, data.length);
217
218
219
220 try {
221
222 getConnection(this.connManager, route, 10L, TimeUnit.MILLISECONDS);
223 Assert.fail("ConnectionPoolTimeoutException should have been thrown");
224 } catch (final ConnectionPoolTimeoutException e) {
225
226 }
227
228 conn.close();
229 this.connManager.releaseConnection(conn, null, 100, TimeUnit.MILLISECONDS);
230 conn = getConnection(this.connManager, route);
231 Assert.assertFalse("connection should have been closed", conn.isOpen());
232
233
234 this.connManager.connect(conn, route, 0, context);
235 this.connManager.routeComplete(conn, route, context);
236
237 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
238 response = exec.execute(request, conn, context);
239
240 Assert.assertEquals("wrong status in second response",
241 HttpStatus.SC_OK,
242 response.getStatusLine().getStatusCode());
243 data = EntityUtils.toByteArray(response.getEntity());
244 Assert.assertEquals("wrong length of second response entity",
245 rsplen, data.length);
246
247
248 this.connManager.releaseConnection(conn, null, 100, TimeUnit.MILLISECONDS);
249 conn = getConnection(this.connManager, route);
250 Assert.assertTrue("connection should have been open", conn.isOpen());
251
252
253 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
254 response = exec.execute(request, conn, context);
255
256 Assert.assertEquals("wrong status in third response",
257 HttpStatus.SC_OK,
258 response.getStatusLine().getStatusCode());
259 data = EntityUtils.toByteArray(response.getEntity());
260 Assert.assertEquals("wrong length of third response entity",
261 rsplen, data.length);
262
263
264 this.connManager.releaseConnection(conn, null, 100, TimeUnit.MILLISECONDS);
265 Thread.sleep(150);
266 conn = getConnection(this.connManager, route);
267 Assert.assertTrue("connection should have been closed", !conn.isOpen());
268
269
270 this.connManager.connect(conn, route, 0, context);
271 this.connManager.routeComplete(conn, route, context);
272
273 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
274 response = exec.execute(request, conn, context);
275
276 Assert.assertEquals("wrong status in third response",
277 HttpStatus.SC_OK,
278 response.getStatusLine().getStatusCode());
279 data = EntityUtils.toByteArray(response.getEntity());
280 Assert.assertEquals("wrong length of fourth response entity",
281 rsplen, data.length);
282
283
284 this.connManager.shutdown();
285 }
286
287 @Test
288 public void testCloseExpiredIdleConnections() throws Exception {
289
290 this.connManager.setMaxTotal(1);
291
292 final HttpHost target = start();
293 final HttpRoute route = new HttpRoute(target, null, false);
294 final HttpContext context = new BasicHttpContext();
295
296 final HttpClientConnection conn = getConnection(this.connManager, route);
297 this.connManager.connect(conn, route, 0, context);
298 this.connManager.routeComplete(conn, route, context);
299
300 Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
301 Assert.assertEquals(1, this.connManager.getTotalStats().getLeased());
302 Assert.assertEquals(1, this.connManager.getStats(route).getLeased());
303
304 this.connManager.releaseConnection(conn, null, 100, TimeUnit.MILLISECONDS);
305
306
307 Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
308 Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
309 Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
310
311 this.connManager.closeExpiredConnections();
312
313
314 Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
315 Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
316 Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
317
318 Thread.sleep(150);
319
320 this.connManager.closeExpiredConnections();
321
322
323 Assert.assertEquals(Collections.emptySet(), this.connManager.getRoutes());
324 Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
325 Assert.assertEquals(0, this.connManager.getStats(route).getAvailable());
326
327 this.connManager.shutdown();
328 }
329
330 @Test
331 public void testCloseExpiredTTLConnections() throws Exception {
332
333 this.connManager = new PoolingHttpClientConnectionManager(
334 100, TimeUnit.MILLISECONDS);
335 this.clientBuilder.setConnectionManager(this.connManager);
336
337 this.connManager.setMaxTotal(1);
338
339 final HttpHost target = start();
340 final HttpRoute route = new HttpRoute(target, null, false);
341 final HttpContext context = new BasicHttpContext();
342
343 final HttpClientConnection conn = getConnection(this.connManager, route);
344 this.connManager.connect(conn, route, 0, context);
345 this.connManager.routeComplete(conn, route, context);
346
347 Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
348 Assert.assertEquals(1, this.connManager.getTotalStats().getLeased());
349 Assert.assertEquals(1, this.connManager.getStats(route).getLeased());
350
351 this.connManager.releaseConnection(conn, null, -1, TimeUnit.MILLISECONDS);
352
353
354 Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
355 Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
356 Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
357
358 this.connManager.closeExpiredConnections();
359
360
361 Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
362 Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
363 Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
364
365 Thread.sleep(150);
366
367 this.connManager.closeExpiredConnections();
368
369
370 Assert.assertEquals(Collections.emptySet(), this.connManager.getRoutes());
371 Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
372 Assert.assertEquals(0, this.connManager.getStats(route).getAvailable());
373
374 this.connManager.shutdown();
375 }
376
377
378
379
380
381 @Test
382 public void testReleaseConnectionOnAbort() throws Exception {
383
384 this.connManager.setMaxTotal(1);
385
386 final HttpHost target = start();
387 final HttpRoute route = new HttpRoute(target, null, false);
388 final int rsplen = 8;
389 final String uri = "/random/" + rsplen;
390 final HttpContext context = new BasicHttpContext();
391
392 final HttpRequest request =
393 new BasicHttpRequest("GET", uri, HttpVersion.HTTP_1_1);
394
395 HttpClientConnection conn = getConnection(this.connManager, route);
396 this.connManager.connect(conn, route, 0, context);
397 this.connManager.routeComplete(conn, route, context);
398
399 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
400 context.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
401
402 final HttpProcessor httpProcessor = new ImmutableHttpProcessor(
403 new RequestContent(), new RequestConnControl());
404
405 final HttpRequestExecutor exec = new HttpRequestExecutor();
406 exec.preProcess(request, httpProcessor, context);
407 final HttpResponse response = exec.execute(request, conn, context);
408
409 Assert.assertEquals("wrong status in first response",
410 HttpStatus.SC_OK,
411 response.getStatusLine().getStatusCode());
412
413
414 try {
415
416 getConnection(this.connManager, route, 100L, TimeUnit.MILLISECONDS);
417 Assert.fail("ConnectionPoolTimeoutException should have been thrown");
418 } catch (final ConnectionPoolTimeoutException e) {
419
420 }
421
422
423 Assert.assertTrue(conn instanceof HttpClientConnection);
424 conn.shutdown();
425 this.connManager.releaseConnection(conn, null, -1, null);
426
427
428 conn = getConnection(this.connManager, route, 5L, TimeUnit.SECONDS);
429 Assert.assertFalse("connection should have been closed", conn.isOpen());
430
431 this.connManager.releaseConnection(conn, null, -1, null);
432 this.connManager.shutdown();
433 }
434
435 @Test
436 public void testAbortDuringConnecting() throws Exception {
437 final CountDownLatch connectLatch = new CountDownLatch(1);
438 final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(
439 connectLatch, WaitPolicy.BEFORE_CONNECT, PlainConnectionSocketFactory.getSocketFactory());
440 final Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
441 .register("http", stallingSocketFactory)
442 .build();
443
444 this.connManager = new PoolingHttpClientConnectionManager(registry);
445 this.clientBuilder.setConnectionManager(this.connManager);
446
447 this.connManager.setMaxTotal(1);
448
449 final HttpHost target = start();
450 final HttpRoute route = new HttpRoute(target, null, false);
451 final HttpContext context = new BasicHttpContext();
452
453 final HttpClientConnection conn = getConnection(this.connManager, route);
454
455 final AtomicReference<Throwable> throwRef = new AtomicReference<Throwable>();
456 final Thread abortingThread = new Thread(new Runnable() {
457 @Override
458 public void run() {
459 try {
460 stallingSocketFactory.waitForState();
461 conn.shutdown();
462 connManager.releaseConnection(conn, null, -1, null);
463 connectLatch.countDown();
464 } catch (final Throwable e) {
465 throwRef.set(e);
466 }
467 }
468 });
469 abortingThread.start();
470
471 try {
472 this.connManager.connect(conn, route, 0, context);
473 this.connManager.routeComplete(conn, route, context);
474 Assert.fail("expected SocketException");
475 } catch(final SocketException expected) {}
476
477 abortingThread.join(5000);
478 if(throwRef.get() != null) {
479 throw new RuntimeException(throwRef.get());
480 }
481
482 Assert.assertFalse(conn.isOpen());
483
484
485 final HttpClientConnection conn2 = getConnection(this.connManager, route, 5L, TimeUnit.SECONDS);
486 Assert.assertFalse("connection should have been closed", conn2.isOpen());
487
488 this.connManager.releaseConnection(conn2, null, -1, null);
489 this.connManager.shutdown();
490 }
491
492 @Test
493 public void testAbortBeforeSocketCreate() throws Exception {
494 final CountDownLatch connectLatch = new CountDownLatch(1);
495 final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(
496 connectLatch, WaitPolicy.BEFORE_CREATE, PlainConnectionSocketFactory.getSocketFactory());
497 final Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
498 .register("http", stallingSocketFactory)
499 .build();
500
501 this.connManager = new PoolingHttpClientConnectionManager(registry);
502 this.clientBuilder.setConnectionManager(this.connManager);
503
504 this.connManager.setMaxTotal(1);
505
506 final HttpHost target = start();
507 final HttpRoute route = new HttpRoute(target, null, false);
508 final HttpContext context = new BasicHttpContext();
509
510 final HttpClientConnection conn = getConnection(this.connManager, route);
511
512 final AtomicReference<Throwable> throwRef = new AtomicReference<Throwable>();
513 final Thread abortingThread = new Thread(new Runnable() {
514 @Override
515 public void run() {
516 try {
517 stallingSocketFactory.waitForState();
518 conn.shutdown();
519 connManager.releaseConnection(conn, null, -1, null);
520 connectLatch.countDown();
521 } catch (final Throwable e) {
522 throwRef.set(e);
523 }
524 }
525 });
526 abortingThread.start();
527
528 try {
529 this.connManager.connect(conn, route, 0, context);
530 this.connManager.routeComplete(conn, route, context);
531 Assert.fail("IOException expected");
532 } catch(final IOException expected) {
533 }
534
535 abortingThread.join(5000);
536 if(throwRef.get() != null) {
537 throw new RuntimeException(throwRef.get());
538 }
539
540 Assert.assertFalse(conn.isOpen());
541
542
543 final HttpClientConnection conn2 = getConnection(this.connManager, route, 5L, TimeUnit.SECONDS);
544 Assert.assertFalse("connection should have been closed", conn2.isOpen());
545
546 this.connManager.releaseConnection(conn2, null, -1, null);
547 this.connManager.shutdown();
548 }
549
550 @Test
551 public void testAbortAfterSocketConnect() throws Exception {
552 final CountDownLatch connectLatch = new CountDownLatch(1);
553 final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(
554 connectLatch, WaitPolicy.AFTER_CONNECT, PlainConnectionSocketFactory.getSocketFactory());
555 final Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
556 .register("http", stallingSocketFactory)
557 .build();
558
559 this.connManager = new PoolingHttpClientConnectionManager(registry);
560 this.clientBuilder.setConnectionManager(this.connManager);
561
562 this.connManager.setMaxTotal(1);
563
564 final HttpHost target = start();
565 final HttpRoute route = new HttpRoute(target, null, false);
566 final HttpContext context = new BasicHttpContext();
567
568 final HttpClientConnection conn = getConnection(this.connManager, route);
569
570 final AtomicReference<Throwable> throwRef = new AtomicReference<Throwable>();
571 final Thread abortingThread = new Thread(new Runnable() {
572 @Override
573 public void run() {
574 try {
575 stallingSocketFactory.waitForState();
576 conn.shutdown();
577 connManager.releaseConnection(conn, null, -1, null);
578 connectLatch.countDown();
579 } catch (final Throwable e) {
580 throwRef.set(e);
581 }
582 }
583 });
584 abortingThread.start();
585
586 try {
587 this.connManager.connect(conn, route, 0, context);
588 this.connManager.routeComplete(conn, route, context);
589 Assert.fail("IOException expected");
590 } catch(final IOException expected) {
591 }
592
593 abortingThread.join(5000);
594 if(throwRef.get() != null) {
595 throw new RuntimeException(throwRef.get());
596 }
597
598 Assert.assertFalse(conn.isOpen());
599
600
601 final HttpClientConnection conn2 = getConnection(this.connManager, route, 5L, TimeUnit.SECONDS);
602 Assert.assertFalse("connection should have been closed", conn2.isOpen());
603
604 this.connManager.releaseConnection(conn2, null, -1, null);
605 this.connManager.shutdown();
606 }
607
608 static class LatchSupport {
609
610 private final CountDownLatch continueLatch;
611 private final CountDownLatch waitLatch = new CountDownLatch(1);
612 protected final WaitPolicy waitPolicy;
613
614 LatchSupport(final CountDownLatch continueLatch, final WaitPolicy waitPolicy) {
615 this.continueLatch = continueLatch;
616 this.waitPolicy = waitPolicy;
617 }
618
619 void waitForState() throws InterruptedException {
620 if(!waitLatch.await(1, TimeUnit.SECONDS)) {
621 throw new RuntimeException("waited too long");
622 }
623 }
624
625 void latch() {
626 waitLatch.countDown();
627 try {
628 if (!continueLatch.await(60, TimeUnit.SECONDS)) {
629 throw new RuntimeException("waited too long!");
630 }
631 } catch (final InterruptedException e) {
632 throw new RuntimeException(e);
633 }
634 }
635 }
636
637 private static class StallingSocketFactory extends LatchSupport implements ConnectionSocketFactory {
638
639 private final ConnectionSocketFactory delegate;
640
641 public StallingSocketFactory(
642 final CountDownLatch continueLatch,
643 final WaitPolicy waitPolicy,
644 final ConnectionSocketFactory delegate) {
645 super(continueLatch, waitPolicy);
646 this.delegate = delegate;
647 }
648
649 @Override
650 public Socket connectSocket(
651 final int connectTimeout,
652 final Socket sock,
653 final HttpHost host,
654 final InetSocketAddress remoteAddress,
655 final InetSocketAddress localAddress,
656 final HttpContext context) throws IOException, ConnectTimeoutException {
657 if(waitPolicy == WaitPolicy.BEFORE_CONNECT) {
658 latch();
659 }
660
661 final Socket socket = delegate.connectSocket(
662 connectTimeout, sock, host, remoteAddress, localAddress, context);
663
664 if(waitPolicy == WaitPolicy.AFTER_CONNECT) {
665 latch();
666 }
667
668 return socket;
669 }
670
671 @Override
672 public Socket createSocket(final HttpContext context) throws IOException {
673 if(waitPolicy == WaitPolicy.BEFORE_CREATE) {
674 latch();
675 }
676
677 return delegate.createSocket(context);
678 }
679
680 }
681
682 private enum WaitPolicy { BEFORE_CREATE, BEFORE_CONNECT, AFTER_CONNECT, AFTER_OPEN }
683
684 }