View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.testing.nio;
29  
30  import static org.hamcrest.MatcherAssert.assertThat;
31  
32  import java.io.BufferedReader;
33  import java.io.BufferedWriter;
34  import java.io.IOException;
35  import java.io.InputStream;
36  import java.io.InputStreamReader;
37  import java.io.InterruptedIOException;
38  import java.io.OutputStream;
39  import java.io.OutputStreamWriter;
40  import java.net.InetSocketAddress;
41  import java.net.URI;
42  import java.net.URISyntaxException;
43  import java.nio.ByteBuffer;
44  import java.nio.channels.WritableByteChannel;
45  import java.nio.charset.Charset;
46  import java.nio.charset.StandardCharsets;
47  import java.util.HashMap;
48  import java.util.LinkedList;
49  import java.util.List;
50  import java.util.Map;
51  import java.util.Queue;
52  import java.util.Random;
53  import java.util.StringTokenizer;
54  import java.util.concurrent.CancellationException;
55  import java.util.concurrent.ExecutionException;
56  import java.util.concurrent.Executors;
57  import java.util.concurrent.Future;
58  import java.util.concurrent.atomic.AtomicReference;
59  import java.util.concurrent.locks.ReentrantLock;
60  
61  import org.apache.hc.core5.http.ConnectionReuseStrategy;
62  import org.apache.hc.core5.http.ContentLengthStrategy;
63  import org.apache.hc.core5.http.ContentType;
64  import org.apache.hc.core5.http.EntityDetails;
65  import org.apache.hc.core5.http.Header;
66  import org.apache.hc.core5.http.HeaderElements;
67  import org.apache.hc.core5.http.HttpException;
68  import org.apache.hc.core5.http.HttpHeaders;
69  import org.apache.hc.core5.http.HttpHost;
70  import org.apache.hc.core5.http.HttpRequest;
71  import org.apache.hc.core5.http.HttpResponse;
72  import org.apache.hc.core5.http.HttpStatus;
73  import org.apache.hc.core5.http.HttpVersion;
74  import org.apache.hc.core5.http.MalformedChunkCodingException;
75  import org.apache.hc.core5.http.Message;
76  import org.apache.hc.core5.http.Method;
77  import org.apache.hc.core5.http.ProtocolException;
78  import org.apache.hc.core5.http.URIScheme;
79  import org.apache.hc.core5.http.config.CharCodingConfig;
80  import org.apache.hc.core5.http.config.Http1Config;
81  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
82  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
83  import org.apache.hc.core5.http.impl.Http1StreamListener;
84  import org.apache.hc.core5.http.impl.HttpProcessors;
85  import org.apache.hc.core5.http.impl.nio.AbstractContentEncoder;
86  import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexer;
87  import org.apache.hc.core5.http.message.BasicHttpRequest;
88  import org.apache.hc.core5.http.message.BasicHttpResponse;
89  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
90  import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
91  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
92  import org.apache.hc.core5.http.nio.AsyncResponseProducer;
93  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
94  import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
95  import org.apache.hc.core5.http.nio.CapacityChannel;
96  import org.apache.hc.core5.http.nio.ContentEncoder;
97  import org.apache.hc.core5.http.nio.DataStreamChannel;
98  import org.apache.hc.core5.http.nio.HandlerFactory;
99  import org.apache.hc.core5.http.nio.NHttpMessageParser;
100 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
101 import org.apache.hc.core5.http.nio.ResponseChannel;
102 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
103 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
104 import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
105 import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
106 import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
107 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
108 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
109 import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
110 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
111 import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
112 import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
113 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
114 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
115 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
116 import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
117 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
118 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
119 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
120 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
121 import org.apache.hc.core5.http.protocol.HttpContext;
122 import org.apache.hc.core5.http.protocol.HttpProcessor;
123 import org.apache.hc.core5.http.protocol.RequestConnControl;
124 import org.apache.hc.core5.http.protocol.RequestContent;
125 import org.apache.hc.core5.http.protocol.RequestTargetHost;
126 import org.apache.hc.core5.http.protocol.RequestValidateHost;
127 import org.apache.hc.core5.reactor.IOSession;
128 import org.apache.hc.core5.reactor.ProtocolIOSession;
129 import org.apache.hc.core5.testing.SSLTestContexts;
130 import org.apache.hc.core5.testing.nio.extension.Http1TestResources;
131 import org.apache.hc.core5.util.CharArrayBuffer;
132 import org.apache.hc.core5.util.TextUtils;
133 import org.apache.hc.core5.util.Timeout;
134 import org.hamcrest.CoreMatchers;
135 import org.junit.jupiter.api.Assertions;
136 import org.junit.jupiter.api.Test;
137 import org.junit.jupiter.api.extension.RegisterExtension;
138 
139 public abstract class Http1IntegrationTest {
140 
141     private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
142     private static final Timeout LONG_TIMEOUT = Timeout.ofMinutes(2);
143 
144     private final URIScheme scheme;
145 
146     private final ReentrantLock lock = new ReentrantLock();
147 
148     @RegisterExtension
149     private final Http1TestResources resources;
150 
151     public Http1IntegrationTest(final URIScheme scheme) {
152         this.scheme = scheme;
153         this.resources = new Http1TestResources(scheme, TIMEOUT);
154     }
155 
156     private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
157         try {
158             return new URI(scheme.id, null, "localhost", serverEndpoint.getPort(), path, null, null);
159         } catch (final URISyntaxException e) {
160             throw new IllegalStateException();
161         }
162     }
163 
164     @Test
165     public void testSimpleGet() throws Exception {
166         final Http1TestServer server = resources.server();
167         final Http1TestClient client = resources.client();
168 
169         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
170         final InetSocketAddress serverEndpoint = server.start();
171 
172         client.start();
173         final Future<ClientSessionEndpoint> connectFuture = client.connect(
174                 "localhost", serverEndpoint.getPort(), TIMEOUT);
175         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
176 
177         for (int i = 0; i < 5; i++) {
178             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
179                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
180                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
181             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
182             Assertions.assertNotNull(result);
183             final HttpResponse response1 = result.getHead();
184             final String entity1 = result.getBody();
185             Assertions.assertNotNull(response1);
186             Assertions.assertEquals(200, response1.getCode());
187             Assertions.assertEquals("Hi there", entity1);
188         }
189     }
190 
191     @Test
192     public void testSimpleGetConnectionClose() throws Exception {
193         final Http1TestServer server = resources.server();
194         final Http1TestClient client = resources.client();
195 
196         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
197         final InetSocketAddress serverEndpoint = server.start();
198 
199         client.start();
200         final URI requestURI = createRequestURI(serverEndpoint, "/hello");
201         for (int i = 0; i < 5; i++) {
202             final Future<ClientSessionEndpoint> connectFuture = client.connect(
203                     "localhost", serverEndpoint.getPort(), TIMEOUT);
204             try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
205                 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
206                         AsyncRequestBuilder.get(requestURI)
207                                 .addHeader(HttpHeaders.CONNECTION, "close")
208                                 .build(),
209                         new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
210                 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
211                 Assertions.assertNotNull(result);
212                 final HttpResponse response1 = result.getHead();
213                 final String entity1 = result.getBody();
214                 Assertions.assertNotNull(response1);
215                 Assertions.assertEquals(200, response1.getCode());
216                 Assertions.assertEquals("Hi there", entity1);
217             }
218         }
219     }
220 
221     @Test
222     public void testSimpleGetIdentityTransfer() throws Exception {
223         final Http1TestServer server = resources.server();
224         final Http1TestClient client = resources.client();
225 
226         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
227         final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
228         final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
229 
230         client.start();
231 
232         final int reqNo = 5;
233 
234         for (int i = 0; i < reqNo; i++) {
235             final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
236             final ClientSessionEndpoint streamEndpoint = connectFuture.get();
237 
238             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
239                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
240                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
241             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
242 
243             streamEndpoint.close();
244 
245             Assertions.assertNotNull(result);
246             final HttpResponse response = result.getHead();
247             final String entity = result.getBody();
248             Assertions.assertNotNull(response);
249             Assertions.assertEquals(200, response.getCode());
250             Assertions.assertEquals("Hi there", entity);
251         }
252 
253     }
254 
255     @Test
256     public void testPostIdentityTransfer() throws Exception {
257         final Http1TestServer server = resources.server();
258         final Http1TestClient client = resources.client();
259 
260         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
261         final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
262         final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
263 
264         client.start();
265 
266         final int reqNo = 5;
267 
268         for (int i = 0; i < reqNo; i++) {
269             final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
270             final ClientSessionEndpoint streamEndpoint = connectFuture.get();
271 
272             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
273                     new BasicRequestProducer(Method.POST,
274                             createRequestURI(serverEndpoint, "/hello"),
275                             new MultiLineEntityProducer("Hello", 16 * i)),
276                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
277             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
278 
279             streamEndpoint.close();
280 
281             Assertions.assertNotNull(result);
282             final HttpResponse response = result.getHead();
283             final String entity = result.getBody();
284             Assertions.assertNotNull(response);
285             Assertions.assertEquals(200, response.getCode());
286             Assertions.assertEquals("Hi there", entity);
287         }
288     }
289 
290     @Test
291     public void testPostIdentityTransferOutOfSequenceResponse() throws Exception {
292         final Http1TestServer server = resources.server();
293         final Http1TestClient client = resources.client();
294 
295         server.register("/hello", () -> new ImmediateResponseExchangeHandler(500, "Go away"));
296         final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
297         final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
298 
299         client.start();
300 
301         final int reqNo = 5;
302 
303         for (int i = 0; i < reqNo; i++) {
304             final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
305             final ClientSessionEndpoint streamEndpoint = connectFuture.get();
306 
307             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
308                     new BasicRequestProducer(Method.POST,
309                             createRequestURI(serverEndpoint, "/hello"),
310                             new MultiLineEntityProducer("Hello", 16 * i)),
311                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
312             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
313 
314             streamEndpoint.close();
315 
316             Assertions.assertNotNull(result);
317             final HttpResponse response = result.getHead();
318             final String entity = result.getBody();
319             Assertions.assertNotNull(response);
320             Assertions.assertEquals(500, response.getCode());
321             Assertions.assertEquals("Go away", entity);
322         }
323     }
324 
325     @Test
326     public void testSimpleGetsPipelined() throws Exception {
327         final Http1TestServer server = resources.server();
328         final Http1TestClient client = resources.client();
329 
330         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
331         final InetSocketAddress serverEndpoint = server.start();
332 
333         client.start();
334         final Future<ClientSessionEndpoint> connectFuture = client.connect(
335                 "localhost", serverEndpoint.getPort(), TIMEOUT);
336         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
337 
338         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
339         for (int i = 0; i < 5; i++) {
340             queue.add(streamEndpoint.execute(
341                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
342                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
343         }
344         while (!queue.isEmpty()) {
345             final Future<Message<HttpResponse, String>> future = queue.remove();
346             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
347             Assertions.assertNotNull(result);
348             final HttpResponse response = result.getHead();
349             final String entity = result.getBody();
350             Assertions.assertNotNull(response);
351             Assertions.assertEquals(200, response.getCode());
352             Assertions.assertEquals("Hi there", entity);
353         }
354     }
355 
356     @Test
357     public void testLargeGet() throws Exception {
358         final Http1TestServer server = resources.server();
359         final Http1TestClient client = resources.client();
360 
361         server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 5000));
362         final InetSocketAddress serverEndpoint = server.start();
363 
364         client.start();
365         final Future<ClientSessionEndpoint> connectFuture = client.connect(
366                 "localhost", serverEndpoint.getPort(), TIMEOUT);
367         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
368 
369         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
370                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
371                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
372 
373         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
374         Assertions.assertNotNull(result1);
375         final HttpResponse response1 = result1.getHead();
376         Assertions.assertNotNull(response1);
377         Assertions.assertEquals(200, response1.getCode());
378         final String s1 = result1.getBody();
379         Assertions.assertNotNull(s1);
380         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
381         while (t1.hasMoreTokens()) {
382             Assertions.assertEquals("0123456789abcdef", t1.nextToken());
383         }
384 
385         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
386                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
387                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
388 
389         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
390         Assertions.assertNotNull(result2);
391         final HttpResponse response2 = result2.getHead();
392         Assertions.assertNotNull(response2);
393         Assertions.assertEquals(200, response2.getCode());
394         final String s2 = result2.getBody();
395         Assertions.assertNotNull(s2);
396         final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
397         while (t2.hasMoreTokens()) {
398             Assertions.assertEquals("0123456789abcdef", t2.nextToken());
399         }
400     }
401 
402     @Test
403     public void testLargeGetsPipelined() throws Exception {
404         final Http1TestServer server = resources.server();
405         final Http1TestClient client = resources.client();
406 
407         server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 2000));
408         final InetSocketAddress serverEndpoint = server.start();
409 
410         client.start();
411         final Future<ClientSessionEndpoint> connectFuture = client.connect(
412                 "localhost", serverEndpoint.getPort(), TIMEOUT);
413         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
414 
415         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
416         for (int i = 0; i < 5; i++) {
417             queue.add(streamEndpoint.execute(
418                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
419                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
420         }
421         while (!queue.isEmpty()) {
422             final Future<Message<HttpResponse, String>> future = queue.remove();
423             final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
424             Assertions.assertNotNull(result);
425             final HttpResponse response = result.getHead();
426             Assertions.assertNotNull(response);
427             Assertions.assertEquals(200, response.getCode());
428             final String entity = result.getBody();
429             Assertions.assertNotNull(entity);
430             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
431             while (t.hasMoreTokens()) {
432                 Assertions.assertEquals("0123456789abcdef", t.nextToken());
433             }
434         }
435     }
436 
437     @Test
438     public void testBasicPost() throws Exception {
439         final Http1TestServer server = resources.server();
440         final Http1TestClient client = resources.client();
441 
442         server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
443         final InetSocketAddress serverEndpoint = server.start();
444 
445         client.start();
446         final Future<ClientSessionEndpoint> connectFuture = client.connect(
447                 "localhost", serverEndpoint.getPort(), TIMEOUT);
448         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
449 
450         for (int i = 0; i < 5; i++) {
451             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
452                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
453                             AsyncEntityProducers.create("Hi there")),
454                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
455             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
456             Assertions.assertNotNull(result);
457             final HttpResponse response1 = result.getHead();
458             final String entity1 = result.getBody();
459             Assertions.assertNotNull(response1);
460             Assertions.assertEquals(200, response1.getCode());
461             Assertions.assertEquals("Hi back", entity1);
462         }
463     }
464 
465     @Test
466     public void testBasicPostPipelined() throws Exception {
467         final Http1TestServer server = resources.server();
468         final Http1TestClient client = resources.client();
469 
470         server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
471         final InetSocketAddress serverEndpoint = server.start();
472 
473         client.start();
474         final Future<ClientSessionEndpoint> connectFuture = client.connect(
475                 "localhost", serverEndpoint.getPort(), TIMEOUT);
476         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
477 
478         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
479         for (int i = 0; i < 5; i++) {
480             queue.add(streamEndpoint.execute(
481                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
482                             AsyncEntityProducers.create("Hi there")),
483                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
484         }
485         while (!queue.isEmpty()) {
486             final Future<Message<HttpResponse, String>> future = queue.remove();
487             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
488             Assertions.assertNotNull(result);
489             final HttpResponse response = result.getHead();
490             final String entity = result.getBody();
491             Assertions.assertNotNull(response);
492             Assertions.assertEquals(200, response.getCode());
493             Assertions.assertEquals("Hi back", entity);
494         }
495     }
496 
497     @Test
498     public void testHttp10Post() throws Exception {
499         final Http1TestServer server = resources.server();
500         final Http1TestClient client = resources.client();
501 
502         server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
503         final InetSocketAddress serverEndpoint = server.start();
504 
505         client.start();
506         final Future<ClientSessionEndpoint> connectFuture = client.connect(
507                 "localhost", serverEndpoint.getPort(), TIMEOUT);
508         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
509 
510         for (int i = 0; i < 5; i++) {
511             final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
512             request.setVersion(HttpVersion.HTTP_1_0);
513             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
514                     new BasicRequestProducer(request, AsyncEntityProducers.create("Hi there")),
515                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
516             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
517             Assertions.assertNotNull(result);
518             final HttpResponse response1 = result.getHead();
519             final String entity1 = result.getBody();
520             Assertions.assertNotNull(response1);
521             Assertions.assertEquals(200, response1.getCode());
522             Assertions.assertEquals("Hi back", entity1);
523         }
524     }
525 
526     @Test
527     public void testHTTP11FeaturesDisabledWithHTTP10Requests() throws Exception {
528         final Http1TestServer server = resources.server();
529         final Http1TestClient client = resources.client();
530 
531         server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
532         final InetSocketAddress serverEndpoint = server.start();
533 
534         client.start();
535         final Future<ClientSessionEndpoint> connectFuture = client.connect(
536                 "localhost", serverEndpoint.getPort(), TIMEOUT);
537         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
538 
539         final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
540         request.setVersion(HttpVersion.HTTP_1_0);
541         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
542                 new BasicRequestProducer(request, new BasicAsyncEntityProducer(new byte[] {'a', 'b', 'c'}, null, true)),
543                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
544         final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, future::get);
545         Assertions.assertInstanceOf(ProtocolException.class, exception.getCause());
546     }
547 
548     @Test
549     public void testNoEntityPost() throws Exception {
550         final Http1TestServer server = resources.server();
551         final Http1TestClient client = resources.client();
552 
553         server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
554         final InetSocketAddress serverEndpoint = server.start();
555 
556         client.start();
557         final Future<ClientSessionEndpoint> connectFuture = client.connect(
558                 "localhost", serverEndpoint.getPort(), TIMEOUT);
559         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
560 
561         for (int i = 0; i < 5; i++) {
562             final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
563             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
564                     new BasicRequestProducer(request, null),
565                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
566             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
567             Assertions.assertNotNull(result);
568             final HttpResponse response1 = result.getHead();
569             final String entity1 = result.getBody();
570             Assertions.assertNotNull(response1);
571             Assertions.assertEquals(200, response1.getCode());
572             Assertions.assertEquals("Hi back", entity1);
573         }
574     }
575 
576     @Test
577     public void testLargePost() throws Exception {
578         final Http1TestServer server = resources.server();
579         final Http1TestClient client = resources.client();
580 
581         server.register("*", () -> new EchoHandler(2048));
582         final InetSocketAddress serverEndpoint = server.start();
583 
584         client.start();
585         final Future<ClientSessionEndpoint> connectFuture = client.connect(
586                 "localhost", serverEndpoint.getPort(), TIMEOUT);
587         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
588 
589         for (int i = 0; i < 5; i++) {
590             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
591                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
592                             new MultiLineEntityProducer("0123456789abcdef", 5000)),
593                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
594             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
595             Assertions.assertNotNull(result);
596             final HttpResponse response = result.getHead();
597             Assertions.assertNotNull(response);
598             Assertions.assertEquals(200, response.getCode());
599             final String entity = result.getBody();
600             Assertions.assertNotNull(entity);
601             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
602             while (t.hasMoreTokens()) {
603                 Assertions.assertEquals("0123456789abcdef", t.nextToken());
604             }
605         }
606     }
607 
608     @Test
609     public void testPostsPipelinedLargeResponse() throws Exception {
610         final Http1TestServer server = resources.server();
611         final Http1TestClient client = resources.client();
612 
613         server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 2000));
614         final InetSocketAddress serverEndpoint = server.start();
615 
616         client.start();
617         final Future<ClientSessionEndpoint> connectFuture = client.connect(
618                 "localhost", serverEndpoint.getPort(), TIMEOUT);
619         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
620 
621         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
622         for (int i = 0; i < 2; i++) {
623             queue.add(streamEndpoint.execute(
624                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
625                             AsyncEntityProducers.create("Hi there")),
626                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
627         }
628         while (!queue.isEmpty()) {
629             final Future<Message<HttpResponse, String>> future = queue.remove();
630             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
631             Assertions.assertNotNull(result);
632             final HttpResponse response = result.getHead();
633             Assertions.assertNotNull(response);
634             Assertions.assertEquals(200, response.getCode());
635             final String entity = result.getBody();
636             Assertions.assertNotNull(entity);
637             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
638             while (t.hasMoreTokens()) {
639                 Assertions.assertEquals("0123456789abcdef", t.nextToken());
640             }
641         }
642     }
643 
644 
645     @Test
646     public void testLargePostsPipelined() throws Exception {
647         final Http1TestServer server = resources.server();
648         final Http1TestClient client = resources.client();
649 
650         server.register("*", () -> new EchoHandler(2048));
651         final InetSocketAddress serverEndpoint = server.start();
652 
653         client.start();
654         final Future<ClientSessionEndpoint> connectFuture = client.connect(
655                 "localhost", serverEndpoint.getPort(), TIMEOUT);
656         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
657 
658         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
659         for (int i = 0; i < 5; i++) {
660             queue.add(streamEndpoint.execute(
661                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
662                             new MultiLineEntityProducer("0123456789abcdef", 5000)),
663                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
664         }
665         while (!queue.isEmpty()) {
666             final Future<Message<HttpResponse, String>> future = queue.remove();
667             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
668             Assertions.assertNotNull(result);
669             final HttpResponse response = result.getHead();
670             Assertions.assertNotNull(response);
671             Assertions.assertEquals(200, response.getCode());
672             final String entity = result.getBody();
673             Assertions.assertNotNull(entity);
674             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
675             while (t.hasMoreTokens()) {
676                 Assertions.assertEquals("0123456789abcdef", t.nextToken());
677             }
678         }
679     }
680 
681     @Test
682     public void testSimpleHead() throws Exception {
683         final Http1TestServer server = resources.server();
684         final Http1TestClient client = resources.client();
685 
686         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
687         final InetSocketAddress serverEndpoint = server.start();
688 
689         client.start();
690         final Future<ClientSessionEndpoint> connectFuture = client.connect(
691                 "localhost", serverEndpoint.getPort(), TIMEOUT);
692         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
693 
694         for (int i = 0; i < 5; i++) {
695             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
696                     new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
697                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
698             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
699             Assertions.assertNotNull(result);
700             final HttpResponse response1 = result.getHead();
701             Assertions.assertNotNull(response1);
702             Assertions.assertEquals(200, response1.getCode());
703             Assertions.assertNull(result.getBody());
704         }
705     }
706 
707     @Test
708     public void testSimpleHeadConnectionClose() throws Exception {
709         final Http1TestServer server = resources.server();
710         final Http1TestClient client = resources.client();
711 
712         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
713         final InetSocketAddress serverEndpoint = server.start();
714 
715         client.start();
716         final URI requestURI = createRequestURI(serverEndpoint, "/hello");
717         for (int i = 0; i < 5; i++) {
718             final Future<ClientSessionEndpoint> connectFuture = client.connect(
719                     "localhost", serverEndpoint.getPort(), TIMEOUT);
720             try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
721                 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
722                         AsyncRequestBuilder.head(requestURI)
723                                 .addHeader(HttpHeaders.CONNECTION, "close")
724                                 .build(),
725                         new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
726                 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
727                 Assertions.assertNotNull(result);
728                 final HttpResponse response1 = result.getHead();
729                 Assertions.assertNotNull(response1);
730                 Assertions.assertEquals(200, response1.getCode());
731                 Assertions.assertNull(result.getBody());
732             }
733         }
734     }
735 
736     @Test
737     public void testHeadPipelined() throws Exception {
738         final Http1TestServer server = resources.server();
739         final Http1TestClient client = resources.client();
740 
741         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
742         final InetSocketAddress serverEndpoint = server.start();
743 
744         client.start();
745         final Future<ClientSessionEndpoint> connectFuture = client.connect(
746                 "localhost", serverEndpoint.getPort(), TIMEOUT);
747         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
748 
749         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
750         for (int i = 0; i < 5; i++) {
751             queue.add(streamEndpoint.execute(
752                     new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
753                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
754         }
755         while (!queue.isEmpty()) {
756             final Future<Message<HttpResponse, String>> future = queue.remove();
757             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
758             Assertions.assertNotNull(result);
759             final HttpResponse response1 = result.getHead();
760             Assertions.assertNotNull(response1);
761             Assertions.assertEquals(200, response1.getCode());
762             Assertions.assertNull(result.getBody());
763         }
764     }
765 
766     @Test
767     public void testExpectationFailed() throws Exception {
768         final Http1TestServer server = resources.server();
769         final Http1TestClient client = resources.client();
770 
771         server.register("*", () -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
772 
773             @Override
774             protected void handle(
775                     final Message<HttpRequest, String> request,
776                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
777                     final HttpContext context) throws IOException, HttpException {
778                 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
779 
780             }
781         });
782         final InetSocketAddress serverEndpoint = server.start(null, handler -> new BasicAsyncServerExpectationDecorator(handler) {
783 
784             @Override
785             protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
786                 final Header h = request.getFirstHeader("password");
787                 if (h != null && "secret".equals(h.getValue())) {
788                     return null;
789                 } else {
790                     return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
791                 }
792             }
793         }, Http1Config.DEFAULT);
794 
795         client.start();
796         final Future<IOSession> sessionFuture = client.requestSession(
797                 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
798         final IOSession ioSession = sessionFuture.get();
799         try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) {
800 
801             final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
802             request1.addHeader("password", "secret");
803             final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
804                     new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 1000)),
805                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
806             final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
807             Assertions.assertNotNull(result1);
808             final HttpResponse response1 = result1.getHead();
809             Assertions.assertNotNull(response1);
810             Assertions.assertEquals(200, response1.getCode());
811             Assertions.assertEquals("All is well", result1.getBody());
812 
813             Assertions.assertTrue(ioSession.isOpen());
814 
815             final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
816             final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
817                     new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
818                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
819             final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
820             Assertions.assertNotNull(result2);
821             final HttpResponse response2 = result2.getHead();
822             Assertions.assertNotNull(response2);
823             Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
824             Assertions.assertEquals("You shall not pass", result2.getBody());
825 
826             Assertions.assertTrue(ioSession.isOpen());
827 
828             final HttpRequest request3 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
829             request3.addHeader("password", "secret");
830             final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
831                     new BasicRequestProducer(request3, new MultiLineEntityProducer("0123456789abcdef", 1000)),
832                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
833             final Message<HttpResponse, String> result3 = future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
834             Assertions.assertNotNull(result3);
835             final HttpResponse response3 = result3.getHead();
836             Assertions.assertNotNull(response3);
837             Assertions.assertEquals(200, response3.getCode());
838             Assertions.assertEquals("All is well", result3.getBody());
839 
840             Assertions.assertTrue(ioSession.isOpen());
841 
842             final HttpRequest request4 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
843             final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
844                     new BasicRequestProducer(request4, AsyncEntityProducers.create("blah")),
845                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
846             final Message<HttpResponse, String> result4 = future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
847             Assertions.assertNotNull(result4);
848             final HttpResponse response4 = result4.getHead();
849             Assertions.assertNotNull(response4);
850             Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response4.getCode());
851             Assertions.assertEquals("You shall not pass", result4.getBody());
852 
853             Assertions.assertFalse(ioSession.isOpen());
854         }
855     }
856 
857     @Test
858     public void testExpectationFailedCloseConnection() throws Exception {
859         final Http1TestServer server = resources.server();
860         final Http1TestClient client = resources.client();
861 
862         server.register("*", () -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
863 
864             @Override
865             protected void handle(
866                     final Message<HttpRequest, String> request,
867                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
868                     final HttpContext context) throws IOException, HttpException {
869                 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
870 
871             }
872         });
873         final InetSocketAddress serverEndpoint = server.start(null, handler -> new BasicAsyncServerExpectationDecorator(handler) {
874 
875             @Override
876             protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
877                 final Header h = request.getFirstHeader("password");
878                 if (h != null && "secret".equals(h.getValue())) {
879                     return null;
880                 } else {
881                     final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_UNAUTHORIZED);
882                     response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
883                     return new BasicResponseProducer(response, "You shall not pass");
884                 }
885             }
886         }, Http1Config.DEFAULT);
887 
888         client.start();
889         final Future<IOSession> sessionFuture = client.requestSession(
890                 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
891         final IOSession ioSession = sessionFuture.get();
892         try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) {
893 
894             final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
895             final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
896                     new BasicRequestProducer(request1, new MultiBinEntityProducer(
897                             new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
898                             100000,
899                             ContentType.TEXT_PLAIN)),
900                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
901             final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
902             Assertions.assertNotNull(result1);
903             final HttpResponse response1 = result1.getHead();
904             Assertions.assertNotNull(response1);
905             Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
906             Assertions.assertNotNull("You shall not pass", result1.getBody());
907 
908             Assertions.assertFalse(streamEndpoint.isOpen());
909         }
910     }
911 
912     @Test
913     public void testDelayedExpectationVerification() throws Exception {
914         final Http1TestServer server = resources.server();
915         final Http1TestClient client = resources.client();
916 
917         server.register("*", () -> new AsyncServerExchangeHandler() {
918 
919             private final Random random = new Random(System.currentTimeMillis());
920             private final AsyncEntityProducer entityProducer = AsyncEntityProducers.create(
921                     "All is well");
922 
923             @Override
924             public void handleRequest(
925                     final HttpRequest request,
926                     final EntityDetails entityDetails,
927                     final ResponseChannel responseChannel,
928                     final HttpContext context) throws HttpException, IOException {
929 
930                 Executors.newSingleThreadExecutor().execute(() -> {
931                     try {
932                         if (entityDetails != null) {
933                             final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
934                             if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
935                                 Thread.sleep(random.nextInt(1000));
936                                 responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), context);
937                             }
938                             final HttpResponse response = new BasicHttpResponse(200);
939                             lock.lock();
940                             try {
941                                 responseChannel.sendResponse(response, entityProducer, context);
942                             } finally {
943                                 lock.unlock();
944                             }
945                         }
946                     } catch (final Exception ignore) {
947                         // ignore
948                     }
949                 });
950 
951             }
952 
953             @Override
954             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
955                 capacityChannel.update(Integer.MAX_VALUE);
956             }
957 
958             @Override
959             public void consume(final ByteBuffer src) throws IOException {
960             }
961 
962             @Override
963             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
964             }
965 
966             @Override
967             public int available() {
968                 lock.lock();
969                 try {
970                     return entityProducer.available();
971                 } finally {
972                     lock.unlock();
973                 }
974             }
975 
976             @Override
977             public void produce(final DataStreamChannel channel) throws IOException {
978                 lock.lock();
979                 try {
980                     entityProducer.produce(channel);
981                 } finally {
982                     lock.unlock();
983                 }
984             }
985 
986             @Override
987             public void failed(final Exception cause) {
988             }
989 
990             @Override
991             public void releaseResources() {
992             }
993 
994         });
995         final InetSocketAddress serverEndpoint = server.start();
996 
997         client.start(Http1Config.custom().setWaitForContinueTimeout(Timeout.ofMilliseconds(100)).build());
998         final Future<ClientSessionEndpoint> connectFuture = client.connect(
999                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1000         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1001 
1002         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
1003         for (int i = 0; i < 5; i++) {
1004             queue.add(streamEndpoint.execute(
1005                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
1006                             AsyncEntityProducers.create("Some important message")),
1007                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
1008         }
1009         while (!queue.isEmpty()) {
1010             final Future<Message<HttpResponse, String>> future = queue.remove();
1011             final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1012             Assertions.assertNotNull(result);
1013             final HttpResponse response = result.getHead();
1014             Assertions.assertNotNull(response);
1015             Assertions.assertEquals(200, response.getCode());
1016             Assertions.assertNotNull("All is well", result.getBody());
1017         }
1018     }
1019 
1020     @Test
1021     public void testPrematureResponse() throws Exception {
1022         final Http1TestServer server = resources.server();
1023         final Http1TestClient client = resources.client();
1024 
1025         server.register("*", () -> new AsyncServerExchangeHandler() {
1026 
1027             private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>();
1028 
1029             @Override
1030             public void handleRequest(
1031                     final HttpRequest request,
1032                     final EntityDetails entityDetails,
1033                     final ResponseChannel responseChannel,
1034                     final HttpContext context) throws HttpException, IOException {
1035                 final AsyncResponseProducer producer;
1036                 final Header h = request.getFirstHeader("password");
1037                 if (h != null && "secret".equals(h.getValue())) {
1038                     producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
1039                 } else {
1040                     producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
1041                 }
1042                 responseProducer.set(producer);
1043                 producer.sendResponse(responseChannel, context);
1044             }
1045 
1046             @Override
1047             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1048                 capacityChannel.update(Integer.MAX_VALUE);
1049             }
1050 
1051             @Override
1052             public void consume(final ByteBuffer src) throws IOException {
1053             }
1054 
1055             @Override
1056             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1057             }
1058 
1059             @Override
1060             public int available() {
1061                 final AsyncResponseProducer producer = responseProducer.get();
1062                 return producer.available();
1063             }
1064 
1065             @Override
1066             public void produce(final DataStreamChannel channel) throws IOException {
1067                 final AsyncResponseProducer producer = responseProducer.get();
1068                 producer.produce(channel);
1069             }
1070 
1071             @Override
1072             public void failed(final Exception cause) {
1073             }
1074 
1075             @Override
1076             public void releaseResources() {
1077             }
1078         });
1079         final InetSocketAddress serverEndpoint = server.start();
1080 
1081         client.start();
1082         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1083                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1084         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1085 
1086         for (int i = 0; i < 3; i++) {
1087             final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1088             final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1089                     new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 100000)),
1090                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1091             final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1092             Assertions.assertNotNull(result1);
1093             final HttpResponse response1 = result1.getHead();
1094             Assertions.assertNotNull(response1);
1095             Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1096             Assertions.assertNotNull("You shall not pass", result1.getBody());
1097 
1098             Assertions.assertTrue(streamEndpoint.isOpen());
1099         }
1100         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1101         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1102                 new BasicRequestProducer(request1, new MultiBinEntityProducer(
1103                         new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
1104                         100000,
1105                         ContentType.TEXT_PLAIN)),
1106                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1107         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1108         Assertions.assertNotNull(result1);
1109         final HttpResponse response1 = result1.getHead();
1110         Assertions.assertNotNull(response1);
1111         Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1112         Assertions.assertNotNull("You shall not pass", result1.getBody());
1113     }
1114 
1115     @Test
1116     public void testSlowResponseConsumer() throws Exception {
1117         final Http1TestServer server = resources.server();
1118         final Http1TestClient client = resources.client();
1119 
1120         server.register("/", () -> new MultiLineResponseHandler("0123456789abcd", 100));
1121         final InetSocketAddress serverEndpoint = server.start();
1122 
1123         client.start(Http1Config.custom().setBufferSize(256).build());
1124 
1125         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1126                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1127         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1128 
1129         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
1130         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1131                 new BasicRequestProducer(request1, null),
1132                 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
1133 
1134                     @Override
1135                     protected String consumeData(
1136                             final ContentType contentType, final InputStream inputStream) throws IOException {
1137                         final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1138 
1139                         final StringBuilder buffer = new StringBuilder();
1140                         try {
1141                             final byte[] tmp = new byte[16];
1142                             int l;
1143                             while ((l = inputStream.read(tmp)) != -1) {
1144                                 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
1145                                 Thread.sleep(50);
1146                             }
1147                         } catch (final InterruptedException ex) {
1148                             Thread.currentThread().interrupt();
1149                             throw new InterruptedIOException(ex.getMessage());
1150                         }
1151                         return buffer.toString();
1152                     }
1153                 }),
1154                 null);
1155 
1156         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1157         Assertions.assertNotNull(result1);
1158         final HttpResponse response1 = result1.getHead();
1159         Assertions.assertNotNull(response1);
1160         Assertions.assertEquals(200, response1.getCode());
1161         final String s1 = result1.getBody();
1162         Assertions.assertNotNull(s1);
1163         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1164         while (t1.hasMoreTokens()) {
1165             Assertions.assertEquals("0123456789abcd", t1.nextToken());
1166         }
1167     }
1168 
1169     @Test
         // Exercises a request whose entity is produced slowly by a blocking producer:
         // the body is 500 lines of "0123456789abcdef", with a flush and a 500 ms pause
         // before every 100th line. The server has an EchoHandler registered for "*";
         // the test expects a 200 response whose every body line equals the line sent.
1170     public void testSlowRequestProducer() throws Exception {
1171         final Http1TestServer server = resources.server();
1172         final Http1TestClient client = resources.client();
1173 
1174         server.register("*", () -> new EchoHandler(2048));
1175         final InetSocketAddress serverEndpoint = server.start();
1176 
1177         client.start();
1178         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1179                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1180         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1181 
1182         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
             // NOTE(review): the single-thread executor created below is never shut down
             // within this method - confirm whether the test resources clean it up.
1183         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1184                 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
1185 
1186                     @Override
1187                     protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
1188                         final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1189                         try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
1190                             for (int i = 0; i < 500; i++) {
                                     // Pause at i = 0, 100, 200, 300, 400; flush first so that
                                     // everything written so far is pushed out before sleeping.
1191                                 if (i % 100 == 0) {
1192                                     writer.flush();
1193                                     Thread.sleep(500);
1194                                 }
1195                                 writer.write("0123456789abcdef\r\n");
1196                             }
1197                         } catch (final InterruptedException ex) {
                                 // Restore the interrupt flag and report the interruption as an
                                 // I/O failure, keeping the original exception as the cause.
1198                             Thread.currentThread().interrupt();
1199                             throw (InterruptedIOException) new InterruptedIOException(ex.getMessage()).initCause(ex);
1200                         }
1201                     }
1202 
1203                 }),
1204                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
             // The producer alone sleeps for about 2.5 seconds, hence the long timeout.
1205         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1206         Assertions.assertNotNull(result1);
1207         final HttpResponse response1 = result1.getHead();
1208         Assertions.assertNotNull(response1);
1209         Assertions.assertEquals(200, response1.getCode());
1210         final String s1 = result1.getBody();
1211         Assertions.assertNotNull(s1);
             // NOTE(review): this loop checks each line but passes vacuously when the body
             // is empty; the number of echoed lines is not asserted.
1212         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1213         while (t1.hasMoreTokens()) {
1214             Assertions.assertEquals("0123456789abcdef", t1.nextToken());
1215         }
1216     }
1217 
1218     @Test
         // Exercises a response that is produced slowly by a blocking server-side handler:
         // the handler echoes the request body line by line and sleeps 500 ms after every
         // 500th line. The client is started with a 256 byte buffer. The test expects a
         // 200 response whose every body line equals "0123456789abcd".
1219     public void testSlowResponseProducer() throws Exception {
1220         final Http1TestServer server = resources.server();
1221         final Http1TestClient client = resources.client();
1222 
             // NOTE(review): a new single-thread executor is created for every handler
             // instance and is never shut down in the code visible here.
1223         server.register("*", () -> new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
1224 
1225             @Override
1226             protected void handle(
1227                     final HttpRequest request,
1228                     final InputStream requestStream,
1229                     final HttpResponse response,
1230                     final OutputStream responseStream,
1231                     final HttpContext context) throws IOException, HttpException {
1232 
                     // Only POST requests to "/hello" are served.
1233                 if (!"/hello".equals(request.getPath())) {
1234                     response.setCode(HttpStatus.SC_NOT_FOUND);
1235                     return;
1236                 }
1237                 if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
1238                     response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
1239                     return;
1240                 }
                     // No request body: return without touching the response.
1241                 if (requestStream == null) {
1242                     return;
1243                 }
1244                 final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
1245                 final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
1246                 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1247                 response.setCode(HttpStatus.SC_OK);
                     // NOTE(review): h1 may be null when the request carries no Content-Type;
                     // confirm that setHeader tolerates a null argument.
1248                 response.setHeader(h1);
1249                 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
1250                      final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
1251                     try {
1252                         String l;
1253                         int count = 0;
                             // Echo each request line terminated with CRLF, pausing after
                             // every 500th line to slow the response down.
1254                         while ((l = reader.readLine()) != null) {
1255                             writer.write(l);
1256                             writer.write("\r\n");
1257                             count++;
1258                             if (count % 500 == 0) {
1259                                 Thread.sleep(500);
1260                             }
1261                         }
1262                         writer.flush();
1263                     } catch (final InterruptedException ex) {
                             // Restore the interrupt flag and report the interruption as an
                             // I/O failure, keeping the original exception as the cause.
1264                         Thread.currentThread().interrupt();
1265                         throw (InterruptedIOException) new InterruptedIOException(ex.getMessage()).initCause(ex);
1266                     }
1267                 }
1268             }
1269         });
1270         final InetSocketAddress serverEndpoint = server.start();
1271 
1272         client.start(Http1Config.custom().setBufferSize(256).build());
1273 
1274         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1275                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1276         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1277 
1278         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
             // Presumably produces 2000 lines of "0123456789abcd" - verify against
             // MultiLineEntityProducer.
1279         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1280                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
1281                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1282         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1283         Assertions.assertNotNull(result1);
1284         final HttpResponse response1 = result1.getHead();
1285         Assertions.assertNotNull(response1);
1286         Assertions.assertEquals(200, response1.getCode());
1287         final String s1 = result1.getBody();
1288         Assertions.assertNotNull(s1);
             // NOTE(review): this loop checks each line but passes vacuously when the body
             // is empty; the number of echoed lines is not asserted.
1289         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1290         while (t1.hasMoreTokens()) {
1291             Assertions.assertEquals("0123456789abcd", t1.nextToken());
1292         }
1293     }
1294 
1295     @Test
         // Submits three POST requests back to back on one endpoint, the second carrying
         // "Connection: close". The first two must complete with 200 and "Hi back"; the
         // third must fail (cancelled or failed execution), and so must a fourth request
         // submitted on the same endpoint afterwards.
1296     public void testPipelinedConnectionClose() throws Exception {
1297         final Http1TestServer server = resources.server();
1298         final Http1TestClient client = resources.client();
1299 
1300         server.register("/hello*", () -> new SingleLineResponseHandler("Hi back"));
1301         final InetSocketAddress serverEndpoint = server.start();
1302 
1303         client.start();
1304         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1305                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1306         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1307 
             // All three requests are submitted before any response is awaited.
1308         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1309                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1310                         AsyncEntityProducers.create("Hi there")),
1311                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1312         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1313         request2.addHeader(HttpHeaders.CONNECTION, "close");
1314         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1315                 new BasicRequestProducer(request2,
1316                         AsyncEntityProducers.create("Hi there")),
1317                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1318         final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1319                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1320                         AsyncEntityProducers.create("Hi there")),
1321                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1322 
1323         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1324         Assertions.assertNotNull(result1);
1325         final HttpResponse response1 = result1.getHead();
1326         final String entity1 = result1.getBody();
1327         Assertions.assertNotNull(response1);
1328         Assertions.assertEquals(200, response1.getCode());
1329         Assertions.assertEquals("Hi back", entity1);
1330 
             // The request that asked for the connection to be closed is still answered.
1331         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1332         Assertions.assertNotNull(result2);
1333         final HttpResponse response2 = result2.getHead();
1334         final String entity2 = result2.getBody();
1335         Assertions.assertNotNull(response2);
1336         Assertions.assertEquals(200, response2.getCode());
1337         Assertions.assertEquals("Hi back", entity2);
1338 
             // The request queued behind the "close" request must not complete normally;
             // either failure mode (cancellation or execution failure) is accepted.
1339         final Exception exception = Assertions.assertThrows(Exception.class, () ->
1340                 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1341         assertThat(exception, CoreMatchers.anyOf(
1342                 CoreMatchers.instanceOf(CancellationException.class),
1343                 CoreMatchers.instanceOf(ExecutionException.class)));
1344 
             // A further request on the same endpoint (again targeting "/hello-3") is
             // expected to fail in the same way.
1345         final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
1346                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1347                         AsyncEntityProducers.create("Hi there")),
1348                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1349         final Exception exception2 = Assertions.assertThrows(Exception.class, () ->
1350                 future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1351         assertThat(exception2, CoreMatchers.anyOf(
1352                 CoreMatchers.instanceOf(CancellationException.class),
1353                 CoreMatchers.instanceOf(ExecutionException.class)));
1354     }
1355 
1356     @Test
         // Submits three POST requests back to back on one endpoint, the second carrying
         // the Host header value "blah:blah". The first must complete with 200 and
         // "Hi back", the second with 400 and a non-empty body, and the third must fail
         // (cancelled or failed execution).
1357     public void testPipelinedInvalidRequest() throws Exception {
1358         final Http1TestServer server = resources.server();
1359         final Http1TestClient client = resources.client();
1360 
1361         server.register("/hello*", () -> new SingleLineResponseHandler("Hi back"));
1362         final InetSocketAddress serverEndpoint = server.start();
1363 
1364         client.start();
1365         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1366                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1367         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1368 
             // All three requests are submitted before any response is awaited.
1369         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1370                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1371                         AsyncEntityProducers.create("Hi there")),
1372                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1373         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1374         request2.addHeader(HttpHeaders.HOST, "blah:blah");
1375         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1376                 new BasicRequestProducer(request2,
1377                         AsyncEntityProducers.create("Hi there")),
1378                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1379         final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1380                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1381                         AsyncEntityProducers.create("Hi there")),
1382                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1383 
1384         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1385         Assertions.assertNotNull(result1);
1386         final HttpResponse response1 = result1.getHead();
1387         final String entity1 = result1.getBody();
1388         Assertions.assertNotNull(response1);
1389         Assertions.assertEquals(200, response1.getCode());
1390         Assertions.assertEquals("Hi back", entity1);
1391 
1392         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1393         Assertions.assertNotNull(result2);
1394         final HttpResponse response2 = result2.getHead();
1395         final String entity2 = result2.getBody();
1396         Assertions.assertNotNull(response2);
1397         Assertions.assertEquals(400, response2.getCode());
             // NOTE(review): entity2 is dereferenced without a null check, so a missing
             // body would surface as a NullPointerException rather than an assertion failure.
1398         Assertions.assertTrue(entity2.length() > 0);
1399 
1400 
             // The request queued behind the rejected one must not complete normally;
             // either failure mode (cancellation or execution failure) is accepted.
1401         final Exception exception = Assertions.assertThrows(Exception.class, () ->
1402                 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1403         assertThat(exception, CoreMatchers.anyOf(
1404                 CoreMatchers.instanceOf(CancellationException.class),
1405                 CoreMatchers.instanceOf(ExecutionException.class)));
1406     }
1407 
1408     private static final byte[] GARBAGE = "garbage".getBytes(StandardCharsets.US_ASCII); // 7-byte payload emitted by BrokenChunkEncoder in place of real content
1409 
1410     private static class BrokenChunkEncoder extends AbstractContentEncoder {
             // Deliberately faulty content encoder used to simulate a truncated chunk.
             // On the first write it emits a chunk-size line announcing ten times as many
             // bytes as it then writes (GARBAGE), and it closes the channel as soon as
             // the session buffer has been drained.
1411 
             // Scratch buffer for the chunk-size line.
1412         private final CharArrayBuffer lineBuffer;
             // Set once the bogus chunk has been written; later writes add no data.
1413         private boolean done;
1414 
1415         BrokenChunkEncoder(
1416                 final WritableByteChannel channel,
1417                 final SessionOutputBuffer buffer,
1418                 final BasicHttpTransportMetrics metrics) {
1419             super(channel, buffer, metrics);
1420             lineBuffer = new CharArrayBuffer(16);
1421         }
1422 
             // Delegates to the superclass unchanged.
1423         @Override
1424         public void complete(final List<? extends Header> trailers) throws IOException {
1425             super.complete(trailers);
1426         }
1427 
             // Ignores the content of src. The first call buffers the bogus chunk header
             // followed by GARBAGE and returns GARBAGE.length; subsequent calls buffer
             // nothing and return 0. Every call flushes the session buffer to the channel.
1428         @Override
1429         public int write(final ByteBuffer src) throws IOException {
1430             final int chunk;
1431             if (!done) {
1432                 lineBuffer.clear();
                     // Announce a chunk ten times larger than the data actually written.
1433                 lineBuffer.append(Integer.toHexString(GARBAGE.length * 10));
1434                 buffer().writeLine(lineBuffer);
1435                 buffer().write(ByteBuffer.wrap(GARBAGE));
1436                 done = true;
1437                 chunk = GARBAGE.length;
1438             } else {
1439                 chunk = 0;
1440             }
1441             final long bytesWritten = buffer().flush(channel());
1442             if (bytesWritten > 0) {
1443                 metrics().incrementBytesTransferred(bytesWritten);
1444             }
                 // Once everything buffered has been sent, close the channel so that the
                 // peer sees the stream end in the middle of the announced chunk.
1445             if (!buffer().hasData()) {
1446                 channel().close();
1447             }
1448             return chunk;
1449         }
1450 
1451     }
1452 
1453     @Test
         // Verifies client behaviour when a chunked response is cut short. The server is
         // started with a custom event handler factory whose stream duplexer substitutes
         // BrokenChunkEncoder for chunk-coded responses. The client request must fail
         // with a MalformedChunkCodingException, while the bytes received before the
         // failure ("garbage") remain available from the entity consumer.
1454     public void testTruncatedChunk() throws Exception {
1455         final Http1TestServer server = resources.server();
1456         final Http1TestClient client = resources.client();
1457 
1458         final InetSocketAddress serverEndpoint = server.start(new InternalServerHttp1EventHandlerFactory(
1459                 HttpProcessors.server(),
                     // Every request is answered with the entity "useful stuff"; the broken
                     // encoder below writes GARBAGE instead of the data it is given.
1460                 (request, context) -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
1461 
1462                     @Override
1463                     protected void handle(
1464                             final Message<HttpRequest, String> request,
1465                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1466                             final HttpContext context) throws IOException, HttpException {
1467                         responseTrigger.submitResponse(
1468                                 new BasicResponseProducer(new StringAsyncEntityProducer("useful stuff")), context);
1469                     }
1470 
1471                 },
1472                 Http1Config.DEFAULT,
1473                 CharCodingConfig.DEFAULT,
1474                 DefaultConnectionReuseStrategy.INSTANCE,
                     // An SSL context is supplied only when the test runs over HTTPS.
1475                 scheme == URIScheme.HTTPS ? SSLTestContexts.createServerSSLContext() : null, null, null) {
1476 
1477             @Override
1478             protected ServerHttp1StreamDuplexer createServerHttp1StreamDuplexer(
1479                     final ProtocolIOSession ioSession,
1480                     final HttpProcessor httpProcessor,
1481                     final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
1482                     final Http1Config http1Config,
1483                     final CharCodingConfig connectionConfig,
1484                     final ConnectionReuseStrategy connectionReuseStrategy,
1485                     final NHttpMessageParser<HttpRequest> incomingMessageParser,
1486                     final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
1487                     final ContentLengthStrategy incomingContentStrategy,
1488                     final ContentLengthStrategy outgoingContentStrategy,
1489                     final Http1StreamListener streamListener) {
1490                 return new ServerHttp1StreamDuplexer(ioSession, httpProcessor, exchangeHandlerFactory,
1491                         scheme.id,
1492                         http1Config, connectionConfig, connectionReuseStrategy,
1493                         incomingMessageParser, outgoingMessageWriter,
1494                         incomingContentStrategy, outgoingContentStrategy,
1495                         streamListener) {
1496 
1497                     @Override
1498                     protected ContentEncoder createContentEncoder(
1499                             final long len,
1500                             final WritableByteChannel channel,
1501                             final SessionOutputBuffer buffer,
1502                             final BasicHttpTransportMetrics metrics) throws HttpException {
                             // Only chunk-coded responses get the broken encoder; all other
                             // content lengths use the default encoder.
1503                         if (len == ContentLengthStrategy.CHUNKED) {
1504                             return new BrokenChunkEncoder(channel, buffer, metrics);
1505                         } else {
1506                             return super.createContentEncoder(len, channel, buffer, metrics);
1507                         }
1508                     }
1509 
1510                 };
1511             }
1512 
1513         });
1514 
1515         client.start();
1516         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1517                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1518         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1519 
1520         final AsyncRequestProducer requestProducer = new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello"));
             // releaseResources is turned into a no-op so that the partially received
             // content can still be read from the consumer after the exchange has failed.
1521         final StringAsyncEntityConsumer entityConsumer = new StringAsyncEntityConsumer() {
1522 
1523             @Override
1524             public void releaseResources() {
1525                 // Do not clear internal content buffer
1526             }
1527 
1528         };
1529         final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(entityConsumer);
1530         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(requestProducer, responseConsumer, null);
1531         final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () ->
1532                 future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1533         final Throwable cause = exception.getCause();
1534         Assertions.assertTrue(cause instanceof MalformedChunkCodingException);
1535         Assertions.assertEquals("garbage", entityConsumer.generateContent());
1536     }
1537 
1538     @Test
         // Verifies that an HttpException thrown by a server-side handler is reported to
         // the client as a 500 response whose body is the exception message ("Boom").
1539     public void testExceptionInHandler() throws Exception {
1540         final Http1TestServer server = resources.server();
1541         final Http1TestClient client = resources.client();
1542 
             // The handler always throws; its configured "Hi there" response is never used.
1543         server.register("/hello", () -> new SingleLineResponseHandler("Hi there") {
1544 
1545             @Override
1546             protected void handle(
1547                     final Message<HttpRequest, String> request,
1548                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1549                     final HttpContext context) throws IOException, HttpException {
1550                 throw new HttpException("Boom");
1551             }
1552         });
1553         final InetSocketAddress serverEndpoint = server.start();
1554 
1555         client.start();
1556         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1557                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1558         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1559 
1560         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1561                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1562                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1563         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1564         Assertions.assertNotNull(result);
1565         final HttpResponse response1 = result.getHead();
1566         final String entity1 = result.getBody();
1567         Assertions.assertNotNull(response1);
1568         Assertions.assertEquals(500, response1.getCode());
1569         Assertions.assertEquals("Boom", entity1);
1570     }
1571 
1572     @Test
         // Verifies that a request to a server with no registered handlers is answered
         // with 404 and the body "Resource not found".
1573     public void testNoServiceHandler() throws Exception {
1574         final Http1TestServer server = resources.server();
1575         final Http1TestClient client = resources.client();
1576 
             // Deliberately no server.register(...) call before starting the server.
1577         final InetSocketAddress serverEndpoint = server.start();
1578 
1579         client.start();
1580         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1581                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1582         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1583 
1584         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1585                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1586                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1587         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1588         Assertions.assertNotNull(result);
1589         final HttpResponse response1 = result.getHead();
1590         final String entity1 = result.getBody();
1591         Assertions.assertNotNull(response1);
1592         Assertions.assertEquals(404, response1.getCode());
1593         Assertions.assertEquals("Resource not found", entity1);
1594     }
1595 
1596     @Test
         // Verifies that a 204 (No Content) response submitted without an entity reaches
         // the client with status 204 and a null message body.
1597     public void testResponseNoContent() throws Exception {
1598         final Http1TestServer server = resources.server();
1599         final Http1TestClient client = resources.client();
1600 
             // The handler overrides the single-line response and answers with a bare 204.
1601         server.register("/hello", () -> new SingleLineResponseHandler("Hi there") {
1602 
1603             @Override
1604             protected void handle(
1605                     final Message<HttpRequest, String> request,
1606                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1607                     final HttpContext context) throws IOException, HttpException {
1608                 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_NO_CONTENT);
1609                 responseTrigger.submitResponse(new BasicResponseProducer(response), context);
1610             }
1611         });
1612         final InetSocketAddress serverEndpoint = server.start();
1613 
1614         client.start();
1615         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1616                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1617         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1618 
1619         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1620                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1621                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1622         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1623         Assertions.assertNotNull(result);
1624         final HttpResponse response1 = result.getHead();
1625         Assertions.assertNotNull(response1);
1626         Assertions.assertEquals(204, response1.getCode());
1627         Assertions.assertNull(result.getBody());
1628     }
1629 
1630     @Test
         // Verifies that trailers sent with a response reach the client. The server wraps
         // its entity in an MD5 DigestingEntityProducer; the client consumes it through an
         // MD5 DigestingEntityConsumer and expects exactly two trailers: "digest-algo"
         // equal to "MD5" and "digest" equal to the hex form of the digest the consumer
         // computed itself.
1631     public void testMessageWithTrailers() throws Exception {
1632         final Http1TestServer server = resources.server();
1633         final Http1TestClient client = resources.client();
1634 
1635         server.register("/hello", () -> new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
1636 
1637             @Override
1638             protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
1639                     final HttpRequest request,
1640                     final EntityDetails entityDetails,
1641                     final HttpContext context) throws HttpException {
                     // An entity consumer is supplied only when the request has a body.
1642                 return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
1643             }
1644 
1645             @Override
1646             protected void handle(
1647                     final Message<HttpRequest, String> requestMessage,
1648                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1649                     final HttpContext context) throws HttpException, IOException {
1650                 responseTrigger.submitResponse(new BasicResponseProducer(
1651                         HttpStatus.SC_OK,
1652                         new DigestingEntityProducer("MD5",
1653                                 new StringAsyncEntityProducer("Hello back with some trailers"))), context);
1654             }
1655         });
1656         final InetSocketAddress serverEndpoint = server.start();
1657 
1658         client.start();
1659 
1660         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1661                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1662         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1663 
1664         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1665         final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
1666         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1667                 new BasicRequestProducer(request1, null),
1668                 new BasicResponseConsumer<>(entityConsumer), null);
1669         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1670         Assertions.assertNotNull(result1);
1671         final HttpResponse response1 = result1.getHead();
1672         Assertions.assertNotNull(response1);
1673         Assertions.assertEquals(200, response1.getCode());
1674         Assertions.assertEquals("Hello back with some trailers", result1.getBody());
1675 
1676         final List<Header> trailers = entityConsumer.getTrailers();
1677         Assertions.assertNotNull(trailers);
1678         Assertions.assertEquals(2, trailers.size());
             // Index the trailers by lower-cased name so the lookups below do not depend
             // on the case used on the wire.
1679         final Map<String, String> map = new HashMap<>();
1680         for (final Header header: trailers) {
1681             map.put(TextUtils.toLowerCase(header.getName()), header.getValue());
1682         }
1683         final String digest = TextUtils.toHexString(entityConsumer.getDigest());
1684         Assertions.assertEquals("MD5", map.get("digest-algo"));
1685         Assertions.assertEquals(digest, map.get("digest"));
1686     }
1687 
1688     @Test
         // Verifies that a ProtocolException thrown from handleRequest is reported to the
         // client as a 400 response whose body is the exception message ("Boom!!!").
1689     public void testProtocolException() throws Exception {
1690         final Http1TestServer server = resources.server();
1691         final Http1TestClient client = resources.client();
1692 
1693         server.register("/boom", () -> new AsyncServerExchangeHandler() {
1694 
                 // Entity for the non-failing path; the request issued by this test ends
                 // with "boom", so that path is not taken here.
1695             private final StringAsyncEntityProducer entityProducer = new StringAsyncEntityProducer("Everyting is OK");
1696 
1697             @Override
1698             public void releaseResources() {
1699                 entityProducer.releaseResources();
1700             }
1701 
1702             @Override
1703             public void handleRequest(
1704                     final HttpRequest request,
1705                     final EntityDetails entityDetails,
1706                     final ResponseChannel responseChannel,
1707                     final HttpContext context) throws HttpException, IOException {
1708                 final String requestUri = request.getRequestUri();
                     // Reject any request whose URI ends with "boom".
1709                 if (requestUri.endsWith("boom")) {
1710                     throw new ProtocolException("Boom!!!");
1711                 }
1712                 responseChannel.sendResponse(new BasicHttpResponse(200), entityProducer, context);
1713             }
1714 
1715             @Override
1716             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1717                 capacityChannel.update(Integer.MAX_VALUE);
1718             }
1719 
                 // Request content, if any, is discarded.
1720             @Override
1721             public void consume(final ByteBuffer src) throws IOException {
1722             }
1723 
1724             @Override
1725             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1726                 // empty
1727             }
1728 
1729             @Override
1730             public int available() {
1731                 return entityProducer.available();
1732             }
1733 
1734             @Override
1735             public void produce(final DataStreamChannel channel) throws IOException {
1736                 entityProducer.produce(channel);
1737             }
1738 
1739             @Override
1740             public void failed(final Exception cause) {
1741                 releaseResources();
1742             }
1743 
1744         });
1745 
1746         final InetSocketAddress serverEndpoint = server.start();
1747 
1748         client.start();
1749         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1750                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1751         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1752         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1753                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/boom")),
1754                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1755         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1756         Assertions.assertNotNull(result);
1757         final HttpResponse response1 = result.getHead();
1758         final String entity1 = result.getBody();
1759         Assertions.assertNotNull(response1);
1760         Assertions.assertEquals(HttpStatus.SC_BAD_REQUEST, response1.getCode());
1761         Assertions.assertEquals("Boom!!!", entity1);
1762     }
1763 
1764     @Test
         // Verifies that a GET request carrying a header longer than the server's maximum
         // line length (100) is answered with 431 and the body
         // "Maximum line length limit exceeded".
1765     public void testHeaderTooLarge() throws Exception {
1766         final Http1TestServer server = resources.server();
1767         final Http1TestClient client = resources.client();
1768 
1769         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
1770         final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1771                 .setMaxLineLength(100)
1772                 .build());
1773         client.start();
1774 
1775         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1776                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1777         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1778 
1779         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
             // The header value alone is 110 characters, above the 100 character limit.
1780         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1781                 "1234567890123456789012345678901234567890");
1782         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1783                 new BasicRequestProducer(request1, null),
1784                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1785         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1786         Assertions.assertNotNull(result1);
1787         final HttpResponse response1 = result1.getHead();
1788         Assertions.assertNotNull(response1);
1789         Assertions.assertEquals(431, response1.getCode());
1790         Assertions.assertEquals("Maximum line length limit exceeded", result1.getBody());
1791     }
1792 
1793     @Test
1794     public void testHeaderTooLargePost() throws Exception {
1795         final Http1TestServer server = resources.server();
1796         final Http1TestClient client = resources.client();
1797 
1798         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
1799         final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1800                 .setMaxLineLength(100)
1801                 .build());
1802         client.start(
1803                 new DefaultHttpProcessor(RequestContent.INSTANCE, RequestTargetHost.INSTANCE, RequestConnControl.INSTANCE), null);
1804 
1805         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1806                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1807         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1808 
1809         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1810         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1811                 "1234567890123456789012345678901234567890");
1812 
1813         final byte[] b = new byte[2048];
1814         for (int i = 0; i < b.length; i++) {
1815             b[i] = (byte) ('a' + i % 10);
1816         }
1817 
1818         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1819                 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
1820                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1821         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1822         Assertions.assertNotNull(result1);
1823         final HttpResponse response1 = result1.getHead();
1824         Assertions.assertNotNull(response1);
1825         Assertions.assertEquals(431, response1.getCode());
1826         Assertions.assertEquals("Maximum line length limit exceeded", result1.getBody());
1827     }
1828 
1829 }