1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http2.impl.nio;
28
29 import javax.net.ssl.SSLSession;
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.channels.SelectionKey;
34 import java.nio.charset.CharacterCodingException;
35 import java.util.Deque;
36 import java.util.Iterator;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Queue;
40 import java.util.concurrent.CancellationException;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentLinkedDeque;
43 import java.util.concurrent.ConcurrentLinkedQueue;
44 import java.util.concurrent.atomic.AtomicInteger;
45
46 import org.apache.hc.core5.concurrent.Cancellable;
47 import org.apache.hc.core5.concurrent.CancellableDependency;
48 import org.apache.hc.core5.http.ConnectionClosedException;
49 import org.apache.hc.core5.http.EndpointDetails;
50 import org.apache.hc.core5.http.Header;
51 import org.apache.hc.core5.http.HttpConnection;
52 import org.apache.hc.core5.http.HttpException;
53 import org.apache.hc.core5.http.HttpStreamResetException;
54 import org.apache.hc.core5.http.HttpVersion;
55 import org.apache.hc.core5.http.ProtocolException;
56 import org.apache.hc.core5.http.ProtocolVersion;
57 import org.apache.hc.core5.http.config.CharCodingConfig;
58 import org.apache.hc.core5.http.impl.BasicEndpointDetails;
59 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
60 import org.apache.hc.core5.http.impl.CharCodingSupport;
61 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
62 import org.apache.hc.core5.http.nio.AsyncPushProducer;
63 import org.apache.hc.core5.http.nio.HandlerFactory;
64 import org.apache.hc.core5.http.nio.command.ExecutableCommand;
65 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
66 import org.apache.hc.core5.http.protocol.HttpCoreContext;
67 import org.apache.hc.core5.http.protocol.HttpProcessor;
68 import org.apache.hc.core5.http2.H2ConnectionException;
69 import org.apache.hc.core5.http2.H2Error;
70 import org.apache.hc.core5.http2.H2StreamResetException;
71 import org.apache.hc.core5.http2.config.H2Config;
72 import org.apache.hc.core5.http2.config.H2Param;
73 import org.apache.hc.core5.http2.config.H2Setting;
74 import org.apache.hc.core5.http2.frame.FrameFactory;
75 import org.apache.hc.core5.http2.frame.FrameFlag;
76 import org.apache.hc.core5.http2.frame.FrameType;
77 import org.apache.hc.core5.http2.frame.RawFrame;
78 import org.apache.hc.core5.http2.frame.StreamIdGenerator;
79 import org.apache.hc.core5.http2.hpack.HPackDecoder;
80 import org.apache.hc.core5.http2.hpack.HPackEncoder;
81 import org.apache.hc.core5.http2.impl.BasicH2TransportMetrics;
82 import org.apache.hc.core5.http2.nio.AsyncPingHandler;
83 import org.apache.hc.core5.http2.nio.command.PingCommand;
84 import org.apache.hc.core5.io.CloseMode;
85 import org.apache.hc.core5.reactor.Command;
86 import org.apache.hc.core5.reactor.ProtocolIOSession;
87 import org.apache.hc.core5.reactor.ssl.TlsDetails;
88 import org.apache.hc.core5.util.Args;
89 import org.apache.hc.core5.util.ByteArrayBuffer;
90 import org.apache.hc.core5.util.Identifiable;
91 import org.apache.hc.core5.util.Timeout;
92
93 abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection {
94
95 private static final long LINGER_TIME = 1000;
96 private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024;
97
98 enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
99 enum SettingsHandshake { READY, TRANSMITTED, ACKED }
100
101 private final ProtocolIOSession ioSession;
102 private final FrameFactory frameFactory;
103 private final StreamIdGenerator idGenerator;
104 private final HttpProcessor httpProcessor;
105 private final H2Config localConfig;
106 private final BasicH2TransportMetrics inputMetrics;
107 private final BasicH2TransportMetrics outputMetrics;
108 private final BasicHttpConnectionMetrics connMetrics;
109 private final FrameInputBuffer inputBuffer;
110 private final FrameOutputBuffer outputBuffer;
111 private final Deque<RawFrame> outputQueue;
112 private final HPackEncoder hPackEncoder;
113 private final HPackDecoder hPackDecoder;
114 private final Map<Integer, H2Stream> streamMap;
115 private final Queue<AsyncPingHandler> pingHandlers;
116 private final AtomicInteger connInputWindow;
117 private final AtomicInteger connOutputWindow;
118 private final AtomicInteger outputRequests;
119 private final AtomicInteger lastStreamId;
120 private final H2StreamListener streamListener;
121
122 private ConnectionHandshake connState = ConnectionHandshake.READY;
123 private SettingsHandshake localSettingState = SettingsHandshake.READY;
124 private SettingsHandshake remoteSettingState = SettingsHandshake.READY;
125
126 private int initInputWinSize;
127 private int initOutputWinSize;
128 private int lowMark;
129
130 private volatile H2Config remoteConfig;
131
132 private Continuation continuation;
133
134 private int processedRemoteStreamId;
135 private EndpointDetails endpointDetails;
136
137 AbstractH2StreamMultiplexer(
138 final ProtocolIOSession ioSession,
139 final FrameFactory frameFactory,
140 final StreamIdGenerator idGenerator,
141 final HttpProcessor httpProcessor,
142 final CharCodingConfig charCodingConfig,
143 final H2Config h2Config,
144 final H2StreamListener streamListener) {
145 this.ioSession = Args.notNull(ioSession, "IO session");
146 this.frameFactory = Args.notNull(frameFactory, "Frame factory");
147 this.idGenerator = Args.notNull(idGenerator, "Stream id generator");
148 this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
149 this.localConfig = h2Config != null ? h2Config : H2Config.DEFAULT;
150 this.inputMetrics = new BasicH2TransportMetrics();
151 this.outputMetrics = new BasicH2TransportMetrics();
152 this.connMetrics = new BasicHttpConnectionMetrics(this.inputMetrics, this.outputMetrics);
153 this.inputBuffer = new FrameInputBuffer(this.inputMetrics, this.localConfig.getMaxFrameSize());
154 this.outputBuffer = new FrameOutputBuffer(this.outputMetrics, this.localConfig.getMaxFrameSize());
155 this.outputQueue = new ConcurrentLinkedDeque<>();
156 this.pingHandlers = new ConcurrentLinkedQueue<>();
157 this.outputRequests = new AtomicInteger(0);
158 this.lastStreamId = new AtomicInteger(0);
159 this.hPackEncoder = new HPackEncoder(CharCodingSupport.createEncoder(charCodingConfig));
160 this.hPackDecoder = new HPackDecoder(CharCodingSupport.createDecoder(charCodingConfig));
161 this.streamMap = new ConcurrentHashMap<>();
162 this.remoteConfig = H2Config.INIT;
163 this.connInputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
164 this.connOutputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
165
166 this.initInputWinSize = H2Config.INIT.getInitialWindowSize();
167 this.initOutputWinSize = H2Config.INIT.getInitialWindowSize();
168
169 this.hPackDecoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
170 this.hPackEncoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
171 this.hPackDecoder.setMaxListSize(H2Config.INIT.getMaxHeaderListSize());
172
173 this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
174 this.streamListener = streamListener;
175 }
176
177 @Override
178 public String getId() {
179 return ioSession.getId();
180 }
181
182 abstract void acceptHeaderFrame() throws H2ConnectionException;
183
184 abstract void acceptPushRequest() throws H2ConnectionException;
185
186 abstract void acceptPushFrame() throws H2ConnectionException;
187
188 abstract H2StreamHandler createRemotelyInitiatedStream(
189 H2StreamChannel channel,
190 HttpProcessor httpProcessor,
191 BasicHttpConnectionMetrics connMetrics,
192 HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException;
193
194 abstract H2StreamHandler createLocallyInitiatedStream(
195 ExecutableCommand command,
196 H2StreamChannel channel,
197 HttpProcessor httpProcessor,
198 BasicHttpConnectionMetrics connMetrics) throws IOException;
199
200 private int updateWindow(final AtomicInteger window, final int delta) throws ArithmeticException {
201 for (;;) {
202 final int current = window.get();
203 long newValue = (long) current + delta;
204
205
206
207 if (newValue == 0x80000000L) {
208 newValue = Integer.MAX_VALUE;
209 }
210
211
212 if (Math.abs(newValue) > 0x7fffffffL) {
213 throw new ArithmeticException("Update causes flow control window to exceed " + Integer.MAX_VALUE);
214 }
215 if (window.compareAndSet(current, (int) newValue)) {
216 return (int) newValue;
217 }
218 }
219 }
220
221 private int updateInputWindow(
222 final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
223 final int newSize = updateWindow(window, delta);
224 if (streamListener != null) {
225 streamListener.onInputFlowControl(this, streamId, delta, newSize);
226 }
227 return newSize;
228 }
229
230 private int updateOutputWindow(
231 final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
232 final int newSize = updateWindow(window, delta);
233 if (streamListener != null) {
234 streamListener.onOutputFlowControl(this, streamId, delta, newSize);
235 }
236 return newSize;
237 }
238
239 private void commitFrameInternal(final RawFrame frame) throws IOException {
240 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
241 if (streamListener != null) {
242 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
243 }
244 outputBuffer.write(frame, ioSession);
245 } else {
246 outputQueue.addLast(frame);
247 }
248 ioSession.setEvent(SelectionKey.OP_WRITE);
249 }
250
251 private void commitFrame(final RawFrame frame) throws IOException {
252 Args.notNull(frame, "Frame");
253 ioSession.getLock().lock();
254 try {
255 commitFrameInternal(frame);
256 } finally {
257 ioSession.getLock().unlock();
258 }
259 }
260
261 private void commitHeaders(
262 final int streamId, final List<? extends Header> headers, final boolean endStream) throws IOException {
263 if (streamListener != null) {
264 streamListener.onHeaderOutput(this, streamId, headers);
265 }
266 final ByteArrayBuffer buf = new ByteArrayBuffer(512);
267 hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
268
269 int off = 0;
270 int remaining = buf.length();
271 boolean continuation = false;
272
273 while (remaining > 0) {
274 final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
275 final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
276
277 remaining -= chunk;
278 off += chunk;
279
280 final boolean endHeaders = remaining == 0;
281 final RawFrame frame;
282 if (!continuation) {
283 frame = frameFactory.createHeaders(streamId, payload, endHeaders, endStream);
284 continuation = true;
285 } else {
286 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
287 }
288 commitFrameInternal(frame);
289 }
290 }
291
292 private void commitPushPromise(
293 final int streamId, final int promisedStreamId, final List<Header> headers) throws IOException {
294 if (headers == null || headers.isEmpty()) {
295 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
296 }
297 if (streamListener != null) {
298 streamListener.onHeaderOutput(this, streamId, headers);
299 }
300 final ByteArrayBuffer buf = new ByteArrayBuffer(512);
301 buf.append((byte)(promisedStreamId >> 24));
302 buf.append((byte)(promisedStreamId >> 16));
303 buf.append((byte)(promisedStreamId >> 8));
304 buf.append((byte)(promisedStreamId));
305
306 hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
307
308 int off = 0;
309 int remaining = buf.length();
310 boolean continuation = false;
311
312 while (remaining > 0) {
313 final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
314 final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
315
316 remaining -= chunk;
317 off += chunk;
318
319 final boolean endHeaders = remaining == 0;
320 final RawFrame frame;
321 if (!continuation) {
322 frame = frameFactory.createPushPromise(streamId, payload, endHeaders);
323 continuation = true;
324 } else {
325 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
326 }
327 commitFrameInternal(frame);
328 }
329 }
330
331 private void streamDataFrame(
332 final int streamId,
333 final AtomicInteger streamOutputWindow,
334 final ByteBuffer payload,
335 final int chunk) throws IOException {
336 final RawFrame dataFrame = frameFactory.createData(streamId, payload, false);
337 if (streamListener != null) {
338 streamListener.onFrameOutput(this, streamId, dataFrame);
339 }
340 updateOutputWindow(0, connOutputWindow, -chunk);
341 updateOutputWindow(streamId, streamOutputWindow, -chunk);
342 outputBuffer.write(dataFrame, ioSession);
343 }
344
345 private int streamData(
346 final int streamId, final AtomicInteger streamOutputWindow, final ByteBuffer payload) throws IOException {
347 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
348 final int capacity = Math.min(connOutputWindow.get(), streamOutputWindow.get());
349 if (capacity <= 0) {
350 return 0;
351 }
352 final int maxPayloadSize = Math.min(capacity, remoteConfig.getMaxFrameSize());
353 final int chunk;
354 if (payload.remaining() <= maxPayloadSize) {
355 chunk = payload.remaining();
356 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
357 } else {
358 chunk = maxPayloadSize;
359 final int originalLimit = payload.limit();
360 try {
361 payload.limit(payload.position() + chunk);
362 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
363 } finally {
364 payload.limit(originalLimit);
365 }
366 }
367 payload.position(payload.position() + chunk);
368 ioSession.setEvent(SelectionKey.OP_WRITE);
369 return chunk;
370 }
371 return 0;
372 }
373
374 private void incrementInputCapacity(
375 final int streamId, final AtomicInteger inputWindow, final int inputCapacity) throws IOException {
376 if (inputCapacity > 0) {
377 final int streamWinSize = inputWindow.get();
378 final int remainingCapacity = Integer.MAX_VALUE - streamWinSize;
379 final int chunk = Math.min(inputCapacity, remainingCapacity);
380 if (chunk != 0) {
381 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(streamId, chunk);
382 commitFrame(windowUpdateFrame);
383 updateInputWindow(streamId, inputWindow, chunk);
384 }
385 }
386 }
387
388 private void requestSessionOutput() {
389 outputRequests.incrementAndGet();
390 ioSession.setEvent(SelectionKey.OP_WRITE);
391 }
392
393 private void updateLastStreamId(final int streamId) {
394 final int currentId = lastStreamId.get();
395 if (streamId > currentId) {
396 lastStreamId.compareAndSet(currentId, streamId);
397 }
398 }
399
400 private int generateStreamId() {
401 for (;;) {
402 final int currentId = lastStreamId.get();
403 final int newStreamId = idGenerator.generate(currentId);
404 if (lastStreamId.compareAndSet(currentId, newStreamId)) {
405 return newStreamId;
406 }
407 }
408 }
409
410 public final void onConnect() throws HttpException, IOException {
411 connState = ConnectionHandshake.ACTIVE;
412 final RawFrame settingsFrame = frameFactory.createSettings(
413 new H2Setting(H2Param.HEADER_TABLE_SIZE, localConfig.getHeaderTableSize()),
414 new H2Setting(H2Param.ENABLE_PUSH, localConfig.isPushEnabled() ? 1 : 0),
415 new H2Setting(H2Param.MAX_CONCURRENT_STREAMS, localConfig.getMaxConcurrentStreams()),
416 new H2Setting(H2Param.INITIAL_WINDOW_SIZE, localConfig.getInitialWindowSize()),
417 new H2Setting(H2Param.MAX_FRAME_SIZE, localConfig.getMaxFrameSize()),
418 new H2Setting(H2Param.MAX_HEADER_LIST_SIZE, localConfig.getMaxHeaderListSize()));
419
420 commitFrame(settingsFrame);
421 localSettingState = SettingsHandshake.TRANSMITTED;
422 maximizeConnWindow(connInputWindow.get());
423
424 if (streamListener != null) {
425 final int initInputWindow = connInputWindow.get();
426 streamListener.onInputFlowControl(this, 0, initInputWindow, initInputWindow);
427 final int initOutputWindow = connOutputWindow.get();
428 streamListener.onOutputFlowControl(this, 0, initOutputWindow, initOutputWindow);
429 }
430 }
431
432 public final void onInput(final ByteBuffer src) throws HttpException, IOException {
433 if (connState == ConnectionHandshake.SHUTDOWN) {
434 ioSession.clearEvent(SelectionKey.OP_READ);
435 } else {
436 if (src != null) {
437 inputBuffer.put(src);
438 }
439 RawFrame frame;
440 while ((frame = inputBuffer.read(ioSession)) != null) {
441 if (streamListener != null) {
442 streamListener.onFrameInput(this, frame.getStreamId(), frame);
443 }
444 consumeFrame(frame);
445 }
446 }
447 }
448
449 public final void onOutput() throws HttpException, IOException {
450 ioSession.getLock().lock();
451 try {
452 if (!outputBuffer.isEmpty()) {
453 outputBuffer.flush(ioSession);
454 }
455 while (outputBuffer.isEmpty()) {
456 final RawFrame frame = outputQueue.poll();
457 if (frame != null) {
458 if (streamListener != null) {
459 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
460 }
461 outputBuffer.write(frame, ioSession);
462 } else {
463 break;
464 }
465 }
466 } finally {
467 ioSession.getLock().unlock();
468 }
469
470 if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
471
472 if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
473 produceOutput();
474 }
475 final int pendingOutputRequests = outputRequests.get();
476 boolean outputPending = false;
477 if (!streamMap.isEmpty() && connOutputWindow.get() > 0) {
478 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
479 final Map.Entry<Integer, H2Stream> entry = it.next();
480 final H2Stream stream = entry.getValue();
481 if (!stream.isLocalClosed()
482 && stream.getOutputWindow().get() > 0
483 && stream.isOutputReady()) {
484 outputPending = true;
485 break;
486 }
487 }
488 }
489 ioSession.getLock().lock();
490 try {
491 if (!outputPending && outputBuffer.isEmpty() && outputQueue.isEmpty()
492 && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
493 ioSession.clearEvent(SelectionKey.OP_WRITE);
494 } else {
495 outputRequests.addAndGet(-pendingOutputRequests);
496 }
497 } finally {
498 ioSession.getLock().unlock();
499 }
500 }
501
502 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 && remoteSettingState == SettingsHandshake.ACKED) {
503 processPendingCommands();
504 }
505 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
506 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
507 final Map.Entry<Integer, H2Stream> entry = it.next();
508 final H2Stream stream = entry.getValue();
509 if (stream.isLocalClosed() && stream.isRemoteClosed()) {
510 stream.releaseResources();
511 it.remove();
512 }
513 }
514 if (streamMap.isEmpty()) {
515 connState = ConnectionHandshake.SHUTDOWN;
516 }
517 }
518 if (connState.compareTo(ConnectionHandshake.SHUTDOWN) >= 0) {
519 ioSession.getLock().lock();
520 try {
521 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
522 ioSession.close();
523 }
524 } finally {
525 ioSession.getLock().unlock();
526 }
527 }
528 }
529
530 public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
531 connState = ConnectionHandshake.SHUTDOWN;
532
533 final RawFrame goAway;
534 if (localSettingState != SettingsHandshake.ACKED) {
535 goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.SETTINGS_TIMEOUT,
536 "Setting timeout (" + timeout + ")");
537 } else {
538 goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR,
539 "Timeout due to inactivity (" + timeout + ")");
540 }
541 commitFrame(goAway);
542 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
543 final Map.Entry<Integer, H2Stream> entry = it.next();
544 final H2Stream stream = entry.getValue();
545 stream.reset(new H2StreamResetException(H2Error.NO_ERROR, "Timeout due to inactivity (" + timeout + ")"));
546 }
547 streamMap.clear();
548 }
549
550 public final void onDisconnect() {
551 for (;;) {
552 final AsyncPingHandler pingHandler = pingHandlers.poll();
553 if (pingHandler != null) {
554 pingHandler.cancel();
555 } else {
556 break;
557 }
558 }
559 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
560 final Map.Entry<Integer, H2Stream> entry = it.next();
561 final H2Stream stream = entry.getValue();
562 stream.cancel();
563 }
564 for (;;) {
565 final Command command = ioSession.poll();
566 if (command != null) {
567 if (command instanceof ExecutableCommand) {
568 ((ExecutableCommand) command).failed(new ConnectionClosedException());
569 } else {
570 command.cancel();
571 }
572 } else {
573 break;
574 }
575 }
576 }
577
578 private void processPendingCommands() throws IOException, HttpException {
579 while (streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
580 final Command command = ioSession.poll();
581 if (command == null) {
582 break;
583 }
584 if (command instanceof ShutdownCommand) {
585 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
586 if (shutdownCommand.getType() == CloseMode.IMMEDIATE) {
587 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
588 final Map.Entry<Integer, H2Stream> entry = it.next();
589 final H2Stream stream = entry.getValue();
590 stream.cancel();
591 }
592 streamMap.clear();
593 connState = ConnectionHandshake.SHUTDOWN;
594 } else {
595 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
596 final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR, "Graceful shutdown");
597 commitFrame(goAway);
598 connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
599 }
600 }
601 break;
602 } else if (command instanceof PingCommand) {
603 final PingCommand./../../../org/apache/hc/core5/http2/nio/command/PingCommand.html#PingCommand">PingCommand pingCommand = (PingCommand) command;
604 final AsyncPingHandler handler = pingCommand.getHandler();
605 pingHandlers.add(handler);
606 final RawFrame ping = frameFactory.createPing(handler.getData());
607 commitFrame(ping);
608 } else if (command instanceof ExecutableCommand) {
609 final int streamId = generateStreamId();
610 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
611 streamId, true, initInputWinSize, initOutputWinSize);
612 final ExecutableCommand executableCommand = (ExecutableCommand) command;
613 final H2StreamHandler streamHandler = createLocallyInitiatedStream(
614 executableCommand, channel, httpProcessor, connMetrics);
615
616 final H2Stream stream = new H2Stream(channel, streamHandler, false);
617 streamMap.put(streamId, stream);
618
619 if (streamListener != null) {
620 final int initInputWindow = stream.getInputWindow().get();
621 streamListener.onInputFlowControl(this, streamId, initInputWindow, initInputWindow);
622 final int initOutputWindow = stream.getOutputWindow().get();
623 streamListener.onOutputFlowControl(this, streamId, initOutputWindow, initOutputWindow);
624 }
625
626 if (stream.isOutputReady()) {
627 stream.produceOutput();
628 }
629 final CancellableDependency cancellableDependency = executableCommand.getCancellableDependency();
630 if (cancellableDependency != null) {
631 cancellableDependency.setDependency(new Cancellable() {
632
633 @Override
634 public boolean cancel() {
635 return stream.abort();
636 }
637
638 });
639 }
640 if (!outputQueue.isEmpty()) {
641 return;
642 }
643 }
644 }
645 }
646
647 public final void onException(final Exception cause) {
648 try {
649 for (;;) {
650 final AsyncPingHandler pingHandler = pingHandlers.poll();
651 if (pingHandler != null) {
652 pingHandler.failed(cause);
653 } else {
654 break;
655 }
656 }
657 for (;;) {
658 final Command command = ioSession.poll();
659 if (command != null) {
660 if (command instanceof ExecutableCommand) {
661 ((ExecutableCommand) command).failed(new ConnectionClosedException());
662 } else {
663 command.cancel();
664 }
665 } else {
666 break;
667 }
668 }
669 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
670 final Map.Entry<Integer, H2Stream> entry = it.next();
671 final H2Stream stream = entry.getValue();
672 stream.reset(cause);
673 }
674 streamMap.clear();
675 if (!(cause instanceof ConnectionClosedException)) {
676 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) <= 0) {
677 final H2Error errorCode;
678 if (cause instanceof H2ConnectionException) {
679 errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
680 } else if (cause instanceof ProtocolException){
681 errorCode = H2Error.PROTOCOL_ERROR;
682 } else {
683 errorCode = H2Error.INTERNAL_ERROR;
684 }
685 final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
686 commitFrame(goAway);
687 }
688 }
689 } catch (final IOException ignore) {
690 } finally {
691 connState = ConnectionHandshake.SHUTDOWN;
692 final CloseMode closeMode;
693 if (cause instanceof ConnectionClosedException) {
694 closeMode = CloseMode.GRACEFUL;
695 } else if (cause instanceof IOException) {
696 closeMode = CloseMode.IMMEDIATE;
697 } else {
698 closeMode = CloseMode.GRACEFUL;
699 }
700 ioSession.close(closeMode);
701 }
702 }
703
704 private H2Stream getValidStream(final int streamId) throws H2ConnectionException {
705 if (streamId == 0) {
706 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
707 }
708 final H2Stream stream = streamMap.get(streamId);
709 if (stream == null) {
710 if (streamId <= lastStreamId.get()) {
711 throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed");
712 } else {
713 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
714 }
715 }
716 return stream;
717 }
718
719 private void consumeFrame(final RawFrame frame) throws HttpException, IOException {
720 final FrameType frameType = FrameType.valueOf(frame.getType());
721 final int streamId = frame.getStreamId();
722 if (continuation != null && frameType != FrameType.CONTINUATION) {
723 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "CONTINUATION frame expected");
724 }
725 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) >= 0) {
726 if (streamId > processedRemoteStreamId && !idGenerator.isSameSide(streamId)) {
727
728 return;
729 }
730 }
731 switch (frameType) {
732 case DATA: {
733 final H2Stream stream = getValidStream(streamId);
734 try {
735 consumeDataFrame(frame, stream);
736 } catch (final H2StreamResetException ex) {
737 stream.localReset(ex);
738 } catch (final HttpStreamResetException ex) {
739 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
740 }
741
742 if (stream.isTerminated()) {
743 streamMap.remove(streamId);
744 stream.releaseResources();
745 }
746 }
747 break;
748 case HEADERS: {
749 if (streamId == 0) {
750 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
751 }
752 H2Stream stream = streamMap.get(streamId);
753 if (stream == null) {
754 acceptHeaderFrame();
755
756 updateLastStreamId(streamId);
757
758 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
759 streamId, false, initInputWinSize, initOutputWinSize);
760 final H2StreamHandler streamHandler = createRemotelyInitiatedStream(
761 channel, httpProcessor, connMetrics, null);
762 stream = new H2Stream(channel, streamHandler, true);
763 if (stream.isOutputReady()) {
764 stream.produceOutput();
765 }
766 streamMap.put(streamId, stream);
767 }
768
769 try {
770 consumeHeaderFrame(frame, stream);
771
772 if (stream.isOutputReady()) {
773 stream.produceOutput();
774 }
775 } catch (final H2StreamResetException ex) {
776 stream.localReset(ex);
777 } catch (final HttpStreamResetException ex) {
778 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
779 } catch (final HttpException ex) {
780 stream.handle(ex);
781 }
782
783 if (stream.isTerminated()) {
784 streamMap.remove(streamId);
785 stream.releaseResources();
786 }
787 }
788 break;
789 case CONTINUATION: {
790 if (continuation == null) {
791 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION frame");
792 }
793 if (streamId != continuation.streamId) {
794 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION stream id: " + streamId);
795 }
796
797 final H2Stream stream = getValidStream(streamId);
798 try {
799
800 consumeContinuationFrame(frame, stream);
801 } catch (final H2StreamResetException ex) {
802 stream.localReset(ex);
803 } catch (final HttpStreamResetException ex) {
804 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
805 }
806
807 if (stream.isTerminated()) {
808 streamMap.remove(streamId);
809 stream.releaseResources();
810 }
811 }
812 break;
813 case WINDOW_UPDATE: {
814 final ByteBuffer payload = frame.getPayload();
815 if (payload == null || payload.remaining() != 4) {
816 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid WINDOW_UPDATE frame payload");
817 }
818 final int delta = payload.getInt();
819 if (delta <= 0) {
820 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Invalid WINDOW_UPDATE delta");
821 }
822 if (streamId == 0) {
823 try {
824 updateOutputWindow(0, connOutputWindow, delta);
825 } catch (final ArithmeticException ex) {
826 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
827 }
828 } else {
829 final H2Stream stream = streamMap.get(streamId);
830 if (stream != null) {
831 try {
832 updateOutputWindow(streamId, stream.getOutputWindow(), delta);
833 } catch (final ArithmeticException ex) {
834 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
835 }
836 }
837 }
838 ioSession.setEvent(SelectionKey.OP_WRITE);
839 }
840 break;
841 case RST_STREAM: {
842 if (streamId == 0) {
843 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
844 }
845 final H2Stream stream = streamMap.get(streamId);
846 if (stream == null) {
847 if (streamId > lastStreamId.get()) {
848 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
849 }
850 } else {
851 final ByteBuffer payload = frame.getPayload();
852 if (payload == null || payload.remaining() != 4) {
853 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid RST_STREAM frame payload");
854 }
855 final int errorCode = payload.getInt();
856 stream.reset(new H2StreamResetException(errorCode, "Stream reset (" + errorCode + ")"));
857 streamMap.remove(streamId);
858 stream.releaseResources();
859 }
860 }
861 break;
862 case PING: {
863 if (streamId != 0) {
864 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
865 }
866 final ByteBuffer ping = frame.getPayloadContent();
867 if (ping == null || ping.remaining() != 8) {
868 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload");
869 }
870 if (frame.isFlagSet(FrameFlag.ACK)) {
871 final AsyncPingHandler pingHandler = pingHandlers.poll();
872 if (pingHandler != null) {
873 pingHandler.consumeResponse(ping);
874 }
875 } else {
876 final ByteBuffer pong = ByteBuffer.allocate(ping.remaining());
877 pong.put(ping);
878 pong.flip();
879 final RawFrame response = frameFactory.createPingAck(pong);
880 commitFrame(response);
881 }
882 }
883 break;
884 case SETTINGS: {
885 if (streamId != 0) {
886 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
887 }
888 if (frame.isFlagSet(FrameFlag.ACK)) {
889 if (localSettingState == SettingsHandshake.TRANSMITTED) {
890 localSettingState = SettingsHandshake.ACKED;
891 ioSession.setEvent(SelectionKey.OP_WRITE);
892 applyLocalSettings();
893 }
894 } else {
895 final ByteBuffer payload = frame.getPayload();
896 if (payload != null) {
897 if ((payload.remaining() % 6) != 0) {
898 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid SETTINGS payload");
899 }
900 consumeSettingsFrame(payload);
901 remoteSettingState = SettingsHandshake.TRANSMITTED;
902 }
903
904 final RawFrame response = frameFactory.createSettingsAck();
905 commitFrame(response);
906 remoteSettingState = SettingsHandshake.ACKED;
907 }
908 }
909 break;
910 case PRIORITY:
911
912 break;
913 case PUSH_PROMISE: {
914 acceptPushFrame();
915
916 if (!localConfig.isPushEnabled()) {
917 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Push is disabled");
918 }
919
920 final H2Stream stream = getValidStream(streamId);
921 if (stream.isRemoteClosed()) {
922 stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed"));
923 break;
924 }
925
926 final ByteBuffer payload = frame.getPayloadContent();
927 if (payload == null || payload.remaining() < 4) {
928 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PUSH_PROMISE payload");
929 }
930 final int promisedStreamId = payload.getInt();
931 if (promisedStreamId == 0 || idGenerator.isSameSide(promisedStreamId)) {
932 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal promised stream id: " + promisedStreamId);
933 }
934 if (streamMap.get(promisedStreamId) != null) {
935 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected promised stream id: " + promisedStreamId);
936 }
937
938 updateLastStreamId(promisedStreamId);
939
940 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
941 promisedStreamId, false, initInputWinSize, initOutputWinSize);
942 final H2StreamHandler streamHandler = createRemotelyInitiatedStream(
943 channel, httpProcessor, connMetrics, stream.getPushHandlerFactory());
944 final H2Stream promisedStream = new H2Stream(channel, streamHandler, true);
945 streamMap.put(promisedStreamId, promisedStream);
946
947 try {
948 consumePushPromiseFrame(frame, payload, promisedStream);
949 } catch (final H2StreamResetException ex) {
950 promisedStream.localReset(ex);
951 } catch (final HttpStreamResetException ex) {
952 promisedStream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.NO_ERROR);
953 }
954 }
955 break;
956 case GOAWAY: {
957 if (streamId != 0) {
958 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
959 }
960 final ByteBuffer payload = frame.getPayload();
961 if (payload == null || payload.remaining() < 8) {
962 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid GOAWAY payload");
963 }
964 final int processedLocalStreamId = payload.getInt();
965 final int errorCode = payload.getInt();
966 if (errorCode == H2Error.NO_ERROR.getCode()) {
967 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
968 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
969 final Map.Entry<Integer, H2Stream> entry = it.next();
970 final int activeStreamId = entry.getKey();
971 if (!idGenerator.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
972 final H2Stream stream = entry.getValue();
973 stream.cancel();
974 it.remove();
975 }
976 }
977 }
978 connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
979 } else {
980 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
981 final Map.Entry<Integer, H2Stream> entry = it.next();
982 final H2Stream stream = entry.getValue();
983 stream.reset(new H2StreamResetException(errorCode, "Connection terminated by the peer (" + errorCode + ")"));
984 }
985 streamMap.clear();
986 connState = ConnectionHandshake.SHUTDOWN;
987 }
988 }
989 ioSession.setEvent(SelectionKey.OP_WRITE);
990 break;
991 }
992 }
993
994 private void consumeDataFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
995 final int streamId = stream.getId();
996 final ByteBuffer payload = frame.getPayloadContent();
997 if (payload != null) {
998 final int frameLength = frame.getLength();
999 final int streamWinSize = updateInputWindow(streamId, stream.getInputWindow(), -frameLength);
1000 if (streamWinSize < lowMark && !stream.isRemoteClosed()) {
1001 stream.produceInputCapacityUpdate();
1002 }
1003 final int connWinSize = updateInputWindow(0, connInputWindow, -frameLength);
1004 if (connWinSize < CONNECTION_WINDOW_LOW_MARK) {
1005 maximizeConnWindow(connWinSize);
1006 }
1007 }
1008 if (stream.isRemoteClosed()) {
1009 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1010 }
1011 if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1012 stream.setRemoteEndStream();
1013 }
1014 if (stream.isLocalReset()) {
1015 return;
1016 }
1017 stream.consumeData(payload);
1018 }
1019
1020 private void maximizeConnWindow(final int connWinSize) throws IOException {
1021 final int delta = Integer.MAX_VALUE - connWinSize;
1022 if (delta > 0) {
1023 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(0, delta);
1024 commitFrame(windowUpdateFrame);
1025 updateInputWindow(0, connInputWindow, delta);
1026 }
1027 }
1028
1029 private void consumePushPromiseFrame(final RawFrame frame, final ByteBuffer payload, final H2Stream promisedStream) throws HttpException, IOException {
1030 final int promisedStreamId = promisedStream.getId();
1031 if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1032 continuation = new Continuation(promisedStreamId, frame.getType(), true);
1033 }
1034 if (continuation == null) {
1035 final List<Header> headers = hPackDecoder.decodeHeaders(payload);
1036 if (promisedStreamId > processedRemoteStreamId) {
1037 processedRemoteStreamId = promisedStreamId;
1038 }
1039 if (streamListener != null) {
1040 streamListener.onHeaderInput(this, promisedStreamId, headers);
1041 }
1042 if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
1043 throw new H2StreamResetException(H2Error.REFUSED_STREAM, "Stream refused");
1044 }
1045 promisedStream.consumePromise(headers);
1046 } else {
1047 continuation.copyPayload(payload);
1048 }
1049 }
1050
1051 List<Header> decodeHeaders(final ByteBuffer payload) throws HttpException {
1052 return hPackDecoder.decodeHeaders(payload);
1053 }
1054
1055 private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1056 final int streamId = stream.getId();
1057 if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1058 continuation = new Continuation(streamId, frame.getType(), frame.isFlagSet(FrameFlag.END_STREAM));
1059 }
1060 final ByteBuffer payload = frame.getPayloadContent();
1061 if (frame.isFlagSet(FrameFlag.PRIORITY)) {
1062
1063 payload.getInt();
1064 payload.get();
1065 }
1066 if (continuation == null) {
1067 final List<Header> headers = decodeHeaders(payload);
1068 if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1069 processedRemoteStreamId = streamId;
1070 }
1071 if (streamListener != null) {
1072 streamListener.onHeaderInput(this, streamId, headers);
1073 }
1074 if (stream.isRemoteClosed()) {
1075 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1076 }
1077 if (stream.isLocalReset()) {
1078 return;
1079 }
1080 if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
1081 throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, "Stream refused");
1082 }
1083 if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1084 stream.setRemoteEndStream();
1085 }
1086 stream.consumeHeader(headers);
1087 } else {
1088 continuation.copyPayload(payload);
1089 }
1090 }
1091
1092 private void consumeContinuationFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1093 final int streamId = frame.getStreamId();
1094 final ByteBuffer payload = frame.getPayload();
1095 continuation.copyPayload(payload);
1096 if (frame.isFlagSet(FrameFlag.END_HEADERS)) {
1097 final List<Header> headers = decodeHeaders(continuation.getContent());
1098 if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1099 processedRemoteStreamId = streamId;
1100 }
1101 if (streamListener != null) {
1102 streamListener.onHeaderInput(this, streamId, headers);
1103 }
1104 if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
1105 throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, "Stream refused");
1106 }
1107 if (stream.isRemoteClosed()) {
1108 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1109 }
1110 if (stream.isLocalReset()) {
1111 return;
1112 }
1113 if (continuation.endStream) {
1114 stream.setRemoteEndStream();
1115 }
1116 if (continuation.type == FrameType.PUSH_PROMISE.getValue()) {
1117 stream.consumePromise(headers);
1118 } else {
1119 stream.consumeHeader(headers);
1120 }
1121 continuation = null;
1122 }
1123 }
1124
1125 private void consumeSettingsFrame(final ByteBuffer payload) throws HttpException, IOException {
1126 final H2Config.Builder configBuilder = H2Config.initial();
1127 while (payload.hasRemaining()) {
1128 final int code = payload.getShort();
1129 final int value = payload.getInt();
1130 final H2Param param = H2Param.valueOf(code);
1131 if (param != null) {
1132 switch (param) {
1133 case HEADER_TABLE_SIZE:
1134 try {
1135 configBuilder.setHeaderTableSize(value);
1136 } catch (final IllegalArgumentException ex) {
1137 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1138 }
1139 break;
1140 case MAX_CONCURRENT_STREAMS:
1141 try {
1142 configBuilder.setMaxConcurrentStreams(value);
1143 } catch (final IllegalArgumentException ex) {
1144 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1145 }
1146 break;
1147 case ENABLE_PUSH:
1148 configBuilder.setPushEnabled(value == 1);
1149 break;
1150 case INITIAL_WINDOW_SIZE:
1151 try {
1152 configBuilder.setInitialWindowSize(value);
1153 } catch (final IllegalArgumentException ex) {
1154 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1155 }
1156 break;
1157 case MAX_FRAME_SIZE:
1158 try {
1159 configBuilder.setMaxFrameSize(value);
1160 } catch (final IllegalArgumentException ex) {
1161 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1162 }
1163 break;
1164 case MAX_HEADER_LIST_SIZE:
1165 try {
1166 configBuilder.setMaxHeaderListSize(value);
1167 } catch (final IllegalArgumentException ex) {
1168 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1169 }
1170 break;
1171 }
1172 }
1173 }
1174 applyRemoteSettings(configBuilder.build());
1175 }
1176
1177 private void produceOutput() throws HttpException, IOException {
1178 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1179 final Map.Entry<Integer, H2Stream> entry = it.next();
1180 final H2Stream stream = entry.getValue();
1181 if (!stream.isLocalClosed() && stream.getOutputWindow().get() > 0) {
1182 stream.produceOutput();
1183 }
1184 if (stream.isTerminated()) {
1185 it.remove();
1186 stream.releaseResources();
1187 }
1188 if (!outputQueue.isEmpty()) {
1189 break;
1190 }
1191 }
1192 }
1193
1194 private void applyRemoteSettings(final H2Config config) throws H2ConnectionException {
1195 remoteConfig = config;
1196
1197 hPackEncoder.setMaxTableSize(remoteConfig.getHeaderTableSize());
1198 final int delta = remoteConfig.getInitialWindowSize() - initOutputWinSize;
1199 initOutputWinSize = remoteConfig.getInitialWindowSize();
1200
1201 if (delta != 0) {
1202 updateOutputWindow(0, connOutputWindow, delta);
1203 if (!streamMap.isEmpty()) {
1204 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1205 final Map.Entry<Integer, H2Stream> entry = it.next();
1206 final H2Stream stream = entry.getValue();
1207 try {
1208 updateOutputWindow(stream.getId(), stream.getOutputWindow(), delta);
1209 } catch (final ArithmeticException ex) {
1210 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1211 }
1212 }
1213 }
1214 }
1215 }
1216
1217 private void applyLocalSettings() throws H2ConnectionException {
1218 hPackDecoder.setMaxTableSize(localConfig.getHeaderTableSize());
1219 hPackDecoder.setMaxListSize(localConfig.getMaxHeaderListSize());
1220
1221 final int delta = localConfig.getInitialWindowSize() - initInputWinSize;
1222 initInputWinSize = localConfig.getInitialWindowSize();
1223
1224 if (delta != 0 && !streamMap.isEmpty()) {
1225 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1226 final Map.Entry<Integer, H2Stream> entry = it.next();
1227 final H2Stream stream = entry.getValue();
1228 try {
1229 updateInputWindow(stream.getId(), stream.getInputWindow(), delta);
1230 } catch (final ArithmeticException ex) {
1231 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1232 }
1233 }
1234 }
1235 lowMark = initInputWinSize / 2;
1236 }
1237
1238 @Override
1239 public void close() throws IOException {
1240 ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.IMMEDIATE);
1241 }
1242
1243 @Override
1244 public void close(final CloseMode closeMode) {
1245 ioSession.close(closeMode);
1246 }
1247
1248 @Override
1249 public boolean isOpen() {
1250 return connState == ConnectionHandshake.ACTIVE;
1251 }
1252
1253 @Override
1254 public void setSocketTimeout(final Timeout timeout) {
1255 ioSession.setSocketTimeout(timeout);
1256 }
1257
1258 @Override
1259 public SSLSession getSSLSession() {
1260 final TlsDetails tlsDetails = ioSession.getTlsDetails();
1261 return tlsDetails != null ? tlsDetails.getSSLSession() : null;
1262 }
1263
1264 @Override
1265 public EndpointDetails getEndpointDetails() {
1266 if (endpointDetails == null) {
1267 endpointDetails = new BasicEndpointDetails(
1268 ioSession.getRemoteAddress(),
1269 ioSession.getLocalAddress(),
1270 connMetrics,
1271 ioSession.getSocketTimeout());
1272 }
1273 return endpointDetails;
1274 }
1275
1276 @Override
1277 public Timeout getSocketTimeout() {
1278 return ioSession.getSocketTimeout();
1279 }
1280
1281 @Override
1282 public ProtocolVersion getProtocolVersion() {
1283 return HttpVersion.HTTP_2;
1284 }
1285
1286 @Override
1287 public SocketAddress getRemoteAddress() {
1288 return ioSession.getRemoteAddress();
1289 }
1290
1291 @Override
1292 public SocketAddress getLocalAddress() {
1293 return ioSession.getLocalAddress();
1294 }
1295
1296 void appendState(final StringBuilder buf) {
1297 buf.append("connState=").append(connState)
1298 .append(", connInputWindow=").append(connInputWindow)
1299 .append(", connOutputWindow=").append(connOutputWindow)
1300 .append(", outputQueue=").append(outputQueue.size())
1301 .append(", streamMap=").append(streamMap.size())
1302 .append(", processedRemoteStreamId=").append(processedRemoteStreamId);
1303 }
1304
1305 private static class Continuation {
1306
1307 final int streamId;
1308 final int type;
1309 final boolean endStream;
1310 final ByteArrayBuffer headerBuffer;
1311
1312 private Continuation(final int streamId, final int type, final boolean endStream) {
1313 this.streamId = streamId;
1314 this.type = type;
1315 this.endStream = endStream;
1316 this.headerBuffer = new ByteArrayBuffer(1024);
1317 }
1318
1319 void copyPayload(final ByteBuffer payload) {
1320 if (payload == null) {
1321 return;
1322 }
1323 headerBuffer.ensureCapacity(payload.remaining());
1324 payload.get(headerBuffer.array(), headerBuffer.length(), payload.remaining());
1325 }
1326
1327 ByteBuffer getContent() {
1328 return ByteBuffer.wrap(headerBuffer.array(), 0, headerBuffer.length());
1329 }
1330
1331 }
1332
1333 private class H2StreamChannelImpl implements H2StreamChannel {
1334
1335 private final int id;
1336 private final AtomicInteger inputWindow;
1337 private final AtomicInteger outputWindow;
1338
1339 private volatile boolean idle;
1340 private volatile boolean remoteEndStream;
1341 private volatile boolean localEndStream;
1342
1343 private volatile long deadline;
1344
1345 H2StreamChannelImpl(final int id, final boolean idle, final int initialInputWindowSize, final int initialOutputWindowSize) {
1346 this.id = id;
1347 this.idle = idle;
1348 this.inputWindow = new AtomicInteger(initialInputWindowSize);
1349 this.outputWindow = new AtomicInteger(initialOutputWindowSize);
1350 }
1351
1352 int getId() {
1353 return id;
1354 }
1355
1356 AtomicInteger getOutputWindow() {
1357 return outputWindow;
1358 }
1359
1360 AtomicInteger getInputWindow() {
1361 return inputWindow;
1362 }
1363
1364 @Override
1365 public void submit(final List<Header> headers, final boolean endStream) throws IOException {
1366 ioSession.getLock().lock();
1367 try {
1368 if (headers == null || headers.isEmpty()) {
1369 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
1370 }
1371 if (localEndStream) {
1372 return;
1373 }
1374 idle = false;
1375 commitHeaders(id, headers, endStream);
1376 if (endStream) {
1377 localEndStream = true;
1378 }
1379 } finally {
1380 ioSession.getLock().unlock();
1381 }
1382 }
1383
1384 @Override
1385 public void push(final List<Header> headers, final AsyncPushProducer pushProducer) throws HttpException, IOException {
1386 acceptPushRequest();
1387 final int promisedStreamId = generateStreamId();
1388 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
1389 promisedStreamId,
1390 true,
1391 localConfig.getInitialWindowSize(),
1392 remoteConfig.getInitialWindowSize());
1393 final HttpCoreContext context = HttpCoreContext.create();
1394 context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
1395 context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
1396 final H2StreamHandler streamHandler = new ServerPushH2StreamHandler(
1397 channel, httpProcessor, connMetrics, pushProducer, context);
1398 final H2Stream stream = new H2Stream(channel, streamHandler, false);
1399 streamMap.put(promisedStreamId, stream);
1400
1401 ioSession.getLock().lock();
1402 try {
1403 if (localEndStream) {
1404 stream.releaseResources();
1405 return;
1406 }
1407 commitPushPromise(id, promisedStreamId, headers);
1408 idle = false;
1409 } finally {
1410 ioSession.getLock().unlock();
1411 }
1412 }
1413
1414 @Override
1415 public void update(final int increment) throws IOException {
1416 if (remoteEndStream) {
1417 return;
1418 }
1419 incrementInputCapacity(0, connInputWindow, increment);
1420 incrementInputCapacity(id, inputWindow, increment);
1421 }
1422
1423 @Override
1424 public int write(final ByteBuffer payload) throws IOException {
1425 ioSession.getLock().lock();
1426 try {
1427 if (localEndStream) {
1428 return 0;
1429 }
1430 return streamData(id, outputWindow, payload);
1431 } finally {
1432 ioSession.getLock().unlock();
1433 }
1434 }
1435
1436 @Override
1437 public void endStream(final List<? extends Header> trailers) throws IOException {
1438 ioSession.getLock().lock();
1439 try {
1440 if (localEndStream) {
1441 return;
1442 }
1443 localEndStream = true;
1444 if (trailers != null && !trailers.isEmpty()) {
1445 commitHeaders(id, trailers, true);
1446 } else {
1447 final RawFrame frame = frameFactory.createData(id, null, true);
1448 commitFrameInternal(frame);
1449 }
1450 } finally {
1451 ioSession.getLock().unlock();
1452 }
1453 }
1454
1455 @Override
1456 public void endStream() throws IOException {
1457 endStream(null);
1458 }
1459
1460 @Override
1461 public void requestOutput() {
1462 requestSessionOutput();
1463 }
1464
1465 boolean isRemoteClosed() {
1466 return remoteEndStream;
1467 }
1468
1469 void setRemoteEndStream() {
1470 remoteEndStream = true;
1471 }
1472
1473 boolean isLocalClosed() {
1474 return localEndStream;
1475 }
1476
1477 void setLocalEndStream() {
1478 localEndStream = true;
1479 }
1480
1481 boolean isLocalReset() {
1482 return deadline > 0;
1483 }
1484
1485 boolean isResetDeadline() {
1486 final long l = deadline;
1487 return l > 0 && l < System.currentTimeMillis();
1488 }
1489
1490 boolean localReset(final int code) throws IOException {
1491 ioSession.getLock().lock();
1492 try {
1493 if (localEndStream) {
1494 return false;
1495 }
1496 localEndStream = true;
1497 deadline = System.currentTimeMillis() + LINGER_TIME;
1498 if (!idle) {
1499 final RawFrame resetStream = frameFactory.createResetStream(id, code);
1500 commitFrameInternal(resetStream);
1501 return true;
1502 }
1503 return false;
1504 } finally {
1505 ioSession.getLock().unlock();
1506 }
1507 }
1508
1509 boolean localReset(final H2Error error) throws IOException {
1510 return localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1511 }
1512
1513 @Override
1514 public boolean cancel() {
1515 try {
1516 return localReset(H2Error.CANCEL);
1517 } catch (final IOException ignore) {
1518 return false;
1519 }
1520 }
1521
1522 void appendState(final StringBuilder buf) {
1523 buf.append("id=").append(id)
1524 .append(", connState=").append(connState)
1525 .append(", inputWindow=").append(inputWindow)
1526 .append(", outputWindow=").append(outputWindow)
1527 .append(", localEndStream=").append(localEndStream)
1528 .append(", idle=").append(idle);
1529 }
1530
1531 @Override
1532 public String toString() {
1533 final StringBuilder buf = new StringBuilder();
1534 buf.append("[");
1535 appendState(buf);
1536 buf.append("]");
1537 return buf.toString();
1538 }
1539
1540 }
1541
1542 static class H2Stream {
1543
1544 private final H2StreamChannelImpl channel;
1545 private final H2StreamHandler handler;
1546 private final boolean remoteInitiated;
1547
1548 private H2Stream(
1549 final H2StreamChannelImpl channel,
1550 final H2StreamHandler handler,
1551 final boolean remoteInitiated) {
1552 this.channel = channel;
1553 this.handler = handler;
1554 this.remoteInitiated = remoteInitiated;
1555 }
1556
1557 int getId() {
1558 return channel.getId();
1559 }
1560
1561 boolean isRemoteInitiated() {
1562 return remoteInitiated;
1563 }
1564
1565 AtomicInteger getOutputWindow() {
1566 return channel.getOutputWindow();
1567 }
1568
1569 AtomicInteger getInputWindow() {
1570 return channel.getInputWindow();
1571 }
1572
1573 boolean isTerminated() {
1574 return channel.isLocalClosed() && (channel.isRemoteClosed() || channel.isResetDeadline());
1575 }
1576
1577 boolean isRemoteClosed() {
1578 return channel.isRemoteClosed();
1579 }
1580
1581 boolean isLocalClosed() {
1582 return channel.isLocalClosed();
1583 }
1584
1585 boolean isLocalReset() {
1586 return channel.isLocalReset();
1587 }
1588
1589 void setRemoteEndStream() {
1590 channel.setRemoteEndStream();
1591 }
1592
1593 void consumePromise(final List<Header> headers) throws HttpException, IOException {
1594 try {
1595 handler.consumePromise(headers);
1596 channel.setLocalEndStream();
1597 } catch (final ProtocolException ex) {
1598 localReset(ex, H2Error.PROTOCOL_ERROR);
1599 }
1600 }
1601
1602 void consumeHeader(final List<Header> headers) throws HttpException, IOException {
1603 try {
1604 handler.consumeHeader(headers, channel.isRemoteClosed());
1605 } catch (final ProtocolException ex) {
1606 localReset(ex, H2Error.PROTOCOL_ERROR);
1607 }
1608 }
1609
1610 void consumeData(final ByteBuffer src) throws HttpException, IOException {
1611 try {
1612 handler.consumeData(src, channel.isRemoteClosed());
1613 } catch (final CharacterCodingException ex) {
1614 localReset(ex, H2Error.INTERNAL_ERROR);
1615 } catch (final ProtocolException ex) {
1616 localReset(ex, H2Error.PROTOCOL_ERROR);
1617 }
1618 }
1619
1620 boolean isOutputReady() {
1621 return handler.isOutputReady();
1622 }
1623
1624 void produceOutput() throws HttpException, IOException {
1625 try {
1626 handler.produceOutput();
1627 } catch (final ProtocolException ex) {
1628 localReset(ex, H2Error.PROTOCOL_ERROR);
1629 }
1630 }
1631
1632 void produceInputCapacityUpdate() throws IOException {
1633 handler.updateInputCapacity();
1634 }
1635
1636 void reset(final Exception cause) {
1637 channel.setRemoteEndStream();
1638 channel.setLocalEndStream();
1639 handler.failed(cause);
1640 }
1641
1642 void localReset(final Exception cause, final int code) throws IOException {
1643 channel.localReset(code);
1644 handler.failed(cause);
1645 }
1646
1647 void localReset(final Exception cause, final H2Error error) throws IOException {
1648 localReset(cause, error != null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1649 }
1650
1651 void localReset(final H2StreamResetException ex) throws IOException {
1652 localReset(ex, ex.getCode());
1653 }
1654
1655 void handle(final HttpException ex) throws IOException, HttpException {
1656 handler.handle(ex, channel.isRemoteClosed());
1657 }
1658
1659 HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
1660 return handler.getPushHandlerFactory();
1661 }
1662
1663 void cancel() {
1664 reset(new CancellationException("HTTP/2 message exchange cancelled"));
1665 }
1666
1667 boolean abort() {
1668 final boolean cancelled = channel.cancel();
1669 handler.failed(new CancellationException("HTTP/2 message exchange cancelled"));
1670 return cancelled;
1671 }
1672
1673 void releaseResources() {
1674 handler.releaseResources();
1675 channel.requestOutput();
1676 }
1677
1678 void appendState(final StringBuilder buf) {
1679 buf.append("channel=[");
1680 channel.appendState(buf);
1681 buf.append("]");
1682 }
1683
1684 @Override
1685 public String toString() {
1686 final StringBuilder buf = new StringBuilder();
1687 buf.append("[");
1688 appendState(buf);
1689 buf.append("]");
1690 return buf.toString();
1691 }
1692
1693 }
1694
1695 }