1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.impl.nio.client;
28
29 import java.io.IOException;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.http.ConnectionReuseStrategy;
33 import org.apache.http.HttpException;
34 import org.apache.http.HttpHost;
35 import org.apache.http.HttpRequest;
36 import org.apache.http.HttpResponse;
37 import org.apache.http.client.config.RequestConfig;
38 import org.apache.http.client.methods.Configurable;
39 import org.apache.http.client.methods.HttpExecutionAware;
40 import org.apache.http.client.methods.HttpRequestWrapper;
41 import org.apache.http.client.protocol.HttpClientContext;
42 import org.apache.http.concurrent.BasicFuture;
43 import org.apache.http.conn.ConnectionKeepAliveStrategy;
44 import org.apache.http.conn.routing.HttpRoute;
45 import org.apache.http.nio.ContentDecoder;
46 import org.apache.http.nio.ContentEncoder;
47 import org.apache.http.nio.IOControl;
48 import org.apache.http.nio.NHttpClientConnection;
49 import org.apache.http.nio.conn.NHttpClientConnectionManager;
50 import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
51 import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
52 import org.apache.http.protocol.HttpCoreContext;
53 import org.apache.http.protocol.HttpProcessor;
54
55
56
57
58
59
60
61 class MinimalClientExchangeHandlerImpl<T> extends AbstractClientExchangeHandler {
62
63 private final HttpAsyncRequestProducer requestProducer;
64 private final HttpAsyncResponseConsumer<T> responseConsumer;
65 private final HttpClientContext localContext;
66 private final BasicFuture<T> resultFuture;
67 private final HttpProcessor httpProcessor;
68
69 public MinimalClientExchangeHandlerImpl(
70 final Log log,
71 final HttpAsyncRequestProducer requestProducer,
72 final HttpAsyncResponseConsumer<T> responseConsumer,
73 final HttpClientContext localContext,
74 final BasicFuture<T> resultFuture,
75 final NHttpClientConnectionManager connmgr,
76 final HttpProcessor httpProcessor,
77 final ConnectionReuseStrategy connReuseStrategy,
78 final ConnectionKeepAliveStrategy keepaliveStrategy) {
79 super(log, localContext, connmgr, connReuseStrategy, keepaliveStrategy);
80 this.requestProducer = requestProducer;
81 this.responseConsumer = responseConsumer;
82 this.localContext = localContext;
83 this.resultFuture = resultFuture;
84 this.httpProcessor = httpProcessor;
85 }
86
87 @Override
88 void releaseResources() {
89 try {
90 this.requestProducer.close();
91 } catch (final IOException ex) {
92 this.log.debug("I/O error closing request producer", ex);
93 }
94 try {
95 this.responseConsumer.close();
96 } catch (final IOException ex) {
97 this.log.debug("I/O error closing response consumer", ex);
98 }
99 }
100
101 @Override
102 void executionFailed(final Exception ex) {
103 this.requestProducer.failed(ex);
104 this.responseConsumer.failed(ex);
105 }
106
107 @Override
108 boolean executionCancelled() {
109 final boolean cancelled = this.responseConsumer.cancel();
110
111 final T result = this.responseConsumer.getResult();
112 final Exception ex = this.responseConsumer.getException();
113 if (ex != null) {
114 this.resultFuture.failed(ex);
115 } else if (result != null) {
116 this.resultFuture.completed(result);
117 } else {
118 this.resultFuture.cancel();
119 }
120 return cancelled;
121 }
122
123 @Override
124 public void start() throws HttpException, IOException {
125 final HttpHost target = this.requestProducer.getTarget();
126 final HttpRequest original = this.requestProducer.generateRequest();
127
128 if (original instanceof HttpExecutionAware) {
129 ((HttpExecutionAware) original).setCancellable(this);
130 }
131 if (this.log.isDebugEnabled()) {
132 this.log.debug("[exchange: " + getId() + "] start execution");
133 }
134
135 if (original instanceof Configurable) {
136 final RequestConfig config = ((Configurable) original).getConfig();
137 if (config != null) {
138 this.localContext.setRequestConfig(config);
139 }
140 }
141
142 final HttpRequestWrapper request = HttpRequestWrapper.wrap(original);
143 final HttpRoute route = new HttpRoute(target);
144 setCurrentRequest(request);
145 setRoute(route);
146
147 this.localContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
148 this.localContext.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
149 this.localContext.setAttribute(HttpClientContext.HTTP_ROUTE, route);
150
151 this.httpProcessor.process(request, this.localContext);
152
153 requestConnection();
154 }
155
156 @Override
157 public HttpRequest generateRequest() throws IOException, HttpException {
158 verifytRoute();
159 if (!isRouteEstablished()) {
160 onRouteToTarget();
161 onRouteComplete();
162 }
163
164 final NHttpClientConnection localConn = getConnection();
165 this.localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, localConn);
166 final RequestConfig config = this.localContext.getRequestConfig();
167 if (config.getSocketTimeout() > 0) {
168 localConn.setSocketTimeout(config.getSocketTimeout());
169 }
170 return getCurrentRequest();
171 }
172
173 @Override
174 public void produceContent(
175 final ContentEncoder encoder, final IOControl ioControl) throws IOException {
176 if (this.log.isDebugEnabled()) {
177 this.log.debug("[exchange: " + getId() + "] produce content");
178 }
179 this.requestProducer.produceContent(encoder, ioControl);
180 if (encoder.isCompleted()) {
181 this.requestProducer.resetRequest();
182 }
183 }
184
185 @Override
186 public void requestCompleted() {
187 if (this.log.isDebugEnabled()) {
188 this.log.debug("[exchange: " + getId() + "] Request completed");
189 }
190 this.requestProducer.requestCompleted(this.localContext);
191 }
192
193 @Override
194 public void responseReceived(
195 final HttpResponse response) throws IOException, HttpException {
196 if (this.log.isDebugEnabled()) {
197 this.log.debug("[exchange: " + getId() + "] Response received " + response.getStatusLine());
198 }
199 this.localContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
200 this.httpProcessor.process(response, this.localContext);
201
202 setCurrentResponse(response);
203
204 this.responseConsumer.responseReceived(response);
205 }
206
207 @Override
208 public void consumeContent(
209 final ContentDecoder decoder, final IOControl ioControl) throws IOException {
210 if (this.log.isDebugEnabled()) {
211 this.log.debug("[exchange: " + getId() + "] Consume content");
212 }
213 this.responseConsumer.consumeContent(decoder, ioControl);
214 if (!decoder.isCompleted() && this.responseConsumer.isDone()) {
215 markConnectionNonReusable();
216 try {
217 markCompleted();
218 releaseConnection();
219 this.resultFuture.cancel();
220 } finally {
221 close();
222 }
223 }
224 }
225
226 @Override
227 public void responseCompleted() throws IOException, HttpException {
228 manageConnectionPersistence();
229 this.responseConsumer.responseCompleted(this.localContext);
230 if (this.log.isDebugEnabled()) {
231 this.log.debug("[exchange: " + getId() + "] Response processed");
232 }
233 try {
234 markCompleted();
235 releaseConnection();
236 final T result = this.responseConsumer.getResult();
237 final Exception ex = this.responseConsumer.getException();
238 if (ex == null) {
239 this.resultFuture.completed(result);
240 } else {
241 this.resultFuture.failed(ex);
242 }
243 } finally {
244 close();
245 }
246 }
247
248 @Override
249 public void inputTerminated() {
250 close();
251 }
252
253 public void abortConnection() {
254 discardConnection();
255 }
256
257 }