1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.nio.integration;
29
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.Collection;
35 import java.util.List;
36 import java.util.Queue;
37 import java.util.concurrent.ConcurrentLinkedQueue;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.TimeUnit;
41
42 import org.apache.http.ConnectionClosedException;
43 import org.apache.http.HttpEntityEnclosingRequest;
44 import org.apache.http.HttpException;
45 import org.apache.http.HttpHeaders;
46 import org.apache.http.HttpHost;
47 import org.apache.http.HttpRequest;
48 import org.apache.http.HttpResponse;
49 import org.apache.http.HttpStatus;
50 import org.apache.http.HttpVersion;
51 import org.apache.http.entity.ContentType;
52 import org.apache.http.entity.StringEntity;
53 import org.apache.http.message.BasicHttpEntityEnclosingRequest;
54 import org.apache.http.message.BasicHttpRequest;
55 import org.apache.http.message.BasicHttpResponse;
56 import org.apache.http.nio.entity.NStringEntity;
57 import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;
58 import org.apache.http.nio.protocol.BasicAsyncRequestHandler;
59 import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
60 import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
61 import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
62 import org.apache.http.nio.protocol.HttpAsyncExchange;
63 import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
64 import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
65 import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
66 import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
67 import org.apache.http.nio.reactor.ListenerEndpoint;
68 import org.apache.http.nio.testserver.HttpCoreNIOTestBase;
69 import org.apache.http.protocol.HttpContext;
70 import org.apache.http.protocol.HttpProcessor;
71 import org.apache.http.protocol.HttpRequestHandler;
72 import org.apache.http.protocol.ImmutableHttpProcessor;
73 import org.apache.http.protocol.RequestConnControl;
74 import org.apache.http.protocol.RequestContent;
75 import org.apache.http.protocol.RequestTargetHost;
76 import org.apache.http.protocol.RequestUserAgent;
77 import org.apache.http.util.EntityUtils;
78 import org.junit.After;
79 import org.junit.Assert;
80 import org.junit.Before;
81 import org.junit.Test;
82 import org.junit.runner.RunWith;
83 import org.junit.runners.Parameterized;
84
85
86
87
88 @RunWith(Parameterized.class)
89 public class TestHttpAsyncHandlersPipelining extends HttpCoreNIOTestBase {
90
91 private final static long RESULT_TIMEOUT_SEC = 30;
92
93 @Parameterized.Parameters(name = "{0}")
94 public static Collection<Object[]> protocols() {
95 return Arrays.asList(new Object[][]{
96 {ProtocolScheme.http},
97 {ProtocolScheme.https},
98 });
99 }
100
101 public TestHttpAsyncHandlersPipelining(final ProtocolScheme scheme) {
102 super(scheme);
103 }
104
105 public static final HttpProcessor DEFAULT_HTTP_PROC = new ImmutableHttpProcessor(
106 new RequestContent(),
107 new RequestTargetHost(),
108 new RequestConnControl(),
109 new RequestUserAgent("TEST-CLIENT/1.1"));
110
111 @Before
112 public void setUp() throws Exception {
113 initServer();
114 initClient();
115 }
116
117 @After
118 public void tearDown() throws Exception {
119 shutDownClient();
120 shutDownServer();
121 }
122
123 private HttpHost start() throws Exception {
124 this.server.start();
125 this.client.setHttpProcessor(DEFAULT_HTTP_PROC);
126 this.client.start();
127
128 final ListenerEndpoint endpoint = this.server.getListenerEndpoint();
129 endpoint.waitFor();
130
131 final InetSocketAddress address = (InetSocketAddress) endpoint.getAddress();
132 return new HttpHost("localhost", address.getPort(), getScheme().name());
133 }
134
135 private static String createRequestUri(final String pattern, final int count) {
136 return pattern + "x" + count;
137 }
138
139 private static String createExpectedString(final String pattern, final int count) {
140 final StringBuilder buffer = new StringBuilder();
141 for (int i = 0; i < count; i++) {
142 buffer.append(pattern);
143 }
144 return buffer.toString();
145 }
146
147 @Test
148 public void testHttpGets() throws Exception {
149 this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
150 final HttpHost target = start();
151
152 this.client.setMaxPerRoute(3);
153 this.client.setMaxTotal(3);
154
155 final String pattern = RndTestPatternGenerator.generateText();
156 final int count = RndTestPatternGenerator.generateCount(1000);
157
158 final String expectedPattern = createExpectedString(pattern, count);
159
160 final Queue<Future<List<HttpResponse>>> queue = new ConcurrentLinkedQueue<Future<List<HttpResponse>>>();
161 for (int i = 0; i < 10; i++) {
162 final String requestUri = createRequestUri(pattern, count);
163 final Future<List<HttpResponse>> future = this.client.executePipelined(target,
164 new BasicHttpRequest("GET", requestUri),
165 new BasicHttpRequest("GET", requestUri),
166 new BasicHttpRequest("GET", requestUri));
167 queue.add(future);
168 }
169
170 while (!queue.isEmpty()) {
171 final Future<List<HttpResponse>> future = queue.remove();
172 final List<HttpResponse> responses = future.get(RESULT_TIMEOUT_SEC, TimeUnit.SECONDS);
173 Assert.assertNotNull(responses);
174 Assert.assertEquals(3, responses.size());
175 for (final HttpResponse response: responses) {
176 Assert.assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
177 Assert.assertEquals(expectedPattern, EntityUtils.toString(response.getEntity()));
178 }
179 }
180 }
181
182 @Test
183 public void testHttpHeads() throws Exception {
184 this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
185 final HttpHost target = start();
186
187 this.client.setMaxPerRoute(3);
188 this.client.setMaxTotal(3);
189
190 final String pattern = RndTestPatternGenerator.generateText();
191 final int count = RndTestPatternGenerator.generateCount(1000);
192
193 final Queue<Future<List<HttpResponse>>> queue = new ConcurrentLinkedQueue<Future<List<HttpResponse>>>();
194 for (int i = 0; i < 10; i++) {
195 final String requestUri = createRequestUri(pattern, count);
196 final HttpRequest head1 = new BasicHttpRequest("HEAD", requestUri);
197 final HttpRequest head2 = new BasicHttpRequest("HEAD", requestUri);
198 final BasicHttpEntityEnclosingRequest post1 = new BasicHttpEntityEnclosingRequest("POST", requestUri);
199 post1.setEntity(new NStringEntity("stuff", ContentType.TEXT_PLAIN));
200 final Future<List<HttpResponse>> future = this.client.executePipelined(target, head1, head2, post1);
201 queue.add(future);
202 }
203
204 while (!queue.isEmpty()) {
205 final Future<List<HttpResponse>> future = queue.remove();
206 final List<HttpResponse> responses = future.get(RESULT_TIMEOUT_SEC, TimeUnit.SECONDS);
207 Assert.assertNotNull(responses);
208 Assert.assertEquals(3, responses.size());
209 for (final HttpResponse response: responses) {
210 Assert.assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
211 }
212 }
213 }
214
215 @Test
216 public void testHttpPosts() throws Exception {
217 this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
218 final HttpHost target = start();
219
220 this.client.setMaxPerRoute(3);
221 this.client.setMaxTotal(3);
222
223 final String pattern = RndTestPatternGenerator.generateText();
224 final int count = RndTestPatternGenerator.generateCount(1000);
225
226 final String expectedPattern = createExpectedString(pattern, count);
227
228 final Queue<Future<List<HttpResponse>>> queue = new ConcurrentLinkedQueue<Future<List<HttpResponse>>>();
229 for (int i = 0; i < 10; i++) {
230 final String requestUri = createRequestUri(pattern, count);
231 final HttpEntityEnclosingRequest request1 = new BasicHttpEntityEnclosingRequest("POST", requestUri);
232 final NStringEntity entity1 = new NStringEntity(expectedPattern, ContentType.DEFAULT_TEXT);
233 entity1.setChunked(RndTestPatternGenerator.generateBoolean());
234 request1.setEntity(entity1);
235 final HttpEntityEnclosingRequest request2 = new BasicHttpEntityEnclosingRequest("POST", requestUri);
236 final NStringEntity entity2 = new NStringEntity(expectedPattern, ContentType.DEFAULT_TEXT);
237 entity2.setChunked(RndTestPatternGenerator.generateBoolean());
238 request2.setEntity(entity2);
239 final HttpEntityEnclosingRequest request3 = new BasicHttpEntityEnclosingRequest("POST", requestUri);
240 final NStringEntity entity3 = new NStringEntity(expectedPattern, ContentType.DEFAULT_TEXT);
241 entity3.setChunked(RndTestPatternGenerator.generateBoolean());
242 request3.setEntity(entity3);
243 final Future<List<HttpResponse>> future = this.client.executePipelined(target,
244 request1, request2, request3);
245 queue.add(future);
246 }
247
248 while (!queue.isEmpty()) {
249 final Future<List<HttpResponse>> future = queue.remove();
250 final List<HttpResponse> responses = future.get(RESULT_TIMEOUT_SEC, TimeUnit.SECONDS);
251 Assert.assertNotNull(responses);
252 Assert.assertEquals(3, responses.size());
253 for (final HttpResponse response: responses) {
254 Assert.assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
255 Assert.assertEquals(expectedPattern, EntityUtils.toString(response.getEntity()));
256 }
257 }
258 }
259
260 @Test
261 public void testHttpDelayedResponse() throws Exception {
262
263 class DelayedRequestHandler implements HttpAsyncRequestHandler<HttpRequest> {
264
265 private final SimpleRequestHandler requestHandler;
266
267 public DelayedRequestHandler() {
268 super();
269 this.requestHandler = new SimpleRequestHandler();
270 }
271
272 @Override
273 public HttpAsyncRequestConsumer<HttpRequest> processRequest(
274 final HttpRequest request,
275 final HttpContext context) {
276 return new BasicAsyncRequestConsumer();
277 }
278
279 @Override
280 public void handle(
281 final HttpRequest request,
282 final HttpAsyncExchange httpexchange,
283 final HttpContext context) throws HttpException, IOException {
284 final BasicHttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, HttpStatus.SC_OK, "OK");
285 new Thread() {
286 @Override
287 public void run() {
288
289 try { Thread.sleep(100); } catch(final InterruptedException ie) {}
290
291 try {
292 requestHandler.handle(request, response, context);
293 } catch (final Exception ex) {
294 response.setStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
295 }
296 httpexchange.submitResponse(new BasicAsyncResponseProducer(response));
297 }
298 }.start();
299 }
300
301 }
302
303 this.server.registerHandler("*", new DelayedRequestHandler());
304 final HttpHost target = start();
305
306 this.client.setMaxPerRoute(3);
307 this.client.setMaxTotal(3);
308
309 final String pattern1 = RndTestPatternGenerator.generateText();
310 final String pattern2 = RndTestPatternGenerator.generateText();
311 final String pattern3 = RndTestPatternGenerator.generateText();
312 final int count = RndTestPatternGenerator.generateCount(1000);
313
314 final String expectedPattern1 = createExpectedString(pattern1, count);
315 final String expectedPattern2 = createExpectedString(pattern2, count);
316 final String expectedPattern3 = createExpectedString(pattern3, count);
317
318 final Queue<Future<List<HttpResponse>>> queue = new ConcurrentLinkedQueue<Future<List<HttpResponse>>>();
319 for (int i = 0; i < 1; i++) {
320 final HttpRequest request1 = new BasicHttpRequest("GET", createRequestUri(pattern1, count));
321 final HttpEntityEnclosingRequest request2 = new BasicHttpEntityEnclosingRequest("POST",
322 createRequestUri(pattern2, count));
323 final NStringEntity entity2 = new NStringEntity(expectedPattern2, ContentType.DEFAULT_TEXT);
324 entity2.setChunked(RndTestPatternGenerator.generateBoolean());
325 request2.setEntity(entity2);
326 final HttpEntityEnclosingRequest request3 = new BasicHttpEntityEnclosingRequest("POST",
327 createRequestUri(pattern3, count));
328 final NStringEntity entity3 = new NStringEntity(expectedPattern3, ContentType.DEFAULT_TEXT);
329 entity3.setChunked(RndTestPatternGenerator.generateBoolean());
330 request3.setEntity(entity3);
331 final Future<List<HttpResponse>> future = this.client.executePipelined(target,
332 request1, request2, request3);
333 queue.add(future);
334 }
335
336 while (!queue.isEmpty()) {
337 final Future<List<HttpResponse>> future = queue.remove();
338 final List<HttpResponse> responses = future.get(RESULT_TIMEOUT_SEC, TimeUnit.SECONDS);
339 Assert.assertNotNull(responses);
340 Assert.assertEquals(3, responses.size());
341 for (final HttpResponse response: responses) {
342 Assert.assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
343 }
344 Assert.assertEquals(expectedPattern1, EntityUtils.toString(responses.get(0).getEntity()));
345 Assert.assertEquals(expectedPattern2, EntityUtils.toString(responses.get(1).getEntity()));
346 Assert.assertEquals(expectedPattern3, EntityUtils.toString(responses.get(2).getEntity()));
347 }
348 }
349
350 @Test
351 public void testUnexpectedConnectionClosure() throws Exception {
352 this.server.registerHandler("*", new BasicAsyncRequestHandler(new HttpRequestHandler() {
353
354 @Override
355 public void handle(
356 final HttpRequest request,
357 final HttpResponse response,
358 final HttpContext context) throws HttpException, IOException {
359 response.setStatusCode(HttpStatus.SC_OK);
360 response.setEntity(new StringEntity("all is well", ContentType.TEXT_PLAIN));
361 }
362
363 }));
364 this.server.registerHandler("/boom", new BasicAsyncRequestHandler(new HttpRequestHandler() {
365
366 @Override
367 public void handle(
368 final HttpRequest request,
369 final HttpResponse response,
370 final HttpContext context) throws HttpException, IOException {
371 response.setStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
372 response.setHeader(HttpHeaders.CONNECTION, "Close");
373 response.setEntity(new StringEntity("boooooom!!!!!", ContentType.TEXT_PLAIN));
374 }
375
376 }));
377 final HttpHost target = start();
378
379 this.client.setMaxPerRoute(3);
380 this.client.setMaxTotal(3);
381
382 for (int i = 0; i < 3; i++) {
383
384 final HttpAsyncRequestProducer p1 = new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", "/"));
385 final HttpAsyncRequestProducer p2 = new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", "/boom"));
386 final HttpAsyncRequestProducer p3 = new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", "/"));
387 final List<HttpAsyncRequestProducer> requestProducers = new ArrayList<HttpAsyncRequestProducer>();
388 requestProducers.add(p1);
389 requestProducers.add(p2);
390 requestProducers.add(p3);
391
392 final HttpAsyncResponseConsumer<HttpResponse> c1 = new BasicAsyncResponseConsumer();
393 final HttpAsyncResponseConsumer<HttpResponse> c2 = new BasicAsyncResponseConsumer();
394 final HttpAsyncResponseConsumer<HttpResponse> c3 = new BasicAsyncResponseConsumer();
395 final List<HttpAsyncResponseConsumer<HttpResponse>> responseConsumers = new ArrayList<HttpAsyncResponseConsumer<HttpResponse>>();
396 responseConsumers.add(c1);
397 responseConsumers.add(c2);
398 responseConsumers.add(c3);
399
400 final Future<List<HttpResponse>> future = this.client.executePipelined(target, requestProducers, responseConsumers, null, null);
401 try {
402 future.get(RESULT_TIMEOUT_SEC, TimeUnit.SECONDS);
403 } catch (final ExecutionException ex) {
404 final Throwable cause = ex.getCause();
405 Assert.assertTrue(cause instanceof ConnectionClosedException);
406 }
407
408 Assert.assertTrue(c1.isDone());
409 Assert.assertNotNull(c1.getResult());
410 Assert.assertTrue(c2.isDone());
411 Assert.assertNotNull(c2.getResult());
412 Assert.assertTrue(c3.isDone());
413 Assert.assertNull(c3.getResult());
414 }
415 }
416
417 }