1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.testing.sync;
29
30 import java.io.ByteArrayInputStream;
31 import java.io.IOException;
32 import java.net.URI;
33 import java.nio.charset.StandardCharsets;
34 import java.util.ArrayList;
35 import java.util.List;
36
37 import org.apache.hc.client5.http.classic.methods.HttpGet;
38 import org.apache.hc.client5.http.classic.methods.HttpPost;
39 import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
40 import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
41 import org.apache.hc.core5.http.ClassicHttpResponse;
42 import org.apache.hc.core5.http.ContentType;
43 import org.apache.hc.core5.http.EntityDetails;
44 import org.apache.hc.core5.http.Header;
45 import org.apache.hc.core5.http.HeaderElements;
46 import org.apache.hc.core5.http.HttpException;
47 import org.apache.hc.core5.http.HttpHeaders;
48 import org.apache.hc.core5.http.HttpHost;
49 import org.apache.hc.core5.http.HttpResponse;
50 import org.apache.hc.core5.http.HttpResponseInterceptor;
51 import org.apache.hc.core5.http.impl.HttpProcessors;
52 import org.apache.hc.core5.http.io.entity.EntityUtils;
53 import org.apache.hc.core5.http.io.entity.InputStreamEntity;
54 import org.apache.hc.core5.http.protocol.HttpContext;
55 import org.apache.hc.core5.http.protocol.HttpProcessor;
56 import org.junit.Assert;
57 import org.junit.Test;
58
59 public class TestConnectionReuse extends LocalServerTestBase {
60
61 @Test
62 public void testReuseOfPersistentConnections() throws Exception {
63 this.connManager.setMaxTotal(5);
64 this.connManager.setDefaultMaxPerRoute(5);
65
66 final HttpHost target = start();
67
68 final WorkerThread[] workers = new WorkerThread[10];
69 for (int i = 0; i < workers.length; i++) {
70 workers[i] = new WorkerThread(
71 this.httpclient,
72 target,
73 new URI("/random/2000"),
74 10, false);
75 }
76
77 for (final WorkerThread worker : workers) {
78 worker.start();
79 }
80 for (final WorkerThread worker : workers) {
81 worker.join(10000);
82 final Exception ex = worker.getException();
83 if (ex != null) {
84 throw ex;
85 }
86 }
87
88
89 Assert.assertEquals(0, this.connManager.getTotalStats().getLeased());
90
91 Assert.assertTrue(this.connManager.getTotalStats().getAvailable() > 0);
92 }
93
94 @Test
95 public void testReuseOfPersistentConnectionsWithStreamedRequestAndResponse() throws Exception {
96 this.connManager.setMaxTotal(5);
97 this.connManager.setDefaultMaxPerRoute(5);
98
99 final HttpHost target = start();
100
101 final WorkerThread[] workers = new WorkerThread[10];
102 for (int i = 0; i < workers.length; i++) {
103 final List<HttpUriRequestBase> requests = new ArrayList<>();
104 for (int j = 0; j < 10; j++) {
105 final HttpPost post = new HttpPost(new URI("/random/2000"));
106
107 post.setEntity(new InputStreamEntity(
108 new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)),
109 ContentType.APPLICATION_OCTET_STREAM));
110 requests.add(post);
111 }
112 workers[i] = new WorkerThread(this.httpclient, target, false, requests);
113 }
114
115 for (final WorkerThread worker : workers) {
116 worker.start();
117 }
118 for (final WorkerThread worker : workers) {
119 worker.join(10000);
120 final Exception ex = worker.getException();
121 if (ex != null) {
122 throw ex;
123 }
124 }
125
126
127 Assert.assertEquals(0, this.connManager.getTotalStats().getLeased());
128
129 Assert.assertTrue(this.connManager.getTotalStats().getAvailable() > 0);
130 }
131
132 private static class AlwaysCloseConn implements HttpResponseInterceptor {
133
134 @Override
135 public void process(
136 final HttpResponse response,
137 final EntityDetails entityDetails,
138 final HttpContext context) throws HttpException, IOException {
139 response.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
140 }
141
142 }
143
144 @Test
145 public void testReuseOfClosedConnections() throws Exception {
146 this.connManager.setMaxTotal(5);
147 this.connManager.setDefaultMaxPerRoute(5);
148
149 final HttpProcessor httpproc = HttpProcessors.customServer(null)
150 .add(new AlwaysCloseConn())
151 .build();
152 final HttpHost target = start(httpproc, null);
153
154 final WorkerThread[] workers = new WorkerThread[10];
155 for (int i = 0; i < workers.length; i++) {
156 workers[i] = new WorkerThread(
157 this.httpclient,
158 target,
159 new URI("/random/2000"),
160 10, false);
161 }
162
163 for (final WorkerThread worker : workers) {
164 worker.start();
165 }
166 for (final WorkerThread worker : workers) {
167 worker.join(10000);
168 final Exception ex = worker.getException();
169 if (ex != null) {
170 throw ex;
171 }
172 }
173
174
175 Assert.assertEquals(0, this.connManager.getTotalStats().getLeased());
176
177 Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
178 }
179
180 @Test
181 public void testReuseOfAbortedConnections() throws Exception {
182 this.connManager.setMaxTotal(5);
183 this.connManager.setDefaultMaxPerRoute(5);
184
185 final HttpHost target = start();
186
187 final WorkerThread[] workers = new WorkerThread[10];
188 for (int i = 0; i < workers.length; i++) {
189 workers[i] = new WorkerThread(
190 this.httpclient,
191 target,
192 new URI("/random/2000"),
193 10, true);
194 }
195
196 for (final WorkerThread worker : workers) {
197 worker.start();
198 }
199 for (final WorkerThread worker : workers) {
200 worker.join(10000);
201 final Exception ex = worker.getException();
202 if (ex != null) {
203 throw ex;
204 }
205 }
206
207
208 Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
209 }
210
211 @Test
212 public void testKeepAliveHeaderRespected() throws Exception {
213 this.connManager.setMaxTotal(1);
214 this.connManager.setDefaultMaxPerRoute(1);
215
216 final HttpProcessor httpproc = HttpProcessors.customServer(null)
217 .add(new ResponseKeepAlive())
218 .build();
219 final HttpHost target = start(httpproc, null);
220
221 ClassicHttpResponse response = this.httpclient.execute(target, new HttpGet("/random/2000"));
222 EntityUtils.consume(response.getEntity());
223
224 Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
225
226 response = this.httpclient.execute(target, new HttpGet("/random/2000"));
227 EntityUtils.consume(response.getEntity());
228
229 Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
230
231
232 Thread.sleep(1100);
233 response = this.httpclient.execute(target, new HttpGet("/random/2000"));
234 EntityUtils.consume(response.getEntity());
235
236 Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
237
238
239
240 Thread.sleep(500);
241 response = this.httpclient.execute(target, new HttpGet("/random/2000"));
242 EntityUtils.consume(response.getEntity());
243
244
245 Assert.assertEquals(0, this.connManager.getTotalStats().getLeased());
246 Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
247 }
248
249 private static class WorkerThread extends Thread {
250
251 private final HttpHost target;
252 private final CloseableHttpClient httpclient;
253 private final boolean forceClose;
254 private final List<HttpUriRequestBase> requests;
255
256 private volatile Exception exception;
257
258 public WorkerThread(
259 final CloseableHttpClient httpclient,
260 final HttpHost target,
261 final URI requestURI,
262 final int repetitions,
263 final boolean forceClose) {
264 super();
265 this.httpclient = httpclient;
266 this.target = target;
267 this.forceClose = forceClose;
268 this.requests = new ArrayList<>(repetitions);
269 for (int i = 0; i < repetitions; i++) {
270 requests.add(new HttpGet(requestURI));
271 }
272 }
273
274 public WorkerThread(
275 final CloseableHttpClient httpclient,
276 final HttpHost target,
277 final boolean forceClose,
278 final List<HttpUriRequestBase> requests) {
279 super();
280 this.httpclient = httpclient;
281 this.target = target;
282 this.forceClose = forceClose;
283 this.requests = requests;
284 }
285
286 @Override
287 public void run() {
288 try {
289 for (final HttpUriRequestBase request : requests) {
290 final ClassicHttpResponse response = this.httpclient.execute(
291 this.target,
292 request);
293 if (this.forceClose) {
294 request.cancel();
295 } else {
296 EntityUtils.consume(response.getEntity());
297 }
298 }
299 } catch (final Exception ex) {
300 this.exception = ex;
301 }
302 }
303
304 public Exception getException() {
305 return exception;
306 }
307
308 }
309
310
311
312 private static class ResponseKeepAlive implements HttpResponseInterceptor {
313 @Override
314 public void process(
315 final HttpResponse response,
316 final EntityDetails entityDetails,
317 final HttpContext context) throws HttpException, IOException {
318 final Header connection = response.getFirstHeader(HttpHeaders.CONNECTION);
319 if(connection != null) {
320 if(!connection.getValue().equalsIgnoreCase("Close")) {
321 response.addHeader(HeaderElements.KEEP_ALIVE, "timeout=1");
322 }
323 }
324 }
325 }
326
327 }