View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.testing.async;
28  
29  import java.util.LinkedList;
30  import java.util.Queue;
31  import java.util.Random;
32  import java.util.concurrent.ConcurrentLinkedQueue;
33  import java.util.concurrent.CountDownLatch;
34  import java.util.concurrent.ExecutorService;
35  import java.util.concurrent.Executors;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
39  import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
40  import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
41  import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
42  import org.apache.hc.client5.http.protocol.HttpClientContext;
43  import org.apache.hc.core5.concurrent.FutureCallback;
44  import org.apache.hc.core5.http.ContentType;
45  import org.apache.hc.core5.http.HttpHost;
46  import org.apache.hc.core5.http.HttpResponse;
47  import org.apache.hc.core5.http.Message;
48  import org.apache.hc.core5.http.Method;
49  import org.apache.hc.core5.http.URIScheme;
50  import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
51  import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
52  import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
53  import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
54  import org.hamcrest.CoreMatchers;
55  import org.junit.Assert;
56  import org.junit.Test;
57  
58  public abstract class AbstractHttpAsyncFundamentalsTest<T extends CloseableHttpAsyncClient> extends AbstractIntegrationTestBase<T> {
59  
60      public AbstractHttpAsyncFundamentalsTest(final URIScheme scheme) {
61          super(scheme);
62      }
63  
64      @Test
65      public void testSequenctialGetRequests() throws Exception {
66          final HttpHost target = start();
67          for (int i = 0; i < 3; i++) {
68              final Future<SimpleHttpResponse> future = httpclient.execute(
69                      SimpleHttpRequests.get(target, "/random/2048"), null);
70              final SimpleHttpResponse response = future.get();
71              Assert.assertThat(response, CoreMatchers.notNullValue());
72              Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
73              final String body = response.getBodyText();
74              Assert.assertThat(body, CoreMatchers.notNullValue());
75              Assert.assertThat(body.length(), CoreMatchers.equalTo(2048));
76          }
77      }
78  
79      @Test
80      public void testSequenctialHeadRequests() throws Exception {
81          final HttpHost target = start();
82          for (int i = 0; i < 3; i++) {
83              final Future<SimpleHttpResponse> future = httpclient.execute(
84                      SimpleHttpRequests.head(target, "/random/2048"), null);
85              final SimpleHttpResponse response = future.get();
86              Assert.assertThat(response, CoreMatchers.notNullValue());
87              Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
88              final String body = response.getBodyText();
89              Assert.assertThat(body, CoreMatchers.nullValue());
90          }
91      }
92  
93      @Test
94      public void testSequenctialPostRequests() throws Exception {
95          final HttpHost target = start();
96          for (int i = 0; i < 3; i++) {
97              final byte[] b1 = new byte[1024];
98              final Random rnd = new Random(System.currentTimeMillis());
99              rnd.nextBytes(b1);
100             final Future<Message<HttpResponse, byte[]>> future = httpclient.execute(
101                     new BasicRequestProducer(Method.GET, target, "/echo/",
102                             AsyncEntityProducers.create(b1, ContentType.APPLICATION_OCTET_STREAM)),
103                     new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
104             final Message<HttpResponse, byte[]> responseMessage = future.get();
105             Assert.assertThat(responseMessage, CoreMatchers.notNullValue());
106             final HttpResponse response = responseMessage.getHead();
107             Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
108             final byte[] b2 = responseMessage.getBody();
109             Assert.assertThat(b1, CoreMatchers.equalTo(b2));
110         }
111     }
112 
113     @Test
114     public void testConcurrentPostRequests() throws Exception {
115         final HttpHost target = start();
116         final byte[] b1 = new byte[1024];
117         final Random rnd = new Random(System.currentTimeMillis());
118         rnd.nextBytes(b1);
119 
120         final int reqCount = 20;
121 
122         final Queue<Future<Message<HttpResponse, byte[]>>> queue = new LinkedList<>();
123         for (int i = 0; i < reqCount; i++) {
124             final Future<Message<HttpResponse, byte[]>> future = httpclient.execute(
125                     new BasicRequestProducer(Method.POST, target, "/echo/",
126                             AsyncEntityProducers.create(b1, ContentType.APPLICATION_OCTET_STREAM)),
127                     new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
128             queue.add(future);
129         }
130 
131         while (!queue.isEmpty()) {
132             final Future<Message<HttpResponse, byte[]>> future = queue.remove();
133             final Message<HttpResponse, byte[]> responseMessage = future.get();
134             Assert.assertThat(responseMessage, CoreMatchers.notNullValue());
135             final HttpResponse response = responseMessage.getHead();
136             Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
137             final byte[] b2 = responseMessage.getBody();
138             Assert.assertThat(b1, CoreMatchers.equalTo(b2));
139         }
140     }
141 
142     @Test
143     public void testRequestExecutionFromCallback() throws Exception {
144         final HttpHost target = start();
145         final int requestNum = 50;
146         final AtomicInteger count = new AtomicInteger(requestNum);
147         final Queue<SimpleHttpResponse> resultQueue = new ConcurrentLinkedQueue<>();
148         final CountDownLatch countDownLatch = new CountDownLatch(requestNum);
149 
150         final FutureCallback<SimpleHttpResponse> callback = new FutureCallback<SimpleHttpResponse>() {
151 
152             @Override
153             public void completed(final SimpleHttpResponse result) {
154                 try {
155                     resultQueue.add(result);
156                     if (count.decrementAndGet() > 0) {
157                         httpclient.execute(SimpleHttpRequests.get(target, "/random/2048"), this);
158                     }
159                 } finally {
160                     countDownLatch.countDown();
161                 }
162             }
163 
164             @Override
165             public void failed(final Exception ex) {
166                 countDownLatch.countDown();
167             }
168 
169             @Override
170             public void cancelled() {
171                 countDownLatch.countDown();
172             }
173         };
174 
175         final int threadNum = 5;
176         final ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
177         for (int i = 0; i < threadNum; i++) {
178             executorService.execute(new Runnable() {
179 
180                 @Override
181                 public void run() {
182                     if (!Thread.currentThread().isInterrupted()) {
183                         httpclient.execute(SimpleHttpRequests.get(target, "/random/2048"), callback);
184                     }
185                 }
186 
187             });
188         }
189 
190         Assert.assertThat(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), CoreMatchers.equalTo(true));
191 
192         executorService.shutdownNow();
193         executorService.awaitTermination(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
194 
195         for (;;) {
196             final SimpleHttpResponse response = resultQueue.poll();
197             if (response == null) {
198                 break;
199             }
200             Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
201         }
202     }
203 
204     @Test
205     public void testBadRequest() throws Exception {
206         final HttpHost target = start();
207         final Future<SimpleHttpResponse> future = httpclient.execute(
208                 SimpleHttpRequests.get(target, "/random/boom"), null);
209         final SimpleHttpResponse response = future.get();
210         Assert.assertThat(response, CoreMatchers.notNullValue());
211         Assert.assertThat(response.getCode(), CoreMatchers.equalTo(400));
212     }
213 
214 }