1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http2.impl.nio;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.hc.core5.http.EntityDetails;
35 import org.apache.hc.core5.http.Header;
36 import org.apache.hc.core5.http.HttpException;
37 import org.apache.hc.core5.http.HttpRequest;
38 import org.apache.hc.core5.http.HttpResponse;
39 import org.apache.hc.core5.http.HttpStatus;
40 import org.apache.hc.core5.http.HttpVersion;
41 import org.apache.hc.core5.http.Method;
42 import org.apache.hc.core5.http.ProtocolException;
43 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
44 import org.apache.hc.core5.http.impl.IncomingEntityDetails;
45 import org.apache.hc.core5.http.impl.ServerSupport;
46 import org.apache.hc.core5.http.impl.nio.MessageState;
47 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
48 import org.apache.hc.core5.http.nio.AsyncPushProducer;
49 import org.apache.hc.core5.http.nio.AsyncResponseProducer;
50 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
51 import org.apache.hc.core5.http.nio.DataStreamChannel;
52 import org.apache.hc.core5.http.nio.HandlerFactory;
53 import org.apache.hc.core5.http.nio.ResponseChannel;
54 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
55 import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
56 import org.apache.hc.core5.http.protocol.HttpContext;
57 import org.apache.hc.core5.http.protocol.HttpCoreContext;
58 import org.apache.hc.core5.http.protocol.HttpProcessor;
59 import org.apache.hc.core5.http2.H2ConnectionException;
60 import org.apache.hc.core5.http2.H2Error;
61 import org.apache.hc.core5.http2.H2StreamResetException;
62 import org.apache.hc.core5.http2.impl.DefaultH2RequestConverter;
63 import org.apache.hc.core5.http2.impl.DefaultH2ResponseConverter;
64 import org.apache.hc.core5.util.Asserts;
65
66 class ServerH2StreamHandler implements H2StreamHandler {
67
68 private final H2StreamChannel outputChannel;
69 private final DataStreamChannel dataChannel;
70 private final ResponseChannel responseChannel;
71 private final HttpProcessor httpProcessor;
72 private final BasicHttpConnectionMetrics connMetrics;
73 private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
74 private final HttpCoreContext context;
75 private final AtomicBoolean responseCommitted;
76 private final AtomicBoolean failed;
77 private final AtomicBoolean done;
78
79 private volatile AsyncServerExchangeHandler exchangeHandler;
80 private volatile HttpRequest receivedRequest;
81 private volatile MessageState requestState;
82 private volatile MessageState responseState;
83
84 ServerH2StreamHandler(
85 final H2StreamChannel outputChannel,
86 final HttpProcessor httpProcessor,
87 final BasicHttpConnectionMetrics connMetrics,
88 final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
89 final HttpCoreContext context) {
90 this.outputChannel = outputChannel;
91 this.dataChannel = new DataStreamChannel() {
92
93 @Override
94 public void requestOutput() {
95 outputChannel.requestOutput();
96 }
97
98 @Override
99 public int write(final ByteBuffer src) throws IOException {
100 return outputChannel.write(src);
101 }
102
103 @Override
104 public void endStream(final List<? extends Header> trailers) throws IOException {
105 outputChannel.endStream(trailers);
106 responseState = MessageState.COMPLETE;
107 }
108
109 @Override
110 public void endStream() throws IOException {
111 outputChannel.endStream();
112 responseState = MessageState.COMPLETE;
113 }
114
115 };
116 this.responseChannel = new ResponseChannel() {
117
118 @Override
119 public void sendInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
120 commitInformation(response);
121 }
122
123 @Override
124 public void sendResponse(
125 final HttpResponse response, final EntityDetails responseEntityDetails, final HttpContext httpContext) throws HttpException, IOException {
126 commitResponse(response, responseEntityDetails);
127 }
128
129 @Override
130 public void pushPromise(
131 final HttpRequest promise, final AsyncPushProducer pushProducer, final HttpContext httpContext) throws HttpException, IOException {
132 commitPromise(promise, pushProducer);
133 }
134
135 };
136 this.httpProcessor = httpProcessor;
137 this.connMetrics = connMetrics;
138 this.exchangeHandlerFactory = exchangeHandlerFactory;
139 this.context = context;
140 this.responseCommitted = new AtomicBoolean(false);
141 this.failed = new AtomicBoolean(false);
142 this.done = new AtomicBoolean(false);
143 this.requestState = MessageState.HEADERS;
144 this.responseState = MessageState.IDLE;
145 }
146
147 @Override
148 public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
149 return null;
150 }
151
152 private void commitInformation(final HttpResponse response) throws IOException, HttpException {
153 if (responseCommitted.get()) {
154 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Response already committed");
155 }
156 final int status = response.getCode();
157 if (status < HttpStatus.SC_INFORMATIONAL || status >= HttpStatus.SC_SUCCESS) {
158 throw new HttpException("Invalid intermediate response: " + status);
159 }
160 final List<Header> responseHeaders = DefaultH2ResponseConverter.INSTANCE.convert(response);
161 outputChannel.submit(responseHeaders, false);
162 }
163
164 private void commitResponse(
165 final HttpResponse response,
166 final EntityDetails responseEntityDetails) throws HttpException, IOException {
167 if (responseCommitted.compareAndSet(false, true)) {
168
169 final int status = response.getCode();
170 if (status < HttpStatus.SC_SUCCESS) {
171 throw new HttpException("Invalid response: " + status);
172 }
173 context.setResponse(response);
174 httpProcessor.process(response, responseEntityDetails, context);
175
176 final List<Header> responseHeaders = DefaultH2ResponseConverter.INSTANCE.convert(response);
177
178 final boolean endStream = responseEntityDetails == null ||
179 (receivedRequest != null && Method.HEAD.isSame(receivedRequest.getMethod()));
180 outputChannel.submit(responseHeaders, endStream);
181 connMetrics.incrementResponseCount();
182 if (responseEntityDetails == null) {
183 responseState = MessageState.COMPLETE;
184 } else {
185 responseState = MessageState.BODY;
186 exchangeHandler.produce(outputChannel);
187 }
188 } else {
189 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Response already committed");
190 }
191 }
192
193 private void commitPromise(
194 final HttpRequest promise,
195 final AsyncPushProducer pushProducer) throws HttpException, IOException {
196
197 httpProcessor.process(promise, null, context);
198
199 final List<Header> promiseHeaders = DefaultH2RequestConverter.INSTANCE.convert(promise);
200 outputChannel.push(promiseHeaders, pushProducer);
201 connMetrics.incrementRequestCount();
202 }
203
204 @Override
205 public void consumePromise(final List<Header> headers) throws HttpException, IOException {
206 throw new ProtocolException("Unexpected message promise");
207 }
208
209 @Override
210 public void consumeHeader(final List<Header> headers, final boolean endStream) throws HttpException, IOException {
211 if (done.get()) {
212 throw new ProtocolException("Unexpected message headers");
213 }
214 switch (requestState) {
215 case HEADERS:
216 requestState = endStream ? MessageState.COMPLETE : MessageState.BODY;
217
218 final HttpRequest request = DefaultH2RequestConverter.INSTANCE.convert(headers);
219 final EntityDetails requestEntityDetails = endStream ? null : new IncomingEntityDetails(request, -1);
220
221 final AsyncServerExchangeHandler handler;
222 try {
223 handler = exchangeHandlerFactory != null ? exchangeHandlerFactory.create(request, context) : null;
224 } catch (final ProtocolException ex) {
225 throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, ex.getMessage());
226 }
227 if (handler == null) {
228 throw new H2StreamResetException(H2Error.REFUSED_STREAM, "Stream refused");
229 }
230 exchangeHandler = handler;
231
232 context.setProtocolVersion(HttpVersion.HTTP_2);
233 context.setRequest(request);
234
235 try {
236 httpProcessor.process(request, requestEntityDetails, context);
237 connMetrics.incrementRequestCount();
238 receivedRequest = request;
239
240 exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
241 } catch (final HttpException ex) {
242 if (!responseCommitted.get()) {
243 final AsyncResponseProducer responseProducer = new BasicResponseProducer(
244 ServerSupport.toStatusCode(ex),
245 ServerSupport.toErrorMessage(ex));
246 exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
247 exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
248 } else {
249 throw ex;
250 }
251 }
252 break;
253 case BODY:
254 responseState = MessageState.COMPLETE;
255 exchangeHandler.streamEnd(headers);
256 break;
257 default:
258 throw new ProtocolException("Unexpected message headers");
259 }
260 }
261
262 @Override
263 public void updateInputCapacity() throws IOException {
264 Asserts.notNull(exchangeHandler, "Exchange handler");
265 exchangeHandler.updateCapacity(outputChannel);
266 }
267
268 @Override
269 public void consumeData(final ByteBuffer src, final boolean endStream) throws HttpException, IOException {
270 if (done.get() || requestState != MessageState.BODY) {
271 throw new ProtocolException("Unexpected message data");
272 }
273 Asserts.notNull(exchangeHandler, "Exchange handler");
274 if (src != null) {
275 exchangeHandler.consume(src);
276 }
277 if (endStream) {
278 requestState = MessageState.COMPLETE;
279 exchangeHandler.streamEnd(null);
280 }
281 }
282
283 @Override
284 public boolean isOutputReady() {
285 return responseState == MessageState.BODY && exchangeHandler != null && exchangeHandler.available() > 0;
286 }
287
288 @Override
289 public void produceOutput() throws HttpException, IOException {
290 if (responseState == MessageState.BODY) {
291 Asserts.notNull(exchangeHandler, "Exchange handler");
292 exchangeHandler.produce(dataChannel);
293 }
294 }
295
296 @Override
297 public void handle(final HttpException ex, final boolean endStream) throws HttpException, IOException {
298 if (done.get()) {
299 throw ex;
300 }
301 switch (requestState) {
302 case HEADERS:
303 requestState = endStream ? MessageState.COMPLETE : MessageState.BODY;
304 if (!responseCommitted.get()) {
305 final AsyncResponseProducer responseProducer = new BasicResponseProducer(
306 ServerSupport.toStatusCode(ex),
307 ServerSupport.toErrorMessage(ex));
308 exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
309 exchangeHandler.handleRequest(null, null, responseChannel, context);
310 } else {
311 throw ex;
312 }
313 break;
314 case BODY:
315 responseState = MessageState.COMPLETE;
316 default:
317 throw ex;
318 }
319 }
320
321 @Override
322 public void failed(final Exception cause) {
323 try {
324 if (failed.compareAndSet(false, true)) {
325 if (exchangeHandler != null) {
326 exchangeHandler.failed(cause);
327 }
328 }
329 } finally {
330 releaseResources();
331 }
332 }
333
334 @Override
335 public void releaseResources() {
336 if (done.compareAndSet(false, true)) {
337 requestState = MessageState.COMPLETE;
338 responseState = MessageState.COMPLETE;
339 if (exchangeHandler != null) {
340 exchangeHandler.releaseResources();
341 }
342 }
343 }
344
345 @Override
346 public String toString() {
347 return "[" +
348 "requestState=" + requestState +
349 ", responseState=" + responseState +
350 ']';
351 }
352
353 }
354