1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http2.impl.nio;
28
29 import java.io.IOException;
30 import java.net.SocketAddress;
31 import java.nio.ByteBuffer;
32 import java.nio.channels.SelectionKey;
33 import java.nio.charset.CharacterCodingException;
34 import java.util.Deque;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Queue;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ConcurrentLinkedDeque;
41 import java.util.concurrent.ConcurrentLinkedQueue;
42 import java.util.concurrent.atomic.AtomicInteger;
43
44 import javax.net.ssl.SSLSession;
45
46 import org.apache.hc.core5.concurrent.CancellableDependency;
47 import org.apache.hc.core5.http.ConnectionClosedException;
48 import org.apache.hc.core5.http.EndpointDetails;
49 import org.apache.hc.core5.http.Header;
50 import org.apache.hc.core5.http.HttpConnection;
51 import org.apache.hc.core5.http.HttpException;
52 import org.apache.hc.core5.http.HttpStreamResetException;
53 import org.apache.hc.core5.http.HttpVersion;
54 import org.apache.hc.core5.http.ProtocolException;
55 import org.apache.hc.core5.http.ProtocolVersion;
56 import org.apache.hc.core5.http.RequestNotExecutedException;
57 import org.apache.hc.core5.http.config.CharCodingConfig;
58 import org.apache.hc.core5.http.impl.BasicEndpointDetails;
59 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
60 import org.apache.hc.core5.http.impl.CharCodingSupport;
61 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
62 import org.apache.hc.core5.http.nio.AsyncPushProducer;
63 import org.apache.hc.core5.http.nio.HandlerFactory;
64 import org.apache.hc.core5.http.nio.command.ExecutableCommand;
65 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
66 import org.apache.hc.core5.http.protocol.HttpCoreContext;
67 import org.apache.hc.core5.http.protocol.HttpProcessor;
68 import org.apache.hc.core5.http2.H2ConnectionException;
69 import org.apache.hc.core5.http2.H2Error;
70 import org.apache.hc.core5.http2.H2StreamResetException;
71 import org.apache.hc.core5.http2.config.H2Config;
72 import org.apache.hc.core5.http2.config.H2Param;
73 import org.apache.hc.core5.http2.config.H2Setting;
74 import org.apache.hc.core5.http2.frame.FrameFactory;
75 import org.apache.hc.core5.http2.frame.FrameFlag;
76 import org.apache.hc.core5.http2.frame.FrameType;
77 import org.apache.hc.core5.http2.frame.RawFrame;
78 import org.apache.hc.core5.http2.frame.StreamIdGenerator;
79 import org.apache.hc.core5.http2.hpack.HPackDecoder;
80 import org.apache.hc.core5.http2.hpack.HPackEncoder;
81 import org.apache.hc.core5.http2.impl.BasicH2TransportMetrics;
82 import org.apache.hc.core5.http2.nio.AsyncPingHandler;
83 import org.apache.hc.core5.http2.nio.command.PingCommand;
84 import org.apache.hc.core5.io.CloseMode;
85 import org.apache.hc.core5.reactor.Command;
86 import org.apache.hc.core5.reactor.ProtocolIOSession;
87 import org.apache.hc.core5.reactor.ssl.TlsDetails;
88 import org.apache.hc.core5.util.Args;
89 import org.apache.hc.core5.util.ByteArrayBuffer;
90 import org.apache.hc.core5.util.Identifiable;
91 import org.apache.hc.core5.util.Timeout;
92
93 abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection {
94
// NOTE(review): LINGER_TIME is not referenced in the visible part of this file;
// presumably a duration in milliseconds - confirm against its users.
95 private static final long LINGER_TIME = 1000;
// 10 MiB expressed in bytes. Not referenced in the visible part of this file.
96 private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024;
97
// Connection life cycle. The declaration order is significant: the code compares
// states with compareTo(), e.g. "compareTo(SHUTDOWN) < 0" or "compareTo(ACTIVE) <= 0".
98 enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
// Progress of a SETTINGS exchange; tracked separately for the local and the remote side.
99 enum SettingsHandshake { READY, TRANSMITTED, ACKED }
100
101 private final ProtocolIOSession ioSession;
102 private final FrameFactory frameFactory;
103 private final StreamIdGenerator idGenerator;
104 private final HttpProcessor httpProcessor;
// Local settings; falls back to H2Config.DEFAULT when the constructor receives null.
105 private final H2Config localConfig;
106 private final BasicH2TransportMetrics inputMetrics;
107 private final BasicH2TransportMetrics outputMetrics;
108 private final BasicHttpConnectionMetrics connMetrics;
109 private final FrameInputBuffer inputBuffer;
110 private final FrameOutputBuffer outputBuffer;
// Frames that could not be written straight to outputBuffer; drained by onOutput().
111 private final Deque<RawFrame> outputQueue;
112 private final HPackEncoder hPackEncoder;
113 private final HPackDecoder hPackDecoder;
// Streams currently tracked by this connection, keyed by stream id.
114 private final Map<Integer, H2Stream> streamMap;
// Handlers of PING frames that have been sent; polled in FIFO order when a PING ACK arrives.
115 private final Queue<AsyncPingHandler> pingHandlers;
// Connection-level flow control windows (reported to the listener with stream id 0).
116 private final AtomicInteger connInputWindow;
117 private final AtomicInteger connOutputWindow;
// Counter bumped by requestSessionOutput() and settled by onOutput().
118 private final AtomicInteger outputRequests;
// Highest stream id seen or generated so far.
119 private final AtomicInteger lastStreamId;
// Optional listener; every use in this class is guarded by a null check.
120 private final H2StreamListener streamListener;
121
122 private ConnectionHandshake connState = ConnectionHandshake.READY;
123 private SettingsHandshake localSettingState = SettingsHandshake.READY;
124 private SettingsHandshake remoteSettingState = SettingsHandshake.READY;
125
// Initial window sizes handed to newly created stream channels.
126 private int initInputWinSize;
127 private int initOutputWinSize;
// Initialised to half of the initial window size in the constructor.
128 private int lowMark;
129
// Settings of the opposite endpoint; starts out as H2Config.INIT.
130 private volatile H2Config remoteConfig;
131
// Non-null while a header block is incomplete; consumeFrame() then only accepts CONTINUATION.
132 private Continuation continuation;
133
// Stream id reported as "last stream id" in GOAWAY frames sent by this endpoint.
134 private int processedRemoteStreamId;
135 private EndpointDetails endpointDetails;
// When set, consumeFrame() rejects HEADERS for new streams and PUSH_PROMISE frames.
136 private boolean goAwayReceived;
137
138 AbstractH2StreamMultiplexer(
139 final ProtocolIOSession ioSession,
140 final FrameFactory frameFactory,
141 final StreamIdGenerator idGenerator,
142 final HttpProcessor httpProcessor,
143 final CharCodingConfig charCodingConfig,
144 final H2Config h2Config,
145 final H2StreamListener streamListener) {
146 this.ioSession = Args.notNull(ioSession, "IO session");
147 this.frameFactory = Args.notNull(frameFactory, "Frame factory");
148 this.idGenerator = Args.notNull(idGenerator, "Stream id generator");
149 this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
150 this.localConfig = h2Config != null ? h2Config : H2Config.DEFAULT;
151 this.inputMetrics = new BasicH2TransportMetrics();
152 this.outputMetrics = new BasicH2TransportMetrics();
153 this.connMetrics = new BasicHttpConnectionMetrics(this.inputMetrics, this.outputMetrics);
154 this.inputBuffer = new FrameInputBuffer(this.inputMetrics, this.localConfig.getMaxFrameSize());
155 this.outputBuffer = new FrameOutputBuffer(this.outputMetrics, this.localConfig.getMaxFrameSize());
156 this.outputQueue = new ConcurrentLinkedDeque<>();
157 this.pingHandlers = new ConcurrentLinkedQueue<>();
158 this.outputRequests = new AtomicInteger(0);
159 this.lastStreamId = new AtomicInteger(0);
160 this.hPackEncoder = new HPackEncoder(CharCodingSupport.createEncoder(charCodingConfig));
161 this.hPackDecoder = new HPackDecoder(CharCodingSupport.createDecoder(charCodingConfig));
162 this.streamMap = new ConcurrentHashMap<>();
163 this.remoteConfig = H2Config.INIT;
164 this.connInputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
165 this.connOutputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
166
167 this.initInputWinSize = H2Config.INIT.getInitialWindowSize();
168 this.initOutputWinSize = H2Config.INIT.getInitialWindowSize();
169
170 this.hPackDecoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
171 this.hPackEncoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
172 this.hPackDecoder.setMaxListSize(H2Config.INIT.getMaxHeaderListSize());
173
174 this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
175 this.streamListener = streamListener;
176 }
177
178 @Override
179 public String getId() {
180 return ioSession.getId();
181 }
182
// Hook called by consumeFrame() when a HEADERS frame arrives for a stream id that is
// not present in the stream map, before the new stream is created.
// Implementations may reject the frame by throwing H2ConnectionException.
183 abstract void acceptHeaderFrame() throws H2ConnectionException;
184
// NOTE(review): not invoked in the visible part of this file; presumably the
// counterpart check performed before a push request is submitted - confirm at the call site.
185 abstract void acceptPushRequest() throws H2ConnectionException;
186
// Hook called by consumeFrame() as the first step of PUSH_PROMISE handling.
// Implementations may reject the frame by throwing H2ConnectionException.
187 abstract void acceptPushFrame() throws H2ConnectionException;
188
// Creates the handler for a stream opened by the opposite endpoint.
// consumeFrame() passes null as pushHandlerFactory when handling a HEADERS frame.
189 abstract H2StreamHandler createRemotelyInitiatedStream(
190 H2StreamChannel channel,
191 HttpProcessor httpProcessor,
192 BasicHttpConnectionMetrics connMetrics,
193 HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException;
194
// Creates the handler for a stream opened by this endpoint; called by
// processPendingCommands() for every ExecutableCommand taken from the session.
195 abstract H2StreamHandler createLocallyInitiatedStream(
196 ExecutableCommand command,
197 H2StreamChannel channel,
198 HttpProcessor httpProcessor,
199 BasicHttpConnectionMetrics connMetrics) throws IOException;
200
201 private int updateWindow(final AtomicInteger window, final int delta) throws ArithmeticException {
202 for (;;) {
203 final int current = window.get();
204 long newValue = (long) current + delta;
205
206
207
208 if (newValue == 0x80000000L) {
209 newValue = Integer.MAX_VALUE;
210 }
211
212
213 if (Math.abs(newValue) > 0x7fffffffL) {
214 throw new ArithmeticException("Update causes flow control window to exceed " + Integer.MAX_VALUE);
215 }
216 if (window.compareAndSet(current, (int) newValue)) {
217 return (int) newValue;
218 }
219 }
220 }
221
222 private int updateInputWindow(
223 final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
224 final int newSize = updateWindow(window, delta);
225 if (streamListener != null) {
226 streamListener.onInputFlowControl(this, streamId, delta, newSize);
227 }
228 return newSize;
229 }
230
231 private int updateOutputWindow(
232 final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
233 final int newSize = updateWindow(window, delta);
234 if (streamListener != null) {
235 streamListener.onOutputFlowControl(this, streamId, delta, newSize);
236 }
237 return newSize;
238 }
239
240 private void commitFrameInternal(final RawFrame frame) throws IOException {
241 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
242 if (streamListener != null) {
243 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
244 }
245 outputBuffer.write(frame, ioSession);
246 } else {
247 outputQueue.addLast(frame);
248 }
249 ioSession.setEvent(SelectionKey.OP_WRITE);
250 }
251
252 private void commitFrame(final RawFrame frame) throws IOException {
253 Args.notNull(frame, "Frame");
254 ioSession.getLock().lock();
255 try {
256 commitFrameInternal(frame);
257 } finally {
258 ioSession.getLock().unlock();
259 }
260 }
261
262 private void commitHeaders(
263 final int streamId, final List<? extends Header> headers, final boolean endStream) throws IOException {
264 if (streamListener != null) {
265 streamListener.onHeaderOutput(this, streamId, headers);
266 }
267 final ByteArrayBuffer buf = new ByteArrayBuffer(512);
268 hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
269
270 int off = 0;
271 int remaining = buf.length();
272 boolean continuation = false;
273
274 while (remaining > 0) {
275 final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
276 final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
277
278 remaining -= chunk;
279 off += chunk;
280
281 final boolean endHeaders = remaining == 0;
282 final RawFrame frame;
283 if (!continuation) {
284 frame = frameFactory.createHeaders(streamId, payload, endHeaders, endStream);
285 continuation = true;
286 } else {
287 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
288 }
289 commitFrameInternal(frame);
290 }
291 }
292
293 private void commitPushPromise(
294 final int streamId, final int promisedStreamId, final List<Header> headers) throws IOException {
295 if (headers == null || headers.isEmpty()) {
296 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
297 }
298 if (streamListener != null) {
299 streamListener.onHeaderOutput(this, streamId, headers);
300 }
301 final ByteArrayBuffer buf = new ByteArrayBuffer(512);
302 buf.append((byte)(promisedStreamId >> 24));
303 buf.append((byte)(promisedStreamId >> 16));
304 buf.append((byte)(promisedStreamId >> 8));
305 buf.append((byte)(promisedStreamId));
306
307 hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
308
309 int off = 0;
310 int remaining = buf.length();
311 boolean continuation = false;
312
313 while (remaining > 0) {
314 final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
315 final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
316
317 remaining -= chunk;
318 off += chunk;
319
320 final boolean endHeaders = remaining == 0;
321 final RawFrame frame;
322 if (!continuation) {
323 frame = frameFactory.createPushPromise(streamId, payload, endHeaders);
324 continuation = true;
325 } else {
326 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
327 }
328 commitFrameInternal(frame);
329 }
330 }
331
332 private void streamDataFrame(
333 final int streamId,
334 final AtomicInteger streamOutputWindow,
335 final ByteBuffer payload,
336 final int chunk) throws IOException {
337 final RawFrame dataFrame = frameFactory.createData(streamId, payload, false);
338 if (streamListener != null) {
339 streamListener.onFrameOutput(this, streamId, dataFrame);
340 }
341 updateOutputWindow(0, connOutputWindow, -chunk);
342 updateOutputWindow(streamId, streamOutputWindow, -chunk);
343 outputBuffer.write(dataFrame, ioSession);
344 }
345
346 private int streamData(
347 final int streamId, final AtomicInteger streamOutputWindow, final ByteBuffer payload) throws IOException {
348 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
349 final int capacity = Math.min(connOutputWindow.get(), streamOutputWindow.get());
350 if (capacity <= 0) {
351 return 0;
352 }
353 final int maxPayloadSize = Math.min(capacity, remoteConfig.getMaxFrameSize());
354 final int chunk;
355 if (payload.remaining() <= maxPayloadSize) {
356 chunk = payload.remaining();
357 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
358 } else {
359 chunk = maxPayloadSize;
360 final int originalLimit = payload.limit();
361 try {
362 payload.limit(payload.position() + chunk);
363 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
364 } finally {
365 payload.limit(originalLimit);
366 }
367 }
368 payload.position(payload.position() + chunk);
369 ioSession.setEvent(SelectionKey.OP_WRITE);
370 return chunk;
371 }
372 return 0;
373 }
374
375 private void incrementInputCapacity(
376 final int streamId, final AtomicInteger inputWindow, final int inputCapacity) throws IOException {
377 if (inputCapacity > 0) {
378 final int streamWinSize = inputWindow.get();
379 final int remainingCapacity = Integer.MAX_VALUE - streamWinSize;
380 final int chunk = Math.min(inputCapacity, remainingCapacity);
381 if (chunk != 0) {
382 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(streamId, chunk);
383 commitFrame(windowUpdateFrame);
384 updateInputWindow(streamId, inputWindow, chunk);
385 }
386 }
387 }
388
389 private void requestSessionOutput() {
390 outputRequests.incrementAndGet();
391 ioSession.setEvent(SelectionKey.OP_WRITE);
392 }
393
394 private void updateLastStreamId(final int streamId) {
395 final int currentId = lastStreamId.get();
396 if (streamId > currentId) {
397 lastStreamId.compareAndSet(currentId, streamId);
398 }
399 }
400
401 private int generateStreamId() {
402 for (;;) {
403 final int currentId = lastStreamId.get();
404 final int newStreamId = idGenerator.generate(currentId);
405 if (lastStreamId.compareAndSet(currentId, newStreamId)) {
406 return newStreamId;
407 }
408 }
409 }
410
411 public final void onConnect() throws HttpException, IOException {
412 connState = ConnectionHandshake.ACTIVE;
413 final RawFrame settingsFrame = frameFactory.createSettings(
414 new H2Setting(H2Param.HEADER_TABLE_SIZE, localConfig.getHeaderTableSize()),
415 new H2Setting(H2Param.ENABLE_PUSH, localConfig.isPushEnabled() ? 1 : 0),
416 new H2Setting(H2Param.MAX_CONCURRENT_STREAMS, localConfig.getMaxConcurrentStreams()),
417 new H2Setting(H2Param.INITIAL_WINDOW_SIZE, localConfig.getInitialWindowSize()),
418 new H2Setting(H2Param.MAX_FRAME_SIZE, localConfig.getMaxFrameSize()),
419 new H2Setting(H2Param.MAX_HEADER_LIST_SIZE, localConfig.getMaxHeaderListSize()));
420
421 commitFrame(settingsFrame);
422 localSettingState = SettingsHandshake.TRANSMITTED;
423 maximizeConnWindow(connInputWindow.get());
424
425 if (streamListener != null) {
426 final int initInputWindow = connInputWindow.get();
427 streamListener.onInputFlowControl(this, 0, initInputWindow, initInputWindow);
428 final int initOutputWindow = connOutputWindow.get();
429 streamListener.onOutputFlowControl(this, 0, initOutputWindow, initOutputWindow);
430 }
431 }
432
433 public final void onInput(final ByteBuffer src) throws HttpException, IOException {
434 if (connState == ConnectionHandshake.SHUTDOWN) {
435 ioSession.clearEvent(SelectionKey.OP_READ);
436 } else {
437 for (;;) {
438 final RawFrame frame = inputBuffer.read(src, ioSession);
439 if (frame == null) {
440 break;
441 }
442 if (streamListener != null) {
443 streamListener.onFrameInput(this, frame.getStreamId(), frame);
444 }
445 consumeFrame(frame);
446 }
447 }
448 }
449
/**
 * Output-ready callback. In order, it:
 * 1. flushes the output buffer and moves queued frames into it (under the session lock);
 * 2. unless the connection is shut down, lets streams produce more output and decides
 *    whether write interest can be cleared;
 * 3. processes pending commands while the connection is READY or ACTIVE and the remote
 *    settings have been acknowledged;
 * 4. during a graceful shutdown, discards fully closed streams and switches to SHUTDOWN
 *    once no live stream remains;
 * 5. in SHUTDOWN, releases all streams and closes the session once all output is drained.
 *
 * @throws HttpException in case of a protocol violation
 * @throws IOException   in case of an I/O error
 */
450 public final void onOutput() throws HttpException, IOException {
451 ioSession.getLock().lock();
452 try {
// Push out whatever is still sitting in the buffer first.
453 if (!outputBuffer.isEmpty()) {
454 outputBuffer.flush(ioSession);
455 }
// Feed queued frames into the buffer for as long as the buffer reports empty after each write.
456 while (outputBuffer.isEmpty()) {
457 final RawFrame frame = outputQueue.poll();
458 if (frame != null) {
459 if (streamListener != null) {
460 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
461 }
462 outputBuffer.write(frame, ioSession);
463 } else {
464 break;
465 }
466 }
467 } finally {
468 ioSession.getLock().unlock();
469 }
470
// Any state before SHUTDOWN (the comparison relies on enum declaration order).
471 if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
472
// Streams may only produce output while the connection window is open and the
// remote settings have been acknowledged.
473 if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
474 produceOutput();
475 }
// Snapshot of the request counter; requests arriving after this point are detected
// by the compareAndSet below.
476 final int pendingOutputRequests = outputRequests.get();
477 boolean outputPending = false;
478 if (!streamMap.isEmpty() && connOutputWindow.get() > 0) {
479 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
480 final Map.Entry<Integer, H2Stream> entry = it.next();
481 final H2Stream stream = entry.getValue();
// A stream still has work if it is open locally, has window left and reports output ready.
482 if (!stream.isLocalClosed()
483 && stream.getOutputWindow().get() > 0
484 && stream.isOutputReady()) {
485 outputPending = true;
486 break;
487 }
488 }
489 }
490 ioSession.getLock().lock();
491 try {
// Clear write interest only if nothing is pending and no new output request arrived
// since the snapshot; otherwise just account for the requests covered by this pass.
492 if (!outputPending && outputBuffer.isEmpty() && outputQueue.isEmpty()
493 && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
494 ioSession.clearEvent(SelectionKey.OP_WRITE);
495 } else {
496 outputRequests.addAndGet(-pendingOutputRequests);
497 }
498 } finally {
499 ioSession.getLock().unlock();
500 }
501 }
502
// New commands are only picked up in READY or ACTIVE state after the remote settings were acknowledged.
503 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 && remoteSettingState == SettingsHandshake.ACKED) {
504 processPendingCommands();
505 }
506 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
507 int liveStreams = 0;
508 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
509 final Map.Entry<Integer, H2Stream> entry = it.next();
510 final H2Stream stream = entry.getValue();
// Streams closed on both sides are finished and can be dropped.
511 if (stream.isLocalClosed() && stream.isRemoteClosed()) {
512 stream.releaseResources();
513 it.remove();
514 } else {
// Only locally initiated streams and remote streams up to processedRemoteStreamId
// keep the connection alive.
515 if (idGenerator.isSameSide(stream.getId()) || stream.getId() <= processedRemoteStreamId) {
516 liveStreams++;
517 }
518 }
519 }
520 if (liveStreams == 0) {
521 connState = ConnectionHandshake.SHUTDOWN;
522 }
523 }
524 if (connState.compareTo(ConnectionHandshake.SHUTDOWN) >= 0) {
525 if (!streamMap.isEmpty()) {
526 for (final H2Stream stream : streamMap.values()) {
527 stream.releaseResources();
528 }
529 streamMap.clear();
530 }
531 ioSession.getLock().lock();
532 try {
// Close only after all pending output has been drained.
533 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
534 ioSession.close();
535 }
536 } finally {
537 ioSession.getLock().unlock();
538 }
539 }
540 }
541
542 public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
543 connState = ConnectionHandshake.SHUTDOWN;
544
545 final RawFrame goAway;
546 if (localSettingState != SettingsHandshake.ACKED) {
547 goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.SETTINGS_TIMEOUT,
548 "Setting timeout (" + timeout + ")");
549 } else {
550 goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR,
551 "Timeout due to inactivity (" + timeout + ")");
552 }
553 commitFrame(goAway);
554 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
555 final Map.Entry<Integer, H2Stream> entry = it.next();
556 final H2Stream stream = entry.getValue();
557 stream.reset(new H2StreamResetException(H2Error.NO_ERROR, "Timeout due to inactivity (" + timeout + ")"));
558 }
559 streamMap.clear();
560 }
561
562 public final void onDisconnect() {
563 for (;;) {
564 final AsyncPingHandler pingHandler = pingHandlers.poll();
565 if (pingHandler != null) {
566 pingHandler.cancel();
567 } else {
568 break;
569 }
570 }
571 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
572 final Map.Entry<Integer, H2Stream> entry = it.next();
573 final H2Stream stream = entry.getValue();
574 stream.cancel();
575 }
576 for (;;) {
577 final Command command = ioSession.poll();
578 if (command != null) {
579 if (command instanceof ExecutableCommand) {
580 ((ExecutableCommand) command).failed(new ConnectionClosedException());
581 } else {
582 command.cancel();
583 }
584 } else {
585 break;
586 }
587 }
588 }
589
/**
 * Takes commands from the I/O session and executes them for as long as the number of
 * tracked streams stays below the remote MAX_CONCURRENT_STREAMS setting.
 * Processing stops when the command queue is empty, after a shutdown command, or after
 * an executable command left frames in the output queue.
 *
 * @throws IOException   if a frame cannot be committed or a stream cannot be created
 * @throws HttpException in case of a protocol violation
 */
590 private void processPendingCommands() throws IOException, HttpException {
591 while (streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
592 final Command command = ioSession.poll();
593 if (command == null) {
594 break;
595 }
596 if (command instanceof ShutdownCommand) {
597 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
598 if (shutdownCommand.getType() == CloseMode.IMMEDIATE) {
// Immediate shutdown: cancel everything and go straight to SHUTDOWN without a GOAWAY.
599 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
600 final Map.Entry<Integer, H2Stream> entry = it.next();
601 final H2Stream stream = entry.getValue();
602 stream.cancel();
603 }
604 streamMap.clear();
605 connState = ConnectionHandshake.SHUTDOWN;
606 } else {
// Graceful shutdown: announce it with GOAWAY; existing streams, if any, may still complete.
// Ignored if a shutdown is already in progress.
607 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
608 final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR, "Graceful shutdown");
609 commitFrame(goAway);
610 connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
611 }
612 }
// No further commands are processed after a shutdown command.
613 break;
614 } else if (command instanceof PingCommand) {
// Register the handler before sending so the PING ACK can be matched to it.
615 final PingCommand pingCommand = (PingCommand) command;
616 final AsyncPingHandler handler = pingCommand.getHandler();
617 pingHandlers.add(handler);
618 final RawFrame ping = frameFactory.createPing(handler.getData());
619 commitFrame(ping);
620 } else if (command instanceof ExecutableCommand) {
// Open a new locally initiated stream for the command.
621 final int streamId = generateStreamId();
622 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
623 streamId, true, initInputWinSize, initOutputWinSize);
624 final ExecutableCommand executableCommand = (ExecutableCommand) command;
625 final H2StreamHandler streamHandler = createLocallyInitiatedStream(
626 executableCommand, channel, httpProcessor, connMetrics);
627
628 final H2Stream stream = new H2Stream(channel, streamHandler, false);
629 streamMap.put(streamId, stream);
630
// Report the initial windows of the new stream to the listener.
631 if (streamListener != null) {
632 final int initInputWindow = stream.getInputWindow().get();
633 streamListener.onInputFlowControl(this, streamId, initInputWindow, initInputWindow);
634 final int initOutputWindow = stream.getOutputWindow().get();
635 streamListener.onOutputFlowControl(this, streamId, initOutputWindow, initOutputWindow);
636 }
637
638 if (stream.isOutputReady()) {
639 stream.produceOutput();
640 }
// Let cancellation of the command abort the stream.
641 final CancellableDependency cancellableDependency = executableCommand.getCancellableDependency();
642 if (cancellableDependency != null) {
643 cancellableDependency.setDependency(stream::abort);
644 }
// Stop taking commands while frames are backed up in the output queue.
645 if (!outputQueue.isEmpty()) {
646 return;
647 }
648 }
649 }
650 }
651
/**
 * Terminates the connection because of an exception.
 * Outstanding ping handlers are failed with the cause, queued commands are failed or
 * cancelled, all streams are reset with the cause, and, unless the cause is a
 * ConnectionClosedException, a GOAWAY frame with a matching error code is sent if the
 * connection has not reached SHUTDOWN yet. The connection always ends up in SHUTDOWN
 * state and the I/O session is closed.
 *
 * @param cause the exception that terminated the connection
 */
652 public final void onException(final Exception cause) {
653 try {
654 for (;;) {
655 final AsyncPingHandler pingHandler = pingHandlers.poll();
656 if (pingHandler != null) {
657 pingHandler.failed(cause);
658 } else {
659 break;
660 }
661 }
// Queued commands never got to run: executable ones fail with ConnectionClosedException
// (not with the original cause), all others are cancelled.
662 for (;;) {
663 final Command command = ioSession.poll();
664 if (command != null) {
665 if (command instanceof ExecutableCommand) {
666 ((ExecutableCommand) command).failed(new ConnectionClosedException());
667 } else {
668 command.cancel();
669 }
670 } else {
671 break;
672 }
673 }
674 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
675 final Map.Entry<Integer, H2Stream> entry = it.next();
676 final H2Stream stream = entry.getValue();
677 stream.reset(cause);
678 }
679 streamMap.clear();
// No GOAWAY is sent when the cause is ConnectionClosedException.
680 if (!(cause instanceof ConnectionClosedException)) {
681 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) <= 0) {
// Map the cause to an HTTP/2 error code for the GOAWAY frame.
682 final H2Error errorCode;
683 if (cause instanceof H2ConnectionException) {
684 errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
685 } else if (cause instanceof ProtocolException){
686 errorCode = H2Error.PROTOCOL_ERROR;
687 } else {
688 errorCode = H2Error.INTERNAL_ERROR;
689 }
690 final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
691 commitFrame(goAway);
692 }
693 }
694 } catch (final IOException ignore) {
// Deliberately ignored: sending the GOAWAY is best effort, the session is closed below anyway.
695 } finally {
696 connState = ConnectionHandshake.SHUTDOWN;
// I/O errors close the session immediately; every other cause closes it gracefully.
697 final CloseMode closeMode;
698 if (cause instanceof ConnectionClosedException) {
699 closeMode = CloseMode.GRACEFUL;
700 } else if (cause instanceof IOException) {
701 closeMode = CloseMode.IMMEDIATE;
702 } else {
703 closeMode = CloseMode.GRACEFUL;
704 }
705 ioSession.close(closeMode);
706 }
707 }
708
709 private H2Stream getValidStream(final int streamId) throws H2ConnectionException {
710 if (streamId == 0) {
711 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
712 }
713 final H2Stream stream = streamMap.get(streamId);
714 if (stream == null) {
715 if (streamId <= lastStreamId.get()) {
716 throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed");
717 } else {
718 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
719 }
720 }
721 return stream;
722 }
723
724 private void consumeFrame(final RawFrame frame) throws HttpException, IOException {
725 final FrameType frameType = FrameType.valueOf(frame.getType());
726 final int streamId = frame.getStreamId();
727 if (continuation != null && frameType != FrameType.CONTINUATION) {
728 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "CONTINUATION frame expected");
729 }
730 switch (frameType) {
731 case DATA: {
732 final H2Stream stream = getValidStream(streamId);
733 try {
734 consumeDataFrame(frame, stream);
735 } catch (final H2StreamResetException ex) {
736 stream.localReset(ex);
737 } catch (final HttpStreamResetException ex) {
738 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
739 }
740
741 if (stream.isTerminated()) {
742 streamMap.remove(streamId);
743 stream.releaseResources();
744 requestSessionOutput();
745 }
746 }
747 break;
748 case HEADERS: {
749 if (streamId == 0) {
750 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
751 }
752 H2Stream stream = streamMap.get(streamId);
753 if (stream == null) {
754 acceptHeaderFrame();
755
756 if (idGenerator.isSameSide(streamId)) {
757 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
758 }
759 if (goAwayReceived ) {
760 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
761 }
762
763 updateLastStreamId(streamId);
764
765 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
766 streamId, false, initInputWinSize, initOutputWinSize);
767 final H2StreamHandler streamHandler;
768 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
769 streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics, null);
770 } else {
771 streamHandler = NoopH2StreamHandler.INSTANCE;
772 channel.setLocalEndStream();
773 }
774
775 stream = new H2Stream(channel, streamHandler, true);
776 if (stream.isOutputReady()) {
777 stream.produceOutput();
778 }
779 streamMap.put(streamId, stream);
780 }
781
782 try {
783 consumeHeaderFrame(frame, stream);
784
785 if (stream.isOutputReady()) {
786 stream.produceOutput();
787 }
788 } catch (final H2StreamResetException ex) {
789 stream.localReset(ex);
790 } catch (final HttpStreamResetException ex) {
791 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
792 } catch (final HttpException ex) {
793 stream.handle(ex);
794 }
795
796 if (stream.isTerminated()) {
797 streamMap.remove(streamId);
798 stream.releaseResources();
799 requestSessionOutput();
800 }
801 }
802 break;
803 case CONTINUATION: {
804 if (continuation == null) {
805 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION frame");
806 }
807 if (streamId != continuation.streamId) {
808 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION stream id: " + streamId);
809 }
810
811 final H2Stream stream = getValidStream(streamId);
812 try {
813
814 consumeContinuationFrame(frame, stream);
815 } catch (final H2StreamResetException ex) {
816 stream.localReset(ex);
817 } catch (final HttpStreamResetException ex) {
818 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
819 }
820
821 if (stream.isTerminated()) {
822 streamMap.remove(streamId);
823 stream.releaseResources();
824 requestSessionOutput();
825 }
826 }
827 break;
828 case WINDOW_UPDATE: {
829 final ByteBuffer payload = frame.getPayload();
830 if (payload == null || payload.remaining() != 4) {
831 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid WINDOW_UPDATE frame payload");
832 }
833 final int delta = payload.getInt();
834 if (delta <= 0) {
835 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Invalid WINDOW_UPDATE delta");
836 }
837 if (streamId == 0) {
838 try {
839 updateOutputWindow(0, connOutputWindow, delta);
840 } catch (final ArithmeticException ex) {
841 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
842 }
843 } else {
844 final H2Stream stream = streamMap.get(streamId);
845 if (stream != null) {
846 try {
847 updateOutputWindow(streamId, stream.getOutputWindow(), delta);
848 } catch (final ArithmeticException ex) {
849 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
850 }
851 }
852 }
853 ioSession.setEvent(SelectionKey.OP_WRITE);
854 }
855 break;
856 case RST_STREAM: {
857 if (streamId == 0) {
858 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
859 }
860 final H2Stream stream = streamMap.get(streamId);
861 if (stream == null) {
862 if (streamId > lastStreamId.get()) {
863 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
864 }
865 } else {
866 final ByteBuffer payload = frame.getPayload();
867 if (payload == null || payload.remaining() != 4) {
868 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid RST_STREAM frame payload");
869 }
870 final int errorCode = payload.getInt();
871 stream.reset(new H2StreamResetException(errorCode, "Stream reset (" + errorCode + ")"));
872 streamMap.remove(streamId);
873 stream.releaseResources();
874 requestSessionOutput();
875 }
876 }
877 break;
878 case PING: {
879 if (streamId != 0) {
880 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
881 }
882 final ByteBuffer ping = frame.getPayloadContent();
883 if (ping == null || ping.remaining() != 8) {
884 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload");
885 }
886 if (frame.isFlagSet(FrameFlag.ACK)) {
887 final AsyncPingHandler pingHandler = pingHandlers.poll();
888 if (pingHandler != null) {
889 pingHandler.consumeResponse(ping);
890 }
891 } else {
892 final ByteBuffer pong = ByteBuffer.allocate(ping.remaining());
893 pong.put(ping);
894 pong.flip();
895 final RawFrame response = frameFactory.createPingAck(pong);
896 commitFrame(response);
897 }
898 }
899 break;
900 case SETTINGS: {
901 if (streamId != 0) {
902 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
903 }
904 if (frame.isFlagSet(FrameFlag.ACK)) {
905 if (localSettingState == SettingsHandshake.TRANSMITTED) {
906 localSettingState = SettingsHandshake.ACKED;
907 ioSession.setEvent(SelectionKey.OP_WRITE);
908 applyLocalSettings();
909 }
910 } else {
911 final ByteBuffer payload = frame.getPayload();
912 if (payload != null) {
913 if ((payload.remaining() % 6) != 0) {
914 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid SETTINGS payload");
915 }
916 consumeSettingsFrame(payload);
917 remoteSettingState = SettingsHandshake.TRANSMITTED;
918 }
919
920 final RawFrame response = frameFactory.createSettingsAck();
921 commitFrame(response);
922 remoteSettingState = SettingsHandshake.ACKED;
923 }
924 }
925 break;
926 case PRIORITY:
927
928 break;
929 case PUSH_PROMISE: {
930 acceptPushFrame();
931
932 if (goAwayReceived ) {
933 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
934 }
935
936 if (!localConfig.isPushEnabled()) {
937 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Push is disabled");
938 }
939
940 final H2Stream stream = getValidStream(streamId);
941 if (stream.isRemoteClosed()) {
942 stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed"));
943 break;
944 }
945
946 final ByteBuffer payload = frame.getPayloadContent();
947 if (payload == null || payload.remaining() < 4) {
948 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PUSH_PROMISE payload");
949 }
950 final int promisedStreamId = payload.getInt();
951 if (promisedStreamId == 0 || idGenerator.isSameSide(promisedStreamId)) {
952 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal promised stream id: " + promisedStreamId);
953 }
954 if (streamMap.get(promisedStreamId) != null) {
955 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected promised stream id: " + promisedStreamId);
956 }
957
958 updateLastStreamId(promisedStreamId);
959
960 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
961 promisedStreamId, false, initInputWinSize, initOutputWinSize);
962 final H2StreamHandler streamHandler;
963 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
964 streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics,
965 stream.getPushHandlerFactory());
966 } else {
967 streamHandler = NoopH2StreamHandler.INSTANCE;
968 channel.setLocalEndStream();
969 }
970
971 final H2Stream promisedStream = new H2Stream(channel, streamHandler, true);
972 streamMap.put(promisedStreamId, promisedStream);
973
974 try {
975 consumePushPromiseFrame(frame, payload, promisedStream);
976 } catch (final H2StreamResetException ex) {
977 promisedStream.localReset(ex);
978 } catch (final HttpStreamResetException ex) {
979 promisedStream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.NO_ERROR);
980 }
981 }
982 break;
983 case GOAWAY: {
984 if (streamId != 0) {
985 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
986 }
987 final ByteBuffer payload = frame.getPayload();
988 if (payload == null || payload.remaining() < 8) {
989 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid GOAWAY payload");
990 }
991 final int processedLocalStreamId = payload.getInt();
992 final int errorCode = payload.getInt();
993 goAwayReceived = true;
994 if (errorCode == H2Error.NO_ERROR.getCode()) {
995 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
996 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
997 final Map.Entry<Integer, H2Stream> entry = it.next();
998 final int activeStreamId = entry.getKey();
999 if (!idGenerator.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
1000 final H2Stream stream = entry.getValue();
1001 stream.cancel();
1002 it.remove();
1003 }
1004 }
1005 }
1006 connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
1007 } else {
1008 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1009 final Map.Entry<Integer, H2Stream> entry = it.next();
1010 final H2Stream stream = entry.getValue();
1011 stream.reset(new H2StreamResetException(errorCode, "Connection terminated by the peer (" + errorCode + ")"));
1012 }
1013 streamMap.clear();
1014 connState = ConnectionHandshake.SHUTDOWN;
1015 }
1016 }
1017 ioSession.setEvent(SelectionKey.OP_WRITE);
1018 break;
1019 }
1020 }
1021
1022 private void consumeDataFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1023 final int streamId = stream.getId();
1024 final ByteBuffer payload = frame.getPayloadContent();
1025 if (payload != null) {
1026 final int frameLength = frame.getLength();
1027 final int streamWinSize = updateInputWindow(streamId, stream.getInputWindow(), -frameLength);
1028 if (streamWinSize < lowMark && !stream.isRemoteClosed()) {
1029 stream.produceInputCapacityUpdate();
1030 }
1031 final int connWinSize = updateInputWindow(0, connInputWindow, -frameLength);
1032 if (connWinSize < CONNECTION_WINDOW_LOW_MARK) {
1033 maximizeConnWindow(connWinSize);
1034 }
1035 }
1036 if (stream.isRemoteClosed()) {
1037 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1038 }
1039 if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1040 stream.setRemoteEndStream();
1041 }
1042 if (stream.isLocalReset()) {
1043 return;
1044 }
1045 stream.consumeData(payload);
1046 }
1047
1048 private void maximizeConnWindow(final int connWinSize) throws IOException {
1049 final int delta = Integer.MAX_VALUE - connWinSize;
1050 if (delta > 0) {
1051 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(0, delta);
1052 commitFrame(windowUpdateFrame);
1053 updateInputWindow(0, connInputWindow, delta);
1054 }
1055 }
1056
1057 private void consumePushPromiseFrame(final RawFrame frame, final ByteBuffer payload, final H2Stream promisedStream) throws HttpException, IOException {
1058 final int promisedStreamId = promisedStream.getId();
1059 if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1060 continuation = new Continuation(promisedStreamId, frame.getType(), true);
1061 }
1062 if (continuation == null) {
1063 final List<Header> headers = hPackDecoder.decodeHeaders(payload);
1064 if (promisedStreamId > processedRemoteStreamId) {
1065 processedRemoteStreamId = promisedStreamId;
1066 }
1067 if (streamListener != null) {
1068 streamListener.onHeaderInput(this, promisedStreamId, headers);
1069 }
1070 promisedStream.consumePromise(headers);
1071 } else {
1072 continuation.copyPayload(payload);
1073 }
1074 }
1075
1076 List<Header> decodeHeaders(final ByteBuffer payload) throws HttpException {
1077 return hPackDecoder.decodeHeaders(payload);
1078 }
1079
1080 private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1081 final int streamId = stream.getId();
1082 if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1083 continuation = new Continuation(streamId, frame.getType(), frame.isFlagSet(FrameFlag.END_STREAM));
1084 }
1085 final ByteBuffer payload = frame.getPayloadContent();
1086 if (frame.isFlagSet(FrameFlag.PRIORITY)) {
1087
1088 payload.getInt();
1089 payload.get();
1090 }
1091 if (continuation == null) {
1092 final List<Header> headers = decodeHeaders(payload);
1093 if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1094 processedRemoteStreamId = streamId;
1095 }
1096 if (streamListener != null) {
1097 streamListener.onHeaderInput(this, streamId, headers);
1098 }
1099 if (stream.isRemoteClosed()) {
1100 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1101 }
1102 if (stream.isLocalReset()) {
1103 return;
1104 }
1105 if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1106 stream.setRemoteEndStream();
1107 }
1108 stream.consumeHeader(headers);
1109 } else {
1110 continuation.copyPayload(payload);
1111 }
1112 }
1113
1114 private void consumeContinuationFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1115 final int streamId = frame.getStreamId();
1116 final ByteBuffer payload = frame.getPayload();
1117 continuation.copyPayload(payload);
1118 if (frame.isFlagSet(FrameFlag.END_HEADERS)) {
1119 final List<Header> headers = decodeHeaders(continuation.getContent());
1120 if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1121 processedRemoteStreamId = streamId;
1122 }
1123 if (streamListener != null) {
1124 streamListener.onHeaderInput(this, streamId, headers);
1125 }
1126 if (stream.isRemoteClosed()) {
1127 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1128 }
1129 if (stream.isLocalReset()) {
1130 return;
1131 }
1132 if (continuation.endStream) {
1133 stream.setRemoteEndStream();
1134 }
1135 if (continuation.type == FrameType.PUSH_PROMISE.getValue()) {
1136 stream.consumePromise(headers);
1137 } else {
1138 stream.consumeHeader(headers);
1139 }
1140 continuation = null;
1141 }
1142 }
1143
1144 private void consumeSettingsFrame(final ByteBuffer payload) throws HttpException, IOException {
1145 final H2Config.Builder configBuilder = H2Config.initial();
1146 while (payload.hasRemaining()) {
1147 final int code = payload.getShort();
1148 final int value = payload.getInt();
1149 final H2Param param = H2Param.valueOf(code);
1150 if (param != null) {
1151 switch (param) {
1152 case HEADER_TABLE_SIZE:
1153 try {
1154 configBuilder.setHeaderTableSize(value);
1155 } catch (final IllegalArgumentException ex) {
1156 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1157 }
1158 break;
1159 case MAX_CONCURRENT_STREAMS:
1160 try {
1161 configBuilder.setMaxConcurrentStreams(value);
1162 } catch (final IllegalArgumentException ex) {
1163 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1164 }
1165 break;
1166 case ENABLE_PUSH:
1167 configBuilder.setPushEnabled(value == 1);
1168 break;
1169 case INITIAL_WINDOW_SIZE:
1170 try {
1171 configBuilder.setInitialWindowSize(value);
1172 } catch (final IllegalArgumentException ex) {
1173 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1174 }
1175 break;
1176 case MAX_FRAME_SIZE:
1177 try {
1178 configBuilder.setMaxFrameSize(value);
1179 } catch (final IllegalArgumentException ex) {
1180 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1181 }
1182 break;
1183 case MAX_HEADER_LIST_SIZE:
1184 try {
1185 configBuilder.setMaxHeaderListSize(value);
1186 } catch (final IllegalArgumentException ex) {
1187 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1188 }
1189 break;
1190 }
1191 }
1192 }
1193 applyRemoteSettings(configBuilder.build());
1194 }
1195
1196 private void produceOutput() throws HttpException, IOException {
1197 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1198 final Map.Entry<Integer, H2Stream> entry = it.next();
1199 final H2Stream stream = entry.getValue();
1200 if (!stream.isLocalClosed() && stream.getOutputWindow().get() > 0) {
1201 stream.produceOutput();
1202 }
1203 if (stream.isTerminated()) {
1204 it.remove();
1205 stream.releaseResources();
1206 requestSessionOutput();
1207 }
1208 if (!outputQueue.isEmpty()) {
1209 break;
1210 }
1211 }
1212 }
1213
1214 private void applyRemoteSettings(final H2Config config) throws H2ConnectionException {
1215 remoteConfig = config;
1216
1217 hPackEncoder.setMaxTableSize(remoteConfig.getHeaderTableSize());
1218 final int delta = remoteConfig.getInitialWindowSize() - initOutputWinSize;
1219 initOutputWinSize = remoteConfig.getInitialWindowSize();
1220
1221 final int maxFrameSize = remoteConfig.getMaxFrameSize();
1222 if (maxFrameSize > localConfig.getMaxFrameSize()) {
1223 outputBuffer.expand(maxFrameSize);
1224 }
1225
1226 if (delta != 0) {
1227 if (!streamMap.isEmpty()) {
1228 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1229 final Map.Entry<Integer, H2Stream> entry = it.next();
1230 final H2Stream stream = entry.getValue();
1231 try {
1232 updateOutputWindow(stream.getId(), stream.getOutputWindow(), delta);
1233 } catch (final ArithmeticException ex) {
1234 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1235 }
1236 }
1237 }
1238 }
1239 }
1240
1241 private void applyLocalSettings() throws H2ConnectionException {
1242 hPackDecoder.setMaxTableSize(localConfig.getHeaderTableSize());
1243 hPackDecoder.setMaxListSize(localConfig.getMaxHeaderListSize());
1244
1245 final int delta = localConfig.getInitialWindowSize() - initInputWinSize;
1246 initInputWinSize = localConfig.getInitialWindowSize();
1247
1248 if (delta != 0 && !streamMap.isEmpty()) {
1249 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1250 final Map.Entry<Integer, H2Stream> entry = it.next();
1251 final H2Stream stream = entry.getValue();
1252 try {
1253 updateInputWindow(stream.getId(), stream.getInputWindow(), delta);
1254 } catch (final ArithmeticException ex) {
1255 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1256 }
1257 }
1258 }
1259 lowMark = initInputWinSize / 2;
1260 }
1261
1262 @Override
1263 public void close() throws IOException {
1264 ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.IMMEDIATE);
1265 }
1266
1267 @Override
1268 public void close(final CloseMode closeMode) {
1269 ioSession.close(closeMode);
1270 }
1271
1272 @Override
1273 public boolean isOpen() {
1274 return connState == ConnectionHandshake.ACTIVE;
1275 }
1276
1277 @Override
1278 public void setSocketTimeout(final Timeout timeout) {
1279 ioSession.setSocketTimeout(timeout);
1280 }
1281
1282 @Override
1283 public SSLSession getSSLSession() {
1284 final TlsDetails tlsDetails = ioSession.getTlsDetails();
1285 return tlsDetails != null ? tlsDetails.getSSLSession() : null;
1286 }
1287
1288 @Override
1289 public EndpointDetails getEndpointDetails() {
1290 if (endpointDetails == null) {
1291 endpointDetails = new BasicEndpointDetails(
1292 ioSession.getRemoteAddress(),
1293 ioSession.getLocalAddress(),
1294 connMetrics,
1295 ioSession.getSocketTimeout());
1296 }
1297 return endpointDetails;
1298 }
1299
1300 @Override
1301 public Timeout getSocketTimeout() {
1302 return ioSession.getSocketTimeout();
1303 }
1304
1305 @Override
1306 public ProtocolVersion getProtocolVersion() {
1307 return HttpVersion.HTTP_2;
1308 }
1309
1310 @Override
1311 public SocketAddress getRemoteAddress() {
1312 return ioSession.getRemoteAddress();
1313 }
1314
1315 @Override
1316 public SocketAddress getLocalAddress() {
1317 return ioSession.getLocalAddress();
1318 }
1319
1320 void appendState(final StringBuilder buf) {
1321 buf.append("connState=").append(connState)
1322 .append(", connInputWindow=").append(connInputWindow)
1323 .append(", connOutputWindow=").append(connOutputWindow)
1324 .append(", outputQueue=").append(outputQueue.size())
1325 .append(", streamMap=").append(streamMap.size())
1326 .append(", processedRemoteStreamId=").append(processedRemoteStreamId);
1327 }
1328
1329 private static class Continuation {
1330
1331 final int streamId;
1332 final int type;
1333 final boolean endStream;
1334 final ByteArrayBuffer headerBuffer;
1335
1336 private Continuation(final int streamId, final int type, final boolean endStream) {
1337 this.streamId = streamId;
1338 this.type = type;
1339 this.endStream = endStream;
1340 this.headerBuffer = new ByteArrayBuffer(1024);
1341 }
1342
1343 void copyPayload(final ByteBuffer payload) {
1344 if (payload == null) {
1345 return;
1346 }
1347 headerBuffer.ensureCapacity(payload.remaining());
1348 payload.get(headerBuffer.array(), headerBuffer.length(), payload.remaining());
1349 }
1350
1351 ByteBuffer getContent() {
1352 return ByteBuffer.wrap(headerBuffer.array(), 0, headerBuffer.length());
1353 }
1354
1355 }
1356
1357 private class H2StreamChannelImpl implements H2StreamChannel {
1358
1359 private final int id;
1360 private final AtomicInteger inputWindow;
1361 private final AtomicInteger outputWindow;
1362
1363 private volatile boolean idle;
1364 private volatile boolean remoteEndStream;
1365 private volatile boolean localEndStream;
1366
1367 private volatile long deadline;
1368
1369 H2StreamChannelImpl(final int id, final boolean idle, final int initialInputWindowSize, final int initialOutputWindowSize) {
1370 this.id = id;
1371 this.idle = idle;
1372 this.inputWindow = new AtomicInteger(initialInputWindowSize);
1373 this.outputWindow = new AtomicInteger(initialOutputWindowSize);
1374 }
1375
1376 int getId() {
1377 return id;
1378 }
1379
1380 AtomicInteger getOutputWindow() {
1381 return outputWindow;
1382 }
1383
1384 AtomicInteger getInputWindow() {
1385 return inputWindow;
1386 }
1387
1388 @Override
1389 public void submit(final List<Header> headers, final boolean endStream) throws IOException {
1390 ioSession.getLock().lock();
1391 try {
1392 if (headers == null || headers.isEmpty()) {
1393 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
1394 }
1395 if (localEndStream) {
1396 return;
1397 }
1398 idle = false;
1399 commitHeaders(id, headers, endStream);
1400 if (endStream) {
1401 localEndStream = true;
1402 }
1403 } finally {
1404 ioSession.getLock().unlock();
1405 }
1406 }
1407
1408 @Override
1409 public void push(final List<Header> headers, final AsyncPushProducer pushProducer) throws HttpException, IOException {
1410 acceptPushRequest();
1411 final int promisedStreamId = generateStreamId();
1412 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
1413 promisedStreamId,
1414 true,
1415 localConfig.getInitialWindowSize(),
1416 remoteConfig.getInitialWindowSize());
1417 final HttpCoreContext context = HttpCoreContext.create();
1418 context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
1419 context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
1420 final H2StreamHandler streamHandler = new ServerPushH2StreamHandler(
1421 channel, httpProcessor, connMetrics, pushProducer, context);
1422 final H2Stream stream = new H2Stream(channel, streamHandler, false);
1423 streamMap.put(promisedStreamId, stream);
1424
1425 ioSession.getLock().lock();
1426 try {
1427 if (localEndStream) {
1428 stream.releaseResources();
1429 return;
1430 }
1431 commitPushPromise(id, promisedStreamId, headers);
1432 idle = false;
1433 } finally {
1434 ioSession.getLock().unlock();
1435 }
1436 }
1437
1438 @Override
1439 public void update(final int increment) throws IOException {
1440 if (remoteEndStream) {
1441 return;
1442 }
1443 incrementInputCapacity(0, connInputWindow, increment);
1444 incrementInputCapacity(id, inputWindow, increment);
1445 }
1446
1447 @Override
1448 public int write(final ByteBuffer payload) throws IOException {
1449 ioSession.getLock().lock();
1450 try {
1451 if (localEndStream) {
1452 return 0;
1453 }
1454 return streamData(id, outputWindow, payload);
1455 } finally {
1456 ioSession.getLock().unlock();
1457 }
1458 }
1459
1460 @Override
1461 public void endStream(final List<? extends Header> trailers) throws IOException {
1462 ioSession.getLock().lock();
1463 try {
1464 if (localEndStream) {
1465 return;
1466 }
1467 localEndStream = true;
1468 if (trailers != null && !trailers.isEmpty()) {
1469 commitHeaders(id, trailers, true);
1470 } else {
1471 final RawFrame frame = frameFactory.createData(id, null, true);
1472 commitFrameInternal(frame);
1473 }
1474 } finally {
1475 ioSession.getLock().unlock();
1476 }
1477 }
1478
1479 @Override
1480 public void endStream() throws IOException {
1481 endStream(null);
1482 }
1483
1484 @Override
1485 public void requestOutput() {
1486 requestSessionOutput();
1487 }
1488
1489 boolean isRemoteClosed() {
1490 return remoteEndStream;
1491 }
1492
1493 void setRemoteEndStream() {
1494 remoteEndStream = true;
1495 }
1496
1497 boolean isLocalClosed() {
1498 return localEndStream;
1499 }
1500
1501 void setLocalEndStream() {
1502 localEndStream = true;
1503 }
1504
1505 boolean isLocalReset() {
1506 return deadline > 0;
1507 }
1508
1509 boolean isResetDeadline() {
1510 final long l = deadline;
1511 return l > 0 && l < System.currentTimeMillis();
1512 }
1513
1514 boolean localReset(final int code) throws IOException {
1515 ioSession.getLock().lock();
1516 try {
1517 if (localEndStream) {
1518 return false;
1519 }
1520 localEndStream = true;
1521 deadline = System.currentTimeMillis() + LINGER_TIME;
1522 if (!idle) {
1523 final RawFrame resetStream = frameFactory.createResetStream(id, code);
1524 commitFrameInternal(resetStream);
1525 return true;
1526 }
1527 return false;
1528 } finally {
1529 ioSession.getLock().unlock();
1530 }
1531 }
1532
1533 boolean localReset(final H2Error error) throws IOException {
1534 return localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1535 }
1536
1537 @Override
1538 public boolean cancel() {
1539 try {
1540 return localReset(H2Error.CANCEL);
1541 } catch (final IOException ignore) {
1542 return false;
1543 }
1544 }
1545
1546 void appendState(final StringBuilder buf) {
1547 buf.append("id=").append(id)
1548 .append(", connState=").append(connState)
1549 .append(", inputWindow=").append(inputWindow)
1550 .append(", outputWindow=").append(outputWindow)
1551 .append(", localEndStream=").append(localEndStream)
1552 .append(", idle=").append(idle);
1553 }
1554
1555 @Override
1556 public String toString() {
1557 final StringBuilder buf = new StringBuilder();
1558 buf.append("[");
1559 appendState(buf);
1560 buf.append("]");
1561 return buf.toString();
1562 }
1563
1564 }
1565
1566 static class H2Stream {
1567
1568 private final H2StreamChannelImpl channel;
1569 private final H2StreamHandler handler;
1570 private final boolean remoteInitiated;
1571
1572 private H2Stream(
1573 final H2StreamChannelImpl channel,
1574 final H2StreamHandler handler,
1575 final boolean remoteInitiated) {
1576 this.channel = channel;
1577 this.handler = handler;
1578 this.remoteInitiated = remoteInitiated;
1579 }
1580
1581 int getId() {
1582 return channel.getId();
1583 }
1584
1585 boolean isRemoteInitiated() {
1586 return remoteInitiated;
1587 }
1588
1589 AtomicInteger getOutputWindow() {
1590 return channel.getOutputWindow();
1591 }
1592
1593 AtomicInteger getInputWindow() {
1594 return channel.getInputWindow();
1595 }
1596
1597 boolean isTerminated() {
1598 return channel.isLocalClosed() && (channel.isRemoteClosed() || channel.isResetDeadline());
1599 }
1600
1601 boolean isRemoteClosed() {
1602 return channel.isRemoteClosed();
1603 }
1604
1605 boolean isLocalClosed() {
1606 return channel.isLocalClosed();
1607 }
1608
1609 boolean isLocalReset() {
1610 return channel.isLocalReset();
1611 }
1612
1613 void setRemoteEndStream() {
1614 channel.setRemoteEndStream();
1615 }
1616
1617 void consumePromise(final List<Header> headers) throws HttpException, IOException {
1618 try {
1619 handler.consumePromise(headers);
1620 channel.setLocalEndStream();
1621 } catch (final ProtocolException ex) {
1622 localReset(ex, H2Error.PROTOCOL_ERROR);
1623 }
1624 }
1625
1626 void consumeHeader(final List<Header> headers) throws HttpException, IOException {
1627 try {
1628 handler.consumeHeader(headers, channel.isRemoteClosed());
1629 } catch (final ProtocolException ex) {
1630 localReset(ex, H2Error.PROTOCOL_ERROR);
1631 }
1632 }
1633
1634 void consumeData(final ByteBuffer src) throws HttpException, IOException {
1635 try {
1636 handler.consumeData(src, channel.isRemoteClosed());
1637 } catch (final CharacterCodingException ex) {
1638 localReset(ex, H2Error.INTERNAL_ERROR);
1639 } catch (final ProtocolException ex) {
1640 localReset(ex, H2Error.PROTOCOL_ERROR);
1641 }
1642 }
1643
1644 boolean isOutputReady() {
1645 return handler.isOutputReady();
1646 }
1647
1648 void produceOutput() throws HttpException, IOException {
1649 try {
1650 handler.produceOutput();
1651 } catch (final ProtocolException ex) {
1652 localReset(ex, H2Error.PROTOCOL_ERROR);
1653 }
1654 }
1655
1656 void produceInputCapacityUpdate() throws IOException {
1657 handler.updateInputCapacity();
1658 }
1659
1660 void reset(final Exception cause) {
1661 channel.setRemoteEndStream();
1662 channel.setLocalEndStream();
1663 handler.failed(cause);
1664 }
1665
1666 void localReset(final Exception cause, final int code) throws IOException {
1667 channel.localReset(code);
1668 handler.failed(cause);
1669 }
1670
1671 void localReset(final Exception cause, final H2Error error) throws IOException {
1672 localReset(cause, error != null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1673 }
1674
1675 void localReset(final H2StreamResetException ex) throws IOException {
1676 localReset(ex, ex.getCode());
1677 }
1678
1679 void handle(final HttpException ex) throws IOException, HttpException {
1680 handler.handle(ex, channel.isRemoteClosed());
1681 }
1682
1683 HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
1684 return handler.getPushHandlerFactory();
1685 }
1686
1687 void cancel() {
1688 reset(new RequestNotExecutedException());
1689 }
1690
1691 boolean abort() {
1692 final boolean cancelled = channel.cancel();
1693 handler.failed(new RequestNotExecutedException());
1694 return cancelled;
1695 }
1696
1697 void releaseResources() {
1698 handler.releaseResources();
1699 }
1700
1701 void appendState(final StringBuilder buf) {
1702 buf.append("channel=[");
1703 channel.appendState(buf);
1704 buf.append("]");
1705 }
1706
1707 @Override
1708 public String toString() {
1709 final StringBuilder buf = new StringBuilder();
1710 buf.append("[");
1711 appendState(buf);
1712 buf.append("]");
1713 return buf.toString();
1714 }
1715
1716 }
1717
1718 }