1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import java.io.BufferedReader;
31 import java.io.BufferedWriter;
32 import java.io.IOException;
33 import java.io.InputStream;
34 import java.io.InputStreamReader;
35 import java.io.InterruptedIOException;
36 import java.io.OutputStream;
37 import java.io.OutputStreamWriter;
38 import java.net.InetSocketAddress;
39 import java.net.URI;
40 import java.net.URISyntaxException;
41 import java.nio.ByteBuffer;
42 import java.nio.charset.Charset;
43 import java.nio.charset.StandardCharsets;
44 import java.util.Arrays;
45 import java.util.Collection;
46 import java.util.HashMap;
47 import java.util.LinkedList;
48 import java.util.List;
49 import java.util.Locale;
50 import java.util.Map;
51 import java.util.Queue;
52 import java.util.StringTokenizer;
53 import java.util.concurrent.BlockingQueue;
54 import java.util.concurrent.CountDownLatch;
55 import java.util.concurrent.ExecutionException;
56 import java.util.concurrent.Executors;
57 import java.util.concurrent.Future;
58 import java.util.concurrent.LinkedBlockingDeque;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.atomic.AtomicInteger;
61 import java.util.concurrent.atomic.AtomicReference;
62
63 import org.apache.hc.core5.function.Callback;
64 import org.apache.hc.core5.function.Decorator;
65 import org.apache.hc.core5.function.Supplier;
66 import org.apache.hc.core5.http.ContentType;
67 import org.apache.hc.core5.http.EndpointDetails;
68 import org.apache.hc.core5.http.EntityDetails;
69 import org.apache.hc.core5.http.Header;
70 import org.apache.hc.core5.http.HeaderElements;
71 import org.apache.hc.core5.http.HttpException;
72 import org.apache.hc.core5.http.HttpHeaders;
73 import org.apache.hc.core5.http.HttpHost;
74 import org.apache.hc.core5.http.HttpRequest;
75 import org.apache.hc.core5.http.HttpResponse;
76 import org.apache.hc.core5.http.HttpStatus;
77 import org.apache.hc.core5.http.Message;
78 import org.apache.hc.core5.http.Method;
79 import org.apache.hc.core5.http.ProtocolException;
80 import org.apache.hc.core5.http.URIScheme;
81 import org.apache.hc.core5.http.message.BasicHttpRequest;
82 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
83 import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
84 import org.apache.hc.core5.http.nio.AsyncResponseProducer;
85 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
86 import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
87 import org.apache.hc.core5.http.nio.CapacityChannel;
88 import org.apache.hc.core5.http.nio.DataStreamChannel;
89 import org.apache.hc.core5.http.nio.HandlerFactory;
90 import org.apache.hc.core5.http.nio.ResponseChannel;
91 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
92 import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
93 import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
94 import org.apache.hc.core5.http.nio.entity.NoopEntityConsumer;
95 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
96 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
97 import org.apache.hc.core5.http.nio.support.AbstractAsyncPushHandler;
98 import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
99 import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder;
100 import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
101 import org.apache.hc.core5.http.nio.support.BasicPushProducer;
102 import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
103 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
104 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
105 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
106 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
107 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
108 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
109 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
110 import org.apache.hc.core5.http.protocol.HttpContext;
111 import org.apache.hc.core5.http.protocol.HttpCoreContext;
112 import org.apache.hc.core5.http2.H2Error;
113 import org.apache.hc.core5.http2.H2StreamResetException;
114 import org.apache.hc.core5.http2.config.H2Config;
115 import org.apache.hc.core5.http2.nio.command.PingCommand;
116 import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
117 import org.apache.hc.core5.http2.protocol.H2RequestConnControl;
118 import org.apache.hc.core5.http2.protocol.H2RequestContent;
119 import org.apache.hc.core5.http2.protocol.H2RequestTargetHost;
120 import org.apache.hc.core5.reactor.Command;
121 import org.apache.hc.core5.reactor.IOReactorConfig;
122 import org.apache.hc.core5.reactor.IOSession;
123 import org.apache.hc.core5.testing.SSLTestContexts;
124 import org.apache.hc.core5.util.TextUtils;
125 import org.apache.hc.core5.util.TimeValue;
126 import org.apache.hc.core5.util.Timeout;
127 import org.hamcrest.CoreMatchers;
128 import org.junit.After;
129 import org.junit.Assert;
130 import org.junit.Before;
131 import org.junit.Test;
132 import org.junit.runner.RunWith;
133 import org.junit.runners.Parameterized;
134 import org.slf4j.Logger;
135 import org.slf4j.LoggerFactory;
136
137 @RunWith(Parameterized.class)
138 public class H2IntegrationTest extends InternalH2ServerTestBase {
139
140 private final Logger log = LoggerFactory.getLogger(getClass());
141
142 @Parameterized.Parameters(name = "{0}")
143 public static Collection<Object[]> protocols() {
144 return Arrays.asList(new Object[][]{
145 { URIScheme.HTTP },
146 { URIScheme.HTTPS }
147 });
148 }
149
150 public H2IntegrationTest(final URIScheme scheme) {
151 super(scheme);
152 }
153
154 private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
155 private static final Timeout LONG_TIMEOUT = Timeout.ofSeconds(60);
156
157 private H2TestClient client;
158
159 @Before
160 public void setup() throws Exception {
161 log.debug("Starting up test client");
162 client = new H2TestClient(buildReactorConfig(),
163 scheme == URIScheme.HTTPS ? SSLTestContexts.createClientSSLContext() : null, null, null);
164 }
165
166 protected IOReactorConfig buildReactorConfig() {
167 return IOReactorConfig.DEFAULT;
168 }
169
170 @After
171 public void cleanup() throws Exception {
172 log.debug("Shutting down test client");
173 if (client != null) {
174 client.shutdown(TimeValue.ofSeconds(5));
175 }
176 }
177
178 private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
179 try {
180 return new URI("http", null, "localhost", serverEndpoint.getPort(), path, null, null);
181 } catch (final URISyntaxException e) {
182 throw new IllegalStateException();
183 }
184 }
185
186 @Test
187 public void testSimpleGet() throws Exception {
188 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
189
190 @Override
191 public AsyncServerExchangeHandler get() {
192 return new SingleLineResponseHandler("Hi there");
193 }
194
195 });
196 final InetSocketAddress serverEndpoint = server.start();
197
198 client.start();
199 final Future<ClientSessionEndpoint> connectFuture = client.connect(
200 "localhost", serverEndpoint.getPort(), TIMEOUT);
201 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
202
203 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
204 for (int i = 0; i < 10; i++) {
205 queue.add(streamEndpoint.execute(
206 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
207 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
208
209 }
210 while (!queue.isEmpty()) {
211 final Future<Message<HttpResponse, String>> future = queue.remove();
212 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
213 Assert.assertNotNull(result);
214 final HttpResponse response = result.getHead();
215 final String entity = result.getBody();
216 Assert.assertNotNull(response);
217 Assert.assertEquals(200, response.getCode());
218 Assert.assertEquals("Hi there", entity);
219 }
220 }
221
222 @Test
223 public void testSimpleHead() throws Exception {
224 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
225
226 @Override
227 public AsyncServerExchangeHandler get() {
228 return new SingleLineResponseHandler("Hi there");
229 }
230
231 });
232 final InetSocketAddress serverEndpoint = server.start();
233
234 client.start();
235 final Future<ClientSessionEndpoint> connectFuture = client.connect(
236 "localhost", serverEndpoint.getPort(), TIMEOUT);
237 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
238
239 for (int i = 0; i < 5; i++) {
240 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
241 new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
242 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
243 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
244 Assert.assertNotNull(result);
245 final HttpResponse response1 = result.getHead();
246 Assert.assertNotNull(response1);
247 Assert.assertEquals(200, response1.getCode());
248 Assert.assertNull(result.getBody());
249 }
250 }
251
252 @Test
253 public void testLargeGet() throws Exception {
254 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
255
256 @Override
257 public AsyncServerExchangeHandler get() {
258 return new MultiLineResponseHandler("0123456789abcdef", 5000);
259 }
260
261 });
262 final InetSocketAddress serverEndpoint = server.start();
263
264 client.start();
265 final Future<ClientSessionEndpoint> connectFuture = client.connect(
266 "localhost", serverEndpoint.getPort(), TIMEOUT);
267 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
268
269 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
270 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/"), null),
271 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
272
273 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
274 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
275 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
276
277 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
278 Assert.assertNotNull(result1);
279 final HttpResponse response1 = result1.getHead();
280 Assert.assertNotNull(response1);
281 Assert.assertEquals(200, response1.getCode());
282 final String s1 = result1.getBody();
283 Assert.assertNotNull(s1);
284 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
285 while (t1.hasMoreTokens()) {
286 Assert.assertEquals("0123456789abcdef", t1.nextToken());
287 }
288
289 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
290 Assert.assertNotNull(result2);
291 final HttpResponse response2 = result2.getHead();
292 Assert.assertNotNull(response2);
293 Assert.assertEquals(200, response2.getCode());
294 final String s2 = result2.getBody();
295 Assert.assertNotNull(s2);
296 final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
297 while (t2.hasMoreTokens()) {
298 Assert.assertEquals("0123456789abcdef", t2.nextToken());
299 }
300 }
301
302 @Test
303 public void testBasicPost() throws Exception {
304 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
305
306 @Override
307 public AsyncServerExchangeHandler get() {
308 return new SingleLineResponseHandler("Hi back");
309 }
310
311 });
312 final InetSocketAddress serverEndpoint = server.start();
313
314 client.start();
315 final Future<ClientSessionEndpoint> connectFuture = client.connect(
316 "localhost", serverEndpoint.getPort(), TIMEOUT);
317 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
318
319 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
320 for (int i = 0; i < 10; i++) {
321 final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
322 queue.add(streamEndpoint.execute(
323 new BasicRequestProducer(request, new StringAsyncEntityProducer("Hi there", ContentType.TEXT_PLAIN)),
324 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
325
326 }
327 while (!queue.isEmpty()) {
328 final Future<Message<HttpResponse, String>> future = queue.remove();
329 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
330 Assert.assertNotNull(result);
331 final HttpResponse response = result.getHead();
332 final String entity1 = result.getBody();
333 Assert.assertNotNull(response);
334 Assert.assertEquals(200, response.getCode());
335 Assert.assertEquals("Hi back", entity1);
336 }
337 }
338
339 @Test
340 public void testLargePost() throws Exception {
341 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
342
343 @Override
344 public AsyncServerExchangeHandler get() {
345 return new EchoHandler(2048);
346 }
347
348 });
349 final InetSocketAddress serverEndpoint = server.start();
350
351 client.start();
352 final Future<ClientSessionEndpoint> connectFuture = client.connect(
353 "localhost", serverEndpoint.getPort(), TIMEOUT);
354 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
355
356 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
357 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
358 new MultiLineEntityProducer("0123456789abcdef", 5000)),
359 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
360 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
361 Assert.assertNotNull(result1);
362 final HttpResponse response1 = result1.getHead();
363 Assert.assertNotNull(response1);
364 Assert.assertEquals(200, response1.getCode());
365 final String s1 = result1.getBody();
366 Assert.assertNotNull(s1);
367 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
368 while (t1.hasMoreTokens()) {
369 Assert.assertEquals("0123456789abcdef", t1.nextToken());
370 }
371 }
372
373 @Test
374 public void testSlowResponseConsumer() throws Exception {
375 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
376
377 @Override
378 public AsyncServerExchangeHandler get() {
379 return new MultiLineResponseHandler("0123456789abcd", 3);
380 }
381
382 });
383 final InetSocketAddress serverEndpoint = server.start();
384
385 client.start(H2Config.custom().setInitialWindowSize(16).build());
386 final Future<ClientSessionEndpoint> connectFuture = client.connect(
387 "localhost", serverEndpoint.getPort(), TIMEOUT);
388 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
389
390 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
391 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/"), null),
392 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
393
394 @Override
395 protected String consumeData(
396 final ContentType contentType, final InputStream inputStream) throws IOException {
397 Charset charset = contentType != null ? contentType.getCharset() : null;
398 if (charset == null) {
399 charset = StandardCharsets.US_ASCII;
400 }
401
402 final StringBuilder buffer = new StringBuilder();
403 try {
404 final byte[] tmp = new byte[16];
405 int l;
406 while ((l = inputStream.read(tmp)) != -1) {
407 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
408 Thread.sleep(500);
409 }
410 } catch (final InterruptedException ex) {
411 Thread.currentThread().interrupt();
412 throw new InterruptedIOException(ex.getMessage());
413 }
414 return buffer.toString();
415 }
416 }),
417 null);
418
419 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
420 Assert.assertNotNull(result1);
421 final HttpResponse response1 = result1.getHead();
422 Assert.assertNotNull(response1);
423 Assert.assertEquals(200, response1.getCode());
424 final String s1 = result1.getBody();
425 Assert.assertNotNull(s1);
426 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
427 while (t1.hasMoreTokens()) {
428 Assert.assertEquals("0123456789abcd", t1.nextToken());
429 }
430 }
431
432 @Test
433 public void testSlowRequestProducer() throws Exception {
434 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
435
436 @Override
437 public AsyncServerExchangeHandler get() {
438 return new EchoHandler(2048);
439 }
440
441 });
442 final InetSocketAddress serverEndpoint = server.start();
443
444 client.start();
445 final Future<ClientSessionEndpoint> connectFuture = client.connect(
446 "localhost", serverEndpoint.getPort(), TIMEOUT);
447 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
448
449 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
450 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
451 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
452
453 @Override
454 protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
455 Charset charset = contentType.getCharset();
456 if (charset == null) {
457 charset = StandardCharsets.US_ASCII;
458 }
459 try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
460 for (int i = 0; i < 500; i++) {
461 if (i % 100 == 0) {
462 writer.flush();
463 Thread.sleep(500);
464 }
465 writer.write("0123456789abcdef\r\n");
466 }
467 } catch (final InterruptedException ex) {
468 Thread.currentThread().interrupt();
469 throw new InterruptedIOException(ex.getMessage());
470 }
471 }
472
473 }),
474 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
475 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
476 Assert.assertNotNull(result1);
477 final HttpResponse response1 = result1.getHead();
478 Assert.assertNotNull(response1);
479 Assert.assertEquals(200, response1.getCode());
480 final String s1 = result1.getBody();
481 Assert.assertNotNull(s1);
482 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
483 while (t1.hasMoreTokens()) {
484 Assert.assertEquals("0123456789abcdef", t1.nextToken());
485 }
486 }
487
488 @Test
489 public void testSlowResponseProducer() throws Exception {
490 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
491
492 @Override
493 public AsyncServerExchangeHandler get() {
494 return new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
495
496 @Override
497 protected void handle(
498 final HttpRequest request,
499 final InputStream requestStream,
500 final HttpResponse response,
501 final OutputStream responseStream,
502 final HttpContext context) throws IOException, HttpException {
503
504 if (!"/hello".equals(request.getPath())) {
505 response.setCode(HttpStatus.SC_NOT_FOUND);
506 return;
507 }
508 if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
509 response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
510 return;
511 }
512 if (requestStream == null) {
513 return;
514 }
515 final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
516 final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
517 Charset charset = contentType != null ? contentType.getCharset() : null;
518 if (charset == null) {
519 charset = StandardCharsets.US_ASCII;
520 }
521 response.setCode(HttpStatus.SC_OK);
522 response.setHeader(h1);
523 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
524 final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
525 try {
526 String l;
527 int count = 0;
528 while ((l = reader.readLine()) != null) {
529 writer.write(l);
530 writer.write("\r\n");
531 count++;
532 if (count % 500 == 0) {
533 Thread.sleep(500);
534 }
535 }
536 writer.flush();
537 } catch (final InterruptedException ex) {
538 Thread.currentThread().interrupt();
539 throw new InterruptedIOException(ex.getMessage());
540 }
541 }
542 }
543 };
544 }
545
546 });
547 final InetSocketAddress serverEndpoint = server.start();
548
549 client.start(H2Config.custom()
550 .setInitialWindowSize(512)
551 .build());
552
553 final Future<ClientSessionEndpoint> connectFuture = client.connect(
554 "localhost", serverEndpoint.getPort(), TIMEOUT);
555 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
556
557 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
558 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
559 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
560 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
561 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
562 Assert.assertNotNull(result1);
563 final HttpResponse response1 = result1.getHead();
564 Assert.assertNotNull(response1);
565 Assert.assertEquals(200, response1.getCode());
566 final String s1 = result1.getBody();
567 Assert.assertNotNull(s1);
568 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
569 while (t1.hasMoreTokens()) {
570 Assert.assertEquals("0123456789abcd", t1.nextToken());
571 }
572 }
573
574 @Test
575 public void testPush() throws Exception {
576 final InetSocketAddress serverEndpoint = server.start();
577 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
578
579 @Override
580 public AsyncServerExchangeHandler get() {
581 return new MessageExchangeHandler<Void>(new NoopEntityConsumer()) {
582
583 @Override
584 protected void handle(
585 final Message<HttpRequest, Void> request,
586 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
587 final HttpContext context) throws IOException, HttpException {
588 responseTrigger.pushPromise(
589 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/stuff")),
590 context,
591 new BasicPushProducer(new MultiLineEntityProducer("Pushing lots of stuff", 500)));
592 responseTrigger.submitResponse(
593 AsyncResponseBuilder.create(HttpStatus.SC_OK).setEntity("Hi there", ContentType.TEXT_PLAIN).build(),
594 context);
595 }
596 };
597 }
598
599 });
600
601 client.start(H2Config.custom().setPushEnabled(true).build());
602
603 final BlockingQueue<Message<HttpResponse, String>> pushMessageQueue = new LinkedBlockingDeque<>();
604
605 final Future<ClientSessionEndpoint> connectFuture = client.connect(
606 "localhost", serverEndpoint.getPort(), TIMEOUT);
607 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
608
609 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
610 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
611 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
612 new HandlerFactory<AsyncPushConsumer>() {
613
614 @Override
615 public AsyncPushConsumer create(
616 final HttpRequest request, final HttpContext context) throws HttpException {
617 return new AbstractAsyncPushHandler<Message<HttpResponse, String>>(new BasicResponseConsumer<>(new StringAsyncEntityConsumer())) {
618
619 @Override
620 protected void handleResponse(
621 final HttpRequest promise,
622 final Message<HttpResponse, String> responseMessage) throws IOException, HttpException {
623 try {
624 pushMessageQueue.put(responseMessage);
625 } catch (final InterruptedException ex) {
626 Thread.currentThread().interrupt();
627 throw new InterruptedIOException(ex.getMessage());
628 }
629 }
630
631 };
632 }
633 },
634 null,
635 null);
636 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
637 Assert.assertNotNull(result1);
638 final HttpResponse response1 = result1.getHead();
639 final String entity1 = result1.getBody();
640 Assert.assertNotNull(response1);
641 Assert.assertEquals(200, response1.getCode());
642 Assert.assertEquals("Hi there", entity1);
643
644 final Message<HttpResponse, String> result2 = pushMessageQueue.poll(5, TimeUnit.SECONDS);
645 Assert.assertNotNull(result2);
646 final HttpResponse response2 = result2.getHead();
647 final String entity2 = result2.getBody();
648 Assert.assertEquals(200, response2.getCode());
649 Assert.assertNotNull(entity2);
650 final StringTokenizer t1 = new StringTokenizer(entity2, "\r\n");
651 while (t1.hasMoreTokens()) {
652 Assert.assertEquals("Pushing lots of stuff", t1.nextToken());
653 }
654 }
655
656 @Test
657 public void testPushRefused() throws Exception {
658 final BlockingQueue<Exception> pushResultQueue = new LinkedBlockingDeque<>();
659 final InetSocketAddress serverEndpoint = server.start();
660 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
661
662 @Override
663 public AsyncServerExchangeHandler get() {
664 return new MessageExchangeHandler<Void>(new NoopEntityConsumer()) {
665
666 @Override
667 protected void handle(
668 final Message<HttpRequest, Void> request,
669 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
670 final HttpContext context) throws IOException, HttpException {
671
672 responseTrigger.pushPromise(
673 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/stuff")),
674 context, new BasicPushProducer(AsyncEntityProducers.create("Pushing all sorts of stuff")) {
675
676 @Override
677 public void failed(final Exception cause) {
678 pushResultQueue.add(cause);
679 super.failed(cause);
680 }
681
682 });
683 responseTrigger.pushPromise(
684 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/more-stuff")),
685 context, new BasicPushProducer(new MultiLineEntityProducer("Pushing lots of stuff", 500)) {
686
687 @Override
688 public void failed(final Exception cause) {
689 pushResultQueue.add(cause);
690 super.failed(cause);
691 }
692
693 });
694 responseTrigger.submitResponse(
695 new BasicResponseProducer(HttpStatus.SC_OK, AsyncEntityProducers.create("Hi there")),
696 context);
697 }
698 };
699 }
700
701 });
702
703 client.start(H2Config.custom().setPushEnabled(true).build());
704
705 final Future<ClientSessionEndpoint> connectFuture = client.connect(
706 "localhost", serverEndpoint.getPort(), TIMEOUT);
707 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
708
709 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
710 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
711 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
712 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
713 Assert.assertNotNull(result1);
714 final HttpResponse response1 = result1.getHead();
715 final String entity1 = result1.getBody();
716 Assert.assertNotNull(response1);
717 Assert.assertEquals(200, response1.getCode());
718 Assert.assertEquals("Hi there", entity1);
719
720 final Object result2 = pushResultQueue.poll(5, TimeUnit.SECONDS);
721 Assert.assertNotNull(result2);
722 Assert.assertTrue(result2 instanceof H2StreamResetException);
723 Assert.assertEquals(H2Error.REFUSED_STREAM.getCode(), ((H2StreamResetException) result2).getCode());
724
725 final Object result3 = pushResultQueue.poll(5, TimeUnit.SECONDS);
726 Assert.assertNotNull(result3);
727 Assert.assertTrue(result3 instanceof H2StreamResetException);
728 Assert.assertEquals(H2Error.REFUSED_STREAM.getCode(), ((H2StreamResetException) result3).getCode());
729 }
730
731 @Test
732 public void testExcessOfConcurrentStreams() throws Exception {
733 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
734
735 @Override
736 public AsyncServerExchangeHandler get() {
737 return new MultiLineResponseHandler("0123456789abcdef", 2000);
738 }
739
740 });
741 final InetSocketAddress serverEndpoint = server.start(H2Config.custom().setMaxConcurrentStreams(20).build());
742
743 client.start(H2Config.custom().setMaxConcurrentStreams(20).build());
744 final Future<ClientSessionEndpoint> connectFuture = client.connect(
745 "localhost", serverEndpoint.getPort(), TIMEOUT);
746 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
747
748 final Queue<Future<Message<HttpResponse, Void>>> queue = new LinkedList<>();
749 for (int i = 0; i < 2000; i++) {
750 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
751 final Future<Message<HttpResponse, Void>> future = streamEndpoint.execute(
752 new BasicRequestProducer(request1, null),
753 new BasicResponseConsumer<>(new NoopEntityConsumer()), null);
754 queue.add(future);
755 }
756
757 while (!queue.isEmpty()) {
758 final Future<Message<HttpResponse, Void>> future = queue.remove();
759 final Message<HttpResponse, Void> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
760 Assert.assertNotNull(result);
761 final HttpResponse response = result.getHead();
762 Assert.assertNotNull(response);
763 Assert.assertEquals(200, response.getCode());
764 }
765 }
766
767 @Test
768 public void testExpectationFailed() throws Exception {
769 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
770
771 @Override
772 public AsyncServerExchangeHandler get() {
773 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
774
775 @Override
776 protected void handle(
777 final Message<HttpRequest, String> request,
778 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
779 final HttpContext context) throws IOException, HttpException {
780 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
781
782 }
783 };
784 }
785
786 });
787 final InetSocketAddress serverEndpoint = server.start(null, new Decorator<AsyncServerExchangeHandler>() {
788
789 @Override
790 public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
791
792 return new BasicAsyncServerExpectationDecorator(handler) {
793
794 @Override
795 protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
796 final Header h = request.getFirstHeader("password");
797 if (h != null && "secret".equals(h.getValue())) {
798 return null;
799 } else {
800 return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
801 }
802 }
803 };
804
805 }
806 }, H2Config.DEFAULT);
807
808 client.start();
809 final Future<ClientSessionEndpoint> connectFuture = client.connect(
810 "localhost", serverEndpoint.getPort(), TIMEOUT);
811 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
812
813 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
814 request1.addHeader("password", "secret");
815 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
816 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 5000)),
817 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
818 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
819 Assert.assertNotNull(result1);
820 final HttpResponse response1 = result1.getHead();
821 Assert.assertNotNull(response1);
822 Assert.assertEquals(200, response1.getCode());
823 Assert.assertNotNull("All is well", result1.getBody());
824
825 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
826 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
827 new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
828 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
829 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
830 Assert.assertNotNull(result2);
831 final HttpResponse response2 = result2.getHead();
832 Assert.assertNotNull(response2);
833 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
834 Assert.assertNotNull("You shall not pass", result2.getBody());
835 }
836
837 @Test
838 public void testPrematureResponse() throws Exception {
839 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
840
841 @Override
842 public AsyncServerExchangeHandler get() {
843 return new AsyncServerExchangeHandler() {
844
845 private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>(null);
846
847 @Override
848 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
849 capacityChannel.update(Integer.MAX_VALUE);
850 }
851
852 @Override
853 public void consume(final ByteBuffer src) throws IOException {
854 }
855
856 @Override
857 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
858 }
859
860 @Override
861 public void handleRequest(
862 final HttpRequest request,
863 final EntityDetails entityDetails,
864 final ResponseChannel responseChannel,
865 final HttpContext context) throws HttpException, IOException {
866 final AsyncResponseProducer producer;
867 final Header h = request.getFirstHeader("password");
868 if (h != null && "secret".equals(h.getValue())) {
869 producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
870 } else {
871 producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
872 }
873 responseProducer.set(producer);
874 producer.sendResponse(responseChannel, context);
875 }
876
877 @Override
878 public int available() {
879 final AsyncResponseProducer producer = this.responseProducer.get();
880 return producer.available();
881 }
882
883 @Override
884 public void produce(final DataStreamChannel channel) throws IOException {
885 final AsyncResponseProducer producer = this.responseProducer.get();
886 producer.produce(channel);
887 }
888
889 @Override
890 public void failed(final Exception cause) {
891 }
892
893 @Override
894 public void releaseResources() {
895 }
896 };
897 }
898
899 });
900 final InetSocketAddress serverEndpoint = server.start();
901
902 client.start();
903 final Future<ClientSessionEndpoint> connectFuture = client.connect(
904 "localhost", serverEndpoint.getPort(), TIMEOUT);
905 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
906
907 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
908 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
909 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 5000)),
910 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
911 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
912 Assert.assertNotNull(result1);
913 final HttpResponse response1 = result1.getHead();
914 Assert.assertNotNull(response1);
915 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
916 Assert.assertNotNull("You shall not pass", result1.getBody());
917 }
918
919 @Test
920 public void testMessageWithTrailers() throws Exception {
921 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
922
923 @Override
924 public AsyncServerExchangeHandler get() {
925 return new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
926
927 @Override
928 protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
929 final HttpRequest request,
930 final EntityDetails entityDetails,
931 final HttpContext context) throws HttpException {
932 return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
933 }
934
935 @Override
936 protected void handle(
937 final Message<HttpRequest, String> requestMessage,
938 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
939 final HttpContext context) throws HttpException, IOException {
940 responseTrigger.submitResponse(new BasicResponseProducer(
941 HttpStatus.SC_OK,
942 new DigestingEntityProducer("MD5",
943 new StringAsyncEntityProducer("Hello back with some trailers"))), context);
944 }
945 };
946 }
947
948 });
949 final InetSocketAddress serverEndpoint = server.start();
950
951 client.start();
952
953 final Future<ClientSessionEndpoint> connectFuture = client.connect(
954 "localhost", serverEndpoint.getPort(), TIMEOUT);
955 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
956
957 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
958 final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
959 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
960 new BasicRequestProducer(request1, null),
961 new BasicResponseConsumer<>(entityConsumer), null);
962 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
963 Assert.assertNotNull(result1);
964 final HttpResponse response1 = result1.getHead();
965 Assert.assertNotNull(response1);
966 Assert.assertEquals(200, response1.getCode());
967 Assert.assertEquals("Hello back with some trailers", result1.getBody());
968
969 final List<Header> trailers = entityConsumer.getTrailers();
970 Assert.assertNotNull(trailers);
971 Assert.assertEquals(2, trailers.size());
972 final Map<String, String> map = new HashMap<>();
973 for (final Header header: trailers) {
974 map.put(header.getName().toLowerCase(Locale.ROOT), header.getValue());
975 }
976 final String digest = TextUtils.toHexString(entityConsumer.getDigest());
977 Assert.assertEquals("MD5", map.get("digest-algo"));
978 Assert.assertEquals(digest, map.get("digest"));
979 }
980
981 @Test
982 public void testConnectionPing() throws Exception {
983 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
984
985 @Override
986 public AsyncServerExchangeHandler get() {
987 return new SingleLineResponseHandler("Hi there");
988 }
989
990 });
991 final InetSocketAddress serverEndpoint = server.start();
992
993 client.start();
994 final Future<ClientSessionEndpoint> connectFuture = client.connect(
995 "localhost", serverEndpoint.getPort(), TIMEOUT);
996 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
997
998 final int n = 10;
999 final CountDownLatch latch = new CountDownLatch(n);
1000 final AtomicInteger count = new AtomicInteger(0);
1001 for (int i = 0; i < n; i++) {
1002 streamEndpoint.execute(
1003 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1004 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1005 streamEndpoint.execute(new PingCommand(new BasicPingHandler(new Callback<Boolean>() {
1006
1007 @Override
1008 public void execute(final Boolean result) {
1009 if (result) {
1010 count.incrementAndGet();
1011 }
1012 latch.countDown();
1013 }
1014
1015 })), Command.Priority.NORMAL);
1016
1017 }
1018 Assert.assertTrue(latch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1019 Assert.assertEquals(n, count.get());
1020 }
1021
1022 @Test
1023 public void testRequestWithInvalidConnectionHeader() throws Exception {
1024 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1025
1026 @Override
1027 public AsyncServerExchangeHandler get() {
1028 return new SingleLineResponseHandler("Hi there");
1029 }
1030
1031 });
1032 final InetSocketAddress serverEndpoint = server.start();
1033
1034 client.start();
1035
1036 final Future<IOSession> sessionFuture = client.requestSession(new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
1037 final IOSession session = sessionFuture.get();
1038 final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(session);
1039
1040 final HttpRequest request = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1041 request.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
1042 final HttpCoreContext coreContext = HttpCoreContext.create();
1043 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1044 new BasicRequestProducer(request, null),
1045 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
1046 coreContext, null);
1047 try {
1048 future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1049 Assert.fail("ExecutionException is expected");
1050 } catch (final ExecutionException ex) {
1051 Assert.assertThat(ex.getCause(), CoreMatchers.instanceOf(ProtocolException.class));
1052 }
1053
1054 final EndpointDetails endpointDetails = coreContext.getEndpointDetails();
1055 Assert.assertThat(endpointDetails.getRequestCount(), CoreMatchers.equalTo(0L));
1056 }
1057
1058 @Test
1059 public void testHeaderTooLarge() throws Exception {
1060 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1061
1062 @Override
1063 public AsyncServerExchangeHandler get() {
1064 return new SingleLineResponseHandler("Hi there");
1065 }
1066
1067 });
1068 final InetSocketAddress serverEndpoint = server.start(H2Config.custom()
1069 .setMaxHeaderListSize(100)
1070 .build());
1071 client.start();
1072
1073 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1074 "localhost", serverEndpoint.getPort(), TIMEOUT);
1075 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1076
1077 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1078 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1079 "1234567890123456789012345678901234567890");
1080 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1081 new BasicRequestProducer(request1, null),
1082 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1083 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1084 Assert.assertNotNull(result1);
1085 final HttpResponse response1 = result1.getHead();
1086 Assert.assertNotNull(response1);
1087 Assert.assertEquals(431, response1.getCode());
1088 Assert.assertEquals("Maximum header list size exceeded", result1.getBody());
1089 }
1090
1091 @Test
1092 public void testHeaderTooLargePost() throws Exception {
1093 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1094
1095 @Override
1096 public AsyncServerExchangeHandler get() {
1097 return new SingleLineResponseHandler("Hi there");
1098 }
1099
1100 });
1101 final InetSocketAddress serverEndpoint = server.start(H2Config.custom()
1102 .setMaxHeaderListSize(100)
1103 .build());
1104 client.start(
1105 new DefaultHttpProcessor(new H2RequestContent(), new H2RequestTargetHost(), new H2RequestConnControl()),
1106 H2Config.DEFAULT);
1107
1108 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1109 "localhost", serverEndpoint.getPort(), TIMEOUT);
1110 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1111
1112 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1113 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1114 "1234567890123456789012345678901234567890");
1115
1116 final byte[] b = new byte[2048];
1117 for (int i = 0; i < b.length; i++) {
1118 b[i] = (byte) ('a' + i % 10);
1119 }
1120
1121 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1122 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
1123 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1124 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1125 Assert.assertNotNull(result1);
1126 final HttpResponse response1 = result1.getHead();
1127 Assert.assertNotNull(response1);
1128 Assert.assertEquals(431, response1.getCode());
1129 Assert.assertEquals("Maximum header list size exceeded", result1.getBody());
1130 }
1131
1132 }