1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http2.impl.nio;
28
29 import java.io.IOException;
30 import java.net.SocketAddress;
31 import java.nio.ByteBuffer;
32 import java.nio.channels.SelectionKey;
33 import java.nio.charset.CharacterCodingException;
34 import java.util.Deque;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Queue;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ConcurrentLinkedDeque;
41 import java.util.concurrent.ConcurrentLinkedQueue;
42 import java.util.concurrent.atomic.AtomicInteger;
43
44 import javax.net.ssl.SSLSession;
45
46 import org.apache.hc.core5.concurrent.Cancellable;
47 import org.apache.hc.core5.concurrent.CancellableDependency;
48 import org.apache.hc.core5.http.ConnectionClosedException;
49 import org.apache.hc.core5.http.EndpointDetails;
50 import org.apache.hc.core5.http.Header;
51 import org.apache.hc.core5.http.HttpConnection;
52 import org.apache.hc.core5.http.HttpException;
53 import org.apache.hc.core5.http.HttpStreamResetException;
54 import org.apache.hc.core5.http.HttpVersion;
55 import org.apache.hc.core5.http.ProtocolException;
56 import org.apache.hc.core5.http.ProtocolVersion;
57 import org.apache.hc.core5.http.RequestNotExecutedException;
58 import org.apache.hc.core5.http.config.CharCodingConfig;
59 import org.apache.hc.core5.http.impl.BasicEndpointDetails;
60 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
61 import org.apache.hc.core5.http.impl.CharCodingSupport;
62 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
63 import org.apache.hc.core5.http.nio.AsyncPushProducer;
64 import org.apache.hc.core5.http.nio.HandlerFactory;
65 import org.apache.hc.core5.http.nio.command.ExecutableCommand;
66 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
67 import org.apache.hc.core5.http.protocol.HttpCoreContext;
68 import org.apache.hc.core5.http.protocol.HttpProcessor;
69 import org.apache.hc.core5.http2.H2ConnectionException;
70 import org.apache.hc.core5.http2.H2Error;
71 import org.apache.hc.core5.http2.H2StreamResetException;
72 import org.apache.hc.core5.http2.config.H2Config;
73 import org.apache.hc.core5.http2.config.H2Param;
74 import org.apache.hc.core5.http2.config.H2Setting;
75 import org.apache.hc.core5.http2.frame.FrameFactory;
76 import org.apache.hc.core5.http2.frame.FrameFlag;
77 import org.apache.hc.core5.http2.frame.FrameType;
78 import org.apache.hc.core5.http2.frame.RawFrame;
79 import org.apache.hc.core5.http2.frame.StreamIdGenerator;
80 import org.apache.hc.core5.http2.hpack.HPackDecoder;
81 import org.apache.hc.core5.http2.hpack.HPackEncoder;
82 import org.apache.hc.core5.http2.impl.BasicH2TransportMetrics;
83 import org.apache.hc.core5.http2.nio.AsyncPingHandler;
84 import org.apache.hc.core5.http2.nio.command.PingCommand;
85 import org.apache.hc.core5.io.CloseMode;
86 import org.apache.hc.core5.reactor.Command;
87 import org.apache.hc.core5.reactor.ProtocolIOSession;
88 import org.apache.hc.core5.reactor.ssl.TlsDetails;
89 import org.apache.hc.core5.util.Args;
90 import org.apache.hc.core5.util.ByteArrayBuffer;
91 import org.apache.hc.core5.util.Identifiable;
92 import org.apache.hc.core5.util.Timeout;
93
94 abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection {
95
96 private static final long LINGER_TIME = 1000;
97 private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024;
98
99 enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
100 enum SettingsHandshake { READY, TRANSMITTED, ACKED }
101
102 private final ProtocolIOSession ioSession;
103 private final FrameFactory frameFactory;
104 private final StreamIdGenerator idGenerator;
105 private final HttpProcessor httpProcessor;
106 private final H2Config localConfig;
107 private final BasicH2TransportMetrics inputMetrics;
108 private final BasicH2TransportMetrics outputMetrics;
109 private final BasicHttpConnectionMetrics connMetrics;
110 private final FrameInputBuffer inputBuffer;
111 private final FrameOutputBuffer outputBuffer;
112 private final Deque<RawFrame> outputQueue;
113 private final HPackEncoder hPackEncoder;
114 private final HPackDecoder hPackDecoder;
115 private final Map<Integer, H2Stream> streamMap;
116 private final Queue<AsyncPingHandler> pingHandlers;
117 private final AtomicInteger connInputWindow;
118 private final AtomicInteger connOutputWindow;
119 private final AtomicInteger outputRequests;
120 private final AtomicInteger lastStreamId;
121 private final H2StreamListener streamListener;
122
123 private ConnectionHandshake connState = ConnectionHandshake.READY;
124 private SettingsHandshake localSettingState = SettingsHandshake.READY;
125 private SettingsHandshake remoteSettingState = SettingsHandshake.READY;
126
127 private int initInputWinSize;
128 private int initOutputWinSize;
129 private int lowMark;
130
131 private volatile H2Config remoteConfig;
132
133 private Continuation continuation;
134
135 private int processedRemoteStreamId;
136 private EndpointDetails endpointDetails;
137 private boolean goAwayReceived;
138
139 AbstractH2StreamMultiplexer(
140 final ProtocolIOSession ioSession,
141 final FrameFactory frameFactory,
142 final StreamIdGenerator idGenerator,
143 final HttpProcessor httpProcessor,
144 final CharCodingConfig charCodingConfig,
145 final H2Config h2Config,
146 final H2StreamListener streamListener) {
147 this.ioSession = Args.notNull(ioSession, "IO session");
148 this.frameFactory = Args.notNull(frameFactory, "Frame factory");
149 this.idGenerator = Args.notNull(idGenerator, "Stream id generator");
150 this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
151 this.localConfig = h2Config != null ? h2Config : H2Config.DEFAULT;
152 this.inputMetrics = new BasicH2TransportMetrics();
153 this.outputMetrics = new BasicH2TransportMetrics();
154 this.connMetrics = new BasicHttpConnectionMetrics(this.inputMetrics, this.outputMetrics);
155 this.inputBuffer = new FrameInputBuffer(this.inputMetrics, this.localConfig.getMaxFrameSize());
156 this.outputBuffer = new FrameOutputBuffer(this.outputMetrics, this.localConfig.getMaxFrameSize());
157 this.outputQueue = new ConcurrentLinkedDeque<>();
158 this.pingHandlers = new ConcurrentLinkedQueue<>();
159 this.outputRequests = new AtomicInteger(0);
160 this.lastStreamId = new AtomicInteger(0);
161 this.hPackEncoder = new HPackEncoder(CharCodingSupport.createEncoder(charCodingConfig));
162 this.hPackDecoder = new HPackDecoder(CharCodingSupport.createDecoder(charCodingConfig));
163 this.streamMap = new ConcurrentHashMap<>();
164 this.remoteConfig = H2Config.INIT;
165 this.connInputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
166 this.connOutputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
167
168 this.initInputWinSize = H2Config.INIT.getInitialWindowSize();
169 this.initOutputWinSize = H2Config.INIT.getInitialWindowSize();
170
171 this.hPackDecoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
172 this.hPackEncoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
173 this.hPackDecoder.setMaxListSize(H2Config.INIT.getMaxHeaderListSize());
174
175 this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
176 this.streamListener = streamListener;
177 }
178
179 @Override
180 public String getId() {
181 return ioSession.getId();
182 }
183
184 abstract void acceptHeaderFrame() throws H2ConnectionException;
185
186 abstract void acceptPushRequest() throws H2ConnectionException;
187
188 abstract void acceptPushFrame() throws H2ConnectionException;
189
190 abstract H2StreamHandler createRemotelyInitiatedStream(
191 H2StreamChannel channel,
192 HttpProcessor httpProcessor,
193 BasicHttpConnectionMetrics connMetrics,
194 HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException;
195
196 abstract H2StreamHandler createLocallyInitiatedStream(
197 ExecutableCommand command,
198 H2StreamChannel channel,
199 HttpProcessor httpProcessor,
200 BasicHttpConnectionMetrics connMetrics) throws IOException;
201
202 private int updateWindow(final AtomicInteger window, final int delta) throws ArithmeticException {
203 for (;;) {
204 final int current = window.get();
205 long newValue = (long) current + delta;
206
207
208
209 if (newValue == 0x80000000L) {
210 newValue = Integer.MAX_VALUE;
211 }
212
213
214 if (Math.abs(newValue) > 0x7fffffffL) {
215 throw new ArithmeticException("Update causes flow control window to exceed " + Integer.MAX_VALUE);
216 }
217 if (window.compareAndSet(current, (int) newValue)) {
218 return (int) newValue;
219 }
220 }
221 }
222
223 private int updateInputWindow(
224 final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
225 final int newSize = updateWindow(window, delta);
226 if (streamListener != null) {
227 streamListener.onInputFlowControl(this, streamId, delta, newSize);
228 }
229 return newSize;
230 }
231
232 private int updateOutputWindow(
233 final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
234 final int newSize = updateWindow(window, delta);
235 if (streamListener != null) {
236 streamListener.onOutputFlowControl(this, streamId, delta, newSize);
237 }
238 return newSize;
239 }
240
241 private void commitFrameInternal(final RawFrame frame) throws IOException {
242 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
243 if (streamListener != null) {
244 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
245 }
246 outputBuffer.write(frame, ioSession);
247 } else {
248 outputQueue.addLast(frame);
249 }
250 ioSession.setEvent(SelectionKey.OP_WRITE);
251 }
252
253 private void commitFrame(final RawFrame frame) throws IOException {
254 Args.notNull(frame, "Frame");
255 ioSession.getLock().lock();
256 try {
257 commitFrameInternal(frame);
258 } finally {
259 ioSession.getLock().unlock();
260 }
261 }
262
263 private void commitHeaders(
264 final int streamId, final List<? extends Header> headers, final boolean endStream) throws IOException {
265 if (streamListener != null) {
266 streamListener.onHeaderOutput(this, streamId, headers);
267 }
268 final ByteArrayBuffer buf = new ByteArrayBuffer(512);
269 hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
270
271 int off = 0;
272 int remaining = buf.length();
273 boolean continuation = false;
274
275 while (remaining > 0) {
276 final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
277 final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
278
279 remaining -= chunk;
280 off += chunk;
281
282 final boolean endHeaders = remaining == 0;
283 final RawFrame frame;
284 if (!continuation) {
285 frame = frameFactory.createHeaders(streamId, payload, endHeaders, endStream);
286 continuation = true;
287 } else {
288 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
289 }
290 commitFrameInternal(frame);
291 }
292 }
293
294 private void commitPushPromise(
295 final int streamId, final int promisedStreamId, final List<Header> headers) throws IOException {
296 if (headers == null || headers.isEmpty()) {
297 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
298 }
299 if (streamListener != null) {
300 streamListener.onHeaderOutput(this, streamId, headers);
301 }
302 final ByteArrayBuffer buf = new ByteArrayBuffer(512);
303 buf.append((byte)(promisedStreamId >> 24));
304 buf.append((byte)(promisedStreamId >> 16));
305 buf.append((byte)(promisedStreamId >> 8));
306 buf.append((byte)(promisedStreamId));
307
308 hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
309
310 int off = 0;
311 int remaining = buf.length();
312 boolean continuation = false;
313
314 while (remaining > 0) {
315 final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
316 final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
317
318 remaining -= chunk;
319 off += chunk;
320
321 final boolean endHeaders = remaining == 0;
322 final RawFrame frame;
323 if (!continuation) {
324 frame = frameFactory.createPushPromise(streamId, payload, endHeaders);
325 continuation = true;
326 } else {
327 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
328 }
329 commitFrameInternal(frame);
330 }
331 }
332
333 private void streamDataFrame(
334 final int streamId,
335 final AtomicInteger streamOutputWindow,
336 final ByteBuffer payload,
337 final int chunk) throws IOException {
338 final RawFrame dataFrame = frameFactory.createData(streamId, payload, false);
339 if (streamListener != null) {
340 streamListener.onFrameOutput(this, streamId, dataFrame);
341 }
342 updateOutputWindow(0, connOutputWindow, -chunk);
343 updateOutputWindow(streamId, streamOutputWindow, -chunk);
344 outputBuffer.write(dataFrame, ioSession);
345 }
346
347 private int streamData(
348 final int streamId, final AtomicInteger streamOutputWindow, final ByteBuffer payload) throws IOException {
349 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
350 final int capacity = Math.min(connOutputWindow.get(), streamOutputWindow.get());
351 if (capacity <= 0) {
352 return 0;
353 }
354 final int maxPayloadSize = Math.min(capacity, remoteConfig.getMaxFrameSize());
355 final int chunk;
356 if (payload.remaining() <= maxPayloadSize) {
357 chunk = payload.remaining();
358 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
359 } else {
360 chunk = maxPayloadSize;
361 final int originalLimit = payload.limit();
362 try {
363 payload.limit(payload.position() + chunk);
364 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
365 } finally {
366 payload.limit(originalLimit);
367 }
368 }
369 payload.position(payload.position() + chunk);
370 ioSession.setEvent(SelectionKey.OP_WRITE);
371 return chunk;
372 }
373 return 0;
374 }
375
376 private void incrementInputCapacity(
377 final int streamId, final AtomicInteger inputWindow, final int inputCapacity) throws IOException {
378 if (inputCapacity > 0) {
379 final int streamWinSize = inputWindow.get();
380 final int remainingCapacity = Integer.MAX_VALUE - streamWinSize;
381 final int chunk = Math.min(inputCapacity, remainingCapacity);
382 if (chunk != 0) {
383 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(streamId, chunk);
384 commitFrame(windowUpdateFrame);
385 updateInputWindow(streamId, inputWindow, chunk);
386 }
387 }
388 }
389
390 private void requestSessionOutput() {
391 outputRequests.incrementAndGet();
392 ioSession.setEvent(SelectionKey.OP_WRITE);
393 }
394
395 private void updateLastStreamId(final int streamId) {
396 final int currentId = lastStreamId.get();
397 if (streamId > currentId) {
398 lastStreamId.compareAndSet(currentId, streamId);
399 }
400 }
401
402 private int generateStreamId() {
403 for (;;) {
404 final int currentId = lastStreamId.get();
405 final int newStreamId = idGenerator.generate(currentId);
406 if (lastStreamId.compareAndSet(currentId, newStreamId)) {
407 return newStreamId;
408 }
409 }
410 }
411
412 public final void onConnect() throws HttpException, IOException {
413 connState = ConnectionHandshake.ACTIVE;
414 final RawFrame settingsFrame = frameFactory.createSettings(
415 new H2Setting(H2Param.HEADER_TABLE_SIZE, localConfig.getHeaderTableSize()),
416 new H2Setting(H2Param.ENABLE_PUSH, localConfig.isPushEnabled() ? 1 : 0),
417 new H2Setting(H2Param.MAX_CONCURRENT_STREAMS, localConfig.getMaxConcurrentStreams()),
418 new H2Setting(H2Param.INITIAL_WINDOW_SIZE, localConfig.getInitialWindowSize()),
419 new H2Setting(H2Param.MAX_FRAME_SIZE, localConfig.getMaxFrameSize()),
420 new H2Setting(H2Param.MAX_HEADER_LIST_SIZE, localConfig.getMaxHeaderListSize()));
421
422 commitFrame(settingsFrame);
423 localSettingState = SettingsHandshake.TRANSMITTED;
424 maximizeConnWindow(connInputWindow.get());
425
426 if (streamListener != null) {
427 final int initInputWindow = connInputWindow.get();
428 streamListener.onInputFlowControl(this, 0, initInputWindow, initInputWindow);
429 final int initOutputWindow = connOutputWindow.get();
430 streamListener.onOutputFlowControl(this, 0, initOutputWindow, initOutputWindow);
431 }
432 }
433
434 public final void onInput(final ByteBuffer src) throws HttpException, IOException {
435 if (connState == ConnectionHandshake.SHUTDOWN) {
436 ioSession.clearEvent(SelectionKey.OP_READ);
437 } else {
438 if (src != null) {
439 inputBuffer.put(src);
440 }
441 RawFrame frame;
442 while ((frame = inputBuffer.read(ioSession)) != null) {
443 if (streamListener != null) {
444 streamListener.onFrameInput(this, frame.getStreamId(), frame);
445 }
446 consumeFrame(frame);
447 }
448 }
449 }
450
451 public final void onOutput() throws HttpException, IOException {
452 ioSession.getLock().lock();
453 try {
454 if (!outputBuffer.isEmpty()) {
455 outputBuffer.flush(ioSession);
456 }
457 while (outputBuffer.isEmpty()) {
458 final RawFrame frame = outputQueue.poll();
459 if (frame != null) {
460 if (streamListener != null) {
461 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
462 }
463 outputBuffer.write(frame, ioSession);
464 } else {
465 break;
466 }
467 }
468 } finally {
469 ioSession.getLock().unlock();
470 }
471
472 if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
473
474 if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
475 produceOutput();
476 }
477 final int pendingOutputRequests = outputRequests.get();
478 boolean outputPending = false;
479 if (!streamMap.isEmpty() && connOutputWindow.get() > 0) {
480 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
481 final Map.Entry<Integer, H2Stream> entry = it.next();
482 final H2Stream stream = entry.getValue();
483 if (!stream.isLocalClosed()
484 && stream.getOutputWindow().get() > 0
485 && stream.isOutputReady()) {
486 outputPending = true;
487 break;
488 }
489 }
490 }
491 ioSession.getLock().lock();
492 try {
493 if (!outputPending && outputBuffer.isEmpty() && outputQueue.isEmpty()
494 && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
495 ioSession.clearEvent(SelectionKey.OP_WRITE);
496 } else {
497 outputRequests.addAndGet(-pendingOutputRequests);
498 }
499 } finally {
500 ioSession.getLock().unlock();
501 }
502 }
503
504 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 && remoteSettingState == SettingsHandshake.ACKED) {
505 processPendingCommands();
506 }
507 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
508 int liveStreams = 0;
509 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
510 final Map.Entry<Integer, H2Stream> entry = it.next();
511 final H2Stream stream = entry.getValue();
512 if (stream.isLocalClosed() && stream.isRemoteClosed()) {
513 stream.releaseResources();
514 it.remove();
515 } else {
516 if (idGenerator.isSameSide(stream.getId()) || stream.getId() <= processedRemoteStreamId) {
517 liveStreams++;
518 }
519 }
520 }
521 if (liveStreams == 0) {
522 connState = ConnectionHandshake.SHUTDOWN;
523 }
524 }
525 if (connState.compareTo(ConnectionHandshake.SHUTDOWN) >= 0) {
526 if (!streamMap.isEmpty()) {
527 for (final H2Stream stream : streamMap.values()) {
528 stream.releaseResources();
529 }
530 streamMap.clear();
531 }
532 ioSession.getLock().lock();
533 try {
534 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
535 ioSession.close();
536 }
537 } finally {
538 ioSession.getLock().unlock();
539 }
540 }
541 }
542
543 public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
544 connState = ConnectionHandshake.SHUTDOWN;
545
546 final RawFrame goAway;
547 if (localSettingState != SettingsHandshake.ACKED) {
548 goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.SETTINGS_TIMEOUT,
549 "Setting timeout (" + timeout + ")");
550 } else {
551 goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR,
552 "Timeout due to inactivity (" + timeout + ")");
553 }
554 commitFrame(goAway);
555 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
556 final Map.Entry<Integer, H2Stream> entry = it.next();
557 final H2Stream stream = entry.getValue();
558 stream.reset(new H2StreamResetException(H2Error.NO_ERROR, "Timeout due to inactivity (" + timeout + ")"));
559 }
560 streamMap.clear();
561 }
562
563 public final void onDisconnect() {
564 for (;;) {
565 final AsyncPingHandler pingHandler = pingHandlers.poll();
566 if (pingHandler != null) {
567 pingHandler.cancel();
568 } else {
569 break;
570 }
571 }
572 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
573 final Map.Entry<Integer, H2Stream> entry = it.next();
574 final H2Stream stream = entry.getValue();
575 stream.cancel();
576 }
577 for (;;) {
578 final Command command = ioSession.poll();
579 if (command != null) {
580 if (command instanceof ExecutableCommand) {
581 ((ExecutableCommand) command).failed(new ConnectionClosedException());
582 } else {
583 command.cancel();
584 }
585 } else {
586 break;
587 }
588 }
589 }
590
591 private void processPendingCommands() throws IOException, HttpException {
592 while (streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
593 final Command command = ioSession.poll();
594 if (command == null) {
595 break;
596 }
597 if (command instanceof ShutdownCommand) {
598 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
599 if (shutdownCommand.getType() == CloseMode.IMMEDIATE) {
600 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
601 final Map.Entry<Integer, H2Stream> entry = it.next();
602 final H2Stream stream = entry.getValue();
603 stream.cancel();
604 }
605 streamMap.clear();
606 connState = ConnectionHandshake.SHUTDOWN;
607 } else {
608 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
609 final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR, "Graceful shutdown");
610 commitFrame(goAway);
611 connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
612 }
613 }
614 break;
615 } else if (command instanceof PingCommand) {
616 final PingCommand./../../../org/apache/hc/core5/http2/nio/command/PingCommand.html#PingCommand">PingCommand pingCommand = (PingCommand) command;
617 final AsyncPingHandler handler = pingCommand.getHandler();
618 pingHandlers.add(handler);
619 final RawFrame ping = frameFactory.createPing(handler.getData());
620 commitFrame(ping);
621 } else if (command instanceof ExecutableCommand) {
622 final int streamId = generateStreamId();
623 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
624 streamId, true, initInputWinSize, initOutputWinSize);
625 final ExecutableCommand executableCommand = (ExecutableCommand) command;
626 final H2StreamHandler streamHandler = createLocallyInitiatedStream(
627 executableCommand, channel, httpProcessor, connMetrics);
628
629 final H2Stream stream = new H2Stream(channel, streamHandler, false);
630 streamMap.put(streamId, stream);
631
632 if (streamListener != null) {
633 final int initInputWindow = stream.getInputWindow().get();
634 streamListener.onInputFlowControl(this, streamId, initInputWindow, initInputWindow);
635 final int initOutputWindow = stream.getOutputWindow().get();
636 streamListener.onOutputFlowControl(this, streamId, initOutputWindow, initOutputWindow);
637 }
638
639 if (stream.isOutputReady()) {
640 stream.produceOutput();
641 }
642 final CancellableDependency cancellableDependency = executableCommand.getCancellableDependency();
643 if (cancellableDependency != null) {
644 cancellableDependency.setDependency(new Cancellable() {
645
646 @Override
647 public boolean cancel() {
648 return stream.abort();
649 }
650
651 });
652 }
653 if (!outputQueue.isEmpty()) {
654 return;
655 }
656 }
657 }
658 }
659
660 public final void onException(final Exception cause) {
661 try {
662 for (;;) {
663 final AsyncPingHandler pingHandler = pingHandlers.poll();
664 if (pingHandler != null) {
665 pingHandler.failed(cause);
666 } else {
667 break;
668 }
669 }
670 for (;;) {
671 final Command command = ioSession.poll();
672 if (command != null) {
673 if (command instanceof ExecutableCommand) {
674 ((ExecutableCommand) command).failed(new ConnectionClosedException());
675 } else {
676 command.cancel();
677 }
678 } else {
679 break;
680 }
681 }
682 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
683 final Map.Entry<Integer, H2Stream> entry = it.next();
684 final H2Stream stream = entry.getValue();
685 stream.reset(cause);
686 }
687 streamMap.clear();
688 if (!(cause instanceof ConnectionClosedException)) {
689 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) <= 0) {
690 final H2Error errorCode;
691 if (cause instanceof H2ConnectionException) {
692 errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
693 } else if (cause instanceof ProtocolException){
694 errorCode = H2Error.PROTOCOL_ERROR;
695 } else {
696 errorCode = H2Error.INTERNAL_ERROR;
697 }
698 final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
699 commitFrame(goAway);
700 }
701 }
702 } catch (final IOException ignore) {
703 } finally {
704 connState = ConnectionHandshake.SHUTDOWN;
705 final CloseMode closeMode;
706 if (cause instanceof ConnectionClosedException) {
707 closeMode = CloseMode.GRACEFUL;
708 } else if (cause instanceof IOException) {
709 closeMode = CloseMode.IMMEDIATE;
710 } else {
711 closeMode = CloseMode.GRACEFUL;
712 }
713 ioSession.close(closeMode);
714 }
715 }
716
717 private H2Stream getValidStream(final int streamId) throws H2ConnectionException {
718 if (streamId == 0) {
719 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
720 }
721 final H2Stream stream = streamMap.get(streamId);
722 if (stream == null) {
723 if (streamId <= lastStreamId.get()) {
724 throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed");
725 } else {
726 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
727 }
728 }
729 return stream;
730 }
731
732 private void consumeFrame(final RawFrame frame) throws HttpException, IOException {
733 final FrameType frameType = FrameType.valueOf(frame.getType());
734 final int streamId = frame.getStreamId();
735 if (continuation != null && frameType != FrameType.CONTINUATION) {
736 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "CONTINUATION frame expected");
737 }
738 switch (frameType) {
739 case DATA: {
740 final H2Stream stream = getValidStream(streamId);
741 try {
742 consumeDataFrame(frame, stream);
743 } catch (final H2StreamResetException ex) {
744 stream.localReset(ex);
745 } catch (final HttpStreamResetException ex) {
746 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
747 }
748
749 if (stream.isTerminated()) {
750 streamMap.remove(streamId);
751 stream.releaseResources();
752 requestSessionOutput();
753 }
754 }
755 break;
756 case HEADERS: {
757 if (streamId == 0) {
758 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
759 }
760 H2Stream stream = streamMap.get(streamId);
761 if (stream == null) {
762 acceptHeaderFrame();
763
764 if (idGenerator.isSameSide(streamId)) {
765 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
766 }
767 if (goAwayReceived ) {
768 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
769 }
770
771 updateLastStreamId(streamId);
772
773 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
774 streamId, false, initInputWinSize, initOutputWinSize);
775 final H2StreamHandler streamHandler;
776 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
777 streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics, null);
778 } else {
779 streamHandler = NoopH2StreamHandler.INSTANCE;
780 channel.setLocalEndStream();
781 }
782
783 stream = new H2Stream(channel, streamHandler, true);
784 if (stream.isOutputReady()) {
785 stream.produceOutput();
786 }
787 streamMap.put(streamId, stream);
788 }
789
790 try {
791 consumeHeaderFrame(frame, stream);
792
793 if (stream.isOutputReady()) {
794 stream.produceOutput();
795 }
796 } catch (final H2StreamResetException ex) {
797 stream.localReset(ex);
798 } catch (final HttpStreamResetException ex) {
799 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
800 } catch (final HttpException ex) {
801 stream.handle(ex);
802 }
803
804 if (stream.isTerminated()) {
805 streamMap.remove(streamId);
806 stream.releaseResources();
807 requestSessionOutput();
808 }
809 }
810 break;
811 case CONTINUATION: {
812 if (continuation == null) {
813 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION frame");
814 }
815 if (streamId != continuation.streamId) {
816 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION stream id: " + streamId);
817 }
818
819 final H2Stream stream = getValidStream(streamId);
820 try {
821
822 consumeContinuationFrame(frame, stream);
823 } catch (final H2StreamResetException ex) {
824 stream.localReset(ex);
825 } catch (final HttpStreamResetException ex) {
826 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
827 }
828
829 if (stream.isTerminated()) {
830 streamMap.remove(streamId);
831 stream.releaseResources();
832 requestSessionOutput();
833 }
834 }
835 break;
836 case WINDOW_UPDATE: {
837 final ByteBuffer payload = frame.getPayload();
838 if (payload == null || payload.remaining() != 4) {
839 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid WINDOW_UPDATE frame payload");
840 }
841 final int delta = payload.getInt();
842 if (delta <= 0) {
843 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Invalid WINDOW_UPDATE delta");
844 }
845 if (streamId == 0) {
846 try {
847 updateOutputWindow(0, connOutputWindow, delta);
848 } catch (final ArithmeticException ex) {
849 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
850 }
851 } else {
852 final H2Stream stream = streamMap.get(streamId);
853 if (stream != null) {
854 try {
855 updateOutputWindow(streamId, stream.getOutputWindow(), delta);
856 } catch (final ArithmeticException ex) {
857 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
858 }
859 }
860 }
861 ioSession.setEvent(SelectionKey.OP_WRITE);
862 }
863 break;
864 case RST_STREAM: {
865 if (streamId == 0) {
866 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
867 }
868 final H2Stream stream = streamMap.get(streamId);
869 if (stream == null) {
870 if (streamId > lastStreamId.get()) {
871 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
872 }
873 } else {
874 final ByteBuffer payload = frame.getPayload();
875 if (payload == null || payload.remaining() != 4) {
876 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid RST_STREAM frame payload");
877 }
878 final int errorCode = payload.getInt();
879 stream.reset(new H2StreamResetException(errorCode, "Stream reset (" + errorCode + ")"));
880 streamMap.remove(streamId);
881 stream.releaseResources();
882 requestSessionOutput();
883 }
884 }
885 break;
886 case PING: {
887 if (streamId != 0) {
888 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
889 }
890 final ByteBuffer ping = frame.getPayloadContent();
891 if (ping == null || ping.remaining() != 8) {
892 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload");
893 }
894 if (frame.isFlagSet(FrameFlag.ACK)) {
895 final AsyncPingHandler pingHandler = pingHandlers.poll();
896 if (pingHandler != null) {
897 pingHandler.consumeResponse(ping);
898 }
899 } else {
900 final ByteBuffer pong = ByteBuffer.allocate(ping.remaining());
901 pong.put(ping);
902 pong.flip();
903 final RawFrame response = frameFactory.createPingAck(pong);
904 commitFrame(response);
905 }
906 }
907 break;
908 case SETTINGS: {
909 if (streamId != 0) {
910 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
911 }
912 if (frame.isFlagSet(FrameFlag.ACK)) {
913 if (localSettingState == SettingsHandshake.TRANSMITTED) {
914 localSettingState = SettingsHandshake.ACKED;
915 ioSession.setEvent(SelectionKey.OP_WRITE);
916 applyLocalSettings();
917 }
918 } else {
919 final ByteBuffer payload = frame.getPayload();
920 if (payload != null) {
921 if ((payload.remaining() % 6) != 0) {
922 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid SETTINGS payload");
923 }
924 consumeSettingsFrame(payload);
925 remoteSettingState = SettingsHandshake.TRANSMITTED;
926 }
927
928 final RawFrame response = frameFactory.createSettingsAck();
929 commitFrame(response);
930 remoteSettingState = SettingsHandshake.ACKED;
931 }
932 }
933 break;
934 case PRIORITY:
935
936 break;
937 case PUSH_PROMISE: {
938 acceptPushFrame();
939
940 if (goAwayReceived ) {
941 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
942 }
943
944 if (!localConfig.isPushEnabled()) {
945 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Push is disabled");
946 }
947
948 final H2Stream stream = getValidStream(streamId);
949 if (stream.isRemoteClosed()) {
950 stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed"));
951 break;
952 }
953
954 final ByteBuffer payload = frame.getPayloadContent();
955 if (payload == null || payload.remaining() < 4) {
956 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PUSH_PROMISE payload");
957 }
958 final int promisedStreamId = payload.getInt();
959 if (promisedStreamId == 0 || idGenerator.isSameSide(promisedStreamId)) {
960 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal promised stream id: " + promisedStreamId);
961 }
962 if (streamMap.get(promisedStreamId) != null) {
963 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected promised stream id: " + promisedStreamId);
964 }
965
966 updateLastStreamId(promisedStreamId);
967
968 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
969 promisedStreamId, false, initInputWinSize, initOutputWinSize);
970 final H2StreamHandler streamHandler;
971 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
972 streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics,
973 stream.getPushHandlerFactory());
974 } else {
975 streamHandler = NoopH2StreamHandler.INSTANCE;
976 channel.setLocalEndStream();
977 }
978
979 final H2Stream promisedStream = new H2Stream(channel, streamHandler, true);
980 streamMap.put(promisedStreamId, promisedStream);
981
982 try {
983 consumePushPromiseFrame(frame, payload, promisedStream);
984 } catch (final H2StreamResetException ex) {
985 promisedStream.localReset(ex);
986 } catch (final HttpStreamResetException ex) {
987 promisedStream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.NO_ERROR);
988 }
989 }
990 break;
991 case GOAWAY: {
992 if (streamId != 0) {
993 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
994 }
995 final ByteBuffer payload = frame.getPayload();
996 if (payload == null || payload.remaining() < 8) {
997 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid GOAWAY payload");
998 }
999 final int processedLocalStreamId = payload.getInt();
1000 final int errorCode = payload.getInt();
1001 goAwayReceived = true;
1002 if (errorCode == H2Error.NO_ERROR.getCode()) {
1003 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
1004 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1005 final Map.Entry<Integer, H2Stream> entry = it.next();
1006 final int activeStreamId = entry.getKey();
1007 if (!idGenerator.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
1008 final H2Stream stream = entry.getValue();
1009 stream.cancel();
1010 it.remove();
1011 }
1012 }
1013 }
1014 connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
1015 } else {
1016 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1017 final Map.Entry<Integer, H2Stream> entry = it.next();
1018 final H2Stream stream = entry.getValue();
1019 stream.reset(new H2StreamResetException(errorCode, "Connection terminated by the peer (" + errorCode + ")"));
1020 }
1021 streamMap.clear();
1022 connState = ConnectionHandshake.SHUTDOWN;
1023 }
1024 }
1025 ioSession.setEvent(SelectionKey.OP_WRITE);
1026 break;
1027 }
1028 }
1029
1030 private void consumeDataFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1031 final int streamId = stream.getId();
1032 final ByteBuffer payload = frame.getPayloadContent();
1033 if (payload != null) {
1034 final int frameLength = frame.getLength();
1035 final int streamWinSize = updateInputWindow(streamId, stream.getInputWindow(), -frameLength);
1036 if (streamWinSize < lowMark && !stream.isRemoteClosed()) {
1037 stream.produceInputCapacityUpdate();
1038 }
1039 final int connWinSize = updateInputWindow(0, connInputWindow, -frameLength);
1040 if (connWinSize < CONNECTION_WINDOW_LOW_MARK) {
1041 maximizeConnWindow(connWinSize);
1042 }
1043 }
1044 if (stream.isRemoteClosed()) {
1045 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1046 }
1047 if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1048 stream.setRemoteEndStream();
1049 }
1050 if (stream.isLocalReset()) {
1051 return;
1052 }
1053 stream.consumeData(payload);
1054 }
1055
1056 private void maximizeConnWindow(final int connWinSize) throws IOException {
1057 final int delta = Integer.MAX_VALUE - connWinSize;
1058 if (delta > 0) {
1059 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(0, delta);
1060 commitFrame(windowUpdateFrame);
1061 updateInputWindow(0, connInputWindow, delta);
1062 }
1063 }
1064
1065 private void consumePushPromiseFrame(final RawFrame frame, final ByteBuffer payload, final H2Stream promisedStream) throws HttpException, IOException {
1066 final int promisedStreamId = promisedStream.getId();
1067 if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1068 continuation = new Continuation(promisedStreamId, frame.getType(), true);
1069 }
1070 if (continuation == null) {
1071 final List<Header> headers = hPackDecoder.decodeHeaders(payload);
1072 if (promisedStreamId > processedRemoteStreamId) {
1073 processedRemoteStreamId = promisedStreamId;
1074 }
1075 if (streamListener != null) {
1076 streamListener.onHeaderInput(this, promisedStreamId, headers);
1077 }
1078 promisedStream.consumePromise(headers);
1079 } else {
1080 continuation.copyPayload(payload);
1081 }
1082 }
1083
1084 List<Header> decodeHeaders(final ByteBuffer payload) throws HttpException {
1085 return hPackDecoder.decodeHeaders(payload);
1086 }
1087
1088 private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1089 final int streamId = stream.getId();
1090 if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1091 continuation = new Continuation(streamId, frame.getType(), frame.isFlagSet(FrameFlag.END_STREAM));
1092 }
1093 final ByteBuffer payload = frame.getPayloadContent();
1094 if (frame.isFlagSet(FrameFlag.PRIORITY)) {
1095
1096 payload.getInt();
1097 payload.get();
1098 }
1099 if (continuation == null) {
1100 final List<Header> headers = decodeHeaders(payload);
1101 if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1102 processedRemoteStreamId = streamId;
1103 }
1104 if (streamListener != null) {
1105 streamListener.onHeaderInput(this, streamId, headers);
1106 }
1107 if (stream.isRemoteClosed()) {
1108 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1109 }
1110 if (stream.isLocalReset()) {
1111 return;
1112 }
1113 if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1114 stream.setRemoteEndStream();
1115 }
1116 stream.consumeHeader(headers);
1117 } else {
1118 continuation.copyPayload(payload);
1119 }
1120 }
1121
1122 private void consumeContinuationFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1123 final int streamId = frame.getStreamId();
1124 final ByteBuffer payload = frame.getPayload();
1125 continuation.copyPayload(payload);
1126 if (frame.isFlagSet(FrameFlag.END_HEADERS)) {
1127 final List<Header> headers = decodeHeaders(continuation.getContent());
1128 if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1129 processedRemoteStreamId = streamId;
1130 }
1131 if (streamListener != null) {
1132 streamListener.onHeaderInput(this, streamId, headers);
1133 }
1134 if (stream.isRemoteClosed()) {
1135 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1136 }
1137 if (stream.isLocalReset()) {
1138 return;
1139 }
1140 if (continuation.endStream) {
1141 stream.setRemoteEndStream();
1142 }
1143 if (continuation.type == FrameType.PUSH_PROMISE.getValue()) {
1144 stream.consumePromise(headers);
1145 } else {
1146 stream.consumeHeader(headers);
1147 }
1148 continuation = null;
1149 }
1150 }
1151
1152 private void consumeSettingsFrame(final ByteBuffer payload) throws HttpException, IOException {
1153 final H2Config.Builder configBuilder = H2Config.initial();
1154 while (payload.hasRemaining()) {
1155 final int code = payload.getShort();
1156 final int value = payload.getInt();
1157 final H2Param param = H2Param.valueOf(code);
1158 if (param != null) {
1159 switch (param) {
1160 case HEADER_TABLE_SIZE:
1161 try {
1162 configBuilder.setHeaderTableSize(value);
1163 } catch (final IllegalArgumentException ex) {
1164 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1165 }
1166 break;
1167 case MAX_CONCURRENT_STREAMS:
1168 try {
1169 configBuilder.setMaxConcurrentStreams(value);
1170 } catch (final IllegalArgumentException ex) {
1171 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1172 }
1173 break;
1174 case ENABLE_PUSH:
1175 configBuilder.setPushEnabled(value == 1);
1176 break;
1177 case INITIAL_WINDOW_SIZE:
1178 try {
1179 configBuilder.setInitialWindowSize(value);
1180 } catch (final IllegalArgumentException ex) {
1181 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1182 }
1183 break;
1184 case MAX_FRAME_SIZE:
1185 try {
1186 configBuilder.setMaxFrameSize(value);
1187 } catch (final IllegalArgumentException ex) {
1188 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1189 }
1190 break;
1191 case MAX_HEADER_LIST_SIZE:
1192 try {
1193 configBuilder.setMaxHeaderListSize(value);
1194 } catch (final IllegalArgumentException ex) {
1195 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1196 }
1197 break;
1198 }
1199 }
1200 }
1201 applyRemoteSettings(configBuilder.build());
1202 }
1203
1204 private void produceOutput() throws HttpException, IOException {
1205 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1206 final Map.Entry<Integer, H2Stream> entry = it.next();
1207 final H2Stream stream = entry.getValue();
1208 if (!stream.isLocalClosed() && stream.getOutputWindow().get() > 0) {
1209 stream.produceOutput();
1210 }
1211 if (stream.isTerminated()) {
1212 it.remove();
1213 stream.releaseResources();
1214 requestSessionOutput();
1215 }
1216 if (!outputQueue.isEmpty()) {
1217 break;
1218 }
1219 }
1220 }
1221
1222 private void applyRemoteSettings(final H2Config config) throws H2ConnectionException {
1223 remoteConfig = config;
1224
1225 hPackEncoder.setMaxTableSize(remoteConfig.getHeaderTableSize());
1226 final int delta = remoteConfig.getInitialWindowSize() - initOutputWinSize;
1227 initOutputWinSize = remoteConfig.getInitialWindowSize();
1228
1229 if (delta != 0) {
1230 if (!streamMap.isEmpty()) {
1231 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1232 final Map.Entry<Integer, H2Stream> entry = it.next();
1233 final H2Stream stream = entry.getValue();
1234 try {
1235 updateOutputWindow(stream.getId(), stream.getOutputWindow(), delta);
1236 } catch (final ArithmeticException ex) {
1237 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1238 }
1239 }
1240 }
1241 }
1242 }
1243
1244 private void applyLocalSettings() throws H2ConnectionException {
1245 hPackDecoder.setMaxTableSize(localConfig.getHeaderTableSize());
1246 hPackDecoder.setMaxListSize(localConfig.getMaxHeaderListSize());
1247
1248 final int delta = localConfig.getInitialWindowSize() - initInputWinSize;
1249 initInputWinSize = localConfig.getInitialWindowSize();
1250
1251 if (delta != 0 && !streamMap.isEmpty()) {
1252 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1253 final Map.Entry<Integer, H2Stream> entry = it.next();
1254 final H2Stream stream = entry.getValue();
1255 try {
1256 updateInputWindow(stream.getId(), stream.getInputWindow(), delta);
1257 } catch (final ArithmeticException ex) {
1258 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1259 }
1260 }
1261 }
1262 lowMark = initInputWinSize / 2;
1263 }
1264
1265 @Override
1266 public void close() throws IOException {
1267 ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.IMMEDIATE);
1268 }
1269
1270 @Override
1271 public void close(final CloseMode closeMode) {
1272 ioSession.close(closeMode);
1273 }
1274
1275 @Override
1276 public boolean isOpen() {
1277 return connState == ConnectionHandshake.ACTIVE;
1278 }
1279
1280 @Override
1281 public void setSocketTimeout(final Timeout timeout) {
1282 ioSession.setSocketTimeout(timeout);
1283 }
1284
1285 @Override
1286 public SSLSession getSSLSession() {
1287 final TlsDetails tlsDetails = ioSession.getTlsDetails();
1288 return tlsDetails != null ? tlsDetails.getSSLSession() : null;
1289 }
1290
1291 @Override
1292 public EndpointDetails getEndpointDetails() {
1293 if (endpointDetails == null) {
1294 endpointDetails = new BasicEndpointDetails(
1295 ioSession.getRemoteAddress(),
1296 ioSession.getLocalAddress(),
1297 connMetrics,
1298 ioSession.getSocketTimeout());
1299 }
1300 return endpointDetails;
1301 }
1302
1303 @Override
1304 public Timeout getSocketTimeout() {
1305 return ioSession.getSocketTimeout();
1306 }
1307
1308 @Override
1309 public ProtocolVersion getProtocolVersion() {
1310 return HttpVersion.HTTP_2;
1311 }
1312
1313 @Override
1314 public SocketAddress getRemoteAddress() {
1315 return ioSession.getRemoteAddress();
1316 }
1317
1318 @Override
1319 public SocketAddress getLocalAddress() {
1320 return ioSession.getLocalAddress();
1321 }
1322
1323 void appendState(final StringBuilder buf) {
1324 buf.append("connState=").append(connState)
1325 .append(", connInputWindow=").append(connInputWindow)
1326 .append(", connOutputWindow=").append(connOutputWindow)
1327 .append(", outputQueue=").append(outputQueue.size())
1328 .append(", streamMap=").append(streamMap.size())
1329 .append(", processedRemoteStreamId=").append(processedRemoteStreamId);
1330 }
1331
1332 private static class Continuation {
1333
1334 final int streamId;
1335 final int type;
1336 final boolean endStream;
1337 final ByteArrayBuffer headerBuffer;
1338
1339 private Continuation(final int streamId, final int type, final boolean endStream) {
1340 this.streamId = streamId;
1341 this.type = type;
1342 this.endStream = endStream;
1343 this.headerBuffer = new ByteArrayBuffer(1024);
1344 }
1345
1346 void copyPayload(final ByteBuffer payload) {
1347 if (payload == null) {
1348 return;
1349 }
1350 headerBuffer.ensureCapacity(payload.remaining());
1351 payload.get(headerBuffer.array(), headerBuffer.length(), payload.remaining());
1352 }
1353
1354 ByteBuffer getContent() {
1355 return ByteBuffer.wrap(headerBuffer.array(), 0, headerBuffer.length());
1356 }
1357
1358 }
1359
1360 private class H2StreamChannelImpl implements H2StreamChannel {
1361
1362 private final int id;
1363 private final AtomicInteger inputWindow;
1364 private final AtomicInteger outputWindow;
1365
1366 private volatile boolean idle;
1367 private volatile boolean remoteEndStream;
1368 private volatile boolean localEndStream;
1369
1370 private volatile long deadline;
1371
1372 H2StreamChannelImpl(final int id, final boolean idle, final int initialInputWindowSize, final int initialOutputWindowSize) {
1373 this.id = id;
1374 this.idle = idle;
1375 this.inputWindow = new AtomicInteger(initialInputWindowSize);
1376 this.outputWindow = new AtomicInteger(initialOutputWindowSize);
1377 }
1378
1379 int getId() {
1380 return id;
1381 }
1382
1383 AtomicInteger getOutputWindow() {
1384 return outputWindow;
1385 }
1386
1387 AtomicInteger getInputWindow() {
1388 return inputWindow;
1389 }
1390
1391 @Override
1392 public void submit(final List<Header> headers, final boolean endStream) throws IOException {
1393 ioSession.getLock().lock();
1394 try {
1395 if (headers == null || headers.isEmpty()) {
1396 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
1397 }
1398 if (localEndStream) {
1399 return;
1400 }
1401 idle = false;
1402 commitHeaders(id, headers, endStream);
1403 if (endStream) {
1404 localEndStream = true;
1405 }
1406 } finally {
1407 ioSession.getLock().unlock();
1408 }
1409 }
1410
1411 @Override
1412 public void push(final List<Header> headers, final AsyncPushProducer pushProducer) throws HttpException, IOException {
1413 acceptPushRequest();
1414 final int promisedStreamId = generateStreamId();
1415 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
1416 promisedStreamId,
1417 true,
1418 localConfig.getInitialWindowSize(),
1419 remoteConfig.getInitialWindowSize());
1420 final HttpCoreContext context = HttpCoreContext.create();
1421 context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
1422 context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
1423 final H2StreamHandler streamHandler = new ServerPushH2StreamHandler(
1424 channel, httpProcessor, connMetrics, pushProducer, context);
1425 final H2Stream stream = new H2Stream(channel, streamHandler, false);
1426 streamMap.put(promisedStreamId, stream);
1427
1428 ioSession.getLock().lock();
1429 try {
1430 if (localEndStream) {
1431 stream.releaseResources();
1432 return;
1433 }
1434 commitPushPromise(id, promisedStreamId, headers);
1435 idle = false;
1436 } finally {
1437 ioSession.getLock().unlock();
1438 }
1439 }
1440
1441 @Override
1442 public void update(final int increment) throws IOException {
1443 if (remoteEndStream) {
1444 return;
1445 }
1446 incrementInputCapacity(0, connInputWindow, increment);
1447 incrementInputCapacity(id, inputWindow, increment);
1448 }
1449
1450 @Override
1451 public int write(final ByteBuffer payload) throws IOException {
1452 ioSession.getLock().lock();
1453 try {
1454 if (localEndStream) {
1455 return 0;
1456 }
1457 return streamData(id, outputWindow, payload);
1458 } finally {
1459 ioSession.getLock().unlock();
1460 }
1461 }
1462
1463 @Override
1464 public void endStream(final List<? extends Header> trailers) throws IOException {
1465 ioSession.getLock().lock();
1466 try {
1467 if (localEndStream) {
1468 return;
1469 }
1470 localEndStream = true;
1471 if (trailers != null && !trailers.isEmpty()) {
1472 commitHeaders(id, trailers, true);
1473 } else {
1474 final RawFrame frame = frameFactory.createData(id, null, true);
1475 commitFrameInternal(frame);
1476 }
1477 } finally {
1478 ioSession.getLock().unlock();
1479 }
1480 }
1481
1482 @Override
1483 public void endStream() throws IOException {
1484 endStream(null);
1485 }
1486
1487 @Override
1488 public void requestOutput() {
1489 requestSessionOutput();
1490 }
1491
1492 boolean isRemoteClosed() {
1493 return remoteEndStream;
1494 }
1495
1496 void setRemoteEndStream() {
1497 remoteEndStream = true;
1498 }
1499
1500 boolean isLocalClosed() {
1501 return localEndStream;
1502 }
1503
1504 void setLocalEndStream() {
1505 localEndStream = true;
1506 }
1507
1508 boolean isLocalReset() {
1509 return deadline > 0;
1510 }
1511
1512 boolean isResetDeadline() {
1513 final long l = deadline;
1514 return l > 0 && l < System.currentTimeMillis();
1515 }
1516
1517 boolean localReset(final int code) throws IOException {
1518 ioSession.getLock().lock();
1519 try {
1520 if (localEndStream) {
1521 return false;
1522 }
1523 localEndStream = true;
1524 deadline = System.currentTimeMillis() + LINGER_TIME;
1525 if (!idle) {
1526 final RawFrame resetStream = frameFactory.createResetStream(id, code);
1527 commitFrameInternal(resetStream);
1528 return true;
1529 }
1530 return false;
1531 } finally {
1532 ioSession.getLock().unlock();
1533 }
1534 }
1535
1536 boolean localReset(final H2Error error) throws IOException {
1537 return localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1538 }
1539
1540 @Override
1541 public boolean cancel() {
1542 try {
1543 return localReset(H2Error.CANCEL);
1544 } catch (final IOException ignore) {
1545 return false;
1546 }
1547 }
1548
1549 void appendState(final StringBuilder buf) {
1550 buf.append("id=").append(id)
1551 .append(", connState=").append(connState)
1552 .append(", inputWindow=").append(inputWindow)
1553 .append(", outputWindow=").append(outputWindow)
1554 .append(", localEndStream=").append(localEndStream)
1555 .append(", idle=").append(idle);
1556 }
1557
1558 @Override
1559 public String toString() {
1560 final StringBuilder buf = new StringBuilder();
1561 buf.append("[");
1562 appendState(buf);
1563 buf.append("]");
1564 return buf.toString();
1565 }
1566
1567 }
1568
1569 static class H2Stream {
1570
1571 private final H2StreamChannelImpl channel;
1572 private final H2StreamHandler handler;
1573 private final boolean remoteInitiated;
1574
1575 private H2Stream(
1576 final H2StreamChannelImpl channel,
1577 final H2StreamHandler handler,
1578 final boolean remoteInitiated) {
1579 this.channel = channel;
1580 this.handler = handler;
1581 this.remoteInitiated = remoteInitiated;
1582 }
1583
1584 int getId() {
1585 return channel.getId();
1586 }
1587
1588 boolean isRemoteInitiated() {
1589 return remoteInitiated;
1590 }
1591
1592 AtomicInteger getOutputWindow() {
1593 return channel.getOutputWindow();
1594 }
1595
1596 AtomicInteger getInputWindow() {
1597 return channel.getInputWindow();
1598 }
1599
1600 boolean isTerminated() {
1601 return channel.isLocalClosed() && (channel.isRemoteClosed() || channel.isResetDeadline());
1602 }
1603
1604 boolean isRemoteClosed() {
1605 return channel.isRemoteClosed();
1606 }
1607
1608 boolean isLocalClosed() {
1609 return channel.isLocalClosed();
1610 }
1611
1612 boolean isLocalReset() {
1613 return channel.isLocalReset();
1614 }
1615
1616 void setRemoteEndStream() {
1617 channel.setRemoteEndStream();
1618 }
1619
1620 void consumePromise(final List<Header> headers) throws HttpException, IOException {
1621 try {
1622 handler.consumePromise(headers);
1623 channel.setLocalEndStream();
1624 } catch (final ProtocolException ex) {
1625 localReset(ex, H2Error.PROTOCOL_ERROR);
1626 }
1627 }
1628
1629 void consumeHeader(final List<Header> headers) throws HttpException, IOException {
1630 try {
1631 handler.consumeHeader(headers, channel.isRemoteClosed());
1632 } catch (final ProtocolException ex) {
1633 localReset(ex, H2Error.PROTOCOL_ERROR);
1634 }
1635 }
1636
1637 void consumeData(final ByteBuffer src) throws HttpException, IOException {
1638 try {
1639 handler.consumeData(src, channel.isRemoteClosed());
1640 } catch (final CharacterCodingException ex) {
1641 localReset(ex, H2Error.INTERNAL_ERROR);
1642 } catch (final ProtocolException ex) {
1643 localReset(ex, H2Error.PROTOCOL_ERROR);
1644 }
1645 }
1646
1647 boolean isOutputReady() {
1648 return handler.isOutputReady();
1649 }
1650
1651 void produceOutput() throws HttpException, IOException {
1652 try {
1653 handler.produceOutput();
1654 } catch (final ProtocolException ex) {
1655 localReset(ex, H2Error.PROTOCOL_ERROR);
1656 }
1657 }
1658
1659 void produceInputCapacityUpdate() throws IOException {
1660 handler.updateInputCapacity();
1661 }
1662
1663 void reset(final Exception cause) {
1664 channel.setRemoteEndStream();
1665 channel.setLocalEndStream();
1666 handler.failed(cause);
1667 }
1668
1669 void localReset(final Exception cause, final int code) throws IOException {
1670 channel.localReset(code);
1671 handler.failed(cause);
1672 }
1673
1674 void localReset(final Exception cause, final H2Error error) throws IOException {
1675 localReset(cause, error != null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1676 }
1677
1678 void localReset(final H2StreamResetException ex) throws IOException {
1679 localReset(ex, ex.getCode());
1680 }
1681
1682 void handle(final HttpException ex) throws IOException, HttpException {
1683 handler.handle(ex, channel.isRemoteClosed());
1684 }
1685
1686 HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
1687 return handler.getPushHandlerFactory();
1688 }
1689
1690 void cancel() {
1691 reset(new RequestNotExecutedException());
1692 }
1693
1694 boolean abort() {
1695 final boolean cancelled = channel.cancel();
1696 handler.failed(new RequestNotExecutedException());
1697 return cancelled;
1698 }
1699
1700 void releaseResources() {
1701 handler.releaseResources();
1702 }
1703
1704 void appendState(final StringBuilder buf) {
1705 buf.append("channel=[");
1706 channel.appendState(buf);
1707 buf.append("]");
1708 }
1709
1710 @Override
1711 public String toString() {
1712 final StringBuilder buf = new StringBuilder();
1713 buf.append("[");
1714 appendState(buf);
1715 buf.append("]");
1716 return buf.toString();
1717 }
1718
1719 }
1720
1721 }