1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.nio.protocol;
29
30 import java.io.Closeable;
31 import java.io.IOException;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Queue;
35 import java.util.concurrent.ConcurrentLinkedQueue;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicReference;
39
40 import org.apache.http.ConnectionClosedException;
41 import org.apache.http.ConnectionReuseStrategy;
42 import org.apache.http.HttpException;
43 import org.apache.http.HttpRequest;
44 import org.apache.http.HttpResponse;
45 import org.apache.http.concurrent.BasicFuture;
46 import org.apache.http.concurrent.FutureCallback;
47 import org.apache.http.impl.DefaultConnectionReuseStrategy;
48 import org.apache.http.nio.ContentDecoder;
49 import org.apache.http.nio.ContentEncoder;
50 import org.apache.http.nio.IOControl;
51 import org.apache.http.nio.NHttpClientConnection;
52 import org.apache.http.protocol.HttpContext;
53 import org.apache.http.protocol.HttpCoreContext;
54 import org.apache.http.protocol.HttpProcessor;
55 import org.apache.http.util.Args;
56 import org.apache.http.util.Asserts;
57
58
59
60
61
62
63
64
65 @Pipelined()
66 public class PipeliningClientExchangeHandler<T> implements HttpAsyncClientExchangeHandler {
67
68 private final Queue<HttpAsyncRequestProducer> requestProducerQueue;
69 private final Queue<HttpAsyncResponseConsumer<T>> responseConsumerQueue;
70 private final Queue<HttpRequest> requestQueue;
71 private final Queue<T> resultQueue;
72 private final BasicFuture<List<T>> future;
73 private final HttpContext localContext;
74 private final NHttpClientConnection conn;
75 private final HttpProcessor httpPocessor;
76 private final ConnectionReuseStrategy connReuseStrategy;
77
78 private final AtomicReference<HttpAsyncRequestProducer> requestProducerRef;
79 private final AtomicReference<HttpAsyncResponseConsumer<T>> responseConsumerRef;
80 private final AtomicBoolean keepAlive;
81 private final AtomicBoolean closed;
82
83
84
85
86
87
88
89
90
91
92
93
94 public PipeliningClientExchangeHandler(
95 final List<? extends HttpAsyncRequestProducer> requestProducers,
96 final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
97 final FutureCallback<List<T>> callback,
98 final HttpContext localContext,
99 final NHttpClientConnection conn,
100 final HttpProcessor httpPocessor,
101 final ConnectionReuseStrategy connReuseStrategy) {
102 super();
103 Args.notEmpty(requestProducers, "Request producer list");
104 Args.notEmpty(responseConsumers, "Response consumer list");
105 Args.check(requestProducers.size() == responseConsumers.size(),
106 "Number of request producers does not match that of response consumers");
107 this.requestProducerQueue = new ConcurrentLinkedQueue<HttpAsyncRequestProducer>(requestProducers);
108 this.responseConsumerQueue = new ConcurrentLinkedQueue<HttpAsyncResponseConsumer<T>>(responseConsumers);
109 this.requestQueue = new ConcurrentLinkedQueue<HttpRequest>();
110 this.resultQueue = new ConcurrentLinkedQueue<T>();
111 this.future = new BasicFuture<List<T>>(callback);
112 this.localContext = Args.notNull(localContext, "HTTP context");
113 this.conn = Args.notNull(conn, "HTTP connection");
114 this.httpPocessor = Args.notNull(httpPocessor, "HTTP processor");
115 this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy :
116 DefaultConnectionReuseStrategy.INSTANCE;
117 this.localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, this.conn);
118 this.requestProducerRef = new AtomicReference<HttpAsyncRequestProducer>(null);
119 this.responseConsumerRef = new AtomicReference<HttpAsyncResponseConsumer<T>>(null);
120 this.keepAlive = new AtomicBoolean(false);
121 this.closed = new AtomicBoolean(false);
122 }
123
124
125
126
127
128
129
130
131
132
133 public PipeliningClientExchangeHandler(
134 final List<? extends HttpAsyncRequestProducer> requestProducers,
135 final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
136 final HttpContext localContext,
137 final NHttpClientConnection conn,
138 final HttpProcessor httpPocessor) {
139 this(requestProducers, responseConsumers, null, localContext, conn, httpPocessor, null);
140 }
141
142 public Future<List<T>> getFuture() {
143 return this.future;
144 }
145
146 private static void closeQuietly(final Closeable closeable) {
147 if (closeable != null) {
148 try {
149 closeable.close();
150 } catch (final IOException ex) {
151 }
152 }
153 }
154
155 private void releaseResources() {
156 closeQuietly(this.requestProducerRef.getAndSet(null));
157 closeQuietly(this.responseConsumerRef.getAndSet(null));
158 while (!this.requestProducerQueue.isEmpty()) {
159 closeQuietly(this.requestProducerQueue.remove());
160 }
161 while (!this.responseConsumerQueue.isEmpty()) {
162 closeQuietly(this.responseConsumerQueue.remove());
163 }
164 this.requestQueue.clear();
165 this.resultQueue.clear();
166 }
167
168 @Override
169 public void close() throws IOException {
170 if (this.closed.compareAndSet(false, true)) {
171 releaseResources();
172 if (!this.future.isDone()) {
173 this.future.cancel();
174 }
175 }
176 }
177
178 @Override
179 public HttpRequest generateRequest() throws IOException, HttpException {
180 Asserts.check(this.requestProducerRef.get() == null, "Inconsistent state: request producer is not null");
181 final HttpAsyncRequestProducer requestProducer = this.requestProducerQueue.poll();
182 if (requestProducer == null) {
183 return null;
184 }
185 this.requestProducerRef.set(requestProducer);
186 final HttpRequest request = requestProducer.generateRequest();
187 this.httpPocessor.process(request, this.localContext);
188 this.requestQueue.add(request);
189 return request;
190 }
191
192 @Override
193 public void produceContent(
194 final ContentEncoder encoder, final IOControl ioControl) throws IOException {
195 final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.get();
196 Asserts.check(requestProducer != null, "Inconsistent state: request producer is null");
197 requestProducer.produceContent(encoder, ioControl);
198 }
199
200 @Override
201 public void requestCompleted() {
202 final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.getAndSet(null);
203 Asserts.check(requestProducer != null, "Inconsistent state: request producer is null");
204 requestProducer.requestCompleted(this.localContext);
205 }
206
207 @Override
208 public void responseReceived(final HttpResponse response) throws IOException, HttpException {
209 Asserts.check(this.responseConsumerRef.get() == null, "Inconsistent state: response consumer is not null");
210
211 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerQueue.poll();
212 Asserts.check(responseConsumer != null, "Inconsistent state: response consumer queue is empty");
213 this.responseConsumerRef.set(responseConsumer);
214
215 final HttpRequest request = this.requestQueue.poll();
216 Asserts.check(request != null, "Inconsistent state: request queue is empty");
217
218 this.localContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
219 this.localContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
220 this.httpPocessor.process(response, this.localContext);
221
222 responseConsumer.responseReceived(response);
223 this.keepAlive.set(this.connReuseStrategy.keepAlive(response, this.localContext));
224 }
225
226 @Override
227 public void consumeContent(
228 final ContentDecoder decoder, final IOControl ioControl) throws IOException {
229 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
230 Asserts.check(responseConsumer != null, "Inconsistent state: response consumer is null");
231 responseConsumer.consumeContent(decoder, ioControl);
232 }
233
234 @Override
235 public void responseCompleted() throws IOException {
236 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.getAndSet(null);
237 Asserts.check(responseConsumer != null, "Inconsistent state: response consumer is null");
238 try {
239 if (!this.keepAlive.get()) {
240 this.conn.close();
241 }
242 responseConsumer.responseCompleted(this.localContext);
243 final T result = responseConsumer.getResult();
244 final Exception ex = responseConsumer.getException();
245 if (result != null) {
246 this.resultQueue.add(result);
247 } else {
248 this.future.failed(ex);
249 this.conn.shutdown();
250 }
251 if (!conn.isOpen()) {
252 if (this.closed.compareAndSet(false, true)) {
253 releaseResources();
254 }
255 }
256 if (!this.future.isDone() && this.responseConsumerQueue.isEmpty()) {
257 this.future.completed(new ArrayList<T>(this.resultQueue));
258 this.resultQueue.clear();
259 }
260 } catch (final RuntimeException ex) {
261 failed(ex);
262 throw ex;
263 }
264 }
265
266 @Override
267 public void inputTerminated() {
268 failed(new ConnectionClosedException());
269 }
270
271 @Override
272 public void failed(final Exception ex) {
273 if (this.closed.compareAndSet(false, true)) {
274 try {
275 final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.get();
276 if (requestProducer != null) {
277 requestProducer.failed(ex);
278 }
279 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
280 if (responseConsumer != null) {
281 responseConsumer.failed(ex);
282 }
283 } finally {
284 try {
285 this.future.failed(ex);
286 } finally {
287 releaseResources();
288 }
289 }
290 }
291 }
292
293 @Override
294 public boolean cancel() {
295 if (this.closed.compareAndSet(false, true)) {
296 try {
297 try {
298 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
299 return responseConsumer != null && responseConsumer.cancel();
300 } finally {
301 this.future.cancel();
302 }
303 } finally {
304 releaseResources();
305 }
306 }
307 return false;
308 }
309
310 @Override
311 public boolean isDone() {
312 return this.future.isDone();
313 }
314
315 }