View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.testing.nio;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.util.Arrays;
33  import java.util.Collection;
34  import java.util.LinkedList;
35  import java.util.Queue;
36  import java.util.concurrent.Future;
37  
38  import org.apache.hc.core5.function.Supplier;
39  import org.apache.hc.core5.http.ContentType;
40  import org.apache.hc.core5.http.EntityDetails;
41  import org.apache.hc.core5.http.HeaderElements;
42  import org.apache.hc.core5.http.HttpException;
43  import org.apache.hc.core5.http.HttpHeaders;
44  import org.apache.hc.core5.http.HttpHost;
45  import org.apache.hc.core5.http.HttpRequest;
46  import org.apache.hc.core5.http.HttpResponse;
47  import org.apache.hc.core5.http.HttpStatus;
48  import org.apache.hc.core5.http.Message;
49  import org.apache.hc.core5.http.Method;
50  import org.apache.hc.core5.http.URIScheme;
51  import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
52  import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
53  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
54  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
55  import org.apache.hc.core5.http.impl.bootstrap.StandardFilter;
56  import org.apache.hc.core5.http.message.BasicHttpRequest;
57  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
58  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
59  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
60  import org.apache.hc.core5.http.nio.AsyncFilterChain;
61  import org.apache.hc.core5.http.nio.AsyncFilterHandler;
62  import org.apache.hc.core5.http.nio.AsyncPushProducer;
63  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
64  import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
65  import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
66  import org.apache.hc.core5.http.nio.ssl.BasicClientTlsStrategy;
67  import org.apache.hc.core5.http.nio.ssl.BasicServerTlsStrategy;
68  import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
69  import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
70  import org.apache.hc.core5.http.protocol.HttpContext;
71  import org.apache.hc.core5.http.protocol.UriPatternMatcher;
72  import org.apache.hc.core5.io.CloseMode;
73  import org.apache.hc.core5.reactor.IOReactorConfig;
74  import org.apache.hc.core5.reactor.ListenerEndpoint;
75  import org.apache.hc.core5.testing.SSLTestContexts;
76  import org.apache.hc.core5.testing.classic.LoggingConnPoolListener;
77  import org.apache.hc.core5.util.Timeout;
78  import org.hamcrest.CoreMatchers;
79  import org.junit.Assert;
80  import org.junit.Rule;
81  import org.junit.Test;
82  import org.junit.rules.ExternalResource;
83  import org.junit.runner.RunWith;
84  import org.junit.runners.Parameterized;
85  import org.slf4j.Logger;
86  import org.slf4j.LoggerFactory;
87  
88  @RunWith(Parameterized.class)
89  public class Http1ServerAndRequesterTest {
90  
91      private final Logger log = LoggerFactory.getLogger(getClass());
92  
93      @Parameterized.Parameters(name = "{0}")
94      public static Collection<Object[]> protocols() {
95          return Arrays.asList(new Object[][]{
96                  { URIScheme.HTTP },
97                  { URIScheme.HTTPS }
98          });
99      }
100     private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
101 
102     private final URIScheme scheme;
103 
104     public Http1ServerAndRequesterTest(final URIScheme scheme) {
105         this.scheme = scheme;
106     }
107 
108     private HttpAsyncServer server;
109 
110     @Rule
111     public ExternalResource serverResource = new ExternalResource() {
112 
113         @Override
114         protected void before() throws Throwable {
115             log.debug("Starting up test server");
116             server = AsyncServerBootstrap.bootstrap()
117                     .setLookupRegistry(new UriPatternMatcher<Supplier<AsyncServerExchangeHandler>>())
118                     .setIOReactorConfig(
119                             IOReactorConfig.custom()
120                                     .setSoTimeout(TIMEOUT)
121                                     .build())
122                     .register("*", new Supplier<AsyncServerExchangeHandler>() {
123 
124                         @Override
125                         public AsyncServerExchangeHandler get() {
126                             return new EchoHandler(2048);
127                         }
128 
129                     })
130                     .addFilterBefore(StandardFilter.MAIN_HANDLER.name(), "no-keepalive", new AsyncFilterHandler() {
131 
132                         @Override
133                         public AsyncDataConsumer handle(
134                                 final HttpRequest request,
135                                 final EntityDetails entityDetails,
136                                 final HttpContext context,
137                                 final AsyncFilterChain.ResponseTrigger responseTrigger,
138                                 final AsyncFilterChain chain) throws HttpException, IOException {
139                             return chain.proceed(request, entityDetails, context, new AsyncFilterChain.ResponseTrigger() {
140 
141                                 @Override
142                                 public void sendInformation(
143                                         final HttpResponse response) throws HttpException, IOException {
144                                     responseTrigger.sendInformation(response);
145                                 }
146 
147                                 @Override
148                                 public void submitResponse(
149                                         final HttpResponse response,
150                                         final AsyncEntityProducer entityProducer) throws HttpException, IOException {
151                                     if (request.getPath().startsWith("/no-keep-alive")) {
152                                         response.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
153                                     }
154                                     responseTrigger.submitResponse(response, entityProducer);
155                                 }
156 
157                                 @Override
158                                 public void pushPromise(
159                                         final HttpRequest promise,
160                                         final AsyncPushProducer responseProducer) throws HttpException, IOException {
161                                     responseTrigger.pushPromise(promise, responseProducer);
162                                 }
163 
164                             });
165                         }
166                     })
167                     .setTlsStrategy(scheme == URIScheme.HTTPS  ? new BasicServerTlsStrategy(
168                             SSLTestContexts.createServerSSLContext(),
169                             SecureAllPortsStrategy.INSTANCE) : null)
170                     .setStreamListener(LoggingHttp1StreamListener.INSTANCE_SERVER)
171                     .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
172                     .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
173                     .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
174                     .create();
175         }
176 
177         @Override
178         protected void after() {
179             log.debug("Shutting down test server");
180             if (server != null) {
181                 server.close(CloseMode.GRACEFUL);
182             }
183         }
184 
185     };
186 
187     private HttpAsyncRequester requester;
188 
189     @Rule
190     public ExternalResource clientResource = new ExternalResource() {
191 
192         @Override
193         protected void before() throws Throwable {
194             log.debug("Starting up test client");
195             requester = AsyncRequesterBootstrap.bootstrap()
196                     .setIOReactorConfig(IOReactorConfig.custom()
197                             .setSoTimeout(TIMEOUT)
198                             .build())
199                     .setTlsStrategy(new BasicClientTlsStrategy(SSLTestContexts.createClientSSLContext()))
200                     .setStreamListener(LoggingHttp1StreamListener.INSTANCE_CLIENT)
201                     .setConnPoolListener(LoggingConnPoolListener.INSTANCE)
202                     .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
203                     .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
204                     .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
205                     .create();
206         }
207 
208         @Override
209         protected void after() {
210             log.debug("Shutting down test client");
211             if (requester != null) {
212                 requester.close(CloseMode.GRACEFUL);
213             }
214         }
215 
216     };
217 
218     @Test
219     public void testSequentialRequests() throws Exception {
220         server.start();
221         final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
222         final ListenerEndpoint listener = future.get();
223         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
224         requester.start();
225 
226         final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
227         final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
228                 new BasicRequestProducer(Method.POST, target, "/stuff",
229                         new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
230                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
231         final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
232         Assert.assertThat(message1, CoreMatchers.notNullValue());
233         final HttpResponse response1 = message1.getHead();
234         Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
235         final String body1 = message1.getBody();
236         Assert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
237 
238         final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
239                 new BasicRequestProducer(Method.POST, target, "/other-stuff",
240                         new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
241                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
242         final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
243         Assert.assertThat(message2, CoreMatchers.notNullValue());
244         final HttpResponse response2 = message2.getHead();
245         Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
246         final String body2 = message2.getBody();
247         Assert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
248 
249         final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
250                 new BasicRequestProducer(Method.POST, target, "/more-stuff",
251                         new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
252                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
253         final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
254         Assert.assertThat(message3, CoreMatchers.notNullValue());
255         final HttpResponse response3 = message3.getHead();
256         Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
257         final String body3 = message3.getBody();
258         Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
259     }
260 
261     @Test
262     public void testSequentialRequestsNonPersistentConnection() throws Exception {
263         server.start();
264         final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
265         final ListenerEndpoint listener = future.get();
266         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
267         requester.start();
268 
269         final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
270         final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
271                 new BasicRequestProducer(Method.POST, target, "/no-keep-alive/stuff",
272                         new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
273                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
274         final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
275         Assert.assertThat(message1, CoreMatchers.notNullValue());
276         final HttpResponse response1 = message1.getHead();
277         Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
278         final String body1 = message1.getBody();
279         Assert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
280 
281         final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
282                 new BasicRequestProducer(Method.POST, target, "/no-keep-alive/other-stuff",
283                         new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
284                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
285         final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
286         Assert.assertThat(message2, CoreMatchers.notNullValue());
287         final HttpResponse response2 = message2.getHead();
288         Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
289         final String body2 = message2.getBody();
290         Assert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
291 
292         final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
293                 new BasicRequestProducer(Method.POST, target, "/no-keep-alive/more-stuff",
294                         new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
295                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
296         final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
297         Assert.assertThat(message3, CoreMatchers.notNullValue());
298         final HttpResponse response3 = message3.getHead();
299         Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
300         final String body3 = message3.getBody();
301         Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
302     }
303 
304     @Test
305     public void testSequentialRequestsSameEndpoint() throws Exception {
306         server.start();
307         final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
308         final ListenerEndpoint listener = future.get();
309         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
310         requester.start();
311 
312         final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
313         final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, Timeout.ofSeconds(5));
314         final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
315         try {
316 
317             final Future<Message<HttpResponse, String>> resultFuture1 = endpoint.execute(
318                     new BasicRequestProducer(Method.POST, target, "/stuff",
319                             new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
320                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
321             final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
322             Assert.assertThat(message1, CoreMatchers.notNullValue());
323             final HttpResponse response1 = message1.getHead();
324             Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
325             final String body1 = message1.getBody();
326             Assert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
327 
328             final Future<Message<HttpResponse, String>> resultFuture2 = endpoint.execute(
329                     new BasicRequestProducer(Method.POST, target, "/other-stuff",
330                             new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
331                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
332             final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
333             Assert.assertThat(message2, CoreMatchers.notNullValue());
334             final HttpResponse response2 = message2.getHead();
335             Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
336             final String body2 = message2.getBody();
337             Assert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
338 
339             final Future<Message<HttpResponse, String>> resultFuture3 = endpoint.execute(
340                     new BasicRequestProducer(Method.POST, target, "/more-stuff",
341                             new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
342                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
343             final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
344             Assert.assertThat(message3, CoreMatchers.notNullValue());
345             final HttpResponse response3 = message3.getHead();
346             Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
347             final String body3 = message3.getBody();
348             Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
349 
350         } finally {
351             endpoint.releaseAndReuse();
352         }
353     }
354 
355     @Test
356     public void testPipelinedRequests() throws Exception {
357         server.start();
358         final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
359         final ListenerEndpoint listener = future.get();
360         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
361         requester.start();
362 
363         final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
364         final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, Timeout.ofSeconds(5));
365         final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
366         try {
367 
368             final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
369 
370             queue.add(endpoint.execute(
371                     new BasicRequestProducer(Method.POST, target, "/stuff",
372                             new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
373                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
374             queue.add(endpoint.execute(
375                     new BasicRequestProducer(Method.POST, target, "/other-stuff",
376                             new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
377                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
378             queue.add(endpoint.execute(
379                     new BasicRequestProducer(Method.POST, target, "/more-stuff",
380                             new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
381                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
382 
383             while (!queue.isEmpty()) {
384                 final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
385                 final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
386                 Assert.assertThat(message, CoreMatchers.notNullValue());
387                 final HttpResponse response = message.getHead();
388                 Assert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
389                 final String body = message.getBody();
390                 Assert.assertThat(body, CoreMatchers.containsString("stuff"));
391             }
392 
393         } finally {
394             endpoint.releaseAndReuse();
395         }
396     }
397 
398     @Test
399     public void testNonPersistentHeads() throws Exception {
400         server.start();
401         final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
402         final ListenerEndpoint listener = future.get();
403         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
404         requester.start();
405 
406         final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
407         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
408 
409         for (int i = 0; i < 20; i++) {
410             final HttpRequest head = new BasicHttpRequest(Method.HEAD, target, "/no-keep-alive/stuff?p=" + i);
411             queue.add(requester.execute(
412                     new BasicRequestProducer(head, null),
413                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
414         }
415 
416         while (!queue.isEmpty()) {
417             final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
418             final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
419             Assert.assertThat(message, CoreMatchers.notNullValue());
420             final HttpResponse response = message.getHead();
421             Assert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
422             Assert.assertThat(message.getBody(), CoreMatchers.nullValue());
423         }
424     }
425 
426 }