1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.testing.reactive;
28
29 import static java.lang.String.format;
30 import static org.hamcrest.MatcherAssert.assertThat;
31
32 import java.io.ByteArrayOutputStream;
33 import java.io.IOException;
34 import java.net.InetSocketAddress;
35 import java.net.SocketTimeoutException;
36 import java.net.URI;
37 import java.nio.ByteBuffer;
38 import java.nio.channels.Channels;
39 import java.nio.channels.WritableByteChannel;
40 import java.util.List;
41 import java.util.Random;
42 import java.util.concurrent.CancellationException;
43 import java.util.concurrent.ExecutionException;
44 import java.util.concurrent.Future;
45 import java.util.concurrent.atomic.AtomicBoolean;
46 import java.util.concurrent.atomic.AtomicReference;
47
48 import io.reactivex.rxjava3.core.Flowable;
49 import io.reactivex.rxjava3.core.Observable;
50 import org.apache.hc.core5.http.HttpResponse;
51 import org.apache.hc.core5.http.HttpStreamResetException;
52 import org.apache.hc.core5.http.Message;
53 import org.apache.hc.core5.http.Method;
54 import org.apache.hc.core5.http.URIScheme;
55 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
56 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
57 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
58 import org.apache.hc.core5.http2.HttpVersionPolicy;
59 import org.apache.hc.core5.reactive.ReactiveEntityProducer;
60 import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
61 import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
62 import org.apache.hc.core5.reactor.IOReactorConfig;
63 import org.apache.hc.core5.reactor.ListenerEndpoint;
64 import org.apache.hc.core5.testing.nio.extension.H2AsyncRequesterResource;
65 import org.apache.hc.core5.testing.nio.extension.H2AsyncServerResource;
66 import org.apache.hc.core5.testing.reactive.Reactive3TestUtils.StreamDescription;
67 import org.apache.hc.core5.util.TextUtils;
68 import org.apache.hc.core5.util.Timeout;
69 import org.hamcrest.CoreMatchers;
70 import org.junit.jupiter.api.Assertions;
71 import org.junit.jupiter.api.Test;
72 import org.junit.jupiter.api.extension.RegisterExtension;
73 import org.reactivestreams.Publisher;
74
75 public abstract class ReactiveClientTest {
76
77 private static final Timeout SOCKET_TIMEOUT = Timeout.ofSeconds(30);
78 private static final Timeout RESULT_TIMEOUT = Timeout.ofSeconds(60);
79
80 private static final Random RANDOM = new Random();
81
82 private final HttpVersionPolicy versionPolicy;
83 @RegisterExtension
84 private final H2AsyncServerResource serverResource;
85 @RegisterExtension
86 private final H2AsyncRequesterResource clientResource;
87
88 public ReactiveClientTest(final HttpVersionPolicy httpVersionPolicy) {
89 this.versionPolicy = httpVersionPolicy;
90 this.serverResource = new H2AsyncServerResource(bootstrap -> bootstrap
91 .setVersionPolicy(versionPolicy)
92 .setIOReactorConfig(
93 IOReactorConfig.custom()
94 .setSoTimeout(SOCKET_TIMEOUT)
95 .build())
96 .register("*", () -> new ReactiveServerExchangeHandler(new ReactiveEchoProcessor()))
97 );
98 this.clientResource = new H2AsyncRequesterResource(bootstrap -> bootstrap
99 .setVersionPolicy(versionPolicy)
100 .setIOReactorConfig(IOReactorConfig.custom()
101 .setSoTimeout(SOCKET_TIMEOUT)
102 .build())
103 );
104 }
105
106 @Test
107 public void testSimpleRequest() throws Exception {
108 final InetSocketAddress address = startServer();
109 final HttpAsyncRequester requester = clientResource.start();
110 final byte[] input = new byte[1024];
111 RANDOM.nextBytes(input);
112 final Publisher<ByteBuffer> publisher = Flowable.just(ByteBuffer.wrap(input));
113 final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, input.length, null, null);
114
115 final BasicRequestProducer request = getRequestProducer(address, producer);
116
117 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
118 requester.execute(request, consumer, SOCKET_TIMEOUT, null);
119
120 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
121 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
122
123 final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
124 final WritableByteChannel writableByteChannel = Channels.newChannel(byteArrayOutputStream);
125 for (final ByteBuffer byteBuffer : Observable.fromPublisher(response.getBody()).toList().blockingGet()) {
126 writableByteChannel.write(byteBuffer);
127 }
128 writableByteChannel.close();
129 final byte[] output = byteArrayOutputStream.toByteArray();
130 Assertions.assertArrayEquals(input, output);
131 }
132
133 private BasicRequestProducer getRequestProducer(final InetSocketAddress address, final ReactiveEntityProducer producer) {
134 return new BasicRequestProducer(Method.POST, URI.create("http://localhost:" + address.getPort()), producer);
135 }
136
137 @Test
138 public void testLongRunningRequest() throws Exception {
139 final InetSocketAddress address = startServer();
140 final HttpAsyncRequester requester = clientResource.start();
141 final long expectedLength = 6_554_200L;
142 final AtomicReference<String> expectedHash = new AtomicReference<>();
143 final Flowable<ByteBuffer> stream = Reactive3TestUtils.produceStream(expectedLength, expectedHash);
144 final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
145 final BasicRequestProducer request = getRequestProducer(address, producer);
146
147 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
148 requester.execute(request, consumer, SOCKET_TIMEOUT, null);
149 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
150 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
151 final StreamDescription desc = Reactive3TestUtils.consumeStream(response.getBody()).blockingGet();
152
153 Assertions.assertEquals(expectedLength, desc.length);
154 Assertions.assertEquals(expectedHash.get(), TextUtils.toHexString(desc.md.digest()));
155 }
156
157 @Test
158 public void testManySmallBuffers() throws Exception {
159
160
161
162
163 final InetSocketAddress address = startServer();
164 final HttpAsyncRequester requester = clientResource.start();
165 for (int i = 0; i < 10; i++) {
166 final long expectedLength = 1_024_000;
167 final int maximumBlockSize = 1024;
168 final AtomicReference<String> expectedHash = new AtomicReference<>();
169 final Publisher<ByteBuffer> stream = Reactive3TestUtils.produceStream(expectedLength, maximumBlockSize, expectedHash);
170 final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
171 final BasicRequestProducer request = getRequestProducer(address, producer);
172
173 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
174 requester.execute(request, consumer, SOCKET_TIMEOUT, null);
175 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
176 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
177 final StreamDescription desc = Reactive3TestUtils.consumeStream(response.getBody()).blockingGet();
178
179 Assertions.assertEquals(expectedLength, desc.length);
180 Assertions.assertEquals(expectedHash.get(), TextUtils.toHexString(desc.md.digest()));
181 }
182 }
183
184 @Test
185 public void testRequestError() throws Exception {
186 final InetSocketAddress address = startServer();
187 final HttpAsyncRequester requester = clientResource.start();
188 final RuntimeException exceptionThrown = new RuntimeException("Test");
189 final Publisher<ByteBuffer> publisher = Flowable.error(exceptionThrown);
190 final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, 100, null, null);
191
192 final BasicRequestProducer request = getRequestProducer(address, producer);
193
194 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
195
196 final Future<Void> future = requester.execute(request, consumer, SOCKET_TIMEOUT, null);
197
198 final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () ->
199 future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit()));
200 Assertions.assertTrue(exception.getCause() instanceof HttpStreamResetException);
201 Assertions.assertSame(exceptionThrown, exception.getCause().getCause());
202 }
203
204 @Test
205 public void testRequestTimeout() throws Exception {
206 final InetSocketAddress address = startServer();
207 final HttpAsyncRequester requester = clientResource.start();
208 final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
209 final Publisher<ByteBuffer> publisher = Flowable.<ByteBuffer>never()
210 .doOnCancel(() -> requestPublisherWasCancelled.set(true));
211 final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
212 final BasicRequestProducer request = getRequestProducer(address, producer);
213
214 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
215 final Future<Void> future = requester.execute(request, consumer, Timeout.ofSeconds(1), null);
216
217 final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () ->
218 future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit()));
219 Assertions.assertTrue(requestPublisherWasCancelled.get());
220 final Throwable cause = exception.getCause();
221 if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_1) {
222 Assertions.assertTrue(cause instanceof SocketTimeoutException, "Expected SocketTimeoutException, but got " + cause.getClass().getName());
223 } else if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_2) {
224 Assertions.assertTrue(cause instanceof HttpStreamResetException, format("Expected RST_STREAM, but %s was thrown", cause.getClass().getName()));
225 } else {
226 Assertions.fail("Unknown HttpVersionPolicy: " + versionPolicy);
227 }
228 }
229
230 @Test
231 public void testResponseCancellation() throws Exception {
232 final InetSocketAddress address = startServer();
233 final HttpAsyncRequester requester = clientResource.start();
234 final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
235 final AtomicReference<Throwable> requestStreamError = new AtomicReference<>();
236 final Publisher<ByteBuffer> stream = Reactive3TestUtils.produceStream(Long.MAX_VALUE, 1024, null)
237 .doOnCancel(() -> requestPublisherWasCancelled.set(true))
238 .doOnError(requestStreamError::set);
239 final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
240 final BasicRequestProducer request = getRequestProducer(address, producer);
241
242 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
243 final Future<Void> future = requester.execute(request, consumer, SOCKET_TIMEOUT, null);
244 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
245 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
246
247 final AtomicBoolean responsePublisherWasCancelled = new AtomicBoolean(false);
248 final List<ByteBuffer> outputBuffers = Flowable.fromPublisher(response.getBody())
249 .doOnCancel(() -> responsePublisherWasCancelled.set(true))
250 .take(3)
251 .toList()
252 .blockingGet();
253 Assertions.assertEquals(3, outputBuffers.size());
254 Assertions.assertTrue(responsePublisherWasCancelled.get(), "The response subscription should have been cancelled");
255 final Exception exception = Assertions.assertThrows(Exception.class, () ->
256 future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit()));
257 assertThat(exception, CoreMatchers.anyOf(
258 CoreMatchers.instanceOf(CancellationException.class),
259 CoreMatchers.instanceOf(ExecutionException.class)));
260 Assertions.assertTrue(exception.getCause() instanceof HttpStreamResetException);
261 Assertions.assertTrue(requestPublisherWasCancelled.get());
262 Assertions.assertNull(requestStreamError.get());
263 }
264
265 private InetSocketAddress startServer() throws IOException, InterruptedException, ExecutionException {
266 final HttpAsyncServer server = serverResource.start();
267 final ListenerEndpoint listener = server.listen(new InetSocketAddress(0), URIScheme.HTTP).get();
268 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
269 return address;
270 }
271 }