1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import static org.hamcrest.MatcherAssert.assertThat;
31
32 import java.io.IOException;
33 import java.net.InetSocketAddress;
34 import java.util.LinkedList;
35 import java.util.Queue;
36 import java.util.concurrent.Future;
37
38 import org.apache.hc.core5.http.ContentType;
39 import org.apache.hc.core5.http.HttpHost;
40 import org.apache.hc.core5.http.HttpRequest;
41 import org.apache.hc.core5.http.HttpResponse;
42 import org.apache.hc.core5.http.HttpStatus;
43 import org.apache.hc.core5.http.Message;
44 import org.apache.hc.core5.http.Method;
45 import org.apache.hc.core5.http.URIScheme;
46 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
47 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
48 import org.apache.hc.core5.http.message.BasicHttpRequest;
49 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
50 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
51 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
52 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
53 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
54 import org.apache.hc.core5.reactor.ListenerEndpoint;
55 import org.apache.hc.core5.util.Timeout;
56 import org.hamcrest.CoreMatchers;
57 import org.junit.jupiter.api.Test;
58
59 public abstract class HttpCoreTransportTest {
60
61 private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
62
63 final URIScheme scheme;
64
65 HttpCoreTransportTest(final URIScheme scheme) {
66 this.scheme = scheme;
67 }
68
69 abstract HttpAsyncServer serverStart() throws IOException;
70
71 abstract HttpAsyncRequester clientStart() throws IOException;
72
73 @Test
74 public void testSequentialRequests() throws Exception {
75 final HttpAsyncServer server = serverStart();
76 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
77 final ListenerEndpoint listener = future.get();
78 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
79 final HttpAsyncRequester requester = clientStart();
80
81 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
82 final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
83 new BasicRequestProducer(Method.POST, target, "/stuff",
84 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
85 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
86 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
87 assertThat(message1, CoreMatchers.notNullValue());
88 final HttpResponse response1 = message1.getHead();
89 assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
90 final String body1 = message1.getBody();
91 assertThat(body1, CoreMatchers.equalTo("some stuff"));
92
93 final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
94 new BasicRequestProducer(Method.POST, target, "/other-stuff",
95 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
96 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
97 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
98 assertThat(message2, CoreMatchers.notNullValue());
99 final HttpResponse response2 = message2.getHead();
100 assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
101 final String body2 = message2.getBody();
102 assertThat(body2, CoreMatchers.equalTo("some other stuff"));
103
104 final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
105 new BasicRequestProducer(Method.POST, target, "/more-stuff",
106 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
107 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
108 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
109 assertThat(message3, CoreMatchers.notNullValue());
110 final HttpResponse response3 = message3.getHead();
111 assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
112 final String body3 = message3.getBody();
113 assertThat(body3, CoreMatchers.equalTo("some more stuff"));
114 }
115
116 @Test
117 public void testSequentialRequestsNonPersistentConnection() throws Exception {
118 final HttpAsyncServer server = serverStart();
119 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
120 final ListenerEndpoint listener = future.get();
121 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
122 final HttpAsyncRequester requester = clientStart();
123
124 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
125 final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
126 new BasicRequestProducer(Method.POST, target, "/no-keep-alive/stuff",
127 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
128 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
129 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
130 assertThat(message1, CoreMatchers.notNullValue());
131 final HttpResponse response1 = message1.getHead();
132 assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
133 final String body1 = message1.getBody();
134 assertThat(body1, CoreMatchers.equalTo("some stuff"));
135
136 final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
137 new BasicRequestProducer(Method.POST, target, "/no-keep-alive/other-stuff",
138 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
139 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
140 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
141 assertThat(message2, CoreMatchers.notNullValue());
142 final HttpResponse response2 = message2.getHead();
143 assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
144 final String body2 = message2.getBody();
145 assertThat(body2, CoreMatchers.equalTo("some other stuff"));
146
147 final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
148 new BasicRequestProducer(Method.POST, target, "/no-keep-alive/more-stuff",
149 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
150 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
151 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
152 assertThat(message3, CoreMatchers.notNullValue());
153 final HttpResponse response3 = message3.getHead();
154 assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
155 final String body3 = message3.getBody();
156 assertThat(body3, CoreMatchers.equalTo("some more stuff"));
157 }
158
159 @Test
160 public void testSequentialRequestsSameEndpoint() throws Exception {
161 final HttpAsyncServer server = serverStart();
162 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
163 final ListenerEndpoint listener = future.get();
164 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
165 final HttpAsyncRequester requester = clientStart();
166
167 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
168 final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, Timeout.ofSeconds(5));
169 final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
170 try {
171
172 final Future<Message<HttpResponse, String>> resultFuture1 = endpoint.execute(
173 new BasicRequestProducer(Method.POST, target, "/stuff",
174 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
175 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
176 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
177 assertThat(message1, CoreMatchers.notNullValue());
178 final HttpResponse response1 = message1.getHead();
179 assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
180 final String body1 = message1.getBody();
181 assertThat(body1, CoreMatchers.equalTo("some stuff"));
182
183 final Future<Message<HttpResponse, String>> resultFuture2 = endpoint.execute(
184 new BasicRequestProducer(Method.POST, target, "/other-stuff",
185 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
186 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
187 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
188 assertThat(message2, CoreMatchers.notNullValue());
189 final HttpResponse response2 = message2.getHead();
190 assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
191 final String body2 = message2.getBody();
192 assertThat(body2, CoreMatchers.equalTo("some other stuff"));
193
194 final Future<Message<HttpResponse, String>> resultFuture3 = endpoint.execute(
195 new BasicRequestProducer(Method.POST, target, "/more-stuff",
196 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
197 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
198 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
199 assertThat(message3, CoreMatchers.notNullValue());
200 final HttpResponse response3 = message3.getHead();
201 assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
202 final String body3 = message3.getBody();
203 assertThat(body3, CoreMatchers.equalTo("some more stuff"));
204
205 } finally {
206 endpoint.releaseAndReuse();
207 }
208 }
209
210 @Test
211 public void testPipelinedRequests() throws Exception {
212 final HttpAsyncServer server = serverStart();
213 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
214 final ListenerEndpoint listener = future.get();
215 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
216 final HttpAsyncRequester requester = clientStart();
217
218 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
219 final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, Timeout.ofSeconds(5));
220 final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
221 try {
222
223 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
224
225 queue.add(endpoint.execute(
226 new BasicRequestProducer(Method.POST, target, "/stuff",
227 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
228 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
229 queue.add(endpoint.execute(
230 new BasicRequestProducer(Method.POST, target, "/other-stuff",
231 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
232 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
233 queue.add(endpoint.execute(
234 new BasicRequestProducer(Method.POST, target, "/more-stuff",
235 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
236 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
237
238 while (!queue.isEmpty()) {
239 final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
240 final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
241 assertThat(message, CoreMatchers.notNullValue());
242 final HttpResponse response = message.getHead();
243 assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
244 final String body = message.getBody();
245 assertThat(body, CoreMatchers.containsString("stuff"));
246 }
247
248 } finally {
249 endpoint.releaseAndReuse();
250 }
251 }
252
253 @Test
254 public void testNonPersistentHeads() throws Exception {
255 final HttpAsyncServer server = serverStart();
256 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
257 final ListenerEndpoint listener = future.get();
258 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
259 final HttpAsyncRequester requester = clientStart();
260
261 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
262 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
263
264 for (int i = 0; i < 20; i++) {
265 final HttpRequest head = new BasicHttpRequest(Method.HEAD, target, "/no-keep-alive/stuff?p=" + i);
266 queue.add(requester.execute(
267 new BasicRequestProducer(head, null),
268 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
269 }
270
271 while (!queue.isEmpty()) {
272 final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
273 final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
274 assertThat(message, CoreMatchers.notNullValue());
275 final HttpResponse response = message.getHead();
276 assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
277 assertThat(message.getBody(), CoreMatchers.nullValue());
278 }
279 }
280
281 }