View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.testing.sync;
29  
30  import java.io.ByteArrayInputStream;
31  import java.io.IOException;
32  import java.net.URI;
33  import java.nio.charset.StandardCharsets;
34  import java.util.ArrayList;
35  import java.util.List;
36  
37  import org.apache.hc.client5.http.classic.methods.HttpGet;
38  import org.apache.hc.client5.http.classic.methods.HttpPost;
39  import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
40  import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
41  import org.apache.hc.core5.http.ClassicHttpResponse;
42  import org.apache.hc.core5.http.ContentType;
43  import org.apache.hc.core5.http.EntityDetails;
44  import org.apache.hc.core5.http.Header;
45  import org.apache.hc.core5.http.HeaderElements;
46  import org.apache.hc.core5.http.HttpException;
47  import org.apache.hc.core5.http.HttpHeaders;
48  import org.apache.hc.core5.http.HttpHost;
49  import org.apache.hc.core5.http.HttpResponse;
50  import org.apache.hc.core5.http.HttpResponseInterceptor;
51  import org.apache.hc.core5.http.impl.HttpProcessors;
52  import org.apache.hc.core5.http.io.entity.EntityUtils;
53  import org.apache.hc.core5.http.io.entity.InputStreamEntity;
54  import org.apache.hc.core5.http.protocol.HttpContext;
55  import org.apache.hc.core5.http.protocol.HttpProcessor;
56  import org.junit.Assert;
57  import org.junit.Test;
58  
59  public class TestConnectionReuse extends LocalServerTestBase {
60  
61      @Test
62      public void testReuseOfPersistentConnections() throws Exception {
63          this.connManager.setMaxTotal(5);
64          this.connManager.setDefaultMaxPerRoute(5);
65  
66          final HttpHost target = start();
67  
68          final WorkerThread[] workers = new WorkerThread[10];
69          for (int i = 0; i < workers.length; i++) {
70              workers[i] = new WorkerThread(
71                      this.httpclient,
72                      target,
73                      new URI("/random/2000"),
74                      10, false);
75          }
76  
77          for (final WorkerThread worker : workers) {
78              worker.start();
79          }
80          for (final WorkerThread worker : workers) {
81              worker.join(10000);
82              final Exception ex = worker.getException();
83              if (ex != null) {
84                  throw ex;
85              }
86          }
87  
88          // Expect leased connections to be returned
89          Assert.assertEquals(0, this.connManager.getTotalStats().getLeased());
90          // Expect some connection in the pool
91          Assert.assertTrue(this.connManager.getTotalStats().getAvailable() > 0);
92      }
93  
94      @Test
95      public void testReuseOfPersistentConnectionsWithStreamedRequestAndResponse() throws Exception {
96          this.connManager.setMaxTotal(5);
97          this.connManager.setDefaultMaxPerRoute(5);
98  
99          final HttpHost target = start();
100 
101         final WorkerThread[] workers = new WorkerThread[10];
102         for (int i = 0; i < workers.length; i++) {
103             final List<HttpUriRequestBase> requests = new ArrayList<>();
104             for (int j = 0; j < 10; j++) {
105                 final HttpPost post = new HttpPost(new URI("/random/2000"));
106                 // non-repeatable
107                 post.setEntity(new InputStreamEntity(
108                         new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)),
109                         ContentType.APPLICATION_OCTET_STREAM));
110                 requests.add(post);
111             }
112             workers[i] = new WorkerThread(this.httpclient, target, false, requests);
113         }
114 
115         for (final WorkerThread worker : workers) {
116             worker.start();
117         }
118         for (final WorkerThread worker : workers) {
119             worker.join(10000);
120             final Exception ex = worker.getException();
121             if (ex != null) {
122                 throw ex;
123             }
124         }
125 
126         // Expect leased connections to be returned
127         Assert.assertEquals(0, this.connManager.getTotalStats().getLeased());
128         // Expect some connection in the pool
129         Assert.assertTrue(this.connManager.getTotalStats().getAvailable() > 0);
130     }
131 
132     private static class AlwaysCloseConn implements HttpResponseInterceptor {
133 
134         @Override
135         public void process(
136                 final HttpResponse response,
137                 final EntityDetails entityDetails,
138                 final HttpContext context) throws HttpException, IOException {
139             response.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
140         }
141 
142     }
143 
144     @Test
145     public void testReuseOfClosedConnections() throws Exception {
146         this.connManager.setMaxTotal(5);
147         this.connManager.setDefaultMaxPerRoute(5);
148 
149         final HttpProcessor httpproc = HttpProcessors.customServer(null)
150                 .add(new AlwaysCloseConn())
151                 .build();
152         final HttpHost target = start(httpproc, null);
153 
154         final WorkerThread[] workers = new WorkerThread[10];
155         for (int i = 0; i < workers.length; i++) {
156             workers[i] = new WorkerThread(
157                     this.httpclient,
158                     target,
159                     new URI("/random/2000"),
160                     10, false);
161         }
162 
163         for (final WorkerThread worker : workers) {
164             worker.start();
165         }
166         for (final WorkerThread worker : workers) {
167             worker.join(10000);
168             final Exception ex = worker.getException();
169             if (ex != null) {
170                 throw ex;
171             }
172         }
173 
174         // Expect leased connections to be returned
175         Assert.assertEquals(0, this.connManager.getTotalStats().getLeased());
176         // Expect zero connections in the pool
177         Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
178     }
179 
180     @Test
181     public void testReuseOfAbortedConnections() throws Exception {
182         this.connManager.setMaxTotal(5);
183         this.connManager.setDefaultMaxPerRoute(5);
184 
185         final HttpHost target = start();
186 
187         final WorkerThread[] workers = new WorkerThread[10];
188         for (int i = 0; i < workers.length; i++) {
189             workers[i] = new WorkerThread(
190                     this.httpclient,
191                     target,
192                     new URI("/random/2000"),
193                     10, true);
194         }
195 
196         for (final WorkerThread worker : workers) {
197             worker.start();
198         }
199         for (final WorkerThread worker : workers) {
200             worker.join(10000);
201             final Exception ex = worker.getException();
202             if (ex != null) {
203                 throw ex;
204             }
205         }
206 
207         // Expect zero connections in the pool
208         Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
209     }
210 
211     @Test
212     public void testKeepAliveHeaderRespected() throws Exception {
213         this.connManager.setMaxTotal(1);
214         this.connManager.setDefaultMaxPerRoute(1);
215 
216         final HttpProcessor httpproc = HttpProcessors.customServer(null)
217                 .add(new ResponseKeepAlive())
218                 .build();
219         final HttpHost target = start(httpproc, null);
220 
221         ClassicHttpResponse response = this.httpclient.execute(target, new HttpGet("/random/2000"));
222         EntityUtils.consume(response.getEntity());
223 
224         Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
225 
226         response = this.httpclient.execute(target, new HttpGet("/random/2000"));
227         EntityUtils.consume(response.getEntity());
228 
229         Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
230 
231         // Now sleep for 1.1 seconds and let the timeout do its work
232         Thread.sleep(1100);
233         response = this.httpclient.execute(target, new HttpGet("/random/2000"));
234         EntityUtils.consume(response.getEntity());
235 
236         Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
237 
238         // Do another request just under the 1 second limit & make
239         // sure we reuse that connection.
240         Thread.sleep(500);
241         response = this.httpclient.execute(target, new HttpGet("/random/2000"));
242         EntityUtils.consume(response.getEntity());
243 
244         // Expect leased connections to be returned
245         Assert.assertEquals(0, this.connManager.getTotalStats().getLeased());
246         Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
247     }
248 
249     private static class WorkerThread extends Thread {
250 
251         private final HttpHost target;
252         private final CloseableHttpClient httpclient;
253         private final boolean forceClose;
254         private final List<HttpUriRequestBase> requests;
255 
256         private volatile Exception exception;
257 
258         public WorkerThread(
259                 final CloseableHttpClient httpclient,
260                 final HttpHost target,
261                 final URI requestURI,
262                 final int repetitions,
263                 final boolean forceClose) {
264             super();
265             this.httpclient = httpclient;
266             this.target = target;
267             this.forceClose = forceClose;
268             this.requests = new ArrayList<>(repetitions);
269             for (int i = 0; i < repetitions; i++) {
270                 requests.add(new HttpGet(requestURI));
271             }
272         }
273 
274         public WorkerThread(
275                 final CloseableHttpClient httpclient,
276                 final HttpHost target,
277                 final boolean forceClose,
278                 final List<HttpUriRequestBase> requests) {
279             super();
280             this.httpclient = httpclient;
281             this.target = target;
282             this.forceClose = forceClose;
283             this.requests = requests;
284         }
285 
286         @Override
287         public void run() {
288             try {
289                 for (final HttpUriRequestBase request : requests) {
290                     final ClassicHttpResponse response = this.httpclient.execute(
291                             this.target,
292                             request);
293                     if (this.forceClose) {
294                         request.cancel();
295                     } else {
296                         EntityUtils.consume(response.getEntity());
297                     }
298                 }
299             } catch (final Exception ex) {
300                 this.exception = ex;
301             }
302         }
303 
304         public Exception getException() {
305             return exception;
306         }
307 
308     }
309 
310     // A very basic keep-alive header interceptor, to add Keep-Alive: timeout=1
311     // if there is no Connection: close header.
312     private static class ResponseKeepAlive implements HttpResponseInterceptor {
313         @Override
314         public void process(
315                 final HttpResponse response,
316                 final EntityDetails entityDetails,
317                 final HttpContext context) throws HttpException, IOException {
318             final Header connection = response.getFirstHeader(HttpHeaders.CONNECTION);
319             if(connection != null) {
320                 if(!connection.getValue().equalsIgnoreCase("Close")) {
321                     response.addHeader(HeaderElements.KEEP_ALIVE, "timeout=1");
322                 }
323             }
324         }
325     }
326 
327 }