1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.testing.reactive;
28
29 import static java.lang.String.format;
30
31 import java.io.ByteArrayOutputStream;
32 import java.net.InetSocketAddress;
33 import java.net.SocketTimeoutException;
34 import java.net.URI;
35 import java.nio.ByteBuffer;
36 import java.nio.channels.Channels;
37 import java.nio.channels.WritableByteChannel;
38 import java.util.Arrays;
39 import java.util.Collection;
40 import java.util.List;
41 import java.util.Random;
42 import java.util.concurrent.CancellationException;
43 import java.util.concurrent.ExecutionException;
44 import java.util.concurrent.Future;
45 import java.util.concurrent.atomic.AtomicBoolean;
46 import java.util.concurrent.atomic.AtomicReference;
47
48 import org.apache.hc.core5.function.Supplier;
49 import org.apache.hc.core5.http.HttpResponse;
50 import org.apache.hc.core5.http.HttpStreamResetException;
51 import org.apache.hc.core5.http.Message;
52 import org.apache.hc.core5.http.Method;
53 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
54 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
55 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
56 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
57 import org.apache.hc.core5.http2.HttpVersionPolicy;
58 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
59 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
60 import org.apache.hc.core5.io.CloseMode;
61 import org.apache.hc.core5.reactive.ReactiveEntityProducer;
62 import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
63 import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
64 import org.apache.hc.core5.reactor.IOReactorConfig;
65 import org.apache.hc.core5.reactor.ListenerEndpoint;
66 import org.apache.hc.core5.testing.classic.LoggingConnPoolListener;
67 import org.apache.hc.core5.testing.nio.LoggingExceptionCallback;
68 import org.apache.hc.core5.testing.nio.LoggingH2StreamListener;
69 import org.apache.hc.core5.testing.nio.LoggingHttp1StreamListener;
70 import org.apache.hc.core5.testing.nio.LoggingIOSessionDecorator;
71 import org.apache.hc.core5.testing.nio.LoggingIOSessionListener;
72 import org.apache.hc.core5.testing.reactive.ReactiveTestUtils.StreamDescription;
73 import org.apache.hc.core5.util.TextUtils;
74 import org.apache.hc.core5.util.Timeout;
75 import org.junit.Assert;
76 import org.junit.Rule;
77 import org.junit.Test;
78 import org.junit.rules.ExternalResource;
79 import org.junit.runner.RunWith;
80 import org.junit.runners.Parameterized;
81 import org.reactivestreams.Publisher;
82 import org.slf4j.Logger;
83 import org.slf4j.LoggerFactory;
84
85 import io.reactivex.Flowable;
86 import io.reactivex.Observable;
87 import io.reactivex.functions.Action;
88 import io.reactivex.functions.Consumer;
89
90 @RunWith(Parameterized.class)
91 public class ReactiveClientTest {
92
93 private final Logger log = LoggerFactory.getLogger(getClass());
94
95 @Parameterized.Parameters(name = "{0}")
96 public static Collection<Object[]> protocols() {
97 return Arrays.asList(new Object[][]{
98 { HttpVersionPolicy.FORCE_HTTP_1 },
99 { HttpVersionPolicy.FORCE_HTTP_2 }
100 });
101 }
102 private static final Timeout SOCKET_TIMEOUT = Timeout.ofSeconds(30);
103 private static final Timeout RESULT_TIMEOUT = Timeout.ofSeconds(60);
104
105 private static final Random RANDOM = new Random();
106
107 private final HttpVersionPolicy versionPolicy;
108
109 public ReactiveClientTest(final HttpVersionPolicy httpVersionPolicy) {
110 this.versionPolicy = httpVersionPolicy;
111 }
112
113 private HttpAsyncServer server;
114
115 @Rule
116 public ExternalResource serverResource = new ExternalResource() {
117
118 @Override
119 protected void before() throws Throwable {
120 log.debug("Starting up test server");
121 server = H2ServerBootstrap.bootstrap()
122 .setVersionPolicy(versionPolicy)
123 .setIOReactorConfig(
124 IOReactorConfig.custom()
125 .setSoTimeout(SOCKET_TIMEOUT)
126 .build())
127 .setStreamListener(LoggingHttp1StreamListener.INSTANCE_SERVER)
128 .setStreamListener(LoggingH2StreamListener.INSTANCE)
129 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
130 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
131 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
132 .register("*", new Supplier<AsyncServerExchangeHandler>() {
133
134 @Override
135 public AsyncServerExchangeHandler get() {
136 return new ReactiveServerExchangeHandler(new ReactiveEchoProcessor());
137 }
138
139 })
140 .create();
141 }
142
143 @Override
144 protected void after() {
145 log.debug("Shutting down test server");
146 if (server != null) {
147 server.close(CloseMode.GRACEFUL);
148 }
149 }
150
151 };
152
153 private HttpAsyncRequester requester;
154
155 @Rule
156 public ExternalResource clientResource = new ExternalResource() {
157
158 @Override
159 protected void before() throws Throwable {
160 log.debug("Starting up test client");
161 requester = H2RequesterBootstrap.bootstrap()
162 .setVersionPolicy(versionPolicy)
163 .setIOReactorConfig(IOReactorConfig.custom()
164 .setSoTimeout(SOCKET_TIMEOUT)
165 .build())
166 .setStreamListener(LoggingHttp1StreamListener.INSTANCE_CLIENT)
167 .setStreamListener(LoggingH2StreamListener.INSTANCE)
168 .setConnPoolListener(LoggingConnPoolListener.INSTANCE)
169 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
170 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
171 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
172 .create();
173 }
174
175 @Override
176 protected void after() {
177 log.debug("Shutting down test client");
178 if (requester != null) {
179 requester.close(CloseMode.GRACEFUL);
180 }
181 }
182
183 };
184
185 @Test
186 public void testSimpleRequest() throws Exception {
187 final InetSocketAddress address = startClientAndServer();
188 final byte[] input = new byte[1024];
189 RANDOM.nextBytes(input);
190 final Publisher<ByteBuffer> publisher = Flowable.just(ByteBuffer.wrap(input));
191 final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, input.length, null, null);
192
193 final BasicRequestProducer request = getRequestProducer(address, producer);
194
195 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
196 requester.execute(request, consumer, SOCKET_TIMEOUT, null);
197
198 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
199 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
200
201 final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
202 final WritableByteChannel writableByteChannel = Channels.newChannel(byteArrayOutputStream);
203 for (final ByteBuffer byteBuffer : Observable.fromPublisher(response.getBody()).toList().blockingGet()) {
204 writableByteChannel.write(byteBuffer);
205 }
206 writableByteChannel.close();
207 final byte[] output = byteArrayOutputStream.toByteArray();
208 Assert.assertArrayEquals(input, output);
209 }
210
211 private BasicRequestProducer getRequestProducer(final InetSocketAddress address, final ReactiveEntityProducer producer) {
212 return new BasicRequestProducer(Method.POST, URI.create("http://localhost:" + address.getPort()), producer);
213 }
214
215 @Test
216 public void testLongRunningRequest() throws Exception {
217 final InetSocketAddress address = startClientAndServer();
218 final long expectedLength = 6_554_200L;
219 final AtomicReference<String> expectedHash = new AtomicReference<>(null);
220 final Flowable<ByteBuffer> stream = ReactiveTestUtils.produceStream(expectedLength, expectedHash);
221 final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
222 final BasicRequestProducer request = getRequestProducer(address, producer);
223
224 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
225 requester.execute(request, consumer, SOCKET_TIMEOUT, null);
226 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
227 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
228 final StreamDescription desc = ReactiveTestUtils.consumeStream(response.getBody()).blockingGet();
229
230 Assert.assertEquals(expectedLength, desc.length);
231 Assert.assertEquals(expectedHash.get(), TextUtils.toHexString(desc.md.digest()));
232 }
233
234 @Test
235 public void testManySmallBuffers() throws Exception {
236
237
238
239
240 final InetSocketAddress address = startClientAndServer();
241 for (int i = 0; i < 10; i++) {
242 final long expectedLength = 1_024_000;
243 final int maximumBlockSize = 1024;
244 final AtomicReference<String> expectedHash = new AtomicReference<>(null);
245 final Publisher<ByteBuffer> stream = ReactiveTestUtils.produceStream(expectedLength, maximumBlockSize, expectedHash);
246 final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
247 final BasicRequestProducer request = getRequestProducer(address, producer);
248
249 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
250 requester.execute(request, consumer, SOCKET_TIMEOUT, null);
251 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
252 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
253 final StreamDescription desc = ReactiveTestUtils.consumeStream(response.getBody()).blockingGet();
254
255 Assert.assertEquals(expectedLength, desc.length);
256 Assert.assertEquals(expectedHash.get(), TextUtils.toHexString(desc.md.digest()));
257 }
258 }
259
260 @Test
261 public void testRequestError() throws Exception {
262 final InetSocketAddress address = startClientAndServer();
263 final RuntimeException exceptionThrown = new RuntimeException("Test");
264 final Publisher<ByteBuffer> publisher = Flowable.error(exceptionThrown);
265 final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, 100, null, null);
266
267 final BasicRequestProducer request = getRequestProducer(address, producer);
268
269 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
270
271 final Future<Void> future = requester.execute(request, consumer, SOCKET_TIMEOUT, null);
272
273 try {
274 future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
275 Assert.fail("Expected exception");
276 } catch (final ExecutionException ex) {
277 Assert.assertTrue(ex.getCause() instanceof HttpStreamResetException);
278 Assert.assertSame(exceptionThrown, ex.getCause().getCause());
279 }
280 }
281
282 @Test
283 public void testRequestTimeout() throws Exception {
284 final InetSocketAddress address = startClientAndServer();
285 final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
286 final Publisher<ByteBuffer> publisher = Flowable.<ByteBuffer>never()
287 .doOnCancel(new Action() {
288 @Override
289 public void run() {
290 requestPublisherWasCancelled.set(true);
291 }
292 });
293 final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
294 final BasicRequestProducer request = getRequestProducer(address, producer);
295
296 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
297 final Future<Void> future = requester.execute(request, consumer, Timeout.ofSeconds(1), null);
298
299 try {
300 future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
301 } catch (final ExecutionException ex) {
302 Assert.assertTrue(requestPublisherWasCancelled.get());
303 final Throwable cause = ex.getCause();
304 if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_1) {
305 Assert.assertTrue("Expected SocketTimeoutException, but got " + cause.getClass().getName(),
306 cause instanceof SocketTimeoutException);
307 } else if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_2) {
308 Assert.assertTrue(format("Expected RST_STREAM, but %s was thrown", cause.getClass().getName()),
309 cause instanceof HttpStreamResetException);
310 } else {
311 Assert.fail("Unknown HttpVersionPolicy: " + versionPolicy);
312 }
313 }
314 }
315
316 @Test
317 public void testResponseCancellation() throws Exception {
318 final InetSocketAddress address = startClientAndServer();
319 final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
320 final AtomicReference<Throwable> requestStreamError = new AtomicReference<>();
321 final Publisher<ByteBuffer> stream = ReactiveTestUtils.produceStream(Long.MAX_VALUE, 1024, null)
322 .doOnCancel(new Action() {
323 @Override
324 public void run() throws Exception {
325 requestPublisherWasCancelled.set(true);
326 }
327 })
328 .doOnError(new Consumer<Throwable>() {
329 @Override
330 public void accept(final Throwable throwable) throws Exception {
331 requestStreamError.set(throwable);
332 }
333 });
334 final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
335 final BasicRequestProducer request = getRequestProducer(address, producer);
336
337 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
338 final Future<Void> future = requester.execute(request, consumer, SOCKET_TIMEOUT, null);
339 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
340 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
341
342 final AtomicBoolean responsePublisherWasCancelled = new AtomicBoolean(false);
343 final List<ByteBuffer> outputBuffers = Flowable.fromPublisher(response.getBody())
344 .doOnCancel(new Action() {
345 @Override
346 public void run() throws Exception {
347 responsePublisherWasCancelled.set(true);
348 }
349 })
350 .take(3)
351 .toList()
352 .blockingGet();
353 Assert.assertEquals(3, outputBuffers.size());
354 Assert.assertTrue("The response subscription should have been cancelled", responsePublisherWasCancelled.get());
355 try {
356 future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
357 Assert.fail("Expected exception");
358 } catch (final ExecutionException | CancellationException ex) {
359 Assert.assertTrue(ex.getCause() instanceof HttpStreamResetException);
360 Assert.assertTrue(requestPublisherWasCancelled.get());
361 Assert.assertNull(requestStreamError.get());
362 }
363 }
364
365 private InetSocketAddress startClientAndServer() throws InterruptedException, ExecutionException {
366 server.start();
367 final ListenerEndpoint listener = server.listen(new InetSocketAddress(0)).get();
368 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
369 requester.start();
370 return address;
371 }
372 }