1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.impl.nio.client;
28
29 import java.io.IOException;
30 import java.util.ArrayList;
31 import java.util.List;
32 import java.util.Queue;
33 import java.util.concurrent.ConcurrentLinkedQueue;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.http.ConnectionClosedException;
38 import org.apache.http.ConnectionReuseStrategy;
39 import org.apache.http.HttpException;
40 import org.apache.http.HttpHost;
41 import org.apache.http.HttpRequest;
42 import org.apache.http.HttpResponse;
43 import org.apache.http.client.config.RequestConfig;
44 import org.apache.http.client.methods.HttpRequestWrapper;
45 import org.apache.http.client.protocol.HttpClientContext;
46 import org.apache.http.concurrent.BasicFuture;
47 import org.apache.http.conn.ConnectionKeepAliveStrategy;
48 import org.apache.http.conn.routing.HttpRoute;
49 import org.apache.http.nio.ContentDecoder;
50 import org.apache.http.nio.ContentEncoder;
51 import org.apache.http.nio.IOControl;
52 import org.apache.http.nio.NHttpClientConnection;
53 import org.apache.http.nio.conn.NHttpClientConnectionManager;
54 import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
55 import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
56 import org.apache.http.nio.protocol.Pipelined;
57 import org.apache.http.protocol.HttpCoreContext;
58 import org.apache.http.protocol.HttpProcessor;
59 import org.apache.http.util.Args;
60 import org.apache.http.util.Asserts;
61
62
63
64
65
66
67
68
69 @Pipelined
70 class PipeliningClientExchangeHandlerImpl<T> extends AbstractClientExchangeHandler {
71
72 private final HttpHost target;
73 private final Queue<HttpAsyncRequestProducer> requestProducerQueue;
74 private final Queue<HttpAsyncResponseConsumer<T>> responseConsumerQueue;
75 private final Queue<HttpRequest> requestQueue;
76 private final Queue<T> resultQueue;
77 private final HttpClientContext localContext;
78 private final BasicFuture<List<T>> resultFuture;
79 private final HttpProcessor httpProcessor;
80 private final AtomicReference<HttpAsyncRequestProducer> requestProducerRef;
81 private final AtomicReference<HttpAsyncResponseConsumer<T>> responseConsumerRef;
82
83 public PipeliningClientExchangeHandlerImpl(
84 final Log log,
85 final HttpHost target,
86 final List<? extends HttpAsyncRequestProducer> requestProducers,
87 final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
88 final HttpClientContext localContext,
89 final BasicFuture<List<T>> resultFuture,
90 final NHttpClientConnectionManager connmgr,
91 final HttpProcessor httpProcessor,
92 final ConnectionReuseStrategy connReuseStrategy,
93 final ConnectionKeepAliveStrategy keepaliveStrategy) {
94 super(log, localContext, connmgr, connReuseStrategy, keepaliveStrategy);
95 Args.notNull(target, "HTTP target");
96 Args.notEmpty(requestProducers, "Request producer list");
97 Args.notEmpty(responseConsumers, "Response consumer list");
98 Args.check(requestProducers.size() == responseConsumers.size(),
99 "Number of request producers does not match that of response consumers");
100 this.target = target;
101 this.requestProducerQueue = new ConcurrentLinkedQueue<HttpAsyncRequestProducer>(requestProducers);
102 this.responseConsumerQueue = new ConcurrentLinkedQueue<HttpAsyncResponseConsumer<T>>(responseConsumers);
103 this.requestQueue = new ConcurrentLinkedQueue<HttpRequest>();
104 this.resultQueue = new ConcurrentLinkedQueue<T>();
105 this.localContext = localContext;
106 this.resultFuture = resultFuture;
107 this.httpProcessor = httpProcessor;
108 this.requestProducerRef = new AtomicReference<HttpAsyncRequestProducer>(null);
109 this.responseConsumerRef = new AtomicReference<HttpAsyncResponseConsumer<T>>(null);
110 }
111
112 private void closeProducer(final HttpAsyncRequestProducer requestProducer) {
113 if (requestProducer != null) {
114 try {
115 requestProducer.close();
116 } catch (final IOException ex) {
117 this.log.debug("I/O error closing request producer", ex);
118 }
119 }
120 }
121
122 private void closeConsumer(final HttpAsyncResponseConsumer<?> responseConsumer) {
123 if (responseConsumer != null) {
124 try {
125 responseConsumer.close();
126 } catch (final IOException ex) {
127 this.log.debug("I/O error closing response consumer", ex);
128 }
129 }
130 }
131
132 @Override
133 void releaseResources() {
134 closeProducer(this.requestProducerRef.getAndSet(null));
135 closeConsumer(this.responseConsumerRef.getAndSet(null));
136 while (!this.requestProducerQueue.isEmpty()) {
137 closeProducer(this.requestProducerQueue.remove());
138 }
139 while (!this.responseConsumerQueue.isEmpty()) {
140 closeConsumer(this.responseConsumerQueue.remove());
141 }
142 this.requestQueue.clear();
143 this.resultQueue.clear();
144 }
145
146 @Override
147 void executionFailed(final Exception ex) {
148 try {
149 final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.get();
150 if (requestProducer != null) {
151 requestProducer.failed(ex);
152 }
153 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
154 if (responseConsumer != null) {
155 responseConsumer.failed(ex);
156 }
157 for (final HttpAsyncResponseConsumer<T> cancellable: this.responseConsumerQueue) {
158 cancellable.cancel();
159 }
160 } finally {
161 this.resultFuture.failed(ex);
162 }
163 }
164
165 @Override
166 boolean executionCancelled() {
167 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
168 final boolean cancelled = responseConsumer != null && responseConsumer.cancel();
169 this.resultFuture.cancel();
170 return cancelled;
171 }
172
173 @Override
174 public void start() throws HttpException, IOException {
175 if (this.log.isDebugEnabled()) {
176 this.log.debug("[exchange: " + getId() + "] start execution");
177 }
178
179 final HttpRoute route = new HttpRoute(this.target);
180 setRoute(route);
181
182 this.localContext.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, this.target);
183 this.localContext.setAttribute(HttpClientContext.HTTP_ROUTE, route);
184
185 requestConnection();
186 }
187
188 @Override
189 public HttpRequest generateRequest() throws IOException, HttpException {
190 verifytRoute();
191 if (!isRouteEstablished()) {
192 onRouteToTarget();
193 onRouteComplete();
194 }
195 final NHttpClientConnection localConn = getConnection();
196 this.localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, localConn);
197
198 Asserts.check(this.requestProducerRef.get() == null, "Inconsistent state: currentRequest producer is not null");
199 final HttpAsyncRequestProducer requestProducer = this.requestProducerQueue.poll();
200 if (requestProducer == null) {
201 return null;
202 }
203 this.requestProducerRef.set(requestProducer);
204
205 final HttpRequest original = requestProducer.generateRequest();
206 final HttpRequestWrapper currentRequest = HttpRequestWrapper.wrap(original);
207 final RequestConfig config = this.localContext.getRequestConfig();
208 if (config.getSocketTimeout() > 0) {
209 localConn.setSocketTimeout(config.getSocketTimeout());
210 }
211
212 this.httpProcessor.process(currentRequest, this.localContext);
213
214 this.requestQueue.add(currentRequest);
215 setCurrentRequest(currentRequest);
216
217 return currentRequest;
218 }
219
220 @Override
221 public void produceContent(
222 final ContentEncoder encoder, final IOControl ioControl) throws IOException {
223 if (this.log.isDebugEnabled()) {
224 this.log.debug("[exchange: " + getId() + "] produce content");
225 }
226 final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.get();
227 Asserts.check(requestProducer != null, "Inconsistent state: request producer is null");
228 requestProducer.produceContent(encoder, ioControl);
229 if (encoder.isCompleted()) {
230 requestProducer.resetRequest();
231 }
232 }
233
234 @Override
235 public void requestCompleted() {
236 if (this.log.isDebugEnabled()) {
237 this.log.debug("[exchange: " + getId() + "] Request completed");
238 }
239 final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.getAndSet(null);
240 Asserts.check(requestProducer != null, "Inconsistent state: request producer is null");
241 requestProducer.requestCompleted(this.localContext);
242 try {
243 requestProducer.close();
244 } catch (final IOException ioex) {
245 this.log.debug(ioex.getMessage(), ioex);
246 }
247 }
248
249 @Override
250 public void responseReceived(
251 final HttpResponse response) throws IOException, HttpException {
252 if (this.log.isDebugEnabled()) {
253 this.log.debug("[exchange: " + getId() + "] Response received " + response.getStatusLine());
254 }
255
256 Asserts.check(this.responseConsumerRef.get() == null, "Inconsistent state: response consumer is not null");
257
258 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerQueue.poll();
259 Asserts.check(responseConsumer != null, "Inconsistent state: response consumer queue is empty");
260 this.responseConsumerRef.set(responseConsumer);
261
262 final HttpRequest request = this.requestQueue.poll();
263 Asserts.check(request != null, "Inconsistent state: request queue is empty");
264
265 this.localContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
266 this.localContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
267 this.httpProcessor.process(response, this.localContext);
268
269 responseConsumer.responseReceived(response);
270
271 setCurrentResponse(response);
272 }
273
274 @Override
275 public void consumeContent(
276 final ContentDecoder decoder, final IOControl ioControl) throws IOException {
277 if (this.log.isDebugEnabled()) {
278 this.log.debug("[exchange: " + getId() + "] Consume content");
279 }
280 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
281 Asserts.check(responseConsumer != null, "Inconsistent state: response consumer is null");
282 responseConsumer.consumeContent(decoder, ioControl);
283 }
284
285 @Override
286 public void responseCompleted() throws IOException, HttpException {
287 if (this.log.isDebugEnabled()) {
288 this.log.debug("[exchange: " + getId() + "] Response processed");
289 }
290
291 final boolean keepAlive = manageConnectionPersistence();
292
293 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.getAndSet(null);
294 Asserts.check(responseConsumer != null, "Inconsistent state: response consumer is null");
295 try {
296 responseConsumer.responseCompleted(this.localContext);
297 final T result = responseConsumer.getResult();
298 final Exception ex = responseConsumer.getException();
299 try {
300 responseConsumer.close();
301 } catch (final IOException ioex) {
302 this.log.debug(ioex.getMessage(), ioex);
303 }
304 if (result != null) {
305 this.resultQueue.add(result);
306 } else {
307 failed(ex);
308 }
309 if (!this.resultFuture.isDone() && this.responseConsumerQueue.isEmpty()) {
310 this.resultFuture.completed(new ArrayList<T>(this.resultQueue));
311 this.resultQueue.clear();
312 }
313
314 if (this.resultFuture.isDone()) {
315 close();
316 } else {
317 if (!keepAlive) {
318 failed(new ConnectionClosedException("Connection closed"));
319 } else {
320 final NHttpClientConnection localConn = getConnection();
321 if (localConn != null) {
322 localConn.requestOutput();
323 } else {
324 requestConnection();
325 }
326 }
327 }
328 } catch (final RuntimeException ex) {
329 failed(ex);
330 throw ex;
331 }
332 }
333
334 @Override
335 public void inputTerminated() {
336 failed(new ConnectionClosedException("Connection closed"));
337 }
338
339 public void abortConnection() {
340 discardConnection();
341 }
342
343 }