1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http2.impl.nio;
28
29 import java.io.IOException;
30 import java.net.SocketAddress;
31 import java.nio.BufferOverflowException;
32 import java.nio.ByteBuffer;
33 import java.nio.channels.SelectionKey;
34 import java.nio.charset.CharacterCodingException;
35 import java.util.Deque;
36 import java.util.Iterator;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Queue;
40 import java.util.concurrent.ConcurrentHashMap;
41 import java.util.concurrent.ConcurrentLinkedDeque;
42 import java.util.concurrent.ConcurrentLinkedQueue;
43 import java.util.concurrent.atomic.AtomicInteger;
44
45 import javax.net.ssl.SSLSession;
46
47 import org.apache.hc.core5.concurrent.CancellableDependency;
48 import org.apache.hc.core5.http.ConnectionClosedException;
49 import org.apache.hc.core5.http.EndpointDetails;
50 import org.apache.hc.core5.http.Header;
51 import org.apache.hc.core5.http.HttpConnection;
52 import org.apache.hc.core5.http.HttpException;
53 import org.apache.hc.core5.http.HttpStreamResetException;
54 import org.apache.hc.core5.http.HttpVersion;
55 import org.apache.hc.core5.http.ProtocolException;
56 import org.apache.hc.core5.http.ProtocolVersion;
57 import org.apache.hc.core5.http.RequestNotExecutedException;
58 import org.apache.hc.core5.http.config.CharCodingConfig;
59 import org.apache.hc.core5.http.impl.BasicEndpointDetails;
60 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
61 import org.apache.hc.core5.http.impl.CharCodingSupport;
62 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
63 import org.apache.hc.core5.http.nio.AsyncPushProducer;
64 import org.apache.hc.core5.http.nio.HandlerFactory;
65 import org.apache.hc.core5.http.nio.command.ExecutableCommand;
66 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
67 import org.apache.hc.core5.http.protocol.HttpCoreContext;
68 import org.apache.hc.core5.http.protocol.HttpProcessor;
69 import org.apache.hc.core5.http2.H2ConnectionException;
70 import org.apache.hc.core5.http2.H2Error;
71 import org.apache.hc.core5.http2.H2StreamResetException;
72 import org.apache.hc.core5.http2.config.H2Config;
73 import org.apache.hc.core5.http2.config.H2Param;
74 import org.apache.hc.core5.http2.config.H2Setting;
75 import org.apache.hc.core5.http2.frame.FrameFactory;
76 import org.apache.hc.core5.http2.frame.FrameFlag;
77 import org.apache.hc.core5.http2.frame.FrameType;
78 import org.apache.hc.core5.http2.frame.RawFrame;
79 import org.apache.hc.core5.http2.frame.StreamIdGenerator;
80 import org.apache.hc.core5.http2.hpack.HPackDecoder;
81 import org.apache.hc.core5.http2.hpack.HPackEncoder;
82 import org.apache.hc.core5.http2.impl.BasicH2TransportMetrics;
83 import org.apache.hc.core5.http2.nio.AsyncPingHandler;
84 import org.apache.hc.core5.http2.nio.command.PingCommand;
85 import org.apache.hc.core5.io.CloseMode;
86 import org.apache.hc.core5.reactor.Command;
87 import org.apache.hc.core5.reactor.ProtocolIOSession;
88 import org.apache.hc.core5.reactor.ssl.TlsDetails;
89 import org.apache.hc.core5.util.Args;
90 import org.apache.hc.core5.util.ByteArrayBuffer;
91 import org.apache.hc.core5.util.Identifiable;
92 import org.apache.hc.core5.util.Timeout;
93
94 abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection {
95
96 private static final long LINGER_TIME = 1000;
97 private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024;
98
99 enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
100 enum SettingsHandshake { READY, TRANSMITTED, ACKED }
101
102 private final ProtocolIOSession ioSession;
103 private final FrameFactory frameFactory;
104 private final StreamIdGenerator idGenerator;
105 private final HttpProcessor httpProcessor;
106 private final H2Config localConfig;
107 private final BasicH2TransportMetrics inputMetrics;
108 private final BasicH2TransportMetrics outputMetrics;
109 private final BasicHttpConnectionMetrics connMetrics;
110 private final FrameInputBuffer inputBuffer;
111 private final FrameOutputBuffer outputBuffer;
112 private final Deque<RawFrame> outputQueue;
113 private final HPackEncoder hPackEncoder;
114 private final HPackDecoder hPackDecoder;
115 private final Map<Integer, H2Stream> streamMap;
116 private final Queue<AsyncPingHandler> pingHandlers;
117 private final AtomicInteger connInputWindow;
118 private final AtomicInteger connOutputWindow;
119 private final AtomicInteger outputRequests;
120 private final AtomicInteger lastStreamId;
121 private final H2StreamListener streamListener;
122
123 private ConnectionHandshake connState = ConnectionHandshake.READY;
124 private SettingsHandshake localSettingState = SettingsHandshake.READY;
125 private SettingsHandshake remoteSettingState = SettingsHandshake.READY;
126
127 private int initInputWinSize;
128 private int initOutputWinSize;
129 private int lowMark;
130
131 private volatile H2Config remoteConfig;
132
133 private Continuation continuation;
134
135 private int processedRemoteStreamId;
136 private EndpointDetails endpointDetails;
137 private boolean goAwayReceived;
138
139 AbstractH2StreamMultiplexer(
140 final ProtocolIOSession ioSession,
141 final FrameFactory frameFactory,
142 final StreamIdGenerator idGenerator,
143 final HttpProcessor httpProcessor,
144 final CharCodingConfig charCodingConfig,
145 final H2Config h2Config,
146 final H2StreamListener streamListener) {
147 this.ioSession = Args.notNull(ioSession, "IO session");
148 this.frameFactory = Args.notNull(frameFactory, "Frame factory");
149 this.idGenerator = Args.notNull(idGenerator, "Stream id generator");
150 this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
151 this.localConfig = h2Config != null ? h2Config : H2Config.DEFAULT;
152 this.inputMetrics = new BasicH2TransportMetrics();
153 this.outputMetrics = new BasicH2TransportMetrics();
154 this.connMetrics = new BasicHttpConnectionMetrics(this.inputMetrics, this.outputMetrics);
155 this.inputBuffer = new FrameInputBuffer(this.inputMetrics, this.localConfig.getMaxFrameSize());
156 this.outputBuffer = new FrameOutputBuffer(this.outputMetrics, this.localConfig.getMaxFrameSize());
157 this.outputQueue = new ConcurrentLinkedDeque<>();
158 this.pingHandlers = new ConcurrentLinkedQueue<>();
159 this.outputRequests = new AtomicInteger(0);
160 this.lastStreamId = new AtomicInteger(0);
161 this.hPackEncoder = new HPackEncoder(CharCodingSupport.createEncoder(charCodingConfig));
162 this.hPackDecoder = new HPackDecoder(CharCodingSupport.createDecoder(charCodingConfig));
163 this.streamMap = new ConcurrentHashMap<>();
164 this.remoteConfig = H2Config.INIT;
165 this.connInputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
166 this.connOutputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
167
168 this.initInputWinSize = H2Config.INIT.getInitialWindowSize();
169 this.initOutputWinSize = H2Config.INIT.getInitialWindowSize();
170
171 this.hPackDecoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
172 this.hPackEncoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
173 this.hPackDecoder.setMaxListSize(H2Config.INIT.getMaxHeaderListSize());
174
175 this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
176 this.streamListener = streamListener;
177 }
178
179 @Override
180 public String getId() {
181 return ioSession.getId();
182 }
183
184 abstract void acceptHeaderFrame() throws H2ConnectionException;
185
186 abstract void acceptPushRequest() throws H2ConnectionException;
187
188 abstract void acceptPushFrame() throws H2ConnectionException;
189
190 abstract H2StreamHandler createRemotelyInitiatedStream(
191 H2StreamChannel channel,
192 HttpProcessor httpProcessor,
193 BasicHttpConnectionMetrics connMetrics,
194 HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException;
195
196 abstract H2StreamHandler createLocallyInitiatedStream(
197 ExecutableCommand command,
198 H2StreamChannel channel,
199 HttpProcessor httpProcessor,
200 BasicHttpConnectionMetrics connMetrics) throws IOException;
201
202 private int updateWindow(final AtomicInteger window, final int delta) throws ArithmeticException {
203 for (;;) {
204 final int current = window.get();
205 long newValue = (long) current + delta;
206
207
208
209 if (newValue == 0x80000000L) {
210 newValue = Integer.MAX_VALUE;
211 }
212
213
214 if (Math.abs(newValue) > 0x7fffffffL) {
215 throw new ArithmeticException("Update causes flow control window to exceed " + Integer.MAX_VALUE);
216 }
217 if (window.compareAndSet(current, (int) newValue)) {
218 return (int) newValue;
219 }
220 }
221 }
222
223 private int updateInputWindow(
224 final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
225 final int newSize = updateWindow(window, delta);
226 if (streamListener != null) {
227 streamListener.onInputFlowControl(this, streamId, delta, newSize);
228 }
229 return newSize;
230 }
231
232 private int updateOutputWindow(
233 final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
234 final int newSize = updateWindow(window, delta);
235 if (streamListener != null) {
236 streamListener.onOutputFlowControl(this, streamId, delta, newSize);
237 }
238 return newSize;
239 }
240
241 private void commitFrameInternal(final RawFrame frame) throws IOException {
242 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
243 if (streamListener != null) {
244 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
245 }
246 outputBuffer.write(frame, ioSession);
247 } else {
248 outputQueue.addLast(frame);
249 }
250 ioSession.setEvent(SelectionKey.OP_WRITE);
251 }
252
253 private void commitFrame(final RawFrame frame) throws IOException {
254 Args.notNull(frame, "Frame");
255 ioSession.getLock().lock();
256 try {
257 commitFrameInternal(frame);
258 } finally {
259 ioSession.getLock().unlock();
260 }
261 }
262
263 private void commitHeaders(
264 final int streamId, final List<? extends Header> headers, final boolean endStream) throws IOException {
265 if (streamListener != null) {
266 streamListener.onHeaderOutput(this, streamId, headers);
267 }
268 final ByteArrayBuffer buf = new ByteArrayBuffer(512);
269 hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
270
271 int off = 0;
272 int remaining = buf.length();
273 boolean continuation = false;
274
275 while (remaining > 0) {
276 final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
277 final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
278
279 remaining -= chunk;
280 off += chunk;
281
282 final boolean endHeaders = remaining == 0;
283 final RawFrame frame;
284 if (!continuation) {
285 frame = frameFactory.createHeaders(streamId, payload, endHeaders, endStream);
286 continuation = true;
287 } else {
288 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
289 }
290 commitFrameInternal(frame);
291 }
292 }
293
294 private void commitPushPromise(
295 final int streamId, final int promisedStreamId, final List<Header> headers) throws IOException {
296 if (headers == null || headers.isEmpty()) {
297 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
298 }
299 if (streamListener != null) {
300 streamListener.onHeaderOutput(this, streamId, headers);
301 }
302 final ByteArrayBuffer buf = new ByteArrayBuffer(512);
303 buf.append((byte)(promisedStreamId >> 24));
304 buf.append((byte)(promisedStreamId >> 16));
305 buf.append((byte)(promisedStreamId >> 8));
306 buf.append((byte)(promisedStreamId));
307
308 hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
309
310 int off = 0;
311 int remaining = buf.length();
312 boolean continuation = false;
313
314 while (remaining > 0) {
315 final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
316 final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
317
318 remaining -= chunk;
319 off += chunk;
320
321 final boolean endHeaders = remaining == 0;
322 final RawFrame frame;
323 if (!continuation) {
324 frame = frameFactory.createPushPromise(streamId, payload, endHeaders);
325 continuation = true;
326 } else {
327 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
328 }
329 commitFrameInternal(frame);
330 }
331 }
332
333 private void streamDataFrame(
334 final int streamId,
335 final AtomicInteger streamOutputWindow,
336 final ByteBuffer payload,
337 final int chunk) throws IOException {
338 final RawFrame dataFrame = frameFactory.createData(streamId, payload, false);
339 if (streamListener != null) {
340 streamListener.onFrameOutput(this, streamId, dataFrame);
341 }
342 updateOutputWindow(0, connOutputWindow, -chunk);
343 updateOutputWindow(streamId, streamOutputWindow, -chunk);
344 outputBuffer.write(dataFrame, ioSession);
345 }
346
347 private int streamData(
348 final int streamId, final AtomicInteger streamOutputWindow, final ByteBuffer payload) throws IOException {
349 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
350 final int capacity = Math.min(connOutputWindow.get(), streamOutputWindow.get());
351 if (capacity <= 0) {
352 return 0;
353 }
354 final int maxPayloadSize = Math.min(capacity, outputBuffer.getMaxFramePayloadSize());
355 final int chunk;
356 if (payload.remaining() <= maxPayloadSize) {
357 chunk = payload.remaining();
358 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
359 } else {
360 chunk = maxPayloadSize;
361 final int originalLimit = payload.limit();
362 try {
363 payload.limit(payload.position() + chunk);
364 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
365 } finally {
366 payload.limit(originalLimit);
367 }
368 }
369 payload.position(payload.position() + chunk);
370 ioSession.setEvent(SelectionKey.OP_WRITE);
371 return chunk;
372 }
373 return 0;
374 }
375
376 private void incrementInputCapacity(
377 final int streamId, final AtomicInteger inputWindow, final int inputCapacity) throws IOException {
378 if (inputCapacity > 0) {
379 final int streamWinSize = inputWindow.get();
380 final int remainingCapacity = Integer.MAX_VALUE - streamWinSize;
381 final int chunk = Math.min(inputCapacity, remainingCapacity);
382 if (chunk != 0) {
383 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(streamId, chunk);
384 commitFrame(windowUpdateFrame);
385 updateInputWindow(streamId, inputWindow, chunk);
386 }
387 }
388 }
389
390 private void requestSessionOutput() {
391 outputRequests.incrementAndGet();
392 ioSession.setEvent(SelectionKey.OP_WRITE);
393 }
394
395 private void updateLastStreamId(final int streamId) {
396 final int currentId = lastStreamId.get();
397 if (streamId > currentId) {
398 lastStreamId.compareAndSet(currentId, streamId);
399 }
400 }
401
402 private int generateStreamId() {
403 for (;;) {
404 final int currentId = lastStreamId.get();
405 final int newStreamId = idGenerator.generate(currentId);
406 if (lastStreamId.compareAndSet(currentId, newStreamId)) {
407 return newStreamId;
408 }
409 }
410 }
411
412 public final void onConnect() throws HttpException, IOException {
413 connState = ConnectionHandshake.ACTIVE;
414 final RawFrame settingsFrame = frameFactory.createSettings(
415 new H2Setting(H2Param.HEADER_TABLE_SIZE, localConfig.getHeaderTableSize()),
416 new H2Setting(H2Param.ENABLE_PUSH, localConfig.isPushEnabled() ? 1 : 0),
417 new H2Setting(H2Param.MAX_CONCURRENT_STREAMS, localConfig.getMaxConcurrentStreams()),
418 new H2Setting(H2Param.INITIAL_WINDOW_SIZE, localConfig.getInitialWindowSize()),
419 new H2Setting(H2Param.MAX_FRAME_SIZE, localConfig.getMaxFrameSize()),
420 new H2Setting(H2Param.MAX_HEADER_LIST_SIZE, localConfig.getMaxHeaderListSize()));
421
422 commitFrame(settingsFrame);
423 localSettingState = SettingsHandshake.TRANSMITTED;
424 maximizeConnWindow(connInputWindow.get());
425
426 if (streamListener != null) {
427 final int initInputWindow = connInputWindow.get();
428 streamListener.onInputFlowControl(this, 0, initInputWindow, initInputWindow);
429 final int initOutputWindow = connOutputWindow.get();
430 streamListener.onOutputFlowControl(this, 0, initOutputWindow, initOutputWindow);
431 }
432 }
433
434 public final void onInput(final ByteBuffer src) throws HttpException, IOException {
435 if (connState == ConnectionHandshake.SHUTDOWN) {
436 ioSession.clearEvent(SelectionKey.OP_READ);
437 } else {
438 for (;;) {
439 final RawFrame frame = inputBuffer.read(src, ioSession);
440 if (frame == null) {
441 break;
442 }
443 if (streamListener != null) {
444 streamListener.onFrameInput(this, frame.getStreamId(), frame);
445 }
446 consumeFrame(frame);
447 }
448 }
449 }
450
451 public final void onOutput() throws HttpException, IOException {
452 ioSession.getLock().lock();
453 try {
454 if (!outputBuffer.isEmpty()) {
455 outputBuffer.flush(ioSession);
456 }
457 while (outputBuffer.isEmpty()) {
458 final RawFrame frame = outputQueue.poll();
459 if (frame != null) {
460 if (streamListener != null) {
461 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
462 }
463 outputBuffer.write(frame, ioSession);
464 } else {
465 break;
466 }
467 }
468 } finally {
469 ioSession.getLock().unlock();
470 }
471
472 if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
473
474 if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
475 produceOutput();
476 }
477 final int pendingOutputRequests = outputRequests.get();
478 boolean outputPending = false;
479 if (!streamMap.isEmpty() && connOutputWindow.get() > 0) {
480 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
481 final Map.Entry<Integer, H2Stream> entry = it.next();
482 final H2Stream stream = entry.getValue();
483 if (!stream.isLocalClosed()
484 && stream.getOutputWindow().get() > 0
485 && stream.isOutputReady()) {
486 outputPending = true;
487 break;
488 }
489 }
490 }
491 ioSession.getLock().lock();
492 try {
493 if (!outputPending && outputBuffer.isEmpty() && outputQueue.isEmpty()
494 && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
495 ioSession.clearEvent(SelectionKey.OP_WRITE);
496 } else {
497 outputRequests.addAndGet(-pendingOutputRequests);
498 }
499 } finally {
500 ioSession.getLock().unlock();
501 }
502 }
503
504 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 && remoteSettingState == SettingsHandshake.ACKED) {
505 processPendingCommands();
506 }
507 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
508 int liveStreams = 0;
509 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
510 final Map.Entry<Integer, H2Stream> entry = it.next();
511 final H2Stream stream = entry.getValue();
512 if (stream.isLocalClosed() && stream.isRemoteClosed()) {
513 stream.releaseResources();
514 it.remove();
515 } else {
516 if (idGenerator.isSameSide(stream.getId()) || stream.getId() <= processedRemoteStreamId) {
517 liveStreams++;
518 }
519 }
520 }
521 if (liveStreams == 0) {
522 connState = ConnectionHandshake.SHUTDOWN;
523 }
524 }
525 if (connState.compareTo(ConnectionHandshake.SHUTDOWN) >= 0) {
526 if (!streamMap.isEmpty()) {
527 for (final H2Stream stream : streamMap.values()) {
528 stream.releaseResources();
529 }
530 streamMap.clear();
531 }
532 ioSession.getLock().lock();
533 try {
534 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
535 ioSession.close();
536 }
537 } finally {
538 ioSession.getLock().unlock();
539 }
540 }
541 }
542
543 public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
544 connState = ConnectionHandshake.SHUTDOWN;
545
546 final RawFrame goAway;
547 if (localSettingState != SettingsHandshake.ACKED) {
548 goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.SETTINGS_TIMEOUT,
549 "Setting timeout (" + timeout + ")");
550 } else {
551 goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR,
552 "Timeout due to inactivity (" + timeout + ")");
553 }
554 commitFrame(goAway);
555 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
556 final Map.Entry<Integer, H2Stream> entry = it.next();
557 final H2Stream stream = entry.getValue();
558 stream.reset(new H2StreamResetException(H2Error.NO_ERROR, "Timeout due to inactivity (" + timeout + ")"));
559 }
560 streamMap.clear();
561 }
562
563 public final void onDisconnect() {
564 for (;;) {
565 final AsyncPingHandler pingHandler = pingHandlers.poll();
566 if (pingHandler != null) {
567 pingHandler.cancel();
568 } else {
569 break;
570 }
571 }
572 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
573 final Map.Entry<Integer, H2Stream> entry = it.next();
574 final H2Stream stream = entry.getValue();
575 stream.cancel();
576 }
577 for (;;) {
578 final Command command = ioSession.poll();
579 if (command != null) {
580 if (command instanceof ExecutableCommand) {
581 ((ExecutableCommand) command).failed(new ConnectionClosedException());
582 } else {
583 command.cancel();
584 }
585 } else {
586 break;
587 }
588 }
589 }
590
591 private void processPendingCommands() throws IOException, HttpException {
592 while (streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
593 final Command command = ioSession.poll();
594 if (command == null) {
595 break;
596 }
597 if (command instanceof ShutdownCommand) {
598 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
599 if (shutdownCommand.getType() == CloseMode.IMMEDIATE) {
600 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
601 final Map.Entry<Integer, H2Stream> entry = it.next();
602 final H2Stream stream = entry.getValue();
603 stream.cancel();
604 }
605 streamMap.clear();
606 connState = ConnectionHandshake.SHUTDOWN;
607 } else {
608 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
609 final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR, "Graceful shutdown");
610 commitFrame(goAway);
611 connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
612 }
613 }
614 break;
615 } else if (command instanceof PingCommand) {
616 final PingCommand pingCommand = (PingCommand) command;
617 final AsyncPingHandler handler = pingCommand.getHandler();
618 pingHandlers.add(handler);
619 final RawFrame ping = frameFactory.createPing(handler.getData());
620 commitFrame(ping);
621 } else if (command instanceof ExecutableCommand) {
622 final int streamId = generateStreamId();
623 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
624 streamId, true, initInputWinSize, initOutputWinSize);
625 final ExecutableCommand executableCommand = (ExecutableCommand) command;
626 final H2StreamHandler streamHandler = createLocallyInitiatedStream(
627 executableCommand, channel, httpProcessor, connMetrics);
628
629 final H2Stream stream = new H2Stream(channel, streamHandler, false);
630 streamMap.put(streamId, stream);
631
632 if (streamListener != null) {
633 final int initInputWindow = stream.getInputWindow().get();
634 streamListener.onInputFlowControl(this, streamId, initInputWindow, initInputWindow);
635 final int initOutputWindow = stream.getOutputWindow().get();
636 streamListener.onOutputFlowControl(this, streamId, initOutputWindow, initOutputWindow);
637 }
638
639 if (stream.isOutputReady()) {
640 stream.produceOutput();
641 }
642 final CancellableDependency cancellableDependency = executableCommand.getCancellableDependency();
643 if (cancellableDependency != null) {
644 cancellableDependency.setDependency(stream::abort);
645 }
646 if (!outputQueue.isEmpty()) {
647 return;
648 }
649 }
650 }
651 }
652
653 public final void onException(final Exception cause) {
654 try {
655 for (;;) {
656 final AsyncPingHandler pingHandler = pingHandlers.poll();
657 if (pingHandler != null) {
658 pingHandler.failed(cause);
659 } else {
660 break;
661 }
662 }
663 for (;;) {
664 final Command command = ioSession.poll();
665 if (command != null) {
666 if (command instanceof ExecutableCommand) {
667 ((ExecutableCommand) command).failed(new ConnectionClosedException());
668 } else {
669 command.cancel();
670 }
671 } else {
672 break;
673 }
674 }
675 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
676 final Map.Entry<Integer, H2Stream> entry = it.next();
677 final H2Stream stream = entry.getValue();
678 stream.reset(cause);
679 }
680 streamMap.clear();
681 if (!(cause instanceof ConnectionClosedException)) {
682 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) <= 0) {
683 final H2Error errorCode;
684 if (cause instanceof H2ConnectionException) {
685 errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
686 } else if (cause instanceof ProtocolException){
687 errorCode = H2Error.PROTOCOL_ERROR;
688 } else {
689 errorCode = H2Error.INTERNAL_ERROR;
690 }
691 final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
692 commitFrame(goAway);
693 }
694 }
695 } catch (final IOException ignore) {
696 } finally {
697 connState = ConnectionHandshake.SHUTDOWN;
698 final CloseMode closeMode;
699 if (cause instanceof ConnectionClosedException) {
700 closeMode = CloseMode.GRACEFUL;
701 } else if (cause instanceof IOException) {
702 closeMode = CloseMode.IMMEDIATE;
703 } else {
704 closeMode = CloseMode.GRACEFUL;
705 }
706 ioSession.close(closeMode);
707 }
708 }
709
710 private H2Stream getValidStream(final int streamId) throws H2ConnectionException {
711 if (streamId == 0) {
712 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
713 }
714 final H2Stream stream = streamMap.get(streamId);
715 if (stream == null) {
716 if (streamId <= lastStreamId.get()) {
717 throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed");
718 } else {
719 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
720 }
721 }
722 return stream;
723 }
724
725 private void consumeFrame(final RawFrame frame) throws HttpException, IOException {
726 final FrameType frameType = FrameType.valueOf(frame.getType());
727 final int streamId = frame.getStreamId();
728 if (continuation != null && frameType != FrameType.CONTINUATION) {
729 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "CONTINUATION frame expected");
730 }
731 switch (frameType) {
732 case DATA: {
733 final H2Stream stream = getValidStream(streamId);
734 try {
735 consumeDataFrame(frame, stream);
736 } catch (final H2StreamResetException ex) {
737 stream.localReset(ex);
738 } catch (final HttpStreamResetException ex) {
739 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
740 }
741
742 if (stream.isTerminated()) {
743 streamMap.remove(streamId);
744 stream.releaseResources();
745 requestSessionOutput();
746 }
747 }
748 break;
749 case HEADERS: {
750 if (streamId == 0) {
751 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
752 }
753 H2Stream stream = streamMap.get(streamId);
754 if (stream == null) {
755 acceptHeaderFrame();
756
757 if (idGenerator.isSameSide(streamId)) {
758 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
759 }
760 if (goAwayReceived ) {
761 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
762 }
763
764 updateLastStreamId(streamId);
765
766 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
767 streamId, false, initInputWinSize, initOutputWinSize);
768 final H2StreamHandler streamHandler;
769 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
770 streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics, null);
771 } else {
772 streamHandler = NoopH2StreamHandler.INSTANCE;
773 channel.setLocalEndStream();
774 }
775
776 stream = new H2Stream(channel, streamHandler, true);
777 if (stream.isOutputReady()) {
778 stream.produceOutput();
779 }
780 streamMap.put(streamId, stream);
781 }
782
783 try {
784 consumeHeaderFrame(frame, stream);
785
786 if (stream.isOutputReady()) {
787 stream.produceOutput();
788 }
789 } catch (final H2StreamResetException ex) {
790 stream.localReset(ex);
791 } catch (final HttpStreamResetException ex) {
792 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
793 } catch (final HttpException ex) {
794 stream.handle(ex);
795 }
796
797 if (stream.isTerminated()) {
798 streamMap.remove(streamId);
799 stream.releaseResources();
800 requestSessionOutput();
801 }
802 }
803 break;
804 case CONTINUATION: {
805 if (continuation == null) {
806 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION frame");
807 }
808 if (streamId != continuation.streamId) {
809 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION stream id: " + streamId);
810 }
811
812 final H2Stream stream = getValidStream(streamId);
813 try {
814
815 consumeContinuationFrame(frame, stream);
816 } catch (final H2StreamResetException ex) {
817 stream.localReset(ex);
818 } catch (final HttpStreamResetException ex) {
819 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
820 }
821
822 if (stream.isTerminated()) {
823 streamMap.remove(streamId);
824 stream.releaseResources();
825 requestSessionOutput();
826 }
827 }
828 break;
829 case WINDOW_UPDATE: {
830 final ByteBuffer payload = frame.getPayload();
831 if (payload == null || payload.remaining() != 4) {
832 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid WINDOW_UPDATE frame payload");
833 }
834 final int delta = payload.getInt();
835 if (delta <= 0) {
836 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Invalid WINDOW_UPDATE delta");
837 }
838 if (streamId == 0) {
839 try {
840 updateOutputWindow(0, connOutputWindow, delta);
841 } catch (final ArithmeticException ex) {
842 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
843 }
844 } else {
845 final H2Stream stream = streamMap.get(streamId);
846 if (stream != null) {
847 try {
848 updateOutputWindow(streamId, stream.getOutputWindow(), delta);
849 } catch (final ArithmeticException ex) {
850 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
851 }
852 }
853 }
854 ioSession.setEvent(SelectionKey.OP_WRITE);
855 }
856 break;
857 case RST_STREAM: {
858 if (streamId == 0) {
859 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
860 }
861 final H2Stream stream = streamMap.get(streamId);
862 if (stream == null) {
863 if (streamId > lastStreamId.get()) {
864 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
865 }
866 } else {
867 final ByteBuffer payload = frame.getPayload();
868 if (payload == null || payload.remaining() != 4) {
869 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid RST_STREAM frame payload");
870 }
871 final int errorCode = payload.getInt();
872 stream.reset(new H2StreamResetException(errorCode, "Stream reset (" + errorCode + ")"));
873 streamMap.remove(streamId);
874 stream.releaseResources();
875 requestSessionOutput();
876 }
877 }
878 break;
879 case PING: {
880 if (streamId != 0) {
881 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
882 }
883 final ByteBuffer ping = frame.getPayloadContent();
884 if (ping == null || ping.remaining() != 8) {
885 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload");
886 }
887 if (frame.isFlagSet(FrameFlag.ACK)) {
888 final AsyncPingHandler pingHandler = pingHandlers.poll();
889 if (pingHandler != null) {
890 pingHandler.consumeResponse(ping);
891 }
892 } else {
893 final ByteBuffer pong = ByteBuffer.allocate(ping.remaining());
894 pong.put(ping);
895 pong.flip();
896 final RawFrame response = frameFactory.createPingAck(pong);
897 commitFrame(response);
898 }
899 }
900 break;
901 case SETTINGS: {
902 if (streamId != 0) {
903 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
904 }
905 if (frame.isFlagSet(FrameFlag.ACK)) {
906 if (localSettingState == SettingsHandshake.TRANSMITTED) {
907 localSettingState = SettingsHandshake.ACKED;
908 ioSession.setEvent(SelectionKey.OP_WRITE);
909 applyLocalSettings();
910 }
911 } else {
912 final ByteBuffer payload = frame.getPayload();
913 if (payload != null) {
914 if ((payload.remaining() % 6) != 0) {
915 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid SETTINGS payload");
916 }
917 consumeSettingsFrame(payload);
918 remoteSettingState = SettingsHandshake.TRANSMITTED;
919 }
920
921 final RawFrame response = frameFactory.createSettingsAck();
922 commitFrame(response);
923 remoteSettingState = SettingsHandshake.ACKED;
924 }
925 }
926 break;
927 case PRIORITY:
928
929 break;
930 case PUSH_PROMISE: {
931 acceptPushFrame();
932
933 if (goAwayReceived ) {
934 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
935 }
936
937 if (!localConfig.isPushEnabled()) {
938 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Push is disabled");
939 }
940
941 final H2Stream stream = getValidStream(streamId);
942 if (stream.isRemoteClosed()) {
943 stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed"));
944 break;
945 }
946
947 final ByteBuffer payload = frame.getPayloadContent();
948 if (payload == null || payload.remaining() < 4) {
949 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PUSH_PROMISE payload");
950 }
951 final int promisedStreamId = payload.getInt();
952 if (promisedStreamId == 0 || idGenerator.isSameSide(promisedStreamId)) {
953 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal promised stream id: " + promisedStreamId);
954 }
955 if (streamMap.get(promisedStreamId) != null) {
956 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected promised stream id: " + promisedStreamId);
957 }
958
959 updateLastStreamId(promisedStreamId);
960
961 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
962 promisedStreamId, false, initInputWinSize, initOutputWinSize);
963 final H2StreamHandler streamHandler;
964 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
965 streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics,
966 stream.getPushHandlerFactory());
967 } else {
968 streamHandler = NoopH2StreamHandler.INSTANCE;
969 channel.setLocalEndStream();
970 }
971
972 final H2Stream promisedStream = new H2Stream(channel, streamHandler, true);
973 streamMap.put(promisedStreamId, promisedStream);
974
975 try {
976 consumePushPromiseFrame(frame, payload, promisedStream);
977 } catch (final H2StreamResetException ex) {
978 promisedStream.localReset(ex);
979 } catch (final HttpStreamResetException ex) {
980 promisedStream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.NO_ERROR);
981 }
982 }
983 break;
984 case GOAWAY: {
985 if (streamId != 0) {
986 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
987 }
988 final ByteBuffer payload = frame.getPayload();
989 if (payload == null || payload.remaining() < 8) {
990 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid GOAWAY payload");
991 }
992 final int processedLocalStreamId = payload.getInt();
993 final int errorCode = payload.getInt();
994 goAwayReceived = true;
995 if (errorCode == H2Error.NO_ERROR.getCode()) {
996 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
997 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
998 final Map.Entry<Integer, H2Stream> entry = it.next();
999 final int activeStreamId = entry.getKey();
1000 if (!idGenerator.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
1001 final H2Stream stream = entry.getValue();
1002 stream.cancel();
1003 it.remove();
1004 }
1005 }
1006 }
1007 connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
1008 } else {
1009 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1010 final Map.Entry<Integer, H2Stream> entry = it.next();
1011 final H2Stream stream = entry.getValue();
1012 stream.reset(new H2StreamResetException(errorCode, "Connection terminated by the peer (" + errorCode + ")"));
1013 }
1014 streamMap.clear();
1015 connState = ConnectionHandshake.SHUTDOWN;
1016 }
1017 }
1018 ioSession.setEvent(SelectionKey.OP_WRITE);
1019 break;
1020 }
1021 }
1022
1023 private void consumeDataFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1024 final int streamId = stream.getId();
1025 final ByteBuffer payload = frame.getPayloadContent();
1026 if (payload != null) {
1027 final int frameLength = frame.getLength();
1028 final int streamWinSize = updateInputWindow(streamId, stream.getInputWindow(), -frameLength);
1029 if (streamWinSize < lowMark && !stream.isRemoteClosed()) {
1030 stream.produceInputCapacityUpdate();
1031 }
1032 final int connWinSize = updateInputWindow(0, connInputWindow, -frameLength);
1033 if (connWinSize < CONNECTION_WINDOW_LOW_MARK) {
1034 maximizeConnWindow(connWinSize);
1035 }
1036 }
1037 if (stream.isRemoteClosed()) {
1038 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1039 }
1040 if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1041 stream.setRemoteEndStream();
1042 }
1043 if (stream.isLocalReset()) {
1044 return;
1045 }
1046 stream.consumeData(payload);
1047 }
1048
1049 private void maximizeConnWindow(final int connWinSize) throws IOException {
1050 final int delta = Integer.MAX_VALUE - connWinSize;
1051 if (delta > 0) {
1052 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(0, delta);
1053 commitFrame(windowUpdateFrame);
1054 updateInputWindow(0, connInputWindow, delta);
1055 }
1056 }
1057
1058 private void consumePushPromiseFrame(final RawFrame frame, final ByteBuffer payload, final H2Stream promisedStream) throws HttpException, IOException {
1059 final int promisedStreamId = promisedStream.getId();
1060 if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1061 continuation = new Continuation(promisedStreamId, frame.getType(), true);
1062 }
1063 if (continuation == null) {
1064 final List<Header> headers = hPackDecoder.decodeHeaders(payload);
1065 if (promisedStreamId > processedRemoteStreamId) {
1066 processedRemoteStreamId = promisedStreamId;
1067 }
1068 if (streamListener != null) {
1069 streamListener.onHeaderInput(this, promisedStreamId, headers);
1070 }
1071 promisedStream.consumePromise(headers);
1072 } else {
1073 continuation.copyPayload(payload);
1074 }
1075 }
1076
1077 List<Header> decodeHeaders(final ByteBuffer payload) throws HttpException {
1078 return hPackDecoder.decodeHeaders(payload);
1079 }
1080
1081 private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1082 final int streamId = stream.getId();
1083 if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1084 continuation = new Continuation(streamId, frame.getType(), frame.isFlagSet(FrameFlag.END_STREAM));
1085 }
1086 final ByteBuffer payload = frame.getPayloadContent();
1087 if (frame.isFlagSet(FrameFlag.PRIORITY)) {
1088
1089 payload.getInt();
1090 payload.get();
1091 }
1092 if (continuation == null) {
1093 final List<Header> headers = decodeHeaders(payload);
1094 if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1095 processedRemoteStreamId = streamId;
1096 }
1097 if (streamListener != null) {
1098 streamListener.onHeaderInput(this, streamId, headers);
1099 }
1100 if (stream.isRemoteClosed()) {
1101 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1102 }
1103 if (stream.isLocalReset()) {
1104 return;
1105 }
1106 if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1107 stream.setRemoteEndStream();
1108 }
1109 stream.consumeHeader(headers);
1110 } else {
1111 continuation.copyPayload(payload);
1112 }
1113 }
1114
1115 private void consumeContinuationFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1116 final int streamId = frame.getStreamId();
1117 final ByteBuffer payload = frame.getPayload();
1118 continuation.copyPayload(payload);
1119 if (frame.isFlagSet(FrameFlag.END_HEADERS)) {
1120 final List<Header> headers = decodeHeaders(continuation.getContent());
1121 if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1122 processedRemoteStreamId = streamId;
1123 }
1124 if (streamListener != null) {
1125 streamListener.onHeaderInput(this, streamId, headers);
1126 }
1127 if (stream.isRemoteClosed()) {
1128 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1129 }
1130 if (stream.isLocalReset()) {
1131 return;
1132 }
1133 if (continuation.endStream) {
1134 stream.setRemoteEndStream();
1135 }
1136 if (continuation.type == FrameType.PUSH_PROMISE.getValue()) {
1137 stream.consumePromise(headers);
1138 } else {
1139 stream.consumeHeader(headers);
1140 }
1141 continuation = null;
1142 }
1143 }
1144
1145 private void consumeSettingsFrame(final ByteBuffer payload) throws HttpException, IOException {
1146 final H2Config.Builder configBuilder = H2Config.initial();
1147 while (payload.hasRemaining()) {
1148 final int code = payload.getShort();
1149 final int value = payload.getInt();
1150 final H2Param param = H2Param.valueOf(code);
1151 if (param != null) {
1152 switch (param) {
1153 case HEADER_TABLE_SIZE:
1154 try {
1155 configBuilder.setHeaderTableSize(value);
1156 } catch (final IllegalArgumentException ex) {
1157 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1158 }
1159 break;
1160 case MAX_CONCURRENT_STREAMS:
1161 try {
1162 configBuilder.setMaxConcurrentStreams(value);
1163 } catch (final IllegalArgumentException ex) {
1164 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1165 }
1166 break;
1167 case ENABLE_PUSH:
1168 configBuilder.setPushEnabled(value == 1);
1169 break;
1170 case INITIAL_WINDOW_SIZE:
1171 try {
1172 configBuilder.setInitialWindowSize(value);
1173 } catch (final IllegalArgumentException ex) {
1174 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1175 }
1176 break;
1177 case MAX_FRAME_SIZE:
1178 try {
1179 configBuilder.setMaxFrameSize(value);
1180 } catch (final IllegalArgumentException ex) {
1181 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1182 }
1183 break;
1184 case MAX_HEADER_LIST_SIZE:
1185 try {
1186 configBuilder.setMaxHeaderListSize(value);
1187 } catch (final IllegalArgumentException ex) {
1188 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1189 }
1190 break;
1191 }
1192 }
1193 }
1194 applyRemoteSettings(configBuilder.build());
1195 }
1196
1197 private void produceOutput() throws HttpException, IOException {
1198 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1199 final Map.Entry<Integer, H2Stream> entry = it.next();
1200 final H2Stream stream = entry.getValue();
1201 if (!stream.isLocalClosed() && stream.getOutputWindow().get() > 0) {
1202 stream.produceOutput();
1203 }
1204 if (stream.isTerminated()) {
1205 it.remove();
1206 stream.releaseResources();
1207 requestSessionOutput();
1208 }
1209 if (!outputQueue.isEmpty()) {
1210 break;
1211 }
1212 }
1213 }
1214
1215 private void applyRemoteSettings(final H2Config config) throws H2ConnectionException {
1216 remoteConfig = config;
1217
1218 hPackEncoder.setMaxTableSize(remoteConfig.getHeaderTableSize());
1219 final int delta = remoteConfig.getInitialWindowSize() - initOutputWinSize;
1220 initOutputWinSize = remoteConfig.getInitialWindowSize();
1221
1222 final int maxFrameSize = remoteConfig.getMaxFrameSize();
1223 if (maxFrameSize < outputBuffer.getMaxFramePayloadSize()) {
1224 try {
1225 outputBuffer.resize(maxFrameSize);
1226 } catch (final BufferOverflowException ex) {
1227 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Failure resizing the frame output buffer");
1228 }
1229 }
1230
1231 if (delta != 0) {
1232 if (!streamMap.isEmpty()) {
1233 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1234 final Map.Entry<Integer, H2Stream> entry = it.next();
1235 final H2Stream stream = entry.getValue();
1236 try {
1237 updateOutputWindow(stream.getId(), stream.getOutputWindow(), delta);
1238 } catch (final ArithmeticException ex) {
1239 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1240 }
1241 }
1242 }
1243 }
1244 }
1245
1246 private void applyLocalSettings() throws H2ConnectionException {
1247 hPackDecoder.setMaxTableSize(localConfig.getHeaderTableSize());
1248 hPackDecoder.setMaxListSize(localConfig.getMaxHeaderListSize());
1249
1250 final int delta = localConfig.getInitialWindowSize() - initInputWinSize;
1251 initInputWinSize = localConfig.getInitialWindowSize();
1252
1253 if (delta != 0 && !streamMap.isEmpty()) {
1254 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1255 final Map.Entry<Integer, H2Stream> entry = it.next();
1256 final H2Stream stream = entry.getValue();
1257 try {
1258 updateInputWindow(stream.getId(), stream.getInputWindow(), delta);
1259 } catch (final ArithmeticException ex) {
1260 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1261 }
1262 }
1263 }
1264 lowMark = initInputWinSize / 2;
1265 }
1266
1267 @Override
1268 public void close() throws IOException {
1269 ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.IMMEDIATE);
1270 }
1271
1272 @Override
1273 public void close(final CloseMode closeMode) {
1274 ioSession.close(closeMode);
1275 }
1276
1277 @Override
1278 public boolean isOpen() {
1279 return connState == ConnectionHandshake.ACTIVE;
1280 }
1281
1282 @Override
1283 public void setSocketTimeout(final Timeout timeout) {
1284 ioSession.setSocketTimeout(timeout);
1285 }
1286
1287 @Override
1288 public SSLSession getSSLSession() {
1289 final TlsDetails tlsDetails = ioSession.getTlsDetails();
1290 return tlsDetails != null ? tlsDetails.getSSLSession() : null;
1291 }
1292
1293 @Override
1294 public EndpointDetails getEndpointDetails() {
1295 if (endpointDetails == null) {
1296 endpointDetails = new BasicEndpointDetails(
1297 ioSession.getRemoteAddress(),
1298 ioSession.getLocalAddress(),
1299 connMetrics,
1300 ioSession.getSocketTimeout());
1301 }
1302 return endpointDetails;
1303 }
1304
1305 @Override
1306 public Timeout getSocketTimeout() {
1307 return ioSession.getSocketTimeout();
1308 }
1309
1310 @Override
1311 public ProtocolVersion getProtocolVersion() {
1312 return HttpVersion.HTTP_2;
1313 }
1314
1315 @Override
1316 public SocketAddress getRemoteAddress() {
1317 return ioSession.getRemoteAddress();
1318 }
1319
1320 @Override
1321 public SocketAddress getLocalAddress() {
1322 return ioSession.getLocalAddress();
1323 }
1324
1325 void appendState(final StringBuilder buf) {
1326 buf.append("connState=").append(connState)
1327 .append(", connInputWindow=").append(connInputWindow)
1328 .append(", connOutputWindow=").append(connOutputWindow)
1329 .append(", outputQueue=").append(outputQueue.size())
1330 .append(", streamMap=").append(streamMap.size())
1331 .append(", processedRemoteStreamId=").append(processedRemoteStreamId);
1332 }
1333
1334 private static class Continuation {
1335
1336 final int streamId;
1337 final int type;
1338 final boolean endStream;
1339 final ByteArrayBuffer headerBuffer;
1340
1341 private Continuation(final int streamId, final int type, final boolean endStream) {
1342 this.streamId = streamId;
1343 this.type = type;
1344 this.endStream = endStream;
1345 this.headerBuffer = new ByteArrayBuffer(1024);
1346 }
1347
1348 void copyPayload(final ByteBuffer payload) {
1349 if (payload == null) {
1350 return;
1351 }
1352 headerBuffer.ensureCapacity(payload.remaining());
1353 payload.get(headerBuffer.array(), headerBuffer.length(), payload.remaining());
1354 }
1355
1356 ByteBuffer getContent() {
1357 return ByteBuffer.wrap(headerBuffer.array(), 0, headerBuffer.length());
1358 }
1359
1360 }
1361
1362 private class H2StreamChannelImpl implements H2StreamChannel {
1363
1364 private final int id;
1365 private final AtomicInteger inputWindow;
1366 private final AtomicInteger outputWindow;
1367
1368 private volatile boolean idle;
1369 private volatile boolean remoteEndStream;
1370 private volatile boolean localEndStream;
1371
1372 private volatile long deadline;
1373
1374 H2StreamChannelImpl(final int id, final boolean idle, final int initialInputWindowSize, final int initialOutputWindowSize) {
1375 this.id = id;
1376 this.idle = idle;
1377 this.inputWindow = new AtomicInteger(initialInputWindowSize);
1378 this.outputWindow = new AtomicInteger(initialOutputWindowSize);
1379 }
1380
1381 int getId() {
1382 return id;
1383 }
1384
1385 AtomicInteger getOutputWindow() {
1386 return outputWindow;
1387 }
1388
1389 AtomicInteger getInputWindow() {
1390 return inputWindow;
1391 }
1392
1393 @Override
1394 public void submit(final List<Header> headers, final boolean endStream) throws IOException {
1395 ioSession.getLock().lock();
1396 try {
1397 if (headers == null || headers.isEmpty()) {
1398 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
1399 }
1400 if (localEndStream) {
1401 return;
1402 }
1403 idle = false;
1404 commitHeaders(id, headers, endStream);
1405 if (endStream) {
1406 localEndStream = true;
1407 }
1408 } finally {
1409 ioSession.getLock().unlock();
1410 }
1411 }
1412
1413 @Override
1414 public void push(final List<Header> headers, final AsyncPushProducer pushProducer) throws HttpException, IOException {
1415 acceptPushRequest();
1416 final int promisedStreamId = generateStreamId();
1417 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
1418 promisedStreamId,
1419 true,
1420 localConfig.getInitialWindowSize(),
1421 remoteConfig.getInitialWindowSize());
1422 final HttpCoreContext context = HttpCoreContext.create();
1423 context.setSSLSession(getSSLSession());
1424 context.setEndpointDetails(getEndpointDetails());
1425 final H2StreamHandler streamHandler = new ServerPushH2StreamHandler(
1426 channel, httpProcessor, connMetrics, pushProducer, context);
1427 final H2Stream stream = new H2Stream(channel, streamHandler, false);
1428 streamMap.put(promisedStreamId, stream);
1429
1430 ioSession.getLock().lock();
1431 try {
1432 if (localEndStream) {
1433 stream.releaseResources();
1434 return;
1435 }
1436 commitPushPromise(id, promisedStreamId, headers);
1437 idle = false;
1438 } finally {
1439 ioSession.getLock().unlock();
1440 }
1441 }
1442
1443 @Override
1444 public void update(final int increment) throws IOException {
1445 if (remoteEndStream) {
1446 return;
1447 }
1448 incrementInputCapacity(0, connInputWindow, increment);
1449 incrementInputCapacity(id, inputWindow, increment);
1450 }
1451
1452 @Override
1453 public int write(final ByteBuffer payload) throws IOException {
1454 ioSession.getLock().lock();
1455 try {
1456 if (localEndStream) {
1457 return 0;
1458 }
1459 return streamData(id, outputWindow, payload);
1460 } finally {
1461 ioSession.getLock().unlock();
1462 }
1463 }
1464
1465 @Override
1466 public void endStream(final List<? extends Header> trailers) throws IOException {
1467 ioSession.getLock().lock();
1468 try {
1469 if (localEndStream) {
1470 return;
1471 }
1472 localEndStream = true;
1473 if (trailers != null && !trailers.isEmpty()) {
1474 commitHeaders(id, trailers, true);
1475 } else {
1476 final RawFrame frame = frameFactory.createData(id, null, true);
1477 commitFrameInternal(frame);
1478 }
1479 } finally {
1480 ioSession.getLock().unlock();
1481 }
1482 }
1483
1484 @Override
1485 public void endStream() throws IOException {
1486 endStream(null);
1487 }
1488
1489 @Override
1490 public void requestOutput() {
1491 requestSessionOutput();
1492 }
1493
1494 boolean isRemoteClosed() {
1495 return remoteEndStream;
1496 }
1497
1498 void setRemoteEndStream() {
1499 remoteEndStream = true;
1500 }
1501
1502 boolean isLocalClosed() {
1503 return localEndStream;
1504 }
1505
1506 void setLocalEndStream() {
1507 localEndStream = true;
1508 }
1509
1510 boolean isLocalReset() {
1511 return deadline > 0;
1512 }
1513
1514 boolean isResetDeadline() {
1515 final long l = deadline;
1516 return l > 0 && l < System.currentTimeMillis();
1517 }
1518
1519 boolean localReset(final int code) throws IOException {
1520 ioSession.getLock().lock();
1521 try {
1522 if (localEndStream) {
1523 return false;
1524 }
1525 localEndStream = true;
1526 deadline = System.currentTimeMillis() + LINGER_TIME;
1527 if (!idle) {
1528 final RawFrame resetStream = frameFactory.createResetStream(id, code);
1529 commitFrameInternal(resetStream);
1530 return true;
1531 }
1532 return false;
1533 } finally {
1534 ioSession.getLock().unlock();
1535 }
1536 }
1537
1538 boolean localReset(final H2Error error) throws IOException {
1539 return localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1540 }
1541
1542 @Override
1543 public boolean cancel() {
1544 try {
1545 return localReset(H2Error.CANCEL);
1546 } catch (final IOException ignore) {
1547 return false;
1548 }
1549 }
1550
1551 void appendState(final StringBuilder buf) {
1552 buf.append("id=").append(id)
1553 .append(", connState=").append(connState)
1554 .append(", inputWindow=").append(inputWindow)
1555 .append(", outputWindow=").append(outputWindow)
1556 .append(", localEndStream=").append(localEndStream)
1557 .append(", idle=").append(idle);
1558 }
1559
1560 @Override
1561 public String toString() {
1562 final StringBuilder buf = new StringBuilder();
1563 buf.append("[");
1564 appendState(buf);
1565 buf.append("]");
1566 return buf.toString();
1567 }
1568
1569 }
1570
1571 static class H2Stream {
1572
1573 private final H2StreamChannelImpl channel;
1574 private final H2StreamHandler handler;
1575 private final boolean remoteInitiated;
1576
1577 private H2Stream(
1578 final H2StreamChannelImpl channel,
1579 final H2StreamHandler handler,
1580 final boolean remoteInitiated) {
1581 this.channel = channel;
1582 this.handler = handler;
1583 this.remoteInitiated = remoteInitiated;
1584 }
1585
1586 int getId() {
1587 return channel.getId();
1588 }
1589
1590 boolean isRemoteInitiated() {
1591 return remoteInitiated;
1592 }
1593
1594 AtomicInteger getOutputWindow() {
1595 return channel.getOutputWindow();
1596 }
1597
1598 AtomicInteger getInputWindow() {
1599 return channel.getInputWindow();
1600 }
1601
1602 boolean isTerminated() {
1603 return channel.isLocalClosed() && (channel.isRemoteClosed() || channel.isResetDeadline());
1604 }
1605
1606 boolean isRemoteClosed() {
1607 return channel.isRemoteClosed();
1608 }
1609
1610 boolean isLocalClosed() {
1611 return channel.isLocalClosed();
1612 }
1613
1614 boolean isLocalReset() {
1615 return channel.isLocalReset();
1616 }
1617
1618 void setRemoteEndStream() {
1619 channel.setRemoteEndStream();
1620 }
1621
1622 void consumePromise(final List<Header> headers) throws HttpException, IOException {
1623 try {
1624 handler.consumePromise(headers);
1625 channel.setLocalEndStream();
1626 } catch (final ProtocolException ex) {
1627 localReset(ex, H2Error.PROTOCOL_ERROR);
1628 }
1629 }
1630
1631 void consumeHeader(final List<Header> headers) throws HttpException, IOException {
1632 try {
1633 handler.consumeHeader(headers, channel.isRemoteClosed());
1634 } catch (final ProtocolException ex) {
1635 localReset(ex, H2Error.PROTOCOL_ERROR);
1636 }
1637 }
1638
1639 void consumeData(final ByteBuffer src) throws HttpException, IOException {
1640 try {
1641 handler.consumeData(src, channel.isRemoteClosed());
1642 } catch (final CharacterCodingException ex) {
1643 localReset(ex, H2Error.INTERNAL_ERROR);
1644 } catch (final ProtocolException ex) {
1645 localReset(ex, H2Error.PROTOCOL_ERROR);
1646 }
1647 }
1648
1649 boolean isOutputReady() {
1650 return handler.isOutputReady();
1651 }
1652
1653 void produceOutput() throws HttpException, IOException {
1654 try {
1655 handler.produceOutput();
1656 } catch (final ProtocolException ex) {
1657 localReset(ex, H2Error.PROTOCOL_ERROR);
1658 }
1659 }
1660
1661 void produceInputCapacityUpdate() throws IOException {
1662 handler.updateInputCapacity();
1663 }
1664
1665 void reset(final Exception cause) {
1666 channel.setRemoteEndStream();
1667 channel.setLocalEndStream();
1668 handler.failed(cause);
1669 }
1670
1671 void localReset(final Exception cause, final int code) throws IOException {
1672 channel.localReset(code);
1673 handler.failed(cause);
1674 }
1675
1676 void localReset(final Exception cause, final H2Error error) throws IOException {
1677 localReset(cause, error != null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1678 }
1679
1680 void localReset(final H2StreamResetException ex) throws IOException {
1681 localReset(ex, ex.getCode());
1682 }
1683
1684 void handle(final HttpException ex) throws IOException, HttpException {
1685 handler.handle(ex, channel.isRemoteClosed());
1686 }
1687
1688 HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
1689 return handler.getPushHandlerFactory();
1690 }
1691
1692 void cancel() {
1693 reset(new RequestNotExecutedException());
1694 }
1695
1696 boolean abort() {
1697 final boolean cancelled = channel.cancel();
1698 handler.failed(new RequestNotExecutedException());
1699 return cancelled;
1700 }
1701
1702 void releaseResources() {
1703 handler.releaseResources();
1704 }
1705
1706 void appendState(final StringBuilder buf) {
1707 buf.append("channel=[");
1708 channel.appendState(buf);
1709 buf.append("]");
1710 }
1711
1712 @Override
1713 public String toString() {
1714 final StringBuilder buf = new StringBuilder();
1715 buf.append("[");
1716 appendState(buf);
1717 buf.append("]");
1718 return buf.toString();
1719 }
1720
1721 }
1722
1723 }