/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.ReadableByteChannel;
33  import java.nio.channels.WritableByteChannel;
34  import java.util.List;
35  import java.util.Queue;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  
38  import org.apache.hc.core5.annotation.Internal;
39  import org.apache.hc.core5.http.ConnectionClosedException;
40  import org.apache.hc.core5.http.ConnectionReuseStrategy;
41  import org.apache.hc.core5.http.ContentLengthStrategy;
42  import org.apache.hc.core5.http.EntityDetails;
43  import org.apache.hc.core5.http.Header;
44  import org.apache.hc.core5.http.HttpException;
45  import org.apache.hc.core5.http.HttpRequest;
46  import org.apache.hc.core5.http.HttpResponse;
47  import org.apache.hc.core5.http.HttpStatus;
48  import org.apache.hc.core5.http.config.CharCodingConfig;
49  import org.apache.hc.core5.http.config.Http1Config;
50  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
51  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
52  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
53  import org.apache.hc.core5.http.impl.Http1StreamListener;
54  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
55  import org.apache.hc.core5.http.nio.CapacityChannel;
56  import org.apache.hc.core5.http.nio.ContentDecoder;
57  import org.apache.hc.core5.http.nio.ContentEncoder;
58  import org.apache.hc.core5.http.nio.HandlerFactory;
59  import org.apache.hc.core5.http.nio.NHttpMessageParser;
60  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
61  import org.apache.hc.core5.http.nio.SessionInputBuffer;
62  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
63  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
64  import org.apache.hc.core5.http.protocol.HttpCoreContext;
65  import org.apache.hc.core5.http.protocol.HttpProcessor;
66  import org.apache.hc.core5.io.CloseMode;
67  import org.apache.hc.core5.reactor.ProtocolIOSession;
68  import org.apache.hc.core5.util.Args;
69  import org.apache.hc.core5.util.Asserts;
70  import org.apache.hc.core5.util.Timeout;
71  
72  /**
73   * I/O event handler for events fired by {@link ProtocolIOSession} that implements
74   * server side HTTP/1.1 messaging protocol with full support for
75   * duplexed message transmission and message pipelining.
76   *
77   * @since 5.0
78   */
79  @Internal
80  public class ServerHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpRequest, HttpResponse> {
81  
82      private final String scheme;
83      private final HttpProcessor httpProcessor;
84      private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
85      private final Http1Config http1Config;
86      private final ConnectionReuseStrategy connectionReuseStrategy;
87      private final Http1StreamListener streamListener;
88      private final Queue<ServerHttp1StreamHandler> pipeline;
89      private final Http1StreamChannel<HttpResponse> outputChannel;
90  
91      private volatile ServerHttp1StreamHandler outgoing;
92      private volatile ServerHttp1StreamHandler incoming;
93  
94      public ServerHttp1StreamDuplexer(
95              final ProtocolIOSession ioSession,
96              final HttpProcessor httpProcessor,
97              final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
98              final String scheme,
99              final Http1Config http1Config,
100             final CharCodingConfig charCodingConfig,
101             final ConnectionReuseStrategy connectionReuseStrategy,
102             final NHttpMessageParser<HttpRequest> incomingMessageParser,
103             final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
104             final ContentLengthStrategy incomingContentStrategy,
105             final ContentLengthStrategy outgoingContentStrategy,
106             final Http1StreamListener streamListener) {
107         super(ioSession, http1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter, incomingContentStrategy, outgoingContentStrategy);
108         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
109         this.exchangeHandlerFactory = Args.notNull(exchangeHandlerFactory, "Exchange handler factory");
110         this.scheme = scheme;
111         this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
112         this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
113                 DefaultConnectionReuseStrategy.INSTANCE;
114         this.streamListener = streamListener;
115         this.pipeline = new ConcurrentLinkedQueue<>();
116         this.outputChannel = new Http1StreamChannel<HttpResponse>() {
117 
118             @Override
119             public void close() {
120                 ServerHttp1StreamDuplexer.this.close(CloseMode.GRACEFUL);
121             }
122 
123             @Override
124             public void submit(
125                     final HttpResponse response,
126                     final boolean endStream,
127                     final FlushMode flushMode) throws HttpException, IOException {
128                 if (streamListener != null) {
129                     streamListener.onResponseHead(ServerHttp1StreamDuplexer.this, response);
130                 }
131                 commitMessageHead(response, endStream, flushMode);
132             }
133 
134             @Override
135             public void requestOutput() {
136                 requestSessionOutput();
137             }
138 
139             @Override
140             public void suspendOutput() throws IOException {
141                 suspendSessionOutput();
142             }
143 
144             @Override
145             public Timeout getSocketTimeout() {
146                 return getSessionTimeout();
147             }
148 
149             @Override
150             public void setSocketTimeout(final Timeout timeout) {
151                 setSessionTimeout(timeout);
152             }
153 
154             @Override
155             public int write(final ByteBuffer src) throws IOException {
156                 return streamOutput(src);
157             }
158 
159             @Override
160             public void complete(final List<? extends Header> trailers) throws IOException {
161                 endOutputStream(trailers);
162             }
163 
164             @Override
165             public boolean isCompleted() {
166                 return isOutputCompleted();
167             }
168 
169             @Override
170             public boolean abortGracefully() throws IOException {
171                 final MessageDelineation messageDelineation = endOutputStream(null);
172                 return messageDelineation != MessageDelineation.MESSAGE_HEAD;
173             }
174 
175             @Override
176             public void activate() throws HttpException, IOException {
177                 // empty
178             }
179 
180             @Override
181             public String toString() {
182                 return "Http1StreamChannel[" + ServerHttp1StreamDuplexer.this + "]";
183             }
184 
185         };
186     }
187 
188     @Override
189     void terminate(final Exception exception) {
190         if (incoming != null) {
191             incoming.failed(exception);
192             incoming.releaseResources();
193             incoming = null;
194         }
195         if (outgoing != null) {
196             outgoing.failed(exception);
197             outgoing.releaseResources();
198             outgoing = null;
199         }
200         for (;;) {
201             final ServerHttp1StreamHandler handler = pipeline.poll();
202             if (handler != null) {
203                 handler.failed(exception);
204                 handler.releaseResources();
205             } else {
206                 break;
207             }
208         }
209     }
210 
211     @Override
212     void disconnected() {
213         if (incoming != null) {
214             if (!incoming.isCompleted()) {
215                 incoming.failed(new ConnectionClosedException());
216             }
217             incoming.releaseResources();
218             incoming = null;
219         }
220         if (outgoing != null) {
221             if (!outgoing.isCompleted()) {
222                 outgoing.failed(new ConnectionClosedException());
223             }
224             outgoing.releaseResources();
225             outgoing = null;
226         }
227         for (;;) {
228             final ServerHttp1StreamHandler handler = pipeline.poll();
229             if (handler != null) {
230                 handler.failed(new ConnectionClosedException());
231                 handler.releaseResources();
232             } else {
233                 break;
234             }
235         }
236     }
237 
238     @Override
239     void updateInputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
240         connMetrics.incrementRequestCount();
241     }
242 
243     @Override
244     void updateOutputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
245         if (response.getCode() >= HttpStatus.SC_OK) {
246             connMetrics.incrementRequestCount();
247         }
248     }
249 
250     @Override
251     protected boolean handleIncomingMessage(final HttpRequest request) throws HttpException {
252         return true;
253     }
254 
255     @Override
256     protected ContentDecoder createContentDecoder(
257             final long len,
258             final ReadableByteChannel channel,
259             final SessionInputBuffer buffer,
260             final BasicHttpTransportMetrics metrics) throws HttpException {
261         if (len >= 0) {
262             return new LengthDelimitedDecoder(channel, buffer, metrics, len);
263         } else if (len == ContentLengthStrategy.CHUNKED) {
264             return new ChunkDecoder(channel, buffer, http1Config, metrics);
265         } else {
266             return null;
267         }
268     }
269 
270     @Override
271     protected boolean handleOutgoingMessage(final HttpResponse response) throws HttpException {
272         return true;
273     }
274 
275     @Override
276     protected ContentEncoder createContentEncoder(
277             final long len,
278             final WritableByteChannel channel,
279             final SessionOutputBuffer buffer,
280             final BasicHttpTransportMetrics metrics) throws HttpException {
281         final int chunkSizeHint = http1Config.getChunkSizeHint() >= 0 ? http1Config.getChunkSizeHint() : 2048;
282         if (len >= 0) {
283             return new LengthDelimitedEncoder(channel, buffer, metrics, len, chunkSizeHint);
284         } else if (len == ContentLengthStrategy.CHUNKED) {
285             return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
286         } else {
287             return new IdentityEncoder(channel, buffer, metrics, chunkSizeHint);
288         }
289     }
290 
291     @Override
292     boolean inputIdle() {
293         return incoming == null;
294     }
295 
296     @Override
297     boolean outputIdle() {
298         return outgoing == null && pipeline.isEmpty();
299     }
300 
301     @Override
302     HttpRequest parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
303         try {
304             return super.parseMessageHead(endOfStream);
305         } catch (final HttpException ex) {
306             terminateExchange(ex);
307             return null;
308         }
309     }
310 
311     void terminateExchange(final HttpException ex) throws HttpException, IOException {
312         suspendSessionInput();
313         final ServerHttp1StreamHandler streamHandler;
314         final HttpCoreContext context = HttpCoreContext.create();
315         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
316         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
317         if (outgoing == null) {
318             streamHandler = new ServerHttp1StreamHandler(
319                     outputChannel,
320                     httpProcessor,
321                     connectionReuseStrategy,
322                     exchangeHandlerFactory,
323                     context);
324             outgoing = streamHandler;
325         } else {
326             streamHandler = new ServerHttp1StreamHandler(
327                     new DelayedOutputChannel(outputChannel),
328                     httpProcessor,
329                     connectionReuseStrategy,
330                     exchangeHandlerFactory,
331                     context);
332             pipeline.add(streamHandler);
333         }
334         streamHandler.terminateExchange(ex);
335         incoming = null;
336     }
337 
338     @Override
339     void consumeHeader(final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
340         if (streamListener != null) {
341             streamListener.onRequestHead(this, request);
342         }
343         final ServerHttp1StreamHandler streamHandler;
344         final HttpCoreContext context = HttpCoreContext.create();
345         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
346         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
347         if (outgoing == null) {
348             streamHandler = new ServerHttp1StreamHandler(
349                     outputChannel,
350                     httpProcessor,
351                     connectionReuseStrategy,
352                     exchangeHandlerFactory,
353                     context);
354             outgoing = streamHandler;
355         } else {
356             streamHandler = new ServerHttp1StreamHandler(
357                     new DelayedOutputChannel(outputChannel),
358                     httpProcessor,
359                     connectionReuseStrategy,
360                     exchangeHandlerFactory,
361                     context);
362             pipeline.add(streamHandler);
363         }
364         request.setScheme(scheme);
365         streamHandler.consumeHeader(request, entityDetails);
366         incoming = streamHandler;
367     }
368 
369     @Override
370     void consumeData(final ByteBuffer src) throws HttpException, IOException {
371         Asserts.notNull(incoming, "Request stream handler");
372         incoming.consumeData(src);
373     }
374 
375     @Override
376     void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
377         Asserts.notNull(incoming, "Request stream handler");
378         incoming.updateCapacity(capacityChannel);
379     }
380 
381     @Override
382     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
383         Asserts.notNull(incoming, "Request stream handler");
384         incoming.dataEnd(trailers);
385     }
386 
387     @Override
388     void inputEnd() throws HttpException, IOException {
389         if (incoming != null) {
390             if (incoming.isCompleted()) {
391                 incoming.releaseResources();
392             }
393             incoming = null;
394         }
395     }
396 
397     @Override
398     void execute(final RequestExecutionCommand executionCommand) throws HttpException {
399         throw new HttpException("Illegal command: " + executionCommand.getClass());
400     }
401 
402     @Override
403     boolean isOutputReady() {
404         return outgoing != null && outgoing.isOutputReady();
405     }
406 
407     @Override
408     void produceOutput() throws HttpException, IOException {
409         if (outgoing != null) {
410             outgoing.produceOutput();
411         }
412     }
413 
414     @Override
415     void outputEnd() throws HttpException, IOException {
416         if (outgoing != null && outgoing.isResponseFinal()) {
417             if (streamListener != null) {
418                 streamListener.onExchangeComplete(this, outgoing.keepAlive());
419             }
420             if (outgoing.isCompleted()) {
421                 outgoing.releaseResources();
422             }
423             outgoing = null;
424         }
425         if (outgoing == null && isOpen()) {
426             final ServerHttp1StreamHandler handler = pipeline.poll();
427             if (handler != null) {
428                 outgoing = handler;
429                 handler.activateChannel();
430                 if (handler.isOutputReady()) {
431                     handler.produceOutput();
432                 }
433             }
434         }
435     }
436 
437     @Override
438     boolean handleTimeout() {
439         return false;
440     }
441 
442     @Override
443     void appendState(final StringBuilder buf) {
444         super.appendState(buf);
445         buf.append(", incoming=[");
446         if (incoming != null) {
447             incoming.appendState(buf);
448         }
449         buf.append("], outgoing=[");
450         if (outgoing != null) {
451             outgoing.appendState(buf);
452         }
453         buf.append("], pipeline=");
454         buf.append(pipeline.size());
455     }
456 
457     @Override
458     public String toString() {
459         final StringBuilder buf = new StringBuilder();
460         buf.append("[");
461         appendState(buf);
462         buf.append("]");
463         return buf.toString();
464     }
465 
466     private static class DelayedOutputChannel implements Http1StreamChannel<HttpResponse> {
467 
468         private final Http1StreamChannel<HttpResponse> channel;
469 
470         private volatile boolean direct;
471         private volatile HttpResponse delayedResponse;
472         private volatile boolean completed;
473 
474         private DelayedOutputChannel(final Http1StreamChannel<HttpResponse> channel) {
475             this.channel = channel;
476         }
477 
478         @Override
479         public void close() {
480             channel.close();
481         }
482 
483         @Override
484         public void submit(
485                 final HttpResponse response,
486                 final boolean endStream,
487                 final FlushMode flushMode) throws HttpException, IOException {
488             synchronized (this) {
489                 if (direct) {
490                     channel.submit(response, endStream, flushMode);
491                 } else {
492                     delayedResponse = response;
493                     completed = endStream;
494                 }
495             }
496         }
497 
498         @Override
499         public void suspendOutput() throws IOException {
500             channel.suspendOutput();
501         }
502 
503         @Override
504         public void requestOutput() {
505             channel.requestOutput();
506         }
507 
508         @Override
509         public Timeout getSocketTimeout() {
510             return channel.getSocketTimeout();
511         }
512 
513         @Override
514         public void setSocketTimeout(final Timeout timeout) {
515             channel.setSocketTimeout(timeout);
516         }
517 
518         @Override
519         public int write(final ByteBuffer src) throws IOException {
520             synchronized (this) {
521                 return direct ? channel.write(src) : 0;
522             }
523         }
524 
525         @Override
526         public void complete(final List<? extends Header> trailers) throws IOException {
527             synchronized (this) {
528                 if (direct) {
529                     channel.complete(trailers);
530                 } else {
531                     completed = true;
532                 }
533             }
534         }
535 
536         @Override
537         public boolean abortGracefully() throws IOException {
538             synchronized (this) {
539                 if (direct) {
540                     return channel.abortGracefully();
541                 }
542                 completed = true;
543                 return true;
544             }
545         }
546 
547         @Override
548         public boolean isCompleted() {
549             synchronized (this) {
550                 return direct ? channel.isCompleted() : completed;
551             }
552         }
553 
554         @Override
555         public void activate() throws IOException, HttpException {
556             synchronized (this) {
557                 direct = true;
558                 if (delayedResponse != null) {
559                     channel.submit(delayedResponse, completed, completed ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
560                     delayedResponse = null;
561                 }
562             }
563         }
564 
565     }
566 
567 }