View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.impl.nio.client;
28  
29  import java.io.IOException;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.http.ConnectionReuseStrategy;
33  import org.apache.http.HttpException;
34  import org.apache.http.HttpHost;
35  import org.apache.http.HttpRequest;
36  import org.apache.http.HttpResponse;
37  import org.apache.http.client.config.RequestConfig;
38  import org.apache.http.client.methods.Configurable;
39  import org.apache.http.client.methods.HttpExecutionAware;
40  import org.apache.http.client.methods.HttpRequestWrapper;
41  import org.apache.http.client.protocol.HttpClientContext;
42  import org.apache.http.concurrent.BasicFuture;
43  import org.apache.http.conn.ConnectionKeepAliveStrategy;
44  import org.apache.http.conn.routing.HttpRoute;
45  import org.apache.http.nio.ContentDecoder;
46  import org.apache.http.nio.ContentEncoder;
47  import org.apache.http.nio.IOControl;
48  import org.apache.http.nio.NHttpClientConnection;
49  import org.apache.http.nio.conn.NHttpClientConnectionManager;
50  import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
51  import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
52  import org.apache.http.protocol.HttpCoreContext;
53  import org.apache.http.protocol.HttpProcessor;
54  
55  /**
56   * Default implementation of {@link org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler}.
57   * <p>
58   * Instances of this class are expected to be accessed by one thread at a time only.
59   * The {@link #cancel()} method can be called concurrently by multiple threads.
60   */
61  class MinimalClientExchangeHandlerImpl<T> extends AbstractClientExchangeHandler {
62  
63      private final HttpAsyncRequestProducer requestProducer;
64      private final HttpAsyncResponseConsumer<T> responseConsumer;
65      private final HttpClientContext localContext;
66      private final BasicFuture<T> resultFuture;
67      private final HttpProcessor httpProcessor;
68  
69      public MinimalClientExchangeHandlerImpl(
70              final Log log,
71              final HttpAsyncRequestProducer requestProducer,
72              final HttpAsyncResponseConsumer<T> responseConsumer,
73              final HttpClientContext localContext,
74              final BasicFuture<T> resultFuture,
75              final NHttpClientConnectionManager connmgr,
76              final HttpProcessor httpProcessor,
77              final ConnectionReuseStrategy connReuseStrategy,
78              final ConnectionKeepAliveStrategy keepaliveStrategy) {
79          super(log, localContext, connmgr, connReuseStrategy, keepaliveStrategy);
80          this.requestProducer = requestProducer;
81          this.responseConsumer = responseConsumer;
82          this.localContext = localContext;
83          this.resultFuture = resultFuture;
84          this.httpProcessor = httpProcessor;
85      }
86  
87      @Override
88      void releaseResources() {
89          try {
90              this.requestProducer.close();
91          } catch (final IOException ex) {
92              this.log.debug("I/O error closing request producer", ex);
93          }
94          try {
95              this.responseConsumer.close();
96          } catch (final IOException ex) {
97              this.log.debug("I/O error closing response consumer", ex);
98          }
99      }
100 
101     @Override
102     void executionFailed(final Exception ex) {
103         this.requestProducer.failed(ex);
104         this.responseConsumer.failed(ex);
105     }
106 
107     @Override
108     boolean executionCancelled() {
109         final boolean cancelled = this.responseConsumer.cancel();
110 
111         final T result = this.responseConsumer.getResult();
112         final Exception ex = this.responseConsumer.getException();
113         if (ex != null) {
114             this.resultFuture.failed(ex);
115         } else if (result != null) {
116             this.resultFuture.completed(result);
117         } else {
118             this.resultFuture.cancel();
119         }
120         return cancelled;
121     }
122 
123     public void start() throws HttpException, IOException {
124         final HttpHost target = this.requestProducer.getTarget();
125         final HttpRequest original = this.requestProducer.generateRequest();
126 
127         if (original instanceof HttpExecutionAware) {
128             ((HttpExecutionAware) original).setCancellable(this);
129         }
130         if (this.log.isDebugEnabled()) {
131             this.log.debug("[exchange: " + getId() + "] start execution");
132         }
133 
134         if (original instanceof Configurable) {
135             final RequestConfig config = ((Configurable) original).getConfig();
136             if (config != null) {
137                 this.localContext.setRequestConfig(config);
138             }
139         }
140 
141         final HttpRequestWrapper request = HttpRequestWrapper.wrap(original);
142         final HttpRoute route = new HttpRoute(target);
143         setCurrentRequest(request);
144         setRoute(route);
145 
146         this.localContext.setAttribute(HttpClientContext.HTTP_REQUEST, request);
147         this.localContext.setAttribute(HttpClientContext.HTTP_TARGET_HOST, target);
148         this.localContext.setAttribute(HttpClientContext.HTTP_ROUTE, route);
149 
150         this.httpProcessor.process(request, this.localContext);
151 
152         requestConnection();
153     }
154 
155     @Override
156     public HttpRequest generateRequest() throws IOException, HttpException {
157         verifytRoute();
158         if (!isRouteEstablished()) {
159             onRouteToTarget();
160             onRouteComplete();
161         }
162 
163         final NHttpClientConnection localConn = getConnection();
164         this.localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, localConn);
165         final RequestConfig config = this.localContext.getRequestConfig();
166         if (config.getSocketTimeout() > 0) {
167             localConn.setSocketTimeout(config.getSocketTimeout());
168         }
169         return getCurrentRequest();
170     }
171 
172     @Override
173     public void produceContent(
174             final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
175         if (this.log.isDebugEnabled()) {
176             this.log.debug("[exchange: " + getId() + "] produce content");
177         }
178         this.requestProducer.produceContent(encoder, ioctrl);
179         if (encoder.isCompleted()) {
180             this.requestProducer.resetRequest();
181         }
182     }
183 
184     @Override
185     public void requestCompleted() {
186         if (this.log.isDebugEnabled()) {
187             this.log.debug("[exchange: " + getId() + "] Request completed");
188         }
189         this.requestProducer.requestCompleted(this.localContext);
190     }
191 
192     @Override
193     public void responseReceived(
194             final HttpResponse response) throws IOException, HttpException {
195         if (this.log.isDebugEnabled()) {
196             this.log.debug("[exchange: " + getId() + "] Response received " + response.getStatusLine());
197         }
198         this.localContext.setAttribute(HttpClientContext.HTTP_RESPONSE, response);
199         this.httpProcessor.process(response, this.localContext);
200 
201         setCurrentResponse(response);
202 
203         this.responseConsumer.responseReceived(response);
204     }
205 
206     @Override
207     public void consumeContent(
208             final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
209         if (this.log.isDebugEnabled()) {
210             this.log.debug("[exchange: " + getId() + "] Consume content");
211         }
212         this.responseConsumer.consumeContent(decoder, ioctrl);
213         if (!decoder.isCompleted() && this.responseConsumer.isDone()) {
214             markConnectionNonReusable();
215             try {
216                 markCompleted();
217                 releaseConnection();
218                 this.resultFuture.cancel();
219             } finally {
220                 close();
221             }
222         }
223     }
224 
225     @Override
226     public void responseCompleted() throws IOException, HttpException {
227         manageConnectionPersistence();
228         this.responseConsumer.responseCompleted(this.localContext);
229         if (this.log.isDebugEnabled()) {
230             this.log.debug("[exchange: " + getId() + "] Response processed");
231         }
232         try {
233             markCompleted();
234             releaseConnection();
235             final T result = this.responseConsumer.getResult();
236             final Exception ex = this.responseConsumer.getException();
237             if (ex == null) {
238                 this.resultFuture.completed(result);
239             } else {
240                 this.resultFuture.failed(ex);
241             }
242         } finally {
243             close();
244         }
245     }
246 
247     @Override
248     public void inputTerminated() {
249         close();
250     }
251 
252     public void abortConnection() {
253         discardConnection();
254     }
255 
256 }