1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.impl.client;
28
29 import java.io.IOException;
30 import java.util.LinkedList;
31 import java.util.Queue;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.FutureTask;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.TimeoutException;
41 import java.util.concurrent.atomic.AtomicBoolean;
42
43 import org.apache.http.HttpException;
44 import org.apache.http.HttpRequest;
45 import org.apache.http.HttpResponse;
46 import org.apache.http.client.ClientProtocolException;
47 import org.apache.http.client.HttpClient;
48 import org.apache.http.client.ResponseHandler;
49 import org.apache.http.client.methods.HttpGet;
50 import org.apache.http.client.protocol.HttpClientContext;
51 import org.apache.http.concurrent.FutureCallback;
52 import org.apache.http.impl.bootstrap.HttpServer;
53 import org.apache.http.impl.bootstrap.ServerBootstrap;
54 import org.apache.http.protocol.HttpContext;
55 import org.apache.http.protocol.HttpRequestHandler;
56 import org.hamcrest.CoreMatchers;
57 import org.junit.After;
58 import org.junit.Assert;
59 import org.junit.Before;
60 import org.junit.Rule;
61 import org.junit.Test;
62 import org.junit.rules.ExpectedException;
63
64 @SuppressWarnings("boxing")
65 public class TestFutureRequestExecutionService {
66
67 private HttpServer localServer;
68 private String uri;
69 private FutureRequestExecutionService httpAsyncClientWithFuture;
70
71 private final AtomicBoolean blocked = new AtomicBoolean(false);
72
73 @Rule
74 public ExpectedException thrown = ExpectedException.none();
75
76 @Before
77 public void before() throws Exception {
78 this.localServer = ServerBootstrap.bootstrap()
79 .registerHandler("/wait", new HttpRequestHandler() {
80
81 @Override
82 public void handle(
83 final HttpRequest request, final HttpResponse response,
84 final HttpContext context) throws HttpException, IOException {
85 try {
86 while(blocked.get()) {
87 Thread.sleep(10);
88 }
89 } catch (final InterruptedException e) {
90 throw new IllegalStateException(e);
91 }
92 response.setStatusCode(200);
93 }
94 }).create();
95
96 this.localServer.start();
97 uri = "http://localhost:" + this.localServer.getLocalPort() + "/wait";
98 final HttpClient httpClient = HttpClientBuilder.create()
99 .setMaxConnPerRoute(5)
100 .build();
101 final ExecutorService executorService = Executors.newFixedThreadPool(5);
102 httpAsyncClientWithFuture = new FutureRequestExecutionService(httpClient, executorService);
103 }
104
105 @After
106 public void after() throws Exception {
107 blocked.set(false);
108 this.localServer.stop();
109 httpAsyncClientWithFuture.close();
110 }
111
112 @Test
113 public void shouldExecuteSingleCall() throws InterruptedException, ExecutionException {
114 final HttpRequestFutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
115 new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
116 Assert.assertTrue("request should have returned OK", task.get().booleanValue());
117 }
118
119 @Test
120 public void shouldCancel() throws InterruptedException, ExecutionException {
121 thrown.expect(CoreMatchers.anyOf(
122 CoreMatchers.instanceOf(CancellationException.class),
123 CoreMatchers.instanceOf(ExecutionException.class)));
124
125 final FutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
126 new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
127 task.cancel(true);
128 task.get();
129 }
130
131 @Test
132 public void shouldTimeout() throws InterruptedException, ExecutionException, TimeoutException {
133 thrown.expect(TimeoutException.class);
134
135 blocked.set(true);
136 final HttpRequestFutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
137 new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
138 task.get(10, TimeUnit.MILLISECONDS);
139 }
140
141 @Test
142 public void shouldExecuteMultipleCalls() throws Exception {
143 final int reqNo = 100;
144 final Queue<Future<Boolean>> tasks = new LinkedList<Future<Boolean>>();
145 for(int i = 0; i < reqNo; i++) {
146 final Future<Boolean> task = httpAsyncClientWithFuture.execute(
147 new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
148 tasks.add(task);
149 }
150 for (final Future<Boolean> task : tasks) {
151 final Boolean b = task.get();
152 Assert.assertNotNull(b);
153 Assert.assertTrue("request should have returned OK", b.booleanValue());
154 }
155 }
156
157 @Test
158 public void shouldExecuteMultipleCallsAndCallback() throws Exception {
159 final int reqNo = 100;
160 final Queue<Future<Boolean>> tasks = new LinkedList<Future<Boolean>>();
161 final CountDownLatch latch = new CountDownLatch(reqNo);
162 for(int i = 0; i < reqNo; i++) {
163 final Future<Boolean> task = httpAsyncClientWithFuture.execute(
164 new HttpGet(uri), HttpClientContext.create(),
165 new OkidokiHandler(), new CountingCallback(latch));
166 tasks.add(task);
167 }
168 Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
169 for (final Future<Boolean> task : tasks) {
170 final Boolean b = task.get();
171 Assert.assertNotNull(b);
172 Assert.assertTrue("request should have returned OK", b.booleanValue());
173 }
174 }
175
176 private final class CountingCallback implements FutureCallback<Boolean> {
177
178 private final CountDownLatch latch;
179
180 CountingCallback(final CountDownLatch latch) {
181 super();
182 this.latch = latch;
183 }
184
185 @Override
186 public void failed(final Exception ex) {
187 latch.countDown();
188 }
189
190 @Override
191 public void completed(final Boolean result) {
192 latch.countDown();
193 }
194
195 @Override
196 public void cancelled() {
197 latch.countDown();
198 }
199 }
200
201
202 private final class OkidokiHandler implements ResponseHandler<Boolean> {
203 @Override
204 public Boolean handleResponse(
205 final HttpResponse response) throws ClientProtocolException, IOException {
206 return response.getStatusLine().getStatusCode() == 200;
207 }
208 }
209
210 }