View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.impl.nio;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.hc.core5.http.ConnectionReuseStrategy;
35  import org.apache.hc.core5.http.EntityDetails;
36  import org.apache.hc.core5.http.Header;
37  import org.apache.hc.core5.http.HeaderElements;
38  import org.apache.hc.core5.http.HttpException;
39  import org.apache.hc.core5.http.HttpHeaders;
40  import org.apache.hc.core5.http.HttpRequest;
41  import org.apache.hc.core5.http.HttpResponse;
42  import org.apache.hc.core5.http.HttpStatus;
43  import org.apache.hc.core5.http.HttpVersion;
44  import org.apache.hc.core5.http.Method;
45  import org.apache.hc.core5.http.MisdirectedRequestException;
46  import org.apache.hc.core5.http.ProtocolException;
47  import org.apache.hc.core5.http.ProtocolVersion;
48  import org.apache.hc.core5.http.UnsupportedHttpVersionException;
49  import org.apache.hc.core5.http.config.Http1Config;
50  import org.apache.hc.core5.http.impl.ServerSupport;
51  import org.apache.hc.core5.http.message.BasicHttpResponse;
52  import org.apache.hc.core5.http.nio.AsyncPushProducer;
53  import org.apache.hc.core5.http.nio.AsyncResponseProducer;
54  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
55  import org.apache.hc.core5.http.nio.CapacityChannel;
56  import org.apache.hc.core5.http.nio.DataStreamChannel;
57  import org.apache.hc.core5.http.nio.HandlerFactory;
58  import org.apache.hc.core5.http.nio.ResourceHolder;
59  import org.apache.hc.core5.http.nio.ResponseChannel;
60  import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
61  import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
62  import org.apache.hc.core5.http.protocol.HttpContext;
63  import org.apache.hc.core5.http.protocol.HttpCoreContext;
64  import org.apache.hc.core5.http.protocol.HttpProcessor;
65  
66  class ServerHttp1StreamHandler implements ResourceHolder {
67  
68      private final Http1StreamChannel<HttpResponse> outputChannel;
69      private final DataStreamChannel internalDataChannel;
70      private final ResponseChannel responseChannel;
71      private final HttpProcessor httpProcessor;
72      private final Http1Config http1Config;
73      private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
74      private final ConnectionReuseStrategy connectionReuseStrategy;
75      private final HttpCoreContext context;
76      private final AtomicBoolean responseCommitted;
77      private final AtomicBoolean done;
78  
79      private volatile boolean keepAlive;
80      private volatile AsyncServerExchangeHandler exchangeHandler;
81      private volatile HttpRequest receivedRequest;
82      private volatile MessageState requestState;
83      private volatile MessageState responseState;
84  
85      ServerHttp1StreamHandler(
86              final Http1StreamChannel<HttpResponse> outputChannel,
87              final HttpProcessor httpProcessor,
88              final Http1Config http1Config,
89              final ConnectionReuseStrategy connectionReuseStrategy,
90              final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
91              final HttpCoreContext context) {
92          this.outputChannel = outputChannel;
93          this.internalDataChannel = new DataStreamChannel() {
94  
95              @Override
96              public void requestOutput() {
97                  outputChannel.requestOutput();
98              }
99  
100             @Override
101             public void endStream(final List<? extends Header> trailers) throws IOException {
102                 outputChannel.complete(trailers);
103                 if (!keepAlive) {
104                     outputChannel.close();
105                 }
106                 responseState = MessageState.COMPLETE;
107             }
108 
109             @Override
110             public int write(final ByteBuffer src) throws IOException {
111                 return outputChannel.write(src);
112             }
113 
114             @Override
115             public void endStream() throws IOException {
116                 endStream(null);
117             }
118 
119         };
120 
121         this.responseChannel = new ResponseChannel() {
122 
123             @Override
124             public void sendInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
125                 commitInformation(response);
126             }
127 
128             @Override
129             public void sendResponse(
130                     final HttpResponse response, final EntityDetails responseEntityDetails, final HttpContext httpContext) throws HttpException, IOException {
131                 commitResponse(response, responseEntityDetails);
132             }
133 
134             @Override
135             public void pushPromise(
136                     final HttpRequest promise, final AsyncPushProducer pushProducer, final HttpContext httpContext) throws HttpException, IOException {
137                 commitPromise();
138             }
139 
140             @Override
141             public String toString() {
142                 return super.toString() + " " + ServerHttp1StreamHandler.this;
143             }
144 
145         };
146 
147         this.httpProcessor = httpProcessor;
148         this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
149         this.connectionReuseStrategy = connectionReuseStrategy;
150         this.exchangeHandlerFactory = exchangeHandlerFactory;
151         this.context = context;
152         this.responseCommitted = new AtomicBoolean(false);
153         this.done = new AtomicBoolean(false);
154         this.keepAlive = true;
155         this.requestState = MessageState.HEADERS;
156         this.responseState = MessageState.IDLE;
157     }
158 
159     private void commitResponse(
160             final HttpResponse response,
161             final EntityDetails responseEntityDetails) throws HttpException, IOException {
162         if (responseCommitted.compareAndSet(false, true)) {
163 
164             final ProtocolVersion transportVersion = response.getVersion();
165             if (transportVersion != null) {
166                 if (!transportVersion.lessEquals(http1Config.getVersion())) {
167                     throw new UnsupportedHttpVersionException(transportVersion);
168                 }
169                 context.setProtocolVersion(transportVersion);
170             }
171 
172             final int status = response.getCode();
173             if (status < HttpStatus.SC_SUCCESS) {
174                 throw new HttpException("Invalid response: " + status);
175             }
176 
177             context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
178             httpProcessor.process(response, responseEntityDetails, context);
179 
180             final boolean endStream = responseEntityDetails == null ||
181                     (receivedRequest != null && Method.HEAD.isSame(receivedRequest.getMethod()));
182 
183             if (!connectionReuseStrategy.keepAlive(receivedRequest, response, context)) {
184                 keepAlive = false;
185             }
186 
187             outputChannel.submit(response, endStream, endStream ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
188             if (endStream) {
189                 if (!keepAlive) {
190                     outputChannel.close();
191                 }
192                 responseState = MessageState.COMPLETE;
193             } else {
194                 responseState = MessageState.BODY;
195                 exchangeHandler.produce(internalDataChannel);
196             }
197         } else {
198             throw new HttpException("Response already committed");
199         }
200     }
201 
202     private void commitInformation(final HttpResponse response) throws IOException, HttpException {
203         if (responseCommitted.get()) {
204             throw new HttpException("Response already committed");
205         }
206         final int status = response.getCode();
207         if (status < HttpStatus.SC_INFORMATIONAL || status >= HttpStatus.SC_SUCCESS) {
208             throw new HttpException("Invalid intermediate response: " + status);
209         }
210         outputChannel.submit(response, true, FlushMode.IMMEDIATE);
211     }
212 
213     private void commitPromise() throws HttpException {
214         throw new HttpException("HTTP/1.1 does not support server push");
215     }
216 
217     void activateChannel() throws IOException, HttpException {
218         outputChannel.activate();
219     }
220 
221     boolean isResponseFinal() {
222         return responseState == MessageState.COMPLETE;
223     }
224 
225     boolean keepAlive() {
226         return keepAlive;
227     }
228 
229     boolean isCompleted() {
230         return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
231     }
232 
233     void terminateExchange(final HttpException ex) throws HttpException, IOException {
234         if (done.get() || requestState != MessageState.HEADERS) {
235             throw new ProtocolException("Unexpected message head");
236         }
237         receivedRequest = null;
238         requestState = MessageState.COMPLETE;
239         final HttpResponse response = new BasicHttpResponse(ServerSupport.toStatusCode(ex));
240         response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
241         final AsyncResponseProducer responseProducer = new BasicResponseProducer(response, ServerSupport.toErrorMessage(ex));
242         exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
243         exchangeHandler.handleRequest(null, null, responseChannel, context);
244     }
245 
246     void consumeHeader(final HttpRequest request, final EntityDetails requestEntityDetails) throws HttpException, IOException {
247         if (done.get() || requestState != MessageState.HEADERS) {
248             throw new ProtocolException("Unexpected message head");
249         }
250         receivedRequest = request;
251         requestState = requestEntityDetails == null ? MessageState.COMPLETE : MessageState.BODY;
252 
253         AsyncServerExchangeHandler handler;
254         try {
255             handler = exchangeHandlerFactory.create(request, context);
256         } catch (final MisdirectedRequestException ex) {
257             handler =  new ImmediateResponseExchangeHandler(HttpStatus.SC_MISDIRECTED_REQUEST, ex.getMessage());
258         } catch (final HttpException ex) {
259             handler =  new ImmediateResponseExchangeHandler(HttpStatus.SC_INTERNAL_SERVER_ERROR, ex.getMessage());
260         }
261         if (handler == null) {
262             handler = new ImmediateResponseExchangeHandler(HttpStatus.SC_NOT_FOUND, "Cannot handle request");
263         }
264 
265         exchangeHandler = handler;
266 
267         final ProtocolVersion transportVersion = request.getVersion();
268         if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
269             throw new UnsupportedHttpVersionException(transportVersion);
270         }
271         context.setProtocolVersion(transportVersion != null ? transportVersion : http1Config.getVersion());
272         context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
273 
274         try {
275             httpProcessor.process(request, requestEntityDetails, context);
276             exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
277         } catch (final HttpException ex) {
278             if (!responseCommitted.get()) {
279                 final HttpResponse response = new BasicHttpResponse(ServerSupport.toStatusCode(ex));
280                 response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
281                 final AsyncResponseProducer responseProducer = new BasicResponseProducer(response, ServerSupport.toErrorMessage(ex));
282                 exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
283                 exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
284             } else {
285                 throw ex;
286             }
287         }
288 
289     }
290 
291     boolean isOutputReady() {
292         switch (responseState) {
293             case BODY:
294                 return exchangeHandler.available() > 0;
295             default:
296                 return false;
297         }
298     }
299 
300     void produceOutput() throws HttpException, IOException {
301         switch (responseState) {
302             case BODY:
303                 exchangeHandler.produce(internalDataChannel);
304                 break;
305         }
306     }
307 
308     void consumeData(final ByteBuffer src) throws HttpException, IOException {
309         if (done.get() || requestState != MessageState.BODY) {
310             throw new ProtocolException("Unexpected message data");
311         }
312         if (responseState == MessageState.ACK) {
313             outputChannel.requestOutput();
314         }
315         exchangeHandler.consume(src);
316     }
317 
318     void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
319         exchangeHandler.updateCapacity(capacityChannel);
320     }
321 
322     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
323         if (done.get() || requestState != MessageState.BODY) {
324             throw new ProtocolException("Unexpected message data");
325         }
326         requestState = MessageState.COMPLETE;
327         exchangeHandler.streamEnd(trailers);
328     }
329 
330     void failed(final Exception cause) {
331         if (!done.get()) {
332             exchangeHandler.failed(cause);
333         }
334     }
335 
336     @Override
337     public void releaseResources() {
338         if (done.compareAndSet(false, true)) {
339             requestState = MessageState.COMPLETE;
340             responseState = MessageState.COMPLETE;
341             exchangeHandler.releaseResources();
342         }
343     }
344 
345     void appendState(final StringBuilder buf) {
346         buf.append("requestState=").append(requestState)
347                 .append(", responseState=").append(responseState)
348                 .append(", responseCommitted=").append(responseCommitted)
349                 .append(", keepAlive=").append(keepAlive)
350                 .append(", done=").append(done);
351     }
352 
353     @Override
354     public String toString() {
355         final StringBuilder buf = new StringBuilder();
356         buf.append("[");
357         appendState(buf);
358         buf.append("]");
359         return buf.toString();
360     }
361 
362 }
363