1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.classic;
28
29 import static org.hamcrest.MatcherAssert.assertThat;
30
31 import java.io.IOException;
32 import java.util.LinkedList;
33 import java.util.Queue;
34 import java.util.concurrent.CancellationException;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.FutureTask;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.concurrent.atomic.AtomicBoolean;
44
45 import org.apache.hc.client5.http.classic.methods.HttpGet;
46 import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
47 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
48 import org.apache.hc.client5.http.protocol.HttpClientContext;
49 import org.apache.hc.core5.concurrent.FutureCallback;
50 import org.apache.hc.core5.http.ClassicHttpResponse;
51 import org.apache.hc.core5.http.impl.bootstrap.HttpServer;
52 import org.apache.hc.core5.http.impl.bootstrap.ServerBootstrap;
53 import org.apache.hc.core5.http.io.HttpClientResponseHandler;
54 import org.hamcrest.CoreMatchers;
55 import org.junit.jupiter.api.AfterEach;
56 import org.junit.jupiter.api.Assertions;
57 import org.junit.jupiter.api.BeforeEach;
58 import org.junit.jupiter.api.Test;
59
60 @SuppressWarnings("boxing")
61 public class TestFutureRequestExecutionService {
62
63 private HttpServer localServer;
64 private String uri;
65 private FutureRequestExecutionService httpAsyncClientWithFuture;
66
67 private final AtomicBoolean blocked = new AtomicBoolean(false);
68
69 @BeforeEach
70 public void before() throws Exception {
71 this.localServer = ServerBootstrap.bootstrap()
72 .register("/wait", (request, response, context) -> {
73 try {
74 while(blocked.get()) {
75 Thread.sleep(10);
76 }
77 } catch (final InterruptedException e) {
78 throw new IllegalStateException(e);
79 }
80 response.setCode(200);
81 }).create();
82
83 this.localServer.start();
84 uri = "http://localhost:" + this.localServer.getLocalPort() + "/wait";
85 final HttpClientConnectionManager cm = PoolingHttpClientConnectionManagerBuilder.create()
86 .setMaxConnPerRoute(5)
87 .build();
88 final CloseableHttpClient httpClient = HttpClientBuilder.create()
89 .setConnectionManager(cm)
90 .build();
91 final ExecutorService executorService = Executors.newFixedThreadPool(5);
92 httpAsyncClientWithFuture = new FutureRequestExecutionService(httpClient, executorService);
93 }
94
95 @AfterEach
96 public void after() throws Exception {
97 blocked.set(false);
98 this.localServer.stop();
99 httpAsyncClientWithFuture.close();
100 }
101
102 @Test
103 public void shouldExecuteSingleCall() throws InterruptedException, ExecutionException {
104 final FutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
105 new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
106 Assertions.assertTrue(task.get(), "request should have returned OK");
107 }
108
109 @Test
110 public void shouldCancel() throws InterruptedException, ExecutionException {
111 final FutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
112 new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
113 task.cancel(true);
114 final Exception exception = Assertions.assertThrows(Exception.class, task::get);
115 assertThat(exception, CoreMatchers.anyOf(
116 CoreMatchers.instanceOf(CancellationException.class),
117 CoreMatchers.instanceOf(ExecutionException.class)));
118 }
119
120 @Test
121 public void shouldTimeout() throws InterruptedException, ExecutionException, TimeoutException {
122 blocked.set(true);
123 final FutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
124 new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
125 Assertions.assertThrows(TimeoutException.class, () ->
126 task.get(10, TimeUnit.MILLISECONDS));
127 }
128
129 @Test
130 public void shouldExecuteMultipleCalls() throws Exception {
131 final int reqNo = 100;
132 final Queue<Future<Boolean>> tasks = new LinkedList<>();
133 for(int i = 0; i < reqNo; i++) {
134 final Future<Boolean> task = httpAsyncClientWithFuture.execute(
135 new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
136 tasks.add(task);
137 }
138 for (final Future<Boolean> task : tasks) {
139 final Boolean b = task.get();
140 Assertions.assertNotNull(b);
141 Assertions.assertTrue(b, "request should have returned OK");
142 }
143 }
144
145 @Test
146 public void shouldExecuteMultipleCallsAndCallback() throws Exception {
147 final int reqNo = 100;
148 final Queue<Future<Boolean>> tasks = new LinkedList<>();
149 final CountDownLatch latch = new CountDownLatch(reqNo);
150 for(int i = 0; i < reqNo; i++) {
151 final Future<Boolean> task = httpAsyncClientWithFuture.execute(
152 new HttpGet(uri), HttpClientContext.create(),
153 new OkidokiHandler(), new CountingCallback(latch));
154 tasks.add(task);
155 }
156 Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS));
157 for (final Future<Boolean> task : tasks) {
158 final Boolean b = task.get();
159 Assertions.assertNotNull(b);
160 Assertions.assertTrue(b, "request should have returned OK");
161 }
162 }
163
164 private final class CountingCallback implements FutureCallback<Boolean> {
165
166 private final CountDownLatch latch;
167
168 CountingCallback(final CountDownLatch latch) {
169 super();
170 this.latch = latch;
171 }
172
173 @Override
174 public void failed(final Exception ex) {
175 latch.countDown();
176 }
177
178 @Override
179 public void completed(final Boolean result) {
180 latch.countDown();
181 }
182
183 @Override
184 public void cancelled() {
185 latch.countDown();
186 }
187 }
188
189
190 private final class OkidokiHandler implements HttpClientResponseHandler<Boolean> {
191 @Override
192 public Boolean handleResponse(
193 final ClassicHttpResponse response) throws IOException {
194 return response.getCode() == 200;
195 }
196 }
197
198 }