View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.ClosedChannelException;
34  import java.nio.channels.ReadableByteChannel;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.WritableByteChannel;
37  import java.util.List;
38  import java.util.concurrent.atomic.AtomicInteger;
39  
40  import javax.net.ssl.SSLSession;
41  
42  import org.apache.hc.core5.http.ConnectionClosedException;
43  import org.apache.hc.core5.http.ContentLengthStrategy;
44  import org.apache.hc.core5.http.EndpointDetails;
45  import org.apache.hc.core5.http.EntityDetails;
46  import org.apache.hc.core5.http.Header;
47  import org.apache.hc.core5.http.HttpConnection;
48  import org.apache.hc.core5.http.HttpException;
49  import org.apache.hc.core5.http.HttpMessage;
50  import org.apache.hc.core5.http.Message;
51  import org.apache.hc.core5.http.ProtocolVersion;
52  import org.apache.hc.core5.http.config.CharCodingConfig;
53  import org.apache.hc.core5.http.config.Http1Config;
54  import org.apache.hc.core5.http.impl.BasicEndpointDetails;
55  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
56  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
57  import org.apache.hc.core5.http.impl.CharCodingSupport;
58  import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
59  import org.apache.hc.core5.http.impl.IncomingEntityDetails;
60  import org.apache.hc.core5.http.nio.CapacityChannel;
61  import org.apache.hc.core5.http.nio.ContentDecoder;
62  import org.apache.hc.core5.http.nio.ContentEncoder;
63  import org.apache.hc.core5.http.nio.NHttpMessageParser;
64  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
65  import org.apache.hc.core5.http.nio.SessionInputBuffer;
66  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
67  import org.apache.hc.core5.http.nio.command.CommandSupport;
68  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
69  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
70  import org.apache.hc.core5.io.CloseMode;
71  import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
72  import org.apache.hc.core5.reactor.Command;
73  import org.apache.hc.core5.reactor.EventMask;
74  import org.apache.hc.core5.reactor.IOSession;
75  import org.apache.hc.core5.reactor.ProtocolIOSession;
76  import org.apache.hc.core5.reactor.ssl.TlsDetails;
77  import org.apache.hc.core5.util.Args;
78  import org.apache.hc.core5.util.Identifiable;
79  import org.apache.hc.core5.util.Timeout;
80  
81  abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
82          implements Identifiable, HttpConnection {
83  
84      private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
85  
86      private final ProtocolIOSession ioSession;
87      private final Http1Config http1Config;
88      private final SessionInputBufferImpl inbuf;
89      private final SessionOutputBufferImpl outbuf;
90      private final BasicHttpTransportMetrics inTransportMetrics;
91      private final BasicHttpTransportMetrics outTransportMetrics;
92      private final BasicHttpConnectionMetrics connMetrics;
93      private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
94      private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
95      private final ContentLengthStrategy incomingContentStrategy;
96      private final ContentLengthStrategy outgoingContentStrategy;
97      private final ByteBuffer contentBuffer;
98      private final AtomicInteger outputRequests;
99  
100     private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
101     private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
102     private volatile ConnectionState connState;
103     private volatile CapacityWindow capacityWindow;
104 
105     private volatile ProtocolVersion version;
106     private volatile EndpointDetails endpointDetails;
107 
108     AbstractHttp1StreamDuplexer(
109             final ProtocolIOSession ioSession,
110             final Http1Config http1Config,
111             final CharCodingConfig charCodingConfig,
112             final NHttpMessageParser<IncomingMessage> incomingMessageParser,
113             final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
114             final ContentLengthStrategy incomingContentStrategy,
115             final ContentLengthStrategy outgoingContentStrategy) {
116         this.ioSession = Args.notNull(ioSession, "I/O session");
117         this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
118         final int bufferSize = this.http1Config.getBufferSize();
119         this.inbuf = new SessionInputBufferImpl(bufferSize, bufferSize < 512 ? bufferSize : 512,
120                 this.http1Config.getMaxLineLength(),
121                 CharCodingSupport.createDecoder(charCodingConfig));
122         this.outbuf = new SessionOutputBufferImpl(bufferSize, bufferSize < 512 ? bufferSize : 512,
123                 CharCodingSupport.createEncoder(charCodingConfig));
124         this.inTransportMetrics = new BasicHttpTransportMetrics();
125         this.outTransportMetrics = new BasicHttpTransportMetrics();
126         this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
127         this.incomingMessageParser = incomingMessageParser;
128         this.outgoingMessageWriter = outgoingMessageWriter;
129         this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
130                 DefaultContentLengthStrategy.INSTANCE;
131         this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
132                 DefaultContentLengthStrategy.INSTANCE;
133         this.contentBuffer = ByteBuffer.allocate(this.http1Config.getBufferSize());
134         this.outputRequests = new AtomicInteger(0);
135         this.connState = ConnectionState.READY;
136     }
137 
138     @Override
139     public String getId() {
140         return ioSession.getId();
141     }
142 
143     void shutdownSession(final CloseMode closeMode) {
144         if (closeMode == CloseMode.GRACEFUL) {
145             connState = ConnectionState.GRACEFUL_SHUTDOWN;
146             ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
147         } else {
148             connState = ConnectionState.SHUTDOWN;
149             ioSession.close();
150         }
151     }
152 
153     void shutdownSession(final Exception cause) {
154         connState = ConnectionState.SHUTDOWN;
155         try {
156             terminate(cause);
157         } finally {
158             final CloseMode closeMode;
159             if (cause instanceof ConnectionClosedException) {
160                 closeMode = CloseMode.GRACEFUL;
161             } else if (cause instanceof IOException) {
162                 closeMode = CloseMode.IMMEDIATE;
163             } else {
164                 closeMode = CloseMode.GRACEFUL;
165             }
166             ioSession.close(closeMode);
167         }
168     }
169 
170     abstract void disconnected();
171 
172     abstract void terminate(final Exception exception);
173 
174     abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);
175 
176     abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);
177 
178     abstract void consumeHeader(IncomingMessage messageHead, EntityDetails entityDetails) throws HttpException, IOException;
179 
180     abstract boolean handleIncomingMessage(IncomingMessage incomingMessage) throws HttpException;
181 
182     abstract boolean handleOutgoingMessage(OutgoingMessage outgoingMessage) throws HttpException;
183 
184     abstract ContentDecoder createContentDecoder(
185             long contentLength,
186             ReadableByteChannel channel,
187             SessionInputBuffer buffer,
188             BasicHttpTransportMetrics metrics) throws HttpException;
189 
190     abstract ContentEncoder createContentEncoder(
191             long contentLength,
192             WritableByteChannel channel,
193             SessionOutputBuffer buffer,
194             BasicHttpTransportMetrics metrics) throws HttpException;
195 
196     abstract void consumeData(ByteBuffer src) throws HttpException, IOException;
197 
198     abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;
199 
200     abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;
201 
202     abstract boolean isOutputReady();
203 
204     abstract void produceOutput() throws HttpException, IOException;
205 
206     abstract void execute(RequestExecutionCommand executionCommand) throws HttpException, IOException;
207 
208     abstract void inputEnd() throws HttpException, IOException;
209 
210     abstract void outputEnd() throws HttpException, IOException;
211 
212     abstract boolean inputIdle();
213 
214     abstract boolean outputIdle();
215 
216     abstract boolean handleTimeout();
217 
218     private void processCommands() throws HttpException, IOException {
219         for (;;) {
220             final Command command = ioSession.poll();
221             if (command == null) {
222                 return;
223             }
224             if (command instanceof ShutdownCommand) {
225                 final ShutdownCommand../org/apache/hc/core5/http/nio/command/ShutdownCommand.html#ShutdownCommand">ShutdownCommand shutdownCommand = (ShutdownCommand) command;
226                 requestShutdown(shutdownCommand.getType());
227             } else if (command instanceof RequestExecutionCommand) {
228                 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
229                     command.cancel();
230                 } else {
231                     execute((RequestExecutionCommand) command);
232                     return;
233                 }
234             } else {
235                 throw new HttpException("Unexpected command: " + command.getClass());
236             }
237         }
238     }
239 
240     public final void onConnect() throws HttpException, IOException {
241         if (connState == ConnectionState.READY) {
242             connState = ConnectionState.ACTIVE;
243             processCommands();
244         }
245     }
246 
247     IncomingMessage parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
248         final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, endOfStream);
249         if (messageHead != null) {
250             incomingMessageParser.reset();
251         }
252         return messageHead;
253     }
254 
255     public final void onInput(final ByteBuffer src) throws HttpException, IOException {
256         if (src != null) {
257             inbuf.put(src);
258         }
259 
260         if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inbuf.hasData() && inputIdle()) {
261             ioSession.clearEvent(SelectionKey.OP_READ);
262             return;
263         }
264 
265         boolean endOfStream = false;
266         if (incomingMessage == null) {
267             final int bytesRead = inbuf.fill(ioSession);
268             if (bytesRead > 0) {
269                 inTransportMetrics.incrementBytesTransferred(bytesRead);
270             }
271             endOfStream = bytesRead == -1;
272         }
273 
274         do {
275             if (incomingMessage == null) {
276 
277                 final IncomingMessage messageHead = parseMessageHead(endOfStream);
278                 if (messageHead != null) {
279                     this.version = messageHead.getVersion();
280 
281                     updateInputMetrics(messageHead, connMetrics);
282                     final ContentDecoder contentDecoder;
283                     if (handleIncomingMessage(messageHead)) {
284                         final long len = incomingContentStrategy.determineLength(messageHead);
285                         contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
286                         consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
287                     } else {
288                         consumeHeader(messageHead, null);
289                         contentDecoder = null;
290                     }
291                     capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
292                     if (contentDecoder != null) {
293                         incomingMessage = new Message<>(messageHead, contentDecoder);
294                     } else {
295                         inputEnd();
296                         if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
297                             ioSession.setEvent(SelectionKey.OP_READ);
298                         }
299                     }
300                 } else {
301                     break;
302                 }
303             }
304 
305             if (incomingMessage != null) {
306                 final ContentDecoder contentDecoder = incomingMessage.getBody();
307 
308                 // At present the consumer can be forced to consume data
309                 // over its declared capacity in order to avoid having
310                 // unprocessed message body content stuck in the session
311                 // input buffer
312                 final int bytesRead = contentDecoder.read(contentBuffer);
313                 if (bytesRead > 0) {
314                     contentBuffer.flip();
315                     consumeData(contentBuffer);
316                     contentBuffer.clear();
317                     final int capacity = capacityWindow.removeCapacity(bytesRead);
318                     if (capacity <= 0) {
319                         if (!contentDecoder.isCompleted()) {
320                             updateCapacity(capacityWindow);
321                         }
322                     }
323                 }
324                 if (contentDecoder.isCompleted()) {
325                     dataEnd(contentDecoder.getTrailers());
326                     capacityWindow.close();
327                     incomingMessage = null;
328                     ioSession.setEvent(SelectionKey.OP_READ);
329                     inputEnd();
330                 }
331                 if (bytesRead == 0) {
332                     break;
333                 }
334             }
335         } while (inbuf.hasData());
336 
337         if (endOfStream && !inbuf.hasData()) {
338             if (outputIdle() && inputIdle()) {
339                 requestShutdown(CloseMode.GRACEFUL);
340             } else {
341                 shutdownSession(new ConnectionClosedException("Connection closed by peer"));
342             }
343         }
344     }
345 
346     public final void onOutput() throws IOException, HttpException {
347         ioSession.getLock().lock();
348         try {
349             if (outbuf.hasData()) {
350                 final int bytesWritten = outbuf.flush(ioSession);
351                 if (bytesWritten > 0) {
352                     outTransportMetrics.incrementBytesTransferred(bytesWritten);
353                 }
354             }
355         } finally {
356             ioSession.getLock().unlock();
357         }
358         if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
359             final int pendingOutputRequests = outputRequests.get();
360             produceOutput();
361             final boolean outputPending = isOutputReady();
362             final boolean outputEnd;
363             ioSession.getLock().lock();
364             try {
365                 if (!outputPending && !outbuf.hasData() && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
366                     ioSession.clearEvent(SelectionKey.OP_WRITE);
367                 } else {
368                     outputRequests.addAndGet(-pendingOutputRequests);
369                 }
370                 outputEnd = outgoingMessage == null && !outbuf.hasData();
371             } finally {
372                 ioSession.getLock().unlock();
373             }
374             if (outputEnd) {
375                 outputEnd();
376                 if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
377                     processCommands();
378                 } else if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
379                     connState = ConnectionState.SHUTDOWN;
380                 }
381             }
382         }
383         if (connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
384             ioSession.close();
385         }
386     }
387 
388     public final void onTimeout(final Timeout timeout) throws IOException, HttpException {
389         if (!handleTimeout()) {
390             onException(SocketTimeoutExceptionFactory.create(timeout));
391         }
392     }
393 
394     public final void onException(final Exception ex) {
395         shutdownSession(ex);
396         CommandSupport.failCommands(ioSession, ex);
397     }
398 
399     public final void onDisconnect() {
400         disconnected();
401         CommandSupport.cancelCommands(ioSession);
402     }
403 
404     void requestShutdown(final CloseMode closeMode) {
405         switch (closeMode) {
406             case GRACEFUL:
407                 if (connState == ConnectionState.ACTIVE) {
408                     connState = ConnectionState.GRACEFUL_SHUTDOWN;
409                 }
410                 break;
411             case IMMEDIATE:
412                 connState = ConnectionState.SHUTDOWN;
413                 break;
414         }
415         ioSession.setEvent(SelectionKey.OP_WRITE);
416     }
417 
418     void commitMessageHead(
419             final OutgoingMessage messageHead,
420             final boolean endStream,
421             final FlushMode flushMode) throws HttpException, IOException {
422         ioSession.getLock().lock();
423         try {
424             outgoingMessageWriter.write(messageHead, outbuf);
425             updateOutputMetrics(messageHead, connMetrics);
426             if (!endStream) {
427                 final ContentEncoder contentEncoder;
428                 if (handleOutgoingMessage(messageHead)) {
429                     final long len = outgoingContentStrategy.determineLength(messageHead);
430                     contentEncoder = createContentEncoder(len, ioSession, outbuf, outTransportMetrics);
431                 } else {
432                     contentEncoder = null;
433                 }
434                 if (contentEncoder != null) {
435                     outgoingMessage = new Message<>(messageHead, contentEncoder);
436                 }
437             }
438             outgoingMessageWriter.reset();
439             if (flushMode == FlushMode.IMMEDIATE) {
440                 outbuf.flush(ioSession);
441             }
442             ioSession.setEvent(EventMask.WRITE);
443         } finally {
444             ioSession.getLock().unlock();
445         }
446     }
447 
448     void requestSessionInput() {
449         ioSession.setEvent(SelectionKey.OP_READ);
450     }
451 
452     void requestSessionOutput() {
453         outputRequests.incrementAndGet();
454         ioSession.setEvent(SelectionKey.OP_WRITE);
455     }
456 
457     Timeout getSessionTimeout() {
458         return ioSession.getSocketTimeout();
459     }
460 
461     void setSessionTimeout(final Timeout timeout) {
462         ioSession.setSocketTimeout(timeout);
463     }
464 
465     void suspendSessionInput() {
466         ioSession.clearEvent(SelectionKey.OP_READ);
467     }
468 
469     void suspendSessionOutput() throws IOException {
470         ioSession.getLock().lock();
471         try {
472             if (outbuf.hasData()) {
473                 outbuf.flush(ioSession);
474             } else {
475                 ioSession.clearEvent(SelectionKey.OP_WRITE);
476             }
477         } finally {
478             ioSession.getLock().unlock();
479         }
480     }
481 
482     int streamOutput(final ByteBuffer src) throws IOException {
483         ioSession.getLock().lock();
484         try {
485             if (outgoingMessage == null) {
486                 throw new ClosedChannelException();
487             }
488             final ContentEncoder contentEncoder = outgoingMessage.getBody();
489             final int bytesWritten = contentEncoder.write(src);
490             if (bytesWritten > 0) {
491                 ioSession.setEvent(SelectionKey.OP_WRITE);
492             }
493             return bytesWritten;
494         } finally {
495             ioSession.getLock().unlock();
496         }
497     }
498 
499     enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
500 
501     MessageDelineation endOutputStream(final List<? extends Header> trailers) throws IOException {
502         ioSession.getLock().lock();
503         try {
504             if (outgoingMessage == null) {
505                 return MessageDelineation.NONE;
506             }
507             final ContentEncoder contentEncoder = outgoingMessage.getBody();
508             contentEncoder.complete(trailers);
509             ioSession.setEvent(SelectionKey.OP_WRITE);
510             outgoingMessage = null;
511             return contentEncoder instanceof ChunkEncoder
512                             ? MessageDelineation.CHUNK_CODED
513                             : MessageDelineation.MESSAGE_HEAD;
514         } finally {
515             ioSession.getLock().unlock();
516         }
517     }
518 
519     boolean isOutputCompleted() {
520         ioSession.getLock().lock();
521         try {
522             if (outgoingMessage == null) {
523                 return true;
524             }
525             final ContentEncoder contentEncoder = outgoingMessage.getBody();
526             return contentEncoder.isCompleted();
527         } finally {
528             ioSession.getLock().unlock();
529         }
530     }
531 
532     @Override
533     public void close() throws IOException {
534         ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
535     }
536 
537     @Override
538     public void close(final CloseMode closeMode) {
539         ioSession.enqueue(new ShutdownCommand(closeMode), Command.Priority.IMMEDIATE);
540     }
541 
542     @Override
543     public boolean isOpen() {
544         return connState == ConnectionState.ACTIVE;
545     }
546 
547     @Override
548     public Timeout getSocketTimeout() {
549         return ioSession.getSocketTimeout();
550     }
551 
552     @Override
553     public void setSocketTimeout(final Timeout timeout) {
554         ioSession.setSocketTimeout(timeout);
555     }
556 
557     @Override
558     public EndpointDetails getEndpointDetails() {
559         if (endpointDetails == null) {
560             endpointDetails = new BasicEndpointDetails(
561                     ioSession.getRemoteAddress(),
562                     ioSession.getLocalAddress(),
563                     connMetrics,
564                     ioSession.getSocketTimeout());
565         }
566         return endpointDetails;
567     }
568 
569     @Override
570     public ProtocolVersion getProtocolVersion() {
571         return version;
572     }
573 
574     @Override
575     public SocketAddress getRemoteAddress() {
576         return ioSession.getRemoteAddress();
577     }
578 
579     @Override
580     public SocketAddress getLocalAddress() {
581         return ioSession.getLocalAddress();
582     }
583 
584     @Override
585     public SSLSession getSSLSession() {
586         final TlsDetails tlsDetails = ioSession.getTlsDetails();
587         return tlsDetails != null ? tlsDetails.getSSLSession() : null;
588     }
589 
590     void appendState(final StringBuilder buf) {
591         buf.append("connState=").append(connState)
592                 .append(", inbuf=").append(inbuf)
593                 .append(", outbuf=").append(outbuf)
594                 .append(", inputWindow=").append(capacityWindow != null ? capacityWindow.getWindow() : 0);
595     }
596 
597     static class CapacityWindow implements CapacityChannel {
598         private final IOSession ioSession;
599         private final Object lock;
600         private int window;
601         private boolean closed;
602 
603         CapacityWindow(final int window, final IOSession ioSession) {
604             this.window = window;
605             this.ioSession = ioSession;
606             this.lock = new Object();
607         }
608 
609         @Override
610         public void update(final int increment) throws IOException {
611             synchronized (lock) {
612                 if (closed) {
613                     return;
614                 }
615                 if (increment > 0) {
616                     updateWindow(increment);
617                     ioSession.setEvent(SelectionKey.OP_READ);
618                 }
619             }
620         }
621 
622         /**
623          * Internal method for removing capacity. We don't need to check
624          * if this channel is closed in it.
625          */
626         int removeCapacity(final int delta) {
627             synchronized (lock) {
628                 updateWindow(-delta);
629                 if (window <= 0) {
630                     ioSession.clearEvent(SelectionKey.OP_READ);
631                 }
632                 return window;
633             }
634         }
635 
636         private void updateWindow(final int delta) {
637             int newValue = window + delta;
638             // Math.addExact
639             if (((window ^ newValue) & (delta ^ newValue)) < 0) {
640                 newValue = delta < 0 ? Integer.MIN_VALUE : Integer.MAX_VALUE;
641             }
642             window = newValue;
643         }
644 
645         /**
646          * Closes the capacity channel, preventing user code from accidentally requesting
647          * read events outside of the context of the request the channel was created for
648          */
649         void close() {
650             synchronized (lock) {
651                 closed = true;
652             }
653         }
654 
655         // visible for testing
656         int getWindow() {
657             return window;
658         }
659     }
660 }