1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import static org.hamcrest.MatcherAssert.assertThat;
31
32 import java.io.BufferedReader;
33 import java.io.BufferedWriter;
34 import java.io.IOException;
35 import java.io.InputStream;
36 import java.io.InputStreamReader;
37 import java.io.InterruptedIOException;
38 import java.io.OutputStream;
39 import java.io.OutputStreamWriter;
40 import java.net.InetSocketAddress;
41 import java.net.URI;
42 import java.net.URISyntaxException;
43 import java.nio.ByteBuffer;
44 import java.nio.charset.Charset;
45 import java.nio.charset.StandardCharsets;
46 import java.util.HashMap;
47 import java.util.LinkedList;
48 import java.util.List;
49 import java.util.Map;
50 import java.util.Queue;
51 import java.util.StringTokenizer;
52 import java.util.concurrent.BlockingQueue;
53 import java.util.concurrent.CountDownLatch;
54 import java.util.concurrent.ExecutionException;
55 import java.util.concurrent.Executors;
56 import java.util.concurrent.Future;
57 import java.util.concurrent.LinkedBlockingDeque;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.atomic.AtomicInteger;
60 import java.util.concurrent.atomic.AtomicReference;
61
62 import org.apache.hc.core5.function.Supplier;
63 import org.apache.hc.core5.http.ContentType;
64 import org.apache.hc.core5.http.EndpointDetails;
65 import org.apache.hc.core5.http.EntityDetails;
66 import org.apache.hc.core5.http.Header;
67 import org.apache.hc.core5.http.HeaderElements;
68 import org.apache.hc.core5.http.HttpException;
69 import org.apache.hc.core5.http.HttpHeaders;
70 import org.apache.hc.core5.http.HttpHost;
71 import org.apache.hc.core5.http.HttpRequest;
72 import org.apache.hc.core5.http.HttpResponse;
73 import org.apache.hc.core5.http.HttpStatus;
74 import org.apache.hc.core5.http.Message;
75 import org.apache.hc.core5.http.Method;
76 import org.apache.hc.core5.http.ProtocolException;
77 import org.apache.hc.core5.http.URIScheme;
78 import org.apache.hc.core5.http.message.BasicHttpRequest;
79 import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
80 import org.apache.hc.core5.http.nio.AsyncResponseProducer;
81 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
82 import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
83 import org.apache.hc.core5.http.nio.CapacityChannel;
84 import org.apache.hc.core5.http.nio.DataStreamChannel;
85 import org.apache.hc.core5.http.nio.ResponseChannel;
86 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
87 import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
88 import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
89 import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer;
90 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
91 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
92 import org.apache.hc.core5.http.nio.support.AbstractAsyncPushHandler;
93 import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
94 import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder;
95 import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
96 import org.apache.hc.core5.http.nio.support.BasicPushProducer;
97 import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
98 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
99 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
100 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
101 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
102 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
103 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
104 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
105 import org.apache.hc.core5.http.protocol.HttpContext;
106 import org.apache.hc.core5.http.protocol.HttpCoreContext;
107 import org.apache.hc.core5.http2.H2Error;
108 import org.apache.hc.core5.http2.H2StreamResetException;
109 import org.apache.hc.core5.http2.config.H2Config;
110 import org.apache.hc.core5.http2.nio.command.PingCommand;
111 import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
112 import org.apache.hc.core5.http2.protocol.H2RequestConnControl;
113 import org.apache.hc.core5.http2.protocol.H2RequestContent;
114 import org.apache.hc.core5.http2.protocol.H2RequestTargetHost;
115 import org.apache.hc.core5.reactor.Command;
116 import org.apache.hc.core5.reactor.IOSession;
117 import org.apache.hc.core5.testing.nio.extension.H2TestResources;
118 import org.apache.hc.core5.util.TextUtils;
119 import org.apache.hc.core5.util.Timeout;
120 import org.hamcrest.CoreMatchers;
121 import org.junit.jupiter.api.Assertions;
122 import org.junit.jupiter.api.Test;
123 import org.junit.jupiter.api.extension.RegisterExtension;
124
125 public abstract class H2IntegrationTest {
126
/** Timeout handed to the test resources, used for connecting and for most response waits. */
private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
/** Longer wait used by the tests that deliberately slow down a producer or a consumer. */
private static final Timeout LONG_TIMEOUT = Timeout.ofMinutes(2);

/** Scheme that request URIs are built with. */
private final URIScheme scheme;

/** Provides the server and client instances used by the individual tests. */
@RegisterExtension
private final H2TestResources resources;

/**
 * Creates the fixture for the given scheme.
 *
 * @param scheme scheme used for request URIs and passed on to the test resources.
 */
public H2IntegrationTest(final URIScheme scheme) {
    this.scheme = scheme;
    this.resources = new H2TestResources(this.scheme, TIMEOUT);
}
139
140 private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
141 try {
142 return new URI(scheme.id, null, "localhost", serverEndpoint.getPort(), path, null, null);
143 } catch (final URISyntaxException e) {
144 throw new IllegalStateException();
145 }
146 }
147
148 @Test
149 public void testSimpleGet() throws Exception {
150 final H2TestServer server = resources.server();
151 final H2TestClient client = resources.client();
152
153 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
154 final InetSocketAddress serverEndpoint = server.start();
155
156 client.start();
157 final Future<ClientSessionEndpoint> connectFuture = client.connect(
158 "localhost", serverEndpoint.getPort(), TIMEOUT);
159 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
160
161 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
162 for (int i = 0; i < 10; i++) {
163 queue.add(streamEndpoint.execute(
164 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
165 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
166
167 }
168 while (!queue.isEmpty()) {
169 final Future<Message<HttpResponse, String>> future = queue.remove();
170 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
171 Assertions.assertNotNull(result);
172 final HttpResponse response = result.getHead();
173 final String entity = result.getBody();
174 Assertions.assertNotNull(response);
175 Assertions.assertEquals(200, response.getCode());
176 Assertions.assertEquals("Hi there", entity);
177 }
178 }
179
180 @Test
181 public void testSimpleHead() throws Exception {
182 final H2TestServer server = resources.server();
183 final H2TestClient client = resources.client();
184
185 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
186 final InetSocketAddress serverEndpoint = server.start();
187
188 client.start();
189 final Future<ClientSessionEndpoint> connectFuture = client.connect(
190 "localhost", serverEndpoint.getPort(), TIMEOUT);
191 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
192
193 for (int i = 0; i < 5; i++) {
194 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
195 new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
196 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
197 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
198 Assertions.assertNotNull(result);
199 final HttpResponse response1 = result.getHead();
200 Assertions.assertNotNull(response1);
201 Assertions.assertEquals(200, response1.getCode());
202 Assertions.assertNull(result.getBody());
203 }
204 }
205
206 @Test
207 public void testLargeGet() throws Exception {
208 final H2TestServer server = resources.server();
209 final H2TestClient client = resources.client();
210
211 server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 5000));
212 final InetSocketAddress serverEndpoint = server.start();
213
214 client.start();
215 final Future<ClientSessionEndpoint> connectFuture = client.connect(
216 "localhost", serverEndpoint.getPort(), TIMEOUT);
217 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
218
219 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
220 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/"), null),
221 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
222
223 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
224 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
225 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
226
227 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
228 Assertions.assertNotNull(result1);
229 final HttpResponse response1 = result1.getHead();
230 Assertions.assertNotNull(response1);
231 Assertions.assertEquals(200, response1.getCode());
232 final String s1 = result1.getBody();
233 Assertions.assertNotNull(s1);
234 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
235 while (t1.hasMoreTokens()) {
236 Assertions.assertEquals("0123456789abcdef", t1.nextToken());
237 }
238
239 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
240 Assertions.assertNotNull(result2);
241 final HttpResponse response2 = result2.getHead();
242 Assertions.assertNotNull(response2);
243 Assertions.assertEquals(200, response2.getCode());
244 final String s2 = result2.getBody();
245 Assertions.assertNotNull(s2);
246 final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
247 while (t2.hasMoreTokens()) {
248 Assertions.assertEquals("0123456789abcdef", t2.nextToken());
249 }
250 }
251
252 @Test
253 public void testBasicPost() throws Exception {
254 final H2TestServer server = resources.server();
255 final H2TestClient client = resources.client();
256
257 server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
258 final InetSocketAddress serverEndpoint = server.start();
259
260 client.start();
261 final Future<ClientSessionEndpoint> connectFuture = client.connect(
262 "localhost", serverEndpoint.getPort(), TIMEOUT);
263 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
264
265 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
266 for (int i = 0; i < 10; i++) {
267 final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
268 queue.add(streamEndpoint.execute(
269 new BasicRequestProducer(request, new StringAsyncEntityProducer("Hi there", ContentType.TEXT_PLAIN)),
270 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
271
272 }
273 while (!queue.isEmpty()) {
274 final Future<Message<HttpResponse, String>> future = queue.remove();
275 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
276 Assertions.assertNotNull(result);
277 final HttpResponse response = result.getHead();
278 final String entity1 = result.getBody();
279 Assertions.assertNotNull(response);
280 Assertions.assertEquals(200, response.getCode());
281 Assertions.assertEquals("Hi back", entity1);
282 }
283 }
284
285 @Test
286 public void testLargePost() throws Exception {
287 final H2TestServer server = resources.server();
288 final H2TestClient client = resources.client();
289
290 server.register("*", () -> new EchoHandler(2048));
291 final InetSocketAddress serverEndpoint = server.start();
292
293 client.start();
294 final Future<ClientSessionEndpoint> connectFuture = client.connect(
295 "localhost", serverEndpoint.getPort(), TIMEOUT);
296 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
297
298 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
299 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
300 new MultiLineEntityProducer("0123456789abcdef", 5000)),
301 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
302 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
303 Assertions.assertNotNull(result1);
304 final HttpResponse response1 = result1.getHead();
305 Assertions.assertNotNull(response1);
306 Assertions.assertEquals(200, response1.getCode());
307 final String s1 = result1.getBody();
308 Assertions.assertNotNull(s1);
309 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
310 while (t1.hasMoreTokens()) {
311 Assertions.assertEquals("0123456789abcdef", t1.nextToken());
312 }
313 }
314
315 @Test
316 public void testSlowResponseConsumer() throws Exception {
317 final H2TestServer server = resources.server();
318 final H2TestClient client = resources.client();
319
320 server.register("/", () -> new MultiLineResponseHandler("0123456789abcd", 3));
321 final InetSocketAddress serverEndpoint = server.start();
322
323 client.start(H2Config.custom().setInitialWindowSize(16).build());
324 final Future<ClientSessionEndpoint> connectFuture = client.connect(
325 "localhost", serverEndpoint.getPort(), TIMEOUT);
326 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
327
328 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
329 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/"), null),
330 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
331
332 @Override
333 protected String consumeData(
334 final ContentType contentType, final InputStream inputStream) throws IOException {
335 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
336
337 final StringBuilder buffer = new StringBuilder();
338 try {
339 final byte[] tmp = new byte[16];
340 int l;
341 while ((l = inputStream.read(tmp)) != -1) {
342 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
343 Thread.sleep(500);
344 }
345 } catch (final InterruptedException ex) {
346 Thread.currentThread().interrupt();
347 throw new InterruptedIOException(ex.getMessage());
348 }
349 return buffer.toString();
350 }
351 }),
352 null);
353
354 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
355 Assertions.assertNotNull(result1);
356 final HttpResponse response1 = result1.getHead();
357 Assertions.assertNotNull(response1);
358 Assertions.assertEquals(200, response1.getCode());
359 final String s1 = result1.getBody();
360 Assertions.assertNotNull(s1);
361 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
362 while (t1.hasMoreTokens()) {
363 Assertions.assertEquals("0123456789abcd", t1.nextToken());
364 }
365 }
366
367 @Test
368 public void testSlowRequestProducer() throws Exception {
369 final H2TestServer server = resources.server();
370 final H2TestClient client = resources.client();
371
372 server.register("*", () -> new EchoHandler(2048));
373 final InetSocketAddress serverEndpoint = server.start();
374
375 client.start();
376 final Future<ClientSessionEndpoint> connectFuture = client.connect(
377 "localhost", serverEndpoint.getPort(), TIMEOUT);
378 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
379
380 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
381 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
382 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
383
384 @Override
385 protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
386 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
387 try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
388 for (int i = 0; i < 500; i++) {
389 if (i % 100 == 0) {
390 writer.flush();
391 Thread.sleep(500);
392 }
393 writer.write("0123456789abcdef\r\n");
394 }
395 } catch (final InterruptedException ex) {
396 Thread.currentThread().interrupt();
397 throw new InterruptedIOException(ex.getMessage());
398 }
399 }
400
401 }),
402 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
403 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
404 Assertions.assertNotNull(result1);
405 final HttpResponse response1 = result1.getHead();
406 Assertions.assertNotNull(response1);
407 Assertions.assertEquals(200, response1.getCode());
408 final String s1 = result1.getBody();
409 Assertions.assertNotNull(s1);
410 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
411 while (t1.hasMoreTokens()) {
412 Assertions.assertEquals("0123456789abcdef", t1.nextToken());
413 }
414 }
415
416 @Test
417 public void testSlowResponseProducer() throws Exception {
418 final H2TestServer server = resources.server();
419 final H2TestClient client = resources.client();
420
421 server.register("*", () -> new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
422
423 @Override
424 protected void handle(
425 final HttpRequest request,
426 final InputStream requestStream,
427 final HttpResponse response,
428 final OutputStream responseStream,
429 final HttpContext context) throws IOException, HttpException {
430
431 if (!"/hello".equals(request.getPath())) {
432 response.setCode(HttpStatus.SC_NOT_FOUND);
433 return;
434 }
435 if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
436 response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
437 return;
438 }
439 if (requestStream == null) {
440 return;
441 }
442 final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
443 final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
444 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
445 response.setCode(HttpStatus.SC_OK);
446 response.setHeader(h1);
447 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
448 final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
449 try {
450 String l;
451 int count = 0;
452 while ((l = reader.readLine()) != null) {
453 writer.write(l);
454 writer.write("\r\n");
455 count++;
456 if (count % 500 == 0) {
457 Thread.sleep(500);
458 }
459 }
460 writer.flush();
461 } catch (final InterruptedException ex) {
462 Thread.currentThread().interrupt();
463 throw new InterruptedIOException(ex.getMessage());
464 }
465 }
466 }
467 });
468 final InetSocketAddress serverEndpoint = server.start();
469
470 client.start(H2Config.custom()
471 .setInitialWindowSize(512)
472 .build());
473
474 final Future<ClientSessionEndpoint> connectFuture = client.connect(
475 "localhost", serverEndpoint.getPort(), TIMEOUT);
476 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
477
478 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
479 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
480 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
481 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
482 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
483 Assertions.assertNotNull(result1);
484 final HttpResponse response1 = result1.getHead();
485 Assertions.assertNotNull(response1);
486 Assertions.assertEquals(200, response1.getCode());
487 final String s1 = result1.getBody();
488 Assertions.assertNotNull(s1);
489 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
490 while (t1.hasMoreTokens()) {
491 Assertions.assertEquals("0123456789abcd", t1.nextToken());
492 }
493 }
494
495 @Test
496 public void testPush() throws Exception {
497 final H2TestServer server = resources.server();
498 final H2TestClient client = resources.client();
499
500 final InetSocketAddress serverEndpoint = server.start();
501 server.register("/hello", () -> new MessageExchangeHandler<Void>(new DiscardingEntityConsumer<>()) {
502
503 @Override
504 protected void handle(
505 final Message<HttpRequest, Void> request,
506 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
507 final HttpContext context) throws IOException, HttpException {
508 responseTrigger.pushPromise(
509 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/stuff")),
510 context,
511 new BasicPushProducer(new MultiLineEntityProducer("Pushing lots of stuff", 500)));
512 responseTrigger.submitResponse(
513 AsyncResponseBuilder.create(HttpStatus.SC_OK).setEntity("Hi there", ContentType.TEXT_PLAIN).build(),
514 context);
515 }
516 });
517
518 client.start(H2Config.custom().setPushEnabled(true).build());
519
520 final BlockingQueue<Message<HttpResponse, String>> pushMessageQueue = new LinkedBlockingDeque<>();
521
522 final Future<ClientSessionEndpoint> connectFuture = client.connect(
523 "localhost", serverEndpoint.getPort(), TIMEOUT);
524 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
525
526 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
527 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
528 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
529 (request, context) -> new AbstractAsyncPushHandler<Message<HttpResponse, String>>(new BasicResponseConsumer<>(new StringAsyncEntityConsumer())) {
530
531 @Override
532 protected void handleResponse(
533 final HttpRequest promise,
534 final Message<HttpResponse, String> responseMessage) throws IOException, HttpException {
535 try {
536 pushMessageQueue.put(responseMessage);
537 } catch (final InterruptedException ex) {
538 Thread.currentThread().interrupt();
539 throw new InterruptedIOException(ex.getMessage());
540 }
541 }
542
543 },
544 null,
545 null);
546 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
547 Assertions.assertNotNull(result1);
548 final HttpResponse response1 = result1.getHead();
549 final String entity1 = result1.getBody();
550 Assertions.assertNotNull(response1);
551 Assertions.assertEquals(200, response1.getCode());
552 Assertions.assertEquals("Hi there", entity1);
553
554 final Message<HttpResponse, String> result2 = pushMessageQueue.poll(5, TimeUnit.SECONDS);
555 Assertions.assertNotNull(result2);
556 final HttpResponse response2 = result2.getHead();
557 final String entity2 = result2.getBody();
558 Assertions.assertEquals(200, response2.getCode());
559 Assertions.assertNotNull(entity2);
560 final StringTokenizer t1 = new StringTokenizer(entity2, "\r\n");
561 while (t1.hasMoreTokens()) {
562 Assertions.assertEquals("Pushing lots of stuff", t1.nextToken());
563 }
564 }
565
566 @Test
567 public void testPushRefused() throws Exception {
568 final H2TestServer server = resources.server();
569 final H2TestClient client = resources.client();
570
571 final BlockingQueue<Exception> pushResultQueue = new LinkedBlockingDeque<>();
572 final InetSocketAddress serverEndpoint = server.start();
573 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
574
575 @Override
576 public AsyncServerExchangeHandler get() {
577 return new MessageExchangeHandler<Void>(new DiscardingEntityConsumer<>()) {
578
579 @Override
580 protected void handle(
581 final Message<HttpRequest, Void> request,
582 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
583 final HttpContext context) throws IOException, HttpException {
584
585 responseTrigger.pushPromise(
586 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/stuff")),
587 context, new BasicPushProducer(AsyncEntityProducers.create("Pushing all sorts of stuff")) {
588
589 @Override
590 public void failed(final Exception cause) {
591 pushResultQueue.add(cause);
592 super.failed(cause);
593 }
594
595 });
596 responseTrigger.pushPromise(
597 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/more-stuff")),
598 context, new BasicPushProducer(new MultiLineEntityProducer("Pushing lots of stuff", 500)) {
599
600 @Override
601 public void failed(final Exception cause) {
602 pushResultQueue.add(cause);
603 super.failed(cause);
604 }
605
606 });
607 responseTrigger.submitResponse(
608 new BasicResponseProducer(HttpStatus.SC_OK, AsyncEntityProducers.create("Hi there")),
609 context);
610 }
611 };
612 }
613
614 });
615
616 client.start(H2Config.custom().setPushEnabled(true).build());
617
618 final Future<ClientSessionEndpoint> connectFuture = client.connect(
619 "localhost", serverEndpoint.getPort(), TIMEOUT);
620 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
621
622 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
623 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
624 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
625 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
626 Assertions.assertNotNull(result1);
627 final HttpResponse response1 = result1.getHead();
628 final String entity1 = result1.getBody();
629 Assertions.assertNotNull(response1);
630 Assertions.assertEquals(200, response1.getCode());
631 Assertions.assertEquals("Hi there", entity1);
632
633 final Object result2 = pushResultQueue.poll(5, TimeUnit.SECONDS);
634 Assertions.assertNotNull(result2);
635 Assertions.assertTrue(result2 instanceof H2StreamResetException);
636 Assertions.assertEquals(H2Error.REFUSED_STREAM.getCode(), ((H2StreamResetException) result2).getCode());
637
638 final Object result3 = pushResultQueue.poll(5, TimeUnit.SECONDS);
639 Assertions.assertNotNull(result3);
640 Assertions.assertTrue(result3 instanceof H2StreamResetException);
641 Assertions.assertEquals(H2Error.REFUSED_STREAM.getCode(), ((H2StreamResetException) result3).getCode());
642 }
643
644 @Test
645 public void testExcessOfConcurrentStreams() throws Exception {
646 final H2TestServer server = resources.server();
647 final H2TestClient client = resources.client();
648
649 server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 2000));
650 final InetSocketAddress serverEndpoint = server.start(H2Config.custom().setMaxConcurrentStreams(20).build());
651
652 client.start(H2Config.custom().setMaxConcurrentStreams(20).build());
653 final Future<ClientSessionEndpoint> connectFuture = client.connect(
654 "localhost", serverEndpoint.getPort(), TIMEOUT);
655 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
656
657 final Queue<Future<Message<HttpResponse, Void>>> queue = new LinkedList<>();
658 for (int i = 0; i < 2000; i++) {
659 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
660 final Future<Message<HttpResponse, Void>> future = streamEndpoint.execute(
661 new BasicRequestProducer(request1, null),
662 new BasicResponseConsumer<>(new DiscardingEntityConsumer<>()), null);
663 queue.add(future);
664 }
665
666 while (!queue.isEmpty()) {
667 final Future<Message<HttpResponse, Void>> future = queue.remove();
668 final Message<HttpResponse, Void> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
669 Assertions.assertNotNull(result);
670 final HttpResponse response = result.getHead();
671 Assertions.assertNotNull(response);
672 Assertions.assertEquals(200, response.getCode());
673 }
674 }
675
676 @Test
677 public void testExpectationFailed() throws Exception {
678 final H2TestServer server = resources.server();
679 final H2TestClient client = resources.client();
680
681 server.register("*", () -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
682
683 @Override
684 protected void handle(
685 final Message<HttpRequest, String> request,
686 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
687 final HttpContext context) throws IOException, HttpException {
688 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
689
690 }
691 });
692 final InetSocketAddress serverEndpoint = server.start(null, handler -> new BasicAsyncServerExpectationDecorator(handler) {
693
694 @Override
695 protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
696 final Header h = request.getFirstHeader("password");
697 if (h != null && "secret".equals(h.getValue())) {
698 return null;
699 } else {
700 return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
701 }
702 }
703 }, H2Config.DEFAULT);
704
705 client.start();
706 final Future<ClientSessionEndpoint> connectFuture = client.connect(
707 "localhost", serverEndpoint.getPort(), TIMEOUT);
708 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
709
710 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
711 request1.addHeader("password", "secret");
712 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
713 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 5000)),
714 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
715 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
716 Assertions.assertNotNull(result1);
717 final HttpResponse response1 = result1.getHead();
718 Assertions.assertNotNull(response1);
719 Assertions.assertEquals(200, response1.getCode());
720 Assertions.assertNotNull("All is well", result1.getBody());
721
722 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
723 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
724 new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
725 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
726 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
727 Assertions.assertNotNull(result2);
728 final HttpResponse response2 = result2.getHead();
729 Assertions.assertNotNull(response2);
730 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
731 Assertions.assertNotNull("You shall not pass", result2.getBody());
732 }
733
734 @Test
735 public void testPrematureResponse() throws Exception {
736 final H2TestServer server = resources.server();
737 final H2TestClient client = resources.client();
738
739 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
740
741 @Override
742 public AsyncServerExchangeHandler get() {
743 return new AsyncServerExchangeHandler() {
744
745 private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>();
746
747 @Override
748 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
749 capacityChannel.update(Integer.MAX_VALUE);
750 }
751
752 @Override
753 public void consume(final ByteBuffer src) throws IOException {
754 }
755
756 @Override
757 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
758 }
759
760 @Override
761 public void handleRequest(
762 final HttpRequest request,
763 final EntityDetails entityDetails,
764 final ResponseChannel responseChannel,
765 final HttpContext context) throws HttpException, IOException {
766 final AsyncResponseProducer producer;
767 final Header h = request.getFirstHeader("password");
768 if (h != null && "secret".equals(h.getValue())) {
769 producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
770 } else {
771 producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
772 }
773 responseProducer.set(producer);
774 producer.sendResponse(responseChannel, context);
775 }
776
777 @Override
778 public int available() {
779 final AsyncResponseProducer producer = this.responseProducer.get();
780 return producer.available();
781 }
782
783 @Override
784 public void produce(final DataStreamChannel channel) throws IOException {
785 final AsyncResponseProducer producer = this.responseProducer.get();
786 producer.produce(channel);
787 }
788
789 @Override
790 public void failed(final Exception cause) {
791 }
792
793 @Override
794 public void releaseResources() {
795 }
796 };
797 }
798
799 });
800 final InetSocketAddress serverEndpoint = server.start();
801
802 client.start();
803 final Future<ClientSessionEndpoint> connectFuture = client.connect(
804 "localhost", serverEndpoint.getPort(), TIMEOUT);
805 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
806
807 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
808 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
809 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 5000)),
810 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
811 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
812 Assertions.assertNotNull(result1);
813 final HttpResponse response1 = result1.getHead();
814 Assertions.assertNotNull(response1);
815 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
816 Assertions.assertNotNull("You shall not pass", result1.getBody());
817 }
818
819 @Test
820 public void testMessageWithTrailers() throws Exception {
821 final H2TestServer server = resources.server();
822 final H2TestClient client = resources.client();
823
824 server.register("/hello", () -> new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
825
826 @Override
827 protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
828 final HttpRequest request,
829 final EntityDetails entityDetails,
830 final HttpContext context) throws HttpException {
831 return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
832 }
833
834 @Override
835 protected void handle(
836 final Message<HttpRequest, String> requestMessage,
837 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
838 final HttpContext context) throws HttpException, IOException {
839 responseTrigger.submitResponse(new BasicResponseProducer(
840 HttpStatus.SC_OK,
841 new DigestingEntityProducer("MD5",
842 new StringAsyncEntityProducer("Hello back with some trailers"))), context);
843 }
844 });
845 final InetSocketAddress serverEndpoint = server.start();
846
847 client.start();
848
849 final Future<ClientSessionEndpoint> connectFuture = client.connect(
850 "localhost", serverEndpoint.getPort(), TIMEOUT);
851 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
852
853 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
854 final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
855 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
856 new BasicRequestProducer(request1, null),
857 new BasicResponseConsumer<>(entityConsumer), null);
858 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
859 Assertions.assertNotNull(result1);
860 final HttpResponse response1 = result1.getHead();
861 Assertions.assertNotNull(response1);
862 Assertions.assertEquals(200, response1.getCode());
863 Assertions.assertEquals("Hello back with some trailers", result1.getBody());
864
865 final List<Header> trailers = entityConsumer.getTrailers();
866 Assertions.assertNotNull(trailers);
867 Assertions.assertEquals(2, trailers.size());
868 final Map<String, String> map = new HashMap<>();
869 for (final Header header: trailers) {
870 map.put(TextUtils.toLowerCase(header.getName()), header.getValue());
871 }
872 final String digest = TextUtils.toHexString(entityConsumer.getDigest());
873 Assertions.assertEquals("MD5", map.get("digest-algo"));
874 Assertions.assertEquals(digest, map.get("digest"));
875 }
876
877 @Test
878 public void testConnectionPing() throws Exception {
879 final H2TestServer server = resources.server();
880 final H2TestClient client = resources.client();
881
882 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
883 final InetSocketAddress serverEndpoint = server.start();
884
885 client.start();
886 final Future<ClientSessionEndpoint> connectFuture = client.connect(
887 "localhost", serverEndpoint.getPort(), TIMEOUT);
888 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
889
890 final int n = 10;
891 final CountDownLatch latch = new CountDownLatch(n);
892 final AtomicInteger count = new AtomicInteger(0);
893 for (int i = 0; i < n; i++) {
894 streamEndpoint.execute(
895 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
896 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
897 streamEndpoint.execute(new PingCommand(new BasicPingHandler(result -> {
898 if (result) {
899 count.incrementAndGet();
900 }
901 latch.countDown();
902 })), Command.Priority.NORMAL);
903
904 }
905 Assertions.assertTrue(latch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
906 Assertions.assertEquals(n, count.get());
907 }
908
909 @Test
910 public void testRequestWithInvalidConnectionHeader() throws Exception {
911 final H2TestServer server = resources.server();
912 final H2TestClient client = resources.client();
913
914 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
915 final InetSocketAddress serverEndpoint = server.start();
916
917 client.start();
918
919 final Future<IOSession> sessionFuture = client.requestSession(new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
920 final IOSession session = sessionFuture.get();
921 try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(session)) {
922
923 final HttpRequest request = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
924 request.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
925 final HttpCoreContext coreContext = HttpCoreContext.create();
926 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
927 new BasicRequestProducer(request, null),
928 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
929 coreContext, null);
930 final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () ->
931 future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
932 assertThat(exception.getCause(), CoreMatchers.instanceOf(ProtocolException.class));
933
934 final EndpointDetails endpointDetails = coreContext.getEndpointDetails();
935 assertThat(endpointDetails.getRequestCount(), CoreMatchers.equalTo(0L));
936 }
937 }
938
939 @Test
940 public void testHeaderTooLarge() throws Exception {
941 final H2TestServer server = resources.server();
942 final H2TestClient client = resources.client();
943
944 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
945 final InetSocketAddress serverEndpoint = server.start(H2Config.custom()
946 .setMaxHeaderListSize(100)
947 .build());
948 client.start();
949
950 final Future<ClientSessionEndpoint> connectFuture = client.connect(
951 "localhost", serverEndpoint.getPort(), TIMEOUT);
952 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
953
954 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
955 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
956 "1234567890123456789012345678901234567890");
957 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
958 new BasicRequestProducer(request1, null),
959 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
960 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
961 Assertions.assertNotNull(result1);
962 final HttpResponse response1 = result1.getHead();
963 Assertions.assertNotNull(response1);
964 Assertions.assertEquals(431, response1.getCode());
965 Assertions.assertEquals("Maximum header list size exceeded", result1.getBody());
966 }
967
968 @Test
969 public void testHeaderTooLargePost() throws Exception {
970 final H2TestServer server = resources.server();
971 final H2TestClient client = resources.client();
972
973 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
974 final InetSocketAddress serverEndpoint = server.start(H2Config.custom()
975 .setMaxHeaderListSize(100)
976 .build());
977 client.start(
978 new DefaultHttpProcessor(H2RequestContent.INSTANCE, H2RequestTargetHost.INSTANCE, H2RequestConnControl.INSTANCE),
979 H2Config.DEFAULT);
980
981 final Future<ClientSessionEndpoint> connectFuture = client.connect(
982 "localhost", serverEndpoint.getPort(), TIMEOUT);
983 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
984
985 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
986 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
987 "1234567890123456789012345678901234567890");
988
989 final byte[] b = new byte[2048];
990 for (int i = 0; i < b.length; i++) {
991 b[i] = (byte) ('a' + i % 10);
992 }
993
994 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
995 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
996 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
997 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
998 Assertions.assertNotNull(result1);
999 final HttpResponse response1 = result1.getHead();
1000 Assertions.assertNotNull(response1);
1001 Assertions.assertEquals(431, response1.getCode());
1002 Assertions.assertEquals("Maximum header list size exceeded", result1.getBody());
1003 }
1004
1005 }