View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.concurrent;
28  
29  import static org.junit.jupiter.api.Assertions.assertEquals;
30  import static org.junit.jupiter.api.Assertions.assertThrows;
31  import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
32  import static org.junit.jupiter.api.Assertions.fail;
33  
34  import java.time.Duration;
35  import java.util.concurrent.CancellationException;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.Executors;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.TimeoutException;
42  import java.util.concurrent.atomic.AtomicBoolean;
43  
44  import org.apache.hc.core5.util.TimeoutValueException;
45  import org.junit.jupiter.api.Assertions;
46  import org.junit.jupiter.api.Test;
47  import org.mockito.Mockito;
48  
49  public class TestBasicFuture {
50  
51      @Test
52      public void testCompleted() throws Exception {
53          final FutureCallback<Object> callback = Mockito.mock(FutureCallback.class);
54          final BasicFuture<Object> future = new BasicFuture<>(callback);
55  
56          Assertions.assertFalse(future.isDone());
57  
58          final Object result = new Object();
59          final Exception boom = new Exception();
60          future.completed(result);
61          future.failed(boom);
62          Mockito.verify(callback).completed(result);
63          Mockito.verify(callback, Mockito.never()).failed(Mockito.any());
64          Mockito.verify(callback, Mockito.never()).cancelled();
65  
66          Assertions.assertSame(result, future.get());
67          Assertions.assertTrue(future.isDone());
68          Assertions.assertFalse(future.isCancelled());
69  
70      }
71  
72      @Test
73      public void testCompletedWithTimeout() throws Exception {
74          final FutureCallback<Object> callback = Mockito.mock(FutureCallback.class);
75          final BasicFuture<Object> future = new BasicFuture<>(callback);
76  
77          Assertions.assertFalse(future.isDone());
78  
79          final Object result = new Object();
80          final Exception boom = new Exception();
81          future.completed(result);
82          future.failed(boom);
83          Mockito.verify(callback).completed(result);
84          Mockito.verify(callback, Mockito.never()).failed(Mockito.any());
85          Mockito.verify(callback, Mockito.never()).cancelled();
86  
87          Assertions.assertSame(result, future.get(1, TimeUnit.MILLISECONDS));
88          Assertions.assertTrue(future.isDone());
89          Assertions.assertFalse(future.isCancelled());
90      }
91  
92      @Test
93      public void testFailed() throws Exception {
94          final FutureCallback<Object> callback = Mockito.mock(FutureCallback.class);
95          final BasicFuture<Object> future = new BasicFuture<>(callback);
96          final Object result = new Object();
97          final Exception boom = new Exception();
98          future.failed(boom);
99          future.completed(result);
100         Mockito.verify(callback, Mockito.never()).completed(Mockito.any());
101         Mockito.verify(callback).failed(boom);
102         Mockito.verify(callback, Mockito.never()).cancelled();
103 
104         try {
105             future.get();
106         } catch (final ExecutionException ex) {
107             Assertions.assertSame(boom, ex.getCause());
108         }
109         Assertions.assertTrue(future.isDone());
110         Assertions.assertFalse(future.isCancelled());
111     }
112 
113     @Test
114     public void testCancelled() throws Exception {
115         final FutureCallback<Object> callback = Mockito.mock(FutureCallback.class);
116         final BasicFuture<Object> future = new BasicFuture<>(callback);
117         final Object result = new Object();
118         final Exception boom = new Exception();
119         future.cancel(true);
120         future.failed(boom);
121         future.completed(result);
122         Mockito.verify(callback, Mockito.never()).completed(Mockito.any());
123         Mockito.verify(callback, Mockito.never()).failed(Mockito.any());
124         Mockito.verify(callback).cancelled();
125 
126         assertThrows(CancellationException.class, future::get);
127         Assertions.assertTrue(future.isDone());
128         Assertions.assertTrue(future.isCancelled());
129     }
130 
131     @Test
132     public void testAsyncCompleted() throws Exception {
133         final BasicFuture<Object> future = new BasicFuture<>(null);
134         final Object result = new Object();
135 
136         final Thread t = new Thread(() -> {
137             try {
138                 Thread.sleep(100);
139                 future.completed(result);
140             } catch (final InterruptedException boom) {
141             }
142         });
143         t.setDaemon(true);
144         t.start();
145         Assertions.assertSame(result, future.get(60, TimeUnit.SECONDS));
146         Assertions.assertTrue(future.isDone());
147         Assertions.assertFalse(future.isCancelled());
148     }
149 
150     @Test
151     public void testAsyncFailed() throws Exception {
152         final BasicFuture<Object> future = new BasicFuture<>(null);
153         final Exception boom = new Exception();
154 
155         final Thread t = new Thread(() -> {
156             try {
157                 Thread.sleep(100);
158                 future.failed(boom);
159             } catch (final InterruptedException ex) {
160             }
161         });
162         t.setDaemon(true);
163         t.start();
164         try {
165             future.get(60, TimeUnit.SECONDS);
166         } catch (final ExecutionException ex) {
167             Assertions.assertSame(boom, ex.getCause());
168         }
169         Assertions.assertTrue(future.isDone());
170         Assertions.assertFalse(future.isCancelled());
171     }
172 
173     @Test
174     public void testAsyncCancelled() throws Exception {
175         final BasicFuture<Object> future = new BasicFuture<>(null);
176 
177         final Thread t = new Thread(() -> {
178             try {
179                 Thread.sleep(100);
180                 future.cancel(true);
181             } catch (final InterruptedException ex) {
182             }
183         });
184         t.setDaemon(true);
185         t.start();
186         assertThrows(CancellationException.class, () ->
187                 future.get(60, TimeUnit.SECONDS));
188     }
189 
190     @Test
191     public void testAsyncTimeout() throws Exception {
192         final BasicFuture<Object> future = new BasicFuture<>(null);
193         final Object result = new Object();
194 
195         final Thread t = new Thread(() -> {
196             try {
197                 Thread.sleep(200);
198                 future.completed(result);
199             } catch (final InterruptedException ex) {
200             }
201         });
202         t.setDaemon(true);
203         t.start();
204         assertThrows(TimeoutException.class, () ->
205                 future.get(1, TimeUnit.MILLISECONDS));
206     }
207 
208     @Test
209     public void testAsyncNegativeTimeout() throws Exception {
210         final BasicFuture<Object> future = new BasicFuture<>(null);
211         assertThrows(TimeoutValueException.class, () ->
212                 future.get(-1, TimeUnit.MILLISECONDS));
213     }
214 
215     @Test
216     public void testConcurrentOperations() throws InterruptedException, ExecutionException {
217         final FutureCallback<Object> callback = new FutureCallback<Object>() {
218             public void completed(final Object result) {
219             }
220 
221             public void failed(final Exception ex) {
222             }
223 
224             public void cancelled() {
225             }
226         };
227 
228         final ExecutorService executor = Executors.newFixedThreadPool(3);
229         final BasicFuture<Object> future = new BasicFuture<>(callback);
230         final Object expectedResult = new Object();
231 
232         final AtomicBoolean completedSuccessfully = new AtomicBoolean(false);
233         final AtomicBoolean failedSuccessfully = new AtomicBoolean(false);
234         final AtomicBoolean cancelledSuccessfully = new AtomicBoolean(false);
235 
236         // Run 3 tasks concurrently: complete, fail, and cancel the future.
237         final Future<?> future1 = executor.submit(() -> completedSuccessfully.set(future.completed(expectedResult)));
238         final Future<?> future2 = executor.submit(() -> failedSuccessfully.set(future.failed(new Exception("Test Exception"))));
239         final Future<?> future3 = executor.submit(() -> cancelledSuccessfully.set(future.cancel()));
240 
241         // Wait for the tasks to finish.
242         future1.get();
243         future2.get();
244         future3.get();
245 
246         // Verify that the first operation won and the other two failed.
247         if (completedSuccessfully.get()) {
248             assertEquals(expectedResult, future.get());
249         } else if (failedSuccessfully.get()) {
250             assertThrows(ExecutionException.class, future::get);
251         } else if (cancelledSuccessfully.get()) {
252             assertThrows(CancellationException.class, future::get);
253         } else {
254             fail("No operation was successful on the future.");
255         }
256 
257         // Shutdown the executor.
258         executor.shutdown();
259     }
260 
261     @Test
262     void testGetWithTimeout() {
263         final AtomicBoolean isFutureCompleted = new AtomicBoolean(false);
264 
265         final FutureCallback<String> callback = new FutureCallback<String>() {
266             @Override
267             public void completed(final String result) {
268                 isFutureCompleted.set(true);
269             }
270 
271             @Override
272             public void failed(final Exception ex) {
273                 // Nothing to do here for this example
274             }
275 
276             @Override
277             public void cancelled() {
278                 // Nothing to do here for this example
279             }
280         };
281 
282         final BasicFuture<String> future = new BasicFuture<>(callback);
283 
284         new Thread(() -> future.completed("test")).start();
285 
286         // Poll until the future is completed or timeout
287         assertTimeoutPreemptively(Duration.ofMillis(200), () -> {
288             while (!isFutureCompleted.get()) {
289                 // This loop will spin until the future is completed or the assertTimeoutPreemptively times out.
290                 Thread.yield();
291             }
292 
293             try {
294                 assertEquals("test", future.get(1, TimeUnit.SECONDS));
295             } catch (final ExecutionException | TimeoutException e) {
296                 fail("Test failed due to exception: " + e.getMessage());
297             }
298         });
299     }
300 }