1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import static org.hamcrest.MatcherAssert.assertThat;
31
32 import java.net.InetSocketAddress;
33 import java.util.LinkedList;
34 import java.util.Queue;
35 import java.util.Random;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.Future;
38
39 import org.apache.hc.core5.concurrent.Cancellable;
40 import org.apache.hc.core5.concurrent.FutureCallback;
41 import org.apache.hc.core5.http.ContentType;
42 import org.apache.hc.core5.http.HttpHost;
43 import org.apache.hc.core5.http.HttpResponse;
44 import org.apache.hc.core5.http.HttpStatus;
45 import org.apache.hc.core5.http.Message;
46 import org.apache.hc.core5.http.Method;
47 import org.apache.hc.core5.http.URIScheme;
48 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
49 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
50 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
51 import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
52 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
53 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
54 import org.apache.hc.core5.http.protocol.HttpCoreContext;
55 import org.apache.hc.core5.http.protocol.UriPatternMatcher;
56 import org.apache.hc.core5.http2.HttpVersionPolicy;
57 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester;
58 import org.apache.hc.core5.reactor.IOReactorConfig;
59 import org.apache.hc.core5.reactor.ListenerEndpoint;
60 import org.apache.hc.core5.testing.nio.extension.H2AsyncServerResource;
61 import org.apache.hc.core5.testing.nio.extension.H2MultiplexingRequesterResource;
62 import org.apache.hc.core5.util.TimeValue;
63 import org.apache.hc.core5.util.Timeout;
64 import org.hamcrest.CoreMatchers;
65 import org.junit.jupiter.api.Test;
66 import org.junit.jupiter.api.extension.RegisterExtension;
67
68 public abstract class H2CoreTransportMultiplexingTest {
69
70 private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
71
72 private final URIScheme scheme;
73 @RegisterExtension
74 private final H2AsyncServerResource serverResource;
75 @RegisterExtension
76 private final H2MultiplexingRequesterResource clientResource;
77
78 public H2CoreTransportMultiplexingTest(final URIScheme scheme) {
79 this.scheme = scheme;
80 this.serverResource = new H2AsyncServerResource(bootstrap -> bootstrap
81 .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
82 .setIOReactorConfig(
83 IOReactorConfig.custom()
84 .setSoTimeout(TIMEOUT)
85 .build())
86 .setLookupRegistry(new UriPatternMatcher<>())
87 .register("*", () -> new EchoHandler(2048))
88 );
89 this.clientResource = new H2MultiplexingRequesterResource(bootstrap -> bootstrap
90 .setIOReactorConfig(IOReactorConfig.custom()
91 .setSoTimeout(TIMEOUT)
92 .build())
93 );
94 }
95
96 @Test
97 public void testSequentialRequests() throws Exception {
98 final HttpAsyncServer server = serverResource.start();
99 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
100 final ListenerEndpoint listener = future.get();
101 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
102 final H2MultiplexingRequester requester = clientResource.start();
103
104 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
105 final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
106 new BasicRequestProducer(Method.POST, target, "/stuff",
107 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
108 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
109 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
110 assertThat(message1, CoreMatchers.notNullValue());
111 final HttpResponse response1 = message1.getHead();
112 assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
113 final String body1 = message1.getBody();
114 assertThat(body1, CoreMatchers.equalTo("some stuff"));
115
116 final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
117 new BasicRequestProducer(Method.POST, target, "/other-stuff",
118 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
119 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
120 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
121 assertThat(message2, CoreMatchers.notNullValue());
122 final HttpResponse response2 = message2.getHead();
123 assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
124 final String body2 = message2.getBody();
125 assertThat(body2, CoreMatchers.equalTo("some other stuff"));
126
127 final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
128 new BasicRequestProducer(Method.POST, target, "/more-stuff",
129 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
130 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
131 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
132 assertThat(message3, CoreMatchers.notNullValue());
133 final HttpResponse response3 = message3.getHead();
134 assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
135 final String body3 = message3.getBody();
136 assertThat(body3, CoreMatchers.equalTo("some more stuff"));
137 }
138
139 @Test
140 public void testMultiplexedRequests() throws Exception {
141 final HttpAsyncServer server = serverResource.start();
142 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
143 final ListenerEndpoint listener = future.get();
144 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
145 final H2MultiplexingRequester requester = clientResource.start();
146
147 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
148 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
149
150 queue.add(requester.execute(
151 new BasicRequestProducer(Method.POST, target, "/stuff",
152 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
153 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
154 queue.add(requester.execute(
155 new BasicRequestProducer(Method.POST, target, "/other-stuff",
156 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
157 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
158 queue.add(requester.execute(
159 new BasicRequestProducer(Method.POST, target, "/more-stuff",
160 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
161 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
162
163 while (!queue.isEmpty()) {
164 final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
165 final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
166 assertThat(message, CoreMatchers.notNullValue());
167 final HttpResponse response = message.getHead();
168 assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
169 final String body = message.getBody();
170 assertThat(body, CoreMatchers.containsString("stuff"));
171 }
172 }
173
174 @Test
175 public void testValidityCheck() throws Exception {
176 final HttpAsyncServer server = serverResource.start();
177 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
178 final ListenerEndpoint listener = future.get();
179 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
180 final H2MultiplexingRequester requester = clientResource.start();
181 requester.setValidateAfterInactivity(TimeValue.ofMilliseconds(10));
182
183 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
184 final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
185 new BasicRequestProducer(Method.POST, target, "/stuff",
186 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
187 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
188 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
189 assertThat(message1, CoreMatchers.notNullValue());
190 final HttpResponse response1 = message1.getHead();
191 assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
192 final String body1 = message1.getBody();
193 assertThat(body1, CoreMatchers.equalTo("some stuff"));
194
195 Thread.sleep(100);
196
197 final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
198 new BasicRequestProducer(Method.POST, target, "/other-stuff",
199 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
200 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
201 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
202 assertThat(message2, CoreMatchers.notNullValue());
203 final HttpResponse response2 = message2.getHead();
204 assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
205 final String body2 = message2.getBody();
206 assertThat(body2, CoreMatchers.equalTo("some other stuff"));
207
208 Thread.sleep(100);
209
210 final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
211 new BasicRequestProducer(Method.POST, target, "/more-stuff",
212 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
213 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
214 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
215 assertThat(message3, CoreMatchers.notNullValue());
216 final HttpResponse response3 = message3.getHead();
217 assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
218 final String body3 = message3.getBody();
219 assertThat(body3, CoreMatchers.equalTo("some more stuff"));
220 }
221
222 @Test
223 public void testMultiplexedRequestCancellation() throws Exception {
224 final HttpAsyncServer server = serverResource.start();
225 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
226 final ListenerEndpoint listener = future.get();
227 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
228 final H2MultiplexingRequester requester = clientResource.start();
229
230 final int reqNo = 20;
231
232 final CountDownLatch countDownLatch = new CountDownLatch(reqNo);
233 final Random random = new Random();
234 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
235 for (int i = 0; i < reqNo; i++) {
236 final Cancellable cancellable = requester.execute(
237 new BasicClientExchangeHandler<>(new BasicRequestProducer(Method.POST, target, "/stuff",
238 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
239 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
240 new FutureCallback<Message<HttpResponse, String>>() {
241
242 @Override
243 public void completed(final Message<HttpResponse, String> result) {
244 countDownLatch.countDown();
245 }
246
247 @Override
248 public void failed(final Exception ex) {
249 countDownLatch.countDown();
250 }
251
252 @Override
253 public void cancelled() {
254 countDownLatch.countDown();
255 }
256
257 }),
258 TIMEOUT,
259 HttpCoreContext.create());
260 Thread.sleep(random.nextInt(10));
261 cancellable.cancel();
262 }
263 assertThat(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), CoreMatchers.equalTo(true));
264 }
265
266 }