View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.testing.async;
28  
29  import java.util.concurrent.Future;
30  
31  import org.apache.hc.client5.http.HttpRoute;
32  import org.apache.hc.client5.http.UserTokenHandler;
33  import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
34  import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
35  import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
36  import org.apache.hc.client5.http.config.RequestConfig;
37  import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
38  import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
39  import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
40  import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
41  import org.apache.hc.client5.http.protocol.HttpClientContext;
42  import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
43  import org.apache.hc.client5.testing.SSLTestContexts;
44  import org.apache.hc.core5.function.Supplier;
45  import org.apache.hc.core5.http.ContentType;
46  import org.apache.hc.core5.http.EndpointDetails;
47  import org.apache.hc.core5.http.HttpException;
48  import org.apache.hc.core5.http.HttpHost;
49  import org.apache.hc.core5.http.HttpResponse;
50  import org.apache.hc.core5.http.HttpStatus;
51  import org.apache.hc.core5.http.config.Http1Config;
52  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
53  import org.apache.hc.core5.http.protocol.BasicHttpContext;
54  import org.apache.hc.core5.http.protocol.HttpContext;
55  import org.apache.hc.core5.http.protocol.HttpCoreContext;
56  import org.junit.Assert;
57  import org.junit.Rule;
58  import org.junit.Test;
59  import org.junit.rules.ExternalResource;
60  
61  public class TestHttp1AsyncStatefulConnManagement extends AbstractIntegrationTestBase<CloseableHttpAsyncClient> {
62  
63      protected HttpAsyncClientBuilder clientBuilder;
64      protected PoolingAsyncClientConnectionManager connManager;
65  
66      @Rule
67      public ExternalResource connManagerResource = new ExternalResource() {
68  
69          @Override
70          protected void before() throws Throwable {
71              connManager = PoolingAsyncClientConnectionManagerBuilder.create()
72                      .setTlsStrategy(new DefaultClientTlsStrategy(SSLTestContexts.createClientSSLContext()))
73                      .build();
74          }
75  
76          @Override
77          protected void after() {
78              if (connManager != null) {
79                  connManager.close();
80                  connManager = null;
81              }
82          }
83  
84      };
85  
86      @Rule
87      public ExternalResource clientBuilderResource = new ExternalResource() {
88  
89          @Override
90          protected void before() throws Throwable {
91              clientBuilder = HttpAsyncClientBuilder.create()
92                      .setDefaultRequestConfig(RequestConfig.custom()
93                              .setConnectTimeout(TIMEOUT)
94                              .setConnectionRequestTimeout(TIMEOUT)
95                              .build())
96                      .setConnectionManager(connManager);
97          }
98  
99      };
100 
101     @Override
102     protected CloseableHttpAsyncClient createClient() throws Exception {
103         return clientBuilder.build();
104     }
105 
106     @Override
107     public HttpHost start() throws Exception {
108         return super.start(null, Http1Config.DEFAULT);
109     }
110 
111     @Test
112     public void testStatefulConnections() throws Exception {
113         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
114 
115             @Override
116             public AsyncServerExchangeHandler get() {
117                 return new AbstractSimpleServerExchangeHandler() {
118 
119                     @Override
120                     protected SimpleHttpResponse handle(
121                             final SimpleHttpRequest request,
122                             final HttpCoreContext context) throws HttpException {
123                         final SimpleHttpResponse response = new SimpleHttpResponse(HttpStatus.SC_OK);
124                         response.setBody("Whatever", ContentType.TEXT_PLAIN);
125                         return response;
126                     }
127                 };
128             }
129 
130         });
131 
132         final UserTokenHandler userTokenHandler = new UserTokenHandler() {
133 
134             @Override
135             public Object getUserToken(final HttpRoute route, final HttpContext context) {
136                 return context.getAttribute("user");
137             }
138 
139         };
140         clientBuilder.setUserTokenHandler(userTokenHandler);
141         final HttpHost target = start();
142 
143         final int workerCount = 2;
144         final int requestCount = 5;
145 
146         final HttpContext[] contexts = new HttpContext[workerCount];
147         final HttpWorker[] workers = new HttpWorker[workerCount];
148         for (int i = 0; i < contexts.length; i++) {
149             final HttpClientContext context = HttpClientContext.create();
150             contexts[i] = context;
151             workers[i] = new HttpWorker(
152                     "user" + i,
153                     context, requestCount, target, httpclient);
154         }
155 
156         for (final HttpWorker worker : workers) {
157             worker.start();
158         }
159         for (final HttpWorker worker : workers) {
160             worker.join(LONG_TIMEOUT.toMilliseconds());
161         }
162         for (final HttpWorker worker : workers) {
163             final Exception ex = worker.getException();
164             if (ex != null) {
165                 throw ex;
166             }
167             Assert.assertEquals(requestCount, worker.getCount());
168         }
169 
170         for (final HttpContext context : contexts) {
171             final String state0 = (String) context.getAttribute("r0");
172             Assert.assertNotNull(state0);
173             for (int r = 1; r < requestCount; r++) {
174                 Assert.assertEquals(state0, context.getAttribute("r" + r));
175             }
176         }
177 
178     }
179 
180     static class HttpWorker extends Thread {
181 
182         private final String uid;
183         private final HttpClientContext context;
184         private final int requestCount;
185         private final HttpHost target;
186         private final CloseableHttpAsyncClient httpclient;
187 
188         private volatile Exception exception;
189         private volatile int count;
190 
191         public HttpWorker(
192                 final String uid,
193                 final HttpClientContext context,
194                 final int requestCount,
195                 final HttpHost target,
196                 final CloseableHttpAsyncClient httpclient) {
197             super();
198             this.uid = uid;
199             this.context = context;
200             this.requestCount = requestCount;
201             this.target = target;
202             this.httpclient = httpclient;
203             this.count = 0;
204         }
205 
206         public int getCount() {
207             return count;
208         }
209 
210         public Exception getException() {
211             return exception;
212         }
213 
214         @Override
215         public void run() {
216             try {
217                 context.setAttribute("user", uid);
218                 for (int r = 0; r < requestCount; r++) {
219                     final SimpleHttpRequest httpget = SimpleHttpRequests.get(target, "/");
220                     final Future<SimpleHttpResponse> future = httpclient.execute(httpget, null);
221                     future.get();
222 
223                     count++;
224                     final EndpointDetails endpointDetails = context.getEndpointDetails();
225                     final String connuid = Integer.toHexString(System.identityHashCode(endpointDetails));
226                     context.setAttribute("r" + r, connuid);
227                 }
228 
229             } catch (final Exception ex) {
230                 exception = ex;
231             }
232         }
233 
234     }
235 
236     @Test
237     public void testRouteSpecificPoolRecylcing() throws Exception {
238         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
239 
240             @Override
241             public AsyncServerExchangeHandler get() {
242                 return new AbstractSimpleServerExchangeHandler() {
243 
244                     @Override
245                     protected SimpleHttpResponse handle(
246                             final SimpleHttpRequest request,
247                             final HttpCoreContext context) throws HttpException {
248                         final SimpleHttpResponse response = new SimpleHttpResponse(HttpStatus.SC_OK);
249                         response.setBody("Whatever", ContentType.TEXT_PLAIN);
250                         return response;
251                     }
252                 };
253             }
254 
255         });
256 
257         // This tests what happens when a maxed connection pool needs
258         // to kill the last idle connection to a route to build a new
259         // one to the same route.
260         final UserTokenHandler userTokenHandler = new UserTokenHandler() {
261 
262             @Override
263             public Object getUserToken(final HttpRoute route, final HttpContext context) {
264                 return context.getAttribute("user");
265             }
266 
267         };
268         clientBuilder.setUserTokenHandler(userTokenHandler);
269 
270         final HttpHost target = start();
271         final int maxConn = 2;
272         // We build a client with 2 max active // connections, and 2 max per route.
273         connManager.setMaxTotal(maxConn);
274         connManager.setDefaultMaxPerRoute(maxConn);
275 
276         // Bottom of the pool : a *keep alive* connection to Route 1.
277         final HttpContext context1 = new BasicHttpContext();
278         context1.setAttribute("user", "stuff");
279 
280         final Future<SimpleHttpResponse> future1 = httpclient.execute(SimpleHttpRequests.get(target, "/"), context1, null);
281         final HttpResponse response1 = future1.get();
282         Assert.assertNotNull(response1);
283         Assert.assertEquals(200, response1.getCode());
284 
285         // The ConnPoolByRoute now has 1 free connection, out of 2 max
286         // The ConnPoolByRoute has one RouteSpcfcPool, that has one free connection
287         // for [localhost][stuff]
288 
289         Thread.sleep(100);
290 
291         // Send a very simple HTTP get (it MUST be simple, no auth, no proxy, no 302, no 401, ...)
292         // Send it to another route. Must be a keepalive.
293         final HttpContext context2 = new BasicHttpContext();
294 
295         final Future<SimpleHttpResponse> future2 = httpclient.execute(SimpleHttpRequests.get(
296                 new HttpHost(target.getSchemeName(), "127.0.0.1", target.getPort()),"/"), context2, null);
297         final HttpResponse response2 = future2.get();
298         Assert.assertNotNull(response2);
299         Assert.assertEquals(200, response2.getCode());
300 
301         // ConnPoolByRoute now has 2 free connexions, out of its 2 max.
302         // The [localhost][stuff] RouteSpcfcPool is the same as earlier
303         // And there is a [127.0.0.1][null] pool with 1 free connection
304 
305         Thread.sleep(100);
306 
307         // This will put the ConnPoolByRoute to the targeted state :
308         // [localhost][stuff] will not get reused because this call is [localhost][null]
309         // So the ConnPoolByRoute will need to kill one connection (it is maxed out globally).
310         // The killed conn is the oldest, which means the first HTTPGet ([localhost][stuff]).
311         // When this happens, the RouteSpecificPool becomes empty.
312         final HttpContext context3 = new BasicHttpContext();
313         final Future<SimpleHttpResponse> future3 = httpclient.execute(SimpleHttpRequests.get(target, "/"), context3, null);
314         final HttpResponse response3 = future3.get();
315         Assert.assertNotNull(response3);
316         Assert.assertEquals(200, response3.getCode());
317     }
318 
319 }