1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.testing.async;
28
29 import java.util.LinkedList;
30 import java.util.Queue;
31 import java.util.Random;
32 import java.util.concurrent.ConcurrentLinkedQueue;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
40 import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
41 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
42 import org.apache.hc.client5.http.protocol.HttpClientContext;
43 import org.apache.hc.core5.concurrent.FutureCallback;
44 import org.apache.hc.core5.http.ContentType;
45 import org.apache.hc.core5.http.HttpHost;
46 import org.apache.hc.core5.http.HttpResponse;
47 import org.apache.hc.core5.http.Message;
48 import org.apache.hc.core5.http.Method;
49 import org.apache.hc.core5.http.URIScheme;
50 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
51 import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
52 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
53 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
54 import org.hamcrest.CoreMatchers;
55 import org.junit.Assert;
56 import org.junit.Test;
57
58 public abstract class AbstractHttpAsyncFundamentalsTest<T extends CloseableHttpAsyncClient> extends AbstractIntegrationTestBase<T> {
59
60 public AbstractHttpAsyncFundamentalsTest(final URIScheme scheme) {
61 super(scheme);
62 }
63
64 @Test
65 public void testSequentialGetRequests() throws Exception {
66 final HttpHost target = start();
67 for (int i = 0; i < 3; i++) {
68 final Future<SimpleHttpResponse> future = httpclient.execute(
69 SimpleRequestBuilder.get()
70 .setHttpHost(target)
71 .setPath("/random/2048")
72 .build(), null);
73 final SimpleHttpResponse response = future.get();
74 Assert.assertThat(response, CoreMatchers.notNullValue());
75 Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
76 final String body = response.getBodyText();
77 Assert.assertThat(body, CoreMatchers.notNullValue());
78 Assert.assertThat(body.length(), CoreMatchers.equalTo(2048));
79 }
80 }
81
82 @Test
83 public void testSequentialHeadRequests() throws Exception {
84 final HttpHost target = start();
85 for (int i = 0; i < 3; i++) {
86 final Future<SimpleHttpResponse> future = httpclient.execute(
87 SimpleRequestBuilder.head()
88 .setHttpHost(target)
89 .setPath("/random/2048")
90 .build(), null);
91 final SimpleHttpResponse response = future.get();
92 Assert.assertThat(response, CoreMatchers.notNullValue());
93 Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
94 final String body = response.getBodyText();
95 Assert.assertThat(body, CoreMatchers.nullValue());
96 }
97 }
98
99 @Test
100 public void testSequentialPostRequests() throws Exception {
101 final HttpHost target = start();
102 for (int i = 0; i < 3; i++) {
103 final byte[] b1 = new byte[1024];
104 final Random rnd = new Random(System.currentTimeMillis());
105 rnd.nextBytes(b1);
106 final Future<Message<HttpResponse, byte[]>> future = httpclient.execute(
107 new BasicRequestProducer(Method.GET, target, "/echo/",
108 AsyncEntityProducers.create(b1, ContentType.APPLICATION_OCTET_STREAM)),
109 new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
110 final Message<HttpResponse, byte[]> responseMessage = future.get();
111 Assert.assertThat(responseMessage, CoreMatchers.notNullValue());
112 final HttpResponse response = responseMessage.getHead();
113 Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
114 final byte[] b2 = responseMessage.getBody();
115 Assert.assertThat(b1, CoreMatchers.equalTo(b2));
116 }
117 }
118
119 @Test
120 public void testConcurrentPostRequests() throws Exception {
121 final HttpHost target = start();
122 final byte[] b1 = new byte[1024];
123 final Random rnd = new Random(System.currentTimeMillis());
124 rnd.nextBytes(b1);
125
126 final int reqCount = 20;
127
128 final Queue<Future<Message<HttpResponse, byte[]>>> queue = new LinkedList<>();
129 for (int i = 0; i < reqCount; i++) {
130 final Future<Message<HttpResponse, byte[]>> future = httpclient.execute(
131 new BasicRequestProducer(Method.POST, target, "/echo/",
132 AsyncEntityProducers.create(b1, ContentType.APPLICATION_OCTET_STREAM)),
133 new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
134 queue.add(future);
135 }
136
137 while (!queue.isEmpty()) {
138 final Future<Message<HttpResponse, byte[]>> future = queue.remove();
139 final Message<HttpResponse, byte[]> responseMessage = future.get();
140 Assert.assertThat(responseMessage, CoreMatchers.notNullValue());
141 final HttpResponse response = responseMessage.getHead();
142 Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
143 final byte[] b2 = responseMessage.getBody();
144 Assert.assertThat(b1, CoreMatchers.equalTo(b2));
145 }
146 }
147
148 @Test
149 public void testRequestExecutionFromCallback() throws Exception {
150 final HttpHost target = start();
151 final int requestNum = 50;
152 final AtomicInteger count = new AtomicInteger(requestNum);
153 final Queue<SimpleHttpResponse> resultQueue = new ConcurrentLinkedQueue<>();
154 final CountDownLatch countDownLatch = new CountDownLatch(requestNum);
155
156 final FutureCallback<SimpleHttpResponse> callback = new FutureCallback<SimpleHttpResponse>() {
157
158 @Override
159 public void completed(final SimpleHttpResponse result) {
160 try {
161 resultQueue.add(result);
162 if (count.decrementAndGet() > 0) {
163 httpclient.execute(
164 SimpleRequestBuilder.get()
165 .setHttpHost(target)
166 .setPath("/random/2048")
167 .build(), this);
168 }
169 } finally {
170 countDownLatch.countDown();
171 }
172 }
173
174 @Override
175 public void failed(final Exception ex) {
176 countDownLatch.countDown();
177 }
178
179 @Override
180 public void cancelled() {
181 countDownLatch.countDown();
182 }
183 };
184
185 final int threadNum = 5;
186 final ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
187 for (int i = 0; i < threadNum; i++) {
188 executorService.execute(new Runnable() {
189
190 @Override
191 public void run() {
192 if (!Thread.currentThread().isInterrupted()) {
193 httpclient.execute(
194 SimpleRequestBuilder.get()
195 .setHttpHost(target)
196 .setPath("/random/2048")
197 .build(), callback);
198 }
199 }
200
201 });
202 }
203
204 Assert.assertThat(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), CoreMatchers.equalTo(true));
205
206 executorService.shutdownNow();
207 executorService.awaitTermination(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
208
209 for (;;) {
210 final SimpleHttpResponse response = resultQueue.poll();
211 if (response == null) {
212 break;
213 }
214 Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
215 }
216 }
217
218 @Test
219 public void testBadRequest() throws Exception {
220 final HttpHost target = start();
221 final Future<SimpleHttpResponse> future = httpclient.execute(
222 SimpleRequestBuilder.get()
223 .setHttpHost(target)
224 .setPath("/random/boom")
225 .build(), null);
226 final SimpleHttpResponse response = future.get();
227 Assert.assertThat(response, CoreMatchers.notNullValue());
228 Assert.assertThat(response.getCode(), CoreMatchers.equalTo(400));
229 }
230
231 }