1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import java.net.InetSocketAddress;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.LinkedList;
34 import java.util.Queue;
35 import java.util.Random;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.Future;
38
39 import org.apache.hc.core5.concurrent.Cancellable;
40 import org.apache.hc.core5.concurrent.FutureCallback;
41 import org.apache.hc.core5.function.Supplier;
42 import org.apache.hc.core5.http.ContentType;
43 import org.apache.hc.core5.http.HttpHost;
44 import org.apache.hc.core5.http.HttpResponse;
45 import org.apache.hc.core5.http.HttpStatus;
46 import org.apache.hc.core5.http.Message;
47 import org.apache.hc.core5.http.Method;
48 import org.apache.hc.core5.http.URIScheme;
49 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
50 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
51 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
52 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
53 import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
54 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
55 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
56 import org.apache.hc.core5.http.protocol.HttpCoreContext;
57 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester;
58 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequesterBootstrap;
59 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
60 import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy;
61 import org.apache.hc.core5.http2.ssl.H2ServerTlsStrategy;
62 import org.apache.hc.core5.io.CloseMode;
63 import org.apache.hc.core5.reactor.IOReactorConfig;
64 import org.apache.hc.core5.reactor.ListenerEndpoint;
65 import org.apache.hc.core5.testing.SSLTestContexts;
66 import org.apache.hc.core5.util.ReflectionUtils;
67 import org.apache.hc.core5.util.TimeValue;
68 import org.apache.hc.core5.util.Timeout;
69 import org.hamcrest.CoreMatchers;
70 import org.junit.Assert;
71 import org.junit.Assume;
72 import org.junit.Before;
73 import org.junit.BeforeClass;
74 import org.junit.Rule;
75 import org.junit.Test;
76 import org.junit.rules.ExternalResource;
77 import org.junit.runner.RunWith;
78 import org.junit.runners.Parameterized;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82 @RunWith(Parameterized.class)
83 public class H2ServerAndMultiplexingRequesterTest {
84
85 private final Logger log = LoggerFactory.getLogger(getClass());
86
87 @Parameterized.Parameters(name = "{0}")
88 public static Collection<Object[]> protocols() {
89 return Arrays.asList(new Object[][]{
90 { URIScheme.HTTP },
91 { URIScheme.HTTPS }
92 });
93 }
94 private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
95
96 private final URIScheme scheme;
97
98 public H2ServerAndMultiplexingRequesterTest(final URIScheme scheme) {
99 this.scheme = scheme;
100 }
101
102 private HttpAsyncServer server;
103
104 @Rule
105 public ExternalResource serverResource = new ExternalResource() {
106
107 @Override
108 protected void before() throws Throwable {
109 log.debug("Starting up test server");
110 server = H2ServerBootstrap.bootstrap()
111 .setIOReactorConfig(
112 IOReactorConfig.custom()
113 .setSoTimeout(TIMEOUT)
114 .build())
115 .setTlsStrategy(scheme == URIScheme.HTTPS ? new H2ServerTlsStrategy(
116 SSLTestContexts.createServerSSLContext(),
117 SecureAllPortsStrategy.INSTANCE) : null)
118 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
119 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
120 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
121 .setStreamListener(LoggingH2StreamListener.INSTANCE)
122 .register("*", new Supplier<AsyncServerExchangeHandler>() {
123
124 @Override
125 public AsyncServerExchangeHandler get() {
126 return new EchoHandler(2048);
127 }
128
129 })
130 .create();
131 }
132
133 @Override
134 protected void after() {
135 log.debug("Shutting down test server");
136 if (server != null) {
137 server.close(CloseMode.GRACEFUL);
138 }
139 }
140
141 };
142
143 private H2MultiplexingRequester requester;
144
145 @Rule
146 public ExternalResource clientResource = new ExternalResource() {
147
148 @Override
149 protected void before() throws Throwable {
150 log.debug("Starting up test client");
151 requester = H2MultiplexingRequesterBootstrap.bootstrap()
152 .setIOReactorConfig(IOReactorConfig.custom()
153 .setSoTimeout(TIMEOUT)
154 .build())
155 .setTlsStrategy(new H2ClientTlsStrategy(SSLTestContexts.createClientSSLContext()))
156 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
157 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
158 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
159 .setStreamListener(LoggingH2StreamListener.INSTANCE)
160 .create();
161 }
162
163 @Override
164 protected void after() {
165 log.debug("Shutting down test client");
166 if (requester != null) {
167 requester.close(CloseMode.GRACEFUL);
168 }
169 }
170
171 };
172
173 private static int javaVersion;
174
175 @BeforeClass
176 public static void determineJavaVersion() {
177 javaVersion = ReflectionUtils.determineJRELevel();
178 }
179
180 @Before
181 public void checkVersion() {
182 if (scheme == URIScheme.HTTPS) {
183 Assume.assumeTrue("Java version must be 1.8 or greater", javaVersion > 7);
184 }
185 }
186
187 @Test
188 public void testSequentialRequests() throws Exception {
189 server.start();
190 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
191 final ListenerEndpoint listener = future.get();
192 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
193 requester.start();
194
195 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
196 final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
197 new BasicRequestProducer(Method.POST, target, "/stuff",
198 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
199 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
200 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
201 Assert.assertThat(message1, CoreMatchers.notNullValue());
202 final HttpResponse response1 = message1.getHead();
203 Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
204 final String body1 = message1.getBody();
205 Assert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
206
207 final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
208 new BasicRequestProducer(Method.POST, target, "/other-stuff",
209 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
210 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
211 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
212 Assert.assertThat(message2, CoreMatchers.notNullValue());
213 final HttpResponse response2 = message2.getHead();
214 Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
215 final String body2 = message2.getBody();
216 Assert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
217
218 final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
219 new BasicRequestProducer(Method.POST, target, "/more-stuff",
220 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
221 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
222 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
223 Assert.assertThat(message3, CoreMatchers.notNullValue());
224 final HttpResponse response3 = message3.getHead();
225 Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
226 final String body3 = message3.getBody();
227 Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
228 }
229
230 @Test
231 public void testMultiplexedRequests() throws Exception {
232 server.start();
233 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
234 final ListenerEndpoint listener = future.get();
235 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
236 requester.start();
237
238 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
239 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
240
241 queue.add(requester.execute(
242 new BasicRequestProducer(Method.POST, target, "/stuff",
243 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
244 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
245 queue.add(requester.execute(
246 new BasicRequestProducer(Method.POST, target, "/other-stuff",
247 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
248 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
249 queue.add(requester.execute(
250 new BasicRequestProducer(Method.POST, target, "/more-stuff",
251 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
252 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
253
254 while (!queue.isEmpty()) {
255 final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
256 final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
257 Assert.assertThat(message, CoreMatchers.notNullValue());
258 final HttpResponse response = message.getHead();
259 Assert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
260 final String body = message.getBody();
261 Assert.assertThat(body, CoreMatchers.containsString("stuff"));
262 }
263 }
264
265 @Test
266 public void testValidityCheck() throws Exception {
267 server.start();
268 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
269 final ListenerEndpoint listener = future.get();
270 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
271 requester.start();
272 requester.setValidateAfterInactivity(TimeValue.ofMilliseconds(10));
273
274 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
275 final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
276 new BasicRequestProducer(Method.POST, target, "/stuff",
277 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
278 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
279 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
280 Assert.assertThat(message1, CoreMatchers.notNullValue());
281 final HttpResponse response1 = message1.getHead();
282 Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
283 final String body1 = message1.getBody();
284 Assert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
285
286 Thread.sleep(100);
287
288 final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
289 new BasicRequestProducer(Method.POST, target, "/other-stuff",
290 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
291 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
292 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
293 Assert.assertThat(message2, CoreMatchers.notNullValue());
294 final HttpResponse response2 = message2.getHead();
295 Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
296 final String body2 = message2.getBody();
297 Assert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
298
299 Thread.sleep(100);
300
301 final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
302 new BasicRequestProducer(Method.POST, target, "/more-stuff",
303 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
304 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
305 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
306 Assert.assertThat(message3, CoreMatchers.notNullValue());
307 final HttpResponse response3 = message3.getHead();
308 Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
309 final String body3 = message3.getBody();
310 Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
311 }
312
313 @Test
314 public void testMultiplexedRequestCancellation() throws Exception {
315 server.start();
316 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
317 final ListenerEndpoint listener = future.get();
318 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
319 requester.start();
320
321 final int reqNo = 20;
322
323 final CountDownLatch countDownLatch = new CountDownLatch(reqNo);
324 final Random random = new Random();
325 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
326 for (int i = 0; i < reqNo; i++) {
327 final Cancellable cancellable = requester.execute(
328 new BasicClientExchangeHandler<>(new BasicRequestProducer(Method.POST, target, "/stuff",
329 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
330 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
331 new FutureCallback<Message<HttpResponse, String>>() {
332
333 @Override
334 public void completed(final Message<HttpResponse, String> result) {
335 countDownLatch.countDown();
336 }
337
338 @Override
339 public void failed(final Exception ex) {
340 countDownLatch.countDown();
341 }
342
343 @Override
344 public void cancelled() {
345 countDownLatch.countDown();
346 }
347
348 }),
349 TIMEOUT,
350 HttpCoreContext.create());
351 Thread.sleep(random.nextInt(10));
352 cancellable.cancel();
353 }
354 Assert.assertThat(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), CoreMatchers.equalTo(true));
355 }
356
357 }