1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.nio.client.integration;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.concurrent.Future;
32
33 import org.apache.http.localserver.HttpAsyncTestBase;
34 import org.apache.http.HttpEntity;
35 import org.apache.http.HttpException;
36 import org.apache.http.HttpHost;
37 import org.apache.http.HttpRequest;
38 import org.apache.http.HttpResponse;
39 import org.apache.http.HttpStatus;
40 import org.apache.http.client.UserTokenHandler;
41 import org.apache.http.client.methods.HttpGet;
42 import org.apache.http.entity.ContentType;
43 import org.apache.http.impl.nio.conn.CPoolUtils;
44 import org.apache.http.nio.ContentDecoder;
45 import org.apache.http.nio.IOControl;
46 import org.apache.http.nio.NHttpClientConnection;
47 import org.apache.http.nio.client.HttpAsyncClient;
48 import org.apache.http.nio.entity.NStringEntity;
49 import org.apache.http.nio.protocol.AbstractAsyncResponseConsumer;
50 import org.apache.http.nio.protocol.BasicAsyncRequestHandler;
51 import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
52 import org.apache.http.nio.reactor.IOEventDispatch;
53 import org.apache.http.pool.PoolEntry;
54 import org.apache.http.protocol.BasicHttpContext;
55 import org.apache.http.protocol.HttpContext;
56 import org.apache.http.protocol.HttpRequestHandler;
57 import org.junit.Assert;
58 import org.junit.Test;
59
60 public class TestStatefulConnManagement extends HttpAsyncTestBase {
61
62 static class SimpleService implements HttpRequestHandler {
63
64 public SimpleService() {
65 super();
66 }
67
68 @Override
69 public void handle(
70 final HttpRequest request,
71 final HttpResponse response,
72 final HttpContext context) throws HttpException, IOException {
73 response.setStatusCode(HttpStatus.SC_OK);
74 final NStringEntity entity = new NStringEntity("Whatever");
75 response.setEntity(entity);
76 }
77 }
78
79 @Test
80 public void testStatefulConnections() throws Exception {
81 this.serverBootstrap.registerHandler("*", new BasicAsyncRequestHandler(new SimpleService()));
82
83 final UserTokenHandler userTokenHandler = new UserTokenHandler() {
84
85 @Override
86 public Object getUserToken(final HttpContext context) {
87 return context.getAttribute("user");
88 }
89
90 };
91 this.clientBuilder.setUserTokenHandler(userTokenHandler);
92 final HttpHost target = start();
93
94 final int workerCount = 2;
95 final int requestCount = 5;
96
97 final HttpContext[] contexts = new HttpContext[workerCount];
98 final HttpWorker[] workers = new HttpWorker[workerCount];
99 for (int i = 0; i < contexts.length; i++) {
100 final HttpContext context = new BasicHttpContext();
101 final Object token = Integer.valueOf(i);
102 context.setAttribute("user", token);
103 contexts[i] = context;
104 workers[i] = new HttpWorker(context, requestCount, target, this.httpclient);
105 }
106
107 for (final HttpWorker worker : workers) {
108 worker.start();
109 }
110 for (final HttpWorker worker : workers) {
111 worker.join(10000);
112 }
113 for (final HttpWorker worker : workers) {
114 final Exception ex = worker.getException();
115 if (ex != null) {
116 throw ex;
117 }
118 Assert.assertEquals(requestCount, worker.getCount());
119 }
120
121 for (final HttpContext context : contexts) {
122 final Integer id = (Integer) context.getAttribute("user");
123
124 for (int r = 1; r < requestCount; r++) {
125 final Integer state = (Integer) context.getAttribute("r" + r);
126 Assert.assertEquals(id, state);
127 }
128 }
129
130 }
131
132 static class HttpWorker extends Thread {
133
134 private final HttpContext context;
135 private final int requestCount;
136 private final HttpHost target;
137 private final HttpAsyncClient httpclient;
138
139 private volatile Exception exception;
140 private volatile int count;
141
142 public HttpWorker(
143 final HttpContext context,
144 final int requestCount,
145 final HttpHost target,
146 final HttpAsyncClient httpclient) {
147 super();
148 this.context = context;
149 this.requestCount = requestCount;
150 this.target = target;
151 this.httpclient = httpclient;
152 this.count = 0;
153 }
154
155 public int getCount() {
156 return this.count;
157 }
158
159 public Exception getException() {
160 return this.exception;
161 }
162
163 @Override
164 public void run() {
165 try {
166 for (int r = 0; r < this.requestCount; r++) {
167 final HttpGet httpget = new HttpGet("/");
168 final Future<Object> future = this.httpclient.execute(
169 new BasicAsyncRequestProducer(this.target, httpget),
170 new AbstractAsyncResponseConsumer<Object>() {
171
172 @Override
173 protected void onResponseReceived(final HttpResponse response) {
174 }
175
176 @Override
177 protected void onEntityEnclosed(
178 final HttpEntity entity,
179 final ContentType contentType) throws IOException {
180 }
181
182 @Override
183 protected void onContentReceived(
184 final ContentDecoder decoder,
185 final IOControl ioctrl) throws IOException {
186 final ByteBuffer buf = ByteBuffer.allocate(2048);
187 decoder.read(buf);
188 }
189
190 @Override
191 protected Object buildResult(final HttpContext context) throws Exception {
192 final NHttpClientConnection conn = (NHttpClientConnection) context.getAttribute(
193 IOEventDispatch.CONNECTION_KEY);
194
195 final PoolEntry<?, ?> entry = CPoolUtils.getPoolEntry(conn);
196 return entry.getState();
197 }
198
199 @Override
200 protected void releaseResources() {
201 }
202
203 },
204 this.context,
205 null);
206 this.count++;
207 final Object state = future.get();
208 this.context.setAttribute("r" + r, state);
209 }
210
211 } catch (final Exception ex) {
212 this.exception = ex;
213 }
214 }
215
216 }
217
218 @Test
219 public void testRouteSpecificPoolRecylcing() throws Exception {
220 this.serverBootstrap.registerHandler("*", new BasicAsyncRequestHandler(new SimpleService()));
221
222
223
224 final UserTokenHandler userTokenHandler = new UserTokenHandler() {
225
226 @Override
227 public Object getUserToken(final HttpContext context) {
228 return context.getAttribute("user");
229 }
230
231 };
232 this.clientBuilder.setUserTokenHandler(userTokenHandler);
233
234 final HttpHost target = start();
235 final int maxConn = 2;
236
237 this.connMgr.setMaxTotal(maxConn);
238 this.connMgr.setDefaultMaxPerRoute(maxConn);
239
240
241 final HttpContext context1 = new BasicHttpContext();
242 context1.setAttribute("user", "stuff");
243
244 final Future<HttpResponse> future1 = this.httpclient.execute(
245 target, new HttpGet("/"), context1, null);
246 final HttpResponse response1 = future1.get();
247 Assert.assertNotNull(response1);
248 Assert.assertEquals(200, response1.getStatusLine().getStatusCode());
249
250
251
252
253
254 Thread.sleep(100);
255
256
257
258 final HttpContext context2 = new BasicHttpContext();
259
260 final Future<HttpResponse> future2 = this.httpclient.execute(
261 new HttpHost("127.0.0.1", target.getPort(), target.getSchemeName()),
262 new HttpGet("/"), context2, null);
263 final HttpResponse response2 = future2.get();
264 Assert.assertNotNull(response2);
265 Assert.assertEquals(200, response2.getStatusLine().getStatusCode());
266
267
268
269
270
271 Thread.sleep(100);
272
273
274
275
276
277
278 final HttpContext context3 = new BasicHttpContext();
279 final Future<HttpResponse> future3 = this.httpclient.execute(
280 target, new HttpGet("/"), context3, null);
281 final HttpResponse response3 = future3.get();
282 Assert.assertNotNull(response3);
283 Assert.assertEquals(200, response3.getStatusLine().getStatusCode());
284 }
285
286 }