View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.testing.nio;
29  
30  import java.net.InetSocketAddress;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.CountDownLatch;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.atomic.AtomicLong;
35  
36  import org.apache.hc.core5.concurrent.FutureCallback;
37  import org.apache.hc.core5.function.Supplier;
38  import org.apache.hc.core5.http.HttpHost;
39  import org.apache.hc.core5.http.URIScheme;
40  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
41  import org.apache.hc.core5.http.impl.routing.RequestRouter;
42  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
43  import org.apache.hc.core5.http2.HttpVersionPolicy;
44  import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester;
45  import org.apache.hc.core5.http2.nio.command.PingCommand;
46  import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
47  import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
48  import org.apache.hc.core5.reactor.Command;
49  import org.apache.hc.core5.reactor.IOReactorConfig;
50  import org.apache.hc.core5.reactor.IOSession;
51  import org.apache.hc.core5.reactor.ListenerEndpoint;
52  import org.apache.hc.core5.testing.nio.extension.H2AsyncServerResource;
53  import org.apache.hc.core5.testing.nio.extension.H2MultiplexingRequesterResource;
54  import org.apache.hc.core5.util.Timeout;
55  import org.junit.jupiter.api.Assertions;
56  import org.junit.jupiter.api.BeforeEach;
57  import org.junit.jupiter.api.Test;
58  import org.junit.jupiter.api.extension.RegisterExtension;
59  
60  public class H2ConnPoolTest {
61  
62      private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
63  
64      private final AtomicLong clientConnCount;
65      @RegisterExtension
66      private final H2AsyncServerResource serverResource;
67      @RegisterExtension
68      private final H2MultiplexingRequesterResource clientResource;
69  
70      public H2ConnPoolTest() throws Exception {
71          this.serverResource = new H2AsyncServerResource(bootstrap -> bootstrap
72                  .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
73                  .setIOReactorConfig(
74                          IOReactorConfig.custom()
75                                  .setSoTimeout(TIMEOUT)
76                                  .build())
77                  .setRequestRouter(RequestRouter.<Supplier<AsyncServerExchangeHandler>>builder()
78                          .addRoute(RequestRouter.LOCAL_AUTHORITY, "*", () -> new EchoHandler(2048))
79                          .resolveAuthority(RequestRouter.LOCAL_AUTHORITY_RESOLVER)
80                          .build())
81          );
82  
83          this.clientConnCount = new AtomicLong();
84          this.clientResource = new H2MultiplexingRequesterResource(bootstrap -> bootstrap
85                  .setIOReactorConfig(IOReactorConfig.custom()
86                          .setSoTimeout(TIMEOUT)
87                          .build())
88                  .setIOSessionListener(new LoggingIOSessionListener() {
89  
90                      @Override
91                      public void connected(final IOSession session) {
92                          clientConnCount.incrementAndGet();
93                          super.connected(session);
94                      }
95  
96                  })
97          );
98      }
99  
100     @BeforeEach
101     public void resetCounts() {
102         clientConnCount.set(0);
103     }
104 
105     @Test
106     public void testManyGetSession() throws Exception {
107         final int n = 200;
108 
109         final HttpAsyncServer server = serverResource.start();
110         final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), URIScheme.HTTP);
111         final ListenerEndpoint listener = future.get();
112         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
113         final HttpHost target = new HttpHost(URIScheme.HTTP.id, "localhost", address.getPort());
114 
115         final H2MultiplexingRequester requester = clientResource.start();
116         final H2ConnPool connPool = requester.getConnPool();
117         final CountDownLatch latch = new CountDownLatch(n);
118         for (int i = 0; i < n; i++) {
119             connPool.getSession(target, TIMEOUT, new FutureCallback<IOSession>() {
120 
121                 @Override
122                 public void completed(final IOSession session) {
123                     session.enqueue(new PingCommand(new BasicPingHandler(
124                             result -> {
125                                 latch.countDown();
126                             })), Command.Priority.IMMEDIATE);
127                 }
128 
129                 @Override
130                 public void failed(final Exception ex) {
131                     latch.countDown();
132                 }
133 
134                 @Override
135                 public void cancelled() {
136                     latch.countDown();
137                 }
138 
139             });
140         }
141         Assertions.assertTrue(latch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
142 
143         requester.initiateShutdown();
144         requester.awaitShutdown(TIMEOUT);
145 
146         Assertions.assertEquals(1, clientConnCount.get());
147     }
148 
149     @Test
150     public void testManyGetSessionFailures() throws Exception {
151         final int n = 200;
152 
153         final HttpHost target = new HttpHost(URIScheme.HTTP.id, "pampa.invalid", 8888);
154 
155         final H2MultiplexingRequester requester = clientResource.start();
156         final H2ConnPool connPool = requester.getConnPool();
157         final CountDownLatch latch = new CountDownLatch(n);
158         final ConcurrentLinkedQueue<Long> concurrentConnections = new ConcurrentLinkedQueue<>();
159         for (int i = 0; i < n; i++) {
160             connPool.getSession(target, TIMEOUT, new FutureCallback<IOSession>() {
161 
162                 @Override
163                 public void completed(final IOSession session) {
164                     latch.countDown();
165                 }
166 
167                 @Override
168                 public void failed(final Exception ex) {
169                     latch.countDown();
170                 }
171 
172                 @Override
173                 public void cancelled() {
174                     latch.countDown();
175                 }
176 
177             });
178         }
179 
180         requester.initiateShutdown();
181         requester.awaitShutdown(TIMEOUT);
182 
183         Assertions.assertEquals(0, clientConnCount.get());
184     }
185 
186 }