View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http2.impl.nio;
28  
29  import javax.net.ssl.SSLSession;
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.SelectionKey;
34  import java.nio.charset.CharacterCodingException;
35  import java.util.Deque;
36  import java.util.Iterator;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.Queue;
40  import java.util.concurrent.CancellationException;
41  import java.util.concurrent.ConcurrentHashMap;
42  import java.util.concurrent.ConcurrentLinkedDeque;
43  import java.util.concurrent.ConcurrentLinkedQueue;
44  import java.util.concurrent.atomic.AtomicInteger;
45  
46  import org.apache.hc.core5.concurrent.Cancellable;
47  import org.apache.hc.core5.concurrent.CancellableDependency;
48  import org.apache.hc.core5.http.ConnectionClosedException;
49  import org.apache.hc.core5.http.EndpointDetails;
50  import org.apache.hc.core5.http.Header;
51  import org.apache.hc.core5.http.HttpConnection;
52  import org.apache.hc.core5.http.HttpException;
53  import org.apache.hc.core5.http.HttpStreamResetException;
54  import org.apache.hc.core5.http.HttpVersion;
55  import org.apache.hc.core5.http.ProtocolException;
56  import org.apache.hc.core5.http.ProtocolVersion;
57  import org.apache.hc.core5.http.config.CharCodingConfig;
58  import org.apache.hc.core5.http.impl.BasicEndpointDetails;
59  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
60  import org.apache.hc.core5.http.impl.CharCodingSupport;
61  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
62  import org.apache.hc.core5.http.nio.AsyncPushProducer;
63  import org.apache.hc.core5.http.nio.HandlerFactory;
64  import org.apache.hc.core5.http.nio.command.ExecutableCommand;
65  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
66  import org.apache.hc.core5.http.protocol.HttpCoreContext;
67  import org.apache.hc.core5.http.protocol.HttpProcessor;
68  import org.apache.hc.core5.http2.H2ConnectionException;
69  import org.apache.hc.core5.http2.H2Error;
70  import org.apache.hc.core5.http2.H2StreamResetException;
71  import org.apache.hc.core5.http2.config.H2Config;
72  import org.apache.hc.core5.http2.config.H2Param;
73  import org.apache.hc.core5.http2.config.H2Setting;
74  import org.apache.hc.core5.http2.frame.FrameFactory;
75  import org.apache.hc.core5.http2.frame.FrameFlag;
76  import org.apache.hc.core5.http2.frame.FrameType;
77  import org.apache.hc.core5.http2.frame.RawFrame;
78  import org.apache.hc.core5.http2.frame.StreamIdGenerator;
79  import org.apache.hc.core5.http2.hpack.HPackDecoder;
80  import org.apache.hc.core5.http2.hpack.HPackEncoder;
81  import org.apache.hc.core5.http2.impl.BasicH2TransportMetrics;
82  import org.apache.hc.core5.http2.nio.AsyncPingHandler;
83  import org.apache.hc.core5.http2.nio.command.PingCommand;
84  import org.apache.hc.core5.io.CloseMode;
85  import org.apache.hc.core5.reactor.Command;
86  import org.apache.hc.core5.reactor.ProtocolIOSession;
87  import org.apache.hc.core5.reactor.ssl.TlsDetails;
88  import org.apache.hc.core5.util.Args;
89  import org.apache.hc.core5.util.ByteArrayBuffer;
90  import org.apache.hc.core5.util.Identifiable;
91  import org.apache.hc.core5.util.Timeout;
92  
93  abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection {
94  
95      private static final long LINGER_TIME = 1000; // 1 second
96      private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024; // 10 MiB
97  
98      enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN} // declaration order matters: states are compared with compareTo()
99      enum SettingsHandshake { READY, TRANSMITTED, ACKED } // progress of a SETTINGS exchange
100 
101     private final ProtocolIOSession ioSession; // supplies the session lock, the event mask and the pending command queue
102     private final FrameFactory frameFactory; // builds every outgoing RawFrame
103     private final StreamIdGenerator idGenerator; // produces ids for locally initiated streams
104     private final HttpProcessor httpProcessor; // handed to stream handlers on creation
105     private final H2Config localConfig; // falls back to H2Config.DEFAULT when the constructor receives null
106     private final BasicH2TransportMetrics inputMetrics;
107     private final BasicH2TransportMetrics outputMetrics;
108     private final BasicHttpConnectionMetrics connMetrics; // combines inputMetrics and outputMetrics
109     private final FrameInputBuffer inputBuffer;
110     private final FrameOutputBuffer outputBuffer;
111     private final Deque<RawFrame> outputQueue; // frames deferred while outputBuffer or this queue is non-empty
112     private final HPackEncoder hPackEncoder;
113     private final HPackDecoder hPackDecoder;
114     private final Map<Integer, H2Stream> streamMap; // streams keyed by stream id (a ConcurrentHashMap)
115     private final Queue<AsyncPingHandler> pingHandlers; // handlers of PINGs sent via PingCommand, in send order
116     private final AtomicInteger connInputWindow; // connection-level input window (reported to the listener as stream 0)
117     private final AtomicInteger connOutputWindow; // connection-level output window (reported to the listener as stream 0)
118     private final AtomicInteger outputRequests; // incremented by requestSessionOutput(), consumed by onOutput()
119     private final AtomicInteger lastStreamId; // advanced by generateStreamId() and updateLastStreamId()
120     private final H2StreamListener streamListener; // optional: every use is null-checked
121 
122     private ConnectionHandshake connState = ConnectionHandshake.READY;
123     private SettingsHandshake localSettingState = SettingsHandshake.READY;
124     private SettingsHandshake remoteSettingState = SettingsHandshake.READY;
125 
126     private int initInputWinSize; // initial input window given to newly created stream channels
127     private int initOutputWinSize; // initial output window given to newly created stream channels
128     private int lowMark; // starts at half of H2Config.INIT's initial window; NOTE(review): consumers are outside this chunk
129 
130     private volatile H2Config remoteConfig; // starts as H2Config.INIT; read for max frame size and max concurrent streams
131 
132     private Continuation continuation;
133 
134     private int processedRemoteStreamId; // passed as the first argument of every createGoAway() call
135     private EndpointDetails endpointDetails;
136 
137     AbstractH2StreamMultiplexer(
138             final ProtocolIOSession ioSession,
139             final FrameFactory frameFactory,
140             final StreamIdGenerator idGenerator,
141             final HttpProcessor httpProcessor,
142             final CharCodingConfig charCodingConfig,
143             final H2Config h2Config,
144             final H2StreamListener streamListener) {
145         this.ioSession = Args.notNull(ioSession, "IO session");
146         this.frameFactory = Args.notNull(frameFactory, "Frame factory");
147         this.idGenerator = Args.notNull(idGenerator, "Stream id generator");
148         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
149         this.localConfig = h2Config != null ? h2Config : H2Config.DEFAULT;
150         this.inputMetrics = new BasicH2TransportMetrics();
151         this.outputMetrics = new BasicH2TransportMetrics();
152         this.connMetrics = new BasicHttpConnectionMetrics(this.inputMetrics, this.outputMetrics);
153         this.inputBuffer = new FrameInputBuffer(this.inputMetrics, this.localConfig.getMaxFrameSize());
154         this.outputBuffer = new FrameOutputBuffer(this.outputMetrics, this.localConfig.getMaxFrameSize());
155         this.outputQueue = new ConcurrentLinkedDeque<>();
156         this.pingHandlers = new ConcurrentLinkedQueue<>();
157         this.outputRequests = new AtomicInteger(0);
158         this.lastStreamId = new AtomicInteger(0);
159         this.hPackEncoder = new HPackEncoder(CharCodingSupport.createEncoder(charCodingConfig));
160         this.hPackDecoder = new HPackDecoder(CharCodingSupport.createDecoder(charCodingConfig));
161         this.streamMap = new ConcurrentHashMap<>();
162         this.remoteConfig = H2Config.INIT;
163         this.connInputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
164         this.connOutputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
165 
166         this.initInputWinSize = H2Config.INIT.getInitialWindowSize();
167         this.initOutputWinSize = H2Config.INIT.getInitialWindowSize();
168 
169         this.hPackDecoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
170         this.hPackEncoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
171         this.hPackDecoder.setMaxListSize(H2Config.INIT.getMaxHeaderListSize());
172 
173         this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
174         this.streamListener = streamListener;
175     }
176 
177     @Override
178     public String getId() {
179         return ioSession.getId();
180     }
181 
182     abstract void acceptHeaderFrame() throws H2ConnectionException; // NOTE(review): callers are outside this chunk; presumably a check on inbound HEADERS - confirm
183 
184     abstract void acceptPushRequest() throws H2ConnectionException; // NOTE(review): callers are outside this chunk; presumably a check on local push requests - confirm
185 
186     abstract void acceptPushFrame() throws H2ConnectionException; // NOTE(review): callers are outside this chunk; presumably a check on inbound PUSH_PROMISE - confirm
187 
188     abstract H2StreamHandler createRemotelyInitiatedStream( // NOTE(review): callers are outside this chunk; the name suggests a handler factory for peer-opened streams - confirm
189             H2StreamChannel channel,
190             HttpProcessor httpProcessor,
191             BasicHttpConnectionMetrics connMetrics,
192             HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException;
193 
194     abstract H2StreamHandler createLocallyInitiatedStream( // called by processPendingCommands() once per ExecutableCommand, with a freshly created channel
195             ExecutableCommand command,
196             H2StreamChannel channel,
197             HttpProcessor httpProcessor,
198             BasicHttpConnectionMetrics connMetrics) throws IOException;
199 
200     private int updateWindow(final AtomicInteger window, final int delta) throws ArithmeticException {
201         for (;;) {
202             final int current = window.get();
203             long newValue = (long) current + delta;
204 
205             //TODO: work-around for what looks like a bug in Ngnix (1.11)
206             // Tolerate if the update window exceeded by one
207             if (newValue == 0x80000000L) {
208                 newValue = Integer.MAX_VALUE;
209             }
210             //TODO: needs to be removed
211 
212             if (Math.abs(newValue) > 0x7fffffffL) {
213                 throw new ArithmeticException("Update causes flow control window to exceed " + Integer.MAX_VALUE);
214             }
215             if (window.compareAndSet(current, (int) newValue)) {
216                 return (int) newValue;
217             }
218         }
219     }
220 
221     private int updateInputWindow(
222             final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
223         final int newSize = updateWindow(window, delta);
224         if (streamListener != null) {
225             streamListener.onInputFlowControl(this, streamId, delta, newSize);
226         }
227         return newSize;
228     }
229 
230     private int updateOutputWindow(
231             final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
232         final int newSize = updateWindow(window, delta);
233         if (streamListener != null) {
234             streamListener.onOutputFlowControl(this, streamId, delta, newSize);
235         }
236         return newSize;
237     }
238 
239     private void commitFrameInternal(final RawFrame frame) throws IOException {
240         if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
241             if (streamListener != null) {
242                 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
243             }
244             outputBuffer.write(frame, ioSession);
245         } else {
246             outputQueue.addLast(frame);
247         }
248         ioSession.setEvent(SelectionKey.OP_WRITE);
249     }
250 
251     private void commitFrame(final RawFrame frame) throws IOException {
252         Args.notNull(frame, "Frame");
253         ioSession.getLock().lock();
254         try {
255             commitFrameInternal(frame);
256         } finally {
257             ioSession.getLock().unlock();
258         }
259     }
260 
261     private void commitHeaders(
262             final int streamId, final List<? extends Header> headers, final boolean endStream) throws IOException {
263         if (streamListener != null) {
264             streamListener.onHeaderOutput(this, streamId, headers);
265         }
266         final ByteArrayBufferByteArrayBuffer.html#ByteArrayBuffer">ByteArrayBuffer buf = new ByteArrayBuffer(512);
267         hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
268 
269         int off = 0;
270         int remaining = buf.length();
271         boolean continuation = false;
272 
273         while (remaining > 0) {
274             final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
275             final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
276 
277             remaining -= chunk;
278             off += chunk;
279 
280             final boolean endHeaders = remaining == 0;
281             final RawFrame frame;
282             if (!continuation) {
283                 frame = frameFactory.createHeaders(streamId, payload, endHeaders, endStream);
284                 continuation = true;
285             } else {
286                 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
287             }
288             commitFrameInternal(frame);
289         }
290     }
291 
292     private void commitPushPromise(
293             final int streamId, final int promisedStreamId, final List<Header> headers) throws IOException {
294         if (headers == null || headers.isEmpty()) {
295             throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
296         }
297         if (streamListener != null) {
298             streamListener.onHeaderOutput(this, streamId, headers);
299         }
300         final ByteArrayBufferByteArrayBuffer.html#ByteArrayBuffer">ByteArrayBuffer buf = new ByteArrayBuffer(512);
301         buf.append((byte)(promisedStreamId >> 24));
302         buf.append((byte)(promisedStreamId >> 16));
303         buf.append((byte)(promisedStreamId >> 8));
304         buf.append((byte)(promisedStreamId));
305 
306         hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
307 
308         int off = 0;
309         int remaining = buf.length();
310         boolean continuation = false;
311 
312         while (remaining > 0) {
313             final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
314             final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
315 
316             remaining -= chunk;
317             off += chunk;
318 
319             final boolean endHeaders = remaining == 0;
320             final RawFrame frame;
321             if (!continuation) {
322                 frame = frameFactory.createPushPromise(streamId, payload, endHeaders);
323                 continuation = true;
324             } else {
325                 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
326             }
327             commitFrameInternal(frame);
328         }
329     }
330 
331     private void streamDataFrame(
332             final int streamId,
333             final AtomicInteger streamOutputWindow,
334             final ByteBuffer payload,
335             final int chunk) throws IOException {
336         final RawFrame dataFrame = frameFactory.createData(streamId, payload, false);
337         if (streamListener != null) {
338             streamListener.onFrameOutput(this, streamId, dataFrame);
339         }
340         updateOutputWindow(0, connOutputWindow, -chunk);
341         updateOutputWindow(streamId, streamOutputWindow, -chunk);
342         outputBuffer.write(dataFrame, ioSession);
343     }
344 
345     private int streamData(
346             final int streamId, final AtomicInteger streamOutputWindow, final ByteBuffer payload) throws IOException {
347         if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
348             final int capacity = Math.min(connOutputWindow.get(), streamOutputWindow.get());
349             if (capacity <= 0) {
350                 return 0;
351             }
352             final int maxPayloadSize = Math.min(capacity, remoteConfig.getMaxFrameSize());
353             final int chunk;
354             if (payload.remaining() <= maxPayloadSize) {
355                 chunk = payload.remaining();
356                 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
357             } else {
358                 chunk = maxPayloadSize;
359                 final int originalLimit = payload.limit();
360                 try {
361                     payload.limit(payload.position() + chunk);
362                     streamDataFrame(streamId, streamOutputWindow, payload, chunk);
363                 } finally {
364                     payload.limit(originalLimit);
365                 }
366             }
367             payload.position(payload.position() + chunk);
368             ioSession.setEvent(SelectionKey.OP_WRITE);
369             return chunk;
370         }
371         return 0;
372     }
373 
374     private void incrementInputCapacity(
375             final int streamId, final AtomicInteger inputWindow, final int inputCapacity) throws IOException {
376         if (inputCapacity > 0) {
377             final int streamWinSize = inputWindow.get();
378             final int remainingCapacity = Integer.MAX_VALUE - streamWinSize;
379             final int chunk = Math.min(inputCapacity, remainingCapacity);
380             if (chunk != 0) {
381                 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(streamId, chunk);
382                 commitFrame(windowUpdateFrame);
383                 updateInputWindow(streamId, inputWindow, chunk);
384             }
385         }
386     }
387 
388     private void requestSessionOutput() {
389         outputRequests.incrementAndGet();
390         ioSession.setEvent(SelectionKey.OP_WRITE);
391     }
392 
393     private void updateLastStreamId(final int streamId) {
394         final int currentId = lastStreamId.get();
395         if (streamId > currentId) {
396             lastStreamId.compareAndSet(currentId, streamId);
397         }
398     }
399 
400     private int generateStreamId() {
401         for (;;) {
402             final int currentId = lastStreamId.get();
403             final int newStreamId = idGenerator.generate(currentId);
404             if (lastStreamId.compareAndSet(currentId, newStreamId)) {
405                 return newStreamId;
406             }
407         }
408     }
409 
410     public final void onConnect() throws HttpException, IOException {
411         connState = ConnectionHandshake.ACTIVE;
412         final RawFrame settingsFrame = frameFactory.createSettings(
413                 new H2Setting(H2Param.HEADER_TABLE_SIZE, localConfig.getHeaderTableSize()),
414                 new H2Setting(H2Param.ENABLE_PUSH, localConfig.isPushEnabled() ? 1 : 0),
415                 new H2Setting(H2Param.MAX_CONCURRENT_STREAMS, localConfig.getMaxConcurrentStreams()),
416                 new H2Setting(H2Param.INITIAL_WINDOW_SIZE, localConfig.getInitialWindowSize()),
417                 new H2Setting(H2Param.MAX_FRAME_SIZE, localConfig.getMaxFrameSize()),
418                 new H2Setting(H2Param.MAX_HEADER_LIST_SIZE, localConfig.getMaxHeaderListSize()));
419 
420         commitFrame(settingsFrame);
421         localSettingState = SettingsHandshake.TRANSMITTED;
422         maximizeConnWindow(connInputWindow.get());
423 
424         if (streamListener != null) {
425             final int initInputWindow = connInputWindow.get();
426             streamListener.onInputFlowControl(this, 0, initInputWindow, initInputWindow);
427             final int initOutputWindow = connOutputWindow.get();
428             streamListener.onOutputFlowControl(this, 0, initOutputWindow, initOutputWindow);
429         }
430     }
431 
432     public final void onInput(final ByteBuffer src) throws HttpException, IOException {
433         if (connState == ConnectionHandshake.SHUTDOWN) {
434             ioSession.clearEvent(SelectionKey.OP_READ);
435         } else {
436             if (src != null) {
437                 inputBuffer.put(src);
438             }
439             RawFrame frame;
440             while ((frame = inputBuffer.read(ioSession)) != null) {
441                 if (streamListener != null) {
442                     streamListener.onFrameInput(this, frame.getStreamId(), frame);
443                 }
444                 consumeFrame(frame);
445             }
446         }
447     }
448 
449     public final void onOutput() throws HttpException, IOException {
450         ioSession.getLock().lock();
451         try {
452             if (!outputBuffer.isEmpty()) {
453                 outputBuffer.flush(ioSession);
454             }
455             while (outputBuffer.isEmpty()) {
456                 final RawFrame frame = outputQueue.poll();
457                 if (frame != null) {
458                     if (streamListener != null) {
459                         streamListener.onFrameOutput(this, frame.getStreamId(), frame);
460                     }
461                     outputBuffer.write(frame, ioSession);
462                 } else {
463                     break;
464                 }
465             }
466         } finally {
467             ioSession.getLock().unlock();
468         }
469 
470         if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
471 
472             if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
473                 produceOutput();
474             }
475             final int pendingOutputRequests = outputRequests.get();
476             boolean outputPending = false;
477             if (!streamMap.isEmpty() && connOutputWindow.get() > 0) {
478                 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
479                     final Map.Entry<Integer, H2Stream> entry = it.next();
480                     final H2Stream stream = entry.getValue();
481                     if (!stream.isLocalClosed()
482                             && stream.getOutputWindow().get() > 0
483                             && stream.isOutputReady()) {
484                         outputPending = true;
485                         break;
486                     }
487                 }
488             }
489             ioSession.getLock().lock();
490             try {
491                 if (!outputPending && outputBuffer.isEmpty() && outputQueue.isEmpty()
492                         && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
493                     ioSession.clearEvent(SelectionKey.OP_WRITE);
494                 } else {
495                     outputRequests.addAndGet(-pendingOutputRequests);
496                 }
497             } finally {
498                 ioSession.getLock().unlock();
499             }
500         }
501 
502         if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 && remoteSettingState == SettingsHandshake.ACKED) {
503             processPendingCommands();
504         }
505         if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
506             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
507                 final Map.Entry<Integer, H2Stream> entry = it.next();
508                 final H2Stream stream = entry.getValue();
509                 if (stream.isLocalClosed() && stream.isRemoteClosed()) {
510                     stream.releaseResources();
511                     it.remove();
512                 }
513             }
514             if (streamMap.isEmpty()) {
515                 connState = ConnectionHandshake.SHUTDOWN;
516             }
517         }
518         if (connState.compareTo(ConnectionHandshake.SHUTDOWN) >= 0) {
519             ioSession.getLock().lock();
520             try {
521                 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
522                     ioSession.close();
523                 }
524             } finally {
525                 ioSession.getLock().unlock();
526             }
527         }
528     }
529 
530     public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
531         connState = ConnectionHandshake.SHUTDOWN;
532 
533         final RawFrame goAway;
534         if (localSettingState != SettingsHandshake.ACKED) {
535             goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.SETTINGS_TIMEOUT,
536                             "Setting timeout (" + timeout + ")");
537         } else {
538             goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR,
539                             "Timeout due to inactivity (" + timeout + ")");
540         }
541         commitFrame(goAway);
542         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
543             final Map.Entry<Integer, H2Stream> entry = it.next();
544             final H2Stream stream = entry.getValue();
545             stream.reset(new H2StreamResetException(H2Error.NO_ERROR, "Timeout due to inactivity (" + timeout + ")"));
546         }
547         streamMap.clear();
548     }
549 
550     public final void onDisconnect() {
551         for (;;) {
552             final AsyncPingHandler pingHandler = pingHandlers.poll();
553             if (pingHandler != null) {
554                 pingHandler.cancel();
555             } else {
556                 break;
557             }
558         }
559         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
560             final Map.Entry<Integer, H2Stream> entry = it.next();
561             final H2Stream stream = entry.getValue();
562             stream.cancel();
563         }
564         for (;;) {
565             final Command command = ioSession.poll();
566             if (command != null) {
567                 if (command instanceof ExecutableCommand) {
568                     ((ExecutableCommand) command).failed(new ConnectionClosedException());
569                 } else {
570                     command.cancel();
571                 }
572             } else {
573                 break;
574             }
575         }
576     }
577 
578     private void processPendingCommands() throws IOException, HttpException {
579         while (streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
580             final Command command = ioSession.poll();
581             if (command == null) {
582                 break;
583             }
584             if (command instanceof ShutdownCommand) {
585                 final ShutdownCommand../org/apache/hc/core5/http/nio/command/ShutdownCommand.html#ShutdownCommand">ShutdownCommand shutdownCommand = (ShutdownCommand) command;
586                 if (shutdownCommand.getType() == CloseMode.IMMEDIATE) {
587                     for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
588                         final Map.Entry<Integer, H2Stream> entry = it.next();
589                         final H2Stream stream = entry.getValue();
590                         stream.cancel();
591                     }
592                     streamMap.clear();
593                     connState = ConnectionHandshake.SHUTDOWN;
594                 } else {
595                     if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
596                         final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR, "Graceful shutdown");
597                         commitFrame(goAway);
598                         connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
599                     }
600                 }
601                 break;
602             } else if (command instanceof PingCommand) {
603                 final PingCommand./../../../org/apache/hc/core5/http2/nio/command/PingCommand.html#PingCommand">PingCommand pingCommand = (PingCommand) command;
604                 final AsyncPingHandler handler = pingCommand.getHandler();
605                 pingHandlers.add(handler);
606                 final RawFrame ping = frameFactory.createPing(handler.getData());
607                 commitFrame(ping);
608             } else if (command instanceof ExecutableCommand) {
609                 final int streamId = generateStreamId();
610                 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
611                         streamId, true, initInputWinSize, initOutputWinSize);
612                 final ExecutableCommandrg/apache/hc/core5/http/nio/command/ExecutableCommand.html#ExecutableCommand">ExecutableCommand executableCommand = (ExecutableCommand) command;
613                 final H2StreamHandler streamHandler = createLocallyInitiatedStream(
614                         executableCommand, channel, httpProcessor, connMetrics);
615 
616                 final H2Stream stream = new H2Stream(channel, streamHandler, false);
617                 streamMap.put(streamId, stream);
618 
619                 if (streamListener != null) {
620                     final int initInputWindow = stream.getInputWindow().get();
621                     streamListener.onInputFlowControl(this, streamId, initInputWindow, initInputWindow);
622                     final int initOutputWindow = stream.getOutputWindow().get();
623                     streamListener.onOutputFlowControl(this, streamId, initOutputWindow, initOutputWindow);
624                 }
625 
626                 if (stream.isOutputReady()) {
627                     stream.produceOutput();
628                 }
629                 final CancellableDependency cancellableDependency = executableCommand.getCancellableDependency();
630                 if (cancellableDependency != null) {
631                     cancellableDependency.setDependency(new Cancellable() {
632 
633                         @Override
634                         public boolean cancel() {
635                             return stream.abort();
636                         }
637 
638                     });
639                 }
640                 if (!outputQueue.isEmpty()) {
641                     return;
642                 }
643             }
644         }
645     }
646 
647     public final void onException(final Exception cause) {
648         try {
649             for (;;) {
650                 final AsyncPingHandler pingHandler = pingHandlers.poll();
651                 if (pingHandler != null) {
652                     pingHandler.failed(cause);
653                 } else {
654                     break;
655                 }
656             }
657             for (;;) {
658                 final Command command = ioSession.poll();
659                 if (command != null) {
660                     if (command instanceof ExecutableCommand) {
661                         ((ExecutableCommand) command).failed(new ConnectionClosedException());
662                     } else {
663                         command.cancel();
664                     }
665                 } else {
666                     break;
667                 }
668             }
669             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
670                 final Map.Entry<Integer, H2Stream> entry = it.next();
671                 final H2Stream stream = entry.getValue();
672                 stream.reset(cause);
673             }
674             streamMap.clear();
675             if (!(cause instanceof ConnectionClosedException)) {
676                 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) <= 0) {
677                     final H2Error errorCode;
678                     if (cause instanceof H2ConnectionException) {
679                         errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
680                     } else if (cause instanceof ProtocolException){
681                         errorCode = H2Error.PROTOCOL_ERROR;
682                     } else {
683                         errorCode = H2Error.INTERNAL_ERROR;
684                     }
685                     final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
686                     commitFrame(goAway);
687                 }
688             }
689         } catch (final IOException ignore) {
690         } finally {
691             connState = ConnectionHandshake.SHUTDOWN;
692             final CloseMode closeMode;
693             if (cause instanceof ConnectionClosedException) {
694                 closeMode = CloseMode.GRACEFUL;
695             } else if (cause instanceof IOException) {
696                 closeMode = CloseMode.IMMEDIATE;
697             } else {
698                 closeMode = CloseMode.GRACEFUL;
699             }
700             ioSession.close(closeMode);
701         }
702     }
703 
704     private H2Stream getValidStream(final int streamId) throws H2ConnectionException {
705         if (streamId == 0) {
706             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
707         }
708         final H2Stream stream = streamMap.get(streamId);
709         if (stream == null) {
710             if (streamId <= lastStreamId.get()) {
711                 throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed");
712             } else {
713                 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
714             }
715         }
716         return stream;
717     }
718 
719     private void consumeFrame(final RawFrame frame) throws HttpException, IOException {
720         final FrameType frameType = FrameType.valueOf(frame.getType());
721         final int streamId = frame.getStreamId();
722         if (continuation != null && frameType != FrameType.CONTINUATION) {
723             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "CONTINUATION frame expected");
724         }
725         if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) >= 0) {
726             if (streamId > processedRemoteStreamId && !idGenerator.isSameSide(streamId)) {
727                 // ignore the frame
728                 return;
729             }
730         }
731         switch (frameType) {
732             case DATA: {
733                 final H2Stream stream = getValidStream(streamId);
734                 try {
735                     consumeDataFrame(frame, stream);
736                 } catch (final H2StreamResetException ex) {
737                     stream.localReset(ex);
738                 } catch (final HttpStreamResetException ex) {
739                     stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
740                 }
741 
742                 if (stream.isTerminated()) {
743                     streamMap.remove(streamId);
744                     stream.releaseResources();
745                 }
746             }
747             break;
748             case HEADERS: {
749                 if (streamId == 0) {
750                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
751                 }
752                 H2Stream stream = streamMap.get(streamId);
753                 if (stream == null) {
754                     acceptHeaderFrame();
755 
756                     updateLastStreamId(streamId);
757 
758                     final H2StreamChannelImpl channel = new H2StreamChannelImpl(
759                             streamId, false, initInputWinSize, initOutputWinSize);
760                     final H2StreamHandler streamHandler = createRemotelyInitiatedStream(
761                             channel, httpProcessor, connMetrics, null);
762                     stream = new H2Stream(channel, streamHandler, true);
763                     if (stream.isOutputReady()) {
764                         stream.produceOutput();
765                     }
766                     streamMap.put(streamId, stream);
767                 }
768 
769                 try {
770                     consumeHeaderFrame(frame, stream);
771 
772                     if (stream.isOutputReady()) {
773                         stream.produceOutput();
774                     }
775                 } catch (final H2StreamResetException ex) {
776                     stream.localReset(ex);
777                 } catch (final HttpStreamResetException ex) {
778                     stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
779                 } catch (final HttpException ex) {
780                     stream.handle(ex);
781                 }
782 
783                 if (stream.isTerminated()) {
784                     streamMap.remove(streamId);
785                     stream.releaseResources();
786                 }
787             }
788             break;
789             case CONTINUATION: {
790                 if (continuation == null) {
791                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION frame");
792                 }
793                 if (streamId != continuation.streamId) {
794                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION stream id: " + streamId);
795                 }
796 
797                 final H2Stream stream = getValidStream(streamId);
798                 try {
799 
800                     consumeContinuationFrame(frame, stream);
801                 } catch (final H2StreamResetException ex) {
802                     stream.localReset(ex);
803                 } catch (final HttpStreamResetException ex) {
804                     stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
805                 }
806 
807                 if (stream.isTerminated()) {
808                     streamMap.remove(streamId);
809                     stream.releaseResources();
810                 }
811             }
812             break;
813             case WINDOW_UPDATE: {
814                 final ByteBuffer payload = frame.getPayload();
815                 if (payload == null || payload.remaining() != 4) {
816                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid WINDOW_UPDATE frame payload");
817                 }
818                 final int delta = payload.getInt();
819                 if (delta <= 0) {
820                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Invalid WINDOW_UPDATE delta");
821                 }
822                 if (streamId == 0) {
823                     try {
824                         updateOutputWindow(0, connOutputWindow, delta);
825                     } catch (final ArithmeticException ex) {
826                         throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
827                     }
828                 } else {
829                     final H2Stream stream = streamMap.get(streamId);
830                     if (stream != null) {
831                         try {
832                             updateOutputWindow(streamId, stream.getOutputWindow(), delta);
833                         } catch (final ArithmeticException ex) {
834                             throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
835                         }
836                     }
837                 }
838                 ioSession.setEvent(SelectionKey.OP_WRITE);
839             }
840             break;
841             case RST_STREAM: {
842                 if (streamId == 0) {
843                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
844                 }
845                 final H2Stream stream = streamMap.get(streamId);
846                 if (stream == null) {
847                     if (streamId > lastStreamId.get()) {
848                         throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
849                     }
850                 } else {
851                     final ByteBuffer payload = frame.getPayload();
852                     if (payload == null || payload.remaining() != 4) {
853                         throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid RST_STREAM frame payload");
854                     }
855                     final int errorCode = payload.getInt();
856                     stream.reset(new H2StreamResetException(errorCode, "Stream reset (" + errorCode + ")"));
857                     streamMap.remove(streamId);
858                     stream.releaseResources();
859                 }
860             }
861             break;
862             case PING: {
863                 if (streamId != 0) {
864                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
865                 }
866                 final ByteBuffer ping = frame.getPayloadContent();
867                 if (ping == null || ping.remaining() != 8) {
868                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload");
869                 }
870                 if (frame.isFlagSet(FrameFlag.ACK)) {
871                     final AsyncPingHandler pingHandler = pingHandlers.poll();
872                     if (pingHandler != null) {
873                         pingHandler.consumeResponse(ping);
874                     }
875                 } else {
876                     final ByteBuffer pong = ByteBuffer.allocate(ping.remaining());
877                     pong.put(ping);
878                     pong.flip();
879                     final RawFrame response = frameFactory.createPingAck(pong);
880                     commitFrame(response);
881                 }
882             }
883             break;
884             case SETTINGS: {
885                 if (streamId != 0) {
886                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
887                 }
888                 if (frame.isFlagSet(FrameFlag.ACK)) {
889                     if (localSettingState == SettingsHandshake.TRANSMITTED) {
890                         localSettingState = SettingsHandshake.ACKED;
891                         ioSession.setEvent(SelectionKey.OP_WRITE);
892                         applyLocalSettings();
893                     }
894                 } else {
895                     final ByteBuffer payload = frame.getPayload();
896                     if (payload != null) {
897                         if ((payload.remaining() % 6) != 0) {
898                             throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid SETTINGS payload");
899                         }
900                         consumeSettingsFrame(payload);
901                         remoteSettingState = SettingsHandshake.TRANSMITTED;
902                     }
903                     // Send ACK
904                     final RawFrame response = frameFactory.createSettingsAck();
905                     commitFrame(response);
906                     remoteSettingState = SettingsHandshake.ACKED;
907                 }
908             }
909             break;
910             case PRIORITY:
911                 // Stream priority not supported
912                 break;
913             case PUSH_PROMISE: {
914                 acceptPushFrame();
915 
916                 if (!localConfig.isPushEnabled()) {
917                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Push is disabled");
918                 }
919 
920                 final H2Stream stream = getValidStream(streamId);
921                 if (stream.isRemoteClosed()) {
922                     stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed"));
923                     break;
924                 }
925 
926                 final ByteBuffer payload = frame.getPayloadContent();
927                 if (payload == null || payload.remaining() < 4) {
928                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PUSH_PROMISE payload");
929                 }
930                 final int promisedStreamId = payload.getInt();
931                 if (promisedStreamId == 0 || idGenerator.isSameSide(promisedStreamId)) {
932                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal promised stream id: " + promisedStreamId);
933                 }
934                 if (streamMap.get(promisedStreamId) != null) {
935                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected promised stream id: " + promisedStreamId);
936                 }
937 
938                 updateLastStreamId(promisedStreamId);
939 
940                 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
941                         promisedStreamId, false, initInputWinSize, initOutputWinSize);
942                 final H2StreamHandler streamHandler = createRemotelyInitiatedStream(
943                         channel, httpProcessor, connMetrics, stream.getPushHandlerFactory());
944                 final H2Stream promisedStream = new H2Stream(channel, streamHandler, true);
945                 streamMap.put(promisedStreamId, promisedStream);
946 
947                 try {
948                     consumePushPromiseFrame(frame, payload, promisedStream);
949                 } catch (final H2StreamResetException ex) {
950                     promisedStream.localReset(ex);
951                 } catch (final HttpStreamResetException ex) {
952                     promisedStream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.NO_ERROR);
953                 }
954             }
955             break;
956             case GOAWAY: {
957                 if (streamId != 0) {
958                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
959                 }
960                 final ByteBuffer payload = frame.getPayload();
961                 if (payload == null || payload.remaining() < 8) {
962                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid GOAWAY payload");
963                 }
964                 final int processedLocalStreamId = payload.getInt();
965                 final int errorCode = payload.getInt();
966                 if (errorCode == H2Error.NO_ERROR.getCode()) {
967                     if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
968                         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
969                             final Map.Entry<Integer, H2Stream> entry = it.next();
970                             final int activeStreamId = entry.getKey();
971                             if (!idGenerator.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
972                                 final H2Stream stream = entry.getValue();
973                                 stream.cancel();
974                                 it.remove();
975                             }
976                         }
977                     }
978                     connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
979                 } else {
980                     for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
981                         final Map.Entry<Integer, H2Stream> entry = it.next();
982                         final H2Stream stream = entry.getValue();
983                         stream.reset(new H2StreamResetException(errorCode, "Connection terminated by the peer (" + errorCode + ")"));
984                     }
985                     streamMap.clear();
986                     connState = ConnectionHandshake.SHUTDOWN;
987                 }
988             }
989             ioSession.setEvent(SelectionKey.OP_WRITE);
990             break;
991         }
992     }
993 
994     private void consumeDataFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
995         final int streamId = stream.getId();
996         final ByteBuffer payload = frame.getPayloadContent();
997         if (payload != null) {
998             final int frameLength = frame.getLength();
999             final int streamWinSize = updateInputWindow(streamId, stream.getInputWindow(), -frameLength);
1000             if (streamWinSize < lowMark && !stream.isRemoteClosed()) {
1001                 stream.produceInputCapacityUpdate();
1002             }
1003             final int connWinSize = updateInputWindow(0, connInputWindow, -frameLength);
1004             if (connWinSize < CONNECTION_WINDOW_LOW_MARK) {
1005                 maximizeConnWindow(connWinSize);
1006             }
1007         }
1008         if (stream.isRemoteClosed()) {
1009             throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1010         }
1011         if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1012             stream.setRemoteEndStream();
1013         }
1014         if (stream.isLocalReset()) {
1015             return;
1016         }
1017         stream.consumeData(payload);
1018     }
1019 
1020     private void maximizeConnWindow(final int connWinSize) throws IOException {
1021         final int delta = Integer.MAX_VALUE - connWinSize;
1022         if (delta > 0) {
1023             final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(0, delta);
1024             commitFrame(windowUpdateFrame);
1025             updateInputWindow(0, connInputWindow, delta);
1026         }
1027     }
1028 
1029     private void consumePushPromiseFrame(final RawFrame frame, final ByteBuffer payload, final H2Stream promisedStream) throws HttpException, IOException {
1030         final int promisedStreamId = promisedStream.getId();
1031         if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1032             continuation = new Continuation(promisedStreamId, frame.getType(), true);
1033         }
1034         if (continuation == null) {
1035             final List<Header> headers = hPackDecoder.decodeHeaders(payload);
1036             if (promisedStreamId > processedRemoteStreamId) {
1037                 processedRemoteStreamId = promisedStreamId;
1038             }
1039             if (streamListener != null) {
1040                 streamListener.onHeaderInput(this, promisedStreamId, headers);
1041             }
1042             if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
1043                 throw new H2StreamResetException(H2Error.REFUSED_STREAM, "Stream refused");
1044             }
1045             promisedStream.consumePromise(headers);
1046         } else {
1047             continuation.copyPayload(payload);
1048         }
1049     }
1050 
1051     List<Header> decodeHeaders(final ByteBuffer payload) throws HttpException {
1052         return hPackDecoder.decodeHeaders(payload);
1053     }
1054 
1055     private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1056         final int streamId = stream.getId();
1057         if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1058             continuation = new Continuation(streamId, frame.getType(), frame.isFlagSet(FrameFlag.END_STREAM));
1059         }
1060         final ByteBuffer payload = frame.getPayloadContent();
1061         if (frame.isFlagSet(FrameFlag.PRIORITY)) {
1062             // Priority not supported
1063             payload.getInt();
1064             payload.get();
1065         }
1066         if (continuation == null) {
1067             final List<Header> headers = decodeHeaders(payload);
1068             if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1069                 processedRemoteStreamId = streamId;
1070             }
1071             if (streamListener != null) {
1072                 streamListener.onHeaderInput(this, streamId, headers);
1073             }
1074             if (stream.isRemoteClosed()) {
1075                 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1076             }
1077             if (stream.isLocalReset()) {
1078                 return;
1079             }
1080             if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
1081                 throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, "Stream refused");
1082             }
1083             if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1084                 stream.setRemoteEndStream();
1085             }
1086             stream.consumeHeader(headers);
1087         } else {
1088             continuation.copyPayload(payload);
1089         }
1090     }
1091 
1092     private void consumeContinuationFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1093         final int streamId = frame.getStreamId();
1094         final ByteBuffer payload = frame.getPayload();
1095         continuation.copyPayload(payload);
1096         if (frame.isFlagSet(FrameFlag.END_HEADERS)) {
1097             final List<Header> headers = decodeHeaders(continuation.getContent());
1098             if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1099                 processedRemoteStreamId = streamId;
1100             }
1101             if (streamListener != null) {
1102                 streamListener.onHeaderInput(this, streamId, headers);
1103             }
1104             if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
1105                 throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, "Stream refused");
1106             }
1107             if (stream.isRemoteClosed()) {
1108                 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1109             }
1110             if (stream.isLocalReset()) {
1111                 return;
1112             }
1113             if (continuation.endStream) {
1114                 stream.setRemoteEndStream();
1115             }
1116             if (continuation.type == FrameType.PUSH_PROMISE.getValue()) {
1117                 stream.consumePromise(headers);
1118             } else {
1119                 stream.consumeHeader(headers);
1120             }
1121             continuation = null;
1122         }
1123     }
1124 
1125     private void consumeSettingsFrame(final ByteBuffer payload) throws HttpException, IOException {
1126         final H2Config.Builder configBuilder = H2Config.initial();
1127         while (payload.hasRemaining()) {
1128             final int code = payload.getShort();
1129             final int value = payload.getInt();
1130             final H2Param param = H2Param.valueOf(code);
1131             if (param != null) {
1132                 switch (param) {
1133                     case HEADER_TABLE_SIZE:
1134                         try {
1135                             configBuilder.setHeaderTableSize(value);
1136                         } catch (final IllegalArgumentException ex) {
1137                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1138                         }
1139                         break;
1140                     case MAX_CONCURRENT_STREAMS:
1141                         try {
1142                             configBuilder.setMaxConcurrentStreams(value);
1143                         } catch (final IllegalArgumentException ex) {
1144                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1145                         }
1146                         break;
1147                     case ENABLE_PUSH:
1148                         configBuilder.setPushEnabled(value == 1);
1149                         break;
1150                     case INITIAL_WINDOW_SIZE:
1151                         try {
1152                             configBuilder.setInitialWindowSize(value);
1153                         } catch (final IllegalArgumentException ex) {
1154                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1155                         }
1156                         break;
1157                     case MAX_FRAME_SIZE:
1158                         try {
1159                             configBuilder.setMaxFrameSize(value);
1160                         } catch (final IllegalArgumentException ex) {
1161                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1162                         }
1163                         break;
1164                     case MAX_HEADER_LIST_SIZE:
1165                         try {
1166                             configBuilder.setMaxHeaderListSize(value);
1167                         } catch (final IllegalArgumentException ex) {
1168                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1169                         }
1170                         break;
1171                 }
1172             }
1173         }
1174         applyRemoteSettings(configBuilder.build());
1175     }
1176 
1177     private void produceOutput() throws HttpException, IOException {
1178         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1179             final Map.Entry<Integer, H2Stream> entry = it.next();
1180             final H2Stream stream = entry.getValue();
1181             if (!stream.isLocalClosed() && stream.getOutputWindow().get() > 0) {
1182                 stream.produceOutput();
1183             }
1184             if (stream.isTerminated()) {
1185                 it.remove();
1186                 stream.releaseResources();
1187             }
1188             if (!outputQueue.isEmpty()) {
1189                 break;
1190             }
1191         }
1192     }
1193 
1194     private void applyRemoteSettings(final H2Config config) throws H2ConnectionException {
1195         remoteConfig = config;
1196 
1197         hPackEncoder.setMaxTableSize(remoteConfig.getHeaderTableSize());
1198         final int delta = remoteConfig.getInitialWindowSize() - initOutputWinSize;
1199         initOutputWinSize = remoteConfig.getInitialWindowSize();
1200 
1201         if (delta != 0) {
1202             updateOutputWindow(0, connOutputWindow, delta);
1203             if (!streamMap.isEmpty()) {
1204                 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1205                     final Map.Entry<Integer, H2Stream> entry = it.next();
1206                     final H2Stream stream = entry.getValue();
1207                     try {
1208                         updateOutputWindow(stream.getId(), stream.getOutputWindow(), delta);
1209                     } catch (final ArithmeticException ex) {
1210                         throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1211                     }
1212                 }
1213             }
1214         }
1215     }
1216 
1217     private void applyLocalSettings() throws H2ConnectionException {
1218         hPackDecoder.setMaxTableSize(localConfig.getHeaderTableSize());
1219         hPackDecoder.setMaxListSize(localConfig.getMaxHeaderListSize());
1220 
1221         final int delta = localConfig.getInitialWindowSize() - initInputWinSize;
1222         initInputWinSize = localConfig.getInitialWindowSize();
1223 
1224         if (delta != 0 && !streamMap.isEmpty()) {
1225             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1226                 final Map.Entry<Integer, H2Stream> entry = it.next();
1227                 final H2Stream stream = entry.getValue();
1228                 try {
1229                     updateInputWindow(stream.getId(), stream.getInputWindow(), delta);
1230                 } catch (final ArithmeticException ex) {
1231                     throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1232                 }
1233             }
1234         }
1235         lowMark = initInputWinSize / 2;
1236     }
1237 
1238     @Override
1239     public void close() throws IOException {
1240         ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.IMMEDIATE);
1241     }
1242 
1243     @Override
1244     public void close(final CloseMode closeMode) {
1245         ioSession.close(closeMode);
1246     }
1247 
1248     @Override
1249     public boolean isOpen() {
1250         return connState == ConnectionHandshake.ACTIVE;
1251     }
1252 
1253     @Override
1254     public void setSocketTimeout(final Timeout timeout) {
1255         ioSession.setSocketTimeout(timeout);
1256     }
1257 
1258     @Override
1259     public SSLSession getSSLSession() {
1260         final TlsDetails tlsDetails = ioSession.getTlsDetails();
1261         return tlsDetails != null ? tlsDetails.getSSLSession() : null;
1262     }
1263 
1264     @Override
1265     public EndpointDetails getEndpointDetails() {
1266         if (endpointDetails == null) {
1267             endpointDetails = new BasicEndpointDetails(
1268                     ioSession.getRemoteAddress(),
1269                     ioSession.getLocalAddress(),
1270                     connMetrics,
1271                     ioSession.getSocketTimeout());
1272         }
1273         return endpointDetails;
1274     }
1275 
1276     @Override
1277     public Timeout getSocketTimeout() {
1278         return ioSession.getSocketTimeout();
1279     }
1280 
1281     @Override
1282     public ProtocolVersion getProtocolVersion() {
1283         return HttpVersion.HTTP_2;
1284     }
1285 
1286     @Override
1287     public SocketAddress getRemoteAddress() {
1288         return ioSession.getRemoteAddress();
1289     }
1290 
1291     @Override
1292     public SocketAddress getLocalAddress() {
1293         return ioSession.getLocalAddress();
1294     }
1295 
1296     void appendState(final StringBuilder buf) {
1297         buf.append("connState=").append(connState)
1298                 .append(", connInputWindow=").append(connInputWindow)
1299                 .append(", connOutputWindow=").append(connOutputWindow)
1300                 .append(", outputQueue=").append(outputQueue.size())
1301                 .append(", streamMap=").append(streamMap.size())
1302                 .append(", processedRemoteStreamId=").append(processedRemoteStreamId);
1303     }
1304 
1305     private static class Continuation {
1306 
1307         final int streamId;
1308         final int type;
1309         final boolean endStream;
1310         final ByteArrayBuffer headerBuffer;
1311 
1312         private Continuation(final int streamId, final int type, final boolean endStream) {
1313             this.streamId = streamId;
1314             this.type = type;
1315             this.endStream = endStream;
1316             this.headerBuffer = new ByteArrayBuffer(1024);
1317         }
1318 
1319         void copyPayload(final ByteBuffer payload) {
1320             if (payload == null) {
1321                 return;
1322             }
1323             headerBuffer.ensureCapacity(payload.remaining());
1324             payload.get(headerBuffer.array(), headerBuffer.length(), payload.remaining());
1325         }
1326 
1327         ByteBuffer getContent() {
1328             return ByteBuffer.wrap(headerBuffer.array(), 0, headerBuffer.length());
1329         }
1330 
1331     }
1332 
1333     private class H2StreamChannelImpl implements H2StreamChannel {
1334 
1335         private final int id;
1336         private final AtomicInteger inputWindow;
1337         private final AtomicInteger outputWindow;
1338 
1339         private volatile boolean idle;
1340         private volatile boolean remoteEndStream;
1341         private volatile boolean localEndStream;
1342 
1343         private volatile long deadline;
1344 
1345         H2StreamChannelImpl(final int id, final boolean idle, final int initialInputWindowSize, final int initialOutputWindowSize) {
1346             this.id = id;
1347             this.idle = idle;
1348             this.inputWindow = new AtomicInteger(initialInputWindowSize);
1349             this.outputWindow = new AtomicInteger(initialOutputWindowSize);
1350         }
1351 
1352         int getId() {
1353             return id;
1354         }
1355 
1356         AtomicInteger getOutputWindow() {
1357             return outputWindow;
1358         }
1359 
1360         AtomicInteger getInputWindow() {
1361             return inputWindow;
1362         }
1363 
1364         @Override
1365         public void submit(final List<Header> headers, final boolean endStream) throws IOException {
1366             ioSession.getLock().lock();
1367             try {
1368                 if (headers == null || headers.isEmpty()) {
1369                     throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
1370                 }
1371                 if (localEndStream) {
1372                     return;
1373                 }
1374                 idle = false;
1375                 commitHeaders(id, headers, endStream);
1376                 if (endStream) {
1377                     localEndStream = true;
1378                 }
1379             } finally {
1380                 ioSession.getLock().unlock();
1381             }
1382         }
1383 
1384         @Override
1385         public void push(final List<Header> headers, final AsyncPushProducer pushProducer) throws HttpException, IOException {
1386             acceptPushRequest();
1387             final int promisedStreamId = generateStreamId();
1388             final H2StreamChannelImpl channel = new H2StreamChannelImpl(
1389                     promisedStreamId,
1390                     true,
1391                     localConfig.getInitialWindowSize(),
1392                     remoteConfig.getInitialWindowSize());
1393             final HttpCoreContext context = HttpCoreContext.create();
1394             context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
1395             context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
1396             final H2StreamHandler streamHandler = new ServerPushH2StreamHandler(
1397                     channel, httpProcessor, connMetrics, pushProducer, context);
1398             final H2Stream stream = new H2Stream(channel, streamHandler, false);
1399             streamMap.put(promisedStreamId, stream);
1400 
1401             ioSession.getLock().lock();
1402             try {
1403                 if (localEndStream) {
1404                     stream.releaseResources();
1405                     return;
1406                 }
1407                 commitPushPromise(id, promisedStreamId, headers);
1408                 idle = false;
1409             } finally {
1410                 ioSession.getLock().unlock();
1411             }
1412         }
1413 
1414         @Override
1415         public void update(final int increment) throws IOException {
1416             if (remoteEndStream) {
1417                 return;
1418             }
1419             incrementInputCapacity(0, connInputWindow, increment);
1420             incrementInputCapacity(id, inputWindow, increment);
1421         }
1422 
1423         @Override
1424         public int write(final ByteBuffer payload) throws IOException {
1425             ioSession.getLock().lock();
1426             try {
1427                 if (localEndStream) {
1428                     return 0;
1429                 }
1430                 return streamData(id, outputWindow, payload);
1431             } finally {
1432                 ioSession.getLock().unlock();
1433             }
1434         }
1435 
1436         @Override
1437         public void endStream(final List<? extends Header> trailers) throws IOException {
1438             ioSession.getLock().lock();
1439             try {
1440                 if (localEndStream) {
1441                     return;
1442                 }
1443                 localEndStream = true;
1444                 if (trailers != null && !trailers.isEmpty()) {
1445                     commitHeaders(id, trailers, true);
1446                 } else {
1447                     final RawFrame frame = frameFactory.createData(id, null, true);
1448                     commitFrameInternal(frame);
1449                 }
1450             } finally {
1451                 ioSession.getLock().unlock();
1452             }
1453         }
1454 
1455         @Override
1456         public void endStream() throws IOException {
1457             endStream(null);
1458         }
1459 
1460         @Override
1461         public void requestOutput() {
1462             requestSessionOutput();
1463         }
1464 
1465         boolean isRemoteClosed() {
1466             return remoteEndStream;
1467         }
1468 
1469         void setRemoteEndStream() {
1470             remoteEndStream = true;
1471         }
1472 
1473         boolean isLocalClosed() {
1474             return localEndStream;
1475         }
1476 
1477         void setLocalEndStream() {
1478             localEndStream = true;
1479         }
1480 
1481         boolean isLocalReset() {
1482             return deadline > 0;
1483         }
1484 
1485         boolean isResetDeadline() {
1486             final long l = deadline;
1487             return l > 0 && l < System.currentTimeMillis();
1488         }
1489 
1490         boolean localReset(final int code) throws IOException {
1491             ioSession.getLock().lock();
1492             try {
1493                 if (localEndStream) {
1494                     return false;
1495                 }
1496                 localEndStream = true;
1497                 deadline = System.currentTimeMillis() + LINGER_TIME;
1498                 if (!idle) {
1499                     final RawFrame resetStream = frameFactory.createResetStream(id, code);
1500                     commitFrameInternal(resetStream);
1501                     return true;
1502                 }
1503                 return false;
1504             } finally {
1505                 ioSession.getLock().unlock();
1506             }
1507         }
1508 
1509         boolean localReset(final H2Error error) throws IOException {
1510             return localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1511         }
1512 
1513         @Override
1514         public boolean cancel() {
1515             try {
1516                 return localReset(H2Error.CANCEL);
1517             } catch (final IOException ignore) {
1518                 return false;
1519             }
1520         }
1521 
1522         void appendState(final StringBuilder buf) {
1523             buf.append("id=").append(id)
1524                     .append(", connState=").append(connState)
1525                     .append(", inputWindow=").append(inputWindow)
1526                     .append(", outputWindow=").append(outputWindow)
1527                     .append(", localEndStream=").append(localEndStream)
1528                     .append(", idle=").append(idle);
1529         }
1530 
1531         @Override
1532         public String toString() {
1533             final StringBuilder buf = new StringBuilder();
1534             buf.append("[");
1535             appendState(buf);
1536             buf.append("]");
1537             return buf.toString();
1538         }
1539 
1540     }
1541 
1542     static class H2Stream {
1543 
1544         private final H2StreamChannelImpl channel;
1545         private final H2StreamHandler handler;
1546         private final boolean remoteInitiated;
1547 
1548         private H2Stream(
1549                 final H2StreamChannelImpl channel,
1550                 final H2StreamHandler handler,
1551                 final boolean remoteInitiated) {
1552             this.channel = channel;
1553             this.handler = handler;
1554             this.remoteInitiated = remoteInitiated;
1555         }
1556 
1557         int getId() {
1558             return channel.getId();
1559         }
1560 
1561         boolean isRemoteInitiated() {
1562             return remoteInitiated;
1563         }
1564 
1565         AtomicInteger getOutputWindow() {
1566             return channel.getOutputWindow();
1567         }
1568 
1569         AtomicInteger getInputWindow() {
1570             return channel.getInputWindow();
1571         }
1572 
1573         boolean isTerminated() {
1574             return channel.isLocalClosed() && (channel.isRemoteClosed() || channel.isResetDeadline());
1575         }
1576 
1577         boolean isRemoteClosed() {
1578             return channel.isRemoteClosed();
1579         }
1580 
1581         boolean isLocalClosed() {
1582             return channel.isLocalClosed();
1583         }
1584 
1585         boolean isLocalReset() {
1586             return channel.isLocalReset();
1587         }
1588 
1589         void setRemoteEndStream() {
1590             channel.setRemoteEndStream();
1591         }
1592 
1593         void consumePromise(final List<Header> headers) throws HttpException, IOException {
1594             try {
1595                 handler.consumePromise(headers);
1596                 channel.setLocalEndStream();
1597             } catch (final ProtocolException ex) {
1598                 localReset(ex, H2Error.PROTOCOL_ERROR);
1599             }
1600         }
1601 
1602         void consumeHeader(final List<Header> headers) throws HttpException, IOException {
1603             try {
1604                 handler.consumeHeader(headers, channel.isRemoteClosed());
1605             } catch (final ProtocolException ex) {
1606                 localReset(ex, H2Error.PROTOCOL_ERROR);
1607             }
1608         }
1609 
1610         void consumeData(final ByteBuffer src) throws HttpException, IOException {
1611             try {
1612                 handler.consumeData(src, channel.isRemoteClosed());
1613             } catch (final CharacterCodingException ex) {
1614                 localReset(ex, H2Error.INTERNAL_ERROR);
1615             } catch (final ProtocolException ex) {
1616                 localReset(ex, H2Error.PROTOCOL_ERROR);
1617             }
1618         }
1619 
1620         boolean isOutputReady() {
1621             return handler.isOutputReady();
1622         }
1623 
1624         void produceOutput() throws HttpException, IOException {
1625             try {
1626                 handler.produceOutput();
1627             } catch (final ProtocolException ex) {
1628                 localReset(ex, H2Error.PROTOCOL_ERROR);
1629             }
1630         }
1631 
1632         void produceInputCapacityUpdate() throws IOException {
1633             handler.updateInputCapacity();
1634         }
1635 
1636         void reset(final Exception cause) {
1637             channel.setRemoteEndStream();
1638             channel.setLocalEndStream();
1639             handler.failed(cause);
1640         }
1641 
1642         void localReset(final Exception cause, final int code) throws IOException {
1643             channel.localReset(code);
1644             handler.failed(cause);
1645         }
1646 
1647         void localReset(final Exception cause, final H2Error error) throws IOException {
1648             localReset(cause, error != null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1649         }
1650 
1651         void localReset(final H2StreamResetException ex) throws IOException {
1652             localReset(ex, ex.getCode());
1653         }
1654 
1655         void handle(final HttpExceptionn.html#HttpException">HttpException ex) throws IOException, HttpException {
1656             handler.handle(ex, channel.isRemoteClosed());
1657         }
1658 
1659         HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
1660             return handler.getPushHandlerFactory();
1661         }
1662 
1663         void cancel() {
1664             reset(new CancellationException("HTTP/2 message exchange cancelled"));
1665         }
1666 
1667         boolean abort() {
1668             final boolean cancelled = channel.cancel();
1669             handler.failed(new CancellationException("HTTP/2 message exchange cancelled"));
1670             return cancelled;
1671         }
1672 
1673         void releaseResources() {
1674             handler.releaseResources();
1675             channel.requestOutput();
1676         }
1677 
1678         void appendState(final StringBuilder buf) {
1679             buf.append("channel=[");
1680             channel.appendState(buf);
1681             buf.append("]");
1682         }
1683 
1684         @Override
1685         public String toString() {
1686             final StringBuilder buf = new StringBuilder();
1687             buf.append("[");
1688             appendState(buf);
1689             buf.append("]");
1690             return buf.toString();
1691         }
1692 
1693     }
1694 
1695 }