1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import static org.hamcrest.MatcherAssert.assertThat;
31
32 import java.io.BufferedReader;
33 import java.io.BufferedWriter;
34 import java.io.IOException;
35 import java.io.InputStream;
36 import java.io.InputStreamReader;
37 import java.io.InterruptedIOException;
38 import java.io.OutputStream;
39 import java.io.OutputStreamWriter;
40 import java.net.InetSocketAddress;
41 import java.net.URI;
42 import java.net.URISyntaxException;
43 import java.nio.ByteBuffer;
44 import java.nio.channels.WritableByteChannel;
45 import java.nio.charset.Charset;
46 import java.nio.charset.StandardCharsets;
47 import java.util.HashMap;
48 import java.util.LinkedList;
49 import java.util.List;
50 import java.util.Map;
51 import java.util.Queue;
52 import java.util.Random;
53 import java.util.StringTokenizer;
54 import java.util.concurrent.CancellationException;
55 import java.util.concurrent.ExecutionException;
56 import java.util.concurrent.Executors;
57 import java.util.concurrent.Future;
58 import java.util.concurrent.atomic.AtomicReference;
59
60 import org.apache.hc.core5.http.ConnectionReuseStrategy;
61 import org.apache.hc.core5.http.ContentLengthStrategy;
62 import org.apache.hc.core5.http.ContentType;
63 import org.apache.hc.core5.http.EntityDetails;
64 import org.apache.hc.core5.http.Header;
65 import org.apache.hc.core5.http.HeaderElements;
66 import org.apache.hc.core5.http.HttpException;
67 import org.apache.hc.core5.http.HttpHeaders;
68 import org.apache.hc.core5.http.HttpHost;
69 import org.apache.hc.core5.http.HttpRequest;
70 import org.apache.hc.core5.http.HttpResponse;
71 import org.apache.hc.core5.http.HttpStatus;
72 import org.apache.hc.core5.http.HttpVersion;
73 import org.apache.hc.core5.http.MalformedChunkCodingException;
74 import org.apache.hc.core5.http.Message;
75 import org.apache.hc.core5.http.Method;
76 import org.apache.hc.core5.http.ProtocolException;
77 import org.apache.hc.core5.http.URIScheme;
78 import org.apache.hc.core5.http.config.CharCodingConfig;
79 import org.apache.hc.core5.http.config.Http1Config;
80 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
81 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
82 import org.apache.hc.core5.http.impl.Http1StreamListener;
83 import org.apache.hc.core5.http.impl.HttpProcessors;
84 import org.apache.hc.core5.http.impl.nio.AbstractContentEncoder;
85 import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexer;
86 import org.apache.hc.core5.http.message.BasicHttpRequest;
87 import org.apache.hc.core5.http.message.BasicHttpResponse;
88 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
89 import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
90 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
91 import org.apache.hc.core5.http.nio.AsyncResponseProducer;
92 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
93 import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
94 import org.apache.hc.core5.http.nio.CapacityChannel;
95 import org.apache.hc.core5.http.nio.ContentEncoder;
96 import org.apache.hc.core5.http.nio.DataStreamChannel;
97 import org.apache.hc.core5.http.nio.HandlerFactory;
98 import org.apache.hc.core5.http.nio.NHttpMessageParser;
99 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
100 import org.apache.hc.core5.http.nio.ResponseChannel;
101 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
102 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
103 import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
104 import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
105 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
106 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
107 import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
108 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
109 import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
110 import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
111 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
112 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
113 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
114 import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
115 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
116 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
117 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
118 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
119 import org.apache.hc.core5.http.protocol.HttpContext;
120 import org.apache.hc.core5.http.protocol.HttpProcessor;
121 import org.apache.hc.core5.http.protocol.RequestConnControl;
122 import org.apache.hc.core5.http.protocol.RequestContent;
123 import org.apache.hc.core5.http.protocol.RequestTargetHost;
124 import org.apache.hc.core5.http.protocol.RequestValidateHost;
125 import org.apache.hc.core5.reactor.IOSession;
126 import org.apache.hc.core5.reactor.ProtocolIOSession;
127 import org.apache.hc.core5.testing.SSLTestContexts;
128 import org.apache.hc.core5.testing.nio.extension.Http1TestResources;
129 import org.apache.hc.core5.util.CharArrayBuffer;
130 import org.apache.hc.core5.util.TextUtils;
131 import org.apache.hc.core5.util.Timeout;
132 import org.hamcrest.CoreMatchers;
133 import org.junit.jupiter.api.Assertions;
134 import org.junit.jupiter.api.Test;
135 import org.junit.jupiter.api.extension.RegisterExtension;
136
137 public abstract class Http1IntegrationTest {
138
139 private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
140 private static final Timeout LONG_TIMEOUT = Timeout.ofMinutes(2);
141
142 private final URIScheme scheme;
143
144 @RegisterExtension
145 private final Http1TestResources resources;
146
147 public Http1IntegrationTest(final URIScheme scheme) {
148 this.scheme = scheme;
149 this.resources = new Http1TestResources(scheme, TIMEOUT);
150 }
151
152 private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
153 try {
154 return new URI(scheme.id, null, "localhost", serverEndpoint.getPort(), path, null, null);
155 } catch (final URISyntaxException e) {
156 throw new IllegalStateException();
157 }
158 }
159
160 @Test
161 public void testSimpleGet() throws Exception {
162 final Http1TestServer server = resources.server();
163 final Http1TestClient client = resources.client();
164
165 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
166 final InetSocketAddress serverEndpoint = server.start();
167
168 client.start();
169 final Future<ClientSessionEndpoint> connectFuture = client.connect(
170 "localhost", serverEndpoint.getPort(), TIMEOUT);
171 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
172
173 for (int i = 0; i < 5; i++) {
174 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
175 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
176 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
177 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
178 Assertions.assertNotNull(result);
179 final HttpResponse response1 = result.getHead();
180 final String entity1 = result.getBody();
181 Assertions.assertNotNull(response1);
182 Assertions.assertEquals(200, response1.getCode());
183 Assertions.assertEquals("Hi there", entity1);
184 }
185 }
186
187 @Test
188 public void testSimpleGetConnectionClose() throws Exception {
189 final Http1TestServer server = resources.server();
190 final Http1TestClient client = resources.client();
191
192 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
193 final InetSocketAddress serverEndpoint = server.start();
194
195 client.start();
196 final URI requestURI = createRequestURI(serverEndpoint, "/hello");
197 for (int i = 0; i < 5; i++) {
198 final Future<ClientSessionEndpoint> connectFuture = client.connect(
199 "localhost", serverEndpoint.getPort(), TIMEOUT);
200 try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
201 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
202 AsyncRequestBuilder.get(requestURI)
203 .addHeader(HttpHeaders.CONNECTION, "close")
204 .build(),
205 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
206 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
207 Assertions.assertNotNull(result);
208 final HttpResponse response1 = result.getHead();
209 final String entity1 = result.getBody();
210 Assertions.assertNotNull(response1);
211 Assertions.assertEquals(200, response1.getCode());
212 Assertions.assertEquals("Hi there", entity1);
213 }
214 }
215 }
216
217 @Test
218 public void testSimpleGetIdentityTransfer() throws Exception {
219 final Http1TestServer server = resources.server();
220 final Http1TestClient client = resources.client();
221
222 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
223 final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
224 final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
225
226 client.start();
227
228 final int reqNo = 5;
229
230 for (int i = 0; i < reqNo; i++) {
231 final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
232 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
233
234 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
235 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
236 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
237 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
238
239 streamEndpoint.close();
240
241 Assertions.assertNotNull(result);
242 final HttpResponse response = result.getHead();
243 final String entity = result.getBody();
244 Assertions.assertNotNull(response);
245 Assertions.assertEquals(200, response.getCode());
246 Assertions.assertEquals("Hi there", entity);
247 }
248
249 }
250
251 @Test
252 public void testPostIdentityTransfer() throws Exception {
253 final Http1TestServer server = resources.server();
254 final Http1TestClient client = resources.client();
255
256 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
257 final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
258 final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
259
260 client.start();
261
262 final int reqNo = 5;
263
264 for (int i = 0; i < reqNo; i++) {
265 final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
266 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
267
268 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
269 new BasicRequestProducer(Method.POST,
270 createRequestURI(serverEndpoint, "/hello"),
271 new MultiLineEntityProducer("Hello", 16 * i)),
272 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
273 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
274
275 streamEndpoint.close();
276
277 Assertions.assertNotNull(result);
278 final HttpResponse response = result.getHead();
279 final String entity = result.getBody();
280 Assertions.assertNotNull(response);
281 Assertions.assertEquals(200, response.getCode());
282 Assertions.assertEquals("Hi there", entity);
283 }
284 }
285
286 @Test
287 public void testPostIdentityTransferOutOfSequenceResponse() throws Exception {
288 final Http1TestServer server = resources.server();
289 final Http1TestClient client = resources.client();
290
291 server.register("/hello", () -> new ImmediateResponseExchangeHandler(500, "Go away"));
292 final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
293 final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
294
295 client.start();
296
297 final int reqNo = 5;
298
299 for (int i = 0; i < reqNo; i++) {
300 final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
301 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
302
303 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
304 new BasicRequestProducer(Method.POST,
305 createRequestURI(serverEndpoint, "/hello"),
306 new MultiLineEntityProducer("Hello", 16 * i)),
307 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
308 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
309
310 streamEndpoint.close();
311
312 Assertions.assertNotNull(result);
313 final HttpResponse response = result.getHead();
314 final String entity = result.getBody();
315 Assertions.assertNotNull(response);
316 Assertions.assertEquals(500, response.getCode());
317 Assertions.assertEquals("Go away", entity);
318 }
319 }
320
321 @Test
322 public void testSimpleGetsPipelined() throws Exception {
323 final Http1TestServer server = resources.server();
324 final Http1TestClient client = resources.client();
325
326 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
327 final InetSocketAddress serverEndpoint = server.start();
328
329 client.start();
330 final Future<ClientSessionEndpoint> connectFuture = client.connect(
331 "localhost", serverEndpoint.getPort(), TIMEOUT);
332 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
333
334 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
335 for (int i = 0; i < 5; i++) {
336 queue.add(streamEndpoint.execute(
337 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
338 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
339 }
340 while (!queue.isEmpty()) {
341 final Future<Message<HttpResponse, String>> future = queue.remove();
342 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
343 Assertions.assertNotNull(result);
344 final HttpResponse response = result.getHead();
345 final String entity = result.getBody();
346 Assertions.assertNotNull(response);
347 Assertions.assertEquals(200, response.getCode());
348 Assertions.assertEquals("Hi there", entity);
349 }
350 }
351
352 @Test
353 public void testLargeGet() throws Exception {
354 final Http1TestServer server = resources.server();
355 final Http1TestClient client = resources.client();
356
357 server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 5000));
358 final InetSocketAddress serverEndpoint = server.start();
359
360 client.start();
361 final Future<ClientSessionEndpoint> connectFuture = client.connect(
362 "localhost", serverEndpoint.getPort(), TIMEOUT);
363 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
364
365 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
366 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
367 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
368
369 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
370 Assertions.assertNotNull(result1);
371 final HttpResponse response1 = result1.getHead();
372 Assertions.assertNotNull(response1);
373 Assertions.assertEquals(200, response1.getCode());
374 final String s1 = result1.getBody();
375 Assertions.assertNotNull(s1);
376 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
377 while (t1.hasMoreTokens()) {
378 Assertions.assertEquals("0123456789abcdef", t1.nextToken());
379 }
380
381 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
382 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
383 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
384
385 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
386 Assertions.assertNotNull(result2);
387 final HttpResponse response2 = result2.getHead();
388 Assertions.assertNotNull(response2);
389 Assertions.assertEquals(200, response2.getCode());
390 final String s2 = result2.getBody();
391 Assertions.assertNotNull(s2);
392 final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
393 while (t2.hasMoreTokens()) {
394 Assertions.assertEquals("0123456789abcdef", t2.nextToken());
395 }
396 }
397
398 @Test
399 public void testLargeGetsPipelined() throws Exception {
400 final Http1TestServer server = resources.server();
401 final Http1TestClient client = resources.client();
402
403 server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 2000));
404 final InetSocketAddress serverEndpoint = server.start();
405
406 client.start();
407 final Future<ClientSessionEndpoint> connectFuture = client.connect(
408 "localhost", serverEndpoint.getPort(), TIMEOUT);
409 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
410
411 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
412 for (int i = 0; i < 5; i++) {
413 queue.add(streamEndpoint.execute(
414 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
415 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
416 }
417 while (!queue.isEmpty()) {
418 final Future<Message<HttpResponse, String>> future = queue.remove();
419 final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
420 Assertions.assertNotNull(result);
421 final HttpResponse response = result.getHead();
422 Assertions.assertNotNull(response);
423 Assertions.assertEquals(200, response.getCode());
424 final String entity = result.getBody();
425 Assertions.assertNotNull(entity);
426 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
427 while (t.hasMoreTokens()) {
428 Assertions.assertEquals("0123456789abcdef", t.nextToken());
429 }
430 }
431 }
432
433 @Test
434 public void testBasicPost() throws Exception {
435 final Http1TestServer server = resources.server();
436 final Http1TestClient client = resources.client();
437
438 server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
439 final InetSocketAddress serverEndpoint = server.start();
440
441 client.start();
442 final Future<ClientSessionEndpoint> connectFuture = client.connect(
443 "localhost", serverEndpoint.getPort(), TIMEOUT);
444 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
445
446 for (int i = 0; i < 5; i++) {
447 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
448 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
449 AsyncEntityProducers.create("Hi there")),
450 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
451 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
452 Assertions.assertNotNull(result);
453 final HttpResponse response1 = result.getHead();
454 final String entity1 = result.getBody();
455 Assertions.assertNotNull(response1);
456 Assertions.assertEquals(200, response1.getCode());
457 Assertions.assertEquals("Hi back", entity1);
458 }
459 }
460
461 @Test
462 public void testBasicPostPipelined() throws Exception {
463 final Http1TestServer server = resources.server();
464 final Http1TestClient client = resources.client();
465
466 server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
467 final InetSocketAddress serverEndpoint = server.start();
468
469 client.start();
470 final Future<ClientSessionEndpoint> connectFuture = client.connect(
471 "localhost", serverEndpoint.getPort(), TIMEOUT);
472 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
473
474 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
475 for (int i = 0; i < 5; i++) {
476 queue.add(streamEndpoint.execute(
477 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
478 AsyncEntityProducers.create("Hi there")),
479 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
480 }
481 while (!queue.isEmpty()) {
482 final Future<Message<HttpResponse, String>> future = queue.remove();
483 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
484 Assertions.assertNotNull(result);
485 final HttpResponse response = result.getHead();
486 final String entity = result.getBody();
487 Assertions.assertNotNull(response);
488 Assertions.assertEquals(200, response.getCode());
489 Assertions.assertEquals("Hi back", entity);
490 }
491 }
492
493 @Test
494 public void testHttp10Post() throws Exception {
495 final Http1TestServer server = resources.server();
496 final Http1TestClient client = resources.client();
497
498 server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
499 final InetSocketAddress serverEndpoint = server.start();
500
501 client.start();
502 final Future<ClientSessionEndpoint> connectFuture = client.connect(
503 "localhost", serverEndpoint.getPort(), TIMEOUT);
504 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
505
506 for (int i = 0; i < 5; i++) {
507 final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
508 request.setVersion(HttpVersion.HTTP_1_0);
509 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
510 new BasicRequestProducer(request, AsyncEntityProducers.create("Hi there")),
511 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
512 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
513 Assertions.assertNotNull(result);
514 final HttpResponse response1 = result.getHead();
515 final String entity1 = result.getBody();
516 Assertions.assertNotNull(response1);
517 Assertions.assertEquals(200, response1.getCode());
518 Assertions.assertEquals("Hi back", entity1);
519 }
520 }
521
522 @Test
523 public void testNoEntityPost() throws Exception {
524 final Http1TestServer server = resources.server();
525 final Http1TestClient client = resources.client();
526
527 server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
528 final InetSocketAddress serverEndpoint = server.start();
529
530 client.start();
531 final Future<ClientSessionEndpoint> connectFuture = client.connect(
532 "localhost", serverEndpoint.getPort(), TIMEOUT);
533 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
534
535 for (int i = 0; i < 5; i++) {
536 final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
537 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
538 new BasicRequestProducer(request, null),
539 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
540 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
541 Assertions.assertNotNull(result);
542 final HttpResponse response1 = result.getHead();
543 final String entity1 = result.getBody();
544 Assertions.assertNotNull(response1);
545 Assertions.assertEquals(200, response1.getCode());
546 Assertions.assertEquals("Hi back", entity1);
547 }
548 }
549
550 @Test
551 public void testLargePost() throws Exception {
552 final Http1TestServer server = resources.server();
553 final Http1TestClient client = resources.client();
554
555 server.register("*", () -> new EchoHandler(2048));
556 final InetSocketAddress serverEndpoint = server.start();
557
558 client.start();
559 final Future<ClientSessionEndpoint> connectFuture = client.connect(
560 "localhost", serverEndpoint.getPort(), TIMEOUT);
561 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
562
563 for (int i = 0; i < 5; i++) {
564 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
565 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
566 new MultiLineEntityProducer("0123456789abcdef", 5000)),
567 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
568 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
569 Assertions.assertNotNull(result);
570 final HttpResponse response = result.getHead();
571 Assertions.assertNotNull(response);
572 Assertions.assertEquals(200, response.getCode());
573 final String entity = result.getBody();
574 Assertions.assertNotNull(entity);
575 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
576 while (t.hasMoreTokens()) {
577 Assertions.assertEquals("0123456789abcdef", t.nextToken());
578 }
579 }
580 }
581
582 @Test
583 public void testPostsPipelinedLargeResponse() throws Exception {
584 final Http1TestServer server = resources.server();
585 final Http1TestClient client = resources.client();
586
587 server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 2000));
588 final InetSocketAddress serverEndpoint = server.start();
589
590 client.start();
591 final Future<ClientSessionEndpoint> connectFuture = client.connect(
592 "localhost", serverEndpoint.getPort(), TIMEOUT);
593 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
594
595 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
596 for (int i = 0; i < 2; i++) {
597 queue.add(streamEndpoint.execute(
598 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
599 AsyncEntityProducers.create("Hi there")),
600 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
601 }
602 while (!queue.isEmpty()) {
603 final Future<Message<HttpResponse, String>> future = queue.remove();
604 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
605 Assertions.assertNotNull(result);
606 final HttpResponse response = result.getHead();
607 Assertions.assertNotNull(response);
608 Assertions.assertEquals(200, response.getCode());
609 final String entity = result.getBody();
610 Assertions.assertNotNull(entity);
611 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
612 while (t.hasMoreTokens()) {
613 Assertions.assertEquals("0123456789abcdef", t.nextToken());
614 }
615 }
616 }
617
618
619 @Test
620 public void testLargePostsPipelined() throws Exception {
621 final Http1TestServer server = resources.server();
622 final Http1TestClient client = resources.client();
623
624 server.register("*", () -> new EchoHandler(2048));
625 final InetSocketAddress serverEndpoint = server.start();
626
627 client.start();
628 final Future<ClientSessionEndpoint> connectFuture = client.connect(
629 "localhost", serverEndpoint.getPort(), TIMEOUT);
630 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
631
632 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
633 for (int i = 0; i < 5; i++) {
634 queue.add(streamEndpoint.execute(
635 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
636 new MultiLineEntityProducer("0123456789abcdef", 5000)),
637 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
638 }
639 while (!queue.isEmpty()) {
640 final Future<Message<HttpResponse, String>> future = queue.remove();
641 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
642 Assertions.assertNotNull(result);
643 final HttpResponse response = result.getHead();
644 Assertions.assertNotNull(response);
645 Assertions.assertEquals(200, response.getCode());
646 final String entity = result.getBody();
647 Assertions.assertNotNull(entity);
648 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
649 while (t.hasMoreTokens()) {
650 Assertions.assertEquals("0123456789abcdef", t.nextToken());
651 }
652 }
653 }
654
655 @Test
656 public void testSimpleHead() throws Exception {
657 final Http1TestServer server = resources.server();
658 final Http1TestClient client = resources.client();
659
660 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
661 final InetSocketAddress serverEndpoint = server.start();
662
663 client.start();
664 final Future<ClientSessionEndpoint> connectFuture = client.connect(
665 "localhost", serverEndpoint.getPort(), TIMEOUT);
666 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
667
668 for (int i = 0; i < 5; i++) {
669 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
670 new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
671 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
672 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
673 Assertions.assertNotNull(result);
674 final HttpResponse response1 = result.getHead();
675 Assertions.assertNotNull(response1);
676 Assertions.assertEquals(200, response1.getCode());
677 Assertions.assertNull(result.getBody());
678 }
679 }
680
681 @Test
682 public void testSimpleHeadConnectionClose() throws Exception {
683 final Http1TestServer server = resources.server();
684 final Http1TestClient client = resources.client();
685
686 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
687 final InetSocketAddress serverEndpoint = server.start();
688
689 client.start();
690 final URI requestURI = createRequestURI(serverEndpoint, "/hello");
691 for (int i = 0; i < 5; i++) {
692 final Future<ClientSessionEndpoint> connectFuture = client.connect(
693 "localhost", serverEndpoint.getPort(), TIMEOUT);
694 try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
695 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
696 AsyncRequestBuilder.head(requestURI)
697 .addHeader(HttpHeaders.CONNECTION, "close")
698 .build(),
699 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
700 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
701 Assertions.assertNotNull(result);
702 final HttpResponse response1 = result.getHead();
703 Assertions.assertNotNull(response1);
704 Assertions.assertEquals(200, response1.getCode());
705 Assertions.assertNull(result.getBody());
706 }
707 }
708 }
709
710 @Test
711 public void testHeadPipelined() throws Exception {
712 final Http1TestServer server = resources.server();
713 final Http1TestClient client = resources.client();
714
715 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
716 final InetSocketAddress serverEndpoint = server.start();
717
718 client.start();
719 final Future<ClientSessionEndpoint> connectFuture = client.connect(
720 "localhost", serverEndpoint.getPort(), TIMEOUT);
721 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
722
723 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
724 for (int i = 0; i < 5; i++) {
725 queue.add(streamEndpoint.execute(
726 new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
727 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
728 }
729 while (!queue.isEmpty()) {
730 final Future<Message<HttpResponse, String>> future = queue.remove();
731 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
732 Assertions.assertNotNull(result);
733 final HttpResponse response1 = result.getHead();
734 Assertions.assertNotNull(response1);
735 Assertions.assertEquals(200, response1.getCode());
736 Assertions.assertNull(result.getBody());
737 }
738 }
739
740 @Test
741 public void testExpectationFailed() throws Exception {
742 final Http1TestServer server = resources.server();
743 final Http1TestClient client = resources.client();
744
745 server.register("*", () -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
746
747 @Override
748 protected void handle(
749 final Message<HttpRequest, String> request,
750 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
751 final HttpContext context) throws IOException, HttpException {
752 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
753
754 }
755 });
756 final InetSocketAddress serverEndpoint = server.start(null, handler -> new BasicAsyncServerExpectationDecorator(handler) {
757
758 @Override
759 protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
760 final Header h = request.getFirstHeader("password");
761 if (h != null && "secret".equals(h.getValue())) {
762 return null;
763 } else {
764 return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
765 }
766 }
767 }, Http1Config.DEFAULT);
768
769 client.start();
770 final Future<IOSession> sessionFuture = client.requestSession(
771 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
772 final IOSession ioSession = sessionFuture.get();
773 try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) {
774
775 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
776 request1.addHeader("password", "secret");
777 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
778 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 1000)),
779 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
780 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
781 Assertions.assertNotNull(result1);
782 final HttpResponse response1 = result1.getHead();
783 Assertions.assertNotNull(response1);
784 Assertions.assertEquals(200, response1.getCode());
785 Assertions.assertEquals("All is well", result1.getBody());
786
787 Assertions.assertTrue(ioSession.isOpen());
788
789 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
790 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
791 new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
792 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
793 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
794 Assertions.assertNotNull(result2);
795 final HttpResponse response2 = result2.getHead();
796 Assertions.assertNotNull(response2);
797 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
798 Assertions.assertEquals("You shall not pass", result2.getBody());
799
800 Assertions.assertTrue(ioSession.isOpen());
801
802 final HttpRequest request3 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
803 request3.addHeader("password", "secret");
804 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
805 new BasicRequestProducer(request3, new MultiLineEntityProducer("0123456789abcdef", 1000)),
806 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
807 final Message<HttpResponse, String> result3 = future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
808 Assertions.assertNotNull(result3);
809 final HttpResponse response3 = result3.getHead();
810 Assertions.assertNotNull(response3);
811 Assertions.assertEquals(200, response3.getCode());
812 Assertions.assertEquals("All is well", result3.getBody());
813
814 Assertions.assertTrue(ioSession.isOpen());
815
816 final HttpRequest request4 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
817 final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
818 new BasicRequestProducer(request4, AsyncEntityProducers.create("blah")),
819 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
820 final Message<HttpResponse, String> result4 = future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
821 Assertions.assertNotNull(result4);
822 final HttpResponse response4 = result4.getHead();
823 Assertions.assertNotNull(response4);
824 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response4.getCode());
825 Assertions.assertEquals("You shall not pass", result4.getBody());
826
827 Assertions.assertFalse(ioSession.isOpen());
828 }
829 }
830
831 @Test
832 public void testExpectationFailedCloseConnection() throws Exception {
833 final Http1TestServer server = resources.server();
834 final Http1TestClient client = resources.client();
835
836 server.register("*", () -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
837
838 @Override
839 protected void handle(
840 final Message<HttpRequest, String> request,
841 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
842 final HttpContext context) throws IOException, HttpException {
843 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
844
845 }
846 });
847 final InetSocketAddress serverEndpoint = server.start(null, handler -> new BasicAsyncServerExpectationDecorator(handler) {
848
849 @Override
850 protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
851 final Header h = request.getFirstHeader("password");
852 if (h != null && "secret".equals(h.getValue())) {
853 return null;
854 } else {
855 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_UNAUTHORIZED);
856 response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
857 return new BasicResponseProducer(response, "You shall not pass");
858 }
859 }
860 }, Http1Config.DEFAULT);
861
862 client.start();
863 final Future<IOSession> sessionFuture = client.requestSession(
864 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
865 final IOSession ioSession = sessionFuture.get();
866 try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) {
867
868 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
869 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
870 new BasicRequestProducer(request1, new MultiBinEntityProducer(
871 new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
872 100000,
873 ContentType.TEXT_PLAIN)),
874 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
875 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
876 Assertions.assertNotNull(result1);
877 final HttpResponse response1 = result1.getHead();
878 Assertions.assertNotNull(response1);
879 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
880 Assertions.assertNotNull("You shall not pass", result1.getBody());
881
882 Assertions.assertFalse(streamEndpoint.isOpen());
883 }
884 }
885
886 @Test
887 public void testDelayedExpectationVerification() throws Exception {
888 final Http1TestServer server = resources.server();
889 final Http1TestClient client = resources.client();
890
891 server.register("*", () -> new AsyncServerExchangeHandler() {
892
893 private final Random random = new Random(System.currentTimeMillis());
894 private final AsyncEntityProducer entityProducer = AsyncEntityProducers.create(
895 "All is well");
896
897 @Override
898 public void handleRequest(
899 final HttpRequest request,
900 final EntityDetails entityDetails,
901 final ResponseChannel responseChannel,
902 final HttpContext context) throws HttpException, IOException {
903
904 Executors.newSingleThreadExecutor().execute(() -> {
905 try {
906 if (entityDetails != null) {
907 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
908 if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
909 Thread.sleep(random.nextInt(1000));
910 responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), context);
911 }
912 final HttpResponse response = new BasicHttpResponse(200);
913 synchronized (entityProducer) {
914 responseChannel.sendResponse(response, entityProducer, context);
915 }
916 }
917 } catch (final Exception ignore) {
918
919 }
920 });
921
922 }
923
924 @Override
925 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
926 capacityChannel.update(Integer.MAX_VALUE);
927 }
928
929 @Override
930 public void consume(final ByteBuffer src) throws IOException {
931 }
932
933 @Override
934 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
935 }
936
937 @Override
938 public int available() {
939 synchronized (entityProducer) {
940 return entityProducer.available();
941 }
942 }
943
944 @Override
945 public void produce(final DataStreamChannel channel) throws IOException {
946 synchronized (entityProducer) {
947 entityProducer.produce(channel);
948 }
949 }
950
951 @Override
952 public void failed(final Exception cause) {
953 }
954
955 @Override
956 public void releaseResources() {
957 }
958
959 });
960 final InetSocketAddress serverEndpoint = server.start();
961
962 client.start(Http1Config.custom().setWaitForContinueTimeout(Timeout.ofMilliseconds(100)).build());
963 final Future<ClientSessionEndpoint> connectFuture = client.connect(
964 "localhost", serverEndpoint.getPort(), TIMEOUT);
965 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
966
967 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
968 for (int i = 0; i < 5; i++) {
969 queue.add(streamEndpoint.execute(
970 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
971 AsyncEntityProducers.create("Some important message")),
972 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
973 }
974 while (!queue.isEmpty()) {
975 final Future<Message<HttpResponse, String>> future = queue.remove();
976 final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
977 Assertions.assertNotNull(result);
978 final HttpResponse response = result.getHead();
979 Assertions.assertNotNull(response);
980 Assertions.assertEquals(200, response.getCode());
981 Assertions.assertNotNull("All is well", result.getBody());
982 }
983 }
984
985 @Test
986 public void testPrematureResponse() throws Exception {
987 final Http1TestServer server = resources.server();
988 final Http1TestClient client = resources.client();
989
990 server.register("*", () -> new AsyncServerExchangeHandler() {
991
992 private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>();
993
994 @Override
995 public void handleRequest(
996 final HttpRequest request,
997 final EntityDetails entityDetails,
998 final ResponseChannel responseChannel,
999 final HttpContext context) throws HttpException, IOException {
1000 final AsyncResponseProducer producer;
1001 final Header h = request.getFirstHeader("password");
1002 if (h != null && "secret".equals(h.getValue())) {
1003 producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
1004 } else {
1005 producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
1006 }
1007 responseProducer.set(producer);
1008 producer.sendResponse(responseChannel, context);
1009 }
1010
1011 @Override
1012 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1013 capacityChannel.update(Integer.MAX_VALUE);
1014 }
1015
1016 @Override
1017 public void consume(final ByteBuffer src) throws IOException {
1018 }
1019
1020 @Override
1021 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1022 }
1023
1024 @Override
1025 public int available() {
1026 final AsyncResponseProducer producer = responseProducer.get();
1027 return producer.available();
1028 }
1029
1030 @Override
1031 public void produce(final DataStreamChannel channel) throws IOException {
1032 final AsyncResponseProducer producer = responseProducer.get();
1033 producer.produce(channel);
1034 }
1035
1036 @Override
1037 public void failed(final Exception cause) {
1038 }
1039
1040 @Override
1041 public void releaseResources() {
1042 }
1043 });
1044 final InetSocketAddress serverEndpoint = server.start();
1045
1046 client.start();
1047 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1048 "localhost", serverEndpoint.getPort(), TIMEOUT);
1049 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1050
1051 for (int i = 0; i < 3; i++) {
1052 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1053 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1054 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 100000)),
1055 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1056 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1057 Assertions.assertNotNull(result1);
1058 final HttpResponse response1 = result1.getHead();
1059 Assertions.assertNotNull(response1);
1060 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1061 Assertions.assertNotNull("You shall not pass", result1.getBody());
1062
1063 Assertions.assertTrue(streamEndpoint.isOpen());
1064 }
1065 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1066 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1067 new BasicRequestProducer(request1, new MultiBinEntityProducer(
1068 new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
1069 100000,
1070 ContentType.TEXT_PLAIN)),
1071 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1072 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1073 Assertions.assertNotNull(result1);
1074 final HttpResponse response1 = result1.getHead();
1075 Assertions.assertNotNull(response1);
1076 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1077 Assertions.assertNotNull("You shall not pass", result1.getBody());
1078 }
1079
1080 @Test
1081 public void testSlowResponseConsumer() throws Exception {
1082 final Http1TestServer server = resources.server();
1083 final Http1TestClient client = resources.client();
1084
1085 server.register("/", () -> new MultiLineResponseHandler("0123456789abcd", 100));
1086 final InetSocketAddress serverEndpoint = server.start();
1087
1088 client.start(Http1Config.custom().setBufferSize(256).build());
1089
1090 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1091 "localhost", serverEndpoint.getPort(), TIMEOUT);
1092 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1093
1094 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
1095 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1096 new BasicRequestProducer(request1, null),
1097 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
1098
1099 @Override
1100 protected String consumeData(
1101 final ContentType contentType, final InputStream inputStream) throws IOException {
1102 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1103
1104 final StringBuilder buffer = new StringBuilder();
1105 try {
1106 final byte[] tmp = new byte[16];
1107 int l;
1108 while ((l = inputStream.read(tmp)) != -1) {
1109 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
1110 Thread.sleep(50);
1111 }
1112 } catch (final InterruptedException ex) {
1113 Thread.currentThread().interrupt();
1114 throw new InterruptedIOException(ex.getMessage());
1115 }
1116 return buffer.toString();
1117 }
1118 }),
1119 null);
1120
1121 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1122 Assertions.assertNotNull(result1);
1123 final HttpResponse response1 = result1.getHead();
1124 Assertions.assertNotNull(response1);
1125 Assertions.assertEquals(200, response1.getCode());
1126 final String s1 = result1.getBody();
1127 Assertions.assertNotNull(s1);
1128 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1129 while (t1.hasMoreTokens()) {
1130 Assertions.assertEquals("0123456789abcd", t1.nextToken());
1131 }
1132 }
1133
1134 @Test
1135 public void testSlowRequestProducer() throws Exception {
1136 final Http1TestServer server = resources.server();
1137 final Http1TestClient client = resources.client();
1138
1139 server.register("*", () -> new EchoHandler(2048));
1140 final InetSocketAddress serverEndpoint = server.start();
1141
1142 client.start();
1143 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1144 "localhost", serverEndpoint.getPort(), TIMEOUT);
1145 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1146
1147 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1148 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1149 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
1150
1151 @Override
1152 protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
1153 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1154 try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
1155 for (int i = 0; i < 500; i++) {
1156 if (i % 100 == 0) {
1157 writer.flush();
1158 Thread.sleep(500);
1159 }
1160 writer.write("0123456789abcdef\r\n");
1161 }
1162 } catch (final InterruptedException ex) {
1163 Thread.currentThread().interrupt();
1164 throw new InterruptedIOException(ex.getMessage());
1165 }
1166 }
1167
1168 }),
1169 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1170 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1171 Assertions.assertNotNull(result1);
1172 final HttpResponse response1 = result1.getHead();
1173 Assertions.assertNotNull(response1);
1174 Assertions.assertEquals(200, response1.getCode());
1175 final String s1 = result1.getBody();
1176 Assertions.assertNotNull(s1);
1177 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1178 while (t1.hasMoreTokens()) {
1179 Assertions.assertEquals("0123456789abcdef", t1.nextToken());
1180 }
1181 }
1182
1183 @Test
1184 public void testSlowResponseProducer() throws Exception {
1185 final Http1TestServer server = resources.server();
1186 final Http1TestClient client = resources.client();
1187
1188 server.register("*", () -> new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
1189
1190 @Override
1191 protected void handle(
1192 final HttpRequest request,
1193 final InputStream requestStream,
1194 final HttpResponse response,
1195 final OutputStream responseStream,
1196 final HttpContext context) throws IOException, HttpException {
1197
1198 if (!"/hello".equals(request.getPath())) {
1199 response.setCode(HttpStatus.SC_NOT_FOUND);
1200 return;
1201 }
1202 if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
1203 response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
1204 return;
1205 }
1206 if (requestStream == null) {
1207 return;
1208 }
1209 final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
1210 final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
1211 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1212 response.setCode(HttpStatus.SC_OK);
1213 response.setHeader(h1);
1214 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
1215 final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
1216 try {
1217 String l;
1218 int count = 0;
1219 while ((l = reader.readLine()) != null) {
1220 writer.write(l);
1221 writer.write("\r\n");
1222 count++;
1223 if (count % 500 == 0) {
1224 Thread.sleep(500);
1225 }
1226 }
1227 writer.flush();
1228 } catch (final InterruptedException ex) {
1229 Thread.currentThread().interrupt();
1230 throw new InterruptedIOException(ex.getMessage());
1231 }
1232 }
1233 }
1234 });
1235 final InetSocketAddress serverEndpoint = server.start();
1236
1237 client.start(Http1Config.custom().setBufferSize(256).build());
1238
1239 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1240 "localhost", serverEndpoint.getPort(), TIMEOUT);
1241 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1242
1243 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1244 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1245 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
1246 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1247 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1248 Assertions.assertNotNull(result1);
1249 final HttpResponse response1 = result1.getHead();
1250 Assertions.assertNotNull(response1);
1251 Assertions.assertEquals(200, response1.getCode());
1252 final String s1 = result1.getBody();
1253 Assertions.assertNotNull(s1);
1254 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1255 while (t1.hasMoreTokens()) {
1256 Assertions.assertEquals("0123456789abcd", t1.nextToken());
1257 }
1258 }
1259
1260 @Test
1261 public void testPipelinedConnectionClose() throws Exception {
1262 final Http1TestServer server = resources.server();
1263 final Http1TestClient client = resources.client();
1264
1265 server.register("/hello*", () -> new SingleLineResponseHandler("Hi back"));
1266 final InetSocketAddress serverEndpoint = server.start();
1267
1268 client.start();
1269 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1270 "localhost", serverEndpoint.getPort(), TIMEOUT);
1271 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1272
1273 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1274 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1275 AsyncEntityProducers.create("Hi there")),
1276 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1277 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1278 request2.addHeader(HttpHeaders.CONNECTION, "close");
1279 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1280 new BasicRequestProducer(request2,
1281 AsyncEntityProducers.create("Hi there")),
1282 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1283 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1284 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1285 AsyncEntityProducers.create("Hi there")),
1286 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1287
1288 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1289 Assertions.assertNotNull(result1);
1290 final HttpResponse response1 = result1.getHead();
1291 final String entity1 = result1.getBody();
1292 Assertions.assertNotNull(response1);
1293 Assertions.assertEquals(200, response1.getCode());
1294 Assertions.assertEquals("Hi back", entity1);
1295
1296 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1297 Assertions.assertNotNull(result2);
1298 final HttpResponse response2 = result2.getHead();
1299 final String entity2 = result2.getBody();
1300 Assertions.assertNotNull(response2);
1301 Assertions.assertEquals(200, response2.getCode());
1302 Assertions.assertEquals("Hi back", entity2);
1303
1304 final Exception exception = Assertions.assertThrows(Exception.class, () ->
1305 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1306 assertThat(exception, CoreMatchers.anyOf(
1307 CoreMatchers.instanceOf(CancellationException.class),
1308 CoreMatchers.instanceOf(ExecutionException.class)));
1309
1310 final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
1311 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1312 AsyncEntityProducers.create("Hi there")),
1313 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1314 final Exception exception2 = Assertions.assertThrows(Exception.class, () ->
1315 future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1316 assertThat(exception2, CoreMatchers.anyOf(
1317 CoreMatchers.instanceOf(CancellationException.class),
1318 CoreMatchers.instanceOf(ExecutionException.class)));
1319 }
1320
1321 @Test
1322 public void testPipelinedInvalidRequest() throws Exception {
1323 final Http1TestServer server = resources.server();
1324 final Http1TestClient client = resources.client();
1325
1326 server.register("/hello*", () -> new SingleLineResponseHandler("Hi back"));
1327 final InetSocketAddress serverEndpoint = server.start();
1328
1329 client.start();
1330 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1331 "localhost", serverEndpoint.getPort(), TIMEOUT);
1332 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1333
1334 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1335 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1336 AsyncEntityProducers.create("Hi there")),
1337 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1338 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1339 request2.addHeader(HttpHeaders.HOST, "blah:blah");
1340 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1341 new BasicRequestProducer(request2,
1342 AsyncEntityProducers.create("Hi there")),
1343 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1344 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1345 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1346 AsyncEntityProducers.create("Hi there")),
1347 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1348
1349 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1350 Assertions.assertNotNull(result1);
1351 final HttpResponse response1 = result1.getHead();
1352 final String entity1 = result1.getBody();
1353 Assertions.assertNotNull(response1);
1354 Assertions.assertEquals(200, response1.getCode());
1355 Assertions.assertEquals("Hi back", entity1);
1356
1357 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1358 Assertions.assertNotNull(result2);
1359 final HttpResponse response2 = result2.getHead();
1360 final String entity2 = result2.getBody();
1361 Assertions.assertNotNull(response2);
1362 Assertions.assertEquals(400, response2.getCode());
1363 Assertions.assertTrue(entity2.length() > 0);
1364
1365
1366 final Exception exception = Assertions.assertThrows(Exception.class, () ->
1367 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1368 assertThat(exception, CoreMatchers.anyOf(
1369 CoreMatchers.instanceOf(CancellationException.class),
1370 CoreMatchers.instanceOf(ExecutionException.class)));
1371 }
1372
1373 private static final byte[] GARBAGE = "garbage".getBytes(StandardCharsets.US_ASCII);
1374
1375 private static class BrokenChunkEncoder extends AbstractContentEncoder {
1376
1377 private final CharArrayBuffer lineBuffer;
1378 private boolean done;
1379
1380 BrokenChunkEncoder(
1381 final WritableByteChannel channel,
1382 final SessionOutputBuffer buffer,
1383 final BasicHttpTransportMetrics metrics) {
1384 super(channel, buffer, metrics);
1385 lineBuffer = new CharArrayBuffer(16);
1386 }
1387
1388 @Override
1389 public void complete(final List<? extends Header> trailers) throws IOException {
1390 super.complete(trailers);
1391 }
1392
1393 @Override
1394 public int write(final ByteBuffer src) throws IOException {
1395 final int chunk;
1396 if (!done) {
1397 lineBuffer.clear();
1398 lineBuffer.append(Integer.toHexString(GARBAGE.length * 10));
1399 buffer().writeLine(lineBuffer);
1400 buffer().write(ByteBuffer.wrap(GARBAGE));
1401 done = true;
1402 chunk = GARBAGE.length;
1403 } else {
1404 chunk = 0;
1405 }
1406 final long bytesWritten = buffer().flush(channel());
1407 if (bytesWritten > 0) {
1408 metrics().incrementBytesTransferred(bytesWritten);
1409 }
1410 if (!buffer().hasData()) {
1411 channel().close();
1412 }
1413 return chunk;
1414 }
1415
1416 }
1417
1418 @Test
1419 public void testTruncatedChunk() throws Exception {
1420 final Http1TestServer server = resources.server();
1421 final Http1TestClient client = resources.client();
1422
1423 final InetSocketAddress serverEndpoint = server.start(new InternalServerHttp1EventHandlerFactory(
1424 HttpProcessors.server(),
1425 (request, context) -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
1426
1427 @Override
1428 protected void handle(
1429 final Message<HttpRequest, String> request,
1430 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1431 final HttpContext context) throws IOException, HttpException {
1432 responseTrigger.submitResponse(
1433 new BasicResponseProducer(new StringAsyncEntityProducer("useful stuff")), context);
1434 }
1435
1436 },
1437 Http1Config.DEFAULT,
1438 CharCodingConfig.DEFAULT,
1439 DefaultConnectionReuseStrategy.INSTANCE,
1440 scheme == URIScheme.HTTPS ? SSLTestContexts.createServerSSLContext() : null, null, null) {
1441
1442 @Override
1443 protected ServerHttp1StreamDuplexer createServerHttp1StreamDuplexer(
1444 final ProtocolIOSession ioSession,
1445 final HttpProcessor httpProcessor,
1446 final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
1447 final Http1Config http1Config,
1448 final CharCodingConfig connectionConfig,
1449 final ConnectionReuseStrategy connectionReuseStrategy,
1450 final NHttpMessageParser<HttpRequest> incomingMessageParser,
1451 final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
1452 final ContentLengthStrategy incomingContentStrategy,
1453 final ContentLengthStrategy outgoingContentStrategy,
1454 final Http1StreamListener streamListener) {
1455 return new ServerHttp1StreamDuplexer(ioSession, httpProcessor, exchangeHandlerFactory,
1456 scheme.id,
1457 http1Config, connectionConfig, connectionReuseStrategy,
1458 incomingMessageParser, outgoingMessageWriter,
1459 incomingContentStrategy, outgoingContentStrategy,
1460 streamListener) {
1461
1462 @Override
1463 protected ContentEncoder createContentEncoder(
1464 final long len,
1465 final WritableByteChannel channel,
1466 final SessionOutputBuffer buffer,
1467 final BasicHttpTransportMetrics metrics) throws HttpException {
1468 if (len == ContentLengthStrategy.CHUNKED) {
1469 return new BrokenChunkEncoder(channel, buffer, metrics);
1470 } else {
1471 return super.createContentEncoder(len, channel, buffer, metrics);
1472 }
1473 }
1474
1475 };
1476 }
1477
1478 });
1479
1480 client.start();
1481 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1482 "localhost", serverEndpoint.getPort(), TIMEOUT);
1483 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1484
1485 final AsyncRequestProducer requestProducer = new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1486 final StringAsyncEntityConsumer entityConsumer = new StringAsyncEntityConsumer() {
1487
1488 @Override
1489 public void releaseResources() {
1490
1491 }
1492
1493 };
1494 final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(entityConsumer);
1495 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(requestProducer, responseConsumer, null);
1496 final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () ->
1497 future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1498 final Throwable cause = exception.getCause();
1499 Assertions.assertTrue(cause instanceof MalformedChunkCodingException);
1500 Assertions.assertEquals("garbage", entityConsumer.generateContent());
1501 }
1502
1503 @Test
1504 public void testExceptionInHandler() throws Exception {
1505 final Http1TestServer server = resources.server();
1506 final Http1TestClient client = resources.client();
1507
1508 server.register("/hello", () -> new SingleLineResponseHandler("Hi there") {
1509
1510 @Override
1511 protected void handle(
1512 final Message<HttpRequest, String> request,
1513 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1514 final HttpContext context) throws IOException, HttpException {
1515 throw new HttpException("Boom");
1516 }
1517 });
1518 final InetSocketAddress serverEndpoint = server.start();
1519
1520 client.start();
1521 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1522 "localhost", serverEndpoint.getPort(), TIMEOUT);
1523 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1524
1525 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1526 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1527 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1528 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1529 Assertions.assertNotNull(result);
1530 final HttpResponse response1 = result.getHead();
1531 final String entity1 = result.getBody();
1532 Assertions.assertNotNull(response1);
1533 Assertions.assertEquals(500, response1.getCode());
1534 Assertions.assertEquals("Boom", entity1);
1535 }
1536
1537 @Test
1538 public void testNoServiceHandler() throws Exception {
1539 final Http1TestServer server = resources.server();
1540 final Http1TestClient client = resources.client();
1541
1542 final InetSocketAddress serverEndpoint = server.start();
1543
1544 client.start();
1545 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1546 "localhost", serverEndpoint.getPort(), TIMEOUT);
1547 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1548
1549 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1550 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1551 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1552 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1553 Assertions.assertNotNull(result);
1554 final HttpResponse response1 = result.getHead();
1555 final String entity1 = result.getBody();
1556 Assertions.assertNotNull(response1);
1557 Assertions.assertEquals(404, response1.getCode());
1558 Assertions.assertEquals("Resource not found", entity1);
1559 }
1560
1561 @Test
1562 public void testResponseNoContent() throws Exception {
1563 final Http1TestServer server = resources.server();
1564 final Http1TestClient client = resources.client();
1565
1566 server.register("/hello", () -> new SingleLineResponseHandler("Hi there") {
1567
1568 @Override
1569 protected void handle(
1570 final Message<HttpRequest, String> request,
1571 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1572 final HttpContext context) throws IOException, HttpException {
1573 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_NO_CONTENT);
1574 responseTrigger.submitResponse(new BasicResponseProducer(response), context);
1575 }
1576 });
1577 final InetSocketAddress serverEndpoint = server.start();
1578
1579 client.start();
1580 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1581 "localhost", serverEndpoint.getPort(), TIMEOUT);
1582 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1583
1584 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1585 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1586 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1587 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1588 Assertions.assertNotNull(result);
1589 final HttpResponse response1 = result.getHead();
1590 Assertions.assertNotNull(response1);
1591 Assertions.assertEquals(204, response1.getCode());
1592 Assertions.assertNull(result.getBody());
1593 }
1594
1595 @Test
1596 public void testAbsentHostHeader() throws Exception {
1597 final Http1TestServer server = resources.server();
1598 final Http1TestClient client = resources.client();
1599
1600 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
1601 final InetSocketAddress serverEndpoint = server.start();
1602
1603 client.start(new DefaultHttpProcessor(RequestContent.INSTANCE, RequestConnControl.INSTANCE), Http1Config.DEFAULT);
1604
1605 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1606 "localhost", serverEndpoint.getPort(), TIMEOUT);
1607 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1608
1609 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1610 request1.setVersion(HttpVersion.HTTP_1_0);
1611 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1612 new BasicRequestProducer(request1, null),
1613 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1614 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1615 Assertions.assertNotNull(result1);
1616 final HttpResponse response1 = result1.getHead();
1617 Assertions.assertNotNull(response1);
1618 Assertions.assertEquals(200, response1.getCode());
1619 Assertions.assertEquals("Hi there", result1.getBody());
1620
1621 final HttpRequest request2 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1622 request2.setVersion(HttpVersion.HTTP_1_1);
1623 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1624 new BasicRequestProducer(request2, null),
1625 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1626 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1627 Assertions.assertNotNull(result2);
1628 final HttpResponse response2 = result2.getHead();
1629 Assertions.assertNotNull(response2);
1630 Assertions.assertEquals(400, response2.getCode());
1631 Assertions.assertEquals("Host header is absent", result2.getBody());
1632 }
1633
1634 @Test
1635 public void testMessageWithTrailers() throws Exception {
1636 final Http1TestServer server = resources.server();
1637 final Http1TestClient client = resources.client();
1638
1639 server.register("/hello", () -> new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
1640
1641 @Override
1642 protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
1643 final HttpRequest request,
1644 final EntityDetails entityDetails,
1645 final HttpContext context) throws HttpException {
1646 return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
1647 }
1648
1649 @Override
1650 protected void handle(
1651 final Message<HttpRequest, String> requestMessage,
1652 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1653 final HttpContext context) throws HttpException, IOException {
1654 responseTrigger.submitResponse(new BasicResponseProducer(
1655 HttpStatus.SC_OK,
1656 new DigestingEntityProducer("MD5",
1657 new StringAsyncEntityProducer("Hello back with some trailers"))), context);
1658 }
1659 });
1660 final InetSocketAddress serverEndpoint = server.start();
1661
1662 client.start();
1663
1664 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1665 "localhost", serverEndpoint.getPort(), TIMEOUT);
1666 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1667
1668 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1669 final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
1670 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1671 new BasicRequestProducer(request1, null),
1672 new BasicResponseConsumer<>(entityConsumer), null);
1673 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1674 Assertions.assertNotNull(result1);
1675 final HttpResponse response1 = result1.getHead();
1676 Assertions.assertNotNull(response1);
1677 Assertions.assertEquals(200, response1.getCode());
1678 Assertions.assertEquals("Hello back with some trailers", result1.getBody());
1679
1680 final List<Header> trailers = entityConsumer.getTrailers();
1681 Assertions.assertNotNull(trailers);
1682 Assertions.assertEquals(2, trailers.size());
1683 final Map<String, String> map = new HashMap<>();
1684 for (final Header header: trailers) {
1685 map.put(TextUtils.toLowerCase(header.getName()), header.getValue());
1686 }
1687 final String digest = TextUtils.toHexString(entityConsumer.getDigest());
1688 Assertions.assertEquals("MD5", map.get("digest-algo"));
1689 Assertions.assertEquals(digest, map.get("digest"));
1690 }
1691
1692 @Test
1693 public void testProtocolException() throws Exception {
1694 final Http1TestServer server = resources.server();
1695 final Http1TestClient client = resources.client();
1696
1697 server.register("/boom", () -> new AsyncServerExchangeHandler() {
1698
1699 private final StringAsyncEntityProducer entityProducer = new StringAsyncEntityProducer("Everyting is OK");
1700
1701 @Override
1702 public void releaseResources() {
1703 entityProducer.releaseResources();
1704 }
1705
1706 @Override
1707 public void handleRequest(
1708 final HttpRequest request,
1709 final EntityDetails entityDetails,
1710 final ResponseChannel responseChannel,
1711 final HttpContext context) throws HttpException, IOException {
1712 final String requestUri = request.getRequestUri();
1713 if (requestUri.endsWith("boom")) {
1714 throw new ProtocolException("Boom!!!");
1715 }
1716 responseChannel.sendResponse(new BasicHttpResponse(200), entityProducer, context);
1717 }
1718
1719 @Override
1720 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1721 capacityChannel.update(Integer.MAX_VALUE);
1722 }
1723
1724 @Override
1725 public void consume(final ByteBuffer src) throws IOException {
1726 }
1727
1728 @Override
1729 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1730
1731 }
1732
1733 @Override
1734 public int available() {
1735 return entityProducer.available();
1736 }
1737
1738 @Override
1739 public void produce(final DataStreamChannel channel) throws IOException {
1740 entityProducer.produce(channel);
1741 }
1742
1743 @Override
1744 public void failed(final Exception cause) {
1745 releaseResources();
1746 }
1747
1748 });
1749
1750 final InetSocketAddress serverEndpoint = server.start();
1751
1752 client.start();
1753 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1754 "localhost", serverEndpoint.getPort(), TIMEOUT);
1755 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1756 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1757 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/boom")),
1758 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1759 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1760 Assertions.assertNotNull(result);
1761 final HttpResponse response1 = result.getHead();
1762 final String entity1 = result.getBody();
1763 Assertions.assertNotNull(response1);
1764 Assertions.assertEquals(HttpStatus.SC_BAD_REQUEST, response1.getCode());
1765 Assertions.assertEquals("Boom!!!", entity1);
1766 }
1767
1768 @Test
1769 public void testHeaderTooLarge() throws Exception {
1770 final Http1TestServer server = resources.server();
1771 final Http1TestClient client = resources.client();
1772
1773 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
1774 final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1775 .setMaxLineLength(100)
1776 .build());
1777 client.start();
1778
1779 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1780 "localhost", serverEndpoint.getPort(), TIMEOUT);
1781 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1782
1783 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1784 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1785 "1234567890123456789012345678901234567890");
1786 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1787 new BasicRequestProducer(request1, null),
1788 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1789 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1790 Assertions.assertNotNull(result1);
1791 final HttpResponse response1 = result1.getHead();
1792 Assertions.assertNotNull(response1);
1793 Assertions.assertEquals(431, response1.getCode());
1794 Assertions.assertEquals("Maximum line length limit exceeded", result1.getBody());
1795 }
1796
1797 @Test
1798 public void testHeaderTooLargePost() throws Exception {
1799 final Http1TestServer server = resources.server();
1800 final Http1TestClient client = resources.client();
1801
1802 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
1803 final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1804 .setMaxLineLength(100)
1805 .build());
1806 client.start(
1807 new DefaultHttpProcessor(RequestContent.INSTANCE, RequestTargetHost.INSTANCE, RequestConnControl.INSTANCE), null);
1808
1809 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1810 "localhost", serverEndpoint.getPort(), TIMEOUT);
1811 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1812
1813 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1814 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1815 "1234567890123456789012345678901234567890");
1816
1817 final byte[] b = new byte[2048];
1818 for (int i = 0; i < b.length; i++) {
1819 b[i] = (byte) ('a' + i % 10);
1820 }
1821
1822 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1823 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
1824 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1825 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1826 Assertions.assertNotNull(result1);
1827 final HttpResponse response1 = result1.getHead();
1828 Assertions.assertNotNull(response1);
1829 Assertions.assertEquals(431, response1.getCode());
1830 Assertions.assertEquals("Maximum line length limit exceeded", result1.getBody());
1831 }
1832
1833 }