1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import static org.hamcrest.MatcherAssert.assertThat;
31
32 import java.io.BufferedReader;
33 import java.io.BufferedWriter;
34 import java.io.IOException;
35 import java.io.InputStream;
36 import java.io.InputStreamReader;
37 import java.io.InterruptedIOException;
38 import java.io.OutputStream;
39 import java.io.OutputStreamWriter;
40 import java.net.InetSocketAddress;
41 import java.net.URI;
42 import java.net.URISyntaxException;
43 import java.nio.ByteBuffer;
44 import java.nio.channels.WritableByteChannel;
45 import java.nio.charset.Charset;
46 import java.nio.charset.StandardCharsets;
47 import java.util.HashMap;
48 import java.util.LinkedList;
49 import java.util.List;
50 import java.util.Map;
51 import java.util.Queue;
52 import java.util.Random;
53 import java.util.StringTokenizer;
54 import java.util.concurrent.CancellationException;
55 import java.util.concurrent.ExecutionException;
56 import java.util.concurrent.Executors;
57 import java.util.concurrent.Future;
58 import java.util.concurrent.atomic.AtomicReference;
59
60 import org.apache.hc.core5.http.ConnectionReuseStrategy;
61 import org.apache.hc.core5.http.ContentLengthStrategy;
62 import org.apache.hc.core5.http.ContentType;
63 import org.apache.hc.core5.http.EntityDetails;
64 import org.apache.hc.core5.http.Header;
65 import org.apache.hc.core5.http.HeaderElements;
66 import org.apache.hc.core5.http.HttpException;
67 import org.apache.hc.core5.http.HttpHeaders;
68 import org.apache.hc.core5.http.HttpHost;
69 import org.apache.hc.core5.http.HttpRequest;
70 import org.apache.hc.core5.http.HttpResponse;
71 import org.apache.hc.core5.http.HttpStatus;
72 import org.apache.hc.core5.http.HttpVersion;
73 import org.apache.hc.core5.http.MalformedChunkCodingException;
74 import org.apache.hc.core5.http.Message;
75 import org.apache.hc.core5.http.Method;
76 import org.apache.hc.core5.http.ProtocolException;
77 import org.apache.hc.core5.http.URIScheme;
78 import org.apache.hc.core5.http.config.CharCodingConfig;
79 import org.apache.hc.core5.http.config.Http1Config;
80 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
81 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
82 import org.apache.hc.core5.http.impl.Http1StreamListener;
83 import org.apache.hc.core5.http.impl.HttpProcessors;
84 import org.apache.hc.core5.http.impl.nio.AbstractContentEncoder;
85 import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexer;
86 import org.apache.hc.core5.http.message.BasicHttpRequest;
87 import org.apache.hc.core5.http.message.BasicHttpResponse;
88 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
89 import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
90 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
91 import org.apache.hc.core5.http.nio.AsyncResponseProducer;
92 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
93 import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
94 import org.apache.hc.core5.http.nio.CapacityChannel;
95 import org.apache.hc.core5.http.nio.ContentEncoder;
96 import org.apache.hc.core5.http.nio.DataStreamChannel;
97 import org.apache.hc.core5.http.nio.HandlerFactory;
98 import org.apache.hc.core5.http.nio.NHttpMessageParser;
99 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
100 import org.apache.hc.core5.http.nio.ResponseChannel;
101 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
102 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
103 import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
104 import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
105 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
106 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
107 import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
108 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
109 import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
110 import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
111 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
112 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
113 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
114 import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
115 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
116 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
117 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
118 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
119 import org.apache.hc.core5.http.protocol.HttpContext;
120 import org.apache.hc.core5.http.protocol.HttpProcessor;
121 import org.apache.hc.core5.http.protocol.RequestConnControl;
122 import org.apache.hc.core5.http.protocol.RequestContent;
123 import org.apache.hc.core5.http.protocol.RequestTargetHost;
124 import org.apache.hc.core5.http.protocol.RequestValidateHost;
125 import org.apache.hc.core5.reactor.IOSession;
126 import org.apache.hc.core5.reactor.ProtocolIOSession;
127 import org.apache.hc.core5.testing.SSLTestContexts;
128 import org.apache.hc.core5.testing.nio.extension.Http1TestResources;
129 import org.apache.hc.core5.util.CharArrayBuffer;
130 import org.apache.hc.core5.util.TextUtils;
131 import org.apache.hc.core5.util.Timeout;
132 import org.hamcrest.CoreMatchers;
133 import org.junit.jupiter.api.Assertions;
134 import org.junit.jupiter.api.Test;
135 import org.junit.jupiter.api.extension.RegisterExtension;
136
137 public abstract class Http1IntegrationTest {
138
/** Default timeout: passed to the test resources, to connect calls and used when awaiting results. */
private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
/** Extended timeout used when awaiting results of the pipelined large GET test. */
private static final Timeout LONG_TIMEOUT = Timeout.ofMinutes(2);

/** URI scheme under test; its id is used when building request URIs. */
private final URIScheme scheme;

/** JUnit extension supplying the test server and test client instances. */
@RegisterExtension
private final Http1TestResources resources;
146
/**
 * Creates the test fixture for the given URI scheme.
 *
 * @param scheme the scheme stored for building request URIs and handed to the
 *               shared test resources together with the default timeout.
 */
public Http1IntegrationTest(final URIScheme scheme) {
    this.resources = new Http1TestResources(scheme, TIMEOUT);
    this.scheme = scheme;
}
151
152 private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
153 try {
154 return new URI(scheme.id, null, "localhost", serverEndpoint.getPort(), path, null, null);
155 } catch (final URISyntaxException e) {
156 throw new IllegalStateException();
157 }
158 }
159
160 @Test
161 public void testSimpleGet() throws Exception {
162 final Http1TestServer server = resources.server();
163 final Http1TestClient client = resources.client();
164
165 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
166 final InetSocketAddress serverEndpoint = server.start();
167
168 client.start();
169 final Future<ClientSessionEndpoint> connectFuture = client.connect(
170 "localhost", serverEndpoint.getPort(), TIMEOUT);
171 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
172
173 for (int i = 0; i < 5; i++) {
174 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
175 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
176 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
177 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
178 Assertions.assertNotNull(result);
179 final HttpResponse response1 = result.getHead();
180 final String entity1 = result.getBody();
181 Assertions.assertNotNull(response1);
182 Assertions.assertEquals(200, response1.getCode());
183 Assertions.assertEquals("Hi there", entity1);
184 }
185 }
186
187 @Test
188 public void testSimpleGetConnectionClose() throws Exception {
189 final Http1TestServer server = resources.server();
190 final Http1TestClient client = resources.client();
191
192 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
193 final InetSocketAddress serverEndpoint = server.start();
194
195 client.start();
196 final URI requestURI = createRequestURI(serverEndpoint, "/hello");
197 for (int i = 0; i < 5; i++) {
198 final Future<ClientSessionEndpoint> connectFuture = client.connect(
199 "localhost", serverEndpoint.getPort(), TIMEOUT);
200 try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
201 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
202 AsyncRequestBuilder.get(requestURI)
203 .addHeader(HttpHeaders.CONNECTION, "close")
204 .build(),
205 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
206 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
207 Assertions.assertNotNull(result);
208 final HttpResponse response1 = result.getHead();
209 final String entity1 = result.getBody();
210 Assertions.assertNotNull(response1);
211 Assertions.assertEquals(200, response1.getCode());
212 Assertions.assertEquals("Hi there", entity1);
213 }
214 }
215 }
216
217 @Test
218 public void testSimpleGetIdentityTransfer() throws Exception {
219 final Http1TestServer server = resources.server();
220 final Http1TestClient client = resources.client();
221
222 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
223 final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
224 final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
225
226 client.start();
227
228 final int reqNo = 5;
229
230 for (int i = 0; i < reqNo; i++) {
231 final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
232 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
233
234 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
235 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
236 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
237 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
238
239 streamEndpoint.close();
240
241 Assertions.assertNotNull(result);
242 final HttpResponse response = result.getHead();
243 final String entity = result.getBody();
244 Assertions.assertNotNull(response);
245 Assertions.assertEquals(200, response.getCode());
246 Assertions.assertEquals("Hi there", entity);
247 }
248
249 }
250
251 @Test
252 public void testPostIdentityTransfer() throws Exception {
253 final Http1TestServer server = resources.server();
254 final Http1TestClient client = resources.client();
255
256 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
257 final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
258 final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
259
260 client.start();
261
262 final int reqNo = 5;
263
264 for (int i = 0; i < reqNo; i++) {
265 final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
266 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
267
268 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
269 new BasicRequestProducer(Method.POST,
270 createRequestURI(serverEndpoint, "/hello"),
271 new MultiLineEntityProducer("Hello", 16 * i)),
272 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
273 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
274
275 streamEndpoint.close();
276
277 Assertions.assertNotNull(result);
278 final HttpResponse response = result.getHead();
279 final String entity = result.getBody();
280 Assertions.assertNotNull(response);
281 Assertions.assertEquals(200, response.getCode());
282 Assertions.assertEquals("Hi there", entity);
283 }
284 }
285
286 @Test
287 public void testPostIdentityTransferOutOfSequenceResponse() throws Exception {
288 final Http1TestServer server = resources.server();
289 final Http1TestClient client = resources.client();
290
291 server.register("/hello", () -> new ImmediateResponseExchangeHandler(500, "Go away"));
292 final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
293 final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
294
295 client.start();
296
297 final int reqNo = 5;
298
299 for (int i = 0; i < reqNo; i++) {
300 final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
301 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
302
303 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
304 new BasicRequestProducer(Method.POST,
305 createRequestURI(serverEndpoint, "/hello"),
306 new MultiLineEntityProducer("Hello", 16 * i)),
307 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
308 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
309
310 streamEndpoint.close();
311
312 Assertions.assertNotNull(result);
313 final HttpResponse response = result.getHead();
314 final String entity = result.getBody();
315 Assertions.assertNotNull(response);
316 Assertions.assertEquals(500, response.getCode());
317 Assertions.assertEquals("Go away", entity);
318 }
319 }
320
321 @Test
322 public void testSimpleGetsPipelined() throws Exception {
323 final Http1TestServer server = resources.server();
324 final Http1TestClient client = resources.client();
325
326 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
327 final InetSocketAddress serverEndpoint = server.start();
328
329 client.start();
330 final Future<ClientSessionEndpoint> connectFuture = client.connect(
331 "localhost", serverEndpoint.getPort(), TIMEOUT);
332 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
333
334 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
335 for (int i = 0; i < 5; i++) {
336 queue.add(streamEndpoint.execute(
337 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
338 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
339 }
340 while (!queue.isEmpty()) {
341 final Future<Message<HttpResponse, String>> future = queue.remove();
342 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
343 Assertions.assertNotNull(result);
344 final HttpResponse response = result.getHead();
345 final String entity = result.getBody();
346 Assertions.assertNotNull(response);
347 Assertions.assertEquals(200, response.getCode());
348 Assertions.assertEquals("Hi there", entity);
349 }
350 }
351
352 @Test
353 public void testLargeGet() throws Exception {
354 final Http1TestServer server = resources.server();
355 final Http1TestClient client = resources.client();
356
357 server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 5000));
358 final InetSocketAddress serverEndpoint = server.start();
359
360 client.start();
361 final Future<ClientSessionEndpoint> connectFuture = client.connect(
362 "localhost", serverEndpoint.getPort(), TIMEOUT);
363 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
364
365 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
366 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
367 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
368
369 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
370 Assertions.assertNotNull(result1);
371 final HttpResponse response1 = result1.getHead();
372 Assertions.assertNotNull(response1);
373 Assertions.assertEquals(200, response1.getCode());
374 final String s1 = result1.getBody();
375 Assertions.assertNotNull(s1);
376 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
377 while (t1.hasMoreTokens()) {
378 Assertions.assertEquals("0123456789abcdef", t1.nextToken());
379 }
380
381 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
382 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
383 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
384
385 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
386 Assertions.assertNotNull(result2);
387 final HttpResponse response2 = result2.getHead();
388 Assertions.assertNotNull(response2);
389 Assertions.assertEquals(200, response2.getCode());
390 final String s2 = result2.getBody();
391 Assertions.assertNotNull(s2);
392 final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
393 while (t2.hasMoreTokens()) {
394 Assertions.assertEquals("0123456789abcdef", t2.nextToken());
395 }
396 }
397
398 @Test
399 public void testLargeGetsPipelined() throws Exception {
400 final Http1TestServer server = resources.server();
401 final Http1TestClient client = resources.client();
402
403 server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 2000));
404 final InetSocketAddress serverEndpoint = server.start();
405
406 client.start();
407 final Future<ClientSessionEndpoint> connectFuture = client.connect(
408 "localhost", serverEndpoint.getPort(), TIMEOUT);
409 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
410
411 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
412 for (int i = 0; i < 5; i++) {
413 queue.add(streamEndpoint.execute(
414 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
415 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
416 }
417 while (!queue.isEmpty()) {
418 final Future<Message<HttpResponse, String>> future = queue.remove();
419 final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
420 Assertions.assertNotNull(result);
421 final HttpResponse response = result.getHead();
422 Assertions.assertNotNull(response);
423 Assertions.assertEquals(200, response.getCode());
424 final String entity = result.getBody();
425 Assertions.assertNotNull(entity);
426 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
427 while (t.hasMoreTokens()) {
428 Assertions.assertEquals("0123456789abcdef", t.nextToken());
429 }
430 }
431 }
432
433 @Test
434 public void testBasicPost() throws Exception {
435 final Http1TestServer server = resources.server();
436 final Http1TestClient client = resources.client();
437
438 server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
439 final InetSocketAddress serverEndpoint = server.start();
440
441 client.start();
442 final Future<ClientSessionEndpoint> connectFuture = client.connect(
443 "localhost", serverEndpoint.getPort(), TIMEOUT);
444 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
445
446 for (int i = 0; i < 5; i++) {
447 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
448 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
449 AsyncEntityProducers.create("Hi there")),
450 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
451 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
452 Assertions.assertNotNull(result);
453 final HttpResponse response1 = result.getHead();
454 final String entity1 = result.getBody();
455 Assertions.assertNotNull(response1);
456 Assertions.assertEquals(200, response1.getCode());
457 Assertions.assertEquals("Hi back", entity1);
458 }
459 }
460
461 @Test
462 public void testBasicPostPipelined() throws Exception {
463 final Http1TestServer server = resources.server();
464 final Http1TestClient client = resources.client();
465
466 server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
467 final InetSocketAddress serverEndpoint = server.start();
468
469 client.start();
470 final Future<ClientSessionEndpoint> connectFuture = client.connect(
471 "localhost", serverEndpoint.getPort(), TIMEOUT);
472 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
473
474 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
475 for (int i = 0; i < 5; i++) {
476 queue.add(streamEndpoint.execute(
477 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
478 AsyncEntityProducers.create("Hi there")),
479 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
480 }
481 while (!queue.isEmpty()) {
482 final Future<Message<HttpResponse, String>> future = queue.remove();
483 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
484 Assertions.assertNotNull(result);
485 final HttpResponse response = result.getHead();
486 final String entity = result.getBody();
487 Assertions.assertNotNull(response);
488 Assertions.assertEquals(200, response.getCode());
489 Assertions.assertEquals("Hi back", entity);
490 }
491 }
492
493 @Test
494 public void testHttp10Post() throws Exception {
495 final Http1TestServer server = resources.server();
496 final Http1TestClient client = resources.client();
497
498 server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
499 final InetSocketAddress serverEndpoint = server.start();
500
501 client.start();
502 final Future<ClientSessionEndpoint> connectFuture = client.connect(
503 "localhost", serverEndpoint.getPort(), TIMEOUT);
504 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
505
506 for (int i = 0; i < 5; i++) {
507 final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
508 request.setVersion(HttpVersion.HTTP_1_0);
509 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
510 new BasicRequestProducer(request, AsyncEntityProducers.create("Hi there")),
511 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
512 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
513 Assertions.assertNotNull(result);
514 final HttpResponse response1 = result.getHead();
515 final String entity1 = result.getBody();
516 Assertions.assertNotNull(response1);
517 Assertions.assertEquals(200, response1.getCode());
518 Assertions.assertEquals("Hi back", entity1);
519 }
520 }
521
522 @Test
523 public void testNoEntityPost() throws Exception {
524 final Http1TestServer server = resources.server();
525 final Http1TestClient client = resources.client();
526
527 server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
528 final InetSocketAddress serverEndpoint = server.start();
529
530 client.start();
531 final Future<ClientSessionEndpoint> connectFuture = client.connect(
532 "localhost", serverEndpoint.getPort(), TIMEOUT);
533 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
534
535 for (int i = 0; i < 5; i++) {
536 final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
537 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
538 new BasicRequestProducer(request, null),
539 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
540 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
541 Assertions.assertNotNull(result);
542 final HttpResponse response1 = result.getHead();
543 final String entity1 = result.getBody();
544 Assertions.assertNotNull(response1);
545 Assertions.assertEquals(200, response1.getCode());
546 Assertions.assertEquals("Hi back", entity1);
547 }
548 }
549
550 @Test
551 public void testLargePost() throws Exception {
552 final Http1TestServer server = resources.server();
553 final Http1TestClient client = resources.client();
554
555 server.register("*", () -> new EchoHandler(2048));
556 final InetSocketAddress serverEndpoint = server.start();
557
558 client.start();
559 final Future<ClientSessionEndpoint> connectFuture = client.connect(
560 "localhost", serverEndpoint.getPort(), TIMEOUT);
561 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
562
563 for (int i = 0; i < 5; i++) {
564 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
565 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
566 new MultiLineEntityProducer("0123456789abcdef", 5000)),
567 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
568 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
569 Assertions.assertNotNull(result);
570 final HttpResponse response = result.getHead();
571 Assertions.assertNotNull(response);
572 Assertions.assertEquals(200, response.getCode());
573 final String entity = result.getBody();
574 Assertions.assertNotNull(entity);
575 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
576 while (t.hasMoreTokens()) {
577 Assertions.assertEquals("0123456789abcdef", t.nextToken());
578 }
579 }
580 }
581
582 @Test
583 public void testPostsPipelinedLargeResponse() throws Exception {
584 final Http1TestServer server = resources.server();
585 final Http1TestClient client = resources.client();
586
587 server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 2000));
588 final InetSocketAddress serverEndpoint = server.start();
589
590 client.start();
591 final Future<ClientSessionEndpoint> connectFuture = client.connect(
592 "localhost", serverEndpoint.getPort(), TIMEOUT);
593 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
594
595 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
596 for (int i = 0; i < 2; i++) {
597 queue.add(streamEndpoint.execute(
598 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
599 AsyncEntityProducers.create("Hi there")),
600 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
601 }
602 while (!queue.isEmpty()) {
603 final Future<Message<HttpResponse, String>> future = queue.remove();
604 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
605 Assertions.assertNotNull(result);
606 final HttpResponse response = result.getHead();
607 Assertions.assertNotNull(response);
608 Assertions.assertEquals(200, response.getCode());
609 final String entity = result.getBody();
610 Assertions.assertNotNull(entity);
611 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
612 while (t.hasMoreTokens()) {
613 Assertions.assertEquals("0123456789abcdef", t.nextToken());
614 }
615 }
616 }
617
618
619 @Test
620 public void testLargePostsPipelined() throws Exception {
621 final Http1TestServer server = resources.server();
622 final Http1TestClient client = resources.client();
623
624 server.register("*", () -> new EchoHandler(2048));
625 final InetSocketAddress serverEndpoint = server.start();
626
627 client.start();
628 final Future<ClientSessionEndpoint> connectFuture = client.connect(
629 "localhost", serverEndpoint.getPort(), TIMEOUT);
630 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
631
632 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
633 for (int i = 0; i < 5; i++) {
634 queue.add(streamEndpoint.execute(
635 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
636 new MultiLineEntityProducer("0123456789abcdef", 5000)),
637 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
638 }
639 while (!queue.isEmpty()) {
640 final Future<Message<HttpResponse, String>> future = queue.remove();
641 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
642 Assertions.assertNotNull(result);
643 final HttpResponse response = result.getHead();
644 Assertions.assertNotNull(response);
645 Assertions.assertEquals(200, response.getCode());
646 final String entity = result.getBody();
647 Assertions.assertNotNull(entity);
648 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
649 while (t.hasMoreTokens()) {
650 Assertions.assertEquals("0123456789abcdef", t.nextToken());
651 }
652 }
653 }
654
655 @Test
656 public void testSimpleHead() throws Exception {
657 final Http1TestServer server = resources.server();
658 final Http1TestClient client = resources.client();
659
660 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
661 final InetSocketAddress serverEndpoint = server.start();
662
663 client.start();
664 final Future<ClientSessionEndpoint> connectFuture = client.connect(
665 "localhost", serverEndpoint.getPort(), TIMEOUT);
666 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
667
668 for (int i = 0; i < 5; i++) {
669 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
670 new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
671 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
672 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
673 Assertions.assertNotNull(result);
674 final HttpResponse response1 = result.getHead();
675 Assertions.assertNotNull(response1);
676 Assertions.assertEquals(200, response1.getCode());
677 Assertions.assertNull(result.getBody());
678 }
679 }
680
681 @Test
682 public void testSimpleHeadConnectionClose() throws Exception {
683 final Http1TestServer server = resources.server();
684 final Http1TestClient client = resources.client();
685
686 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
687 final InetSocketAddress serverEndpoint = server.start();
688
689 client.start();
690 final URI requestURI = createRequestURI(serverEndpoint, "/hello");
691 for (int i = 0; i < 5; i++) {
692 final Future<ClientSessionEndpoint> connectFuture = client.connect(
693 "localhost", serverEndpoint.getPort(), TIMEOUT);
694 try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
695 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
696 AsyncRequestBuilder.head(requestURI)
697 .addHeader(HttpHeaders.CONNECTION, "close")
698 .build(),
699 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
700 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
701 Assertions.assertNotNull(result);
702 final HttpResponse response1 = result.getHead();
703 Assertions.assertNotNull(response1);
704 Assertions.assertEquals(200, response1.getCode());
705 Assertions.assertNull(result.getBody());
706 }
707 }
708 }
709
710 @Test
711 public void testHeadPipelined() throws Exception {
712 final Http1TestServer server = resources.server();
713 final Http1TestClient client = resources.client();
714
715 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
716 final InetSocketAddress serverEndpoint = server.start();
717
718 client.start();
719 final Future<ClientSessionEndpoint> connectFuture = client.connect(
720 "localhost", serverEndpoint.getPort(), TIMEOUT);
721 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
722
723 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
724 for (int i = 0; i < 5; i++) {
725 queue.add(streamEndpoint.execute(
726 new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
727 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
728 }
729 while (!queue.isEmpty()) {
730 final Future<Message<HttpResponse, String>> future = queue.remove();
731 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
732 Assertions.assertNotNull(result);
733 final HttpResponse response1 = result.getHead();
734 Assertions.assertNotNull(response1);
735 Assertions.assertEquals(200, response1.getCode());
736 Assertions.assertNull(result.getBody());
737 }
738 }
739
740 @Test
741 public void testExpectationFailed() throws Exception {
742 final Http1TestServer server = resources.server();
743 final Http1TestClient client = resources.client();
744
745 server.register("*", () -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
746
747 @Override
748 protected void handle(
749 final Message<HttpRequest, String> request,
750 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
751 final HttpContext context) throws IOException, HttpException {
752 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
753
754 }
755 });
756 final InetSocketAddress serverEndpoint = server.start(null, handler -> new BasicAsyncServerExpectationDecorator(handler) {
757
758 @Override
759 protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
760 final Header h = request.getFirstHeader("password");
761 if (h != null && "secret".equals(h.getValue())) {
762 return null;
763 } else {
764 return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
765 }
766 }
767 }, Http1Config.DEFAULT);
768
769 client.start();
770 final Future<IOSession> sessionFuture = client.requestSession(
771 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
772 final IOSession ioSession = sessionFuture.get();
773 final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession);
774
775 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
776 request1.addHeader("password", "secret");
777 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
778 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 1000)),
779 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
780 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
781 Assertions.assertNotNull(result1);
782 final HttpResponse response1 = result1.getHead();
783 Assertions.assertNotNull(response1);
784 Assertions.assertEquals(200, response1.getCode());
785 Assertions.assertEquals("All is well", result1.getBody());
786
787 Assertions.assertTrue(ioSession.isOpen());
788
789 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
790 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
791 new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
792 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
793 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
794 Assertions.assertNotNull(result2);
795 final HttpResponse response2 = result2.getHead();
796 Assertions.assertNotNull(response2);
797 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
798 Assertions.assertEquals("You shall not pass", result2.getBody());
799
800 Assertions.assertTrue(ioSession.isOpen());
801
802 final HttpRequest request3 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
803 request3.addHeader("password", "secret");
804 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
805 new BasicRequestProducer(request3, new MultiLineEntityProducer("0123456789abcdef", 1000)),
806 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
807 final Message<HttpResponse, String> result3 = future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
808 Assertions.assertNotNull(result3);
809 final HttpResponse response3 = result3.getHead();
810 Assertions.assertNotNull(response3);
811 Assertions.assertEquals(200, response3.getCode());
812 Assertions.assertEquals("All is well", result3.getBody());
813
814 Assertions.assertTrue(ioSession.isOpen());
815
816 final HttpRequest request4 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
817 final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
818 new BasicRequestProducer(request4, AsyncEntityProducers.create("blah")),
819 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
820 final Message<HttpResponse, String> result4 = future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
821 Assertions.assertNotNull(result4);
822 final HttpResponse response4 = result4.getHead();
823 Assertions.assertNotNull(response4);
824 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response4.getCode());
825 Assertions.assertEquals("You shall not pass", result4.getBody());
826
827 Assertions.assertFalse(ioSession.isOpen());
828 }
829
830 @Test
831 public void testExpectationFailedCloseConnection() throws Exception {
832 final Http1TestServer server = resources.server();
833 final Http1TestClient client = resources.client();
834
835 server.register("*", () -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
836
837 @Override
838 protected void handle(
839 final Message<HttpRequest, String> request,
840 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
841 final HttpContext context) throws IOException, HttpException {
842 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
843
844 }
845 });
846 final InetSocketAddress serverEndpoint = server.start(null, handler -> new BasicAsyncServerExpectationDecorator(handler) {
847
848 @Override
849 protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
850 final Header h = request.getFirstHeader("password");
851 if (h != null && "secret".equals(h.getValue())) {
852 return null;
853 } else {
854 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_UNAUTHORIZED);
855 response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
856 return new BasicResponseProducer(response, "You shall not pass");
857 }
858 }
859 }, Http1Config.DEFAULT);
860
861 client.start();
862 final Future<IOSession> sessionFuture = client.requestSession(
863 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
864 final IOSession ioSession = sessionFuture.get();
865 final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession);
866
867 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
868 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
869 new BasicRequestProducer(request1, new MultiBinEntityProducer(
870 new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
871 100000,
872 ContentType.TEXT_PLAIN)),
873 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
874 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
875 Assertions.assertNotNull(result1);
876 final HttpResponse response1 = result1.getHead();
877 Assertions.assertNotNull(response1);
878 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
879 Assertions.assertNotNull("You shall not pass", result1.getBody());
880
881 Assertions.assertFalse(streamEndpoint.isOpen());
882 }
883
884 @Test
885 public void testDelayedExpectationVerification() throws Exception {
886 final Http1TestServer server = resources.server();
887 final Http1TestClient client = resources.client();
888
889 server.register("*", () -> new AsyncServerExchangeHandler() {
890
891 private final Random random = new Random(System.currentTimeMillis());
892 private final AsyncEntityProducer entityProducer = AsyncEntityProducers.create(
893 "All is well");
894
895 @Override
896 public void handleRequest(
897 final HttpRequest request,
898 final EntityDetails entityDetails,
899 final ResponseChannel responseChannel,
900 final HttpContext context) throws HttpException, IOException {
901
902 Executors.newSingleThreadExecutor().execute(() -> {
903 try {
904 if (entityDetails != null) {
905 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
906 if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
907 Thread.sleep(random.nextInt(1000));
908 responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), context);
909 }
910 final HttpResponse response = new BasicHttpResponse(200);
911 synchronized (entityProducer) {
912 responseChannel.sendResponse(response, entityProducer, context);
913 }
914 }
915 } catch (final Exception ignore) {
916
917 }
918 });
919
920 }
921
922 @Override
923 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
924 capacityChannel.update(Integer.MAX_VALUE);
925 }
926
927 @Override
928 public void consume(final ByteBuffer src) throws IOException {
929 }
930
931 @Override
932 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
933 }
934
935 @Override
936 public int available() {
937 synchronized (entityProducer) {
938 return entityProducer.available();
939 }
940 }
941
942 @Override
943 public void produce(final DataStreamChannel channel) throws IOException {
944 synchronized (entityProducer) {
945 entityProducer.produce(channel);
946 }
947 }
948
949 @Override
950 public void failed(final Exception cause) {
951 }
952
953 @Override
954 public void releaseResources() {
955 }
956
957 });
958 final InetSocketAddress serverEndpoint = server.start();
959
960 client.start(Http1Config.custom().setWaitForContinueTimeout(Timeout.ofMilliseconds(100)).build());
961 final Future<ClientSessionEndpoint> connectFuture = client.connect(
962 "localhost", serverEndpoint.getPort(), TIMEOUT);
963 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
964
965 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
966 for (int i = 0; i < 5; i++) {
967 queue.add(streamEndpoint.execute(
968 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
969 AsyncEntityProducers.create("Some important message")),
970 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
971 }
972 while (!queue.isEmpty()) {
973 final Future<Message<HttpResponse, String>> future = queue.remove();
974 final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
975 Assertions.assertNotNull(result);
976 final HttpResponse response = result.getHead();
977 Assertions.assertNotNull(response);
978 Assertions.assertEquals(200, response.getCode());
979 Assertions.assertNotNull("All is well", result.getBody());
980 }
981 }
982
983 @Test
984 public void testPrematureResponse() throws Exception {
985 final Http1TestServer server = resources.server();
986 final Http1TestClient client = resources.client();
987
988 server.register("*", () -> new AsyncServerExchangeHandler() {
989
990 private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>();
991
992 @Override
993 public void handleRequest(
994 final HttpRequest request,
995 final EntityDetails entityDetails,
996 final ResponseChannel responseChannel,
997 final HttpContext context) throws HttpException, IOException {
998 final AsyncResponseProducer producer;
999 final Header h = request.getFirstHeader("password");
1000 if (h != null && "secret".equals(h.getValue())) {
1001 producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
1002 } else {
1003 producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
1004 }
1005 responseProducer.set(producer);
1006 producer.sendResponse(responseChannel, context);
1007 }
1008
1009 @Override
1010 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1011 capacityChannel.update(Integer.MAX_VALUE);
1012 }
1013
1014 @Override
1015 public void consume(final ByteBuffer src) throws IOException {
1016 }
1017
1018 @Override
1019 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1020 }
1021
1022 @Override
1023 public int available() {
1024 final AsyncResponseProducer producer = responseProducer.get();
1025 return producer.available();
1026 }
1027
1028 @Override
1029 public void produce(final DataStreamChannel channel) throws IOException {
1030 final AsyncResponseProducer producer = responseProducer.get();
1031 producer.produce(channel);
1032 }
1033
1034 @Override
1035 public void failed(final Exception cause) {
1036 }
1037
1038 @Override
1039 public void releaseResources() {
1040 }
1041 });
1042 final InetSocketAddress serverEndpoint = server.start();
1043
1044 client.start();
1045 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1046 "localhost", serverEndpoint.getPort(), TIMEOUT);
1047 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1048
1049 for (int i = 0; i < 3; i++) {
1050 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1051 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1052 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 100000)),
1053 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1054 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1055 Assertions.assertNotNull(result1);
1056 final HttpResponse response1 = result1.getHead();
1057 Assertions.assertNotNull(response1);
1058 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1059 Assertions.assertNotNull("You shall not pass", result1.getBody());
1060
1061 Assertions.assertTrue(streamEndpoint.isOpen());
1062 }
1063 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1064 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1065 new BasicRequestProducer(request1, new MultiBinEntityProducer(
1066 new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
1067 100000,
1068 ContentType.TEXT_PLAIN)),
1069 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1070 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1071 Assertions.assertNotNull(result1);
1072 final HttpResponse response1 = result1.getHead();
1073 Assertions.assertNotNull(response1);
1074 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1075 Assertions.assertNotNull("You shall not pass", result1.getBody());
1076 }
1077
1078 @Test
1079 public void testSlowResponseConsumer() throws Exception {
1080 final Http1TestServer server = resources.server();
1081 final Http1TestClient client = resources.client();
1082
1083 server.register("/", () -> new MultiLineResponseHandler("0123456789abcd", 100));
1084 final InetSocketAddress serverEndpoint = server.start();
1085
1086 client.start(Http1Config.custom().setBufferSize(256).build());
1087
1088 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1089 "localhost", serverEndpoint.getPort(), TIMEOUT);
1090 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1091
1092 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
1093 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1094 new BasicRequestProducer(request1, null),
1095 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
1096
1097 @Override
1098 protected String consumeData(
1099 final ContentType contentType, final InputStream inputStream) throws IOException {
1100 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1101
1102 final StringBuilder buffer = new StringBuilder();
1103 try {
1104 final byte[] tmp = new byte[16];
1105 int l;
1106 while ((l = inputStream.read(tmp)) != -1) {
1107 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
1108 Thread.sleep(50);
1109 }
1110 } catch (final InterruptedException ex) {
1111 Thread.currentThread().interrupt();
1112 throw new InterruptedIOException(ex.getMessage());
1113 }
1114 return buffer.toString();
1115 }
1116 }),
1117 null);
1118
1119 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1120 Assertions.assertNotNull(result1);
1121 final HttpResponse response1 = result1.getHead();
1122 Assertions.assertNotNull(response1);
1123 Assertions.assertEquals(200, response1.getCode());
1124 final String s1 = result1.getBody();
1125 Assertions.assertNotNull(s1);
1126 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1127 while (t1.hasMoreTokens()) {
1128 Assertions.assertEquals("0123456789abcd", t1.nextToken());
1129 }
1130 }
1131
1132 @Test
1133 public void testSlowRequestProducer() throws Exception {
1134 final Http1TestServer server = resources.server();
1135 final Http1TestClient client = resources.client();
1136
1137 server.register("*", () -> new EchoHandler(2048));
1138 final InetSocketAddress serverEndpoint = server.start();
1139
1140 client.start();
1141 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1142 "localhost", serverEndpoint.getPort(), TIMEOUT);
1143 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1144
1145 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1146 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1147 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
1148
1149 @Override
1150 protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
1151 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1152 try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
1153 for (int i = 0; i < 500; i++) {
1154 if (i % 100 == 0) {
1155 writer.flush();
1156 Thread.sleep(500);
1157 }
1158 writer.write("0123456789abcdef\r\n");
1159 }
1160 } catch (final InterruptedException ex) {
1161 Thread.currentThread().interrupt();
1162 throw new InterruptedIOException(ex.getMessage());
1163 }
1164 }
1165
1166 }),
1167 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1168 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1169 Assertions.assertNotNull(result1);
1170 final HttpResponse response1 = result1.getHead();
1171 Assertions.assertNotNull(response1);
1172 Assertions.assertEquals(200, response1.getCode());
1173 final String s1 = result1.getBody();
1174 Assertions.assertNotNull(s1);
1175 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1176 while (t1.hasMoreTokens()) {
1177 Assertions.assertEquals("0123456789abcdef", t1.nextToken());
1178 }
1179 }
1180
1181 @Test
1182 public void testSlowResponseProducer() throws Exception {
1183 final Http1TestServer server = resources.server();
1184 final Http1TestClient client = resources.client();
1185
1186 server.register("*", () -> new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
1187
1188 @Override
1189 protected void handle(
1190 final HttpRequest request,
1191 final InputStream requestStream,
1192 final HttpResponse response,
1193 final OutputStream responseStream,
1194 final HttpContext context) throws IOException, HttpException {
1195
1196 if (!"/hello".equals(request.getPath())) {
1197 response.setCode(HttpStatus.SC_NOT_FOUND);
1198 return;
1199 }
1200 if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
1201 response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
1202 return;
1203 }
1204 if (requestStream == null) {
1205 return;
1206 }
1207 final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
1208 final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
1209 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1210 response.setCode(HttpStatus.SC_OK);
1211 response.setHeader(h1);
1212 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
1213 final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
1214 try {
1215 String l;
1216 int count = 0;
1217 while ((l = reader.readLine()) != null) {
1218 writer.write(l);
1219 writer.write("\r\n");
1220 count++;
1221 if (count % 500 == 0) {
1222 Thread.sleep(500);
1223 }
1224 }
1225 writer.flush();
1226 } catch (final InterruptedException ex) {
1227 Thread.currentThread().interrupt();
1228 throw new InterruptedIOException(ex.getMessage());
1229 }
1230 }
1231 }
1232 });
1233 final InetSocketAddress serverEndpoint = server.start();
1234
1235 client.start(Http1Config.custom().setBufferSize(256).build());
1236
1237 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1238 "localhost", serverEndpoint.getPort(), TIMEOUT);
1239 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1240
1241 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1242 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1243 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
1244 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1245 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1246 Assertions.assertNotNull(result1);
1247 final HttpResponse response1 = result1.getHead();
1248 Assertions.assertNotNull(response1);
1249 Assertions.assertEquals(200, response1.getCode());
1250 final String s1 = result1.getBody();
1251 Assertions.assertNotNull(s1);
1252 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1253 while (t1.hasMoreTokens()) {
1254 Assertions.assertEquals("0123456789abcd", t1.nextToken());
1255 }
1256 }
1257
/**
 * Submits three POST requests back to back on one connection, the second of which
 * carries {@code Connection: close}.
 * <p>
 * The first two exchanges must complete with 200 / "Hi back". The third, and a fourth
 * request submitted afterwards on the same endpoint, must fail with either a
 * {@link CancellationException} or an {@link ExecutionException}.
 */
@Test
public void testPipelinedConnectionClose() throws Exception {
    final Http1TestServer testServer = resources.server();
    final Http1TestClient testClient = resources.client();

    testServer.register("/hello*", () -> new SingleLineResponseHandler("Hi back"));
    final InetSocketAddress address = testServer.start();

    testClient.start();
    final ClientSessionEndpoint endpoint = testClient.connect("localhost", address.getPort(), TIMEOUT).get();

    final Future<Message<HttpResponse, String>> firstFuture = endpoint.execute(
            new BasicRequestProducer(Method.POST, createRequestURI(address, "/hello-1"),
                    AsyncEntityProducers.create("Hi there")),
            new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
    // The second request asks the server to close the connection.
    final HttpRequest closingRequest = new BasicHttpRequest(Method.POST, createRequestURI(address, "/hello-2"));
    closingRequest.addHeader(HttpHeaders.CONNECTION, "close");
    final Future<Message<HttpResponse, String>> secondFuture = endpoint.execute(
            new BasicRequestProducer(closingRequest,
                    AsyncEntityProducers.create("Hi there")),
            new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
    final Future<Message<HttpResponse, String>> thirdFuture = endpoint.execute(
            new BasicRequestProducer(Method.POST, createRequestURI(address, "/hello-3"),
                    AsyncEntityProducers.create("Hi there")),
            new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);

    final Message<HttpResponse, String> firstMessage = firstFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
    Assertions.assertNotNull(firstMessage);
    final HttpResponse firstHead = firstMessage.getHead();
    final String firstBody = firstMessage.getBody();
    Assertions.assertNotNull(firstHead);
    Assertions.assertEquals(200, firstHead.getCode());
    Assertions.assertEquals("Hi back", firstBody);

    final Message<HttpResponse, String> secondMessage = secondFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
    Assertions.assertNotNull(secondMessage);
    final HttpResponse secondHead = secondMessage.getHead();
    final String secondBody = secondMessage.getBody();
    Assertions.assertNotNull(secondHead);
    Assertions.assertEquals(200, secondHead.getCode());
    Assertions.assertEquals("Hi back", secondBody);

    final Exception thirdFailure = Assertions.assertThrows(Exception.class, () ->
            thirdFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
    assertThat(thirdFailure, CoreMatchers.anyOf(
            CoreMatchers.instanceOf(CancellationException.class),
            CoreMatchers.instanceOf(ExecutionException.class)));

    // A request submitted after the close must fail as well.
    final Future<Message<HttpResponse, String>> lateFuture = endpoint.execute(
            new BasicRequestProducer(Method.POST, createRequestURI(address, "/hello-3"),
                    AsyncEntityProducers.create("Hi there")),
            new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
    final Exception lateFailure = Assertions.assertThrows(Exception.class, () ->
            lateFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
    assertThat(lateFailure, CoreMatchers.anyOf(
            CoreMatchers.instanceOf(CancellationException.class),
            CoreMatchers.instanceOf(ExecutionException.class)));
}
1318
1319 @Test
1320 public void testPipelinedInvalidRequest() throws Exception {
1321 final Http1TestServer server = resources.server();
1322 final Http1TestClient client = resources.client();
1323
1324 server.register("/hello*", () -> new SingleLineResponseHandler("Hi back"));
1325 final InetSocketAddress serverEndpoint = server.start();
1326
1327 client.start();
1328 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1329 "localhost", serverEndpoint.getPort(), TIMEOUT);
1330 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1331
1332 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1333 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1334 AsyncEntityProducers.create("Hi there")),
1335 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1336 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1337 request2.addHeader(HttpHeaders.HOST, "blah:blah");
1338 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1339 new BasicRequestProducer(request2,
1340 AsyncEntityProducers.create("Hi there")),
1341 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1342 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1343 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1344 AsyncEntityProducers.create("Hi there")),
1345 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1346
1347 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1348 Assertions.assertNotNull(result1);
1349 final HttpResponse response1 = result1.getHead();
1350 final String entity1 = result1.getBody();
1351 Assertions.assertNotNull(response1);
1352 Assertions.assertEquals(200, response1.getCode());
1353 Assertions.assertEquals("Hi back", entity1);
1354
1355 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1356 Assertions.assertNotNull(result2);
1357 final HttpResponse response2 = result2.getHead();
1358 final String entity2 = result2.getBody();
1359 Assertions.assertNotNull(response2);
1360 Assertions.assertEquals(400, response2.getCode());
1361 Assertions.assertTrue(entity2.length() > 0);
1362
1363
1364 final Exception exception = Assertions.assertThrows(Exception.class, () ->
1365 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1366 assertThat(exception, CoreMatchers.anyOf(
1367 CoreMatchers.instanceOf(CancellationException.class),
1368 CoreMatchers.instanceOf(ExecutionException.class)));
1369 }
1370
1371 private static final byte[] GARBAGE = "garbage".getBytes(StandardCharsets.US_ASCII);
1372
1373 private static class BrokenChunkEncoder extends AbstractContentEncoder {
1374
1375 private final CharArrayBuffer lineBuffer;
1376 private boolean done;
1377
1378 BrokenChunkEncoder(
1379 final WritableByteChannel channel,
1380 final SessionOutputBuffer buffer,
1381 final BasicHttpTransportMetrics metrics) {
1382 super(channel, buffer, metrics);
1383 lineBuffer = new CharArrayBuffer(16);
1384 }
1385
1386 @Override
1387 public void complete(final List<? extends Header> trailers) throws IOException {
1388 super.complete(trailers);
1389 }
1390
1391 @Override
1392 public int write(final ByteBuffer src) throws IOException {
1393 final int chunk;
1394 if (!done) {
1395 lineBuffer.clear();
1396 lineBuffer.append(Integer.toHexString(GARBAGE.length * 10));
1397 buffer().writeLine(lineBuffer);
1398 buffer().write(ByteBuffer.wrap(GARBAGE));
1399 done = true;
1400 chunk = GARBAGE.length;
1401 } else {
1402 chunk = 0;
1403 }
1404 final long bytesWritten = buffer().flush(channel());
1405 if (bytesWritten > 0) {
1406 metrics().incrementBytesTransferred(bytesWritten);
1407 }
1408 if (!buffer().hasData()) {
1409 channel().close();
1410 }
1411 return chunk;
1412 }
1413
1414 }
1415
1416 @Test
1417 public void testTruncatedChunk() throws Exception {
1418 final Http1TestServer server = resources.server();
1419 final Http1TestClient client = resources.client();
1420
1421 final InetSocketAddress serverEndpoint = server.start(new InternalServerHttp1EventHandlerFactory(
1422 HttpProcessors.server(),
1423 (request, context) -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
1424
1425 @Override
1426 protected void handle(
1427 final Message<HttpRequest, String> request,
1428 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1429 final HttpContext context) throws IOException, HttpException {
1430 responseTrigger.submitResponse(
1431 new BasicResponseProducer(new StringAsyncEntityProducer("useful stuff")), context);
1432 }
1433
1434 },
1435 Http1Config.DEFAULT,
1436 CharCodingConfig.DEFAULT,
1437 DefaultConnectionReuseStrategy.INSTANCE,
1438 scheme == URIScheme.HTTPS ? SSLTestContexts.createServerSSLContext() : null, null, null) {
1439
1440 @Override
1441 protected ServerHttp1StreamDuplexer createServerHttp1StreamDuplexer(
1442 final ProtocolIOSession ioSession,
1443 final HttpProcessor httpProcessor,
1444 final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
1445 final Http1Config http1Config,
1446 final CharCodingConfig connectionConfig,
1447 final ConnectionReuseStrategy connectionReuseStrategy,
1448 final NHttpMessageParser<HttpRequest> incomingMessageParser,
1449 final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
1450 final ContentLengthStrategy incomingContentStrategy,
1451 final ContentLengthStrategy outgoingContentStrategy,
1452 final Http1StreamListener streamListener) {
1453 return new ServerHttp1StreamDuplexer(ioSession, httpProcessor, exchangeHandlerFactory,
1454 scheme.id,
1455 http1Config, connectionConfig, connectionReuseStrategy,
1456 incomingMessageParser, outgoingMessageWriter,
1457 incomingContentStrategy, outgoingContentStrategy,
1458 streamListener) {
1459
1460 @Override
1461 protected ContentEncoder createContentEncoder(
1462 final long len,
1463 final WritableByteChannel channel,
1464 final SessionOutputBuffer buffer,
1465 final BasicHttpTransportMetrics metrics) throws HttpException {
1466 if (len == ContentLengthStrategy.CHUNKED) {
1467 return new BrokenChunkEncoder(channel, buffer, metrics);
1468 } else {
1469 return super.createContentEncoder(len, channel, buffer, metrics);
1470 }
1471 }
1472
1473 };
1474 }
1475
1476 });
1477
1478 client.start();
1479 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1480 "localhost", serverEndpoint.getPort(), TIMEOUT);
1481 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1482
1483 final AsyncRequestProducer requestProducer = new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1484 final StringAsyncEntityConsumer entityConsumer = new StringAsyncEntityConsumer() {
1485
1486 @Override
1487 public void releaseResources() {
1488
1489 }
1490
1491 };
1492 final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(entityConsumer);
1493 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(requestProducer, responseConsumer, null);
1494 final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () ->
1495 future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1496 final Throwable cause = exception.getCause();
1497 Assertions.assertTrue(cause instanceof MalformedChunkCodingException);
1498 Assertions.assertEquals("garbage", entityConsumer.generateContent());
1499 }
1500
1501 @Test
1502 public void testExceptionInHandler() throws Exception {
1503 final Http1TestServer server = resources.server();
1504 final Http1TestClient client = resources.client();
1505
1506 server.register("/hello", () -> new SingleLineResponseHandler("Hi there") {
1507
1508 @Override
1509 protected void handle(
1510 final Message<HttpRequest, String> request,
1511 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1512 final HttpContext context) throws IOException, HttpException {
1513 throw new HttpException("Boom");
1514 }
1515 });
1516 final InetSocketAddress serverEndpoint = server.start();
1517
1518 client.start();
1519 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1520 "localhost", serverEndpoint.getPort(), TIMEOUT);
1521 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1522
1523 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1524 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1525 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1526 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1527 Assertions.assertNotNull(result);
1528 final HttpResponse response1 = result.getHead();
1529 final String entity1 = result.getBody();
1530 Assertions.assertNotNull(response1);
1531 Assertions.assertEquals(500, response1.getCode());
1532 Assertions.assertEquals("Boom", entity1);
1533 }
1534
1535 @Test
1536 public void testNoServiceHandler() throws Exception {
1537 final Http1TestServer server = resources.server();
1538 final Http1TestClient client = resources.client();
1539
1540 final InetSocketAddress serverEndpoint = server.start();
1541
1542 client.start();
1543 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1544 "localhost", serverEndpoint.getPort(), TIMEOUT);
1545 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1546
1547 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1548 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1549 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1550 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1551 Assertions.assertNotNull(result);
1552 final HttpResponse response1 = result.getHead();
1553 final String entity1 = result.getBody();
1554 Assertions.assertNotNull(response1);
1555 Assertions.assertEquals(404, response1.getCode());
1556 Assertions.assertEquals("Resource not found", entity1);
1557 }
1558
1559 @Test
1560 public void testResponseNoContent() throws Exception {
1561 final Http1TestServer server = resources.server();
1562 final Http1TestClient client = resources.client();
1563
1564 server.register("/hello", () -> new SingleLineResponseHandler("Hi there") {
1565
1566 @Override
1567 protected void handle(
1568 final Message<HttpRequest, String> request,
1569 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1570 final HttpContext context) throws IOException, HttpException {
1571 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_NO_CONTENT);
1572 responseTrigger.submitResponse(new BasicResponseProducer(response), context);
1573 }
1574 });
1575 final InetSocketAddress serverEndpoint = server.start();
1576
1577 client.start();
1578 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1579 "localhost", serverEndpoint.getPort(), TIMEOUT);
1580 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1581
1582 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1583 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1584 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1585 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1586 Assertions.assertNotNull(result);
1587 final HttpResponse response1 = result.getHead();
1588 Assertions.assertNotNull(response1);
1589 Assertions.assertEquals(204, response1.getCode());
1590 Assertions.assertNull(result.getBody());
1591 }
1592
1593 @Test
1594 public void testAbsentHostHeader() throws Exception {
1595 final Http1TestServer server = resources.server();
1596 final Http1TestClient client = resources.client();
1597
1598 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
1599 final InetSocketAddress serverEndpoint = server.start();
1600
1601 client.start(new DefaultHttpProcessor(RequestContent.INSTANCE, RequestConnControl.INSTANCE), Http1Config.DEFAULT);
1602
1603 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1604 "localhost", serverEndpoint.getPort(), TIMEOUT);
1605 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1606
1607 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1608 request1.setVersion(HttpVersion.HTTP_1_0);
1609 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1610 new BasicRequestProducer(request1, null),
1611 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1612 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1613 Assertions.assertNotNull(result1);
1614 final HttpResponse response1 = result1.getHead();
1615 Assertions.assertNotNull(response1);
1616 Assertions.assertEquals(200, response1.getCode());
1617 Assertions.assertEquals("Hi there", result1.getBody());
1618
1619 final HttpRequest request2 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1620 request2.setVersion(HttpVersion.HTTP_1_1);
1621 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1622 new BasicRequestProducer(request2, null),
1623 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1624 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1625 Assertions.assertNotNull(result2);
1626 final HttpResponse response2 = result2.getHead();
1627 Assertions.assertNotNull(response2);
1628 Assertions.assertEquals(400, response2.getCode());
1629 Assertions.assertEquals("Host header is absent", result2.getBody());
1630 }
1631
1632 @Test
1633 public void testMessageWithTrailers() throws Exception {
1634 final Http1TestServer server = resources.server();
1635 final Http1TestClient client = resources.client();
1636
1637 server.register("/hello", () -> new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
1638
1639 @Override
1640 protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
1641 final HttpRequest request,
1642 final EntityDetails entityDetails,
1643 final HttpContext context) throws HttpException {
1644 return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
1645 }
1646
1647 @Override
1648 protected void handle(
1649 final Message<HttpRequest, String> requestMessage,
1650 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1651 final HttpContext context) throws HttpException, IOException {
1652 responseTrigger.submitResponse(new BasicResponseProducer(
1653 HttpStatus.SC_OK,
1654 new DigestingEntityProducer("MD5",
1655 new StringAsyncEntityProducer("Hello back with some trailers"))), context);
1656 }
1657 });
1658 final InetSocketAddress serverEndpoint = server.start();
1659
1660 client.start();
1661
1662 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1663 "localhost", serverEndpoint.getPort(), TIMEOUT);
1664 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1665
1666 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1667 final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
1668 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1669 new BasicRequestProducer(request1, null),
1670 new BasicResponseConsumer<>(entityConsumer), null);
1671 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1672 Assertions.assertNotNull(result1);
1673 final HttpResponse response1 = result1.getHead();
1674 Assertions.assertNotNull(response1);
1675 Assertions.assertEquals(200, response1.getCode());
1676 Assertions.assertEquals("Hello back with some trailers", result1.getBody());
1677
1678 final List<Header> trailers = entityConsumer.getTrailers();
1679 Assertions.assertNotNull(trailers);
1680 Assertions.assertEquals(2, trailers.size());
1681 final Map<String, String> map = new HashMap<>();
1682 for (final Header header: trailers) {
1683 map.put(TextUtils.toLowerCase(header.getName()), header.getValue());
1684 }
1685 final String digest = TextUtils.toHexString(entityConsumer.getDigest());
1686 Assertions.assertEquals("MD5", map.get("digest-algo"));
1687 Assertions.assertEquals(digest, map.get("digest"));
1688 }
1689
1690 @Test
1691 public void testProtocolException() throws Exception {
1692 final Http1TestServer server = resources.server();
1693 final Http1TestClient client = resources.client();
1694
1695 server.register("/boom", () -> new AsyncServerExchangeHandler() {
1696
1697 private final StringAsyncEntityProducer entityProducer = new StringAsyncEntityProducer("Everyting is OK");
1698
1699 @Override
1700 public void releaseResources() {
1701 entityProducer.releaseResources();
1702 }
1703
1704 @Override
1705 public void handleRequest(
1706 final HttpRequest request,
1707 final EntityDetails entityDetails,
1708 final ResponseChannel responseChannel,
1709 final HttpContext context) throws HttpException, IOException {
1710 final String requestUri = request.getRequestUri();
1711 if (requestUri.endsWith("boom")) {
1712 throw new ProtocolException("Boom!!!");
1713 }
1714 responseChannel.sendResponse(new BasicHttpResponse(200), entityProducer, context);
1715 }
1716
1717 @Override
1718 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1719 capacityChannel.update(Integer.MAX_VALUE);
1720 }
1721
1722 @Override
1723 public void consume(final ByteBuffer src) throws IOException {
1724 }
1725
1726 @Override
1727 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1728
1729 }
1730
1731 @Override
1732 public int available() {
1733 return entityProducer.available();
1734 }
1735
1736 @Override
1737 public void produce(final DataStreamChannel channel) throws IOException {
1738 entityProducer.produce(channel);
1739 }
1740
1741 @Override
1742 public void failed(final Exception cause) {
1743 releaseResources();
1744 }
1745
1746 });
1747
1748 final InetSocketAddress serverEndpoint = server.start();
1749
1750 client.start();
1751 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1752 "localhost", serverEndpoint.getPort(), TIMEOUT);
1753 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1754 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1755 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/boom")),
1756 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1757 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1758 Assertions.assertNotNull(result);
1759 final HttpResponse response1 = result.getHead();
1760 final String entity1 = result.getBody();
1761 Assertions.assertNotNull(response1);
1762 Assertions.assertEquals(HttpStatus.SC_BAD_REQUEST, response1.getCode());
1763 Assertions.assertEquals("Boom!!!", entity1);
1764 }
1765
1766 @Test
1767 public void testHeaderTooLarge() throws Exception {
1768 final Http1TestServer server = resources.server();
1769 final Http1TestClient client = resources.client();
1770
1771 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
1772 final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1773 .setMaxLineLength(100)
1774 .build());
1775 client.start();
1776
1777 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1778 "localhost", serverEndpoint.getPort(), TIMEOUT);
1779 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1780
1781 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1782 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1783 "1234567890123456789012345678901234567890");
1784 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1785 new BasicRequestProducer(request1, null),
1786 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1787 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1788 Assertions.assertNotNull(result1);
1789 final HttpResponse response1 = result1.getHead();
1790 Assertions.assertNotNull(response1);
1791 Assertions.assertEquals(431, response1.getCode());
1792 Assertions.assertEquals("Maximum line length limit exceeded", result1.getBody());
1793 }
1794
1795 @Test
1796 public void testHeaderTooLargePost() throws Exception {
1797 final Http1TestServer server = resources.server();
1798 final Http1TestClient client = resources.client();
1799
1800 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
1801 final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1802 .setMaxLineLength(100)
1803 .build());
1804 client.start(
1805 new DefaultHttpProcessor(RequestContent.INSTANCE, RequestTargetHost.INSTANCE, RequestConnControl.INSTANCE), null);
1806
1807 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1808 "localhost", serverEndpoint.getPort(), TIMEOUT);
1809 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1810
1811 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1812 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1813 "1234567890123456789012345678901234567890");
1814
1815 final byte[] b = new byte[2048];
1816 for (int i = 0; i < b.length; i++) {
1817 b[i] = (byte) ('a' + i % 10);
1818 }
1819
1820 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1821 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
1822 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1823 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1824 Assertions.assertNotNull(result1);
1825 final HttpResponse response1 = result1.getHead();
1826 Assertions.assertNotNull(response1);
1827 Assertions.assertEquals(431, response1.getCode());
1828 Assertions.assertEquals("Maximum line length limit exceeded", result1.getBody());
1829 }
1830
1831 }