1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.impl.nio.client;
28
29 import java.io.IOException;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.http.ConnectionReuseStrategy;
33 import org.apache.http.HttpException;
34 import org.apache.http.HttpHost;
35 import org.apache.http.HttpRequest;
36 import org.apache.http.HttpResponse;
37 import org.apache.http.client.config.RequestConfig;
38 import org.apache.http.client.methods.Configurable;
39 import org.apache.http.client.methods.HttpExecutionAware;
40 import org.apache.http.client.methods.HttpRequestWrapper;
41 import org.apache.http.client.protocol.HttpClientContext;
42 import org.apache.http.concurrent.BasicFuture;
43 import org.apache.http.conn.ConnectionKeepAliveStrategy;
44 import org.apache.http.conn.routing.HttpRoute;
45 import org.apache.http.nio.ContentDecoder;
46 import org.apache.http.nio.ContentEncoder;
47 import org.apache.http.nio.IOControl;
48 import org.apache.http.nio.NHttpClientConnection;
49 import org.apache.http.nio.conn.NHttpClientConnectionManager;
50 import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
51 import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
52 import org.apache.http.protocol.HttpCoreContext;
53 import org.apache.http.protocol.HttpProcessor;
54
55
56
57
58
59
60
61 class MinimalClientExchangeHandlerImpl<T> extends AbstractClientExchangeHandler {
62
63 private final HttpAsyncRequestProducer requestProducer;
64 private final HttpAsyncResponseConsumer<T> responseConsumer;
65 private final HttpClientContext localContext;
66 private final BasicFuture<T> resultFuture;
67 private final HttpProcessor httpProcessor;
68
69 public MinimalClientExchangeHandlerImpl(
70 final Log log,
71 final HttpAsyncRequestProducer requestProducer,
72 final HttpAsyncResponseConsumer<T> responseConsumer,
73 final HttpClientContext localContext,
74 final BasicFuture<T> resultFuture,
75 final NHttpClientConnectionManager connmgr,
76 final HttpProcessor httpProcessor,
77 final ConnectionReuseStrategy connReuseStrategy,
78 final ConnectionKeepAliveStrategy keepaliveStrategy) {
79 super(log, localContext, connmgr, connReuseStrategy, keepaliveStrategy);
80 this.requestProducer = requestProducer;
81 this.responseConsumer = responseConsumer;
82 this.localContext = localContext;
83 this.resultFuture = resultFuture;
84 this.httpProcessor = httpProcessor;
85 }
86
87 @Override
88 void releaseResources() {
89 try {
90 this.requestProducer.close();
91 } catch (final IOException ex) {
92 this.log.debug("I/O error closing request producer", ex);
93 }
94 try {
95 this.responseConsumer.close();
96 } catch (final IOException ex) {
97 this.log.debug("I/O error closing response consumer", ex);
98 }
99 }
100
101 @Override
102 void executionFailed(final Exception ex) {
103 this.requestProducer.failed(ex);
104 this.responseConsumer.failed(ex);
105 }
106
107 @Override
108 boolean executionCancelled() {
109 final boolean cancelled = this.responseConsumer.cancel();
110
111 final T result = this.responseConsumer.getResult();
112 final Exception ex = this.responseConsumer.getException();
113 if (ex != null) {
114 this.resultFuture.failed(ex);
115 } else if (result != null) {
116 this.resultFuture.completed(result);
117 } else {
118 this.resultFuture.cancel();
119 }
120 return cancelled;
121 }
122
123 public void start() throws HttpException, IOException {
124 final HttpHost target = this.requestProducer.getTarget();
125 final HttpRequest original = this.requestProducer.generateRequest();
126
127 if (original instanceof HttpExecutionAware) {
128 ((HttpExecutionAware) original).setCancellable(this);
129 }
130 if (this.log.isDebugEnabled()) {
131 this.log.debug("[exchange: " + getId() + "] start execution");
132 }
133
134 if (original instanceof Configurable) {
135 final RequestConfig config = ((Configurable) original).getConfig();
136 if (config != null) {
137 this.localContext.setRequestConfig(config);
138 }
139 }
140
141 final HttpRequestWrapper request = HttpRequestWrapper.wrap(original);
142 final HttpRoute route = new HttpRoute(target);
143 setCurrentRequest(request);
144 setRoute(route);
145
146 this.localContext.setAttribute(HttpClientContext.HTTP_REQUEST, request);
147 this.localContext.setAttribute(HttpClientContext.HTTP_TARGET_HOST, target);
148 this.localContext.setAttribute(HttpClientContext.HTTP_ROUTE, route);
149
150 this.httpProcessor.process(request, this.localContext);
151
152 requestConnection();
153 }
154
155 @Override
156 public HttpRequest generateRequest() throws IOException, HttpException {
157 verifytRoute();
158 if (!isRouteEstablished()) {
159 onRouteToTarget();
160 onRouteComplete();
161 }
162
163 final NHttpClientConnection localConn = getConnection();
164 this.localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, localConn);
165 final RequestConfig config = this.localContext.getRequestConfig();
166 if (config.getSocketTimeout() > 0) {
167 localConn.setSocketTimeout(config.getSocketTimeout());
168 }
169 return getCurrentRequest();
170 }
171
172 @Override
173 public void produceContent(
174 final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
175 if (this.log.isDebugEnabled()) {
176 this.log.debug("[exchange: " + getId() + "] produce content");
177 }
178 this.requestProducer.produceContent(encoder, ioctrl);
179 if (encoder.isCompleted()) {
180 this.requestProducer.resetRequest();
181 }
182 }
183
184 @Override
185 public void requestCompleted() {
186 if (this.log.isDebugEnabled()) {
187 this.log.debug("[exchange: " + getId() + "] Request completed");
188 }
189 this.requestProducer.requestCompleted(this.localContext);
190 }
191
192 @Override
193 public void responseReceived(
194 final HttpResponse response) throws IOException, HttpException {
195 if (this.log.isDebugEnabled()) {
196 this.log.debug("[exchange: " + getId() + "] Response received " + response.getStatusLine());
197 }
198 this.localContext.setAttribute(HttpClientContext.HTTP_RESPONSE, response);
199 this.httpProcessor.process(response, this.localContext);
200
201 setCurrentResponse(response);
202
203 this.responseConsumer.responseReceived(response);
204 }
205
206 @Override
207 public void consumeContent(
208 final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
209 if (this.log.isDebugEnabled()) {
210 this.log.debug("[exchange: " + getId() + "] Consume content");
211 }
212 this.responseConsumer.consumeContent(decoder, ioctrl);
213 if (!decoder.isCompleted() && this.responseConsumer.isDone()) {
214 markConnectionNonReusable();
215 try {
216 markCompleted();
217 releaseConnection();
218 this.resultFuture.cancel();
219 } finally {
220 close();
221 }
222 }
223 }
224
225 @Override
226 public void responseCompleted() throws IOException, HttpException {
227 manageConnectionPersistence();
228 this.responseConsumer.responseCompleted(this.localContext);
229 if (this.log.isDebugEnabled()) {
230 this.log.debug("[exchange: " + getId() + "] Response processed");
231 }
232 try {
233 markCompleted();
234 releaseConnection();
235 final T result = this.responseConsumer.getResult();
236 final Exception ex = this.responseConsumer.getException();
237 if (ex == null) {
238 this.resultFuture.completed(result);
239 } else {
240 this.resultFuture.failed(ex);
241 }
242 } finally {
243 close();
244 }
245 }
246
247 @Override
248 public void inputTerminated() {
249 close();
250 }
251
252 public void abortConnection() {
253 discardConnection();
254 }
255
256 }