View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.impl.client;
28  
29  import java.io.IOException;
30  import java.util.LinkedList;
31  import java.util.Queue;
32  import java.util.concurrent.CancellationException;
33  import java.util.concurrent.CountDownLatch;
34  import java.util.concurrent.ExecutionException;
35  import java.util.concurrent.ExecutorService;
36  import java.util.concurrent.Executors;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.FutureTask;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.TimeoutException;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  
43  import org.apache.http.HttpException;
44  import org.apache.http.HttpRequest;
45  import org.apache.http.HttpResponse;
46  import org.apache.http.client.ClientProtocolException;
47  import org.apache.http.client.HttpClient;
48  import org.apache.http.client.ResponseHandler;
49  import org.apache.http.client.methods.HttpGet;
50  import org.apache.http.client.protocol.HttpClientContext;
51  import org.apache.http.concurrent.FutureCallback;
52  import org.apache.http.impl.bootstrap.HttpServer;
53  import org.apache.http.impl.bootstrap.ServerBootstrap;
54  import org.apache.http.protocol.HttpContext;
55  import org.apache.http.protocol.HttpRequestHandler;
56  import org.hamcrest.CoreMatchers;
57  import org.junit.After;
58  import org.junit.Assert;
59  import org.junit.Before;
60  import org.junit.Rule;
61  import org.junit.Test;
62  import org.junit.rules.ExpectedException;
63  
64  @SuppressWarnings("boxing") // test code
65  public class TestFutureRequestExecutionService {
66  
67      private HttpServer localServer;
68      private String uri;
69      private FutureRequestExecutionService httpAsyncClientWithFuture;
70  
71      private final AtomicBoolean blocked = new AtomicBoolean(false);
72  
73      @Rule
74      public ExpectedException thrown = ExpectedException.none();
75  
76      @Before
77      public void before() throws Exception {
78              this.localServer = ServerBootstrap.bootstrap()
79                      .registerHandler("/wait", new HttpRequestHandler() {
80  
81                  @Override
82                  public void handle(
83                          final HttpRequest request, final HttpResponse response,
84                          final HttpContext context) throws HttpException, IOException {
85                      try {
86                          while(blocked.get()) {
87                              Thread.sleep(10);
88                          }
89                      } catch (final InterruptedException e) {
90                          throw new IllegalStateException(e);
91                      }
92                      response.setStatusCode(200);
93                  }
94              }).create();
95  
96              this.localServer.start();
97              uri = "http://localhost:" + this.localServer.getLocalPort() + "/wait";
98              final HttpClient httpClient = HttpClientBuilder.create()
99                      .setMaxConnPerRoute(5)
100                     .build();
101             final ExecutorService executorService = Executors.newFixedThreadPool(5);
102             httpAsyncClientWithFuture = new FutureRequestExecutionService(httpClient, executorService);
103     }
104 
105     @After
106     public void after() throws Exception {
107         blocked.set(false); // any remaining requests should unblock
108         this.localServer.stop();
109         httpAsyncClientWithFuture.close();
110     }
111 
112     @Test
113     public void shouldExecuteSingleCall() throws InterruptedException, ExecutionException {
114         final HttpRequestFutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
115             new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
116         Assert.assertTrue("request should have returned OK", task.get().booleanValue());
117     }
118 
119     @Test
120     public void shouldCancel() throws InterruptedException, ExecutionException {
121         thrown.expect(CoreMatchers.anyOf(
122                 CoreMatchers.instanceOf(CancellationException.class),
123                 CoreMatchers.instanceOf(ExecutionException.class)));
124 
125         final FutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
126                 new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
127         task.cancel(true);
128         task.get();
129     }
130 
131     @Test
132     public void shouldTimeout() throws InterruptedException, ExecutionException, TimeoutException {
133         thrown.expect(TimeoutException.class);
134 
135         blocked.set(true);
136         final HttpRequestFutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
137             new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
138         task.get(10, TimeUnit.MILLISECONDS);
139     }
140 
141     @Test
142     public void shouldExecuteMultipleCalls() throws Exception {
143         final int reqNo = 100;
144         final Queue<Future<Boolean>> tasks = new LinkedList<Future<Boolean>>();
145         for(int i = 0; i < reqNo; i++) {
146             final Future<Boolean> task = httpAsyncClientWithFuture.execute(
147                     new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
148             tasks.add(task);
149         }
150         for (final Future<Boolean> task : tasks) {
151             final Boolean b = task.get();
152             Assert.assertNotNull(b);
153             Assert.assertTrue("request should have returned OK", b.booleanValue());
154         }
155     }
156 
157     @Test
158     public void shouldExecuteMultipleCallsAndCallback() throws Exception {
159         final int reqNo = 100;
160         final Queue<Future<Boolean>> tasks = new LinkedList<Future<Boolean>>();
161         final CountDownLatch latch = new CountDownLatch(reqNo);
162         for(int i = 0; i < reqNo; i++) {
163             final Future<Boolean> task = httpAsyncClientWithFuture.execute(
164                     new HttpGet(uri), HttpClientContext.create(),
165                     new OkidokiHandler(), new CountingCallback(latch));
166             tasks.add(task);
167         }
168         Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
169         for (final Future<Boolean> task : tasks) {
170             final Boolean b = task.get();
171             Assert.assertNotNull(b);
172             Assert.assertTrue("request should have returned OK", b.booleanValue());
173         }
174     }
175 
176     private final class CountingCallback implements FutureCallback<Boolean> {
177 
178         private final CountDownLatch latch;
179 
180         CountingCallback(final CountDownLatch latch) {
181             super();
182             this.latch = latch;
183         }
184 
185         @Override
186         public void failed(final Exception ex) {
187             latch.countDown();
188         }
189 
190         @Override
191         public void completed(final Boolean result) {
192             latch.countDown();
193         }
194 
195         @Override
196         public void cancelled() {
197             latch.countDown();
198         }
199     }
200 
201 
202     private final class OkidokiHandler implements ResponseHandler<Boolean> {
203         @Override
204         public Boolean handleResponse(
205                 final HttpResponse response) throws ClientProtocolException, IOException {
206             return response.getStatusLine().getStatusCode() == 200;
207         }
208     }
209 
210 }