View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.ReadableByteChannel;
34  import java.nio.channels.SelectionKey;
35  import java.nio.channels.WritableByteChannel;
36  import java.util.List;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
39  import javax.net.ssl.SSLSession;
40  
41  import org.apache.hc.core5.http.ConnectionClosedException;
42  import org.apache.hc.core5.http.ContentLengthStrategy;
43  import org.apache.hc.core5.http.EndpointDetails;
44  import org.apache.hc.core5.http.EntityDetails;
45  import org.apache.hc.core5.http.Header;
46  import org.apache.hc.core5.http.HttpConnection;
47  import org.apache.hc.core5.http.HttpException;
48  import org.apache.hc.core5.http.HttpMessage;
49  import org.apache.hc.core5.http.Message;
50  import org.apache.hc.core5.http.ProtocolVersion;
51  import org.apache.hc.core5.http.config.CharCodingConfig;
52  import org.apache.hc.core5.http.config.Http1Config;
53  import org.apache.hc.core5.http.impl.BasicEndpointDetails;
54  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
55  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
56  import org.apache.hc.core5.http.impl.CharCodingSupport;
57  import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
58  import org.apache.hc.core5.http.impl.IncomingEntityDetails;
59  import org.apache.hc.core5.http.nio.CapacityChannel;
60  import org.apache.hc.core5.http.nio.ContentDecoder;
61  import org.apache.hc.core5.http.nio.ContentEncoder;
62  import org.apache.hc.core5.http.nio.NHttpMessageParser;
63  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
64  import org.apache.hc.core5.http.nio.SessionInputBuffer;
65  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
66  import org.apache.hc.core5.http.nio.command.CommandSupport;
67  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
68  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
69  import org.apache.hc.core5.io.CloseMode;
70  import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
71  import org.apache.hc.core5.reactor.Command;
72  import org.apache.hc.core5.reactor.EventMask;
73  import org.apache.hc.core5.reactor.IOSession;
74  import org.apache.hc.core5.reactor.ProtocolIOSession;
75  import org.apache.hc.core5.reactor.ssl.TlsDetails;
76  import org.apache.hc.core5.util.Args;
77  import org.apache.hc.core5.util.Identifiable;
78  import org.apache.hc.core5.util.Timeout;
79  
80  abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
81          implements Identifiable, HttpConnection {
82  
    /**
     * Life-cycle states of the connection. The declaration order is significant:
     * this class compares states with {@code compareTo}, so any state at or after
     * {@code GRACEFUL_SHUTDOWN} is treated as "shutting down".
     */
    private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}

    // Underlying I/O session; also the source of the lock guarding output state.
    private final ProtocolIOSession ioSession;
    // Effective HTTP/1 configuration (never null, see constructor).
    private final Http1Config http1Config;
    // Session level buffers for incoming and outgoing data.
    private final SessionInputBufferImpl inbuf;
    private final SessionOutputBufferImpl outbuf;
    // Byte counters for each direction plus the aggregate connection metrics.
    private final BasicHttpTransportMetrics inTransportMetrics;
    private final BasicHttpTransportMetrics outTransportMetrics;
    private final BasicHttpConnectionMetrics connMetrics;
    private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
    private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
    // Strategies used to determine the content length of message bodies.
    private final ContentLengthStrategy incomingContentStrategy;
    private final ContentLengthStrategy outgoingContentStrategy;
    // Scratch buffer used by onInput to hand decoded content to consumeData.
    private final ByteBuffer contentBuffer;
    // Incremented by requestSessionOutput and reconciled in onOutput.
    private final AtomicInteger outputRequests;

    // Message currently being received / sent together with its content codec;
    // null when no message body is in progress in that direction.
    private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
    private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
    private volatile ConnectionState connState;
    // Input capacity window; re-created in onInput for every parsed message head.
    private volatile CapacityWindow capacityWindow;

    // Protocol version of the most recently parsed incoming message head.
    private volatile ProtocolVersion version;
    // Lazily created by getEndpointDetails.
    private volatile EndpointDetails endpointDetails;
106 
107     AbstractHttp1StreamDuplexer(
108             final ProtocolIOSession ioSession,
109             final Http1Config http1Config,
110             final CharCodingConfig charCodingConfig,
111             final NHttpMessageParser<IncomingMessage> incomingMessageParser,
112             final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
113             final ContentLengthStrategy incomingContentStrategy,
114             final ContentLengthStrategy outgoingContentStrategy) {
115         this.ioSession = Args.notNull(ioSession, "I/O session");
116         this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
117         final int bufferSize = this.http1Config.getBufferSize();
118         this.inbuf = new SessionInputBufferImpl(bufferSize, Math.min(bufferSize, 512),
119                 this.http1Config.getMaxLineLength(),
120                 CharCodingSupport.createDecoder(charCodingConfig));
121         this.outbuf = new SessionOutputBufferImpl(bufferSize, Math.min(bufferSize, 512),
122                 CharCodingSupport.createEncoder(charCodingConfig));
123         this.inTransportMetrics = new BasicHttpTransportMetrics();
124         this.outTransportMetrics = new BasicHttpTransportMetrics();
125         this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
126         this.incomingMessageParser = incomingMessageParser;
127         this.outgoingMessageWriter = outgoingMessageWriter;
128         this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
129                 DefaultContentLengthStrategy.INSTANCE;
130         this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
131                 DefaultContentLengthStrategy.INSTANCE;
132         this.contentBuffer = ByteBuffer.allocate(this.http1Config.getBufferSize());
133         this.outputRequests = new AtomicInteger(0);
134         this.connState = ConnectionState.READY;
135     }
136 
137     @Override
138     public String getId() {
139         return ioSession.getId();
140     }
141 
142     boolean isActive() {
143         return connState == ConnectionState.ACTIVE;
144     }
145 
146     boolean isShuttingDown() {
147         return connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0;
148     }
149 
150     void shutdownSession(final CloseMode closeMode) {
151         if (closeMode == CloseMode.GRACEFUL) {
152             connState = ConnectionState.GRACEFUL_SHUTDOWN;
153             ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
154         } else {
155             connState = ConnectionState.SHUTDOWN;
156             ioSession.close();
157         }
158     }
159 
160     void shutdownSession(final Exception cause) {
161         connState = ConnectionState.SHUTDOWN;
162         try {
163             terminate(cause);
164         } finally {
165             final CloseMode closeMode;
166             if (cause instanceof ConnectionClosedException) {
167                 closeMode = CloseMode.GRACEFUL;
168             } else if (cause instanceof IOException) {
169                 closeMode = CloseMode.IMMEDIATE;
170             } else {
171                 closeMode = CloseMode.GRACEFUL;
172             }
173             ioSession.close(closeMode);
174         }
175     }
176 
    /**
     * Invoked from {@link #onDisconnect()} before pending commands are cancelled.
     */
    abstract void disconnected();

    /**
     * Invoked from {@link #shutdownSession(Exception)} before the I/O session is closed.
     *
     * @param exception the cause of the shutdown.
     */
    abstract void terminate(final Exception exception);

    /**
     * Invoked from {@link #onInput(ByteBuffer)} for every parsed incoming message head.
     */
    abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);

    /**
     * Invoked while committing an outgoing message head, after it has been written
     * to the session output buffer.
     */
    abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);

    /**
     * Receives a parsed incoming message head.
     *
     * @param messageHead the message head.
     * @param entityDetails details of the message body or {@code null} when no
     *        content decoder was created for the message.
     */
    abstract void consumeHeader(IncomingMessage messageHead, EntityDetails entityDetails) throws HttpException, IOException;

    /**
     * Inspects an incoming message head. When {@code true} is returned the caller
     * determines the content length and asks for a content decoder; otherwise the
     * message is handled as having no body.
     */
    abstract boolean handleIncomingMessage(IncomingMessage incomingMessage) throws HttpException;

    /**
     * Inspects an outgoing message head. When {@code true} is returned the caller
     * determines the content length and asks for a content encoder.
     */
    abstract boolean handleOutgoingMessage(OutgoingMessage outgoingMessage) throws HttpException;

    /**
     * Creates a decoder for an incoming message body. Callers in this class treat
     * a {@code null} result as "no message body".
     */
    abstract ContentDecoder createContentDecoder(
            long contentLength,
            ReadableByteChannel channel,
            SessionInputBuffer buffer,
            BasicHttpTransportMetrics metrics) throws HttpException;

    /**
     * Creates an encoder for an outgoing message body. Callers in this class treat
     * a {@code null} result as "no message body".
     */
    abstract ContentEncoder createContentEncoder(
            long contentLength,
            WritableByteChannel channel,
            SessionOutputBuffer buffer,
            BasicHttpTransportMetrics metrics) throws HttpException;

    /**
     * Receives a chunk of decoded message content. The buffer passed by
     * {@link #onInput(ByteBuffer)} is cleared right after this call returns.
     */
    abstract void consumeData(ByteBuffer src) throws HttpException, IOException;

    /**
     * Invoked when the input capacity window is exhausted while the current
     * message body is not yet complete.
     */
    abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;

    /**
     * Invoked when the content decoder reports completion of the message body.
     *
     * @param trailers the trailers reported by the content decoder.
     */
    abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;

    /**
     * Queried by {@link #onOutput()} after {@link #produceOutput()} to decide
     * whether write interest can be cleared.
     */
    abstract boolean isOutputReady();

    /**
     * Invoked by {@link #onOutput()} while the connection is not in the
     * {@code SHUTDOWN} state.
     */
    abstract void produceOutput() throws HttpException, IOException;

    /**
     * Invoked for a request execution command polled from the I/O session while
     * the connection is not shutting down.
     */
    abstract void execute(RequestExecutionCommand executionCommand) throws HttpException, IOException;

    /**
     * Invoked when an incoming message has been fully received (with or without a body).
     */
    abstract void inputEnd() throws HttpException, IOException;

    /**
     * Invoked by {@link #onOutput()} when there is no outgoing message in progress
     * and the session output buffer is empty.
     */
    abstract void outputEnd() throws HttpException, IOException;

    /**
     * Queried when deciding whether the connection can be shut down.
     */
    abstract boolean inputIdle();

    /**
     * Queried when deciding whether the connection can be shut down.
     */
    abstract boolean outputIdle();

    /**
     * Invoked on session timeout. When {@code false} is returned
     * {@link #onTimeout(Timeout)} raises a socket timeout exception.
     */
    abstract boolean handleTimeout();
224 
225     private void processCommands() throws HttpException, IOException {
226         for (;;) {
227             final Command command = ioSession.poll();
228             if (command == null) {
229                 return;
230             }
231             if (command instanceof ShutdownCommand) {
232                 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
233                 requestShutdown(shutdownCommand.getType());
234             } else if (command instanceof RequestExecutionCommand) {
235                 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
236                     command.cancel();
237                 } else {
238                     execute((RequestExecutionCommand) command);
239                     return;
240                 }
241             } else {
242                 throw new HttpException("Unexpected command: " + command.getClass());
243             }
244         }
245     }
246 
247     public final void onConnect() throws HttpException, IOException {
248         if (connState == ConnectionState.READY) {
249             connState = ConnectionState.ACTIVE;
250             processCommands();
251         }
252     }
253 
254     IncomingMessage parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
255         final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, endOfStream);
256         if (messageHead != null) {
257             incomingMessageParser.reset();
258         }
259         return messageHead;
260     }
261 
    /**
     * Processes incoming data: parses message heads, decodes message content and
     * passes both to the abstract callbacks of this class.
     *
     * @param src data that has already been read from the session, or {@code null};
     *        when present it is appended to the session input buffer first.
     * @throws HttpException in case of an HTTP protocol violation reported by the
     *         parser or by one of the callbacks.
     * @throws IOException in case of an I/O error.
     */
    public final void onInput(final ByteBuffer src) throws HttpException, IOException {
        if (src != null) {
            final int n = src.remaining();
            inbuf.put(src);
            inTransportMetrics.incrementBytesTransferred(n);
        }

        // While shutting down with idle input, buffered data is not processed and
        // read interest is dropped.
        if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inbuf.hasData() && inputIdle()) {
            ioSession.clearEvent(SelectionKey.OP_READ);
            return;
        }

        // The session buffer is filled here only when no message body is in
        // progress; otherwise the content decoder is handed the session itself.
        boolean endOfStream = false;
        if (incomingMessage == null) {
            final int bytesRead = inbuf.fill(ioSession);
            if (bytesRead > 0) {
                inTransportMetrics.incrementBytesTransferred(bytesRead);
            }
            endOfStream = bytesRead == -1;
        }

        do {
            // Phase 1: parse the next message head if no message is in progress.
            if (incomingMessage == null) {

                final IncomingMessage messageHead = parseMessageHead(endOfStream);
                if (messageHead != null) {
                    this.version = messageHead.getVersion();

                    updateInputMetrics(messageHead, connMetrics);
                    final ContentDecoder contentDecoder;
                    if (handleIncomingMessage(messageHead)) {
                        final long len = incomingContentStrategy.determineLength(messageHead);
                        contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
                        consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
                    } else {
                        consumeHeader(messageHead, null);
                        contentDecoder = null;
                    }
                    // A fresh capacity window is created for every message head.
                    capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
                    if (contentDecoder != null) {
                        incomingMessage = new Message<>(messageHead, contentDecoder);
                    } else {
                        // Message without a body: it is complete right away.
                        inputEnd();
                        if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
                            ioSession.setEvent(SelectionKey.OP_READ);
                        }
                    }
                } else {
                    // No complete message head available yet.
                    break;
                }
            }

            // Phase 2: decode content of the message in progress.
            if (incomingMessage != null) {
                final ContentDecoder contentDecoder = incomingMessage.getBody();

                // At present the consumer can be forced to consume data
                // over its declared capacity in order to avoid having
                // unprocessed message body content stuck in the session
                // input buffer
                final int bytesRead = contentDecoder.read(contentBuffer);
                if (bytesRead > 0) {
                    contentBuffer.flip();
                    consumeData(contentBuffer);
                    contentBuffer.clear();
                    final int capacity = capacityWindow.removeCapacity(bytesRead);
                    if (capacity <= 0) {
                        // Window exhausted: ask the consumer for a capacity
                        // update unless the body is already complete.
                        if (!contentDecoder.isCompleted()) {
                            updateCapacity(capacityWindow);
                        }
                    }
                }
                if (contentDecoder.isCompleted()) {
                    dataEnd(contentDecoder.getTrailers());
                    // Closing the window turns later update calls into no-ops.
                    capacityWindow.close();
                    incomingMessage = null;
                    ioSession.setEvent(SelectionKey.OP_READ);
                    inputEnd();
                } else if (bytesRead == 0) {
                    // No content available at the moment.
                    break;
                }
            }
        } while (inbuf.hasData());

        // End of stream with nothing left to process: shut down gracefully when
        // both directions are idle, otherwise treat it as a premature close.
        if (endOfStream && !inbuf.hasData()) {
            if (outputIdle() && inputIdle()) {
                requestShutdown(CloseMode.GRACEFUL);
            } else {
                shutdownSession(new ConnectionClosedException("Connection closed by peer"));
            }
        }
    }
353 
    /**
     * Processes an output event: flushes buffered data, lets the subclass produce
     * more output, updates write interest and advances the connection state.
     *
     * @throws IOException in case of an I/O error.
     * @throws HttpException in case of an HTTP protocol violation reported by
     *         one of the callbacks.
     */
    public final void onOutput() throws IOException, HttpException {
        // Step 1: flush whatever is pending in the session output buffer.
        ioSession.getLock().lock();
        try {
            if (outbuf.hasData()) {
                final int bytesWritten = outbuf.flush(ioSession);
                if (bytesWritten > 0) {
                    outTransportMetrics.incrementBytesTransferred(bytesWritten);
                }
            }
        } finally {
            ioSession.getLock().unlock();
        }
        if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
            // Snapshot of the output request counter taken before producing output;
            // requestSessionOutput may increment it concurrently.
            final int pendingOutputRequests = outputRequests.get();
            produceOutput();
            final boolean outputPending = isOutputReady();
            final boolean outputEnd;
            ioSession.getLock().lock();
            try {
                // Write interest is cleared only if nothing is pending and the
                // counter still equals the snapshot; otherwise the snapshot is
                // subtracted so that newer requests remain counted.
                if (!outputPending && !outbuf.hasData() && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
                    ioSession.clearEvent(SelectionKey.OP_WRITE);
                } else {
                    outputRequests.addAndGet(-pendingOutputRequests);
                }
                outputEnd = outgoingMessage == null && !outbuf.hasData();
            } finally {
                ioSession.getLock().unlock();
            }
            if (outputEnd) {
                outputEnd();
                if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
                    // Still active: pick up the next queued command.
                    processCommands();
                } else if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
                    // Graceful shutdown completes once both directions are idle.
                    connState = ConnectionState.SHUTDOWN;
                }
            }
        }
        if (connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
            ioSession.close();
        }
    }
395 
396     public final void onTimeout(final Timeout timeout) throws IOException, HttpException {
397         if (!handleTimeout()) {
398             onException(SocketTimeoutExceptionFactory.create(timeout));
399         }
400     }
401 
402     public final void onException(final Exception ex) {
403         shutdownSession(ex);
404         CommandSupport.failCommands(ioSession, ex);
405     }
406 
407     public final void onDisconnect() {
408         disconnected();
409         CommandSupport.cancelCommands(ioSession);
410     }
411 
412     void requestShutdown(final CloseMode closeMode) {
413         switch (closeMode) {
414             case GRACEFUL:
415                 if (connState == ConnectionState.ACTIVE) {
416                     connState = ConnectionState.GRACEFUL_SHUTDOWN;
417                 }
418                 break;
419             case IMMEDIATE:
420                 connState = ConnectionState.SHUTDOWN;
421                 break;
422         }
423         ioSession.setEvent(SelectionKey.OP_WRITE);
424     }
425 
426     void commitMessageHead(
427             final OutgoingMessage messageHead,
428             final boolean endStream,
429             final FlushMode flushMode) throws HttpException, IOException {
430         ioSession.getLock().lock();
431         try {
432             outgoingMessageWriter.write(messageHead, outbuf);
433             updateOutputMetrics(messageHead, connMetrics);
434             if (!endStream) {
435                 final ContentEncoder contentEncoder;
436                 if (handleOutgoingMessage(messageHead)) {
437                     final long len = outgoingContentStrategy.determineLength(messageHead);
438                     contentEncoder = createContentEncoder(len, ioSession, outbuf, outTransportMetrics);
439                 } else {
440                     contentEncoder = null;
441                 }
442                 if (contentEncoder != null) {
443                     outgoingMessage = new Message<>(messageHead, contentEncoder);
444                 }
445             }
446             outgoingMessageWriter.reset();
447             if (flushMode == FlushMode.IMMEDIATE) {
448                 final int bytesWritten = outbuf.flush(ioSession);
449                 if (bytesWritten > 0) {
450                     outTransportMetrics.incrementBytesTransferred(bytesWritten);
451                 }
452             }
453             ioSession.setEvent(EventMask.WRITE);
454         } finally {
455             ioSession.getLock().unlock();
456         }
457     }
458 
459     void requestSessionInput() {
460         ioSession.setEvent(SelectionKey.OP_READ);
461     }
462 
463     void requestSessionOutput() {
464         outputRequests.incrementAndGet();
465         ioSession.setEvent(SelectionKey.OP_WRITE);
466     }
467 
468     Timeout getSessionTimeout() {
469         return ioSession.getSocketTimeout();
470     }
471 
472     void setSessionTimeout(final Timeout timeout) {
473         ioSession.setSocketTimeout(timeout);
474     }
475 
476     void suspendSessionInput() {
477         ioSession.clearEvent(SelectionKey.OP_READ);
478     }
479 
480     void suspendSessionOutput() throws IOException {
481         ioSession.getLock().lock();
482         try {
483             if (outbuf.hasData()) {
484                 final int bytesWritten = outbuf.flush(ioSession);
485                 if (bytesWritten > 0) {
486                     outTransportMetrics.incrementBytesTransferred(bytesWritten);
487                 }
488             } else {
489                 ioSession.clearEvent(SelectionKey.OP_WRITE);
490             }
491         } finally {
492             ioSession.getLock().unlock();
493         }
494     }
495 
496     int streamOutput(final ByteBuffer src) throws IOException {
497         ioSession.getLock().lock();
498         try {
499             if (outgoingMessage == null) {
500                 throw new ConnectionClosedException();
501             }
502             final ContentEncoder contentEncoder = outgoingMessage.getBody();
503             final int bytesWritten = contentEncoder.write(src);
504             if (bytesWritten > 0) {
505                 ioSession.setEvent(SelectionKey.OP_WRITE);
506             }
507             return bytesWritten;
508         } finally {
509             ioSession.getLock().unlock();
510         }
511     }
512 
513     enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
514 
515     MessageDelineation endOutputStream(final List<? extends Header> trailers) throws IOException {
516         ioSession.getLock().lock();
517         try {
518             if (outgoingMessage == null) {
519                 return MessageDelineation.NONE;
520             }
521             final ContentEncoder contentEncoder = outgoingMessage.getBody();
522             contentEncoder.complete(trailers);
523             ioSession.setEvent(SelectionKey.OP_WRITE);
524             outgoingMessage = null;
525             return contentEncoder instanceof ChunkEncoder
526                             ? MessageDelineation.CHUNK_CODED
527                             : MessageDelineation.MESSAGE_HEAD;
528         } finally {
529             ioSession.getLock().unlock();
530         }
531     }
532 
533     boolean isOutputCompleted() {
534         ioSession.getLock().lock();
535         try {
536             if (outgoingMessage == null) {
537                 return true;
538             }
539             final ContentEncoder contentEncoder = outgoingMessage.getBody();
540             return contentEncoder.isCompleted();
541         } finally {
542             ioSession.getLock().unlock();
543         }
544     }
545 
546     @Override
547     public void close() throws IOException {
548         ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
549     }
550 
551     @Override
552     public void close(final CloseMode closeMode) {
553         ioSession.enqueue(new ShutdownCommand(closeMode), Command.Priority.IMMEDIATE);
554     }
555 
556     @Override
557     public boolean isOpen() {
558         return connState == ConnectionState.ACTIVE;
559     }
560 
561     @Override
562     public Timeout getSocketTimeout() {
563         return ioSession.getSocketTimeout();
564     }
565 
566     @Override
567     public void setSocketTimeout(final Timeout timeout) {
568         ioSession.setSocketTimeout(timeout);
569     }
570 
571     @Override
572     public EndpointDetails getEndpointDetails() {
573         if (endpointDetails == null) {
574             endpointDetails = new BasicEndpointDetails(
575                     ioSession.getRemoteAddress(),
576                     ioSession.getLocalAddress(),
577                     connMetrics,
578                     ioSession.getSocketTimeout());
579         }
580         return endpointDetails;
581     }
582 
583     @Override
584     public ProtocolVersion getProtocolVersion() {
585         return version;
586     }
587 
588     @Override
589     public SocketAddress getRemoteAddress() {
590         return ioSession.getRemoteAddress();
591     }
592 
593     @Override
594     public SocketAddress getLocalAddress() {
595         return ioSession.getLocalAddress();
596     }
597 
598     @Override
599     public SSLSession getSSLSession() {
600         final TlsDetails tlsDetails = ioSession.getTlsDetails();
601         return tlsDetails != null ? tlsDetails.getSSLSession() : null;
602     }
603 
604     void appendState(final StringBuilder buf) {
605         buf.append("connState=").append(connState)
606                 .append(", inbuf=").append(inbuf)
607                 .append(", outbuf=").append(outbuf)
608                 .append(", inputWindow=").append(capacityWindow != null ? capacityWindow.getWindow() : 0);
609     }
610 
611     static class CapacityWindow implements CapacityChannel {
612         private final IOSession ioSession;
613         private final Object lock;
614         private int window;
615         private boolean closed;
616 
617         CapacityWindow(final int window, final IOSession ioSession) {
618             this.window = window;
619             this.ioSession = ioSession;
620             this.lock = new Object();
621         }
622 
623         @Override
624         public void update(final int increment) throws IOException {
625             synchronized (lock) {
626                 if (closed) {
627                     return;
628                 }
629                 if (increment > 0) {
630                     updateWindow(increment);
631                     ioSession.setEvent(SelectionKey.OP_READ);
632                 }
633             }
634         }
635 
636         /**
637          * Internal method for removing capacity. We don't need to check
638          * if this channel is closed in it.
639          */
640         int removeCapacity(final int delta) {
641             synchronized (lock) {
642                 updateWindow(-delta);
643                 if (window <= 0) {
644                     ioSession.clearEvent(SelectionKey.OP_READ);
645                 }
646                 return window;
647             }
648         }
649 
650         private void updateWindow(final int delta) {
651             int newValue = window + delta;
652             // Math.addExact
653             if (((window ^ newValue) & (delta ^ newValue)) < 0) {
654                 newValue = delta < 0 ? Integer.MIN_VALUE : Integer.MAX_VALUE;
655             }
656             window = newValue;
657         }
658 
659         /**
660          * Closes the capacity channel, preventing user code from accidentally requesting
661          * read events outside of the context of the request the channel was created for
662          */
663         void close() {
664             synchronized (lock) {
665                 closed = true;
666             }
667         }
668 
669         // visible for testing
670         int getWindow() {
671             return window;
672         }
673     }
674 }