View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http2.impl.nio;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.hc.core5.http.EntityDetails;
35  import org.apache.hc.core5.http.Header;
36  import org.apache.hc.core5.http.HttpException;
37  import org.apache.hc.core5.http.HttpRequest;
38  import org.apache.hc.core5.http.HttpResponse;
39  import org.apache.hc.core5.http.HttpStatus;
40  import org.apache.hc.core5.http.HttpVersion;
41  import org.apache.hc.core5.http.Method;
42  import org.apache.hc.core5.http.ProtocolException;
43  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
44  import org.apache.hc.core5.http.impl.IncomingEntityDetails;
45  import org.apache.hc.core5.http.impl.ServerSupport;
46  import org.apache.hc.core5.http.impl.nio.MessageState;
47  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
48  import org.apache.hc.core5.http.nio.AsyncPushProducer;
49  import org.apache.hc.core5.http.nio.AsyncResponseProducer;
50  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
51  import org.apache.hc.core5.http.nio.DataStreamChannel;
52  import org.apache.hc.core5.http.nio.HandlerFactory;
53  import org.apache.hc.core5.http.nio.ResponseChannel;
54  import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
55  import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
56  import org.apache.hc.core5.http.protocol.HttpContext;
57  import org.apache.hc.core5.http.protocol.HttpCoreContext;
58  import org.apache.hc.core5.http.protocol.HttpProcessor;
59  import org.apache.hc.core5.http2.H2ConnectionException;
60  import org.apache.hc.core5.http2.H2Error;
61  import org.apache.hc.core5.http2.H2StreamResetException;
62  import org.apache.hc.core5.http2.impl.DefaultH2RequestConverter;
63  import org.apache.hc.core5.http2.impl.DefaultH2ResponseConverter;
64  import org.apache.hc.core5.util.Asserts;
65  
66  class ServerH2StreamHandler implements H2StreamHandler {
67  
68      private final H2StreamChannel outputChannel;
69      private final DataStreamChannel dataChannel;
70      private final ResponseChannel responseChannel;
71      private final HttpProcessor httpProcessor;
72      private final BasicHttpConnectionMetrics connMetrics;
73      private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
74      private final HttpCoreContext context;
75      private final AtomicBoolean responseCommitted;
76      private final AtomicBoolean failed;
77      private final AtomicBoolean done;
78  
79      private volatile AsyncServerExchangeHandler exchangeHandler;
80      private volatile HttpRequest receivedRequest;
81      private volatile MessageState requestState;
82      private volatile MessageState responseState;
83  
84      ServerH2StreamHandler(
85              final H2StreamChannel outputChannel,
86              final HttpProcessor httpProcessor,
87              final BasicHttpConnectionMetrics connMetrics,
88              final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
89              final HttpCoreContext context) {
90          this.outputChannel = outputChannel;
91          this.dataChannel = new DataStreamChannel() {
92  
93              @Override
94              public void requestOutput() {
95                  outputChannel.requestOutput();
96              }
97  
98              @Override
99              public int write(final ByteBuffer src) throws IOException {
100                 return outputChannel.write(src);
101             }
102 
103             @Override
104             public void endStream(final List<? extends Header> trailers) throws IOException {
105                 outputChannel.endStream(trailers);
106                 responseState = MessageState.COMPLETE;
107             }
108 
109             @Override
110             public void endStream() throws IOException {
111                 outputChannel.endStream();
112                 responseState = MessageState.COMPLETE;
113             }
114 
115         };
116         this.responseChannel = new ResponseChannel() {
117 
118             @Override
119             public void sendInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
120                 commitInformation(response);
121             }
122 
123             @Override
124             public void sendResponse(
125                     final HttpResponse response, final EntityDetails responseEntityDetails, final HttpContext httpContext) throws HttpException, IOException {
126                 commitResponse(response, responseEntityDetails);
127             }
128 
129             @Override
130             public void pushPromise(
131                     final HttpRequest promise, final AsyncPushProducer pushProducer, final HttpContext httpContext) throws HttpException, IOException {
132                 commitPromise(promise, pushProducer);
133             }
134 
135         };
136         this.httpProcessor = httpProcessor;
137         this.connMetrics = connMetrics;
138         this.exchangeHandlerFactory = exchangeHandlerFactory;
139         this.context = context;
140         this.responseCommitted = new AtomicBoolean(false);
141         this.failed = new AtomicBoolean(false);
142         this.done = new AtomicBoolean(false);
143         this.requestState = MessageState.HEADERS;
144         this.responseState = MessageState.IDLE;
145     }
146 
147     @Override
148     public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
149         return null;
150     }
151 
152     private void commitInformation(final HttpResponse response) throws IOException, HttpException {
153         if (responseCommitted.get()) {
154             throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Response already committed");
155         }
156         final int status = response.getCode();
157         if (status < HttpStatus.SC_INFORMATIONAL || status >= HttpStatus.SC_SUCCESS) {
158             throw new HttpException("Invalid intermediate response: " + status);
159         }
160         final List<Header> responseHeaders = DefaultH2ResponseConverter.INSTANCE.convert(response);
161         outputChannel.submit(responseHeaders, false);
162     }
163 
164     private void commitResponse(
165             final HttpResponse response,
166             final EntityDetails responseEntityDetails) throws HttpException, IOException {
167         if (responseCommitted.compareAndSet(false, true)) {
168 
169             final int status = response.getCode();
170             if (status < HttpStatus.SC_SUCCESS) {
171                 throw new HttpException("Invalid response: " + status);
172             }
173             context.setResponse(response);
174             httpProcessor.process(response, responseEntityDetails, context);
175 
176             final List<Header> responseHeaders = DefaultH2ResponseConverter.INSTANCE.convert(response);
177 
178             final boolean endStream = responseEntityDetails == null ||
179                     (receivedRequest != null && Method.HEAD.isSame(receivedRequest.getMethod()));
180             outputChannel.submit(responseHeaders, endStream);
181             connMetrics.incrementResponseCount();
182             if (responseEntityDetails == null) {
183                 responseState = MessageState.COMPLETE;
184             } else {
185                 responseState = MessageState.BODY;
186                 exchangeHandler.produce(outputChannel);
187             }
188         } else {
189             throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Response already committed");
190         }
191     }
192 
193     private void commitPromise(
194             final HttpRequest promise,
195             final AsyncPushProducer pushProducer) throws HttpException, IOException {
196 
197         httpProcessor.process(promise, null, context);
198 
199         final List<Header> promiseHeaders = DefaultH2RequestConverter.INSTANCE.convert(promise);
200         outputChannel.push(promiseHeaders, pushProducer);
201         connMetrics.incrementRequestCount();
202     }
203 
204     @Override
205     public void consumePromise(final List<Header> headers) throws HttpException, IOException {
206         throw new ProtocolException("Unexpected message promise");
207     }
208 
209     @Override
210     public void consumeHeader(final List<Header> headers, final boolean endStream) throws HttpException, IOException {
211         if (done.get()) {
212             throw new ProtocolException("Unexpected message headers");
213         }
214         switch (requestState) {
215             case HEADERS:
216                 requestState = endStream ? MessageState.COMPLETE : MessageState.BODY;
217 
218                 final HttpRequest request = DefaultH2RequestConverter.INSTANCE.convert(headers);
219                 final EntityDetails requestEntityDetails = endStream ? null : new IncomingEntityDetails(request, -1);
220 
221                 final AsyncServerExchangeHandler handler;
222                 try {
223                     handler = exchangeHandlerFactory != null ? exchangeHandlerFactory.create(request, context) : null;
224                 } catch (final ProtocolException ex) {
225                     throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, ex.getMessage());
226                 }
227                 if (handler == null) {
228                     throw new H2StreamResetException(H2Error.REFUSED_STREAM, "Stream refused");
229                 }
230                 exchangeHandler = handler;
231 
232                 context.setProtocolVersion(HttpVersion.HTTP_2);
233                 context.setRequest(request);
234 
235                 try {
236                     httpProcessor.process(request, requestEntityDetails, context);
237                     connMetrics.incrementRequestCount();
238                     receivedRequest = request;
239 
240                     exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
241                 } catch (final HttpException ex) {
242                     if (!responseCommitted.get()) {
243                         final AsyncResponseProducer responseProducer = new BasicResponseProducer(
244                                 ServerSupport.toStatusCode(ex),
245                                 ServerSupport.toErrorMessage(ex));
246                         exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
247                         exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
248                     } else {
249                         throw ex;
250                     }
251                 }
252                 break;
253             case BODY:
254                 responseState = MessageState.COMPLETE;
255                 exchangeHandler.streamEnd(headers);
256                 break;
257             default:
258                 throw new ProtocolException("Unexpected message headers");
259         }
260     }
261 
262     @Override
263     public void updateInputCapacity() throws IOException {
264         Asserts.notNull(exchangeHandler, "Exchange handler");
265         exchangeHandler.updateCapacity(outputChannel);
266     }
267 
268     @Override
269     public void consumeData(final ByteBuffer src, final boolean endStream) throws HttpException, IOException {
270         if (done.get() || requestState != MessageState.BODY) {
271             throw new ProtocolException("Unexpected message data");
272         }
273         Asserts.notNull(exchangeHandler, "Exchange handler");
274         if (src != null) {
275             exchangeHandler.consume(src);
276         }
277         if (endStream) {
278             requestState = MessageState.COMPLETE;
279             exchangeHandler.streamEnd(null);
280         }
281     }
282 
283     @Override
284     public boolean isOutputReady() {
285         return responseState == MessageState.BODY && exchangeHandler != null && exchangeHandler.available() > 0;
286     }
287 
288     @Override
289     public void produceOutput() throws HttpException, IOException {
290         if (responseState == MessageState.BODY) {
291             Asserts.notNull(exchangeHandler, "Exchange handler");
292             exchangeHandler.produce(dataChannel);
293         }
294     }
295 
296     @Override
297     public void handle(final HttpException ex, final boolean endStream) throws HttpException, IOException {
298         if (done.get()) {
299             throw ex;
300         }
301         switch (requestState) {
302             case HEADERS:
303                 requestState = endStream ? MessageState.COMPLETE : MessageState.BODY;
304                 if (!responseCommitted.get()) {
305                     final AsyncResponseProducer responseProducer = new BasicResponseProducer(
306                             ServerSupport.toStatusCode(ex),
307                             ServerSupport.toErrorMessage(ex));
308                     exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
309                     exchangeHandler.handleRequest(null, null, responseChannel, context);
310                 } else {
311                     throw ex;
312                 }
313                 break;
314             case BODY:
315                 responseState = MessageState.COMPLETE;
316             default:
317                 throw ex;
318         }
319     }
320 
321     @Override
322     public void failed(final Exception cause) {
323         try {
324             if (failed.compareAndSet(false, true)) {
325                 if (exchangeHandler != null) {
326                     exchangeHandler.failed(cause);
327                 }
328             }
329         } finally {
330             releaseResources();
331         }
332     }
333 
334     @Override
335     public void releaseResources() {
336         if (done.compareAndSet(false, true)) {
337             requestState = MessageState.COMPLETE;
338             responseState = MessageState.COMPLETE;
339             if (exchangeHandler != null) {
340                 exchangeHandler.releaseResources();
341             }
342         }
343     }
344 
345     @Override
346     public String toString() {
347         return "[" +
348                 "requestState=" + requestState +
349                 ", responseState=" + responseState +
350                 ']';
351     }
352 
353 }
354