1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http2.impl.nio;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.hc.core5.http.EntityDetails;
35 import org.apache.hc.core5.http.Header;
36 import org.apache.hc.core5.http.HttpException;
37 import org.apache.hc.core5.http.HttpRequest;
38 import org.apache.hc.core5.http.HttpResponse;
39 import org.apache.hc.core5.http.HttpStatus;
40 import org.apache.hc.core5.http.HttpVersion;
41 import org.apache.hc.core5.http.Method;
42 import org.apache.hc.core5.http.ProtocolException;
43 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
44 import org.apache.hc.core5.http.impl.IncomingEntityDetails;
45 import org.apache.hc.core5.http.impl.ServerSupport;
46 import org.apache.hc.core5.http.impl.nio.MessageState;
47 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
48 import org.apache.hc.core5.http.nio.AsyncPushProducer;
49 import org.apache.hc.core5.http.nio.AsyncResponseProducer;
50 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
51 import org.apache.hc.core5.http.nio.DataStreamChannel;
52 import org.apache.hc.core5.http.nio.HandlerFactory;
53 import org.apache.hc.core5.http.nio.ResponseChannel;
54 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
55 import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
56 import org.apache.hc.core5.http.protocol.HttpContext;
57 import org.apache.hc.core5.http.protocol.HttpCoreContext;
58 import org.apache.hc.core5.http.protocol.HttpProcessor;
59 import org.apache.hc.core5.http2.H2ConnectionException;
60 import org.apache.hc.core5.http2.H2Error;
61 import org.apache.hc.core5.http2.H2StreamResetException;
62 import org.apache.hc.core5.http2.impl.DefaultH2RequestConverter;
63 import org.apache.hc.core5.http2.impl.DefaultH2ResponseConverter;
64 import org.apache.hc.core5.util.Asserts;
65
66 class ServerH2StreamHandler implements H2StreamHandler {
67
68 private final H2StreamChannel outputChannel;
69 private final DataStreamChannel dataChannel;
70 private final ResponseChannel responseChannel;
71 private final HttpProcessor httpProcessor;
72 private final BasicHttpConnectionMetrics connMetrics;
73 private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
74 private final HttpCoreContext context;
75 private final AtomicBoolean responseCommitted;
76 private final AtomicBoolean failed;
77 private final AtomicBoolean done;
78
79 private volatile AsyncServerExchangeHandler exchangeHandler;
80 private volatile HttpRequest receivedRequest;
81 private volatile MessageState requestState;
82 private volatile MessageState responseState;
83
84 ServerH2StreamHandler(
85 final H2StreamChannel outputChannel,
86 final HttpProcessor httpProcessor,
87 final BasicHttpConnectionMetrics connMetrics,
88 final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
89 final HttpCoreContext context) {
90 this.outputChannel = outputChannel;
91 this.dataChannel = new DataStreamChannel() {
92
93 @Override
94 public void requestOutput() {
95 outputChannel.requestOutput();
96 }
97
98 @Override
99 public int write(final ByteBuffer src) throws IOException {
100 return outputChannel.write(src);
101 }
102
103 @Override
104 public void endStream(final List<? extends Header> trailers) throws IOException {
105 outputChannel.endStream(trailers);
106 responseState = MessageState.COMPLETE;
107 }
108
109 @Override
110 public void endStream() throws IOException {
111 outputChannel.endStream();
112 responseState = MessageState.COMPLETE;
113 }
114
115 };
116 this.responseChannel = new ResponseChannel() {
117
118 @Override
119 public void sendInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
120 commitInformation(response);
121 }
122
123 @Override
124 public void sendResponse(
125 final HttpResponse response, final EntityDetails responseEntityDetails, final HttpContext httpContext) throws HttpException, IOException {
126 ServerSupport.validateResponse(response, responseEntityDetails);
127 commitResponse(response, responseEntityDetails);
128 }
129
130 @Override
131 public void pushPromise(
132 final HttpRequest promise, final AsyncPushProducer pushProducer, final HttpContext httpContext) throws HttpException, IOException {
133 commitPromise(promise, pushProducer);
134 }
135
136 };
137 this.httpProcessor = httpProcessor;
138 this.connMetrics = connMetrics;
139 this.exchangeHandlerFactory = exchangeHandlerFactory;
140 this.context = context;
141 this.responseCommitted = new AtomicBoolean(false);
142 this.failed = new AtomicBoolean(false);
143 this.done = new AtomicBoolean(false);
144 this.requestState = MessageState.HEADERS;
145 this.responseState = MessageState.IDLE;
146 }
147
148 @Override
149 public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
150 return null;
151 }
152
153 private void commitInformation(final HttpResponse response) throws IOException, HttpException {
154 if (responseCommitted.get()) {
155 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Response already committed");
156 }
157 final int status = response.getCode();
158 if (status < HttpStatus.SC_INFORMATIONAL || status >= HttpStatus.SC_SUCCESS) {
159 throw new HttpException("Invalid intermediate response: " + status);
160 }
161 final List<Header> responseHeaders = DefaultH2ResponseConverter.INSTANCE.convert(response);
162 outputChannel.submit(responseHeaders, false);
163 }
164
165 private void commitResponse(
166 final HttpResponse response,
167 final EntityDetails responseEntityDetails) throws HttpException, IOException {
168 if (responseCommitted.compareAndSet(false, true)) {
169
170 final int status = response.getCode();
171 if (status < HttpStatus.SC_SUCCESS) {
172 throw new HttpException("Invalid response: " + status);
173 }
174 context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
175 httpProcessor.process(response, responseEntityDetails, context);
176
177 final List<Header> responseHeaders = DefaultH2ResponseConverter.INSTANCE.convert(response);
178
179 final boolean endStream = responseEntityDetails == null ||
180 (receivedRequest != null && Method.HEAD.isSame(receivedRequest.getMethod()));
181 outputChannel.submit(responseHeaders, endStream);
182 connMetrics.incrementResponseCount();
183 if (responseEntityDetails == null) {
184 responseState = MessageState.COMPLETE;
185 } else {
186 responseState = MessageState.BODY;
187 exchangeHandler.produce(outputChannel);
188 }
189 } else {
190 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Response already committed");
191 }
192 }
193
194 private void commitPromise(
195 final HttpRequest promise,
196 final AsyncPushProducer pushProducer) throws HttpException, IOException {
197
198 httpProcessor.process(promise, null, context);
199
200 final List<Header> promiseHeaders = DefaultH2RequestConverter.INSTANCE.convert(promise);
201 outputChannel.push(promiseHeaders, pushProducer);
202 connMetrics.incrementRequestCount();
203 }
204
205 @Override
206 public void consumePromise(final List<Header> headers) throws HttpException, IOException {
207 throw new ProtocolException("Unexpected message promise");
208 }
209
210 @Override
211 public void consumeHeader(final List<Header> headers, final boolean endStream) throws HttpException, IOException {
212 if (done.get()) {
213 throw new ProtocolException("Unexpected message headers");
214 }
215 switch (requestState) {
216 case HEADERS:
217 requestState = endStream ? MessageState.COMPLETE : MessageState.BODY;
218
219 final HttpRequest request = DefaultH2RequestConverter.INSTANCE.convert(headers);
220 final EntityDetails requestEntityDetails = endStream ? null : new IncomingEntityDetails(request, -1);
221
222 final AsyncServerExchangeHandler handler;
223 try {
224 handler = exchangeHandlerFactory != null ? exchangeHandlerFactory.create(request, context) : null;
225 } catch (final ProtocolException ex) {
226 throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, ex.getMessage());
227 }
228 if (handler == null) {
229 throw new H2StreamResetException(H2Error.REFUSED_STREAM, "Stream refused");
230 }
231 exchangeHandler = handler;
232
233 context.setProtocolVersion(HttpVersion.HTTP_2);
234 context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
235
236 try {
237 httpProcessor.process(request, requestEntityDetails, context);
238 connMetrics.incrementRequestCount();
239 receivedRequest = request;
240
241 exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
242 } catch (final HttpException ex) {
243 if (!responseCommitted.get()) {
244 final AsyncResponseProducer responseProducer = new BasicResponseProducer(
245 ServerSupport.toStatusCode(ex),
246 ServerSupport.toErrorMessage(ex));
247 exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
248 exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
249 } else {
250 throw ex;
251 }
252 }
253 break;
254 case BODY:
255 responseState = MessageState.COMPLETE;
256 exchangeHandler.streamEnd(headers);
257 break;
258 default:
259 throw new ProtocolException("Unexpected message headers");
260 }
261 }
262
263 @Override
264 public void updateInputCapacity() throws IOException {
265 Asserts.notNull(exchangeHandler, "Exchange handler");
266 exchangeHandler.updateCapacity(outputChannel);
267 }
268
269 @Override
270 public void consumeData(final ByteBuffer src, final boolean endStream) throws HttpException, IOException {
271 if (done.get() || requestState != MessageState.BODY) {
272 throw new ProtocolException("Unexpected message data");
273 }
274 Asserts.notNull(exchangeHandler, "Exchange handler");
275 if (src != null) {
276 exchangeHandler.consume(src);
277 }
278 if (endStream) {
279 requestState = MessageState.COMPLETE;
280 exchangeHandler.streamEnd(null);
281 }
282 }
283
284 @Override
285 public boolean isOutputReady() {
286 return responseState == MessageState.BODY && exchangeHandler != null && exchangeHandler.available() > 0;
287 }
288
289 @Override
290 public void produceOutput() throws HttpException, IOException {
291 if (responseState == MessageState.BODY) {
292 Asserts.notNull(exchangeHandler, "Exchange handler");
293 exchangeHandler.produce(dataChannel);
294 }
295 }
296
297 @Override
298 public void handle(final HttpException ex, final boolean endStream) throws HttpException, IOException {
299 if (done.get()) {
300 throw ex;
301 }
302 switch (requestState) {
303 case HEADERS:
304 requestState = endStream ? MessageState.COMPLETE : MessageState.BODY;
305 if (!responseCommitted.get()) {
306 final AsyncResponseProducer responseProducer = new BasicResponseProducer(
307 ServerSupport.toStatusCode(ex),
308 ServerSupport.toErrorMessage(ex));
309 exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
310 exchangeHandler.handleRequest(null, null, responseChannel, context);
311 } else {
312 throw ex;
313 }
314 break;
315 case BODY:
316 responseState = MessageState.COMPLETE;
317 default:
318 throw ex;
319 }
320 }
321
322 @Override
323 public void failed(final Exception cause) {
324 try {
325 if (failed.compareAndSet(false, true)) {
326 if (exchangeHandler != null) {
327 exchangeHandler.failed(cause);
328 }
329 }
330 } finally {
331 releaseResources();
332 }
333 }
334
335 @Override
336 public void releaseResources() {
337 if (done.compareAndSet(false, true)) {
338 requestState = MessageState.COMPLETE;
339 responseState = MessageState.COMPLETE;
340 if (exchangeHandler != null) {
341 exchangeHandler.releaseResources();
342 }
343 }
344 }
345
346 @Override
347 public String toString() {
348 return "[" +
349 "requestState=" + requestState +
350 ", responseState=" + responseState +
351 ']';
352 }
353
354 }
355