1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.nio.integration;
28
29 import java.io.ByteArrayInputStream;
30 import java.io.IOException;
31 import java.io.InputStream;
32 import java.net.InetSocketAddress;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35
36 import org.apache.http.Consts;
37 import org.apache.http.HttpConnection;
38 import org.apache.http.HttpException;
39 import org.apache.http.HttpHost;
40 import org.apache.http.HttpRequest;
41 import org.apache.http.HttpResponse;
42 import org.apache.http.concurrent.FutureCallback;
43 import org.apache.http.entity.ContentType;
44 import org.apache.http.entity.InputStreamEntity;
45 import org.apache.http.message.BasicHttpRequest;
46 import org.apache.http.nio.ContentEncoder;
47 import org.apache.http.nio.IOControl;
48 import org.apache.http.nio.entity.NStringEntity;
49 import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;
50 import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
51 import org.apache.http.nio.protocol.HttpAsyncExchange;
52 import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
53 import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
54 import org.apache.http.nio.reactor.ListenerEndpoint;
55 import org.apache.http.nio.testserver.HttpCoreNIOTestBase;
56 import org.apache.http.protocol.BasicHttpContext;
57 import org.apache.http.protocol.HttpContext;
58 import org.apache.http.protocol.HttpCoreContext;
59 import org.junit.After;
60 import org.junit.Assert;
61 import org.junit.Before;
62 import org.junit.Test;
63
64 public class TestHttpAsyncPrematureTermination extends HttpCoreNIOTestBase {
65
66 @Before
67 public void setUp() throws Exception {
68 initServer();
69 initClient();
70 }
71
72 @After
73 public void tearDown() throws Exception {
74 shutDownClient();
75 shutDownServer();
76 }
77
78 private InetSocketAddress start() throws Exception {
79 this.server.start();
80 this.client.start();
81
82 final ListenerEndpoint endpoint = this.server.getListenerEndpoint();
83 endpoint.waitFor();
84
85 return (InetSocketAddress) endpoint.getAddress();
86 }
87
88 @Test
89 public void testConnectionTerminatedProcessingRequest() throws Exception {
90 this.server.registerHandler("*", new HttpAsyncRequestHandler<HttpRequest>() {
91
92 @Override
93 public HttpAsyncRequestConsumer<HttpRequest> processRequest(
94 final HttpRequest request,
95 final HttpContext context) throws HttpException, IOException {
96 final HttpConnection conn = (HttpConnection) context.getAttribute(
97 HttpCoreContext.HTTP_CONNECTION);
98 conn.shutdown();
99 return new BasicAsyncRequestConsumer();
100 }
101
102 @Override
103 public void handle(
104 final HttpRequest request,
105 final HttpAsyncExchange httpExchange,
106 final HttpContext context) throws HttpException, IOException {
107 final HttpResponse response = httpExchange.getResponse();
108 response.setEntity(new NStringEntity("all is well", ContentType.TEXT_PLAIN));
109 httpExchange.submitResponse();
110 }
111
112 });
113 final InetSocketAddress address = start();
114 final HttpHost target = new HttpHost("localhost", address.getPort());
115
116 final CountDownLatch latch = new CountDownLatch(1);
117
118 final FutureCallback<HttpResponse> callback = new FutureCallback<HttpResponse>() {
119
120 @Override
121 public void cancelled() {
122 latch.countDown();
123 }
124
125 @Override
126 public void failed(final Exception ex) {
127 latch.countDown();
128 }
129
130 @Override
131 public void completed(final HttpResponse response) {
132 Assert.fail();
133 }
134
135 };
136
137 final HttpRequest request = new BasicHttpRequest("GET", "/");
138 final HttpContext context = new BasicHttpContext();
139 this.client.execute(target, request, context, callback);
140
141 Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
142 }
143
144 @Test
145 public void testConnectionTerminatedHandlingRequest() throws Exception {
146 final CountDownLatch responseStreamClosed = new CountDownLatch(1);
147 final InputStream testInputStream = new ByteArrayInputStream(
148 "all is well".getBytes(Consts.ASCII)) {
149 @Override
150 public void close() throws IOException {
151 responseStreamClosed.countDown();
152 super.close();
153 }
154 };
155 this.server.registerHandler("*", new HttpAsyncRequestHandler<HttpRequest>() {
156
157 @Override
158 public HttpAsyncRequestConsumer<HttpRequest> processRequest(
159 final HttpRequest request,
160 final HttpContext context) throws HttpException, IOException {
161 return new BasicAsyncRequestConsumer();
162 }
163
164 @Override
165 public void handle(
166 final HttpRequest request,
167 final HttpAsyncExchange httpExchange,
168 final HttpContext context) throws HttpException, IOException {
169 final HttpConnection conn = (HttpConnection) context.getAttribute(
170 HttpCoreContext.HTTP_CONNECTION);
171 conn.shutdown();
172 final HttpResponse response = httpExchange.getResponse();
173 response.setEntity(new InputStreamEntity(testInputStream, -1));
174 httpExchange.submitResponse();
175 }
176
177 });
178 final InetSocketAddress address = start();
179 final HttpHost target = new HttpHost("localhost", address.getPort());
180
181 final CountDownLatch latch = new CountDownLatch(1);
182
183 final FutureCallback<HttpResponse> callback = new FutureCallback<HttpResponse>() {
184
185 @Override
186 public void cancelled() {
187 latch.countDown();
188 }
189
190 @Override
191 public void failed(final Exception ex) {
192 latch.countDown();
193 }
194
195 @Override
196 public void completed(final HttpResponse response) {
197 Assert.fail();
198 }
199
200 };
201
202 final HttpRequest request = new BasicHttpRequest("GET", "/");
203 final HttpContext context = new BasicHttpContext();
204 this.client.execute(target, request, context, callback);
205
206 Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
207 Assert.assertTrue(responseStreamClosed.await(5, TimeUnit.SECONDS));
208 }
209
210 @Test
211 public void testConnectionTerminatedSendingResponse() throws Exception {
212 this.server.registerHandler("*", new HttpAsyncRequestHandler<HttpRequest>() {
213
214 @Override
215 public HttpAsyncRequestConsumer<HttpRequest> processRequest(
216 final HttpRequest request,
217 final HttpContext context) throws HttpException, IOException {
218 return new BasicAsyncRequestConsumer();
219 }
220
221 @Override
222 public void handle(
223 final HttpRequest request,
224 final HttpAsyncExchange httpExchange,
225 final HttpContext context) throws HttpException, IOException {
226 final HttpResponse response = httpExchange.getResponse();
227 response.setEntity(new NStringEntity("all is well", ContentType.TEXT_PLAIN));
228 httpExchange.submitResponse(new BasicAsyncResponseProducer(response) {
229
230 @Override
231 public synchronized void produceContent(
232 final ContentEncoder encoder,
233 final IOControl ioControl) throws IOException {
234 ioControl.shutdown();
235 }
236
237 });
238 }
239
240 });
241 final InetSocketAddress address = start();
242 final HttpHost target = new HttpHost("localhost", address.getPort());
243
244 final CountDownLatch latch = new CountDownLatch(1);
245
246 final FutureCallback<HttpResponse> callback = new FutureCallback<HttpResponse>() {
247
248 @Override
249 public void cancelled() {
250 latch.countDown();
251 }
252
253 @Override
254 public void failed(final Exception ex) {
255 latch.countDown();
256 }
257
258 @Override
259 public void completed(final HttpResponse response) {
260 Assert.fail();
261 }
262
263 };
264
265 final HttpRequest request = new BasicHttpRequest("GET", "/");
266 final HttpContext context = new BasicHttpContext();
267 this.client.execute(target, request, context, callback);
268
269 Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
270 }
271
272 }