1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.impl.nio;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.channels.ClosedChannelException;
34 import java.nio.channels.ReadableByteChannel;
35 import java.nio.channels.SelectionKey;
36 import java.nio.channels.WritableByteChannel;
37 import java.util.List;
38 import java.util.concurrent.atomic.AtomicInteger;
39
40 import javax.net.ssl.SSLSession;
41
42 import org.apache.hc.core5.http.ConnectionClosedException;
43 import org.apache.hc.core5.http.ContentLengthStrategy;
44 import org.apache.hc.core5.http.EndpointDetails;
45 import org.apache.hc.core5.http.EntityDetails;
46 import org.apache.hc.core5.http.Header;
47 import org.apache.hc.core5.http.HttpConnection;
48 import org.apache.hc.core5.http.HttpException;
49 import org.apache.hc.core5.http.HttpMessage;
50 import org.apache.hc.core5.http.Message;
51 import org.apache.hc.core5.http.ProtocolVersion;
52 import org.apache.hc.core5.http.config.CharCodingConfig;
53 import org.apache.hc.core5.http.config.Http1Config;
54 import org.apache.hc.core5.http.impl.BasicEndpointDetails;
55 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
56 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
57 import org.apache.hc.core5.http.impl.CharCodingSupport;
58 import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
59 import org.apache.hc.core5.http.impl.IncomingEntityDetails;
60 import org.apache.hc.core5.http.nio.CapacityChannel;
61 import org.apache.hc.core5.http.nio.ContentDecoder;
62 import org.apache.hc.core5.http.nio.ContentEncoder;
63 import org.apache.hc.core5.http.nio.NHttpMessageParser;
64 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
65 import org.apache.hc.core5.http.nio.SessionInputBuffer;
66 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
67 import org.apache.hc.core5.http.nio.command.CommandSupport;
68 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
69 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
70 import org.apache.hc.core5.io.CloseMode;
71 import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
72 import org.apache.hc.core5.reactor.Command;
73 import org.apache.hc.core5.reactor.EventMask;
74 import org.apache.hc.core5.reactor.IOSession;
75 import org.apache.hc.core5.reactor.ProtocolIOSession;
76 import org.apache.hc.core5.reactor.ssl.TlsDetails;
77 import org.apache.hc.core5.util.Args;
78 import org.apache.hc.core5.util.Identifiable;
79 import org.apache.hc.core5.util.Timeout;
80
81 abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
82 implements Identifiable, HttpConnection {
83
84 private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
85
86 private final ProtocolIOSession ioSession;
87 private final Http1Config http1Config;
88 private final SessionInputBufferImpl inbuf;
89 private final SessionOutputBufferImpl outbuf;
90 private final BasicHttpTransportMetrics inTransportMetrics;
91 private final BasicHttpTransportMetrics outTransportMetrics;
92 private final BasicHttpConnectionMetrics connMetrics;
93 private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
94 private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
95 private final ContentLengthStrategy incomingContentStrategy;
96 private final ContentLengthStrategy outgoingContentStrategy;
97 private final ByteBuffer contentBuffer;
98 private final AtomicInteger outputRequests;
99
100 private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
101 private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
102 private volatile ConnectionState connState;
103 private volatile CapacityWindow capacityWindow;
104
105 private volatile ProtocolVersion version;
106 private volatile EndpointDetails endpointDetails;
107
108 AbstractHttp1StreamDuplexer(
109 final ProtocolIOSession ioSession,
110 final Http1Config http1Config,
111 final CharCodingConfig charCodingConfig,
112 final NHttpMessageParser<IncomingMessage> incomingMessageParser,
113 final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
114 final ContentLengthStrategy incomingContentStrategy,
115 final ContentLengthStrategy outgoingContentStrategy) {
116 this.ioSession = Args.notNull(ioSession, "I/O session");
117 this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
118 final int bufferSize = this.http1Config.getBufferSize();
119 this.inbuf = new SessionInputBufferImpl(bufferSize, Math.min(bufferSize, 512),
120 this.http1Config.getMaxLineLength(),
121 CharCodingSupport.createDecoder(charCodingConfig));
122 this.outbuf = new SessionOutputBufferImpl(bufferSize, Math.min(bufferSize, 512),
123 CharCodingSupport.createEncoder(charCodingConfig));
124 this.inTransportMetrics = new BasicHttpTransportMetrics();
125 this.outTransportMetrics = new BasicHttpTransportMetrics();
126 this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
127 this.incomingMessageParser = incomingMessageParser;
128 this.outgoingMessageWriter = outgoingMessageWriter;
129 this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
130 DefaultContentLengthStrategy.INSTANCE;
131 this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
132 DefaultContentLengthStrategy.INSTANCE;
133 this.contentBuffer = ByteBuffer.allocate(this.http1Config.getBufferSize());
134 this.outputRequests = new AtomicInteger(0);
135 this.connState = ConnectionState.READY;
136 }
137
138 @Override
139 public String getId() {
140 return ioSession.getId();
141 }
142
143 boolean isActive() {
144 return connState == ConnectionState.ACTIVE;
145 }
146
147 boolean isShuttingDown() {
148 return connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0;
149 }
150
151 void shutdownSession(final CloseMode closeMode) {
152 if (closeMode == CloseMode.GRACEFUL) {
153 connState = ConnectionState.GRACEFUL_SHUTDOWN;
154 ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
155 } else {
156 connState = ConnectionState.SHUTDOWN;
157 ioSession.close();
158 }
159 }
160
161 void shutdownSession(final Exception cause) {
162 connState = ConnectionState.SHUTDOWN;
163 try {
164 terminate(cause);
165 } finally {
166 final CloseMode closeMode;
167 if (cause instanceof ConnectionClosedException) {
168 closeMode = CloseMode.GRACEFUL;
169 } else if (cause instanceof IOException) {
170 closeMode = CloseMode.IMMEDIATE;
171 } else {
172 closeMode = CloseMode.GRACEFUL;
173 }
174 ioSession.close(closeMode);
175 }
176 }
177
178 abstract void disconnected();
179
180 abstract void terminate(final Exception exception);
181
182 abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);
183
184 abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);
185
186 abstract void consumeHeader(IncomingMessage messageHead, EntityDetails entityDetails) throws HttpException, IOException;
187
188 abstract boolean handleIncomingMessage(IncomingMessage incomingMessage) throws HttpException;
189
190 abstract boolean handleOutgoingMessage(OutgoingMessage outgoingMessage) throws HttpException;
191
192 abstract ContentDecoder createContentDecoder(
193 long contentLength,
194 ReadableByteChannel channel,
195 SessionInputBuffer buffer,
196 BasicHttpTransportMetrics metrics) throws HttpException;
197
198 abstract ContentEncoder createContentEncoder(
199 long contentLength,
200 WritableByteChannel channel,
201 SessionOutputBuffer buffer,
202 BasicHttpTransportMetrics metrics) throws HttpException;
203
204 abstract void consumeData(ByteBuffer src) throws HttpException, IOException;
205
206 abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;
207
208 abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;
209
210 abstract boolean isOutputReady();
211
212 abstract void produceOutput() throws HttpException, IOException;
213
214 abstract void execute(RequestExecutionCommand executionCommand) throws HttpException, IOException;
215
216 abstract void inputEnd() throws HttpException, IOException;
217
218 abstract void outputEnd() throws HttpException, IOException;
219
220 abstract boolean inputIdle();
221
222 abstract boolean outputIdle();
223
224 abstract boolean handleTimeout();
225
226 private void processCommands() throws HttpException, IOException {
227 for (;;) {
228 final Command command = ioSession.poll();
229 if (command == null) {
230 return;
231 }
232 if (command instanceof ShutdownCommand) {
233 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
234 requestShutdown(shutdownCommand.getType());
235 } else if (command instanceof RequestExecutionCommand) {
236 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
237 command.cancel();
238 } else {
239 execute((RequestExecutionCommand) command);
240 return;
241 }
242 } else {
243 throw new HttpException("Unexpected command: " + command.getClass());
244 }
245 }
246 }
247
248 public final void onConnect() throws HttpException, IOException {
249 if (connState == ConnectionState.READY) {
250 connState = ConnectionState.ACTIVE;
251 processCommands();
252 }
253 }
254
255 IncomingMessage parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
256 final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, endOfStream);
257 if (messageHead != null) {
258 incomingMessageParser.reset();
259 }
260 return messageHead;
261 }
262
263 public final void onInput(final ByteBuffer src) throws HttpException, IOException {
264 if (src != null) {
265 final int n = src.remaining();
266 inbuf.put(src);
267 inTransportMetrics.incrementBytesTransferred(n);
268 }
269
270 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inbuf.hasData() && inputIdle()) {
271 ioSession.clearEvent(SelectionKey.OP_READ);
272 return;
273 }
274
275 boolean endOfStream = false;
276 if (incomingMessage == null) {
277 final int bytesRead = inbuf.fill(ioSession);
278 if (bytesRead > 0) {
279 inTransportMetrics.incrementBytesTransferred(bytesRead);
280 }
281 endOfStream = bytesRead == -1;
282 }
283
284 do {
285 if (incomingMessage == null) {
286
287 final IncomingMessage messageHead = parseMessageHead(endOfStream);
288 if (messageHead != null) {
289 this.version = messageHead.getVersion();
290
291 updateInputMetrics(messageHead, connMetrics);
292 final ContentDecoder contentDecoder;
293 if (handleIncomingMessage(messageHead)) {
294 final long len = incomingContentStrategy.determineLength(messageHead);
295 contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
296 consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
297 } else {
298 consumeHeader(messageHead, null);
299 contentDecoder = null;
300 }
301 capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
302 if (contentDecoder != null) {
303 incomingMessage = new Message<>(messageHead, contentDecoder);
304 } else {
305 inputEnd();
306 if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
307 ioSession.setEvent(SelectionKey.OP_READ);
308 }
309 }
310 } else {
311 break;
312 }
313 }
314
315 if (incomingMessage != null) {
316 final ContentDecoder contentDecoder = incomingMessage.getBody();
317
318
319
320
321
322 final int bytesRead = contentDecoder.read(contentBuffer);
323 if (bytesRead > 0) {
324 contentBuffer.flip();
325 consumeData(contentBuffer);
326 contentBuffer.clear();
327 final int capacity = capacityWindow.removeCapacity(bytesRead);
328 if (capacity <= 0) {
329 if (!contentDecoder.isCompleted()) {
330 updateCapacity(capacityWindow);
331 }
332 }
333 }
334 if (contentDecoder.isCompleted()) {
335 dataEnd(contentDecoder.getTrailers());
336 capacityWindow.close();
337 incomingMessage = null;
338 ioSession.setEvent(SelectionKey.OP_READ);
339 inputEnd();
340 } else if (bytesRead == 0) {
341 break;
342 }
343 }
344 } while (inbuf.hasData());
345
346 if (endOfStream && !inbuf.hasData()) {
347 if (outputIdle() && inputIdle()) {
348 requestShutdown(CloseMode.GRACEFUL);
349 } else {
350 shutdownSession(new ConnectionClosedException("Connection closed by peer"));
351 }
352 }
353 }
354
355 public final void onOutput() throws IOException, HttpException {
356 ioSession.getLock().lock();
357 try {
358 if (outbuf.hasData()) {
359 final int bytesWritten = outbuf.flush(ioSession);
360 if (bytesWritten > 0) {
361 outTransportMetrics.incrementBytesTransferred(bytesWritten);
362 }
363 }
364 } finally {
365 ioSession.getLock().unlock();
366 }
367 if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
368 final int pendingOutputRequests = outputRequests.get();
369 produceOutput();
370 final boolean outputPending = isOutputReady();
371 final boolean outputEnd;
372 ioSession.getLock().lock();
373 try {
374 if (!outputPending && !outbuf.hasData() && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
375 ioSession.clearEvent(SelectionKey.OP_WRITE);
376 } else {
377 outputRequests.addAndGet(-pendingOutputRequests);
378 }
379 outputEnd = outgoingMessage == null && !outbuf.hasData();
380 } finally {
381 ioSession.getLock().unlock();
382 }
383 if (outputEnd) {
384 outputEnd();
385 if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
386 processCommands();
387 } else if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
388 connState = ConnectionState.SHUTDOWN;
389 }
390 }
391 }
392 if (connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
393 ioSession.close();
394 }
395 }
396
397 public final void onTimeout(final Timeout timeout) throws IOException, HttpException {
398 if (!handleTimeout()) {
399 onException(SocketTimeoutExceptionFactory.create(timeout));
400 }
401 }
402
403 public final void onException(final Exception ex) {
404 shutdownSession(ex);
405 CommandSupport.failCommands(ioSession, ex);
406 }
407
408 public final void onDisconnect() {
409 disconnected();
410 CommandSupport.cancelCommands(ioSession);
411 }
412
413 void requestShutdown(final CloseMode closeMode) {
414 switch (closeMode) {
415 case GRACEFUL:
416 if (connState == ConnectionState.ACTIVE) {
417 connState = ConnectionState.GRACEFUL_SHUTDOWN;
418 }
419 break;
420 case IMMEDIATE:
421 connState = ConnectionState.SHUTDOWN;
422 break;
423 }
424 ioSession.setEvent(SelectionKey.OP_WRITE);
425 }
426
427 void commitMessageHead(
428 final OutgoingMessage messageHead,
429 final boolean endStream,
430 final FlushMode flushMode) throws HttpException, IOException {
431 ioSession.getLock().lock();
432 try {
433 outgoingMessageWriter.write(messageHead, outbuf);
434 updateOutputMetrics(messageHead, connMetrics);
435 if (!endStream) {
436 final ContentEncoder contentEncoder;
437 if (handleOutgoingMessage(messageHead)) {
438 final long len = outgoingContentStrategy.determineLength(messageHead);
439 contentEncoder = createContentEncoder(len, ioSession, outbuf, outTransportMetrics);
440 } else {
441 contentEncoder = null;
442 }
443 if (contentEncoder != null) {
444 outgoingMessage = new Message<>(messageHead, contentEncoder);
445 }
446 }
447 outgoingMessageWriter.reset();
448 if (flushMode == FlushMode.IMMEDIATE) {
449 final int bytesWritten = outbuf.flush(ioSession);
450 if (bytesWritten > 0) {
451 outTransportMetrics.incrementBytesTransferred(bytesWritten);
452 }
453 }
454 ioSession.setEvent(EventMask.WRITE);
455 } finally {
456 ioSession.getLock().unlock();
457 }
458 }
459
460 void requestSessionInput() {
461 ioSession.setEvent(SelectionKey.OP_READ);
462 }
463
464 void requestSessionOutput() {
465 outputRequests.incrementAndGet();
466 ioSession.setEvent(SelectionKey.OP_WRITE);
467 }
468
469 Timeout getSessionTimeout() {
470 return ioSession.getSocketTimeout();
471 }
472
473 void setSessionTimeout(final Timeout timeout) {
474 ioSession.setSocketTimeout(timeout);
475 }
476
477 void suspendSessionInput() {
478 ioSession.clearEvent(SelectionKey.OP_READ);
479 }
480
481 void suspendSessionOutput() throws IOException {
482 ioSession.getLock().lock();
483 try {
484 if (outbuf.hasData()) {
485 final int bytesWritten = outbuf.flush(ioSession);
486 if (bytesWritten > 0) {
487 outTransportMetrics.incrementBytesTransferred(bytesWritten);
488 }
489 } else {
490 ioSession.clearEvent(SelectionKey.OP_WRITE);
491 }
492 } finally {
493 ioSession.getLock().unlock();
494 }
495 }
496
497 int streamOutput(final ByteBuffer src) throws IOException {
498 ioSession.getLock().lock();
499 try {
500 if (outgoingMessage == null) {
501 throw new ClosedChannelException();
502 }
503 final ContentEncoder contentEncoder = outgoingMessage.getBody();
504 final int bytesWritten = contentEncoder.write(src);
505 if (bytesWritten > 0) {
506 ioSession.setEvent(SelectionKey.OP_WRITE);
507 }
508 return bytesWritten;
509 } finally {
510 ioSession.getLock().unlock();
511 }
512 }
513
514 enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
515
516 MessageDelineation endOutputStream(final List<? extends Header> trailers) throws IOException {
517 ioSession.getLock().lock();
518 try {
519 if (outgoingMessage == null) {
520 return MessageDelineation.NONE;
521 }
522 final ContentEncoder contentEncoder = outgoingMessage.getBody();
523 contentEncoder.complete(trailers);
524 ioSession.setEvent(SelectionKey.OP_WRITE);
525 outgoingMessage = null;
526 return contentEncoder instanceof ChunkEncoder
527 ? MessageDelineation.CHUNK_CODED
528 : MessageDelineation.MESSAGE_HEAD;
529 } finally {
530 ioSession.getLock().unlock();
531 }
532 }
533
534 boolean isOutputCompleted() {
535 ioSession.getLock().lock();
536 try {
537 if (outgoingMessage == null) {
538 return true;
539 }
540 final ContentEncoder contentEncoder = outgoingMessage.getBody();
541 return contentEncoder.isCompleted();
542 } finally {
543 ioSession.getLock().unlock();
544 }
545 }
546
547 @Override
548 public void close() throws IOException {
549 ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
550 }
551
552 @Override
553 public void close(final CloseMode closeMode) {
554 ioSession.enqueue(new ShutdownCommand(closeMode), Command.Priority.IMMEDIATE);
555 }
556
557 @Override
558 public boolean isOpen() {
559 return connState == ConnectionState.ACTIVE;
560 }
561
562 @Override
563 public Timeout getSocketTimeout() {
564 return ioSession.getSocketTimeout();
565 }
566
567 @Override
568 public void setSocketTimeout(final Timeout timeout) {
569 ioSession.setSocketTimeout(timeout);
570 }
571
572 @Override
573 public EndpointDetails getEndpointDetails() {
574 if (endpointDetails == null) {
575 endpointDetails = new BasicEndpointDetails(
576 ioSession.getRemoteAddress(),
577 ioSession.getLocalAddress(),
578 connMetrics,
579 ioSession.getSocketTimeout());
580 }
581 return endpointDetails;
582 }
583
584 @Override
585 public ProtocolVersion getProtocolVersion() {
586 return version;
587 }
588
589 @Override
590 public SocketAddress getRemoteAddress() {
591 return ioSession.getRemoteAddress();
592 }
593
594 @Override
595 public SocketAddress getLocalAddress() {
596 return ioSession.getLocalAddress();
597 }
598
599 @Override
600 public SSLSession getSSLSession() {
601 final TlsDetails tlsDetails = ioSession.getTlsDetails();
602 return tlsDetails != null ? tlsDetails.getSSLSession() : null;
603 }
604
605 void appendState(final StringBuilder buf) {
606 buf.append("connState=").append(connState)
607 .append(", inbuf=").append(inbuf)
608 .append(", outbuf=").append(outbuf)
609 .append(", inputWindow=").append(capacityWindow != null ? capacityWindow.getWindow() : 0);
610 }
611
612 static class CapacityWindow implements CapacityChannel {
613 private final IOSession ioSession;
614 private final Object lock;
615 private int window;
616 private boolean closed;
617
618 CapacityWindow(final int window, final IOSession ioSession) {
619 this.window = window;
620 this.ioSession = ioSession;
621 this.lock = new Object();
622 }
623
624 @Override
625 public void update(final int increment) throws IOException {
626 synchronized (lock) {
627 if (closed) {
628 return;
629 }
630 if (increment > 0) {
631 updateWindow(increment);
632 ioSession.setEvent(SelectionKey.OP_READ);
633 }
634 }
635 }
636
637
638
639
640
641 int removeCapacity(final int delta) {
642 synchronized (lock) {
643 updateWindow(-delta);
644 if (window <= 0) {
645 ioSession.clearEvent(SelectionKey.OP_READ);
646 }
647 return window;
648 }
649 }
650
651 private void updateWindow(final int delta) {
652 int newValue = window + delta;
653
654 if (((window ^ newValue) & (delta ^ newValue)) < 0) {
655 newValue = delta < 0 ? Integer.MIN_VALUE : Integer.MAX_VALUE;
656 }
657 window = newValue;
658 }
659
660
661
662
663
664 void close() {
665 synchronized (lock) {
666 closed = true;
667 }
668 }
669
670
671 int getWindow() {
672 return window;
673 }
674 }
675 }