View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.ClosedChannelException;
34  import java.nio.channels.ReadableByteChannel;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.WritableByteChannel;
37  import java.util.List;
38  import java.util.concurrent.atomic.AtomicInteger;
39  
40  import javax.net.ssl.SSLSession;
41  
42  import org.apache.hc.core5.http.ConnectionClosedException;
43  import org.apache.hc.core5.http.ContentLengthStrategy;
44  import org.apache.hc.core5.http.EndpointDetails;
45  import org.apache.hc.core5.http.EntityDetails;
46  import org.apache.hc.core5.http.Header;
47  import org.apache.hc.core5.http.HttpConnection;
48  import org.apache.hc.core5.http.HttpException;
49  import org.apache.hc.core5.http.HttpMessage;
50  import org.apache.hc.core5.http.Message;
51  import org.apache.hc.core5.http.ProtocolVersion;
52  import org.apache.hc.core5.http.config.CharCodingConfig;
53  import org.apache.hc.core5.http.config.Http1Config;
54  import org.apache.hc.core5.http.impl.BasicEndpointDetails;
55  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
56  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
57  import org.apache.hc.core5.http.impl.CharCodingSupport;
58  import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
59  import org.apache.hc.core5.http.impl.IncomingEntityDetails;
60  import org.apache.hc.core5.http.nio.CapacityChannel;
61  import org.apache.hc.core5.http.nio.ContentDecoder;
62  import org.apache.hc.core5.http.nio.ContentEncoder;
63  import org.apache.hc.core5.http.nio.NHttpMessageParser;
64  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
65  import org.apache.hc.core5.http.nio.SessionInputBuffer;
66  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
67  import org.apache.hc.core5.http.nio.command.CommandSupport;
68  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
69  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
70  import org.apache.hc.core5.io.CloseMode;
71  import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
72  import org.apache.hc.core5.reactor.Command;
73  import org.apache.hc.core5.reactor.EventMask;
74  import org.apache.hc.core5.reactor.IOSession;
75  import org.apache.hc.core5.reactor.ProtocolIOSession;
76  import org.apache.hc.core5.reactor.ssl.TlsDetails;
77  import org.apache.hc.core5.util.Args;
78  import org.apache.hc.core5.util.Identifiable;
79  import org.apache.hc.core5.util.Timeout;
80  
81  abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
82          implements Identifiable, HttpConnection {
83  
    /**
     * Life-cycle states of the connection. The declaration order is significant:
     * this class compares states with {@code compareTo}, e.g. "at least
     * GRACEFUL_SHUTDOWN" means shutting down.
     */
    private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}

    /** Underlying I/O session; never {@code null} (checked in the constructor). */
    private final ProtocolIOSession ioSession;
    /** HTTP/1 settings; {@code Http1Config.DEFAULT} when none was supplied. */
    private final Http1Config http1Config;
    /** Session buffer that incoming bytes are collected in and parsed from. */
    private final SessionInputBufferImpl inbuf;
    /** Session buffer that outgoing message heads are written to before being flushed. */
    private final SessionOutputBufferImpl outbuf;
    /** Counts bytes received on this connection. */
    private final BasicHttpTransportMetrics inTransportMetrics;
    /** Counts bytes flushed to this connection. */
    private final BasicHttpTransportMetrics outTransportMetrics;
    /** Connection metrics built on top of the two transport metrics above. */
    private final BasicHttpConnectionMetrics connMetrics;
    /** Parser used to read incoming message heads from {@link #inbuf}. */
    private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
    /** Writer used to serialize outgoing message heads into {@link #outbuf}. */
    private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
    /** Determines the content length of incoming messages; defaults to {@code DefaultContentLengthStrategy.INSTANCE}. */
    private final ContentLengthStrategy incomingContentStrategy;
    /** Determines the content length of outgoing messages; defaults to {@code DefaultContentLengthStrategy.INSTANCE}. */
    private final ContentLengthStrategy outgoingContentStrategy;
    /** Scratch buffer that decoded body content is read into before it is passed to {@code consumeData}. */
    private final ByteBuffer contentBuffer;
    /** Number of output requests made via {@code requestSessionOutput}; reconciled in {@code onOutput}. */
    private final AtomicInteger outputRequests;

    /** Incoming message whose body is currently being received, or {@code null} when none is in progress. */
    private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
    /** Outgoing message whose body is currently being sent, or {@code null} when none is in progress. */
    private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
    /** Current life-cycle state; starts as {@code READY}. */
    private volatile ConnectionState connState;
    /** Input capacity window; a new one is created in {@code onInput} for every parsed message head. */
    private volatile CapacityWindow capacityWindow;

    /** Protocol version taken from the most recently parsed incoming message head. */
    private volatile ProtocolVersion version;
    /** Endpoint details, created lazily by {@code getEndpointDetails}. */
    private volatile EndpointDetails endpointDetails;
107 
108     AbstractHttp1StreamDuplexer(
109             final ProtocolIOSession ioSession,
110             final Http1Config http1Config,
111             final CharCodingConfig charCodingConfig,
112             final NHttpMessageParser<IncomingMessage> incomingMessageParser,
113             final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
114             final ContentLengthStrategy incomingContentStrategy,
115             final ContentLengthStrategy outgoingContentStrategy) {
116         this.ioSession = Args.notNull(ioSession, "I/O session");
117         this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
118         final int bufferSize = this.http1Config.getBufferSize();
119         this.inbuf = new SessionInputBufferImpl(bufferSize, Math.min(bufferSize, 512),
120                 this.http1Config.getMaxLineLength(),
121                 CharCodingSupport.createDecoder(charCodingConfig));
122         this.outbuf = new SessionOutputBufferImpl(bufferSize, Math.min(bufferSize, 512),
123                 CharCodingSupport.createEncoder(charCodingConfig));
124         this.inTransportMetrics = new BasicHttpTransportMetrics();
125         this.outTransportMetrics = new BasicHttpTransportMetrics();
126         this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
127         this.incomingMessageParser = incomingMessageParser;
128         this.outgoingMessageWriter = outgoingMessageWriter;
129         this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
130                 DefaultContentLengthStrategy.INSTANCE;
131         this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
132                 DefaultContentLengthStrategy.INSTANCE;
133         this.contentBuffer = ByteBuffer.allocate(this.http1Config.getBufferSize());
134         this.outputRequests = new AtomicInteger(0);
135         this.connState = ConnectionState.READY;
136     }
137 
138     @Override
139     public String getId() {
140         return ioSession.getId();
141     }
142 
143     boolean isActive() {
144         return connState == ConnectionState.ACTIVE;
145     }
146 
147     boolean isShuttingDown() {
148         return connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0;
149     }
150 
151     void shutdownSession(final CloseMode closeMode) {
152         if (closeMode == CloseMode.GRACEFUL) {
153             connState = ConnectionState.GRACEFUL_SHUTDOWN;
154             ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
155         } else {
156             connState = ConnectionState.SHUTDOWN;
157             ioSession.close();
158         }
159     }
160 
161     void shutdownSession(final Exception cause) {
162         connState = ConnectionState.SHUTDOWN;
163         try {
164             terminate(cause);
165         } finally {
166             final CloseMode closeMode;
167             if (cause instanceof ConnectionClosedException) {
168                 closeMode = CloseMode.GRACEFUL;
169             } else if (cause instanceof IOException) {
170                 closeMode = CloseMode.IMMEDIATE;
171             } else {
172                 closeMode = CloseMode.GRACEFUL;
173             }
174             ioSession.close(closeMode);
175         }
176     }
177 
    /**
     * Invoked from {@code onDisconnect()} before pending commands are cancelled.
     */
    abstract void disconnected();

    /**
     * Invoked from {@code shutdownSession(Exception)} after the connection has been
     * marked as shut down and before the I/O session is closed.
     *
     * @param exception the failure that caused the shutdown
     */
    abstract void terminate(final Exception exception);

    /**
     * Invoked from {@code onInput} for every successfully parsed incoming message head.
     */
    abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);

    /**
     * Invoked from {@code commitMessageHead} after the message head has been written
     * to the session output buffer.
     */
    abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);

    /**
     * Receives a parsed incoming message head.
     *
     * @param messageHead the parsed message head
     * @param entityDetails details of the message body, or {@code null} when no
     *                      content decoder was created for the message
     */
    abstract void consumeHeader(IncomingMessage messageHead, EntityDetails entityDetails) throws HttpException, IOException;

    /**
     * Inspects an incoming message head.
     *
     * @return {@code true} to have {@code onInput} determine the content length and
     *         create a content decoder for the message; {@code false} to treat it as
     *         having no body
     */
    abstract boolean handleIncomingMessage(IncomingMessage incomingMessage) throws HttpException;

    /**
     * Inspects an outgoing message head.
     *
     * @return {@code true} to have {@code commitMessageHead} determine the content
     *         length and create a content encoder for the message; {@code false} to
     *         treat it as having no body
     */
    abstract boolean handleOutgoingMessage(OutgoingMessage outgoingMessage) throws HttpException;

    /**
     * Creates the decoder for an incoming message body. A {@code null} result is
     * treated by {@code onInput} as "no body".
     */
    abstract ContentDecoder createContentDecoder(
            long contentLength,
            ReadableByteChannel channel,
            SessionInputBuffer buffer,
            BasicHttpTransportMetrics metrics) throws HttpException;

    /**
     * Creates the encoder for an outgoing message body. A {@code null} result is
     * treated by {@code commitMessageHead} as "no body".
     */
    abstract ContentEncoder createContentEncoder(
            long contentLength,
            WritableByteChannel channel,
            SessionOutputBuffer buffer,
            BasicHttpTransportMetrics metrics) throws HttpException;

    /**
     * Receives decoded body content.
     *
     * @param src the shared content buffer, flipped for reading; it is cleared by
     *            the caller right after this method returns
     */
    abstract void consumeData(ByteBuffer src) throws HttpException, IOException;

    /**
     * Invoked from {@code onInput} when the capacity window has dropped to zero or
     * below while the message body is not yet complete.
     *
     * @param capacityChannel channel through which more input capacity can be granted
     */
    abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;

    /**
     * Invoked from {@code onInput} when the content decoder reports completion.
     *
     * @param trailers the trailers reported by the content decoder
     */
    abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;

    /**
     * Consulted by {@code onOutput} after {@code produceOutput()}; when this returns
     * {@code true} write interest is kept.
     */
    abstract boolean isOutputReady();

    /**
     * Invoked from {@code onOutput} while the connection is not in the
     * {@code SHUTDOWN} state.
     */
    abstract void produceOutput() throws HttpException, IOException;

    /**
     * Invoked from {@code processCommands()} for a request execution command that
     * was polled while the connection is not shutting down.
     */
    abstract void execute(RequestExecutionCommand executionCommand) throws HttpException, IOException;

    /**
     * Invoked from {@code onInput} once an incoming message has been fully processed:
     * either it has no body or its body has been completely decoded.
     */
    abstract void inputEnd() throws HttpException, IOException;

    /**
     * Invoked from {@code onOutput} when there is no outgoing message in progress
     * and the session output buffer is empty.
     */
    abstract void outputEnd() throws HttpException, IOException;

    /**
     * Reports whether the input side is idle; used by this class when deciding
     * whether the connection can be shut down.
     */
    abstract boolean inputIdle();

    /**
     * Reports whether the output side is idle; used by this class when deciding
     * whether the connection can be shut down.
     */
    abstract boolean outputIdle();

    /**
     * Invoked from {@code onTimeout}.
     *
     * @return {@code true} if the timeout has been handled; {@code false} to have
     *         {@code onTimeout} fail the connection with a socket timeout exception
     */
    abstract boolean handleTimeout();
225 
226     private void processCommands() throws HttpException, IOException {
227         for (;;) {
228             final Command command = ioSession.poll();
229             if (command == null) {
230                 return;
231             }
232             if (command instanceof ShutdownCommand) {
233                 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
234                 requestShutdown(shutdownCommand.getType());
235             } else if (command instanceof RequestExecutionCommand) {
236                 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
237                     command.cancel();
238                 } else {
239                     execute((RequestExecutionCommand) command);
240                     return;
241                 }
242             } else {
243                 throw new HttpException("Unexpected command: " + command.getClass());
244             }
245         }
246     }
247 
248     public final void onConnect() throws HttpException, IOException {
249         if (connState == ConnectionState.READY) {
250             connState = ConnectionState.ACTIVE;
251             processCommands();
252         }
253     }
254 
255     IncomingMessage parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
256         final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, endOfStream);
257         if (messageHead != null) {
258             incomingMessageParser.reset();
259         }
260         return messageHead;
261     }
262 
    /**
     * Handles a read event: buffers incoming bytes, parses message heads and passes
     * decoded body content to the consumer callbacks.
     *
     * @param src optional data supplied by the caller; when not {@code null} its
     *            remaining bytes are put into the session input buffer first
     * @throws HttpException propagated from message parsing or the callbacks
     * @throws IOException propagated from session reads or the callbacks
     */
    public final void onInput(final ByteBuffer src) throws HttpException, IOException {
        if (src != null) {
            final int n = src.remaining();
            inbuf.put(src);
            inTransportMetrics.incrementBytesTransferred(n);
        }

        // While shutting down with an idle input side, stop reading and leave any
        // buffered data unprocessed.
        if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inbuf.hasData() && inputIdle()) {
            ioSession.clearEvent(SelectionKey.OP_READ);
            return;
        }

        // The session is only read into the input buffer here when no message body
        // is in progress; end of stream is signalled by a fill result of -1.
        boolean endOfStream = false;
        if (incomingMessage == null) {
            final int bytesRead = inbuf.fill(ioSession);
            if (bytesRead > 0) {
                inTransportMetrics.incrementBytesTransferred(bytesRead);
            }
            endOfStream = bytesRead == -1;
        }

        // Each iteration handles at most one message head followed by one read of
        // body content; the loop repeats while the input buffer still holds data.
        do {
            if (incomingMessage == null) {

                final IncomingMessage messageHead = parseMessageHead(endOfStream);
                if (messageHead != null) {
                    this.version = messageHead.getVersion();

                    updateInputMetrics(messageHead, connMetrics);
                    final ContentDecoder contentDecoder;
                    if (handleIncomingMessage(messageHead)) {
                        final long len = incomingContentStrategy.determineLength(messageHead);
                        contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
                        consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
                    } else {
                        consumeHeader(messageHead, null);
                        contentDecoder = null;
                    }
                    // A fresh capacity window is created for every message head.
                    capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
                    if (contentDecoder != null) {
                        incomingMessage = new Message<>(messageHead, contentDecoder);
                    } else {
                        // No body: the message is complete as soon as its head is consumed.
                        inputEnd();
                        if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
                            ioSession.setEvent(SelectionKey.OP_READ);
                        }
                    }
                } else {
                    // No complete message head available yet.
                    break;
                }
            }

            if (incomingMessage != null) {
                final ContentDecoder contentDecoder = incomingMessage.getBody();

                // At present the consumer can be forced to consume data
                // over its declared capacity in order to avoid having
                // unprocessed message body content stuck in the session
                // input buffer
                final int bytesRead = contentDecoder.read(contentBuffer);
                if (bytesRead > 0) {
                    contentBuffer.flip();
                    consumeData(contentBuffer);
                    contentBuffer.clear();
                    final int capacity = capacityWindow.removeCapacity(bytesRead);
                    if (capacity <= 0) {
                        // Window exhausted: let the consumer grant more capacity,
                        // unless the body is already complete.
                        if (!contentDecoder.isCompleted()) {
                            updateCapacity(capacityWindow);
                        }
                    }
                }
                if (contentDecoder.isCompleted()) {
                    dataEnd(contentDecoder.getTrailers());
                    // Closing the window makes later update() calls on it no-ops.
                    capacityWindow.close();
                    incomingMessage = null;
                    ioSession.setEvent(SelectionKey.OP_READ);
                    inputEnd();
                } else if (bytesRead == 0) {
                    // Nothing could be decoded right now; wait for the next read event.
                    break;
                }
            }
        } while (inbuf.hasData());

        // End of stream with nothing left to process: shut down gracefully when both
        // directions are idle, otherwise fail the connection as closed by the peer.
        if (endOfStream && !inbuf.hasData()) {
            if (outputIdle() && inputIdle()) {
                requestShutdown(CloseMode.GRACEFUL);
            } else {
                shutdownSession(new ConnectionClosedException("Connection closed by peer"));
            }
        }
    }
354 
    /**
     * Handles a write event: flushes buffered output, lets the implementation
     * produce more output and decides whether write interest can be dropped.
     *
     * @throws IOException propagated from flushing or the callbacks
     * @throws HttpException propagated from the callbacks or command processing
     */
    public final void onOutput() throws IOException, HttpException {
        // Flush whatever is already buffered, under the session lock.
        ioSession.getLock().lock();
        try {
            if (outbuf.hasData()) {
                final int bytesWritten = outbuf.flush(ioSession);
                if (bytesWritten > 0) {
                    outTransportMetrics.incrementBytesTransferred(bytesWritten);
                }
            }
        } finally {
            ioSession.getLock().unlock();
        }
        if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
            // Snapshot the request counter before producing output so that requests
            // arriving in the meantime can be told apart from those served now.
            final int pendingOutputRequests = outputRequests.get();
            produceOutput();
            final boolean outputPending = isOutputReady();
            final boolean outputEnd;
            ioSession.getLock().lock();
            try {
                // Write interest is cleared only if nothing is pending, nothing is
                // buffered and the counter still equals the snapshot; otherwise the
                // snapshot is subtracted so that newer requests remain counted.
                if (!outputPending && !outbuf.hasData() && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
                    ioSession.clearEvent(SelectionKey.OP_WRITE);
                } else {
                    outputRequests.addAndGet(-pendingOutputRequests);
                }
                outputEnd = outgoingMessage == null && !outbuf.hasData();
            } finally {
                ioSession.getLock().unlock();
            }
            if (outputEnd) {
                outputEnd();
                if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
                    // Still active: pick up the next queued command, if any.
                    processCommands();
                } else if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
                    // Graceful shutdown can complete once both directions are idle.
                    connState = ConnectionState.SHUTDOWN;
                }
            }
        }
        if (connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
            ioSession.close();
        }
    }
396 
397     public final void onTimeout(final Timeout timeout) throws IOException, HttpException {
398         if (!handleTimeout()) {
399             onException(SocketTimeoutExceptionFactory.create(timeout));
400         }
401     }
402 
403     public final void onException(final Exception ex) {
404         shutdownSession(ex);
405         CommandSupport.failCommands(ioSession, ex);
406     }
407 
408     public final void onDisconnect() {
409         disconnected();
410         CommandSupport.cancelCommands(ioSession);
411     }
412 
413     void requestShutdown(final CloseMode closeMode) {
414         switch (closeMode) {
415             case GRACEFUL:
416                 if (connState == ConnectionState.ACTIVE) {
417                     connState = ConnectionState.GRACEFUL_SHUTDOWN;
418                 }
419                 break;
420             case IMMEDIATE:
421                 connState = ConnectionState.SHUTDOWN;
422                 break;
423         }
424         ioSession.setEvent(SelectionKey.OP_WRITE);
425     }
426 
427     void commitMessageHead(
428             final OutgoingMessage messageHead,
429             final boolean endStream,
430             final FlushMode flushMode) throws HttpException, IOException {
431         ioSession.getLock().lock();
432         try {
433             outgoingMessageWriter.write(messageHead, outbuf);
434             updateOutputMetrics(messageHead, connMetrics);
435             if (!endStream) {
436                 final ContentEncoder contentEncoder;
437                 if (handleOutgoingMessage(messageHead)) {
438                     final long len = outgoingContentStrategy.determineLength(messageHead);
439                     contentEncoder = createContentEncoder(len, ioSession, outbuf, outTransportMetrics);
440                 } else {
441                     contentEncoder = null;
442                 }
443                 if (contentEncoder != null) {
444                     outgoingMessage = new Message<>(messageHead, contentEncoder);
445                 }
446             }
447             outgoingMessageWriter.reset();
448             if (flushMode == FlushMode.IMMEDIATE) {
449                 final int bytesWritten = outbuf.flush(ioSession);
450                 if (bytesWritten > 0) {
451                     outTransportMetrics.incrementBytesTransferred(bytesWritten);
452                 }
453             }
454             ioSession.setEvent(EventMask.WRITE);
455         } finally {
456             ioSession.getLock().unlock();
457         }
458     }
459 
460     void requestSessionInput() {
461         ioSession.setEvent(SelectionKey.OP_READ);
462     }
463 
464     void requestSessionOutput() {
465         outputRequests.incrementAndGet();
466         ioSession.setEvent(SelectionKey.OP_WRITE);
467     }
468 
469     Timeout getSessionTimeout() {
470         return ioSession.getSocketTimeout();
471     }
472 
473     void setSessionTimeout(final Timeout timeout) {
474         ioSession.setSocketTimeout(timeout);
475     }
476 
477     void suspendSessionInput() {
478         ioSession.clearEvent(SelectionKey.OP_READ);
479     }
480 
481     void suspendSessionOutput() throws IOException {
482         ioSession.getLock().lock();
483         try {
484             if (outbuf.hasData()) {
485                 final int bytesWritten = outbuf.flush(ioSession);
486                 if (bytesWritten > 0) {
487                     outTransportMetrics.incrementBytesTransferred(bytesWritten);
488                 }
489             } else {
490                 ioSession.clearEvent(SelectionKey.OP_WRITE);
491             }
492         } finally {
493             ioSession.getLock().unlock();
494         }
495     }
496 
497     int streamOutput(final ByteBuffer src) throws IOException {
498         ioSession.getLock().lock();
499         try {
500             if (outgoingMessage == null) {
501                 throw new ClosedChannelException();
502             }
503             final ContentEncoder contentEncoder = outgoingMessage.getBody();
504             final int bytesWritten = contentEncoder.write(src);
505             if (bytesWritten > 0) {
506                 ioSession.setEvent(SelectionKey.OP_WRITE);
507             }
508             return bytesWritten;
509         } finally {
510             ioSession.getLock().unlock();
511         }
512     }
513 
514     enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
515 
516     MessageDelineation endOutputStream(final List<? extends Header> trailers) throws IOException {
517         ioSession.getLock().lock();
518         try {
519             if (outgoingMessage == null) {
520                 return MessageDelineation.NONE;
521             }
522             final ContentEncoder contentEncoder = outgoingMessage.getBody();
523             contentEncoder.complete(trailers);
524             ioSession.setEvent(SelectionKey.OP_WRITE);
525             outgoingMessage = null;
526             return contentEncoder instanceof ChunkEncoder
527                             ? MessageDelineation.CHUNK_CODED
528                             : MessageDelineation.MESSAGE_HEAD;
529         } finally {
530             ioSession.getLock().unlock();
531         }
532     }
533 
534     boolean isOutputCompleted() {
535         ioSession.getLock().lock();
536         try {
537             if (outgoingMessage == null) {
538                 return true;
539             }
540             final ContentEncoder contentEncoder = outgoingMessage.getBody();
541             return contentEncoder.isCompleted();
542         } finally {
543             ioSession.getLock().unlock();
544         }
545     }
546 
547     @Override
548     public void close() throws IOException {
549         ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
550     }
551 
552     @Override
553     public void close(final CloseMode closeMode) {
554         ioSession.enqueue(new ShutdownCommand(closeMode), Command.Priority.IMMEDIATE);
555     }
556 
557     @Override
558     public boolean isOpen() {
559         return connState == ConnectionState.ACTIVE;
560     }
561 
562     @Override
563     public Timeout getSocketTimeout() {
564         return ioSession.getSocketTimeout();
565     }
566 
567     @Override
568     public void setSocketTimeout(final Timeout timeout) {
569         ioSession.setSocketTimeout(timeout);
570     }
571 
572     @Override
573     public EndpointDetails getEndpointDetails() {
574         if (endpointDetails == null) {
575             endpointDetails = new BasicEndpointDetails(
576                     ioSession.getRemoteAddress(),
577                     ioSession.getLocalAddress(),
578                     connMetrics,
579                     ioSession.getSocketTimeout());
580         }
581         return endpointDetails;
582     }
583 
584     @Override
585     public ProtocolVersion getProtocolVersion() {
586         return version;
587     }
588 
589     @Override
590     public SocketAddress getRemoteAddress() {
591         return ioSession.getRemoteAddress();
592     }
593 
594     @Override
595     public SocketAddress getLocalAddress() {
596         return ioSession.getLocalAddress();
597     }
598 
599     @Override
600     public SSLSession getSSLSession() {
601         final TlsDetails tlsDetails = ioSession.getTlsDetails();
602         return tlsDetails != null ? tlsDetails.getSSLSession() : null;
603     }
604 
605     void appendState(final StringBuilder buf) {
606         buf.append("connState=").append(connState)
607                 .append(", inbuf=").append(inbuf)
608                 .append(", outbuf=").append(outbuf)
609                 .append(", inputWindow=").append(capacityWindow != null ? capacityWindow.getWindow() : 0);
610     }
611 
612     static class CapacityWindow implements CapacityChannel {
613         private final IOSession ioSession;
614         private final Object lock;
615         private int window;
616         private boolean closed;
617 
618         CapacityWindow(final int window, final IOSession ioSession) {
619             this.window = window;
620             this.ioSession = ioSession;
621             this.lock = new Object();
622         }
623 
624         @Override
625         public void update(final int increment) throws IOException {
626             synchronized (lock) {
627                 if (closed) {
628                     return;
629                 }
630                 if (increment > 0) {
631                     updateWindow(increment);
632                     ioSession.setEvent(SelectionKey.OP_READ);
633                 }
634             }
635         }
636 
637         /**
638          * Internal method for removing capacity. We don't need to check
639          * if this channel is closed in it.
640          */
641         int removeCapacity(final int delta) {
642             synchronized (lock) {
643                 updateWindow(-delta);
644                 if (window <= 0) {
645                     ioSession.clearEvent(SelectionKey.OP_READ);
646                 }
647                 return window;
648             }
649         }
650 
651         private void updateWindow(final int delta) {
652             int newValue = window + delta;
653             // Math.addExact
654             if (((window ^ newValue) & (delta ^ newValue)) < 0) {
655                 newValue = delta < 0 ? Integer.MIN_VALUE : Integer.MAX_VALUE;
656             }
657             window = newValue;
658         }
659 
660         /**
661          * Closes the capacity channel, preventing user code from accidentally requesting
662          * read events outside of the context of the request the channel was created for
663          */
664         void close() {
665             synchronized (lock) {
666                 closed = true;
667             }
668         }
669 
670         // visible for testing
671         int getWindow() {
672             return window;
673         }
674     }
675 }