View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.ReadableByteChannel;
33  import java.nio.channels.WritableByteChannel;
34  import java.util.List;
35  import java.util.Queue;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  
38  import org.apache.hc.core5.annotation.Internal;
39  import org.apache.hc.core5.http.ConnectionClosedException;
40  import org.apache.hc.core5.http.ConnectionReuseStrategy;
41  import org.apache.hc.core5.http.ContentLengthStrategy;
42  import org.apache.hc.core5.http.EntityDetails;
43  import org.apache.hc.core5.http.Header;
44  import org.apache.hc.core5.http.HttpException;
45  import org.apache.hc.core5.http.HttpRequest;
46  import org.apache.hc.core5.http.HttpResponse;
47  import org.apache.hc.core5.http.HttpStatus;
48  import org.apache.hc.core5.http.LengthRequiredException;
49  import org.apache.hc.core5.http.config.CharCodingConfig;
50  import org.apache.hc.core5.http.config.Http1Config;
51  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
52  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
53  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
54  import org.apache.hc.core5.http.impl.Http1StreamListener;
55  import org.apache.hc.core5.http.message.MessageSupport;
56  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
57  import org.apache.hc.core5.http.nio.CapacityChannel;
58  import org.apache.hc.core5.http.nio.ContentDecoder;
59  import org.apache.hc.core5.http.nio.ContentEncoder;
60  import org.apache.hc.core5.http.nio.NHttpMessageParser;
61  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
62  import org.apache.hc.core5.http.nio.SessionInputBuffer;
63  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
64  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
65  import org.apache.hc.core5.http.protocol.HttpCoreContext;
66  import org.apache.hc.core5.http.protocol.HttpProcessor;
67  import org.apache.hc.core5.io.CloseMode;
68  import org.apache.hc.core5.reactor.ProtocolIOSession;
69  import org.apache.hc.core5.util.Args;
70  import org.apache.hc.core5.util.Asserts;
71  import org.apache.hc.core5.util.Timeout;
72  
73  /**
74   * I/O event handler for events fired by {@link ProtocolIOSession} that implements
75   * client side HTTP/1.1 messaging protocol with full support for
76   * duplexed message transmission and message pipelining.
77   *
78   * @since 5.0
79   */
80  @Internal
81  public class ClientHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpResponse, HttpRequest> {
82  
83      private final HttpProcessor httpProcessor;
84      private final ConnectionReuseStrategy connectionReuseStrategy;
85      private final Http1Config http1Config;
86      private final Http1StreamListener streamListener;
87      private final Queue<ClientHttp1StreamHandler> pipeline;
88      private final Http1StreamChannel<HttpRequest> outputChannel;
89  
90      private volatile ClientHttp1StreamHandler outgoing;
91      private volatile ClientHttp1StreamHandler incoming;
92  
93      public ClientHttp1StreamDuplexer(
94              final ProtocolIOSession ioSession,
95              final HttpProcessor httpProcessor,
96              final Http1Config http1Config,
97              final CharCodingConfig charCodingConfig,
98              final ConnectionReuseStrategy connectionReuseStrategy,
99              final NHttpMessageParser<HttpResponse> incomingMessageParser,
100             final NHttpMessageWriter<HttpRequest> outgoingMessageWriter,
101             final ContentLengthStrategy incomingContentStrategy,
102             final ContentLengthStrategy outgoingContentStrategy,
103             final Http1StreamListener streamListener) {
104         super(ioSession, http1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter, incomingContentStrategy, outgoingContentStrategy);
105         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
106         this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
107         this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
108                 DefaultConnectionReuseStrategy.INSTANCE;
109         this.streamListener = streamListener;
110         this.pipeline = new ConcurrentLinkedQueue<>();
111         this.outputChannel = new Http1StreamChannel<HttpRequest>() {
112 
113             @Override
114             public void close() {
115                 shutdownSession(CloseMode.IMMEDIATE);
116             }
117 
118             @Override
119             public void submit(
120                     final HttpRequest request,
121                     final boolean endStream,
122                     final FlushMode flushMode) throws HttpException, IOException {
123                 if (streamListener != null) {
124                     streamListener.onRequestHead(ClientHttp1StreamDuplexer.this, request);
125                 }
126                 commitMessageHead(request, endStream, flushMode);
127             }
128 
129             @Override
130             public void suspendOutput() throws IOException {
131                 suspendSessionOutput();
132             }
133 
134             @Override
135             public void requestOutput() {
136                 requestSessionOutput();
137             }
138 
139             @Override
140             public Timeout getSocketTimeout() {
141                 return getSessionTimeout();
142             }
143 
144             @Override
145             public void setSocketTimeout(final Timeout timeout) {
146                 setSessionTimeout(timeout);
147             }
148 
149             @Override
150             public int write(final ByteBuffer src) throws IOException {
151                 return streamOutput(src);
152             }
153 
154             @Override
155             public void complete(final List<? extends Header> trailers) throws IOException {
156                 endOutputStream(trailers);
157             }
158 
159             @Override
160             public boolean isCompleted() {
161                 return isOutputCompleted();
162             }
163 
164             @Override
165             public boolean abortGracefully() throws IOException {
166                 final MessageDelineation messageDelineation = endOutputStream(null);
167                 return messageDelineation != MessageDelineation.MESSAGE_HEAD;
168             }
169 
170             @Override
171             public void activate() throws HttpException, IOException {
172             }
173 
174         };
175     }
176 
177     @Override
178     void terminate(final Exception exception) {
179         if (incoming != null) {
180             incoming.failed(exception);
181             incoming.releaseResources();
182             incoming = null;
183         }
184         if (outgoing != null) {
185             outgoing.failed(exception);
186             outgoing.releaseResources();
187             outgoing = null;
188         }
189         for (;;) {
190             final ClientHttp1StreamHandler handler = pipeline.poll();
191             if (handler != null) {
192                 handler.failed(exception);
193                 handler.releaseResources();
194             } else {
195                 break;
196             }
197         }
198     }
199 
200     @Override
201     void disconnected() {
202         if (incoming != null) {
203             if (!incoming.isCompleted()) {
204                 incoming.failed(new ConnectionClosedException());
205             }
206             incoming.releaseResources();
207             incoming = null;
208         }
209         if (outgoing != null) {
210             if (!outgoing.isCompleted()) {
211                 outgoing.failed(new ConnectionClosedException());
212             }
213             outgoing.releaseResources();
214             outgoing = null;
215         }
216         for (;;) {
217             final ClientHttp1StreamHandler handler = pipeline.poll();
218             if (handler != null) {
219                 handler.failed(new ConnectionClosedException());
220                 handler.releaseResources();
221             } else {
222                 break;
223             }
224         }
225     }
226 
227     @Override
228     void updateInputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
229         if (response.getCode() >= HttpStatus.SC_OK) {
230             connMetrics.incrementRequestCount();
231         }
232     }
233 
234     @Override
235     void updateOutputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
236         connMetrics.incrementRequestCount();
237     }
238 
239     @Override
240     protected boolean handleIncomingMessage(final HttpResponse response) throws HttpException {
241 
242         if (incoming == null) {
243             incoming = pipeline.poll();
244         }
245         if (incoming == null) {
246             throw new HttpException("Unexpected response");
247         }
248         return MessageSupport.canResponseHaveBody(incoming.getRequestMethod(), response);
249     }
250 
251     @Override
252     protected ContentDecoder createContentDecoder(
253             final long len,
254             final ReadableByteChannel channel,
255             final SessionInputBuffer buffer,
256             final BasicHttpTransportMetrics metrics) throws HttpException {
257 
258         if (len >= 0) {
259             return new LengthDelimitedDecoder(channel, buffer, metrics, len);
260         } else if (len == ContentLengthStrategy.CHUNKED) {
261             return new ChunkDecoder(channel, buffer, http1Config, metrics);
262         } else {
263             return new IdentityDecoder(channel, buffer, metrics);
264         }
265     }
266 
267     @Override
268     protected boolean handleOutgoingMessage(final HttpRequest request) throws HttpException {
269         return true;
270     }
271 
272     @Override
273     protected ContentEncoder createContentEncoder(
274             final long len,
275             final WritableByteChannel channel,
276             final SessionOutputBuffer buffer,
277             final BasicHttpTransportMetrics metrics) throws HttpException {
278         final int chunkSizeHint = http1Config.getChunkSizeHint() >= 0 ? http1Config.getChunkSizeHint() : 2048;
279         if (len >= 0) {
280             return new LengthDelimitedEncoder(channel, buffer, metrics, len, chunkSizeHint);
281         } else if (len == ContentLengthStrategy.CHUNKED) {
282             return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
283         } else {
284             throw new LengthRequiredException();
285         }
286     }
287 
288     @Override
289     boolean inputIdle() {
290         return incoming == null;
291     }
292 
293     @Override
294     boolean outputIdle() {
295         return outgoing == null && pipeline.isEmpty();
296     }
297 
298     @Override
299     void outputEnd() throws HttpException, IOException {
300         if (outgoing != null) {
301             if (outgoing.isCompleted()) {
302                 outgoing.releaseResources();
303             }
304             outgoing = null;
305         }
306     }
307 
308     @Override
309     void execute(final RequestExecutionCommand executionCommand) throws HttpException, IOException {
310         final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler();
311         final HttpCoreContext context = HttpCoreContext.adapt(executionCommand.getContext());
312         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
313         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
314         final ClientHttp1StreamHandlerntHttp1StreamHandler.html#ClientHttp1StreamHandler">ClientHttp1StreamHandler handler = new ClientHttp1StreamHandler(
315                 outputChannel,
316                 httpProcessor,
317                 http1Config,
318                 connectionReuseStrategy,
319                 exchangeHandler,
320                 context);
321         pipeline.add(handler);
322         outgoing = handler;
323 
324         if (handler.isOutputReady()) {
325             handler.produceOutput();
326         }
327     }
328 
329     @Override
330     boolean isOutputReady() {
331         return outgoing != null && outgoing.isOutputReady();
332     }
333 
334     @Override
335     void produceOutput() throws HttpException, IOException {
336         if (outgoing != null) {
337             outgoing.produceOutput();
338         }
339     }
340 
341     @Override
342     void consumeHeader(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
343         if (streamListener != null) {
344             streamListener.onResponseHead(this, response);
345         }
346         Asserts.notNull(incoming, "Response stream handler");
347         incoming.consumeHeader(response, entityDetails);
348     }
349 
350     @Override
351     void consumeData(final ByteBuffer src) throws HttpException, IOException {
352         Asserts.notNull(incoming, "Response stream handler");
353         incoming.consumeData(src);
354     }
355 
356     @Override
357     void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
358         Asserts.notNull(incoming, "Response stream handler");
359         incoming.updateCapacity(capacityChannel);
360     }
361 
362     @Override
363     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
364         Asserts.notNull(incoming, "Response stream handler");
365         incoming.dataEnd(trailers);
366     }
367 
368     @Override
369     void inputEnd() throws HttpException, IOException {
370         if (incoming != null && incoming.isResponseFinal()) {
371             if (streamListener != null) {
372                 streamListener.onExchangeComplete(this, isOpen());
373             }
374             if (incoming.isCompleted()) {
375                 incoming.releaseResources();
376             }
377             incoming = null;
378         }
379     }
380 
381     @Override
382     boolean handleTimeout() {
383         return outgoing != null && outgoing.handleTimeout();
384     }
385 
386     @Override
387     void appendState(final StringBuilder buf) {
388         super.appendState(buf);
389         super.appendState(buf);
390         buf.append(", incoming=[");
391         if (incoming != null) {
392             incoming.appendState(buf);
393         }
394         buf.append("], outgoing=[");
395         if (outgoing != null) {
396             outgoing.appendState(buf);
397         }
398         buf.append("], pipeline=");
399         buf.append(pipeline.size());
400     }
401 
402     @Override
403     public String toString() {
404         final StringBuilder buf = new StringBuilder();
405         buf.append("[");
406         appendState(buf);
407         buf.append("]");
408         return buf.toString();
409     }
410 
411 }