1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http2.impl.nio;
28
29 import java.io.IOException;
30 import java.net.SocketAddress;
31 import java.nio.ByteBuffer;
32 import java.nio.channels.SelectionKey;
33 import java.nio.charset.CharacterCodingException;
34 import java.util.Deque;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Queue;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ConcurrentLinkedDeque;
41 import java.util.concurrent.ConcurrentLinkedQueue;
42 import java.util.concurrent.atomic.AtomicInteger;
43
44 import javax.net.ssl.SSLSession;
45
46 import org.apache.hc.core5.concurrent.Cancellable;
47 import org.apache.hc.core5.concurrent.CancellableDependency;
48 import org.apache.hc.core5.http.ConnectionClosedException;
49 import org.apache.hc.core5.http.EndpointDetails;
50 import org.apache.hc.core5.http.Header;
51 import org.apache.hc.core5.http.HttpConnection;
52 import org.apache.hc.core5.http.HttpException;
53 import org.apache.hc.core5.http.HttpStreamResetException;
54 import org.apache.hc.core5.http.HttpVersion;
55 import org.apache.hc.core5.http.ProtocolException;
56 import org.apache.hc.core5.http.ProtocolVersion;
57 import org.apache.hc.core5.http.RequestNotExecutedException;
58 import org.apache.hc.core5.http.config.CharCodingConfig;
59 import org.apache.hc.core5.http.impl.BasicEndpointDetails;
60 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
61 import org.apache.hc.core5.http.impl.CharCodingSupport;
62 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
63 import org.apache.hc.core5.http.nio.AsyncPushProducer;
64 import org.apache.hc.core5.http.nio.HandlerFactory;
65 import org.apache.hc.core5.http.nio.command.ExecutableCommand;
66 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
67 import org.apache.hc.core5.http.protocol.HttpCoreContext;
68 import org.apache.hc.core5.http.protocol.HttpProcessor;
69 import org.apache.hc.core5.http2.H2ConnectionException;
70 import org.apache.hc.core5.http2.H2Error;
71 import org.apache.hc.core5.http2.H2StreamResetException;
72 import org.apache.hc.core5.http2.config.H2Config;
73 import org.apache.hc.core5.http2.config.H2Param;
74 import org.apache.hc.core5.http2.config.H2Setting;
75 import org.apache.hc.core5.http2.frame.FrameFactory;
76 import org.apache.hc.core5.http2.frame.FrameFlag;
77 import org.apache.hc.core5.http2.frame.FrameType;
78 import org.apache.hc.core5.http2.frame.RawFrame;
79 import org.apache.hc.core5.http2.frame.StreamIdGenerator;
80 import org.apache.hc.core5.http2.hpack.HPackDecoder;
81 import org.apache.hc.core5.http2.hpack.HPackEncoder;
82 import org.apache.hc.core5.http2.impl.BasicH2TransportMetrics;
83 import org.apache.hc.core5.http2.nio.AsyncPingHandler;
84 import org.apache.hc.core5.http2.nio.command.PingCommand;
85 import org.apache.hc.core5.io.CloseMode;
86 import org.apache.hc.core5.reactor.Command;
87 import org.apache.hc.core5.reactor.ProtocolIOSession;
88 import org.apache.hc.core5.reactor.ssl.TlsDetails;
89 import org.apache.hc.core5.util.Args;
90 import org.apache.hc.core5.util.ByteArrayBuffer;
91 import org.apache.hc.core5.util.Identifiable;
92 import org.apache.hc.core5.util.Timeout;
93
94 abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection {
95
96 private static final long LINGER_TIME = 1000;
97 private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024;
98
99 enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
100 enum SettingsHandshake { READY, TRANSMITTED, ACKED }
101
102 private final ProtocolIOSession ioSession;
103 private final FrameFactory frameFactory;
104 private final StreamIdGenerator idGenerator;
105 private final HttpProcessor httpProcessor;
106 private final H2Config localConfig;
107 private final BasicH2TransportMetrics inputMetrics;
108 private final BasicH2TransportMetrics outputMetrics;
109 private final BasicHttpConnectionMetrics connMetrics;
110 private final FrameInputBuffer inputBuffer;
111 private final FrameOutputBuffer outputBuffer;
112 private final Deque<RawFrame> outputQueue;
113 private final HPackEncoder hPackEncoder;
114 private final HPackDecoder hPackDecoder;
115 private final Map<Integer, H2Stream> streamMap;
116 private final Queue<AsyncPingHandler> pingHandlers;
117 private final AtomicInteger connInputWindow;
118 private final AtomicInteger connOutputWindow;
119 private final AtomicInteger outputRequests;
120 private final AtomicInteger lastStreamId;
121 private final H2StreamListener streamListener;
122
123 private ConnectionHandshake connState = ConnectionHandshake.READY;
124 private SettingsHandshake localSettingState = SettingsHandshake.READY;
125 private SettingsHandshake remoteSettingState = SettingsHandshake.READY;
126
127 private int initInputWinSize;
128 private int initOutputWinSize;
129 private int lowMark;
130
131 private volatile H2Config remoteConfig;
132
133 private Continuation continuation;
134
135 private int processedRemoteStreamId;
136 private EndpointDetails endpointDetails;
137 private boolean goAwayReceived;
138
139 AbstractH2StreamMultiplexer(
140 final ProtocolIOSession ioSession,
141 final FrameFactory frameFactory,
142 final StreamIdGenerator idGenerator,
143 final HttpProcessor httpProcessor,
144 final CharCodingConfig charCodingConfig,
145 final H2Config h2Config,
146 final H2StreamListener streamListener) {
147 this.ioSession = Args.notNull(ioSession, "IO session");
148 this.frameFactory = Args.notNull(frameFactory, "Frame factory");
149 this.idGenerator = Args.notNull(idGenerator, "Stream id generator");
150 this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
151 this.localConfig = h2Config != null ? h2Config : H2Config.DEFAULT;
152 this.inputMetrics = new BasicH2TransportMetrics();
153 this.outputMetrics = new BasicH2TransportMetrics();
154 this.connMetrics = new BasicHttpConnectionMetrics(this.inputMetrics, this.outputMetrics);
155 this.inputBuffer = new FrameInputBuffer(this.inputMetrics, this.localConfig.getMaxFrameSize());
156 this.outputBuffer = new FrameOutputBuffer(this.outputMetrics, this.localConfig.getMaxFrameSize());
157 this.outputQueue = new ConcurrentLinkedDeque<>();
158 this.pingHandlers = new ConcurrentLinkedQueue<>();
159 this.outputRequests = new AtomicInteger(0);
160 this.lastStreamId = new AtomicInteger(0);
161 this.hPackEncoder = new HPackEncoder(CharCodingSupport.createEncoder(charCodingConfig));
162 this.hPackDecoder = new HPackDecoder(CharCodingSupport.createDecoder(charCodingConfig));
163 this.streamMap = new ConcurrentHashMap<>();
164 this.remoteConfig = H2Config.INIT;
165 this.connInputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
166 this.connOutputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
167
168 this.initInputWinSize = H2Config.INIT.getInitialWindowSize();
169 this.initOutputWinSize = H2Config.INIT.getInitialWindowSize();
170
171 this.hPackDecoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
172 this.hPackEncoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
173 this.hPackDecoder.setMaxListSize(H2Config.INIT.getMaxHeaderListSize());
174
175 this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
176 this.streamListener = streamListener;
177 }
178
179 @Override
180 public String getId() {
181 return ioSession.getId();
182 }
183
184 abstract void acceptHeaderFrame() throws H2ConnectionException;
185
186 abstract void acceptPushRequest() throws H2ConnectionException;
187
188 abstract void acceptPushFrame() throws H2ConnectionException;
189
190 abstract H2StreamHandler createRemotelyInitiatedStream(
191 H2StreamChannel channel,
192 HttpProcessor httpProcessor,
193 BasicHttpConnectionMetrics connMetrics,
194 HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException;
195
196 abstract H2StreamHandler createLocallyInitiatedStream(
197 ExecutableCommand command,
198 H2StreamChannel channel,
199 HttpProcessor httpProcessor,
200 BasicHttpConnectionMetrics connMetrics) throws IOException;
201
202 private int updateWindow(final AtomicInteger window, final int delta) throws ArithmeticException {
203 for (;;) {
204 final int current = window.get();
205 long newValue = (long) current + delta;
206
207
208
209 if (newValue == 0x80000000L) {
210 newValue = Integer.MAX_VALUE;
211 }
212
213
214 if (Math.abs(newValue) > 0x7fffffffL) {
215 throw new ArithmeticException("Update causes flow control window to exceed " + Integer.MAX_VALUE);
216 }
217 if (window.compareAndSet(current, (int) newValue)) {
218 return (int) newValue;
219 }
220 }
221 }
222
223 private int updateInputWindow(
224 final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
225 final int newSize = updateWindow(window, delta);
226 if (streamListener != null) {
227 streamListener.onInputFlowControl(this, streamId, delta, newSize);
228 }
229 return newSize;
230 }
231
232 private int updateOutputWindow(
233 final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
234 final int newSize = updateWindow(window, delta);
235 if (streamListener != null) {
236 streamListener.onOutputFlowControl(this, streamId, delta, newSize);
237 }
238 return newSize;
239 }
240
241 private void commitFrameInternal(final RawFrame frame) throws IOException {
242 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
243 if (streamListener != null) {
244 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
245 }
246 outputBuffer.write(frame, ioSession);
247 } else {
248 outputQueue.addLast(frame);
249 }
250 ioSession.setEvent(SelectionKey.OP_WRITE);
251 }
252
253 private void commitFrame(final RawFrame frame) throws IOException {
254 Args.notNull(frame, "Frame");
255 ioSession.getLock().lock();
256 try {
257 commitFrameInternal(frame);
258 } finally {
259 ioSession.getLock().unlock();
260 }
261 }
262
263 private void commitHeaders(
264 final int streamId, final List<? extends Header> headers, final boolean endStream) throws IOException {
265 if (streamListener != null) {
266 streamListener.onHeaderOutput(this, streamId, headers);
267 }
268 final ByteArrayBuffer buf = new ByteArrayBuffer(512);
269 hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
270
271 int off = 0;
272 int remaining = buf.length();
273 boolean continuation = false;
274
275 while (remaining > 0) {
276 final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
277 final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
278
279 remaining -= chunk;
280 off += chunk;
281
282 final boolean endHeaders = remaining == 0;
283 final RawFrame frame;
284 if (!continuation) {
285 frame = frameFactory.createHeaders(streamId, payload, endHeaders, endStream);
286 continuation = true;
287 } else {
288 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
289 }
290 commitFrameInternal(frame);
291 }
292 }
293
294 private void commitPushPromise(
295 final int streamId, final int promisedStreamId, final List<Header> headers) throws IOException {
296 if (headers == null || headers.isEmpty()) {
297 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
298 }
299 if (streamListener != null) {
300 streamListener.onHeaderOutput(this, streamId, headers);
301 }
302 final ByteArrayBuffer buf = new ByteArrayBuffer(512);
303 buf.append((byte)(promisedStreamId >> 24));
304 buf.append((byte)(promisedStreamId >> 16));
305 buf.append((byte)(promisedStreamId >> 8));
306 buf.append((byte)(promisedStreamId));
307
308 hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
309
310 int off = 0;
311 int remaining = buf.length();
312 boolean continuation = false;
313
314 while (remaining > 0) {
315 final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
316 final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
317
318 remaining -= chunk;
319 off += chunk;
320
321 final boolean endHeaders = remaining == 0;
322 final RawFrame frame;
323 if (!continuation) {
324 frame = frameFactory.createPushPromise(streamId, payload, endHeaders);
325 continuation = true;
326 } else {
327 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
328 }
329 commitFrameInternal(frame);
330 }
331 }
332
333 private void streamDataFrame(
334 final int streamId,
335 final AtomicInteger streamOutputWindow,
336 final ByteBuffer payload,
337 final int chunk) throws IOException {
338 final RawFrame dataFrame = frameFactory.createData(streamId, payload, false);
339 if (streamListener != null) {
340 streamListener.onFrameOutput(this, streamId, dataFrame);
341 }
342 updateOutputWindow(0, connOutputWindow, -chunk);
343 updateOutputWindow(streamId, streamOutputWindow, -chunk);
344 outputBuffer.write(dataFrame, ioSession);
345 }
346
347 private int streamData(
348 final int streamId, final AtomicInteger streamOutputWindow, final ByteBuffer payload) throws IOException {
349 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
350 final int capacity = Math.min(connOutputWindow.get(), streamOutputWindow.get());
351 if (capacity <= 0) {
352 return 0;
353 }
354 final int maxPayloadSize = Math.min(capacity, remoteConfig.getMaxFrameSize());
355 final int chunk;
356 if (payload.remaining() <= maxPayloadSize) {
357 chunk = payload.remaining();
358 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
359 } else {
360 chunk = maxPayloadSize;
361 final int originalLimit = payload.limit();
362 try {
363 payload.limit(payload.position() + chunk);
364 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
365 } finally {
366 payload.limit(originalLimit);
367 }
368 }
369 payload.position(payload.position() + chunk);
370 ioSession.setEvent(SelectionKey.OP_WRITE);
371 return chunk;
372 }
373 return 0;
374 }
375
376 private void incrementInputCapacity(
377 final int streamId, final AtomicInteger inputWindow, final int inputCapacity) throws IOException {
378 if (inputCapacity > 0) {
379 final int streamWinSize = inputWindow.get();
380 final int remainingCapacity = Integer.MAX_VALUE - streamWinSize;
381 final int chunk = Math.min(inputCapacity, remainingCapacity);
382 if (chunk != 0) {
383 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(streamId, chunk);
384 commitFrame(windowUpdateFrame);
385 updateInputWindow(streamId, inputWindow, chunk);
386 }
387 }
388 }
389
390 private void requestSessionOutput() {
391 outputRequests.incrementAndGet();
392 ioSession.setEvent(SelectionKey.OP_WRITE);
393 }
394
395 private void updateLastStreamId(final int streamId) {
396 final int currentId = lastStreamId.get();
397 if (streamId > currentId) {
398 lastStreamId.compareAndSet(currentId, streamId);
399 }
400 }
401
402 private int generateStreamId() {
403 for (;;) {
404 final int currentId = lastStreamId.get();
405 final int newStreamId = idGenerator.generate(currentId);
406 if (lastStreamId.compareAndSet(currentId, newStreamId)) {
407 return newStreamId;
408 }
409 }
410 }
411
412 public final void onConnect() throws HttpException, IOException {
413 connState = ConnectionHandshake.ACTIVE;
414 final RawFrame settingsFrame = frameFactory.createSettings(
415 new H2Setting(H2Param.HEADER_TABLE_SIZE, localConfig.getHeaderTableSize()),
416 new H2Setting(H2Param.ENABLE_PUSH, localConfig.isPushEnabled() ? 1 : 0),
417 new H2Setting(H2Param.MAX_CONCURRENT_STREAMS, localConfig.getMaxConcurrentStreams()),
418 new H2Setting(H2Param.INITIAL_WINDOW_SIZE, localConfig.getInitialWindowSize()),
419 new H2Setting(H2Param.MAX_FRAME_SIZE, localConfig.getMaxFrameSize()),
420 new H2Setting(H2Param.MAX_HEADER_LIST_SIZE, localConfig.getMaxHeaderListSize()));
421
422 commitFrame(settingsFrame);
423 localSettingState = SettingsHandshake.TRANSMITTED;
424 maximizeConnWindow(connInputWindow.get());
425
426 if (streamListener != null) {
427 final int initInputWindow = connInputWindow.get();
428 streamListener.onInputFlowControl(this, 0, initInputWindow, initInputWindow);
429 final int initOutputWindow = connOutputWindow.get();
430 streamListener.onOutputFlowControl(this, 0, initOutputWindow, initOutputWindow);
431 }
432 }
433
434 public final void onInput(final ByteBuffer src) throws HttpException, IOException {
435 if (connState == ConnectionHandshake.SHUTDOWN) {
436 ioSession.clearEvent(SelectionKey.OP_READ);
437 } else {
438 for (;;) {
439 final RawFrame frame = inputBuffer.read(src, ioSession);
440 if (frame == null) {
441 break;
442 }
443 if (streamListener != null) {
444 streamListener.onFrameInput(this, frame.getStreamId(), frame);
445 }
446 consumeFrame(frame);
447 }
448 }
449 }
450
451 public final void onOutput() throws HttpException, IOException {
452 ioSession.getLock().lock();
453 try {
454 if (!outputBuffer.isEmpty()) {
455 outputBuffer.flush(ioSession);
456 }
457 while (outputBuffer.isEmpty()) {
458 final RawFrame frame = outputQueue.poll();
459 if (frame != null) {
460 if (streamListener != null) {
461 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
462 }
463 outputBuffer.write(frame, ioSession);
464 } else {
465 break;
466 }
467 }
468 } finally {
469 ioSession.getLock().unlock();
470 }
471
472 if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
473
474 if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
475 produceOutput();
476 }
477 final int pendingOutputRequests = outputRequests.get();
478 boolean outputPending = false;
479 if (!streamMap.isEmpty() && connOutputWindow.get() > 0) {
480 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
481 final Map.Entry<Integer, H2Stream> entry = it.next();
482 final H2Stream stream = entry.getValue();
483 if (!stream.isLocalClosed()
484 && stream.getOutputWindow().get() > 0
485 && stream.isOutputReady()) {
486 outputPending = true;
487 break;
488 }
489 }
490 }
491 ioSession.getLock().lock();
492 try {
493 if (!outputPending && outputBuffer.isEmpty() && outputQueue.isEmpty()
494 && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
495 ioSession.clearEvent(SelectionKey.OP_WRITE);
496 } else {
497 outputRequests.addAndGet(-pendingOutputRequests);
498 }
499 } finally {
500 ioSession.getLock().unlock();
501 }
502 }
503
504 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 && remoteSettingState == SettingsHandshake.ACKED) {
505 processPendingCommands();
506 }
507 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
508 int liveStreams = 0;
509 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
510 final Map.Entry<Integer, H2Stream> entry = it.next();
511 final H2Stream stream = entry.getValue();
512 if (stream.isLocalClosed() && stream.isRemoteClosed()) {
513 stream.releaseResources();
514 it.remove();
515 } else {
516 if (idGenerator.isSameSide(stream.getId()) || stream.getId() <= processedRemoteStreamId) {
517 liveStreams++;
518 }
519 }
520 }
521 if (liveStreams == 0) {
522 connState = ConnectionHandshake.SHUTDOWN;
523 }
524 }
525 if (connState.compareTo(ConnectionHandshake.SHUTDOWN) >= 0) {
526 if (!streamMap.isEmpty()) {
527 for (final H2Stream stream : streamMap.values()) {
528 stream.releaseResources();
529 }
530 streamMap.clear();
531 }
532 ioSession.getLock().lock();
533 try {
534 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
535 ioSession.close();
536 }
537 } finally {
538 ioSession.getLock().unlock();
539 }
540 }
541 }
542
543 public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
544 connState = ConnectionHandshake.SHUTDOWN;
545
546 final RawFrame goAway;
547 if (localSettingState != SettingsHandshake.ACKED) {
548 goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.SETTINGS_TIMEOUT,
549 "Setting timeout (" + timeout + ")");
550 } else {
551 goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR,
552 "Timeout due to inactivity (" + timeout + ")");
553 }
554 commitFrame(goAway);
555 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
556 final Map.Entry<Integer, H2Stream> entry = it.next();
557 final H2Stream stream = entry.getValue();
558 stream.reset(new H2StreamResetException(H2Error.NO_ERROR, "Timeout due to inactivity (" + timeout + ")"));
559 }
560 streamMap.clear();
561 }
562
563 public final void onDisconnect() {
564 for (;;) {
565 final AsyncPingHandler pingHandler = pingHandlers.poll();
566 if (pingHandler != null) {
567 pingHandler.cancel();
568 } else {
569 break;
570 }
571 }
572 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
573 final Map.Entry<Integer, H2Stream> entry = it.next();
574 final H2Stream stream = entry.getValue();
575 stream.cancel();
576 }
577 for (;;) {
578 final Command command = ioSession.poll();
579 if (command != null) {
580 if (command instanceof ExecutableCommand) {
581 ((ExecutableCommand) command).failed(new ConnectionClosedException());
582 } else {
583 command.cancel();
584 }
585 } else {
586 break;
587 }
588 }
589 }
590
591 private void processPendingCommands() throws IOException, HttpException {
592 while (streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
593 final Command command = ioSession.poll();
594 if (command == null) {
595 break;
596 }
597 if (command instanceof ShutdownCommand) {
598 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
599 if (shutdownCommand.getType() == CloseMode.IMMEDIATE) {
600 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
601 final Map.Entry<Integer, H2Stream> entry = it.next();
602 final H2Stream stream = entry.getValue();
603 stream.cancel();
604 }
605 streamMap.clear();
606 connState = ConnectionHandshake.SHUTDOWN;
607 } else {
608 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
609 final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR, "Graceful shutdown");
610 commitFrame(goAway);
611 connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
612 }
613 }
614 break;
615 } else if (command instanceof PingCommand) {
616 final PingCommand./../../../org/apache/hc/core5/http2/nio/command/PingCommand.html#PingCommand">PingCommand pingCommand = (PingCommand) command;
617 final AsyncPingHandler handler = pingCommand.getHandler();
618 pingHandlers.add(handler);
619 final RawFrame ping = frameFactory.createPing(handler.getData());
620 commitFrame(ping);
621 } else if (command instanceof ExecutableCommand) {
622 final int streamId = generateStreamId();
623 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
624 streamId, true, initInputWinSize, initOutputWinSize);
625 final ExecutableCommand executableCommand = (ExecutableCommand) command;
626 final H2StreamHandler streamHandler = createLocallyInitiatedStream(
627 executableCommand, channel, httpProcessor, connMetrics);
628
629 final H2Stream stream = new H2Stream(channel, streamHandler, false);
630 streamMap.put(streamId, stream);
631
632 if (streamListener != null) {
633 final int initInputWindow = stream.getInputWindow().get();
634 streamListener.onInputFlowControl(this, streamId, initInputWindow, initInputWindow);
635 final int initOutputWindow = stream.getOutputWindow().get();
636 streamListener.onOutputFlowControl(this, streamId, initOutputWindow, initOutputWindow);
637 }
638
639 if (stream.isOutputReady()) {
640 stream.produceOutput();
641 }
642 final CancellableDependency cancellableDependency = executableCommand.getCancellableDependency();
643 if (cancellableDependency != null) {
644 cancellableDependency.setDependency(new Cancellable() {
645
646 @Override
647 public boolean cancel() {
648 return stream.abort();
649 }
650
651 });
652 }
653 if (!outputQueue.isEmpty()) {
654 return;
655 }
656 }
657 }
658 }
659
660 public final void onException(final Exception cause) {
661 try {
662 for (;;) {
663 final AsyncPingHandler pingHandler = pingHandlers.poll();
664 if (pingHandler != null) {
665 pingHandler.failed(cause);
666 } else {
667 break;
668 }
669 }
670 for (;;) {
671 final Command command = ioSession.poll();
672 if (command != null) {
673 if (command instanceof ExecutableCommand) {
674 ((ExecutableCommand) command).failed(new ConnectionClosedException());
675 } else {
676 command.cancel();
677 }
678 } else {
679 break;
680 }
681 }
682 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
683 final Map.Entry<Integer, H2Stream> entry = it.next();
684 final H2Stream stream = entry.getValue();
685 stream.reset(cause);
686 }
687 streamMap.clear();
688 if (!(cause instanceof ConnectionClosedException)) {
689 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) <= 0) {
690 final H2Error errorCode;
691 if (cause instanceof H2ConnectionException) {
692 errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
693 } else if (cause instanceof ProtocolException){
694 errorCode = H2Error.PROTOCOL_ERROR;
695 } else {
696 errorCode = H2Error.INTERNAL_ERROR;
697 }
698 final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
699 commitFrame(goAway);
700 }
701 }
702 } catch (final IOException ignore) {
703 } finally {
704 connState = ConnectionHandshake.SHUTDOWN;
705 final CloseMode closeMode;
706 if (cause instanceof ConnectionClosedException) {
707 closeMode = CloseMode.GRACEFUL;
708 } else if (cause instanceof IOException) {
709 closeMode = CloseMode.IMMEDIATE;
710 } else {
711 closeMode = CloseMode.GRACEFUL;
712 }
713 ioSession.close(closeMode);
714 }
715 }
716
717 private H2Stream getValidStream(final int streamId) throws H2ConnectionException {
718 if (streamId == 0) {
719 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
720 }
721 final H2Stream stream = streamMap.get(streamId);
722 if (stream == null) {
723 if (streamId <= lastStreamId.get()) {
724 throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed");
725 } else {
726 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
727 }
728 }
729 return stream;
730 }
731
732 private void consumeFrame(final RawFrame frame) throws HttpException, IOException {
733 final FrameType frameType = FrameType.valueOf(frame.getType());
734 final int streamId = frame.getStreamId();
735 if (continuation != null && frameType != FrameType.CONTINUATION) {
736 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "CONTINUATION frame expected");
737 }
738 switch (frameType) {
739 case DATA: {
740 final H2Stream stream = getValidStream(streamId);
741 try {
742 consumeDataFrame(frame, stream);
743 } catch (final H2StreamResetException ex) {
744 stream.localReset(ex);
745 } catch (final HttpStreamResetException ex) {
746 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
747 }
748
749 if (stream.isTerminated()) {
750 streamMap.remove(streamId);
751 stream.releaseResources();
752 requestSessionOutput();
753 }
754 }
755 break;
756 case HEADERS: {
757 if (streamId == 0) {
758 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
759 }
760 H2Stream stream = streamMap.get(streamId);
761 if (stream == null) {
762 acceptHeaderFrame();
763
764 if (idGenerator.isSameSide(streamId)) {
765 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
766 }
767 if (goAwayReceived ) {
768 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
769 }
770
771 updateLastStreamId(streamId);
772
773 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
774 streamId, false, initInputWinSize, initOutputWinSize);
775 final H2StreamHandler streamHandler;
776 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
777 streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics, null);
778 } else {
779 streamHandler = NoopH2StreamHandler.INSTANCE;
780 channel.setLocalEndStream();
781 }
782
783 stream = new H2Stream(channel, streamHandler, true);
784 if (stream.isOutputReady()) {
785 stream.produceOutput();
786 }
787 streamMap.put(streamId, stream);
788 }
789
790 try {
791 consumeHeaderFrame(frame, stream);
792
793 if (stream.isOutputReady()) {
794 stream.produceOutput();
795 }
796 } catch (final H2StreamResetException ex) {
797 stream.localReset(ex);
798 } catch (final HttpStreamResetException ex) {
799 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
800 } catch (final HttpException ex) {
801 stream.handle(ex);
802 }
803
804 if (stream.isTerminated()) {
805 streamMap.remove(streamId);
806 stream.releaseResources();
807 requestSessionOutput();
808 }
809 }
810 break;
811 case CONTINUATION: {
812 if (continuation == null) {
813 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION frame");
814 }
815 if (streamId != continuation.streamId) {
816 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION stream id: " + streamId);
817 }
818
819 final H2Stream stream = getValidStream(streamId);
820 try {
821
822 consumeContinuationFrame(frame, stream);
823 } catch (final H2StreamResetException ex) {
824 stream.localReset(ex);
825 } catch (final HttpStreamResetException ex) {
826 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
827 }
828
829 if (stream.isTerminated()) {
830 streamMap.remove(streamId);
831 stream.releaseResources();
832 requestSessionOutput();
833 }
834 }
835 break;
836 case WINDOW_UPDATE: {
837 final ByteBuffer payload = frame.getPayload();
838 if (payload == null || payload.remaining() != 4) {
839 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid WINDOW_UPDATE frame payload");
840 }
841 final int delta = payload.getInt();
842 if (delta <= 0) {
843 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Invalid WINDOW_UPDATE delta");
844 }
845 if (streamId == 0) {
846 try {
847 updateOutputWindow(0, connOutputWindow, delta);
848 } catch (final ArithmeticException ex) {
849 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
850 }
851 } else {
852 final H2Stream stream = streamMap.get(streamId);
853 if (stream != null) {
854 try {
855 updateOutputWindow(streamId, stream.getOutputWindow(), delta);
856 } catch (final ArithmeticException ex) {
857 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
858 }
859 }
860 }
861 ioSession.setEvent(SelectionKey.OP_WRITE);
862 }
863 break;
864 case RST_STREAM: {
865 if (streamId == 0) {
866 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
867 }
868 final H2Stream stream = streamMap.get(streamId);
869 if (stream == null) {
870 if (streamId > lastStreamId.get()) {
871 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
872 }
873 } else {
874 final ByteBuffer payload = frame.getPayload();
875 if (payload == null || payload.remaining() != 4) {
876 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid RST_STREAM frame payload");
877 }
878 final int errorCode = payload.getInt();
879 stream.reset(new H2StreamResetException(errorCode, "Stream reset (" + errorCode + ")"));
880 streamMap.remove(streamId);
881 stream.releaseResources();
882 requestSessionOutput();
883 }
884 }
885 break;
886 case PING: {
887 if (streamId != 0) {
888 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
889 }
890 final ByteBuffer ping = frame.getPayloadContent();
891 if (ping == null || ping.remaining() != 8) {
892 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload");
893 }
894 if (frame.isFlagSet(FrameFlag.ACK)) {
895 final AsyncPingHandler pingHandler = pingHandlers.poll();
896 if (pingHandler != null) {
897 pingHandler.consumeResponse(ping);
898 }
899 } else {
900 final ByteBuffer pong = ByteBuffer.allocate(ping.remaining());
901 pong.put(ping);
902 pong.flip();
903 final RawFrame response = frameFactory.createPingAck(pong);
904 commitFrame(response);
905 }
906 }
907 break;
908 case SETTINGS: {
909 if (streamId != 0) {
910 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
911 }
912 if (frame.isFlagSet(FrameFlag.ACK)) {
913 if (localSettingState == SettingsHandshake.TRANSMITTED) {
914 localSettingState = SettingsHandshake.ACKED;
915 ioSession.setEvent(SelectionKey.OP_WRITE);
916 applyLocalSettings();
917 }
918 } else {
919 final ByteBuffer payload = frame.getPayload();
920 if (payload != null) {
921 if ((payload.remaining() % 6) != 0) {
922 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid SETTINGS payload");
923 }
924 consumeSettingsFrame(payload);
925 remoteSettingState = SettingsHandshake.TRANSMITTED;
926 }
927
928 final RawFrame response = frameFactory.createSettingsAck();
929 commitFrame(response);
930 remoteSettingState = SettingsHandshake.ACKED;
931 }
932 }
933 break;
934 case PRIORITY:
935
936 break;
937 case PUSH_PROMISE: {
938 acceptPushFrame();
939
940 if (goAwayReceived ) {
941 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
942 }
943
944 if (!localConfig.isPushEnabled()) {
945 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Push is disabled");
946 }
947
948 final H2Stream stream = getValidStream(streamId);
949 if (stream.isRemoteClosed()) {
950 stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed"));
951 break;
952 }
953
954 final ByteBuffer payload = frame.getPayloadContent();
955 if (payload == null || payload.remaining() < 4) {
956 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PUSH_PROMISE payload");
957 }
958 final int promisedStreamId = payload.getInt();
959 if (promisedStreamId == 0 || idGenerator.isSameSide(promisedStreamId)) {
960 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal promised stream id: " + promisedStreamId);
961 }
962 if (streamMap.get(promisedStreamId) != null) {
963 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected promised stream id: " + promisedStreamId);
964 }
965
966 updateLastStreamId(promisedStreamId);
967
968 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
969 promisedStreamId, false, initInputWinSize, initOutputWinSize);
970 final H2StreamHandler streamHandler;
971 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
972 streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics,
973 stream.getPushHandlerFactory());
974 } else {
975 streamHandler = NoopH2StreamHandler.INSTANCE;
976 channel.setLocalEndStream();
977 }
978
979 final H2Stream promisedStream = new H2Stream(channel, streamHandler, true);
980 streamMap.put(promisedStreamId, promisedStream);
981
982 try {
983 consumePushPromiseFrame(frame, payload, promisedStream);
984 } catch (final H2StreamResetException ex) {
985 promisedStream.localReset(ex);
986 } catch (final HttpStreamResetException ex) {
987 promisedStream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.NO_ERROR);
988 }
989 }
990 break;
991 case GOAWAY: {
992 if (streamId != 0) {
993 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
994 }
995 final ByteBuffer payload = frame.getPayload();
996 if (payload == null || payload.remaining() < 8) {
997 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid GOAWAY payload");
998 }
999 final int processedLocalStreamId = payload.getInt();
1000 final int errorCode = payload.getInt();
1001 goAwayReceived = true;
1002 if (errorCode == H2Error.NO_ERROR.getCode()) {
1003 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
1004 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1005 final Map.Entry<Integer, H2Stream> entry = it.next();
1006 final int activeStreamId = entry.getKey();
1007 if (!idGenerator.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
1008 final H2Stream stream = entry.getValue();
1009 stream.cancel();
1010 it.remove();
1011 }
1012 }
1013 }
1014 connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
1015 } else {
1016 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1017 final Map.Entry<Integer, H2Stream> entry = it.next();
1018 final H2Stream stream = entry.getValue();
1019 stream.reset(new H2StreamResetException(errorCode, "Connection terminated by the peer (" + errorCode + ")"));
1020 }
1021 streamMap.clear();
1022 connState = ConnectionHandshake.SHUTDOWN;
1023 }
1024 }
1025 ioSession.setEvent(SelectionKey.OP_WRITE);
1026 break;
1027 }
1028 }
1029
1030 private void consumeDataFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1031 final int streamId = stream.getId();
1032 final ByteBuffer payload = frame.getPayloadContent();
1033 if (payload != null) {
1034 final int frameLength = frame.getLength();
1035 final int streamWinSize = updateInputWindow(streamId, stream.getInputWindow(), -frameLength);
1036 if (streamWinSize < lowMark && !stream.isRemoteClosed()) {
1037 stream.produceInputCapacityUpdate();
1038 }
1039 final int connWinSize = updateInputWindow(0, connInputWindow, -frameLength);
1040 if (connWinSize < CONNECTION_WINDOW_LOW_MARK) {
1041 maximizeConnWindow(connWinSize);
1042 }
1043 }
1044 if (stream.isRemoteClosed()) {
1045 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1046 }
1047 if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1048 stream.setRemoteEndStream();
1049 }
1050 if (stream.isLocalReset()) {
1051 return;
1052 }
1053 stream.consumeData(payload);
1054 }
1055
1056 private void maximizeConnWindow(final int connWinSize) throws IOException {
1057 final int delta = Integer.MAX_VALUE - connWinSize;
1058 if (delta > 0) {
1059 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(0, delta);
1060 commitFrame(windowUpdateFrame);
1061 updateInputWindow(0, connInputWindow, delta);
1062 }
1063 }
1064
1065 private void consumePushPromiseFrame(final RawFrame frame, final ByteBuffer payload, final H2Stream promisedStream) throws HttpException, IOException {
1066 final int promisedStreamId = promisedStream.getId();
1067 if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1068 continuation = new Continuation(promisedStreamId, frame.getType(), true);
1069 }
1070 if (continuation == null) {
1071 final List<Header> headers = hPackDecoder.decodeHeaders(payload);
1072 if (promisedStreamId > processedRemoteStreamId) {
1073 processedRemoteStreamId = promisedStreamId;
1074 }
1075 if (streamListener != null) {
1076 streamListener.onHeaderInput(this, promisedStreamId, headers);
1077 }
1078 promisedStream.consumePromise(headers);
1079 } else {
1080 continuation.copyPayload(payload);
1081 }
1082 }
1083
1084 List<Header> decodeHeaders(final ByteBuffer payload) throws HttpException {
1085 return hPackDecoder.decodeHeaders(payload);
1086 }
1087
1088 private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1089 final int streamId = stream.getId();
1090 if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1091 continuation = new Continuation(streamId, frame.getType(), frame.isFlagSet(FrameFlag.END_STREAM));
1092 }
1093 final ByteBuffer payload = frame.getPayloadContent();
1094 if (frame.isFlagSet(FrameFlag.PRIORITY)) {
1095
1096 payload.getInt();
1097 payload.get();
1098 }
1099 if (continuation == null) {
1100 final List<Header> headers = decodeHeaders(payload);
1101 if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1102 processedRemoteStreamId = streamId;
1103 }
1104 if (streamListener != null) {
1105 streamListener.onHeaderInput(this, streamId, headers);
1106 }
1107 if (stream.isRemoteClosed()) {
1108 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1109 }
1110 if (stream.isLocalReset()) {
1111 return;
1112 }
1113 if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1114 stream.setRemoteEndStream();
1115 }
1116 stream.consumeHeader(headers);
1117 } else {
1118 continuation.copyPayload(payload);
1119 }
1120 }
1121
1122 private void consumeContinuationFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1123 final int streamId = frame.getStreamId();
1124 final ByteBuffer payload = frame.getPayload();
1125 continuation.copyPayload(payload);
1126 if (frame.isFlagSet(FrameFlag.END_HEADERS)) {
1127 final List<Header> headers = decodeHeaders(continuation.getContent());
1128 if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1129 processedRemoteStreamId = streamId;
1130 }
1131 if (streamListener != null) {
1132 streamListener.onHeaderInput(this, streamId, headers);
1133 }
1134 if (stream.isRemoteClosed()) {
1135 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1136 }
1137 if (stream.isLocalReset()) {
1138 return;
1139 }
1140 if (continuation.endStream) {
1141 stream.setRemoteEndStream();
1142 }
1143 if (continuation.type == FrameType.PUSH_PROMISE.getValue()) {
1144 stream.consumePromise(headers);
1145 } else {
1146 stream.consumeHeader(headers);
1147 }
1148 continuation = null;
1149 }
1150 }
1151
1152 private void consumeSettingsFrame(final ByteBuffer payload) throws HttpException, IOException {
1153 final H2Config.Builder configBuilder = H2Config.initial();
1154 while (payload.hasRemaining()) {
1155 final int code = payload.getShort();
1156 final int value = payload.getInt();
1157 final H2Param param = H2Param.valueOf(code);
1158 if (param != null) {
1159 switch (param) {
1160 case HEADER_TABLE_SIZE:
1161 try {
1162 configBuilder.setHeaderTableSize(value);
1163 } catch (final IllegalArgumentException ex) {
1164 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1165 }
1166 break;
1167 case MAX_CONCURRENT_STREAMS:
1168 try {
1169 configBuilder.setMaxConcurrentStreams(value);
1170 } catch (final IllegalArgumentException ex) {
1171 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1172 }
1173 break;
1174 case ENABLE_PUSH:
1175 configBuilder.setPushEnabled(value == 1);
1176 break;
1177 case INITIAL_WINDOW_SIZE:
1178 try {
1179 configBuilder.setInitialWindowSize(value);
1180 } catch (final IllegalArgumentException ex) {
1181 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1182 }
1183 break;
1184 case MAX_FRAME_SIZE:
1185 try {
1186 configBuilder.setMaxFrameSize(value);
1187 } catch (final IllegalArgumentException ex) {
1188 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1189 }
1190 break;
1191 case MAX_HEADER_LIST_SIZE:
1192 try {
1193 configBuilder.setMaxHeaderListSize(value);
1194 } catch (final IllegalArgumentException ex) {
1195 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1196 }
1197 break;
1198 }
1199 }
1200 }
1201 applyRemoteSettings(configBuilder.build());
1202 }
1203
1204 private void produceOutput() throws HttpException, IOException {
1205 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1206 final Map.Entry<Integer, H2Stream> entry = it.next();
1207 final H2Stream stream = entry.getValue();
1208 if (!stream.isLocalClosed() && stream.getOutputWindow().get() > 0) {
1209 stream.produceOutput();
1210 }
1211 if (stream.isTerminated()) {
1212 it.remove();
1213 stream.releaseResources();
1214 requestSessionOutput();
1215 }
1216 if (!outputQueue.isEmpty()) {
1217 break;
1218 }
1219 }
1220 }
1221
1222 private void applyRemoteSettings(final H2Config config) throws H2ConnectionException {
1223 remoteConfig = config;
1224
1225 hPackEncoder.setMaxTableSize(remoteConfig.getHeaderTableSize());
1226 final int delta = remoteConfig.getInitialWindowSize() - initOutputWinSize;
1227 initOutputWinSize = remoteConfig.getInitialWindowSize();
1228
1229 final int maxFrameSize = remoteConfig.getMaxFrameSize();
1230 if (maxFrameSize > localConfig.getMaxFrameSize()) {
1231 outputBuffer.expand(maxFrameSize);
1232 }
1233
1234 if (delta != 0) {
1235 if (!streamMap.isEmpty()) {
1236 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1237 final Map.Entry<Integer, H2Stream> entry = it.next();
1238 final H2Stream stream = entry.getValue();
1239 try {
1240 updateOutputWindow(stream.getId(), stream.getOutputWindow(), delta);
1241 } catch (final ArithmeticException ex) {
1242 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1243 }
1244 }
1245 }
1246 }
1247 }
1248
1249 private void applyLocalSettings() throws H2ConnectionException {
1250 hPackDecoder.setMaxTableSize(localConfig.getHeaderTableSize());
1251 hPackDecoder.setMaxListSize(localConfig.getMaxHeaderListSize());
1252
1253 final int delta = localConfig.getInitialWindowSize() - initInputWinSize;
1254 initInputWinSize = localConfig.getInitialWindowSize();
1255
1256 if (delta != 0 && !streamMap.isEmpty()) {
1257 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1258 final Map.Entry<Integer, H2Stream> entry = it.next();
1259 final H2Stream stream = entry.getValue();
1260 try {
1261 updateInputWindow(stream.getId(), stream.getInputWindow(), delta);
1262 } catch (final ArithmeticException ex) {
1263 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1264 }
1265 }
1266 }
1267 lowMark = initInputWinSize / 2;
1268 }
1269
1270 @Override
1271 public void close() throws IOException {
1272 ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.IMMEDIATE);
1273 }
1274
1275 @Override
1276 public void close(final CloseMode closeMode) {
1277 ioSession.close(closeMode);
1278 }
1279
1280 @Override
1281 public boolean isOpen() {
1282 return connState == ConnectionHandshake.ACTIVE;
1283 }
1284
1285 @Override
1286 public void setSocketTimeout(final Timeout timeout) {
1287 ioSession.setSocketTimeout(timeout);
1288 }
1289
1290 @Override
1291 public SSLSession getSSLSession() {
1292 final TlsDetails tlsDetails = ioSession.getTlsDetails();
1293 return tlsDetails != null ? tlsDetails.getSSLSession() : null;
1294 }
1295
1296 @Override
1297 public EndpointDetails getEndpointDetails() {
1298 if (endpointDetails == null) {
1299 endpointDetails = new BasicEndpointDetails(
1300 ioSession.getRemoteAddress(),
1301 ioSession.getLocalAddress(),
1302 connMetrics,
1303 ioSession.getSocketTimeout());
1304 }
1305 return endpointDetails;
1306 }
1307
1308 @Override
1309 public Timeout getSocketTimeout() {
1310 return ioSession.getSocketTimeout();
1311 }
1312
1313 @Override
1314 public ProtocolVersion getProtocolVersion() {
1315 return HttpVersion.HTTP_2;
1316 }
1317
1318 @Override
1319 public SocketAddress getRemoteAddress() {
1320 return ioSession.getRemoteAddress();
1321 }
1322
1323 @Override
1324 public SocketAddress getLocalAddress() {
1325 return ioSession.getLocalAddress();
1326 }
1327
1328 void appendState(final StringBuilder buf) {
1329 buf.append("connState=").append(connState)
1330 .append(", connInputWindow=").append(connInputWindow)
1331 .append(", connOutputWindow=").append(connOutputWindow)
1332 .append(", outputQueue=").append(outputQueue.size())
1333 .append(", streamMap=").append(streamMap.size())
1334 .append(", processedRemoteStreamId=").append(processedRemoteStreamId);
1335 }
1336
1337 private static class Continuation {
1338
1339 final int streamId;
1340 final int type;
1341 final boolean endStream;
1342 final ByteArrayBuffer headerBuffer;
1343
1344 private Continuation(final int streamId, final int type, final boolean endStream) {
1345 this.streamId = streamId;
1346 this.type = type;
1347 this.endStream = endStream;
1348 this.headerBuffer = new ByteArrayBuffer(1024);
1349 }
1350
1351 void copyPayload(final ByteBuffer payload) {
1352 if (payload == null) {
1353 return;
1354 }
1355 headerBuffer.ensureCapacity(payload.remaining());
1356 payload.get(headerBuffer.array(), headerBuffer.length(), payload.remaining());
1357 }
1358
1359 ByteBuffer getContent() {
1360 return ByteBuffer.wrap(headerBuffer.array(), 0, headerBuffer.length());
1361 }
1362
1363 }
1364
1365 private class H2StreamChannelImpl implements H2StreamChannel {
1366
1367 private final int id;
1368 private final AtomicInteger inputWindow;
1369 private final AtomicInteger outputWindow;
1370
1371 private volatile boolean idle;
1372 private volatile boolean remoteEndStream;
1373 private volatile boolean localEndStream;
1374
1375 private volatile long deadline;
1376
1377 H2StreamChannelImpl(final int id, final boolean idle, final int initialInputWindowSize, final int initialOutputWindowSize) {
1378 this.id = id;
1379 this.idle = idle;
1380 this.inputWindow = new AtomicInteger(initialInputWindowSize);
1381 this.outputWindow = new AtomicInteger(initialOutputWindowSize);
1382 }
1383
1384 int getId() {
1385 return id;
1386 }
1387
1388 AtomicInteger getOutputWindow() {
1389 return outputWindow;
1390 }
1391
1392 AtomicInteger getInputWindow() {
1393 return inputWindow;
1394 }
1395
1396 @Override
1397 public void submit(final List<Header> headers, final boolean endStream) throws IOException {
1398 ioSession.getLock().lock();
1399 try {
1400 if (headers == null || headers.isEmpty()) {
1401 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
1402 }
1403 if (localEndStream) {
1404 return;
1405 }
1406 idle = false;
1407 commitHeaders(id, headers, endStream);
1408 if (endStream) {
1409 localEndStream = true;
1410 }
1411 } finally {
1412 ioSession.getLock().unlock();
1413 }
1414 }
1415
1416 @Override
1417 public void push(final List<Header> headers, final AsyncPushProducer pushProducer) throws HttpException, IOException {
1418 acceptPushRequest();
1419 final int promisedStreamId = generateStreamId();
1420 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
1421 promisedStreamId,
1422 true,
1423 localConfig.getInitialWindowSize(),
1424 remoteConfig.getInitialWindowSize());
1425 final HttpCoreContext context = HttpCoreContext.create();
1426 context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
1427 context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
1428 final H2StreamHandler streamHandler = new ServerPushH2StreamHandler(
1429 channel, httpProcessor, connMetrics, pushProducer, context);
1430 final H2Stream stream = new H2Stream(channel, streamHandler, false);
1431 streamMap.put(promisedStreamId, stream);
1432
1433 ioSession.getLock().lock();
1434 try {
1435 if (localEndStream) {
1436 stream.releaseResources();
1437 return;
1438 }
1439 commitPushPromise(id, promisedStreamId, headers);
1440 idle = false;
1441 } finally {
1442 ioSession.getLock().unlock();
1443 }
1444 }
1445
1446 @Override
1447 public void update(final int increment) throws IOException {
1448 if (remoteEndStream) {
1449 return;
1450 }
1451 incrementInputCapacity(0, connInputWindow, increment);
1452 incrementInputCapacity(id, inputWindow, increment);
1453 }
1454
1455 @Override
1456 public int write(final ByteBuffer payload) throws IOException {
1457 ioSession.getLock().lock();
1458 try {
1459 if (localEndStream) {
1460 return 0;
1461 }
1462 return streamData(id, outputWindow, payload);
1463 } finally {
1464 ioSession.getLock().unlock();
1465 }
1466 }
1467
1468 @Override
1469 public void endStream(final List<? extends Header> trailers) throws IOException {
1470 ioSession.getLock().lock();
1471 try {
1472 if (localEndStream) {
1473 return;
1474 }
1475 localEndStream = true;
1476 if (trailers != null && !trailers.isEmpty()) {
1477 commitHeaders(id, trailers, true);
1478 } else {
1479 final RawFrame frame = frameFactory.createData(id, null, true);
1480 commitFrameInternal(frame);
1481 }
1482 } finally {
1483 ioSession.getLock().unlock();
1484 }
1485 }
1486
1487 @Override
1488 public void endStream() throws IOException {
1489 endStream(null);
1490 }
1491
1492 @Override
1493 public void requestOutput() {
1494 requestSessionOutput();
1495 }
1496
1497 boolean isRemoteClosed() {
1498 return remoteEndStream;
1499 }
1500
1501 void setRemoteEndStream() {
1502 remoteEndStream = true;
1503 }
1504
1505 boolean isLocalClosed() {
1506 return localEndStream;
1507 }
1508
1509 void setLocalEndStream() {
1510 localEndStream = true;
1511 }
1512
1513 boolean isLocalReset() {
1514 return deadline > 0;
1515 }
1516
1517 boolean isResetDeadline() {
1518 final long l = deadline;
1519 return l > 0 && l < System.currentTimeMillis();
1520 }
1521
1522 boolean localReset(final int code) throws IOException {
1523 ioSession.getLock().lock();
1524 try {
1525 if (localEndStream) {
1526 return false;
1527 }
1528 localEndStream = true;
1529 deadline = System.currentTimeMillis() + LINGER_TIME;
1530 if (!idle) {
1531 final RawFrame resetStream = frameFactory.createResetStream(id, code);
1532 commitFrameInternal(resetStream);
1533 return true;
1534 }
1535 return false;
1536 } finally {
1537 ioSession.getLock().unlock();
1538 }
1539 }
1540
1541 boolean localReset(final H2Error error) throws IOException {
1542 return localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1543 }
1544
1545 @Override
1546 public boolean cancel() {
1547 try {
1548 return localReset(H2Error.CANCEL);
1549 } catch (final IOException ignore) {
1550 return false;
1551 }
1552 }
1553
1554 void appendState(final StringBuilder buf) {
1555 buf.append("id=").append(id)
1556 .append(", connState=").append(connState)
1557 .append(", inputWindow=").append(inputWindow)
1558 .append(", outputWindow=").append(outputWindow)
1559 .append(", localEndStream=").append(localEndStream)
1560 .append(", idle=").append(idle);
1561 }
1562
1563 @Override
1564 public String toString() {
1565 final StringBuilder buf = new StringBuilder();
1566 buf.append("[");
1567 appendState(buf);
1568 buf.append("]");
1569 return buf.toString();
1570 }
1571
1572 }
1573
1574 static class H2Stream {
1575
1576 private final H2StreamChannelImpl channel;
1577 private final H2StreamHandler handler;
1578 private final boolean remoteInitiated;
1579
1580 private H2Stream(
1581 final H2StreamChannelImpl channel,
1582 final H2StreamHandler handler,
1583 final boolean remoteInitiated) {
1584 this.channel = channel;
1585 this.handler = handler;
1586 this.remoteInitiated = remoteInitiated;
1587 }
1588
1589 int getId() {
1590 return channel.getId();
1591 }
1592
1593 boolean isRemoteInitiated() {
1594 return remoteInitiated;
1595 }
1596
1597 AtomicInteger getOutputWindow() {
1598 return channel.getOutputWindow();
1599 }
1600
1601 AtomicInteger getInputWindow() {
1602 return channel.getInputWindow();
1603 }
1604
1605 boolean isTerminated() {
1606 return channel.isLocalClosed() && (channel.isRemoteClosed() || channel.isResetDeadline());
1607 }
1608
1609 boolean isRemoteClosed() {
1610 return channel.isRemoteClosed();
1611 }
1612
1613 boolean isLocalClosed() {
1614 return channel.isLocalClosed();
1615 }
1616
1617 boolean isLocalReset() {
1618 return channel.isLocalReset();
1619 }
1620
1621 void setRemoteEndStream() {
1622 channel.setRemoteEndStream();
1623 }
1624
1625 void consumePromise(final List<Header> headers) throws HttpException, IOException {
1626 try {
1627 handler.consumePromise(headers);
1628 channel.setLocalEndStream();
1629 } catch (final ProtocolException ex) {
1630 localReset(ex, H2Error.PROTOCOL_ERROR);
1631 }
1632 }
1633
1634 void consumeHeader(final List<Header> headers) throws HttpException, IOException {
1635 try {
1636 handler.consumeHeader(headers, channel.isRemoteClosed());
1637 } catch (final ProtocolException ex) {
1638 localReset(ex, H2Error.PROTOCOL_ERROR);
1639 }
1640 }
1641
1642 void consumeData(final ByteBuffer src) throws HttpException, IOException {
1643 try {
1644 handler.consumeData(src, channel.isRemoteClosed());
1645 } catch (final CharacterCodingException ex) {
1646 localReset(ex, H2Error.INTERNAL_ERROR);
1647 } catch (final ProtocolException ex) {
1648 localReset(ex, H2Error.PROTOCOL_ERROR);
1649 }
1650 }
1651
1652 boolean isOutputReady() {
1653 return handler.isOutputReady();
1654 }
1655
1656 void produceOutput() throws HttpException, IOException {
1657 try {
1658 handler.produceOutput();
1659 } catch (final ProtocolException ex) {
1660 localReset(ex, H2Error.PROTOCOL_ERROR);
1661 }
1662 }
1663
1664 void produceInputCapacityUpdate() throws IOException {
1665 handler.updateInputCapacity();
1666 }
1667
1668 void reset(final Exception cause) {
1669 channel.setRemoteEndStream();
1670 channel.setLocalEndStream();
1671 handler.failed(cause);
1672 }
1673
1674 void localReset(final Exception cause, final int code) throws IOException {
1675 channel.localReset(code);
1676 handler.failed(cause);
1677 }
1678
1679 void localReset(final Exception cause, final H2Error error) throws IOException {
1680 localReset(cause, error != null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1681 }
1682
1683 void localReset(final H2StreamResetException ex) throws IOException {
1684 localReset(ex, ex.getCode());
1685 }
1686
1687 void handle(final HttpException ex) throws IOException, HttpException {
1688 handler.handle(ex, channel.isRemoteClosed());
1689 }
1690
1691 HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
1692 return handler.getPushHandlerFactory();
1693 }
1694
1695 void cancel() {
1696 reset(new RequestNotExecutedException());
1697 }
1698
1699 boolean abort() {
1700 final boolean cancelled = channel.cancel();
1701 handler.failed(new RequestNotExecutedException());
1702 return cancelled;
1703 }
1704
1705 void releaseResources() {
1706 handler.releaseResources();
1707 }
1708
1709 void appendState(final StringBuilder buf) {
1710 buf.append("channel=[");
1711 channel.appendState(buf);
1712 buf.append("]");
1713 }
1714
1715 @Override
1716 public String toString() {
1717 final StringBuilder buf = new StringBuilder();
1718 buf.append("[");
1719 appendState(buf);
1720 buf.append("]");
1721 return buf.toString();
1722 }
1723
1724 }
1725
1726 }