1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.nio.integration;
29
30 import java.io.BufferedWriter;
31 import java.io.IOException;
32 import java.io.OutputStream;
33 import java.io.OutputStreamWriter;
34 import java.net.InetSocketAddress;
35 import java.net.Socket;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.TimeUnit;
38
39 import org.apache.http.HttpException;
40 import org.apache.http.HttpRequest;
41 import org.apache.http.HttpResponse;
42 import org.apache.http.HttpVersion;
43 import org.apache.http.concurrent.Cancellable;
44 import org.apache.http.entity.BasicHttpEntity;
45 import org.apache.http.entity.ContentType;
46 import org.apache.http.message.BasicHttpResponse;
47 import org.apache.http.nio.ContentEncoder;
48 import org.apache.http.nio.IOControl;
49 import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;
50 import org.apache.http.nio.protocol.HttpAsyncExchange;
51 import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
52 import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
53 import org.apache.http.nio.protocol.HttpAsyncResponseProducer;
54 import org.apache.http.nio.reactor.ListenerEndpoint;
55 import org.apache.http.nio.testserver.HttpCoreNIOTestBase;
56 import org.apache.http.protocol.HttpContext;
57 import org.junit.After;
58 import org.junit.Assert;
59 import org.junit.Before;
60 import org.junit.Test;
61
62 public class TestHttpAsyncHandlerCancellable extends HttpCoreNIOTestBase {
63
64 @Before
65 public void setUp() throws Exception {
66 initServer();
67 }
68
69 @After
70 public void tearDown() throws Exception {
71 shutDownServer();
72 }
73
74 @Test
75 public void testResponsePrematureTermination() throws Exception {
76
77 final CountDownLatch latch = new CountDownLatch(1);
78 final HttpAsyncResponseProducer responseProducer = new HttpAsyncResponseProducer() {
79
80 @Override
81 public HttpResponse generateResponse() {
82 final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
83 final BasicHttpEntity entity = new BasicHttpEntity();
84 entity.setContentType(ContentType.DEFAULT_BINARY.toString());
85 entity.setChunked(true);
86 response.setEntity(entity);
87 return response;
88 }
89
90 @Override
91 public void close() throws IOException {
92 latch.countDown();
93 }
94
95 @Override
96 public void responseCompleted(final HttpContext context) {
97 }
98
99 @Override
100 public void produceContent(
101 final ContentEncoder encoder, final IOControl ioControl) throws IOException {
102
103 ioControl.suspendOutput();
104 }
105
106 @Override
107 public void failed(final Exception ex) {
108 }
109
110 };
111
112 this.server.registerHandler("*", new HttpAsyncRequestHandler<HttpRequest>() {
113
114 @Override
115 public HttpAsyncRequestConsumer<HttpRequest> processRequest(
116 final HttpRequest request,
117 final HttpContext context) throws HttpException, IOException {
118 return new BasicAsyncRequestConsumer();
119 }
120
121 @Override
122 public void handle(
123 final HttpRequest data,
124 final HttpAsyncExchange httpExchange,
125 final HttpContext context)
126 throws HttpException, IOException {
127 httpExchange.submitResponse(responseProducer);
128 }
129
130 });
131 this.server.start();
132
133 final ListenerEndpoint endpoint = this.server.getListenerEndpoint();
134 endpoint.waitFor();
135
136 final InetSocketAddress address = (InetSocketAddress) endpoint.getAddress();
137 final Socket socket = new Socket("localhost", address.getPort());
138 try {
139 final OutputStream outStream = socket.getOutputStream();
140 final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outStream, "US-ASCII"));
141 writer.write("GET /long HTTP/1.1\r\n");
142 writer.write("Host: localhost\r\n");
143 writer.write("\r\n");
144 writer.flush();
145
146 Thread.sleep(250);
147
148 writer.close();
149 } finally {
150 socket.close();
151 }
152
153 Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
154 }
155
156 @Test
157 public void testRequestCancelled() throws Exception {
158
159 final CountDownLatch latch = new CountDownLatch(1);
160 final Cancellable cancellable = new Cancellable() {
161
162 @Override
163 public boolean cancel() {
164 latch.countDown();
165 return true;
166 }
167 };
168
169 this.server.registerHandler("*", new HttpAsyncRequestHandler<HttpRequest>() {
170
171 @Override
172 public HttpAsyncRequestConsumer<HttpRequest> processRequest(
173 final HttpRequest request,
174 final HttpContext context) throws HttpException, IOException {
175 return new BasicAsyncRequestConsumer();
176 }
177
178 @Override
179 public void handle(
180 final HttpRequest data,
181 final HttpAsyncExchange httpExchange,
182 final HttpContext context)
183 throws HttpException, IOException {
184 httpExchange.setCallback(cancellable);
185
186 }
187
188 });
189 this.server.start();
190
191 final ListenerEndpoint endpoint = this.server.getListenerEndpoint();
192 endpoint.waitFor();
193
194 final InetSocketAddress address = (InetSocketAddress) endpoint.getAddress();
195 final Socket socket = new Socket("localhost", address.getPort());
196 try {
197 final OutputStream outStream = socket.getOutputStream();
198 final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outStream, "US-ASCII"));
199 writer.write("GET /long HTTP/1.1\r\n");
200 writer.write("Host: localhost\r\n");
201 writer.write("\r\n");
202 writer.flush();
203
204 Thread.sleep(250);
205
206 writer.close();
207 } finally {
208 socket.close();
209 }
210
211 Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
212 }
213
214 }