1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.impl.nio;
29
30 import java.io.IOException;
31 import java.nio.ByteBuffer;
32 import java.nio.channels.ReadableByteChannel;
33 import java.nio.channels.WritableByteChannel;
34 import java.util.List;
35 import java.util.Queue;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37 import java.util.concurrent.locks.ReentrantLock;
38
39 import org.apache.hc.core5.annotation.Internal;
40 import org.apache.hc.core5.http.ConnectionClosedException;
41 import org.apache.hc.core5.http.ConnectionReuseStrategy;
42 import org.apache.hc.core5.http.ContentLengthStrategy;
43 import org.apache.hc.core5.http.EntityDetails;
44 import org.apache.hc.core5.http.Header;
45 import org.apache.hc.core5.http.HttpException;
46 import org.apache.hc.core5.http.HttpRequest;
47 import org.apache.hc.core5.http.HttpResponse;
48 import org.apache.hc.core5.http.HttpStatus;
49 import org.apache.hc.core5.http.URIScheme;
50 import org.apache.hc.core5.http.config.CharCodingConfig;
51 import org.apache.hc.core5.http.config.Http1Config;
52 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
53 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
54 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
55 import org.apache.hc.core5.http.impl.Http1StreamListener;
56 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
57 import org.apache.hc.core5.http.nio.CapacityChannel;
58 import org.apache.hc.core5.http.nio.ContentDecoder;
59 import org.apache.hc.core5.http.nio.ContentEncoder;
60 import org.apache.hc.core5.http.nio.HandlerFactory;
61 import org.apache.hc.core5.http.nio.NHttpMessageParser;
62 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
63 import org.apache.hc.core5.http.nio.SessionInputBuffer;
64 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
65 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
66 import org.apache.hc.core5.http.protocol.HttpCoreContext;
67 import org.apache.hc.core5.http.protocol.HttpProcessor;
68 import org.apache.hc.core5.io.CloseMode;
69 import org.apache.hc.core5.reactor.ProtocolIOSession;
70 import org.apache.hc.core5.util.Args;
71 import org.apache.hc.core5.util.Asserts;
72 import org.apache.hc.core5.util.Timeout;
73
74
75
76
77
78
79
80
81 @Internal
82 public class ServerHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpRequest, HttpResponse> {
83
84 private final String scheme;
85 private final HttpProcessor httpProcessor;
86 private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
87 private final Http1Config http1Config;
88 private final ConnectionReuseStrategy connectionReuseStrategy;
89 private final Http1StreamListener streamListener;
90 private final Queue<ServerHttp1StreamHandler> pipeline;
91 private final Http1StreamChannel<HttpResponse> outputChannel;
92
93 private volatile ServerHttp1StreamHandler outgoing;
94 private volatile ServerHttp1StreamHandler incoming;
95
96 public ServerHttp1StreamDuplexer(
97 final ProtocolIOSession ioSession,
98 final HttpProcessor httpProcessor,
99 final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
100 final String scheme,
101 final Http1Config http1Config,
102 final CharCodingConfig charCodingConfig,
103 final ConnectionReuseStrategy connectionReuseStrategy,
104 final NHttpMessageParser<HttpRequest> incomingMessageParser,
105 final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
106 final ContentLengthStrategy incomingContentStrategy,
107 final ContentLengthStrategy outgoingContentStrategy,
108 final Http1StreamListener streamListener) {
109 super(ioSession, http1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter,
110 incomingContentStrategy, outgoingContentStrategy);
111 this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
112 this.exchangeHandlerFactory = Args.notNull(exchangeHandlerFactory, "Exchange handler factory");
113 this.scheme = scheme != null ? scheme : URIScheme.HTTP.getId();
114 this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
115 this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
116 DefaultConnectionReuseStrategy.INSTANCE;
117 this.streamListener = streamListener;
118 this.pipeline = new ConcurrentLinkedQueue<>();
119 this.outputChannel = new Http1StreamChannel<HttpResponse>() {
120
121 @Override
122 public void close() {
123 ServerHttp1StreamDuplexer.this.close(CloseMode.GRACEFUL);
124 }
125
126 @Override
127 public void submit(
128 final HttpResponse response,
129 final boolean endStream,
130 final FlushMode flushMode) throws HttpException, IOException {
131 if (streamListener != null) {
132 streamListener.onResponseHead(ServerHttp1StreamDuplexer.this, response);
133 }
134 commitMessageHead(response, endStream, flushMode);
135 }
136
137 @Override
138 public void requestOutput() {
139 requestSessionOutput();
140 }
141
142 @Override
143 public void suspendOutput() throws IOException {
144 suspendSessionOutput();
145 }
146
147 @Override
148 public Timeout getSocketTimeout() {
149 return getSessionTimeout();
150 }
151
152 @Override
153 public void setSocketTimeout(final Timeout timeout) {
154 setSessionTimeout(timeout);
155 }
156
157 @Override
158 public int write(final ByteBuffer src) throws IOException {
159 return streamOutput(src);
160 }
161
162 @Override
163 public void complete(final List<? extends Header> trailers) throws IOException {
164 endOutputStream(trailers);
165 }
166
167 @Override
168 public boolean isCompleted() {
169 return isOutputCompleted();
170 }
171
172 @Override
173 public boolean abortGracefully() throws IOException {
174 final MessageDelineation messageDelineation = endOutputStream(null);
175 return messageDelineation != MessageDelineation.MESSAGE_HEAD;
176 }
177
178 @Override
179 public void activate() throws HttpException, IOException {
180
181 }
182
183 @Override
184 public String toString() {
185 return "Http1StreamChannel[" + ServerHttp1StreamDuplexer.this + "]";
186 }
187
188 };
189 }
190
191 @Override
192 void terminate(final Exception exception) {
193 if (incoming != null) {
194 incoming.failed(exception);
195 incoming.releaseResources();
196 incoming = null;
197 }
198 if (outgoing != null) {
199 outgoing.failed(exception);
200 outgoing.releaseResources();
201 outgoing = null;
202 }
203 for (;;) {
204 final ServerHttp1StreamHandler handler = pipeline.poll();
205 if (handler != null) {
206 handler.failed(exception);
207 handler.releaseResources();
208 } else {
209 break;
210 }
211 }
212 }
213
214 @Override
215 void disconnected() {
216 if (incoming != null) {
217 if (!incoming.isCompleted()) {
218 incoming.failed(new ConnectionClosedException());
219 }
220 incoming.releaseResources();
221 incoming = null;
222 }
223 if (outgoing != null) {
224 if (!outgoing.isCompleted()) {
225 outgoing.failed(new ConnectionClosedException());
226 }
227 outgoing.releaseResources();
228 outgoing = null;
229 }
230 for (;;) {
231 final ServerHttp1StreamHandler handler = pipeline.poll();
232 if (handler != null) {
233 handler.failed(new ConnectionClosedException());
234 handler.releaseResources();
235 } else {
236 break;
237 }
238 }
239 }
240
241 @Override
242 void updateInputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
243 connMetrics.incrementRequestCount();
244 }
245
246 @Override
247 void updateOutputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
248 if (response.getCode() >= HttpStatus.SC_OK) {
249 connMetrics.incrementResponseCount();
250 }
251 }
252
253 @Override
254 protected boolean handleIncomingMessage(final HttpRequest request) throws HttpException {
255 return true;
256 }
257
258 @Override
259 protected ContentDecoder createContentDecoder(
260 final long len,
261 final ReadableByteChannel channel,
262 final SessionInputBuffer buffer,
263 final BasicHttpTransportMetrics metrics) throws HttpException {
264 if (len >= 0) {
265 return new LengthDelimitedDecoder(channel, buffer, metrics, len);
266 } else if (len == ContentLengthStrategy.CHUNKED) {
267 return new ChunkDecoder(channel, buffer, http1Config, metrics);
268 } else {
269 return null;
270 }
271 }
272
273 @Override
274 protected boolean handleOutgoingMessage(final HttpResponse response) throws HttpException {
275 return true;
276 }
277
278 @Override
279 protected ContentEncoder createContentEncoder(
280 final long len,
281 final WritableByteChannel channel,
282 final SessionOutputBuffer buffer,
283 final BasicHttpTransportMetrics metrics) throws HttpException {
284 final int chunkSizeHint = http1Config.getChunkSizeHint() >= 0 ? http1Config.getChunkSizeHint() : 2048;
285 if (len >= 0) {
286 return new LengthDelimitedEncoder(channel, buffer, metrics, len, chunkSizeHint);
287 } else if (len == ContentLengthStrategy.CHUNKED) {
288 return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
289 } else {
290 return new IdentityEncoder(channel, buffer, metrics, chunkSizeHint);
291 }
292 }
293
294 @Override
295 boolean inputIdle() {
296 return incoming == null;
297 }
298
299 @Override
300 boolean outputIdle() {
301 return outgoing == null && pipeline.isEmpty();
302 }
303
304 @Override
305 HttpRequest parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
306 try {
307 return super.parseMessageHead(endOfStream);
308 } catch (final HttpException ex) {
309 terminateExchange(ex);
310 return null;
311 }
312 }
313
314 void terminateExchange(final HttpException ex) throws HttpException, IOException {
315 suspendSessionInput();
316 final ServerHttp1StreamHandler streamHandler;
317 final HttpCoreContext context = HttpCoreContext.create();
318 context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
319 context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
320 if (outgoing == null) {
321 streamHandler = new ServerHttp1StreamHandler(
322 outputChannel,
323 httpProcessor,
324 http1Config,
325 connectionReuseStrategy,
326 exchangeHandlerFactory,
327 context);
328 outgoing = streamHandler;
329 } else {
330 streamHandler = new ServerHttp1StreamHandler(
331 new DelayedOutputChannel(outputChannel),
332 httpProcessor,
333 http1Config,
334 connectionReuseStrategy,
335 exchangeHandlerFactory,
336 context);
337 pipeline.add(streamHandler);
338 }
339 streamHandler.terminateExchange(ex);
340 incoming = null;
341 }
342
343 @Override
344 void consumeHeader(final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
345 if (streamListener != null) {
346 streamListener.onRequestHead(this, request);
347 }
348 final ServerHttp1StreamHandler streamHandler;
349 final HttpCoreContext context = HttpCoreContext.create();
350 context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
351 context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
352 if (outgoing == null) {
353 streamHandler = new ServerHttp1StreamHandler(
354 outputChannel,
355 httpProcessor,
356 http1Config,
357 connectionReuseStrategy,
358 exchangeHandlerFactory,
359 context);
360 outgoing = streamHandler;
361 } else {
362 streamHandler = new ServerHttp1StreamHandler(
363 new DelayedOutputChannel(outputChannel),
364 httpProcessor,
365 http1Config,
366 connectionReuseStrategy,
367 exchangeHandlerFactory,
368 context);
369 pipeline.add(streamHandler);
370 }
371 request.setScheme(scheme);
372 streamHandler.consumeHeader(request, entityDetails);
373 incoming = streamHandler;
374 }
375
376 @Override
377 void consumeData(final ByteBuffer src) throws HttpException, IOException {
378 Asserts.notNull(incoming, "Request stream handler");
379 incoming.consumeData(src);
380 }
381
382 @Override
383 void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
384 Asserts.notNull(incoming, "Request stream handler");
385 incoming.updateCapacity(capacityChannel);
386 }
387
388 @Override
389 void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
390 Asserts.notNull(incoming, "Request stream handler");
391 incoming.dataEnd(trailers);
392 }
393
394 @Override
395 void inputEnd() throws HttpException, IOException {
396 if (incoming != null) {
397 if (incoming.isCompleted()) {
398 incoming.releaseResources();
399 }
400 incoming = null;
401 }
402 if (isShuttingDown() && outputIdle() && inputIdle()) {
403 shutdownSession(CloseMode.IMMEDIATE);
404 }
405 }
406
407 @Override
408 void execute(final RequestExecutionCommand executionCommand) throws HttpException {
409 throw new HttpException("Illegal command: " + executionCommand.getClass());
410 }
411
412 @Override
413 boolean isOutputReady() {
414 return outgoing != null && outgoing.isOutputReady();
415 }
416
417 @Override
418 void produceOutput() throws HttpException, IOException {
419 if (outgoing != null) {
420 outgoing.produceOutput();
421 }
422 }
423
424 @Override
425 void outputEnd() throws HttpException, IOException {
426 if (outgoing != null && outgoing.isResponseFinal()) {
427 if (streamListener != null) {
428 streamListener.onExchangeComplete(this, outgoing.keepAlive());
429 }
430 if (outgoing.isCompleted()) {
431 outgoing.releaseResources();
432 }
433 outgoing = null;
434 }
435 if (outgoing == null && isActive()) {
436 final ServerHttp1StreamHandler handler = pipeline.poll();
437 if (handler != null) {
438 outgoing = handler;
439 handler.activateChannel();
440 if (handler.isOutputReady()) {
441 handler.produceOutput();
442 }
443 }
444 }
445 if (isShuttingDown() && outputIdle() && inputIdle()) {
446 shutdownSession(CloseMode.IMMEDIATE);
447 }
448 }
449
450 @Override
451 boolean handleTimeout() {
452 return false;
453 }
454
455 @Override
456 void appendState(final StringBuilder buf) {
457 super.appendState(buf);
458 buf.append(", incoming=[");
459 if (incoming != null) {
460 incoming.appendState(buf);
461 }
462 buf.append("], outgoing=[");
463 if (outgoing != null) {
464 outgoing.appendState(buf);
465 }
466 buf.append("], pipeline=");
467 buf.append(pipeline.size());
468 }
469
470 @Override
471 public String toString() {
472 final StringBuilder buf = new StringBuilder();
473 buf.append("[");
474 appendState(buf);
475 buf.append("]");
476 return buf.toString();
477 }
478
479 private static class DelayedOutputChannel implements Http1StreamChannel<HttpResponse> {
480
481 private final Http1StreamChannel<HttpResponse> channel;
482
483 private volatile boolean direct;
484 private volatile HttpResponse delayedResponse;
485 private volatile boolean completed;
486 private final ReentrantLock lock = new ReentrantLock();
487
488 private DelayedOutputChannel(final Http1StreamChannel<HttpResponse> channel) {
489 this.channel = channel;
490 }
491
492 @Override
493 public void close() {
494 channel.close();
495 }
496
497 @Override
498 public void submit(
499 final HttpResponse response,
500 final boolean endStream,
501 final FlushMode flushMode) throws HttpException, IOException {
502 lock.lock();
503 try {
504 if (direct) {
505 channel.submit(response, endStream, flushMode);
506 } else {
507 delayedResponse = response;
508 completed = endStream;
509 }
510 } finally {
511 lock.unlock();
512 }
513 }
514
515 @Override
516 public void suspendOutput() throws IOException {
517 channel.suspendOutput();
518 }
519
520 @Override
521 public void requestOutput() {
522 channel.requestOutput();
523 }
524
525 @Override
526 public Timeout getSocketTimeout() {
527 return channel.getSocketTimeout();
528 }
529
530 @Override
531 public void setSocketTimeout(final Timeout timeout) {
532 channel.setSocketTimeout(timeout);
533 }
534
535 @Override
536 public int write(final ByteBuffer src) throws IOException {
537 lock.lock();
538 try {
539 return direct ? channel.write(src) : 0;
540 } finally {
541 lock.unlock();
542 }
543 }
544
545 @Override
546 public void complete(final List<? extends Header> trailers) throws IOException {
547 lock.lock();
548 try {
549 if (direct) {
550 channel.complete(trailers);
551 } else {
552 completed = true;
553 }
554 } finally {
555 lock.unlock();
556 }
557 }
558
559 @Override
560 public boolean abortGracefully() throws IOException {
561 lock.lock();
562 try {
563 if (direct) {
564 return channel.abortGracefully();
565 }
566 completed = true;
567 return true;
568 } finally {
569 lock.unlock();
570 }
571 }
572
573 @Override
574 public boolean isCompleted() {
575 lock.lock();
576 try {
577 return direct ? channel.isCompleted() : completed;
578 } finally {
579 lock.unlock();
580 }
581 }
582
583 @Override
584 public void activate() throws IOException, HttpException {
585 lock.lock();
586 try {
587 direct = true;
588 if (delayedResponse != null) {
589 channel.submit(delayedResponse, completed, completed ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
590 delayedResponse = null;
591 }
592 } finally {
593 lock.unlock();
594 }
595 }
596
597 }
598
599 }