View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.testing.nio;
29  
30  import static org.hamcrest.MatcherAssert.assertThat;
31  
32  import java.io.BufferedReader;
33  import java.io.BufferedWriter;
34  import java.io.IOException;
35  import java.io.InputStream;
36  import java.io.InputStreamReader;
37  import java.io.InterruptedIOException;
38  import java.io.OutputStream;
39  import java.io.OutputStreamWriter;
40  import java.net.InetSocketAddress;
41  import java.net.URI;
42  import java.net.URISyntaxException;
43  import java.nio.ByteBuffer;
44  import java.nio.channels.WritableByteChannel;
45  import java.nio.charset.Charset;
46  import java.nio.charset.StandardCharsets;
47  import java.util.HashMap;
48  import java.util.LinkedList;
49  import java.util.List;
50  import java.util.Map;
51  import java.util.Queue;
52  import java.util.Random;
53  import java.util.StringTokenizer;
54  import java.util.concurrent.CancellationException;
55  import java.util.concurrent.ExecutionException;
56  import java.util.concurrent.Executors;
57  import java.util.concurrent.Future;
58  import java.util.concurrent.atomic.AtomicReference;
59  
60  import org.apache.hc.core5.http.ConnectionReuseStrategy;
61  import org.apache.hc.core5.http.ContentLengthStrategy;
62  import org.apache.hc.core5.http.ContentType;
63  import org.apache.hc.core5.http.EntityDetails;
64  import org.apache.hc.core5.http.Header;
65  import org.apache.hc.core5.http.HeaderElements;
66  import org.apache.hc.core5.http.HttpException;
67  import org.apache.hc.core5.http.HttpHeaders;
68  import org.apache.hc.core5.http.HttpHost;
69  import org.apache.hc.core5.http.HttpRequest;
70  import org.apache.hc.core5.http.HttpResponse;
71  import org.apache.hc.core5.http.HttpStatus;
72  import org.apache.hc.core5.http.HttpVersion;
73  import org.apache.hc.core5.http.MalformedChunkCodingException;
74  import org.apache.hc.core5.http.Message;
75  import org.apache.hc.core5.http.Method;
76  import org.apache.hc.core5.http.ProtocolException;
77  import org.apache.hc.core5.http.URIScheme;
78  import org.apache.hc.core5.http.config.CharCodingConfig;
79  import org.apache.hc.core5.http.config.Http1Config;
80  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
81  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
82  import org.apache.hc.core5.http.impl.Http1StreamListener;
83  import org.apache.hc.core5.http.impl.HttpProcessors;
84  import org.apache.hc.core5.http.impl.nio.AbstractContentEncoder;
85  import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexer;
86  import org.apache.hc.core5.http.message.BasicHttpRequest;
87  import org.apache.hc.core5.http.message.BasicHttpResponse;
88  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
89  import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
90  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
91  import org.apache.hc.core5.http.nio.AsyncResponseProducer;
92  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
93  import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
94  import org.apache.hc.core5.http.nio.CapacityChannel;
95  import org.apache.hc.core5.http.nio.ContentEncoder;
96  import org.apache.hc.core5.http.nio.DataStreamChannel;
97  import org.apache.hc.core5.http.nio.HandlerFactory;
98  import org.apache.hc.core5.http.nio.NHttpMessageParser;
99  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
100 import org.apache.hc.core5.http.nio.ResponseChannel;
101 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
102 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
103 import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
104 import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
105 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
106 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
107 import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
108 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
109 import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
110 import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
111 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
112 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
113 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
114 import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
115 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
116 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
117 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
118 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
119 import org.apache.hc.core5.http.protocol.HttpContext;
120 import org.apache.hc.core5.http.protocol.HttpProcessor;
121 import org.apache.hc.core5.http.protocol.RequestConnControl;
122 import org.apache.hc.core5.http.protocol.RequestContent;
123 import org.apache.hc.core5.http.protocol.RequestTargetHost;
124 import org.apache.hc.core5.http.protocol.RequestValidateHost;
125 import org.apache.hc.core5.reactor.IOSession;
126 import org.apache.hc.core5.reactor.ProtocolIOSession;
127 import org.apache.hc.core5.testing.SSLTestContexts;
128 import org.apache.hc.core5.testing.nio.extension.Http1TestResources;
129 import org.apache.hc.core5.util.CharArrayBuffer;
130 import org.apache.hc.core5.util.TextUtils;
131 import org.apache.hc.core5.util.Timeout;
132 import org.hamcrest.CoreMatchers;
133 import org.junit.jupiter.api.Assertions;
134 import org.junit.jupiter.api.Test;
135 import org.junit.jupiter.api.extension.RegisterExtension;
136 
137 public abstract class Http1IntegrationTest {
138 
139     private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
140     private static final Timeout LONG_TIMEOUT = Timeout.ofMinutes(2);
141 
142     private final URIScheme scheme;
143 
144     @RegisterExtension
145     private final Http1TestResources resources;
146 
147     public Http1IntegrationTest(final URIScheme scheme) {
148         this.scheme = scheme;
149         this.resources = new Http1TestResources(scheme, TIMEOUT);
150     }
151 
152     private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
153         try {
154             return new URI(scheme.id, null, "localhost", serverEndpoint.getPort(), path, null, null);
155         } catch (final URISyntaxException e) {
156             throw new IllegalStateException();
157         }
158     }
159 
160     @Test
161     public void testSimpleGet() throws Exception {
162         final Http1TestServer server = resources.server();
163         final Http1TestClient client = resources.client();
164 
165         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
166         final InetSocketAddress serverEndpoint = server.start();
167 
168         client.start();
169         final Future<ClientSessionEndpoint> connectFuture = client.connect(
170                 "localhost", serverEndpoint.getPort(), TIMEOUT);
171         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
172 
173         for (int i = 0; i < 5; i++) {
174             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
175                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
176                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
177             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
178             Assertions.assertNotNull(result);
179             final HttpResponse response1 = result.getHead();
180             final String entity1 = result.getBody();
181             Assertions.assertNotNull(response1);
182             Assertions.assertEquals(200, response1.getCode());
183             Assertions.assertEquals("Hi there", entity1);
184         }
185     }
186 
187     @Test
188     public void testSimpleGetConnectionClose() throws Exception {
189         final Http1TestServer server = resources.server();
190         final Http1TestClient client = resources.client();
191 
192         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
193         final InetSocketAddress serverEndpoint = server.start();
194 
195         client.start();
196         final URI requestURI = createRequestURI(serverEndpoint, "/hello");
197         for (int i = 0; i < 5; i++) {
198             final Future<ClientSessionEndpoint> connectFuture = client.connect(
199                     "localhost", serverEndpoint.getPort(), TIMEOUT);
200             try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
201                 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
202                         AsyncRequestBuilder.get(requestURI)
203                                 .addHeader(HttpHeaders.CONNECTION, "close")
204                                 .build(),
205                         new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
206                 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
207                 Assertions.assertNotNull(result);
208                 final HttpResponse response1 = result.getHead();
209                 final String entity1 = result.getBody();
210                 Assertions.assertNotNull(response1);
211                 Assertions.assertEquals(200, response1.getCode());
212                 Assertions.assertEquals("Hi there", entity1);
213             }
214         }
215     }
216 
217     @Test
218     public void testSimpleGetIdentityTransfer() throws Exception {
219         final Http1TestServer server = resources.server();
220         final Http1TestClient client = resources.client();
221 
222         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
223         final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
224         final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
225 
226         client.start();
227 
228         final int reqNo = 5;
229 
230         for (int i = 0; i < reqNo; i++) {
231             final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
232             final ClientSessionEndpoint streamEndpoint = connectFuture.get();
233 
234             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
235                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
236                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
237             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
238 
239             streamEndpoint.close();
240 
241             Assertions.assertNotNull(result);
242             final HttpResponse response = result.getHead();
243             final String entity = result.getBody();
244             Assertions.assertNotNull(response);
245             Assertions.assertEquals(200, response.getCode());
246             Assertions.assertEquals("Hi there", entity);
247         }
248 
249     }
250 
251     @Test
252     public void testPostIdentityTransfer() throws Exception {
253         final Http1TestServer server = resources.server();
254         final Http1TestClient client = resources.client();
255 
256         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
257         final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
258         final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
259 
260         client.start();
261 
262         final int reqNo = 5;
263 
264         for (int i = 0; i < reqNo; i++) {
265             final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
266             final ClientSessionEndpoint streamEndpoint = connectFuture.get();
267 
268             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
269                     new BasicRequestProducer(Method.POST,
270                             createRequestURI(serverEndpoint, "/hello"),
271                             new MultiLineEntityProducer("Hello", 16 * i)),
272                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
273             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
274 
275             streamEndpoint.close();
276 
277             Assertions.assertNotNull(result);
278             final HttpResponse response = result.getHead();
279             final String entity = result.getBody();
280             Assertions.assertNotNull(response);
281             Assertions.assertEquals(200, response.getCode());
282             Assertions.assertEquals("Hi there", entity);
283         }
284     }
285 
286     @Test
287     public void testPostIdentityTransferOutOfSequenceResponse() throws Exception {
288         final Http1TestServer server = resources.server();
289         final Http1TestClient client = resources.client();
290 
291         server.register("/hello", () -> new ImmediateResponseExchangeHandler(500, "Go away"));
292         final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
293         final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
294 
295         client.start();
296 
297         final int reqNo = 5;
298 
299         for (int i = 0; i < reqNo; i++) {
300             final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
301             final ClientSessionEndpoint streamEndpoint = connectFuture.get();
302 
303             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
304                     new BasicRequestProducer(Method.POST,
305                             createRequestURI(serverEndpoint, "/hello"),
306                             new MultiLineEntityProducer("Hello", 16 * i)),
307                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
308             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
309 
310             streamEndpoint.close();
311 
312             Assertions.assertNotNull(result);
313             final HttpResponse response = result.getHead();
314             final String entity = result.getBody();
315             Assertions.assertNotNull(response);
316             Assertions.assertEquals(500, response.getCode());
317             Assertions.assertEquals("Go away", entity);
318         }
319     }
320 
321     @Test
322     public void testSimpleGetsPipelined() throws Exception {
323         final Http1TestServer server = resources.server();
324         final Http1TestClient client = resources.client();
325 
326         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
327         final InetSocketAddress serverEndpoint = server.start();
328 
329         client.start();
330         final Future<ClientSessionEndpoint> connectFuture = client.connect(
331                 "localhost", serverEndpoint.getPort(), TIMEOUT);
332         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
333 
334         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
335         for (int i = 0; i < 5; i++) {
336             queue.add(streamEndpoint.execute(
337                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
338                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
339         }
340         while (!queue.isEmpty()) {
341             final Future<Message<HttpResponse, String>> future = queue.remove();
342             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
343             Assertions.assertNotNull(result);
344             final HttpResponse response = result.getHead();
345             final String entity = result.getBody();
346             Assertions.assertNotNull(response);
347             Assertions.assertEquals(200, response.getCode());
348             Assertions.assertEquals("Hi there", entity);
349         }
350     }
351 
352     @Test
353     public void testLargeGet() throws Exception {
354         final Http1TestServer server = resources.server();
355         final Http1TestClient client = resources.client();
356 
357         server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 5000));
358         final InetSocketAddress serverEndpoint = server.start();
359 
360         client.start();
361         final Future<ClientSessionEndpoint> connectFuture = client.connect(
362                 "localhost", serverEndpoint.getPort(), TIMEOUT);
363         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
364 
365         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
366                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
367                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
368 
369         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
370         Assertions.assertNotNull(result1);
371         final HttpResponse response1 = result1.getHead();
372         Assertions.assertNotNull(response1);
373         Assertions.assertEquals(200, response1.getCode());
374         final String s1 = result1.getBody();
375         Assertions.assertNotNull(s1);
376         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
377         while (t1.hasMoreTokens()) {
378             Assertions.assertEquals("0123456789abcdef", t1.nextToken());
379         }
380 
381         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
382                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
383                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
384 
385         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
386         Assertions.assertNotNull(result2);
387         final HttpResponse response2 = result2.getHead();
388         Assertions.assertNotNull(response2);
389         Assertions.assertEquals(200, response2.getCode());
390         final String s2 = result2.getBody();
391         Assertions.assertNotNull(s2);
392         final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
393         while (t2.hasMoreTokens()) {
394             Assertions.assertEquals("0123456789abcdef", t2.nextToken());
395         }
396     }
397 
398     @Test
399     public void testLargeGetsPipelined() throws Exception {
400         final Http1TestServer server = resources.server();
401         final Http1TestClient client = resources.client();
402 
403         server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 2000));
404         final InetSocketAddress serverEndpoint = server.start();
405 
406         client.start();
407         final Future<ClientSessionEndpoint> connectFuture = client.connect(
408                 "localhost", serverEndpoint.getPort(), TIMEOUT);
409         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
410 
411         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
412         for (int i = 0; i < 5; i++) {
413             queue.add(streamEndpoint.execute(
414                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
415                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
416         }
417         while (!queue.isEmpty()) {
418             final Future<Message<HttpResponse, String>> future = queue.remove();
419             final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
420             Assertions.assertNotNull(result);
421             final HttpResponse response = result.getHead();
422             Assertions.assertNotNull(response);
423             Assertions.assertEquals(200, response.getCode());
424             final String entity = result.getBody();
425             Assertions.assertNotNull(entity);
426             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
427             while (t.hasMoreTokens()) {
428                 Assertions.assertEquals("0123456789abcdef", t.nextToken());
429             }
430         }
431     }
432 
433     @Test
434     public void testBasicPost() throws Exception {
435         final Http1TestServer server = resources.server();
436         final Http1TestClient client = resources.client();
437 
438         server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
439         final InetSocketAddress serverEndpoint = server.start();
440 
441         client.start();
442         final Future<ClientSessionEndpoint> connectFuture = client.connect(
443                 "localhost", serverEndpoint.getPort(), TIMEOUT);
444         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
445 
446         for (int i = 0; i < 5; i++) {
447             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
448                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
449                             AsyncEntityProducers.create("Hi there")),
450                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
451             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
452             Assertions.assertNotNull(result);
453             final HttpResponse response1 = result.getHead();
454             final String entity1 = result.getBody();
455             Assertions.assertNotNull(response1);
456             Assertions.assertEquals(200, response1.getCode());
457             Assertions.assertEquals("Hi back", entity1);
458         }
459     }
460 
461     @Test
462     public void testBasicPostPipelined() throws Exception {
463         final Http1TestServer server = resources.server();
464         final Http1TestClient client = resources.client();
465 
466         server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
467         final InetSocketAddress serverEndpoint = server.start();
468 
469         client.start();
470         final Future<ClientSessionEndpoint> connectFuture = client.connect(
471                 "localhost", serverEndpoint.getPort(), TIMEOUT);
472         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
473 
474         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
475         for (int i = 0; i < 5; i++) {
476             queue.add(streamEndpoint.execute(
477                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
478                             AsyncEntityProducers.create("Hi there")),
479                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
480         }
481         while (!queue.isEmpty()) {
482             final Future<Message<HttpResponse, String>> future = queue.remove();
483             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
484             Assertions.assertNotNull(result);
485             final HttpResponse response = result.getHead();
486             final String entity = result.getBody();
487             Assertions.assertNotNull(response);
488             Assertions.assertEquals(200, response.getCode());
489             Assertions.assertEquals("Hi back", entity);
490         }
491     }
492 
493     @Test
494     public void testHttp10Post() throws Exception {
495         final Http1TestServer server = resources.server();
496         final Http1TestClient client = resources.client();
497 
498         server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
499         final InetSocketAddress serverEndpoint = server.start();
500 
501         client.start();
502         final Future<ClientSessionEndpoint> connectFuture = client.connect(
503                 "localhost", serverEndpoint.getPort(), TIMEOUT);
504         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
505 
506         for (int i = 0; i < 5; i++) {
507             final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
508             request.setVersion(HttpVersion.HTTP_1_0);
509             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
510                     new BasicRequestProducer(request, AsyncEntityProducers.create("Hi there")),
511                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
512             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
513             Assertions.assertNotNull(result);
514             final HttpResponse response1 = result.getHead();
515             final String entity1 = result.getBody();
516             Assertions.assertNotNull(response1);
517             Assertions.assertEquals(200, response1.getCode());
518             Assertions.assertEquals("Hi back", entity1);
519         }
520     }
521 
522     @Test
523     public void testNoEntityPost() throws Exception {
524         final Http1TestServer server = resources.server();
525         final Http1TestClient client = resources.client();
526 
527         server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
528         final InetSocketAddress serverEndpoint = server.start();
529 
530         client.start();
531         final Future<ClientSessionEndpoint> connectFuture = client.connect(
532                 "localhost", serverEndpoint.getPort(), TIMEOUT);
533         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
534 
535         for (int i = 0; i < 5; i++) {
536             final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
537             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
538                     new BasicRequestProducer(request, null),
539                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
540             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
541             Assertions.assertNotNull(result);
542             final HttpResponse response1 = result.getHead();
543             final String entity1 = result.getBody();
544             Assertions.assertNotNull(response1);
545             Assertions.assertEquals(200, response1.getCode());
546             Assertions.assertEquals("Hi back", entity1);
547         }
548     }
549 
550     @Test
551     public void testLargePost() throws Exception {
552         final Http1TestServer server = resources.server();
553         final Http1TestClient client = resources.client();
554 
555         server.register("*", () -> new EchoHandler(2048));
556         final InetSocketAddress serverEndpoint = server.start();
557 
558         client.start();
559         final Future<ClientSessionEndpoint> connectFuture = client.connect(
560                 "localhost", serverEndpoint.getPort(), TIMEOUT);
561         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
562 
563         for (int i = 0; i < 5; i++) {
564             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
565                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
566                             new MultiLineEntityProducer("0123456789abcdef", 5000)),
567                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
568             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
569             Assertions.assertNotNull(result);
570             final HttpResponse response = result.getHead();
571             Assertions.assertNotNull(response);
572             Assertions.assertEquals(200, response.getCode());
573             final String entity = result.getBody();
574             Assertions.assertNotNull(entity);
575             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
576             while (t.hasMoreTokens()) {
577                 Assertions.assertEquals("0123456789abcdef", t.nextToken());
578             }
579         }
580     }
581 
582     @Test
583     public void testPostsPipelinedLargeResponse() throws Exception {
584         final Http1TestServer server = resources.server();
585         final Http1TestClient client = resources.client();
586 
587         server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 2000));
588         final InetSocketAddress serverEndpoint = server.start();
589 
590         client.start();
591         final Future<ClientSessionEndpoint> connectFuture = client.connect(
592                 "localhost", serverEndpoint.getPort(), TIMEOUT);
593         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
594 
595         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
596         for (int i = 0; i < 2; i++) {
597             queue.add(streamEndpoint.execute(
598                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
599                             AsyncEntityProducers.create("Hi there")),
600                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
601         }
602         while (!queue.isEmpty()) {
603             final Future<Message<HttpResponse, String>> future = queue.remove();
604             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
605             Assertions.assertNotNull(result);
606             final HttpResponse response = result.getHead();
607             Assertions.assertNotNull(response);
608             Assertions.assertEquals(200, response.getCode());
609             final String entity = result.getBody();
610             Assertions.assertNotNull(entity);
611             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
612             while (t.hasMoreTokens()) {
613                 Assertions.assertEquals("0123456789abcdef", t.nextToken());
614             }
615         }
616     }
617 
618 
619     @Test
620     public void testLargePostsPipelined() throws Exception {
621         final Http1TestServer server = resources.server();
622         final Http1TestClient client = resources.client();
623 
624         server.register("*", () -> new EchoHandler(2048));
625         final InetSocketAddress serverEndpoint = server.start();
626 
627         client.start();
628         final Future<ClientSessionEndpoint> connectFuture = client.connect(
629                 "localhost", serverEndpoint.getPort(), TIMEOUT);
630         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
631 
632         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
633         for (int i = 0; i < 5; i++) {
634             queue.add(streamEndpoint.execute(
635                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
636                             new MultiLineEntityProducer("0123456789abcdef", 5000)),
637                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
638         }
639         while (!queue.isEmpty()) {
640             final Future<Message<HttpResponse, String>> future = queue.remove();
641             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
642             Assertions.assertNotNull(result);
643             final HttpResponse response = result.getHead();
644             Assertions.assertNotNull(response);
645             Assertions.assertEquals(200, response.getCode());
646             final String entity = result.getBody();
647             Assertions.assertNotNull(entity);
648             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
649             while (t.hasMoreTokens()) {
650                 Assertions.assertEquals("0123456789abcdef", t.nextToken());
651             }
652         }
653     }
654 
655     @Test
656     public void testSimpleHead() throws Exception {
657         final Http1TestServer server = resources.server();
658         final Http1TestClient client = resources.client();
659 
660         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
661         final InetSocketAddress serverEndpoint = server.start();
662 
663         client.start();
664         final Future<ClientSessionEndpoint> connectFuture = client.connect(
665                 "localhost", serverEndpoint.getPort(), TIMEOUT);
666         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
667 
668         for (int i = 0; i < 5; i++) {
669             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
670                     new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
671                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
672             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
673             Assertions.assertNotNull(result);
674             final HttpResponse response1 = result.getHead();
675             Assertions.assertNotNull(response1);
676             Assertions.assertEquals(200, response1.getCode());
677             Assertions.assertNull(result.getBody());
678         }
679     }
680 
681     @Test
682     public void testSimpleHeadConnectionClose() throws Exception {
683         final Http1TestServer server = resources.server();
684         final Http1TestClient client = resources.client();
685 
686         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
687         final InetSocketAddress serverEndpoint = server.start();
688 
689         client.start();
690         final URI requestURI = createRequestURI(serverEndpoint, "/hello");
691         for (int i = 0; i < 5; i++) {
692             final Future<ClientSessionEndpoint> connectFuture = client.connect(
693                     "localhost", serverEndpoint.getPort(), TIMEOUT);
694             try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
695                 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
696                         AsyncRequestBuilder.head(requestURI)
697                                 .addHeader(HttpHeaders.CONNECTION, "close")
698                                 .build(),
699                         new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
700                 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
701                 Assertions.assertNotNull(result);
702                 final HttpResponse response1 = result.getHead();
703                 Assertions.assertNotNull(response1);
704                 Assertions.assertEquals(200, response1.getCode());
705                 Assertions.assertNull(result.getBody());
706             }
707         }
708     }
709 
710     @Test
711     public void testHeadPipelined() throws Exception {
712         final Http1TestServer server = resources.server();
713         final Http1TestClient client = resources.client();
714 
715         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
716         final InetSocketAddress serverEndpoint = server.start();
717 
718         client.start();
719         final Future<ClientSessionEndpoint> connectFuture = client.connect(
720                 "localhost", serverEndpoint.getPort(), TIMEOUT);
721         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
722 
723         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
724         for (int i = 0; i < 5; i++) {
725             queue.add(streamEndpoint.execute(
726                     new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
727                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
728         }
729         while (!queue.isEmpty()) {
730             final Future<Message<HttpResponse, String>> future = queue.remove();
731             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
732             Assertions.assertNotNull(result);
733             final HttpResponse response1 = result.getHead();
734             Assertions.assertNotNull(response1);
735             Assertions.assertEquals(200, response1.getCode());
736             Assertions.assertNull(result.getBody());
737         }
738     }
739 
740     @Test
741     public void testExpectationFailed() throws Exception {
742         final Http1TestServer server = resources.server();
743         final Http1TestClient client = resources.client();
744 
745         server.register("*", () -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
746 
747             @Override
748             protected void handle(
749                     final Message<HttpRequest, String> request,
750                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
751                     final HttpContext context) throws IOException, HttpException {
752                 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
753 
754             }
755         });
756         final InetSocketAddress serverEndpoint = server.start(null, handler -> new BasicAsyncServerExpectationDecorator(handler) {
757 
758             @Override
759             protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
760                 final Header h = request.getFirstHeader("password");
761                 if (h != null && "secret".equals(h.getValue())) {
762                     return null;
763                 } else {
764                     return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
765                 }
766             }
767         }, Http1Config.DEFAULT);
768 
769         client.start();
770         final Future<IOSession> sessionFuture = client.requestSession(
771                 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
772         final IOSession ioSession = sessionFuture.get();
773         try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) {
774 
775             final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
776             request1.addHeader("password", "secret");
777             final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
778                     new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 1000)),
779                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
780             final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
781             Assertions.assertNotNull(result1);
782             final HttpResponse response1 = result1.getHead();
783             Assertions.assertNotNull(response1);
784             Assertions.assertEquals(200, response1.getCode());
785             Assertions.assertEquals("All is well", result1.getBody());
786 
787             Assertions.assertTrue(ioSession.isOpen());
788 
789             final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
790             final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
791                     new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
792                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
793             final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
794             Assertions.assertNotNull(result2);
795             final HttpResponse response2 = result2.getHead();
796             Assertions.assertNotNull(response2);
797             Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
798             Assertions.assertEquals("You shall not pass", result2.getBody());
799 
800             Assertions.assertTrue(ioSession.isOpen());
801 
802             final HttpRequest request3 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
803             request3.addHeader("password", "secret");
804             final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
805                     new BasicRequestProducer(request3, new MultiLineEntityProducer("0123456789abcdef", 1000)),
806                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
807             final Message<HttpResponse, String> result3 = future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
808             Assertions.assertNotNull(result3);
809             final HttpResponse response3 = result3.getHead();
810             Assertions.assertNotNull(response3);
811             Assertions.assertEquals(200, response3.getCode());
812             Assertions.assertEquals("All is well", result3.getBody());
813 
814             Assertions.assertTrue(ioSession.isOpen());
815 
816             final HttpRequest request4 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
817             final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
818                     new BasicRequestProducer(request4, AsyncEntityProducers.create("blah")),
819                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
820             final Message<HttpResponse, String> result4 = future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
821             Assertions.assertNotNull(result4);
822             final HttpResponse response4 = result4.getHead();
823             Assertions.assertNotNull(response4);
824             Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response4.getCode());
825             Assertions.assertEquals("You shall not pass", result4.getBody());
826 
827             Assertions.assertFalse(ioSession.isOpen());
828         }
829     }
830 
831     @Test
832     public void testExpectationFailedCloseConnection() throws Exception {
833         final Http1TestServer server = resources.server();
834         final Http1TestClient client = resources.client();
835 
836         server.register("*", () -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
837 
838             @Override
839             protected void handle(
840                     final Message<HttpRequest, String> request,
841                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
842                     final HttpContext context) throws IOException, HttpException {
843                 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
844 
845             }
846         });
847         final InetSocketAddress serverEndpoint = server.start(null, handler -> new BasicAsyncServerExpectationDecorator(handler) {
848 
849             @Override
850             protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
851                 final Header h = request.getFirstHeader("password");
852                 if (h != null && "secret".equals(h.getValue())) {
853                     return null;
854                 } else {
855                     final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_UNAUTHORIZED);
856                     response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
857                     return new BasicResponseProducer(response, "You shall not pass");
858                 }
859             }
860         }, Http1Config.DEFAULT);
861 
862         client.start();
863         final Future<IOSession> sessionFuture = client.requestSession(
864                 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
865         final IOSession ioSession = sessionFuture.get();
866         try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) {
867 
868             final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
869             final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
870                     new BasicRequestProducer(request1, new MultiBinEntityProducer(
871                             new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
872                             100000,
873                             ContentType.TEXT_PLAIN)),
874                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
875             final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
876             Assertions.assertNotNull(result1);
877             final HttpResponse response1 = result1.getHead();
878             Assertions.assertNotNull(response1);
879             Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
880             Assertions.assertNotNull("You shall not pass", result1.getBody());
881 
882             Assertions.assertFalse(streamEndpoint.isOpen());
883         }
884     }
885 
886     @Test
887     public void testDelayedExpectationVerification() throws Exception {
888         final Http1TestServer server = resources.server();
889         final Http1TestClient client = resources.client();
890 
891         server.register("*", () -> new AsyncServerExchangeHandler() {
892 
893             private final Random random = new Random(System.currentTimeMillis());
894             private final AsyncEntityProducer entityProducer = AsyncEntityProducers.create(
895                     "All is well");
896 
897             @Override
898             public void handleRequest(
899                     final HttpRequest request,
900                     final EntityDetails entityDetails,
901                     final ResponseChannel responseChannel,
902                     final HttpContext context) throws HttpException, IOException {
903 
904                 Executors.newSingleThreadExecutor().execute(() -> {
905                     try {
906                         if (entityDetails != null) {
907                             final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
908                             if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
909                                 Thread.sleep(random.nextInt(1000));
910                                 responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), context);
911                             }
912                             final HttpResponse response = new BasicHttpResponse(200);
913                             synchronized (entityProducer) {
914                                 responseChannel.sendResponse(response, entityProducer, context);
915                             }
916                         }
917                     } catch (final Exception ignore) {
918                         // ignore
919                     }
920                 });
921 
922             }
923 
924             @Override
925             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
926                 capacityChannel.update(Integer.MAX_VALUE);
927             }
928 
929             @Override
930             public void consume(final ByteBuffer src) throws IOException {
931             }
932 
933             @Override
934             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
935             }
936 
937             @Override
938             public int available() {
939                 synchronized (entityProducer) {
940                     return entityProducer.available();
941                 }
942             }
943 
944             @Override
945             public void produce(final DataStreamChannel channel) throws IOException {
946                 synchronized (entityProducer) {
947                     entityProducer.produce(channel);
948                 }
949             }
950 
951             @Override
952             public void failed(final Exception cause) {
953             }
954 
955             @Override
956             public void releaseResources() {
957             }
958 
959         });
960         final InetSocketAddress serverEndpoint = server.start();
961 
962         client.start(Http1Config.custom().setWaitForContinueTimeout(Timeout.ofMilliseconds(100)).build());
963         final Future<ClientSessionEndpoint> connectFuture = client.connect(
964                 "localhost", serverEndpoint.getPort(), TIMEOUT);
965         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
966 
967         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
968         for (int i = 0; i < 5; i++) {
969             queue.add(streamEndpoint.execute(
970                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
971                             AsyncEntityProducers.create("Some important message")),
972                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
973         }
974         while (!queue.isEmpty()) {
975             final Future<Message<HttpResponse, String>> future = queue.remove();
976             final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
977             Assertions.assertNotNull(result);
978             final HttpResponse response = result.getHead();
979             Assertions.assertNotNull(response);
980             Assertions.assertEquals(200, response.getCode());
981             Assertions.assertNotNull("All is well", result.getBody());
982         }
983     }
984 
985     @Test
986     public void testPrematureResponse() throws Exception {
987         final Http1TestServer server = resources.server();
988         final Http1TestClient client = resources.client();
989 
990         server.register("*", () -> new AsyncServerExchangeHandler() {
991 
992             private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>();
993 
994             @Override
995             public void handleRequest(
996                     final HttpRequest request,
997                     final EntityDetails entityDetails,
998                     final ResponseChannel responseChannel,
999                     final HttpContext context) throws HttpException, IOException {
1000                 final AsyncResponseProducer producer;
1001                 final Header h = request.getFirstHeader("password");
1002                 if (h != null && "secret".equals(h.getValue())) {
1003                     producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
1004                 } else {
1005                     producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
1006                 }
1007                 responseProducer.set(producer);
1008                 producer.sendResponse(responseChannel, context);
1009             }
1010 
1011             @Override
1012             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1013                 capacityChannel.update(Integer.MAX_VALUE);
1014             }
1015 
1016             @Override
1017             public void consume(final ByteBuffer src) throws IOException {
1018             }
1019 
1020             @Override
1021             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1022             }
1023 
1024             @Override
1025             public int available() {
1026                 final AsyncResponseProducer producer = responseProducer.get();
1027                 return producer.available();
1028             }
1029 
1030             @Override
1031             public void produce(final DataStreamChannel channel) throws IOException {
1032                 final AsyncResponseProducer producer = responseProducer.get();
1033                 producer.produce(channel);
1034             }
1035 
1036             @Override
1037             public void failed(final Exception cause) {
1038             }
1039 
1040             @Override
1041             public void releaseResources() {
1042             }
1043         });
1044         final InetSocketAddress serverEndpoint = server.start();
1045 
1046         client.start();
1047         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1048                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1049         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1050 
1051         for (int i = 0; i < 3; i++) {
1052             final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1053             final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1054                     new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 100000)),
1055                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1056             final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1057             Assertions.assertNotNull(result1);
1058             final HttpResponse response1 = result1.getHead();
1059             Assertions.assertNotNull(response1);
1060             Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1061             Assertions.assertNotNull("You shall not pass", result1.getBody());
1062 
1063             Assertions.assertTrue(streamEndpoint.isOpen());
1064         }
1065         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1066         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1067                 new BasicRequestProducer(request1, new MultiBinEntityProducer(
1068                         new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
1069                         100000,
1070                         ContentType.TEXT_PLAIN)),
1071                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1072         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1073         Assertions.assertNotNull(result1);
1074         final HttpResponse response1 = result1.getHead();
1075         Assertions.assertNotNull(response1);
1076         Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1077         Assertions.assertNotNull("You shall not pass", result1.getBody());
1078     }
1079 
1080     @Test
1081     public void testSlowResponseConsumer() throws Exception {
1082         final Http1TestServer server = resources.server();
1083         final Http1TestClient client = resources.client();
1084 
1085         server.register("/", () -> new MultiLineResponseHandler("0123456789abcd", 100));
1086         final InetSocketAddress serverEndpoint = server.start();
1087 
1088         client.start(Http1Config.custom().setBufferSize(256).build());
1089 
1090         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1091                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1092         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1093 
1094         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
1095         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1096                 new BasicRequestProducer(request1, null),
1097                 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
1098 
1099                     @Override
1100                     protected String consumeData(
1101                             final ContentType contentType, final InputStream inputStream) throws IOException {
1102                         final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1103 
1104                         final StringBuilder buffer = new StringBuilder();
1105                         try {
1106                             final byte[] tmp = new byte[16];
1107                             int l;
1108                             while ((l = inputStream.read(tmp)) != -1) {
1109                                 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
1110                                 Thread.sleep(50);
1111                             }
1112                         } catch (final InterruptedException ex) {
1113                             Thread.currentThread().interrupt();
1114                             throw new InterruptedIOException(ex.getMessage());
1115                         }
1116                         return buffer.toString();
1117                     }
1118                 }),
1119                 null);
1120 
1121         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1122         Assertions.assertNotNull(result1);
1123         final HttpResponse response1 = result1.getHead();
1124         Assertions.assertNotNull(response1);
1125         Assertions.assertEquals(200, response1.getCode());
1126         final String s1 = result1.getBody();
1127         Assertions.assertNotNull(s1);
1128         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1129         while (t1.hasMoreTokens()) {
1130             Assertions.assertEquals("0123456789abcd", t1.nextToken());
1131         }
1132     }
1133 
1134     @Test
1135     public void testSlowRequestProducer() throws Exception {
1136         final Http1TestServer server = resources.server();
1137         final Http1TestClient client = resources.client();
1138 
1139         server.register("*", () -> new EchoHandler(2048));
1140         final InetSocketAddress serverEndpoint = server.start();
1141 
1142         client.start();
1143         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1144                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1145         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1146 
1147         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1148         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1149                 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
1150 
1151                     @Override
1152                     protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
1153                         final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1154                         try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
1155                             for (int i = 0; i < 500; i++) {
1156                                 if (i % 100 == 0) {
1157                                     writer.flush();
1158                                     Thread.sleep(500);
1159                                 }
1160                                 writer.write("0123456789abcdef\r\n");
1161                             }
1162                         } catch (final InterruptedException ex) {
1163                             Thread.currentThread().interrupt();
1164                             throw new InterruptedIOException(ex.getMessage());
1165                         }
1166                     }
1167 
1168                 }),
1169                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1170         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1171         Assertions.assertNotNull(result1);
1172         final HttpResponse response1 = result1.getHead();
1173         Assertions.assertNotNull(response1);
1174         Assertions.assertEquals(200, response1.getCode());
1175         final String s1 = result1.getBody();
1176         Assertions.assertNotNull(s1);
1177         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1178         while (t1.hasMoreTokens()) {
1179             Assertions.assertEquals("0123456789abcdef", t1.nextToken());
1180         }
1181     }
1182 
1183     @Test
1184     public void testSlowResponseProducer() throws Exception {
1185         final Http1TestServer server = resources.server();
1186         final Http1TestClient client = resources.client();
1187 
1188         server.register("*", () -> new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
1189 
1190             @Override
1191             protected void handle(
1192                     final HttpRequest request,
1193                     final InputStream requestStream,
1194                     final HttpResponse response,
1195                     final OutputStream responseStream,
1196                     final HttpContext context) throws IOException, HttpException {
1197 
1198                 if (!"/hello".equals(request.getPath())) {
1199                     response.setCode(HttpStatus.SC_NOT_FOUND);
1200                     return;
1201                 }
1202                 if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
1203                     response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
1204                     return;
1205                 }
1206                 if (requestStream == null) {
1207                     return;
1208                 }
1209                 final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
1210                 final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
1211                 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1212                 response.setCode(HttpStatus.SC_OK);
1213                 response.setHeader(h1);
1214                 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
1215                      final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
1216                     try {
1217                         String l;
1218                         int count = 0;
1219                         while ((l = reader.readLine()) != null) {
1220                             writer.write(l);
1221                             writer.write("\r\n");
1222                             count++;
1223                             if (count % 500 == 0) {
1224                                 Thread.sleep(500);
1225                             }
1226                         }
1227                         writer.flush();
1228                     } catch (final InterruptedException ex) {
1229                         Thread.currentThread().interrupt();
1230                         throw new InterruptedIOException(ex.getMessage());
1231                     }
1232                 }
1233             }
1234         });
1235         final InetSocketAddress serverEndpoint = server.start();
1236 
1237         client.start(Http1Config.custom().setBufferSize(256).build());
1238 
1239         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1240                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1241         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1242 
1243         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1244         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1245                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
1246                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1247         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1248         Assertions.assertNotNull(result1);
1249         final HttpResponse response1 = result1.getHead();
1250         Assertions.assertNotNull(response1);
1251         Assertions.assertEquals(200, response1.getCode());
1252         final String s1 = result1.getBody();
1253         Assertions.assertNotNull(s1);
1254         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1255         while (t1.hasMoreTokens()) {
1256             Assertions.assertEquals("0123456789abcd", t1.nextToken());
1257         }
1258     }
1259 
1260     @Test
1261     public void testPipelinedConnectionClose() throws Exception {
1262         final Http1TestServer server = resources.server();
1263         final Http1TestClient client = resources.client();
1264 
1265         server.register("/hello*", () -> new SingleLineResponseHandler("Hi back"));
1266         final InetSocketAddress serverEndpoint = server.start();
1267 
1268         client.start();
1269         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1270                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1271         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1272 
1273         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1274                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1275                         AsyncEntityProducers.create("Hi there")),
1276                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1277         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1278         request2.addHeader(HttpHeaders.CONNECTION, "close");
1279         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1280                 new BasicRequestProducer(request2,
1281                         AsyncEntityProducers.create("Hi there")),
1282                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1283         final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1284                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1285                         AsyncEntityProducers.create("Hi there")),
1286                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1287 
1288         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1289         Assertions.assertNotNull(result1);
1290         final HttpResponse response1 = result1.getHead();
1291         final String entity1 = result1.getBody();
1292         Assertions.assertNotNull(response1);
1293         Assertions.assertEquals(200, response1.getCode());
1294         Assertions.assertEquals("Hi back", entity1);
1295 
1296         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1297         Assertions.assertNotNull(result2);
1298         final HttpResponse response2 = result2.getHead();
1299         final String entity2 = result2.getBody();
1300         Assertions.assertNotNull(response2);
1301         Assertions.assertEquals(200, response2.getCode());
1302         Assertions.assertEquals("Hi back", entity2);
1303 
1304         final Exception exception = Assertions.assertThrows(Exception.class, () ->
1305                 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1306         assertThat(exception, CoreMatchers.anyOf(
1307                 CoreMatchers.instanceOf(CancellationException.class),
1308                 CoreMatchers.instanceOf(ExecutionException.class)));
1309 
1310         final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
1311                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1312                         AsyncEntityProducers.create("Hi there")),
1313                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1314         final Exception exception2 = Assertions.assertThrows(Exception.class, () ->
1315                 future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1316         assertThat(exception2, CoreMatchers.anyOf(
1317                 CoreMatchers.instanceOf(CancellationException.class),
1318                 CoreMatchers.instanceOf(ExecutionException.class)));
1319     }
1320 
1321     @Test
1322     public void testPipelinedInvalidRequest() throws Exception {
1323         final Http1TestServer server = resources.server();
1324         final Http1TestClient client = resources.client();
1325 
1326         server.register("/hello*", () -> new SingleLineResponseHandler("Hi back"));
1327         final InetSocketAddress serverEndpoint = server.start();
1328 
1329         client.start();
1330         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1331                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1332         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1333 
1334         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1335                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1336                         AsyncEntityProducers.create("Hi there")),
1337                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1338         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1339         request2.addHeader(HttpHeaders.HOST, "blah:blah");
1340         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1341                 new BasicRequestProducer(request2,
1342                         AsyncEntityProducers.create("Hi there")),
1343                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1344         final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1345                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1346                         AsyncEntityProducers.create("Hi there")),
1347                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1348 
1349         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1350         Assertions.assertNotNull(result1);
1351         final HttpResponse response1 = result1.getHead();
1352         final String entity1 = result1.getBody();
1353         Assertions.assertNotNull(response1);
1354         Assertions.assertEquals(200, response1.getCode());
1355         Assertions.assertEquals("Hi back", entity1);
1356 
1357         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1358         Assertions.assertNotNull(result2);
1359         final HttpResponse response2 = result2.getHead();
1360         final String entity2 = result2.getBody();
1361         Assertions.assertNotNull(response2);
1362         Assertions.assertEquals(400, response2.getCode());
1363         Assertions.assertTrue(entity2.length() > 0);
1364 
1365 
1366         final Exception exception = Assertions.assertThrows(Exception.class, () ->
1367                 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1368         assertThat(exception, CoreMatchers.anyOf(
1369                 CoreMatchers.instanceOf(CancellationException.class),
1370                 CoreMatchers.instanceOf(ExecutionException.class)));
1371     }
1372 
1373     private static final byte[] GARBAGE = "garbage".getBytes(StandardCharsets.US_ASCII);
1374 
1375     private static class BrokenChunkEncoder extends AbstractContentEncoder {
1376 
1377         private final CharArrayBuffer lineBuffer;
1378         private boolean done;
1379 
1380         BrokenChunkEncoder(
1381                 final WritableByteChannel channel,
1382                 final SessionOutputBuffer buffer,
1383                 final BasicHttpTransportMetrics metrics) {
1384             super(channel, buffer, metrics);
1385             lineBuffer = new CharArrayBuffer(16);
1386         }
1387 
1388         @Override
1389         public void complete(final List<? extends Header> trailers) throws IOException {
1390             super.complete(trailers);
1391         }
1392 
1393         @Override
1394         public int write(final ByteBuffer src) throws IOException {
1395             final int chunk;
1396             if (!done) {
1397                 lineBuffer.clear();
1398                 lineBuffer.append(Integer.toHexString(GARBAGE.length * 10));
1399                 buffer().writeLine(lineBuffer);
1400                 buffer().write(ByteBuffer.wrap(GARBAGE));
1401                 done = true;
1402                 chunk = GARBAGE.length;
1403             } else {
1404                 chunk = 0;
1405             }
1406             final long bytesWritten = buffer().flush(channel());
1407             if (bytesWritten > 0) {
1408                 metrics().incrementBytesTransferred(bytesWritten);
1409             }
1410             if (!buffer().hasData()) {
1411                 channel().close();
1412             }
1413             return chunk;
1414         }
1415 
1416     }
1417 
1418     @Test
1419     public void testTruncatedChunk() throws Exception {
1420         final Http1TestServer server = resources.server();
1421         final Http1TestClient client = resources.client();
1422 
1423         final InetSocketAddress serverEndpoint = server.start(new InternalServerHttp1EventHandlerFactory(
1424                 HttpProcessors.server(),
1425                 (request, context) -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
1426 
1427                     @Override
1428                     protected void handle(
1429                             final Message<HttpRequest, String> request,
1430                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1431                             final HttpContext context) throws IOException, HttpException {
1432                         responseTrigger.submitResponse(
1433                                 new BasicResponseProducer(new StringAsyncEntityProducer("useful stuff")), context);
1434                     }
1435 
1436                 },
1437                 Http1Config.DEFAULT,
1438                 CharCodingConfig.DEFAULT,
1439                 DefaultConnectionReuseStrategy.INSTANCE,
1440                 scheme == URIScheme.HTTPS ? SSLTestContexts.createServerSSLContext() : null, null, null) {
1441 
1442             @Override
1443             protected ServerHttp1StreamDuplexer createServerHttp1StreamDuplexer(
1444                     final ProtocolIOSession ioSession,
1445                     final HttpProcessor httpProcessor,
1446                     final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
1447                     final Http1Config http1Config,
1448                     final CharCodingConfig connectionConfig,
1449                     final ConnectionReuseStrategy connectionReuseStrategy,
1450                     final NHttpMessageParser<HttpRequest> incomingMessageParser,
1451                     final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
1452                     final ContentLengthStrategy incomingContentStrategy,
1453                     final ContentLengthStrategy outgoingContentStrategy,
1454                     final Http1StreamListener streamListener) {
1455                 return new ServerHttp1StreamDuplexer(ioSession, httpProcessor, exchangeHandlerFactory,
1456                         scheme.id,
1457                         http1Config, connectionConfig, connectionReuseStrategy,
1458                         incomingMessageParser, outgoingMessageWriter,
1459                         incomingContentStrategy, outgoingContentStrategy,
1460                         streamListener) {
1461 
1462                     @Override
1463                     protected ContentEncoder createContentEncoder(
1464                             final long len,
1465                             final WritableByteChannel channel,
1466                             final SessionOutputBuffer buffer,
1467                             final BasicHttpTransportMetrics metrics) throws HttpException {
1468                         if (len == ContentLengthStrategy.CHUNKED) {
1469                             return new BrokenChunkEncoder(channel, buffer, metrics);
1470                         } else {
1471                             return super.createContentEncoder(len, channel, buffer, metrics);
1472                         }
1473                     }
1474 
1475                 };
1476             }
1477 
1478         });
1479 
1480         client.start();
1481         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1482                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1483         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1484 
1485         final AsyncRequestProducer requestProducer = new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1486         final StringAsyncEntityConsumer entityConsumer = new StringAsyncEntityConsumer() {
1487 
1488             @Override
1489             public void releaseResources() {
1490                 // Do not clear internal content buffer
1491             }
1492 
1493         };
1494         final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(entityConsumer);
1495         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(requestProducer, responseConsumer, null);
1496         final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () ->
1497                 future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1498         final Throwable cause = exception.getCause();
1499         Assertions.assertTrue(cause instanceof MalformedChunkCodingException);
1500         Assertions.assertEquals("garbage", entityConsumer.generateContent());
1501     }
1502 
1503     @Test
1504     public void testExceptionInHandler() throws Exception {
1505         final Http1TestServer server = resources.server();
1506         final Http1TestClient client = resources.client();
1507 
1508         server.register("/hello", () -> new SingleLineResponseHandler("Hi there") {
1509 
1510             @Override
1511             protected void handle(
1512                     final Message<HttpRequest, String> request,
1513                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1514                     final HttpContext context) throws IOException, HttpException {
1515                 throw new HttpException("Boom");
1516             }
1517         });
1518         final InetSocketAddress serverEndpoint = server.start();
1519 
1520         client.start();
1521         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1522                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1523         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1524 
1525         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1526                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1527                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1528         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1529         Assertions.assertNotNull(result);
1530         final HttpResponse response1 = result.getHead();
1531         final String entity1 = result.getBody();
1532         Assertions.assertNotNull(response1);
1533         Assertions.assertEquals(500, response1.getCode());
1534         Assertions.assertEquals("Boom", entity1);
1535     }
1536 
1537     @Test
1538     public void testNoServiceHandler() throws Exception {
1539         final Http1TestServer server = resources.server();
1540         final Http1TestClient client = resources.client();
1541 
1542         final InetSocketAddress serverEndpoint = server.start();
1543 
1544         client.start();
1545         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1546                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1547         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1548 
1549         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1550                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1551                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1552         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1553         Assertions.assertNotNull(result);
1554         final HttpResponse response1 = result.getHead();
1555         final String entity1 = result.getBody();
1556         Assertions.assertNotNull(response1);
1557         Assertions.assertEquals(404, response1.getCode());
1558         Assertions.assertEquals("Resource not found", entity1);
1559     }
1560 
1561     @Test
1562     public void testResponseNoContent() throws Exception {
1563         final Http1TestServer server = resources.server();
1564         final Http1TestClient client = resources.client();
1565 
1566         server.register("/hello", () -> new SingleLineResponseHandler("Hi there") {
1567 
1568             @Override
1569             protected void handle(
1570                     final Message<HttpRequest, String> request,
1571                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1572                     final HttpContext context) throws IOException, HttpException {
1573                 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_NO_CONTENT);
1574                 responseTrigger.submitResponse(new BasicResponseProducer(response), context);
1575             }
1576         });
1577         final InetSocketAddress serverEndpoint = server.start();
1578 
1579         client.start();
1580         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1581                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1582         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1583 
1584         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1585                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1586                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1587         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1588         Assertions.assertNotNull(result);
1589         final HttpResponse response1 = result.getHead();
1590         Assertions.assertNotNull(response1);
1591         Assertions.assertEquals(204, response1.getCode());
1592         Assertions.assertNull(result.getBody());
1593     }
1594 
1595     @Test
1596     public void testAbsentHostHeader() throws Exception {
1597         final Http1TestServer server = resources.server();
1598         final Http1TestClient client = resources.client();
1599 
1600         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
1601         final InetSocketAddress serverEndpoint = server.start();
1602 
1603         client.start(new DefaultHttpProcessor(RequestContent.INSTANCE, RequestConnControl.INSTANCE), Http1Config.DEFAULT);
1604 
1605         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1606                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1607         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1608 
1609         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1610         request1.setVersion(HttpVersion.HTTP_1_0);
1611         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1612                 new BasicRequestProducer(request1, null),
1613                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1614         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1615         Assertions.assertNotNull(result1);
1616         final HttpResponse response1 = result1.getHead();
1617         Assertions.assertNotNull(response1);
1618         Assertions.assertEquals(200, response1.getCode());
1619         Assertions.assertEquals("Hi there", result1.getBody());
1620 
1621         final HttpRequest request2 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1622         request2.setVersion(HttpVersion.HTTP_1_1);
1623         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1624                 new BasicRequestProducer(request2, null),
1625                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1626         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1627         Assertions.assertNotNull(result2);
1628         final HttpResponse response2 = result2.getHead();
1629         Assertions.assertNotNull(response2);
1630         Assertions.assertEquals(400, response2.getCode());
1631         Assertions.assertEquals("Host header is absent", result2.getBody());
1632     }
1633 
1634     @Test
1635     public void testMessageWithTrailers() throws Exception {
1636         final Http1TestServer server = resources.server();
1637         final Http1TestClient client = resources.client();
1638 
1639         server.register("/hello", () -> new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
1640 
1641             @Override
1642             protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
1643                     final HttpRequest request,
1644                     final EntityDetails entityDetails,
1645                     final HttpContext context) throws HttpException {
1646                 return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
1647             }
1648 
1649             @Override
1650             protected void handle(
1651                     final Message<HttpRequest, String> requestMessage,
1652                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1653                     final HttpContext context) throws HttpException, IOException {
1654                 responseTrigger.submitResponse(new BasicResponseProducer(
1655                         HttpStatus.SC_OK,
1656                         new DigestingEntityProducer("MD5",
1657                                 new StringAsyncEntityProducer("Hello back with some trailers"))), context);
1658             }
1659         });
1660         final InetSocketAddress serverEndpoint = server.start();
1661 
1662         client.start();
1663 
1664         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1665                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1666         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1667 
1668         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1669         final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
1670         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1671                 new BasicRequestProducer(request1, null),
1672                 new BasicResponseConsumer<>(entityConsumer), null);
1673         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1674         Assertions.assertNotNull(result1);
1675         final HttpResponse response1 = result1.getHead();
1676         Assertions.assertNotNull(response1);
1677         Assertions.assertEquals(200, response1.getCode());
1678         Assertions.assertEquals("Hello back with some trailers", result1.getBody());
1679 
1680         final List<Header> trailers = entityConsumer.getTrailers();
1681         Assertions.assertNotNull(trailers);
1682         Assertions.assertEquals(2, trailers.size());
1683         final Map<String, String> map = new HashMap<>();
1684         for (final Header header: trailers) {
1685             map.put(TextUtils.toLowerCase(header.getName()), header.getValue());
1686         }
1687         final String digest = TextUtils.toHexString(entityConsumer.getDigest());
1688         Assertions.assertEquals("MD5", map.get("digest-algo"));
1689         Assertions.assertEquals(digest, map.get("digest"));
1690     }
1691 
1692     @Test
1693     public void testProtocolException() throws Exception {
1694         final Http1TestServer server = resources.server();
1695         final Http1TestClient client = resources.client();
1696 
1697         server.register("/boom", () -> new AsyncServerExchangeHandler() {
1698 
1699             private final StringAsyncEntityProducer entityProducer = new StringAsyncEntityProducer("Everyting is OK");
1700 
1701             @Override
1702             public void releaseResources() {
1703                 entityProducer.releaseResources();
1704             }
1705 
1706             @Override
1707             public void handleRequest(
1708                     final HttpRequest request,
1709                     final EntityDetails entityDetails,
1710                     final ResponseChannel responseChannel,
1711                     final HttpContext context) throws HttpException, IOException {
1712                 final String requestUri = request.getRequestUri();
1713                 if (requestUri.endsWith("boom")) {
1714                     throw new ProtocolException("Boom!!!");
1715                 }
1716                 responseChannel.sendResponse(new BasicHttpResponse(200), entityProducer, context);
1717             }
1718 
1719             @Override
1720             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1721                 capacityChannel.update(Integer.MAX_VALUE);
1722             }
1723 
1724             @Override
1725             public void consume(final ByteBuffer src) throws IOException {
1726             }
1727 
1728             @Override
1729             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1730                 // empty
1731             }
1732 
1733             @Override
1734             public int available() {
1735                 return entityProducer.available();
1736             }
1737 
1738             @Override
1739             public void produce(final DataStreamChannel channel) throws IOException {
1740                 entityProducer.produce(channel);
1741             }
1742 
1743             @Override
1744             public void failed(final Exception cause) {
1745                 releaseResources();
1746             }
1747 
1748         });
1749 
1750         final InetSocketAddress serverEndpoint = server.start();
1751 
1752         client.start();
1753         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1754                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1755         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1756         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1757                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/boom")),
1758                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1759         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1760         Assertions.assertNotNull(result);
1761         final HttpResponse response1 = result.getHead();
1762         final String entity1 = result.getBody();
1763         Assertions.assertNotNull(response1);
1764         Assertions.assertEquals(HttpStatus.SC_BAD_REQUEST, response1.getCode());
1765         Assertions.assertEquals("Boom!!!", entity1);
1766     }
1767 
1768     @Test
1769     public void testHeaderTooLarge() throws Exception {
1770         final Http1TestServer server = resources.server();
1771         final Http1TestClient client = resources.client();
1772 
1773         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
1774         final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1775                 .setMaxLineLength(100)
1776                 .build());
1777         client.start();
1778 
1779         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1780                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1781         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1782 
1783         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1784         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1785                 "1234567890123456789012345678901234567890");
1786         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1787                 new BasicRequestProducer(request1, null),
1788                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1789         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1790         Assertions.assertNotNull(result1);
1791         final HttpResponse response1 = result1.getHead();
1792         Assertions.assertNotNull(response1);
1793         Assertions.assertEquals(431, response1.getCode());
1794         Assertions.assertEquals("Maximum line length limit exceeded", result1.getBody());
1795     }
1796 
1797     @Test
1798     public void testHeaderTooLargePost() throws Exception {
1799         final Http1TestServer server = resources.server();
1800         final Http1TestClient client = resources.client();
1801 
1802         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
1803         final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1804                 .setMaxLineLength(100)
1805                 .build());
1806         client.start(
1807                 new DefaultHttpProcessor(RequestContent.INSTANCE, RequestTargetHost.INSTANCE, RequestConnControl.INSTANCE), null);
1808 
1809         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1810                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1811         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1812 
1813         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1814         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1815                 "1234567890123456789012345678901234567890");
1816 
1817         final byte[] b = new byte[2048];
1818         for (int i = 0; i < b.length; i++) {
1819             b[i] = (byte) ('a' + i % 10);
1820         }
1821 
1822         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1823                 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
1824                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1825         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1826         Assertions.assertNotNull(result1);
1827         final HttpResponse response1 = result1.getHead();
1828         Assertions.assertNotNull(response1);
1829         Assertions.assertEquals(431, response1.getCode());
1830         Assertions.assertEquals("Maximum line length limit exceeded", result1.getBody());
1831     }
1832 
1833 }