1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.impl.nio;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.hc.core5.http.ConnectionReuseStrategy;
35 import org.apache.hc.core5.http.EntityDetails;
36 import org.apache.hc.core5.http.Header;
37 import org.apache.hc.core5.http.HeaderElements;
38 import org.apache.hc.core5.http.HttpException;
39 import org.apache.hc.core5.http.HttpHeaders;
40 import org.apache.hc.core5.http.HttpRequest;
41 import org.apache.hc.core5.http.HttpResponse;
42 import org.apache.hc.core5.http.HttpStatus;
43 import org.apache.hc.core5.http.HttpVersion;
44 import org.apache.hc.core5.http.Method;
45 import org.apache.hc.core5.http.MisdirectedRequestException;
46 import org.apache.hc.core5.http.ProtocolException;
47 import org.apache.hc.core5.http.ProtocolVersion;
48 import org.apache.hc.core5.http.UnsupportedHttpVersionException;
49 import org.apache.hc.core5.http.config.Http1Config;
50 import org.apache.hc.core5.http.impl.ServerSupport;
51 import org.apache.hc.core5.http.message.BasicHttpResponse;
52 import org.apache.hc.core5.http.nio.AsyncPushProducer;
53 import org.apache.hc.core5.http.nio.AsyncResponseProducer;
54 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
55 import org.apache.hc.core5.http.nio.CapacityChannel;
56 import org.apache.hc.core5.http.nio.DataStreamChannel;
57 import org.apache.hc.core5.http.nio.HandlerFactory;
58 import org.apache.hc.core5.http.nio.ResourceHolder;
59 import org.apache.hc.core5.http.nio.ResponseChannel;
60 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
61 import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
62 import org.apache.hc.core5.http.protocol.HttpContext;
63 import org.apache.hc.core5.http.protocol.HttpCoreContext;
64 import org.apache.hc.core5.http.protocol.HttpProcessor;
65
66 class ServerHttp1StreamHandler implements ResourceHolder {
67
68 private final Http1StreamChannel<HttpResponse> outputChannel;
69 private final DataStreamChannel internalDataChannel;
70 private final ResponseChannel responseChannel;
71 private final HttpProcessor httpProcessor;
72 private final Http1Config http1Config;
73 private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
74 private final ConnectionReuseStrategy connectionReuseStrategy;
75 private final HttpCoreContext context;
76 private final AtomicBoolean responseCommitted;
77 private final AtomicBoolean done;
78
79 private volatile boolean keepAlive;
80 private volatile AsyncServerExchangeHandler exchangeHandler;
81 private volatile HttpRequest receivedRequest;
82 private volatile MessageState requestState;
83 private volatile MessageState responseState;
84
85 ServerHttp1StreamHandler(
86 final Http1StreamChannel<HttpResponse> outputChannel,
87 final HttpProcessor httpProcessor,
88 final Http1Config http1Config,
89 final ConnectionReuseStrategy connectionReuseStrategy,
90 final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
91 final HttpCoreContext context) {
92 this.outputChannel = outputChannel;
93 this.internalDataChannel = new DataStreamChannel() {
94
95 @Override
96 public void requestOutput() {
97 outputChannel.requestOutput();
98 }
99
100 @Override
101 public void endStream(final List<? extends Header> trailers) throws IOException {
102 outputChannel.complete(trailers);
103 if (!keepAlive) {
104 outputChannel.close();
105 }
106 responseState = MessageState.COMPLETE;
107 }
108
109 @Override
110 public int write(final ByteBuffer src) throws IOException {
111 return outputChannel.write(src);
112 }
113
114 @Override
115 public void endStream() throws IOException {
116 endStream(null);
117 }
118
119 };
120
121 this.responseChannel = new ResponseChannel() {
122
123 @Override
124 public void sendInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
125 commitInformation(response);
126 }
127
128 @Override
129 public void sendResponse(
130 final HttpResponse response, final EntityDetails responseEntityDetails, final HttpContext httpContext) throws HttpException, IOException {
131 commitResponse(response, responseEntityDetails);
132 }
133
134 @Override
135 public void pushPromise(
136 final HttpRequest promise, final AsyncPushProducer pushProducer, final HttpContext httpContext) throws HttpException, IOException {
137 commitPromise();
138 }
139
140 @Override
141 public String toString() {
142 return super.toString() + " " + ServerHttp1StreamHandler.this;
143 }
144
145 };
146
147 this.httpProcessor = httpProcessor;
148 this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
149 this.connectionReuseStrategy = connectionReuseStrategy;
150 this.exchangeHandlerFactory = exchangeHandlerFactory;
151 this.context = context;
152 this.responseCommitted = new AtomicBoolean(false);
153 this.done = new AtomicBoolean(false);
154 this.keepAlive = true;
155 this.requestState = MessageState.HEADERS;
156 this.responseState = MessageState.IDLE;
157 }
158
159 private void commitResponse(
160 final HttpResponse response,
161 final EntityDetails responseEntityDetails) throws HttpException, IOException {
162 if (responseCommitted.compareAndSet(false, true)) {
163
164 final ProtocolVersion transportVersion = response.getVersion();
165 if (transportVersion != null) {
166 if (!transportVersion.lessEquals(http1Config.getVersion())) {
167 throw new UnsupportedHttpVersionException(transportVersion);
168 }
169 context.setProtocolVersion(transportVersion);
170 }
171
172 final int status = response.getCode();
173 if (status < HttpStatus.SC_SUCCESS) {
174 throw new HttpException("Invalid response: " + status);
175 }
176
177 context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
178 httpProcessor.process(response, responseEntityDetails, context);
179
180 final boolean endStream = responseEntityDetails == null ||
181 (receivedRequest != null && Method.HEAD.isSame(receivedRequest.getMethod()));
182
183 if (!connectionReuseStrategy.keepAlive(receivedRequest, response, context)) {
184 keepAlive = false;
185 }
186
187 outputChannel.submit(response, endStream, endStream ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
188 if (endStream) {
189 if (!keepAlive) {
190 outputChannel.close();
191 }
192 responseState = MessageState.COMPLETE;
193 } else {
194 responseState = MessageState.BODY;
195 exchangeHandler.produce(internalDataChannel);
196 }
197 } else {
198 throw new HttpException("Response already committed");
199 }
200 }
201
202 private void commitInformation(final HttpResponse response) throws IOException, HttpException {
203 if (responseCommitted.get()) {
204 throw new HttpException("Response already committed");
205 }
206 final int status = response.getCode();
207 if (status < HttpStatus.SC_INFORMATIONAL || status >= HttpStatus.SC_SUCCESS) {
208 throw new HttpException("Invalid intermediate response: " + status);
209 }
210 outputChannel.submit(response, true, FlushMode.IMMEDIATE);
211 }
212
213 private void commitPromise() throws HttpException {
214 throw new HttpException("HTTP/1.1 does not support server push");
215 }
216
217 void activateChannel() throws IOException, HttpException {
218 outputChannel.activate();
219 }
220
221 boolean isResponseFinal() {
222 return responseState == MessageState.COMPLETE;
223 }
224
225 boolean keepAlive() {
226 return keepAlive;
227 }
228
229 boolean isCompleted() {
230 return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
231 }
232
233 void terminateExchange(final HttpException ex) throws HttpException, IOException {
234 if (done.get() || requestState != MessageState.HEADERS) {
235 throw new ProtocolException("Unexpected message head");
236 }
237 receivedRequest = null;
238 requestState = MessageState.COMPLETE;
239 final HttpResponse response = new BasicHttpResponse(ServerSupport.toStatusCode(ex));
240 response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
241 final AsyncResponseProducer responseProducer = new BasicResponseProducer(response, ServerSupport.toErrorMessage(ex));
242 exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
243 exchangeHandler.handleRequest(null, null, responseChannel, context);
244 }
245
246 void consumeHeader(final HttpRequest request, final EntityDetails requestEntityDetails) throws HttpException, IOException {
247 if (done.get() || requestState != MessageState.HEADERS) {
248 throw new ProtocolException("Unexpected message head");
249 }
250 receivedRequest = request;
251 requestState = requestEntityDetails == null ? MessageState.COMPLETE : MessageState.BODY;
252
253 AsyncServerExchangeHandler handler;
254 try {
255 handler = exchangeHandlerFactory.create(request, context);
256 } catch (final MisdirectedRequestException ex) {
257 handler = new ImmediateResponseExchangeHandler(HttpStatus.SC_MISDIRECTED_REQUEST, ex.getMessage());
258 } catch (final HttpException ex) {
259 handler = new ImmediateResponseExchangeHandler(HttpStatus.SC_INTERNAL_SERVER_ERROR, ex.getMessage());
260 }
261 if (handler == null) {
262 handler = new ImmediateResponseExchangeHandler(HttpStatus.SC_NOT_FOUND, "Cannot handle request");
263 }
264
265 exchangeHandler = handler;
266
267 final ProtocolVersion transportVersion = request.getVersion();
268 if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
269 throw new UnsupportedHttpVersionException(transportVersion);
270 }
271 context.setProtocolVersion(transportVersion != null ? transportVersion : http1Config.getVersion());
272 context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
273
274 try {
275 httpProcessor.process(request, requestEntityDetails, context);
276 exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
277 } catch (final HttpException ex) {
278 if (!responseCommitted.get()) {
279 final HttpResponse response = new BasicHttpResponse(ServerSupport.toStatusCode(ex));
280 response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
281 final AsyncResponseProducer responseProducer = new BasicResponseProducer(response, ServerSupport.toErrorMessage(ex));
282 exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
283 exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
284 } else {
285 throw ex;
286 }
287 }
288
289 }
290
291 boolean isOutputReady() {
292 switch (responseState) {
293 case BODY:
294 return exchangeHandler.available() > 0;
295 default:
296 return false;
297 }
298 }
299
300 void produceOutput() throws HttpException, IOException {
301 switch (responseState) {
302 case BODY:
303 exchangeHandler.produce(internalDataChannel);
304 break;
305 }
306 }
307
308 void consumeData(final ByteBuffer src) throws HttpException, IOException {
309 if (done.get() || requestState != MessageState.BODY) {
310 throw new ProtocolException("Unexpected message data");
311 }
312 if (responseState == MessageState.ACK) {
313 outputChannel.requestOutput();
314 }
315 exchangeHandler.consume(src);
316 }
317
318 void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
319 exchangeHandler.updateCapacity(capacityChannel);
320 }
321
322 void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
323 if (done.get() || requestState != MessageState.BODY) {
324 throw new ProtocolException("Unexpected message data");
325 }
326 requestState = MessageState.COMPLETE;
327 exchangeHandler.streamEnd(trailers);
328 }
329
330 void failed(final Exception cause) {
331 if (!done.get()) {
332 exchangeHandler.failed(cause);
333 }
334 }
335
336 @Override
337 public void releaseResources() {
338 if (done.compareAndSet(false, true)) {
339 requestState = MessageState.COMPLETE;
340 responseState = MessageState.COMPLETE;
341 exchangeHandler.releaseResources();
342 }
343 }
344
345 void appendState(final StringBuilder buf) {
346 buf.append("requestState=").append(requestState)
347 .append(", responseState=").append(responseState)
348 .append(", responseCommitted=").append(responseCommitted)
349 .append(", keepAlive=").append(keepAlive)
350 .append(", done=").append(done);
351 }
352
353 @Override
354 public String toString() {
355 final StringBuilder buf = new StringBuilder();
356 buf.append("[");
357 appendState(buf);
358 buf.append("]");
359 return buf.toString();
360 }
361
362 }
363