View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.ClosedChannelException;
34  import java.nio.channels.ReadableByteChannel;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.WritableByteChannel;
37  import java.util.List;
38  import java.util.concurrent.atomic.AtomicInteger;
39  
40  import javax.net.ssl.SSLSession;
41  
42  import org.apache.hc.core5.http.ConnectionClosedException;
43  import org.apache.hc.core5.http.ContentLengthStrategy;
44  import org.apache.hc.core5.http.EndpointDetails;
45  import org.apache.hc.core5.http.EntityDetails;
46  import org.apache.hc.core5.http.Header;
47  import org.apache.hc.core5.http.HttpConnection;
48  import org.apache.hc.core5.http.HttpException;
49  import org.apache.hc.core5.http.HttpMessage;
50  import org.apache.hc.core5.http.Message;
51  import org.apache.hc.core5.http.ProtocolVersion;
52  import org.apache.hc.core5.http.config.CharCodingConfig;
53  import org.apache.hc.core5.http.config.Http1Config;
54  import org.apache.hc.core5.http.impl.BasicEndpointDetails;
55  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
56  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
57  import org.apache.hc.core5.http.impl.CharCodingSupport;
58  import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
59  import org.apache.hc.core5.http.impl.IncomingEntityDetails;
60  import org.apache.hc.core5.http.nio.CapacityChannel;
61  import org.apache.hc.core5.http.nio.ContentDecoder;
62  import org.apache.hc.core5.http.nio.ContentEncoder;
63  import org.apache.hc.core5.http.nio.NHttpMessageParser;
64  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
65  import org.apache.hc.core5.http.nio.SessionInputBuffer;
66  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
67  import org.apache.hc.core5.http.nio.command.CommandSupport;
68  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
69  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
70  import org.apache.hc.core5.io.CloseMode;
71  import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
72  import org.apache.hc.core5.reactor.Command;
73  import org.apache.hc.core5.reactor.EventMask;
74  import org.apache.hc.core5.reactor.IOSession;
75  import org.apache.hc.core5.reactor.ProtocolIOSession;
76  import org.apache.hc.core5.reactor.ssl.TlsDetails;
77  import org.apache.hc.core5.util.Args;
78  import org.apache.hc.core5.util.Identifiable;
79  import org.apache.hc.core5.util.Timeout;
80  
81  abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
82          implements Identifiable, HttpConnection {
83  
84      private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
85  
86      private final ProtocolIOSession ioSession;
87      private final Http1Config http1Config;
88      private final SessionInputBufferImpl inbuf;
89      private final SessionOutputBufferImpl outbuf;
90      private final BasicHttpTransportMetrics inTransportMetrics;
91      private final BasicHttpTransportMetrics outTransportMetrics;
92      private final BasicHttpConnectionMetrics connMetrics;
93      private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
94      private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
95      private final ContentLengthStrategy incomingContentStrategy;
96      private final ContentLengthStrategy outgoingContentStrategy;
97      private final ByteBuffer contentBuffer;
98      private final AtomicInteger outputRequests;
99  
100     private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
101     private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
102     private volatile ConnectionState connState;
103     private volatile CapacityWindow capacityWindow;
104 
105     private volatile ProtocolVersion version;
106     private volatile EndpointDetails endpointDetails;
107 
108     AbstractHttp1StreamDuplexer(
109             final ProtocolIOSession ioSession,
110             final Http1Config http1Config,
111             final CharCodingConfig charCodingConfig,
112             final NHttpMessageParser<IncomingMessage> incomingMessageParser,
113             final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
114             final ContentLengthStrategy incomingContentStrategy,
115             final ContentLengthStrategy outgoingContentStrategy) {
116         this.ioSession = Args.notNull(ioSession, "I/O session");
117         this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
118         final int bufferSize = this.http1Config.getBufferSize();
119         this.inbuf = new SessionInputBufferImpl(bufferSize, bufferSize < 512 ? bufferSize : 512,
120                 this.http1Config.getMaxLineLength(),
121                 CharCodingSupport.createDecoder(charCodingConfig));
122         this.outbuf = new SessionOutputBufferImpl(bufferSize, bufferSize < 512 ? bufferSize : 512,
123                 CharCodingSupport.createEncoder(charCodingConfig));
124         this.inTransportMetrics = new BasicHttpTransportMetrics();
125         this.outTransportMetrics = new BasicHttpTransportMetrics();
126         this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
127         this.incomingMessageParser = incomingMessageParser;
128         this.outgoingMessageWriter = outgoingMessageWriter;
129         this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
130                 DefaultContentLengthStrategy.INSTANCE;
131         this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
132                 DefaultContentLengthStrategy.INSTANCE;
133         this.contentBuffer = ByteBuffer.allocate(this.http1Config.getBufferSize());
134         this.outputRequests = new AtomicInteger(0);
135         this.connState = ConnectionState.READY;
136     }
137 
138     @Override
139     public String getId() {
140         return ioSession.getId();
141     }
142 
143     void shutdownSession(final CloseMode closeMode) {
144         if (closeMode == CloseMode.GRACEFUL) {
145             connState = ConnectionState.GRACEFUL_SHUTDOWN;
146             ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
147         } else {
148             connState = ConnectionState.SHUTDOWN;
149             ioSession.close();
150         }
151     }
152 
153     void shutdownSession(final Exception cause) {
154         connState = ConnectionState.SHUTDOWN;
155         try {
156             terminate(cause);
157         } finally {
158             final CloseMode closeMode;
159             if (cause instanceof ConnectionClosedException) {
160                 closeMode = CloseMode.GRACEFUL;
161             } else if (cause instanceof IOException) {
162                 closeMode = CloseMode.IMMEDIATE;
163             } else {
164                 closeMode = CloseMode.GRACEFUL;
165             }
166             ioSession.close(closeMode);
167         }
168     }
169 
170     abstract void disconnected();
171 
172     abstract void terminate(final Exception exception);
173 
174     abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);
175 
176     abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);
177 
178     abstract void consumeHeader(IncomingMessage messageHead, EntityDetails entityDetails) throws HttpException, IOException;
179 
180     abstract boolean handleIncomingMessage(IncomingMessage incomingMessage) throws HttpException;
181 
182     abstract boolean handleOutgoingMessage(OutgoingMessage outgoingMessage) throws HttpException;
183 
184     abstract ContentDecoder createContentDecoder(
185             long contentLength,
186             ReadableByteChannel channel,
187             SessionInputBuffer buffer,
188             BasicHttpTransportMetrics metrics) throws HttpException;
189 
190     abstract ContentEncoder createContentEncoder(
191             long contentLength,
192             WritableByteChannel channel,
193             SessionOutputBuffer buffer,
194             BasicHttpTransportMetrics metrics) throws HttpException;
195 
196     abstract void consumeData(ByteBuffer src) throws HttpException, IOException;
197 
198     abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;
199 
200     abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;
201 
202     abstract boolean isOutputReady();
203 
204     abstract void produceOutput() throws HttpException, IOException;
205 
206     abstract void execute(RequestExecutionCommand executionCommand) throws HttpException, IOException;
207 
208     abstract void inputEnd() throws HttpException, IOException;
209 
210     abstract void outputEnd() throws HttpException, IOException;
211 
212     abstract boolean inputIdle();
213 
214     abstract boolean outputIdle();
215 
216     abstract boolean handleTimeout();
217 
218     private void processCommands() throws HttpException, IOException {
219         for (;;) {
220             final Command command = ioSession.poll();
221             if (command == null) {
222                 return;
223             }
224             if (command instanceof ShutdownCommand) {
225                 final ShutdownCommand../org/apache/hc/core5/http/nio/command/ShutdownCommand.html#ShutdownCommand">ShutdownCommand shutdownCommand = (ShutdownCommand) command;
226                 requestShutdown(shutdownCommand.getType());
227             } else if (command instanceof RequestExecutionCommand) {
228                 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
229                     command.cancel();
230                 } else {
231                     execute((RequestExecutionCommand) command);
232                     return;
233                 }
234             } else {
235                 throw new HttpException("Unexpected command: " + command.getClass());
236             }
237         }
238     }
239 
240     public final void onConnect() throws HttpException, IOException {
241         connState = ConnectionState.ACTIVE;
242         processCommands();
243     }
244 
245     IncomingMessage parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
246         final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, endOfStream);
247         if (messageHead != null) {
248             incomingMessageParser.reset();
249         }
250         return messageHead;
251     }
252 
253     public final void onInput(final ByteBuffer src) throws HttpException, IOException {
254         if (src != null) {
255             inbuf.put(src);
256         }
257 
258         if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inbuf.hasData() && inputIdle()) {
259             ioSession.clearEvent(SelectionKey.OP_READ);
260             return;
261         }
262 
263         boolean endOfStream = false;
264         if (incomingMessage == null) {
265             final int bytesRead = inbuf.fill(ioSession);
266             if (bytesRead > 0) {
267                 inTransportMetrics.incrementBytesTransferred(bytesRead);
268             }
269             endOfStream = bytesRead == -1;
270         }
271 
272         do {
273             if (incomingMessage == null) {
274 
275                 final IncomingMessage messageHead = parseMessageHead(endOfStream);
276                 if (messageHead != null) {
277                     this.version = messageHead.getVersion();
278 
279                     updateInputMetrics(messageHead, connMetrics);
280                     final ContentDecoder contentDecoder;
281                     if (handleIncomingMessage(messageHead)) {
282                         final long len = incomingContentStrategy.determineLength(messageHead);
283                         contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
284                         consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
285                     } else {
286                         consumeHeader(messageHead, null);
287                         contentDecoder = null;
288                     }
289                     capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
290                     if (contentDecoder != null) {
291                         incomingMessage = new Message<>(messageHead, contentDecoder);
292                     } else {
293                         inputEnd();
294                         if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
295                             ioSession.setEvent(SelectionKey.OP_READ);
296                         }
297                     }
298                 } else {
299                     break;
300                 }
301             }
302 
303             if (incomingMessage != null) {
304                 final ContentDecoder contentDecoder = incomingMessage.getBody();
305 
306                 // At present the consumer can be forced to consume data
307                 // over its declared capacity in order to avoid having
308                 // unprocessed message body content stuck in the session
309                 // input buffer
310                 final int bytesRead = contentDecoder.read(contentBuffer);
311                 if (bytesRead > 0) {
312                     contentBuffer.flip();
313                     consumeData(contentBuffer);
314                     contentBuffer.clear();
315                     final int capacity = capacityWindow.removeCapacity(bytesRead);
316                     if (capacity <= 0) {
317                         if (!contentDecoder.isCompleted()) {
318                             updateCapacity(capacityWindow);
319                         }
320                     }
321                 }
322                 if (contentDecoder.isCompleted()) {
323                     dataEnd(contentDecoder.getTrailers());
324                     capacityWindow.close();
325                     incomingMessage = null;
326                     ioSession.setEvent(SelectionKey.OP_READ);
327                     inputEnd();
328                 }
329                 if (bytesRead == 0) {
330                     break;
331                 }
332             }
333         } while (inbuf.hasData());
334 
335         if (endOfStream && !inbuf.hasData()) {
336             if (outputIdle() && inputIdle()) {
337                 requestShutdown(CloseMode.GRACEFUL);
338             } else {
339                 shutdownSession(new ConnectionClosedException("Connection closed by peer"));
340             }
341         }
342     }
343 
344     public final void onOutput() throws IOException, HttpException {
345         ioSession.getLock().lock();
346         try {
347             if (outbuf.hasData()) {
348                 final int bytesWritten = outbuf.flush(ioSession);
349                 if (bytesWritten > 0) {
350                     outTransportMetrics.incrementBytesTransferred(bytesWritten);
351                 }
352             }
353         } finally {
354             ioSession.getLock().unlock();
355         }
356         if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
357             final int pendingOutputRequests = outputRequests.get();
358             produceOutput();
359             final boolean outputPending = isOutputReady();
360             final boolean outputEnd;
361             ioSession.getLock().lock();
362             try {
363                 if (!outputPending && !outbuf.hasData() && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
364                     ioSession.clearEvent(SelectionKey.OP_WRITE);
365                 } else {
366                     outputRequests.addAndGet(-pendingOutputRequests);
367                 }
368                 outputEnd = outgoingMessage == null && !outbuf.hasData();
369             } finally {
370                 ioSession.getLock().unlock();
371             }
372             if (outputEnd) {
373                 outputEnd();
374                 if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
375                     processCommands();
376                 } else if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
377                     connState = ConnectionState.SHUTDOWN;
378                 }
379             }
380         }
381         if (connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
382             ioSession.close();
383         }
384     }
385 
386     public final void onTimeout(final Timeout timeout) throws IOException, HttpException {
387         if (!handleTimeout()) {
388             onException(SocketTimeoutExceptionFactory.create(timeout));
389         }
390     }
391 
392     public final void onException(final Exception ex) {
393         shutdownSession(ex);
394         CommandSupport.failCommands(ioSession, ex);
395     }
396 
397     public final void onDisconnect() {
398         disconnected();
399         CommandSupport.cancelCommands(ioSession);
400     }
401 
402     void requestShutdown(final CloseMode closeMode) {
403         switch (closeMode) {
404             case GRACEFUL:
405                 if (connState == ConnectionState.ACTIVE) {
406                     connState = ConnectionState.GRACEFUL_SHUTDOWN;
407                 }
408                 break;
409             case IMMEDIATE:
410                 connState = ConnectionState.SHUTDOWN;
411                 break;
412         }
413         ioSession.setEvent(SelectionKey.OP_WRITE);
414     }
415 
416     void commitMessageHead(
417             final OutgoingMessage messageHead,
418             final boolean endStream,
419             final FlushMode flushMode) throws HttpException, IOException {
420         ioSession.getLock().lock();
421         try {
422             outgoingMessageWriter.write(messageHead, outbuf);
423             updateOutputMetrics(messageHead, connMetrics);
424             if (!endStream) {
425                 final ContentEncoder contentEncoder;
426                 if (handleOutgoingMessage(messageHead)) {
427                     final long len = outgoingContentStrategy.determineLength(messageHead);
428                     contentEncoder = createContentEncoder(len, ioSession, outbuf, outTransportMetrics);
429                 } else {
430                     contentEncoder = null;
431                 }
432                 if (contentEncoder != null) {
433                     outgoingMessage = new Message<>(messageHead, contentEncoder);
434                 }
435             }
436             outgoingMessageWriter.reset();
437             if (flushMode == FlushMode.IMMEDIATE) {
438                 outbuf.flush(ioSession);
439             }
440             ioSession.setEvent(EventMask.WRITE);
441         } finally {
442             ioSession.getLock().unlock();
443         }
444     }
445 
446     void requestSessionInput() {
447         ioSession.setEvent(SelectionKey.OP_READ);
448     }
449 
450     void requestSessionOutput() {
451         outputRequests.incrementAndGet();
452         ioSession.setEvent(SelectionKey.OP_WRITE);
453     }
454 
455     Timeout getSessionTimeout() {
456         return ioSession.getSocketTimeout();
457     }
458 
459     void setSessionTimeout(final Timeout timeout) {
460         ioSession.setSocketTimeout(timeout);
461     }
462 
463     void suspendSessionInput() {
464         ioSession.clearEvent(SelectionKey.OP_READ);
465     }
466 
467     void suspendSessionOutput() throws IOException {
468         ioSession.getLock().lock();
469         try {
470             if (outbuf.hasData()) {
471                 outbuf.flush(ioSession);
472             } else {
473                 ioSession.clearEvent(SelectionKey.OP_WRITE);
474             }
475         } finally {
476             ioSession.getLock().unlock();
477         }
478     }
479 
480     int streamOutput(final ByteBuffer src) throws IOException {
481         ioSession.getLock().lock();
482         try {
483             if (outgoingMessage == null) {
484                 throw new ClosedChannelException();
485             }
486             final ContentEncoder contentEncoder = outgoingMessage.getBody();
487             final int bytesWritten = contentEncoder.write(src);
488             if (bytesWritten > 0) {
489                 ioSession.setEvent(SelectionKey.OP_WRITE);
490             }
491             return bytesWritten;
492         } finally {
493             ioSession.getLock().unlock();
494         }
495     }
496 
497     enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
498 
499     MessageDelineation endOutputStream(final List<? extends Header> trailers) throws IOException {
500         ioSession.getLock().lock();
501         try {
502             if (outgoingMessage == null) {
503                 return MessageDelineation.NONE;
504             }
505             final ContentEncoder contentEncoder = outgoingMessage.getBody();
506             contentEncoder.complete(trailers);
507             ioSession.setEvent(SelectionKey.OP_WRITE);
508             outgoingMessage = null;
509             return contentEncoder instanceof ChunkEncoder
510                             ? MessageDelineation.CHUNK_CODED
511                             : MessageDelineation.MESSAGE_HEAD;
512         } finally {
513             ioSession.getLock().unlock();
514         }
515     }
516 
517     boolean isOutputCompleted() {
518         ioSession.getLock().lock();
519         try {
520             if (outgoingMessage == null) {
521                 return true;
522             }
523             final ContentEncoder contentEncoder = outgoingMessage.getBody();
524             return contentEncoder.isCompleted();
525         } finally {
526             ioSession.getLock().unlock();
527         }
528     }
529 
530     @Override
531     public void close() throws IOException {
532         ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
533     }
534 
535     @Override
536     public void close(final CloseMode closeMode) {
537         ioSession.enqueue(new ShutdownCommand(closeMode), Command.Priority.IMMEDIATE);
538     }
539 
540     @Override
541     public boolean isOpen() {
542         return connState == ConnectionState.ACTIVE;
543     }
544 
545     @Override
546     public Timeout getSocketTimeout() {
547         return ioSession.getSocketTimeout();
548     }
549 
550     @Override
551     public void setSocketTimeout(final Timeout timeout) {
552         ioSession.setSocketTimeout(timeout);
553     }
554 
555     @Override
556     public EndpointDetails getEndpointDetails() {
557         if (endpointDetails == null) {
558             endpointDetails = new BasicEndpointDetails(
559                     ioSession.getRemoteAddress(),
560                     ioSession.getLocalAddress(),
561                     connMetrics,
562                     ioSession.getSocketTimeout());
563         }
564         return endpointDetails;
565     }
566 
567     @Override
568     public ProtocolVersion getProtocolVersion() {
569         return version;
570     }
571 
572     @Override
573     public SocketAddress getRemoteAddress() {
574         return ioSession.getRemoteAddress();
575     }
576 
577     @Override
578     public SocketAddress getLocalAddress() {
579         return ioSession.getLocalAddress();
580     }
581 
582     @Override
583     public SSLSession getSSLSession() {
584         final TlsDetails tlsDetails = ioSession.getTlsDetails();
585         return tlsDetails != null ? tlsDetails.getSSLSession() : null;
586     }
587 
588     void appendState(final StringBuilder buf) {
589         buf.append("connState=").append(connState)
590                 .append(", inbuf=").append(inbuf)
591                 .append(", outbuf=").append(outbuf)
592                 .append(", inputWindow=").append(capacityWindow != null ? capacityWindow.getWindow() : 0);
593     }
594 
595     static class CapacityWindow implements CapacityChannel {
596         private final IOSession ioSession;
597         private final Object lock;
598         private int window;
599         private boolean closed;
600 
601         CapacityWindow(final int window, final IOSession ioSession) {
602             this.window = window;
603             this.ioSession = ioSession;
604             this.lock = new Object();
605         }
606 
607         @Override
608         public void update(final int increment) throws IOException {
609             synchronized (lock) {
610                 if (closed) {
611                     return;
612                 }
613                 if (increment > 0) {
614                     updateWindow(increment);
615                     ioSession.setEvent(SelectionKey.OP_READ);
616                 }
617             }
618         }
619 
620         /**
621          * Internal method for removing capacity. We don't need to check
622          * if this channel is closed in it.
623          */
624         int removeCapacity(final int delta) {
625             synchronized (lock) {
626                 updateWindow(-delta);
627                 if (window <= 0) {
628                     ioSession.clearEvent(SelectionKey.OP_READ);
629                 }
630                 return window;
631             }
632         }
633 
634         private void updateWindow(final int delta) {
635             int newValue = window + delta;
636             // Math.addExact
637             if (((window ^ newValue) & (delta ^ newValue)) < 0) {
638                 newValue = delta < 0 ? Integer.MIN_VALUE : Integer.MAX_VALUE;
639             }
640             window = newValue;
641         }
642 
643         /**
644          * Closes the capacity channel, preventing user code from accidentally requesting
645          * read events outside of the context of the request the channel was created for
646          */
647         void close() {
648             synchronized (lock) {
649                 closed = true;
650             }
651         }
652 
653         // visible for testing
654         int getWindow() {
655             return window;
656         }
657     }
658 }