1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.impl.nio;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.channels.ClosedChannelException;
34 import java.nio.channels.ReadableByteChannel;
35 import java.nio.channels.SelectionKey;
36 import java.nio.channels.WritableByteChannel;
37 import java.util.List;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.locks.ReentrantLock;
40
41 import javax.net.ssl.SSLSession;
42
43 import org.apache.hc.core5.http.ConnectionClosedException;
44 import org.apache.hc.core5.http.ContentLengthStrategy;
45 import org.apache.hc.core5.http.EndpointDetails;
46 import org.apache.hc.core5.http.EntityDetails;
47 import org.apache.hc.core5.http.Header;
48 import org.apache.hc.core5.http.HttpConnection;
49 import org.apache.hc.core5.http.HttpException;
50 import org.apache.hc.core5.http.HttpMessage;
51 import org.apache.hc.core5.http.Message;
52 import org.apache.hc.core5.http.ProtocolVersion;
53 import org.apache.hc.core5.http.config.CharCodingConfig;
54 import org.apache.hc.core5.http.config.Http1Config;
55 import org.apache.hc.core5.http.impl.BasicEndpointDetails;
56 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
57 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
58 import org.apache.hc.core5.http.impl.CharCodingSupport;
59 import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
60 import org.apache.hc.core5.http.impl.IncomingEntityDetails;
61 import org.apache.hc.core5.http.nio.CapacityChannel;
62 import org.apache.hc.core5.http.nio.ContentDecoder;
63 import org.apache.hc.core5.http.nio.ContentEncoder;
64 import org.apache.hc.core5.http.nio.NHttpMessageParser;
65 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
66 import org.apache.hc.core5.http.nio.SessionInputBuffer;
67 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
68 import org.apache.hc.core5.http.nio.command.CommandSupport;
69 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
70 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
71 import org.apache.hc.core5.io.CloseMode;
72 import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
73 import org.apache.hc.core5.reactor.Command;
74 import org.apache.hc.core5.reactor.EventMask;
75 import org.apache.hc.core5.reactor.IOSession;
76 import org.apache.hc.core5.reactor.ProtocolIOSession;
77 import org.apache.hc.core5.reactor.ssl.TlsDetails;
78 import org.apache.hc.core5.util.Args;
79 import org.apache.hc.core5.util.Identifiable;
80 import org.apache.hc.core5.util.Timeout;
81
82 abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
83 implements Identifiable, HttpConnection {
84
85 private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
86
87 private final ProtocolIOSession ioSession;
88 private final Http1Config http1Config;
89 private final SessionInputBufferImpl inbuf;
90 private final SessionOutputBufferImpl outbuf;
91 private final BasicHttpTransportMetrics inTransportMetrics;
92 private final BasicHttpTransportMetrics outTransportMetrics;
93 private final BasicHttpConnectionMetrics connMetrics;
94 private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
95 private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
96 private final ContentLengthStrategy incomingContentStrategy;
97 private final ContentLengthStrategy outgoingContentStrategy;
98 private final ByteBuffer contentBuffer;
99 private final AtomicInteger outputRequests;
100
101 private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
102 private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
103 private volatile ConnectionState connState;
104 private volatile CapacityWindow capacityWindow;
105
106 private volatile ProtocolVersion version;
107 private volatile EndpointDetails endpointDetails;
108
109 AbstractHttp1StreamDuplexer(
110 final ProtocolIOSession ioSession,
111 final Http1Config http1Config,
112 final CharCodingConfig charCodingConfig,
113 final NHttpMessageParser<IncomingMessage> incomingMessageParser,
114 final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
115 final ContentLengthStrategy incomingContentStrategy,
116 final ContentLengthStrategy outgoingContentStrategy) {
117 this.ioSession = Args.notNull(ioSession, "I/O session");
118 this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
119 final int bufferSize = this.http1Config.getBufferSize();
120 this.inbuf = new SessionInputBufferImpl(bufferSize, Math.min(bufferSize, 512),
121 this.http1Config.getMaxLineLength(),
122 CharCodingSupport.createDecoder(charCodingConfig));
123 this.outbuf = new SessionOutputBufferImpl(bufferSize, Math.min(bufferSize, 512),
124 CharCodingSupport.createEncoder(charCodingConfig));
125 this.inTransportMetrics = new BasicHttpTransportMetrics();
126 this.outTransportMetrics = new BasicHttpTransportMetrics();
127 this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
128 this.incomingMessageParser = incomingMessageParser;
129 this.outgoingMessageWriter = outgoingMessageWriter;
130 this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
131 DefaultContentLengthStrategy.INSTANCE;
132 this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
133 DefaultContentLengthStrategy.INSTANCE;
134 this.contentBuffer = ByteBuffer.allocate(this.http1Config.getBufferSize());
135 this.outputRequests = new AtomicInteger(0);
136 this.connState = ConnectionState.READY;
137 }
138
139 @Override
140 public String getId() {
141 return ioSession.getId();
142 }
143
144 boolean isActive() {
145 return connState == ConnectionState.ACTIVE;
146 }
147
148 boolean isShuttingDown() {
149 return connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0;
150 }
151
152 void shutdownSession(final CloseMode closeMode) {
153 if (closeMode == CloseMode.GRACEFUL) {
154 connState = ConnectionState.GRACEFUL_SHUTDOWN;
155 ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
156 } else {
157 connState = ConnectionState.SHUTDOWN;
158 ioSession.close();
159 }
160 }
161
162 void shutdownSession(final Exception cause) {
163 connState = ConnectionState.SHUTDOWN;
164 try {
165 terminate(cause);
166 } finally {
167 final CloseMode closeMode;
168 if (cause instanceof ConnectionClosedException) {
169 closeMode = CloseMode.GRACEFUL;
170 } else if (cause instanceof IOException) {
171 closeMode = CloseMode.IMMEDIATE;
172 } else {
173 closeMode = CloseMode.GRACEFUL;
174 }
175 ioSession.close(closeMode);
176 }
177 }
178
179 abstract void disconnected();
180
181 abstract void terminate(final Exception exception);
182
183 abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);
184
185 abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);
186
187 abstract void consumeHeader(IncomingMessage messageHead, EntityDetails entityDetails) throws HttpException, IOException;
188
189 abstract boolean handleIncomingMessage(IncomingMessage incomingMessage) throws HttpException;
190
191 abstract boolean handleOutgoingMessage(OutgoingMessage outgoingMessage) throws HttpException;
192
193 abstract ContentDecoder createContentDecoder(
194 long contentLength,
195 ReadableByteChannel channel,
196 SessionInputBuffer buffer,
197 BasicHttpTransportMetrics metrics) throws HttpException;
198
199 abstract ContentEncoder createContentEncoder(
200 long contentLength,
201 WritableByteChannel channel,
202 SessionOutputBuffer buffer,
203 BasicHttpTransportMetrics metrics) throws HttpException;
204
205 abstract void consumeData(ByteBuffer src) throws HttpException, IOException;
206
207 abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;
208
209 abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;
210
211 abstract boolean isOutputReady();
212
213 abstract void produceOutput() throws HttpException, IOException;
214
215 abstract void execute(RequestExecutionCommand executionCommand) throws HttpException, IOException;
216
217 abstract void inputEnd() throws HttpException, IOException;
218
219 abstract void outputEnd() throws HttpException, IOException;
220
221 abstract boolean inputIdle();
222
223 abstract boolean outputIdle();
224
225 abstract boolean handleTimeout();
226
227 private void processCommands() throws HttpException, IOException {
228 for (;;) {
229 final Command command = ioSession.poll();
230 if (command == null) {
231 return;
232 }
233 if (command instanceof ShutdownCommand) {
234 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
235 requestShutdown(shutdownCommand.getType());
236 } else if (command instanceof RequestExecutionCommand) {
237 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
238 command.cancel();
239 } else {
240 execute((RequestExecutionCommand) command);
241 return;
242 }
243 } else {
244 throw new HttpException("Unexpected command: " + command.getClass());
245 }
246 }
247 }
248
249 public final void onConnect() throws HttpException, IOException {
250 if (connState == ConnectionState.READY) {
251 connState = ConnectionState.ACTIVE;
252 processCommands();
253 }
254 }
255
256 IncomingMessage parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
257 final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, endOfStream);
258 if (messageHead != null) {
259 incomingMessageParser.reset();
260 }
261 return messageHead;
262 }
263
264 public final void onInput(final ByteBuffer src) throws HttpException, IOException {
265 if (src != null) {
266 final int n = src.remaining();
267 inbuf.put(src);
268 inTransportMetrics.incrementBytesTransferred(n);
269 }
270
271 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inbuf.hasData() && inputIdle()) {
272 ioSession.clearEvent(SelectionKey.OP_READ);
273 return;
274 }
275
276 boolean endOfStream = false;
277 if (incomingMessage == null) {
278 final int bytesRead = inbuf.fill(ioSession);
279 if (bytesRead > 0) {
280 inTransportMetrics.incrementBytesTransferred(bytesRead);
281 }
282 endOfStream = bytesRead == -1;
283 }
284
285 do {
286 if (incomingMessage == null) {
287
288 final IncomingMessage messageHead = parseMessageHead(endOfStream);
289 if (messageHead != null) {
290 this.version = messageHead.getVersion();
291
292 updateInputMetrics(messageHead, connMetrics);
293 final ContentDecoder contentDecoder;
294 if (handleIncomingMessage(messageHead)) {
295 final long len = incomingContentStrategy.determineLength(messageHead);
296 contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
297 consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
298 } else {
299 consumeHeader(messageHead, null);
300 contentDecoder = null;
301 }
302 capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
303 if (contentDecoder != null) {
304 incomingMessage = new Message<>(messageHead, contentDecoder);
305 } else {
306 inputEnd();
307 if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
308 ioSession.setEvent(SelectionKey.OP_READ);
309 }
310 }
311 } else {
312 break;
313 }
314 }
315
316 if (incomingMessage != null) {
317 final ContentDecoder contentDecoder = incomingMessage.getBody();
318
319
320
321
322
323 final int bytesRead = contentDecoder.read(contentBuffer);
324 if (bytesRead > 0) {
325 contentBuffer.flip();
326 consumeData(contentBuffer);
327 contentBuffer.clear();
328 final int capacity = capacityWindow.removeCapacity(bytesRead);
329 if (capacity <= 0) {
330 if (!contentDecoder.isCompleted()) {
331 updateCapacity(capacityWindow);
332 }
333 }
334 }
335 if (contentDecoder.isCompleted()) {
336 dataEnd(contentDecoder.getTrailers());
337 capacityWindow.close();
338 incomingMessage = null;
339 ioSession.setEvent(SelectionKey.OP_READ);
340 inputEnd();
341 } else if (bytesRead == 0) {
342 break;
343 }
344 }
345 } while (inbuf.hasData());
346
347 if (endOfStream && !inbuf.hasData()) {
348 if (outputIdle() && inputIdle()) {
349 requestShutdown(CloseMode.GRACEFUL);
350 } else {
351 shutdownSession(new ConnectionClosedException("Connection closed by peer"));
352 }
353 }
354 }
355
356 public final void onOutput() throws IOException, HttpException {
357 ioSession.getLock().lock();
358 try {
359 if (outbuf.hasData()) {
360 final int bytesWritten = outbuf.flush(ioSession);
361 if (bytesWritten > 0) {
362 outTransportMetrics.incrementBytesTransferred(bytesWritten);
363 }
364 }
365 } finally {
366 ioSession.getLock().unlock();
367 }
368 if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
369 final int pendingOutputRequests = outputRequests.get();
370 produceOutput();
371 final boolean outputPending = isOutputReady();
372 final boolean outputEnd;
373 ioSession.getLock().lock();
374 try {
375 if (!outputPending && !outbuf.hasData() && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
376 ioSession.clearEvent(SelectionKey.OP_WRITE);
377 } else {
378 outputRequests.addAndGet(-pendingOutputRequests);
379 }
380 outputEnd = outgoingMessage == null && !outbuf.hasData();
381 } finally {
382 ioSession.getLock().unlock();
383 }
384 if (outputEnd) {
385 outputEnd();
386 if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
387 processCommands();
388 } else if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
389 connState = ConnectionState.SHUTDOWN;
390 }
391 }
392 }
393 if (connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
394 ioSession.close();
395 }
396 }
397
398 public final void onTimeout(final Timeout timeout) throws IOException, HttpException {
399 if (!handleTimeout()) {
400 onException(SocketTimeoutExceptionFactory.create(timeout));
401 }
402 }
403
404 public final void onException(final Exception ex) {
405 shutdownSession(ex);
406 CommandSupport.failCommands(ioSession, ex);
407 }
408
409 public final void onDisconnect() {
410 disconnected();
411 CommandSupport.cancelCommands(ioSession);
412 }
413
414 void requestShutdown(final CloseMode closeMode) {
415 switch (closeMode) {
416 case GRACEFUL:
417 if (connState == ConnectionState.ACTIVE) {
418 connState = ConnectionState.GRACEFUL_SHUTDOWN;
419 }
420 break;
421 case IMMEDIATE:
422 connState = ConnectionState.SHUTDOWN;
423 break;
424 }
425 ioSession.setEvent(SelectionKey.OP_WRITE);
426 }
427
428 void commitMessageHead(
429 final OutgoingMessage messageHead,
430 final boolean endStream,
431 final FlushMode flushMode) throws HttpException, IOException {
432 ioSession.getLock().lock();
433 try {
434 outgoingMessageWriter.write(messageHead, outbuf);
435 updateOutputMetrics(messageHead, connMetrics);
436 if (!endStream) {
437 final ContentEncoder contentEncoder;
438 if (handleOutgoingMessage(messageHead)) {
439 final long len = outgoingContentStrategy.determineLength(messageHead);
440 contentEncoder = createContentEncoder(len, ioSession, outbuf, outTransportMetrics);
441 } else {
442 contentEncoder = null;
443 }
444 if (contentEncoder != null) {
445 outgoingMessage = new Message<>(messageHead, contentEncoder);
446 }
447 }
448 outgoingMessageWriter.reset();
449 if (flushMode == FlushMode.IMMEDIATE) {
450 final int bytesWritten = outbuf.flush(ioSession);
451 if (bytesWritten > 0) {
452 outTransportMetrics.incrementBytesTransferred(bytesWritten);
453 }
454 }
455 ioSession.setEvent(EventMask.WRITE);
456 } finally {
457 ioSession.getLock().unlock();
458 }
459 }
460
461 void requestSessionInput() {
462 ioSession.setEvent(SelectionKey.OP_READ);
463 }
464
465 void requestSessionOutput() {
466 outputRequests.incrementAndGet();
467 ioSession.setEvent(SelectionKey.OP_WRITE);
468 }
469
470 Timeout getSessionTimeout() {
471 return ioSession.getSocketTimeout();
472 }
473
474 void setSessionTimeout(final Timeout timeout) {
475 ioSession.setSocketTimeout(timeout);
476 }
477
478 void suspendSessionInput() {
479 ioSession.clearEvent(SelectionKey.OP_READ);
480 }
481
482 void suspendSessionOutput() throws IOException {
483 ioSession.getLock().lock();
484 try {
485 if (outbuf.hasData()) {
486 final int bytesWritten = outbuf.flush(ioSession);
487 if (bytesWritten > 0) {
488 outTransportMetrics.incrementBytesTransferred(bytesWritten);
489 }
490 } else {
491 ioSession.clearEvent(SelectionKey.OP_WRITE);
492 }
493 } finally {
494 ioSession.getLock().unlock();
495 }
496 }
497
498 int streamOutput(final ByteBuffer src) throws IOException {
499 ioSession.getLock().lock();
500 try {
501 if (outgoingMessage == null) {
502 throw new ClosedChannelException();
503 }
504 final ContentEncoder contentEncoder = outgoingMessage.getBody();
505 final int bytesWritten = contentEncoder.write(src);
506 if (bytesWritten > 0) {
507 ioSession.setEvent(SelectionKey.OP_WRITE);
508 }
509 return bytesWritten;
510 } finally {
511 ioSession.getLock().unlock();
512 }
513 }
514
515 enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
516
517 MessageDelineation endOutputStream(final List<? extends Header> trailers) throws IOException {
518 ioSession.getLock().lock();
519 try {
520 if (outgoingMessage == null) {
521 return MessageDelineation.NONE;
522 }
523 final ContentEncoder contentEncoder = outgoingMessage.getBody();
524 contentEncoder.complete(trailers);
525 ioSession.setEvent(SelectionKey.OP_WRITE);
526 outgoingMessage = null;
527 return contentEncoder instanceof ChunkEncoder
528 ? MessageDelineation.CHUNK_CODED
529 : MessageDelineation.MESSAGE_HEAD;
530 } finally {
531 ioSession.getLock().unlock();
532 }
533 }
534
535 boolean isOutputCompleted() {
536 ioSession.getLock().lock();
537 try {
538 if (outgoingMessage == null) {
539 return true;
540 }
541 final ContentEncoder contentEncoder = outgoingMessage.getBody();
542 return contentEncoder.isCompleted();
543 } finally {
544 ioSession.getLock().unlock();
545 }
546 }
547
548 @Override
549 public void close() throws IOException {
550 ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
551 }
552
553 @Override
554 public void close(final CloseMode closeMode) {
555 ioSession.enqueue(new ShutdownCommand(closeMode), Command.Priority.IMMEDIATE);
556 }
557
558 @Override
559 public boolean isOpen() {
560 return connState == ConnectionState.ACTIVE;
561 }
562
563 @Override
564 public Timeout getSocketTimeout() {
565 return ioSession.getSocketTimeout();
566 }
567
568 @Override
569 public void setSocketTimeout(final Timeout timeout) {
570 ioSession.setSocketTimeout(timeout);
571 }
572
573 @Override
574 public EndpointDetails getEndpointDetails() {
575 if (endpointDetails == null) {
576 endpointDetails = new BasicEndpointDetails(
577 ioSession.getRemoteAddress(),
578 ioSession.getLocalAddress(),
579 connMetrics,
580 ioSession.getSocketTimeout());
581 }
582 return endpointDetails;
583 }
584
585 @Override
586 public ProtocolVersion getProtocolVersion() {
587 return version;
588 }
589
590 @Override
591 public SocketAddress getRemoteAddress() {
592 return ioSession.getRemoteAddress();
593 }
594
595 @Override
596 public SocketAddress getLocalAddress() {
597 return ioSession.getLocalAddress();
598 }
599
600 @Override
601 public SSLSession getSSLSession() {
602 final TlsDetails tlsDetails = ioSession.getTlsDetails();
603 return tlsDetails != null ? tlsDetails.getSSLSession() : null;
604 }
605
606 void appendState(final StringBuilder buf) {
607 buf.append("connState=").append(connState)
608 .append(", inbuf=").append(inbuf)
609 .append(", outbuf=").append(outbuf)
610 .append(", inputWindow=").append(capacityWindow != null ? capacityWindow.getWindow() : 0);
611 }
612
613 static class CapacityWindow implements CapacityChannel {
614 private final IOSession ioSession;
615 private final ReentrantLock lock;
616 private int window;
617 private boolean closed;
618
619 CapacityWindow(final int window, final IOSession ioSession) {
620 this.window = window;
621 this.ioSession = ioSession;
622 this.lock = new ReentrantLock();
623 }
624
625 @Override
626 public void update(final int increment) throws IOException {
627 lock.lock();
628 try {
629 if (closed) {
630 return;
631 }
632 if (increment > 0) {
633 updateWindow(increment);
634 ioSession.setEvent(SelectionKey.OP_READ);
635 }
636 } finally {
637 lock.unlock();
638 }
639 }
640
641
642
643
644
645 int removeCapacity(final int delta) {
646 lock.lock();
647 try {
648 updateWindow(-delta);
649 if (window <= 0) {
650 ioSession.clearEvent(SelectionKey.OP_READ);
651 }
652 return window;
653 } finally {
654 lock.unlock();
655 }
656 }
657
658 private void updateWindow(final int delta) {
659 int newValue = window + delta;
660
661 if (((window ^ newValue) & (delta ^ newValue)) < 0) {
662 newValue = delta < 0 ? Integer.MIN_VALUE : Integer.MAX_VALUE;
663 }
664 window = newValue;
665 }
666
667
668
669
670
671 void close() {
672 lock.lock();
673 try {
674 closed = true;
675 } finally {
676 lock.unlock();
677 }
678 }
679
680
681 int getWindow() {
682 return window;
683 }
684 }
685 }