1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.util.Arrays;
33 import java.util.Collection;
34 import java.util.LinkedList;
35 import java.util.Queue;
36 import java.util.concurrent.Future;
37
38 import org.apache.hc.core5.function.Supplier;
39 import org.apache.hc.core5.http.ContentType;
40 import org.apache.hc.core5.http.EntityDetails;
41 import org.apache.hc.core5.http.HeaderElements;
42 import org.apache.hc.core5.http.HttpException;
43 import org.apache.hc.core5.http.HttpHeaders;
44 import org.apache.hc.core5.http.HttpHost;
45 import org.apache.hc.core5.http.HttpRequest;
46 import org.apache.hc.core5.http.HttpResponse;
47 import org.apache.hc.core5.http.HttpStatus;
48 import org.apache.hc.core5.http.Message;
49 import org.apache.hc.core5.http.Method;
50 import org.apache.hc.core5.http.URIScheme;
51 import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
52 import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
53 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
54 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
55 import org.apache.hc.core5.http.impl.bootstrap.StandardFilter;
56 import org.apache.hc.core5.http.message.BasicHttpRequest;
57 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
58 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
59 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
60 import org.apache.hc.core5.http.nio.AsyncFilterChain;
61 import org.apache.hc.core5.http.nio.AsyncFilterHandler;
62 import org.apache.hc.core5.http.nio.AsyncPushProducer;
63 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
64 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
65 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
66 import org.apache.hc.core5.http.nio.ssl.BasicClientTlsStrategy;
67 import org.apache.hc.core5.http.nio.ssl.BasicServerTlsStrategy;
68 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
69 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
70 import org.apache.hc.core5.http.protocol.HttpContext;
71 import org.apache.hc.core5.http.protocol.UriPatternMatcher;
72 import org.apache.hc.core5.io.CloseMode;
73 import org.apache.hc.core5.reactor.IOReactorConfig;
74 import org.apache.hc.core5.reactor.ListenerEndpoint;
75 import org.apache.hc.core5.testing.SSLTestContexts;
76 import org.apache.hc.core5.testing.classic.LoggingConnPoolListener;
77 import org.apache.hc.core5.util.Timeout;
78 import org.hamcrest.CoreMatchers;
79 import org.junit.Assert;
80 import org.junit.Rule;
81 import org.junit.Test;
82 import org.junit.rules.ExternalResource;
83 import org.junit.runner.RunWith;
84 import org.junit.runners.Parameterized;
85 import org.slf4j.Logger;
86 import org.slf4j.LoggerFactory;
87
88 @RunWith(Parameterized.class)
89 public class Http1ServerAndRequesterTest {
90
91 private final Logger log = LoggerFactory.getLogger(getClass());
92
93 @Parameterized.Parameters(name = "{0}")
94 public static Collection<Object[]> protocols() {
95 return Arrays.asList(new Object[][]{
96 { URIScheme.HTTP },
97 { URIScheme.HTTPS }
98 });
99 }
100 private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
101
102 private final URIScheme scheme;
103
104 public Http1ServerAndRequesterTest(final URIScheme scheme) {
105 this.scheme = scheme;
106 }
107
108 private HttpAsyncServer server;
109
110 @Rule
111 public ExternalResource serverResource = new ExternalResource() {
112
113 @Override
114 protected void before() throws Throwable {
115 log.debug("Starting up test server");
116 server = AsyncServerBootstrap.bootstrap()
117 .setLookupRegistry(new UriPatternMatcher<Supplier<AsyncServerExchangeHandler>>())
118 .setIOReactorConfig(
119 IOReactorConfig.custom()
120 .setSoTimeout(TIMEOUT)
121 .build())
122 .register("*", new Supplier<AsyncServerExchangeHandler>() {
123
124 @Override
125 public AsyncServerExchangeHandler get() {
126 return new EchoHandler(2048);
127 }
128
129 })
130 .addFilterBefore(StandardFilter.MAIN_HANDLER.name(), "no-keepalive", new AsyncFilterHandler() {
131
132 @Override
133 public AsyncDataConsumer handle(
134 final HttpRequest request,
135 final EntityDetails entityDetails,
136 final HttpContext context,
137 final AsyncFilterChain.ResponseTrigger responseTrigger,
138 final AsyncFilterChain chain) throws HttpException, IOException {
139 return chain.proceed(request, entityDetails, context, new AsyncFilterChain.ResponseTrigger() {
140
141 @Override
142 public void sendInformation(
143 final HttpResponse response) throws HttpException, IOException {
144 responseTrigger.sendInformation(response);
145 }
146
147 @Override
148 public void submitResponse(
149 final HttpResponse response,
150 final AsyncEntityProducer entityProducer) throws HttpException, IOException {
151 if (request.getPath().startsWith("/no-keep-alive")) {
152 response.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
153 }
154 responseTrigger.submitResponse(response, entityProducer);
155 }
156
157 @Override
158 public void pushPromise(
159 final HttpRequest promise,
160 final AsyncPushProducer responseProducer) throws HttpException, IOException {
161 responseTrigger.pushPromise(promise, responseProducer);
162 }
163
164 });
165 }
166 })
167 .setTlsStrategy(scheme == URIScheme.HTTPS ? new BasicServerTlsStrategy(
168 SSLTestContexts.createServerSSLContext(),
169 SecureAllPortsStrategy.INSTANCE) : null)
170 .setStreamListener(LoggingHttp1StreamListener.INSTANCE_SERVER)
171 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
172 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
173 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
174 .create();
175 }
176
177 @Override
178 protected void after() {
179 log.debug("Shutting down test server");
180 if (server != null) {
181 server.close(CloseMode.GRACEFUL);
182 }
183 }
184
185 };
186
187 private HttpAsyncRequester requester;
188
189 @Rule
190 public ExternalResource clientResource = new ExternalResource() {
191
192 @Override
193 protected void before() throws Throwable {
194 log.debug("Starting up test client");
195 requester = AsyncRequesterBootstrap.bootstrap()
196 .setIOReactorConfig(IOReactorConfig.custom()
197 .setSoTimeout(TIMEOUT)
198 .build())
199 .setTlsStrategy(new BasicClientTlsStrategy(SSLTestContexts.createClientSSLContext()))
200 .setStreamListener(LoggingHttp1StreamListener.INSTANCE_CLIENT)
201 .setConnPoolListener(LoggingConnPoolListener.INSTANCE)
202 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
203 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
204 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
205 .create();
206 }
207
208 @Override
209 protected void after() {
210 log.debug("Shutting down test client");
211 if (requester != null) {
212 requester.close(CloseMode.GRACEFUL);
213 }
214 }
215
216 };
217
218 @Test
219 public void testSequentialRequests() throws Exception {
220 server.start();
221 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
222 final ListenerEndpoint listener = future.get();
223 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
224 requester.start();
225
226 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
227 final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
228 new BasicRequestProducer(Method.POST, target, "/stuff",
229 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
230 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
231 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
232 Assert.assertThat(message1, CoreMatchers.notNullValue());
233 final HttpResponse response1 = message1.getHead();
234 Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
235 final String body1 = message1.getBody();
236 Assert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
237
238 final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
239 new BasicRequestProducer(Method.POST, target, "/other-stuff",
240 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
241 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
242 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
243 Assert.assertThat(message2, CoreMatchers.notNullValue());
244 final HttpResponse response2 = message2.getHead();
245 Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
246 final String body2 = message2.getBody();
247 Assert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
248
249 final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
250 new BasicRequestProducer(Method.POST, target, "/more-stuff",
251 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
252 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
253 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
254 Assert.assertThat(message3, CoreMatchers.notNullValue());
255 final HttpResponse response3 = message3.getHead();
256 Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
257 final String body3 = message3.getBody();
258 Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
259 }
260
261 @Test
262 public void testSequentialRequestsNonPersistentConnection() throws Exception {
263 server.start();
264 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
265 final ListenerEndpoint listener = future.get();
266 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
267 requester.start();
268
269 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
270 final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
271 new BasicRequestProducer(Method.POST, target, "/no-keep-alive/stuff",
272 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
273 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
274 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
275 Assert.assertThat(message1, CoreMatchers.notNullValue());
276 final HttpResponse response1 = message1.getHead();
277 Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
278 final String body1 = message1.getBody();
279 Assert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
280
281 final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
282 new BasicRequestProducer(Method.POST, target, "/no-keep-alive/other-stuff",
283 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
284 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
285 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
286 Assert.assertThat(message2, CoreMatchers.notNullValue());
287 final HttpResponse response2 = message2.getHead();
288 Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
289 final String body2 = message2.getBody();
290 Assert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
291
292 final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
293 new BasicRequestProducer(Method.POST, target, "/no-keep-alive/more-stuff",
294 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
295 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
296 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
297 Assert.assertThat(message3, CoreMatchers.notNullValue());
298 final HttpResponse response3 = message3.getHead();
299 Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
300 final String body3 = message3.getBody();
301 Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
302 }
303
304 @Test
305 public void testSequentialRequestsSameEndpoint() throws Exception {
306 server.start();
307 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
308 final ListenerEndpoint listener = future.get();
309 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
310 requester.start();
311
312 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
313 final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, Timeout.ofSeconds(5));
314 final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
315 try {
316
317 final Future<Message<HttpResponse, String>> resultFuture1 = endpoint.execute(
318 new BasicRequestProducer(Method.POST, target, "/stuff",
319 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
320 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
321 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
322 Assert.assertThat(message1, CoreMatchers.notNullValue());
323 final HttpResponse response1 = message1.getHead();
324 Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
325 final String body1 = message1.getBody();
326 Assert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
327
328 final Future<Message<HttpResponse, String>> resultFuture2 = endpoint.execute(
329 new BasicRequestProducer(Method.POST, target, "/other-stuff",
330 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
331 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
332 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
333 Assert.assertThat(message2, CoreMatchers.notNullValue());
334 final HttpResponse response2 = message2.getHead();
335 Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
336 final String body2 = message2.getBody();
337 Assert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
338
339 final Future<Message<HttpResponse, String>> resultFuture3 = endpoint.execute(
340 new BasicRequestProducer(Method.POST, target, "/more-stuff",
341 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
342 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
343 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
344 Assert.assertThat(message3, CoreMatchers.notNullValue());
345 final HttpResponse response3 = message3.getHead();
346 Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
347 final String body3 = message3.getBody();
348 Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
349
350 } finally {
351 endpoint.releaseAndReuse();
352 }
353 }
354
355 @Test
356 public void testPipelinedRequests() throws Exception {
357 server.start();
358 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
359 final ListenerEndpoint listener = future.get();
360 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
361 requester.start();
362
363 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
364 final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, Timeout.ofSeconds(5));
365 final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
366 try {
367
368 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
369
370 queue.add(endpoint.execute(
371 new BasicRequestProducer(Method.POST, target, "/stuff",
372 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
373 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
374 queue.add(endpoint.execute(
375 new BasicRequestProducer(Method.POST, target, "/other-stuff",
376 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
377 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
378 queue.add(endpoint.execute(
379 new BasicRequestProducer(Method.POST, target, "/more-stuff",
380 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
381 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
382
383 while (!queue.isEmpty()) {
384 final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
385 final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
386 Assert.assertThat(message, CoreMatchers.notNullValue());
387 final HttpResponse response = message.getHead();
388 Assert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
389 final String body = message.getBody();
390 Assert.assertThat(body, CoreMatchers.containsString("stuff"));
391 }
392
393 } finally {
394 endpoint.releaseAndReuse();
395 }
396 }
397
398 @Test
399 public void testNonPersistentHeads() throws Exception {
400 server.start();
401 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
402 final ListenerEndpoint listener = future.get();
403 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
404 requester.start();
405
406 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
407 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
408
409 for (int i = 0; i < 20; i++) {
410 final HttpRequest head = new BasicHttpRequest(Method.HEAD, target, "/no-keep-alive/stuff?p=" + i);
411 queue.add(requester.execute(
412 new BasicRequestProducer(head, null),
413 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
414 }
415
416 while (!queue.isEmpty()) {
417 final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
418 final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
419 Assert.assertThat(message, CoreMatchers.notNullValue());
420 final HttpResponse response = message.getHead();
421 Assert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
422 Assert.assertThat(message.getBody(), CoreMatchers.nullValue());
423 }
424 }
425
426 }