1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import java.io.BufferedReader;
31 import java.io.BufferedWriter;
32 import java.io.IOException;
33 import java.io.InputStream;
34 import java.io.InputStreamReader;
35 import java.io.InterruptedIOException;
36 import java.io.OutputStream;
37 import java.io.OutputStreamWriter;
38 import java.net.InetSocketAddress;
39 import java.net.URI;
40 import java.net.URISyntaxException;
41 import java.nio.ByteBuffer;
42 import java.nio.channels.WritableByteChannel;
43 import java.nio.charset.Charset;
44 import java.nio.charset.StandardCharsets;
45 import java.util.Arrays;
46 import java.util.Collection;
47 import java.util.HashMap;
48 import java.util.LinkedList;
49 import java.util.List;
50 import java.util.Locale;
51 import java.util.Map;
52 import java.util.Queue;
53 import java.util.Random;
54 import java.util.StringTokenizer;
55 import java.util.concurrent.CancellationException;
56 import java.util.concurrent.ExecutionException;
57 import java.util.concurrent.Executors;
58 import java.util.concurrent.Future;
59 import java.util.concurrent.atomic.AtomicReference;
60
61 import org.apache.hc.core5.function.Decorator;
62 import org.apache.hc.core5.function.Supplier;
63 import org.apache.hc.core5.http.ConnectionReuseStrategy;
64 import org.apache.hc.core5.http.ContentLengthStrategy;
65 import org.apache.hc.core5.http.ContentType;
66 import org.apache.hc.core5.http.EntityDetails;
67 import org.apache.hc.core5.http.Header;
68 import org.apache.hc.core5.http.HeaderElements;
69 import org.apache.hc.core5.http.HttpException;
70 import org.apache.hc.core5.http.HttpHeaders;
71 import org.apache.hc.core5.http.HttpHost;
72 import org.apache.hc.core5.http.HttpRequest;
73 import org.apache.hc.core5.http.HttpResponse;
74 import org.apache.hc.core5.http.HttpStatus;
75 import org.apache.hc.core5.http.HttpVersion;
76 import org.apache.hc.core5.http.MalformedChunkCodingException;
77 import org.apache.hc.core5.http.Message;
78 import org.apache.hc.core5.http.Method;
79 import org.apache.hc.core5.http.ProtocolException;
80 import org.apache.hc.core5.http.URIScheme;
81 import org.apache.hc.core5.http.config.CharCodingConfig;
82 import org.apache.hc.core5.http.config.Http1Config;
83 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
84 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
85 import org.apache.hc.core5.http.impl.Http1StreamListener;
86 import org.apache.hc.core5.http.impl.HttpProcessors;
87 import org.apache.hc.core5.http.impl.nio.AbstractContentEncoder;
88 import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexer;
89 import org.apache.hc.core5.http.message.BasicHttpRequest;
90 import org.apache.hc.core5.http.message.BasicHttpResponse;
91 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
92 import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
93 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
94 import org.apache.hc.core5.http.nio.AsyncResponseProducer;
95 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
96 import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
97 import org.apache.hc.core5.http.nio.CapacityChannel;
98 import org.apache.hc.core5.http.nio.ContentEncoder;
99 import org.apache.hc.core5.http.nio.DataStreamChannel;
100 import org.apache.hc.core5.http.nio.HandlerFactory;
101 import org.apache.hc.core5.http.nio.NHttpMessageParser;
102 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
103 import org.apache.hc.core5.http.nio.ResponseChannel;
104 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
105 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
106 import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
107 import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
108 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
109 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
110 import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
111 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
112 import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
113 import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
114 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
115 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
116 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
117 import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
118 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
119 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
120 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
121 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
122 import org.apache.hc.core5.http.protocol.HttpContext;
123 import org.apache.hc.core5.http.protocol.HttpProcessor;
124 import org.apache.hc.core5.http.protocol.RequestConnControl;
125 import org.apache.hc.core5.http.protocol.RequestContent;
126 import org.apache.hc.core5.http.protocol.RequestTargetHost;
127 import org.apache.hc.core5.http.protocol.RequestValidateHost;
128 import org.apache.hc.core5.reactor.IOReactorConfig;
129 import org.apache.hc.core5.reactor.IOSession;
130 import org.apache.hc.core5.reactor.ProtocolIOSession;
131 import org.apache.hc.core5.testing.SSLTestContexts;
132 import org.apache.hc.core5.util.CharArrayBuffer;
133 import org.apache.hc.core5.util.TextUtils;
134 import org.apache.hc.core5.util.TimeValue;
135 import org.apache.hc.core5.util.Timeout;
136 import org.junit.After;
137 import org.junit.Assert;
138 import org.junit.Before;
139 import org.junit.Test;
140 import org.junit.runner.RunWith;
141 import org.junit.runners.Parameterized;
142 import org.slf4j.Logger;
143 import org.slf4j.LoggerFactory;
144
145 @RunWith(Parameterized.class)
146 public class Http1IntegrationTest extends InternalHttp1ServerTestBase {
147
148 private final Logger log = LoggerFactory.getLogger(getClass());
149
150 @Parameterized.Parameters(name = "{0}")
151 public static Collection<Object[]> protocols() {
152 return Arrays.asList(new Object[][]{
153 { URIScheme.HTTP },
154 { URIScheme.HTTPS }
155 });
156 }
157
158 public Http1IntegrationTest(final URIScheme scheme) {
159 super(scheme);
160 }
161
162 private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
163 private static final Timeout LONG_TIMEOUT = Timeout.ofSeconds(60);
164
165 private Http1TestClient client;
166
167 @Before
168 public void setup() throws Exception {
169 log.debug("Starting up test client");
170 client = new Http1TestClient(
171 buildReactorConfig(),
172 scheme == URIScheme.HTTPS ? SSLTestContexts.createClientSSLContext() : null, null, null);
173 }
174
175 protected IOReactorConfig buildReactorConfig() {
176 return IOReactorConfig.DEFAULT;
177 }
178
179 @After
180 public void cleanup() throws Exception {
181 log.debug("Shutting down test client");
182 if (client != null) {
183 client.shutdown(TimeValue.ofSeconds(5));
184 }
185 }
186
187 private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
188 try {
189 return new URI(scheme.id, null, "localhost", serverEndpoint.getPort(), path, null, null);
190 } catch (final URISyntaxException e) {
191 throw new IllegalStateException();
192 }
193 }
194
195 @Test
196 public void testSimpleGet() throws Exception {
197 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
198
199 @Override
200 public AsyncServerExchangeHandler get() {
201 return new SingleLineResponseHandler("Hi there");
202 }
203
204 });
205 final InetSocketAddress serverEndpoint = server.start();
206
207 client.start();
208 final Future<ClientSessionEndpoint> connectFuture = client.connect(
209 "localhost", serverEndpoint.getPort(), TIMEOUT);
210 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
211
212 for (int i = 0; i < 5; i++) {
213 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
214 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
215 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
216 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
217 Assert.assertNotNull(result);
218 final HttpResponse response1 = result.getHead();
219 final String entity1 = result.getBody();
220 Assert.assertNotNull(response1);
221 Assert.assertEquals(200, response1.getCode());
222 Assert.assertEquals("Hi there", entity1);
223 }
224 }
225
226 @Test
227 public void testSimpleGetConnectionClose() throws Exception {
228 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
229
230 @Override
231 public AsyncServerExchangeHandler get() {
232 return new SingleLineResponseHandler("Hi there");
233 }
234
235 });
236 final InetSocketAddress serverEndpoint = server.start();
237
238 client.start();
239 final URI requestURI = createRequestURI(serverEndpoint, "/hello");
240 for (int i = 0; i < 5; i++) {
241 final Future<ClientSessionEndpoint> connectFuture = client.connect(
242 "localhost", serverEndpoint.getPort(), TIMEOUT);
243 try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
244 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
245 AsyncRequestBuilder.get(requestURI)
246 .addHeader(HttpHeaders.CONNECTION, "close")
247 .build(),
248 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
249 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
250 Assert.assertNotNull(result);
251 final HttpResponse response1 = result.getHead();
252 final String entity1 = result.getBody();
253 Assert.assertNotNull(response1);
254 Assert.assertEquals(200, response1.getCode());
255 Assert.assertEquals("Hi there", entity1);
256 }
257 }
258 }
259
260 @Test
261 public void testSimpleGetIdentityTransfer() throws Exception {
262 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
263
264 @Override
265 public AsyncServerExchangeHandler get() {
266 return new SingleLineResponseHandler("Hi there");
267 }
268
269 });
270 final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
271 final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
272
273 client.start();
274
275 final int reqNo = 5;
276
277 for (int i = 0; i < reqNo; i++) {
278 final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
279 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
280
281 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
282 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
283 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
284 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
285
286 streamEndpoint.close();
287
288 Assert.assertNotNull(result);
289 final HttpResponse response = result.getHead();
290 final String entity = result.getBody();
291 Assert.assertNotNull(response);
292 Assert.assertEquals(200, response.getCode());
293 Assert.assertEquals("Hi there", entity);
294 }
295
296 }
297
298 @Test
299 public void testPostIdentityTransfer() throws Exception {
300 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
301
302 @Override
303 public AsyncServerExchangeHandler get() {
304 return new SingleLineResponseHandler("Hi there");
305 }
306
307 });
308 final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
309 final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
310
311 client.start();
312
313 final int reqNo = 5;
314
315 for (int i = 0; i < reqNo; i++) {
316 final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
317 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
318
319 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
320 new BasicRequestProducer(Method.POST,
321 createRequestURI(serverEndpoint, "/hello"),
322 new MultiLineEntityProducer("Hello", 16 * i)),
323 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
324 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
325
326 streamEndpoint.close();
327
328 Assert.assertNotNull(result);
329 final HttpResponse response = result.getHead();
330 final String entity = result.getBody();
331 Assert.assertNotNull(response);
332 Assert.assertEquals(200, response.getCode());
333 Assert.assertEquals("Hi there", entity);
334 }
335 }
336
337 @Test
338 public void testPostIdentityTransferOutOfSequenceResponse() throws Exception {
339 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
340
341 @Override
342 public AsyncServerExchangeHandler get() {
343 return new ImmediateResponseExchangeHandler(500, "Go away");
344 }
345
346 });
347 final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
348 final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
349
350 client.start();
351
352 final int reqNo = 5;
353
354 for (int i = 0; i < reqNo; i++) {
355 final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
356 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
357
358 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
359 new BasicRequestProducer(Method.POST,
360 createRequestURI(serverEndpoint, "/hello"),
361 new MultiLineEntityProducer("Hello", 16 * i)),
362 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
363 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
364
365 streamEndpoint.close();
366
367 Assert.assertNotNull(result);
368 final HttpResponse response = result.getHead();
369 final String entity = result.getBody();
370 Assert.assertNotNull(response);
371 Assert.assertEquals(500, response.getCode());
372 Assert.assertEquals("Go away", entity);
373 }
374 }
375
376 @Test
377 public void testSimpleGetsPipelined() throws Exception {
378 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
379
380 @Override
381 public AsyncServerExchangeHandler get() {
382 return new SingleLineResponseHandler("Hi there");
383 }
384
385 });
386 final InetSocketAddress serverEndpoint = server.start();
387
388 client.start();
389 final Future<ClientSessionEndpoint> connectFuture = client.connect(
390 "localhost", serverEndpoint.getPort(), TIMEOUT);
391 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
392
393 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
394 for (int i = 0; i < 5; i++) {
395 queue.add(streamEndpoint.execute(
396 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
397 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
398 }
399 while (!queue.isEmpty()) {
400 final Future<Message<HttpResponse, String>> future = queue.remove();
401 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
402 Assert.assertNotNull(result);
403 final HttpResponse response = result.getHead();
404 final String entity = result.getBody();
405 Assert.assertNotNull(response);
406 Assert.assertEquals(200, response.getCode());
407 Assert.assertEquals("Hi there", entity);
408 }
409 }
410
411 @Test
412 public void testLargeGet() throws Exception {
413 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
414
415 @Override
416 public AsyncServerExchangeHandler get() {
417 return new MultiLineResponseHandler("0123456789abcdef", 5000);
418 }
419
420 });
421 final InetSocketAddress serverEndpoint = server.start();
422
423 client.start();
424 final Future<ClientSessionEndpoint> connectFuture = client.connect(
425 "localhost", serverEndpoint.getPort(), TIMEOUT);
426 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
427
428 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
429 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
430 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
431
432 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
433 Assert.assertNotNull(result1);
434 final HttpResponse response1 = result1.getHead();
435 Assert.assertNotNull(response1);
436 Assert.assertEquals(200, response1.getCode());
437 final String s1 = result1.getBody();
438 Assert.assertNotNull(s1);
439 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
440 while (t1.hasMoreTokens()) {
441 Assert.assertEquals("0123456789abcdef", t1.nextToken());
442 }
443
444 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
445 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
446 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
447
448 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
449 Assert.assertNotNull(result2);
450 final HttpResponse response2 = result2.getHead();
451 Assert.assertNotNull(response2);
452 Assert.assertEquals(200, response2.getCode());
453 final String s2 = result2.getBody();
454 Assert.assertNotNull(s2);
455 final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
456 while (t2.hasMoreTokens()) {
457 Assert.assertEquals("0123456789abcdef", t2.nextToken());
458 }
459 }
460
461 @Test
462 public void testLargeGetsPipelined() throws Exception {
463 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
464
465 @Override
466 public AsyncServerExchangeHandler get() {
467 return new MultiLineResponseHandler("0123456789abcdef", 2000);
468 }
469
470 });
471 final InetSocketAddress serverEndpoint = server.start();
472
473 client.start();
474 final Future<ClientSessionEndpoint> connectFuture = client.connect(
475 "localhost", serverEndpoint.getPort(), TIMEOUT);
476 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
477
478 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
479 for (int i = 0; i < 5; i++) {
480 queue.add(streamEndpoint.execute(
481 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
482 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
483 }
484 while (!queue.isEmpty()) {
485 final Future<Message<HttpResponse, String>> future = queue.remove();
486 final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
487 Assert.assertNotNull(result);
488 final HttpResponse response = result.getHead();
489 Assert.assertNotNull(response);
490 Assert.assertEquals(200, response.getCode());
491 final String entity = result.getBody();
492 Assert.assertNotNull(entity);
493 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
494 while (t.hasMoreTokens()) {
495 Assert.assertEquals("0123456789abcdef", t.nextToken());
496 }
497 }
498 }
499
500 @Test
501 public void testBasicPost() throws Exception {
502 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
503
504 @Override
505 public AsyncServerExchangeHandler get() {
506 return new SingleLineResponseHandler("Hi back");
507 }
508
509 });
510 final InetSocketAddress serverEndpoint = server.start();
511
512 client.start();
513 final Future<ClientSessionEndpoint> connectFuture = client.connect(
514 "localhost", serverEndpoint.getPort(), TIMEOUT);
515 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
516
517 for (int i = 0; i < 5; i++) {
518 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
519 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
520 AsyncEntityProducers.create("Hi there")),
521 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
522 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
523 Assert.assertNotNull(result);
524 final HttpResponse response1 = result.getHead();
525 final String entity1 = result.getBody();
526 Assert.assertNotNull(response1);
527 Assert.assertEquals(200, response1.getCode());
528 Assert.assertEquals("Hi back", entity1);
529 }
530 }
531
532 @Test
533 public void testBasicPostPipelined() throws Exception {
534 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
535
536 @Override
537 public AsyncServerExchangeHandler get() {
538 return new SingleLineResponseHandler("Hi back");
539 }
540
541 });
542 final InetSocketAddress serverEndpoint = server.start();
543
544 client.start();
545 final Future<ClientSessionEndpoint> connectFuture = client.connect(
546 "localhost", serverEndpoint.getPort(), TIMEOUT);
547 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
548
549 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
550 for (int i = 0; i < 5; i++) {
551 queue.add(streamEndpoint.execute(
552 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
553 AsyncEntityProducers.create("Hi there")),
554 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
555 }
556 while (!queue.isEmpty()) {
557 final Future<Message<HttpResponse, String>> future = queue.remove();
558 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
559 Assert.assertNotNull(result);
560 final HttpResponse response = result.getHead();
561 final String entity = result.getBody();
562 Assert.assertNotNull(response);
563 Assert.assertEquals(200, response.getCode());
564 Assert.assertEquals("Hi back", entity);
565 }
566 }
567
568 @Test
569 public void testHttp10Post() throws Exception {
570 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
571
572 @Override
573 public AsyncServerExchangeHandler get() {
574 return new SingleLineResponseHandler("Hi back");
575 }
576
577 });
578 final InetSocketAddress serverEndpoint = server.start();
579
580 client.start();
581 final Future<ClientSessionEndpoint> connectFuture = client.connect(
582 "localhost", serverEndpoint.getPort(), TIMEOUT);
583 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
584
585 for (int i = 0; i < 5; i++) {
586 final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
587 request.setVersion(HttpVersion.HTTP_1_0);
588 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
589 new BasicRequestProducer(request, AsyncEntityProducers.create("Hi there")),
590 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
591 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
592 Assert.assertNotNull(result);
593 final HttpResponse response1 = result.getHead();
594 final String entity1 = result.getBody();
595 Assert.assertNotNull(response1);
596 Assert.assertEquals(200, response1.getCode());
597 Assert.assertEquals("Hi back", entity1);
598 }
599 }
600
601 @Test
602 public void testNoEntityPost() throws Exception {
603 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
604
605 @Override
606 public AsyncServerExchangeHandler get() {
607 return new SingleLineResponseHandler("Hi back");
608 }
609
610 });
611 final InetSocketAddress serverEndpoint = server.start();
612
613 client.start();
614 final Future<ClientSessionEndpoint> connectFuture = client.connect(
615 "localhost", serverEndpoint.getPort(), TIMEOUT);
616 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
617
618 for (int i = 0; i < 5; i++) {
619 final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
620 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
621 new BasicRequestProducer(request, null),
622 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
623 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
624 Assert.assertNotNull(result);
625 final HttpResponse response1 = result.getHead();
626 final String entity1 = result.getBody();
627 Assert.assertNotNull(response1);
628 Assert.assertEquals(200, response1.getCode());
629 Assert.assertEquals("Hi back", entity1);
630 }
631 }
632
633 @Test
634 public void testLargePost() throws Exception {
635 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
636
637 @Override
638 public AsyncServerExchangeHandler get() {
639 return new EchoHandler(2048);
640 }
641
642 });
643 final InetSocketAddress serverEndpoint = server.start();
644
645 client.start();
646 final Future<ClientSessionEndpoint> connectFuture = client.connect(
647 "localhost", serverEndpoint.getPort(), TIMEOUT);
648 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
649
650 for (int i = 0; i < 5; i++) {
651 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
652 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
653 new MultiLineEntityProducer("0123456789abcdef", 5000)),
654 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
655 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
656 Assert.assertNotNull(result);
657 final HttpResponse response = result.getHead();
658 Assert.assertNotNull(response);
659 Assert.assertEquals(200, response.getCode());
660 final String entity = result.getBody();
661 Assert.assertNotNull(entity);
662 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
663 while (t.hasMoreTokens()) {
664 Assert.assertEquals("0123456789abcdef", t.nextToken());
665 }
666 }
667 }
668
669 @Test
670 public void testPostsPipelinedLargeResponse() throws Exception {
671 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
672
673 @Override
674 public AsyncServerExchangeHandler get() {
675 return new MultiLineResponseHandler("0123456789abcdef", 2000);
676 }
677
678 });
679 final InetSocketAddress serverEndpoint = server.start();
680
681 client.start();
682 final Future<ClientSessionEndpoint> connectFuture = client.connect(
683 "localhost", serverEndpoint.getPort(), TIMEOUT);
684 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
685
686 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
687 for (int i = 0; i < 2; i++) {
688 queue.add(streamEndpoint.execute(
689 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
690 AsyncEntityProducers.create("Hi there")),
691 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
692 }
693 while (!queue.isEmpty()) {
694 final Future<Message<HttpResponse, String>> future = queue.remove();
695 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
696 Assert.assertNotNull(result);
697 final HttpResponse response = result.getHead();
698 Assert.assertNotNull(response);
699 Assert.assertEquals(200, response.getCode());
700 final String entity = result.getBody();
701 Assert.assertNotNull(entity);
702 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
703 while (t.hasMoreTokens()) {
704 Assert.assertEquals("0123456789abcdef", t.nextToken());
705 }
706 }
707 }
708
709
710 @Test
711 public void testLargePostsPipelined() throws Exception {
712 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
713
714 @Override
715 public AsyncServerExchangeHandler get() {
716 return new EchoHandler(2048);
717 }
718
719 });
720 final InetSocketAddress serverEndpoint = server.start();
721
722 client.start();
723 final Future<ClientSessionEndpoint> connectFuture = client.connect(
724 "localhost", serverEndpoint.getPort(), TIMEOUT);
725 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
726
727 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
728 for (int i = 0; i < 5; i++) {
729 queue.add(streamEndpoint.execute(
730 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
731 new MultiLineEntityProducer("0123456789abcdef", 5000)),
732 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
733 }
734 while (!queue.isEmpty()) {
735 final Future<Message<HttpResponse, String>> future = queue.remove();
736 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
737 Assert.assertNotNull(result);
738 final HttpResponse response = result.getHead();
739 Assert.assertNotNull(response);
740 Assert.assertEquals(200, response.getCode());
741 final String entity = result.getBody();
742 Assert.assertNotNull(entity);
743 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
744 while (t.hasMoreTokens()) {
745 Assert.assertEquals("0123456789abcdef", t.nextToken());
746 }
747 }
748 }
749
750 @Test
751 public void testSimpleHead() throws Exception {
752 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
753
754 @Override
755 public AsyncServerExchangeHandler get() {
756 return new SingleLineResponseHandler("Hi there");
757 }
758
759 });
760 final InetSocketAddress serverEndpoint = server.start();
761
762 client.start();
763 final Future<ClientSessionEndpoint> connectFuture = client.connect(
764 "localhost", serverEndpoint.getPort(), TIMEOUT);
765 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
766
767 for (int i = 0; i < 5; i++) {
768 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
769 new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
770 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
771 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
772 Assert.assertNotNull(result);
773 final HttpResponse response1 = result.getHead();
774 Assert.assertNotNull(response1);
775 Assert.assertEquals(200, response1.getCode());
776 Assert.assertNull(result.getBody());
777 }
778 }
779
780 @Test
781 public void testSimpleHeadConnectionClose() throws Exception {
782 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
783
784 @Override
785 public AsyncServerExchangeHandler get() {
786 return new SingleLineResponseHandler("Hi there");
787 }
788
789 });
790 final InetSocketAddress serverEndpoint = server.start();
791
792 client.start();
793 final URI requestURI = createRequestURI(serverEndpoint, "/hello");
794 for (int i = 0; i < 5; i++) {
795 final Future<ClientSessionEndpoint> connectFuture = client.connect(
796 "localhost", serverEndpoint.getPort(), TIMEOUT);
797 try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
798 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
799 AsyncRequestBuilder.head(requestURI)
800 .addHeader(HttpHeaders.CONNECTION, "close")
801 .build(),
802 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
803 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
804 Assert.assertNotNull(result);
805 final HttpResponse response1 = result.getHead();
806 Assert.assertNotNull(response1);
807 Assert.assertEquals(200, response1.getCode());
808 Assert.assertNull(result.getBody());
809 }
810 }
811 }
812
813 @Test
814 public void testHeadPipelined() throws Exception {
815 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
816
817 @Override
818 public AsyncServerExchangeHandler get() {
819 return new SingleLineResponseHandler("Hi there");
820 }
821
822 });
823 final InetSocketAddress serverEndpoint = server.start();
824
825 client.start();
826 final Future<ClientSessionEndpoint> connectFuture = client.connect(
827 "localhost", serverEndpoint.getPort(), TIMEOUT);
828 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
829
830 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
831 for (int i = 0; i < 5; i++) {
832 queue.add(streamEndpoint.execute(
833 new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
834 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
835 }
836 while (!queue.isEmpty()) {
837 final Future<Message<HttpResponse, String>> future = queue.remove();
838 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
839 Assert.assertNotNull(result);
840 final HttpResponse response1 = result.getHead();
841 Assert.assertNotNull(response1);
842 Assert.assertEquals(200, response1.getCode());
843 Assert.assertNull(result.getBody());
844 }
845 }
846
847 @Test
848 public void testExpectationFailed() throws Exception {
849 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
850
851 @Override
852 public AsyncServerExchangeHandler get() {
853 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
854
855 @Override
856 protected void handle(
857 final Message<HttpRequest, String> request,
858 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
859 final HttpContext context) throws IOException, HttpException {
860 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
861
862 }
863 };
864 }
865
866 });
867 final InetSocketAddress serverEndpoint = server.start(null, new Decorator<AsyncServerExchangeHandler>() {
868
869 @Override
870 public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
871
872 return new BasicAsyncServerExpectationDecorator(handler) {
873
874 @Override
875 protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
876 final Header h = request.getFirstHeader("password");
877 if (h != null && "secret".equals(h.getValue())) {
878 return null;
879 } else {
880 return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
881 }
882 }
883 };
884
885 }
886 }, Http1Config.DEFAULT);
887
888 client.start();
889 final Future<IOSession> sessionFuture = client.requestSession(
890 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
891 final IOSession ioSession = sessionFuture.get();
892 final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession);
893
894 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
895 request1.addHeader("password", "secret");
896 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
897 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 1000)),
898 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
899 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
900 Assert.assertNotNull(result1);
901 final HttpResponse response1 = result1.getHead();
902 Assert.assertNotNull(response1);
903 Assert.assertEquals(200, response1.getCode());
904 Assert.assertNotNull("All is well", result1.getBody());
905
906 Assert.assertTrue(ioSession.isOpen());
907
908 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
909 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
910 new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
911 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
912 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
913 Assert.assertNotNull(result2);
914 final HttpResponse response2 = result2.getHead();
915 Assert.assertNotNull(response2);
916 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
917 Assert.assertNotNull("You shall not pass", result2.getBody());
918
919 Assert.assertTrue(ioSession.isOpen());
920
921 final HttpRequest request3 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
922 request3.addHeader("password", "secret");
923 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
924 new BasicRequestProducer(request3, new MultiLineEntityProducer("0123456789abcdef", 1000)),
925 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
926 final Message<HttpResponse, String> result3 = future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
927 Assert.assertNotNull(result3);
928 final HttpResponse response3 = result3.getHead();
929 Assert.assertNotNull(response3);
930 Assert.assertEquals(200, response3.getCode());
931 Assert.assertNotNull("All is well", result3.getBody());
932
933 Assert.assertTrue(ioSession.isOpen());
934
935 final HttpRequest request4 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
936 final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
937 new BasicRequestProducer(request4, AsyncEntityProducers.create("blah")),
938 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
939 final Message<HttpResponse, String> result4 = future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
940 Assert.assertNotNull(result4);
941 final HttpResponse response4 = result4.getHead();
942 Assert.assertNotNull(response4);
943 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response4.getCode());
944 Assert.assertNotNull("You shall not pass", result4.getBody());
945
946 Assert.assertFalse(ioSession.isOpen());
947 }
948
949 @Test
950 public void testExpectationFailedCloseConnection() throws Exception {
951 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
952
953 @Override
954 public AsyncServerExchangeHandler get() {
955 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
956
957 @Override
958 protected void handle(
959 final Message<HttpRequest, String> request,
960 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
961 final HttpContext context) throws IOException, HttpException {
962 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
963
964 }
965 };
966 }
967
968 });
969 final InetSocketAddress serverEndpoint = server.start(null, new Decorator<AsyncServerExchangeHandler>() {
970
971 @Override
972 public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
973
974 return new BasicAsyncServerExpectationDecorator(handler) {
975
976 @Override
977 protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
978 final Header h = request.getFirstHeader("password");
979 if (h != null && "secret".equals(h.getValue())) {
980 return null;
981 } else {
982 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_UNAUTHORIZED);
983 response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
984 return new BasicResponseProducer(response, "You shall not pass");
985 }
986 }
987 };
988
989 }
990 }, Http1Config.DEFAULT);
991
992 client.start();
993 final Future<IOSession> sessionFuture = client.requestSession(
994 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
995 final IOSession ioSession = sessionFuture.get();
996 final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession);
997
998 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
999 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1000 new BasicRequestProducer(request1, new MultiBinEntityProducer(
1001 new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
1002 100000,
1003 ContentType.TEXT_PLAIN)),
1004 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1005 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1006 Assert.assertNotNull(result1);
1007 final HttpResponse response1 = result1.getHead();
1008 Assert.assertNotNull(response1);
1009 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1010 Assert.assertNotNull("You shall not pass", result1.getBody());
1011
1012 Assert.assertFalse(streamEndpoint.isOpen());
1013 }
1014
1015 @Test
1016 public void testDelayedExpectationVerification() throws Exception {
1017 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1018
1019 @Override
1020 public AsyncServerExchangeHandler get() {
1021 return new AsyncServerExchangeHandler() {
1022
1023 private final Random random = new Random(System.currentTimeMillis());
1024 private final AsyncEntityProducer entityProducer = AsyncEntityProducers.create(
1025 "All is well");
1026
1027 @Override
1028 public void handleRequest(
1029 final HttpRequest request,
1030 final EntityDetails entityDetails,
1031 final ResponseChannel responseChannel,
1032 final HttpContext context) throws HttpException, IOException {
1033
1034 Executors.newSingleThreadExecutor().execute(new Runnable() {
1035 @Override
1036 public void run() {
1037 try {
1038 if (entityDetails != null) {
1039 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
1040 if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
1041 Thread.sleep(random.nextInt(1000));
1042 responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), context);
1043 }
1044 final HttpResponse response = new BasicHttpResponse(200);
1045 synchronized (entityProducer) {
1046 responseChannel.sendResponse(response, entityProducer, context);
1047 }
1048 }
1049 } catch (final Exception ignore) {
1050
1051 }
1052 }
1053 });
1054
1055 }
1056
1057 @Override
1058 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1059 capacityChannel.update(Integer.MAX_VALUE);
1060 }
1061
1062 @Override
1063 public void consume(final ByteBuffer src) throws IOException {
1064 }
1065
1066 @Override
1067 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1068 }
1069
1070 @Override
1071 public int available() {
1072 synchronized (entityProducer) {
1073 return entityProducer.available();
1074 }
1075 }
1076
1077 @Override
1078 public void produce(final DataStreamChannel channel) throws IOException {
1079 synchronized (entityProducer) {
1080 entityProducer.produce(channel);
1081 }
1082 }
1083
1084 @Override
1085 public void failed(final Exception cause) {
1086 }
1087
1088 @Override
1089 public void releaseResources() {
1090 }
1091
1092 };
1093
1094 }
1095 });
1096 final InetSocketAddress serverEndpoint = server.start();
1097
1098 client.start(Http1Config.custom().setWaitForContinueTimeout(Timeout.ofMilliseconds(100)).build());
1099 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1100 "localhost", serverEndpoint.getPort(), TIMEOUT);
1101 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1102
1103 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
1104 for (int i = 0; i < 5; i++) {
1105 queue.add(streamEndpoint.execute(
1106 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
1107 AsyncEntityProducers.create("Some important message")),
1108 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
1109 }
1110 while (!queue.isEmpty()) {
1111 final Future<Message<HttpResponse, String>> future = queue.remove();
1112 final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1113 Assert.assertNotNull(result);
1114 final HttpResponse response = result.getHead();
1115 Assert.assertNotNull(response);
1116 Assert.assertEquals(200, response.getCode());
1117 Assert.assertNotNull("All is well", result.getBody());
1118 }
1119 }
1120
1121 @Test
1122 public void testPrematureResponse() throws Exception {
1123 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1124
1125 @Override
1126 public AsyncServerExchangeHandler get() {
1127 return new AsyncServerExchangeHandler() {
1128
1129 private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>();
1130
1131 @Override
1132 public void handleRequest(
1133 final HttpRequest request,
1134 final EntityDetails entityDetails,
1135 final ResponseChannel responseChannel,
1136 final HttpContext context) throws HttpException, IOException {
1137 final AsyncResponseProducer producer;
1138 final Header h = request.getFirstHeader("password");
1139 if (h != null && "secret".equals(h.getValue())) {
1140 producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
1141 } else {
1142 producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
1143 }
1144 responseProducer.set(producer);
1145 producer.sendResponse(responseChannel, context);
1146 }
1147
1148 @Override
1149 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1150 capacityChannel.update(Integer.MAX_VALUE);
1151 }
1152
1153 @Override
1154 public void consume(final ByteBuffer src) throws IOException {
1155 }
1156
1157 @Override
1158 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1159 }
1160
1161 @Override
1162 public int available() {
1163 final AsyncResponseProducer producer = responseProducer.get();
1164 return producer.available();
1165 }
1166
1167 @Override
1168 public void produce(final DataStreamChannel channel) throws IOException {
1169 final AsyncResponseProducer producer = responseProducer.get();
1170 producer.produce(channel);
1171 }
1172
1173 @Override
1174 public void failed(final Exception cause) {
1175 }
1176
1177 @Override
1178 public void releaseResources() {
1179 }
1180 };
1181 }
1182
1183 });
1184 final InetSocketAddress serverEndpoint = server.start();
1185
1186 client.start();
1187 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1188 "localhost", serverEndpoint.getPort(), TIMEOUT);
1189 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1190
1191 for (int i = 0; i < 3; i++) {
1192 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1193 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1194 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 100000)),
1195 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1196 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1197 Assert.assertNotNull(result1);
1198 final HttpResponse response1 = result1.getHead();
1199 Assert.assertNotNull(response1);
1200 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1201 Assert.assertNotNull("You shall not pass", result1.getBody());
1202
1203 Assert.assertTrue(streamEndpoint.isOpen());
1204 }
1205 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1206 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1207 new BasicRequestProducer(request1, new MultiBinEntityProducer(
1208 new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
1209 100000,
1210 ContentType.TEXT_PLAIN)),
1211 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1212 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1213 Assert.assertNotNull(result1);
1214 final HttpResponse response1 = result1.getHead();
1215 Assert.assertNotNull(response1);
1216 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1217 Assert.assertNotNull("You shall not pass", result1.getBody());
1218 }
1219
1220 @Test
1221 public void testSlowResponseConsumer() throws Exception {
1222 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
1223
1224 @Override
1225 public AsyncServerExchangeHandler get() {
1226 return new MultiLineResponseHandler("0123456789abcd", 100);
1227 }
1228
1229 });
1230 final InetSocketAddress serverEndpoint = server.start();
1231
1232 client.start(Http1Config.custom().setBufferSize(256).build());
1233
1234 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1235 "localhost", serverEndpoint.getPort(), TIMEOUT);
1236 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1237
1238 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
1239 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1240 new BasicRequestProducer(request1, null),
1241 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
1242
1243 @Override
1244 protected String consumeData(
1245 final ContentType contentType, final InputStream inputStream) throws IOException {
1246 Charset charset = contentType != null ? contentType.getCharset() : null;
1247 if (charset == null) {
1248 charset = StandardCharsets.US_ASCII;
1249 }
1250
1251 final StringBuilder buffer = new StringBuilder();
1252 try {
1253 final byte[] tmp = new byte[16];
1254 int l;
1255 while ((l = inputStream.read(tmp)) != -1) {
1256 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
1257 Thread.sleep(50);
1258 }
1259 } catch (final InterruptedException ex) {
1260 Thread.currentThread().interrupt();
1261 throw new InterruptedIOException(ex.getMessage());
1262 }
1263 return buffer.toString();
1264 }
1265 }),
1266 null);
1267
1268 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1269 Assert.assertNotNull(result1);
1270 final HttpResponse response1 = result1.getHead();
1271 Assert.assertNotNull(response1);
1272 Assert.assertEquals(200, response1.getCode());
1273 final String s1 = result1.getBody();
1274 Assert.assertNotNull(s1);
1275 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1276 while (t1.hasMoreTokens()) {
1277 Assert.assertEquals("0123456789abcd", t1.nextToken());
1278 }
1279 }
1280
1281 @Test
1282 public void testSlowRequestProducer() throws Exception {
1283 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1284
1285 @Override
1286 public AsyncServerExchangeHandler get() {
1287 return new EchoHandler(2048);
1288 }
1289
1290 });
1291 final InetSocketAddress serverEndpoint = server.start();
1292
1293 client.start();
1294 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1295 "localhost", serverEndpoint.getPort(), TIMEOUT);
1296 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1297
1298 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1299 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1300 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
1301
1302 @Override
1303 protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
1304 Charset charset = contentType.getCharset();
1305 if (charset == null) {
1306 charset = StandardCharsets.US_ASCII;
1307 }
1308 try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
1309 for (int i = 0; i < 500; i++) {
1310 if (i % 100 == 0) {
1311 writer.flush();
1312 Thread.sleep(500);
1313 }
1314 writer.write("0123456789abcdef\r\n");
1315 }
1316 } catch (final InterruptedException ex) {
1317 Thread.currentThread().interrupt();
1318 throw new InterruptedIOException(ex.getMessage());
1319 }
1320 }
1321
1322 }),
1323 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1324 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1325 Assert.assertNotNull(result1);
1326 final HttpResponse response1 = result1.getHead();
1327 Assert.assertNotNull(response1);
1328 Assert.assertEquals(200, response1.getCode());
1329 final String s1 = result1.getBody();
1330 Assert.assertNotNull(s1);
1331 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1332 while (t1.hasMoreTokens()) {
1333 Assert.assertEquals("0123456789abcdef", t1.nextToken());
1334 }
1335 }
1336
1337 @Test
1338 public void testSlowResponseProducer() throws Exception {
1339 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1340
1341 @Override
1342 public AsyncServerExchangeHandler get() {
1343 return new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
1344
1345 @Override
1346 protected void handle(
1347 final HttpRequest request,
1348 final InputStream requestStream,
1349 final HttpResponse response,
1350 final OutputStream responseStream,
1351 final HttpContext context) throws IOException, HttpException {
1352
1353 if (!"/hello".equals(request.getPath())) {
1354 response.setCode(HttpStatus.SC_NOT_FOUND);
1355 return;
1356 }
1357 if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
1358 response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
1359 return;
1360 }
1361 if (requestStream == null) {
1362 return;
1363 }
1364 final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
1365 final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
1366 Charset charset = contentType != null ? contentType.getCharset() : null;
1367 if (charset == null) {
1368 charset = StandardCharsets.US_ASCII;
1369 }
1370 response.setCode(HttpStatus.SC_OK);
1371 response.setHeader(h1);
1372 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
1373 final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
1374 try {
1375 String l;
1376 int count = 0;
1377 while ((l = reader.readLine()) != null) {
1378 writer.write(l);
1379 writer.write("\r\n");
1380 count++;
1381 if (count % 500 == 0) {
1382 Thread.sleep(500);
1383 }
1384 }
1385 writer.flush();
1386 } catch (final InterruptedException ex) {
1387 Thread.currentThread().interrupt();
1388 throw new InterruptedIOException(ex.getMessage());
1389 }
1390 }
1391 }
1392 };
1393 }
1394
1395 });
1396 final InetSocketAddress serverEndpoint = server.start();
1397
1398 client.start(Http1Config.custom().setBufferSize(256).build());
1399
1400 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1401 "localhost", serverEndpoint.getPort(), TIMEOUT);
1402 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1403
1404 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1405 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1406 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
1407 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1408 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1409 Assert.assertNotNull(result1);
1410 final HttpResponse response1 = result1.getHead();
1411 Assert.assertNotNull(response1);
1412 Assert.assertEquals(200, response1.getCode());
1413 final String s1 = result1.getBody();
1414 Assert.assertNotNull(s1);
1415 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1416 while (t1.hasMoreTokens()) {
1417 Assert.assertEquals("0123456789abcd", t1.nextToken());
1418 }
1419 }
1420
1421 @Test
1422 public void testPipelinedConnectionClose() throws Exception {
1423 server.register("/hello*", new Supplier<AsyncServerExchangeHandler>() {
1424
1425 @Override
1426 public AsyncServerExchangeHandler get() {
1427 return new SingleLineResponseHandler("Hi back");
1428 }
1429
1430 });
1431 final InetSocketAddress serverEndpoint = server.start();
1432
1433 client.start();
1434 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1435 "localhost", serverEndpoint.getPort(), TIMEOUT);
1436 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1437
1438 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1439 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1440 AsyncEntityProducers.create("Hi there")),
1441 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1442 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1443 request2.addHeader(HttpHeaders.CONNECTION, "close");
1444 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1445 new BasicRequestProducer(request2,
1446 AsyncEntityProducers.create("Hi there")),
1447 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1448 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1449 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1450 AsyncEntityProducers.create("Hi there")),
1451 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1452
1453 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1454 Assert.assertNotNull(result1);
1455 final HttpResponse response1 = result1.getHead();
1456 final String entity1 = result1.getBody();
1457 Assert.assertNotNull(response1);
1458 Assert.assertEquals(200, response1.getCode());
1459 Assert.assertEquals("Hi back", entity1);
1460
1461 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1462 Assert.assertNotNull(result2);
1463 final HttpResponse response2 = result2.getHead();
1464 final String entity2 = result2.getBody();
1465 Assert.assertNotNull(response2);
1466 Assert.assertEquals(200, response2.getCode());
1467 Assert.assertEquals("Hi back", entity2);
1468
1469 try {
1470 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1471 Assert.fail("ExecutionException expected");
1472 } catch (final CancellationException | ExecutionException ignore) {
1473 }
1474
1475 final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
1476 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1477 AsyncEntityProducers.create("Hi there")),
1478 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1479 try {
1480 future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1481 Assert.fail("CancellationException or ExecutionException expected");
1482 } catch (final CancellationException ignore) {
1483 Assert.assertTrue(future4.isCancelled());
1484 } catch (final ExecutionException ignore) {
1485 }
1486 }
1487
1488 @Test
1489 public void testPipelinedInvalidRequest() throws Exception {
1490 server.register("/hello*", new Supplier<AsyncServerExchangeHandler>() {
1491
1492 @Override
1493 public AsyncServerExchangeHandler get() {
1494 return new SingleLineResponseHandler("Hi back");
1495 }
1496
1497 });
1498 final InetSocketAddress serverEndpoint = server.start();
1499
1500 client.start();
1501 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1502 "localhost", serverEndpoint.getPort(), TIMEOUT);
1503 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1504
1505 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1506 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1507 AsyncEntityProducers.create("Hi there")),
1508 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1509 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1510 request2.addHeader(HttpHeaders.HOST, "blah:blah");
1511 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1512 new BasicRequestProducer(request2,
1513 AsyncEntityProducers.create("Hi there")),
1514 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1515 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1516 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1517 AsyncEntityProducers.create("Hi there")),
1518 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1519
1520 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1521 Assert.assertNotNull(result1);
1522 final HttpResponse response1 = result1.getHead();
1523 final String entity1 = result1.getBody();
1524 Assert.assertNotNull(response1);
1525 Assert.assertEquals(200, response1.getCode());
1526 Assert.assertEquals("Hi back", entity1);
1527
1528 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1529 Assert.assertNotNull(result2);
1530 final HttpResponse response2 = result2.getHead();
1531 final String entity2 = result2.getBody();
1532 Assert.assertNotNull(response2);
1533 Assert.assertEquals(400, response2.getCode());
1534 Assert.assertTrue(entity2.length() > 0);
1535
1536 try {
1537 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1538 Assert.fail("ExecutionException expected");
1539 } catch (final CancellationException | ExecutionException ignore) {
1540 }
1541 }
1542
1543 private static final byte[] GARBAGE = "garbage".getBytes(StandardCharsets.US_ASCII);
1544
1545 private static class BrokenChunkEncoder extends AbstractContentEncoder {
1546
1547 private final CharArrayBuffer lineBuffer;
1548 private boolean done;
1549
1550 BrokenChunkEncoder(
1551 final WritableByteChannel channel,
1552 final SessionOutputBuffer buffer,
1553 final BasicHttpTransportMetrics metrics) {
1554 super(channel, buffer, metrics);
1555 lineBuffer = new CharArrayBuffer(16);
1556 }
1557
1558 @Override
1559 public void complete(final List<? extends Header> trailers) throws IOException {
1560 super.complete(trailers);
1561 }
1562
1563 @Override
1564 public int write(final ByteBuffer src) throws IOException {
1565 final int chunk;
1566 if (!done) {
1567 lineBuffer.clear();
1568 lineBuffer.append(Integer.toHexString(GARBAGE.length * 10));
1569 buffer().writeLine(lineBuffer);
1570 buffer().write(ByteBuffer.wrap(GARBAGE));
1571 done = true;
1572 chunk = GARBAGE.length;
1573 } else {
1574 chunk = 0;
1575 }
1576 final long bytesWritten = buffer().flush(channel());
1577 if (bytesWritten > 0) {
1578 metrics().incrementBytesTransferred(bytesWritten);
1579 }
1580 if (!buffer().hasData()) {
1581 channel().close();
1582 }
1583 return chunk;
1584 }
1585
1586 }
1587
1588 @Test
1589 public void testTruncatedChunk() throws Exception {
1590 final InetSocketAddress serverEndpoint = server.start(new InternalServerHttp1EventHandlerFactory(
1591 HttpProcessors.server(),
1592 new HandlerFactory<AsyncServerExchangeHandler>() {
1593
1594 @Override
1595 public AsyncServerExchangeHandler create(
1596 final HttpRequest request,
1597 final HttpContext context) throws HttpException {
1598 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
1599
1600 @Override
1601 protected void handle(
1602 final Message<HttpRequest, String> request,
1603 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1604 final HttpContext context) throws IOException, HttpException {
1605 responseTrigger.submitResponse(
1606 new BasicResponseProducer(new StringAsyncEntityProducer("useful stuff")), context);
1607 }
1608
1609 };
1610 }
1611
1612 },
1613 Http1Config.DEFAULT,
1614 CharCodingConfig.DEFAULT,
1615 DefaultConnectionReuseStrategy.INSTANCE,
1616 scheme == URIScheme.HTTPS ? SSLTestContexts.createServerSSLContext() : null, null, null) {
1617
1618 @Override
1619 protected ServerHttp1StreamDuplexer createServerHttp1StreamDuplexer(
1620 final ProtocolIOSession ioSession,
1621 final HttpProcessor httpProcessor,
1622 final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
1623 final Http1Config http1Config,
1624 final CharCodingConfig connectionConfig,
1625 final ConnectionReuseStrategy connectionReuseStrategy,
1626 final NHttpMessageParser<HttpRequest> incomingMessageParser,
1627 final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
1628 final ContentLengthStrategy incomingContentStrategy,
1629 final ContentLengthStrategy outgoingContentStrategy,
1630 final Http1StreamListener streamListener) {
1631 return new ServerHttp1StreamDuplexer(ioSession, httpProcessor, exchangeHandlerFactory,
1632 scheme.id,
1633 http1Config, connectionConfig, connectionReuseStrategy,
1634 incomingMessageParser, outgoingMessageWriter,
1635 incomingContentStrategy, outgoingContentStrategy,
1636 streamListener) {
1637
1638 @Override
1639 protected ContentEncoder createContentEncoder(
1640 final long len,
1641 final WritableByteChannel channel,
1642 final SessionOutputBuffer buffer,
1643 final BasicHttpTransportMetrics metrics) throws HttpException {
1644 if (len == ContentLengthStrategy.CHUNKED) {
1645 return new BrokenChunkEncoder(channel, buffer, metrics);
1646 } else {
1647 return super.createContentEncoder(len, channel, buffer, metrics);
1648 }
1649 }
1650
1651 };
1652 }
1653
1654 });
1655
1656 client.start();
1657 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1658 "localhost", serverEndpoint.getPort(), TIMEOUT);
1659 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1660
1661 final AsyncRequestProducer requestProducer = new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1662 final StringAsyncEntityConsumer entityConsumer = new StringAsyncEntityConsumer() {
1663
1664 @Override
1665 public void releaseResources() {
1666
1667 }
1668
1669 };
1670 final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(entityConsumer);
1671 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(requestProducer, responseConsumer, null);
1672 try {
1673 future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1674 Assert.fail("ExecutionException should have been thrown");
1675 } catch (final ExecutionException ex) {
1676 final Throwable cause = ex.getCause();
1677 Assert.assertTrue(cause instanceof MalformedChunkCodingException);
1678 Assert.assertEquals("garbage", entityConsumer.generateContent());
1679 }
1680 }
1681
1682 @Test
1683 public void testExceptionInHandler() throws Exception {
1684 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1685
1686 @Override
1687 public AsyncServerExchangeHandler get() {
1688 return new SingleLineResponseHandler("Hi there") {
1689
1690 @Override
1691 protected void handle(
1692 final Message<HttpRequest, String> request,
1693 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1694 final HttpContext context) throws IOException, HttpException {
1695 throw new HttpException("Boom");
1696 }
1697 };
1698 }
1699
1700 });
1701 final InetSocketAddress serverEndpoint = server.start();
1702
1703 client.start();
1704 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1705 "localhost", serverEndpoint.getPort(), TIMEOUT);
1706 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1707
1708 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1709 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1710 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1711 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1712 Assert.assertNotNull(result);
1713 final HttpResponse response1 = result.getHead();
1714 final String entity1 = result.getBody();
1715 Assert.assertNotNull(response1);
1716 Assert.assertEquals(500, response1.getCode());
1717 Assert.assertEquals("Boom", entity1);
1718 }
1719
1720 @Test
1721 public void testNoServiceHandler() throws Exception {
1722 final InetSocketAddress serverEndpoint = server.start();
1723
1724 client.start();
1725 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1726 "localhost", serverEndpoint.getPort(), TIMEOUT);
1727 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1728
1729 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1730 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1731 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1732 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1733 Assert.assertNotNull(result);
1734 final HttpResponse response1 = result.getHead();
1735 final String entity1 = result.getBody();
1736 Assert.assertNotNull(response1);
1737 Assert.assertEquals(404, response1.getCode());
1738 Assert.assertEquals("Resource not found", entity1);
1739 }
1740
1741 @Test
1742 public void testResponseNoContent() throws Exception {
1743 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1744
1745 @Override
1746 public AsyncServerExchangeHandler get() {
1747 return new SingleLineResponseHandler("Hi there") {
1748
1749 @Override
1750 protected void handle(
1751 final Message<HttpRequest, String> request,
1752 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1753 final HttpContext context) throws IOException, HttpException {
1754 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_NO_CONTENT);
1755 responseTrigger.submitResponse(new BasicResponseProducer(response), context);
1756 }
1757 };
1758 }
1759
1760 });
1761 final InetSocketAddress serverEndpoint = server.start();
1762
1763 client.start();
1764 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1765 "localhost", serverEndpoint.getPort(), TIMEOUT);
1766 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1767
1768 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1769 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1770 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1771 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1772 Assert.assertNotNull(result);
1773 final HttpResponse response1 = result.getHead();
1774 Assert.assertNotNull(response1);
1775 Assert.assertEquals(204, response1.getCode());
1776 Assert.assertNull(result.getBody());
1777 }
1778
1779 @Test
1780 public void testAbsentHostHeader() throws Exception {
1781 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1782
1783 @Override
1784 public AsyncServerExchangeHandler get() {
1785 return new SingleLineResponseHandler("Hi there");
1786 }
1787
1788 });
1789 final InetSocketAddress serverEndpoint = server.start();
1790
1791 client.start(new DefaultHttpProcessor(new RequestContent(), new RequestConnControl()), Http1Config.DEFAULT);
1792
1793 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1794 "localhost", serverEndpoint.getPort(), TIMEOUT);
1795 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1796
1797 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1798 request1.setVersion(HttpVersion.HTTP_1_0);
1799 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1800 new BasicRequestProducer(request1, null),
1801 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1802 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1803 Assert.assertNotNull(result1);
1804 final HttpResponse response1 = result1.getHead();
1805 Assert.assertNotNull(response1);
1806 Assert.assertEquals(200, response1.getCode());
1807 Assert.assertEquals("Hi there", result1.getBody());
1808
1809 final HttpRequest request2 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1810 request2.setVersion(HttpVersion.HTTP_1_1);
1811 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1812 new BasicRequestProducer(request2, null),
1813 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1814 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1815 Assert.assertNotNull(result2);
1816 final HttpResponse response2 = result2.getHead();
1817 Assert.assertNotNull(response2);
1818 Assert.assertEquals(400, response2.getCode());
1819 Assert.assertEquals("Host header is absent", result2.getBody());
1820 }
1821
1822 @Test
1823 public void testMessageWithTrailers() throws Exception {
1824 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1825
1826 @Override
1827 public AsyncServerExchangeHandler get() {
1828 return new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
1829
1830 @Override
1831 protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
1832 final HttpRequest request,
1833 final EntityDetails entityDetails,
1834 final HttpContext context) throws HttpException {
1835 return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
1836 }
1837
1838 @Override
1839 protected void handle(
1840 final Message<HttpRequest, String> requestMessage,
1841 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1842 final HttpContext context) throws HttpException, IOException {
1843 responseTrigger.submitResponse(new BasicResponseProducer(
1844 HttpStatus.SC_OK,
1845 new DigestingEntityProducer("MD5",
1846 new StringAsyncEntityProducer("Hello back with some trailers"))), context);
1847 }
1848 };
1849 }
1850
1851 });
1852 final InetSocketAddress serverEndpoint = server.start();
1853
1854 client.start();
1855
1856 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1857 "localhost", serverEndpoint.getPort(), TIMEOUT);
1858 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1859
1860 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1861 final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
1862 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1863 new BasicRequestProducer(request1, null),
1864 new BasicResponseConsumer<>(entityConsumer), null);
1865 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1866 Assert.assertNotNull(result1);
1867 final HttpResponse response1 = result1.getHead();
1868 Assert.assertNotNull(response1);
1869 Assert.assertEquals(200, response1.getCode());
1870 Assert.assertEquals("Hello back with some trailers", result1.getBody());
1871
1872 final List<Header> trailers = entityConsumer.getTrailers();
1873 Assert.assertNotNull(trailers);
1874 Assert.assertEquals(2, trailers.size());
1875 final Map<String, String> map = new HashMap<>();
1876 for (final Header header: trailers) {
1877 map.put(header.getName().toLowerCase(Locale.ROOT), header.getValue());
1878 }
1879 final String digest = TextUtils.toHexString(entityConsumer.getDigest());
1880 Assert.assertEquals("MD5", map.get("digest-algo"));
1881 Assert.assertEquals(digest, map.get("digest"));
1882 }
1883
1884 @Test
1885 public void testProtocolException() throws Exception {
1886 server.register("/boom", new Supplier<AsyncServerExchangeHandler>() {
1887
1888 @Override
1889 public AsyncServerExchangeHandler get() {
1890 return new AsyncServerExchangeHandler() {
1891
1892 private final StringAsyncEntityProducer entityProducer = new StringAsyncEntityProducer("Everyting is OK");
1893
1894 @Override
1895 public void releaseResources() {
1896 entityProducer.releaseResources();
1897 }
1898
1899 @Override
1900 public void handleRequest(
1901 final HttpRequest request,
1902 final EntityDetails entityDetails,
1903 final ResponseChannel responseChannel,
1904 final HttpContext context) throws HttpException, IOException {
1905 final String requestUri = request.getRequestUri();
1906 if (requestUri.endsWith("boom")) {
1907 throw new ProtocolException("Boom!!!");
1908 }
1909 responseChannel.sendResponse(new BasicHttpResponse(200), entityProducer, context);
1910 }
1911
1912 @Override
1913 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1914 capacityChannel.update(Integer.MAX_VALUE);
1915 }
1916
1917 @Override
1918 public void consume(final ByteBuffer src) throws IOException {
1919 }
1920
1921 @Override
1922 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1923
1924 }
1925
1926 @Override
1927 public int available() {
1928 return entityProducer.available();
1929 }
1930
1931 @Override
1932 public void produce(final DataStreamChannel channel) throws IOException {
1933 entityProducer.produce(channel);
1934 }
1935
1936 @Override
1937 public void failed(final Exception cause) {
1938 releaseResources();
1939 }
1940
1941 };
1942 }
1943
1944 });
1945
1946 final InetSocketAddress serverEndpoint = server.start();
1947
1948 client.start();
1949 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1950 "localhost", serverEndpoint.getPort(), TIMEOUT);
1951 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1952 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1953 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/boom")),
1954 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1955 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1956 Assert.assertNotNull(result);
1957 final HttpResponse response1 = result.getHead();
1958 final String entity1 = result.getBody();
1959 Assert.assertNotNull(response1);
1960 Assert.assertEquals(HttpStatus.SC_BAD_REQUEST, response1.getCode());
1961 Assert.assertEquals("Boom!!!", entity1);
1962 }
1963
1964 @Test
1965 public void testHeaderTooLarge() throws Exception {
1966 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1967
1968 @Override
1969 public AsyncServerExchangeHandler get() {
1970 return new SingleLineResponseHandler("Hi there");
1971 }
1972
1973 });
1974 final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1975 .setMaxLineLength(100)
1976 .build());
1977 client.start();
1978
1979 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1980 "localhost", serverEndpoint.getPort(), TIMEOUT);
1981 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1982
1983 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1984 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1985 "1234567890123456789012345678901234567890");
1986 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1987 new BasicRequestProducer(request1, null),
1988 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1989 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1990 Assert.assertNotNull(result1);
1991 final HttpResponse response1 = result1.getHead();
1992 Assert.assertNotNull(response1);
1993 Assert.assertEquals(431, response1.getCode());
1994 Assert.assertEquals("Maximum line length limit exceeded", result1.getBody());
1995 }
1996
1997 @Test
1998 public void testHeaderTooLargePost() throws Exception {
1999 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
2000
2001 @Override
2002 public AsyncServerExchangeHandler get() {
2003 return new SingleLineResponseHandler("Hi there");
2004 }
2005
2006 });
2007 final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
2008 .setMaxLineLength(100)
2009 .build());
2010 client.start(
2011 new DefaultHttpProcessor(new RequestContent(), new RequestTargetHost(), new RequestConnControl()), null);
2012
2013 final Future<ClientSessionEndpoint> connectFuture = client.connect(
2014 "localhost", serverEndpoint.getPort(), TIMEOUT);
2015 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
2016
2017 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
2018 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
2019 "1234567890123456789012345678901234567890");
2020
2021 final byte[] b = new byte[2048];
2022 for (int i = 0; i < b.length; i++) {
2023 b[i] = (byte) ('a' + i % 10);
2024 }
2025
2026 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
2027 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
2028 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
2029 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
2030 Assert.assertNotNull(result1);
2031 final HttpResponse response1 = result1.getHead();
2032 Assert.assertNotNull(response1);
2033 Assert.assertEquals(431, response1.getCode());
2034 Assert.assertEquals("Maximum line length limit exceeded", result1.getBody());
2035 }
2036
2037 }