View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.client.integration;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketException;
34  import java.util.Collections;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicReference;
39  
40  import org.apache.http.HttpClientConnection;
41  import org.apache.http.HttpHost;
42  import org.apache.http.HttpRequest;
43  import org.apache.http.HttpRequestInterceptor;
44  import org.apache.http.HttpResponse;
45  import org.apache.http.HttpStatus;
46  import org.apache.http.HttpVersion;
47  import org.apache.http.config.Registry;
48  import org.apache.http.config.RegistryBuilder;
49  import org.apache.http.conn.ConnectTimeoutException;
50  import org.apache.http.conn.ConnectionPoolTimeoutException;
51  import org.apache.http.conn.ConnectionRequest;
52  import org.apache.http.conn.HttpClientConnectionManager;
53  import org.apache.http.conn.routing.HttpRoute;
54  import org.apache.http.conn.socket.ConnectionSocketFactory;
55  import org.apache.http.conn.socket.PlainConnectionSocketFactory;
56  import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
57  import org.apache.http.localserver.LocalServerTestBase;
58  import org.apache.http.message.BasicHttpRequest;
59  import org.apache.http.protocol.BasicHttpContext;
60  import org.apache.http.protocol.HttpContext;
61  import org.apache.http.protocol.HttpCoreContext;
62  import org.apache.http.protocol.HttpProcessor;
63  import org.apache.http.protocol.HttpRequestExecutor;
64  import org.apache.http.protocol.ImmutableHttpProcessor;
65  import org.apache.http.protocol.RequestConnControl;
66  import org.apache.http.protocol.RequestContent;
67  import org.apache.http.util.EntityUtils;
68  import org.junit.Assert;
69  import org.junit.Test;
70  
71  /**
72   * Tests for {@code PoolingHttpClientConnectionManager} that do require a server
73   * to communicate with.
74   */
75  public class TestConnectionManagement extends LocalServerTestBase {
76  
77      private static HttpClientConnection getConnection(
78              final HttpClientConnectionManager mgr,
79              final HttpRoute route,
80              final long timeout,
81              final TimeUnit unit) throws ConnectionPoolTimeoutException, ExecutionException, InterruptedException {
82          final ConnectionRequest connRequest = mgr.requestConnection(route, null);
83          return connRequest.get(timeout, unit);
84      }
85  
86      private static HttpClientConnection getConnection(
87              final HttpClientConnectionManager mgr,
88              final HttpRoute route) throws ConnectionPoolTimeoutException, ExecutionException, InterruptedException {
89          final ConnectionRequest connRequest = mgr.requestConnection(route, null);
90          return connRequest.get(0, TimeUnit.MILLISECONDS);
91      }
92  
93      /**
94       * Tests releasing and re-using a connection after a response is read.
95       */
96      @Test
97      public void testReleaseConnection() throws Exception {
98  
99          this.connManager.setMaxTotal(1);
100 
101         final HttpHost target = start();
102         final HttpRoute route = new HttpRoute(target, null, false);
103         final int      rsplen = 8;
104         final String      uri = "/random/" + rsplen;
105 
106         final HttpRequest request = new BasicHttpRequest("GET", uri, HttpVersion.HTTP_1_1);
107         final HttpContext context = new BasicHttpContext();
108 
109         HttpClientConnection conn = getConnection(this.connManager, route);
110         this.connManager.connect(conn, route, 0, context);
111         this.connManager.routeComplete(conn, route, context);
112 
113         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
114         context.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
115 
116         final HttpProcessor httpProcessor = new ImmutableHttpProcessor(
117                 new HttpRequestInterceptor[] { new RequestContent(), new RequestConnControl() });
118 
119         final HttpRequestExecutor exec = new HttpRequestExecutor();
120         exec.preProcess(request, httpProcessor, context);
121         HttpResponse response = exec.execute(request, conn, context);
122 
123         Assert.assertEquals("wrong status in first response",
124                      HttpStatus.SC_OK,
125                      response.getStatusLine().getStatusCode());
126         byte[] data = EntityUtils.toByteArray(response.getEntity());
127         Assert.assertEquals("wrong length of first response entity",
128                      rsplen, data.length);
129         // ignore data, but it must be read
130 
131         // check that there is no auto-release by default
132         try {
133             // this should fail quickly, connection has not been released
134             getConnection(this.connManager, route, 10L, TimeUnit.MILLISECONDS);
135             Assert.fail("ConnectionPoolTimeoutException should have been thrown");
136         } catch (final ConnectionPoolTimeoutException e) {
137             // expected
138         }
139 
140         conn.close();
141         this.connManager.releaseConnection(conn, null, -1, null);
142         conn = getConnection(this.connManager, route);
143         Assert.assertFalse("connection should have been closed", conn.isOpen());
144 
145         this.connManager.connect(conn, route, 0, context);
146         this.connManager.routeComplete(conn, route, context);
147 
148         // repeat the communication, no need to prepare the request again
149         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
150         response = exec.execute(request, conn, context);
151 
152         Assert.assertEquals("wrong status in second response",
153                      HttpStatus.SC_OK,
154                      response.getStatusLine().getStatusCode());
155         data = EntityUtils.toByteArray(response.getEntity());
156         Assert.assertEquals("wrong length of second response entity",
157                      rsplen, data.length);
158         // ignore data, but it must be read
159 
160         // release connection after marking it for re-use
161         // expect the next connection obtained to be open
162         this.connManager.releaseConnection(conn, null, -1, null);
163         conn = getConnection(this.connManager, route);
164         Assert.assertTrue("connection should have been open", conn.isOpen());
165 
166         // repeat the communication, no need to prepare the request again
167         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
168         response = exec.execute(request, conn, context);
169 
170         Assert.assertEquals("wrong status in third response",
171                      HttpStatus.SC_OK,
172                      response.getStatusLine().getStatusCode());
173         data = EntityUtils.toByteArray(response.getEntity());
174         Assert.assertEquals("wrong length of third response entity",
175                      rsplen, data.length);
176         // ignore data, but it must be read
177 
178         this.connManager.releaseConnection(conn, null, -1, null);
179         this.connManager.shutdown();
180     }
181 
182     /**
183      * Tests releasing with time limits.
184      */
185     @Test
186     public void testReleaseConnectionWithTimeLimits() throws Exception {
187 
188         this.connManager.setMaxTotal(1);
189 
190         final HttpHost target = start();
191         final HttpRoute route = new HttpRoute(target, null, false);
192         final int      rsplen = 8;
193         final String      uri = "/random/" + rsplen;
194 
195         final HttpRequest request = new BasicHttpRequest("GET", uri, HttpVersion.HTTP_1_1);
196         final HttpContext context = new BasicHttpContext();
197 
198         HttpClientConnection conn = getConnection(this.connManager, route);
199         this.connManager.connect(conn, route, 0, context);
200         this.connManager.routeComplete(conn, route, context);
201 
202         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
203         context.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
204 
205         final HttpProcessor httpProcessor = new ImmutableHttpProcessor(
206                 new HttpRequestInterceptor[] { new RequestContent(), new RequestConnControl() });
207 
208         final HttpRequestExecutor exec = new HttpRequestExecutor();
209         exec.preProcess(request, httpProcessor, context);
210         HttpResponse response = exec.execute(request, conn, context);
211 
212         Assert.assertEquals("wrong status in first response",
213                      HttpStatus.SC_OK,
214                      response.getStatusLine().getStatusCode());
215         byte[] data = EntityUtils.toByteArray(response.getEntity());
216         Assert.assertEquals("wrong length of first response entity",
217                      rsplen, data.length);
218         // ignore data, but it must be read
219 
220         // check that there is no auto-release by default
221         try {
222             // this should fail quickly, connection has not been released
223             getConnection(this.connManager, route, 10L, TimeUnit.MILLISECONDS);
224             Assert.fail("ConnectionPoolTimeoutException should have been thrown");
225         } catch (final ConnectionPoolTimeoutException e) {
226             // expected
227         }
228 
229         conn.close();
230         this.connManager.releaseConnection(conn, null, 100, TimeUnit.MILLISECONDS);
231         conn = getConnection(this.connManager, route);
232         Assert.assertFalse("connection should have been closed", conn.isOpen());
233 
234         // repeat the communication, no need to prepare the request again
235         this.connManager.connect(conn, route, 0, context);
236         this.connManager.routeComplete(conn, route, context);
237 
238         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
239         response = exec.execute(request, conn, context);
240 
241         Assert.assertEquals("wrong status in second response",
242                      HttpStatus.SC_OK,
243                      response.getStatusLine().getStatusCode());
244         data = EntityUtils.toByteArray(response.getEntity());
245         Assert.assertEquals("wrong length of second response entity",
246                      rsplen, data.length);
247         // ignore data, but it must be read
248 
249         this.connManager.releaseConnection(conn, null, 100, TimeUnit.MILLISECONDS);
250         conn = getConnection(this.connManager, route);
251         Assert.assertTrue("connection should have been open", conn.isOpen());
252 
253         // repeat the communication, no need to prepare the request again
254         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
255         response = exec.execute(request, conn, context);
256 
257         Assert.assertEquals("wrong status in third response",
258                      HttpStatus.SC_OK,
259                      response.getStatusLine().getStatusCode());
260         data = EntityUtils.toByteArray(response.getEntity());
261         Assert.assertEquals("wrong length of third response entity",
262                      rsplen, data.length);
263         // ignore data, but it must be read
264 
265         this.connManager.releaseConnection(conn, null, 100, TimeUnit.MILLISECONDS);
266         Thread.sleep(150);
267         conn = getConnection(this.connManager, route);
268         Assert.assertTrue("connection should have been closed", !conn.isOpen());
269 
270         // repeat the communication, no need to prepare the request again
271         this.connManager.connect(conn, route, 0, context);
272         this.connManager.routeComplete(conn, route, context);
273 
274         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
275         response = exec.execute(request, conn, context);
276 
277         Assert.assertEquals("wrong status in third response",
278                      HttpStatus.SC_OK,
279                      response.getStatusLine().getStatusCode());
280         data = EntityUtils.toByteArray(response.getEntity());
281         Assert.assertEquals("wrong length of fourth response entity",
282                      rsplen, data.length);
283         // ignore data, but it must be read
284 
285         this.connManager.shutdown();
286     }
287 
288     @Test
289     public void testCloseExpiredIdleConnections() throws Exception {
290 
291         this.connManager.setMaxTotal(1);
292 
293         final HttpHost target = start();
294         final HttpRoute route = new HttpRoute(target, null, false);
295         final HttpContext context = new BasicHttpContext();
296 
297         final HttpClientConnection conn = getConnection(this.connManager, route);
298         this.connManager.connect(conn, route, 0, context);
299         this.connManager.routeComplete(conn, route, context);
300 
301         Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
302         Assert.assertEquals(1, this.connManager.getTotalStats().getLeased());
303         Assert.assertEquals(1, this.connManager.getStats(route).getLeased());
304 
305         this.connManager.releaseConnection(conn, null, 100, TimeUnit.MILLISECONDS);
306 
307         // Released, still active.
308         Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
309         Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
310         Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
311 
312         this.connManager.closeExpiredConnections();
313 
314         // Time has not expired yet.
315         Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
316         Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
317         Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
318 
319         Thread.sleep(150);
320 
321         this.connManager.closeExpiredConnections();
322 
323         // Time expired now, connections are destroyed.
324         Assert.assertEquals(Collections.emptySet(), this.connManager.getRoutes());
325         Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
326         Assert.assertEquals(0, this.connManager.getStats(route).getAvailable());
327 
328         this.connManager.shutdown();
329     }
330 
331     @Test
332     public void testCloseExpiredTTLConnections() throws Exception {
333 
334         this.connManager = new PoolingHttpClientConnectionManager(
335                 100, TimeUnit.MILLISECONDS);
336         this.clientBuilder.setConnectionManager(this.connManager);
337 
338         this.connManager.setMaxTotal(1);
339 
340         final HttpHost target = start();
341         final HttpRoute route = new HttpRoute(target, null, false);
342         final HttpContext context = new BasicHttpContext();
343 
344         final HttpClientConnection conn = getConnection(this.connManager, route);
345         this.connManager.connect(conn, route, 0, context);
346         this.connManager.routeComplete(conn, route, context);
347 
348         Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
349         Assert.assertEquals(1, this.connManager.getTotalStats().getLeased());
350         Assert.assertEquals(1, this.connManager.getStats(route).getLeased());
351         // Release, let remain idle for forever
352         this.connManager.releaseConnection(conn, null, -1, TimeUnit.MILLISECONDS);
353 
354         // Released, still active.
355         Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
356         Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
357         Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
358 
359         this.connManager.closeExpiredConnections();
360 
361         // Time has not expired yet.
362         Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
363         Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
364         Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
365 
366         Thread.sleep(150);
367 
368         this.connManager.closeExpiredConnections();
369 
370         // TTL expired now, connections are destroyed.
371         Assert.assertEquals(Collections.emptySet(), this.connManager.getRoutes());
372         Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
373         Assert.assertEquals(0, this.connManager.getStats(route).getAvailable());
374 
375         this.connManager.shutdown();
376     }
377 
378     /**
379      * Tests releasing connection from #abort method called from the
380      * main execution thread while there is no blocking I/O operation.
381      */
382     @Test
383     public void testReleaseConnectionOnAbort() throws Exception {
384 
385         this.connManager.setMaxTotal(1);
386 
387         final HttpHost target = start();
388         final HttpRoute route = new HttpRoute(target, null, false);
389         final int      rsplen = 8;
390         final String      uri = "/random/" + rsplen;
391         final HttpContext context = new BasicHttpContext();
392 
393         final HttpRequest request =
394             new BasicHttpRequest("GET", uri, HttpVersion.HTTP_1_1);
395 
396         HttpClientConnection conn = getConnection(this.connManager, route);
397         this.connManager.connect(conn, route, 0, context);
398         this.connManager.routeComplete(conn, route, context);
399 
400         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
401         context.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
402 
403         final HttpProcessor httpProcessor = new ImmutableHttpProcessor(
404                 new HttpRequestInterceptor[] { new RequestContent(), new RequestConnControl() });
405 
406         final HttpRequestExecutor exec = new HttpRequestExecutor();
407         exec.preProcess(request, httpProcessor, context);
408         final HttpResponse response = exec.execute(request, conn, context);
409 
410         Assert.assertEquals("wrong status in first response",
411                      HttpStatus.SC_OK,
412                      response.getStatusLine().getStatusCode());
413 
414         // check that there are no connections available
415         try {
416             // this should fail quickly, connection has not been released
417             getConnection(this.connManager, route, 100L, TimeUnit.MILLISECONDS);
418             Assert.fail("ConnectionPoolTimeoutException should have been thrown");
419         } catch (final ConnectionPoolTimeoutException e) {
420             // expected
421         }
422 
423         // abort the connection
424         Assert.assertTrue(conn instanceof HttpClientConnection);
425         conn.shutdown();
426         this.connManager.releaseConnection(conn, null, -1, null);
427 
428         // the connection is expected to be released back to the manager
429         conn = getConnection(this.connManager, route, 5L, TimeUnit.SECONDS);
430         Assert.assertFalse("connection should have been closed", conn.isOpen());
431 
432         this.connManager.releaseConnection(conn, null, -1, null);
433         this.connManager.shutdown();
434     }
435 
436     @Test
437     public void testAbortDuringConnecting() throws Exception {
438         final CountDownLatch connectLatch = new CountDownLatch(1);
439         final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(
440                 connectLatch, WaitPolicy.BEFORE_CONNECT, PlainConnectionSocketFactory.getSocketFactory());
441         final Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
442             .register("http", stallingSocketFactory)
443             .build();
444 
445         this.connManager = new PoolingHttpClientConnectionManager(registry);
446         this.clientBuilder.setConnectionManager(this.connManager);
447 
448         this.connManager.setMaxTotal(1);
449 
450         final HttpHost target = start();
451         final HttpRoute route = new HttpRoute(target, null, false);
452         final HttpContext context = new BasicHttpContext();
453 
454         final HttpClientConnection conn = getConnection(this.connManager, route);
455 
456         final AtomicReference<Throwable> throwRef = new AtomicReference<Throwable>();
457         final Thread abortingThread = new Thread(new Runnable() {
458             @Override
459             public void run() {
460                 try {
461                     stallingSocketFactory.waitForState();
462                     conn.shutdown();
463                     connManager.releaseConnection(conn, null, -1, null);
464                     connectLatch.countDown();
465                 } catch (final Throwable e) {
466                     throwRef.set(e);
467                 }
468             }
469         });
470         abortingThread.start();
471 
472         try {
473             this.connManager.connect(conn, route, 0, context);
474             this.connManager.routeComplete(conn, route, context);
475             Assert.fail("expected SocketException");
476         } catch(final SocketException expected) {}
477 
478         abortingThread.join(5000);
479         if(throwRef.get() != null) {
480             throw new RuntimeException(throwRef.get());
481         }
482 
483         Assert.assertFalse(conn.isOpen());
484 
485         // the connection is expected to be released back to the manager
486         final HttpClientConnection conn2 = getConnection(this.connManager, route, 5L, TimeUnit.SECONDS);
487         Assert.assertFalse("connection should have been closed", conn2.isOpen());
488 
489         this.connManager.releaseConnection(conn2, null, -1, null);
490         this.connManager.shutdown();
491     }
492 
493     @Test
494     public void testAbortBeforeSocketCreate() throws Exception {
495         final CountDownLatch connectLatch = new CountDownLatch(1);
496         final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(
497                 connectLatch, WaitPolicy.BEFORE_CREATE, PlainConnectionSocketFactory.getSocketFactory());
498         final Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
499             .register("http", stallingSocketFactory)
500             .build();
501 
502         this.connManager = new PoolingHttpClientConnectionManager(registry);
503         this.clientBuilder.setConnectionManager(this.connManager);
504 
505         this.connManager.setMaxTotal(1);
506 
507         final HttpHost target = start();
508         final HttpRoute route = new HttpRoute(target, null, false);
509         final HttpContext context = new BasicHttpContext();
510 
511         final HttpClientConnection conn = getConnection(this.connManager, route);
512 
513         final AtomicReference<Throwable> throwRef = new AtomicReference<Throwable>();
514         final Thread abortingThread = new Thread(new Runnable() {
515             @Override
516             public void run() {
517                 try {
518                     stallingSocketFactory.waitForState();
519                     conn.shutdown();
520                     connManager.releaseConnection(conn, null, -1, null);
521                     connectLatch.countDown();
522                 } catch (final Throwable e) {
523                     throwRef.set(e);
524                 }
525             }
526         });
527         abortingThread.start();
528 
529         try {
530             this.connManager.connect(conn, route, 0, context);
531             this.connManager.routeComplete(conn, route, context);
532             Assert.fail("IOException expected");
533         } catch(final IOException expected) {
534         }
535 
536         abortingThread.join(5000);
537         if(throwRef.get() != null) {
538             throw new RuntimeException(throwRef.get());
539         }
540 
541         Assert.assertFalse(conn.isOpen());
542 
543         // the connection is expected to be released back to the manager
544         final HttpClientConnection conn2 = getConnection(this.connManager, route, 5L, TimeUnit.SECONDS);
545         Assert.assertFalse("connection should have been closed", conn2.isOpen());
546 
547         this.connManager.releaseConnection(conn2, null, -1, null);
548         this.connManager.shutdown();
549     }
550 
551     @Test
552     public void testAbortAfterSocketConnect() throws Exception {
553         final CountDownLatch connectLatch = new CountDownLatch(1);
554         final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(
555                 connectLatch, WaitPolicy.AFTER_CONNECT, PlainConnectionSocketFactory.getSocketFactory());
556         final Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
557             .register("http", stallingSocketFactory)
558             .build();
559 
560         this.connManager = new PoolingHttpClientConnectionManager(registry);
561         this.clientBuilder.setConnectionManager(this.connManager);
562 
563         this.connManager.setMaxTotal(1);
564 
565         final HttpHost target = start();
566         final HttpRoute route = new HttpRoute(target, null, false);
567         final HttpContext context = new BasicHttpContext();
568 
569         final HttpClientConnection conn = getConnection(this.connManager, route);
570 
571         final AtomicReference<Throwable> throwRef = new AtomicReference<Throwable>();
572         final Thread abortingThread = new Thread(new Runnable() {
573             @Override
574             public void run() {
575                 try {
576                     stallingSocketFactory.waitForState();
577                     conn.shutdown();
578                     connManager.releaseConnection(conn, null, -1, null);
579                     connectLatch.countDown();
580                 } catch (final Throwable e) {
581                     throwRef.set(e);
582                 }
583             }
584         });
585         abortingThread.start();
586 
587         try {
588             this.connManager.connect(conn, route, 0, context);
589             this.connManager.routeComplete(conn, route, context);
590             Assert.fail("IOException expected");
591         } catch(final IOException expected) {
592         }
593 
594         abortingThread.join(5000);
595         if(throwRef.get() != null) {
596             throw new RuntimeException(throwRef.get());
597         }
598 
599         Assert.assertFalse(conn.isOpen());
600 
601         // the connection is expected to be released back to the manager
602         final HttpClientConnection conn2 = getConnection(this.connManager, route, 5L, TimeUnit.SECONDS);
603         Assert.assertFalse("connection should have been closed", conn2.isOpen());
604 
605         this.connManager.releaseConnection(conn2, null, -1, null);
606         this.connManager.shutdown();
607     }
608 
609     static class LatchSupport {
610 
611         private final CountDownLatch continueLatch;
612         private final CountDownLatch waitLatch = new CountDownLatch(1);
613         protected final WaitPolicy waitPolicy;
614 
615         LatchSupport(final CountDownLatch continueLatch, final WaitPolicy waitPolicy) {
616             this.continueLatch = continueLatch;
617             this.waitPolicy = waitPolicy;
618         }
619 
620         void waitForState() throws InterruptedException {
621             if(!waitLatch.await(1, TimeUnit.SECONDS)) {
622                 throw new RuntimeException("waited too long");
623             }
624         }
625 
626         void latch() {
627             waitLatch.countDown();
628             try {
629                 if (!continueLatch.await(60, TimeUnit.SECONDS)) {
630                     throw new RuntimeException("waited too long!");
631                 }
632             } catch (final InterruptedException e) {
633                 throw new RuntimeException(e);
634             }
635         }
636     }
637 
638     private static class StallingSocketFactory extends LatchSupport implements ConnectionSocketFactory {
639 
640         private final ConnectionSocketFactory delegate;
641 
642         public StallingSocketFactory(
643                 final CountDownLatch continueLatch,
644                 final WaitPolicy waitPolicy,
645                 final ConnectionSocketFactory delegate) {
646             super(continueLatch, waitPolicy);
647             this.delegate = delegate;
648         }
649 
650         @Override
651         public Socket connectSocket(
652                 final int connectTimeout,
653                 final Socket sock,
654                 final HttpHost host,
655                 final InetSocketAddress remoteAddress,
656                 final InetSocketAddress localAddress,
657                 final HttpContext context) throws IOException, ConnectTimeoutException {
658             if(waitPolicy == WaitPolicy.BEFORE_CONNECT) {
659                 latch();
660             }
661 
662             final Socket socket = delegate.connectSocket(
663                     connectTimeout, sock, host, remoteAddress, localAddress, context);
664 
665             if(waitPolicy == WaitPolicy.AFTER_CONNECT) {
666                 latch();
667             }
668 
669             return socket;
670         }
671 
672         @Override
673         public Socket createSocket(final HttpContext context) throws IOException {
674             if(waitPolicy == WaitPolicy.BEFORE_CREATE) {
675                 latch();
676             }
677 
678             return delegate.createSocket(context);
679         }
680 
681     }
682 
683     private enum WaitPolicy { BEFORE_CREATE, BEFORE_CONNECT, AFTER_CONNECT, AFTER_OPEN }
684 
685 }