View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.impl.nio.client;
28  
29  import java.io.IOException;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.http.ConnectionReuseStrategy;
33  import org.apache.http.HttpException;
34  import org.apache.http.HttpHost;
35  import org.apache.http.HttpRequest;
36  import org.apache.http.HttpResponse;
37  import org.apache.http.client.config.RequestConfig;
38  import org.apache.http.client.methods.Configurable;
39  import org.apache.http.client.methods.HttpExecutionAware;
40  import org.apache.http.client.methods.HttpRequestWrapper;
41  import org.apache.http.client.protocol.HttpClientContext;
42  import org.apache.http.concurrent.BasicFuture;
43  import org.apache.http.conn.ConnectionKeepAliveStrategy;
44  import org.apache.http.conn.routing.HttpRoute;
45  import org.apache.http.nio.ContentDecoder;
46  import org.apache.http.nio.ContentEncoder;
47  import org.apache.http.nio.IOControl;
48  import org.apache.http.nio.NHttpClientConnection;
49  import org.apache.http.nio.conn.NHttpClientConnectionManager;
50  import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
51  import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
52  import org.apache.http.protocol.HttpCoreContext;
53  import org.apache.http.protocol.HttpProcessor;
54  
55  /**
56   * Default implementation of {@link org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler}.
57   * <p>
58   * Instances of this class are expected to be accessed by one thread at a time only.
59   * The {@link #cancel()} method can be called concurrently by multiple threads.
60   */
61  class MinimalClientExchangeHandlerImpl<T> extends AbstractClientExchangeHandler {
62  
63      private final HttpAsyncRequestProducer requestProducer;
64      private final HttpAsyncResponseConsumer<T> responseConsumer;
65      private final HttpClientContext localContext;
66      private final BasicFuture<T> resultFuture;
67      private final HttpProcessor httpProcessor;
68  
69      public MinimalClientExchangeHandlerImpl(
70              final Log log,
71              final HttpAsyncRequestProducer requestProducer,
72              final HttpAsyncResponseConsumer<T> responseConsumer,
73              final HttpClientContext localContext,
74              final BasicFuture<T> resultFuture,
75              final NHttpClientConnectionManager connmgr,
76              final HttpProcessor httpProcessor,
77              final ConnectionReuseStrategy connReuseStrategy,
78              final ConnectionKeepAliveStrategy keepaliveStrategy) {
79          super(log, localContext, connmgr, connReuseStrategy, keepaliveStrategy);
80          this.requestProducer = requestProducer;
81          this.responseConsumer = responseConsumer;
82          this.localContext = localContext;
83          this.resultFuture = resultFuture;
84          this.httpProcessor = httpProcessor;
85      }
86  
87      @Override
88      void releaseResources() {
89          try {
90              this.requestProducer.close();
91          } catch (final IOException ex) {
92              this.log.debug("I/O error closing request producer", ex);
93          }
94          try {
95              this.responseConsumer.close();
96          } catch (final IOException ex) {
97              this.log.debug("I/O error closing response consumer", ex);
98          }
99      }
100 
101     @Override
102     void executionFailed(final Exception ex) {
103         this.requestProducer.failed(ex);
104         this.responseConsumer.failed(ex);
105     }
106 
107     @Override
108     boolean executionCancelled() {
109         final boolean cancelled = this.responseConsumer.cancel();
110 
111         final T result = this.responseConsumer.getResult();
112         final Exception ex = this.responseConsumer.getException();
113         if (ex != null) {
114             this.resultFuture.failed(ex);
115         } else if (result != null) {
116             this.resultFuture.completed(result);
117         } else {
118             this.resultFuture.cancel();
119         }
120         return cancelled;
121     }
122 
123     @Override
124     public void start() throws HttpException, IOException {
125         final HttpHost target = this.requestProducer.getTarget();
126         final HttpRequest original = this.requestProducer.generateRequest();
127 
128         if (original instanceof HttpExecutionAware) {
129             ((HttpExecutionAware) original).setCancellable(this);
130         }
131         if (this.log.isDebugEnabled()) {
132             this.log.debug("[exchange: " + getId() + "] start execution");
133         }
134 
135         if (original instanceof Configurable) {
136             final RequestConfig config = ((Configurable) original).getConfig();
137             if (config != null) {
138                 this.localContext.setRequestConfig(config);
139             }
140         }
141 
142         final HttpRequestWrapper request = HttpRequestWrapper.wrap(original);
143         final HttpRoute route = new HttpRoute(target);
144         setCurrentRequest(request);
145         setRoute(route);
146 
147         this.localContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
148         this.localContext.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
149         this.localContext.setAttribute(HttpClientContext.HTTP_ROUTE, route);
150 
151         this.httpProcessor.process(request, this.localContext);
152 
153         requestConnection();
154     }
155 
156     @Override
157     public HttpRequest generateRequest() throws IOException, HttpException {
158         verifytRoute();
159         if (!isRouteEstablished()) {
160             onRouteToTarget();
161             onRouteComplete();
162         }
163 
164         final NHttpClientConnection localConn = getConnection();
165         this.localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, localConn);
166         final RequestConfig config = this.localContext.getRequestConfig();
167         if (config.getSocketTimeout() > 0) {
168             localConn.setSocketTimeout(config.getSocketTimeout());
169         }
170         return getCurrentRequest();
171     }
172 
173     @Override
174     public void produceContent(
175             final ContentEncoder encoder, final IOControl ioControl) throws IOException {
176         if (this.log.isDebugEnabled()) {
177             this.log.debug("[exchange: " + getId() + "] produce content");
178         }
179         this.requestProducer.produceContent(encoder, ioControl);
180         if (encoder.isCompleted()) {
181             this.requestProducer.resetRequest();
182         }
183     }
184 
185     @Override
186     public void requestCompleted() {
187         if (this.log.isDebugEnabled()) {
188             this.log.debug("[exchange: " + getId() + "] Request completed");
189         }
190         this.requestProducer.requestCompleted(this.localContext);
191     }
192 
193     @Override
194     public void responseReceived(
195             final HttpResponse response) throws IOException, HttpException {
196         if (this.log.isDebugEnabled()) {
197             this.log.debug("[exchange: " + getId() + "] Response received " + response.getStatusLine());
198         }
199         this.localContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
200         this.httpProcessor.process(response, this.localContext);
201 
202         setCurrentResponse(response);
203 
204         this.responseConsumer.responseReceived(response);
205     }
206 
207     @Override
208     public void consumeContent(
209             final ContentDecoder decoder, final IOControl ioControl) throws IOException {
210         if (this.log.isDebugEnabled()) {
211             this.log.debug("[exchange: " + getId() + "] Consume content");
212         }
213         this.responseConsumer.consumeContent(decoder, ioControl);
214         if (!decoder.isCompleted() && this.responseConsumer.isDone()) {
215             markConnectionNonReusable();
216             try {
217                 markCompleted();
218                 releaseConnection();
219                 this.resultFuture.cancel();
220             } finally {
221                 close();
222             }
223         }
224     }
225 
226     @Override
227     public void responseCompleted() throws IOException, HttpException {
228         manageConnectionPersistence();
229         this.responseConsumer.responseCompleted(this.localContext);
230         if (this.log.isDebugEnabled()) {
231             this.log.debug("[exchange: " + getId() + "] Response processed");
232         }
233         try {
234             markCompleted();
235             releaseConnection();
236             final T result = this.responseConsumer.getResult();
237             final Exception ex = this.responseConsumer.getException();
238             if (ex == null) {
239                 this.resultFuture.completed(result);
240             } else {
241                 this.resultFuture.failed(ex);
242             }
243         } finally {
244             close();
245         }
246     }
247 
248     @Override
249     public void inputTerminated() {
250         close();
251     }
252 
253     public void abortConnection() {
254         discardConnection();
255     }
256 
257 }