View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.testing.nio;
29  
30  import java.io.BufferedReader;
31  import java.io.BufferedWriter;
32  import java.io.IOException;
33  import java.io.InputStream;
34  import java.io.InputStreamReader;
35  import java.io.InterruptedIOException;
36  import java.io.OutputStream;
37  import java.io.OutputStreamWriter;
38  import java.net.InetSocketAddress;
39  import java.net.URI;
40  import java.net.URISyntaxException;
41  import java.nio.ByteBuffer;
42  import java.nio.channels.WritableByteChannel;
43  import java.nio.charset.Charset;
44  import java.nio.charset.StandardCharsets;
45  import java.util.Arrays;
46  import java.util.Collection;
47  import java.util.HashMap;
48  import java.util.LinkedList;
49  import java.util.List;
50  import java.util.Locale;
51  import java.util.Map;
52  import java.util.Queue;
53  import java.util.Random;
54  import java.util.StringTokenizer;
55  import java.util.concurrent.CancellationException;
56  import java.util.concurrent.ExecutionException;
57  import java.util.concurrent.Executors;
58  import java.util.concurrent.Future;
59  import java.util.concurrent.atomic.AtomicReference;
60  
61  import org.apache.hc.core5.function.Decorator;
62  import org.apache.hc.core5.function.Supplier;
63  import org.apache.hc.core5.http.ConnectionReuseStrategy;
64  import org.apache.hc.core5.http.ContentLengthStrategy;
65  import org.apache.hc.core5.http.ContentType;
66  import org.apache.hc.core5.http.EntityDetails;
67  import org.apache.hc.core5.http.Header;
68  import org.apache.hc.core5.http.HeaderElements;
69  import org.apache.hc.core5.http.HttpException;
70  import org.apache.hc.core5.http.HttpHeaders;
71  import org.apache.hc.core5.http.HttpHost;
72  import org.apache.hc.core5.http.HttpRequest;
73  import org.apache.hc.core5.http.HttpResponse;
74  import org.apache.hc.core5.http.HttpStatus;
75  import org.apache.hc.core5.http.HttpVersion;
76  import org.apache.hc.core5.http.MalformedChunkCodingException;
77  import org.apache.hc.core5.http.Message;
78  import org.apache.hc.core5.http.Method;
79  import org.apache.hc.core5.http.ProtocolException;
80  import org.apache.hc.core5.http.URIScheme;
81  import org.apache.hc.core5.http.config.CharCodingConfig;
82  import org.apache.hc.core5.http.config.Http1Config;
83  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
84  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
85  import org.apache.hc.core5.http.impl.Http1StreamListener;
86  import org.apache.hc.core5.http.impl.HttpProcessors;
87  import org.apache.hc.core5.http.impl.nio.AbstractContentEncoder;
88  import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexer;
89  import org.apache.hc.core5.http.message.BasicHttpRequest;
90  import org.apache.hc.core5.http.message.BasicHttpResponse;
91  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
92  import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
93  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
94  import org.apache.hc.core5.http.nio.AsyncResponseProducer;
95  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
96  import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
97  import org.apache.hc.core5.http.nio.CapacityChannel;
98  import org.apache.hc.core5.http.nio.ContentEncoder;
99  import org.apache.hc.core5.http.nio.DataStreamChannel;
100 import org.apache.hc.core5.http.nio.HandlerFactory;
101 import org.apache.hc.core5.http.nio.NHttpMessageParser;
102 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
103 import org.apache.hc.core5.http.nio.ResponseChannel;
104 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
105 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
106 import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
107 import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
108 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
109 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
110 import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
111 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
112 import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
113 import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
114 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
115 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
116 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
117 import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
118 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
119 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
120 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
121 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
122 import org.apache.hc.core5.http.protocol.HttpContext;
123 import org.apache.hc.core5.http.protocol.HttpProcessor;
124 import org.apache.hc.core5.http.protocol.RequestConnControl;
125 import org.apache.hc.core5.http.protocol.RequestContent;
126 import org.apache.hc.core5.http.protocol.RequestTargetHost;
127 import org.apache.hc.core5.http.protocol.RequestValidateHost;
128 import org.apache.hc.core5.reactor.IOReactorConfig;
129 import org.apache.hc.core5.reactor.IOSession;
130 import org.apache.hc.core5.reactor.ProtocolIOSession;
131 import org.apache.hc.core5.testing.SSLTestContexts;
132 import org.apache.hc.core5.util.CharArrayBuffer;
133 import org.apache.hc.core5.util.TextUtils;
134 import org.apache.hc.core5.util.TimeValue;
135 import org.apache.hc.core5.util.Timeout;
136 import org.junit.After;
137 import org.junit.Assert;
138 import org.junit.Before;
139 import org.junit.Test;
140 import org.junit.runner.RunWith;
141 import org.junit.runners.Parameterized;
142 import org.slf4j.Logger;
143 import org.slf4j.LoggerFactory;
144 
145 @RunWith(Parameterized.class)
146 public class Http1IntegrationTest extends InternalHttp1ServerTestBase {
147 
148     private final Logger log = LoggerFactory.getLogger(getClass());
149 
150     @Parameterized.Parameters(name = "{0}")
151     public static Collection<Object[]> protocols() {
152         return Arrays.asList(new Object[][]{
153                 { URIScheme.HTTP },
154                 { URIScheme.HTTPS }
155         });
156     }
157 
158     public Http1IntegrationTest(final URIScheme scheme) {
159         super(scheme);
160     }
161 
162     private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
163     private static final Timeout LONG_TIMEOUT = Timeout.ofSeconds(60);
164 
165     private Http1TestClient client;
166 
167     @Before
168     public void setup() throws Exception {
169         log.debug("Starting up test client");
170         client = new Http1TestClient(
171                 buildReactorConfig(),
172                 scheme == URIScheme.HTTPS ? SSLTestContexts.createClientSSLContext() : null, null, null);
173     }
174 
175     protected IOReactorConfig buildReactorConfig() {
176         return IOReactorConfig.DEFAULT;
177     }
178 
179     @After
180     public void cleanup() throws Exception {
181         log.debug("Shutting down test client");
182         if (client != null) {
183             client.shutdown(TimeValue.ofSeconds(5));
184         }
185     }
186 
187     private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
188         try {
189             return new URI(scheme.id, null, "localhost", serverEndpoint.getPort(), path, null, null);
190         } catch (final URISyntaxException e) {
191             throw new IllegalStateException();
192         }
193     }
194 
195     @Test
196     public void testSimpleGet() throws Exception {
197         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
198 
199             @Override
200             public AsyncServerExchangeHandler get() {
201                 return new SingleLineResponseHandler("Hi there");
202             }
203 
204         });
205         final InetSocketAddress serverEndpoint = server.start();
206 
207         client.start();
208         final Future<ClientSessionEndpoint> connectFuture = client.connect(
209                 "localhost", serverEndpoint.getPort(), TIMEOUT);
210         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
211 
212         for (int i = 0; i < 5; i++) {
213             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
214                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
215                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
216             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
217             Assert.assertNotNull(result);
218             final HttpResponse response1 = result.getHead();
219             final String entity1 = result.getBody();
220             Assert.assertNotNull(response1);
221             Assert.assertEquals(200, response1.getCode());
222             Assert.assertEquals("Hi there", entity1);
223         }
224     }
225 
226     @Test
227     public void testSimpleGetConnectionClose() throws Exception {
228         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
229 
230             @Override
231             public AsyncServerExchangeHandler get() {
232                 return new SingleLineResponseHandler("Hi there");
233             }
234 
235         });
236         final InetSocketAddress serverEndpoint = server.start();
237 
238         client.start();
239         final URI requestURI = createRequestURI(serverEndpoint, "/hello");
240         for (int i = 0; i < 5; i++) {
241             final Future<ClientSessionEndpoint> connectFuture = client.connect(
242                     "localhost", serverEndpoint.getPort(), TIMEOUT);
243             try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
244                 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
245                         AsyncRequestBuilder.get(requestURI)
246                                 .addHeader(HttpHeaders.CONNECTION, "close")
247                                 .build(),
248                         new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
249                 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
250                 Assert.assertNotNull(result);
251                 final HttpResponse response1 = result.getHead();
252                 final String entity1 = result.getBody();
253                 Assert.assertNotNull(response1);
254                 Assert.assertEquals(200, response1.getCode());
255                 Assert.assertEquals("Hi there", entity1);
256             }
257         }
258     }
259 
260     @Test
261     public void testSimpleGetIdentityTransfer() throws Exception {
262         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
263 
264             @Override
265             public AsyncServerExchangeHandler get() {
266                 return new SingleLineResponseHandler("Hi there");
267             }
268 
269         });
270         final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
271         final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
272 
273         client.start();
274 
275         final int reqNo = 5;
276 
277         for (int i = 0; i < reqNo; i++) {
278             final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
279             final ClientSessionEndpoint streamEndpoint = connectFuture.get();
280 
281             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
282                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
283                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
284             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
285 
286             streamEndpoint.close();
287 
288             Assert.assertNotNull(result);
289             final HttpResponse response = result.getHead();
290             final String entity = result.getBody();
291             Assert.assertNotNull(response);
292             Assert.assertEquals(200, response.getCode());
293             Assert.assertEquals("Hi there", entity);
294         }
295 
296     }
297 
298     @Test
299     public void testPostIdentityTransfer() throws Exception {
300         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
301 
302             @Override
303             public AsyncServerExchangeHandler get() {
304                 return new SingleLineResponseHandler("Hi there");
305             }
306 
307         });
308         final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
309         final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
310 
311         client.start();
312 
313         final int reqNo = 5;
314 
315         for (int i = 0; i < reqNo; i++) {
316             final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
317             final ClientSessionEndpoint streamEndpoint = connectFuture.get();
318 
319             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
320                     new BasicRequestProducer(Method.POST,
321                             createRequestURI(serverEndpoint, "/hello"),
322                             new MultiLineEntityProducer("Hello", 16 * i)),
323                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
324             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
325 
326             streamEndpoint.close();
327 
328             Assert.assertNotNull(result);
329             final HttpResponse response = result.getHead();
330             final String entity = result.getBody();
331             Assert.assertNotNull(response);
332             Assert.assertEquals(200, response.getCode());
333             Assert.assertEquals("Hi there", entity);
334         }
335     }
336 
337     @Test
338     public void testPostIdentityTransferOutOfSequenceResponse() throws Exception {
339         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
340 
341             @Override
342             public AsyncServerExchangeHandler get() {
343                 return new ImmediateResponseExchangeHandler(500, "Go away");
344             }
345 
346         });
347         final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
348         final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
349 
350         client.start();
351 
352         final int reqNo = 5;
353 
354         for (int i = 0; i < reqNo; i++) {
355             final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
356             final ClientSessionEndpoint streamEndpoint = connectFuture.get();
357 
358             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
359                     new BasicRequestProducer(Method.POST,
360                             createRequestURI(serverEndpoint, "/hello"),
361                             new MultiLineEntityProducer("Hello", 16 * i)),
362                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
363             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
364 
365             streamEndpoint.close();
366 
367             Assert.assertNotNull(result);
368             final HttpResponse response = result.getHead();
369             final String entity = result.getBody();
370             Assert.assertNotNull(response);
371             Assert.assertEquals(500, response.getCode());
372             Assert.assertEquals("Go away", entity);
373         }
374     }
375 
376     @Test
377     public void testSimpleGetsPipelined() throws Exception {
378         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
379 
380             @Override
381             public AsyncServerExchangeHandler get() {
382                 return new SingleLineResponseHandler("Hi there");
383             }
384 
385         });
386         final InetSocketAddress serverEndpoint = server.start();
387 
388         client.start();
389         final Future<ClientSessionEndpoint> connectFuture = client.connect(
390                 "localhost", serverEndpoint.getPort(), TIMEOUT);
391         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
392 
393         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
394         for (int i = 0; i < 5; i++) {
395             queue.add(streamEndpoint.execute(
396                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
397                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
398         }
399         while (!queue.isEmpty()) {
400             final Future<Message<HttpResponse, String>> future = queue.remove();
401             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
402             Assert.assertNotNull(result);
403             final HttpResponse response = result.getHead();
404             final String entity = result.getBody();
405             Assert.assertNotNull(response);
406             Assert.assertEquals(200, response.getCode());
407             Assert.assertEquals("Hi there", entity);
408         }
409     }
410 
411     @Test
412     public void testLargeGet() throws Exception {
413         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
414 
415             @Override
416             public AsyncServerExchangeHandler get() {
417                 return new MultiLineResponseHandler("0123456789abcdef", 5000);
418             }
419 
420         });
421         final InetSocketAddress serverEndpoint = server.start();
422 
423         client.start();
424         final Future<ClientSessionEndpoint> connectFuture = client.connect(
425                 "localhost", serverEndpoint.getPort(), TIMEOUT);
426         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
427 
428         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
429                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
430                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
431 
432         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
433         Assert.assertNotNull(result1);
434         final HttpResponse response1 = result1.getHead();
435         Assert.assertNotNull(response1);
436         Assert.assertEquals(200, response1.getCode());
437         final String s1 = result1.getBody();
438         Assert.assertNotNull(s1);
439         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
440         while (t1.hasMoreTokens()) {
441             Assert.assertEquals("0123456789abcdef", t1.nextToken());
442         }
443 
444         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
445                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
446                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
447 
448         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
449         Assert.assertNotNull(result2);
450         final HttpResponse response2 = result2.getHead();
451         Assert.assertNotNull(response2);
452         Assert.assertEquals(200, response2.getCode());
453         final String s2 = result2.getBody();
454         Assert.assertNotNull(s2);
455         final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
456         while (t2.hasMoreTokens()) {
457             Assert.assertEquals("0123456789abcdef", t2.nextToken());
458         }
459     }
460 
461     @Test
462     public void testLargeGetsPipelined() throws Exception {
463         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
464 
465             @Override
466             public AsyncServerExchangeHandler get() {
467                 return new MultiLineResponseHandler("0123456789abcdef", 2000);
468             }
469 
470         });
471         final InetSocketAddress serverEndpoint = server.start();
472 
473         client.start();
474         final Future<ClientSessionEndpoint> connectFuture = client.connect(
475                 "localhost", serverEndpoint.getPort(), TIMEOUT);
476         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
477 
478         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
479         for (int i = 0; i < 5; i++) {
480             queue.add(streamEndpoint.execute(
481                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
482                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
483         }
484         while (!queue.isEmpty()) {
485             final Future<Message<HttpResponse, String>> future = queue.remove();
486             final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
487             Assert.assertNotNull(result);
488             final HttpResponse response = result.getHead();
489             Assert.assertNotNull(response);
490             Assert.assertEquals(200, response.getCode());
491             final String entity = result.getBody();
492             Assert.assertNotNull(entity);
493             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
494             while (t.hasMoreTokens()) {
495                 Assert.assertEquals("0123456789abcdef", t.nextToken());
496             }
497         }
498     }
499 
500     @Test
501     public void testBasicPost() throws Exception {
502         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
503 
504             @Override
505             public AsyncServerExchangeHandler get() {
506                 return new SingleLineResponseHandler("Hi back");
507             }
508 
509         });
510         final InetSocketAddress serverEndpoint = server.start();
511 
512         client.start();
513         final Future<ClientSessionEndpoint> connectFuture = client.connect(
514                 "localhost", serverEndpoint.getPort(), TIMEOUT);
515         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
516 
517         for (int i = 0; i < 5; i++) {
518             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
519                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
520                             AsyncEntityProducers.create("Hi there")),
521                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
522             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
523             Assert.assertNotNull(result);
524             final HttpResponse response1 = result.getHead();
525             final String entity1 = result.getBody();
526             Assert.assertNotNull(response1);
527             Assert.assertEquals(200, response1.getCode());
528             Assert.assertEquals("Hi back", entity1);
529         }
530     }
531 
532     @Test
533     public void testBasicPostPipelined() throws Exception {
534         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
535 
536             @Override
537             public AsyncServerExchangeHandler get() {
538                 return new SingleLineResponseHandler("Hi back");
539             }
540 
541         });
542         final InetSocketAddress serverEndpoint = server.start();
543 
544         client.start();
545         final Future<ClientSessionEndpoint> connectFuture = client.connect(
546                 "localhost", serverEndpoint.getPort(), TIMEOUT);
547         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
548 
549         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
550         for (int i = 0; i < 5; i++) {
551             queue.add(streamEndpoint.execute(
552                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
553                             AsyncEntityProducers.create("Hi there")),
554                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
555         }
556         while (!queue.isEmpty()) {
557             final Future<Message<HttpResponse, String>> future = queue.remove();
558             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
559             Assert.assertNotNull(result);
560             final HttpResponse response = result.getHead();
561             final String entity = result.getBody();
562             Assert.assertNotNull(response);
563             Assert.assertEquals(200, response.getCode());
564             Assert.assertEquals("Hi back", entity);
565         }
566     }
567 
568     @Test
569     public void testHttp10Post() throws Exception {
570         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
571 
572             @Override
573             public AsyncServerExchangeHandler get() {
574                 return new SingleLineResponseHandler("Hi back");
575             }
576 
577         });
578         final InetSocketAddress serverEndpoint = server.start();
579 
580         client.start();
581         final Future<ClientSessionEndpoint> connectFuture = client.connect(
582                 "localhost", serverEndpoint.getPort(), TIMEOUT);
583         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
584 
585         for (int i = 0; i < 5; i++) {
586             final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
587             request.setVersion(HttpVersion.HTTP_1_0);
588             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
589                     new BasicRequestProducer(request, AsyncEntityProducers.create("Hi there")),
590                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
591             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
592             Assert.assertNotNull(result);
593             final HttpResponse response1 = result.getHead();
594             final String entity1 = result.getBody();
595             Assert.assertNotNull(response1);
596             Assert.assertEquals(200, response1.getCode());
597             Assert.assertEquals("Hi back", entity1);
598         }
599     }
600 
601     @Test
602     public void testNoEntityPost() throws Exception {
603         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
604 
605             @Override
606             public AsyncServerExchangeHandler get() {
607                 return new SingleLineResponseHandler("Hi back");
608             }
609 
610         });
611         final InetSocketAddress serverEndpoint = server.start();
612 
613         client.start();
614         final Future<ClientSessionEndpoint> connectFuture = client.connect(
615                 "localhost", serverEndpoint.getPort(), TIMEOUT);
616         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
617 
618         for (int i = 0; i < 5; i++) {
619             final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
620             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
621                     new BasicRequestProducer(request, null),
622                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
623             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
624             Assert.assertNotNull(result);
625             final HttpResponse response1 = result.getHead();
626             final String entity1 = result.getBody();
627             Assert.assertNotNull(response1);
628             Assert.assertEquals(200, response1.getCode());
629             Assert.assertEquals("Hi back", entity1);
630         }
631     }
632 
633     @Test
634     public void testLargePost() throws Exception {
635         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
636 
637             @Override
638             public AsyncServerExchangeHandler get() {
639                 return new EchoHandler(2048);
640             }
641 
642         });
643         final InetSocketAddress serverEndpoint = server.start();
644 
645         client.start();
646         final Future<ClientSessionEndpoint> connectFuture = client.connect(
647                 "localhost", serverEndpoint.getPort(), TIMEOUT);
648         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
649 
650         for (int i = 0; i < 5; i++) {
651             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
652                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
653                             new MultiLineEntityProducer("0123456789abcdef", 5000)),
654                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
655             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
656             Assert.assertNotNull(result);
657             final HttpResponse response = result.getHead();
658             Assert.assertNotNull(response);
659             Assert.assertEquals(200, response.getCode());
660             final String entity = result.getBody();
661             Assert.assertNotNull(entity);
662             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
663             while (t.hasMoreTokens()) {
664                 Assert.assertEquals("0123456789abcdef", t.nextToken());
665             }
666         }
667     }
668 
669     @Test
670     public void testPostsPipelinedLargeResponse() throws Exception {
671         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
672 
673             @Override
674             public AsyncServerExchangeHandler get() {
675                 return new MultiLineResponseHandler("0123456789abcdef", 2000);
676             }
677 
678         });
679         final InetSocketAddress serverEndpoint = server.start();
680 
681         client.start();
682         final Future<ClientSessionEndpoint> connectFuture = client.connect(
683                 "localhost", serverEndpoint.getPort(), TIMEOUT);
684         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
685 
686         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
687         for (int i = 0; i < 2; i++) {
688             queue.add(streamEndpoint.execute(
689                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
690                             AsyncEntityProducers.create("Hi there")),
691                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
692         }
693         while (!queue.isEmpty()) {
694             final Future<Message<HttpResponse, String>> future = queue.remove();
695             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
696             Assert.assertNotNull(result);
697             final HttpResponse response = result.getHead();
698             Assert.assertNotNull(response);
699             Assert.assertEquals(200, response.getCode());
700             final String entity = result.getBody();
701             Assert.assertNotNull(entity);
702             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
703             while (t.hasMoreTokens()) {
704                 Assert.assertEquals("0123456789abcdef", t.nextToken());
705             }
706         }
707     }
708 
709 
710     @Test
711     public void testLargePostsPipelined() throws Exception {
712         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
713 
714             @Override
715             public AsyncServerExchangeHandler get() {
716                 return new EchoHandler(2048);
717             }
718 
719         });
720         final InetSocketAddress serverEndpoint = server.start();
721 
722         client.start();
723         final Future<ClientSessionEndpoint> connectFuture = client.connect(
724                 "localhost", serverEndpoint.getPort(), TIMEOUT);
725         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
726 
727         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
728         for (int i = 0; i < 5; i++) {
729             queue.add(streamEndpoint.execute(
730                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
731                             new MultiLineEntityProducer("0123456789abcdef", 5000)),
732                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
733         }
734         while (!queue.isEmpty()) {
735             final Future<Message<HttpResponse, String>> future = queue.remove();
736             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
737             Assert.assertNotNull(result);
738             final HttpResponse response = result.getHead();
739             Assert.assertNotNull(response);
740             Assert.assertEquals(200, response.getCode());
741             final String entity = result.getBody();
742             Assert.assertNotNull(entity);
743             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
744             while (t.hasMoreTokens()) {
745                 Assert.assertEquals("0123456789abcdef", t.nextToken());
746             }
747         }
748     }
749 
750     @Test
751     public void testSimpleHead() throws Exception {
752         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
753 
754             @Override
755             public AsyncServerExchangeHandler get() {
756                 return new SingleLineResponseHandler("Hi there");
757             }
758 
759         });
760         final InetSocketAddress serverEndpoint = server.start();
761 
762         client.start();
763         final Future<ClientSessionEndpoint> connectFuture = client.connect(
764                 "localhost", serverEndpoint.getPort(), TIMEOUT);
765         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
766 
767         for (int i = 0; i < 5; i++) {
768             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
769                     new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
770                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
771             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
772             Assert.assertNotNull(result);
773             final HttpResponse response1 = result.getHead();
774             Assert.assertNotNull(response1);
775             Assert.assertEquals(200, response1.getCode());
776             Assert.assertNull(result.getBody());
777         }
778     }
779 
780     @Test
781     public void testSimpleHeadConnectionClose() throws Exception {
782         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
783 
784             @Override
785             public AsyncServerExchangeHandler get() {
786                 return new SingleLineResponseHandler("Hi there");
787             }
788 
789         });
790         final InetSocketAddress serverEndpoint = server.start();
791 
792         client.start();
793         final URI requestURI = createRequestURI(serverEndpoint, "/hello");
794         for (int i = 0; i < 5; i++) {
795             final Future<ClientSessionEndpoint> connectFuture = client.connect(
796                     "localhost", serverEndpoint.getPort(), TIMEOUT);
797             try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
798                 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
799                         AsyncRequestBuilder.head(requestURI)
800                                 .addHeader(HttpHeaders.CONNECTION, "close")
801                                 .build(),
802                         new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
803                 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
804                 Assert.assertNotNull(result);
805                 final HttpResponse response1 = result.getHead();
806                 Assert.assertNotNull(response1);
807                 Assert.assertEquals(200, response1.getCode());
808                 Assert.assertNull(result.getBody());
809             }
810         }
811     }
812 
813     @Test
814     public void testHeadPipelined() throws Exception {
815         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
816 
817             @Override
818             public AsyncServerExchangeHandler get() {
819                 return new SingleLineResponseHandler("Hi there");
820             }
821 
822         });
823         final InetSocketAddress serverEndpoint = server.start();
824 
825         client.start();
826         final Future<ClientSessionEndpoint> connectFuture = client.connect(
827                 "localhost", serverEndpoint.getPort(), TIMEOUT);
828         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
829 
830         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
831         for (int i = 0; i < 5; i++) {
832             queue.add(streamEndpoint.execute(
833                     new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
834                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
835         }
836         while (!queue.isEmpty()) {
837             final Future<Message<HttpResponse, String>> future = queue.remove();
838             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
839             Assert.assertNotNull(result);
840             final HttpResponse response1 = result.getHead();
841             Assert.assertNotNull(response1);
842             Assert.assertEquals(200, response1.getCode());
843             Assert.assertNull(result.getBody());
844         }
845     }
846 
847     @Test
848     public void testExpectationFailed() throws Exception {
849         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
850 
851             @Override
852             public AsyncServerExchangeHandler get() {
853                 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
854 
855                     @Override
856                     protected void handle(
857                             final Message<HttpRequest, String> request,
858                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
859                             final HttpContext context) throws IOException, HttpException {
860                         responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
861 
862                     }
863                 };
864             }
865 
866         });
867         final InetSocketAddress serverEndpoint = server.start(null, new Decorator<AsyncServerExchangeHandler>() {
868 
869             @Override
870             public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
871 
872                 return new BasicAsyncServerExpectationDecorator(handler) {
873 
874                     @Override
875                     protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
876                         final Header h = request.getFirstHeader("password");
877                         if (h != null && "secret".equals(h.getValue())) {
878                             return null;
879                         } else {
880                             return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
881                         }
882                     }
883                 };
884 
885             }
886         }, Http1Config.DEFAULT);
887 
888         client.start();
889         final Future<IOSession> sessionFuture = client.requestSession(
890                 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
891         final IOSession ioSession = sessionFuture.get();
892         final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession);
893 
894         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
895         request1.addHeader("password", "secret");
896         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
897                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 1000)),
898                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
899         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
900         Assert.assertNotNull(result1);
901         final HttpResponse response1 = result1.getHead();
902         Assert.assertNotNull(response1);
903         Assert.assertEquals(200, response1.getCode());
904         Assert.assertNotNull("All is well", result1.getBody());
905 
906         Assert.assertTrue(ioSession.isOpen());
907 
908         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
909         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
910                 new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
911                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
912         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
913         Assert.assertNotNull(result2);
914         final HttpResponse response2 = result2.getHead();
915         Assert.assertNotNull(response2);
916         Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
917         Assert.assertNotNull("You shall not pass", result2.getBody());
918 
919         Assert.assertTrue(ioSession.isOpen());
920 
921         final HttpRequest request3 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
922         request3.addHeader("password", "secret");
923         final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
924                 new BasicRequestProducer(request3, new MultiLineEntityProducer("0123456789abcdef", 1000)),
925                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
926         final Message<HttpResponse, String> result3 = future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
927         Assert.assertNotNull(result3);
928         final HttpResponse response3 = result3.getHead();
929         Assert.assertNotNull(response3);
930         Assert.assertEquals(200, response3.getCode());
931         Assert.assertNotNull("All is well", result3.getBody());
932 
933         Assert.assertTrue(ioSession.isOpen());
934 
935         final HttpRequest request4 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
936         final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
937                 new BasicRequestProducer(request4, AsyncEntityProducers.create("blah")),
938                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
939         final Message<HttpResponse, String> result4 = future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
940         Assert.assertNotNull(result4);
941         final HttpResponse response4 = result4.getHead();
942         Assert.assertNotNull(response4);
943         Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response4.getCode());
944         Assert.assertNotNull("You shall not pass", result4.getBody());
945 
946         Assert.assertFalse(ioSession.isOpen());
947     }
948 
949     @Test
950     public void testExpectationFailedCloseConnection() throws Exception {
951         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
952 
953             @Override
954             public AsyncServerExchangeHandler get() {
955                 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
956 
957                     @Override
958                     protected void handle(
959                             final Message<HttpRequest, String> request,
960                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
961                             final HttpContext context) throws IOException, HttpException {
962                         responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
963 
964                     }
965                 };
966             }
967 
968         });
969         final InetSocketAddress serverEndpoint = server.start(null, new Decorator<AsyncServerExchangeHandler>() {
970 
971             @Override
972             public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
973 
974                 return new BasicAsyncServerExpectationDecorator(handler) {
975 
976                     @Override
977                     protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
978                         final Header h = request.getFirstHeader("password");
979                         if (h != null && "secret".equals(h.getValue())) {
980                             return null;
981                         } else {
982                             final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_UNAUTHORIZED);
983                             response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
984                             return new BasicResponseProducer(response, "You shall not pass");
985                         }
986                     }
987                 };
988 
989             }
990         }, Http1Config.DEFAULT);
991 
992         client.start();
993         final Future<IOSession> sessionFuture = client.requestSession(
994                 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
995         final IOSession ioSession = sessionFuture.get();
996         final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession);
997 
998         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
999         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1000                 new BasicRequestProducer(request1, new MultiBinEntityProducer(
1001                         new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
1002                         100000,
1003                         ContentType.TEXT_PLAIN)),
1004                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1005         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1006         Assert.assertNotNull(result1);
1007         final HttpResponse response1 = result1.getHead();
1008         Assert.assertNotNull(response1);
1009         Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1010         Assert.assertNotNull("You shall not pass", result1.getBody());
1011 
1012         Assert.assertFalse(streamEndpoint.isOpen());
1013     }
1014 
1015     @Test
1016     public void testDelayedExpectationVerification() throws Exception {
1017         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1018 
1019             @Override
1020             public AsyncServerExchangeHandler get() {
1021                 return new AsyncServerExchangeHandler() {
1022 
1023                     private final Random random = new Random(System.currentTimeMillis());
1024                     private final AsyncEntityProducer entityProducer = AsyncEntityProducers.create(
1025                             "All is well");
1026 
1027                     @Override
1028                     public void handleRequest(
1029                             final HttpRequest request,
1030                             final EntityDetails entityDetails,
1031                             final ResponseChannel responseChannel,
1032                             final HttpContext context) throws HttpException, IOException {
1033 
1034                         Executors.newSingleThreadExecutor().execute(new Runnable() {
1035                             @Override
1036                             public void run() {
1037                                 try {
1038                                     if (entityDetails != null) {
1039                                         final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
1040                                         if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
1041                                             Thread.sleep(random.nextInt(1000));
1042                                             responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), context);
1043                                         }
1044                                         final HttpResponse response = new BasicHttpResponse(200);
1045                                         synchronized (entityProducer) {
1046                                             responseChannel.sendResponse(response, entityProducer, context);
1047                                         }
1048                                     }
1049                                 } catch (final Exception ignore) {
1050                                     // ignore
1051                                 }
1052                             }
1053                         });
1054 
1055                     }
1056 
1057                     @Override
1058                     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1059                         capacityChannel.update(Integer.MAX_VALUE);
1060                     }
1061 
1062                     @Override
1063                     public void consume(final ByteBuffer src) throws IOException {
1064                     }
1065 
1066                     @Override
1067                     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1068                     }
1069 
1070                     @Override
1071                     public int available() {
1072                         synchronized (entityProducer) {
1073                             return entityProducer.available();
1074                         }
1075                     }
1076 
1077                     @Override
1078                     public void produce(final DataStreamChannel channel) throws IOException {
1079                         synchronized (entityProducer) {
1080                             entityProducer.produce(channel);
1081                         }
1082                     }
1083 
1084                     @Override
1085                     public void failed(final Exception cause) {
1086                     }
1087 
1088                     @Override
1089                     public void releaseResources() {
1090                     }
1091 
1092                 };
1093 
1094             }
1095         });
1096         final InetSocketAddress serverEndpoint = server.start();
1097 
1098         client.start(Http1Config.custom().setWaitForContinueTimeout(Timeout.ofMilliseconds(100)).build());
1099         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1100                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1101         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1102 
1103         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
1104         for (int i = 0; i < 5; i++) {
1105             queue.add(streamEndpoint.execute(
1106                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
1107                             AsyncEntityProducers.create("Some important message")),
1108                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
1109         }
1110         while (!queue.isEmpty()) {
1111             final Future<Message<HttpResponse, String>> future = queue.remove();
1112             final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1113             Assert.assertNotNull(result);
1114             final HttpResponse response = result.getHead();
1115             Assert.assertNotNull(response);
1116             Assert.assertEquals(200, response.getCode());
1117             Assert.assertNotNull("All is well", result.getBody());
1118         }
1119     }
1120 
1121     @Test
1122     public void testPrematureResponse() throws Exception {
1123         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1124 
1125             @Override
1126             public AsyncServerExchangeHandler get() {
1127                 return new AsyncServerExchangeHandler() {
1128 
1129                     private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>();
1130 
1131                     @Override
1132                     public void handleRequest(
1133                             final HttpRequest request,
1134                             final EntityDetails entityDetails,
1135                             final ResponseChannel responseChannel,
1136                             final HttpContext context) throws HttpException, IOException {
1137                         final AsyncResponseProducer producer;
1138                         final Header h = request.getFirstHeader("password");
1139                         if (h != null && "secret".equals(h.getValue())) {
1140                             producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
1141                         } else {
1142                             producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
1143                         }
1144                         responseProducer.set(producer);
1145                         producer.sendResponse(responseChannel, context);
1146                     }
1147 
1148                     @Override
1149                     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1150                         capacityChannel.update(Integer.MAX_VALUE);
1151                     }
1152 
1153                     @Override
1154                     public void consume(final ByteBuffer src) throws IOException {
1155                     }
1156 
1157                     @Override
1158                     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1159                     }
1160 
1161                     @Override
1162                     public int available() {
1163                         final AsyncResponseProducer producer = responseProducer.get();
1164                         return producer.available();
1165                     }
1166 
1167                     @Override
1168                     public void produce(final DataStreamChannel channel) throws IOException {
1169                         final AsyncResponseProducer producer = responseProducer.get();
1170                         producer.produce(channel);
1171                     }
1172 
1173                     @Override
1174                     public void failed(final Exception cause) {
1175                     }
1176 
1177                     @Override
1178                     public void releaseResources() {
1179                     }
1180                 };
1181             }
1182 
1183         });
1184         final InetSocketAddress serverEndpoint = server.start();
1185 
1186         client.start();
1187         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1188                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1189         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1190 
1191         for (int i = 0; i < 3; i++) {
1192             final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1193             final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1194                     new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 100000)),
1195                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1196             final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1197             Assert.assertNotNull(result1);
1198             final HttpResponse response1 = result1.getHead();
1199             Assert.assertNotNull(response1);
1200             Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1201             Assert.assertNotNull("You shall not pass", result1.getBody());
1202 
1203             Assert.assertTrue(streamEndpoint.isOpen());
1204         }
1205         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1206         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1207                 new BasicRequestProducer(request1, new MultiBinEntityProducer(
1208                         new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
1209                         100000,
1210                         ContentType.TEXT_PLAIN)),
1211                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1212         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1213         Assert.assertNotNull(result1);
1214         final HttpResponse response1 = result1.getHead();
1215         Assert.assertNotNull(response1);
1216         Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1217         Assert.assertNotNull("You shall not pass", result1.getBody());
1218     }
1219 
1220     @Test
1221     public void testSlowResponseConsumer() throws Exception {
1222         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
1223 
1224             @Override
1225             public AsyncServerExchangeHandler get() {
1226                 return new MultiLineResponseHandler("0123456789abcd", 100);
1227             }
1228 
1229         });
1230         final InetSocketAddress serverEndpoint = server.start();
1231 
1232         client.start(Http1Config.custom().setBufferSize(256).build());
1233 
1234         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1235                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1236         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1237 
1238         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
1239         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1240                 new BasicRequestProducer(request1, null),
1241                 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
1242 
1243                     @Override
1244                     protected String consumeData(
1245                             final ContentType contentType, final InputStream inputStream) throws IOException {
1246                         Charset charset = contentType != null ? contentType.getCharset() : null;
1247                         if (charset == null) {
1248                             charset = StandardCharsets.US_ASCII;
1249                         }
1250 
1251                         final StringBuilder buffer = new StringBuilder();
1252                         try {
1253                             final byte[] tmp = new byte[16];
1254                             int l;
1255                             while ((l = inputStream.read(tmp)) != -1) {
1256                                 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
1257                                 Thread.sleep(50);
1258                             }
1259                         } catch (final InterruptedException ex) {
1260                             Thread.currentThread().interrupt();
1261                             throw new InterruptedIOException(ex.getMessage());
1262                         }
1263                         return buffer.toString();
1264                     }
1265                 }),
1266                 null);
1267 
1268         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1269         Assert.assertNotNull(result1);
1270         final HttpResponse response1 = result1.getHead();
1271         Assert.assertNotNull(response1);
1272         Assert.assertEquals(200, response1.getCode());
1273         final String s1 = result1.getBody();
1274         Assert.assertNotNull(s1);
1275         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1276         while (t1.hasMoreTokens()) {
1277             Assert.assertEquals("0123456789abcd", t1.nextToken());
1278         }
1279     }
1280 
1281     @Test
1282     public void testSlowRequestProducer() throws Exception {
1283         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1284 
1285             @Override
1286             public AsyncServerExchangeHandler get() {
1287                 return new EchoHandler(2048);
1288             }
1289 
1290         });
1291         final InetSocketAddress serverEndpoint = server.start();
1292 
1293         client.start();
1294         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1295                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1296         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1297 
1298         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1299         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1300                 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
1301 
1302                     @Override
1303                     protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
1304                         Charset charset = contentType.getCharset();
1305                         if (charset == null) {
1306                             charset = StandardCharsets.US_ASCII;
1307                         }
1308                         try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
1309                             for (int i = 0; i < 500; i++) {
1310                                 if (i % 100 == 0) {
1311                                     writer.flush();
1312                                     Thread.sleep(500);
1313                                 }
1314                                 writer.write("0123456789abcdef\r\n");
1315                             }
1316                         } catch (final InterruptedException ex) {
1317                             Thread.currentThread().interrupt();
1318                             throw new InterruptedIOException(ex.getMessage());
1319                         }
1320                     }
1321 
1322                 }),
1323                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1324         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1325         Assert.assertNotNull(result1);
1326         final HttpResponse response1 = result1.getHead();
1327         Assert.assertNotNull(response1);
1328         Assert.assertEquals(200, response1.getCode());
1329         final String s1 = result1.getBody();
1330         Assert.assertNotNull(s1);
1331         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1332         while (t1.hasMoreTokens()) {
1333             Assert.assertEquals("0123456789abcdef", t1.nextToken());
1334         }
1335     }
1336 
1337     @Test
1338     public void testSlowResponseProducer() throws Exception {
1339         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1340 
1341             @Override
1342             public AsyncServerExchangeHandler get() {
1343                 return new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
1344 
1345                     @Override
1346                     protected void handle(
1347                             final HttpRequest request,
1348                             final InputStream requestStream,
1349                             final HttpResponse response,
1350                             final OutputStream responseStream,
1351                             final HttpContext context) throws IOException, HttpException {
1352 
1353                         if (!"/hello".equals(request.getPath())) {
1354                             response.setCode(HttpStatus.SC_NOT_FOUND);
1355                             return;
1356                         }
1357                         if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
1358                             response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
1359                             return;
1360                         }
1361                         if (requestStream == null) {
1362                             return;
1363                         }
1364                         final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
1365                         final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
1366                         Charset charset = contentType != null ? contentType.getCharset() : null;
1367                         if (charset == null) {
1368                             charset = StandardCharsets.US_ASCII;
1369                         }
1370                         response.setCode(HttpStatus.SC_OK);
1371                         response.setHeader(h1);
1372                         try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
1373                              final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
1374                             try {
1375                                 String l;
1376                                 int count = 0;
1377                                 while ((l = reader.readLine()) != null) {
1378                                     writer.write(l);
1379                                     writer.write("\r\n");
1380                                     count++;
1381                                     if (count % 500 == 0) {
1382                                         Thread.sleep(500);
1383                                     }
1384                                 }
1385                                 writer.flush();
1386                             } catch (final InterruptedException ex) {
1387                                 Thread.currentThread().interrupt();
1388                                 throw new InterruptedIOException(ex.getMessage());
1389                             }
1390                         }
1391                     }
1392                 };
1393             }
1394 
1395         });
1396         final InetSocketAddress serverEndpoint = server.start();
1397 
1398         client.start(Http1Config.custom().setBufferSize(256).build());
1399 
1400         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1401                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1402         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1403 
1404         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1405         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1406                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
1407                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1408         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1409         Assert.assertNotNull(result1);
1410         final HttpResponse response1 = result1.getHead();
1411         Assert.assertNotNull(response1);
1412         Assert.assertEquals(200, response1.getCode());
1413         final String s1 = result1.getBody();
1414         Assert.assertNotNull(s1);
1415         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1416         while (t1.hasMoreTokens()) {
1417             Assert.assertEquals("0123456789abcd", t1.nextToken());
1418         }
1419     }
1420 
1421     @Test
1422     public void testPipelinedConnectionClose() throws Exception {
1423         server.register("/hello*", new Supplier<AsyncServerExchangeHandler>() {
1424 
1425             @Override
1426             public AsyncServerExchangeHandler get() {
1427                 return new SingleLineResponseHandler("Hi back");
1428             }
1429 
1430         });
1431         final InetSocketAddress serverEndpoint = server.start();
1432 
1433         client.start();
1434         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1435                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1436         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1437 
1438         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1439                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1440                         AsyncEntityProducers.create("Hi there")),
1441                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1442         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1443         request2.addHeader(HttpHeaders.CONNECTION, "close");
1444         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1445                 new BasicRequestProducer(request2,
1446                         AsyncEntityProducers.create("Hi there")),
1447                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1448         final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1449                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1450                         AsyncEntityProducers.create("Hi there")),
1451                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1452 
1453         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1454         Assert.assertNotNull(result1);
1455         final HttpResponse response1 = result1.getHead();
1456         final String entity1 = result1.getBody();
1457         Assert.assertNotNull(response1);
1458         Assert.assertEquals(200, response1.getCode());
1459         Assert.assertEquals("Hi back", entity1);
1460 
1461         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1462         Assert.assertNotNull(result2);
1463         final HttpResponse response2 = result2.getHead();
1464         final String entity2 = result2.getBody();
1465         Assert.assertNotNull(response2);
1466         Assert.assertEquals(200, response2.getCode());
1467         Assert.assertEquals("Hi back", entity2);
1468 
1469         try {
1470             future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1471             Assert.fail("ExecutionException expected");
1472         } catch (final CancellationException | ExecutionException ignore) {
1473         }
1474 
1475         final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
1476                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1477                         AsyncEntityProducers.create("Hi there")),
1478                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1479         try {
1480             future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1481             Assert.fail("CancellationException or ExecutionException expected");
1482         } catch (final CancellationException ignore) {
1483             Assert.assertTrue(future4.isCancelled());
1484         } catch (final ExecutionException ignore) {
1485         }
1486     }
1487 
1488     @Test
1489     public void testPipelinedInvalidRequest() throws Exception {
1490         server.register("/hello*", new Supplier<AsyncServerExchangeHandler>() {
1491 
1492             @Override
1493             public AsyncServerExchangeHandler get() {
1494                 return new SingleLineResponseHandler("Hi back");
1495             }
1496 
1497         });
1498         final InetSocketAddress serverEndpoint = server.start();
1499 
1500         client.start();
1501         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1502                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1503         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1504 
1505         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1506                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1507                         AsyncEntityProducers.create("Hi there")),
1508                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1509         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1510         request2.addHeader(HttpHeaders.HOST, "blah:blah");
1511         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1512                 new BasicRequestProducer(request2,
1513                         AsyncEntityProducers.create("Hi there")),
1514                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1515         final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1516                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1517                         AsyncEntityProducers.create("Hi there")),
1518                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1519 
1520         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1521         Assert.assertNotNull(result1);
1522         final HttpResponse response1 = result1.getHead();
1523         final String entity1 = result1.getBody();
1524         Assert.assertNotNull(response1);
1525         Assert.assertEquals(200, response1.getCode());
1526         Assert.assertEquals("Hi back", entity1);
1527 
1528         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1529         Assert.assertNotNull(result2);
1530         final HttpResponse response2 = result2.getHead();
1531         final String entity2 = result2.getBody();
1532         Assert.assertNotNull(response2);
1533         Assert.assertEquals(400, response2.getCode());
1534         Assert.assertTrue(entity2.length() > 0);
1535 
1536         try {
1537             future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1538             Assert.fail("ExecutionException expected");
1539         } catch (final CancellationException | ExecutionException ignore) {
1540         }
1541     }
1542 
1543     private static final byte[] GARBAGE = "garbage".getBytes(StandardCharsets.US_ASCII);
1544 
1545     private static class BrokenChunkEncoder extends AbstractContentEncoder {
1546 
1547         private final CharArrayBuffer lineBuffer;
1548         private boolean done;
1549 
1550         BrokenChunkEncoder(
1551                 final WritableByteChannel channel,
1552                 final SessionOutputBuffer buffer,
1553                 final BasicHttpTransportMetrics metrics) {
1554             super(channel, buffer, metrics);
1555             lineBuffer = new CharArrayBuffer(16);
1556         }
1557 
1558         @Override
1559         public void complete(final List<? extends Header> trailers) throws IOException {
1560             super.complete(trailers);
1561         }
1562 
1563         @Override
1564         public int write(final ByteBuffer src) throws IOException {
1565             final int chunk;
1566             if (!done) {
1567                 lineBuffer.clear();
1568                 lineBuffer.append(Integer.toHexString(GARBAGE.length * 10));
1569                 buffer().writeLine(lineBuffer);
1570                 buffer().write(ByteBuffer.wrap(GARBAGE));
1571                 done = true;
1572                 chunk = GARBAGE.length;
1573             } else {
1574                 chunk = 0;
1575             }
1576             final long bytesWritten = buffer().flush(channel());
1577             if (bytesWritten > 0) {
1578                 metrics().incrementBytesTransferred(bytesWritten);
1579             }
1580             if (!buffer().hasData()) {
1581                 channel().close();
1582             }
1583             return chunk;
1584         }
1585 
1586     }
1587 
1588     @Test
1589     public void testTruncatedChunk() throws Exception {
1590         final InetSocketAddress serverEndpoint = server.start(new InternalServerHttp1EventHandlerFactory(
1591                 HttpProcessors.server(),
1592                 new HandlerFactory<AsyncServerExchangeHandler>() {
1593 
1594                     @Override
1595                     public AsyncServerExchangeHandler create(
1596                             final HttpRequest request,
1597                             final HttpContext context) throws HttpException {
1598                         return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
1599 
1600                             @Override
1601                             protected void handle(
1602                                     final Message<HttpRequest, String> request,
1603                                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1604                                     final HttpContext context) throws IOException, HttpException {
1605                                 responseTrigger.submitResponse(
1606                                         new BasicResponseProducer(new StringAsyncEntityProducer("useful stuff")), context);
1607                             }
1608 
1609                         };
1610                     }
1611 
1612                 },
1613                 Http1Config.DEFAULT,
1614                 CharCodingConfig.DEFAULT,
1615                 DefaultConnectionReuseStrategy.INSTANCE,
1616                 scheme == URIScheme.HTTPS ? SSLTestContexts.createServerSSLContext() : null, null, null) {
1617 
1618             @Override
1619             protected ServerHttp1StreamDuplexer createServerHttp1StreamDuplexer(
1620                     final ProtocolIOSession ioSession,
1621                     final HttpProcessor httpProcessor,
1622                     final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
1623                     final Http1Config http1Config,
1624                     final CharCodingConfig connectionConfig,
1625                     final ConnectionReuseStrategy connectionReuseStrategy,
1626                     final NHttpMessageParser<HttpRequest> incomingMessageParser,
1627                     final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
1628                     final ContentLengthStrategy incomingContentStrategy,
1629                     final ContentLengthStrategy outgoingContentStrategy,
1630                     final Http1StreamListener streamListener) {
1631                 return new ServerHttp1StreamDuplexer(ioSession, httpProcessor, exchangeHandlerFactory,
1632                         scheme.id,
1633                         http1Config, connectionConfig, connectionReuseStrategy,
1634                         incomingMessageParser, outgoingMessageWriter,
1635                         incomingContentStrategy, outgoingContentStrategy,
1636                         streamListener) {
1637 
1638                     @Override
1639                     protected ContentEncoder createContentEncoder(
1640                             final long len,
1641                             final WritableByteChannel channel,
1642                             final SessionOutputBuffer buffer,
1643                             final BasicHttpTransportMetrics metrics) throws HttpException {
1644                         if (len == ContentLengthStrategy.CHUNKED) {
1645                             return new BrokenChunkEncoder(channel, buffer, metrics);
1646                         } else {
1647                             return super.createContentEncoder(len, channel, buffer, metrics);
1648                         }
1649                     }
1650 
1651                 };
1652             }
1653 
1654         });
1655 
1656         client.start();
1657         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1658                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1659         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1660 
1661         final AsyncRequestProducer requestProducer = new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1662         final StringAsyncEntityConsumer entityConsumer = new StringAsyncEntityConsumer() {
1663 
1664             @Override
1665             public void releaseResources() {
1666                 // Do not clear internal content buffer
1667             }
1668 
1669         };
1670         final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(entityConsumer);
1671         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(requestProducer, responseConsumer, null);
1672         try {
1673             future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1674             Assert.fail("ExecutionException should have been thrown");
1675         } catch (final ExecutionException ex) {
1676             final Throwable cause = ex.getCause();
1677             Assert.assertTrue(cause instanceof MalformedChunkCodingException);
1678             Assert.assertEquals("garbage", entityConsumer.generateContent());
1679         }
1680     }
1681 
1682     @Test
1683     public void testExceptionInHandler() throws Exception {
1684         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1685 
1686             @Override
1687             public AsyncServerExchangeHandler get() {
1688                 return new SingleLineResponseHandler("Hi there") {
1689 
1690                     @Override
1691                     protected void handle(
1692                             final Message<HttpRequest, String> request,
1693                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1694                             final HttpContext context) throws IOException, HttpException {
1695                         throw new HttpException("Boom");
1696                     }
1697                 };
1698             }
1699 
1700         });
1701         final InetSocketAddress serverEndpoint = server.start();
1702 
1703         client.start();
1704         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1705                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1706         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1707 
1708         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1709                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1710                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1711         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1712         Assert.assertNotNull(result);
1713         final HttpResponse response1 = result.getHead();
1714         final String entity1 = result.getBody();
1715         Assert.assertNotNull(response1);
1716         Assert.assertEquals(500, response1.getCode());
1717         Assert.assertEquals("Boom", entity1);
1718     }
1719 
1720     @Test
1721     public void testNoServiceHandler() throws Exception {
1722         final InetSocketAddress serverEndpoint = server.start();
1723 
1724         client.start();
1725         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1726                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1727         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1728 
1729         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1730                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1731                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1732         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1733         Assert.assertNotNull(result);
1734         final HttpResponse response1 = result.getHead();
1735         final String entity1 = result.getBody();
1736         Assert.assertNotNull(response1);
1737         Assert.assertEquals(404, response1.getCode());
1738         Assert.assertEquals("Resource not found", entity1);
1739     }
1740 
1741     @Test
1742     public void testResponseNoContent() throws Exception {
1743         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1744 
1745             @Override
1746             public AsyncServerExchangeHandler get() {
1747                 return new SingleLineResponseHandler("Hi there") {
1748 
1749                     @Override
1750                     protected void handle(
1751                             final Message<HttpRequest, String> request,
1752                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1753                             final HttpContext context) throws IOException, HttpException {
1754                         final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_NO_CONTENT);
1755                         responseTrigger.submitResponse(new BasicResponseProducer(response), context);
1756                     }
1757                 };
1758             }
1759 
1760         });
1761         final InetSocketAddress serverEndpoint = server.start();
1762 
1763         client.start();
1764         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1765                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1766         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1767 
1768         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1769                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1770                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1771         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1772         Assert.assertNotNull(result);
1773         final HttpResponse response1 = result.getHead();
1774         Assert.assertNotNull(response1);
1775         Assert.assertEquals(204, response1.getCode());
1776         Assert.assertNull(result.getBody());
1777     }
1778 
1779     @Test
1780     public void testAbsentHostHeader() throws Exception {
1781         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1782 
1783             @Override
1784             public AsyncServerExchangeHandler get() {
1785                 return new SingleLineResponseHandler("Hi there");
1786             }
1787 
1788         });
1789         final InetSocketAddress serverEndpoint = server.start();
1790 
1791         client.start(new DefaultHttpProcessor(new RequestContent(), new RequestConnControl()), Http1Config.DEFAULT);
1792 
1793         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1794                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1795         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1796 
1797         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1798         request1.setVersion(HttpVersion.HTTP_1_0);
1799         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1800                 new BasicRequestProducer(request1, null),
1801                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1802         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1803         Assert.assertNotNull(result1);
1804         final HttpResponse response1 = result1.getHead();
1805         Assert.assertNotNull(response1);
1806         Assert.assertEquals(200, response1.getCode());
1807         Assert.assertEquals("Hi there", result1.getBody());
1808 
1809         final HttpRequest request2 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1810         request2.setVersion(HttpVersion.HTTP_1_1);
1811         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1812                 new BasicRequestProducer(request2, null),
1813                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1814         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1815         Assert.assertNotNull(result2);
1816         final HttpResponse response2 = result2.getHead();
1817         Assert.assertNotNull(response2);
1818         Assert.assertEquals(400, response2.getCode());
1819         Assert.assertEquals("Host header is absent", result2.getBody());
1820     }
1821 
1822     @Test
1823     public void testMessageWithTrailers() throws Exception {
1824         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1825 
1826             @Override
1827             public AsyncServerExchangeHandler get() {
1828                 return new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
1829 
1830                     @Override
1831                     protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
1832                             final HttpRequest request,
1833                             final EntityDetails entityDetails,
1834                             final HttpContext context) throws HttpException {
1835                         return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
1836                     }
1837 
1838                     @Override
1839                     protected void handle(
1840                             final Message<HttpRequest, String> requestMessage,
1841                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1842                             final HttpContext context) throws HttpException, IOException {
1843                         responseTrigger.submitResponse(new BasicResponseProducer(
1844                                 HttpStatus.SC_OK,
1845                                 new DigestingEntityProducer("MD5",
1846                                         new StringAsyncEntityProducer("Hello back with some trailers"))), context);
1847                     }
1848                 };
1849             }
1850 
1851         });
1852         final InetSocketAddress serverEndpoint = server.start();
1853 
1854         client.start();
1855 
1856         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1857                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1858         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1859 
1860         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1861         final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
1862         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1863                 new BasicRequestProducer(request1, null),
1864                 new BasicResponseConsumer<>(entityConsumer), null);
1865         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1866         Assert.assertNotNull(result1);
1867         final HttpResponse response1 = result1.getHead();
1868         Assert.assertNotNull(response1);
1869         Assert.assertEquals(200, response1.getCode());
1870         Assert.assertEquals("Hello back with some trailers", result1.getBody());
1871 
1872         final List<Header> trailers = entityConsumer.getTrailers();
1873         Assert.assertNotNull(trailers);
1874         Assert.assertEquals(2, trailers.size());
1875         final Map<String, String> map = new HashMap<>();
1876         for (final Header header: trailers) {
1877             map.put(header.getName().toLowerCase(Locale.ROOT), header.getValue());
1878         }
1879         final String digest = TextUtils.toHexString(entityConsumer.getDigest());
1880         Assert.assertEquals("MD5", map.get("digest-algo"));
1881         Assert.assertEquals(digest, map.get("digest"));
1882     }
1883 
1884     @Test
1885     public void testProtocolException() throws Exception {
1886         server.register("/boom", new Supplier<AsyncServerExchangeHandler>() {
1887 
1888             @Override
1889             public AsyncServerExchangeHandler get() {
1890                 return new AsyncServerExchangeHandler() {
1891 
1892                     private final StringAsyncEntityProducer entityProducer = new StringAsyncEntityProducer("Everyting is OK");
1893 
1894                     @Override
1895                     public void releaseResources() {
1896                         entityProducer.releaseResources();
1897                     }
1898 
1899                     @Override
1900                     public void handleRequest(
1901                             final HttpRequest request,
1902                             final EntityDetails entityDetails,
1903                             final ResponseChannel responseChannel,
1904                             final HttpContext context) throws HttpException, IOException {
1905                         final String requestUri = request.getRequestUri();
1906                         if (requestUri.endsWith("boom")) {
1907                             throw new ProtocolException("Boom!!!");
1908                         }
1909                         responseChannel.sendResponse(new BasicHttpResponse(200), entityProducer, context);
1910                     }
1911 
1912                     @Override
1913                     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1914                         capacityChannel.update(Integer.MAX_VALUE);
1915                     }
1916 
1917                     @Override
1918                     public void consume(final ByteBuffer src) throws IOException {
1919                     }
1920 
1921                     @Override
1922                     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1923                         // empty
1924                     }
1925 
1926                     @Override
1927                     public int available() {
1928                         return entityProducer.available();
1929                     }
1930 
1931                     @Override
1932                     public void produce(final DataStreamChannel channel) throws IOException {
1933                         entityProducer.produce(channel);
1934                     }
1935 
1936                     @Override
1937                     public void failed(final Exception cause) {
1938                         releaseResources();
1939                     }
1940 
1941                 };
1942             }
1943 
1944         });
1945 
1946         final InetSocketAddress serverEndpoint = server.start();
1947 
1948         client.start();
1949         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1950                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1951         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1952         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1953                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/boom")),
1954                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1955         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1956         Assert.assertNotNull(result);
1957         final HttpResponse response1 = result.getHead();
1958         final String entity1 = result.getBody();
1959         Assert.assertNotNull(response1);
1960         Assert.assertEquals(HttpStatus.SC_BAD_REQUEST, response1.getCode());
1961         Assert.assertEquals("Boom!!!", entity1);
1962     }
1963 
1964     @Test
1965     public void testHeaderTooLarge() throws Exception {
1966         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1967 
1968             @Override
1969             public AsyncServerExchangeHandler get() {
1970                 return new SingleLineResponseHandler("Hi there");
1971             }
1972 
1973         });
1974         final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1975                 .setMaxLineLength(100)
1976                 .build());
1977         client.start();
1978 
1979         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1980                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1981         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1982 
1983         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1984         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1985                 "1234567890123456789012345678901234567890");
1986         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1987                 new BasicRequestProducer(request1, null),
1988                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1989         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1990         Assert.assertNotNull(result1);
1991         final HttpResponse response1 = result1.getHead();
1992         Assert.assertNotNull(response1);
1993         Assert.assertEquals(431, response1.getCode());
1994         Assert.assertEquals("Maximum line length limit exceeded", result1.getBody());
1995     }
1996 
1997     @Test
1998     public void testHeaderTooLargePost() throws Exception {
1999         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
2000 
2001             @Override
2002             public AsyncServerExchangeHandler get() {
2003                 return new SingleLineResponseHandler("Hi there");
2004             }
2005 
2006         });
2007         final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
2008                 .setMaxLineLength(100)
2009                 .build());
2010         client.start(
2011                 new DefaultHttpProcessor(new RequestContent(), new RequestTargetHost(), new RequestConnControl()), null);
2012 
2013         final Future<ClientSessionEndpoint> connectFuture = client.connect(
2014                 "localhost", serverEndpoint.getPort(), TIMEOUT);
2015         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
2016 
2017         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
2018         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
2019                 "1234567890123456789012345678901234567890");
2020 
2021         final byte[] b = new byte[2048];
2022         for (int i = 0; i < b.length; i++) {
2023             b[i] = (byte) ('a' + i % 10);
2024         }
2025 
2026         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
2027                 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
2028                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
2029         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
2030         Assert.assertNotNull(result1);
2031         final HttpResponse response1 = result1.getHead();
2032         Assert.assertNotNull(response1);
2033         Assert.assertEquals(431, response1.getCode());
2034         Assert.assertEquals("Maximum line length limit exceeded", result1.getBody());
2035     }
2036 
2037 }