View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.ClosedChannelException;
34  import java.nio.channels.ReadableByteChannel;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.WritableByteChannel;
37  import java.util.List;
38  import java.util.concurrent.atomic.AtomicInteger;
39  import java.util.concurrent.locks.ReentrantLock;
40  
41  import javax.net.ssl.SSLSession;
42  
43  import org.apache.hc.core5.http.ConnectionClosedException;
44  import org.apache.hc.core5.http.ContentLengthStrategy;
45  import org.apache.hc.core5.http.EndpointDetails;
46  import org.apache.hc.core5.http.EntityDetails;
47  import org.apache.hc.core5.http.Header;
48  import org.apache.hc.core5.http.HttpConnection;
49  import org.apache.hc.core5.http.HttpException;
50  import org.apache.hc.core5.http.HttpMessage;
51  import org.apache.hc.core5.http.Message;
52  import org.apache.hc.core5.http.ProtocolVersion;
53  import org.apache.hc.core5.http.config.CharCodingConfig;
54  import org.apache.hc.core5.http.config.Http1Config;
55  import org.apache.hc.core5.http.impl.BasicEndpointDetails;
56  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
57  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
58  import org.apache.hc.core5.http.impl.CharCodingSupport;
59  import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
60  import org.apache.hc.core5.http.impl.IncomingEntityDetails;
61  import org.apache.hc.core5.http.nio.CapacityChannel;
62  import org.apache.hc.core5.http.nio.ContentDecoder;
63  import org.apache.hc.core5.http.nio.ContentEncoder;
64  import org.apache.hc.core5.http.nio.NHttpMessageParser;
65  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
66  import org.apache.hc.core5.http.nio.SessionInputBuffer;
67  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
68  import org.apache.hc.core5.http.nio.command.CommandSupport;
69  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
70  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
71  import org.apache.hc.core5.io.CloseMode;
72  import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
73  import org.apache.hc.core5.reactor.Command;
74  import org.apache.hc.core5.reactor.EventMask;
75  import org.apache.hc.core5.reactor.IOSession;
76  import org.apache.hc.core5.reactor.ProtocolIOSession;
77  import org.apache.hc.core5.reactor.ssl.TlsDetails;
78  import org.apache.hc.core5.util.Args;
79  import org.apache.hc.core5.util.Identifiable;
80  import org.apache.hc.core5.util.Timeout;
81  
82  abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
83          implements Identifiable, HttpConnection {
84  
85      private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
86  
87      private final ProtocolIOSession ioSession;
88      private final Http1Config http1Config;
89      private final SessionInputBufferImpl inbuf;
90      private final SessionOutputBufferImpl outbuf;
91      private final BasicHttpTransportMetrics inTransportMetrics;
92      private final BasicHttpTransportMetrics outTransportMetrics;
93      private final BasicHttpConnectionMetrics connMetrics;
94      private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
95      private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
96      private final ContentLengthStrategy incomingContentStrategy;
97      private final ContentLengthStrategy outgoingContentStrategy;
98      private final ByteBuffer contentBuffer;
99      private final AtomicInteger outputRequests;
100 
101     private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
102     private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
103     private volatile ConnectionState connState;
104     private volatile CapacityWindow capacityWindow;
105 
106     private volatile ProtocolVersion version;
107     private volatile EndpointDetails endpointDetails;
108 
109     AbstractHttp1StreamDuplexer(
110             final ProtocolIOSession ioSession,
111             final Http1Config http1Config,
112             final CharCodingConfig charCodingConfig,
113             final NHttpMessageParser<IncomingMessage> incomingMessageParser,
114             final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
115             final ContentLengthStrategy incomingContentStrategy,
116             final ContentLengthStrategy outgoingContentStrategy) {
117         this.ioSession = Args.notNull(ioSession, "I/O session");
118         this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
119         final int bufferSize = this.http1Config.getBufferSize();
120         this.inbuf = new SessionInputBufferImpl(bufferSize, Math.min(bufferSize, 512),
121                 this.http1Config.getMaxLineLength(),
122                 CharCodingSupport.createDecoder(charCodingConfig));
123         this.outbuf = new SessionOutputBufferImpl(bufferSize, Math.min(bufferSize, 512),
124                 CharCodingSupport.createEncoder(charCodingConfig));
125         this.inTransportMetrics = new BasicHttpTransportMetrics();
126         this.outTransportMetrics = new BasicHttpTransportMetrics();
127         this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
128         this.incomingMessageParser = incomingMessageParser;
129         this.outgoingMessageWriter = outgoingMessageWriter;
130         this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
131                 DefaultContentLengthStrategy.INSTANCE;
132         this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
133                 DefaultContentLengthStrategy.INSTANCE;
134         this.contentBuffer = ByteBuffer.allocate(this.http1Config.getBufferSize());
135         this.outputRequests = new AtomicInteger(0);
136         this.connState = ConnectionState.READY;
137     }
138 
139     @Override
140     public String getId() {
141         return ioSession.getId();
142     }
143 
144     boolean isActive() {
145         return connState == ConnectionState.ACTIVE;
146     }
147 
148     boolean isShuttingDown() {
149         return connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0;
150     }
151 
152     void shutdownSession(final CloseMode closeMode) {
153         if (closeMode == CloseMode.GRACEFUL) {
154             connState = ConnectionState.GRACEFUL_SHUTDOWN;
155             ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
156         } else {
157             connState = ConnectionState.SHUTDOWN;
158             ioSession.close();
159         }
160     }
161 
162     void shutdownSession(final Exception cause) {
163         connState = ConnectionState.SHUTDOWN;
164         try {
165             terminate(cause);
166         } finally {
167             final CloseMode closeMode;
168             if (cause instanceof ConnectionClosedException) {
169                 closeMode = CloseMode.GRACEFUL;
170             } else if (cause instanceof IOException) {
171                 closeMode = CloseMode.IMMEDIATE;
172             } else {
173                 closeMode = CloseMode.GRACEFUL;
174             }
175             ioSession.close(closeMode);
176         }
177     }
178 
179     abstract void disconnected();
180 
181     abstract void terminate(final Exception exception);
182 
183     abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);
184 
185     abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);
186 
187     abstract void consumeHeader(IncomingMessage messageHead, EntityDetails entityDetails) throws HttpException, IOException;
188 
189     abstract boolean handleIncomingMessage(IncomingMessage incomingMessage) throws HttpException;
190 
191     abstract boolean handleOutgoingMessage(OutgoingMessage outgoingMessage) throws HttpException;
192 
193     abstract ContentDecoder createContentDecoder(
194             long contentLength,
195             ReadableByteChannel channel,
196             SessionInputBuffer buffer,
197             BasicHttpTransportMetrics metrics) throws HttpException;
198 
199     abstract ContentEncoder createContentEncoder(
200             long contentLength,
201             WritableByteChannel channel,
202             SessionOutputBuffer buffer,
203             BasicHttpTransportMetrics metrics) throws HttpException;
204 
205     abstract void consumeData(ByteBuffer src) throws HttpException, IOException;
206 
207     abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;
208 
209     abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;
210 
211     abstract boolean isOutputReady();
212 
213     abstract void produceOutput() throws HttpException, IOException;
214 
215     abstract void execute(RequestExecutionCommand executionCommand) throws HttpException, IOException;
216 
217     abstract void inputEnd() throws HttpException, IOException;
218 
219     abstract void outputEnd() throws HttpException, IOException;
220 
221     abstract boolean inputIdle();
222 
223     abstract boolean outputIdle();
224 
225     abstract boolean handleTimeout();
226 
227     private void processCommands() throws HttpException, IOException {
228         for (;;) {
229             final Command command = ioSession.poll();
230             if (command == null) {
231                 return;
232             }
233             if (command instanceof ShutdownCommand) {
234                 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
235                 requestShutdown(shutdownCommand.getType());
236             } else if (command instanceof RequestExecutionCommand) {
237                 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
238                     command.cancel();
239                 } else {
240                     execute((RequestExecutionCommand) command);
241                     return;
242                 }
243             } else {
244                 throw new HttpException("Unexpected command: " + command.getClass());
245             }
246         }
247     }
248 
249     public final void onConnect() throws HttpException, IOException {
250         if (connState == ConnectionState.READY) {
251             connState = ConnectionState.ACTIVE;
252             processCommands();
253         }
254     }
255 
256     IncomingMessage parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
257         final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, endOfStream);
258         if (messageHead != null) {
259             incomingMessageParser.reset();
260         }
261         return messageHead;
262     }
263 
264     public final void onInput(final ByteBuffer src) throws HttpException, IOException {
265         if (src != null) {
266             final int n = src.remaining();
267             inbuf.put(src);
268             inTransportMetrics.incrementBytesTransferred(n);
269         }
270 
271         if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inbuf.hasData() && inputIdle()) {
272             ioSession.clearEvent(SelectionKey.OP_READ);
273             return;
274         }
275 
276         boolean endOfStream = false;
277         if (incomingMessage == null) {
278             final int bytesRead = inbuf.fill(ioSession);
279             if (bytesRead > 0) {
280                 inTransportMetrics.incrementBytesTransferred(bytesRead);
281             }
282             endOfStream = bytesRead == -1;
283         }
284 
285         do {
286             if (incomingMessage == null) {
287 
288                 final IncomingMessage messageHead = parseMessageHead(endOfStream);
289                 if (messageHead != null) {
290                     this.version = messageHead.getVersion();
291 
292                     updateInputMetrics(messageHead, connMetrics);
293                     final ContentDecoder contentDecoder;
294                     if (handleIncomingMessage(messageHead)) {
295                         final long len = incomingContentStrategy.determineLength(messageHead);
296                         contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
297                         consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
298                     } else {
299                         consumeHeader(messageHead, null);
300                         contentDecoder = null;
301                     }
302                     capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
303                     if (contentDecoder != null) {
304                         incomingMessage = new Message<>(messageHead, contentDecoder);
305                     } else {
306                         inputEnd();
307                         if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
308                             ioSession.setEvent(SelectionKey.OP_READ);
309                         }
310                     }
311                 } else {
312                     break;
313                 }
314             }
315 
316             if (incomingMessage != null) {
317                 final ContentDecoder contentDecoder = incomingMessage.getBody();
318 
319                 // At present the consumer can be forced to consume data
320                 // over its declared capacity in order to avoid having
321                 // unprocessed message body content stuck in the session
322                 // input buffer
323                 final int bytesRead = contentDecoder.read(contentBuffer);
324                 if (bytesRead > 0) {
325                     contentBuffer.flip();
326                     consumeData(contentBuffer);
327                     contentBuffer.clear();
328                     final int capacity = capacityWindow.removeCapacity(bytesRead);
329                     if (capacity <= 0) {
330                         if (!contentDecoder.isCompleted()) {
331                             updateCapacity(capacityWindow);
332                         }
333                     }
334                 }
335                 if (contentDecoder.isCompleted()) {
336                     dataEnd(contentDecoder.getTrailers());
337                     capacityWindow.close();
338                     incomingMessage = null;
339                     ioSession.setEvent(SelectionKey.OP_READ);
340                     inputEnd();
341                 } else if (bytesRead == 0) {
342                     break;
343                 }
344             }
345         } while (inbuf.hasData());
346 
347         if (endOfStream && !inbuf.hasData()) {
348             if (outputIdle() && inputIdle()) {
349                 requestShutdown(CloseMode.GRACEFUL);
350             } else {
351                 shutdownSession(new ConnectionClosedException("Connection closed by peer"));
352             }
353         }
354     }
355 
356     public final void onOutput() throws IOException, HttpException {
357         ioSession.getLock().lock();
358         try {
359             if (outbuf.hasData()) {
360                 final int bytesWritten = outbuf.flush(ioSession);
361                 if (bytesWritten > 0) {
362                     outTransportMetrics.incrementBytesTransferred(bytesWritten);
363                 }
364             }
365         } finally {
366             ioSession.getLock().unlock();
367         }
368         if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
369             final int pendingOutputRequests = outputRequests.get();
370             produceOutput();
371             final boolean outputPending = isOutputReady();
372             final boolean outputEnd;
373             ioSession.getLock().lock();
374             try {
375                 if (!outputPending && !outbuf.hasData() && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
376                     ioSession.clearEvent(SelectionKey.OP_WRITE);
377                 } else {
378                     outputRequests.addAndGet(-pendingOutputRequests);
379                 }
380                 outputEnd = outgoingMessage == null && !outbuf.hasData();
381             } finally {
382                 ioSession.getLock().unlock();
383             }
384             if (outputEnd) {
385                 outputEnd();
386                 if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
387                     processCommands();
388                 } else if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
389                     connState = ConnectionState.SHUTDOWN;
390                 }
391             }
392         }
393         if (connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
394             ioSession.close();
395         }
396     }
397 
398     public final void onTimeout(final Timeout timeout) throws IOException, HttpException {
399         if (!handleTimeout()) {
400             onException(SocketTimeoutExceptionFactory.create(timeout));
401         }
402     }
403 
404     public final void onException(final Exception ex) {
405         shutdownSession(ex);
406         CommandSupport.failCommands(ioSession, ex);
407     }
408 
409     public final void onDisconnect() {
410         disconnected();
411         CommandSupport.cancelCommands(ioSession);
412     }
413 
414     void requestShutdown(final CloseMode closeMode) {
415         switch (closeMode) {
416             case GRACEFUL:
417                 if (connState == ConnectionState.ACTIVE) {
418                     connState = ConnectionState.GRACEFUL_SHUTDOWN;
419                 }
420                 break;
421             case IMMEDIATE:
422                 connState = ConnectionState.SHUTDOWN;
423                 break;
424         }
425         ioSession.setEvent(SelectionKey.OP_WRITE);
426     }
427 
428     void commitMessageHead(
429             final OutgoingMessage messageHead,
430             final boolean endStream,
431             final FlushMode flushMode) throws HttpException, IOException {
432         ioSession.getLock().lock();
433         try {
434             outgoingMessageWriter.write(messageHead, outbuf);
435             updateOutputMetrics(messageHead, connMetrics);
436             if (!endStream) {
437                 final ContentEncoder contentEncoder;
438                 if (handleOutgoingMessage(messageHead)) {
439                     final long len = outgoingContentStrategy.determineLength(messageHead);
440                     contentEncoder = createContentEncoder(len, ioSession, outbuf, outTransportMetrics);
441                 } else {
442                     contentEncoder = null;
443                 }
444                 if (contentEncoder != null) {
445                     outgoingMessage = new Message<>(messageHead, contentEncoder);
446                 }
447             }
448             outgoingMessageWriter.reset();
449             if (flushMode == FlushMode.IMMEDIATE) {
450                 final int bytesWritten = outbuf.flush(ioSession);
451                 if (bytesWritten > 0) {
452                     outTransportMetrics.incrementBytesTransferred(bytesWritten);
453                 }
454             }
455             ioSession.setEvent(EventMask.WRITE);
456         } finally {
457             ioSession.getLock().unlock();
458         }
459     }
460 
461     void requestSessionInput() {
462         ioSession.setEvent(SelectionKey.OP_READ);
463     }
464 
465     void requestSessionOutput() {
466         outputRequests.incrementAndGet();
467         ioSession.setEvent(SelectionKey.OP_WRITE);
468     }
469 
470     Timeout getSessionTimeout() {
471         return ioSession.getSocketTimeout();
472     }
473 
474     void setSessionTimeout(final Timeout timeout) {
475         ioSession.setSocketTimeout(timeout);
476     }
477 
478     void suspendSessionInput() {
479         ioSession.clearEvent(SelectionKey.OP_READ);
480     }
481 
482     void suspendSessionOutput() throws IOException {
483         ioSession.getLock().lock();
484         try {
485             if (outbuf.hasData()) {
486                 final int bytesWritten = outbuf.flush(ioSession);
487                 if (bytesWritten > 0) {
488                     outTransportMetrics.incrementBytesTransferred(bytesWritten);
489                 }
490             } else {
491                 ioSession.clearEvent(SelectionKey.OP_WRITE);
492             }
493         } finally {
494             ioSession.getLock().unlock();
495         }
496     }
497 
498     int streamOutput(final ByteBuffer src) throws IOException {
499         ioSession.getLock().lock();
500         try {
501             if (outgoingMessage == null) {
502                 throw new ClosedChannelException();
503             }
504             final ContentEncoder contentEncoder = outgoingMessage.getBody();
505             final int bytesWritten = contentEncoder.write(src);
506             if (bytesWritten > 0) {
507                 ioSession.setEvent(SelectionKey.OP_WRITE);
508             }
509             return bytesWritten;
510         } finally {
511             ioSession.getLock().unlock();
512         }
513     }
514 
515     enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
516 
517     MessageDelineation endOutputStream(final List<? extends Header> trailers) throws IOException {
518         ioSession.getLock().lock();
519         try {
520             if (outgoingMessage == null) {
521                 return MessageDelineation.NONE;
522             }
523             final ContentEncoder contentEncoder = outgoingMessage.getBody();
524             contentEncoder.complete(trailers);
525             ioSession.setEvent(SelectionKey.OP_WRITE);
526             outgoingMessage = null;
527             return contentEncoder instanceof ChunkEncoder
528                             ? MessageDelineation.CHUNK_CODED
529                             : MessageDelineation.MESSAGE_HEAD;
530         } finally {
531             ioSession.getLock().unlock();
532         }
533     }
534 
535     boolean isOutputCompleted() {
536         ioSession.getLock().lock();
537         try {
538             if (outgoingMessage == null) {
539                 return true;
540             }
541             final ContentEncoder contentEncoder = outgoingMessage.getBody();
542             return contentEncoder.isCompleted();
543         } finally {
544             ioSession.getLock().unlock();
545         }
546     }
547 
548     @Override
549     public void close() throws IOException {
550         ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
551     }
552 
553     @Override
554     public void close(final CloseMode closeMode) {
555         ioSession.enqueue(new ShutdownCommand(closeMode), Command.Priority.IMMEDIATE);
556     }
557 
558     @Override
559     public boolean isOpen() {
560         return connState == ConnectionState.ACTIVE;
561     }
562 
563     @Override
564     public Timeout getSocketTimeout() {
565         return ioSession.getSocketTimeout();
566     }
567 
568     @Override
569     public void setSocketTimeout(final Timeout timeout) {
570         ioSession.setSocketTimeout(timeout);
571     }
572 
573     @Override
574     public EndpointDetails getEndpointDetails() {
575         if (endpointDetails == null) {
576             endpointDetails = new BasicEndpointDetails(
577                     ioSession.getRemoteAddress(),
578                     ioSession.getLocalAddress(),
579                     connMetrics,
580                     ioSession.getSocketTimeout());
581         }
582         return endpointDetails;
583     }
584 
585     @Override
586     public ProtocolVersion getProtocolVersion() {
587         return version;
588     }
589 
590     @Override
591     public SocketAddress getRemoteAddress() {
592         return ioSession.getRemoteAddress();
593     }
594 
595     @Override
596     public SocketAddress getLocalAddress() {
597         return ioSession.getLocalAddress();
598     }
599 
600     @Override
601     public SSLSession getSSLSession() {
602         final TlsDetails tlsDetails = ioSession.getTlsDetails();
603         return tlsDetails != null ? tlsDetails.getSSLSession() : null;
604     }
605 
606     void appendState(final StringBuilder buf) {
607         buf.append("connState=").append(connState)
608                 .append(", inbuf=").append(inbuf)
609                 .append(", outbuf=").append(outbuf)
610                 .append(", inputWindow=").append(capacityWindow != null ? capacityWindow.getWindow() : 0);
611     }
612 
613     static class CapacityWindow implements CapacityChannel {
614         private final IOSession ioSession;
615         private final ReentrantLock lock;
616         private int window;
617         private boolean closed;
618 
619         CapacityWindow(final int window, final IOSession ioSession) {
620             this.window = window;
621             this.ioSession = ioSession;
622             this.lock = new ReentrantLock();
623         }
624 
625         @Override
626         public void update(final int increment) throws IOException {
627             lock.lock();
628             try {
629                 if (closed) {
630                     return;
631                 }
632                 if (increment > 0) {
633                     updateWindow(increment);
634                     ioSession.setEvent(SelectionKey.OP_READ);
635                 }
636             } finally {
637                 lock.unlock();
638             }
639         }
640 
641         /**
642          * Internal method for removing capacity. We don't need to check
643          * if this channel is closed in it.
644          */
645         int removeCapacity(final int delta) {
646             lock.lock();
647             try {
648                 updateWindow(-delta);
649                 if (window <= 0) {
650                     ioSession.clearEvent(SelectionKey.OP_READ);
651                 }
652                 return window;
653             } finally {
654                 lock.unlock();
655             }
656         }
657 
658         private void updateWindow(final int delta) {
659             int newValue = window + delta;
660             // Math.addExact
661             if (((window ^ newValue) & (delta ^ newValue)) < 0) {
662                 newValue = delta < 0 ? Integer.MIN_VALUE : Integer.MAX_VALUE;
663             }
664             window = newValue;
665         }
666 
667         /**
668          * Closes the capacity channel, preventing user code from accidentally requesting
669          * read events outside of the context of the request the channel was created for
670          */
671         void close() {
672             lock.lock();
673             try {
674                 closed = true;
675             } finally {
676                 lock.unlock();
677             }
678         }
679 
680         // visible for testing
681         int getWindow() {
682             return window;
683         }
684     }
685 }