View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http2.impl.nio;
28  
29  import java.io.IOException;
30  import java.net.SocketAddress;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.SelectionKey;
33  import java.nio.charset.CharacterCodingException;
34  import java.util.Deque;
35  import java.util.Iterator;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Queue;
39  import java.util.concurrent.ConcurrentHashMap;
40  import java.util.concurrent.ConcurrentLinkedDeque;
41  import java.util.concurrent.ConcurrentLinkedQueue;
42  import java.util.concurrent.atomic.AtomicInteger;
43  
44  import javax.net.ssl.SSLSession;
45  
46  import org.apache.hc.core5.concurrent.Cancellable;
47  import org.apache.hc.core5.concurrent.CancellableDependency;
48  import org.apache.hc.core5.http.ConnectionClosedException;
49  import org.apache.hc.core5.http.EndpointDetails;
50  import org.apache.hc.core5.http.Header;
51  import org.apache.hc.core5.http.HttpConnection;
52  import org.apache.hc.core5.http.HttpException;
53  import org.apache.hc.core5.http.HttpStreamResetException;
54  import org.apache.hc.core5.http.HttpVersion;
55  import org.apache.hc.core5.http.ProtocolException;
56  import org.apache.hc.core5.http.ProtocolVersion;
57  import org.apache.hc.core5.http.RequestNotExecutedException;
58  import org.apache.hc.core5.http.config.CharCodingConfig;
59  import org.apache.hc.core5.http.impl.BasicEndpointDetails;
60  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
61  import org.apache.hc.core5.http.impl.CharCodingSupport;
62  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
63  import org.apache.hc.core5.http.nio.AsyncPushProducer;
64  import org.apache.hc.core5.http.nio.HandlerFactory;
65  import org.apache.hc.core5.http.nio.command.ExecutableCommand;
66  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
67  import org.apache.hc.core5.http.protocol.HttpCoreContext;
68  import org.apache.hc.core5.http.protocol.HttpProcessor;
69  import org.apache.hc.core5.http2.H2ConnectionException;
70  import org.apache.hc.core5.http2.H2Error;
71  import org.apache.hc.core5.http2.H2StreamResetException;
72  import org.apache.hc.core5.http2.config.H2Config;
73  import org.apache.hc.core5.http2.config.H2Param;
74  import org.apache.hc.core5.http2.config.H2Setting;
75  import org.apache.hc.core5.http2.frame.FrameFactory;
76  import org.apache.hc.core5.http2.frame.FrameFlag;
77  import org.apache.hc.core5.http2.frame.FrameType;
78  import org.apache.hc.core5.http2.frame.RawFrame;
79  import org.apache.hc.core5.http2.frame.StreamIdGenerator;
80  import org.apache.hc.core5.http2.hpack.HPackDecoder;
81  import org.apache.hc.core5.http2.hpack.HPackEncoder;
82  import org.apache.hc.core5.http2.impl.BasicH2TransportMetrics;
83  import org.apache.hc.core5.http2.nio.AsyncPingHandler;
84  import org.apache.hc.core5.http2.nio.command.PingCommand;
85  import org.apache.hc.core5.io.CloseMode;
86  import org.apache.hc.core5.reactor.Command;
87  import org.apache.hc.core5.reactor.ProtocolIOSession;
88  import org.apache.hc.core5.reactor.ssl.TlsDetails;
89  import org.apache.hc.core5.util.Args;
90  import org.apache.hc.core5.util.ByteArrayBuffer;
91  import org.apache.hc.core5.util.Identifiable;
92  import org.apache.hc.core5.util.Timeout;
93  
94  abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection {
95  
96      private static final long LINGER_TIME = 1000; // 1 second
97      private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024; // 10 MiB
98  
99      enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
100     enum SettingsHandshake { READY, TRANSMITTED, ACKED }
101 
102     private final ProtocolIOSession ioSession;
103     private final FrameFactory frameFactory;
104     private final StreamIdGenerator idGenerator;
105     private final HttpProcessor httpProcessor;
106     private final H2Config localConfig;
107     private final BasicH2TransportMetrics inputMetrics;
108     private final BasicH2TransportMetrics outputMetrics;
109     private final BasicHttpConnectionMetrics connMetrics;
110     private final FrameInputBuffer inputBuffer;
111     private final FrameOutputBuffer outputBuffer;
112     private final Deque<RawFrame> outputQueue;
113     private final HPackEncoder hPackEncoder;
114     private final HPackDecoder hPackDecoder;
115     private final Map<Integer, H2Stream> streamMap;
116     private final Queue<AsyncPingHandler> pingHandlers;
117     private final AtomicInteger connInputWindow;
118     private final AtomicInteger connOutputWindow;
119     private final AtomicInteger outputRequests;
120     private final AtomicInteger lastStreamId;
121     private final H2StreamListener streamListener;
122 
123     private ConnectionHandshake connState = ConnectionHandshake.READY;
124     private SettingsHandshake localSettingState = SettingsHandshake.READY;
125     private SettingsHandshake remoteSettingState = SettingsHandshake.READY;
126 
127     private int initInputWinSize;
128     private int initOutputWinSize;
129     private int lowMark;
130 
131     private volatile H2Config remoteConfig;
132 
133     private Continuation continuation;
134 
135     private int processedRemoteStreamId;
136     private EndpointDetails endpointDetails;
137     private boolean goAwayReceived;
138 
139     AbstractH2StreamMultiplexer(
140             final ProtocolIOSession ioSession,
141             final FrameFactory frameFactory,
142             final StreamIdGenerator idGenerator,
143             final HttpProcessor httpProcessor,
144             final CharCodingConfig charCodingConfig,
145             final H2Config h2Config,
146             final H2StreamListener streamListener) {
147         this.ioSession = Args.notNull(ioSession, "IO session");
148         this.frameFactory = Args.notNull(frameFactory, "Frame factory");
149         this.idGenerator = Args.notNull(idGenerator, "Stream id generator");
150         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
151         this.localConfig = h2Config != null ? h2Config : H2Config.DEFAULT;
152         this.inputMetrics = new BasicH2TransportMetrics();
153         this.outputMetrics = new BasicH2TransportMetrics();
154         this.connMetrics = new BasicHttpConnectionMetrics(this.inputMetrics, this.outputMetrics);
155         this.inputBuffer = new FrameInputBuffer(this.inputMetrics, this.localConfig.getMaxFrameSize());
156         this.outputBuffer = new FrameOutputBuffer(this.outputMetrics, this.localConfig.getMaxFrameSize());
157         this.outputQueue = new ConcurrentLinkedDeque<>();
158         this.pingHandlers = new ConcurrentLinkedQueue<>();
159         this.outputRequests = new AtomicInteger(0);
160         this.lastStreamId = new AtomicInteger(0);
161         this.hPackEncoder = new HPackEncoder(CharCodingSupport.createEncoder(charCodingConfig));
162         this.hPackDecoder = new HPackDecoder(CharCodingSupport.createDecoder(charCodingConfig));
163         this.streamMap = new ConcurrentHashMap<>();
164         this.remoteConfig = H2Config.INIT;
165         this.connInputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
166         this.connOutputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
167 
168         this.initInputWinSize = H2Config.INIT.getInitialWindowSize();
169         this.initOutputWinSize = H2Config.INIT.getInitialWindowSize();
170 
171         this.hPackDecoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
172         this.hPackEncoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
173         this.hPackDecoder.setMaxListSize(H2Config.INIT.getMaxHeaderListSize());
174 
175         this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
176         this.streamListener = streamListener;
177     }
178 
179     @Override
180     public String getId() {
181         return ioSession.getId();
182     }
183 
184     abstract void acceptHeaderFrame() throws H2ConnectionException;
185 
186     abstract void acceptPushRequest() throws H2ConnectionException;
187 
188     abstract void acceptPushFrame() throws H2ConnectionException;
189 
190     abstract H2StreamHandler createRemotelyInitiatedStream(
191             H2StreamChannel channel,
192             HttpProcessor httpProcessor,
193             BasicHttpConnectionMetrics connMetrics,
194             HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException;
195 
196     abstract H2StreamHandler createLocallyInitiatedStream(
197             ExecutableCommand command,
198             H2StreamChannel channel,
199             HttpProcessor httpProcessor,
200             BasicHttpConnectionMetrics connMetrics) throws IOException;
201 
202     private int updateWindow(final AtomicInteger window, final int delta) throws ArithmeticException {
203         for (;;) {
204             final int current = window.get();
205             long newValue = (long) current + delta;
206 
207             //TODO: work-around for what looks like a bug in Ngnix (1.11)
208             // Tolerate if the update window exceeded by one
209             if (newValue == 0x80000000L) {
210                 newValue = Integer.MAX_VALUE;
211             }
212             //TODO: needs to be removed
213 
214             if (Math.abs(newValue) > 0x7fffffffL) {
215                 throw new ArithmeticException("Update causes flow control window to exceed " + Integer.MAX_VALUE);
216             }
217             if (window.compareAndSet(current, (int) newValue)) {
218                 return (int) newValue;
219             }
220         }
221     }
222 
223     private int updateInputWindow(
224             final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
225         final int newSize = updateWindow(window, delta);
226         if (streamListener != null) {
227             streamListener.onInputFlowControl(this, streamId, delta, newSize);
228         }
229         return newSize;
230     }
231 
232     private int updateOutputWindow(
233             final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
234         final int newSize = updateWindow(window, delta);
235         if (streamListener != null) {
236             streamListener.onOutputFlowControl(this, streamId, delta, newSize);
237         }
238         return newSize;
239     }
240 
241     private void commitFrameInternal(final RawFrame frame) throws IOException {
242         if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
243             if (streamListener != null) {
244                 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
245             }
246             outputBuffer.write(frame, ioSession);
247         } else {
248             outputQueue.addLast(frame);
249         }
250         ioSession.setEvent(SelectionKey.OP_WRITE);
251     }
252 
253     private void commitFrame(final RawFrame frame) throws IOException {
254         Args.notNull(frame, "Frame");
255         ioSession.getLock().lock();
256         try {
257             commitFrameInternal(frame);
258         } finally {
259             ioSession.getLock().unlock();
260         }
261     }
262 
263     private void commitHeaders(
264             final int streamId, final List<? extends Header> headers, final boolean endStream) throws IOException {
265         if (streamListener != null) {
266             streamListener.onHeaderOutput(this, streamId, headers);
267         }
268         final ByteArrayBufferByteArrayBuffer.html#ByteArrayBuffer">ByteArrayBuffer buf = new ByteArrayBuffer(512);
269         hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
270 
271         int off = 0;
272         int remaining = buf.length();
273         boolean continuation = false;
274 
275         while (remaining > 0) {
276             final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
277             final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
278 
279             remaining -= chunk;
280             off += chunk;
281 
282             final boolean endHeaders = remaining == 0;
283             final RawFrame frame;
284             if (!continuation) {
285                 frame = frameFactory.createHeaders(streamId, payload, endHeaders, endStream);
286                 continuation = true;
287             } else {
288                 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
289             }
290             commitFrameInternal(frame);
291         }
292     }
293 
294     private void commitPushPromise(
295             final int streamId, final int promisedStreamId, final List<Header> headers) throws IOException {
296         if (headers == null || headers.isEmpty()) {
297             throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
298         }
299         if (streamListener != null) {
300             streamListener.onHeaderOutput(this, streamId, headers);
301         }
302         final ByteArrayBufferByteArrayBuffer.html#ByteArrayBuffer">ByteArrayBuffer buf = new ByteArrayBuffer(512);
303         buf.append((byte)(promisedStreamId >> 24));
304         buf.append((byte)(promisedStreamId >> 16));
305         buf.append((byte)(promisedStreamId >> 8));
306         buf.append((byte)(promisedStreamId));
307 
308         hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
309 
310         int off = 0;
311         int remaining = buf.length();
312         boolean continuation = false;
313 
314         while (remaining > 0) {
315             final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
316             final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
317 
318             remaining -= chunk;
319             off += chunk;
320 
321             final boolean endHeaders = remaining == 0;
322             final RawFrame frame;
323             if (!continuation) {
324                 frame = frameFactory.createPushPromise(streamId, payload, endHeaders);
325                 continuation = true;
326             } else {
327                 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
328             }
329             commitFrameInternal(frame);
330         }
331     }
332 
333     private void streamDataFrame(
334             final int streamId,
335             final AtomicInteger streamOutputWindow,
336             final ByteBuffer payload,
337             final int chunk) throws IOException {
338         final RawFrame dataFrame = frameFactory.createData(streamId, payload, false);
339         if (streamListener != null) {
340             streamListener.onFrameOutput(this, streamId, dataFrame);
341         }
342         updateOutputWindow(0, connOutputWindow, -chunk);
343         updateOutputWindow(streamId, streamOutputWindow, -chunk);
344         outputBuffer.write(dataFrame, ioSession);
345     }
346 
347     private int streamData(
348             final int streamId, final AtomicInteger streamOutputWindow, final ByteBuffer payload) throws IOException {
349         if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
350             final int capacity = Math.min(connOutputWindow.get(), streamOutputWindow.get());
351             if (capacity <= 0) {
352                 return 0;
353             }
354             final int maxPayloadSize = Math.min(capacity, remoteConfig.getMaxFrameSize());
355             final int chunk;
356             if (payload.remaining() <= maxPayloadSize) {
357                 chunk = payload.remaining();
358                 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
359             } else {
360                 chunk = maxPayloadSize;
361                 final int originalLimit = payload.limit();
362                 try {
363                     payload.limit(payload.position() + chunk);
364                     streamDataFrame(streamId, streamOutputWindow, payload, chunk);
365                 } finally {
366                     payload.limit(originalLimit);
367                 }
368             }
369             payload.position(payload.position() + chunk);
370             ioSession.setEvent(SelectionKey.OP_WRITE);
371             return chunk;
372         }
373         return 0;
374     }
375 
376     private void incrementInputCapacity(
377             final int streamId, final AtomicInteger inputWindow, final int inputCapacity) throws IOException {
378         if (inputCapacity > 0) {
379             final int streamWinSize = inputWindow.get();
380             final int remainingCapacity = Integer.MAX_VALUE - streamWinSize;
381             final int chunk = Math.min(inputCapacity, remainingCapacity);
382             if (chunk != 0) {
383                 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(streamId, chunk);
384                 commitFrame(windowUpdateFrame);
385                 updateInputWindow(streamId, inputWindow, chunk);
386             }
387         }
388     }
389 
390     private void requestSessionOutput() {
391         outputRequests.incrementAndGet();
392         ioSession.setEvent(SelectionKey.OP_WRITE);
393     }
394 
395     private void updateLastStreamId(final int streamId) {
396         final int currentId = lastStreamId.get();
397         if (streamId > currentId) {
398             lastStreamId.compareAndSet(currentId, streamId);
399         }
400     }
401 
402     private int generateStreamId() {
403         for (;;) {
404             final int currentId = lastStreamId.get();
405             final int newStreamId = idGenerator.generate(currentId);
406             if (lastStreamId.compareAndSet(currentId, newStreamId)) {
407                 return newStreamId;
408             }
409         }
410     }
411 
412     public final void onConnect() throws HttpException, IOException {
413         connState = ConnectionHandshake.ACTIVE;
414         final RawFrame settingsFrame = frameFactory.createSettings(
415                 new H2Setting(H2Param.HEADER_TABLE_SIZE, localConfig.getHeaderTableSize()),
416                 new H2Setting(H2Param.ENABLE_PUSH, localConfig.isPushEnabled() ? 1 : 0),
417                 new H2Setting(H2Param.MAX_CONCURRENT_STREAMS, localConfig.getMaxConcurrentStreams()),
418                 new H2Setting(H2Param.INITIAL_WINDOW_SIZE, localConfig.getInitialWindowSize()),
419                 new H2Setting(H2Param.MAX_FRAME_SIZE, localConfig.getMaxFrameSize()),
420                 new H2Setting(H2Param.MAX_HEADER_LIST_SIZE, localConfig.getMaxHeaderListSize()));
421 
422         commitFrame(settingsFrame);
423         localSettingState = SettingsHandshake.TRANSMITTED;
424         maximizeConnWindow(connInputWindow.get());
425 
426         if (streamListener != null) {
427             final int initInputWindow = connInputWindow.get();
428             streamListener.onInputFlowControl(this, 0, initInputWindow, initInputWindow);
429             final int initOutputWindow = connOutputWindow.get();
430             streamListener.onOutputFlowControl(this, 0, initOutputWindow, initOutputWindow);
431         }
432     }
433 
434     public final void onInput(final ByteBuffer src) throws HttpException, IOException {
435         if (connState == ConnectionHandshake.SHUTDOWN) {
436             ioSession.clearEvent(SelectionKey.OP_READ);
437         } else {
438             if (src != null) {
439                 inputBuffer.put(src);
440             }
441             RawFrame frame;
442             while ((frame = inputBuffer.read(ioSession)) != null) {
443                 if (streamListener != null) {
444                     streamListener.onFrameInput(this, frame.getStreamId(), frame);
445                 }
446                 consumeFrame(frame);
447             }
448         }
449     }
450 
451     public final void onOutput() throws HttpException, IOException {
452         ioSession.getLock().lock();
453         try {
454             if (!outputBuffer.isEmpty()) {
455                 outputBuffer.flush(ioSession);
456             }
457             while (outputBuffer.isEmpty()) {
458                 final RawFrame frame = outputQueue.poll();
459                 if (frame != null) {
460                     if (streamListener != null) {
461                         streamListener.onFrameOutput(this, frame.getStreamId(), frame);
462                     }
463                     outputBuffer.write(frame, ioSession);
464                 } else {
465                     break;
466                 }
467             }
468         } finally {
469             ioSession.getLock().unlock();
470         }
471 
472         if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
473 
474             if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
475                 produceOutput();
476             }
477             final int pendingOutputRequests = outputRequests.get();
478             boolean outputPending = false;
479             if (!streamMap.isEmpty() && connOutputWindow.get() > 0) {
480                 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
481                     final Map.Entry<Integer, H2Stream> entry = it.next();
482                     final H2Stream stream = entry.getValue();
483                     if (!stream.isLocalClosed()
484                             && stream.getOutputWindow().get() > 0
485                             && stream.isOutputReady()) {
486                         outputPending = true;
487                         break;
488                     }
489                 }
490             }
491             ioSession.getLock().lock();
492             try {
493                 if (!outputPending && outputBuffer.isEmpty() && outputQueue.isEmpty()
494                         && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
495                     ioSession.clearEvent(SelectionKey.OP_WRITE);
496                 } else {
497                     outputRequests.addAndGet(-pendingOutputRequests);
498                 }
499             } finally {
500                 ioSession.getLock().unlock();
501             }
502         }
503 
504         if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 && remoteSettingState == SettingsHandshake.ACKED) {
505             processPendingCommands();
506         }
507         if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
508             int liveStreams = 0;
509             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
510                 final Map.Entry<Integer, H2Stream> entry = it.next();
511                 final H2Stream stream = entry.getValue();
512                 if (stream.isLocalClosed() && stream.isRemoteClosed()) {
513                     stream.releaseResources();
514                     it.remove();
515                 } else {
516                     if (idGenerator.isSameSide(stream.getId()) || stream.getId() <= processedRemoteStreamId) {
517                         liveStreams++;
518                     }
519                 }
520             }
521             if (liveStreams == 0) {
522                 connState = ConnectionHandshake.SHUTDOWN;
523             }
524         }
525         if (connState.compareTo(ConnectionHandshake.SHUTDOWN) >= 0) {
526             if (!streamMap.isEmpty()) {
527                 for (final H2Stream stream : streamMap.values()) {
528                     stream.releaseResources();
529                 }
530                 streamMap.clear();
531             }
532             ioSession.getLock().lock();
533             try {
534                 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
535                     ioSession.close();
536                 }
537             } finally {
538                 ioSession.getLock().unlock();
539             }
540         }
541     }
542 
543     public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
544         connState = ConnectionHandshake.SHUTDOWN;
545 
546         final RawFrame goAway;
547         if (localSettingState != SettingsHandshake.ACKED) {
548             goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.SETTINGS_TIMEOUT,
549                             "Setting timeout (" + timeout + ")");
550         } else {
551             goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR,
552                             "Timeout due to inactivity (" + timeout + ")");
553         }
554         commitFrame(goAway);
555         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
556             final Map.Entry<Integer, H2Stream> entry = it.next();
557             final H2Stream stream = entry.getValue();
558             stream.reset(new H2StreamResetException(H2Error.NO_ERROR, "Timeout due to inactivity (" + timeout + ")"));
559         }
560         streamMap.clear();
561     }
562 
563     public final void onDisconnect() {
564         for (;;) {
565             final AsyncPingHandler pingHandler = pingHandlers.poll();
566             if (pingHandler != null) {
567                 pingHandler.cancel();
568             } else {
569                 break;
570             }
571         }
572         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
573             final Map.Entry<Integer, H2Stream> entry = it.next();
574             final H2Stream stream = entry.getValue();
575             stream.cancel();
576         }
577         for (;;) {
578             final Command command = ioSession.poll();
579             if (command != null) {
580                 if (command instanceof ExecutableCommand) {
581                     ((ExecutableCommand) command).failed(new ConnectionClosedException());
582                 } else {
583                     command.cancel();
584                 }
585             } else {
586                 break;
587             }
588         }
589     }
590 
591     private void processPendingCommands() throws IOException, HttpException {
592         while (streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
593             final Command command = ioSession.poll();
594             if (command == null) {
595                 break;
596             }
597             if (command instanceof ShutdownCommand) {
598                 final ShutdownCommand../org/apache/hc/core5/http/nio/command/ShutdownCommand.html#ShutdownCommand">ShutdownCommand shutdownCommand = (ShutdownCommand) command;
599                 if (shutdownCommand.getType() == CloseMode.IMMEDIATE) {
600                     for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
601                         final Map.Entry<Integer, H2Stream> entry = it.next();
602                         final H2Stream stream = entry.getValue();
603                         stream.cancel();
604                     }
605                     streamMap.clear();
606                     connState = ConnectionHandshake.SHUTDOWN;
607                 } else {
608                     if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
609                         final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR, "Graceful shutdown");
610                         commitFrame(goAway);
611                         connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
612                     }
613                 }
614                 break;
615             } else if (command instanceof PingCommand) {
616                 final PingCommand./../../../org/apache/hc/core5/http2/nio/command/PingCommand.html#PingCommand">PingCommand pingCommand = (PingCommand) command;
617                 final AsyncPingHandler handler = pingCommand.getHandler();
618                 pingHandlers.add(handler);
619                 final RawFrame ping = frameFactory.createPing(handler.getData());
620                 commitFrame(ping);
621             } else if (command instanceof ExecutableCommand) {
622                 final int streamId = generateStreamId();
623                 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
624                         streamId, true, initInputWinSize, initOutputWinSize);
625                 final ExecutableCommandrg/apache/hc/core5/http/nio/command/ExecutableCommand.html#ExecutableCommand">ExecutableCommand executableCommand = (ExecutableCommand) command;
626                 final H2StreamHandler streamHandler = createLocallyInitiatedStream(
627                         executableCommand, channel, httpProcessor, connMetrics);
628 
629                 final H2Stream stream = new H2Stream(channel, streamHandler, false);
630                 streamMap.put(streamId, stream);
631 
632                 if (streamListener != null) {
633                     final int initInputWindow = stream.getInputWindow().get();
634                     streamListener.onInputFlowControl(this, streamId, initInputWindow, initInputWindow);
635                     final int initOutputWindow = stream.getOutputWindow().get();
636                     streamListener.onOutputFlowControl(this, streamId, initOutputWindow, initOutputWindow);
637                 }
638 
639                 if (stream.isOutputReady()) {
640                     stream.produceOutput();
641                 }
642                 final CancellableDependency cancellableDependency = executableCommand.getCancellableDependency();
643                 if (cancellableDependency != null) {
644                     cancellableDependency.setDependency(new Cancellable() {
645 
646                         @Override
647                         public boolean cancel() {
648                             return stream.abort();
649                         }
650 
651                     });
652                 }
653                 if (!outputQueue.isEmpty()) {
654                     return;
655                 }
656             }
657         }
658     }
659 
660     public final void onException(final Exception cause) {
661         try {
662             for (;;) {
663                 final AsyncPingHandler pingHandler = pingHandlers.poll();
664                 if (pingHandler != null) {
665                     pingHandler.failed(cause);
666                 } else {
667                     break;
668                 }
669             }
670             for (;;) {
671                 final Command command = ioSession.poll();
672                 if (command != null) {
673                     if (command instanceof ExecutableCommand) {
674                         ((ExecutableCommand) command).failed(new ConnectionClosedException());
675                     } else {
676                         command.cancel();
677                     }
678                 } else {
679                     break;
680                 }
681             }
682             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
683                 final Map.Entry<Integer, H2Stream> entry = it.next();
684                 final H2Stream stream = entry.getValue();
685                 stream.reset(cause);
686             }
687             streamMap.clear();
688             if (!(cause instanceof ConnectionClosedException)) {
689                 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) <= 0) {
690                     final H2Error errorCode;
691                     if (cause instanceof H2ConnectionException) {
692                         errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
693                     } else if (cause instanceof ProtocolException){
694                         errorCode = H2Error.PROTOCOL_ERROR;
695                     } else {
696                         errorCode = H2Error.INTERNAL_ERROR;
697                     }
698                     final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
699                     commitFrame(goAway);
700                 }
701             }
702         } catch (final IOException ignore) {
703         } finally {
704             connState = ConnectionHandshake.SHUTDOWN;
705             final CloseMode closeMode;
706             if (cause instanceof ConnectionClosedException) {
707                 closeMode = CloseMode.GRACEFUL;
708             } else if (cause instanceof IOException) {
709                 closeMode = CloseMode.IMMEDIATE;
710             } else {
711                 closeMode = CloseMode.GRACEFUL;
712             }
713             ioSession.close(closeMode);
714         }
715     }
716 
717     private H2Stream getValidStream(final int streamId) throws H2ConnectionException {
718         if (streamId == 0) {
719             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
720         }
721         final H2Stream stream = streamMap.get(streamId);
722         if (stream == null) {
723             if (streamId <= lastStreamId.get()) {
724                 throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed");
725             } else {
726                 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
727             }
728         }
729         return stream;
730     }
731 
732     private void consumeFrame(final RawFrame frame) throws HttpException, IOException {
733         final FrameType frameType = FrameType.valueOf(frame.getType());
734         final int streamId = frame.getStreamId();
735         if (continuation != null && frameType != FrameType.CONTINUATION) {
736             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "CONTINUATION frame expected");
737         }
738         switch (frameType) {
739             case DATA: {
740                 final H2Stream stream = getValidStream(streamId);
741                 try {
742                     consumeDataFrame(frame, stream);
743                 } catch (final H2StreamResetException ex) {
744                     stream.localReset(ex);
745                 } catch (final HttpStreamResetException ex) {
746                     stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
747                 }
748 
749                 if (stream.isTerminated()) {
750                     streamMap.remove(streamId);
751                     stream.releaseResources();
752                     requestSessionOutput();
753                 }
754             }
755             break;
756             case HEADERS: {
757                 if (streamId == 0) {
758                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
759                 }
760                 H2Stream stream = streamMap.get(streamId);
761                 if (stream == null) {
762                     acceptHeaderFrame();
763 
764                     if (idGenerator.isSameSide(streamId)) {
765                         throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
766                     }
767                     if (goAwayReceived ) {
768                         throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
769                     }
770 
771                     updateLastStreamId(streamId);
772 
773                     final H2StreamChannelImpl channel = new H2StreamChannelImpl(
774                             streamId, false, initInputWinSize, initOutputWinSize);
775                     final H2StreamHandler streamHandler;
776                     if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
777                         streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics, null);
778                     } else {
779                         streamHandler = NoopH2StreamHandler.INSTANCE;
780                         channel.setLocalEndStream();
781                     }
782 
783                     stream = new H2Stream(channel, streamHandler, true);
784                     if (stream.isOutputReady()) {
785                         stream.produceOutput();
786                     }
787                     streamMap.put(streamId, stream);
788                 }
789 
790                 try {
791                     consumeHeaderFrame(frame, stream);
792 
793                     if (stream.isOutputReady()) {
794                         stream.produceOutput();
795                     }
796                 } catch (final H2StreamResetException ex) {
797                     stream.localReset(ex);
798                 } catch (final HttpStreamResetException ex) {
799                     stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
800                 } catch (final HttpException ex) {
801                     stream.handle(ex);
802                 }
803 
804                 if (stream.isTerminated()) {
805                     streamMap.remove(streamId);
806                     stream.releaseResources();
807                     requestSessionOutput();
808                 }
809             }
810             break;
811             case CONTINUATION: {
812                 if (continuation == null) {
813                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION frame");
814                 }
815                 if (streamId != continuation.streamId) {
816                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION stream id: " + streamId);
817                 }
818 
819                 final H2Stream stream = getValidStream(streamId);
820                 try {
821 
822                     consumeContinuationFrame(frame, stream);
823                 } catch (final H2StreamResetException ex) {
824                     stream.localReset(ex);
825                 } catch (final HttpStreamResetException ex) {
826                     stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
827                 }
828 
829                 if (stream.isTerminated()) {
830                     streamMap.remove(streamId);
831                     stream.releaseResources();
832                     requestSessionOutput();
833                 }
834             }
835             break;
836             case WINDOW_UPDATE: {
837                 final ByteBuffer payload = frame.getPayload();
838                 if (payload == null || payload.remaining() != 4) {
839                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid WINDOW_UPDATE frame payload");
840                 }
841                 final int delta = payload.getInt();
842                 if (delta <= 0) {
843                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Invalid WINDOW_UPDATE delta");
844                 }
845                 if (streamId == 0) {
846                     try {
847                         updateOutputWindow(0, connOutputWindow, delta);
848                     } catch (final ArithmeticException ex) {
849                         throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
850                     }
851                 } else {
852                     final H2Stream stream = streamMap.get(streamId);
853                     if (stream != null) {
854                         try {
855                             updateOutputWindow(streamId, stream.getOutputWindow(), delta);
856                         } catch (final ArithmeticException ex) {
857                             throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
858                         }
859                     }
860                 }
861                 ioSession.setEvent(SelectionKey.OP_WRITE);
862             }
863             break;
864             case RST_STREAM: {
865                 if (streamId == 0) {
866                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
867                 }
868                 final H2Stream stream = streamMap.get(streamId);
869                 if (stream == null) {
870                     if (streamId > lastStreamId.get()) {
871                         throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
872                     }
873                 } else {
874                     final ByteBuffer payload = frame.getPayload();
875                     if (payload == null || payload.remaining() != 4) {
876                         throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid RST_STREAM frame payload");
877                     }
878                     final int errorCode = payload.getInt();
879                     stream.reset(new H2StreamResetException(errorCode, "Stream reset (" + errorCode + ")"));
880                     streamMap.remove(streamId);
881                     stream.releaseResources();
882                     requestSessionOutput();
883                 }
884             }
885             break;
886             case PING: {
887                 if (streamId != 0) {
888                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
889                 }
890                 final ByteBuffer ping = frame.getPayloadContent();
891                 if (ping == null || ping.remaining() != 8) {
892                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload");
893                 }
894                 if (frame.isFlagSet(FrameFlag.ACK)) {
895                     final AsyncPingHandler pingHandler = pingHandlers.poll();
896                     if (pingHandler != null) {
897                         pingHandler.consumeResponse(ping);
898                     }
899                 } else {
900                     final ByteBuffer pong = ByteBuffer.allocate(ping.remaining());
901                     pong.put(ping);
902                     pong.flip();
903                     final RawFrame response = frameFactory.createPingAck(pong);
904                     commitFrame(response);
905                 }
906             }
907             break;
908             case SETTINGS: {
909                 if (streamId != 0) {
910                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
911                 }
912                 if (frame.isFlagSet(FrameFlag.ACK)) {
913                     if (localSettingState == SettingsHandshake.TRANSMITTED) {
914                         localSettingState = SettingsHandshake.ACKED;
915                         ioSession.setEvent(SelectionKey.OP_WRITE);
916                         applyLocalSettings();
917                     }
918                 } else {
919                     final ByteBuffer payload = frame.getPayload();
920                     if (payload != null) {
921                         if ((payload.remaining() % 6) != 0) {
922                             throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid SETTINGS payload");
923                         }
924                         consumeSettingsFrame(payload);
925                         remoteSettingState = SettingsHandshake.TRANSMITTED;
926                     }
927                     // Send ACK
928                     final RawFrame response = frameFactory.createSettingsAck();
929                     commitFrame(response);
930                     remoteSettingState = SettingsHandshake.ACKED;
931                 }
932             }
933             break;
934             case PRIORITY:
935                 // Stream priority not supported
936                 break;
937             case PUSH_PROMISE: {
938                 acceptPushFrame();
939 
940                 if (goAwayReceived ) {
941                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
942                 }
943 
944                 if (!localConfig.isPushEnabled()) {
945                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Push is disabled");
946                 }
947 
948                 final H2Stream stream = getValidStream(streamId);
949                 if (stream.isRemoteClosed()) {
950                     stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed"));
951                     break;
952                 }
953 
954                 final ByteBuffer payload = frame.getPayloadContent();
955                 if (payload == null || payload.remaining() < 4) {
956                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PUSH_PROMISE payload");
957                 }
958                 final int promisedStreamId = payload.getInt();
959                 if (promisedStreamId == 0 || idGenerator.isSameSide(promisedStreamId)) {
960                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal promised stream id: " + promisedStreamId);
961                 }
962                 if (streamMap.get(promisedStreamId) != null) {
963                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected promised stream id: " + promisedStreamId);
964                 }
965 
966                 updateLastStreamId(promisedStreamId);
967 
968                 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
969                         promisedStreamId, false, initInputWinSize, initOutputWinSize);
970                 final H2StreamHandler streamHandler;
971                 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
972                     streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics,
973                             stream.getPushHandlerFactory());
974                 } else {
975                     streamHandler = NoopH2StreamHandler.INSTANCE;
976                     channel.setLocalEndStream();
977                 }
978 
979                 final H2Stream promisedStream = new H2Stream(channel, streamHandler, true);
980                 streamMap.put(promisedStreamId, promisedStream);
981 
982                 try {
983                     consumePushPromiseFrame(frame, payload, promisedStream);
984                 } catch (final H2StreamResetException ex) {
985                     promisedStream.localReset(ex);
986                 } catch (final HttpStreamResetException ex) {
987                     promisedStream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.NO_ERROR);
988                 }
989             }
990             break;
991             case GOAWAY: {
992                 if (streamId != 0) {
993                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
994                 }
995                 final ByteBuffer payload = frame.getPayload();
996                 if (payload == null || payload.remaining() < 8) {
997                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid GOAWAY payload");
998                 }
999                 final int processedLocalStreamId = payload.getInt();
1000                 final int errorCode = payload.getInt();
1001                 goAwayReceived = true;
1002                 if (errorCode == H2Error.NO_ERROR.getCode()) {
1003                     if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
1004                         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1005                             final Map.Entry<Integer, H2Stream> entry = it.next();
1006                             final int activeStreamId = entry.getKey();
1007                             if (!idGenerator.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
1008                                 final H2Stream stream = entry.getValue();
1009                                 stream.cancel();
1010                                 it.remove();
1011                             }
1012                         }
1013                     }
1014                     connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
1015                 } else {
1016                     for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1017                         final Map.Entry<Integer, H2Stream> entry = it.next();
1018                         final H2Stream stream = entry.getValue();
1019                         stream.reset(new H2StreamResetException(errorCode, "Connection terminated by the peer (" + errorCode + ")"));
1020                     }
1021                     streamMap.clear();
1022                     connState = ConnectionHandshake.SHUTDOWN;
1023                 }
1024             }
1025             ioSession.setEvent(SelectionKey.OP_WRITE);
1026             break;
1027         }
1028     }
1029 
1030     private void consumeDataFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1031         final int streamId = stream.getId();
1032         final ByteBuffer payload = frame.getPayloadContent();
1033         if (payload != null) {
1034             final int frameLength = frame.getLength();
1035             final int streamWinSize = updateInputWindow(streamId, stream.getInputWindow(), -frameLength);
1036             if (streamWinSize < lowMark && !stream.isRemoteClosed()) {
1037                 stream.produceInputCapacityUpdate();
1038             }
1039             final int connWinSize = updateInputWindow(0, connInputWindow, -frameLength);
1040             if (connWinSize < CONNECTION_WINDOW_LOW_MARK) {
1041                 maximizeConnWindow(connWinSize);
1042             }
1043         }
1044         if (stream.isRemoteClosed()) {
1045             throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1046         }
1047         if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1048             stream.setRemoteEndStream();
1049         }
1050         if (stream.isLocalReset()) {
1051             return;
1052         }
1053         stream.consumeData(payload);
1054     }
1055 
1056     private void maximizeConnWindow(final int connWinSize) throws IOException {
1057         final int delta = Integer.MAX_VALUE - connWinSize;
1058         if (delta > 0) {
1059             final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(0, delta);
1060             commitFrame(windowUpdateFrame);
1061             updateInputWindow(0, connInputWindow, delta);
1062         }
1063     }
1064 
1065     private void consumePushPromiseFrame(final RawFrame frame, final ByteBuffer payload, final H2Stream promisedStream) throws HttpException, IOException {
1066         final int promisedStreamId = promisedStream.getId();
1067         if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1068             continuation = new Continuation(promisedStreamId, frame.getType(), true);
1069         }
1070         if (continuation == null) {
1071             final List<Header> headers = hPackDecoder.decodeHeaders(payload);
1072             if (promisedStreamId > processedRemoteStreamId) {
1073                 processedRemoteStreamId = promisedStreamId;
1074             }
1075             if (streamListener != null) {
1076                 streamListener.onHeaderInput(this, promisedStreamId, headers);
1077             }
1078             promisedStream.consumePromise(headers);
1079         } else {
1080             continuation.copyPayload(payload);
1081         }
1082     }
1083 
1084     List<Header> decodeHeaders(final ByteBuffer payload) throws HttpException {
1085         return hPackDecoder.decodeHeaders(payload);
1086     }
1087 
1088     private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1089         final int streamId = stream.getId();
1090         if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1091             continuation = new Continuation(streamId, frame.getType(), frame.isFlagSet(FrameFlag.END_STREAM));
1092         }
1093         final ByteBuffer payload = frame.getPayloadContent();
1094         if (frame.isFlagSet(FrameFlag.PRIORITY)) {
1095             // Priority not supported
1096             payload.getInt();
1097             payload.get();
1098         }
1099         if (continuation == null) {
1100             final List<Header> headers = decodeHeaders(payload);
1101             if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1102                 processedRemoteStreamId = streamId;
1103             }
1104             if (streamListener != null) {
1105                 streamListener.onHeaderInput(this, streamId, headers);
1106             }
1107             if (stream.isRemoteClosed()) {
1108                 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1109             }
1110             if (stream.isLocalReset()) {
1111                 return;
1112             }
1113             if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1114                 stream.setRemoteEndStream();
1115             }
1116             stream.consumeHeader(headers);
1117         } else {
1118             continuation.copyPayload(payload);
1119         }
1120     }
1121 
1122     private void consumeContinuationFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1123         final int streamId = frame.getStreamId();
1124         final ByteBuffer payload = frame.getPayload();
1125         continuation.copyPayload(payload);
1126         if (frame.isFlagSet(FrameFlag.END_HEADERS)) {
1127             final List<Header> headers = decodeHeaders(continuation.getContent());
1128             if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1129                 processedRemoteStreamId = streamId;
1130             }
1131             if (streamListener != null) {
1132                 streamListener.onHeaderInput(this, streamId, headers);
1133             }
1134             if (stream.isRemoteClosed()) {
1135                 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1136             }
1137             if (stream.isLocalReset()) {
1138                 return;
1139             }
1140             if (continuation.endStream) {
1141                 stream.setRemoteEndStream();
1142             }
1143             if (continuation.type == FrameType.PUSH_PROMISE.getValue()) {
1144                 stream.consumePromise(headers);
1145             } else {
1146                 stream.consumeHeader(headers);
1147             }
1148             continuation = null;
1149         }
1150     }
1151 
1152     private void consumeSettingsFrame(final ByteBuffer payload) throws HttpException, IOException {
1153         final H2Config.Builder configBuilder = H2Config.initial();
1154         while (payload.hasRemaining()) {
1155             final int code = payload.getShort();
1156             final int value = payload.getInt();
1157             final H2Param param = H2Param.valueOf(code);
1158             if (param != null) {
1159                 switch (param) {
1160                     case HEADER_TABLE_SIZE:
1161                         try {
1162                             configBuilder.setHeaderTableSize(value);
1163                         } catch (final IllegalArgumentException ex) {
1164                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1165                         }
1166                         break;
1167                     case MAX_CONCURRENT_STREAMS:
1168                         try {
1169                             configBuilder.setMaxConcurrentStreams(value);
1170                         } catch (final IllegalArgumentException ex) {
1171                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1172                         }
1173                         break;
1174                     case ENABLE_PUSH:
1175                         configBuilder.setPushEnabled(value == 1);
1176                         break;
1177                     case INITIAL_WINDOW_SIZE:
1178                         try {
1179                             configBuilder.setInitialWindowSize(value);
1180                         } catch (final IllegalArgumentException ex) {
1181                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1182                         }
1183                         break;
1184                     case MAX_FRAME_SIZE:
1185                         try {
1186                             configBuilder.setMaxFrameSize(value);
1187                         } catch (final IllegalArgumentException ex) {
1188                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1189                         }
1190                         break;
1191                     case MAX_HEADER_LIST_SIZE:
1192                         try {
1193                             configBuilder.setMaxHeaderListSize(value);
1194                         } catch (final IllegalArgumentException ex) {
1195                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1196                         }
1197                         break;
1198                 }
1199             }
1200         }
1201         applyRemoteSettings(configBuilder.build());
1202     }
1203 
1204     private void produceOutput() throws HttpException, IOException {
1205         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1206             final Map.Entry<Integer, H2Stream> entry = it.next();
1207             final H2Stream stream = entry.getValue();
1208             if (!stream.isLocalClosed() && stream.getOutputWindow().get() > 0) {
1209                 stream.produceOutput();
1210             }
1211             if (stream.isTerminated()) {
1212                 it.remove();
1213                 stream.releaseResources();
1214                 requestSessionOutput();
1215             }
1216             if (!outputQueue.isEmpty()) {
1217                 break;
1218             }
1219         }
1220     }
1221 
1222     private void applyRemoteSettings(final H2Config config) throws H2ConnectionException {
1223         remoteConfig = config;
1224 
1225         hPackEncoder.setMaxTableSize(remoteConfig.getHeaderTableSize());
1226         final int delta = remoteConfig.getInitialWindowSize() - initOutputWinSize;
1227         initOutputWinSize = remoteConfig.getInitialWindowSize();
1228 
1229         if (delta != 0) {
1230             if (!streamMap.isEmpty()) {
1231                 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1232                     final Map.Entry<Integer, H2Stream> entry = it.next();
1233                     final H2Stream stream = entry.getValue();
1234                     try {
1235                         updateOutputWindow(stream.getId(), stream.getOutputWindow(), delta);
1236                     } catch (final ArithmeticException ex) {
1237                         throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1238                     }
1239                 }
1240             }
1241         }
1242     }
1243 
1244     private void applyLocalSettings() throws H2ConnectionException {
1245         hPackDecoder.setMaxTableSize(localConfig.getHeaderTableSize());
1246         hPackDecoder.setMaxListSize(localConfig.getMaxHeaderListSize());
1247 
1248         final int delta = localConfig.getInitialWindowSize() - initInputWinSize;
1249         initInputWinSize = localConfig.getInitialWindowSize();
1250 
1251         if (delta != 0 && !streamMap.isEmpty()) {
1252             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1253                 final Map.Entry<Integer, H2Stream> entry = it.next();
1254                 final H2Stream stream = entry.getValue();
1255                 try {
1256                     updateInputWindow(stream.getId(), stream.getInputWindow(), delta);
1257                 } catch (final ArithmeticException ex) {
1258                     throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1259                 }
1260             }
1261         }
1262         lowMark = initInputWinSize / 2;
1263     }
1264 
1265     @Override
1266     public void close() throws IOException {
1267         ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.IMMEDIATE);
1268     }
1269 
1270     @Override
1271     public void close(final CloseMode closeMode) {
1272         ioSession.close(closeMode);
1273     }
1274 
1275     @Override
1276     public boolean isOpen() {
1277         return connState == ConnectionHandshake.ACTIVE;
1278     }
1279 
1280     @Override
1281     public void setSocketTimeout(final Timeout timeout) {
1282         ioSession.setSocketTimeout(timeout);
1283     }
1284 
1285     @Override
1286     public SSLSession getSSLSession() {
1287         final TlsDetails tlsDetails = ioSession.getTlsDetails();
1288         return tlsDetails != null ? tlsDetails.getSSLSession() : null;
1289     }
1290 
1291     @Override
1292     public EndpointDetails getEndpointDetails() {
1293         if (endpointDetails == null) {
1294             endpointDetails = new BasicEndpointDetails(
1295                     ioSession.getRemoteAddress(),
1296                     ioSession.getLocalAddress(),
1297                     connMetrics,
1298                     ioSession.getSocketTimeout());
1299         }
1300         return endpointDetails;
1301     }
1302 
1303     @Override
1304     public Timeout getSocketTimeout() {
1305         return ioSession.getSocketTimeout();
1306     }
1307 
1308     @Override
1309     public ProtocolVersion getProtocolVersion() {
1310         return HttpVersion.HTTP_2;
1311     }
1312 
1313     @Override
1314     public SocketAddress getRemoteAddress() {
1315         return ioSession.getRemoteAddress();
1316     }
1317 
1318     @Override
1319     public SocketAddress getLocalAddress() {
1320         return ioSession.getLocalAddress();
1321     }
1322 
1323     void appendState(final StringBuilder buf) {
1324         buf.append("connState=").append(connState)
1325                 .append(", connInputWindow=").append(connInputWindow)
1326                 .append(", connOutputWindow=").append(connOutputWindow)
1327                 .append(", outputQueue=").append(outputQueue.size())
1328                 .append(", streamMap=").append(streamMap.size())
1329                 .append(", processedRemoteStreamId=").append(processedRemoteStreamId);
1330     }
1331 
1332     private static class Continuation {
1333 
1334         final int streamId;
1335         final int type;
1336         final boolean endStream;
1337         final ByteArrayBuffer headerBuffer;
1338 
1339         private Continuation(final int streamId, final int type, final boolean endStream) {
1340             this.streamId = streamId;
1341             this.type = type;
1342             this.endStream = endStream;
1343             this.headerBuffer = new ByteArrayBuffer(1024);
1344         }
1345 
1346         void copyPayload(final ByteBuffer payload) {
1347             if (payload == null) {
1348                 return;
1349             }
1350             headerBuffer.ensureCapacity(payload.remaining());
1351             payload.get(headerBuffer.array(), headerBuffer.length(), payload.remaining());
1352         }
1353 
1354         ByteBuffer getContent() {
1355             return ByteBuffer.wrap(headerBuffer.array(), 0, headerBuffer.length());
1356         }
1357 
1358     }
1359 
1360     private class H2StreamChannelImpl implements H2StreamChannel {
1361 
1362         private final int id;
1363         private final AtomicInteger inputWindow;
1364         private final AtomicInteger outputWindow;
1365 
1366         private volatile boolean idle;
1367         private volatile boolean remoteEndStream;
1368         private volatile boolean localEndStream;
1369 
1370         private volatile long deadline;
1371 
1372         H2StreamChannelImpl(final int id, final boolean idle, final int initialInputWindowSize, final int initialOutputWindowSize) {
1373             this.id = id;
1374             this.idle = idle;
1375             this.inputWindow = new AtomicInteger(initialInputWindowSize);
1376             this.outputWindow = new AtomicInteger(initialOutputWindowSize);
1377         }
1378 
1379         int getId() {
1380             return id;
1381         }
1382 
1383         AtomicInteger getOutputWindow() {
1384             return outputWindow;
1385         }
1386 
1387         AtomicInteger getInputWindow() {
1388             return inputWindow;
1389         }
1390 
1391         @Override
1392         public void submit(final List<Header> headers, final boolean endStream) throws IOException {
1393             ioSession.getLock().lock();
1394             try {
1395                 if (headers == null || headers.isEmpty()) {
1396                     throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
1397                 }
1398                 if (localEndStream) {
1399                     return;
1400                 }
1401                 idle = false;
1402                 commitHeaders(id, headers, endStream);
1403                 if (endStream) {
1404                     localEndStream = true;
1405                 }
1406             } finally {
1407                 ioSession.getLock().unlock();
1408             }
1409         }
1410 
1411         @Override
1412         public void push(final List<Header> headers, final AsyncPushProducer pushProducer) throws HttpException, IOException {
1413             acceptPushRequest();
1414             final int promisedStreamId = generateStreamId();
1415             final H2StreamChannelImpl channel = new H2StreamChannelImpl(
1416                     promisedStreamId,
1417                     true,
1418                     localConfig.getInitialWindowSize(),
1419                     remoteConfig.getInitialWindowSize());
1420             final HttpCoreContext context = HttpCoreContext.create();
1421             context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
1422             context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
1423             final H2StreamHandler streamHandler = new ServerPushH2StreamHandler(
1424                     channel, httpProcessor, connMetrics, pushProducer, context);
1425             final H2Stream stream = new H2Stream(channel, streamHandler, false);
1426             streamMap.put(promisedStreamId, stream);
1427 
1428             ioSession.getLock().lock();
1429             try {
1430                 if (localEndStream) {
1431                     stream.releaseResources();
1432                     return;
1433                 }
1434                 commitPushPromise(id, promisedStreamId, headers);
1435                 idle = false;
1436             } finally {
1437                 ioSession.getLock().unlock();
1438             }
1439         }
1440 
1441         @Override
1442         public void update(final int increment) throws IOException {
1443             if (remoteEndStream) {
1444                 return;
1445             }
1446             incrementInputCapacity(0, connInputWindow, increment);
1447             incrementInputCapacity(id, inputWindow, increment);
1448         }
1449 
1450         @Override
1451         public int write(final ByteBuffer payload) throws IOException {
1452             ioSession.getLock().lock();
1453             try {
1454                 if (localEndStream) {
1455                     return 0;
1456                 }
1457                 return streamData(id, outputWindow, payload);
1458             } finally {
1459                 ioSession.getLock().unlock();
1460             }
1461         }
1462 
1463         @Override
1464         public void endStream(final List<? extends Header> trailers) throws IOException {
1465             ioSession.getLock().lock();
1466             try {
1467                 if (localEndStream) {
1468                     return;
1469                 }
1470                 localEndStream = true;
1471                 if (trailers != null && !trailers.isEmpty()) {
1472                     commitHeaders(id, trailers, true);
1473                 } else {
1474                     final RawFrame frame = frameFactory.createData(id, null, true);
1475                     commitFrameInternal(frame);
1476                 }
1477             } finally {
1478                 ioSession.getLock().unlock();
1479             }
1480         }
1481 
1482         @Override
1483         public void endStream() throws IOException {
1484             endStream(null);
1485         }
1486 
1487         @Override
1488         public void requestOutput() {
1489             requestSessionOutput();
1490         }
1491 
1492         boolean isRemoteClosed() {
1493             return remoteEndStream;
1494         }
1495 
1496         void setRemoteEndStream() {
1497             remoteEndStream = true;
1498         }
1499 
1500         boolean isLocalClosed() {
1501             return localEndStream;
1502         }
1503 
1504         void setLocalEndStream() {
1505             localEndStream = true;
1506         }
1507 
1508         boolean isLocalReset() {
1509             return deadline > 0;
1510         }
1511 
1512         boolean isResetDeadline() {
1513             final long l = deadline;
1514             return l > 0 && l < System.currentTimeMillis();
1515         }
1516 
1517         boolean localReset(final int code) throws IOException {
1518             ioSession.getLock().lock();
1519             try {
1520                 if (localEndStream) {
1521                     return false;
1522                 }
1523                 localEndStream = true;
1524                 deadline = System.currentTimeMillis() + LINGER_TIME;
1525                 if (!idle) {
1526                     final RawFrame resetStream = frameFactory.createResetStream(id, code);
1527                     commitFrameInternal(resetStream);
1528                     return true;
1529                 }
1530                 return false;
1531             } finally {
1532                 ioSession.getLock().unlock();
1533             }
1534         }
1535 
1536         boolean localReset(final H2Error error) throws IOException {
1537             return localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1538         }
1539 
1540         @Override
1541         public boolean cancel() {
1542             try {
1543                 return localReset(H2Error.CANCEL);
1544             } catch (final IOException ignore) {
1545                 return false;
1546             }
1547         }
1548 
1549         void appendState(final StringBuilder buf) {
1550             buf.append("id=").append(id)
1551                     .append(", connState=").append(connState)
1552                     .append(", inputWindow=").append(inputWindow)
1553                     .append(", outputWindow=").append(outputWindow)
1554                     .append(", localEndStream=").append(localEndStream)
1555                     .append(", idle=").append(idle);
1556         }
1557 
1558         @Override
1559         public String toString() {
1560             final StringBuilder buf = new StringBuilder();
1561             buf.append("[");
1562             appendState(buf);
1563             buf.append("]");
1564             return buf.toString();
1565         }
1566 
1567     }
1568 
1569     static class H2Stream {
1570 
1571         private final H2StreamChannelImpl channel;
1572         private final H2StreamHandler handler;
1573         private final boolean remoteInitiated;
1574 
1575         private H2Stream(
1576                 final H2StreamChannelImpl channel,
1577                 final H2StreamHandler handler,
1578                 final boolean remoteInitiated) {
1579             this.channel = channel;
1580             this.handler = handler;
1581             this.remoteInitiated = remoteInitiated;
1582         }
1583 
1584         int getId() {
1585             return channel.getId();
1586         }
1587 
1588         boolean isRemoteInitiated() {
1589             return remoteInitiated;
1590         }
1591 
1592         AtomicInteger getOutputWindow() {
1593             return channel.getOutputWindow();
1594         }
1595 
1596         AtomicInteger getInputWindow() {
1597             return channel.getInputWindow();
1598         }
1599 
1600         boolean isTerminated() {
1601             return channel.isLocalClosed() && (channel.isRemoteClosed() || channel.isResetDeadline());
1602         }
1603 
1604         boolean isRemoteClosed() {
1605             return channel.isRemoteClosed();
1606         }
1607 
1608         boolean isLocalClosed() {
1609             return channel.isLocalClosed();
1610         }
1611 
1612         boolean isLocalReset() {
1613             return channel.isLocalReset();
1614         }
1615 
1616         void setRemoteEndStream() {
1617             channel.setRemoteEndStream();
1618         }
1619 
1620         void consumePromise(final List<Header> headers) throws HttpException, IOException {
1621             try {
1622                 handler.consumePromise(headers);
1623                 channel.setLocalEndStream();
1624             } catch (final ProtocolException ex) {
1625                 localReset(ex, H2Error.PROTOCOL_ERROR);
1626             }
1627         }
1628 
1629         void consumeHeader(final List<Header> headers) throws HttpException, IOException {
1630             try {
1631                 handler.consumeHeader(headers, channel.isRemoteClosed());
1632             } catch (final ProtocolException ex) {
1633                 localReset(ex, H2Error.PROTOCOL_ERROR);
1634             }
1635         }
1636 
1637         void consumeData(final ByteBuffer src) throws HttpException, IOException {
1638             try {
1639                 handler.consumeData(src, channel.isRemoteClosed());
1640             } catch (final CharacterCodingException ex) {
1641                 localReset(ex, H2Error.INTERNAL_ERROR);
1642             } catch (final ProtocolException ex) {
1643                 localReset(ex, H2Error.PROTOCOL_ERROR);
1644             }
1645         }
1646 
1647         boolean isOutputReady() {
1648             return handler.isOutputReady();
1649         }
1650 
1651         void produceOutput() throws HttpException, IOException {
1652             try {
1653                 handler.produceOutput();
1654             } catch (final ProtocolException ex) {
1655                 localReset(ex, H2Error.PROTOCOL_ERROR);
1656             }
1657         }
1658 
1659         void produceInputCapacityUpdate() throws IOException {
1660             handler.updateInputCapacity();
1661         }
1662 
1663         void reset(final Exception cause) {
1664             channel.setRemoteEndStream();
1665             channel.setLocalEndStream();
1666             handler.failed(cause);
1667         }
1668 
1669         void localReset(final Exception cause, final int code) throws IOException {
1670             channel.localReset(code);
1671             handler.failed(cause);
1672         }
1673 
1674         void localReset(final Exception cause, final H2Error error) throws IOException {
1675             localReset(cause, error != null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1676         }
1677 
1678         void localReset(final H2StreamResetException ex) throws IOException {
1679             localReset(ex, ex.getCode());
1680         }
1681 
1682         void handle(final HttpExceptionn.html#HttpException">HttpException ex) throws IOException, HttpException {
1683             handler.handle(ex, channel.isRemoteClosed());
1684         }
1685 
1686         HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
1687             return handler.getPushHandlerFactory();
1688         }
1689 
1690         void cancel() {
1691             reset(new RequestNotExecutedException());
1692         }
1693 
1694         boolean abort() {
1695             final boolean cancelled = channel.cancel();
1696             handler.failed(new RequestNotExecutedException());
1697             return cancelled;
1698         }
1699 
1700         void releaseResources() {
1701             handler.releaseResources();
1702         }
1703 
1704         void appendState(final StringBuilder buf) {
1705             buf.append("channel=[");
1706             channel.appendState(buf);
1707             buf.append("]");
1708         }
1709 
1710         @Override
1711         public String toString() {
1712             final StringBuilder buf = new StringBuilder();
1713             buf.append("[");
1714             appendState(buf);
1715             buf.append("]");
1716             return buf.toString();
1717         }
1718 
1719     }
1720 
1721 }