View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.client.integration;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketException;
34  import java.util.Collections;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicReference;
39  
40  import org.apache.http.HttpClientConnection;
41  import org.apache.http.HttpHost;
42  import org.apache.http.HttpRequest;
43  import org.apache.http.HttpResponse;
44  import org.apache.http.HttpStatus;
45  import org.apache.http.HttpVersion;
46  import org.apache.http.config.Registry;
47  import org.apache.http.config.RegistryBuilder;
48  import org.apache.http.conn.ConnectTimeoutException;
49  import org.apache.http.conn.ConnectionPoolTimeoutException;
50  import org.apache.http.conn.ConnectionRequest;
51  import org.apache.http.conn.HttpClientConnectionManager;
52  import org.apache.http.conn.routing.HttpRoute;
53  import org.apache.http.conn.socket.ConnectionSocketFactory;
54  import org.apache.http.conn.socket.PlainConnectionSocketFactory;
55  import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
56  import org.apache.http.localserver.LocalServerTestBase;
57  import org.apache.http.message.BasicHttpRequest;
58  import org.apache.http.protocol.BasicHttpContext;
59  import org.apache.http.protocol.HttpContext;
60  import org.apache.http.protocol.HttpCoreContext;
61  import org.apache.http.protocol.HttpProcessor;
62  import org.apache.http.protocol.HttpRequestExecutor;
63  import org.apache.http.protocol.ImmutableHttpProcessor;
64  import org.apache.http.protocol.RequestConnControl;
65  import org.apache.http.protocol.RequestContent;
66  import org.apache.http.util.EntityUtils;
67  import org.junit.Assert;
68  import org.junit.Test;
69  
70  /**
71   * Tests for {@code PoolingHttpClientConnectionManager} that do require a server
72   * to communicate with.
73   */
74  public class TestConnectionManagement extends LocalServerTestBase {
75  
76      private static HttpClientConnection getConnection(
77              final HttpClientConnectionManager mgr,
78              final HttpRoute route,
79              final long timeout,
80              final TimeUnit unit) throws ConnectionPoolTimeoutException, ExecutionException, InterruptedException {
81          final ConnectionRequest connRequest = mgr.requestConnection(route, null);
82          return connRequest.get(timeout, unit);
83      }
84  
85      private static HttpClientConnection getConnection(
86              final HttpClientConnectionManager mgr,
87              final HttpRoute route) throws ConnectionPoolTimeoutException, ExecutionException, InterruptedException {
88          final ConnectionRequest connRequest = mgr.requestConnection(route, null);
89          return connRequest.get(0, TimeUnit.MILLISECONDS);
90      }
91  
92      /**
93       * Tests releasing and re-using a connection after a response is read.
94       */
95      @Test
96      public void testReleaseConnection() throws Exception {
97  
98          this.connManager.setMaxTotal(1);
99  
100         final HttpHost target = start();
101         final HttpRoute route = new HttpRoute(target, null, false);
102         final int      rsplen = 8;
103         final String      uri = "/random/" + rsplen;
104 
105         final HttpRequest request = new BasicHttpRequest("GET", uri, HttpVersion.HTTP_1_1);
106         final HttpContext context = new BasicHttpContext();
107 
108         HttpClientConnection conn = getConnection(this.connManager, route);
109         this.connManager.connect(conn, route, 0, context);
110         this.connManager.routeComplete(conn, route, context);
111 
112         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
113         context.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
114 
115         final HttpProcessor httpProcessor = new ImmutableHttpProcessor(
116                 new RequestContent(), new RequestConnControl());
117 
118         final HttpRequestExecutor exec = new HttpRequestExecutor();
119         exec.preProcess(request, httpProcessor, context);
120         HttpResponse response = exec.execute(request, conn, context);
121 
122         Assert.assertEquals("wrong status in first response",
123                      HttpStatus.SC_OK,
124                      response.getStatusLine().getStatusCode());
125         byte[] data = EntityUtils.toByteArray(response.getEntity());
126         Assert.assertEquals("wrong length of first response entity",
127                      rsplen, data.length);
128         // ignore data, but it must be read
129 
130         // check that there is no auto-release by default
131         try {
132             // this should fail quickly, connection has not been released
133             getConnection(this.connManager, route, 10L, TimeUnit.MILLISECONDS);
134             Assert.fail("ConnectionPoolTimeoutException should have been thrown");
135         } catch (final ConnectionPoolTimeoutException e) {
136             // expected
137         }
138 
139         conn.close();
140         this.connManager.releaseConnection(conn, null, -1, null);
141         conn = getConnection(this.connManager, route);
142         Assert.assertFalse("connection should have been closed", conn.isOpen());
143 
144         this.connManager.connect(conn, route, 0, context);
145         this.connManager.routeComplete(conn, route, context);
146 
147         // repeat the communication, no need to prepare the request again
148         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
149         response = exec.execute(request, conn, context);
150 
151         Assert.assertEquals("wrong status in second response",
152                      HttpStatus.SC_OK,
153                      response.getStatusLine().getStatusCode());
154         data = EntityUtils.toByteArray(response.getEntity());
155         Assert.assertEquals("wrong length of second response entity",
156                      rsplen, data.length);
157         // ignore data, but it must be read
158 
159         // release connection after marking it for re-use
160         // expect the next connection obtained to be open
161         this.connManager.releaseConnection(conn, null, -1, null);
162         conn = getConnection(this.connManager, route);
163         Assert.assertTrue("connection should have been open", conn.isOpen());
164 
165         // repeat the communication, no need to prepare the request again
166         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
167         response = exec.execute(request, conn, context);
168 
169         Assert.assertEquals("wrong status in third response",
170                      HttpStatus.SC_OK,
171                      response.getStatusLine().getStatusCode());
172         data = EntityUtils.toByteArray(response.getEntity());
173         Assert.assertEquals("wrong length of third response entity",
174                      rsplen, data.length);
175         // ignore data, but it must be read
176 
177         this.connManager.releaseConnection(conn, null, -1, null);
178         this.connManager.shutdown();
179     }
180 
181     /**
182      * Tests releasing with time limits.
183      */
184     @Test
185     public void testReleaseConnectionWithTimeLimits() throws Exception {
186 
187         this.connManager.setMaxTotal(1);
188 
189         final HttpHost target = start();
190         final HttpRoute route = new HttpRoute(target, null, false);
191         final int      rsplen = 8;
192         final String      uri = "/random/" + rsplen;
193 
194         final HttpRequest request = new BasicHttpRequest("GET", uri, HttpVersion.HTTP_1_1);
195         final HttpContext context = new BasicHttpContext();
196 
197         HttpClientConnection conn = getConnection(this.connManager, route);
198         this.connManager.connect(conn, route, 0, context);
199         this.connManager.routeComplete(conn, route, context);
200 
201         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
202         context.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
203 
204         final HttpProcessor httpProcessor = new ImmutableHttpProcessor(
205                 new RequestContent(), new RequestConnControl());
206 
207         final HttpRequestExecutor exec = new HttpRequestExecutor();
208         exec.preProcess(request, httpProcessor, context);
209         HttpResponse response = exec.execute(request, conn, context);
210 
211         Assert.assertEquals("wrong status in first response",
212                      HttpStatus.SC_OK,
213                      response.getStatusLine().getStatusCode());
214         byte[] data = EntityUtils.toByteArray(response.getEntity());
215         Assert.assertEquals("wrong length of first response entity",
216                      rsplen, data.length);
217         // ignore data, but it must be read
218 
219         // check that there is no auto-release by default
220         try {
221             // this should fail quickly, connection has not been released
222             getConnection(this.connManager, route, 10L, TimeUnit.MILLISECONDS);
223             Assert.fail("ConnectionPoolTimeoutException should have been thrown");
224         } catch (final ConnectionPoolTimeoutException e) {
225             // expected
226         }
227 
228         conn.close();
229         this.connManager.releaseConnection(conn, null, 100, TimeUnit.MILLISECONDS);
230         conn = getConnection(this.connManager, route);
231         Assert.assertFalse("connection should have been closed", conn.isOpen());
232 
233         // repeat the communication, no need to prepare the request again
234         this.connManager.connect(conn, route, 0, context);
235         this.connManager.routeComplete(conn, route, context);
236 
237         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
238         response = exec.execute(request, conn, context);
239 
240         Assert.assertEquals("wrong status in second response",
241                      HttpStatus.SC_OK,
242                      response.getStatusLine().getStatusCode());
243         data = EntityUtils.toByteArray(response.getEntity());
244         Assert.assertEquals("wrong length of second response entity",
245                      rsplen, data.length);
246         // ignore data, but it must be read
247 
248         this.connManager.releaseConnection(conn, null, 100, TimeUnit.MILLISECONDS);
249         conn = getConnection(this.connManager, route);
250         Assert.assertTrue("connection should have been open", conn.isOpen());
251 
252         // repeat the communication, no need to prepare the request again
253         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
254         response = exec.execute(request, conn, context);
255 
256         Assert.assertEquals("wrong status in third response",
257                      HttpStatus.SC_OK,
258                      response.getStatusLine().getStatusCode());
259         data = EntityUtils.toByteArray(response.getEntity());
260         Assert.assertEquals("wrong length of third response entity",
261                      rsplen, data.length);
262         // ignore data, but it must be read
263 
264         this.connManager.releaseConnection(conn, null, 100, TimeUnit.MILLISECONDS);
265         Thread.sleep(150);
266         conn = getConnection(this.connManager, route);
267         Assert.assertTrue("connection should have been closed", !conn.isOpen());
268 
269         // repeat the communication, no need to prepare the request again
270         this.connManager.connect(conn, route, 0, context);
271         this.connManager.routeComplete(conn, route, context);
272 
273         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
274         response = exec.execute(request, conn, context);
275 
276         Assert.assertEquals("wrong status in third response",
277                      HttpStatus.SC_OK,
278                      response.getStatusLine().getStatusCode());
279         data = EntityUtils.toByteArray(response.getEntity());
280         Assert.assertEquals("wrong length of fourth response entity",
281                      rsplen, data.length);
282         // ignore data, but it must be read
283 
284         this.connManager.shutdown();
285     }
286 
287     @Test
288     public void testCloseExpiredIdleConnections() throws Exception {
289 
290         this.connManager.setMaxTotal(1);
291 
292         final HttpHost target = start();
293         final HttpRoute route = new HttpRoute(target, null, false);
294         final HttpContext context = new BasicHttpContext();
295 
296         final HttpClientConnection conn = getConnection(this.connManager, route);
297         this.connManager.connect(conn, route, 0, context);
298         this.connManager.routeComplete(conn, route, context);
299 
300         Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
301         Assert.assertEquals(1, this.connManager.getTotalStats().getLeased());
302         Assert.assertEquals(1, this.connManager.getStats(route).getLeased());
303 
304         this.connManager.releaseConnection(conn, null, 100, TimeUnit.MILLISECONDS);
305 
306         // Released, still active.
307         Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
308         Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
309         Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
310 
311         this.connManager.closeExpiredConnections();
312 
313         // Time has not expired yet.
314         Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
315         Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
316         Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
317 
318         Thread.sleep(150);
319 
320         this.connManager.closeExpiredConnections();
321 
322         // Time expired now, connections are destroyed.
323         Assert.assertEquals(Collections.emptySet(), this.connManager.getRoutes());
324         Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
325         Assert.assertEquals(0, this.connManager.getStats(route).getAvailable());
326 
327         this.connManager.shutdown();
328     }
329 
330     @Test
331     public void testCloseExpiredTTLConnections() throws Exception {
332 
333         this.connManager = new PoolingHttpClientConnectionManager(
334                 100, TimeUnit.MILLISECONDS);
335         this.clientBuilder.setConnectionManager(this.connManager);
336 
337         this.connManager.setMaxTotal(1);
338 
339         final HttpHost target = start();
340         final HttpRoute route = new HttpRoute(target, null, false);
341         final HttpContext context = new BasicHttpContext();
342 
343         final HttpClientConnection conn = getConnection(this.connManager, route);
344         this.connManager.connect(conn, route, 0, context);
345         this.connManager.routeComplete(conn, route, context);
346 
347         Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
348         Assert.assertEquals(1, this.connManager.getTotalStats().getLeased());
349         Assert.assertEquals(1, this.connManager.getStats(route).getLeased());
350         // Release, let remain idle for forever
351         this.connManager.releaseConnection(conn, null, -1, TimeUnit.MILLISECONDS);
352 
353         // Released, still active.
354         Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
355         Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
356         Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
357 
358         this.connManager.closeExpiredConnections();
359 
360         // Time has not expired yet.
361         Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
362         Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
363         Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
364 
365         Thread.sleep(150);
366 
367         this.connManager.closeExpiredConnections();
368 
369         // TTL expired now, connections are destroyed.
370         Assert.assertEquals(Collections.emptySet(), this.connManager.getRoutes());
371         Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
372         Assert.assertEquals(0, this.connManager.getStats(route).getAvailable());
373 
374         this.connManager.shutdown();
375     }
376 
377     /**
378      * Tests releasing connection from #abort method called from the
379      * main execution thread while there is no blocking I/O operation.
380      */
381     @Test
382     public void testReleaseConnectionOnAbort() throws Exception {
383 
384         this.connManager.setMaxTotal(1);
385 
386         final HttpHost target = start();
387         final HttpRoute route = new HttpRoute(target, null, false);
388         final int      rsplen = 8;
389         final String      uri = "/random/" + rsplen;
390         final HttpContext context = new BasicHttpContext();
391 
392         final HttpRequest request =
393             new BasicHttpRequest("GET", uri, HttpVersion.HTTP_1_1);
394 
395         HttpClientConnection conn = getConnection(this.connManager, route);
396         this.connManager.connect(conn, route, 0, context);
397         this.connManager.routeComplete(conn, route, context);
398 
399         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
400         context.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
401 
402         final HttpProcessor httpProcessor = new ImmutableHttpProcessor(
403                 new RequestContent(), new RequestConnControl());
404 
405         final HttpRequestExecutor exec = new HttpRequestExecutor();
406         exec.preProcess(request, httpProcessor, context);
407         final HttpResponse response = exec.execute(request, conn, context);
408 
409         Assert.assertEquals("wrong status in first response",
410                      HttpStatus.SC_OK,
411                      response.getStatusLine().getStatusCode());
412 
413         // check that there are no connections available
414         try {
415             // this should fail quickly, connection has not been released
416             getConnection(this.connManager, route, 100L, TimeUnit.MILLISECONDS);
417             Assert.fail("ConnectionPoolTimeoutException should have been thrown");
418         } catch (final ConnectionPoolTimeoutException e) {
419             // expected
420         }
421 
422         // abort the connection
423         Assert.assertTrue(conn instanceof HttpClientConnection);
424         conn.shutdown();
425         this.connManager.releaseConnection(conn, null, -1, null);
426 
427         // the connection is expected to be released back to the manager
428         conn = getConnection(this.connManager, route, 5L, TimeUnit.SECONDS);
429         Assert.assertFalse("connection should have been closed", conn.isOpen());
430 
431         this.connManager.releaseConnection(conn, null, -1, null);
432         this.connManager.shutdown();
433     }
434 
435     @Test
436     public void testAbortDuringConnecting() throws Exception {
437         final CountDownLatch connectLatch = new CountDownLatch(1);
438         final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(
439                 connectLatch, WaitPolicy.BEFORE_CONNECT, PlainConnectionSocketFactory.getSocketFactory());
440         final Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
441             .register("http", stallingSocketFactory)
442             .build();
443 
444         this.connManager = new PoolingHttpClientConnectionManager(registry);
445         this.clientBuilder.setConnectionManager(this.connManager);
446 
447         this.connManager.setMaxTotal(1);
448 
449         final HttpHost target = start();
450         final HttpRoute route = new HttpRoute(target, null, false);
451         final HttpContext context = new BasicHttpContext();
452 
453         final HttpClientConnection conn = getConnection(this.connManager, route);
454 
455         final AtomicReference<Throwable> throwRef = new AtomicReference<Throwable>();
456         final Thread abortingThread = new Thread(new Runnable() {
457             @Override
458             public void run() {
459                 try {
460                     stallingSocketFactory.waitForState();
461                     conn.shutdown();
462                     connManager.releaseConnection(conn, null, -1, null);
463                     connectLatch.countDown();
464                 } catch (final Throwable e) {
465                     throwRef.set(e);
466                 }
467             }
468         });
469         abortingThread.start();
470 
471         try {
472             this.connManager.connect(conn, route, 0, context);
473             this.connManager.routeComplete(conn, route, context);
474             Assert.fail("expected SocketException");
475         } catch(final SocketException expected) {}
476 
477         abortingThread.join(5000);
478         if(throwRef.get() != null) {
479             throw new RuntimeException(throwRef.get());
480         }
481 
482         Assert.assertFalse(conn.isOpen());
483 
484         // the connection is expected to be released back to the manager
485         final HttpClientConnection conn2 = getConnection(this.connManager, route, 5L, TimeUnit.SECONDS);
486         Assert.assertFalse("connection should have been closed", conn2.isOpen());
487 
488         this.connManager.releaseConnection(conn2, null, -1, null);
489         this.connManager.shutdown();
490     }
491 
492     @Test
493     public void testAbortBeforeSocketCreate() throws Exception {
494         final CountDownLatch connectLatch = new CountDownLatch(1);
495         final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(
496                 connectLatch, WaitPolicy.BEFORE_CREATE, PlainConnectionSocketFactory.getSocketFactory());
497         final Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
498             .register("http", stallingSocketFactory)
499             .build();
500 
501         this.connManager = new PoolingHttpClientConnectionManager(registry);
502         this.clientBuilder.setConnectionManager(this.connManager);
503 
504         this.connManager.setMaxTotal(1);
505 
506         final HttpHost target = start();
507         final HttpRoute route = new HttpRoute(target, null, false);
508         final HttpContext context = new BasicHttpContext();
509 
510         final HttpClientConnection conn = getConnection(this.connManager, route);
511 
512         final AtomicReference<Throwable> throwRef = new AtomicReference<Throwable>();
513         final Thread abortingThread = new Thread(new Runnable() {
514             @Override
515             public void run() {
516                 try {
517                     stallingSocketFactory.waitForState();
518                     conn.shutdown();
519                     connManager.releaseConnection(conn, null, -1, null);
520                     connectLatch.countDown();
521                 } catch (final Throwable e) {
522                     throwRef.set(e);
523                 }
524             }
525         });
526         abortingThread.start();
527 
528         try {
529             this.connManager.connect(conn, route, 0, context);
530             this.connManager.routeComplete(conn, route, context);
531             Assert.fail("IOException expected");
532         } catch(final IOException expected) {
533         }
534 
535         abortingThread.join(5000);
536         if(throwRef.get() != null) {
537             throw new RuntimeException(throwRef.get());
538         }
539 
540         Assert.assertFalse(conn.isOpen());
541 
542         // the connection is expected to be released back to the manager
543         final HttpClientConnection conn2 = getConnection(this.connManager, route, 5L, TimeUnit.SECONDS);
544         Assert.assertFalse("connection should have been closed", conn2.isOpen());
545 
546         this.connManager.releaseConnection(conn2, null, -1, null);
547         this.connManager.shutdown();
548     }
549 
550     @Test
551     public void testAbortAfterSocketConnect() throws Exception {
552         final CountDownLatch connectLatch = new CountDownLatch(1);
553         final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(
554                 connectLatch, WaitPolicy.AFTER_CONNECT, PlainConnectionSocketFactory.getSocketFactory());
555         final Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
556             .register("http", stallingSocketFactory)
557             .build();
558 
559         this.connManager = new PoolingHttpClientConnectionManager(registry);
560         this.clientBuilder.setConnectionManager(this.connManager);
561 
562         this.connManager.setMaxTotal(1);
563 
564         final HttpHost target = start();
565         final HttpRoute route = new HttpRoute(target, null, false);
566         final HttpContext context = new BasicHttpContext();
567 
568         final HttpClientConnection conn = getConnection(this.connManager, route);
569 
570         final AtomicReference<Throwable> throwRef = new AtomicReference<Throwable>();
571         final Thread abortingThread = new Thread(new Runnable() {
572             @Override
573             public void run() {
574                 try {
575                     stallingSocketFactory.waitForState();
576                     conn.shutdown();
577                     connManager.releaseConnection(conn, null, -1, null);
578                     connectLatch.countDown();
579                 } catch (final Throwable e) {
580                     throwRef.set(e);
581                 }
582             }
583         });
584         abortingThread.start();
585 
586         try {
587             this.connManager.connect(conn, route, 0, context);
588             this.connManager.routeComplete(conn, route, context);
589             Assert.fail("IOException expected");
590         } catch(final IOException expected) {
591         }
592 
593         abortingThread.join(5000);
594         if(throwRef.get() != null) {
595             throw new RuntimeException(throwRef.get());
596         }
597 
598         Assert.assertFalse(conn.isOpen());
599 
600         // the connection is expected to be released back to the manager
601         final HttpClientConnection conn2 = getConnection(this.connManager, route, 5L, TimeUnit.SECONDS);
602         Assert.assertFalse("connection should have been closed", conn2.isOpen());
603 
604         this.connManager.releaseConnection(conn2, null, -1, null);
605         this.connManager.shutdown();
606     }
607 
608     static class LatchSupport {
609 
610         private final CountDownLatch continueLatch;
611         private final CountDownLatch waitLatch = new CountDownLatch(1);
612         protected final WaitPolicy waitPolicy;
613 
614         LatchSupport(final CountDownLatch continueLatch, final WaitPolicy waitPolicy) {
615             this.continueLatch = continueLatch;
616             this.waitPolicy = waitPolicy;
617         }
618 
619         void waitForState() throws InterruptedException {
620             if(!waitLatch.await(1, TimeUnit.SECONDS)) {
621                 throw new RuntimeException("waited too long");
622             }
623         }
624 
625         void latch() {
626             waitLatch.countDown();
627             try {
628                 if (!continueLatch.await(60, TimeUnit.SECONDS)) {
629                     throw new RuntimeException("waited too long!");
630                 }
631             } catch (final InterruptedException e) {
632                 throw new RuntimeException(e);
633             }
634         }
635     }
636 
637     private static class StallingSocketFactory extends LatchSupport implements ConnectionSocketFactory {
638 
639         private final ConnectionSocketFactory delegate;
640 
641         public StallingSocketFactory(
642                 final CountDownLatch continueLatch,
643                 final WaitPolicy waitPolicy,
644                 final ConnectionSocketFactory delegate) {
645             super(continueLatch, waitPolicy);
646             this.delegate = delegate;
647         }
648 
649         @Override
650         public Socket connectSocket(
651                 final int connectTimeout,
652                 final Socket sock,
653                 final HttpHost host,
654                 final InetSocketAddress remoteAddress,
655                 final InetSocketAddress localAddress,
656                 final HttpContext context) throws IOException, ConnectTimeoutException {
657             if(waitPolicy == WaitPolicy.BEFORE_CONNECT) {
658                 latch();
659             }
660 
661             final Socket socket = delegate.connectSocket(
662                     connectTimeout, sock, host, remoteAddress, localAddress, context);
663 
664             if(waitPolicy == WaitPolicy.AFTER_CONNECT) {
665                 latch();
666             }
667 
668             return socket;
669         }
670 
671         @Override
672         public Socket createSocket(final HttpContext context) throws IOException {
673             if(waitPolicy == WaitPolicy.BEFORE_CREATE) {
674                 latch();
675             }
676 
677             return delegate.createSocket(context);
678         }
679 
680     }
681 
682     private enum WaitPolicy { BEFORE_CREATE, BEFORE_CONNECT, AFTER_CONNECT, AFTER_OPEN }
683 
684 }