1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.client.integration;
29
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.net.SocketException;
34 import java.util.Collections;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
39
40 import org.apache.http.HttpClientConnection;
41 import org.apache.http.HttpHost;
42 import org.apache.http.HttpRequest;
43 import org.apache.http.HttpRequestInterceptor;
44 import org.apache.http.HttpResponse;
45 import org.apache.http.HttpStatus;
46 import org.apache.http.HttpVersion;
47 import org.apache.http.config.Registry;
48 import org.apache.http.config.RegistryBuilder;
49 import org.apache.http.conn.ConnectTimeoutException;
50 import org.apache.http.conn.ConnectionPoolTimeoutException;
51 import org.apache.http.conn.ConnectionRequest;
52 import org.apache.http.conn.HttpClientConnectionManager;
53 import org.apache.http.conn.routing.HttpRoute;
54 import org.apache.http.conn.socket.ConnectionSocketFactory;
55 import org.apache.http.conn.socket.PlainConnectionSocketFactory;
56 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
57 import org.apache.http.localserver.LocalServerTestBase;
58 import org.apache.http.message.BasicHttpRequest;
59 import org.apache.http.protocol.BasicHttpContext;
60 import org.apache.http.protocol.HttpContext;
61 import org.apache.http.protocol.HttpCoreContext;
62 import org.apache.http.protocol.HttpProcessor;
63 import org.apache.http.protocol.HttpRequestExecutor;
64 import org.apache.http.protocol.ImmutableHttpProcessor;
65 import org.apache.http.protocol.RequestConnControl;
66 import org.apache.http.protocol.RequestContent;
67 import org.apache.http.util.EntityUtils;
68 import org.junit.Assert;
69 import org.junit.Test;
70
71
72
73
74
75 public class TestConnectionManagement extends LocalServerTestBase {
76
77 private static HttpClientConnection getConnection(
78 final HttpClientConnectionManager mgr,
79 final HttpRoute route,
80 final long timeout,
81 final TimeUnit unit) throws ConnectionPoolTimeoutException, ExecutionException, InterruptedException {
82 final ConnectionRequest connRequest = mgr.requestConnection(route, null);
83 return connRequest.get(timeout, unit);
84 }
85
86 private static HttpClientConnection getConnection(
87 final HttpClientConnectionManager mgr,
88 final HttpRoute route) throws ConnectionPoolTimeoutException, ExecutionException, InterruptedException {
89 final ConnectionRequest connRequest = mgr.requestConnection(route, null);
90 return connRequest.get(0, TimeUnit.MILLISECONDS);
91 }
92
93
94
95
96 @Test
97 public void testReleaseConnection() throws Exception {
98
99 this.connManager.setMaxTotal(1);
100
101 final HttpHost target = start();
102 final HttpRoute route = new HttpRoute(target, null, false);
103 final int rsplen = 8;
104 final String uri = "/random/" + rsplen;
105
106 final HttpRequest request = new BasicHttpRequest("GET", uri, HttpVersion.HTTP_1_1);
107 final HttpContext context = new BasicHttpContext();
108
109 HttpClientConnection conn = getConnection(this.connManager, route);
110 this.connManager.connect(conn, route, 0, context);
111 this.connManager.routeComplete(conn, route, context);
112
113 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
114 context.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
115
116 final HttpProcessor httpProcessor = new ImmutableHttpProcessor(
117 new HttpRequestInterceptor[] { new RequestContent(), new RequestConnControl() });
118
119 final HttpRequestExecutor exec = new HttpRequestExecutor();
120 exec.preProcess(request, httpProcessor, context);
121 HttpResponse response = exec.execute(request, conn, context);
122
123 Assert.assertEquals("wrong status in first response",
124 HttpStatus.SC_OK,
125 response.getStatusLine().getStatusCode());
126 byte[] data = EntityUtils.toByteArray(response.getEntity());
127 Assert.assertEquals("wrong length of first response entity",
128 rsplen, data.length);
129
130
131
132 try {
133
134 getConnection(this.connManager, route, 10L, TimeUnit.MILLISECONDS);
135 Assert.fail("ConnectionPoolTimeoutException should have been thrown");
136 } catch (final ConnectionPoolTimeoutException e) {
137
138 }
139
140 conn.close();
141 this.connManager.releaseConnection(conn, null, -1, null);
142 conn = getConnection(this.connManager, route);
143 Assert.assertFalse("connection should have been closed", conn.isOpen());
144
145 this.connManager.connect(conn, route, 0, context);
146 this.connManager.routeComplete(conn, route, context);
147
148
149 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
150 response = exec.execute(request, conn, context);
151
152 Assert.assertEquals("wrong status in second response",
153 HttpStatus.SC_OK,
154 response.getStatusLine().getStatusCode());
155 data = EntityUtils.toByteArray(response.getEntity());
156 Assert.assertEquals("wrong length of second response entity",
157 rsplen, data.length);
158
159
160
161
162 this.connManager.releaseConnection(conn, null, -1, null);
163 conn = getConnection(this.connManager, route);
164 Assert.assertTrue("connection should have been open", conn.isOpen());
165
166
167 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
168 response = exec.execute(request, conn, context);
169
170 Assert.assertEquals("wrong status in third response",
171 HttpStatus.SC_OK,
172 response.getStatusLine().getStatusCode());
173 data = EntityUtils.toByteArray(response.getEntity());
174 Assert.assertEquals("wrong length of third response entity",
175 rsplen, data.length);
176
177
178 this.connManager.releaseConnection(conn, null, -1, null);
179 this.connManager.shutdown();
180 }
181
182
183
184
185 @Test
186 public void testReleaseConnectionWithTimeLimits() throws Exception {
187
188 this.connManager.setMaxTotal(1);
189
190 final HttpHost target = start();
191 final HttpRoute route = new HttpRoute(target, null, false);
192 final int rsplen = 8;
193 final String uri = "/random/" + rsplen;
194
195 final HttpRequest request = new BasicHttpRequest("GET", uri, HttpVersion.HTTP_1_1);
196 final HttpContext context = new BasicHttpContext();
197
198 HttpClientConnection conn = getConnection(this.connManager, route);
199 this.connManager.connect(conn, route, 0, context);
200 this.connManager.routeComplete(conn, route, context);
201
202 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
203 context.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
204
205 final HttpProcessor httpProcessor = new ImmutableHttpProcessor(
206 new HttpRequestInterceptor[] { new RequestContent(), new RequestConnControl() });
207
208 final HttpRequestExecutor exec = new HttpRequestExecutor();
209 exec.preProcess(request, httpProcessor, context);
210 HttpResponse response = exec.execute(request, conn, context);
211
212 Assert.assertEquals("wrong status in first response",
213 HttpStatus.SC_OK,
214 response.getStatusLine().getStatusCode());
215 byte[] data = EntityUtils.toByteArray(response.getEntity());
216 Assert.assertEquals("wrong length of first response entity",
217 rsplen, data.length);
218
219
220
221 try {
222
223 getConnection(this.connManager, route, 10L, TimeUnit.MILLISECONDS);
224 Assert.fail("ConnectionPoolTimeoutException should have been thrown");
225 } catch (final ConnectionPoolTimeoutException e) {
226
227 }
228
229 conn.close();
230 this.connManager.releaseConnection(conn, null, 100, TimeUnit.MILLISECONDS);
231 conn = getConnection(this.connManager, route);
232 Assert.assertFalse("connection should have been closed", conn.isOpen());
233
234
235 this.connManager.connect(conn, route, 0, context);
236 this.connManager.routeComplete(conn, route, context);
237
238 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
239 response = exec.execute(request, conn, context);
240
241 Assert.assertEquals("wrong status in second response",
242 HttpStatus.SC_OK,
243 response.getStatusLine().getStatusCode());
244 data = EntityUtils.toByteArray(response.getEntity());
245 Assert.assertEquals("wrong length of second response entity",
246 rsplen, data.length);
247
248
249 this.connManager.releaseConnection(conn, null, 100, TimeUnit.MILLISECONDS);
250 conn = getConnection(this.connManager, route);
251 Assert.assertTrue("connection should have been open", conn.isOpen());
252
253
254 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
255 response = exec.execute(request, conn, context);
256
257 Assert.assertEquals("wrong status in third response",
258 HttpStatus.SC_OK,
259 response.getStatusLine().getStatusCode());
260 data = EntityUtils.toByteArray(response.getEntity());
261 Assert.assertEquals("wrong length of third response entity",
262 rsplen, data.length);
263
264
265 this.connManager.releaseConnection(conn, null, 100, TimeUnit.MILLISECONDS);
266 Thread.sleep(150);
267 conn = getConnection(this.connManager, route);
268 Assert.assertTrue("connection should have been closed", !conn.isOpen());
269
270
271 this.connManager.connect(conn, route, 0, context);
272 this.connManager.routeComplete(conn, route, context);
273
274 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
275 response = exec.execute(request, conn, context);
276
277 Assert.assertEquals("wrong status in third response",
278 HttpStatus.SC_OK,
279 response.getStatusLine().getStatusCode());
280 data = EntityUtils.toByteArray(response.getEntity());
281 Assert.assertEquals("wrong length of fourth response entity",
282 rsplen, data.length);
283
284
285 this.connManager.shutdown();
286 }
287
288 @Test
289 public void testCloseExpiredIdleConnections() throws Exception {
290
291 this.connManager.setMaxTotal(1);
292
293 final HttpHost target = start();
294 final HttpRoute route = new HttpRoute(target, null, false);
295 final HttpContext context = new BasicHttpContext();
296
297 final HttpClientConnection conn = getConnection(this.connManager, route);
298 this.connManager.connect(conn, route, 0, context);
299 this.connManager.routeComplete(conn, route, context);
300
301 Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
302 Assert.assertEquals(1, this.connManager.getTotalStats().getLeased());
303 Assert.assertEquals(1, this.connManager.getStats(route).getLeased());
304
305 this.connManager.releaseConnection(conn, null, 100, TimeUnit.MILLISECONDS);
306
307
308 Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
309 Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
310 Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
311
312 this.connManager.closeExpiredConnections();
313
314
315 Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
316 Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
317 Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
318
319 Thread.sleep(150);
320
321 this.connManager.closeExpiredConnections();
322
323
324 Assert.assertEquals(Collections.emptySet(), this.connManager.getRoutes());
325 Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
326 Assert.assertEquals(0, this.connManager.getStats(route).getAvailable());
327
328 this.connManager.shutdown();
329 }
330
331 @Test
332 public void testCloseExpiredTTLConnections() throws Exception {
333
334 this.connManager = new PoolingHttpClientConnectionManager(
335 100, TimeUnit.MILLISECONDS);
336 this.clientBuilder.setConnectionManager(this.connManager);
337
338 this.connManager.setMaxTotal(1);
339
340 final HttpHost target = start();
341 final HttpRoute route = new HttpRoute(target, null, false);
342 final HttpContext context = new BasicHttpContext();
343
344 final HttpClientConnection conn = getConnection(this.connManager, route);
345 this.connManager.connect(conn, route, 0, context);
346 this.connManager.routeComplete(conn, route, context);
347
348 Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
349 Assert.assertEquals(1, this.connManager.getTotalStats().getLeased());
350 Assert.assertEquals(1, this.connManager.getStats(route).getLeased());
351
352 this.connManager.releaseConnection(conn, null, -1, TimeUnit.MILLISECONDS);
353
354
355 Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
356 Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
357 Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
358
359 this.connManager.closeExpiredConnections();
360
361
362 Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
363 Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
364 Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
365
366 Thread.sleep(150);
367
368 this.connManager.closeExpiredConnections();
369
370
371 Assert.assertEquals(Collections.emptySet(), this.connManager.getRoutes());
372 Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
373 Assert.assertEquals(0, this.connManager.getStats(route).getAvailable());
374
375 this.connManager.shutdown();
376 }
377
378
379
380
381
382 @Test
383 public void testReleaseConnectionOnAbort() throws Exception {
384
385 this.connManager.setMaxTotal(1);
386
387 final HttpHost target = start();
388 final HttpRoute route = new HttpRoute(target, null, false);
389 final int rsplen = 8;
390 final String uri = "/random/" + rsplen;
391 final HttpContext context = new BasicHttpContext();
392
393 final HttpRequest request =
394 new BasicHttpRequest("GET", uri, HttpVersion.HTTP_1_1);
395
396 HttpClientConnection conn = getConnection(this.connManager, route);
397 this.connManager.connect(conn, route, 0, context);
398 this.connManager.routeComplete(conn, route, context);
399
400 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
401 context.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
402
403 final HttpProcessor httpProcessor = new ImmutableHttpProcessor(
404 new HttpRequestInterceptor[] { new RequestContent(), new RequestConnControl() });
405
406 final HttpRequestExecutor exec = new HttpRequestExecutor();
407 exec.preProcess(request, httpProcessor, context);
408 final HttpResponse response = exec.execute(request, conn, context);
409
410 Assert.assertEquals("wrong status in first response",
411 HttpStatus.SC_OK,
412 response.getStatusLine().getStatusCode());
413
414
415 try {
416
417 getConnection(this.connManager, route, 100L, TimeUnit.MILLISECONDS);
418 Assert.fail("ConnectionPoolTimeoutException should have been thrown");
419 } catch (final ConnectionPoolTimeoutException e) {
420
421 }
422
423
424 Assert.assertTrue(conn instanceof HttpClientConnection);
425 conn.shutdown();
426 this.connManager.releaseConnection(conn, null, -1, null);
427
428
429 conn = getConnection(this.connManager, route, 5L, TimeUnit.SECONDS);
430 Assert.assertFalse("connection should have been closed", conn.isOpen());
431
432 this.connManager.releaseConnection(conn, null, -1, null);
433 this.connManager.shutdown();
434 }
435
436 @Test
437 public void testAbortDuringConnecting() throws Exception {
438 final CountDownLatch connectLatch = new CountDownLatch(1);
439 final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(
440 connectLatch, WaitPolicy.BEFORE_CONNECT, PlainConnectionSocketFactory.getSocketFactory());
441 final Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
442 .register("http", stallingSocketFactory)
443 .build();
444
445 this.connManager = new PoolingHttpClientConnectionManager(registry);
446 this.clientBuilder.setConnectionManager(this.connManager);
447
448 this.connManager.setMaxTotal(1);
449
450 final HttpHost target = start();
451 final HttpRoute route = new HttpRoute(target, null, false);
452 final HttpContext context = new BasicHttpContext();
453
454 final HttpClientConnection conn = getConnection(this.connManager, route);
455
456 final AtomicReference<Throwable> throwRef = new AtomicReference<Throwable>();
457 final Thread abortingThread = new Thread(new Runnable() {
458 @Override
459 public void run() {
460 try {
461 stallingSocketFactory.waitForState();
462 conn.shutdown();
463 connManager.releaseConnection(conn, null, -1, null);
464 connectLatch.countDown();
465 } catch (final Throwable e) {
466 throwRef.set(e);
467 }
468 }
469 });
470 abortingThread.start();
471
472 try {
473 this.connManager.connect(conn, route, 0, context);
474 this.connManager.routeComplete(conn, route, context);
475 Assert.fail("expected SocketException");
476 } catch(final SocketException expected) {}
477
478 abortingThread.join(5000);
479 if(throwRef.get() != null) {
480 throw new RuntimeException(throwRef.get());
481 }
482
483 Assert.assertFalse(conn.isOpen());
484
485
486 final HttpClientConnection conn2 = getConnection(this.connManager, route, 5L, TimeUnit.SECONDS);
487 Assert.assertFalse("connection should have been closed", conn2.isOpen());
488
489 this.connManager.releaseConnection(conn2, null, -1, null);
490 this.connManager.shutdown();
491 }
492
493 @Test
494 public void testAbortBeforeSocketCreate() throws Exception {
495 final CountDownLatch connectLatch = new CountDownLatch(1);
496 final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(
497 connectLatch, WaitPolicy.BEFORE_CREATE, PlainConnectionSocketFactory.getSocketFactory());
498 final Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
499 .register("http", stallingSocketFactory)
500 .build();
501
502 this.connManager = new PoolingHttpClientConnectionManager(registry);
503 this.clientBuilder.setConnectionManager(this.connManager);
504
505 this.connManager.setMaxTotal(1);
506
507 final HttpHost target = start();
508 final HttpRoute route = new HttpRoute(target, null, false);
509 final HttpContext context = new BasicHttpContext();
510
511 final HttpClientConnection conn = getConnection(this.connManager, route);
512
513 final AtomicReference<Throwable> throwRef = new AtomicReference<Throwable>();
514 final Thread abortingThread = new Thread(new Runnable() {
515 @Override
516 public void run() {
517 try {
518 stallingSocketFactory.waitForState();
519 conn.shutdown();
520 connManager.releaseConnection(conn, null, -1, null);
521 connectLatch.countDown();
522 } catch (final Throwable e) {
523 throwRef.set(e);
524 }
525 }
526 });
527 abortingThread.start();
528
529 try {
530 this.connManager.connect(conn, route, 0, context);
531 this.connManager.routeComplete(conn, route, context);
532 Assert.fail("IOException expected");
533 } catch(final IOException expected) {
534 }
535
536 abortingThread.join(5000);
537 if(throwRef.get() != null) {
538 throw new RuntimeException(throwRef.get());
539 }
540
541 Assert.assertFalse(conn.isOpen());
542
543
544 final HttpClientConnection conn2 = getConnection(this.connManager, route, 5L, TimeUnit.SECONDS);
545 Assert.assertFalse("connection should have been closed", conn2.isOpen());
546
547 this.connManager.releaseConnection(conn2, null, -1, null);
548 this.connManager.shutdown();
549 }
550
551 @Test
552 public void testAbortAfterSocketConnect() throws Exception {
553 final CountDownLatch connectLatch = new CountDownLatch(1);
554 final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(
555 connectLatch, WaitPolicy.AFTER_CONNECT, PlainConnectionSocketFactory.getSocketFactory());
556 final Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
557 .register("http", stallingSocketFactory)
558 .build();
559
560 this.connManager = new PoolingHttpClientConnectionManager(registry);
561 this.clientBuilder.setConnectionManager(this.connManager);
562
563 this.connManager.setMaxTotal(1);
564
565 final HttpHost target = start();
566 final HttpRoute route = new HttpRoute(target, null, false);
567 final HttpContext context = new BasicHttpContext();
568
569 final HttpClientConnection conn = getConnection(this.connManager, route);
570
571 final AtomicReference<Throwable> throwRef = new AtomicReference<Throwable>();
572 final Thread abortingThread = new Thread(new Runnable() {
573 @Override
574 public void run() {
575 try {
576 stallingSocketFactory.waitForState();
577 conn.shutdown();
578 connManager.releaseConnection(conn, null, -1, null);
579 connectLatch.countDown();
580 } catch (final Throwable e) {
581 throwRef.set(e);
582 }
583 }
584 });
585 abortingThread.start();
586
587 try {
588 this.connManager.connect(conn, route, 0, context);
589 this.connManager.routeComplete(conn, route, context);
590 Assert.fail("IOException expected");
591 } catch(final IOException expected) {
592 }
593
594 abortingThread.join(5000);
595 if(throwRef.get() != null) {
596 throw new RuntimeException(throwRef.get());
597 }
598
599 Assert.assertFalse(conn.isOpen());
600
601
602 final HttpClientConnection conn2 = getConnection(this.connManager, route, 5L, TimeUnit.SECONDS);
603 Assert.assertFalse("connection should have been closed", conn2.isOpen());
604
605 this.connManager.releaseConnection(conn2, null, -1, null);
606 this.connManager.shutdown();
607 }
608
609 static class LatchSupport {
610
611 private final CountDownLatch continueLatch;
612 private final CountDownLatch waitLatch = new CountDownLatch(1);
613 protected final WaitPolicy waitPolicy;
614
615 LatchSupport(final CountDownLatch continueLatch, final WaitPolicy waitPolicy) {
616 this.continueLatch = continueLatch;
617 this.waitPolicy = waitPolicy;
618 }
619
620 void waitForState() throws InterruptedException {
621 if(!waitLatch.await(1, TimeUnit.SECONDS)) {
622 throw new RuntimeException("waited too long");
623 }
624 }
625
626 void latch() {
627 waitLatch.countDown();
628 try {
629 if (!continueLatch.await(60, TimeUnit.SECONDS)) {
630 throw new RuntimeException("waited too long!");
631 }
632 } catch (final InterruptedException e) {
633 throw new RuntimeException(e);
634 }
635 }
636 }
637
638 private static class StallingSocketFactory extends LatchSupport implements ConnectionSocketFactory {
639
640 private final ConnectionSocketFactory delegate;
641
642 public StallingSocketFactory(
643 final CountDownLatch continueLatch,
644 final WaitPolicy waitPolicy,
645 final ConnectionSocketFactory delegate) {
646 super(continueLatch, waitPolicy);
647 this.delegate = delegate;
648 }
649
650 @Override
651 public Socket connectSocket(
652 final int connectTimeout,
653 final Socket sock,
654 final HttpHost host,
655 final InetSocketAddress remoteAddress,
656 final InetSocketAddress localAddress,
657 final HttpContext context) throws IOException, ConnectTimeoutException {
658 if(waitPolicy == WaitPolicy.BEFORE_CONNECT) {
659 latch();
660 }
661
662 final Socket socket = delegate.connectSocket(
663 connectTimeout, sock, host, remoteAddress, localAddress, context);
664
665 if(waitPolicy == WaitPolicy.AFTER_CONNECT) {
666 latch();
667 }
668
669 return socket;
670 }
671
672 @Override
673 public Socket createSocket(final HttpContext context) throws IOException {
674 if(waitPolicy == WaitPolicy.BEFORE_CREATE) {
675 latch();
676 }
677
678 return delegate.createSocket(context);
679 }
680
681 }
682
683 private enum WaitPolicy { BEFORE_CREATE, BEFORE_CONNECT, AFTER_CONNECT, AFTER_OPEN }
684
685 }