View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http2.impl.nio;
28  
29  import java.io.IOException;
30  import java.net.SocketAddress;
31  import java.nio.BufferOverflowException;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.SelectionKey;
34  import java.nio.charset.CharacterCodingException;
35  import java.util.Deque;
36  import java.util.Iterator;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.Queue;
40  import java.util.concurrent.ConcurrentHashMap;
41  import java.util.concurrent.ConcurrentLinkedDeque;
42  import java.util.concurrent.ConcurrentLinkedQueue;
43  import java.util.concurrent.atomic.AtomicInteger;
44  
45  import javax.net.ssl.SSLSession;
46  
47  import org.apache.hc.core5.concurrent.CancellableDependency;
48  import org.apache.hc.core5.http.ConnectionClosedException;
49  import org.apache.hc.core5.http.EndpointDetails;
50  import org.apache.hc.core5.http.Header;
51  import org.apache.hc.core5.http.HttpConnection;
52  import org.apache.hc.core5.http.HttpException;
53  import org.apache.hc.core5.http.HttpStreamResetException;
54  import org.apache.hc.core5.http.HttpVersion;
55  import org.apache.hc.core5.http.ProtocolException;
56  import org.apache.hc.core5.http.ProtocolVersion;
57  import org.apache.hc.core5.http.RequestNotExecutedException;
58  import org.apache.hc.core5.http.config.CharCodingConfig;
59  import org.apache.hc.core5.http.impl.BasicEndpointDetails;
60  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
61  import org.apache.hc.core5.http.impl.CharCodingSupport;
62  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
63  import org.apache.hc.core5.http.nio.AsyncPushProducer;
64  import org.apache.hc.core5.http.nio.HandlerFactory;
65  import org.apache.hc.core5.http.nio.command.ExecutableCommand;
66  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
67  import org.apache.hc.core5.http.protocol.HttpCoreContext;
68  import org.apache.hc.core5.http.protocol.HttpProcessor;
69  import org.apache.hc.core5.http2.H2ConnectionException;
70  import org.apache.hc.core5.http2.H2Error;
71  import org.apache.hc.core5.http2.H2StreamResetException;
72  import org.apache.hc.core5.http2.config.H2Config;
73  import org.apache.hc.core5.http2.config.H2Param;
74  import org.apache.hc.core5.http2.config.H2Setting;
75  import org.apache.hc.core5.http2.frame.FrameFactory;
76  import org.apache.hc.core5.http2.frame.FrameFlag;
77  import org.apache.hc.core5.http2.frame.FrameType;
78  import org.apache.hc.core5.http2.frame.RawFrame;
79  import org.apache.hc.core5.http2.frame.StreamIdGenerator;
80  import org.apache.hc.core5.http2.hpack.HPackDecoder;
81  import org.apache.hc.core5.http2.hpack.HPackEncoder;
82  import org.apache.hc.core5.http2.impl.BasicH2TransportMetrics;
83  import org.apache.hc.core5.http2.nio.AsyncPingHandler;
84  import org.apache.hc.core5.http2.nio.command.PingCommand;
85  import org.apache.hc.core5.io.CloseMode;
86  import org.apache.hc.core5.reactor.Command;
87  import org.apache.hc.core5.reactor.ProtocolIOSession;
88  import org.apache.hc.core5.reactor.ssl.TlsDetails;
89  import org.apache.hc.core5.util.Args;
90  import org.apache.hc.core5.util.ByteArrayBuffer;
91  import org.apache.hc.core5.util.Identifiable;
92  import org.apache.hc.core5.util.Timeout;
93  
94  abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection {
95  
    // Neither constant is referenced in the part of the class visible here;
    // the trailing comments give the intended magnitude.
    private static final long LINGER_TIME = 1000; // 1 second
    private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024; // 10 MiB

    // Connection life cycle. Declaration order is significant: the code compares
    // states with compareTo(), so later constants mean "further into shutdown".
    enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
    // Progress of a SETTINGS exchange, tracked separately for the local and the remote side.
    enum SettingsHandshake { READY, TRANSMITTED, ACKED }

    // Underlying session: provides the output lock, I/O event registration,
    // the pending command queue and the close operations.
    private final ProtocolIOSession ioSession;
    private final FrameFactory frameFactory;
    // Produces ids for locally initiated streams and tells which side owns an id.
    private final StreamIdGenerator idGenerator;
    private final HttpProcessor httpProcessor;
    // Local settings; falls back to H2Config.DEFAULT when none is supplied.
    private final H2Config localConfig;
    private final BasicH2TransportMetrics inputMetrics;
    private final BasicH2TransportMetrics outputMetrics;
    private final BasicHttpConnectionMetrics connMetrics;
    private final FrameInputBuffer inputBuffer;
    private final FrameOutputBuffer outputBuffer;
    // Frames that could not be written straight to the output buffer; drained by onOutput().
    private final Deque<RawFrame> outputQueue;
    private final HPackEncoder hPackEncoder;
    private final HPackDecoder hPackDecoder;
    // Streams currently tracked by this connection, keyed by stream id.
    private final Map<Integer, H2Stream> streamMap;
    // Handlers of PING frames that have been sent; cancelled or failed when the connection goes away.
    private final Queue<AsyncPingHandler> pingHandlers;
    // Connection-level flow control windows (reported to the listener under stream id 0).
    private final AtomicInteger connInputWindow;
    private final AtomicInteger connOutputWindow;
    // Counter bumped by requestSessionOutput() and settled at the end of an output cycle.
    private final AtomicInteger outputRequests;
    // Highest stream id generated locally or recorded through updateLastStreamId().
    private final AtomicInteger lastStreamId;
    // Optional observer of frames, headers and window changes; may be null.
    private final H2StreamListener streamListener;

    private ConnectionHandshake connState = ConnectionHandshake.READY;
    private SettingsHandshake localSettingState = SettingsHandshake.READY;
    private SettingsHandshake remoteSettingState = SettingsHandshake.READY;

    // Initial window sizes handed to newly created stream channels.
    private int initInputWinSize;
    private int initOutputWinSize;
    // Initialised to half of the initial window size; its use is outside the visible part of the class.
    private int lowMark;

    // Peer settings; starts as H2Config.INIT and is read for max frame size and max concurrent streams.
    private volatile H2Config remoteConfig;

    // NOTE(review): presumably state of a header block spanning CONTINUATION frames — not used in the visible code.
    private Continuation continuation;

    // Sent as the "last stream id" of every GOAWAY frame produced by this class.
    private int processedRemoteStreamId;
    // NOTE(review): endpointDetails and goAwayReceived are not used in the visible part of the class.
    private EndpointDetails endpointDetails;
    private boolean goAwayReceived;
138 
139     AbstractH2StreamMultiplexer(
140             final ProtocolIOSession ioSession,
141             final FrameFactory frameFactory,
142             final StreamIdGenerator idGenerator,
143             final HttpProcessor httpProcessor,
144             final CharCodingConfig charCodingConfig,
145             final H2Config h2Config,
146             final H2StreamListener streamListener) {
147         this.ioSession = Args.notNull(ioSession, "IO session");
148         this.frameFactory = Args.notNull(frameFactory, "Frame factory");
149         this.idGenerator = Args.notNull(idGenerator, "Stream id generator");
150         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
151         this.localConfig = h2Config != null ? h2Config : H2Config.DEFAULT;
152         this.inputMetrics = new BasicH2TransportMetrics();
153         this.outputMetrics = new BasicH2TransportMetrics();
154         this.connMetrics = new BasicHttpConnectionMetrics(this.inputMetrics, this.outputMetrics);
155         this.inputBuffer = new FrameInputBuffer(this.inputMetrics, this.localConfig.getMaxFrameSize());
156         this.outputBuffer = new FrameOutputBuffer(this.outputMetrics, this.localConfig.getMaxFrameSize());
157         this.outputQueue = new ConcurrentLinkedDeque<>();
158         this.pingHandlers = new ConcurrentLinkedQueue<>();
159         this.outputRequests = new AtomicInteger(0);
160         this.lastStreamId = new AtomicInteger(0);
161         this.hPackEncoder = new HPackEncoder(CharCodingSupport.createEncoder(charCodingConfig));
162         this.hPackDecoder = new HPackDecoder(CharCodingSupport.createDecoder(charCodingConfig));
163         this.streamMap = new ConcurrentHashMap<>();
164         this.remoteConfig = H2Config.INIT;
165         this.connInputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
166         this.connOutputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
167 
168         this.initInputWinSize = H2Config.INIT.getInitialWindowSize();
169         this.initOutputWinSize = H2Config.INIT.getInitialWindowSize();
170 
171         this.hPackDecoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
172         this.hPackEncoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
173         this.hPackDecoder.setMaxListSize(H2Config.INIT.getMaxHeaderListSize());
174 
175         this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
176         this.streamListener = streamListener;
177     }
178 
179     @Override
180     public String getId() {
181         return ioSession.getId();
182     }
183 
    // Side-specific hooks implemented by subclasses. The three accept* hooks take no
    // arguments and may reject with an H2ConnectionException; their call sites are
    // outside the visible part of this class.
    // NOTE(review): presumably each one decides whether the corresponding kind of
    // incoming frame / request is legal for this side of the connection — confirm.
    abstract void acceptHeaderFrame() throws H2ConnectionException;

    abstract void acceptPushRequest() throws H2ConnectionException;

    abstract void acceptPushFrame() throws H2ConnectionException;

    // Creates the handler for a stream opened by the peer.
    // NOTE(review): the call site is not visible here; confirm how pushHandlerFactory is supplied.
    abstract H2StreamHandler createRemotelyInitiatedStream(
            H2StreamChannel channel,
            HttpProcessor httpProcessor,
            BasicHttpConnectionMetrics connMetrics,
            HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException;

    // Creates the handler for a stream opened locally in response to an ExecutableCommand
    // taken from the session's command queue (see processPendingCommands()).
    abstract H2StreamHandler createLocallyInitiatedStream(
            ExecutableCommand command,
            H2StreamChannel channel,
            HttpProcessor httpProcessor,
            BasicHttpConnectionMetrics connMetrics) throws IOException;
201 
202     private int updateWindow(final AtomicInteger window, final int delta) throws ArithmeticException {
203         for (;;) {
204             final int current = window.get();
205             long newValue = (long) current + delta;
206 
207             //TODO: work-around for what looks like a bug in Ngnix (1.11)
208             // Tolerate if the update window exceeded by one
209             if (newValue == 0x80000000L) {
210                 newValue = Integer.MAX_VALUE;
211             }
212             //TODO: needs to be removed
213 
214             if (Math.abs(newValue) > 0x7fffffffL) {
215                 throw new ArithmeticException("Update causes flow control window to exceed " + Integer.MAX_VALUE);
216             }
217             if (window.compareAndSet(current, (int) newValue)) {
218                 return (int) newValue;
219             }
220         }
221     }
222 
223     private int updateInputWindow(
224             final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
225         final int newSize = updateWindow(window, delta);
226         if (streamListener != null) {
227             streamListener.onInputFlowControl(this, streamId, delta, newSize);
228         }
229         return newSize;
230     }
231 
232     private int updateOutputWindow(
233             final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
234         final int newSize = updateWindow(window, delta);
235         if (streamListener != null) {
236             streamListener.onOutputFlowControl(this, streamId, delta, newSize);
237         }
238         return newSize;
239     }
240 
241     private void commitFrameInternal(final RawFrame frame) throws IOException {
242         if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
243             if (streamListener != null) {
244                 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
245             }
246             outputBuffer.write(frame, ioSession);
247         } else {
248             outputQueue.addLast(frame);
249         }
250         ioSession.setEvent(SelectionKey.OP_WRITE);
251     }
252 
253     private void commitFrame(final RawFrame frame) throws IOException {
254         Args.notNull(frame, "Frame");
255         ioSession.getLock().lock();
256         try {
257             commitFrameInternal(frame);
258         } finally {
259             ioSession.getLock().unlock();
260         }
261     }
262 
263     private void commitHeaders(
264             final int streamId, final List<? extends Header> headers, final boolean endStream) throws IOException {
265         if (streamListener != null) {
266             streamListener.onHeaderOutput(this, streamId, headers);
267         }
268         final ByteArrayBuffer buf = new ByteArrayBuffer(512);
269         hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
270 
271         int off = 0;
272         int remaining = buf.length();
273         boolean continuation = false;
274 
275         while (remaining > 0) {
276             final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
277             final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
278 
279             remaining -= chunk;
280             off += chunk;
281 
282             final boolean endHeaders = remaining == 0;
283             final RawFrame frame;
284             if (!continuation) {
285                 frame = frameFactory.createHeaders(streamId, payload, endHeaders, endStream);
286                 continuation = true;
287             } else {
288                 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
289             }
290             commitFrameInternal(frame);
291         }
292     }
293 
294     private void commitPushPromise(
295             final int streamId, final int promisedStreamId, final List<Header> headers) throws IOException {
296         if (headers == null || headers.isEmpty()) {
297             throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
298         }
299         if (streamListener != null) {
300             streamListener.onHeaderOutput(this, streamId, headers);
301         }
302         final ByteArrayBuffer buf = new ByteArrayBuffer(512);
303         buf.append((byte)(promisedStreamId >> 24));
304         buf.append((byte)(promisedStreamId >> 16));
305         buf.append((byte)(promisedStreamId >> 8));
306         buf.append((byte)(promisedStreamId));
307 
308         hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
309 
310         int off = 0;
311         int remaining = buf.length();
312         boolean continuation = false;
313 
314         while (remaining > 0) {
315             final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
316             final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
317 
318             remaining -= chunk;
319             off += chunk;
320 
321             final boolean endHeaders = remaining == 0;
322             final RawFrame frame;
323             if (!continuation) {
324                 frame = frameFactory.createPushPromise(streamId, payload, endHeaders);
325                 continuation = true;
326             } else {
327                 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
328             }
329             commitFrameInternal(frame);
330         }
331     }
332 
333     private void streamDataFrame(
334             final int streamId,
335             final AtomicInteger streamOutputWindow,
336             final ByteBuffer payload,
337             final int chunk) throws IOException {
338         final RawFrame dataFrame = frameFactory.createData(streamId, payload, false);
339         if (streamListener != null) {
340             streamListener.onFrameOutput(this, streamId, dataFrame);
341         }
342         updateOutputWindow(0, connOutputWindow, -chunk);
343         updateOutputWindow(streamId, streamOutputWindow, -chunk);
344         outputBuffer.write(dataFrame, ioSession);
345     }
346 
347     private int streamData(
348             final int streamId, final AtomicInteger streamOutputWindow, final ByteBuffer payload) throws IOException {
349         if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
350             final int capacity = Math.min(connOutputWindow.get(), streamOutputWindow.get());
351             if (capacity <= 0) {
352                 return 0;
353             }
354             final int maxPayloadSize = Math.min(capacity, outputBuffer.getMaxFramePayloadSize());
355             final int chunk;
356             if (payload.remaining() <= maxPayloadSize) {
357                 chunk = payload.remaining();
358                 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
359             } else {
360                 chunk = maxPayloadSize;
361                 final int originalLimit = payload.limit();
362                 try {
363                     payload.limit(payload.position() + chunk);
364                     streamDataFrame(streamId, streamOutputWindow, payload, chunk);
365                 } finally {
366                     payload.limit(originalLimit);
367                 }
368             }
369             payload.position(payload.position() + chunk);
370             ioSession.setEvent(SelectionKey.OP_WRITE);
371             return chunk;
372         }
373         return 0;
374     }
375 
376     private void incrementInputCapacity(
377             final int streamId, final AtomicInteger inputWindow, final int inputCapacity) throws IOException {
378         if (inputCapacity > 0) {
379             final int streamWinSize = inputWindow.get();
380             final int remainingCapacity = Integer.MAX_VALUE - streamWinSize;
381             final int chunk = Math.min(inputCapacity, remainingCapacity);
382             if (chunk != 0) {
383                 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(streamId, chunk);
384                 commitFrame(windowUpdateFrame);
385                 updateInputWindow(streamId, inputWindow, chunk);
386             }
387         }
388     }
389 
390     private void requestSessionOutput() {
391         outputRequests.incrementAndGet();
392         ioSession.setEvent(SelectionKey.OP_WRITE);
393     }
394 
395     private void updateLastStreamId(final int streamId) {
396         final int currentId = lastStreamId.get();
397         if (streamId > currentId) {
398             lastStreamId.compareAndSet(currentId, streamId);
399         }
400     }
401 
402     private int generateStreamId() {
403         for (;;) {
404             final int currentId = lastStreamId.get();
405             final int newStreamId = idGenerator.generate(currentId);
406             if (lastStreamId.compareAndSet(currentId, newStreamId)) {
407                 return newStreamId;
408             }
409         }
410     }
411 
412     public final void onConnect() throws HttpException, IOException {
413         connState = ConnectionHandshake.ACTIVE;
414         final RawFrame settingsFrame = frameFactory.createSettings(
415                 new H2Setting(H2Param.HEADER_TABLE_SIZE, localConfig.getHeaderTableSize()),
416                 new H2Setting(H2Param.ENABLE_PUSH, localConfig.isPushEnabled() ? 1 : 0),
417                 new H2Setting(H2Param.MAX_CONCURRENT_STREAMS, localConfig.getMaxConcurrentStreams()),
418                 new H2Setting(H2Param.INITIAL_WINDOW_SIZE, localConfig.getInitialWindowSize()),
419                 new H2Setting(H2Param.MAX_FRAME_SIZE, localConfig.getMaxFrameSize()),
420                 new H2Setting(H2Param.MAX_HEADER_LIST_SIZE, localConfig.getMaxHeaderListSize()));
421 
422         commitFrame(settingsFrame);
423         localSettingState = SettingsHandshake.TRANSMITTED;
424         maximizeConnWindow(connInputWindow.get());
425 
426         if (streamListener != null) {
427             final int initInputWindow = connInputWindow.get();
428             streamListener.onInputFlowControl(this, 0, initInputWindow, initInputWindow);
429             final int initOutputWindow = connOutputWindow.get();
430             streamListener.onOutputFlowControl(this, 0, initOutputWindow, initOutputWindow);
431         }
432     }
433 
434     public final void onInput(final ByteBuffer src) throws HttpException, IOException {
435         if (connState == ConnectionHandshake.SHUTDOWN) {
436             ioSession.clearEvent(SelectionKey.OP_READ);
437         } else {
438             for (;;) {
439                 final RawFrame frame = inputBuffer.read(src, ioSession);
440                 if (frame == null) {
441                     break;
442                 }
443                 if (streamListener != null) {
444                     streamListener.onFrameInput(this, frame.getStreamId(), frame);
445                 }
446                 consumeFrame(frame);
447             }
448         }
449     }
450 
451     public final void onOutput() throws HttpException, IOException {
452         ioSession.getLock().lock();
453         try {
454             if (!outputBuffer.isEmpty()) {
455                 outputBuffer.flush(ioSession);
456             }
457             while (outputBuffer.isEmpty()) {
458                 final RawFrame frame = outputQueue.poll();
459                 if (frame != null) {
460                     if (streamListener != null) {
461                         streamListener.onFrameOutput(this, frame.getStreamId(), frame);
462                     }
463                     outputBuffer.write(frame, ioSession);
464                 } else {
465                     break;
466                 }
467             }
468         } finally {
469             ioSession.getLock().unlock();
470         }
471 
472         if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
473 
474             if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
475                 produceOutput();
476             }
477             final int pendingOutputRequests = outputRequests.get();
478             boolean outputPending = false;
479             if (!streamMap.isEmpty() && connOutputWindow.get() > 0) {
480                 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
481                     final Map.Entry<Integer, H2Stream> entry = it.next();
482                     final H2Stream stream = entry.getValue();
483                     if (!stream.isLocalClosed()
484                             && stream.getOutputWindow().get() > 0
485                             && stream.isOutputReady()) {
486                         outputPending = true;
487                         break;
488                     }
489                 }
490             }
491             ioSession.getLock().lock();
492             try {
493                 if (!outputPending && outputBuffer.isEmpty() && outputQueue.isEmpty()
494                         && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
495                     ioSession.clearEvent(SelectionKey.OP_WRITE);
496                 } else {
497                     outputRequests.addAndGet(-pendingOutputRequests);
498                 }
499             } finally {
500                 ioSession.getLock().unlock();
501             }
502         }
503 
504         if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 && remoteSettingState == SettingsHandshake.ACKED) {
505             processPendingCommands();
506         }
507         if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
508             int liveStreams = 0;
509             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
510                 final Map.Entry<Integer, H2Stream> entry = it.next();
511                 final H2Stream stream = entry.getValue();
512                 if (stream.isLocalClosed() && stream.isRemoteClosed()) {
513                     stream.releaseResources();
514                     it.remove();
515                 } else {
516                     if (idGenerator.isSameSide(stream.getId()) || stream.getId() <= processedRemoteStreamId) {
517                         liveStreams++;
518                     }
519                 }
520             }
521             if (liveStreams == 0) {
522                 connState = ConnectionHandshake.SHUTDOWN;
523             }
524         }
525         if (connState.compareTo(ConnectionHandshake.SHUTDOWN) >= 0) {
526             if (!streamMap.isEmpty()) {
527                 for (final H2Stream stream : streamMap.values()) {
528                     stream.releaseResources();
529                 }
530                 streamMap.clear();
531             }
532             ioSession.getLock().lock();
533             try {
534                 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
535                     ioSession.close();
536                 }
537             } finally {
538                 ioSession.getLock().unlock();
539             }
540         }
541     }
542 
543     public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
544         connState = ConnectionHandshake.SHUTDOWN;
545 
546         final RawFrame goAway;
547         if (localSettingState != SettingsHandshake.ACKED) {
548             goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.SETTINGS_TIMEOUT,
549                             "Setting timeout (" + timeout + ")");
550         } else {
551             goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR,
552                             "Timeout due to inactivity (" + timeout + ")");
553         }
554         commitFrame(goAway);
555         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
556             final Map.Entry<Integer, H2Stream> entry = it.next();
557             final H2Stream stream = entry.getValue();
558             stream.reset(new H2StreamResetException(H2Error.NO_ERROR, "Timeout due to inactivity (" + timeout + ")"));
559         }
560         streamMap.clear();
561     }
562 
563     public final void onDisconnect() {
564         for (;;) {
565             final AsyncPingHandler pingHandler = pingHandlers.poll();
566             if (pingHandler != null) {
567                 pingHandler.cancel();
568             } else {
569                 break;
570             }
571         }
572         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
573             final Map.Entry<Integer, H2Stream> entry = it.next();
574             final H2Stream stream = entry.getValue();
575             stream.cancel();
576         }
577         for (;;) {
578             final Command command = ioSession.poll();
579             if (command != null) {
580                 if (command instanceof ExecutableCommand) {
581                     ((ExecutableCommand) command).failed(new ConnectionClosedException());
582                 } else {
583                     command.cancel();
584                 }
585             } else {
586                 break;
587             }
588         }
589     }
590 
591     private void processPendingCommands() throws IOException, HttpException {
592         while (streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
593             final Command command = ioSession.poll();
594             if (command == null) {
595                 break;
596             }
597             if (command instanceof ShutdownCommand) {
598                 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
599                 if (shutdownCommand.getType() == CloseMode.IMMEDIATE) {
600                     for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
601                         final Map.Entry<Integer, H2Stream> entry = it.next();
602                         final H2Stream stream = entry.getValue();
603                         stream.cancel();
604                     }
605                     streamMap.clear();
606                     connState = ConnectionHandshake.SHUTDOWN;
607                 } else {
608                     if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
609                         final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR, "Graceful shutdown");
610                         commitFrame(goAway);
611                         connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
612                     }
613                 }
614                 break;
615             } else if (command instanceof PingCommand) {
616                 final PingCommand pingCommand = (PingCommand) command;
617                 final AsyncPingHandler handler = pingCommand.getHandler();
618                 pingHandlers.add(handler);
619                 final RawFrame ping = frameFactory.createPing(handler.getData());
620                 commitFrame(ping);
621             } else if (command instanceof ExecutableCommand) {
622                 final int streamId = generateStreamId();
623                 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
624                         streamId, true, initInputWinSize, initOutputWinSize);
625                 final ExecutableCommand executableCommand = (ExecutableCommand) command;
626                 final H2StreamHandler streamHandler = createLocallyInitiatedStream(
627                         executableCommand, channel, httpProcessor, connMetrics);
628 
629                 final H2Stream stream = new H2Stream(channel, streamHandler, false);
630                 streamMap.put(streamId, stream);
631 
632                 if (streamListener != null) {
633                     final int initInputWindow = stream.getInputWindow().get();
634                     streamListener.onInputFlowControl(this, streamId, initInputWindow, initInputWindow);
635                     final int initOutputWindow = stream.getOutputWindow().get();
636                     streamListener.onOutputFlowControl(this, streamId, initOutputWindow, initOutputWindow);
637                 }
638 
639                 if (stream.isOutputReady()) {
640                     stream.produceOutput();
641                 }
642                 final CancellableDependency cancellableDependency = executableCommand.getCancellableDependency();
643                 if (cancellableDependency != null) {
644                     cancellableDependency.setDependency(stream::abort);
645                 }
646                 if (!outputQueue.isEmpty()) {
647                     return;
648                 }
649             }
650         }
651     }
652 
653     public final void onException(final Exception cause) {
654         try {
655             for (;;) {
656                 final AsyncPingHandler pingHandler = pingHandlers.poll();
657                 if (pingHandler != null) {
658                     pingHandler.failed(cause);
659                 } else {
660                     break;
661                 }
662             }
663             for (;;) {
664                 final Command command = ioSession.poll();
665                 if (command != null) {
666                     if (command instanceof ExecutableCommand) {
667                         ((ExecutableCommand) command).failed(new ConnectionClosedException());
668                     } else {
669                         command.cancel();
670                     }
671                 } else {
672                     break;
673                 }
674             }
675             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
676                 final Map.Entry<Integer, H2Stream> entry = it.next();
677                 final H2Stream stream = entry.getValue();
678                 stream.reset(cause);
679             }
680             streamMap.clear();
681             if (!(cause instanceof ConnectionClosedException)) {
682                 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) <= 0) {
683                     final H2Error errorCode;
684                     if (cause instanceof H2ConnectionException) {
685                         errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
686                     } else if (cause instanceof ProtocolException){
687                         errorCode = H2Error.PROTOCOL_ERROR;
688                     } else {
689                         errorCode = H2Error.INTERNAL_ERROR;
690                     }
691                     final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
692                     commitFrame(goAway);
693                 }
694             }
695         } catch (final IOException ignore) {
696         } finally {
697             connState = ConnectionHandshake.SHUTDOWN;
698             final CloseMode closeMode;
699             if (cause instanceof ConnectionClosedException) {
700                 closeMode = CloseMode.GRACEFUL;
701             } else if (cause instanceof IOException) {
702                 closeMode = CloseMode.IMMEDIATE;
703             } else {
704                 closeMode = CloseMode.GRACEFUL;
705             }
706             ioSession.close(closeMode);
707         }
708     }
709 
710     private H2Stream getValidStream(final int streamId) throws H2ConnectionException {
711         if (streamId == 0) {
712             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
713         }
714         final H2Stream stream = streamMap.get(streamId);
715         if (stream == null) {
716             if (streamId <= lastStreamId.get()) {
717                 throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed");
718             } else {
719                 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
720             }
721         }
722         return stream;
723     }
724 
725     private void consumeFrame(final RawFrame frame) throws HttpException, IOException {
            // Dispatches a single incoming frame according to its type.
            //
            // While a header block continuation is pending, any frame other than
            // CONTINUATION is rejected with a connection-level PROTOCOL_ERROR.
            // Stream-level reset exceptions raised by the per-frame consumers are
            // applied to the affected stream only; H2ConnectionException (and any
            // other exception not caught below) propagates to the caller.
726         final FrameType frameType = FrameType.valueOf(frame.getType());
727         final int streamId = frame.getStreamId();
728         if (continuation != null && frameType != FrameType.CONTINUATION) {
729             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "CONTINUATION frame expected");
730         }
731         switch (frameType) {
732             case DATA: {
733                 final H2Stream stream = getValidStream(streamId);
734                 try {
735                     consumeDataFrame(frame, stream);
736                 } catch (final H2StreamResetException ex) {
737                     stream.localReset(ex);
738                 } catch (final HttpStreamResetException ex) {
                        // A reset with an underlying cause is reported as INTERNAL_ERROR, otherwise as CANCEL
739                     stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
740                 }
741 
                    // Drop the stream from the map once it has terminated and release its resources
742                 if (stream.isTerminated()) {
743                     streamMap.remove(streamId);
744                     stream.releaseResources();
745                     requestSessionOutput();
746                 }
747             }
748             break;
749             case HEADERS: {
750                 if (streamId == 0) {
751                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
752                 }
753                 H2Stream stream = streamMap.get(streamId);
754                 if (stream == null) {
                        // No stream is registered under this id: the frame opens a new remotely initiated stream
755                     acceptHeaderFrame();
756 
757                     if (idGenerator.isSameSide(streamId)) {
758                         throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
759                     }
760                     if (goAwayReceived ) {
761                         throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
762                     }
763 
764                     updateLastStreamId(streamId);
765 
766                     final H2StreamChannelImpl channel = new H2StreamChannelImpl(
767                             streamId, false, initInputWinSize, initOutputWinSize);
768                     final H2StreamHandler streamHandler;
                        // Past the ACTIVE state the new stream gets a no-op handler and is marked as locally ended
769                     if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
770                         streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics, null);
771                     } else {
772                         streamHandler = NoopH2StreamHandler.INSTANCE;
773                         channel.setLocalEndStream();
774                     }
775 
776                     stream = new H2Stream(channel, streamHandler, true);
777                     if (stream.isOutputReady()) {
778                         stream.produceOutput();
779                     }
780                     streamMap.put(streamId, stream);
781                 }
782 
783                 try {
784                     consumeHeaderFrame(frame, stream);
785 
786                     if (stream.isOutputReady()) {
787                         stream.produceOutput();
788                     }
789                 } catch (final H2StreamResetException ex) {
790                     stream.localReset(ex);
791                 } catch (final HttpStreamResetException ex) {
792                     stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
793                 } catch (final HttpException ex) {
                        // Other HTTP exceptions are handed to the stream instead of failing the connection
794                     stream.handle(ex);
795                 }
796 
797                 if (stream.isTerminated()) {
798                     streamMap.remove(streamId);
799                     stream.releaseResources();
800                     requestSessionOutput();
801                 }
802             }
803             break;
804             case CONTINUATION: {
                    // Only valid while a header block is pending, and only on the stream that started it
805                 if (continuation == null) {
806                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION frame");
807                 }
808                 if (streamId != continuation.streamId) {
809                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION stream id: " + streamId);
810                 }
811 
812                 final H2Stream stream = getValidStream(streamId);
813                 try {
814 
815                     consumeContinuationFrame(frame, stream);
816                 } catch (final H2StreamResetException ex) {
817                     stream.localReset(ex);
818                 } catch (final HttpStreamResetException ex) {
819                     stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
820                 }
821 
822                 if (stream.isTerminated()) {
823                     streamMap.remove(streamId);
824                     stream.releaseResources();
825                     requestSessionOutput();
826                 }
827             }
828             break;
829             case WINDOW_UPDATE: {
830                 final ByteBuffer payload = frame.getPayload();
831                 if (payload == null || payload.remaining() != 4) {
832                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid WINDOW_UPDATE frame payload");
833                 }
834                 final int delta = payload.getInt();
835                 if (delta <= 0) {
836                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Invalid WINDOW_UPDATE delta");
837                 }
                    // Stream id 0 addresses the connection output window; any other id a single stream's window
838                 if (streamId == 0) {
839                     try {
840                         updateOutputWindow(0, connOutputWindow, delta);
841                     } catch (final ArithmeticException ex) {
842                         throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
843                     }
844                 } else {
                        // An update for a stream that is not in the map is silently ignored
845                     final H2Stream stream = streamMap.get(streamId);
846                     if (stream != null) {
847                         try {
848                             updateOutputWindow(streamId, stream.getOutputWindow(), delta);
849                         } catch (final ArithmeticException ex) {
850                             throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
851                         }
852                     }
853                 }
854                 ioSession.setEvent(SelectionKey.OP_WRITE);
855             }
856             break;
857             case RST_STREAM: {
858                 if (streamId == 0) {
859                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
860                 }
861                 final H2Stream stream = streamMap.get(streamId);
862                 if (stream == null) {
                        // Unknown ids not greater than the last known stream id are ignored; greater ids are a protocol error
863                     if (streamId > lastStreamId.get()) {
864                         throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
865                     }
866                 } else {
867                     final ByteBuffer payload = frame.getPayload();
868                     if (payload == null || payload.remaining() != 4) {
869                         throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid RST_STREAM frame payload");
870                     }
871                     final int errorCode = payload.getInt();
872                     stream.reset(new H2StreamResetException(errorCode, "Stream reset (" + errorCode + ")"));
873                     streamMap.remove(streamId);
874                     stream.releaseResources();
875                     requestSessionOutput();
876                 }
877             }
878             break;
879             case PING: {
880                 if (streamId != 0) {
881                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
882                 }
883                 final ByteBuffer ping = frame.getPayloadContent();
884                 if (ping == null || ping.remaining() != 8) {
885                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload");
886                 }
887                 if (frame.isFlagSet(FrameFlag.ACK)) {
                        // An ACK is delivered to the next queued ping handler, if there is one
888                     final AsyncPingHandler pingHandler = pingHandlers.poll();
889                     if (pingHandler != null) {
890                         pingHandler.consumeResponse(ping);
891                     }
892                 } else {
                        // Echo the received payload back in a PING ACK frame
893                     final ByteBuffer pong = ByteBuffer.allocate(ping.remaining());
894                     pong.put(ping);
895                     pong.flip();
896                     final RawFrame response = frameFactory.createPingAck(pong);
897                     commitFrame(response);
898                 }
899             }
900             break;
901             case SETTINGS: {
902                 if (streamId != 0) {
903                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
904                 }
905                 if (frame.isFlagSet(FrameFlag.ACK)) {
                        // The ACK only takes effect while local settings are in the TRANSMITTED state
906                     if (localSettingState == SettingsHandshake.TRANSMITTED) {
907                         localSettingState = SettingsHandshake.ACKED;
908                         ioSession.setEvent(SelectionKey.OP_WRITE);
909                         applyLocalSettings();
910                     }
911                 } else {
912                     final ByteBuffer payload = frame.getPayload();
913                     if (payload != null) {
                            // Each settings entry read by consumeSettingsFrame is 6 octets (short code + int value)
914                         if ((payload.remaining() % 6) != 0) {
915                             throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid SETTINGS payload");
916                         }
917                         consumeSettingsFrame(payload);
918                         remoteSettingState = SettingsHandshake.TRANSMITTED;
919                     }
920                     // Send ACK
921                     final RawFrame response = frameFactory.createSettingsAck();
922                     commitFrame(response);
923                     remoteSettingState = SettingsHandshake.ACKED;
924                 }
925             }
926             break;
927             case PRIORITY:
928                 // Stream priority not supported
929                 break;
930             case PUSH_PROMISE: {
931                 acceptPushFrame();
932 
933                 if (goAwayReceived ) {
934                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
935                 }
936 
937                 if (!localConfig.isPushEnabled()) {
938                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Push is disabled");
939                 }
940 
                    // streamId identifies the stream the promise is associated with; it must be a known stream
941                 final H2Stream stream = getValidStream(streamId);
942                 if (stream.isRemoteClosed()) {
943                     stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed"));
944                     break;
945                 }
946 
947                 final ByteBuffer payload = frame.getPayloadContent();
948                 if (payload == null || payload.remaining() < 4) {
949                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PUSH_PROMISE payload");
950                 }
                    // The first four octets of the payload carry the promised stream id
951                 final int promisedStreamId = payload.getInt();
952                 if (promisedStreamId == 0 || idGenerator.isSameSide(promisedStreamId)) {
953                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal promised stream id: " + promisedStreamId);
954                 }
955                 if (streamMap.get(promisedStreamId) != null) {
956                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected promised stream id: " + promisedStreamId);
957                 }
958 
959                 updateLastStreamId(promisedStreamId);
960 
961                 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
962                         promisedStreamId, false, initInputWinSize, initOutputWinSize);
963                 final H2StreamHandler streamHandler;
964                 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
965                     streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics,
966                             stream.getPushHandlerFactory());
967                 } else {
968                     streamHandler = NoopH2StreamHandler.INSTANCE;
969                     channel.setLocalEndStream();
970                 }
971 
972                 final H2Stream promisedStream = new H2Stream(channel, streamHandler, true);
973                 streamMap.put(promisedStreamId, promisedStream);
974 
975                 try {
976                     consumePushPromiseFrame(frame, payload, promisedStream);
977                 } catch (final H2StreamResetException ex) {
978                     promisedStream.localReset(ex);
979                 } catch (final HttpStreamResetException ex) {
                        // Unlike the other frame types, a reset without a cause is reported as NO_ERROR here
980                     promisedStream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.NO_ERROR);
981                 }
982             }
983             break;
984             case GOAWAY: {
985                 if (streamId != 0) {
986                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
987                 }
988                 final ByteBuffer payload = frame.getPayload();
989                 if (payload == null || payload.remaining() < 8) {
990                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid GOAWAY payload");
991                 }
                    // Payload layout as read here: last stream id (int) followed by error code (int)
992                 final int processedLocalStreamId = payload.getInt();
993                 final int errorCode = payload.getInt();
994                 goAwayReceived = true;
995                 if (errorCode == H2Error.NO_ERROR.getCode()) {
996                     if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
                            // NOTE(review): this cancels streams for which isSameSide() is false. In the
                            // HEADERS and PUSH_PROMISE cases above, isSameSide() == true marks ids the peer
                            // is not allowed to use. Confirm that cancelling these streams, rather than the
                            // other side's, above processedLocalStreamId is the intended behaviour.
997                         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
998                             final Map.Entry<Integer, H2Stream> entry = it.next();
999                             final int activeStreamId = entry.getKey();
1000                             if (!idGenerator.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
1001                                 final H2Stream stream = entry.getValue();
1002                                 stream.cancel();
1003                                 it.remove();
1004                             }
1005                         }
1006                     }
                         // With no streams left the connection shuts down at once; otherwise it drains gracefully
1007                     connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
1008                 } else {
                         // Any other error code: reset every stream and shut the connection down
1009                     for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1010                         final Map.Entry<Integer, H2Stream> entry = it.next();
1011                         final H2Stream stream = entry.getValue();
1012                         stream.reset(new H2StreamResetException(errorCode, "Connection terminated by the peer (" + errorCode + ")"));
1013                     }
1014                     streamMap.clear();
1015                     connState = ConnectionHandshake.SHUTDOWN;
1016                 }
1017             }
1018             ioSession.setEvent(SelectionKey.OP_WRITE);
1019             break;
1020         }
1021     }
1022 
1023     private void consumeDataFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1024         final int streamId = stream.getId();
1025         final ByteBuffer payload = frame.getPayloadContent();
1026         if (payload != null) {
1027             final int frameLength = frame.getLength();
1028             final int streamWinSize = updateInputWindow(streamId, stream.getInputWindow(), -frameLength);
1029             if (streamWinSize < lowMark && !stream.isRemoteClosed()) {
1030                 stream.produceInputCapacityUpdate();
1031             }
1032             final int connWinSize = updateInputWindow(0, connInputWindow, -frameLength);
1033             if (connWinSize < CONNECTION_WINDOW_LOW_MARK) {
1034                 maximizeConnWindow(connWinSize);
1035             }
1036         }
1037         if (stream.isRemoteClosed()) {
1038             throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1039         }
1040         if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1041             stream.setRemoteEndStream();
1042         }
1043         if (stream.isLocalReset()) {
1044             return;
1045         }
1046         stream.consumeData(payload);
1047     }
1048 
1049     private void maximizeConnWindow(final int connWinSize) throws IOException {
1050         final int delta = Integer.MAX_VALUE - connWinSize;
1051         if (delta > 0) {
1052             final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(0, delta);
1053             commitFrame(windowUpdateFrame);
1054             updateInputWindow(0, connInputWindow, delta);
1055         }
1056     }
1057 
1058     private void consumePushPromiseFrame(final RawFrame frame, final ByteBuffer payload, final H2Stream promisedStream) throws HttpException, IOException {
1059         final int promisedStreamId = promisedStream.getId();
1060         if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1061             continuation = new Continuation(promisedStreamId, frame.getType(), true);
1062         }
1063         if (continuation == null) {
1064             final List<Header> headers = hPackDecoder.decodeHeaders(payload);
1065             if (promisedStreamId > processedRemoteStreamId) {
1066                 processedRemoteStreamId = promisedStreamId;
1067             }
1068             if (streamListener != null) {
1069                 streamListener.onHeaderInput(this, promisedStreamId, headers);
1070             }
1071             promisedStream.consumePromise(headers);
1072         } else {
1073             continuation.copyPayload(payload);
1074         }
1075     }
1076 
1077     List<Header> decodeHeaders(final ByteBuffer payload) throws HttpException {
1078         return hPackDecoder.decodeHeaders(payload);
1079     }
1080 
1081     private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1082         final int streamId = stream.getId();
1083         if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1084             continuation = new Continuation(streamId, frame.getType(), frame.isFlagSet(FrameFlag.END_STREAM));
1085         }
1086         final ByteBuffer payload = frame.getPayloadContent();
1087         if (frame.isFlagSet(FrameFlag.PRIORITY)) {
1088             // Priority not supported
1089             payload.getInt();
1090             payload.get();
1091         }
1092         if (continuation == null) {
1093             final List<Header> headers = decodeHeaders(payload);
1094             if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1095                 processedRemoteStreamId = streamId;
1096             }
1097             if (streamListener != null) {
1098                 streamListener.onHeaderInput(this, streamId, headers);
1099             }
1100             if (stream.isRemoteClosed()) {
1101                 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1102             }
1103             if (stream.isLocalReset()) {
1104                 return;
1105             }
1106             if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1107                 stream.setRemoteEndStream();
1108             }
1109             stream.consumeHeader(headers);
1110         } else {
1111             continuation.copyPayload(payload);
1112         }
1113     }
1114 
1115     private void consumeContinuationFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1116         final int streamId = frame.getStreamId();
1117         final ByteBuffer payload = frame.getPayload();
1118         continuation.copyPayload(payload);
1119         if (frame.isFlagSet(FrameFlag.END_HEADERS)) {
1120             final List<Header> headers = decodeHeaders(continuation.getContent());
1121             if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1122                 processedRemoteStreamId = streamId;
1123             }
1124             if (streamListener != null) {
1125                 streamListener.onHeaderInput(this, streamId, headers);
1126             }
1127             if (stream.isRemoteClosed()) {
1128                 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1129             }
1130             if (stream.isLocalReset()) {
1131                 return;
1132             }
1133             if (continuation.endStream) {
1134                 stream.setRemoteEndStream();
1135             }
1136             if (continuation.type == FrameType.PUSH_PROMISE.getValue()) {
1137                 stream.consumePromise(headers);
1138             } else {
1139                 stream.consumeHeader(headers);
1140             }
1141             continuation = null;
1142         }
1143     }
1144 
1145     private void consumeSettingsFrame(final ByteBuffer payload) throws HttpException, IOException {
1146         final H2Config.Builder configBuilder = H2Config.initial();
1147         while (payload.hasRemaining()) {
1148             final int code = payload.getShort();
1149             final int value = payload.getInt();
1150             final H2Param param = H2Param.valueOf(code);
1151             if (param != null) {
1152                 switch (param) {
1153                     case HEADER_TABLE_SIZE:
1154                         try {
1155                             configBuilder.setHeaderTableSize(value);
1156                         } catch (final IllegalArgumentException ex) {
1157                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1158                         }
1159                         break;
1160                     case MAX_CONCURRENT_STREAMS:
1161                         try {
1162                             configBuilder.setMaxConcurrentStreams(value);
1163                         } catch (final IllegalArgumentException ex) {
1164                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1165                         }
1166                         break;
1167                     case ENABLE_PUSH:
1168                         configBuilder.setPushEnabled(value == 1);
1169                         break;
1170                     case INITIAL_WINDOW_SIZE:
1171                         try {
1172                             configBuilder.setInitialWindowSize(value);
1173                         } catch (final IllegalArgumentException ex) {
1174                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1175                         }
1176                         break;
1177                     case MAX_FRAME_SIZE:
1178                         try {
1179                             configBuilder.setMaxFrameSize(value);
1180                         } catch (final IllegalArgumentException ex) {
1181                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1182                         }
1183                         break;
1184                     case MAX_HEADER_LIST_SIZE:
1185                         try {
1186                             configBuilder.setMaxHeaderListSize(value);
1187                         } catch (final IllegalArgumentException ex) {
1188                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1189                         }
1190                         break;
1191                 }
1192             }
1193         }
1194         applyRemoteSettings(configBuilder.build());
1195     }
1196 
1197     private void produceOutput() throws HttpException, IOException {
1198         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1199             final Map.Entry<Integer, H2Stream> entry = it.next();
1200             final H2Stream stream = entry.getValue();
1201             if (!stream.isLocalClosed() && stream.getOutputWindow().get() > 0) {
1202                 stream.produceOutput();
1203             }
1204             if (stream.isTerminated()) {
1205                 it.remove();
1206                 stream.releaseResources();
1207                 requestSessionOutput();
1208             }
1209             if (!outputQueue.isEmpty()) {
1210                 break;
1211             }
1212         }
1213     }
1214 
1215     private void applyRemoteSettings(final H2Config config) throws H2ConnectionException {
1216         remoteConfig = config;
1217 
1218         hPackEncoder.setMaxTableSize(remoteConfig.getHeaderTableSize());
1219         final int delta = remoteConfig.getInitialWindowSize() - initOutputWinSize;
1220         initOutputWinSize = remoteConfig.getInitialWindowSize();
1221 
1222         final int maxFrameSize = remoteConfig.getMaxFrameSize();
1223         if (maxFrameSize < outputBuffer.getMaxFramePayloadSize()) {
1224             try {
1225                 outputBuffer.resize(maxFrameSize);
1226             } catch (final BufferOverflowException ex) {
1227                 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Failure resizing the frame output buffer");
1228             }
1229         }
1230 
1231         if (delta != 0) {
1232             if (!streamMap.isEmpty()) {
1233                 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1234                     final Map.Entry<Integer, H2Stream> entry = it.next();
1235                     final H2Stream stream = entry.getValue();
1236                     try {
1237                         updateOutputWindow(stream.getId(), stream.getOutputWindow(), delta);
1238                     } catch (final ArithmeticException ex) {
1239                         throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1240                     }
1241                 }
1242             }
1243         }
1244     }
1245 
1246     private void applyLocalSettings() throws H2ConnectionException {
1247         hPackDecoder.setMaxTableSize(localConfig.getHeaderTableSize());
1248         hPackDecoder.setMaxListSize(localConfig.getMaxHeaderListSize());
1249 
1250         final int delta = localConfig.getInitialWindowSize() - initInputWinSize;
1251         initInputWinSize = localConfig.getInitialWindowSize();
1252 
1253         if (delta != 0 && !streamMap.isEmpty()) {
1254             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1255                 final Map.Entry<Integer, H2Stream> entry = it.next();
1256                 final H2Stream stream = entry.getValue();
1257                 try {
1258                     updateInputWindow(stream.getId(), stream.getInputWindow(), delta);
1259                 } catch (final ArithmeticException ex) {
1260                     throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1261                 }
1262             }
1263         }
1264         lowMark = initInputWinSize / 2;
1265     }
1266 
1267     @Override
1268     public void close() throws IOException {
1269         ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.IMMEDIATE);
1270     }
1271 
1272     @Override
1273     public void close(final CloseMode closeMode) {
1274         ioSession.close(closeMode);
1275     }
1276 
1277     @Override
1278     public boolean isOpen() {
1279         return connState == ConnectionHandshake.ACTIVE;
1280     }
1281 
1282     @Override
1283     public void setSocketTimeout(final Timeout timeout) {
1284         ioSession.setSocketTimeout(timeout);
1285     }
1286 
1287     @Override
1288     public SSLSession getSSLSession() {
1289         final TlsDetails tlsDetails = ioSession.getTlsDetails();
1290         return tlsDetails != null ? tlsDetails.getSSLSession() : null;
1291     }
1292 
1293     @Override
1294     public EndpointDetails getEndpointDetails() {
1295         if (endpointDetails == null) {
1296             endpointDetails = new BasicEndpointDetails(
1297                     ioSession.getRemoteAddress(),
1298                     ioSession.getLocalAddress(),
1299                     connMetrics,
1300                     ioSession.getSocketTimeout());
1301         }
1302         return endpointDetails;
1303     }
1304 
1305     @Override
1306     public Timeout getSocketTimeout() {
1307         return ioSession.getSocketTimeout();
1308     }
1309 
1310     @Override
1311     public ProtocolVersion getProtocolVersion() {
1312         return HttpVersion.HTTP_2;
1313     }
1314 
1315     @Override
1316     public SocketAddress getRemoteAddress() {
1317         return ioSession.getRemoteAddress();
1318     }
1319 
1320     @Override
1321     public SocketAddress getLocalAddress() {
1322         return ioSession.getLocalAddress();
1323     }
1324 
1325     void appendState(final StringBuilder buf) {
1326         buf.append("connState=").append(connState)
1327                 .append(", connInputWindow=").append(connInputWindow)
1328                 .append(", connOutputWindow=").append(connOutputWindow)
1329                 .append(", outputQueue=").append(outputQueue.size())
1330                 .append(", streamMap=").append(streamMap.size())
1331                 .append(", processedRemoteStreamId=").append(processedRemoteStreamId);
1332     }
1333 
1334     private static class Continuation {
1335 
1336         final int streamId;
1337         final int type;
1338         final boolean endStream;
1339         final ByteArrayBuffer headerBuffer;
1340 
1341         private Continuation(final int streamId, final int type, final boolean endStream) {
1342             this.streamId = streamId;
1343             this.type = type;
1344             this.endStream = endStream;
1345             this.headerBuffer = new ByteArrayBuffer(1024);
1346         }
1347 
1348         void copyPayload(final ByteBuffer payload) {
1349             if (payload == null) {
1350                 return;
1351             }
1352             headerBuffer.ensureCapacity(payload.remaining());
1353             payload.get(headerBuffer.array(), headerBuffer.length(), payload.remaining());
1354         }
1355 
1356         ByteBuffer getContent() {
1357             return ByteBuffer.wrap(headerBuffer.array(), 0, headerBuffer.length());
1358         }
1359 
1360     }
1361 
1362     private class H2StreamChannelImpl implements H2StreamChannel {
1363 
1364         private final int id;
1365         private final AtomicInteger inputWindow;
1366         private final AtomicInteger outputWindow;
1367 
1368         private volatile boolean idle;
1369         private volatile boolean remoteEndStream;
1370         private volatile boolean localEndStream;
1371 
1372         private volatile long deadline;
1373 
1374         H2StreamChannelImpl(final int id, final boolean idle, final int initialInputWindowSize, final int initialOutputWindowSize) {
1375             this.id = id;
1376             this.idle = idle;
1377             this.inputWindow = new AtomicInteger(initialInputWindowSize);
1378             this.outputWindow = new AtomicInteger(initialOutputWindowSize);
1379         }
1380 
1381         int getId() {
1382             return id;
1383         }
1384 
1385         AtomicInteger getOutputWindow() {
1386             return outputWindow;
1387         }
1388 
1389         AtomicInteger getInputWindow() {
1390             return inputWindow;
1391         }
1392 
1393         @Override
1394         public void submit(final List<Header> headers, final boolean endStream) throws IOException {
1395             ioSession.getLock().lock();
1396             try {
1397                 if (headers == null || headers.isEmpty()) {
1398                     throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
1399                 }
1400                 if (localEndStream) {
1401                     return;
1402                 }
1403                 idle = false;
1404                 commitHeaders(id, headers, endStream);
1405                 if (endStream) {
1406                     localEndStream = true;
1407                 }
1408             } finally {
1409                 ioSession.getLock().unlock();
1410             }
1411         }
1412 
1413         @Override
1414         public void push(final List<Header> headers, final AsyncPushProducer pushProducer) throws HttpException, IOException {
1415             acceptPushRequest();
1416             final int promisedStreamId = generateStreamId();
1417             final H2StreamChannelImpl channel = new H2StreamChannelImpl(
1418                     promisedStreamId,
1419                     true,
1420                     localConfig.getInitialWindowSize(),
1421                     remoteConfig.getInitialWindowSize());
1422             final HttpCoreContext context = HttpCoreContext.create();
1423             context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
1424             context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
1425             final H2StreamHandler streamHandler = new ServerPushH2StreamHandler(
1426                     channel, httpProcessor, connMetrics, pushProducer, context);
1427             final H2Stream stream = new H2Stream(channel, streamHandler, false);
1428             streamMap.put(promisedStreamId, stream);
1429 
1430             ioSession.getLock().lock();
1431             try {
1432                 if (localEndStream) {
1433                     stream.releaseResources();
1434                     return;
1435                 }
1436                 commitPushPromise(id, promisedStreamId, headers);
1437                 idle = false;
1438             } finally {
1439                 ioSession.getLock().unlock();
1440             }
1441         }
1442 
1443         @Override
1444         public void update(final int increment) throws IOException {
1445             if (remoteEndStream) {
1446                 return;
1447             }
1448             incrementInputCapacity(0, connInputWindow, increment);
1449             incrementInputCapacity(id, inputWindow, increment);
1450         }
1451 
1452         @Override
1453         public int write(final ByteBuffer payload) throws IOException {
1454             ioSession.getLock().lock();
1455             try {
1456                 if (localEndStream) {
1457                     return 0;
1458                 }
1459                 return streamData(id, outputWindow, payload);
1460             } finally {
1461                 ioSession.getLock().unlock();
1462             }
1463         }
1464 
1465         @Override
1466         public void endStream(final List<? extends Header> trailers) throws IOException {
1467             ioSession.getLock().lock();
1468             try {
1469                 if (localEndStream) {
1470                     return;
1471                 }
1472                 localEndStream = true;
1473                 if (trailers != null && !trailers.isEmpty()) {
1474                     commitHeaders(id, trailers, true);
1475                 } else {
1476                     final RawFrame frame = frameFactory.createData(id, null, true);
1477                     commitFrameInternal(frame);
1478                 }
1479             } finally {
1480                 ioSession.getLock().unlock();
1481             }
1482         }
1483 
1484         @Override
1485         public void endStream() throws IOException {
1486             endStream(null);
1487         }
1488 
1489         @Override
1490         public void requestOutput() {
1491             requestSessionOutput();
1492         }
1493 
1494         boolean isRemoteClosed() {
1495             return remoteEndStream;
1496         }
1497 
1498         void setRemoteEndStream() {
1499             remoteEndStream = true;
1500         }
1501 
1502         boolean isLocalClosed() {
1503             return localEndStream;
1504         }
1505 
1506         void setLocalEndStream() {
1507             localEndStream = true;
1508         }
1509 
1510         boolean isLocalReset() {
1511             return deadline > 0;
1512         }
1513 
1514         boolean isResetDeadline() {
1515             final long l = deadline;
1516             return l > 0 && l < System.currentTimeMillis();
1517         }
1518 
1519         boolean localReset(final int code) throws IOException {
1520             ioSession.getLock().lock();
1521             try {
1522                 if (localEndStream) {
1523                     return false;
1524                 }
1525                 localEndStream = true;
1526                 deadline = System.currentTimeMillis() + LINGER_TIME;
1527                 if (!idle) {
1528                     final RawFrame resetStream = frameFactory.createResetStream(id, code);
1529                     commitFrameInternal(resetStream);
1530                     return true;
1531                 }
1532                 return false;
1533             } finally {
1534                 ioSession.getLock().unlock();
1535             }
1536         }
1537 
1538         boolean localReset(final H2Error error) throws IOException {
1539             return localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1540         }
1541 
1542         @Override
1543         public boolean cancel() {
1544             try {
1545                 return localReset(H2Error.CANCEL);
1546             } catch (final IOException ignore) {
1547                 return false;
1548             }
1549         }
1550 
1551         void appendState(final StringBuilder buf) {
1552             buf.append("id=").append(id)
1553                     .append(", connState=").append(connState)
1554                     .append(", inputWindow=").append(inputWindow)
1555                     .append(", outputWindow=").append(outputWindow)
1556                     .append(", localEndStream=").append(localEndStream)
1557                     .append(", idle=").append(idle);
1558         }
1559 
1560         @Override
1561         public String toString() {
1562             final StringBuilder buf = new StringBuilder();
1563             buf.append("[");
1564             appendState(buf);
1565             buf.append("]");
1566             return buf.toString();
1567         }
1568 
1569     }
1570 
1571     static class H2Stream {
1572 
1573         private final H2StreamChannelImpl channel;
1574         private final H2StreamHandler handler;
1575         private final boolean remoteInitiated;
1576 
1577         private H2Stream(
1578                 final H2StreamChannelImpl channel,
1579                 final H2StreamHandler handler,
1580                 final boolean remoteInitiated) {
1581             this.channel = channel;
1582             this.handler = handler;
1583             this.remoteInitiated = remoteInitiated;
1584         }
1585 
1586         int getId() {
1587             return channel.getId();
1588         }
1589 
1590         boolean isRemoteInitiated() {
1591             return remoteInitiated;
1592         }
1593 
1594         AtomicInteger getOutputWindow() {
1595             return channel.getOutputWindow();
1596         }
1597 
1598         AtomicInteger getInputWindow() {
1599             return channel.getInputWindow();
1600         }
1601 
1602         boolean isTerminated() {
1603             return channel.isLocalClosed() && (channel.isRemoteClosed() || channel.isResetDeadline());
1604         }
1605 
1606         boolean isRemoteClosed() {
1607             return channel.isRemoteClosed();
1608         }
1609 
1610         boolean isLocalClosed() {
1611             return channel.isLocalClosed();
1612         }
1613 
1614         boolean isLocalReset() {
1615             return channel.isLocalReset();
1616         }
1617 
1618         void setRemoteEndStream() {
1619             channel.setRemoteEndStream();
1620         }
1621 
1622         void consumePromise(final List<Header> headers) throws HttpException, IOException {
1623             try {
1624                 handler.consumePromise(headers);
1625                 channel.setLocalEndStream();
1626             } catch (final ProtocolException ex) {
1627                 localReset(ex, H2Error.PROTOCOL_ERROR);
1628             }
1629         }
1630 
1631         void consumeHeader(final List<Header> headers) throws HttpException, IOException {
1632             try {
1633                 handler.consumeHeader(headers, channel.isRemoteClosed());
1634             } catch (final ProtocolException ex) {
1635                 localReset(ex, H2Error.PROTOCOL_ERROR);
1636             }
1637         }
1638 
1639         void consumeData(final ByteBuffer src) throws HttpException, IOException {
1640             try {
1641                 handler.consumeData(src, channel.isRemoteClosed());
1642             } catch (final CharacterCodingException ex) {
1643                 localReset(ex, H2Error.INTERNAL_ERROR);
1644             } catch (final ProtocolException ex) {
1645                 localReset(ex, H2Error.PROTOCOL_ERROR);
1646             }
1647         }
1648 
1649         boolean isOutputReady() {
1650             return handler.isOutputReady();
1651         }
1652 
1653         void produceOutput() throws HttpException, IOException {
1654             try {
1655                 handler.produceOutput();
1656             } catch (final ProtocolException ex) {
1657                 localReset(ex, H2Error.PROTOCOL_ERROR);
1658             }
1659         }
1660 
1661         void produceInputCapacityUpdate() throws IOException {
1662             handler.updateInputCapacity();
1663         }
1664 
1665         void reset(final Exception cause) {
1666             channel.setRemoteEndStream();
1667             channel.setLocalEndStream();
1668             handler.failed(cause);
1669         }
1670 
1671         void localReset(final Exception cause, final int code) throws IOException {
1672             channel.localReset(code);
1673             handler.failed(cause);
1674         }
1675 
1676         void localReset(final Exception cause, final H2Error error) throws IOException {
1677             localReset(cause, error != null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1678         }
1679 
1680         void localReset(final H2StreamResetException ex) throws IOException {
1681             localReset(ex, ex.getCode());
1682         }
1683 
1684         void handle(final HttpException ex) throws IOException, HttpException {
1685             handler.handle(ex, channel.isRemoteClosed());
1686         }
1687 
1688         HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
1689             return handler.getPushHandlerFactory();
1690         }
1691 
1692         void cancel() {
1693             reset(new RequestNotExecutedException());
1694         }
1695 
1696         boolean abort() {
1697             final boolean cancelled = channel.cancel();
1698             handler.failed(new RequestNotExecutedException());
1699             return cancelled;
1700         }
1701 
1702         void releaseResources() {
1703             handler.releaseResources();
1704         }
1705 
1706         void appendState(final StringBuilder buf) {
1707             buf.append("channel=[");
1708             channel.appendState(buf);
1709             buf.append("]");
1710         }
1711 
1712         @Override
1713         public String toString() {
1714             final StringBuilder buf = new StringBuilder();
1715             buf.append("[");
1716             appendState(buf);
1717             buf.append("]");
1718             return buf.toString();
1719         }
1720 
1721     }
1722 
1723 }