View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.classic;
28  
29  
30  import static org.junit.jupiter.api.Assertions.assertEquals;
31  import static org.junit.jupiter.api.Assertions.assertTrue;
32  
33  import java.util.Random;
34  import java.util.concurrent.BrokenBarrierException;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.CyclicBarrier;
37  
38  import org.apache.hc.client5.http.HttpRoute;
39  import org.apache.hc.client5.http.classic.BackoffManager;
40  import org.apache.hc.core5.http.HttpHost;
41  import org.apache.hc.core5.util.TimeValue;
42  import org.junit.jupiter.api.BeforeEach;
43  import org.junit.jupiter.api.Test;
44  
45  public class TestAIMDBackoffManager {
46  
47      private AIMDBackoffManager impl;
48      private MockConnPoolControl connPerRoute;
49      private HttpRoute route;
50      private static final long DEFAULT_COOL_DOWN_MS = 10; // Adjust this value to match the default cooldown period in AIMDBackoffManager
51  
52  
53      @BeforeEach
54      public void setUp() {
55          connPerRoute = new MockConnPoolControl();
56          route = new HttpRoute(new HttpHost("localhost", 80));
57          impl = new AIMDBackoffManager(connPerRoute);
58          impl.setPerHostConnectionCap(10);
59          impl.setCoolDown(TimeValue.ofMilliseconds(DEFAULT_COOL_DOWN_MS));
60  
61      }
62  
63      @Test
64      public void isABackoffManager() {
65          assertTrue(impl instanceof BackoffManager);
66      }
67  
68      @Test
69      public void halvesConnectionsOnBackoff() {
70          connPerRoute.setMaxPerRoute(route, 4);
71          impl.backOff(route);
72          assertEquals(2, connPerRoute.getMaxPerRoute(route));
73      }
74  
75      @Test
76      public void doesNotBackoffBelowOneConnection() {
77          connPerRoute.setMaxPerRoute(route, 1);
78          impl.backOff(route);
79          assertEquals(1, connPerRoute.getMaxPerRoute(route));
80      }
81  
82      @Test
83      public void increasesByOneOnProbe() {
84          connPerRoute.setMaxPerRoute(route, 2);
85          impl.probe(route);
86          assertEquals(3, connPerRoute.getMaxPerRoute(route));
87      }
88  
89      @Test
90      public void doesNotIncreaseBeyondPerHostMaxOnProbe() {
91          connPerRoute.setDefaultMaxPerRoute(5);
92          connPerRoute.setMaxPerRoute(route, 5);
93          impl.setPerHostConnectionCap(5);
94          impl.probe(route);
95          assertEquals(5, connPerRoute.getMaxPerRoute(route));
96      }
97  
98      @Test
99      public void backoffDoesNotAdjustDuringCoolDownPeriod() {
100         connPerRoute.setMaxPerRoute(route, 4);
101         impl.backOff(route);
102         final long max = connPerRoute.getMaxPerRoute(route);
103 
104         // Replace Thread.sleep(1) with busy waiting
105         final long end = System.currentTimeMillis() + 1;
106         while (System.currentTimeMillis() < end) {
107             // Busy waiting
108         }
109 
110         impl.backOff(route);
111         assertEquals(max, connPerRoute.getMaxPerRoute(route));
112     }
113 
114     @Test
115     public void backoffStillAdjustsAfterCoolDownPeriod() throws InterruptedException {
116         connPerRoute.setMaxPerRoute(route, 8);
117         impl.backOff(route);
118         final long max = connPerRoute.getMaxPerRoute(route);
119         Thread.sleep(DEFAULT_COOL_DOWN_MS + 100); // Sleep for cooldown period + 100 ms
120         impl.backOff(route);
121         assertTrue(max == 1 || max > connPerRoute.getMaxPerRoute(route));
122     }
123 
124     @Test
125     public void probeDoesNotAdjustDuringCooldownPeriod() {
126         connPerRoute.setMaxPerRoute(route, 4);
127         impl.probe(route);
128         final long max = connPerRoute.getMaxPerRoute(route);
129 
130         // Replace Thread.sleep(1) with busy waiting
131         final long end = System.currentTimeMillis() + 1;
132         while (System.currentTimeMillis() < end) {
133             // Busy waiting
134         }
135 
136         impl.probe(route);
137         assertEquals(max, connPerRoute.getMaxPerRoute(route));
138     }
139 
140     @Test
141     public void probeStillAdjustsAfterCoolDownPeriod() throws InterruptedException {
142         connPerRoute.setMaxPerRoute(route, 8);
143         impl.probe(route);
144         final long max = connPerRoute.getMaxPerRoute(route);
145         Thread.sleep(DEFAULT_COOL_DOWN_MS + 100); // Sleep for cooldown period + 1 ms
146         impl.probe(route);
147         assertTrue(max < connPerRoute.getMaxPerRoute(route));
148     }
149 
150     @Test
151     public void willBackoffImmediatelyEvenAfterAProbe() {
152         connPerRoute.setMaxPerRoute(route, 8);
153         final long now = System.currentTimeMillis();
154         impl.probe(route);
155         final long max = connPerRoute.getMaxPerRoute(route);
156         impl.backOff(route);
157         assertTrue(connPerRoute.getMaxPerRoute(route) < max);
158     }
159 
160     @Test
161     public void backOffFactorIsConfigurable() {
162         connPerRoute.setMaxPerRoute(route, 10);
163         impl.setBackoffFactor(0.9);
164         impl.backOff(route);
165         assertEquals(9, connPerRoute.getMaxPerRoute(route));
166     }
167 
168     @Test
169     public void coolDownPeriodIsConfigurable() throws InterruptedException {
170         final long cd = new Random().nextInt(500) + 500; // Random cooldown period between 500 and 1000 milliseconds
171         impl.setCoolDown(TimeValue.ofMilliseconds(cd));
172 
173         // Probe and check if the connection count remains the same during the cooldown period
174         impl.probe(route);
175         final int max0 = connPerRoute.getMaxPerRoute(route);
176         Thread.sleep(cd / 2 + 100); // Sleep for half the cooldown period + 100 ms buffer
177         impl.probe(route);
178         assertEquals(max0, connPerRoute.getMaxPerRoute(route));
179 
180         // Probe and check if the connection count increases after the cooldown period
181         Thread.sleep(cd / 2 + 100); // Sleep for the remaining half of the cooldown period + 100 ms buffer
182         impl.probe(route);
183         assertTrue(max0 < connPerRoute.getMaxPerRoute(route));
184     }
185 
186     @Test
187     public void testConcurrency() throws InterruptedException {
188         final int initialMaxPerRoute = 10;
189         final int numberOfThreads = 20;
190         final int numberOfOperationsPerThread = 100;  // reduced operations
191 
192         // Create a cyclic barrier that will wait for all threads to be ready before proceeding
193         final CyclicBarrier barrier = new CyclicBarrier(numberOfThreads);
194 
195         final CountDownLatch latch = new CountDownLatch(numberOfThreads);
196 
197         for (int i = 0; i < numberOfThreads; i++) {
198             final HttpRoute threadRoute = new HttpRoute(new HttpHost("localhost", 8080 + i)); // Each thread gets its own route
199             connPerRoute.setMaxPerRoute(threadRoute, initialMaxPerRoute);
200 
201             new Thread(() -> {
202                 try {
203                     // Wait for all threads to be ready
204                     barrier.await();
205 
206                     // Run operations
207                     for (int j = 0; j < numberOfOperationsPerThread; j++) {
208                         if (Math.random() < 0.5) {
209                             impl.backOff(threadRoute);
210                         } else {
211                             impl.probe(threadRoute);
212                         }
213                     }
214                 } catch (InterruptedException | BrokenBarrierException e) {
215                     Thread.currentThread().interrupt();
216                 } finally {
217                     latch.countDown();
218                 }
219             }).start();
220         }
221 
222         latch.await();
223 
224         // Check that the final value for each route is within an acceptable range
225         for (int i = 0; i < numberOfThreads; i++) {
226             final HttpRoute threadRoute = new HttpRoute(new HttpHost("localhost", 8080 + i));
227             final int finalMaxPerRoute = connPerRoute.getMaxPerRoute(threadRoute);
228             assertTrue(finalMaxPerRoute >= 1 && finalMaxPerRoute <= initialMaxPerRoute + 7);  // more permissive check
229         }
230     }
231 }