View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http2.impl.nio;
28  
29  import javax.net.ssl.SSLSession;
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.SelectionKey;
34  import java.nio.charset.CharacterCodingException;
35  import java.util.Deque;
36  import java.util.Iterator;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.Queue;
40  import java.util.concurrent.CancellationException;
41  import java.util.concurrent.ConcurrentHashMap;
42  import java.util.concurrent.ConcurrentLinkedDeque;
43  import java.util.concurrent.ConcurrentLinkedQueue;
44  import java.util.concurrent.atomic.AtomicInteger;
45  
46  import org.apache.hc.core5.concurrent.Cancellable;
47  import org.apache.hc.core5.concurrent.CancellableDependency;
48  import org.apache.hc.core5.http.ConnectionClosedException;
49  import org.apache.hc.core5.http.EndpointDetails;
50  import org.apache.hc.core5.http.Header;
51  import org.apache.hc.core5.http.HttpConnection;
52  import org.apache.hc.core5.http.HttpException;
53  import org.apache.hc.core5.http.HttpStreamResetException;
54  import org.apache.hc.core5.http.HttpVersion;
55  import org.apache.hc.core5.http.ProtocolException;
56  import org.apache.hc.core5.http.ProtocolVersion;
57  import org.apache.hc.core5.http.config.CharCodingConfig;
58  import org.apache.hc.core5.http.impl.BasicEndpointDetails;
59  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
60  import org.apache.hc.core5.http.impl.CharCodingSupport;
61  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
62  import org.apache.hc.core5.http.nio.AsyncPushProducer;
63  import org.apache.hc.core5.http.nio.HandlerFactory;
64  import org.apache.hc.core5.http.nio.command.ExecutableCommand;
65  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
66  import org.apache.hc.core5.http.protocol.HttpCoreContext;
67  import org.apache.hc.core5.http.protocol.HttpProcessor;
68  import org.apache.hc.core5.http2.H2ConnectionException;
69  import org.apache.hc.core5.http2.H2Error;
70  import org.apache.hc.core5.http2.H2StreamResetException;
71  import org.apache.hc.core5.http2.config.H2Config;
72  import org.apache.hc.core5.http2.config.H2Param;
73  import org.apache.hc.core5.http2.config.H2Setting;
74  import org.apache.hc.core5.http2.frame.FrameFactory;
75  import org.apache.hc.core5.http2.frame.FrameFlag;
76  import org.apache.hc.core5.http2.frame.FrameType;
77  import org.apache.hc.core5.http2.frame.RawFrame;
78  import org.apache.hc.core5.http2.frame.StreamIdGenerator;
79  import org.apache.hc.core5.http2.hpack.HPackDecoder;
80  import org.apache.hc.core5.http2.hpack.HPackEncoder;
81  import org.apache.hc.core5.http2.impl.BasicH2TransportMetrics;
82  import org.apache.hc.core5.http2.nio.AsyncPingHandler;
83  import org.apache.hc.core5.http2.nio.command.PingCommand;
84  import org.apache.hc.core5.io.CloseMode;
85  import org.apache.hc.core5.reactor.Command;
86  import org.apache.hc.core5.reactor.ProtocolIOSession;
87  import org.apache.hc.core5.reactor.ssl.TlsDetails;
88  import org.apache.hc.core5.util.Args;
89  import org.apache.hc.core5.util.ByteArrayBuffer;
90  import org.apache.hc.core5.util.Identifiable;
91  import org.apache.hc.core5.util.Timeout;
92  
93  abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection {
94  
95      private static final long LINGER_TIME = 1000; // 1 second
96      private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024; // 10 MiB
97  
98      enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
99      enum SettingsHandshake { READY, TRANSMITTED, ACKED }
100 
101     private final ProtocolIOSession ioSession;
102     private final FrameFactory frameFactory;
103     private final StreamIdGenerator idGenerator;
104     private final HttpProcessor httpProcessor;
105     private final H2Config localConfig;
106     private final BasicH2TransportMetrics inputMetrics;
107     private final BasicH2TransportMetrics outputMetrics;
108     private final BasicHttpConnectionMetrics connMetrics;
109     private final FrameInputBuffer inputBuffer;
110     private final FrameOutputBuffer outputBuffer;
111     private final Deque<RawFrame> outputQueue;
112     private final HPackEncoder hPackEncoder;
113     private final HPackDecoder hPackDecoder;
114     private final Map<Integer, H2Stream> streamMap;
115     private final Queue<AsyncPingHandler> pingHandlers;
116     private final AtomicInteger connInputWindow;
117     private final AtomicInteger connOutputWindow;
118     private final AtomicInteger outputRequests;
119     private final AtomicInteger lastStreamId;
120     private final H2StreamListener streamListener;
121 
122     private ConnectionHandshake connState = ConnectionHandshake.READY;
123     private SettingsHandshake localSettingState = SettingsHandshake.READY;
124     private SettingsHandshake remoteSettingState = SettingsHandshake.READY;
125 
126     private int initInputWinSize;
127     private int initOutputWinSize;
128     private int lowMark;
129 
130     private volatile H2Config remoteConfig;
131 
132     private Continuation continuation;
133 
134     private int processedRemoteStreamId;
135     private EndpointDetails endpointDetails;
136 
137     AbstractH2StreamMultiplexer(
138             final ProtocolIOSession ioSession,
139             final FrameFactory frameFactory,
140             final StreamIdGenerator idGenerator,
141             final HttpProcessor httpProcessor,
142             final CharCodingConfig charCodingConfig,
143             final H2Config h2Config,
144             final H2StreamListener streamListener) {
145         this.ioSession = Args.notNull(ioSession, "IO session");
146         this.frameFactory = Args.notNull(frameFactory, "Frame factory");
147         this.idGenerator = Args.notNull(idGenerator, "Stream id generator");
148         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
149         this.localConfig = h2Config != null ? h2Config : H2Config.DEFAULT;
150         this.inputMetrics = new BasicH2TransportMetrics();
151         this.outputMetrics = new BasicH2TransportMetrics();
152         this.connMetrics = new BasicHttpConnectionMetrics(this.inputMetrics, this.outputMetrics);
153         this.inputBuffer = new FrameInputBuffer(this.inputMetrics, this.localConfig.getMaxFrameSize());
154         this.outputBuffer = new FrameOutputBuffer(this.outputMetrics, this.localConfig.getMaxFrameSize());
155         this.outputQueue = new ConcurrentLinkedDeque<>();
156         this.pingHandlers = new ConcurrentLinkedQueue<>();
157         this.outputRequests = new AtomicInteger(0);
158         this.lastStreamId = new AtomicInteger(0);
159         this.hPackEncoder = new HPackEncoder(CharCodingSupport.createEncoder(charCodingConfig));
160         this.hPackDecoder = new HPackDecoder(CharCodingSupport.createDecoder(charCodingConfig));
161         this.streamMap = new ConcurrentHashMap<>();
162         this.remoteConfig = H2Config.INIT;
163         this.connInputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
164         this.connOutputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
165 
166         this.initInputWinSize = H2Config.INIT.getInitialWindowSize();
167         this.initOutputWinSize = H2Config.INIT.getInitialWindowSize();
168 
169         this.hPackDecoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
170         this.hPackEncoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
171         this.hPackDecoder.setMaxListSize(H2Config.INIT.getMaxHeaderListSize());
172 
173         this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
174         this.streamListener = streamListener;
175     }
176 
177     @Override
178     public String getId() {
179         return ioSession.getId();
180     }
181 
182     abstract void acceptHeaderFrame() throws H2ConnectionException;
183 
184     abstract void acceptPushRequest() throws H2ConnectionException;
185 
186     abstract void acceptPushFrame() throws H2ConnectionException;
187 
188     abstract H2StreamHandler createRemotelyInitiatedStream(
189             H2StreamChannel channel,
190             HttpProcessor httpProcessor,
191             BasicHttpConnectionMetrics connMetrics,
192             HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException;
193 
194     abstract H2StreamHandler createLocallyInitiatedStream(
195             ExecutableCommand command,
196             H2StreamChannel channel,
197             HttpProcessor httpProcessor,
198             BasicHttpConnectionMetrics connMetrics) throws IOException;
199 
200     private int updateWindow(final AtomicInteger window, final int delta) throws ArithmeticException {
201         for (;;) {
202             final int current = window.get();
203             long newValue = (long) current + delta;
204 
205             //TODO: work-around for what looks like a bug in Ngnix (1.11)
206             // Tolerate if the update window exceeded by one
207             if (newValue == 0x80000000L) {
208                 newValue = Integer.MAX_VALUE;
209             }
210             //TODO: needs to be removed
211 
212             if (Math.abs(newValue) > 0x7fffffffL) {
213                 throw new ArithmeticException("Update causes flow control window to exceed " + Integer.MAX_VALUE);
214             }
215             if (window.compareAndSet(current, (int) newValue)) {
216                 return (int) newValue;
217             }
218         }
219     }
220 
221     private int updateInputWindow(
222             final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
223         final int newSize = updateWindow(window, delta);
224         if (streamListener != null) {
225             streamListener.onInputFlowControl(this, streamId, delta, newSize);
226         }
227         return newSize;
228     }
229 
230     private int updateOutputWindow(
231             final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
232         final int newSize = updateWindow(window, delta);
233         if (streamListener != null) {
234             streamListener.onOutputFlowControl(this, streamId, delta, newSize);
235         }
236         return newSize;
237     }
238 
239     private void commitFrameInternal(final RawFrame frame) throws IOException {
240         if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
241             if (streamListener != null) {
242                 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
243             }
244             outputBuffer.write(frame, ioSession);
245         } else {
246             outputQueue.addLast(frame);
247         }
248         ioSession.setEvent(SelectionKey.OP_WRITE);
249     }
250 
251     private void commitFrame(final RawFrame frame) throws IOException {
252         Args.notNull(frame, "Frame");
253         ioSession.getLock().lock();
254         try {
255             commitFrameInternal(frame);
256         } finally {
257             ioSession.getLock().unlock();
258         }
259     }
260 
261     private void commitHeaders(
262             final int streamId, final List<? extends Header> headers, final boolean endStream) throws IOException {
263         if (streamListener != null) {
264             streamListener.onHeaderOutput(this, streamId, headers);
265         }
266         final ByteArrayBufferByteArrayBuffer.html#ByteArrayBuffer">ByteArrayBuffer buf = new ByteArrayBuffer(512);
267         hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
268 
269         int off = 0;
270         int remaining = buf.length();
271         boolean continuation = false;
272 
273         while (remaining > 0) {
274             final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
275             final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
276 
277             remaining -= chunk;
278             off += chunk;
279 
280             final boolean endHeaders = remaining == 0;
281             final RawFrame frame;
282             if (!continuation) {
283                 frame = frameFactory.createHeaders(streamId, payload, endHeaders, endStream);
284                 continuation = true;
285             } else {
286                 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
287             }
288             commitFrameInternal(frame);
289         }
290     }
291 
292     private void commitPushPromise(
293             final int streamId, final int promisedStreamId, final List<Header> headers) throws IOException {
294         if (headers == null || headers.isEmpty()) {
295             throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
296         }
297         if (streamListener != null) {
298             streamListener.onHeaderOutput(this, streamId, headers);
299         }
300         final ByteArrayBufferByteArrayBuffer.html#ByteArrayBuffer">ByteArrayBuffer buf = new ByteArrayBuffer(512);
301         buf.append((byte)(promisedStreamId >> 24));
302         buf.append((byte)(promisedStreamId >> 16));
303         buf.append((byte)(promisedStreamId >> 8));
304         buf.append((byte)(promisedStreamId));
305 
306         hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
307 
308         int off = 0;
309         int remaining = buf.length();
310         boolean continuation = false;
311 
312         while (remaining > 0) {
313             final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
314             final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
315 
316             remaining -= chunk;
317             off += chunk;
318 
319             final boolean endHeaders = remaining == 0;
320             final RawFrame frame;
321             if (!continuation) {
322                 frame = frameFactory.createPushPromise(streamId, payload, endHeaders);
323                 continuation = true;
324             } else {
325                 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
326             }
327             commitFrameInternal(frame);
328         }
329     }
330 
331     private void streamDataFrame(
332             final int streamId,
333             final AtomicInteger streamOutputWindow,
334             final ByteBuffer payload,
335             final int chunk) throws IOException {
336         final RawFrame dataFrame = frameFactory.createData(streamId, payload, false);
337         if (streamListener != null) {
338             streamListener.onFrameOutput(this, streamId, dataFrame);
339         }
340         updateOutputWindow(0, connOutputWindow, -chunk);
341         updateOutputWindow(streamId, streamOutputWindow, -chunk);
342         outputBuffer.write(dataFrame, ioSession);
343     }
344 
345     private int streamData(
346             final int streamId, final AtomicInteger streamOutputWindow, final ByteBuffer payload) throws IOException {
347         if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
348             final int capacity = Math.min(connOutputWindow.get(), streamOutputWindow.get());
349             if (capacity <= 0) {
350                 return 0;
351             }
352             final int maxPayloadSize = Math.min(capacity, remoteConfig.getMaxFrameSize());
353             final int chunk;
354             if (payload.remaining() <= maxPayloadSize) {
355                 chunk = payload.remaining();
356                 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
357             } else {
358                 chunk = maxPayloadSize;
359                 final int originalLimit = payload.limit();
360                 try {
361                     payload.limit(payload.position() + chunk);
362                     streamDataFrame(streamId, streamOutputWindow, payload, chunk);
363                 } finally {
364                     payload.limit(originalLimit);
365                 }
366             }
367             payload.position(payload.position() + chunk);
368             ioSession.setEvent(SelectionKey.OP_WRITE);
369             return chunk;
370         }
371         return 0;
372     }
373 
374     private void incrementInputCapacity(
375             final int streamId, final AtomicInteger inputWindow, final int inputCapacity) throws IOException {
376         if (inputCapacity > 0) {
377             final int streamWinSize = inputWindow.get();
378             final int remainingCapacity = Integer.MAX_VALUE - streamWinSize;
379             final int chunk = Math.min(inputCapacity, remainingCapacity);
380             if (chunk != 0) {
381                 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(streamId, chunk);
382                 commitFrame(windowUpdateFrame);
383                 updateInputWindow(streamId, inputWindow, chunk);
384             }
385         }
386     }
387 
388     private void requestSessionOutput() {
389         outputRequests.incrementAndGet();
390         ioSession.setEvent(SelectionKey.OP_WRITE);
391     }
392 
393     private void updateLastStreamId(final int streamId) {
394         final int currentId = lastStreamId.get();
395         if (streamId > currentId) {
396             lastStreamId.compareAndSet(currentId, streamId);
397         }
398     }
399 
400     private int generateStreamId() {
401         for (;;) {
402             final int currentId = lastStreamId.get();
403             final int newStreamId = idGenerator.generate(currentId);
404             if (lastStreamId.compareAndSet(currentId, newStreamId)) {
405                 return newStreamId;
406             }
407         }
408     }
409 
410     public final void onConnect() throws HttpException, IOException {
411         connState = ConnectionHandshake.ACTIVE;
412         final RawFrame settingsFrame = frameFactory.createSettings(
413                 new H2Setting(H2Param.HEADER_TABLE_SIZE, localConfig.getHeaderTableSize()),
414                 new H2Setting(H2Param.ENABLE_PUSH, localConfig.isPushEnabled() ? 1 : 0),
415                 new H2Setting(H2Param.MAX_CONCURRENT_STREAMS, localConfig.getMaxConcurrentStreams()),
416                 new H2Setting(H2Param.INITIAL_WINDOW_SIZE, localConfig.getInitialWindowSize()),
417                 new H2Setting(H2Param.MAX_FRAME_SIZE, localConfig.getMaxFrameSize()),
418                 new H2Setting(H2Param.MAX_HEADER_LIST_SIZE, localConfig.getMaxHeaderListSize()));
419 
420         commitFrame(settingsFrame);
421         localSettingState = SettingsHandshake.TRANSMITTED;
422         maximizeConnWindow(connInputWindow.get());
423 
424         if (streamListener != null) {
425             final int initInputWindow = connInputWindow.get();
426             streamListener.onInputFlowControl(this, 0, initInputWindow, initInputWindow);
427             final int initOutputWindow = connOutputWindow.get();
428             streamListener.onOutputFlowControl(this, 0, initOutputWindow, initOutputWindow);
429         }
430     }
431 
432     public final void onInput(final ByteBuffer src) throws HttpException, IOException {
433         if (connState == ConnectionHandshake.SHUTDOWN) {
434             ioSession.clearEvent(SelectionKey.OP_READ);
435         } else {
436             if (src != null) {
437                 inputBuffer.put(src);
438             }
439             RawFrame frame;
440             while ((frame = inputBuffer.read(ioSession)) != null) {
441                 if (streamListener != null) {
442                     streamListener.onFrameInput(this, frame.getStreamId(), frame);
443                 }
444                 consumeFrame(frame);
445             }
446         }
447     }
448 
449     public final void onOutput() throws HttpException, IOException {
450         ioSession.getLock().lock();
451         try {
452             if (!outputBuffer.isEmpty()) {
453                 outputBuffer.flush(ioSession);
454             }
455             while (outputBuffer.isEmpty()) {
456                 final RawFrame frame = outputQueue.poll();
457                 if (frame != null) {
458                     if (streamListener != null) {
459                         streamListener.onFrameOutput(this, frame.getStreamId(), frame);
460                     }
461                     outputBuffer.write(frame, ioSession);
462                 } else {
463                     break;
464                 }
465             }
466         } finally {
467             ioSession.getLock().unlock();
468         }
469 
470         if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
471 
472             if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
473                 produceOutput();
474             }
475             final int pendingOutputRequests = outputRequests.get();
476             boolean outputPending = false;
477             if (!streamMap.isEmpty() && connOutputWindow.get() > 0) {
478                 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
479                     final Map.Entry<Integer, H2Stream> entry = it.next();
480                     final H2Stream stream = entry.getValue();
481                     if (!stream.isLocalClosed()
482                             && stream.getOutputWindow().get() > 0
483                             && stream.isOutputReady()) {
484                         outputPending = true;
485                         break;
486                     }
487                 }
488             }
489             ioSession.getLock().lock();
490             try {
491                 if (!outputPending && outputBuffer.isEmpty() && outputQueue.isEmpty()
492                         && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
493                     ioSession.clearEvent(SelectionKey.OP_WRITE);
494                 } else {
495                     outputRequests.addAndGet(-pendingOutputRequests);
496                 }
497             } finally {
498                 ioSession.getLock().unlock();
499             }
500         }
501 
502         if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 && remoteSettingState == SettingsHandshake.ACKED) {
503             processPendingCommands();
504         }
505         if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
506             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
507                 final Map.Entry<Integer, H2Stream> entry = it.next();
508                 final H2Stream stream = entry.getValue();
509                 if (stream.isLocalClosed() && stream.isRemoteClosed()) {
510                     stream.releaseResources();
511                     it.remove();
512                 }
513             }
514             if (streamMap.isEmpty()) {
515                 connState = ConnectionHandshake.SHUTDOWN;
516             }
517         }
518         if (connState.compareTo(ConnectionHandshake.SHUTDOWN) >= 0) {
519             ioSession.getLock().lock();
520             try {
521                 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
522                     ioSession.close();
523                 }
524             } finally {
525                 ioSession.getLock().unlock();
526             }
527         }
528     }
529 
530     public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
531         connState = ConnectionHandshake.SHUTDOWN;
532 
533         final RawFrame goAway;
534         if (localSettingState != SettingsHandshake.ACKED) {
535             goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.SETTINGS_TIMEOUT,
536                             "Setting timeout (" + timeout + ")");
537         } else {
538             goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR,
539                             "Timeout due to inactivity (" + timeout + ")");
540         }
541         commitFrame(goAway);
542         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
543             final Map.Entry<Integer, H2Stream> entry = it.next();
544             final H2Stream stream = entry.getValue();
545             stream.reset(new H2StreamResetException(H2Error.NO_ERROR, "Timeout due to inactivity (" + timeout + ")"));
546         }
547         streamMap.clear();
548     }
549 
550     public final void onDisconnect() {
551         for (;;) {
552             final AsyncPingHandler pingHandler = pingHandlers.poll();
553             if (pingHandler != null) {
554                 pingHandler.cancel();
555             } else {
556                 break;
557             }
558         }
559         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
560             final Map.Entry<Integer, H2Stream> entry = it.next();
561             final H2Stream stream = entry.getValue();
562             stream.cancel();
563         }
564         for (;;) {
565             final Command command = ioSession.poll();
566             if (command != null) {
567                 if (command instanceof ExecutableCommand) {
568                     ((ExecutableCommand) command).failed(new ConnectionClosedException());
569                 } else {
570                     command.cancel();
571                 }
572             } else {
573                 break;
574             }
575         }
576     }
577 
578     private void processPendingCommands() throws IOException, HttpException {
579         while (streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
580             final Command command = ioSession.poll();
581             if (command == null) {
582                 break;
583             }
584             if (command instanceof ShutdownCommand) {
585                 final ShutdownCommand../org/apache/hc/core5/http/nio/command/ShutdownCommand.html#ShutdownCommand">ShutdownCommand shutdownCommand = (ShutdownCommand) command;
586                 if (shutdownCommand.getType() == CloseMode.IMMEDIATE) {
587                     for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
588                         final Map.Entry<Integer, H2Stream> entry = it.next();
589                         final H2Stream stream = entry.getValue();
590                         stream.cancel();
591                     }
592                     streamMap.clear();
593                     connState = ConnectionHandshake.SHUTDOWN;
594                 } else {
595                     if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
596                         final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR, "Graceful shutdown");
597                         commitFrame(goAway);
598                         connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
599                     }
600                 }
601                 break;
602             } else if (command instanceof PingCommand) {
603                 final PingCommand./../../../org/apache/hc/core5/http2/nio/command/PingCommand.html#PingCommand">PingCommand pingCommand = (PingCommand) command;
604                 final AsyncPingHandler handler = pingCommand.getHandler();
605                 pingHandlers.add(handler);
606                 final RawFrame ping = frameFactory.createPing(handler.getData());
607                 commitFrame(ping);
608             } else if (command instanceof ExecutableCommand) {
609                 final int streamId = generateStreamId();
610                 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
611                         streamId, true, initInputWinSize, initOutputWinSize);
612                 final ExecutableCommandrg/apache/hc/core5/http/nio/command/ExecutableCommand.html#ExecutableCommand">ExecutableCommand executableCommand = (ExecutableCommand) command;
613                 final H2StreamHandler streamHandler = createLocallyInitiatedStream(
614                         executableCommand, channel, httpProcessor, connMetrics);
615 
616                 final H2Stream stream = new H2Stream(channel, streamHandler, false);
617                 streamMap.put(streamId, stream);
618 
619                 if (streamListener != null) {
620                     final int initInputWindow = stream.getInputWindow().get();
621                     streamListener.onInputFlowControl(this, streamId, initInputWindow, initInputWindow);
622                     final int initOutputWindow = stream.getOutputWindow().get();
623                     streamListener.onOutputFlowControl(this, streamId, initOutputWindow, initOutputWindow);
624                 }
625 
626                 if (stream.isOutputReady()) {
627                     stream.produceOutput();
628                 }
629                 final CancellableDependency cancellableDependency = executableCommand.getCancellableDependency();
630                 if (cancellableDependency != null) {
631                     cancellableDependency.setDependency(new Cancellable() {
632 
633                         @Override
634                         public boolean cancel() {
635                             return stream.abort();
636                         }
637 
638                     });
639                 }
640                 if (!outputQueue.isEmpty()) {
641                     return;
642                 }
643             }
644         }
645     }
646 
647     public final void onException(final Exception cause) {
648         try {
649             for (;;) {
650                 final AsyncPingHandler pingHandler = pingHandlers.poll();
651                 if (pingHandler != null) {
652                     pingHandler.failed(cause);
653                 } else {
654                     break;
655                 }
656             }
657             for (;;) {
658                 final Command command = ioSession.poll();
659                 if (command != null) {
660                     if (command instanceof ExecutableCommand) {
661                         ((ExecutableCommand) command).failed(new ConnectionClosedException());
662                     } else {
663                         command.cancel();
664                     }
665                 } else {
666                     break;
667                 }
668             }
669             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
670                 final Map.Entry<Integer, H2Stream> entry = it.next();
671                 final H2Stream stream = entry.getValue();
672                 stream.reset(cause);
673             }
674             streamMap.clear();
675             if (!(cause instanceof ConnectionClosedException)) {
676                 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) <= 0) {
677                     final H2Error errorCode;
678                     if (cause instanceof H2ConnectionException) {
679                         errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
680                     } else if (cause instanceof ProtocolException){
681                         errorCode = H2Error.PROTOCOL_ERROR;
682                     } else {
683                         errorCode = H2Error.INTERNAL_ERROR;
684                     }
685                     final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
686                     commitFrame(goAway);
687                 }
688             }
689         } catch (final IOException ignore) {
690         } finally {
691             connState = ConnectionHandshake.SHUTDOWN;
692             final CloseMode closeMode;
693             if (cause instanceof ConnectionClosedException) {
694                 closeMode = CloseMode.GRACEFUL;
695             } else if (cause instanceof IOException) {
696                 closeMode = CloseMode.IMMEDIATE;
697             } else {
698                 closeMode = CloseMode.GRACEFUL;
699             }
700             ioSession.close(closeMode);
701         }
702     }
703 
704     private H2Stream getValidStream(final int streamId) throws H2ConnectionException {
705         if (streamId == 0) {
706             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
707         }
708         final H2Stream stream = streamMap.get(streamId);
709         if (stream == null) {
710             if (streamId <= lastStreamId.get()) {
711                 throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed");
712             } else {
713                 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
714             }
715         }
716         return stream;
717     }
718 
719     private void consumeFrame(final RawFrame frame) throws HttpException, IOException {
720         final FrameType frameType = FrameType.valueOf(frame.getType());
721         final int streamId = frame.getStreamId();
722         if (continuation != null && frameType != FrameType.CONTINUATION) {
723             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "CONTINUATION frame expected");
724         }
725         if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) >= 0) {
726             if (streamId > processedRemoteStreamId && !idGenerator.isSameSide(streamId)) {
727                 // ignore the frame
728                 return;
729             }
730         }
731         switch (frameType) {
732             case DATA: {
733                 final H2Stream stream = getValidStream(streamId);
734                 try {
735                     consumeDataFrame(frame, stream);
736                 } catch (final H2StreamResetException ex) {
737                     stream.localReset(ex);
738                 } catch (final HttpStreamResetException ex) {
739                     stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
740                 }
741 
742                 if (stream.isTerminated()) {
743                     streamMap.remove(streamId);
744                     stream.releaseResources();
745                 }
746             }
747             break;
748             case HEADERS: {
749                 if (streamId == 0) {
750                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
751                 }
752                 H2Stream stream = streamMap.get(streamId);
753                 if (stream == null) {
754                     acceptHeaderFrame();
755 
756                     updateLastStreamId(streamId);
757 
758                     final H2StreamChannelImpl channel = new H2StreamChannelImpl(
759                             streamId, false, initInputWinSize, initOutputWinSize);
760                     final H2StreamHandler streamHandler = createRemotelyInitiatedStream(
761                             channel, httpProcessor, connMetrics, null);
762                     stream = new H2Stream(channel, streamHandler, true);
763                     if (stream.isOutputReady()) {
764                         stream.produceOutput();
765                     }
766                     streamMap.put(streamId, stream);
767                 }
768 
769                 try {
770                     consumeHeaderFrame(frame, stream);
771 
772                     if (stream.isOutputReady()) {
773                         stream.produceOutput();
774                     }
775                 } catch (final H2StreamResetException ex) {
776                     stream.localReset(ex);
777                 } catch (final HttpStreamResetException ex) {
778                     stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
779                 } catch (final HttpException ex) {
780                     stream.handle(ex);
781                 }
782 
783                 if (stream.isTerminated()) {
784                     streamMap.remove(streamId);
785                     stream.releaseResources();
786                 }
787             }
788             break;
789             case CONTINUATION: {
790                 if (continuation == null) {
791                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION frame");
792                 }
793                 if (streamId != continuation.streamId) {
794                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION stream id: " + streamId);
795                 }
796 
797                 final H2Stream stream = getValidStream(streamId);
798                 try {
799 
800                     consumeContinuationFrame(frame, stream);
801                 } catch (final H2StreamResetException ex) {
802                     stream.localReset(ex);
803                 } catch (final HttpStreamResetException ex) {
804                     stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
805                 }
806 
807                 if (stream.isTerminated()) {
808                     streamMap.remove(streamId);
809                     stream.releaseResources();
810                 }
811             }
812             break;
813             case WINDOW_UPDATE: {
814                 final ByteBuffer payload = frame.getPayload();
815                 if (payload == null || payload.remaining() != 4) {
816                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid WINDOW_UPDATE frame payload");
817                 }
818                 final int delta = payload.getInt();
819                 if (delta <= 0) {
820                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Invalid WINDOW_UPDATE delta");
821                 }
822                 if (streamId == 0) {
823                     try {
824                         updateOutputWindow(0, connOutputWindow, delta);
825                     } catch (final ArithmeticException ex) {
826                         throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
827                     }
828                 } else {
829                     final H2Stream stream = streamMap.get(streamId);
830                     if (stream != null) {
831                         try {
832                             updateOutputWindow(streamId, stream.getOutputWindow(), delta);
833                         } catch (final ArithmeticException ex) {
834                             throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
835                         }
836                     }
837                 }
838                 ioSession.setEvent(SelectionKey.OP_WRITE);
839             }
840             break;
841             case RST_STREAM: {
842                 if (streamId == 0) {
843                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
844                 }
845                 final H2Stream stream = streamMap.get(streamId);
846                 if (stream == null) {
847                     if (streamId > lastStreamId.get()) {
848                         throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
849                     }
850                 } else {
851                     final ByteBuffer payload = frame.getPayload();
852                     if (payload == null || payload.remaining() != 4) {
853                         throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid RST_STREAM frame payload");
854                     }
855                     final int errorCode = payload.getInt();
856                     stream.reset(new H2StreamResetException(errorCode, "Stream reset (" + errorCode + ")"));
857                     streamMap.remove(streamId);
858                     stream.releaseResources();
859                 }
860             }
861             break;
862             case PING: {
863                 if (streamId != 0) {
864                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
865                 }
866                 final ByteBuffer ping = frame.getPayloadContent();
867                 if (ping == null || ping.remaining() != 8) {
868                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload");
869                 }
870                 if (frame.isFlagSet(FrameFlag.ACK)) {
871                     final AsyncPingHandler pingHandler = pingHandlers.poll();
872                     if (pingHandler != null) {
873                         pingHandler.consumeResponse(ping);
874                     }
875                 } else {
876                     final ByteBuffer pong = ByteBuffer.allocate(ping.remaining());
877                     pong.put(ping);
878                     pong.flip();
879                     final RawFrame response = frameFactory.createPingAck(pong);
880                     commitFrame(response);
881                 }
882             }
883             break;
884             case SETTINGS: {
885                 if (streamId != 0) {
886                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
887                 }
888                 if (frame.isFlagSet(FrameFlag.ACK)) {
889                     if (localSettingState == SettingsHandshake.TRANSMITTED) {
890                         localSettingState = SettingsHandshake.ACKED;
891                         ioSession.setEvent(SelectionKey.OP_WRITE);
892                         applyLocalSettings();
893                     }
894                 } else {
895                     final ByteBuffer payload = frame.getPayload();
896                     if (payload != null) {
897                         if ((payload.remaining() % 6) != 0) {
898                             throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid SETTINGS payload");
899                         }
900                         consumeSettingsFrame(payload);
901                         remoteSettingState = SettingsHandshake.TRANSMITTED;
902                     }
903                     // Send ACK
904                     final RawFrame response = frameFactory.createSettingsAck();
905                     commitFrame(response);
906                     remoteSettingState = SettingsHandshake.ACKED;
907                 }
908             }
909             break;
910             case PRIORITY:
911                 // Stream priority not supported
912                 break;
913             case PUSH_PROMISE: {
914                 acceptPushFrame();
915 
916                 if (!localConfig.isPushEnabled()) {
917                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Push is disabled");
918                 }
919 
920                 final H2Stream stream = getValidStream(streamId);
921                 if (stream.isRemoteClosed()) {
922                     stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed"));
923                     break;
924                 }
925 
926                 final ByteBuffer payload = frame.getPayloadContent();
927                 if (payload == null || payload.remaining() < 4) {
928                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PUSH_PROMISE payload");
929                 }
930                 final int promisedStreamId = payload.getInt();
931                 if (promisedStreamId == 0 || idGenerator.isSameSide(promisedStreamId)) {
932                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal promised stream id: " + promisedStreamId);
933                 }
934                 if (streamMap.get(promisedStreamId) != null) {
935                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected promised stream id: " + promisedStreamId);
936                 }
937 
938                 updateLastStreamId(promisedStreamId);
939 
940                 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
941                         promisedStreamId, false, initInputWinSize, initOutputWinSize);
942                 final H2StreamHandler streamHandler = createRemotelyInitiatedStream(
943                         channel, httpProcessor, connMetrics, stream.getPushHandlerFactory());
944                 final H2Stream promisedStream = new H2Stream(channel, streamHandler, true);
945                 streamMap.put(promisedStreamId, promisedStream);
946 
947                 try {
948                     consumePushPromiseFrame(frame, payload, promisedStream);
949                 } catch (final H2StreamResetException ex) {
950                     promisedStream.localReset(ex);
951                 } catch (final HttpStreamResetException ex) {
952                     promisedStream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.NO_ERROR);
953                 }
954             }
955             break;
956             case GOAWAY: {
957                 if (streamId != 0) {
958                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
959                 }
960                 final ByteBuffer payload = frame.getPayload();
961                 if (payload == null || payload.remaining() < 8) {
962                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid GOAWAY payload");
963                 }
964                 final int processedLocalStreamId = payload.getInt();
965                 final int errorCode = payload.getInt();
966                 if (errorCode == H2Error.NO_ERROR.getCode()) {
967                     if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
968                         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
969                             final Map.Entry<Integer, H2Stream> entry = it.next();
970                             final int activeStreamId = entry.getKey();
971                             if (!idGenerator.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
972                                 final H2Stream stream = entry.getValue();
973                                 stream.cancel();
974                                 it.remove();
975                             }
976                         }
977                     }
978                     connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
979                 } else {
980                     for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
981                         final Map.Entry<Integer, H2Stream> entry = it.next();
982                         final H2Stream stream = entry.getValue();
983                         stream.reset(new H2StreamResetException(errorCode, "Connection terminated by the peer (" + errorCode + ")"));
984                     }
985                     streamMap.clear();
986                     connState = ConnectionHandshake.SHUTDOWN;
987                 }
988             }
989             ioSession.setEvent(SelectionKey.OP_WRITE);
990             break;
991         }
992     }
993 
994     private void consumeDataFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
995         final int streamId = stream.getId();
996         final ByteBuffer payload = frame.getPayloadContent();
997         if (payload != null) {
998             final int frameLength = frame.getLength();
999             final int streamWinSize = updateInputWindow(streamId, stream.getInputWindow(), -frameLength);
1000             if (streamWinSize < lowMark && !stream.isRemoteClosed()) {
1001                 stream.produceInputCapacityUpdate();
1002             }
1003             final int connWinSize = updateInputWindow(0, connInputWindow, -frameLength);
1004             if (connWinSize < CONNECTION_WINDOW_LOW_MARK) {
1005                 maximizeConnWindow(connWinSize);
1006             }
1007         }
1008         if (stream.isRemoteClosed()) {
1009             throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1010         }
1011         if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1012             stream.setRemoteEndStream();
1013         }
1014         if (stream.isLocalReset()) {
1015             return;
1016         }
1017         stream.consumeData(payload);
1018     }
1019 
1020     private void maximizeConnWindow(final int connWinSize) throws IOException {
1021         final int delta = Integer.MAX_VALUE - connWinSize;
1022         if (delta > 0) {
1023             final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(0, delta);
1024             commitFrame(windowUpdateFrame);
1025             updateInputWindow(0, connInputWindow, delta);
1026         }
1027     }
1028 
1029     private void consumePushPromiseFrame(final RawFrame frame, final ByteBuffer payload, final H2Stream promisedStream) throws HttpException, IOException {
1030         final int promisedStreamId = promisedStream.getId();
1031         if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1032             continuation = new Continuation(promisedStreamId, frame.getType(), true);
1033         }
1034         if (continuation == null) {
1035             final List<Header> headers = hPackDecoder.decodeHeaders(payload);
1036             if (promisedStreamId > processedRemoteStreamId) {
1037                 processedRemoteStreamId = promisedStreamId;
1038             }
1039             if (streamListener != null) {
1040                 streamListener.onHeaderInput(this, promisedStreamId, headers);
1041             }
1042             if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
1043                 throw new H2StreamResetException(H2Error.REFUSED_STREAM, "Stream refused");
1044             }
1045             promisedStream.consumePromise(headers);
1046         } else {
1047             continuation.copyPayload(payload);
1048         }
1049     }
1050 
1051     List<Header> decodeHeaders(final ByteBuffer payload) throws HttpException {
1052         return hPackDecoder.decodeHeaders(payload);
1053     }
1054 
1055     private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1056         final int streamId = stream.getId();
1057         if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1058             continuation = new Continuation(streamId, frame.getType(), frame.isFlagSet(FrameFlag.END_STREAM));
1059         }
1060         final ByteBuffer payload = frame.getPayloadContent();
1061         if (frame.isFlagSet(FrameFlag.PRIORITY)) {
1062             // Priority not supported
1063             payload.getInt();
1064             payload.get();
1065         }
1066         if (continuation == null) {
1067             final List<Header> headers = decodeHeaders(payload);
1068             if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1069                 processedRemoteStreamId = streamId;
1070             }
1071             if (streamListener != null) {
1072                 streamListener.onHeaderInput(this, streamId, headers);
1073             }
1074             if (stream.isRemoteClosed()) {
1075                 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1076             }
1077             if (stream.isLocalReset()) {
1078                 return;
1079             }
1080             if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
1081                 throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, "Stream refused");
1082             }
1083             if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1084                 stream.setRemoteEndStream();
1085             }
1086             stream.consumeHeader(headers);
1087         } else {
1088             continuation.copyPayload(payload);
1089         }
1090     }
1091 
1092     private void consumeContinuationFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1093         final int streamId = frame.getStreamId();
1094         final ByteBuffer payload = frame.getPayload();
1095         continuation.copyPayload(payload);
1096         if (frame.isFlagSet(FrameFlag.END_HEADERS)) {
1097             final List<Header> headers = decodeHeaders(continuation.getContent());
1098             if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1099                 processedRemoteStreamId = streamId;
1100             }
1101             if (streamListener != null) {
1102                 streamListener.onHeaderInput(this, streamId, headers);
1103             }
1104             if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
1105                 throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, "Stream refused");
1106             }
1107             if (stream.isRemoteClosed()) {
1108                 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1109             }
1110             if (stream.isLocalReset()) {
1111                 return;
1112             }
1113             if (continuation.endStream) {
1114                 stream.setRemoteEndStream();
1115             }
1116             if (continuation.type == FrameType.PUSH_PROMISE.getValue()) {
1117                 stream.consumePromise(headers);
1118             } else {
1119                 stream.consumeHeader(headers);
1120             }
1121             continuation = null;
1122         }
1123     }
1124 
1125     private void consumeSettingsFrame(final ByteBuffer payload) throws HttpException, IOException {
1126         final H2Config.Builder configBuilder = H2Config.initial();
1127         while (payload.hasRemaining()) {
1128             final int code = payload.getShort();
1129             final int value = payload.getInt();
1130             final H2Param param = H2Param.valueOf(code);
1131             if (param != null) {
1132                 switch (param) {
1133                     case HEADER_TABLE_SIZE:
1134                         try {
1135                             configBuilder.setHeaderTableSize(value);
1136                         } catch (final IllegalArgumentException ex) {
1137                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1138                         }
1139                         break;
1140                     case MAX_CONCURRENT_STREAMS:
1141                         try {
1142                             configBuilder.setMaxConcurrentStreams(value);
1143                         } catch (final IllegalArgumentException ex) {
1144                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1145                         }
1146                         break;
1147                     case ENABLE_PUSH:
1148                         configBuilder.setPushEnabled(value == 1);
1149                         break;
1150                     case INITIAL_WINDOW_SIZE:
1151                         try {
1152                             configBuilder.setInitialWindowSize(value);
1153                         } catch (final IllegalArgumentException ex) {
1154                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1155                         }
1156                         break;
1157                     case MAX_FRAME_SIZE:
1158                         try {
1159                             configBuilder.setMaxFrameSize(value);
1160                         } catch (final IllegalArgumentException ex) {
1161                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1162                         }
1163                         break;
1164                     case MAX_HEADER_LIST_SIZE:
1165                         try {
1166                             configBuilder.setMaxHeaderListSize(value);
1167                         } catch (final IllegalArgumentException ex) {
1168                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1169                         }
1170                         break;
1171                 }
1172             }
1173         }
1174         applyRemoteSettings(configBuilder.build());
1175     }
1176 
1177     private void produceOutput() throws HttpException, IOException {
1178         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1179             final Map.Entry<Integer, H2Stream> entry = it.next();
1180             final H2Stream stream = entry.getValue();
1181             if (!stream.isLocalClosed() && stream.getOutputWindow().get() > 0) {
1182                 stream.produceOutput();
1183             }
1184             if (stream.isTerminated()) {
1185                 it.remove();
1186                 stream.releaseResources();
1187             }
1188             if (!outputQueue.isEmpty()) {
1189                 break;
1190             }
1191         }
1192     }
1193 
1194     private void applyRemoteSettings(final H2Config config) throws H2ConnectionException {
1195         remoteConfig = config;
1196 
1197         hPackEncoder.setMaxTableSize(remoteConfig.getHeaderTableSize());
1198         final int delta = remoteConfig.getInitialWindowSize() - initOutputWinSize;
1199         initOutputWinSize = remoteConfig.getInitialWindowSize();
1200 
1201         if (delta != 0) {
1202             if (!streamMap.isEmpty()) {
1203                 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1204                     final Map.Entry<Integer, H2Stream> entry = it.next();
1205                     final H2Stream stream = entry.getValue();
1206                     try {
1207                         updateOutputWindow(stream.getId(), stream.getOutputWindow(), delta);
1208                     } catch (final ArithmeticException ex) {
1209                         throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1210                     }
1211                 }
1212             }
1213         }
1214     }
1215 
1216     private void applyLocalSettings() throws H2ConnectionException {
1217         hPackDecoder.setMaxTableSize(localConfig.getHeaderTableSize());
1218         hPackDecoder.setMaxListSize(localConfig.getMaxHeaderListSize());
1219 
1220         final int delta = localConfig.getInitialWindowSize() - initInputWinSize;
1221         initInputWinSize = localConfig.getInitialWindowSize();
1222 
1223         if (delta != 0 && !streamMap.isEmpty()) {
1224             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1225                 final Map.Entry<Integer, H2Stream> entry = it.next();
1226                 final H2Stream stream = entry.getValue();
1227                 try {
1228                     updateInputWindow(stream.getId(), stream.getInputWindow(), delta);
1229                 } catch (final ArithmeticException ex) {
1230                     throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1231                 }
1232             }
1233         }
1234         lowMark = initInputWinSize / 2;
1235     }
1236 
1237     @Override
1238     public void close() throws IOException {
1239         ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.IMMEDIATE);
1240     }
1241 
1242     @Override
1243     public void close(final CloseMode closeMode) {
1244         ioSession.close(closeMode);
1245     }
1246 
1247     @Override
1248     public boolean isOpen() {
1249         return connState == ConnectionHandshake.ACTIVE;
1250     }
1251 
1252     @Override
1253     public void setSocketTimeout(final Timeout timeout) {
1254         ioSession.setSocketTimeout(timeout);
1255     }
1256 
1257     @Override
1258     public SSLSession getSSLSession() {
1259         final TlsDetails tlsDetails = ioSession.getTlsDetails();
1260         return tlsDetails != null ? tlsDetails.getSSLSession() : null;
1261     }
1262 
1263     @Override
1264     public EndpointDetails getEndpointDetails() {
1265         if (endpointDetails == null) {
1266             endpointDetails = new BasicEndpointDetails(
1267                     ioSession.getRemoteAddress(),
1268                     ioSession.getLocalAddress(),
1269                     connMetrics,
1270                     ioSession.getSocketTimeout());
1271         }
1272         return endpointDetails;
1273     }
1274 
1275     @Override
1276     public Timeout getSocketTimeout() {
1277         return ioSession.getSocketTimeout();
1278     }
1279 
1280     @Override
1281     public ProtocolVersion getProtocolVersion() {
1282         return HttpVersion.HTTP_2;
1283     }
1284 
1285     @Override
1286     public SocketAddress getRemoteAddress() {
1287         return ioSession.getRemoteAddress();
1288     }
1289 
1290     @Override
1291     public SocketAddress getLocalAddress() {
1292         return ioSession.getLocalAddress();
1293     }
1294 
1295     void appendState(final StringBuilder buf) {
1296         buf.append("connState=").append(connState)
1297                 .append(", connInputWindow=").append(connInputWindow)
1298                 .append(", connOutputWindow=").append(connOutputWindow)
1299                 .append(", outputQueue=").append(outputQueue.size())
1300                 .append(", streamMap=").append(streamMap.size())
1301                 .append(", processedRemoteStreamId=").append(processedRemoteStreamId);
1302     }
1303 
1304     private static class Continuation {
1305 
1306         final int streamId;
1307         final int type;
1308         final boolean endStream;
1309         final ByteArrayBuffer headerBuffer;
1310 
1311         private Continuation(final int streamId, final int type, final boolean endStream) {
1312             this.streamId = streamId;
1313             this.type = type;
1314             this.endStream = endStream;
1315             this.headerBuffer = new ByteArrayBuffer(1024);
1316         }
1317 
1318         void copyPayload(final ByteBuffer payload) {
1319             if (payload == null) {
1320                 return;
1321             }
1322             headerBuffer.ensureCapacity(payload.remaining());
1323             payload.get(headerBuffer.array(), headerBuffer.length(), payload.remaining());
1324         }
1325 
1326         ByteBuffer getContent() {
1327             return ByteBuffer.wrap(headerBuffer.array(), 0, headerBuffer.length());
1328         }
1329 
1330     }
1331 
1332     private class H2StreamChannelImpl implements H2StreamChannel {
1333 
1334         private final int id;
1335         private final AtomicInteger inputWindow;
1336         private final AtomicInteger outputWindow;
1337 
1338         private volatile boolean idle;
1339         private volatile boolean remoteEndStream;
1340         private volatile boolean localEndStream;
1341 
1342         private volatile long deadline;
1343 
1344         H2StreamChannelImpl(final int id, final boolean idle, final int initialInputWindowSize, final int initialOutputWindowSize) {
1345             this.id = id;
1346             this.idle = idle;
1347             this.inputWindow = new AtomicInteger(initialInputWindowSize);
1348             this.outputWindow = new AtomicInteger(initialOutputWindowSize);
1349         }
1350 
1351         int getId() {
1352             return id;
1353         }
1354 
1355         AtomicInteger getOutputWindow() {
1356             return outputWindow;
1357         }
1358 
1359         AtomicInteger getInputWindow() {
1360             return inputWindow;
1361         }
1362 
1363         @Override
1364         public void submit(final List<Header> headers, final boolean endStream) throws IOException {
1365             ioSession.getLock().lock();
1366             try {
1367                 if (headers == null || headers.isEmpty()) {
1368                     throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
1369                 }
1370                 if (localEndStream) {
1371                     return;
1372                 }
1373                 idle = false;
1374                 commitHeaders(id, headers, endStream);
1375                 if (endStream) {
1376                     localEndStream = true;
1377                 }
1378             } finally {
1379                 ioSession.getLock().unlock();
1380             }
1381         }
1382 
1383         @Override
1384         public void push(final List<Header> headers, final AsyncPushProducer pushProducer) throws HttpException, IOException {
1385             acceptPushRequest();
1386             final int promisedStreamId = generateStreamId();
1387             final H2StreamChannelImpl channel = new H2StreamChannelImpl(
1388                     promisedStreamId,
1389                     true,
1390                     localConfig.getInitialWindowSize(),
1391                     remoteConfig.getInitialWindowSize());
1392             final HttpCoreContext context = HttpCoreContext.create();
1393             context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
1394             context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
1395             final H2StreamHandler streamHandler = new ServerPushH2StreamHandler(
1396                     channel, httpProcessor, connMetrics, pushProducer, context);
1397             final H2Stream stream = new H2Stream(channel, streamHandler, false);
1398             streamMap.put(promisedStreamId, stream);
1399 
1400             ioSession.getLock().lock();
1401             try {
1402                 if (localEndStream) {
1403                     stream.releaseResources();
1404                     return;
1405                 }
1406                 commitPushPromise(id, promisedStreamId, headers);
1407                 idle = false;
1408             } finally {
1409                 ioSession.getLock().unlock();
1410             }
1411         }
1412 
1413         @Override
1414         public void update(final int increment) throws IOException {
1415             if (remoteEndStream) {
1416                 return;
1417             }
1418             incrementInputCapacity(0, connInputWindow, increment);
1419             incrementInputCapacity(id, inputWindow, increment);
1420         }
1421 
1422         @Override
1423         public int write(final ByteBuffer payload) throws IOException {
1424             ioSession.getLock().lock();
1425             try {
1426                 if (localEndStream) {
1427                     return 0;
1428                 }
1429                 return streamData(id, outputWindow, payload);
1430             } finally {
1431                 ioSession.getLock().unlock();
1432             }
1433         }
1434 
1435         @Override
1436         public void endStream(final List<? extends Header> trailers) throws IOException {
1437             ioSession.getLock().lock();
1438             try {
1439                 if (localEndStream) {
1440                     return;
1441                 }
1442                 localEndStream = true;
1443                 if (trailers != null && !trailers.isEmpty()) {
1444                     commitHeaders(id, trailers, true);
1445                 } else {
1446                     final RawFrame frame = frameFactory.createData(id, null, true);
1447                     commitFrameInternal(frame);
1448                 }
1449             } finally {
1450                 ioSession.getLock().unlock();
1451             }
1452         }
1453 
1454         @Override
1455         public void endStream() throws IOException {
1456             endStream(null);
1457         }
1458 
1459         @Override
1460         public void requestOutput() {
1461             requestSessionOutput();
1462         }
1463 
1464         boolean isRemoteClosed() {
1465             return remoteEndStream;
1466         }
1467 
1468         void setRemoteEndStream() {
1469             remoteEndStream = true;
1470         }
1471 
1472         boolean isLocalClosed() {
1473             return localEndStream;
1474         }
1475 
1476         void setLocalEndStream() {
1477             localEndStream = true;
1478         }
1479 
1480         boolean isLocalReset() {
1481             return deadline > 0;
1482         }
1483 
1484         boolean isResetDeadline() {
1485             final long l = deadline;
1486             return l > 0 && l < System.currentTimeMillis();
1487         }
1488 
1489         boolean localReset(final int code) throws IOException {
1490             ioSession.getLock().lock();
1491             try {
1492                 if (localEndStream) {
1493                     return false;
1494                 }
1495                 localEndStream = true;
1496                 deadline = System.currentTimeMillis() + LINGER_TIME;
1497                 if (!idle) {
1498                     final RawFrame resetStream = frameFactory.createResetStream(id, code);
1499                     commitFrameInternal(resetStream);
1500                     return true;
1501                 }
1502                 return false;
1503             } finally {
1504                 ioSession.getLock().unlock();
1505             }
1506         }
1507 
1508         boolean localReset(final H2Error error) throws IOException {
1509             return localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1510         }
1511 
1512         @Override
1513         public boolean cancel() {
1514             try {
1515                 return localReset(H2Error.CANCEL);
1516             } catch (final IOException ignore) {
1517                 return false;
1518             }
1519         }
1520 
1521         void appendState(final StringBuilder buf) {
1522             buf.append("id=").append(id)
1523                     .append(", connState=").append(connState)
1524                     .append(", inputWindow=").append(inputWindow)
1525                     .append(", outputWindow=").append(outputWindow)
1526                     .append(", localEndStream=").append(localEndStream)
1527                     .append(", idle=").append(idle);
1528         }
1529 
1530         @Override
1531         public String toString() {
1532             final StringBuilder buf = new StringBuilder();
1533             buf.append("[");
1534             appendState(buf);
1535             buf.append("]");
1536             return buf.toString();
1537         }
1538 
1539     }
1540 
1541     static class H2Stream {
1542 
1543         private final H2StreamChannelImpl channel;
1544         private final H2StreamHandler handler;
1545         private final boolean remoteInitiated;
1546 
1547         private H2Stream(
1548                 final H2StreamChannelImpl channel,
1549                 final H2StreamHandler handler,
1550                 final boolean remoteInitiated) {
1551             this.channel = channel;
1552             this.handler = handler;
1553             this.remoteInitiated = remoteInitiated;
1554         }
1555 
1556         int getId() {
1557             return channel.getId();
1558         }
1559 
1560         boolean isRemoteInitiated() {
1561             return remoteInitiated;
1562         }
1563 
1564         AtomicInteger getOutputWindow() {
1565             return channel.getOutputWindow();
1566         }
1567 
1568         AtomicInteger getInputWindow() {
1569             return channel.getInputWindow();
1570         }
1571 
1572         boolean isTerminated() {
1573             return channel.isLocalClosed() && (channel.isRemoteClosed() || channel.isResetDeadline());
1574         }
1575 
1576         boolean isRemoteClosed() {
1577             return channel.isRemoteClosed();
1578         }
1579 
1580         boolean isLocalClosed() {
1581             return channel.isLocalClosed();
1582         }
1583 
1584         boolean isLocalReset() {
1585             return channel.isLocalReset();
1586         }
1587 
1588         void setRemoteEndStream() {
1589             channel.setRemoteEndStream();
1590         }
1591 
1592         void consumePromise(final List<Header> headers) throws HttpException, IOException {
1593             try {
1594                 handler.consumePromise(headers);
1595                 channel.setLocalEndStream();
1596             } catch (final ProtocolException ex) {
1597                 localReset(ex, H2Error.PROTOCOL_ERROR);
1598             }
1599         }
1600 
1601         void consumeHeader(final List<Header> headers) throws HttpException, IOException {
1602             try {
1603                 handler.consumeHeader(headers, channel.isRemoteClosed());
1604             } catch (final ProtocolException ex) {
1605                 localReset(ex, H2Error.PROTOCOL_ERROR);
1606             }
1607         }
1608 
1609         void consumeData(final ByteBuffer src) throws HttpException, IOException {
1610             try {
1611                 handler.consumeData(src, channel.isRemoteClosed());
1612             } catch (final CharacterCodingException ex) {
1613                 localReset(ex, H2Error.INTERNAL_ERROR);
1614             } catch (final ProtocolException ex) {
1615                 localReset(ex, H2Error.PROTOCOL_ERROR);
1616             }
1617         }
1618 
1619         boolean isOutputReady() {
1620             return handler.isOutputReady();
1621         }
1622 
1623         void produceOutput() throws HttpException, IOException {
1624             try {
1625                 handler.produceOutput();
1626             } catch (final ProtocolException ex) {
1627                 localReset(ex, H2Error.PROTOCOL_ERROR);
1628             }
1629         }
1630 
1631         void produceInputCapacityUpdate() throws IOException {
1632             handler.updateInputCapacity();
1633         }
1634 
1635         void reset(final Exception cause) {
1636             channel.setRemoteEndStream();
1637             channel.setLocalEndStream();
1638             handler.failed(cause);
1639         }
1640 
1641         void localReset(final Exception cause, final int code) throws IOException {
1642             channel.localReset(code);
1643             handler.failed(cause);
1644         }
1645 
1646         void localReset(final Exception cause, final H2Error error) throws IOException {
1647             localReset(cause, error != null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1648         }
1649 
1650         void localReset(final H2StreamResetException ex) throws IOException {
1651             localReset(ex, ex.getCode());
1652         }
1653 
1654         void handle(final HttpExceptionn.html#HttpException">HttpException ex) throws IOException, HttpException {
1655             handler.handle(ex, channel.isRemoteClosed());
1656         }
1657 
1658         HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
1659             return handler.getPushHandlerFactory();
1660         }
1661 
1662         void cancel() {
1663             reset(new CancellationException("HTTP/2 message exchange cancelled"));
1664         }
1665 
1666         boolean abort() {
1667             final boolean cancelled = channel.cancel();
1668             handler.failed(new CancellationException("HTTP/2 message exchange cancelled"));
1669             return cancelled;
1670         }
1671 
1672         void releaseResources() {
1673             handler.releaseResources();
1674             channel.requestOutput();
1675         }
1676 
1677         void appendState(final StringBuilder buf) {
1678             buf.append("channel=[");
1679             channel.appendState(buf);
1680             buf.append("]");
1681         }
1682 
1683         @Override
1684         public String toString() {
1685             final StringBuilder buf = new StringBuilder();
1686             buf.append("[");
1687             appendState(buf);
1688             buf.append("]");
1689             return buf.toString();
1690         }
1691 
1692     }
1693 
1694 }