View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.ReadableByteChannel;
33  import java.nio.channels.WritableByteChannel;
34  import java.util.List;
35  import java.util.Queue;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  import java.util.concurrent.locks.ReentrantLock;
38  
39  import org.apache.hc.core5.annotation.Internal;
40  import org.apache.hc.core5.http.ConnectionClosedException;
41  import org.apache.hc.core5.http.ConnectionReuseStrategy;
42  import org.apache.hc.core5.http.ContentLengthStrategy;
43  import org.apache.hc.core5.http.EntityDetails;
44  import org.apache.hc.core5.http.Header;
45  import org.apache.hc.core5.http.HttpException;
46  import org.apache.hc.core5.http.HttpRequest;
47  import org.apache.hc.core5.http.HttpResponse;
48  import org.apache.hc.core5.http.HttpStatus;
49  import org.apache.hc.core5.http.URIScheme;
50  import org.apache.hc.core5.http.config.CharCodingConfig;
51  import org.apache.hc.core5.http.config.Http1Config;
52  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
53  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
54  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
55  import org.apache.hc.core5.http.impl.Http1StreamListener;
56  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
57  import org.apache.hc.core5.http.nio.CapacityChannel;
58  import org.apache.hc.core5.http.nio.ContentDecoder;
59  import org.apache.hc.core5.http.nio.ContentEncoder;
60  import org.apache.hc.core5.http.nio.HandlerFactory;
61  import org.apache.hc.core5.http.nio.NHttpMessageParser;
62  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
63  import org.apache.hc.core5.http.nio.SessionInputBuffer;
64  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
65  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
66  import org.apache.hc.core5.http.protocol.HttpCoreContext;
67  import org.apache.hc.core5.http.protocol.HttpProcessor;
68  import org.apache.hc.core5.io.CloseMode;
69  import org.apache.hc.core5.reactor.ProtocolIOSession;
70  import org.apache.hc.core5.util.Args;
71  import org.apache.hc.core5.util.Asserts;
72  import org.apache.hc.core5.util.Timeout;
73  
74  /**
75   * I/O event handler for events fired by {@link ProtocolIOSession} that implements
76   * server side HTTP/1.1 messaging protocol with full support for
77   * duplexed message transmission and message pipelining.
78   *
79   * @since 5.0
80   */
81  @Internal
82  public class ServerHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpRequest, HttpResponse> {
83  
84      private final String scheme;
85      private final HttpProcessor httpProcessor;
86      private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
87      private final Http1Config http1Config;
88      private final ConnectionReuseStrategy connectionReuseStrategy;
89      private final Http1StreamListener streamListener;
90      private final Queue<ServerHttp1StreamHandler> pipeline;
91      private final Http1StreamChannel<HttpResponse> outputChannel;
92  
93      private volatile ServerHttp1StreamHandler outgoing;
94      private volatile ServerHttp1StreamHandler incoming;
95  
96      public ServerHttp1StreamDuplexer(
97              final ProtocolIOSession ioSession,
98              final HttpProcessor httpProcessor,
99              final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
100             final String scheme,
101             final Http1Config http1Config,
102             final CharCodingConfig charCodingConfig,
103             final ConnectionReuseStrategy connectionReuseStrategy,
104             final NHttpMessageParser<HttpRequest> incomingMessageParser,
105             final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
106             final ContentLengthStrategy incomingContentStrategy,
107             final ContentLengthStrategy outgoingContentStrategy,
108             final Http1StreamListener streamListener) {
109         super(ioSession, http1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter,
110                 incomingContentStrategy, outgoingContentStrategy);
111         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
112         this.exchangeHandlerFactory = Args.notNull(exchangeHandlerFactory, "Exchange handler factory");
113         this.scheme = scheme != null ? scheme : URIScheme.HTTP.getId();
114         this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
115         this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
116                 DefaultConnectionReuseStrategy.INSTANCE;
117         this.streamListener = streamListener;
118         this.pipeline = new ConcurrentLinkedQueue<>();
119         this.outputChannel = new Http1StreamChannel<HttpResponse>() {
120 
121             @Override
122             public void close() {
123                 ServerHttp1StreamDuplexer.this.close(CloseMode.GRACEFUL);
124             }
125 
126             @Override
127             public void submit(
128                     final HttpResponse response,
129                     final boolean endStream,
130                     final FlushMode flushMode) throws HttpException, IOException {
131                 if (streamListener != null) {
132                     streamListener.onResponseHead(ServerHttp1StreamDuplexer.this, response);
133                 }
134                 commitMessageHead(response, endStream, flushMode);
135             }
136 
137             @Override
138             public void requestOutput() {
139                 requestSessionOutput();
140             }
141 
142             @Override
143             public void suspendOutput() throws IOException {
144                 suspendSessionOutput();
145             }
146 
147             @Override
148             public Timeout getSocketTimeout() {
149                 return getSessionTimeout();
150             }
151 
152             @Override
153             public void setSocketTimeout(final Timeout timeout) {
154                 setSessionTimeout(timeout);
155             }
156 
157             @Override
158             public int write(final ByteBuffer src) throws IOException {
159                 return streamOutput(src);
160             }
161 
162             @Override
163             public void complete(final List<? extends Header> trailers) throws IOException {
164                 endOutputStream(trailers);
165             }
166 
167             @Override
168             public boolean isCompleted() {
169                 return isOutputCompleted();
170             }
171 
172             @Override
173             public boolean abortGracefully() throws IOException {
174                 final MessageDelineation messageDelineation = endOutputStream(null);
175                 return messageDelineation != MessageDelineation.MESSAGE_HEAD;
176             }
177 
178             @Override
179             public void activate() throws HttpException, IOException {
180                 // empty
181             }
182 
183             @Override
184             public String toString() {
185                 return "Http1StreamChannel[" + ServerHttp1StreamDuplexer.this + "]";
186             }
187 
188         };
189     }
190 
191     @Override
192     void terminate(final Exception exception) {
193         if (incoming != null) {
194             incoming.failed(exception);
195             incoming.releaseResources();
196             incoming = null;
197         }
198         if (outgoing != null) {
199             outgoing.failed(exception);
200             outgoing.releaseResources();
201             outgoing = null;
202         }
203         for (;;) {
204             final ServerHttp1StreamHandler handler = pipeline.poll();
205             if (handler != null) {
206                 handler.failed(exception);
207                 handler.releaseResources();
208             } else {
209                 break;
210             }
211         }
212     }
213 
214     @Override
215     void disconnected() {
216         if (incoming != null) {
217             if (!incoming.isCompleted()) {
218                 incoming.failed(new ConnectionClosedException());
219             }
220             incoming.releaseResources();
221             incoming = null;
222         }
223         if (outgoing != null) {
224             if (!outgoing.isCompleted()) {
225                 outgoing.failed(new ConnectionClosedException());
226             }
227             outgoing.releaseResources();
228             outgoing = null;
229         }
230         for (;;) {
231             final ServerHttp1StreamHandler handler = pipeline.poll();
232             if (handler != null) {
233                 handler.failed(new ConnectionClosedException());
234                 handler.releaseResources();
235             } else {
236                 break;
237             }
238         }
239     }
240 
241     @Override
242     void updateInputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
243         connMetrics.incrementRequestCount();
244     }
245 
246     @Override
247     void updateOutputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
248         if (response.getCode() >= HttpStatus.SC_OK) {
249             connMetrics.incrementResponseCount();
250         }
251     }
252 
253     @Override
254     protected boolean handleIncomingMessage(final HttpRequest request) throws HttpException {
255         return true;
256     }
257 
258     @Override
259     protected ContentDecoder createContentDecoder(
260             final long len,
261             final ReadableByteChannel channel,
262             final SessionInputBuffer buffer,
263             final BasicHttpTransportMetrics metrics) throws HttpException {
264         if (len >= 0) {
265             return new LengthDelimitedDecoder(channel, buffer, metrics, len);
266         } else if (len == ContentLengthStrategy.CHUNKED) {
267             return new ChunkDecoder(channel, buffer, http1Config, metrics);
268         } else {
269             return null;
270         }
271     }
272 
273     @Override
274     protected boolean handleOutgoingMessage(final HttpResponse response) throws HttpException {
275         return true;
276     }
277 
278     @Override
279     protected ContentEncoder createContentEncoder(
280             final long len,
281             final WritableByteChannel channel,
282             final SessionOutputBuffer buffer,
283             final BasicHttpTransportMetrics metrics) throws HttpException {
284         final int chunkSizeHint = http1Config.getChunkSizeHint() >= 0 ? http1Config.getChunkSizeHint() : 2048;
285         if (len >= 0) {
286             return new LengthDelimitedEncoder(channel, buffer, metrics, len, chunkSizeHint);
287         } else if (len == ContentLengthStrategy.CHUNKED) {
288             return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
289         } else {
290             return new IdentityEncoder(channel, buffer, metrics, chunkSizeHint);
291         }
292     }
293 
294     @Override
295     boolean inputIdle() {
296         return incoming == null;
297     }
298 
299     @Override
300     boolean outputIdle() {
301         return outgoing == null && pipeline.isEmpty();
302     }
303 
304     @Override
305     HttpRequest parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
306         try {
307             return super.parseMessageHead(endOfStream);
308         } catch (final HttpException ex) {
309             terminateExchange(ex);
310             return null;
311         }
312     }
313 
314     void terminateExchange(final HttpException ex) throws HttpException, IOException {
315         suspendSessionInput();
316         final ServerHttp1StreamHandler streamHandler;
317         final HttpCoreContext context = HttpCoreContext.create();
318         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
319         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
320         if (outgoing == null) {
321             streamHandler = new ServerHttp1StreamHandler(
322                     outputChannel,
323                     httpProcessor,
324                     http1Config,
325                     connectionReuseStrategy,
326                     exchangeHandlerFactory,
327                     context);
328             outgoing = streamHandler;
329         } else {
330             streamHandler = new ServerHttp1StreamHandler(
331                     new DelayedOutputChannel(outputChannel),
332                     httpProcessor,
333                     http1Config,
334                     connectionReuseStrategy,
335                     exchangeHandlerFactory,
336                     context);
337             pipeline.add(streamHandler);
338         }
339         streamHandler.terminateExchange(ex);
340         incoming = null;
341     }
342 
343     @Override
344     void consumeHeader(final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
345         if (streamListener != null) {
346             streamListener.onRequestHead(this, request);
347         }
348         final ServerHttp1StreamHandler streamHandler;
349         final HttpCoreContext context = HttpCoreContext.create();
350         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
351         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
352         if (outgoing == null) {
353             streamHandler = new ServerHttp1StreamHandler(
354                     outputChannel,
355                     httpProcessor,
356                     http1Config,
357                     connectionReuseStrategy,
358                     exchangeHandlerFactory,
359                     context);
360             outgoing = streamHandler;
361         } else {
362             streamHandler = new ServerHttp1StreamHandler(
363                     new DelayedOutputChannel(outputChannel),
364                     httpProcessor,
365                     http1Config,
366                     connectionReuseStrategy,
367                     exchangeHandlerFactory,
368                     context);
369             pipeline.add(streamHandler);
370         }
371         request.setScheme(scheme);
372         streamHandler.consumeHeader(request, entityDetails);
373         incoming = streamHandler;
374     }
375 
376     @Override
377     void consumeData(final ByteBuffer src) throws HttpException, IOException {
378         Asserts.notNull(incoming, "Request stream handler");
379         incoming.consumeData(src);
380     }
381 
382     @Override
383     void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
384         Asserts.notNull(incoming, "Request stream handler");
385         incoming.updateCapacity(capacityChannel);
386     }
387 
388     @Override
389     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
390         Asserts.notNull(incoming, "Request stream handler");
391         incoming.dataEnd(trailers);
392     }
393 
394     @Override
395     void inputEnd() throws HttpException, IOException {
396         if (incoming != null) {
397             if (incoming.isCompleted()) {
398                 incoming.releaseResources();
399             }
400             incoming = null;
401         }
402         if (isShuttingDown() && outputIdle() && inputIdle()) {
403             shutdownSession(CloseMode.IMMEDIATE);
404         }
405     }
406 
407     @Override
408     void execute(final RequestExecutionCommand executionCommand) throws HttpException {
409         throw new HttpException("Illegal command: " + executionCommand.getClass());
410     }
411 
412     @Override
413     boolean isOutputReady() {
414         return outgoing != null && outgoing.isOutputReady();
415     }
416 
417     @Override
418     void produceOutput() throws HttpException, IOException {
419         if (outgoing != null) {
420             outgoing.produceOutput();
421         }
422     }
423 
424     @Override
425     void outputEnd() throws HttpException, IOException {
426         if (outgoing != null && outgoing.isResponseFinal()) {
427             if (streamListener != null) {
428                 streamListener.onExchangeComplete(this, outgoing.keepAlive());
429             }
430             if (outgoing.isCompleted()) {
431                 outgoing.releaseResources();
432             }
433             outgoing = null;
434         }
435         if (outgoing == null && isActive()) {
436             final ServerHttp1StreamHandler handler = pipeline.poll();
437             if (handler != null) {
438                 outgoing = handler;
439                 handler.activateChannel();
440                 if (handler.isOutputReady()) {
441                     handler.produceOutput();
442                 }
443             }
444         }
445         if (isShuttingDown() && outputIdle() && inputIdle()) {
446             shutdownSession(CloseMode.IMMEDIATE);
447         }
448     }
449 
450     @Override
451     boolean handleTimeout() {
452         return false;
453     }
454 
455     @Override
456     void appendState(final StringBuilder buf) {
457         super.appendState(buf);
458         buf.append(", incoming=[");
459         if (incoming != null) {
460             incoming.appendState(buf);
461         }
462         buf.append("], outgoing=[");
463         if (outgoing != null) {
464             outgoing.appendState(buf);
465         }
466         buf.append("], pipeline=");
467         buf.append(pipeline.size());
468     }
469 
470     @Override
471     public String toString() {
472         final StringBuilder buf = new StringBuilder();
473         buf.append("[");
474         appendState(buf);
475         buf.append("]");
476         return buf.toString();
477     }
478 
479     private static class DelayedOutputChannel implements Http1StreamChannel<HttpResponse> {
480 
481         private final Http1StreamChannel<HttpResponse> channel;
482 
483         private volatile boolean direct;
484         private volatile HttpResponse delayedResponse;
485         private volatile boolean completed;
486         private final ReentrantLock lock = new ReentrantLock();
487 
488         private DelayedOutputChannel(final Http1StreamChannel<HttpResponse> channel) {
489             this.channel = channel;
490         }
491 
492         @Override
493         public void close() {
494             channel.close();
495         }
496 
497         @Override
498         public void submit(
499                 final HttpResponse response,
500                 final boolean endStream,
501                 final FlushMode flushMode) throws HttpException, IOException {
502             lock.lock();
503             try {
504                 if (direct) {
505                     channel.submit(response, endStream, flushMode);
506                 } else {
507                     delayedResponse = response;
508                     completed = endStream;
509                 }
510             } finally {
511                 lock.unlock();
512             }
513         }
514 
515         @Override
516         public void suspendOutput() throws IOException {
517             channel.suspendOutput();
518         }
519 
520         @Override
521         public void requestOutput() {
522             channel.requestOutput();
523         }
524 
525         @Override
526         public Timeout getSocketTimeout() {
527             return channel.getSocketTimeout();
528         }
529 
530         @Override
531         public void setSocketTimeout(final Timeout timeout) {
532             channel.setSocketTimeout(timeout);
533         }
534 
535         @Override
536         public int write(final ByteBuffer src) throws IOException {
537             lock.lock();
538             try {
539                 return direct ? channel.write(src) : 0;
540             } finally {
541                 lock.unlock();
542             }
543         }
544 
545         @Override
546         public void complete(final List<? extends Header> trailers) throws IOException {
547             lock.lock();
548             try {
549                 if (direct) {
550                     channel.complete(trailers);
551                 } else {
552                     completed = true;
553                 }
554             } finally {
555                 lock.unlock();
556             }
557         }
558 
559         @Override
560         public boolean abortGracefully() throws IOException {
561            lock.lock();
562             try {
563                 if (direct) {
564                     return channel.abortGracefully();
565                 }
566                 completed = true;
567                 return true;
568             } finally {
569                 lock.unlock();
570             }
571         }
572 
573         @Override
574         public boolean isCompleted() {
575            lock.lock();
576             try {
577                 return direct ? channel.isCompleted() : completed;
578             } finally {
579                 lock.unlock();
580             }
581         }
582 
583         @Override
584         public void activate() throws IOException, HttpException {
585            lock.lock();
586             try {
587                 direct = true;
588                 if (delayedResponse != null) {
589                     channel.submit(delayedResponse, completed, completed ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
590                     delayedResponse = null;
591                 }
592             } finally {
593                 lock.unlock();
594             }
595         }
596 
597     }
598 
599 }