View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.testing.nio;
29  
30  import java.io.BufferedReader;
31  import java.io.BufferedWriter;
32  import java.io.IOException;
33  import java.io.InputStream;
34  import java.io.InputStreamReader;
35  import java.io.InterruptedIOException;
36  import java.io.OutputStream;
37  import java.io.OutputStreamWriter;
38  import java.net.InetSocketAddress;
39  import java.net.URI;
40  import java.net.URISyntaxException;
41  import java.nio.ByteBuffer;
42  import java.nio.charset.Charset;
43  import java.nio.charset.StandardCharsets;
44  import java.util.Arrays;
45  import java.util.Collection;
46  import java.util.HashMap;
47  import java.util.LinkedList;
48  import java.util.List;
49  import java.util.Locale;
50  import java.util.Map;
51  import java.util.Queue;
52  import java.util.StringTokenizer;
53  import java.util.concurrent.BlockingQueue;
54  import java.util.concurrent.CountDownLatch;
55  import java.util.concurrent.ExecutionException;
56  import java.util.concurrent.Executors;
57  import java.util.concurrent.Future;
58  import java.util.concurrent.LinkedBlockingDeque;
59  import java.util.concurrent.TimeUnit;
60  import java.util.concurrent.atomic.AtomicInteger;
61  import java.util.concurrent.atomic.AtomicReference;
62  
63  import org.apache.hc.core5.function.Callback;
64  import org.apache.hc.core5.function.Decorator;
65  import org.apache.hc.core5.function.Supplier;
66  import org.apache.hc.core5.http.ContentType;
67  import org.apache.hc.core5.http.EndpointDetails;
68  import org.apache.hc.core5.http.EntityDetails;
69  import org.apache.hc.core5.http.Header;
70  import org.apache.hc.core5.http.HeaderElements;
71  import org.apache.hc.core5.http.HttpException;
72  import org.apache.hc.core5.http.HttpHeaders;
73  import org.apache.hc.core5.http.HttpHost;
74  import org.apache.hc.core5.http.HttpRequest;
75  import org.apache.hc.core5.http.HttpResponse;
76  import org.apache.hc.core5.http.HttpStatus;
77  import org.apache.hc.core5.http.Message;
78  import org.apache.hc.core5.http.Method;
79  import org.apache.hc.core5.http.ProtocolException;
80  import org.apache.hc.core5.http.URIScheme;
81  import org.apache.hc.core5.http.message.BasicHttpRequest;
82  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
83  import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
84  import org.apache.hc.core5.http.nio.AsyncResponseProducer;
85  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
86  import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
87  import org.apache.hc.core5.http.nio.CapacityChannel;
88  import org.apache.hc.core5.http.nio.DataStreamChannel;
89  import org.apache.hc.core5.http.nio.HandlerFactory;
90  import org.apache.hc.core5.http.nio.ResponseChannel;
91  import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
92  import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
93  import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
94  import org.apache.hc.core5.http.nio.entity.NoopEntityConsumer;
95  import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
96  import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
97  import org.apache.hc.core5.http.nio.support.AbstractAsyncPushHandler;
98  import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
99  import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder;
100 import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
101 import org.apache.hc.core5.http.nio.support.BasicPushProducer;
102 import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
103 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
104 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
105 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
106 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
107 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
108 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
109 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
110 import org.apache.hc.core5.http.protocol.HttpContext;
111 import org.apache.hc.core5.http.protocol.HttpCoreContext;
112 import org.apache.hc.core5.http2.H2Error;
113 import org.apache.hc.core5.http2.H2StreamResetException;
114 import org.apache.hc.core5.http2.config.H2Config;
115 import org.apache.hc.core5.http2.nio.command.PingCommand;
116 import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
117 import org.apache.hc.core5.http2.protocol.H2RequestConnControl;
118 import org.apache.hc.core5.http2.protocol.H2RequestContent;
119 import org.apache.hc.core5.http2.protocol.H2RequestTargetHost;
120 import org.apache.hc.core5.reactor.Command;
121 import org.apache.hc.core5.reactor.IOReactorConfig;
122 import org.apache.hc.core5.reactor.IOSession;
123 import org.apache.hc.core5.testing.SSLTestContexts;
124 import org.apache.hc.core5.util.TextUtils;
125 import org.apache.hc.core5.util.TimeValue;
126 import org.apache.hc.core5.util.Timeout;
127 import org.hamcrest.CoreMatchers;
128 import org.junit.After;
129 import org.junit.Assert;
130 import org.junit.Before;
131 import org.junit.Test;
132 import org.junit.runner.RunWith;
133 import org.junit.runners.Parameterized;
134 import org.slf4j.Logger;
135 import org.slf4j.LoggerFactory;
136 
137 @RunWith(Parameterized.class)
138 public class H2IntegrationTest extends InternalH2ServerTestBase {
139 
140     private final Logger log = LoggerFactory.getLogger(getClass());
141 
142     @Parameterized.Parameters(name = "{0}")
143     public static Collection<Object[]> protocols() {
144         return Arrays.asList(new Object[][]{
145                 { URIScheme.HTTP },
146                 { URIScheme.HTTPS }
147         });
148     }
149 
150     public H2IntegrationTest(final URIScheme scheme) {
151         super(scheme);
152     }
153 
154     private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
155     private static final Timeout LONG_TIMEOUT = Timeout.ofSeconds(60);
156 
157     private H2TestClient client;
158 
159     @Before
160     public void setup() throws Exception {
161         log.debug("Starting up test client");
162         client = new H2TestClient(buildReactorConfig(),
163                 scheme == URIScheme.HTTPS ? SSLTestContexts.createClientSSLContext() : null, null, null);
164     }
165 
166     protected IOReactorConfig buildReactorConfig() {
167         return IOReactorConfig.DEFAULT;
168     }
169 
170     @After
171     public void cleanup() throws Exception {
172         log.debug("Shutting down test client");
173         if (client != null) {
174             client.shutdown(TimeValue.ofSeconds(5));
175         }
176     }
177 
178     private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
179         try {
180             return new URI("http", null, "localhost", serverEndpoint.getPort(), path, null, null);
181         } catch (final URISyntaxException e) {
182             throw new IllegalStateException();
183         }
184     }
185 
186     @Test
187     public void testSimpleGet() throws Exception {
188         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
189 
190             @Override
191             public AsyncServerExchangeHandler get() {
192                 return new SingleLineResponseHandler("Hi there");
193             }
194 
195         });
196         final InetSocketAddress serverEndpoint = server.start();
197 
198         client.start();
199         final Future<ClientSessionEndpoint> connectFuture = client.connect(
200                 "localhost", serverEndpoint.getPort(), TIMEOUT);
201         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
202 
203         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
204         for (int i = 0; i < 10; i++) {
205             queue.add(streamEndpoint.execute(
206                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
207                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
208 
209         }
210         while (!queue.isEmpty()) {
211             final Future<Message<HttpResponse, String>> future = queue.remove();
212             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
213             Assert.assertNotNull(result);
214             final HttpResponse response = result.getHead();
215             final String entity = result.getBody();
216             Assert.assertNotNull(response);
217             Assert.assertEquals(200, response.getCode());
218             Assert.assertEquals("Hi there", entity);
219         }
220     }
221 
222     @Test
223     public void testSimpleHead() throws Exception {
224         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
225 
226             @Override
227             public AsyncServerExchangeHandler get() {
228                 return new SingleLineResponseHandler("Hi there");
229             }
230 
231         });
232         final InetSocketAddress serverEndpoint = server.start();
233 
234         client.start();
235         final Future<ClientSessionEndpoint> connectFuture = client.connect(
236                 "localhost", serverEndpoint.getPort(), TIMEOUT);
237         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
238 
239         for (int i = 0; i < 5; i++) {
240             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
241                     new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
242                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
243             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
244             Assert.assertNotNull(result);
245             final HttpResponse response1 = result.getHead();
246             Assert.assertNotNull(response1);
247             Assert.assertEquals(200, response1.getCode());
248             Assert.assertNull(result.getBody());
249         }
250     }
251 
252     @Test
253     public void testLargeGet() throws Exception {
254         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
255 
256             @Override
257             public AsyncServerExchangeHandler get() {
258                 return new MultiLineResponseHandler("0123456789abcdef", 5000);
259             }
260 
261         });
262         final InetSocketAddress serverEndpoint = server.start();
263 
264         client.start();
265         final Future<ClientSessionEndpoint> connectFuture = client.connect(
266                 "localhost", serverEndpoint.getPort(), TIMEOUT);
267         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
268 
269         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
270                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/"), null),
271                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
272 
273         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
274                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
275                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
276 
277         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
278         Assert.assertNotNull(result1);
279         final HttpResponse response1 = result1.getHead();
280         Assert.assertNotNull(response1);
281         Assert.assertEquals(200, response1.getCode());
282         final String s1 = result1.getBody();
283         Assert.assertNotNull(s1);
284         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
285         while (t1.hasMoreTokens()) {
286             Assert.assertEquals("0123456789abcdef", t1.nextToken());
287         }
288 
289         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
290         Assert.assertNotNull(result2);
291         final HttpResponse response2 = result2.getHead();
292         Assert.assertNotNull(response2);
293         Assert.assertEquals(200, response2.getCode());
294         final String s2 = result2.getBody();
295         Assert.assertNotNull(s2);
296         final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
297         while (t2.hasMoreTokens()) {
298             Assert.assertEquals("0123456789abcdef", t2.nextToken());
299         }
300     }
301 
302     @Test
303     public void testBasicPost() throws Exception {
304         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
305 
306             @Override
307             public AsyncServerExchangeHandler get() {
308                 return new SingleLineResponseHandler("Hi back");
309             }
310 
311         });
312         final InetSocketAddress serverEndpoint = server.start();
313 
314         client.start();
315         final Future<ClientSessionEndpoint> connectFuture = client.connect(
316                 "localhost", serverEndpoint.getPort(), TIMEOUT);
317         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
318 
319         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
320         for (int i = 0; i < 10; i++) {
321             final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
322             queue.add(streamEndpoint.execute(
323                     new BasicRequestProducer(request, new StringAsyncEntityProducer("Hi there", ContentType.TEXT_PLAIN)),
324                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
325 
326         }
327         while (!queue.isEmpty()) {
328             final Future<Message<HttpResponse, String>> future = queue.remove();
329             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
330             Assert.assertNotNull(result);
331             final HttpResponse response = result.getHead();
332             final String entity1 = result.getBody();
333             Assert.assertNotNull(response);
334             Assert.assertEquals(200, response.getCode());
335             Assert.assertEquals("Hi back", entity1);
336         }
337     }
338 
339     @Test
340     public void testLargePost() throws Exception {
341         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
342 
343             @Override
344             public AsyncServerExchangeHandler get() {
345                 return new EchoHandler(2048);
346             }
347 
348         });
349         final InetSocketAddress serverEndpoint = server.start();
350 
351         client.start();
352         final Future<ClientSessionEndpoint> connectFuture = client.connect(
353                 "localhost", serverEndpoint.getPort(), TIMEOUT);
354         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
355 
356         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
357                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
358                         new MultiLineEntityProducer("0123456789abcdef", 5000)),
359                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
360         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
361         Assert.assertNotNull(result1);
362         final HttpResponse response1 = result1.getHead();
363         Assert.assertNotNull(response1);
364         Assert.assertEquals(200, response1.getCode());
365         final String s1 = result1.getBody();
366         Assert.assertNotNull(s1);
367         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
368         while (t1.hasMoreTokens()) {
369             Assert.assertEquals("0123456789abcdef", t1.nextToken());
370         }
371     }
372 
373     @Test
374     public void testSlowResponseConsumer() throws Exception {
375         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
376 
377             @Override
378             public AsyncServerExchangeHandler get() {
379                 return new MultiLineResponseHandler("0123456789abcd", 3);
380             }
381 
382         });
383         final InetSocketAddress serverEndpoint = server.start();
384 
385         client.start(H2Config.custom().setInitialWindowSize(16).build());
386         final Future<ClientSessionEndpoint> connectFuture = client.connect(
387                 "localhost", serverEndpoint.getPort(), TIMEOUT);
388         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
389 
390         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
391                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/"), null),
392                 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
393 
394                     @Override
395                     protected String consumeData(
396                             final ContentType contentType, final InputStream inputStream) throws IOException {
397                         Charset charset = contentType != null ? contentType.getCharset() : null;
398                         if (charset == null) {
399                             charset = StandardCharsets.US_ASCII;
400                         }
401 
402                         final StringBuilder buffer = new StringBuilder();
403                         try {
404                             final byte[] tmp = new byte[16];
405                             int l;
406                             while ((l = inputStream.read(tmp)) != -1) {
407                                 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
408                                 Thread.sleep(500);
409                             }
410                         } catch (final InterruptedException ex) {
411                             Thread.currentThread().interrupt();
412                             throw new InterruptedIOException(ex.getMessage());
413                         }
414                         return buffer.toString();
415                     }
416                 }),
417                 null);
418 
419         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
420         Assert.assertNotNull(result1);
421         final HttpResponse response1 = result1.getHead();
422         Assert.assertNotNull(response1);
423         Assert.assertEquals(200, response1.getCode());
424         final String s1 = result1.getBody();
425         Assert.assertNotNull(s1);
426         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
427         while (t1.hasMoreTokens()) {
428             Assert.assertEquals("0123456789abcd", t1.nextToken());
429         }
430     }
431 
432     @Test
433     public void testSlowRequestProducer() throws Exception {
434         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
435 
436             @Override
437             public AsyncServerExchangeHandler get() {
438                 return new EchoHandler(2048);
439             }
440 
441         });
442         final InetSocketAddress serverEndpoint = server.start();
443 
444         client.start();
445         final Future<ClientSessionEndpoint> connectFuture = client.connect(
446                 "localhost", serverEndpoint.getPort(), TIMEOUT);
447         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
448 
449         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
450         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
451                 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
452 
453                     @Override
454                     protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
455                         Charset charset = contentType.getCharset();
456                         if (charset == null) {
457                             charset = StandardCharsets.US_ASCII;
458                         }
459                         try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
460                             for (int i = 0; i < 500; i++) {
461                                 if (i % 100 == 0) {
462                                     writer.flush();
463                                     Thread.sleep(500);
464                                 }
465                                 writer.write("0123456789abcdef\r\n");
466                             }
467                         } catch (final InterruptedException ex) {
468                             Thread.currentThread().interrupt();
469                             throw new InterruptedIOException(ex.getMessage());
470                         }
471                     }
472 
473                 }),
474                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
475         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
476         Assert.assertNotNull(result1);
477         final HttpResponse response1 = result1.getHead();
478         Assert.assertNotNull(response1);
479         Assert.assertEquals(200, response1.getCode());
480         final String s1 = result1.getBody();
481         Assert.assertNotNull(s1);
482         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
483         while (t1.hasMoreTokens()) {
484             Assert.assertEquals("0123456789abcdef", t1.nextToken());
485         }
486     }
487 
488     @Test
489     public void testSlowResponseProducer() throws Exception {
490         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
491 
492             @Override
493             public AsyncServerExchangeHandler get() {
494                 return new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
495 
496                     @Override
497                     protected void handle(
498                             final HttpRequest request,
499                             final InputStream requestStream,
500                             final HttpResponse response,
501                             final OutputStream responseStream,
502                             final HttpContext context) throws IOException, HttpException {
503 
504                         if (!"/hello".equals(request.getPath())) {
505                             response.setCode(HttpStatus.SC_NOT_FOUND);
506                             return;
507                         }
508                         if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
509                             response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
510                             return;
511                         }
512                         if (requestStream == null) {
513                             return;
514                         }
515                         final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
516                         final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
517                         Charset charset = contentType != null ? contentType.getCharset() : null;
518                         if (charset == null) {
519                             charset = StandardCharsets.US_ASCII;
520                         }
521                         response.setCode(HttpStatus.SC_OK);
522                         response.setHeader(h1);
523                         try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
524                             final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
525                             try {
526                                 String l;
527                                 int count = 0;
528                                 while ((l = reader.readLine()) != null) {
529                                     writer.write(l);
530                                     writer.write("\r\n");
531                                     count++;
532                                     if (count % 500 == 0) {
533                                         Thread.sleep(500);
534                                     }
535                                 }
536                                 writer.flush();
537                             } catch (final InterruptedException ex) {
538                                 Thread.currentThread().interrupt();
539                                 throw new InterruptedIOException(ex.getMessage());
540                             }
541                         }
542                     }
543                 };
544             }
545 
546         });
547         final InetSocketAddress serverEndpoint = server.start();
548 
549         client.start(H2Config.custom()
550                 .setInitialWindowSize(512)
551                 .build());
552 
553         final Future<ClientSessionEndpoint> connectFuture = client.connect(
554                 "localhost", serverEndpoint.getPort(), TIMEOUT);
555         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
556 
557         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
558         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
559                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
560                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
561         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
562         Assert.assertNotNull(result1);
563         final HttpResponse response1 = result1.getHead();
564         Assert.assertNotNull(response1);
565         Assert.assertEquals(200, response1.getCode());
566         final String s1 = result1.getBody();
567         Assert.assertNotNull(s1);
568         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
569         while (t1.hasMoreTokens()) {
570             Assert.assertEquals("0123456789abcd", t1.nextToken());
571         }
572     }
573 
574     @Test
575     public void testPush() throws Exception {
576         final InetSocketAddress serverEndpoint = server.start();
577         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
578 
579             @Override
580             public AsyncServerExchangeHandler get() {
581                 return new MessageExchangeHandler<Void>(new NoopEntityConsumer()) {
582 
583                     @Override
584                     protected void handle(
585                             final Message<HttpRequest, Void> request,
586                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
587                             final HttpContext context) throws IOException, HttpException {
588                         responseTrigger.pushPromise(
589                                 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/stuff")),
590                                 context,
591                                 new BasicPushProducer(new MultiLineEntityProducer("Pushing lots of stuff", 500)));
592                         responseTrigger.submitResponse(
593                                 AsyncResponseBuilder.create(HttpStatus.SC_OK).setEntity("Hi there", ContentType.TEXT_PLAIN).build(),
594                                 context);
595                     }
596                 };
597             }
598 
599         });
600 
601         client.start(H2Config.custom().setPushEnabled(true).build());
602 
603         final BlockingQueue<Message<HttpResponse, String>> pushMessageQueue = new LinkedBlockingDeque<>();
604 
605         final Future<ClientSessionEndpoint> connectFuture = client.connect(
606                 "localhost", serverEndpoint.getPort(), TIMEOUT);
607         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
608 
609         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
610                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
611                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
612                 new HandlerFactory<AsyncPushConsumer>() {
613 
614                     @Override
615                     public AsyncPushConsumer create(
616                             final HttpRequest request, final HttpContext context) throws HttpException {
617                         return new AbstractAsyncPushHandler<Message<HttpResponse, String>>(new BasicResponseConsumer<>(new StringAsyncEntityConsumer())) {
618 
619                             @Override
620                             protected void handleResponse(
621                                     final HttpRequest promise,
622                                     final Message<HttpResponse, String> responseMessage) throws IOException, HttpException {
623                                 try {
624                                     pushMessageQueue.put(responseMessage);
625                                 } catch (final InterruptedException ex) {
626                                     Thread.currentThread().interrupt();
627                                     throw new InterruptedIOException(ex.getMessage());
628                                 }
629                             }
630 
631                         };
632                     }
633                 },
634                 null,
635                 null);
636         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
637         Assert.assertNotNull(result1);
638         final HttpResponse response1 = result1.getHead();
639         final String entity1 = result1.getBody();
640         Assert.assertNotNull(response1);
641         Assert.assertEquals(200, response1.getCode());
642         Assert.assertEquals("Hi there", entity1);
643 
644         final Message<HttpResponse, String> result2 = pushMessageQueue.poll(5, TimeUnit.SECONDS);
645         Assert.assertNotNull(result2);
646         final HttpResponse response2 = result2.getHead();
647         final String entity2 = result2.getBody();
648         Assert.assertEquals(200, response2.getCode());
649         Assert.assertNotNull(entity2);
650         final StringTokenizer t1 = new StringTokenizer(entity2, "\r\n");
651         while (t1.hasMoreTokens()) {
652             Assert.assertEquals("Pushing lots of stuff", t1.nextToken());
653         }
654     }
655 
656     @Test
657     public void testPushRefused() throws Exception {
658         final BlockingQueue<Exception> pushResultQueue = new LinkedBlockingDeque<>();
659         final InetSocketAddress serverEndpoint = server.start();
660         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
661 
662             @Override
663             public AsyncServerExchangeHandler get() {
664                 return new MessageExchangeHandler<Void>(new NoopEntityConsumer()) {
665 
666                     @Override
667                     protected void handle(
668                             final Message<HttpRequest, Void> request,
669                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
670                             final HttpContext context) throws IOException, HttpException {
671 
672                         responseTrigger.pushPromise(
673                                 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/stuff")),
674                                 context, new BasicPushProducer(AsyncEntityProducers.create("Pushing all sorts of stuff")) {
675 
676                             @Override
677                             public void failed(final Exception cause) {
678                                 pushResultQueue.add(cause);
679                                 super.failed(cause);
680                             }
681 
682                         });
683                         responseTrigger.pushPromise(
684                                 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/more-stuff")),
685                                 context, new BasicPushProducer(new MultiLineEntityProducer("Pushing lots of stuff", 500)) {
686 
687                             @Override
688                             public void failed(final Exception cause) {
689                                 pushResultQueue.add(cause);
690                                 super.failed(cause);
691                             }
692 
693                         });
694                         responseTrigger.submitResponse(
695                                 new BasicResponseProducer(HttpStatus.SC_OK, AsyncEntityProducers.create("Hi there")),
696                                 context);
697                     }
698                 };
699             }
700 
701         });
702 
703         client.start(H2Config.custom().setPushEnabled(true).build());
704 
705         final Future<ClientSessionEndpoint> connectFuture = client.connect(
706                 "localhost", serverEndpoint.getPort(), TIMEOUT);
707         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
708 
709         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
710                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
711                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
712         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
713         Assert.assertNotNull(result1);
714         final HttpResponse response1 = result1.getHead();
715         final String entity1 = result1.getBody();
716         Assert.assertNotNull(response1);
717         Assert.assertEquals(200, response1.getCode());
718         Assert.assertEquals("Hi there", entity1);
719 
720         final Object result2 = pushResultQueue.poll(5, TimeUnit.SECONDS);
721         Assert.assertNotNull(result2);
722         Assert.assertTrue(result2 instanceof H2StreamResetException);
723         Assert.assertEquals(H2Error.REFUSED_STREAM.getCode(), ((H2StreamResetException) result2).getCode());
724 
725         final Object result3 = pushResultQueue.poll(5, TimeUnit.SECONDS);
726         Assert.assertNotNull(result3);
727         Assert.assertTrue(result3 instanceof H2StreamResetException);
728         Assert.assertEquals(H2Error.REFUSED_STREAM.getCode(), ((H2StreamResetException) result3).getCode());
729     }
730 
731     @Test
732     public void testExcessOfConcurrentStreams() throws Exception {
733         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
734 
735             @Override
736             public AsyncServerExchangeHandler get() {
737                 return new MultiLineResponseHandler("0123456789abcdef", 2000);
738             }
739 
740         });
741         final InetSocketAddress serverEndpoint = server.start(H2Config.custom().setMaxConcurrentStreams(20).build());
742 
743         client.start(H2Config.custom().setMaxConcurrentStreams(20).build());
744         final Future<ClientSessionEndpoint> connectFuture = client.connect(
745                 "localhost", serverEndpoint.getPort(), TIMEOUT);
746         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
747 
748         final Queue<Future<Message<HttpResponse, Void>>> queue = new LinkedList<>();
749         for (int i = 0; i < 2000; i++) {
750             final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
751             final Future<Message<HttpResponse, Void>> future = streamEndpoint.execute(
752                     new BasicRequestProducer(request1, null),
753                     new BasicResponseConsumer<>(new NoopEntityConsumer()), null);
754             queue.add(future);
755         }
756 
757         while (!queue.isEmpty()) {
758             final Future<Message<HttpResponse, Void>> future = queue.remove();
759             final Message<HttpResponse, Void> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
760             Assert.assertNotNull(result);
761             final HttpResponse response = result.getHead();
762             Assert.assertNotNull(response);
763             Assert.assertEquals(200, response.getCode());
764         }
765     }
766 
767     @Test
768     public void testExpectationFailed() throws Exception {
769         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
770 
771             @Override
772             public AsyncServerExchangeHandler get() {
773                 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
774 
775                     @Override
776                     protected void handle(
777                             final Message<HttpRequest, String> request,
778                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
779                             final HttpContext context) throws IOException, HttpException {
780                         responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
781 
782                     }
783                 };
784             }
785 
786         });
787         final InetSocketAddress serverEndpoint = server.start(null, new Decorator<AsyncServerExchangeHandler>() {
788 
789             @Override
790             public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
791 
792                 return new BasicAsyncServerExpectationDecorator(handler) {
793 
794                     @Override
795                     protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
796                         final Header h = request.getFirstHeader("password");
797                         if (h != null && "secret".equals(h.getValue())) {
798                             return null;
799                         } else {
800                             return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
801                         }
802                     }
803                 };
804 
805             }
806         }, H2Config.DEFAULT);
807 
808         client.start();
809         final Future<ClientSessionEndpoint> connectFuture = client.connect(
810                 "localhost", serverEndpoint.getPort(), TIMEOUT);
811         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
812 
813         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
814         request1.addHeader("password", "secret");
815         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
816                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 5000)),
817                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
818         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
819         Assert.assertNotNull(result1);
820         final HttpResponse response1 = result1.getHead();
821         Assert.assertNotNull(response1);
822         Assert.assertEquals(200, response1.getCode());
823         Assert.assertNotNull("All is well", result1.getBody());
824 
825         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
826         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
827                 new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
828                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
829         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
830         Assert.assertNotNull(result2);
831         final HttpResponse response2 = result2.getHead();
832         Assert.assertNotNull(response2);
833         Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
834         Assert.assertNotNull("You shall not pass", result2.getBody());
835     }
836 
837     @Test
838     public void testPrematureResponse() throws Exception {
839         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
840 
841             @Override
842             public AsyncServerExchangeHandler get() {
843                 return new AsyncServerExchangeHandler() {
844 
845                     private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>(null);
846 
847                     @Override
848                     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
849                         capacityChannel.update(Integer.MAX_VALUE);
850                     }
851 
852                     @Override
853                     public void consume(final ByteBuffer src) throws IOException {
854                     }
855 
856                     @Override
857                     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
858                     }
859 
860                     @Override
861                     public void handleRequest(
862                             final HttpRequest request,
863                             final EntityDetails entityDetails,
864                             final ResponseChannel responseChannel,
865                             final HttpContext context) throws HttpException, IOException {
866                         final AsyncResponseProducer producer;
867                         final Header h = request.getFirstHeader("password");
868                         if (h != null && "secret".equals(h.getValue())) {
869                             producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
870                         } else {
871                             producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
872                         }
873                         responseProducer.set(producer);
874                         producer.sendResponse(responseChannel, context);
875                     }
876 
877                     @Override
878                     public int available() {
879                         final AsyncResponseProducer producer = this.responseProducer.get();
880                         return producer.available();
881                     }
882 
883                     @Override
884                     public void produce(final DataStreamChannel channel) throws IOException {
885                         final AsyncResponseProducer producer = this.responseProducer.get();
886                         producer.produce(channel);
887                     }
888 
889                     @Override
890                     public void failed(final Exception cause) {
891                     }
892 
893                     @Override
894                     public void releaseResources() {
895                     }
896                 };
897             }
898 
899         });
900         final InetSocketAddress serverEndpoint = server.start();
901 
902         client.start();
903         final Future<ClientSessionEndpoint> connectFuture = client.connect(
904                 "localhost", serverEndpoint.getPort(), TIMEOUT);
905         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
906 
907         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
908         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
909                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 5000)),
910                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
911         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
912         Assert.assertNotNull(result1);
913         final HttpResponse response1 = result1.getHead();
914         Assert.assertNotNull(response1);
915         Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
916         Assert.assertNotNull("You shall not pass", result1.getBody());
917     }
918 
919     @Test
920     public void testMessageWithTrailers() throws Exception {
921         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
922 
923             @Override
924             public AsyncServerExchangeHandler get() {
925                 return new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
926 
927                     @Override
928                     protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
929                             final HttpRequest request,
930                             final EntityDetails entityDetails,
931                             final HttpContext context) throws HttpException {
932                         return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
933                     }
934 
935                     @Override
936                     protected void handle(
937                             final Message<HttpRequest, String> requestMessage,
938                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
939                             final HttpContext context) throws HttpException, IOException {
940                         responseTrigger.submitResponse(new BasicResponseProducer(
941                                 HttpStatus.SC_OK,
942                                 new DigestingEntityProducer("MD5",
943                                         new StringAsyncEntityProducer("Hello back with some trailers"))), context);
944                     }
945                 };
946             }
947 
948         });
949         final InetSocketAddress serverEndpoint = server.start();
950 
951         client.start();
952 
953         final Future<ClientSessionEndpoint> connectFuture = client.connect(
954                 "localhost", serverEndpoint.getPort(), TIMEOUT);
955         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
956 
957         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
958         final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
959         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
960                 new BasicRequestProducer(request1, null),
961                 new BasicResponseConsumer<>(entityConsumer), null);
962         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
963         Assert.assertNotNull(result1);
964         final HttpResponse response1 = result1.getHead();
965         Assert.assertNotNull(response1);
966         Assert.assertEquals(200, response1.getCode());
967         Assert.assertEquals("Hello back with some trailers", result1.getBody());
968 
969         final List<Header> trailers = entityConsumer.getTrailers();
970         Assert.assertNotNull(trailers);
971         Assert.assertEquals(2, trailers.size());
972         final Map<String, String> map = new HashMap<>();
973         for (final Header header: trailers) {
974             map.put(header.getName().toLowerCase(Locale.ROOT), header.getValue());
975         }
976         final String digest = TextUtils.toHexString(entityConsumer.getDigest());
977         Assert.assertEquals("MD5", map.get("digest-algo"));
978         Assert.assertEquals(digest, map.get("digest"));
979     }
980 
981     @Test
982     public void testConnectionPing() throws Exception {
983         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
984 
985             @Override
986             public AsyncServerExchangeHandler get() {
987                 return new SingleLineResponseHandler("Hi there");
988             }
989 
990         });
991         final InetSocketAddress serverEndpoint = server.start();
992 
993         client.start();
994         final Future<ClientSessionEndpoint> connectFuture = client.connect(
995                 "localhost", serverEndpoint.getPort(), TIMEOUT);
996         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
997 
998         final int n = 10;
999         final CountDownLatch latch = new CountDownLatch(n);
1000         final AtomicInteger count = new AtomicInteger(0);
1001         for (int i = 0; i < n; i++) {
1002             streamEndpoint.execute(
1003                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1004                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1005             streamEndpoint.execute(new PingCommand(new BasicPingHandler(new Callback<Boolean>() {
1006 
1007                 @Override
1008                 public void execute(final Boolean result) {
1009                     if (result) {
1010                         count.incrementAndGet();
1011                     }
1012                     latch.countDown();
1013                 }
1014 
1015             })), Command.Priority.NORMAL);
1016 
1017         }
1018         Assert.assertTrue(latch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1019         Assert.assertEquals(n, count.get());
1020     }
1021 
1022     @Test
1023     public void testRequestWithInvalidConnectionHeader() throws Exception {
1024         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1025 
1026             @Override
1027             public AsyncServerExchangeHandler get() {
1028                 return new SingleLineResponseHandler("Hi there");
1029             }
1030 
1031         });
1032         final InetSocketAddress serverEndpoint = server.start();
1033 
1034         client.start();
1035 
1036         final Future<IOSession> sessionFuture = client.requestSession(new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
1037         final IOSession session = sessionFuture.get();
1038         final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(session);
1039 
1040         final HttpRequest request = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1041         request.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
1042         final HttpCoreContext coreContext = HttpCoreContext.create();
1043         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1044                     new BasicRequestProducer(request, null),
1045                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
1046                     coreContext, null);
1047         try {
1048             future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1049             Assert.fail("ExecutionException is expected");
1050         } catch (final ExecutionException ex) {
1051             Assert.assertThat(ex.getCause(), CoreMatchers.instanceOf(ProtocolException.class));
1052         }
1053 
1054         final EndpointDetails endpointDetails = coreContext.getEndpointDetails();
1055         Assert.assertThat(endpointDetails.getRequestCount(), CoreMatchers.equalTo(0L));
1056     }
1057 
1058     @Test
1059     public void testHeaderTooLarge() throws Exception {
1060         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1061 
1062             @Override
1063             public AsyncServerExchangeHandler get() {
1064                 return new SingleLineResponseHandler("Hi there");
1065             }
1066 
1067         });
1068         final InetSocketAddress serverEndpoint = server.start(H2Config.custom()
1069                 .setMaxHeaderListSize(100)
1070                 .build());
1071         client.start();
1072 
1073         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1074                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1075         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1076 
1077         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1078         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1079                 "1234567890123456789012345678901234567890");
1080         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1081                 new BasicRequestProducer(request1, null),
1082                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1083         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1084         Assert.assertNotNull(result1);
1085         final HttpResponse response1 = result1.getHead();
1086         Assert.assertNotNull(response1);
1087         Assert.assertEquals(431, response1.getCode());
1088         Assert.assertEquals("Maximum header list size exceeded", result1.getBody());
1089     }
1090 
1091     @Test
1092     public void testHeaderTooLargePost() throws Exception {
1093         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1094 
1095             @Override
1096             public AsyncServerExchangeHandler get() {
1097                 return new SingleLineResponseHandler("Hi there");
1098             }
1099 
1100         });
1101         final InetSocketAddress serverEndpoint = server.start(H2Config.custom()
1102                 .setMaxHeaderListSize(100)
1103                 .build());
1104         client.start(
1105                 new DefaultHttpProcessor(new H2RequestContent(), new H2RequestTargetHost(), new H2RequestConnControl()),
1106                 H2Config.DEFAULT);
1107 
1108         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1109                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1110         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1111 
1112         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1113         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1114                 "1234567890123456789012345678901234567890");
1115 
1116         final byte[] b = new byte[2048];
1117         for (int i = 0; i < b.length; i++) {
1118             b[i] = (byte) ('a' + i % 10);
1119         }
1120 
1121         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1122                 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
1123                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1124         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1125         Assert.assertNotNull(result1);
1126         final HttpResponse response1 = result1.getHead();
1127         Assert.assertNotNull(response1);
1128         Assert.assertEquals(431, response1.getCode());
1129         Assert.assertEquals("Maximum header list size exceeded", result1.getBody());
1130     }
1131 
1132 }