View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http2.impl.nio;
28  
29  import java.io.IOException;
30  import java.net.SocketAddress;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.SelectionKey;
33  import java.nio.charset.CharacterCodingException;
34  import java.util.Deque;
35  import java.util.Iterator;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Queue;
39  import java.util.concurrent.ConcurrentHashMap;
40  import java.util.concurrent.ConcurrentLinkedDeque;
41  import java.util.concurrent.ConcurrentLinkedQueue;
42  import java.util.concurrent.atomic.AtomicInteger;
43  
44  import javax.net.ssl.SSLSession;
45  
46  import org.apache.hc.core5.concurrent.Cancellable;
47  import org.apache.hc.core5.concurrent.CancellableDependency;
48  import org.apache.hc.core5.http.ConnectionClosedException;
49  import org.apache.hc.core5.http.EndpointDetails;
50  import org.apache.hc.core5.http.Header;
51  import org.apache.hc.core5.http.HttpConnection;
52  import org.apache.hc.core5.http.HttpException;
53  import org.apache.hc.core5.http.HttpStreamResetException;
54  import org.apache.hc.core5.http.HttpVersion;
55  import org.apache.hc.core5.http.ProtocolException;
56  import org.apache.hc.core5.http.ProtocolVersion;
57  import org.apache.hc.core5.http.RequestNotExecutedException;
58  import org.apache.hc.core5.http.config.CharCodingConfig;
59  import org.apache.hc.core5.http.impl.BasicEndpointDetails;
60  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
61  import org.apache.hc.core5.http.impl.CharCodingSupport;
62  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
63  import org.apache.hc.core5.http.nio.AsyncPushProducer;
64  import org.apache.hc.core5.http.nio.HandlerFactory;
65  import org.apache.hc.core5.http.nio.command.ExecutableCommand;
66  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
67  import org.apache.hc.core5.http.protocol.HttpCoreContext;
68  import org.apache.hc.core5.http.protocol.HttpProcessor;
69  import org.apache.hc.core5.http2.H2ConnectionException;
70  import org.apache.hc.core5.http2.H2Error;
71  import org.apache.hc.core5.http2.H2StreamResetException;
72  import org.apache.hc.core5.http2.config.H2Config;
73  import org.apache.hc.core5.http2.config.H2Param;
74  import org.apache.hc.core5.http2.config.H2Setting;
75  import org.apache.hc.core5.http2.frame.FrameFactory;
76  import org.apache.hc.core5.http2.frame.FrameFlag;
77  import org.apache.hc.core5.http2.frame.FrameType;
78  import org.apache.hc.core5.http2.frame.RawFrame;
79  import org.apache.hc.core5.http2.frame.StreamIdGenerator;
80  import org.apache.hc.core5.http2.hpack.HPackDecoder;
81  import org.apache.hc.core5.http2.hpack.HPackEncoder;
82  import org.apache.hc.core5.http2.impl.BasicH2TransportMetrics;
83  import org.apache.hc.core5.http2.nio.AsyncPingHandler;
84  import org.apache.hc.core5.http2.nio.command.PingCommand;
85  import org.apache.hc.core5.io.CloseMode;
86  import org.apache.hc.core5.reactor.Command;
87  import org.apache.hc.core5.reactor.ProtocolIOSession;
88  import org.apache.hc.core5.reactor.ssl.TlsDetails;
89  import org.apache.hc.core5.util.Args;
90  import org.apache.hc.core5.util.ByteArrayBuffer;
91  import org.apache.hc.core5.util.Identifiable;
92  import org.apache.hc.core5.util.Timeout;
93  
94  abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection {
95  
96      private static final long LINGER_TIME = 1000; // 1 second
97      private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024; // 10 MiB
98  
99      enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
100     enum SettingsHandshake { READY, TRANSMITTED, ACKED }
101 
102     private final ProtocolIOSession ioSession;
103     private final FrameFactory frameFactory;
104     private final StreamIdGenerator idGenerator;
105     private final HttpProcessor httpProcessor;
106     private final H2Config localConfig;
107     private final BasicH2TransportMetrics inputMetrics;
108     private final BasicH2TransportMetrics outputMetrics;
109     private final BasicHttpConnectionMetrics connMetrics;
110     private final FrameInputBuffer inputBuffer;
111     private final FrameOutputBuffer outputBuffer;
112     private final Deque<RawFrame> outputQueue;
113     private final HPackEncoder hPackEncoder;
114     private final HPackDecoder hPackDecoder;
115     private final Map<Integer, H2Stream> streamMap;
116     private final Queue<AsyncPingHandler> pingHandlers;
117     private final AtomicInteger connInputWindow;
118     private final AtomicInteger connOutputWindow;
119     private final AtomicInteger outputRequests;
120     private final AtomicInteger lastStreamId;
121     private final H2StreamListener streamListener;
122 
123     private ConnectionHandshake connState = ConnectionHandshake.READY;
124     private SettingsHandshake localSettingState = SettingsHandshake.READY;
125     private SettingsHandshake remoteSettingState = SettingsHandshake.READY;
126 
127     private int initInputWinSize;
128     private int initOutputWinSize;
129     private int lowMark;
130 
131     private volatile H2Config remoteConfig;
132 
133     private Continuation continuation;
134 
135     private int processedRemoteStreamId;
136     private EndpointDetails endpointDetails;
137     private boolean goAwayReceived;
138 
139     AbstractH2StreamMultiplexer(
140             final ProtocolIOSession ioSession,
141             final FrameFactory frameFactory,
142             final StreamIdGenerator idGenerator,
143             final HttpProcessor httpProcessor,
144             final CharCodingConfig charCodingConfig,
145             final H2Config h2Config,
146             final H2StreamListener streamListener) {
147         this.ioSession = Args.notNull(ioSession, "IO session");
148         this.frameFactory = Args.notNull(frameFactory, "Frame factory");
149         this.idGenerator = Args.notNull(idGenerator, "Stream id generator");
150         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
151         this.localConfig = h2Config != null ? h2Config : H2Config.DEFAULT;
152         this.inputMetrics = new BasicH2TransportMetrics();
153         this.outputMetrics = new BasicH2TransportMetrics();
154         this.connMetrics = new BasicHttpConnectionMetrics(this.inputMetrics, this.outputMetrics);
155         this.inputBuffer = new FrameInputBuffer(this.inputMetrics, this.localConfig.getMaxFrameSize());
156         this.outputBuffer = new FrameOutputBuffer(this.outputMetrics, this.localConfig.getMaxFrameSize());
157         this.outputQueue = new ConcurrentLinkedDeque<>();
158         this.pingHandlers = new ConcurrentLinkedQueue<>();
159         this.outputRequests = new AtomicInteger(0);
160         this.lastStreamId = new AtomicInteger(0);
161         this.hPackEncoder = new HPackEncoder(CharCodingSupport.createEncoder(charCodingConfig));
162         this.hPackDecoder = new HPackDecoder(CharCodingSupport.createDecoder(charCodingConfig));
163         this.streamMap = new ConcurrentHashMap<>();
164         this.remoteConfig = H2Config.INIT;
165         this.connInputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
166         this.connOutputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
167 
168         this.initInputWinSize = H2Config.INIT.getInitialWindowSize();
169         this.initOutputWinSize = H2Config.INIT.getInitialWindowSize();
170 
171         this.hPackDecoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
172         this.hPackEncoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
173         this.hPackDecoder.setMaxListSize(H2Config.INIT.getMaxHeaderListSize());
174 
175         this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
176         this.streamListener = streamListener;
177     }
178 
179     @Override
180     public String getId() {
181         return ioSession.getId();
182     }
183 
184     abstract void acceptHeaderFrame() throws H2ConnectionException;
185 
186     abstract void acceptPushRequest() throws H2ConnectionException;
187 
188     abstract void acceptPushFrame() throws H2ConnectionException;
189 
190     abstract H2StreamHandler createRemotelyInitiatedStream(
191             H2StreamChannel channel,
192             HttpProcessor httpProcessor,
193             BasicHttpConnectionMetrics connMetrics,
194             HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException;
195 
196     abstract H2StreamHandler createLocallyInitiatedStream(
197             ExecutableCommand command,
198             H2StreamChannel channel,
199             HttpProcessor httpProcessor,
200             BasicHttpConnectionMetrics connMetrics) throws IOException;
201 
202     private int updateWindow(final AtomicInteger window, final int delta) throws ArithmeticException {
203         for (;;) {
204             final int current = window.get();
205             long newValue = (long) current + delta;
206 
207             //TODO: work-around for what looks like a bug in Ngnix (1.11)
208             // Tolerate if the update window exceeded by one
209             if (newValue == 0x80000000L) {
210                 newValue = Integer.MAX_VALUE;
211             }
212             //TODO: needs to be removed
213 
214             if (Math.abs(newValue) > 0x7fffffffL) {
215                 throw new ArithmeticException("Update causes flow control window to exceed " + Integer.MAX_VALUE);
216             }
217             if (window.compareAndSet(current, (int) newValue)) {
218                 return (int) newValue;
219             }
220         }
221     }
222 
223     private int updateInputWindow(
224             final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
225         final int newSize = updateWindow(window, delta);
226         if (streamListener != null) {
227             streamListener.onInputFlowControl(this, streamId, delta, newSize);
228         }
229         return newSize;
230     }
231 
232     private int updateOutputWindow(
233             final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
234         final int newSize = updateWindow(window, delta);
235         if (streamListener != null) {
236             streamListener.onOutputFlowControl(this, streamId, delta, newSize);
237         }
238         return newSize;
239     }
240 
241     private void commitFrameInternal(final RawFrame frame) throws IOException {
242         if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
243             if (streamListener != null) {
244                 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
245             }
246             outputBuffer.write(frame, ioSession);
247         } else {
248             outputQueue.addLast(frame);
249         }
250         ioSession.setEvent(SelectionKey.OP_WRITE);
251     }
252 
253     private void commitFrame(final RawFrame frame) throws IOException {
254         Args.notNull(frame, "Frame");
255         ioSession.getLock().lock();
256         try {
257             commitFrameInternal(frame);
258         } finally {
259             ioSession.getLock().unlock();
260         }
261     }
262 
263     private void commitHeaders(
264             final int streamId, final List<? extends Header> headers, final boolean endStream) throws IOException {
265         if (streamListener != null) {
266             streamListener.onHeaderOutput(this, streamId, headers);
267         }
268         final ByteArrayBufferByteArrayBuffer.html#ByteArrayBuffer">ByteArrayBuffer buf = new ByteArrayBuffer(512);
269         hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
270 
271         int off = 0;
272         int remaining = buf.length();
273         boolean continuation = false;
274 
275         while (remaining > 0) {
276             final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
277             final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
278 
279             remaining -= chunk;
280             off += chunk;
281 
282             final boolean endHeaders = remaining == 0;
283             final RawFrame frame;
284             if (!continuation) {
285                 frame = frameFactory.createHeaders(streamId, payload, endHeaders, endStream);
286                 continuation = true;
287             } else {
288                 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
289             }
290             commitFrameInternal(frame);
291         }
292     }
293 
294     private void commitPushPromise(
295             final int streamId, final int promisedStreamId, final List<Header> headers) throws IOException {
296         if (headers == null || headers.isEmpty()) {
297             throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
298         }
299         if (streamListener != null) {
300             streamListener.onHeaderOutput(this, streamId, headers);
301         }
302         final ByteArrayBufferByteArrayBuffer.html#ByteArrayBuffer">ByteArrayBuffer buf = new ByteArrayBuffer(512);
303         buf.append((byte)(promisedStreamId >> 24));
304         buf.append((byte)(promisedStreamId >> 16));
305         buf.append((byte)(promisedStreamId >> 8));
306         buf.append((byte)(promisedStreamId));
307 
308         hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
309 
310         int off = 0;
311         int remaining = buf.length();
312         boolean continuation = false;
313 
314         while (remaining > 0) {
315             final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
316             final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
317 
318             remaining -= chunk;
319             off += chunk;
320 
321             final boolean endHeaders = remaining == 0;
322             final RawFrame frame;
323             if (!continuation) {
324                 frame = frameFactory.createPushPromise(streamId, payload, endHeaders);
325                 continuation = true;
326             } else {
327                 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
328             }
329             commitFrameInternal(frame);
330         }
331     }
332 
333     private void streamDataFrame(
334             final int streamId,
335             final AtomicInteger streamOutputWindow,
336             final ByteBuffer payload,
337             final int chunk) throws IOException {
338         final RawFrame dataFrame = frameFactory.createData(streamId, payload, false);
339         if (streamListener != null) {
340             streamListener.onFrameOutput(this, streamId, dataFrame);
341         }
342         updateOutputWindow(0, connOutputWindow, -chunk);
343         updateOutputWindow(streamId, streamOutputWindow, -chunk);
344         outputBuffer.write(dataFrame, ioSession);
345     }
346 
347     private int streamData(
348             final int streamId, final AtomicInteger streamOutputWindow, final ByteBuffer payload) throws IOException {
349         if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
350             final int capacity = Math.min(connOutputWindow.get(), streamOutputWindow.get());
351             if (capacity <= 0) {
352                 return 0;
353             }
354             final int maxPayloadSize = Math.min(capacity, remoteConfig.getMaxFrameSize());
355             final int chunk;
356             if (payload.remaining() <= maxPayloadSize) {
357                 chunk = payload.remaining();
358                 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
359             } else {
360                 chunk = maxPayloadSize;
361                 final int originalLimit = payload.limit();
362                 try {
363                     payload.limit(payload.position() + chunk);
364                     streamDataFrame(streamId, streamOutputWindow, payload, chunk);
365                 } finally {
366                     payload.limit(originalLimit);
367                 }
368             }
369             payload.position(payload.position() + chunk);
370             ioSession.setEvent(SelectionKey.OP_WRITE);
371             return chunk;
372         }
373         return 0;
374     }
375 
376     private void incrementInputCapacity(
377             final int streamId, final AtomicInteger inputWindow, final int inputCapacity) throws IOException {
378         if (inputCapacity > 0) {
379             final int streamWinSize = inputWindow.get();
380             final int remainingCapacity = Integer.MAX_VALUE - streamWinSize;
381             final int chunk = Math.min(inputCapacity, remainingCapacity);
382             if (chunk != 0) {
383                 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(streamId, chunk);
384                 commitFrame(windowUpdateFrame);
385                 updateInputWindow(streamId, inputWindow, chunk);
386             }
387         }
388     }
389 
390     private void requestSessionOutput() {
391         outputRequests.incrementAndGet();
392         ioSession.setEvent(SelectionKey.OP_WRITE);
393     }
394 
395     private void updateLastStreamId(final int streamId) {
396         final int currentId = lastStreamId.get();
397         if (streamId > currentId) {
398             lastStreamId.compareAndSet(currentId, streamId);
399         }
400     }
401 
402     private int generateStreamId() {
403         for (;;) {
404             final int currentId = lastStreamId.get();
405             final int newStreamId = idGenerator.generate(currentId);
406             if (lastStreamId.compareAndSet(currentId, newStreamId)) {
407                 return newStreamId;
408             }
409         }
410     }
411 
412     public final void onConnect() throws HttpException, IOException {
413         connState = ConnectionHandshake.ACTIVE;
414         final RawFrame settingsFrame = frameFactory.createSettings(
415                 new H2Setting(H2Param.HEADER_TABLE_SIZE, localConfig.getHeaderTableSize()),
416                 new H2Setting(H2Param.ENABLE_PUSH, localConfig.isPushEnabled() ? 1 : 0),
417                 new H2Setting(H2Param.MAX_CONCURRENT_STREAMS, localConfig.getMaxConcurrentStreams()),
418                 new H2Setting(H2Param.INITIAL_WINDOW_SIZE, localConfig.getInitialWindowSize()),
419                 new H2Setting(H2Param.MAX_FRAME_SIZE, localConfig.getMaxFrameSize()),
420                 new H2Setting(H2Param.MAX_HEADER_LIST_SIZE, localConfig.getMaxHeaderListSize()));
421 
422         commitFrame(settingsFrame);
423         localSettingState = SettingsHandshake.TRANSMITTED;
424         maximizeConnWindow(connInputWindow.get());
425 
426         if (streamListener != null) {
427             final int initInputWindow = connInputWindow.get();
428             streamListener.onInputFlowControl(this, 0, initInputWindow, initInputWindow);
429             final int initOutputWindow = connOutputWindow.get();
430             streamListener.onOutputFlowControl(this, 0, initOutputWindow, initOutputWindow);
431         }
432     }
433 
434     public final void onInput(final ByteBuffer src) throws HttpException, IOException {
435         if (connState == ConnectionHandshake.SHUTDOWN) {
436             ioSession.clearEvent(SelectionKey.OP_READ);
437         } else {
438             for (;;) {
439                 final RawFrame frame = inputBuffer.read(src, ioSession);
440                 if (frame == null) {
441                     break;
442                 }
443                 if (streamListener != null) {
444                     streamListener.onFrameInput(this, frame.getStreamId(), frame);
445                 }
446                 consumeFrame(frame);
447             }
448         }
449     }
450 
451     public final void onOutput() throws HttpException, IOException {
452         ioSession.getLock().lock();
453         try {
454             if (!outputBuffer.isEmpty()) {
455                 outputBuffer.flush(ioSession);
456             }
457             while (outputBuffer.isEmpty()) {
458                 final RawFrame frame = outputQueue.poll();
459                 if (frame != null) {
460                     if (streamListener != null) {
461                         streamListener.onFrameOutput(this, frame.getStreamId(), frame);
462                     }
463                     outputBuffer.write(frame, ioSession);
464                 } else {
465                     break;
466                 }
467             }
468         } finally {
469             ioSession.getLock().unlock();
470         }
471 
472         if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
473 
474             if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
475                 produceOutput();
476             }
477             final int pendingOutputRequests = outputRequests.get();
478             boolean outputPending = false;
479             if (!streamMap.isEmpty() && connOutputWindow.get() > 0) {
480                 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
481                     final Map.Entry<Integer, H2Stream> entry = it.next();
482                     final H2Stream stream = entry.getValue();
483                     if (!stream.isLocalClosed()
484                             && stream.getOutputWindow().get() > 0
485                             && stream.isOutputReady()) {
486                         outputPending = true;
487                         break;
488                     }
489                 }
490             }
491             ioSession.getLock().lock();
492             try {
493                 if (!outputPending && outputBuffer.isEmpty() && outputQueue.isEmpty()
494                         && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
495                     ioSession.clearEvent(SelectionKey.OP_WRITE);
496                 } else {
497                     outputRequests.addAndGet(-pendingOutputRequests);
498                 }
499             } finally {
500                 ioSession.getLock().unlock();
501             }
502         }
503 
504         if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 && remoteSettingState == SettingsHandshake.ACKED) {
505             processPendingCommands();
506         }
507         if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
508             int liveStreams = 0;
509             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
510                 final Map.Entry<Integer, H2Stream> entry = it.next();
511                 final H2Stream stream = entry.getValue();
512                 if (stream.isLocalClosed() && stream.isRemoteClosed()) {
513                     stream.releaseResources();
514                     it.remove();
515                 } else {
516                     if (idGenerator.isSameSide(stream.getId()) || stream.getId() <= processedRemoteStreamId) {
517                         liveStreams++;
518                     }
519                 }
520             }
521             if (liveStreams == 0) {
522                 connState = ConnectionHandshake.SHUTDOWN;
523             }
524         }
525         if (connState.compareTo(ConnectionHandshake.SHUTDOWN) >= 0) {
526             if (!streamMap.isEmpty()) {
527                 for (final H2Stream stream : streamMap.values()) {
528                     stream.releaseResources();
529                 }
530                 streamMap.clear();
531             }
532             ioSession.getLock().lock();
533             try {
534                 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
535                     ioSession.close();
536                 }
537             } finally {
538                 ioSession.getLock().unlock();
539             }
540         }
541     }
542 
543     public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
544         connState = ConnectionHandshake.SHUTDOWN;
545 
546         final RawFrame goAway;
547         if (localSettingState != SettingsHandshake.ACKED) {
548             goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.SETTINGS_TIMEOUT,
549                             "Setting timeout (" + timeout + ")");
550         } else {
551             goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR,
552                             "Timeout due to inactivity (" + timeout + ")");
553         }
554         commitFrame(goAway);
555         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
556             final Map.Entry<Integer, H2Stream> entry = it.next();
557             final H2Stream stream = entry.getValue();
558             stream.reset(new H2StreamResetException(H2Error.NO_ERROR, "Timeout due to inactivity (" + timeout + ")"));
559         }
560         streamMap.clear();
561     }
562 
563     public final void onDisconnect() {
564         for (;;) {
565             final AsyncPingHandler pingHandler = pingHandlers.poll();
566             if (pingHandler != null) {
567                 pingHandler.cancel();
568             } else {
569                 break;
570             }
571         }
572         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
573             final Map.Entry<Integer, H2Stream> entry = it.next();
574             final H2Stream stream = entry.getValue();
575             stream.cancel();
576         }
577         for (;;) {
578             final Command command = ioSession.poll();
579             if (command != null) {
580                 if (command instanceof ExecutableCommand) {
581                     ((ExecutableCommand) command).failed(new ConnectionClosedException());
582                 } else {
583                     command.cancel();
584                 }
585             } else {
586                 break;
587             }
588         }
589     }
590 
591     private void processPendingCommands() throws IOException, HttpException {
592         while (streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
593             final Command command = ioSession.poll();
594             if (command == null) {
595                 break;
596             }
597             if (command instanceof ShutdownCommand) {
598                 final ShutdownCommand../org/apache/hc/core5/http/nio/command/ShutdownCommand.html#ShutdownCommand">ShutdownCommand shutdownCommand = (ShutdownCommand) command;
599                 if (shutdownCommand.getType() == CloseMode.IMMEDIATE) {
600                     for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
601                         final Map.Entry<Integer, H2Stream> entry = it.next();
602                         final H2Stream stream = entry.getValue();
603                         stream.cancel();
604                     }
605                     streamMap.clear();
606                     connState = ConnectionHandshake.SHUTDOWN;
607                 } else {
608                     if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
609                         final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR, "Graceful shutdown");
610                         commitFrame(goAway);
611                         connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
612                     }
613                 }
614                 break;
615             } else if (command instanceof PingCommand) {
616                 final PingCommand./../../../org/apache/hc/core5/http2/nio/command/PingCommand.html#PingCommand">PingCommand pingCommand = (PingCommand) command;
617                 final AsyncPingHandler handler = pingCommand.getHandler();
618                 pingHandlers.add(handler);
619                 final RawFrame ping = frameFactory.createPing(handler.getData());
620                 commitFrame(ping);
621             } else if (command instanceof ExecutableCommand) {
622                 final int streamId = generateStreamId();
623                 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
624                         streamId, true, initInputWinSize, initOutputWinSize);
625                 final ExecutableCommandrg/apache/hc/core5/http/nio/command/ExecutableCommand.html#ExecutableCommand">ExecutableCommand executableCommand = (ExecutableCommand) command;
626                 final H2StreamHandler streamHandler = createLocallyInitiatedStream(
627                         executableCommand, channel, httpProcessor, connMetrics);
628 
629                 final H2Stream stream = new H2Stream(channel, streamHandler, false);
630                 streamMap.put(streamId, stream);
631 
632                 if (streamListener != null) {
633                     final int initInputWindow = stream.getInputWindow().get();
634                     streamListener.onInputFlowControl(this, streamId, initInputWindow, initInputWindow);
635                     final int initOutputWindow = stream.getOutputWindow().get();
636                     streamListener.onOutputFlowControl(this, streamId, initOutputWindow, initOutputWindow);
637                 }
638 
639                 if (stream.isOutputReady()) {
640                     stream.produceOutput();
641                 }
642                 final CancellableDependency cancellableDependency = executableCommand.getCancellableDependency();
643                 if (cancellableDependency != null) {
644                     cancellableDependency.setDependency(new Cancellable() {
645 
646                         @Override
647                         public boolean cancel() {
648                             return stream.abort();
649                         }
650 
651                     });
652                 }
653                 if (!outputQueue.isEmpty()) {
654                     return;
655                 }
656             }
657         }
658     }
659 
660     public final void onException(final Exception cause) {
661         try {
662             for (;;) {
663                 final AsyncPingHandler pingHandler = pingHandlers.poll();
664                 if (pingHandler != null) {
665                     pingHandler.failed(cause);
666                 } else {
667                     break;
668                 }
669             }
670             for (;;) {
671                 final Command command = ioSession.poll();
672                 if (command != null) {
673                     if (command instanceof ExecutableCommand) {
674                         ((ExecutableCommand) command).failed(new ConnectionClosedException());
675                     } else {
676                         command.cancel();
677                     }
678                 } else {
679                     break;
680                 }
681             }
682             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
683                 final Map.Entry<Integer, H2Stream> entry = it.next();
684                 final H2Stream stream = entry.getValue();
685                 stream.reset(cause);
686             }
687             streamMap.clear();
688             if (!(cause instanceof ConnectionClosedException)) {
689                 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) <= 0) {
690                     final H2Error errorCode;
691                     if (cause instanceof H2ConnectionException) {
692                         errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
693                     } else if (cause instanceof ProtocolException){
694                         errorCode = H2Error.PROTOCOL_ERROR;
695                     } else {
696                         errorCode = H2Error.INTERNAL_ERROR;
697                     }
698                     final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
699                     commitFrame(goAway);
700                 }
701             }
702         } catch (final IOException ignore) {
703         } finally {
704             connState = ConnectionHandshake.SHUTDOWN;
705             final CloseMode closeMode;
706             if (cause instanceof ConnectionClosedException) {
707                 closeMode = CloseMode.GRACEFUL;
708             } else if (cause instanceof IOException) {
709                 closeMode = CloseMode.IMMEDIATE;
710             } else {
711                 closeMode = CloseMode.GRACEFUL;
712             }
713             ioSession.close(closeMode);
714         }
715     }
716 
717     private H2Stream getValidStream(final int streamId) throws H2ConnectionException {
718         if (streamId == 0) {
719             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
720         }
721         final H2Stream stream = streamMap.get(streamId);
722         if (stream == null) {
723             if (streamId <= lastStreamId.get()) {
724                 throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed");
725             } else {
726                 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
727             }
728         }
729         return stream;
730     }
731 
732     private void consumeFrame(final RawFrame frame) throws HttpException, IOException {
733         final FrameType frameType = FrameType.valueOf(frame.getType());
734         final int streamId = frame.getStreamId();
735         if (continuation != null && frameType != FrameType.CONTINUATION) {
736             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "CONTINUATION frame expected");
737         }
738         switch (frameType) {
739             case DATA: {
740                 final H2Stream stream = getValidStream(streamId);
741                 try {
742                     consumeDataFrame(frame, stream);
743                 } catch (final H2StreamResetException ex) {
744                     stream.localReset(ex);
745                 } catch (final HttpStreamResetException ex) {
746                     stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
747                 }
748 
749                 if (stream.isTerminated()) {
750                     streamMap.remove(streamId);
751                     stream.releaseResources();
752                     requestSessionOutput();
753                 }
754             }
755             break;
756             case HEADERS: {
757                 if (streamId == 0) {
758                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
759                 }
760                 H2Stream stream = streamMap.get(streamId);
761                 if (stream == null) {
762                     acceptHeaderFrame();
763 
764                     if (idGenerator.isSameSide(streamId)) {
765                         throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
766                     }
767                     if (goAwayReceived ) {
768                         throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
769                     }
770 
771                     updateLastStreamId(streamId);
772 
773                     final H2StreamChannelImpl channel = new H2StreamChannelImpl(
774                             streamId, false, initInputWinSize, initOutputWinSize);
775                     final H2StreamHandler streamHandler;
776                     if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
777                         streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics, null);
778                     } else {
779                         streamHandler = NoopH2StreamHandler.INSTANCE;
780                         channel.setLocalEndStream();
781                     }
782 
783                     stream = new H2Stream(channel, streamHandler, true);
784                     if (stream.isOutputReady()) {
785                         stream.produceOutput();
786                     }
787                     streamMap.put(streamId, stream);
788                 }
789 
790                 try {
791                     consumeHeaderFrame(frame, stream);
792 
793                     if (stream.isOutputReady()) {
794                         stream.produceOutput();
795                     }
796                 } catch (final H2StreamResetException ex) {
797                     stream.localReset(ex);
798                 } catch (final HttpStreamResetException ex) {
799                     stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
800                 } catch (final HttpException ex) {
801                     stream.handle(ex);
802                 }
803 
804                 if (stream.isTerminated()) {
805                     streamMap.remove(streamId);
806                     stream.releaseResources();
807                     requestSessionOutput();
808                 }
809             }
810             break;
811             case CONTINUATION: {
812                 if (continuation == null) {
813                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION frame");
814                 }
815                 if (streamId != continuation.streamId) {
816                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION stream id: " + streamId);
817                 }
818 
819                 final H2Stream stream = getValidStream(streamId);
820                 try {
821 
822                     consumeContinuationFrame(frame, stream);
823                 } catch (final H2StreamResetException ex) {
824                     stream.localReset(ex);
825                 } catch (final HttpStreamResetException ex) {
826                     stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
827                 }
828 
829                 if (stream.isTerminated()) {
830                     streamMap.remove(streamId);
831                     stream.releaseResources();
832                     requestSessionOutput();
833                 }
834             }
835             break;
836             case WINDOW_UPDATE: {
837                 final ByteBuffer payload = frame.getPayload();
838                 if (payload == null || payload.remaining() != 4) {
839                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid WINDOW_UPDATE frame payload");
840                 }
841                 final int delta = payload.getInt();
842                 if (delta <= 0) {
843                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Invalid WINDOW_UPDATE delta");
844                 }
845                 if (streamId == 0) {
846                     try {
847                         updateOutputWindow(0, connOutputWindow, delta);
848                     } catch (final ArithmeticException ex) {
849                         throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
850                     }
851                 } else {
852                     final H2Stream stream = streamMap.get(streamId);
853                     if (stream != null) {
854                         try {
855                             updateOutputWindow(streamId, stream.getOutputWindow(), delta);
856                         } catch (final ArithmeticException ex) {
857                             throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
858                         }
859                     }
860                 }
861                 ioSession.setEvent(SelectionKey.OP_WRITE);
862             }
863             break;
864             case RST_STREAM: {
865                 if (streamId == 0) {
866                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
867                 }
868                 final H2Stream stream = streamMap.get(streamId);
869                 if (stream == null) {
870                     if (streamId > lastStreamId.get()) {
871                         throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
872                     }
873                 } else {
874                     final ByteBuffer payload = frame.getPayload();
875                     if (payload == null || payload.remaining() != 4) {
876                         throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid RST_STREAM frame payload");
877                     }
878                     final int errorCode = payload.getInt();
879                     stream.reset(new H2StreamResetException(errorCode, "Stream reset (" + errorCode + ")"));
880                     streamMap.remove(streamId);
881                     stream.releaseResources();
882                     requestSessionOutput();
883                 }
884             }
885             break;
886             case PING: {
887                 if (streamId != 0) {
888                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
889                 }
890                 final ByteBuffer ping = frame.getPayloadContent();
891                 if (ping == null || ping.remaining() != 8) {
892                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload");
893                 }
894                 if (frame.isFlagSet(FrameFlag.ACK)) {
895                     final AsyncPingHandler pingHandler = pingHandlers.poll();
896                     if (pingHandler != null) {
897                         pingHandler.consumeResponse(ping);
898                     }
899                 } else {
900                     final ByteBuffer pong = ByteBuffer.allocate(ping.remaining());
901                     pong.put(ping);
902                     pong.flip();
903                     final RawFrame response = frameFactory.createPingAck(pong);
904                     commitFrame(response);
905                 }
906             }
907             break;
908             case SETTINGS: {
909                 if (streamId != 0) {
910                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
911                 }
912                 if (frame.isFlagSet(FrameFlag.ACK)) {
913                     if (localSettingState == SettingsHandshake.TRANSMITTED) {
914                         localSettingState = SettingsHandshake.ACKED;
915                         ioSession.setEvent(SelectionKey.OP_WRITE);
916                         applyLocalSettings();
917                     }
918                 } else {
919                     final ByteBuffer payload = frame.getPayload();
920                     if (payload != null) {
921                         if ((payload.remaining() % 6) != 0) {
922                             throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid SETTINGS payload");
923                         }
924                         consumeSettingsFrame(payload);
925                         remoteSettingState = SettingsHandshake.TRANSMITTED;
926                     }
927                     // Send ACK
928                     final RawFrame response = frameFactory.createSettingsAck();
929                     commitFrame(response);
930                     remoteSettingState = SettingsHandshake.ACKED;
931                 }
932             }
933             break;
934             case PRIORITY:
935                 // Stream priority not supported
936                 break;
937             case PUSH_PROMISE: {
938                 acceptPushFrame();
939 
940                 if (goAwayReceived ) {
941                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
942                 }
943 
944                 if (!localConfig.isPushEnabled()) {
945                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Push is disabled");
946                 }
947 
948                 final H2Stream stream = getValidStream(streamId);
949                 if (stream.isRemoteClosed()) {
950                     stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed"));
951                     break;
952                 }
953 
954                 final ByteBuffer payload = frame.getPayloadContent();
955                 if (payload == null || payload.remaining() < 4) {
956                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PUSH_PROMISE payload");
957                 }
958                 final int promisedStreamId = payload.getInt();
959                 if (promisedStreamId == 0 || idGenerator.isSameSide(promisedStreamId)) {
960                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal promised stream id: " + promisedStreamId);
961                 }
962                 if (streamMap.get(promisedStreamId) != null) {
963                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected promised stream id: " + promisedStreamId);
964                 }
965 
966                 updateLastStreamId(promisedStreamId);
967 
968                 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
969                         promisedStreamId, false, initInputWinSize, initOutputWinSize);
970                 final H2StreamHandler streamHandler;
971                 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
972                     streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics,
973                             stream.getPushHandlerFactory());
974                 } else {
975                     streamHandler = NoopH2StreamHandler.INSTANCE;
976                     channel.setLocalEndStream();
977                 }
978 
979                 final H2Stream promisedStream = new H2Stream(channel, streamHandler, true);
980                 streamMap.put(promisedStreamId, promisedStream);
981 
982                 try {
983                     consumePushPromiseFrame(frame, payload, promisedStream);
984                 } catch (final H2StreamResetException ex) {
985                     promisedStream.localReset(ex);
986                 } catch (final HttpStreamResetException ex) {
987                     promisedStream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.NO_ERROR);
988                 }
989             }
990             break;
991             case GOAWAY: {
992                 if (streamId != 0) {
993                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
994                 }
995                 final ByteBuffer payload = frame.getPayload();
996                 if (payload == null || payload.remaining() < 8) {
997                     throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid GOAWAY payload");
998                 }
999                 final int processedLocalStreamId = payload.getInt();
1000                 final int errorCode = payload.getInt();
1001                 goAwayReceived = true;
1002                 if (errorCode == H2Error.NO_ERROR.getCode()) {
1003                     if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
1004                         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1005                             final Map.Entry<Integer, H2Stream> entry = it.next();
1006                             final int activeStreamId = entry.getKey();
1007                             if (!idGenerator.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
1008                                 final H2Stream stream = entry.getValue();
1009                                 stream.cancel();
1010                                 it.remove();
1011                             }
1012                         }
1013                     }
1014                     connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
1015                 } else {
1016                     for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1017                         final Map.Entry<Integer, H2Stream> entry = it.next();
1018                         final H2Stream stream = entry.getValue();
1019                         stream.reset(new H2StreamResetException(errorCode, "Connection terminated by the peer (" + errorCode + ")"));
1020                     }
1021                     streamMap.clear();
1022                     connState = ConnectionHandshake.SHUTDOWN;
1023                 }
1024             }
1025             ioSession.setEvent(SelectionKey.OP_WRITE);
1026             break;
1027         }
1028     }
1029 
1030     private void consumeDataFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1031         final int streamId = stream.getId();
1032         final ByteBuffer payload = frame.getPayloadContent();
1033         if (payload != null) {
1034             final int frameLength = frame.getLength();
1035             final int streamWinSize = updateInputWindow(streamId, stream.getInputWindow(), -frameLength);
1036             if (streamWinSize < lowMark && !stream.isRemoteClosed()) {
1037                 stream.produceInputCapacityUpdate();
1038             }
1039             final int connWinSize = updateInputWindow(0, connInputWindow, -frameLength);
1040             if (connWinSize < CONNECTION_WINDOW_LOW_MARK) {
1041                 maximizeConnWindow(connWinSize);
1042             }
1043         }
1044         if (stream.isRemoteClosed()) {
1045             throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1046         }
1047         if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1048             stream.setRemoteEndStream();
1049         }
1050         if (stream.isLocalReset()) {
1051             return;
1052         }
1053         stream.consumeData(payload);
1054     }
1055 
1056     private void maximizeConnWindow(final int connWinSize) throws IOException {
1057         final int delta = Integer.MAX_VALUE - connWinSize;
1058         if (delta > 0) {
1059             final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(0, delta);
1060             commitFrame(windowUpdateFrame);
1061             updateInputWindow(0, connInputWindow, delta);
1062         }
1063     }
1064 
1065     private void consumePushPromiseFrame(final RawFrame frame, final ByteBuffer payload, final H2Stream promisedStream) throws HttpException, IOException {
1066         final int promisedStreamId = promisedStream.getId();
1067         if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1068             continuation = new Continuation(promisedStreamId, frame.getType(), true);
1069         }
1070         if (continuation == null) {
1071             final List<Header> headers = hPackDecoder.decodeHeaders(payload);
1072             if (promisedStreamId > processedRemoteStreamId) {
1073                 processedRemoteStreamId = promisedStreamId;
1074             }
1075             if (streamListener != null) {
1076                 streamListener.onHeaderInput(this, promisedStreamId, headers);
1077             }
1078             promisedStream.consumePromise(headers);
1079         } else {
1080             continuation.copyPayload(payload);
1081         }
1082     }
1083 
1084     List<Header> decodeHeaders(final ByteBuffer payload) throws HttpException {
1085         return hPackDecoder.decodeHeaders(payload);
1086     }
1087 
1088     private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1089         final int streamId = stream.getId();
1090         if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1091             continuation = new Continuation(streamId, frame.getType(), frame.isFlagSet(FrameFlag.END_STREAM));
1092         }
1093         final ByteBuffer payload = frame.getPayloadContent();
1094         if (frame.isFlagSet(FrameFlag.PRIORITY)) {
1095             // Priority not supported
1096             payload.getInt();
1097             payload.get();
1098         }
1099         if (continuation == null) {
1100             final List<Header> headers = decodeHeaders(payload);
1101             if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1102                 processedRemoteStreamId = streamId;
1103             }
1104             if (streamListener != null) {
1105                 streamListener.onHeaderInput(this, streamId, headers);
1106             }
1107             if (stream.isRemoteClosed()) {
1108                 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1109             }
1110             if (stream.isLocalReset()) {
1111                 return;
1112             }
1113             if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1114                 stream.setRemoteEndStream();
1115             }
1116             stream.consumeHeader(headers);
1117         } else {
1118             continuation.copyPayload(payload);
1119         }
1120     }
1121 
1122     private void consumeContinuationFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1123         final int streamId = frame.getStreamId();
1124         final ByteBuffer payload = frame.getPayload();
1125         continuation.copyPayload(payload);
1126         if (frame.isFlagSet(FrameFlag.END_HEADERS)) {
1127             final List<Header> headers = decodeHeaders(continuation.getContent());
1128             if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1129                 processedRemoteStreamId = streamId;
1130             }
1131             if (streamListener != null) {
1132                 streamListener.onHeaderInput(this, streamId, headers);
1133             }
1134             if (stream.isRemoteClosed()) {
1135                 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1136             }
1137             if (stream.isLocalReset()) {
1138                 return;
1139             }
1140             if (continuation.endStream) {
1141                 stream.setRemoteEndStream();
1142             }
1143             if (continuation.type == FrameType.PUSH_PROMISE.getValue()) {
1144                 stream.consumePromise(headers);
1145             } else {
1146                 stream.consumeHeader(headers);
1147             }
1148             continuation = null;
1149         }
1150     }
1151 
1152     private void consumeSettingsFrame(final ByteBuffer payload) throws HttpException, IOException {
1153         final H2Config.Builder configBuilder = H2Config.initial();
1154         while (payload.hasRemaining()) {
1155             final int code = payload.getShort();
1156             final int value = payload.getInt();
1157             final H2Param param = H2Param.valueOf(code);
1158             if (param != null) {
1159                 switch (param) {
1160                     case HEADER_TABLE_SIZE:
1161                         try {
1162                             configBuilder.setHeaderTableSize(value);
1163                         } catch (final IllegalArgumentException ex) {
1164                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1165                         }
1166                         break;
1167                     case MAX_CONCURRENT_STREAMS:
1168                         try {
1169                             configBuilder.setMaxConcurrentStreams(value);
1170                         } catch (final IllegalArgumentException ex) {
1171                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1172                         }
1173                         break;
1174                     case ENABLE_PUSH:
1175                         configBuilder.setPushEnabled(value == 1);
1176                         break;
1177                     case INITIAL_WINDOW_SIZE:
1178                         try {
1179                             configBuilder.setInitialWindowSize(value);
1180                         } catch (final IllegalArgumentException ex) {
1181                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1182                         }
1183                         break;
1184                     case MAX_FRAME_SIZE:
1185                         try {
1186                             configBuilder.setMaxFrameSize(value);
1187                         } catch (final IllegalArgumentException ex) {
1188                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1189                         }
1190                         break;
1191                     case MAX_HEADER_LIST_SIZE:
1192                         try {
1193                             configBuilder.setMaxHeaderListSize(value);
1194                         } catch (final IllegalArgumentException ex) {
1195                             throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1196                         }
1197                         break;
1198                 }
1199             }
1200         }
1201         applyRemoteSettings(configBuilder.build());
1202     }
1203 
1204     private void produceOutput() throws HttpException, IOException {
1205         for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1206             final Map.Entry<Integer, H2Stream> entry = it.next();
1207             final H2Stream stream = entry.getValue();
1208             if (!stream.isLocalClosed() && stream.getOutputWindow().get() > 0) {
1209                 stream.produceOutput();
1210             }
1211             if (stream.isTerminated()) {
1212                 it.remove();
1213                 stream.releaseResources();
1214                 requestSessionOutput();
1215             }
1216             if (!outputQueue.isEmpty()) {
1217                 break;
1218             }
1219         }
1220     }
1221 
1222     private void applyRemoteSettings(final H2Config config) throws H2ConnectionException {
1223         remoteConfig = config;
1224 
1225         hPackEncoder.setMaxTableSize(remoteConfig.getHeaderTableSize());
1226         final int delta = remoteConfig.getInitialWindowSize() - initOutputWinSize;
1227         initOutputWinSize = remoteConfig.getInitialWindowSize();
1228 
1229         final int maxFrameSize = remoteConfig.getMaxFrameSize();
1230         if (maxFrameSize > localConfig.getMaxFrameSize()) {
1231             outputBuffer.expand(maxFrameSize);
1232         }
1233 
1234         if (delta != 0) {
1235             if (!streamMap.isEmpty()) {
1236                 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1237                     final Map.Entry<Integer, H2Stream> entry = it.next();
1238                     final H2Stream stream = entry.getValue();
1239                     try {
1240                         updateOutputWindow(stream.getId(), stream.getOutputWindow(), delta);
1241                     } catch (final ArithmeticException ex) {
1242                         throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1243                     }
1244                 }
1245             }
1246         }
1247     }
1248 
1249     private void applyLocalSettings() throws H2ConnectionException {
1250         hPackDecoder.setMaxTableSize(localConfig.getHeaderTableSize());
1251         hPackDecoder.setMaxListSize(localConfig.getMaxHeaderListSize());
1252 
1253         final int delta = localConfig.getInitialWindowSize() - initInputWinSize;
1254         initInputWinSize = localConfig.getInitialWindowSize();
1255 
1256         if (delta != 0 && !streamMap.isEmpty()) {
1257             for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1258                 final Map.Entry<Integer, H2Stream> entry = it.next();
1259                 final H2Stream stream = entry.getValue();
1260                 try {
1261                     updateInputWindow(stream.getId(), stream.getInputWindow(), delta);
1262                 } catch (final ArithmeticException ex) {
1263                     throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1264                 }
1265             }
1266         }
1267         lowMark = initInputWinSize / 2;
1268     }
1269 
1270     @Override
1271     public void close() throws IOException {
1272         ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.IMMEDIATE);
1273     }
1274 
1275     @Override
1276     public void close(final CloseMode closeMode) {
1277         ioSession.close(closeMode);
1278     }
1279 
1280     @Override
1281     public boolean isOpen() {
1282         return connState == ConnectionHandshake.ACTIVE;
1283     }
1284 
1285     @Override
1286     public void setSocketTimeout(final Timeout timeout) {
1287         ioSession.setSocketTimeout(timeout);
1288     }
1289 
1290     @Override
1291     public SSLSession getSSLSession() {
1292         final TlsDetails tlsDetails = ioSession.getTlsDetails();
1293         return tlsDetails != null ? tlsDetails.getSSLSession() : null;
1294     }
1295 
1296     @Override
1297     public EndpointDetails getEndpointDetails() {
1298         if (endpointDetails == null) {
1299             endpointDetails = new BasicEndpointDetails(
1300                     ioSession.getRemoteAddress(),
1301                     ioSession.getLocalAddress(),
1302                     connMetrics,
1303                     ioSession.getSocketTimeout());
1304         }
1305         return endpointDetails;
1306     }
1307 
1308     @Override
1309     public Timeout getSocketTimeout() {
1310         return ioSession.getSocketTimeout();
1311     }
1312 
1313     @Override
1314     public ProtocolVersion getProtocolVersion() {
1315         return HttpVersion.HTTP_2;
1316     }
1317 
1318     @Override
1319     public SocketAddress getRemoteAddress() {
1320         return ioSession.getRemoteAddress();
1321     }
1322 
1323     @Override
1324     public SocketAddress getLocalAddress() {
1325         return ioSession.getLocalAddress();
1326     }
1327 
1328     void appendState(final StringBuilder buf) {
1329         buf.append("connState=").append(connState)
1330                 .append(", connInputWindow=").append(connInputWindow)
1331                 .append(", connOutputWindow=").append(connOutputWindow)
1332                 .append(", outputQueue=").append(outputQueue.size())
1333                 .append(", streamMap=").append(streamMap.size())
1334                 .append(", processedRemoteStreamId=").append(processedRemoteStreamId);
1335     }
1336 
1337     private static class Continuation {
1338 
1339         final int streamId;
1340         final int type;
1341         final boolean endStream;
1342         final ByteArrayBuffer headerBuffer;
1343 
1344         private Continuation(final int streamId, final int type, final boolean endStream) {
1345             this.streamId = streamId;
1346             this.type = type;
1347             this.endStream = endStream;
1348             this.headerBuffer = new ByteArrayBuffer(1024);
1349         }
1350 
1351         void copyPayload(final ByteBuffer payload) {
1352             if (payload == null) {
1353                 return;
1354             }
1355             headerBuffer.ensureCapacity(payload.remaining());
1356             payload.get(headerBuffer.array(), headerBuffer.length(), payload.remaining());
1357         }
1358 
1359         ByteBuffer getContent() {
1360             return ByteBuffer.wrap(headerBuffer.array(), 0, headerBuffer.length());
1361         }
1362 
1363     }
1364 
1365     private class H2StreamChannelImpl implements H2StreamChannel {
1366 
1367         private final int id;
1368         private final AtomicInteger inputWindow;
1369         private final AtomicInteger outputWindow;
1370 
1371         private volatile boolean idle;
1372         private volatile boolean remoteEndStream;
1373         private volatile boolean localEndStream;
1374 
1375         private volatile long deadline;
1376 
1377         H2StreamChannelImpl(final int id, final boolean idle, final int initialInputWindowSize, final int initialOutputWindowSize) {
1378             this.id = id;
1379             this.idle = idle;
1380             this.inputWindow = new AtomicInteger(initialInputWindowSize);
1381             this.outputWindow = new AtomicInteger(initialOutputWindowSize);
1382         }
1383 
1384         int getId() {
1385             return id;
1386         }
1387 
1388         AtomicInteger getOutputWindow() {
1389             return outputWindow;
1390         }
1391 
1392         AtomicInteger getInputWindow() {
1393             return inputWindow;
1394         }
1395 
1396         @Override
1397         public void submit(final List<Header> headers, final boolean endStream) throws IOException {
1398             ioSession.getLock().lock();
1399             try {
1400                 if (headers == null || headers.isEmpty()) {
1401                     throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
1402                 }
1403                 if (localEndStream) {
1404                     return;
1405                 }
1406                 idle = false;
1407                 commitHeaders(id, headers, endStream);
1408                 if (endStream) {
1409                     localEndStream = true;
1410                 }
1411             } finally {
1412                 ioSession.getLock().unlock();
1413             }
1414         }
1415 
1416         @Override
1417         public void push(final List<Header> headers, final AsyncPushProducer pushProducer) throws HttpException, IOException {
1418             acceptPushRequest();
1419             final int promisedStreamId = generateStreamId();
1420             final H2StreamChannelImpl channel = new H2StreamChannelImpl(
1421                     promisedStreamId,
1422                     true,
1423                     localConfig.getInitialWindowSize(),
1424                     remoteConfig.getInitialWindowSize());
1425             final HttpCoreContext context = HttpCoreContext.create();
1426             context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
1427             context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
1428             final H2StreamHandler streamHandler = new ServerPushH2StreamHandler(
1429                     channel, httpProcessor, connMetrics, pushProducer, context);
1430             final H2Stream stream = new H2Stream(channel, streamHandler, false);
1431             streamMap.put(promisedStreamId, stream);
1432 
1433             ioSession.getLock().lock();
1434             try {
1435                 if (localEndStream) {
1436                     stream.releaseResources();
1437                     return;
1438                 }
1439                 commitPushPromise(id, promisedStreamId, headers);
1440                 idle = false;
1441             } finally {
1442                 ioSession.getLock().unlock();
1443             }
1444         }
1445 
1446         @Override
1447         public void update(final int increment) throws IOException {
1448             if (remoteEndStream) {
1449                 return;
1450             }
1451             incrementInputCapacity(0, connInputWindow, increment);
1452             incrementInputCapacity(id, inputWindow, increment);
1453         }
1454 
1455         @Override
1456         public int write(final ByteBuffer payload) throws IOException {
1457             ioSession.getLock().lock();
1458             try {
1459                 if (localEndStream) {
1460                     return 0;
1461                 }
1462                 return streamData(id, outputWindow, payload);
1463             } finally {
1464                 ioSession.getLock().unlock();
1465             }
1466         }
1467 
1468         @Override
1469         public void endStream(final List<? extends Header> trailers) throws IOException {
1470             ioSession.getLock().lock();
1471             try {
1472                 if (localEndStream) {
1473                     return;
1474                 }
1475                 localEndStream = true;
1476                 if (trailers != null && !trailers.isEmpty()) {
1477                     commitHeaders(id, trailers, true);
1478                 } else {
1479                     final RawFrame frame = frameFactory.createData(id, null, true);
1480                     commitFrameInternal(frame);
1481                 }
1482             } finally {
1483                 ioSession.getLock().unlock();
1484             }
1485         }
1486 
1487         @Override
1488         public void endStream() throws IOException {
1489             endStream(null);
1490         }
1491 
1492         @Override
1493         public void requestOutput() {
1494             requestSessionOutput();
1495         }
1496 
1497         boolean isRemoteClosed() {
1498             return remoteEndStream;
1499         }
1500 
1501         void setRemoteEndStream() {
1502             remoteEndStream = true;
1503         }
1504 
1505         boolean isLocalClosed() {
1506             return localEndStream;
1507         }
1508 
1509         void setLocalEndStream() {
1510             localEndStream = true;
1511         }
1512 
1513         boolean isLocalReset() {
1514             return deadline > 0;
1515         }
1516 
1517         boolean isResetDeadline() {
1518             final long l = deadline;
1519             return l > 0 && l < System.currentTimeMillis();
1520         }
1521 
1522         boolean localReset(final int code) throws IOException {
1523             ioSession.getLock().lock();
1524             try {
1525                 if (localEndStream) {
1526                     return false;
1527                 }
1528                 localEndStream = true;
1529                 deadline = System.currentTimeMillis() + LINGER_TIME;
1530                 if (!idle) {
1531                     final RawFrame resetStream = frameFactory.createResetStream(id, code);
1532                     commitFrameInternal(resetStream);
1533                     return true;
1534                 }
1535                 return false;
1536             } finally {
1537                 ioSession.getLock().unlock();
1538             }
1539         }
1540 
1541         boolean localReset(final H2Error error) throws IOException {
1542             return localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1543         }
1544 
1545         @Override
1546         public boolean cancel() {
1547             try {
1548                 return localReset(H2Error.CANCEL);
1549             } catch (final IOException ignore) {
1550                 return false;
1551             }
1552         }
1553 
1554         void appendState(final StringBuilder buf) {
1555             buf.append("id=").append(id)
1556                     .append(", connState=").append(connState)
1557                     .append(", inputWindow=").append(inputWindow)
1558                     .append(", outputWindow=").append(outputWindow)
1559                     .append(", localEndStream=").append(localEndStream)
1560                     .append(", idle=").append(idle);
1561         }
1562 
1563         @Override
1564         public String toString() {
1565             final StringBuilder buf = new StringBuilder();
1566             buf.append("[");
1567             appendState(buf);
1568             buf.append("]");
1569             return buf.toString();
1570         }
1571 
1572     }
1573 
1574     static class H2Stream {
1575 
1576         private final H2StreamChannelImpl channel;
1577         private final H2StreamHandler handler;
1578         private final boolean remoteInitiated;
1579 
1580         private H2Stream(
1581                 final H2StreamChannelImpl channel,
1582                 final H2StreamHandler handler,
1583                 final boolean remoteInitiated) {
1584             this.channel = channel;
1585             this.handler = handler;
1586             this.remoteInitiated = remoteInitiated;
1587         }
1588 
1589         int getId() {
1590             return channel.getId();
1591         }
1592 
1593         boolean isRemoteInitiated() {
1594             return remoteInitiated;
1595         }
1596 
1597         AtomicInteger getOutputWindow() {
1598             return channel.getOutputWindow();
1599         }
1600 
1601         AtomicInteger getInputWindow() {
1602             return channel.getInputWindow();
1603         }
1604 
1605         boolean isTerminated() {
1606             return channel.isLocalClosed() && (channel.isRemoteClosed() || channel.isResetDeadline());
1607         }
1608 
1609         boolean isRemoteClosed() {
1610             return channel.isRemoteClosed();
1611         }
1612 
1613         boolean isLocalClosed() {
1614             return channel.isLocalClosed();
1615         }
1616 
1617         boolean isLocalReset() {
1618             return channel.isLocalReset();
1619         }
1620 
1621         void setRemoteEndStream() {
1622             channel.setRemoteEndStream();
1623         }
1624 
1625         void consumePromise(final List<Header> headers) throws HttpException, IOException {
1626             try {
1627                 handler.consumePromise(headers);
1628                 channel.setLocalEndStream();
1629             } catch (final ProtocolException ex) {
1630                 localReset(ex, H2Error.PROTOCOL_ERROR);
1631             }
1632         }
1633 
1634         void consumeHeader(final List<Header> headers) throws HttpException, IOException {
1635             try {
1636                 handler.consumeHeader(headers, channel.isRemoteClosed());
1637             } catch (final ProtocolException ex) {
1638                 localReset(ex, H2Error.PROTOCOL_ERROR);
1639             }
1640         }
1641 
1642         void consumeData(final ByteBuffer src) throws HttpException, IOException {
1643             try {
1644                 handler.consumeData(src, channel.isRemoteClosed());
1645             } catch (final CharacterCodingException ex) {
1646                 localReset(ex, H2Error.INTERNAL_ERROR);
1647             } catch (final ProtocolException ex) {
1648                 localReset(ex, H2Error.PROTOCOL_ERROR);
1649             }
1650         }
1651 
1652         boolean isOutputReady() {
1653             return handler.isOutputReady();
1654         }
1655 
1656         void produceOutput() throws HttpException, IOException {
1657             try {
1658                 handler.produceOutput();
1659             } catch (final ProtocolException ex) {
1660                 localReset(ex, H2Error.PROTOCOL_ERROR);
1661             }
1662         }
1663 
1664         void produceInputCapacityUpdate() throws IOException {
1665             handler.updateInputCapacity();
1666         }
1667 
1668         void reset(final Exception cause) {
1669             channel.setRemoteEndStream();
1670             channel.setLocalEndStream();
1671             handler.failed(cause);
1672         }
1673 
1674         void localReset(final Exception cause, final int code) throws IOException {
1675             channel.localReset(code);
1676             handler.failed(cause);
1677         }
1678 
1679         void localReset(final Exception cause, final H2Error error) throws IOException {
1680             localReset(cause, error != null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1681         }
1682 
1683         void localReset(final H2StreamResetException ex) throws IOException {
1684             localReset(ex, ex.getCode());
1685         }
1686 
1687         void handle(final HttpExceptionn.html#HttpException">HttpException ex) throws IOException, HttpException {
1688             handler.handle(ex, channel.isRemoteClosed());
1689         }
1690 
1691         HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
1692             return handler.getPushHandlerFactory();
1693         }
1694 
1695         void cancel() {
1696             reset(new RequestNotExecutedException());
1697         }
1698 
1699         boolean abort() {
1700             final boolean cancelled = channel.cancel();
1701             handler.failed(new RequestNotExecutedException());
1702             return cancelled;
1703         }
1704 
1705         void releaseResources() {
1706             handler.releaseResources();
1707         }
1708 
1709         void appendState(final StringBuilder buf) {
1710             buf.append("channel=[");
1711             channel.appendState(buf);
1712             buf.append("]");
1713         }
1714 
1715         @Override
1716         public String toString() {
1717             final StringBuilder buf = new StringBuilder();
1718             buf.append("[");
1719             appendState(buf);
1720             buf.append("]");
1721             return buf.toString();
1722         }
1723 
1724     }
1725 
1726 }