/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.coyote.http2; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.servlet.http.WebConnection; import org.apache.coyote.ActionCode; import org.apache.coyote.Adapter; import org.apache.coyote.CloseNowException; import org.apache.coyote.ProtocolException; import org.apache.coyote.Request; import org.apache.coyote.Response; import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler; import org.apache.coyote.http2.HpackDecoder.HeaderEmitter; import org.apache.coyote.http2.HpackEncoder.State; import org.apache.coyote.http2.Http2Parser.Input; import org.apache.coyote.http2.Http2Parser.Output; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import 
org.apache.tomcat.util.codec.binary.Base64; import org.apache.tomcat.util.http.MimeHeaders; import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; import org.apache.tomcat.util.net.SSLSupport; import org.apache.tomcat.util.net.SendfileState; import org.apache.tomcat.util.net.SocketEvent; import org.apache.tomcat.util.net.SocketWrapperBase; import org.apache.tomcat.util.res.StringManager; /** * This represents an HTTP/2 connection from a client to Tomcat. It is designed * on the basis that there will never be more than one thread performing I/O at * a time. *
* For reading, this implementation is blocking within frames and non-blocking * between frames. *
* Note:
 */
class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeHandler,
        Input, Output {

    protected static final Log log = LogFactory.getLog(Http2UpgradeHandler.class);
    protected static final StringManager sm = StringManager.getManager(Http2UpgradeHandler.class);

    // Source of the per-connection identifier that is included in log messages
    private static final AtomicInteger connectionIdGenerator = new AtomicInteger(0);
    private static final Integer STREAM_ID_ZERO = Integer.valueOf(0);

    // Values for the flags byte (offset 4) of a frame header
    protected static final int FLAG_END_OF_STREAM = 1;
    protected static final int FLAG_END_OF_HEADERS = 4;

    // Pre-built 9 byte frame headers: 3 byte payload length, 1 byte type,
    // 1 byte flags and 4 byte stream identifier
    protected static final byte[] PING = { 0x00, 0x00, 0x08, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00};
    protected static final byte[] PING_ACK = { 0x00, 0x00, 0x08, 0x06, 0x01, 0x00, 0x00, 0x00, 0x00 };

    protected static final byte[] SETTINGS_ACK = { 0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00 };

    // Type, flags and stream identifier only. writeGoAwayFrame() writes the
    // 3 byte length separately since it varies with the debug message.
    protected static final byte[] GOAWAY = { 0x07, 0x00, 0x00, 0x00, 0x00, 0x00 };

    private static final String HTTP2_SETTINGS_HEADER = "HTTP2-Settings";

    private static final HeaderSink HEADER_SINK = new HeaderSink();

    protected final String connectionId;

    protected final Http2Protocol protocol;
    private final Adapter adapter;
    protected volatile SocketWrapperBase<?> socketWrapper;
    private volatile SSLSupport sslSupport;

    private volatile Http2Parser parser;

    // Simple state machine (sequence of states)
    private AtomicReference<ConnectionState> connectionState =
            new AtomicReference<>(ConnectionState.NEW);
    private volatile long pausedNanoTime = Long.MAX_VALUE;

    /**
     * Remote settings are settings defined by the client and sent to Tomcat
     * that Tomcat must use when communicating with the client.
     */
    private final ConnectionSettingsRemote remoteSettings;
    /**
     * Local settings are settings defined by Tomcat and sent to the client that
     * the client must use when communicating with Tomcat.
     */
    protected final ConnectionSettingsLocal localSettings;

    private HpackDecoder hpackDecoder;
    private HpackEncoder hpackEncoder;

    // Streams known to this connection, keyed by stream identifier
    private final Map<Integer,Stream> streams = new ConcurrentHashMap<>();
    protected final AtomicInteger activeRemoteStreamCount = new AtomicInteger(0);
    // Start at -1 so the 'add 2' logic in closeIdleStreams() works
    private volatile int maxActiveRemoteStreamId = -1;
    private volatile int maxProcessedStreamId;
    private final AtomicInteger nextLocalStreamId = new AtomicInteger(2);
    private final PingManager pingManager = getPingManager();
    private volatile int newStreamsSinceLastPrune = 0;
    // Tracking for when the connection is blocked (windowSize < 1)
    // Each value is a two element array: [0] bytes still reserved,
    // [1] bytes allocated but not yet consumed by the stream
    private final Map<AbstractStream,int[]> backLogStreams = new ConcurrentHashMap<>();
    private long backLogSize = 0;

    // Stream concurrency control
    private AtomicInteger streamConcurrency = null;
    private Queue<StreamRunnable> queuedRunnable = null;

    // Track 'overhead' frames vs 'request/response' frames
    private final AtomicLong overheadCount = new AtomicLong(-10);


    /**
     * Creates the handler for a new HTTP/2 connection and copies the
     * configured concurrent stream limit and initial window size into the
     * local settings.
     *
     * @param protocol      The protocol instance that provides the
     *                      configuration for this connection
     * @param adapter       The adapter passed to the stream processors created
     *                      by this handler
     * @param coyoteRequest The HTTP/1.1 request that triggered the upgrade or
     *                      {@code null} if there was none. When present it is
     *                      registered as stream 1.
     */
    Http2UpgradeHandler(Http2Protocol protocol, Adapter adapter, Request coyoteRequest) {
        super (STREAM_ID_ZERO);
        this.protocol = protocol;
        this.adapter = adapter;
        this.connectionId = Integer.toString(connectionIdGenerator.getAndIncrement());

        remoteSettings = new ConnectionSettingsRemote(connectionId);
        localSettings = new ConnectionSettingsLocal(connectionId);

        localSettings.set(Setting.MAX_CONCURRENT_STREAMS, protocol.getMaxConcurrentStreams());
        localSettings.set(Setting.INITIAL_WINDOW_SIZE, protocol.getInitialWindowSize());

        pingManager.initiateDisabled = protocol.getInitiatePingDisabled();

        // Initial HTTP request becomes stream 1.
if (coyoteRequest != null) { if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.upgrade", connectionId)); } Integer key = Integer.valueOf(1); Stream stream = new Stream(key, this, coyoteRequest); streams.put(key, stream); maxActiveRemoteStreamId = 1; activeRemoteStreamCount.set(1); maxProcessedStreamId = 1; } } protected PingManager getPingManager() { return new PingManager(); } @Override public void init(WebConnection webConnection) { if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.init", connectionId, connectionState.get())); } if (!connectionState.compareAndSet(ConnectionState.NEW, ConnectionState.CONNECTED)) { return; } // Init concurrency control if needed if (protocol.getMaxConcurrentStreamExecution() < localSettings.getMaxConcurrentStreams()) { streamConcurrency = new AtomicInteger(0); queuedRunnable = new ConcurrentLinkedQueue<>(); } parser = getParser(connectionId); Stream stream = null; socketWrapper.setReadTimeout(protocol.getReadTimeout()); socketWrapper.setWriteTimeout(protocol.getWriteTimeout()); if (webConnection != null) { // HTTP/2 started via HTTP upgrade. // The initial HTTP/1.1 request is available as Stream 1. try { // Process the initial settings frame stream = getStream(1, true); String base64Settings = stream.getCoyoteRequest().getHeader(HTTP2_SETTINGS_HEADER); byte[] settings = Base64.decodeBase64(base64Settings); // Settings are only valid on stream 0 FrameType.SETTINGS.check(0, settings.length); for (int i = 0; i < settings.length % 6; i++) { int id = ByteUtil.getTwoBytes(settings, i * 6); long value = ByteUtil.getFourBytes(settings, (i * 6) + 2); remoteSettings.set(Setting.valueOf(id), value); } } catch (Http2Exception e) { throw new ProtocolException( sm.getString("upgradeHandler.upgrade.fail", connectionId)); } } // Send the initial settings frame writeSettings(); // Make sure the client has sent a valid connection preface before we // send the response to the original request over HTTP/2. 
try { parser.readConnectionPreface(); } catch (Http2Exception e) { String msg = sm.getString("upgradeHandler.invalidPreface", connectionId); if (log.isDebugEnabled()) { log.debug(msg); } throw new ProtocolException(msg); } if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.prefaceReceived", connectionId)); } // Send a ping to get an idea of round trip time as early as possible try { pingManager.sendPing(true); } catch (IOException ioe) { throw new ProtocolException(sm.getString("upgradeHandler.pingFailed"), ioe); } if (webConnection != null) { processStreamOnContainerThread(stream); } } protected Http2Parser getParser(String connectionId) { return new Http2Parser(connectionId, this, this); } private void processStreamOnContainerThread(Stream stream) { StreamProcessor streamProcessor = new StreamProcessor(this, stream, adapter, socketWrapper); streamProcessor.setSslSupport(sslSupport); processStreamOnContainerThread(streamProcessor, SocketEvent.OPEN_READ); } void processStreamOnContainerThread(StreamProcessor streamProcessor, SocketEvent event) { StreamRunnable streamRunnable = new StreamRunnable(streamProcessor, event); if (streamConcurrency == null) { socketWrapper.execute(streamRunnable); } else { if (getStreamConcurrency() < protocol.getMaxConcurrentStreamExecution()) { increaseStreamConcurrency(); socketWrapper.execute(streamRunnable); } else { queuedRunnable.offer(streamRunnable); } } } @Override public void setSocketWrapper(SocketWrapperBase wrapper) { this.socketWrapper = wrapper; } @Override public void setSslSupport(SSLSupport sslSupport) { this.sslSupport = sslSupport; } @Override public SocketState upgradeDispatch(SocketEvent status) { if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.upgradeDispatch.entry", connectionId, status)); } // WebConnection is not used so passing null here is fine // Might not be necessary. init() will handle that. 
init(null); SocketState result = SocketState.CLOSED; try { pingManager.sendPing(false); switch(status) { case OPEN_READ: try { // There is data to read so use the read timeout while // reading frames. socketWrapper.setReadTimeout(protocol.getReadTimeout()); while (true) { try { if (!parser.readFrame(false)) { break; } } catch (StreamException se) { // Stream errors are not fatal to the connection so // continue reading frames Stream stream = getStream(se.getStreamId(), false); if (stream == null) { sendStreamReset(se); } else { stream.close(se); } } } if (overheadCount.get() > 0) { throw new ConnectionException( sm.getString("upgradeHandler.tooMuchOverhead", connectionId), Http2Error.ENHANCE_YOUR_CALM); } if (activeRemoteStreamCount.get() == 0) { // No streams currently active. Use the keep-alive // timeout for the connection. socketWrapper.setReadTimeout(protocol.getKeepAliveTimeout()); } else { // Streams currently active. Individual streams have // timeouts so keep the connection open. 
socketWrapper.setReadTimeout(-1);
                    }
                } catch (Http2Exception ce) {
                    // Really ConnectionException
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("upgradeHandler.connectionError"), ce);
                    }
                    closeConnection(ce);
                    break;
                }

                if (connectionState.get() != ConnectionState.CLOSED) {
                    result = SocketState.UPGRADED;
                }
                break;

            case OPEN_WRITE:
                processWrites();

                result = SocketState.UPGRADED;
                break;

            case DISCONNECT:
            case ERROR:
            case TIMEOUT:
            case STOP:
                close();
                break;
            }
        } catch (IOException ioe) {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("upgradeHandler.ioerror", connectionId), ioe);
            }
            close();
        }

        if (log.isDebugEnabled()) {
            log.debug(sm.getString("upgradeHandler.upgradeDispatch.exit", connectionId, result));
        }
        return result;
    }


    ConnectionSettingsRemote getRemoteSettings() {
        return remoteSettings;
    }


    ConnectionSettingsLocal getLocalSettings() {
        return localSettings;
    }


    Http2Protocol getProtocol() {
        return protocol;
    }


    /**
     * Moves a connected connection to the {@code PAUSING} state, records the
     * time at which that happened and sends a GOAWAY frame that carries the
     * largest possible stream identifier and no error. Has no effect if the
     * connection is not in the {@code CONNECTED} state.
     */
    @Override
    public void pause() {
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("upgradeHandler.pause.entry", connectionId));
        }

        if (connectionState.compareAndSet(ConnectionState.CONNECTED, ConnectionState.PAUSING)) {
            pausedNanoTime = System.nanoTime();

            try {
                // Integer.MAX_VALUE == (1 << 31) - 1, the largest stream ID
                writeGoAwayFrame(Integer.MAX_VALUE, Http2Error.NO_ERROR.getCode(), null);
            } catch (IOException ioe) {
                // This is fatal for the connection. Ignore it here. There will be
                // further attempts at I/O in upgradeDispatch() and it can better
                // handle the IO errors.
            }
        }
    }


    @Override
    public void destroy() {
        // NO-OP
    }


    /**
     * Completes a pause once more than one round trip time (as measured by the
     * ping manager) has passed since {@link #pause()} was called: the state
     * moves from {@code PAUSING} to {@code PAUSED} and a GOAWAY frame that
     * carries the highest processed stream identifier is sent.
     *
     * @throws IOException if the GOAWAY frame cannot be written
     */
    void checkPauseState() throws IOException {
        if (connectionState.get() == ConnectionState.PAUSING) {
            // System.nanoTime() values may wrap so compare the elapsed time
            // rather than comparing absolute values.
            if (System.nanoTime() - pausedNanoTime > pingManager.getRoundTripTimeNano()) {
                connectionState.compareAndSet(ConnectionState.PAUSING, ConnectionState.PAUSED);
                writeGoAwayFrame(maxProcessedStreamId, Http2Error.NO_ERROR.getCode(), null);
            }
        }
    }


    private int increaseStreamConcurrency() {
        return streamConcurrency.incrementAndGet();
    }

    private int decreaseStreamConcurrency() {
        return streamConcurrency.decrementAndGet();
    }

    private int getStreamConcurrency() {
        return streamConcurrency.get();
    }


    /*
     * Releases one slot of the stream concurrency limit and, if that leaves
     * capacity, executes the next queued stream (if any). Does nothing when
     * stream concurrency control is not enabled.
     */
    void executeQueuedStream() {
        if (streamConcurrency == null) {
            return;
        }
        decreaseStreamConcurrency();
        if (getStreamConcurrency() < protocol.getMaxConcurrentStreamExecution()) {
            StreamRunnable streamRunnable = queuedRunnable.poll();
            if (streamRunnable != null) {
                increaseStreamConcurrency();
                socketWrapper.execute(streamRunnable);
            }
        }
    }


    /**
     * Writes and flushes a RST frame for the stream identified by the given
     * exception, using the exception's error code as the payload.
     *
     * @param se The stream error to report to the client
     *
     * @throws IOException if the frame cannot be written
     */
    void sendStreamReset(StreamException se) throws IOException {

        if (log.isDebugEnabled()) {
            log.debug(sm.getString("upgradeHandler.rst.debug", connectionId,
                    Integer.toString(se.getStreamId()), se.getError(), se.getMessage()));
        }

        // Write a RST frame
        byte[] rstFrame = new byte[13];
        // Length
        ByteUtil.setThreeBytes(rstFrame, 0, 4);
        // Type
        rstFrame[3] = FrameType.RST.getIdByte();
        // No flags
        // Stream ID
        ByteUtil.set31Bits(rstFrame, 5, se.getStreamId());
        // Payload
        ByteUtil.setFourBytes(rstFrame, 9, se.getError().getCode());

        synchronized (socketWrapper) {
            socketWrapper.write(true, rstFrame, 0, rstFrame.length);
            socketWrapper.flush(true);
        }
    }


    /**
     * Attempts to send a GOAWAY frame that carries the error code and message
     * of the given exception and then closes the connection.
     *
     * @param ce The error that triggered the close
     */
    void closeConnection(Http2Exception ce) {
        try {
            writeGoAwayFrame(maxProcessedStreamId, ce.getError().getCode(),
                    ce.getMessage().getBytes(StandardCharsets.UTF_8));
        } catch (IOException ioe) {
            // Ignore. GOAWAY is sent on a best efforts basis and the original
            // error has already been logged.
} close(); } protected void writeSettings() { // Send the initial settings frame try { byte[] settings = localSettings.getSettingsFrameForPending(); socketWrapper.write(true, settings, 0, settings.length); socketWrapper.flush(true); } catch (IOException ioe) { String msg = sm.getString("upgradeHandler.sendPrefaceFail", connectionId); if (log.isDebugEnabled()) { log.debug(msg); } throw new ProtocolException(msg, ioe); } } protected void writeGoAwayFrame(int maxStreamId, long errorCode, byte[] debugMsg) throws IOException { byte[] fixedPayload = new byte[8]; ByteUtil.set31Bits(fixedPayload, 0, maxStreamId); ByteUtil.setFourBytes(fixedPayload, 4, errorCode); int len = 8; if (debugMsg != null) { len += debugMsg.length; } byte[] payloadLength = new byte[3]; ByteUtil.setThreeBytes(payloadLength, 0, len); synchronized (socketWrapper) { socketWrapper.write(true, payloadLength, 0, payloadLength.length); socketWrapper.write(true, GOAWAY, 0, GOAWAY.length); socketWrapper.write(true, fixedPayload, 0, 8); if (debugMsg != null) { socketWrapper.write(true, debugMsg, 0, debugMsg.length); } socketWrapper.flush(true); } } void writeHeaders(Stream stream, int pushedStreamId, MimeHeaders mimeHeaders, boolean endOfStream, int payloadSize) throws IOException { // This ensures the Stream processing thread has control of the socket. synchronized (socketWrapper) { doWriteHeaders(stream, pushedStreamId, mimeHeaders, endOfStream, payloadSize); } if (endOfStream) { stream.sentEndOfStream(); } } /* * Separate method to allow Http2AsyncUpgradeHandler to call this code * without synchronizing on socketWrapper since it doesn't need to. 
*/ protected HeaderFrameBuffers doWriteHeaders(Stream stream, int pushedStreamId, MimeHeaders mimeHeaders, boolean endOfStream, int payloadSize) throws IOException { if (log.isDebugEnabled()) { if (pushedStreamId == 0) { log.debug(sm.getString("upgradeHandler.writeHeaders", connectionId, stream.getIdentifier())); } else { log.debug(sm.getString("upgradeHandler.writePushHeaders", connectionId, stream.getIdentifier(), Integer.valueOf(pushedStreamId), Boolean.valueOf(endOfStream))); } } if (!stream.canWrite()) { return null; } HeaderFrameBuffers headerFrameBuffers = getHeaderFrameBuffers(payloadSize); byte[] pushedStreamIdBytes = null; if (pushedStreamId > 0) { pushedStreamIdBytes = new byte[4]; ByteUtil.set31Bits(pushedStreamIdBytes, 0, pushedStreamId); } boolean first = true; State state = null; while (state != State.COMPLETE) { headerFrameBuffers.startFrame(); if (first && pushedStreamIdBytes != null) { headerFrameBuffers.getPayload().put(pushedStreamIdBytes); } state = getHpackEncoder().encode(mimeHeaders, headerFrameBuffers.getPayload()); headerFrameBuffers.getPayload().flip(); if (state == State.COMPLETE || headerFrameBuffers.getPayload().limit() > 0) { ByteUtil.setThreeBytes(headerFrameBuffers.getHeader(), 0, headerFrameBuffers.getPayload().limit()); if (first) { first = false; if (pushedStreamIdBytes == null) { headerFrameBuffers.getHeader()[3] = FrameType.HEADERS.getIdByte(); } else { headerFrameBuffers.getHeader()[3] = FrameType.PUSH_PROMISE.getIdByte(); } if (endOfStream) { headerFrameBuffers.getHeader()[4] = FLAG_END_OF_STREAM; } } else { headerFrameBuffers.getHeader()[3] = FrameType.CONTINUATION.getIdByte(); } if (state == State.COMPLETE) { headerFrameBuffers.getHeader()[4] += FLAG_END_OF_HEADERS; } if (log.isDebugEnabled()) { log.debug(headerFrameBuffers.getPayload().limit() + " bytes"); } ByteUtil.set31Bits(headerFrameBuffers.getHeader(), 5, stream.getIdAsInt()); headerFrameBuffers.endFrame(); } else if (state == State.UNDERFLOW) { 
headerFrameBuffers.expandPayload(); } } headerFrameBuffers.endHeaders(); return headerFrameBuffers; } protected HeaderFrameBuffers getHeaderFrameBuffers(int initialPayloadSize) { return new DefaultHeaderFrameBuffers(initialPayloadSize); } protected HpackEncoder getHpackEncoder() { if (hpackEncoder == null) { hpackEncoder = new HpackEncoder(); } // Ensure latest agreed table size is used hpackEncoder.setMaxTableSize(remoteSettings.getHeaderTableSize()); return hpackEncoder; } void writeBody(Stream stream, ByteBuffer data, int len, boolean finished) throws IOException { if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.writeBody", connectionId, stream.getIdentifier(), Integer.toString(len))); } reduceOverheadCount(); // Need to check this now since sending end of stream will change this. boolean writeable = stream.canWrite(); byte[] header = new byte[9]; ByteUtil.setThreeBytes(header, 0, len); header[3] = FrameType.DATA.getIdByte(); if (finished) { header[4] = FLAG_END_OF_STREAM; stream.sentEndOfStream(); if (!stream.isActive()) { activeRemoteStreamCount.decrementAndGet(); } } if (writeable) { ByteUtil.set31Bits(header, 5, stream.getIdAsInt()); synchronized (socketWrapper) { try { socketWrapper.write(true, header, 0, header.length); int orgLimit = data.limit(); data.limit(data.position() + len); socketWrapper.write(true, data); data.limit(orgLimit); socketWrapper.flush(true); } catch (IOException ioe) { handleAppInitiatedIOException(ioe); } } } } /* * Handles an I/O error on the socket underlying the HTTP/2 connection when * it is triggered by application code (usually reading the request or * writing the response). Such I/O errors are fatal so the connection is * closed. The exception is re-thrown to make the client code aware of the * problem. * * Note: We can not rely on this exception reaching the socket processor * since the application code may swallow it. 
*/ protected void handleAppInitiatedIOException(IOException ioe) throws IOException { close(); throw ioe; } /* * Needs to know if this was application initiated since that affects the * error handling. */ void writeWindowUpdate(Stream stream, int increment, boolean applicationInitiated) throws IOException { if (!stream.canWrite()) { return; } synchronized (socketWrapper) { // Build window update frame for stream 0 byte[] frame = new byte[13]; ByteUtil.setThreeBytes(frame, 0, 4); frame[3] = FrameType.WINDOW_UPDATE.getIdByte(); ByteUtil.set31Bits(frame, 9, increment); socketWrapper.write(true, frame, 0, frame.length); // Change stream Id and re-use ByteUtil.set31Bits(frame, 5, stream.getIdAsInt()); try { socketWrapper.write(true, frame, 0, frame.length); socketWrapper.flush(true); } catch (IOException ioe) { if (applicationInitiated) { handleAppInitiatedIOException(ioe); } else { throw ioe; } } } } boolean hasAsyncIO() { return false; } protected void processWrites() throws IOException { synchronized (socketWrapper) { if (socketWrapper.flush(false)) { socketWrapper.registerWriteInterest(); } } } int reserveWindowSize(Stream stream, int reservation, boolean block) throws IOException { // Need to be holding the stream lock so releaseBacklog() can't notify // this thread until after this thread enters wait() int allocation = 0; synchronized (stream) { do { synchronized (this) { if (!stream.canWrite()) { throw new CloseNowException( sm.getString("upgradeHandler.stream.notWritable", stream.getConnectionId(), stream.getIdentifier())); } long windowSize = getWindowSize(); if (windowSize < 1 || backLogSize > 0) { // Has this stream been granted an allocation int[] value = backLogStreams.get(stream); if (value == null) { value = new int[] { reservation, 0 }; backLogStreams.put(stream, value); backLogSize += reservation; // Add the parents as well AbstractStream parent = stream.getParentStream(); while (parent != null && backLogStreams.putIfAbsent(parent, new int[2]) == null) 
{ parent = parent.getParentStream(); } } else { if (value[1] > 0) { allocation = value[1]; decrementWindowSize(allocation); if (value[0] == 0) { // The reservation has been fully allocated // so this stream can be removed from the // backlog. backLogStreams.remove(stream); } else { // This allocation has been used. Reset the // allocation to zero. Leave the stream on // the backlog as it still has more bytes to // write. value[1] = 0; } } } } else if (windowSize < reservation) { allocation = (int) windowSize; decrementWindowSize(allocation); } else { allocation = reservation; decrementWindowSize(allocation); } } if (allocation == 0) { if (block) { try { stream.wait(); } catch (InterruptedException e) { throw new IOException(sm.getString( "upgradeHandler.windowSizeReservationInterrupted", connectionId, stream.getIdentifier(), Integer.toString(reservation)), e); } } else { return 0; } } } while (allocation == 0); } return allocation; } @SuppressWarnings("sync-override") // notifyAll() needs to be outside sync // to avoid deadlock @Override protected void incrementWindowSize(int increment) throws Http2Exception { Set streamsToNotify = null; synchronized (this) { long windowSize = getWindowSize(); if (windowSize < 1 && windowSize + increment > 0) { streamsToNotify = releaseBackLog((int) (windowSize +increment)); } super.incrementWindowSize(increment); } if (streamsToNotify != null) { for (AbstractStream stream : streamsToNotify) { if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.releaseBacklog", connectionId, stream.getIdentifier())); } // There is never any O/P on stream zero but it is included in // the backlog as it simplifies the code. Skip it if it appears // here. 
if (this == stream) { continue; } Response coyoteResponse = ((Stream) stream).getCoyoteResponse(); if (coyoteResponse.getWriteListener() == null) { if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.notifyAll", connectionId, stream.getIdentifier())); } // Blocking, so use notify to release StreamOutputBuffer synchronized (stream) { stream.notifyAll(); } } else { if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.dispatchWrite", connectionId, stream.getIdentifier())); } // Non-blocking so dispatch coyoteResponse.action(ActionCode.DISPATCH_WRITE, null); // Need to explicitly execute dispatches on the // StreamProcessor as this thread is being processed by an // UpgradeProcessor which won't see this dispatch coyoteResponse.action(ActionCode.DISPATCH_EXECUTE, null); } } } } /** * Process send file (if supported) for the given stream. The appropriate * request attributes should be set before calling this method. * * @param sendfileData The stream and associated data to process * * @return The result of the send file processing */ protected SendfileState processSendfile(SendfileData sendfileData) { return SendfileState.DONE; } private synchronized Set releaseBackLog(int increment) { Set result = new HashSet<>(); if (backLogSize < increment) { // Can clear the whole backlog result.addAll(backLogStreams.keySet()); backLogStreams.clear(); backLogSize = 0; } else { int leftToAllocate = increment; while (leftToAllocate > 0) { leftToAllocate = allocate(this, leftToAllocate); } for (Entry entry : backLogStreams.entrySet()) { int allocation = entry.getValue()[1]; if (allocation > 0) { backLogSize -= allocation; result.add(entry.getKey()); } } } return result; } private int allocate(AbstractStream stream, int allocation) { if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.allocate.debug", getConnectionId(), stream.getIdentifier(), Integer.toString(allocation))); } // Allocate to the specified stream int[] value = 
backLogStreams.get(stream); if (value[0] >= allocation) { value[0] -= allocation; value[1] += allocation; return 0; } // There was some left over so allocate that to the children of the // stream. int leftToAllocate = allocation; value[1] = value[0]; value[0] = 0; leftToAllocate -= value[1]; if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.allocate.left", getConnectionId(), stream.getIdentifier(), Integer.toString(leftToAllocate))); } // Recipients are children of the current stream that are in the // backlog. Set recipients = new HashSet<>(); recipients.addAll(stream.getChildStreams()); recipients.retainAll(backLogStreams.keySet()); // Loop until we run out of allocation or recipients while (leftToAllocate > 0) { if (recipients.size() == 0) { backLogStreams.remove(stream); return leftToAllocate; } int totalWeight = 0; for (AbstractStream recipient : recipients) { if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.allocate.recipient", getConnectionId(), stream.getIdentifier(), recipient.getIdentifier(), Integer.toString(recipient.getWeight()))); } totalWeight += recipient.getWeight(); } // Use an Iterator so fully allocated children/recipients can be // removed. Iterator iter = recipients.iterator(); int allocated = 0; while (iter.hasNext()) { AbstractStream recipient = iter.next(); int share = leftToAllocate * recipient.getWeight() / totalWeight; if (share == 0) { // This is to avoid rounding issues triggering an infinite // loop. It will cause a very slight over allocation but // HTTP/2 should cope with that. share = 1; } int remainder = allocate(recipient, share); // Remove recipients that receive their full allocation so that // they are excluded from the next allocation round. 
if (remainder > 0) { iter.remove(); } allocated += (share - remainder); } leftToAllocate -= allocated; } return 0; } private Stream getStream(int streamId, boolean unknownIsError) throws ConnectionException { Integer key = Integer.valueOf(streamId); Stream result = streams.get(key); if (result == null && unknownIsError) { // Stream has been closed and removed from the map throw new ConnectionException(sm.getString("upgradeHandler.stream.closed", key), Http2Error.PROTOCOL_ERROR); } return result; } private Stream createRemoteStream(int streamId) throws ConnectionException { Integer key = Integer.valueOf(streamId); if (streamId %2 != 1) { throw new ConnectionException( sm.getString("upgradeHandler.stream.even", key), Http2Error.PROTOCOL_ERROR); } pruneClosedStreams(); Stream result = new Stream(key, this); streams.put(key, result); return result; } private Stream createLocalStream(Request request) { int streamId = nextLocalStreamId.getAndAdd(2); Integer key = Integer.valueOf(streamId); Stream result = new Stream(key, this, request); streams.put(key, result); return result; } private void close() { connectionState.set(ConnectionState.CLOSED); for (Stream stream : streams.values()) { // The connection is closing. Close the associated streams as no // longer required. stream.receiveReset(Http2Error.CANCEL.getCode()); } try { socketWrapper.close(); } catch (IOException ioe) { log.debug(sm.getString("upgradeHandler.socketCloseFailed"), ioe); } } private void pruneClosedStreams() { // Only prune every 10 new streams if (newStreamsSinceLastPrune < 9) { // Not atomic. Increments may be lost. Not a problem. 
newStreamsSinceLastPrune++; return; } // Reset counter newStreamsSinceLastPrune = 0; // RFC 7540, 5.3.4 endpoints should maintain state for at least the // maximum number of concurrent streams long max = localSettings.getMaxConcurrentStreams(); if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.pruneStart", connectionId, Long.toString(max), Integer.toString(streams.size()))); } // Allow an additional 10% for closed streams that are used in the // priority tree max = max + max / 10; if (max > Integer.MAX_VALUE) { max = Integer.MAX_VALUE; } int toClose = streams.size() - (int) max; if (toClose < 1) { return; } // Need to try and close some streams. // Try to close streams in this order // 1. Completed streams used for a request with no children // 2. Completed streams used for a request with children // 3. Closed final streams // // Steps 1 and 2 will always be completed. // Step 3 will be completed to the minimum extent necessary to bring the // total number of streams under the limit. // Use these sets to track the different classes of streams TreeSet candidatesStepOne = new TreeSet<>(); TreeSet candidatesStepTwo = new TreeSet<>(); TreeSet candidatesStepThree = new TreeSet<>(); for (Entry entry : streams.entrySet()) { Stream stream = entry.getValue(); // Never remove active streams if (stream.isActive()) { continue; } if (stream.isClosedFinal()) { // This stream went from IDLE to CLOSED and is likely to have // been created by the client as part of the priority tree. 
candidatesStepThree.add(entry.getKey()); } else if (stream.getChildStreams().size() == 0) { // Closed, no children candidatesStepOne.add(entry.getKey()); } else { // Closed, with children candidatesStepTwo.add(entry.getKey()); } } // Process the step one list for (Integer streamIdToRemove : candidatesStepOne) { // Remove this childless stream Stream removedStream = streams.remove(streamIdToRemove); removedStream.detachFromParent(); toClose--; if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.pruned", connectionId, streamIdToRemove)); } // Did this make the parent childless? AbstractStream parent = removedStream.getParentStream(); while (parent instanceof Stream && !((Stream) parent).isActive() && !((Stream) parent).isClosedFinal() && parent.getChildStreams().size() == 0) { streams.remove(parent.getIdentifier()); parent.detachFromParent(); toClose--; if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.pruned", connectionId, streamIdToRemove)); } // Also need to remove this stream from the p2 list candidatesStepTwo.remove(parent.getIdentifier()); parent = parent.getParentStream(); } } // Process the P2 list for (Integer streamIdToRemove : candidatesStepTwo) { removeStreamFromPriorityTree(streamIdToRemove); toClose--; if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.pruned", connectionId, streamIdToRemove)); } } while (toClose > 0 && candidatesStepThree.size() > 0) { Integer streamIdToRemove = candidatesStepThree.pollLast(); removeStreamFromPriorityTree(streamIdToRemove); toClose--; if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.prunedPriority", connectionId, streamIdToRemove)); } } if (toClose > 0) { log.warn(sm.getString("upgradeHandler.pruneIncomplete", connectionId, Integer.toString(toClose))); } } private void removeStreamFromPriorityTree(Integer streamIdToRemove) { Stream streamToRemove = streams.remove(streamIdToRemove); // Move the removed Stream's children to the removed Stream's // parent. 
Set children = streamToRemove.getChildStreams(); if (streamToRemove.getChildStreams().size() == 1) { // Shortcut streamToRemove.getChildStreams().iterator().next().rePrioritise( streamToRemove.getParentStream(), streamToRemove.getWeight()); } else { int totalWeight = 0; for (Stream child : children) { totalWeight += child.getWeight(); } for (Stream child : children) { streamToRemove.getChildStreams().iterator().next().rePrioritise( streamToRemove.getParentStream(), streamToRemove.getWeight() * child.getWeight() / totalWeight); } } streamToRemove.detachFromParent(); streamToRemove.getChildStreams().clear(); } void push(Request request, Stream associatedStream) throws IOException { Stream pushStream; // Synchronized since PUSH_PROMISE frames have to be sent in order. Once // the stream has been created we need to ensure that the PUSH_PROMISE // is sent before the next stream is created for a PUSH_PROMISE. synchronized (socketWrapper) { pushStream = createLocalStream(request); writeHeaders(associatedStream, pushStream.getIdAsInt(), request.getMimeHeaders(), false, Constants.DEFAULT_HEADERS_FRAME_SIZE); } pushStream.sentPushPromise(); processStreamOnContainerThread(pushStream); } @Override protected final String getConnectionId() { return connectionId; } @Override protected final int getWeight() { return 0; } private void reduceOverheadCount() { overheadCount.decrementAndGet(); } private void increaseOverheadCount() { overheadCount.addAndGet(getProtocol().getOverheadCountFactor()); } // ----------------------------------------------- Http2Parser.Input methods @Override public boolean fill(boolean block, byte[] data, int offset, int length) throws IOException { int len = length; int pos = offset; boolean nextReadBlock = block; int thisRead = 0; while (len > 0) { thisRead = socketWrapper.read(nextReadBlock, data, pos, len); if (thisRead == 0) { if (nextReadBlock) { // Should never happen throw new IllegalStateException(); } else { return false; } } else if (thisRead == 
-1) { if (connectionState.get().isNewStreamAllowed()) { throw new EOFException(); } else { return false; } } else { pos += thisRead; len -= thisRead; nextReadBlock = true; } } return true; } @Override public int getMaxFrameSize() { return localSettings.getMaxFrameSize(); } // ---------------------------------------------- Http2Parser.Output methods @Override public HpackDecoder getHpackDecoder() { if (hpackDecoder == null) { hpackDecoder = new HpackDecoder(localSettings.getHeaderTableSize()); } return hpackDecoder; } @Override public ByteBuffer startRequestBodyFrame(int streamId, int payloadSize) throws Http2Exception { reduceOverheadCount(); Stream stream = getStream(streamId, true); stream.checkState(FrameType.DATA); stream.receivedData(payloadSize); return stream.getInputByteBuffer(); } @Override public void endRequestBodyFrame(int streamId) throws Http2Exception { Stream stream = getStream(streamId, true); stream.getInputBuffer().onDataAvailable(); } @Override public void receivedEndOfStream(int streamId) throws ConnectionException { Stream stream = getStream(streamId, connectionState.get().isNewStreamAllowed()); if (stream != null) { stream.receivedEndOfStream(); if (!stream.isActive()) { activeRemoteStreamCount.decrementAndGet(); } } } @Override public void swallowedPadding(int streamId, int paddingLength) throws ConnectionException, IOException { Stream stream = getStream(streamId, true); // +1 is for the payload byte used to define the padding length writeWindowUpdate(stream, paddingLength + 1, false); } @Override public HeaderEmitter headersStart(int streamId, boolean headersEndStream) throws Http2Exception, IOException { // Check the pause state before processing headers since the pause state // determines if a new stream is created or if this stream is ignored. 
checkPauseState(); reduceOverheadCount(); if (connectionState.get().isNewStreamAllowed()) { Stream stream = getStream(streamId, false); if (stream == null) { stream = createRemoteStream(streamId); } if (streamId < maxActiveRemoteStreamId) { throw new ConnectionException(sm.getString("upgradeHandler.stream.old", Integer.valueOf(streamId), Integer.valueOf(maxActiveRemoteStreamId)), Http2Error.PROTOCOL_ERROR); } stream.checkState(FrameType.HEADERS); stream.receivedStartOfHeaders(headersEndStream); closeIdleStreams(streamId); if (localSettings.getMaxConcurrentStreams() < activeRemoteStreamCount.incrementAndGet()) { activeRemoteStreamCount.decrementAndGet(); throw new StreamException(sm.getString("upgradeHandler.tooManyRemoteStreams", Long.toString(localSettings.getMaxConcurrentStreams())), Http2Error.REFUSED_STREAM, streamId); } return stream; } else { if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.noNewStreams", connectionId, Integer.toString(streamId))); } // Stateless so a static can be used to save on GC return HEADER_SINK; } } private void closeIdleStreams(int newMaxActiveRemoteStreamId) throws Http2Exception { for (int i = maxActiveRemoteStreamId + 2; i < newMaxActiveRemoteStreamId; i += 2) { Stream stream = getStream(i, false); if (stream != null) { stream.closeIfIdle(); } } maxActiveRemoteStreamId = newMaxActiveRemoteStreamId; } @Override public void reprioritise(int streamId, int parentStreamId, boolean exclusive, int weight) throws Http2Exception { if (streamId == parentStreamId) { throw new ConnectionException(sm.getString("upgradeHandler.dependency.invalid", getConnectionId(), Integer.valueOf(streamId)), Http2Error.PROTOCOL_ERROR); } increaseOverheadCount(); Stream stream = getStream(streamId, false); if (stream == null) { stream = createRemoteStream(streamId); } stream.checkState(FrameType.PRIORITY); AbstractStream parentStream = getStream(parentStreamId, false); if (parentStream == null) { parentStream = this; } 
stream.rePrioritise(parentStream, exclusive, weight); } @Override public void headersEnd(int streamId) throws ConnectionException { Stream stream = getStream(streamId, connectionState.get().isNewStreamAllowed()); if (stream != null) { setMaxProcessedStream(streamId); if (stream.isActive()) { if (stream.receivedEndOfHeaders()) { processStreamOnContainerThread(stream); } } } } private void setMaxProcessedStream(int streamId) { if (maxProcessedStreamId < streamId) { maxProcessedStreamId = streamId; } } @Override public void reset(int streamId, long errorCode) throws Http2Exception { Stream stream = getStream(streamId, true); stream.checkState(FrameType.RST); stream.receiveReset(errorCode); } @Override public void setting(Setting setting, long value) throws ConnectionException { increaseOverheadCount(); // Special handling required if (setting == Setting.INITIAL_WINDOW_SIZE) { long oldValue = remoteSettings.getInitialWindowSize(); // Do this first in case new value is invalid remoteSettings.set(setting, value); int diff = (int) (value - oldValue); for (Stream stream : streams.values()) { try { stream.incrementWindowSize(diff); } catch (Http2Exception h2e) { stream.close(new StreamException(sm.getString( "upgradeHandler.windowSizeTooBig", connectionId, stream.getIdentifier()), h2e.getError(), stream.getIdAsInt())); } } } else { remoteSettings.set(setting, value); } } @Override public void settingsEnd(boolean ack) throws IOException { if (ack) { if (!localSettings.ack()) { // Ack was unexpected log.warn(sm.getString( "upgradeHandler.unexpectedAck", connectionId, getIdentifier())); } } else { synchronized (socketWrapper) { socketWrapper.write(true, SETTINGS_ACK, 0, SETTINGS_ACK.length); socketWrapper.flush(true); } } } @Override public void pingReceive(byte[] payload, boolean ack) throws IOException { if (!ack) { increaseOverheadCount(); } pingManager.receivePing(payload, ack); } @Override public void goaway(int lastStreamId, long errorCode, String debugData) { if 
(log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.goaway.debug", connectionId, Integer.toString(lastStreamId), Long.toHexString(errorCode), debugData)); } close(); } @Override public void incrementWindowSize(int streamId, int increment) throws Http2Exception { if (streamId == 0) { incrementWindowSize(increment); } else { Stream stream = getStream(streamId, true); stream.checkState(FrameType.WINDOW_UPDATE); stream.incrementWindowSize(increment); } } @Override public void swallowed(int streamId, FrameType frameType, int flags, int size) throws IOException { // NO-OP. } protected class PingManager { protected boolean initiateDisabled = false; // 10 seconds protected final long pingIntervalNano = 10000000000L; protected int sequence = 0; protected long lastPingNanoTime = Long.MIN_VALUE; protected Queue inflightPings = new ConcurrentLinkedQueue<>(); protected Queue roundTripTimes = new ConcurrentLinkedQueue<>(); /** * Check to see if a ping was sent recently and, if not, send one. 
* * @param force Send a ping, even if one was sent recently * * @throws IOException If an I/O issue prevents the ping from being sent */ public void sendPing(boolean force) throws IOException { if (initiateDisabled) { return; } long now = System.nanoTime(); if (force || now - lastPingNanoTime > pingIntervalNano) { lastPingNanoTime = now; byte[] payload = new byte[8]; synchronized (socketWrapper) { int sentSequence = ++sequence; PingRecord pingRecord = new PingRecord(sentSequence, now); inflightPings.add(pingRecord); ByteUtil.set31Bits(payload, 4, sentSequence); socketWrapper.write(true, PING, 0, PING.length); socketWrapper.write(true, payload, 0, payload.length); socketWrapper.flush(true); } } } public void receivePing(byte[] payload, boolean ack) throws IOException { if (ack) { // Extract the sequence from the payload int receivedSequence = ByteUtil.get31Bits(payload, 4); PingRecord pingRecord = inflightPings.poll(); while (pingRecord != null && pingRecord.getSequence() < receivedSequence) { pingRecord = inflightPings.poll(); } if (pingRecord == null) { // Unexpected ACK. Log it. } else { long roundTripTime = System.nanoTime() - pingRecord.getSentNanoTime(); roundTripTimes.add(Long.valueOf(roundTripTime)); while (roundTripTimes.size() > 3) { // Ignore the returned value as we just want to reduce // the queue to 3 entries to use for the rolling average. roundTripTimes.poll(); } if (log.isDebugEnabled()) { log.debug(sm.getString("pingManager.roundTripTime", connectionId, Long.valueOf(roundTripTime))); } } } else { // Client originated ping. Echo it back. 
synchronized (socketWrapper) { socketWrapper.write(true, PING_ACK, 0, PING_ACK.length); socketWrapper.write(true, payload, 0, payload.length); socketWrapper.flush(true); } } } public long getRoundTripTimeNano() { return (long) roundTripTimes.stream().mapToLong(x -> x.longValue()).average().orElse(0); } } protected static class PingRecord { private final int sequence; private final long sentNanoTime; public PingRecord(int sequence, long sentNanoTime) { this.sequence = sequence; this.sentNanoTime = sentNanoTime; } public int getSequence() { return sequence; } public long getSentNanoTime() { return sentNanoTime; } } private enum ConnectionState { NEW(true), CONNECTED(true), PAUSING(true), PAUSED(false), CLOSED(false); private final boolean newStreamsAllowed; private ConnectionState(boolean newStreamsAllowed) { this.newStreamsAllowed = newStreamsAllowed; } public boolean isNewStreamAllowed() { return newStreamsAllowed; } } protected static interface HeaderFrameBuffers { public void startFrame(); public void endFrame() throws IOException; public void endHeaders() throws IOException; public byte[] getHeader(); public ByteBuffer getPayload(); public void expandPayload(); } private class DefaultHeaderFrameBuffers implements HeaderFrameBuffers { private final byte[] header; private ByteBuffer payload; public DefaultHeaderFrameBuffers(int initialPayloadSize) { header = new byte[9]; payload = ByteBuffer.allocate(initialPayloadSize); } @Override public void startFrame() { // NO-OP } @Override public void endFrame() throws IOException { try { socketWrapper.write(true, header, 0, header.length); socketWrapper.write(true, payload); socketWrapper.flush(true); } catch (IOException ioe) { handleAppInitiatedIOException(ioe); } payload.clear(); } @Override public void endHeaders() { // NO-OP } @Override public byte[] getHeader() { return header; } @Override public ByteBuffer getPayload() { return payload; } @Override public void expandPayload() { payload = 
ByteBuffer.allocate(payload.capacity() * 2); } } }