View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http2.impl.nio;
28  
29  import java.io.IOException;
30  import java.net.SocketAddress;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.SelectionKey;
33  import java.nio.charset.CharacterCodingException;
34  import java.util.Deque;
35  import java.util.Iterator;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Queue;
39  import java.util.concurrent.ConcurrentHashMap;
40  import java.util.concurrent.ConcurrentLinkedDeque;
41  import java.util.concurrent.ConcurrentLinkedQueue;
42  import java.util.concurrent.atomic.AtomicInteger;
43  
44  import javax.net.ssl.SSLSession;
45  
46  import org.apache.hc.core5.concurrent.CancellableDependency;
47  import org.apache.hc.core5.http.ConnectionClosedException;
48  import org.apache.hc.core5.http.EndpointDetails;
49  import org.apache.hc.core5.http.Header;
50  import org.apache.hc.core5.http.HttpConnection;
51  import org.apache.hc.core5.http.HttpException;
52  import org.apache.hc.core5.http.HttpStreamResetException;
53  import org.apache.hc.core5.http.HttpVersion;
54  import org.apache.hc.core5.http.ProtocolException;
55  import org.apache.hc.core5.http.ProtocolVersion;
56  import org.apache.hc.core5.http.RequestNotExecutedException;
57  import org.apache.hc.core5.http.config.CharCodingConfig;
58  import org.apache.hc.core5.http.impl.BasicEndpointDetails;
59  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
60  import org.apache.hc.core5.http.impl.CharCodingSupport;
61  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
62  import org.apache.hc.core5.http.nio.AsyncPushProducer;
63  import org.apache.hc.core5.http.nio.HandlerFactory;
64  import org.apache.hc.core5.http.nio.command.ExecutableCommand;
65  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
66  import org.apache.hc.core5.http.protocol.HttpCoreContext;
67  import org.apache.hc.core5.http.protocol.HttpProcessor;
68  import org.apache.hc.core5.http2.H2ConnectionException;
69  import org.apache.hc.core5.http2.H2Error;
70  import org.apache.hc.core5.http2.H2StreamResetException;
71  import org.apache.hc.core5.http2.config.H2Config;
72  import org.apache.hc.core5.http2.config.H2Param;
73  import org.apache.hc.core5.http2.config.H2Setting;
74  import org.apache.hc.core5.http2.frame.FrameFactory;
75  import org.apache.hc.core5.http2.frame.FrameFlag;
76  import org.apache.hc.core5.http2.frame.FrameType;
77  import org.apache.hc.core5.http2.frame.RawFrame;
78  import org.apache.hc.core5.http2.frame.StreamIdGenerator;
79  import org.apache.hc.core5.http2.hpack.HPackDecoder;
80  import org.apache.hc.core5.http2.hpack.HPackEncoder;
81  import org.apache.hc.core5.http2.impl.BasicH2TransportMetrics;
82  import org.apache.hc.core5.http2.nio.AsyncPingHandler;
83  import org.apache.hc.core5.http2.nio.command.PingCommand;
84  import org.apache.hc.core5.io.CloseMode;
85  import org.apache.hc.core5.reactor.Command;
86  import org.apache.hc.core5.reactor.ProtocolIOSession;
87  import org.apache.hc.core5.reactor.ssl.TlsDetails;
88  import org.apache.hc.core5.util.Args;
89  import org.apache.hc.core5.util.ByteArrayBuffer;
90  import org.apache.hc.core5.util.Identifiable;
91  import org.apache.hc.core5.util.Timeout;
92  
93  abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection {
94  
        // NOTE(review): not referenced in the visible part of the class; unit presumably milliseconds - confirm.
95      private static final long LINGER_TIME = 1000; // 1 second
        // NOTE(review): not referenced in the visible part of the class.
96      private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024; // 10 MiB
97  
        // Connection life-cycle states. Declaration order is significant: the code compares
        // states with compareTo(), e.g. "connState.compareTo(SHUTDOWN) < 0".
98      enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
        // Progress of a SETTINGS exchange; tracked separately for the local and the remote settings.
99      enum SettingsHandshake { READY, TRANSMITTED, ACKED }
100 
101     private final ProtocolIOSession ioSession;
102     private final FrameFactory frameFactory;
103     private final StreamIdGenerator idGenerator;
104     private final HttpProcessor httpProcessor;
        // Configuration passed to the constructor, or H2Config.DEFAULT when none was given.
105     private final H2Config localConfig;
106     private final BasicH2TransportMetrics inputMetrics;
107     private final BasicH2TransportMetrics outputMetrics;
108     private final BasicHttpConnectionMetrics connMetrics;
109     private final FrameInputBuffer inputBuffer;
110     private final FrameOutputBuffer outputBuffer;
        // Frames that could not be written straight to outputBuffer; drained by onOutput().
111     private final Deque<RawFrame> outputQueue;
112     private final HPackEncoder hPackEncoder;
113     private final HPackDecoder hPackDecoder;
        // Streams keyed by stream id.
114     private final Map<Integer, H2Stream> streamMap;
        // Handlers of PING frames that have been sent; cancelled or failed when the connection goes away.
115     private final Queue<AsyncPingHandler> pingHandlers;
        // Connection-level flow-control windows (reported to the listener with stream id 0).
116     private final AtomicInteger connInputWindow;
117     private final AtomicInteger connOutputWindow;
        // Count of output requests raised via requestSessionOutput() and settled in onOutput().
118     private final AtomicInteger outputRequests;
        // Updated by both updateLastStreamId() and generateStreamId().
119     private final AtomicInteger lastStreamId;
        // Optional event listener; may be null, every use is guarded by a null check.
120     private final H2StreamListener streamListener;
121 
122     private ConnectionHandshake connState = ConnectionHandshake.READY;
123     private SettingsHandshake localSettingState = SettingsHandshake.READY;
124     private SettingsHandshake remoteSettingState = SettingsHandshake.READY;
125 
        // Initial window sizes handed to newly created stream channels.
126     private int initInputWinSize;
127     private int initOutputWinSize;
        // Initialised to half of the initial window size; its use is outside the visible code.
128     private int lowMark;
129 
        // Starts as H2Config.INIT; NOTE(review): presumably replaced once the peer's SETTINGS arrive - confirm.
130     private volatile H2Config remoteConfig;
131 
132     private Continuation continuation;
133 
        // Sent as the last-stream-id of every GOAWAY frame created in the visible code.
134     private int processedRemoteStreamId;
135     private EndpointDetails endpointDetails;
136     private boolean goAwayReceived;
137 
138     AbstractH2StreamMultiplexer(
139             final ProtocolIOSession ioSession,
140             final FrameFactory frameFactory,
141             final StreamIdGenerator idGenerator,
142             final HttpProcessor httpProcessor,
143             final CharCodingConfig charCodingConfig,
144             final H2Config h2Config,
145             final H2StreamListener streamListener) {
146         this.ioSession = Args.notNull(ioSession, "IO session");
147         this.frameFactory = Args.notNull(frameFactory, "Frame factory");
148         this.idGenerator = Args.notNull(idGenerator, "Stream id generator");
149         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
150         this.localConfig = h2Config != null ? h2Config : H2Config.DEFAULT;
151         this.inputMetrics = new BasicH2TransportMetrics();
152         this.outputMetrics = new BasicH2TransportMetrics();
153         this.connMetrics = new BasicHttpConnectionMetrics(this.inputMetrics, this.outputMetrics);
154         this.inputBuffer = new FrameInputBuffer(this.inputMetrics, this.localConfig.getMaxFrameSize());
155         this.outputBuffer = new FrameOutputBuffer(this.outputMetrics, this.localConfig.getMaxFrameSize());
156         this.outputQueue = new ConcurrentLinkedDeque<>();
157         this.pingHandlers = new ConcurrentLinkedQueue<>();
158         this.outputRequests = new AtomicInteger(0);
159         this.lastStreamId = new AtomicInteger(0);
160         this.hPackEncoder = new HPackEncoder(CharCodingSupport.createEncoder(charCodingConfig));
161         this.hPackDecoder = new HPackDecoder(CharCodingSupport.createDecoder(charCodingConfig));
162         this.streamMap = new ConcurrentHashMap<>();
163         this.remoteConfig = H2Config.INIT;
164         this.connInputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
165         this.connOutputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
166 
167         this.initInputWinSize = H2Config.INIT.getInitialWindowSize();
168         this.initOutputWinSize = H2Config.INIT.getInitialWindowSize();
169 
170         this.hPackDecoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
171         this.hPackEncoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
172         this.hPackDecoder.setMaxListSize(H2Config.INIT.getMaxHeaderListSize());
173 
174         this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
175         this.streamListener = streamListener;
176     }
177 
178     @Override
179     public String getId() {
180         return ioSession.getId();
181     }
182 
        // NOTE(review): the three accept* hooks are not called in the visible part of the class.
        // Presumably each one is consulted before the corresponding kind of frame is processed
        // and rejects it by throwing H2ConnectionException - confirm against the frame handling code.
183     abstract void acceptHeaderFrame() throws H2ConnectionException;
184 
185     abstract void acceptPushRequest() throws H2ConnectionException;
186 
187     abstract void acceptPushFrame() throws H2ConnectionException;
188 
        // Factory for the handler of a stream on the given channel. Not called in the visible
        // part of the class; by its name it serves streams opened by the peer - confirm.
189     abstract H2StreamHandler createRemotelyInitiatedStream(
190             H2StreamChannel channel,
191             HttpProcessor httpProcessor,
192             BasicHttpConnectionMetrics connMetrics,
193             HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException;
194 
        // Factory for the handler of a stream opened locally: processPendingCommands() calls it
        // once per ExecutableCommand, passing a channel that carries a freshly generated stream id.
195     abstract H2StreamHandler createLocallyInitiatedStream(
196             ExecutableCommand command,
197             H2StreamChannel channel,
198             HttpProcessor httpProcessor,
199             BasicHttpConnectionMetrics connMetrics) throws IOException;
200 
201     private int updateWindow(final AtomicInteger window, final int delta) throws ArithmeticException {
202         for (;;) {
203             final int current = window.get();
204             long newValue = (long) current + delta;
205 
206             //TODO: work-around for what looks like a bug in Ngnix (1.11)
207             // Tolerate if the update window exceeded by one
208             if (newValue == 0x80000000L) {
209                 newValue = Integer.MAX_VALUE;
210             }
211             //TODO: needs to be removed
212 
213             if (Math.abs(newValue) > 0x7fffffffL) {
214                 throw new ArithmeticException("Update causes flow control window to exceed " + Integer.MAX_VALUE);
215             }
216             if (window.compareAndSet(current, (int) newValue)) {
217                 return (int) newValue;
218             }
219         }
220     }
221 
222     private int updateInputWindow(
223             final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
224         final int newSize = updateWindow(window, delta);
225         if (streamListener != null) {
226             streamListener.onInputFlowControl(this, streamId, delta, newSize);
227         }
228         return newSize;
229     }
230 
231     private int updateOutputWindow(
232             final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
233         final int newSize = updateWindow(window, delta);
234         if (streamListener != null) {
235             streamListener.onOutputFlowControl(this, streamId, delta, newSize);
236         }
237         return newSize;
238     }
239 
240     private void commitFrameInternal(final RawFrame frame) throws IOException {
241         if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
242             if (streamListener != null) {
243                 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
244             }
245             outputBuffer.write(frame, ioSession);
246         } else {
247             outputQueue.addLast(frame);
248         }
249         ioSession.setEvent(SelectionKey.OP_WRITE);
250     }
251 
252     private void commitFrame(final RawFrame frame) throws IOException {
253         Args.notNull(frame, "Frame");
254         ioSession.getLock().lock();
255         try {
256             commitFrameInternal(frame);
257         } finally {
258             ioSession.getLock().unlock();
259         }
260     }
261 
262     private void commitHeaders(
263             final int streamId, final List<? extends Header> headers, final boolean endStream) throws IOException {
264         if (streamListener != null) {
265             streamListener.onHeaderOutput(this, streamId, headers);
266         }
267         final ByteArrayBuffer buf = new ByteArrayBuffer(512);
268         hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
269 
270         int off = 0;
271         int remaining = buf.length();
272         boolean continuation = false;
273 
274         while (remaining > 0) {
275             final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
276             final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
277 
278             remaining -= chunk;
279             off += chunk;
280 
281             final boolean endHeaders = remaining == 0;
282             final RawFrame frame;
283             if (!continuation) {
284                 frame = frameFactory.createHeaders(streamId, payload, endHeaders, endStream);
285                 continuation = true;
286             } else {
287                 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
288             }
289             commitFrameInternal(frame);
290         }
291     }
292 
293     private void commitPushPromise(
294             final int streamId, final int promisedStreamId, final List<Header> headers) throws IOException {
295         if (headers == null || headers.isEmpty()) {
296             throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
297         }
298         if (streamListener != null) {
299             streamListener.onHeaderOutput(this, streamId, headers);
300         }
301         final ByteArrayBuffer buf = new ByteArrayBuffer(512);
302         buf.append((byte)(promisedStreamId >> 24));
303         buf.append((byte)(promisedStreamId >> 16));
304         buf.append((byte)(promisedStreamId >> 8));
305         buf.append((byte)(promisedStreamId));
306 
307         hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
308 
309         int off = 0;
310         int remaining = buf.length();
311         boolean continuation = false;
312 
313         while (remaining > 0) {
314             final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
315             final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
316 
317             remaining -= chunk;
318             off += chunk;
319 
320             final boolean endHeaders = remaining == 0;
321             final RawFrame frame;
322             if (!continuation) {
323                 frame = frameFactory.createPushPromise(streamId, payload, endHeaders);
324                 continuation = true;
325             } else {
326                 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
327             }
328             commitFrameInternal(frame);
329         }
330     }
331 
332     private void streamDataFrame(
333             final int streamId,
334             final AtomicInteger streamOutputWindow,
335             final ByteBuffer payload,
336             final int chunk) throws IOException {
337         final RawFrame dataFrame = frameFactory.createData(streamId, payload, false);
338         if (streamListener != null) {
339             streamListener.onFrameOutput(this, streamId, dataFrame);
340         }
341         updateOutputWindow(0, connOutputWindow, -chunk);
342         updateOutputWindow(streamId, streamOutputWindow, -chunk);
343         outputBuffer.write(dataFrame, ioSession);
344     }
345 
346     private int streamData(
347             final int streamId, final AtomicInteger streamOutputWindow, final ByteBuffer payload) throws IOException {
348         if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
349             final int capacity = Math.min(connOutputWindow.get(), streamOutputWindow.get());
350             if (capacity <= 0) {
351                 return 0;
352             }
353             final int maxPayloadSize = Math.min(capacity, remoteConfig.getMaxFrameSize());
354             final int chunk;
355             if (payload.remaining() <= maxPayloadSize) {
356                 chunk = payload.remaining();
357                 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
358             } else {
359                 chunk = maxPayloadSize;
360                 final int originalLimit = payload.limit();
361                 try {
362                     payload.limit(payload.position() + chunk);
363                     streamDataFrame(streamId, streamOutputWindow, payload, chunk);
364                 } finally {
365                     payload.limit(originalLimit);
366                 }
367             }
368             payload.position(payload.position() + chunk);
369             ioSession.setEvent(SelectionKey.OP_WRITE);
370             return chunk;
371         }
372         return 0;
373     }
374 
375     private void incrementInputCapacity(
376             final int streamId, final AtomicInteger inputWindow, final int inputCapacity) throws IOException {
377         if (inputCapacity > 0) {
378             final int streamWinSize = inputWindow.get();
379             final int remainingCapacity = Integer.MAX_VALUE - streamWinSize;
380             final int chunk = Math.min(inputCapacity, remainingCapacity);
381             if (chunk != 0) {
382                 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(streamId, chunk);
383                 commitFrame(windowUpdateFrame);
384                 updateInputWindow(streamId, inputWindow, chunk);
385             }
386         }
387     }
388 
389     private void requestSessionOutput() {
390         outputRequests.incrementAndGet();
391         ioSession.setEvent(SelectionKey.OP_WRITE);
392     }
393 
394     private void updateLastStreamId(final int streamId) {
395         final int currentId = lastStreamId.get();
396         if (streamId > currentId) {
397             lastStreamId.compareAndSet(currentId, streamId);
398         }
399     }
400 
401     private int generateStreamId() {
402         for (;;) {
403             final int currentId = lastStreamId.get();
404             final int newStreamId = idGenerator.generate(currentId);
405             if (lastStreamId.compareAndSet(currentId, newStreamId)) {
406                 return newStreamId;
407             }
408         }
409     }
410 
411     public final void onConnect() throws HttpException, IOException {
412         connState = ConnectionHandshake.ACTIVE;
413         final RawFrame settingsFrame = frameFactory.createSettings(
414                 new H2Setting(H2Param.HEADER_TABLE_SIZE, localConfig.getHeaderTableSize()),
415                 new H2Setting(H2Param.ENABLE_PUSH, localConfig.isPushEnabled() ? 1 : 0),
416                 new H2Setting(H2Param.MAX_CONCURRENT_STREAMS, localConfig.getMaxConcurrentStreams()),
417                 new H2Setting(H2Param.INITIAL_WINDOW_SIZE, localConfig.getInitialWindowSize()),
418                 new H2Setting(H2Param.MAX_FRAME_SIZE, localConfig.getMaxFrameSize()),
419                 new H2Setting(H2Param.MAX_HEADER_LIST_SIZE, localConfig.getMaxHeaderListSize()));
420 
421         commitFrame(settingsFrame);
422         localSettingState = SettingsHandshake.TRANSMITTED;
423         maximizeConnWindow(connInputWindow.get());
424 
425         if (streamListener != null) {
426             final int initInputWindow = connInputWindow.get();
427             streamListener.onInputFlowControl(this, 0, initInputWindow, initInputWindow);
428             final int initOutputWindow = connOutputWindow.get();
429             streamListener.onOutputFlowControl(this, 0, initOutputWindow, initOutputWindow);
430         }
431     }
432 
433     public final void onInput(final ByteBuffer src) throws HttpException, IOException {
434         if (connState == ConnectionHandshake.SHUTDOWN) {
435             ioSession.clearEvent(SelectionKey.OP_READ);
436         } else {
437             for (;;) {
438                 final RawFrame frame = inputBuffer.read(src, ioSession);
439                 if (frame == null) {
440                     break;
441                 }
442                 if (streamListener != null) {
443                     streamListener.onFrameInput(this, frame.getStreamId(), frame);
444                 }
445                 consumeFrame(frame);
446             }
447         }
448     }
449 
450     public final void onOutput() throws HttpException, IOException {
451         ioSession.getLock().lock();
452         try {
453             if (!outputBuffer.isEmpty()) {
454                 outputBuffer.flush(ioSession);
455             }
456             while (outputBuffer.isEmpty()) {
457                 final RawFrame frame = outputQueue.poll();
458                 if (frame != null) {
459                     if (streamListener != null) {
460                         streamListener.onFrameOutput(this, frame.getStreamId(), frame);
461                     }
462                     outputBuffer.write(frame, ioSession);
463                 } else {
464                     break;
465                 }
466             }
467         } finally {
468             ioSession.getLock().unlock();
469         }
470 
471         if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
472 
473             if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
474                 produceOutput();
475             }
476             final int pendingOutputRequests = outputRequests.get();
477             boolean outputPending = false;
478             if (!streamMap.isEmpty() && connOutputWindow.get() > 0) {
479                 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
480                     final Map.Entry<Integer, H2Stream> entry = it.next();
481                     final H2Stream stream = entry.getValue();
482                     if (!stream.isLocalClosed()
483                             && stream.getOutputWindow().get() > 0
484                             && stream.isOutputReady()) {
485                         outputPending = true;
486                         break;
487                     }
488                 }
489             }
490             ioSession.getLock().lock();
491             try {
492                 if (!outputPending && outputBuffer.isEmpty() && outputQueue.isEmpty()
493                         && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
494                     ioSession.clearEvent(SelectionKey.OP_WRITE);
495                 } else {
496                     outputRequests.addAndGet(-pendingOutputRequests);
497                 }
498             } finally {
499                 ioSession.getLock().unlock();
500             }
501         }
502 
503         if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 && remoteSettingState == SettingsHandshake.ACKED) {
504             processPendingCommands();
505         }
506         if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
507             int liveStreams = 0;
508             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
509                 final Map.Entry<Integer, H2Stream> entry = it.next();
510                 final H2Stream stream = entry.getValue();
511                 if (stream.isLocalClosed() && stream.isRemoteClosed()) {
512                     stream.releaseResources();
513                     it.remove();
514                 } else {
515                     if (idGenerator.isSameSide(stream.getId()) || stream.getId() <= processedRemoteStreamId) {
516                         liveStreams++;
517                     }
518                 }
519             }
520             if (liveStreams == 0) {
521                 connState = ConnectionHandshake.SHUTDOWN;
522             }
523         }
524         if (connState.compareTo(ConnectionHandshake.SHUTDOWN) >= 0) {
525             if (!streamMap.isEmpty()) {
526                 for (final H2Stream stream : streamMap.values()) {
527                     stream.releaseResources();
528                 }
529                 streamMap.clear();
530             }
531             ioSession.getLock().lock();
532             try {
533                 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
534                     ioSession.close();
535                 }
536             } finally {
537                 ioSession.getLock().unlock();
538             }
539         }
540     }
541 
542     public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
543         connState = ConnectionHandshake.SHUTDOWN;
544 
545         final RawFrame goAway;
546         if (localSettingState != SettingsHandshake.ACKED) {
547             goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.SETTINGS_TIMEOUT,
548                             "Setting timeout (" + timeout + ")");
549         } else {
550             goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR,
551                             "Timeout due to inactivity (" + timeout + ")");
552         }
553         commitFrame(goAway);
554         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
555             final Map.Entry<Integer, H2Stream> entry = it.next();
556             final H2Stream stream = entry.getValue();
557             stream.reset(new H2StreamResetException(H2Error.NO_ERROR, "Timeout due to inactivity (" + timeout + ")"));
558         }
559         streamMap.clear();
560     }
561 
562     public final void onDisconnect() {
563         for (;;) {
564             final AsyncPingHandler pingHandler = pingHandlers.poll();
565             if (pingHandler != null) {
566                 pingHandler.cancel();
567             } else {
568                 break;
569             }
570         }
571         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
572             final Map.Entry<Integer, H2Stream> entry = it.next();
573             final H2Stream stream = entry.getValue();
574             stream.cancel();
575         }
576         for (;;) {
577             final Command command = ioSession.poll();
578             if (command != null) {
579                 if (command instanceof ExecutableCommand) {
580                     ((ExecutableCommand) command).failed(new ConnectionClosedException());
581                 } else {
582                     command.cancel();
583                 }
584             } else {
585                 break;
586             }
587         }
588     }
589 
590     private void processPendingCommands() throws IOException, HttpException {
591         while (streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
592             final Command command = ioSession.poll();
593             if (command == null) {
594                 break;
595             }
596             if (command instanceof ShutdownCommand) {
597                 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
598                 if (shutdownCommand.getType() == CloseMode.IMMEDIATE) {
599                     for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
600                         final Map.Entry<Integer, H2Stream> entry = it.next();
601                         final H2Stream stream = entry.getValue();
602                         stream.cancel();
603                     }
604                     streamMap.clear();
605                     connState = ConnectionHandshake.SHUTDOWN;
606                 } else {
607                     if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
608                         final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR, "Graceful shutdown");
609                         commitFrame(goAway);
610                         connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
611                     }
612                 }
613                 break;
614             } else if (command instanceof PingCommand) {
615                 final PingCommand pingCommand = (PingCommand) command;
616                 final AsyncPingHandler handler = pingCommand.getHandler();
617                 pingHandlers.add(handler);
618                 final RawFrame ping = frameFactory.createPing(handler.getData());
619                 commitFrame(ping);
620             } else if (command instanceof ExecutableCommand) {
621                 final int streamId = generateStreamId();
622                 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
623                         streamId, true, initInputWinSize, initOutputWinSize);
624                 final ExecutableCommand executableCommand = (ExecutableCommand) command;
625                 final H2StreamHandler streamHandler = createLocallyInitiatedStream(
626                         executableCommand, channel, httpProcessor, connMetrics);
627 
628                 final H2Stream stream = new H2Stream(channel, streamHandler, false);
629                 streamMap.put(streamId, stream);
630 
631                 if (streamListener != null) {
632                     final int initInputWindow = stream.getInputWindow().get();
633                     streamListener.onInputFlowControl(this, streamId, initInputWindow, initInputWindow);
634                     final int initOutputWindow = stream.getOutputWindow().get();
635                     streamListener.onOutputFlowControl(this, streamId, initOutputWindow, initOutputWindow);
636                 }
637 
638                 if (stream.isOutputReady()) {
639                     stream.produceOutput();
640                 }
641                 final CancellableDependency cancellableDependency = executableCommand.getCancellableDependency();
642                 if (cancellableDependency != null) {
643                     cancellableDependency.setDependency(stream::abort);
644                 }
645                 if (!outputQueue.isEmpty()) {
646                     return;
647                 }
648             }
649         }
650     }
651 
652     public final void onException(final Exception cause) {
653         try {
654             for (;;) {
655                 final AsyncPingHandler pingHandler = pingHandlers.poll();
656                 if (pingHandler != null) {
657                     pingHandler.failed(cause);
658                 } else {
659                     break;
660                 }
661             }
662             for (;;) {
663                 final Command command = ioSession.poll();
664                 if (command != null) {
665                     if (command instanceof ExecutableCommand) {
666                         ((ExecutableCommand) command).failed(new ConnectionClosedException());
667                     } else {
668                         command.cancel();
669                     }
670                 } else {
671                     break;
672                 }
673             }
674             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
675                 final Map.Entry<Integer, H2Stream> entry = it.next();
676                 final H2Stream stream = entry.getValue();
677                 stream.reset(cause);
678             }
679             streamMap.clear();
680             if (!(cause instanceof ConnectionClosedException)) {
681                 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) <= 0) {
682                     final H2Error errorCode;
683                     if (cause instanceof H2ConnectionException) {
684                         errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
685                     } else if (cause instanceof ProtocolException){
686                         errorCode = H2Error.PROTOCOL_ERROR;
687                     } else {
688                         errorCode = H2Error.INTERNAL_ERROR;
689                     }
690                     final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
691                     commitFrame(goAway);
692                 }
693             }
694         } catch (final IOException ignore) {
695         } finally {
696             connState = ConnectionHandshake.SHUTDOWN;
697             final CloseMode closeMode;
698             if (cause instanceof ConnectionClosedException) {
699                 closeMode = CloseMode.GRACEFUL;
700             } else if (cause instanceof IOException) {
701                 closeMode = CloseMode.IMMEDIATE;
702             } else {
703                 closeMode = CloseMode.GRACEFUL;
704             }
705             ioSession.close(closeMode);
706         }
707     }
708 
709     private H2Stream getValidStream(final int streamId) throws H2ConnectionException {
710         if (streamId == 0) {
711             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
712         }
713         final H2Stream stream = streamMap.get(streamId);
714         if (stream == null) {
715             if (streamId <= lastStreamId.get()) {
716                 throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed");
717             } else {
718                 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
719             }
720         }
721         return stream;
722     }
723 
    /**
     * Dispatches a single incoming frame according to its type.
     * <p>
     * While a header block is pending (the {@code continuation} field is set) any frame
     * other than CONTINUATION is rejected with a connection-level {@code PROTOCOL_ERROR}.
     * Stream-level reset exceptions raised while consuming DATA, HEADERS, CONTINUATION
     * and PUSH_PROMISE frames are converted into a local reset of the affected stream;
     * other exceptions propagate to the caller.
     *
     * @param frame the frame to process.
     * @throws HttpException in case of an HTTP/2 protocol violation.
     * @throws IOException in case of an I/O error.
     */
    private void consumeFrame(final RawFrame frame) throws HttpException, IOException {
        final FrameType frameType = FrameType.valueOf(frame.getType());
        final int streamId = frame.getStreamId();
        // An unfinished header block must be completed before anything else is accepted.
        if (continuation != null && frameType != FrameType.CONTINUATION) {
            throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "CONTINUATION frame expected");
        }
        switch (frameType) {
            case DATA: {
                final H2Stream stream = getValidStream(streamId);
                try {
                    consumeDataFrame(frame, stream);
                } catch (final H2StreamResetException ex) {
                    stream.localReset(ex);
                } catch (final HttpStreamResetException ex) {
                    // A reset with an underlying cause is reported as INTERNAL_ERROR, otherwise as CANCEL
                    stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
                }

                // Drop streams that have reached their terminal state
                if (stream.isTerminated()) {
                    streamMap.remove(streamId);
                    stream.releaseResources();
                    requestSessionOutput();
                }
            }
            break;
            case HEADERS: {
                if (streamId == 0) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
                }
                H2Stream stream = streamMap.get(streamId);
                if (stream == null) {
                    // Unknown stream id: the frame opens a new stream initiated by the peer
                    acceptHeaderFrame();

                    if (idGenerator.isSameSide(streamId)) {
                        throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
                    }
                    if (goAwayReceived ) {
                        throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
                    }

                    updateLastStreamId(streamId);

                    final H2StreamChannelImpl channel = new H2StreamChannelImpl(
                            streamId, false, initInputWinSize, initOutputWinSize);
                    final H2StreamHandler streamHandler;
                    if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
                        streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics, null);
                    } else {
                        // Past the ACTIVE state: use the no-op handler and mark the local side as ended
                        streamHandler = NoopH2StreamHandler.INSTANCE;
                        channel.setLocalEndStream();
                    }

                    stream = new H2Stream(channel, streamHandler, true);
                    if (stream.isOutputReady()) {
                        stream.produceOutput();
                    }
                    streamMap.put(streamId, stream);
                }

                try {
                    consumeHeaderFrame(frame, stream);

                    if (stream.isOutputReady()) {
                        stream.produceOutput();
                    }
                } catch (final H2StreamResetException ex) {
                    stream.localReset(ex);
                } catch (final HttpStreamResetException ex) {
                    stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
                } catch (final HttpException ex) {
                    // Other HTTP exceptions are delegated to the stream itself
                    stream.handle(ex);
                }

                if (stream.isTerminated()) {
                    streamMap.remove(streamId);
                    stream.releaseResources();
                    requestSessionOutput();
                }
            }
            break;
            case CONTINUATION: {
                if (continuation == null) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION frame");
                }
                // The frame must carry the stream id recorded when the header block was started
                if (streamId != continuation.streamId) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION stream id: " + streamId);
                }

                final H2Stream stream = getValidStream(streamId);
                try {

                    consumeContinuationFrame(frame, stream);
                } catch (final H2StreamResetException ex) {
                    stream.localReset(ex);
                } catch (final HttpStreamResetException ex) {
                    stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
                }

                if (stream.isTerminated()) {
                    streamMap.remove(streamId);
                    stream.releaseResources();
                    requestSessionOutput();
                }
            }
            break;
            case WINDOW_UPDATE: {
                final ByteBuffer payload = frame.getPayload();
                // The payload must be exactly one 32-bit window increment
                if (payload == null || payload.remaining() != 4) {
                    throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid WINDOW_UPDATE frame payload");
                }
                final int delta = payload.getInt();
                if (delta <= 0) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Invalid WINDOW_UPDATE delta");
                }
                if (streamId == 0) {
                    // Stream id 0 addresses the connection-level output window
                    try {
                        updateOutputWindow(0, connOutputWindow, delta);
                    } catch (final ArithmeticException ex) {
                        throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
                    }
                } else {
                    // Updates for streams that are not registered are silently ignored
                    final H2Stream stream = streamMap.get(streamId);
                    if (stream != null) {
                        try {
                            updateOutputWindow(streamId, stream.getOutputWindow(), delta);
                        } catch (final ArithmeticException ex) {
                            throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
                        }
                    }
                }
                // Request a write event after the window update
                ioSession.setEvent(SelectionKey.OP_WRITE);
            }
            break;
            case RST_STREAM: {
                if (streamId == 0) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
                }
                final H2Stream stream = streamMap.get(streamId);
                if (stream == null) {
                    // Only ids above the last known stream id are an error; resets for
                    // other unregistered streams are ignored
                    if (streamId > lastStreamId.get()) {
                        throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
                    }
                } else {
                    final ByteBuffer payload = frame.getPayload();
                    // The payload must be exactly one 32-bit error code
                    if (payload == null || payload.remaining() != 4) {
                        throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid RST_STREAM frame payload");
                    }
                    final int errorCode = payload.getInt();
                    stream.reset(new H2StreamResetException(errorCode, "Stream reset (" + errorCode + ")"));
                    streamMap.remove(streamId);
                    stream.releaseResources();
                    requestSessionOutput();
                }
            }
            break;
            case PING: {
                if (streamId != 0) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
                }
                final ByteBuffer ping = frame.getPayloadContent();
                // PING payload must be exactly 8 bytes
                if (ping == null || ping.remaining() != 8) {
                    throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload");
                }
                if (frame.isFlagSet(FrameFlag.ACK)) {
                    // Acknowledgement: hand the payload to the next queued ping handler, if any
                    final AsyncPingHandler pingHandler = pingHandlers.poll();
                    if (pingHandler != null) {
                        pingHandler.consumeResponse(ping);
                    }
                } else {
                    // Ping from the peer: echo the payload back in a PING ACK frame
                    final ByteBuffer pong = ByteBuffer.allocate(ping.remaining());
                    pong.put(ping);
                    pong.flip();
                    final RawFrame response = frameFactory.createPingAck(pong);
                    commitFrame(response);
                }
            }
            break;
            case SETTINGS: {
                if (streamId != 0) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
                }
                if (frame.isFlagSet(FrameFlag.ACK)) {
                    // The acknowledgement is honoured only while local settings are in the TRANSMITTED state
                    if (localSettingState == SettingsHandshake.TRANSMITTED) {
                        localSettingState = SettingsHandshake.ACKED;
                        ioSession.setEvent(SelectionKey.OP_WRITE);
                        applyLocalSettings();
                    }
                } else {
                    final ByteBuffer payload = frame.getPayload();
                    if (payload != null) {
                        // Each setting occupies 6 bytes (see consumeSettingsFrame)
                        if ((payload.remaining() % 6) != 0) {
                            throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid SETTINGS payload");
                        }
                        consumeSettingsFrame(payload);
                        remoteSettingState = SettingsHandshake.TRANSMITTED;
                    }
                    // Send ACK
                    final RawFrame response = frameFactory.createSettingsAck();
                    commitFrame(response);
                    remoteSettingState = SettingsHandshake.ACKED;
                }
            }
            break;
            case PRIORITY:
                // Stream priority not supported
                break;
            case PUSH_PROMISE: {
                acceptPushFrame();

                if (goAwayReceived ) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
                }

                if (!localConfig.isPushEnabled()) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Push is disabled");
                }

                // The frame's stream id identifies the existing stream the promise arrives on
                final H2Stream stream = getValidStream(streamId);
                if (stream.isRemoteClosed()) {
                    stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed"));
                    break;
                }

                final ByteBuffer payload = frame.getPayloadContent();
                // At least the 32-bit promised stream id must be present
                if (payload == null || payload.remaining() < 4) {
                    throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PUSH_PROMISE payload");
                }
                final int promisedStreamId = payload.getInt();
                if (promisedStreamId == 0 || idGenerator.isSameSide(promisedStreamId)) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal promised stream id: " + promisedStreamId);
                }
                if (streamMap.get(promisedStreamId) != null) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected promised stream id: " + promisedStreamId);
                }

                updateLastStreamId(promisedStreamId);

                final H2StreamChannelImpl channel = new H2StreamChannelImpl(
                        promisedStreamId, false, initInputWinSize, initOutputWinSize);
                final H2StreamHandler streamHandler;
                if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
                    streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics,
                            stream.getPushHandlerFactory());
                } else {
                    // Past the ACTIVE state: use the no-op handler and mark the local side as ended
                    streamHandler = NoopH2StreamHandler.INSTANCE;
                    channel.setLocalEndStream();
                }

                final H2Stream promisedStream = new H2Stream(channel, streamHandler, true);
                streamMap.put(promisedStreamId, promisedStream);

                try {
                    consumePushPromiseFrame(frame, payload, promisedStream);
                } catch (final H2StreamResetException ex) {
                    promisedStream.localReset(ex);
                } catch (final HttpStreamResetException ex) {
                    promisedStream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.NO_ERROR);
                }
            }
            break;
            case GOAWAY: {
                if (streamId != 0) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
                }
                final ByteBuffer payload = frame.getPayload();
                // Two 32-bit fields are read below: a stream id and an error code
                if (payload == null || payload.remaining() < 8) {
                    throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid GOAWAY payload");
                }
                final int processedLocalStreamId = payload.getInt();
                final int errorCode = payload.getInt();
                goAwayReceived = true;
                if (errorCode == H2Error.NO_ERROR.getCode()) {
                    if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
                        // Cancel and remove the streams selected by the id check below
                        for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
                            final Map.Entry<Integer, H2Stream> entry = it.next();
                            final int activeStreamId = entry.getKey();
                            if (!idGenerator.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
                                final H2Stream stream = entry.getValue();
                                stream.cancel();
                                it.remove();
                            }
                        }
                    }
                    // With no streams left the connection is shut down right away
                    connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
                } else {
                    // Peer reported an error: reset every stream and shut down
                    for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
                        final Map.Entry<Integer, H2Stream> entry = it.next();
                        final H2Stream stream = entry.getValue();
                        stream.reset(new H2StreamResetException(errorCode, "Connection terminated by the peer (" + errorCode + ")"));
                    }
                    streamMap.clear();
                    connState = ConnectionHandshake.SHUTDOWN;
                }
            }
            ioSession.setEvent(SelectionKey.OP_WRITE);
            break;
        }
    }
1021 
1022     private void consumeDataFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1023         final int streamId = stream.getId();
1024         final ByteBuffer payload = frame.getPayloadContent();
1025         if (payload != null) {
1026             final int frameLength = frame.getLength();
1027             final int streamWinSize = updateInputWindow(streamId, stream.getInputWindow(), -frameLength);
1028             if (streamWinSize < lowMark && !stream.isRemoteClosed()) {
1029                 stream.produceInputCapacityUpdate();
1030             }
1031             final int connWinSize = updateInputWindow(0, connInputWindow, -frameLength);
1032             if (connWinSize < CONNECTION_WINDOW_LOW_MARK) {
1033                 maximizeConnWindow(connWinSize);
1034             }
1035         }
1036         if (stream.isRemoteClosed()) {
1037             throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1038         }
1039         if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1040             stream.setRemoteEndStream();
1041         }
1042         if (stream.isLocalReset()) {
1043             return;
1044         }
1045         stream.consumeData(payload);
1046     }
1047 
1048     private void maximizeConnWindow(final int connWinSize) throws IOException {
1049         final int delta = Integer.MAX_VALUE - connWinSize;
1050         if (delta > 0) {
1051             final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(0, delta);
1052             commitFrame(windowUpdateFrame);
1053             updateInputWindow(0, connInputWindow, delta);
1054         }
1055     }
1056 
1057     private void consumePushPromiseFrame(final RawFrame frame, final ByteBuffer payload, final H2Stream promisedStream) throws HttpException, IOException {
1058         final int promisedStreamId = promisedStream.getId();
1059         if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1060             continuation = new Continuation(promisedStreamId, frame.getType(), true);
1061         }
1062         if (continuation == null) {
1063             final List<Header> headers = hPackDecoder.decodeHeaders(payload);
1064             if (promisedStreamId > processedRemoteStreamId) {
1065                 processedRemoteStreamId = promisedStreamId;
1066             }
1067             if (streamListener != null) {
1068                 streamListener.onHeaderInput(this, promisedStreamId, headers);
1069             }
1070             promisedStream.consumePromise(headers);
1071         } else {
1072             continuation.copyPayload(payload);
1073         }
1074     }
1075 
1076     List<Header> decodeHeaders(final ByteBuffer payload) throws HttpException {
1077         return hPackDecoder.decodeHeaders(payload);
1078     }
1079 
1080     private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1081         final int streamId = stream.getId();
1082         if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1083             continuation = new Continuation(streamId, frame.getType(), frame.isFlagSet(FrameFlag.END_STREAM));
1084         }
1085         final ByteBuffer payload = frame.getPayloadContent();
1086         if (frame.isFlagSet(FrameFlag.PRIORITY)) {
1087             // Priority not supported
1088             payload.getInt();
1089             payload.get();
1090         }
1091         if (continuation == null) {
1092             final List<Header> headers = decodeHeaders(payload);
1093             if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1094                 processedRemoteStreamId = streamId;
1095             }
1096             if (streamListener != null) {
1097                 streamListener.onHeaderInput(this, streamId, headers);
1098             }
1099             if (stream.isRemoteClosed()) {
1100                 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1101             }
1102             if (stream.isLocalReset()) {
1103                 return;
1104             }
1105             if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1106                 stream.setRemoteEndStream();
1107             }
1108             stream.consumeHeader(headers);
1109         } else {
1110             continuation.copyPayload(payload);
1111         }
1112     }
1113 
1114     private void consumeContinuationFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1115         final int streamId = frame.getStreamId();
1116         final ByteBuffer payload = frame.getPayload();
1117         continuation.copyPayload(payload);
1118         if (frame.isFlagSet(FrameFlag.END_HEADERS)) {
1119             final List<Header> headers = decodeHeaders(continuation.getContent());
1120             if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1121                 processedRemoteStreamId = streamId;
1122             }
1123             if (streamListener != null) {
1124                 streamListener.onHeaderInput(this, streamId, headers);
1125             }
1126             if (stream.isRemoteClosed()) {
1127                 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1128             }
1129             if (stream.isLocalReset()) {
1130                 return;
1131             }
1132             if (continuation.endStream) {
1133                 stream.setRemoteEndStream();
1134             }
1135             if (continuation.type == FrameType.PUSH_PROMISE.getValue()) {
1136                 stream.consumePromise(headers);
1137             } else {
1138                 stream.consumeHeader(headers);
1139             }
1140             continuation = null;
1141         }
1142     }
1143 
1144     private void consumeSettingsFrame(final ByteBuffer payload) throws HttpException, IOException {
1145         final H2Config.Builder configBuilder = H2Config.initial();
1146         while (payload.hasRemaining()) {
1147             final int code = payload.getShort();
1148             final int value = payload.getInt();
1149             final H2Param param = H2Param.valueOf(code);
1150             if (param != null) {
1151                 switch (param) {
1152                     case HEADER_TABLE_SIZE:
1153                         try {
1154                             configBuilder.setHeaderTableSize(value);
1155                         } catch (final IllegalArgumentException ex) {
1156                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1157                         }
1158                         break;
1159                     case MAX_CONCURRENT_STREAMS:
1160                         try {
1161                             configBuilder.setMaxConcurrentStreams(value);
1162                         } catch (final IllegalArgumentException ex) {
1163                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1164                         }
1165                         break;
1166                     case ENABLE_PUSH:
1167                         configBuilder.setPushEnabled(value == 1);
1168                         break;
1169                     case INITIAL_WINDOW_SIZE:
1170                         try {
1171                             configBuilder.setInitialWindowSize(value);
1172                         } catch (final IllegalArgumentException ex) {
1173                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1174                         }
1175                         break;
1176                     case MAX_FRAME_SIZE:
1177                         try {
1178                             configBuilder.setMaxFrameSize(value);
1179                         } catch (final IllegalArgumentException ex) {
1180                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1181                         }
1182                         break;
1183                     case MAX_HEADER_LIST_SIZE:
1184                         try {
1185                             configBuilder.setMaxHeaderListSize(value);
1186                         } catch (final IllegalArgumentException ex) {
1187                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1188                         }
1189                         break;
1190                 }
1191             }
1192         }
1193         applyRemoteSettings(configBuilder.build());
1194     }
1195 
1196     private void produceOutput() throws HttpException, IOException {
1197         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1198             final Map.Entry<Integer, H2Stream> entry = it.next();
1199             final H2Stream stream = entry.getValue();
1200             if (!stream.isLocalClosed() && stream.getOutputWindow().get() > 0) {
1201                 stream.produceOutput();
1202             }
1203             if (stream.isTerminated()) {
1204                 it.remove();
1205                 stream.releaseResources();
1206                 requestSessionOutput();
1207             }
1208             if (!outputQueue.isEmpty()) {
1209                 break;
1210             }
1211         }
1212     }
1213 
1214     private void applyRemoteSettings(final H2Config config) throws H2ConnectionException {
1215         remoteConfig = config;
1216 
1217         hPackEncoder.setMaxTableSize(remoteConfig.getHeaderTableSize());
1218         final int delta = remoteConfig.getInitialWindowSize() - initOutputWinSize;
1219         initOutputWinSize = remoteConfig.getInitialWindowSize();
1220 
1221         final int maxFrameSize = remoteConfig.getMaxFrameSize();
1222         if (maxFrameSize > localConfig.getMaxFrameSize()) {
1223             outputBuffer.expand(maxFrameSize);
1224         }
1225 
1226         if (delta != 0) {
1227             if (!streamMap.isEmpty()) {
1228                 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1229                     final Map.Entry<Integer, H2Stream> entry = it.next();
1230                     final H2Stream stream = entry.getValue();
1231                     try {
1232                         updateOutputWindow(stream.getId(), stream.getOutputWindow(), delta);
1233                     } catch (final ArithmeticException ex) {
1234                         throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1235                     }
1236                 }
1237             }
1238         }
1239     }
1240 
1241     private void applyLocalSettings() throws H2ConnectionException {
1242         hPackDecoder.setMaxTableSize(localConfig.getHeaderTableSize());
1243         hPackDecoder.setMaxListSize(localConfig.getMaxHeaderListSize());
1244 
1245         final int delta = localConfig.getInitialWindowSize() - initInputWinSize;
1246         initInputWinSize = localConfig.getInitialWindowSize();
1247 
1248         if (delta != 0 && !streamMap.isEmpty()) {
1249             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1250                 final Map.Entry<Integer, H2Stream> entry = it.next();
1251                 final H2Stream stream = entry.getValue();
1252                 try {
1253                     updateInputWindow(stream.getId(), stream.getInputWindow(), delta);
1254                 } catch (final ArithmeticException ex) {
1255                     throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1256                 }
1257             }
1258         }
1259         lowMark = initInputWinSize / 2;
1260     }
1261 
1262     @Override
1263     public void close() throws IOException {
1264         ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.IMMEDIATE);
1265     }
1266 
1267     @Override
1268     public void close(final CloseMode closeMode) {
1269         ioSession.close(closeMode);
1270     }
1271 
1272     @Override
1273     public boolean isOpen() {
1274         return connState == ConnectionHandshake.ACTIVE;
1275     }
1276 
1277     @Override
1278     public void setSocketTimeout(final Timeout timeout) {
1279         ioSession.setSocketTimeout(timeout);
1280     }
1281 
1282     @Override
1283     public SSLSession getSSLSession() {
1284         final TlsDetails tlsDetails = ioSession.getTlsDetails();
1285         return tlsDetails != null ? tlsDetails.getSSLSession() : null;
1286     }
1287 
1288     @Override
1289     public EndpointDetails getEndpointDetails() {
1290         if (endpointDetails == null) {
1291             endpointDetails = new BasicEndpointDetails(
1292                     ioSession.getRemoteAddress(),
1293                     ioSession.getLocalAddress(),
1294                     connMetrics,
1295                     ioSession.getSocketTimeout());
1296         }
1297         return endpointDetails;
1298     }
1299 
1300     @Override
1301     public Timeout getSocketTimeout() {
1302         return ioSession.getSocketTimeout();
1303     }
1304 
1305     @Override
1306     public ProtocolVersion getProtocolVersion() {
1307         return HttpVersion.HTTP_2;
1308     }
1309 
1310     @Override
1311     public SocketAddress getRemoteAddress() {
1312         return ioSession.getRemoteAddress();
1313     }
1314 
1315     @Override
1316     public SocketAddress getLocalAddress() {
1317         return ioSession.getLocalAddress();
1318     }
1319 
1320     void appendState(final StringBuilder buf) {
1321         buf.append("connState=").append(connState)
1322                 .append(", connInputWindow=").append(connInputWindow)
1323                 .append(", connOutputWindow=").append(connOutputWindow)
1324                 .append(", outputQueue=").append(outputQueue.size())
1325                 .append(", streamMap=").append(streamMap.size())
1326                 .append(", processedRemoteStreamId=").append(processedRemoteStreamId);
1327     }
1328 
1329     private static class Continuation {
1330 
1331         final int streamId;
1332         final int type;
1333         final boolean endStream;
1334         final ByteArrayBuffer headerBuffer;
1335 
1336         private Continuation(final int streamId, final int type, final boolean endStream) {
1337             this.streamId = streamId;
1338             this.type = type;
1339             this.endStream = endStream;
1340             this.headerBuffer = new ByteArrayBuffer(1024);
1341         }
1342 
1343         void copyPayload(final ByteBuffer payload) {
1344             if (payload == null) {
1345                 return;
1346             }
1347             headerBuffer.ensureCapacity(payload.remaining());
1348             payload.get(headerBuffer.array(), headerBuffer.length(), payload.remaining());
1349         }
1350 
1351         ByteBuffer getContent() {
1352             return ByteBuffer.wrap(headerBuffer.array(), 0, headerBuffer.length());
1353         }
1354 
1355     }
1356 
1357     private class H2StreamChannelImpl implements H2StreamChannel {
1358 
1359         private final int id;
1360         private final AtomicInteger inputWindow;
1361         private final AtomicInteger outputWindow;
1362 
1363         private volatile boolean idle;
1364         private volatile boolean remoteEndStream;
1365         private volatile boolean localEndStream;
1366 
1367         private volatile long deadline;
1368 
1369         H2StreamChannelImpl(final int id, final boolean idle, final int initialInputWindowSize, final int initialOutputWindowSize) {
1370             this.id = id;
1371             this.idle = idle;
1372             this.inputWindow = new AtomicInteger(initialInputWindowSize);
1373             this.outputWindow = new AtomicInteger(initialOutputWindowSize);
1374         }
1375 
1376         int getId() {
1377             return id;
1378         }
1379 
1380         AtomicInteger getOutputWindow() {
1381             return outputWindow;
1382         }
1383 
1384         AtomicInteger getInputWindow() {
1385             return inputWindow;
1386         }
1387 
1388         @Override
1389         public void submit(final List<Header> headers, final boolean endStream) throws IOException {
1390             ioSession.getLock().lock();
1391             try {
1392                 if (headers == null || headers.isEmpty()) {
1393                     throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
1394                 }
1395                 if (localEndStream) {
1396                     return;
1397                 }
1398                 idle = false;
1399                 commitHeaders(id, headers, endStream);
1400                 if (endStream) {
1401                     localEndStream = true;
1402                 }
1403             } finally {
1404                 ioSession.getLock().unlock();
1405             }
1406         }
1407 
1408         @Override
1409         public void push(final List<Header> headers, final AsyncPushProducer pushProducer) throws HttpException, IOException {
1410             acceptPushRequest();
1411             final int promisedStreamId = generateStreamId();
1412             final H2StreamChannelImpl channel = new H2StreamChannelImpl(
1413                     promisedStreamId,
1414                     true,
1415                     localConfig.getInitialWindowSize(),
1416                     remoteConfig.getInitialWindowSize());
1417             final HttpCoreContext context = HttpCoreContext.create();
1418             context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
1419             context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
1420             final H2StreamHandler streamHandler = new ServerPushH2StreamHandler(
1421                     channel, httpProcessor, connMetrics, pushProducer, context);
1422             final H2Stream stream = new H2Stream(channel, streamHandler, false);
1423             streamMap.put(promisedStreamId, stream);
1424 
1425             ioSession.getLock().lock();
1426             try {
1427                 if (localEndStream) {
1428                     stream.releaseResources();
1429                     return;
1430                 }
1431                 commitPushPromise(id, promisedStreamId, headers);
1432                 idle = false;
1433             } finally {
1434                 ioSession.getLock().unlock();
1435             }
1436         }
1437 
1438         @Override
1439         public void update(final int increment) throws IOException {
1440             if (remoteEndStream) {
1441                 return;
1442             }
1443             incrementInputCapacity(0, connInputWindow, increment);
1444             incrementInputCapacity(id, inputWindow, increment);
1445         }
1446 
1447         @Override
1448         public int write(final ByteBuffer payload) throws IOException {
1449             ioSession.getLock().lock();
1450             try {
1451                 if (localEndStream) {
1452                     return 0;
1453                 }
1454                 return streamData(id, outputWindow, payload);
1455             } finally {
1456                 ioSession.getLock().unlock();
1457             }
1458         }
1459 
1460         @Override
1461         public void endStream(final List<? extends Header> trailers) throws IOException {
1462             ioSession.getLock().lock();
1463             try {
1464                 if (localEndStream) {
1465                     return;
1466                 }
1467                 localEndStream = true;
1468                 if (trailers != null && !trailers.isEmpty()) {
1469                     commitHeaders(id, trailers, true);
1470                 } else {
1471                     final RawFrame frame = frameFactory.createData(id, null, true);
1472                     commitFrameInternal(frame);
1473                 }
1474             } finally {
1475                 ioSession.getLock().unlock();
1476             }
1477         }
1478 
1479         @Override
1480         public void endStream() throws IOException {
1481             endStream(null);
1482         }
1483 
1484         @Override
1485         public void requestOutput() {
1486             requestSessionOutput();
1487         }
1488 
1489         boolean isRemoteClosed() {
1490             return remoteEndStream;
1491         }
1492 
1493         void setRemoteEndStream() {
1494             remoteEndStream = true;
1495         }
1496 
1497         boolean isLocalClosed() {
1498             return localEndStream;
1499         }
1500 
1501         void setLocalEndStream() {
1502             localEndStream = true;
1503         }
1504 
1505         boolean isLocalReset() {
1506             return deadline > 0;
1507         }
1508 
1509         boolean isResetDeadline() {
1510             final long l = deadline;
1511             return l > 0 && l < System.currentTimeMillis();
1512         }
1513 
1514         boolean localReset(final int code) throws IOException {
1515             ioSession.getLock().lock();
1516             try {
1517                 if (localEndStream) {
1518                     return false;
1519                 }
1520                 localEndStream = true;
1521                 deadline = System.currentTimeMillis() + LINGER_TIME;
1522                 if (!idle) {
1523                     final RawFrame resetStream = frameFactory.createResetStream(id, code);
1524                     commitFrameInternal(resetStream);
1525                     return true;
1526                 }
1527                 return false;
1528             } finally {
1529                 ioSession.getLock().unlock();
1530             }
1531         }
1532 
1533         boolean localReset(final H2Error error) throws IOException {
1534             return localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1535         }
1536 
1537         @Override
1538         public boolean cancel() {
1539             try {
1540                 return localReset(H2Error.CANCEL);
1541             } catch (final IOException ignore) {
1542                 return false;
1543             }
1544         }
1545 
1546         void appendState(final StringBuilder buf) {
1547             buf.append("id=").append(id)
1548                     .append(", connState=").append(connState)
1549                     .append(", inputWindow=").append(inputWindow)
1550                     .append(", outputWindow=").append(outputWindow)
1551                     .append(", localEndStream=").append(localEndStream)
1552                     .append(", idle=").append(idle);
1553         }
1554 
1555         @Override
1556         public String toString() {
1557             final StringBuilder buf = new StringBuilder();
1558             buf.append("[");
1559             appendState(buf);
1560             buf.append("]");
1561             return buf.toString();
1562         }
1563 
1564     }
1565 
1566     static class H2Stream {
1567 
1568         private final H2StreamChannelImpl channel;
1569         private final H2StreamHandler handler;
1570         private final boolean remoteInitiated;
1571 
1572         private H2Stream(
1573                 final H2StreamChannelImpl channel,
1574                 final H2StreamHandler handler,
1575                 final boolean remoteInitiated) {
1576             this.channel = channel;
1577             this.handler = handler;
1578             this.remoteInitiated = remoteInitiated;
1579         }
1580 
1581         int getId() {
1582             return channel.getId();
1583         }
1584 
1585         boolean isRemoteInitiated() {
1586             return remoteInitiated;
1587         }
1588 
1589         AtomicInteger getOutputWindow() {
1590             return channel.getOutputWindow();
1591         }
1592 
1593         AtomicInteger getInputWindow() {
1594             return channel.getInputWindow();
1595         }
1596 
1597         boolean isTerminated() {
1598             return channel.isLocalClosed() && (channel.isRemoteClosed() || channel.isResetDeadline());
1599         }
1600 
1601         boolean isRemoteClosed() {
1602             return channel.isRemoteClosed();
1603         }
1604 
1605         boolean isLocalClosed() {
1606             return channel.isLocalClosed();
1607         }
1608 
1609         boolean isLocalReset() {
1610             return channel.isLocalReset();
1611         }
1612 
1613         void setRemoteEndStream() {
1614             channel.setRemoteEndStream();
1615         }
1616 
1617         void consumePromise(final List<Header> headers) throws HttpException, IOException {
1618             try {
1619                 handler.consumePromise(headers);
1620                 channel.setLocalEndStream();
1621             } catch (final ProtocolException ex) {
1622                 localReset(ex, H2Error.PROTOCOL_ERROR);
1623             }
1624         }
1625 
1626         void consumeHeader(final List<Header> headers) throws HttpException, IOException {
1627             try {
1628                 handler.consumeHeader(headers, channel.isRemoteClosed());
1629             } catch (final ProtocolException ex) {
1630                 localReset(ex, H2Error.PROTOCOL_ERROR);
1631             }
1632         }
1633 
1634         void consumeData(final ByteBuffer src) throws HttpException, IOException {
1635             try {
1636                 handler.consumeData(src, channel.isRemoteClosed());
1637             } catch (final CharacterCodingException ex) {
1638                 localReset(ex, H2Error.INTERNAL_ERROR);
1639             } catch (final ProtocolException ex) {
1640                 localReset(ex, H2Error.PROTOCOL_ERROR);
1641             }
1642         }
1643 
1644         boolean isOutputReady() {
1645             return handler.isOutputReady();
1646         }
1647 
1648         void produceOutput() throws HttpException, IOException {
1649             try {
1650                 handler.produceOutput();
1651             } catch (final ProtocolException ex) {
1652                 localReset(ex, H2Error.PROTOCOL_ERROR);
1653             }
1654         }
1655 
1656         void produceInputCapacityUpdate() throws IOException {
1657             handler.updateInputCapacity();
1658         }
1659 
1660         void reset(final Exception cause) {
1661             channel.setRemoteEndStream();
1662             channel.setLocalEndStream();
1663             handler.failed(cause);
1664         }
1665 
1666         void localReset(final Exception cause, final int code) throws IOException {
1667             channel.localReset(code);
1668             handler.failed(cause);
1669         }
1670 
1671         void localReset(final Exception cause, final H2Error error) throws IOException {
1672             localReset(cause, error != null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1673         }
1674 
1675         void localReset(final H2StreamResetException ex) throws IOException {
1676             localReset(ex, ex.getCode());
1677         }
1678 
1679         void handle(final HttpException ex) throws IOException, HttpException {
1680             handler.handle(ex, channel.isRemoteClosed());
1681         }
1682 
1683         HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
1684             return handler.getPushHandlerFactory();
1685         }
1686 
1687         void cancel() {
1688             reset(new RequestNotExecutedException());
1689         }
1690 
1691         boolean abort() {
1692             final boolean cancelled = channel.cancel();
1693             handler.failed(new RequestNotExecutedException());
1694             return cancelled;
1695         }
1696 
1697         void releaseResources() {
1698             handler.releaseResources();
1699         }
1700 
1701         void appendState(final StringBuilder buf) {
1702             buf.append("channel=[");
1703             channel.appendState(buf);
1704             buf.append("]");
1705         }
1706 
1707         @Override
1708         public String toString() {
1709             final StringBuilder buf = new StringBuilder();
1710             buf.append("[");
1711             appendState(buf);
1712             buf.append("]");
1713             return buf.toString();
1714         }
1715 
1716     }
1717 
1718 }