1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.testing.async;
28
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;

import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
import org.apache.hc.client5.testing.SSLTestContexts;
import org.apache.hc.core5.http.HeaderElements;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.config.Http1Config;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.reactivestreams.Publisher;
61
62 @RunWith(Parameterized.class)
63 public class TestHttp1Reactive extends AbstractHttpReactiveFundamentalsTest<CloseableHttpAsyncClient> {
64
65 @Parameterized.Parameters(name = "HTTP/1.1 {0}")
66 public static Collection<Object[]> protocols() {
67 return Arrays.asList(new Object[][]{
68 { URIScheme.HTTP },
69 { URIScheme.HTTPS },
70 });
71 }
72
73 protected HttpAsyncClientBuilder clientBuilder;
74 protected PoolingAsyncClientConnectionManager connManager;
75
76 @Rule
77 public ExternalResource connManagerResource = new ExternalResource() {
78
79 @Override
80 protected void before() throws Throwable {
81 connManager = PoolingAsyncClientConnectionManagerBuilder.create()
82 .setTlsStrategy(new DefaultClientTlsStrategy(SSLTestContexts.createClientSSLContext()))
83 .build();
84 }
85
86 @Override
87 protected void after() {
88 if (connManager != null) {
89 connManager.close();
90 connManager = null;
91 }
92 }
93
94 };
95
96 @Rule
97 public ExternalResource clientResource = new ExternalResource() {
98
99 @Override
100 protected void before() throws Throwable {
101 clientBuilder = HttpAsyncClientBuilder.create()
102 .setDefaultRequestConfig(RequestConfig.custom()
103 .setConnectionRequestTimeout(TIMEOUT)
104 .setConnectTimeout(TIMEOUT)
105 .build())
106 .setConnectionManager(connManager);
107 }
108
109 };
110
111 public TestHttp1Reactive(final URIScheme scheme) {
112 super(scheme);
113 }
114
115 @Override
116 protected CloseableHttpAsyncClient createClient() {
117 return clientBuilder.build();
118 }
119
120 @Override
121 public HttpHost start() throws Exception {
122 return super.start(null, Http1Config.DEFAULT);
123 }
124
125 @Test(timeout = 60_000)
126 public void testSequentialGetRequestsCloseConnection() throws Exception {
127 final HttpHost target = start();
128 for (int i = 0; i < 3; i++) {
129 final SimpleHttpRequest get = SimpleRequestBuilder.get()
130 .setHttpHost(target)
131 .setPath("/random/2048")
132 .build();
133 get.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
134 final AsyncRequestProducer request = AsyncRequestBuilder.get(target + "/random/2048").build();
135 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
136
137 httpclient.execute(request, consumer, null);
138
139 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture().get();
140 Assert.assertThat(response, CoreMatchers.notNullValue());
141 Assert.assertThat(response.getHead().getCode(), CoreMatchers.equalTo(200));
142 final String body = publisherToString(response.getBody());
143 Assert.assertThat(body, CoreMatchers.notNullValue());
144 Assert.assertThat(body.length(), CoreMatchers.equalTo(2048));
145 }
146 }
147
148 @Test(timeout = 60_000)
149 public void testConcurrentPostsOverMultipleConnections() throws Exception {
150 connManager.setDefaultMaxPerRoute(20);
151 connManager.setMaxTotal(100);
152 super.testConcurrentPostRequests();
153 }
154
155 @Test(timeout = 60_000)
156 public void testConcurrentPostsOverSingleConnection() throws Exception {
157 connManager.setDefaultMaxPerRoute(1);
158 connManager.setMaxTotal(100);
159 super.testConcurrentPostRequests();
160 }
161
162 @Test(timeout = 60_000)
163 public void testSharedPool() throws Exception {
164 final HttpHost target = start();
165 final AsyncRequestProducer request1 = AsyncRequestBuilder.get(target + "/random/2048").build();
166 final ReactiveResponseConsumer consumer1 = new ReactiveResponseConsumer();
167
168 httpclient.execute(request1, consumer1, null);
169
170 final Message<HttpResponse, Publisher<ByteBuffer>> response1 = consumer1.getResponseFuture().get();
171 Assert.assertThat(response1, CoreMatchers.notNullValue());
172 Assert.assertThat(response1.getHead(), CoreMatchers.notNullValue());
173 Assert.assertThat(response1.getHead().getCode(), CoreMatchers.equalTo(200));
174 final String body1 = publisherToString(response1.getBody());
175 Assert.assertThat(body1, CoreMatchers.notNullValue());
176 Assert.assertThat(body1.length(), CoreMatchers.equalTo(2048));
177
178
179 try (final CloseableHttpAsyncClient httpclient2 = HttpAsyncClients.custom()
180 .setConnectionManager(connManager)
181 .setConnectionManagerShared(true)
182 .build()) {
183 httpclient2.start();
184 final AsyncRequestProducer request2 = AsyncRequestBuilder.get(target + "/random/2048").build();
185 final ReactiveResponseConsumer consumer2 = new ReactiveResponseConsumer();
186
187 httpclient2.execute(request2, consumer2, null);
188
189 final Message<HttpResponse, Publisher<ByteBuffer>> response2 = consumer2.getResponseFuture().get();
190 Assert.assertThat(response2, CoreMatchers.notNullValue());
191 Assert.assertThat(response2.getHead().getCode(), CoreMatchers.equalTo(200));
192 final String body2 = publisherToString(response2.getBody());
193 Assert.assertThat(body2, CoreMatchers.notNullValue());
194 Assert.assertThat(body2.length(), CoreMatchers.equalTo(2048));
195 }
196
197 final AsyncRequestProducer request3 = AsyncRequestBuilder.get(target + "/random/2048").build();
198 final ReactiveResponseConsumer consumer3 = new ReactiveResponseConsumer();
199
200 httpclient.execute(request3, consumer3, null);
201
202 final Message<HttpResponse, Publisher<ByteBuffer>> response3 = consumer3.getResponseFuture().get();
203 Assert.assertThat(response3, CoreMatchers.notNullValue());
204 Assert.assertThat(response3.getHead().getCode(), CoreMatchers.equalTo(200));
205 final String body3 = publisherToString(response3.getBody());
206 Assert.assertThat(body3, CoreMatchers.notNullValue());
207 Assert.assertThat(body3.length(), CoreMatchers.equalTo(2048));
208 }
209
210 }