1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.impl.nio.client;
28
29 import java.io.IOException;
30 import java.util.ArrayList;
31 import java.util.List;
32 import java.util.Queue;
33 import java.util.concurrent.ConcurrentLinkedQueue;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.http.ConnectionClosedException;
38 import org.apache.http.ConnectionReuseStrategy;
39 import org.apache.http.HttpException;
40 import org.apache.http.HttpHost;
41 import org.apache.http.HttpRequest;
42 import org.apache.http.HttpResponse;
43 import org.apache.http.client.config.RequestConfig;
44 import org.apache.http.client.methods.HttpRequestWrapper;
45 import org.apache.http.client.protocol.HttpClientContext;
46 import org.apache.http.concurrent.BasicFuture;
47 import org.apache.http.conn.ConnectionKeepAliveStrategy;
48 import org.apache.http.conn.routing.HttpRoute;
49 import org.apache.http.nio.ContentDecoder;
50 import org.apache.http.nio.ContentEncoder;
51 import org.apache.http.nio.IOControl;
52 import org.apache.http.nio.NHttpClientConnection;
53 import org.apache.http.nio.conn.NHttpClientConnectionManager;
54 import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
55 import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
56 import org.apache.http.nio.protocol.Pipelined;
57 import org.apache.http.protocol.HttpCoreContext;
58 import org.apache.http.protocol.HttpProcessor;
59 import org.apache.http.util.Args;
60 import org.apache.http.util.Asserts;
61
62
63
64
65
66
67
68
69 @Pipelined
70 class PipeliningClientExchangeHandlerImpl<T> extends AbstractClientExchangeHandler {
71
72 private final HttpHost target;
73 private final Queue<HttpAsyncRequestProducer> requestProducerQueue;
74 private final Queue<HttpAsyncResponseConsumer<T>> responseConsumerQueue;
75 private final Queue<HttpRequest> requestQueue;
76 private final Queue<T> resultQueue;
77 private final HttpClientContext localContext;
78 private final BasicFuture<List<T>> resultFuture;
79 private final HttpProcessor httpProcessor;
80 private final AtomicReference<HttpAsyncRequestProducer> requestProducerRef;
81 private final AtomicReference<HttpAsyncResponseConsumer<T>> responseConsumerRef;
82
83 public PipeliningClientExchangeHandlerImpl(
84 final Log log,
85 final HttpHost target,
86 final List<? extends HttpAsyncRequestProducer> requestProducers,
87 final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
88 final HttpClientContext localContext,
89 final BasicFuture<List<T>> resultFuture,
90 final NHttpClientConnectionManager connmgr,
91 final HttpProcessor httpProcessor,
92 final ConnectionReuseStrategy connReuseStrategy,
93 final ConnectionKeepAliveStrategy keepaliveStrategy) {
94 super(log, localContext, connmgr, connReuseStrategy, keepaliveStrategy);
95 Args.notNull(target, "HTTP target");
96 Args.notEmpty(requestProducers, "Request producer list");
97 Args.notEmpty(responseConsumers, "Response consumer list");
98 Args.check(requestProducers.size() == responseConsumers.size(),
99 "Number of request producers does not match that of response consumers");
100 this.target = target;
101 this.requestProducerQueue = new ConcurrentLinkedQueue<HttpAsyncRequestProducer>(requestProducers);
102 this.responseConsumerQueue = new ConcurrentLinkedQueue<HttpAsyncResponseConsumer<T>>(responseConsumers);
103 this.requestQueue = new ConcurrentLinkedQueue<HttpRequest>();
104 this.resultQueue = new ConcurrentLinkedQueue<T>();
105 this.localContext = localContext;
106 this.resultFuture = resultFuture;
107 this.httpProcessor = httpProcessor;
108 this.requestProducerRef = new AtomicReference<HttpAsyncRequestProducer>(null);
109 this.responseConsumerRef = new AtomicReference<HttpAsyncResponseConsumer<T>>(null);
110 }
111
112 private void closeProducer(final HttpAsyncRequestProducer requestProducer) {
113 if (requestProducer != null) {
114 try {
115 requestProducer.close();
116 } catch (final IOException ex) {
117 this.log.debug("I/O error closing request producer", ex);
118 }
119 }
120 }
121
122 private void closeConsumer(final HttpAsyncResponseConsumer<?> responseConsumer) {
123 if (responseConsumer != null) {
124 try {
125 responseConsumer.close();
126 } catch (final IOException ex) {
127 this.log.debug("I/O error closing response consumer", ex);
128 }
129 }
130 }
131
132 @Override
133 void releaseResources() {
134 closeProducer(this.requestProducerRef.getAndSet(null));
135 closeConsumer(this.responseConsumerRef.getAndSet(null));
136 while (!this.requestProducerQueue.isEmpty()) {
137 closeProducer(this.requestProducerQueue.remove());
138 }
139 while (!this.responseConsumerQueue.isEmpty()) {
140 closeConsumer(this.responseConsumerQueue.remove());
141 }
142 this.requestQueue.clear();
143 this.resultQueue.clear();
144 }
145
146 @Override
147 void executionFailed(final Exception ex) {
148 final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.get();
149 if (requestProducer != null) {
150 requestProducer.failed(ex);
151 }
152 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
153 if (responseConsumer != null) {
154 responseConsumer.failed(ex);
155 }
156 for (final HttpAsyncResponseConsumer<T> cancellable: this.responseConsumerQueue) {
157 cancellable.cancel();
158 }
159 }
160
161 @Override
162 boolean executionCancelled() {
163 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
164 final boolean cancelled = responseConsumer != null && responseConsumer.cancel();
165 this.resultFuture.cancel();
166 return cancelled;
167 }
168
169 public void start() throws HttpException, IOException {
170 if (this.log.isDebugEnabled()) {
171 this.log.debug("[exchange: " + getId() + "] start execution");
172 }
173
174 final HttpRoute route = new HttpRoute(this.target);
175 setRoute(route);
176
177 this.localContext.setAttribute(HttpClientContext.HTTP_TARGET_HOST, this.target);
178 this.localContext.setAttribute(HttpClientContext.HTTP_ROUTE, route);
179
180 requestConnection();
181 }
182
183 @Override
184 public HttpRequest generateRequest() throws IOException, HttpException {
185 verifytRoute();
186 if (!isRouteEstablished()) {
187 onRouteToTarget();
188 onRouteComplete();
189 }
190 final NHttpClientConnection localConn = getConnection();
191 this.localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, localConn);
192
193 Asserts.check(this.requestProducerRef.get() == null, "Inconsistent state: currentRequest producer is not null");
194 final HttpAsyncRequestProducer requestProducer = this.requestProducerQueue.poll();
195 if (requestProducer == null) {
196 return null;
197 }
198 this.requestProducerRef.set(requestProducer);
199
200 final HttpRequest original = requestProducer.generateRequest();
201 final HttpRequestWrapper currentRequest = HttpRequestWrapper.wrap(original);
202 final RequestConfig config = this.localContext.getRequestConfig();
203 if (config.getSocketTimeout() > 0) {
204 localConn.setSocketTimeout(config.getSocketTimeout());
205 }
206
207 this.httpProcessor.process(currentRequest, this.localContext);
208
209 this.requestQueue.add(currentRequest);
210 setCurrentRequest(currentRequest);
211
212 return currentRequest;
213 }
214
215 @Override
216 public void produceContent(
217 final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
218 if (this.log.isDebugEnabled()) {
219 this.log.debug("[exchange: " + getId() + "] produce content");
220 }
221 final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.get();
222 Asserts.check(requestProducer != null, "Inconsistent state: request producer is null");
223 requestProducer.produceContent(encoder, ioctrl);
224 if (encoder.isCompleted()) {
225 requestProducer.resetRequest();
226 }
227 }
228
229 @Override
230 public void requestCompleted() {
231 if (this.log.isDebugEnabled()) {
232 this.log.debug("[exchange: " + getId() + "] Request completed");
233 }
234 final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.getAndSet(null);
235 Asserts.check(requestProducer != null, "Inconsistent state: request producer is null");
236 requestProducer.requestCompleted(this.localContext);
237 try {
238 requestProducer.close();
239 } catch (final IOException ioex) {
240 this.log.debug(ioex.getMessage(), ioex);
241 }
242 }
243
244 @Override
245 public void responseReceived(
246 final HttpResponse response) throws IOException, HttpException {
247 if (this.log.isDebugEnabled()) {
248 this.log.debug("[exchange: " + getId() + "] Response received " + response.getStatusLine());
249 }
250
251 Asserts.check(this.responseConsumerRef.get() == null, "Inconsistent state: response consumer is not null");
252
253 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerQueue.poll();
254 Asserts.check(responseConsumer != null, "Inconsistent state: response consumer queue is empty");
255 this.responseConsumerRef.set(responseConsumer);
256
257 final HttpRequest request = this.requestQueue.poll();
258 Asserts.check(request != null, "Inconsistent state: request queue is empty");
259
260 this.localContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
261 this.localContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
262 this.httpProcessor.process(response, this.localContext);
263
264 responseConsumer.responseReceived(response);
265
266 setCurrentResponse(response);
267 }
268
269 @Override
270 public void consumeContent(
271 final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
272 if (this.log.isDebugEnabled()) {
273 this.log.debug("[exchange: " + getId() + "] Consume content");
274 }
275 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
276 Asserts.check(responseConsumer != null, "Inconsistent state: response consumer is null");
277 responseConsumer.consumeContent(decoder, ioctrl);
278 }
279
280 @Override
281 public void responseCompleted() throws IOException, HttpException {
282 if (this.log.isDebugEnabled()) {
283 this.log.debug("[exchange: " + getId() + "] Response processed");
284 }
285
286 final boolean keepAlive = manageConnectionPersistence();
287
288 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.getAndSet(null);
289 Asserts.check(responseConsumer != null, "Inconsistent state: response consumer is null");
290 try {
291 responseConsumer.responseCompleted(this.localContext);
292 final T result = responseConsumer.getResult();
293 final Exception ex = responseConsumer.getException();
294 try {
295 responseConsumer.close();
296 } catch (final IOException ioex) {
297 this.log.debug(ioex.getMessage(), ioex);
298 }
299 if (result != null) {
300 this.resultQueue.add(result);
301 } else {
302 failed(ex);
303 }
304 if (!this.resultFuture.isDone() && this.responseConsumerQueue.isEmpty()) {
305 this.resultFuture.completed(new ArrayList<T>(this.resultQueue));
306 this.resultQueue.clear();
307 }
308
309 if (this.resultFuture.isDone()) {
310 close();
311 } else {
312 if (!keepAlive) {
313 failed(new ConnectionClosedException("Connection closed"));
314 } else {
315 final NHttpClientConnection localConn = getConnection();
316 if (localConn != null) {
317 localConn.requestOutput();
318 } else {
319 requestConnection();
320 }
321 }
322 }
323 } catch (final RuntimeException ex) {
324 failed(ex);
325 throw ex;
326 }
327 }
328
329 @Override
330 public void inputTerminated() {
331 failed(new ConnectionClosedException("Connection closed"));
332 }
333
334 public void abortConnection() {
335 discardConnection();
336 }
337
338 }