View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.testing.reactive;
28  
29  import static java.lang.String.format;
30  
31  import java.io.ByteArrayOutputStream;
32  import java.net.InetSocketAddress;
33  import java.net.SocketTimeoutException;
34  import java.net.URI;
35  import java.nio.ByteBuffer;
36  import java.nio.channels.Channels;
37  import java.nio.channels.WritableByteChannel;
38  import java.util.Arrays;
39  import java.util.Collection;
40  import java.util.List;
41  import java.util.Random;
42  import java.util.concurrent.CancellationException;
43  import java.util.concurrent.ExecutionException;
44  import java.util.concurrent.Future;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  import java.util.concurrent.atomic.AtomicReference;
47  
48  import org.apache.hc.core5.function.Supplier;
49  import org.apache.hc.core5.http.HttpResponse;
50  import org.apache.hc.core5.http.HttpStreamResetException;
51  import org.apache.hc.core5.http.Message;
52  import org.apache.hc.core5.http.Method;
53  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
54  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
55  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
56  import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
57  import org.apache.hc.core5.http2.HttpVersionPolicy;
58  import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
59  import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
60  import org.apache.hc.core5.io.CloseMode;
61  import org.apache.hc.core5.reactive.ReactiveEntityProducer;
62  import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
63  import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
64  import org.apache.hc.core5.reactor.IOReactorConfig;
65  import org.apache.hc.core5.reactor.ListenerEndpoint;
66  import org.apache.hc.core5.testing.classic.LoggingConnPoolListener;
67  import org.apache.hc.core5.testing.nio.LoggingExceptionCallback;
68  import org.apache.hc.core5.testing.nio.LoggingH2StreamListener;
69  import org.apache.hc.core5.testing.nio.LoggingHttp1StreamListener;
70  import org.apache.hc.core5.testing.nio.LoggingIOSessionDecorator;
71  import org.apache.hc.core5.testing.nio.LoggingIOSessionListener;
72  import org.apache.hc.core5.testing.reactive.ReactiveTestUtils.StreamDescription;
73  import org.apache.hc.core5.util.TextUtils;
74  import org.apache.hc.core5.util.Timeout;
75  import org.junit.Assert;
76  import org.junit.Rule;
77  import org.junit.Test;
78  import org.junit.rules.ExternalResource;
79  import org.junit.runner.RunWith;
80  import org.junit.runners.Parameterized;
81  import org.reactivestreams.Publisher;
82  import org.slf4j.Logger;
83  import org.slf4j.LoggerFactory;
84  
85  import io.reactivex.Flowable;
86  import io.reactivex.Observable;
87  import io.reactivex.functions.Action;
88  import io.reactivex.functions.Consumer;
89  
90  @RunWith(Parameterized.class)
91  public class ReactiveClientTest {
92  
93      private final Logger log = LoggerFactory.getLogger(getClass());
94  
95      @Parameterized.Parameters(name = "{0}")
96      public static Collection<Object[]> protocols() {
97          return Arrays.asList(new Object[][]{
98              { HttpVersionPolicy.FORCE_HTTP_1 },
99              { HttpVersionPolicy.FORCE_HTTP_2 }
100         });
101     }
102     private static final Timeout SOCKET_TIMEOUT = Timeout.ofSeconds(30);
103     private static final Timeout RESULT_TIMEOUT = Timeout.ofSeconds(60);
104 
105     private static final Random RANDOM = new Random();
106 
107     private final HttpVersionPolicy versionPolicy;
108 
109     public ReactiveClientTest(final HttpVersionPolicy httpVersionPolicy) {
110         this.versionPolicy = httpVersionPolicy;
111     }
112 
113     private HttpAsyncServer server;
114 
115     @Rule
116     public ExternalResource serverResource = new ExternalResource() {
117 
118         @Override
119         protected void before() throws Throwable {
120             log.debug("Starting up test server");
121             server = H2ServerBootstrap.bootstrap()
122                 .setVersionPolicy(versionPolicy)
123                 .setIOReactorConfig(
124                     IOReactorConfig.custom()
125                         .setSoTimeout(SOCKET_TIMEOUT)
126                         .build())
127                 .setStreamListener(LoggingHttp1StreamListener.INSTANCE_SERVER)
128                 .setStreamListener(LoggingH2StreamListener.INSTANCE)
129                 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
130                 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
131                 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
132                 .register("*", new Supplier<AsyncServerExchangeHandler>() {
133 
134                     @Override
135                     public AsyncServerExchangeHandler get() {
136                         return new ReactiveServerExchangeHandler(new ReactiveEchoProcessor());
137                     }
138 
139                 })
140                 .create();
141         }
142 
143         @Override
144         protected void after() {
145             log.debug("Shutting down test server");
146             if (server != null) {
147                 server.close(CloseMode.GRACEFUL);
148             }
149         }
150 
151     };
152 
153     private HttpAsyncRequester requester;
154 
155     @Rule
156     public ExternalResource clientResource = new ExternalResource() {
157 
158         @Override
159         protected void before() throws Throwable {
160             log.debug("Starting up test client");
161             requester = H2RequesterBootstrap.bootstrap()
162                 .setVersionPolicy(versionPolicy)
163                 .setIOReactorConfig(IOReactorConfig.custom()
164                     .setSoTimeout(SOCKET_TIMEOUT)
165                     .build())
166                 .setStreamListener(LoggingHttp1StreamListener.INSTANCE_CLIENT)
167                 .setStreamListener(LoggingH2StreamListener.INSTANCE)
168                 .setConnPoolListener(LoggingConnPoolListener.INSTANCE)
169                 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
170                 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
171                 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
172                 .create();
173         }
174 
175         @Override
176         protected void after() {
177             log.debug("Shutting down test client");
178             if (requester != null) {
179                 requester.close(CloseMode.GRACEFUL);
180             }
181         }
182 
183     };
184 
185     @Test
186     public void testSimpleRequest() throws Exception {
187         final InetSocketAddress address = startClientAndServer();
188         final byte[] input = new byte[1024];
189         RANDOM.nextBytes(input);
190         final Publisher<ByteBuffer> publisher = Flowable.just(ByteBuffer.wrap(input));
191         final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, input.length, null, null);
192 
193         final BasicRequestProducer request = getRequestProducer(address, producer);
194 
195         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
196         requester.execute(request, consumer, SOCKET_TIMEOUT, null);
197 
198         final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
199                 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
200 
201         final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
202         final WritableByteChannel writableByteChannel = Channels.newChannel(byteArrayOutputStream);
203         for (final ByteBuffer byteBuffer : Observable.fromPublisher(response.getBody()).toList().blockingGet()) {
204             writableByteChannel.write(byteBuffer);
205         }
206         writableByteChannel.close();
207         final byte[] output = byteArrayOutputStream.toByteArray();
208         Assert.assertArrayEquals(input, output);
209     }
210 
211     private BasicRequestProducer getRequestProducer(final InetSocketAddress address, final ReactiveEntityProducer producer) {
212         return new BasicRequestProducer(Method.POST, URI.create("http://localhost:" + address.getPort()), producer);
213     }
214 
215     @Test
216     public void testLongRunningRequest() throws Exception {
217         final InetSocketAddress address = startClientAndServer();
218         final long expectedLength = 6_554_200L;
219         final AtomicReference<String> expectedHash = new AtomicReference<>(null);
220         final Flowable<ByteBuffer> stream = ReactiveTestUtils.produceStream(expectedLength, expectedHash);
221         final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
222         final BasicRequestProducer request = getRequestProducer(address, producer);
223 
224         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
225         requester.execute(request, consumer, SOCKET_TIMEOUT, null);
226         final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
227                 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
228         final StreamDescription desc = ReactiveTestUtils.consumeStream(response.getBody()).blockingGet();
229 
230         Assert.assertEquals(expectedLength, desc.length);
231         Assert.assertEquals(expectedHash.get(), TextUtils.toHexString(desc.md.digest()));
232     }
233 
234     @Test
235     public void testManySmallBuffers() throws Exception {
236         // This test is not flaky. If it starts randomly failing, then there is a problem with how
237         // ReactiveDataConsumer signals capacity with its capacity channel. The situations in which
238         // this kind of bug manifests depend on the ordering of several events on different threads
239         // so it's unlikely to consistently occur.
240         final InetSocketAddress address = startClientAndServer();
241         for (int i = 0; i < 10; i++) {
242             final long expectedLength = 1_024_000;
243             final int maximumBlockSize = 1024;
244             final AtomicReference<String> expectedHash = new AtomicReference<>(null);
245             final Publisher<ByteBuffer> stream = ReactiveTestUtils.produceStream(expectedLength, maximumBlockSize, expectedHash);
246             final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
247             final BasicRequestProducer request = getRequestProducer(address, producer);
248 
249             final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
250             requester.execute(request, consumer, SOCKET_TIMEOUT, null);
251             final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
252                 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
253             final StreamDescription desc = ReactiveTestUtils.consumeStream(response.getBody()).blockingGet();
254 
255             Assert.assertEquals(expectedLength, desc.length);
256             Assert.assertEquals(expectedHash.get(), TextUtils.toHexString(desc.md.digest()));
257         }
258     }
259 
260     @Test
261     public void testRequestError() throws Exception {
262         final InetSocketAddress address = startClientAndServer();
263         final RuntimeException exceptionThrown = new RuntimeException("Test");
264         final Publisher<ByteBuffer> publisher = Flowable.error(exceptionThrown);
265         final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, 100, null, null);
266 
267         final BasicRequestProducer request = getRequestProducer(address, producer);
268 
269         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
270 
271         final Future<Void> future = requester.execute(request, consumer, SOCKET_TIMEOUT, null);
272 
273         try {
274             future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
275             Assert.fail("Expected exception");
276         } catch (final ExecutionException ex) {
277             Assert.assertTrue(ex.getCause() instanceof HttpStreamResetException);
278             Assert.assertSame(exceptionThrown, ex.getCause().getCause());
279         }
280     }
281 
282     @Test
283     public void testRequestTimeout() throws Exception {
284         final InetSocketAddress address = startClientAndServer();
285         final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
286         final Publisher<ByteBuffer> publisher = Flowable.<ByteBuffer>never()
287             .doOnCancel(new Action() {
288                 @Override
289                 public void run() {
290                     requestPublisherWasCancelled.set(true);
291                 }
292             });
293         final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
294         final BasicRequestProducer request = getRequestProducer(address, producer);
295 
296         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
297         final Future<Void> future = requester.execute(request, consumer, Timeout.ofSeconds(1), null);
298 
299         try {
300             future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
301         } catch (final ExecutionException ex) {
302             Assert.assertTrue(requestPublisherWasCancelled.get());
303             final Throwable cause = ex.getCause();
304             if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_1) {
305                 Assert.assertTrue("Expected SocketTimeoutException, but got " + cause.getClass().getName(),
306                     cause instanceof SocketTimeoutException);
307             } else if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_2) {
308                 Assert.assertTrue(format("Expected RST_STREAM, but %s was thrown", cause.getClass().getName()),
309                     cause instanceof HttpStreamResetException);
310             } else {
311                 Assert.fail("Unknown HttpVersionPolicy: " + versionPolicy);
312             }
313         }
314     }
315 
316     @Test
317     public void testResponseCancellation() throws Exception {
318         final InetSocketAddress address = startClientAndServer();
319         final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
320         final AtomicReference<Throwable> requestStreamError = new AtomicReference<>();
321         final Publisher<ByteBuffer> stream = ReactiveTestUtils.produceStream(Long.MAX_VALUE, 1024, null)
322             .doOnCancel(new Action() {
323                 @Override
324                 public void run() throws Exception {
325                     requestPublisherWasCancelled.set(true);
326                 }
327             })
328             .doOnError(new Consumer<Throwable>() {
329                 @Override
330                 public void accept(final Throwable throwable) throws Exception {
331                     requestStreamError.set(throwable);
332                 }
333             });
334         final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
335         final BasicRequestProducer request = getRequestProducer(address, producer);
336 
337         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
338         final Future<Void> future = requester.execute(request, consumer, SOCKET_TIMEOUT, null);
339         final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
340                 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
341 
342         final AtomicBoolean responsePublisherWasCancelled = new AtomicBoolean(false);
343         final List<ByteBuffer> outputBuffers = Flowable.fromPublisher(response.getBody())
344             .doOnCancel(new Action() {
345                 @Override
346                 public void run() throws Exception {
347                     responsePublisherWasCancelled.set(true);
348                 }
349             })
350             .take(3)
351             .toList()
352             .blockingGet();
353         Assert.assertEquals(3, outputBuffers.size());
354         Assert.assertTrue("The response subscription should have been cancelled", responsePublisherWasCancelled.get());
355         try {
356             future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
357             Assert.fail("Expected exception");
358         } catch (final ExecutionException | CancellationException ex) {
359             Assert.assertTrue(ex.getCause() instanceof HttpStreamResetException);
360             Assert.assertTrue(requestPublisherWasCancelled.get());
361             Assert.assertNull(requestStreamError.get());
362         }
363     }
364 
365     private InetSocketAddress startClientAndServer() throws InterruptedException, ExecutionException {
366         server.start();
367         final ListenerEndpoint listener = server.listen(new InetSocketAddress(0)).get();
368         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
369         requester.start();
370         return address;
371     }
372 }