1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http2.impl.nio;
28
29 import javax.net.ssl.SSLSession;
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.channels.SelectionKey;
34 import java.nio.charset.CharacterCodingException;
35 import java.util.Deque;
36 import java.util.Iterator;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Queue;
40 import java.util.concurrent.CancellationException;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentLinkedDeque;
43 import java.util.concurrent.ConcurrentLinkedQueue;
44 import java.util.concurrent.atomic.AtomicInteger;
45
46 import org.apache.hc.core5.concurrent.Cancellable;
47 import org.apache.hc.core5.concurrent.CancellableDependency;
48 import org.apache.hc.core5.http.ConnectionClosedException;
49 import org.apache.hc.core5.http.EndpointDetails;
50 import org.apache.hc.core5.http.Header;
51 import org.apache.hc.core5.http.HttpConnection;
52 import org.apache.hc.core5.http.HttpException;
53 import org.apache.hc.core5.http.HttpStreamResetException;
54 import org.apache.hc.core5.http.HttpVersion;
55 import org.apache.hc.core5.http.ProtocolException;
56 import org.apache.hc.core5.http.ProtocolVersion;
57 import org.apache.hc.core5.http.config.CharCodingConfig;
58 import org.apache.hc.core5.http.impl.BasicEndpointDetails;
59 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
60 import org.apache.hc.core5.http.impl.CharCodingSupport;
61 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
62 import org.apache.hc.core5.http.nio.AsyncPushProducer;
63 import org.apache.hc.core5.http.nio.HandlerFactory;
64 import org.apache.hc.core5.http.nio.command.ExecutableCommand;
65 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
66 import org.apache.hc.core5.http.protocol.HttpCoreContext;
67 import org.apache.hc.core5.http.protocol.HttpProcessor;
68 import org.apache.hc.core5.http2.H2ConnectionException;
69 import org.apache.hc.core5.http2.H2Error;
70 import org.apache.hc.core5.http2.H2StreamResetException;
71 import org.apache.hc.core5.http2.config.H2Config;
72 import org.apache.hc.core5.http2.config.H2Param;
73 import org.apache.hc.core5.http2.config.H2Setting;
74 import org.apache.hc.core5.http2.frame.FrameFactory;
75 import org.apache.hc.core5.http2.frame.FrameFlag;
76 import org.apache.hc.core5.http2.frame.FrameType;
77 import org.apache.hc.core5.http2.frame.RawFrame;
78 import org.apache.hc.core5.http2.frame.StreamIdGenerator;
79 import org.apache.hc.core5.http2.hpack.HPackDecoder;
80 import org.apache.hc.core5.http2.hpack.HPackEncoder;
81 import org.apache.hc.core5.http2.impl.BasicH2TransportMetrics;
82 import org.apache.hc.core5.http2.nio.AsyncPingHandler;
83 import org.apache.hc.core5.http2.nio.command.PingCommand;
84 import org.apache.hc.core5.io.CloseMode;
85 import org.apache.hc.core5.reactor.Command;
86 import org.apache.hc.core5.reactor.ProtocolIOSession;
87 import org.apache.hc.core5.reactor.ssl.TlsDetails;
88 import org.apache.hc.core5.util.Args;
89 import org.apache.hc.core5.util.ByteArrayBuffer;
90 import org.apache.hc.core5.util.Identifiable;
91 import org.apache.hc.core5.util.Timeout;
92
93 abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection {
94
95 private static final long LINGER_TIME = 1000;
96 private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024;
97
98 enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
99 enum SettingsHandshake { READY, TRANSMITTED, ACKED }
100
101 private final ProtocolIOSession ioSession;
102 private final FrameFactory frameFactory;
103 private final StreamIdGenerator idGenerator;
104 private final HttpProcessor httpProcessor;
105 private final H2Config localConfig;
106 private final BasicH2TransportMetrics inputMetrics;
107 private final BasicH2TransportMetrics outputMetrics;
108 private final BasicHttpConnectionMetrics connMetrics;
109 private final FrameInputBuffer inputBuffer;
110 private final FrameOutputBuffer outputBuffer;
111 private final Deque<RawFrame> outputQueue;
112 private final HPackEncoder hPackEncoder;
113 private final HPackDecoder hPackDecoder;
114 private final Map<Integer, H2Stream> streamMap;
115 private final Queue<AsyncPingHandler> pingHandlers;
116 private final AtomicInteger connInputWindow;
117 private final AtomicInteger connOutputWindow;
118 private final AtomicInteger outputRequests;
119 private final AtomicInteger lastStreamId;
120 private final H2StreamListener streamListener;
121
122 private ConnectionHandshake connState = ConnectionHandshake.READY;
123 private SettingsHandshake localSettingState = SettingsHandshake.READY;
124 private SettingsHandshake remoteSettingState = SettingsHandshake.READY;
125
126 private int initInputWinSize;
127 private int initOutputWinSize;
128 private int lowMark;
129
130 private volatile H2Config remoteConfig;
131
132 private Continuation continuation;
133
134 private int processedRemoteStreamId;
135 private EndpointDetails endpointDetails;
136
137 AbstractH2StreamMultiplexer(
138 final ProtocolIOSession ioSession,
139 final FrameFactory frameFactory,
140 final StreamIdGenerator idGenerator,
141 final HttpProcessor httpProcessor,
142 final CharCodingConfig charCodingConfig,
143 final H2Config h2Config,
144 final H2StreamListener streamListener) {
145 this.ioSession = Args.notNull(ioSession, "IO session");
146 this.frameFactory = Args.notNull(frameFactory, "Frame factory");
147 this.idGenerator = Args.notNull(idGenerator, "Stream id generator");
148 this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
149 this.localConfig = h2Config != null ? h2Config : H2Config.DEFAULT;
150 this.inputMetrics = new BasicH2TransportMetrics();
151 this.outputMetrics = new BasicH2TransportMetrics();
152 this.connMetrics = new BasicHttpConnectionMetrics(this.inputMetrics, this.outputMetrics);
153 this.inputBuffer = new FrameInputBuffer(this.inputMetrics, this.localConfig.getMaxFrameSize());
154 this.outputBuffer = new FrameOutputBuffer(this.outputMetrics, this.localConfig.getMaxFrameSize());
155 this.outputQueue = new ConcurrentLinkedDeque<>();
156 this.pingHandlers = new ConcurrentLinkedQueue<>();
157 this.outputRequests = new AtomicInteger(0);
158 this.lastStreamId = new AtomicInteger(0);
159 this.hPackEncoder = new HPackEncoder(CharCodingSupport.createEncoder(charCodingConfig));
160 this.hPackDecoder = new HPackDecoder(CharCodingSupport.createDecoder(charCodingConfig));
161 this.streamMap = new ConcurrentHashMap<>();
162 this.remoteConfig = H2Config.INIT;
163 this.connInputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
164 this.connOutputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
165
166 this.initInputWinSize = H2Config.INIT.getInitialWindowSize();
167 this.initOutputWinSize = H2Config.INIT.getInitialWindowSize();
168
169 this.hPackDecoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
170 this.hPackEncoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
171 this.hPackDecoder.setMaxListSize(H2Config.INIT.getMaxHeaderListSize());
172
173 this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
174 this.streamListener = streamListener;
175 }
176
177 @Override
178 public String getId() {
179 return ioSession.getId();
180 }
181
182 abstract void acceptHeaderFrame() throws H2ConnectionException;
183
184 abstract void acceptPushRequest() throws H2ConnectionException;
185
186 abstract void acceptPushFrame() throws H2ConnectionException;
187
188 abstract H2StreamHandler createRemotelyInitiatedStream(
189 H2StreamChannel channel,
190 HttpProcessor httpProcessor,
191 BasicHttpConnectionMetrics connMetrics,
192 HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException;
193
194 abstract H2StreamHandler createLocallyInitiatedStream(
195 ExecutableCommand command,
196 H2StreamChannel channel,
197 HttpProcessor httpProcessor,
198 BasicHttpConnectionMetrics connMetrics) throws IOException;
199
200 private int updateWindow(final AtomicInteger window, final int delta) throws ArithmeticException {
201 for (;;) {
202 final int current = window.get();
203 long newValue = (long) current + delta;
204
205
206
207 if (newValue == 0x80000000L) {
208 newValue = Integer.MAX_VALUE;
209 }
210
211
212 if (Math.abs(newValue) > 0x7fffffffL) {
213 throw new ArithmeticException("Update causes flow control window to exceed " + Integer.MAX_VALUE);
214 }
215 if (window.compareAndSet(current, (int) newValue)) {
216 return (int) newValue;
217 }
218 }
219 }
220
221 private int updateInputWindow(
222 final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
223 final int newSize = updateWindow(window, delta);
224 if (streamListener != null) {
225 streamListener.onInputFlowControl(this, streamId, delta, newSize);
226 }
227 return newSize;
228 }
229
230 private int updateOutputWindow(
231 final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
232 final int newSize = updateWindow(window, delta);
233 if (streamListener != null) {
234 streamListener.onOutputFlowControl(this, streamId, delta, newSize);
235 }
236 return newSize;
237 }
238
239 private void commitFrameInternal(final RawFrame frame) throws IOException {
240 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
241 if (streamListener != null) {
242 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
243 }
244 outputBuffer.write(frame, ioSession);
245 } else {
246 outputQueue.addLast(frame);
247 }
248 ioSession.setEvent(SelectionKey.OP_WRITE);
249 }
250
251 private void commitFrame(final RawFrame frame) throws IOException {
252 Args.notNull(frame, "Frame");
253 ioSession.getLock().lock();
254 try {
255 commitFrameInternal(frame);
256 } finally {
257 ioSession.getLock().unlock();
258 }
259 }
260
261 private void commitHeaders(
262 final int streamId, final List<? extends Header> headers, final boolean endStream) throws IOException {
263 if (streamListener != null) {
264 streamListener.onHeaderOutput(this, streamId, headers);
265 }
266 final ByteArrayBuffer buf = new ByteArrayBuffer(512);
267 hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
268
269 int off = 0;
270 int remaining = buf.length();
271 boolean continuation = false;
272
273 while (remaining > 0) {
274 final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
275 final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
276
277 remaining -= chunk;
278 off += chunk;
279
280 final boolean endHeaders = remaining == 0;
281 final RawFrame frame;
282 if (!continuation) {
283 frame = frameFactory.createHeaders(streamId, payload, endHeaders, endStream);
284 continuation = true;
285 } else {
286 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
287 }
288 commitFrameInternal(frame);
289 }
290 }
291
292 private void commitPushPromise(
293 final int streamId, final int promisedStreamId, final List<Header> headers) throws IOException {
294 if (headers == null || headers.isEmpty()) {
295 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
296 }
297 if (streamListener != null) {
298 streamListener.onHeaderOutput(this, streamId, headers);
299 }
300 final ByteArrayBuffer buf = new ByteArrayBuffer(512);
301 buf.append((byte)(promisedStreamId >> 24));
302 buf.append((byte)(promisedStreamId >> 16));
303 buf.append((byte)(promisedStreamId >> 8));
304 buf.append((byte)(promisedStreamId));
305
306 hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());
307
308 int off = 0;
309 int remaining = buf.length();
310 boolean continuation = false;
311
312 while (remaining > 0) {
313 final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
314 final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);
315
316 remaining -= chunk;
317 off += chunk;
318
319 final boolean endHeaders = remaining == 0;
320 final RawFrame frame;
321 if (!continuation) {
322 frame = frameFactory.createPushPromise(streamId, payload, endHeaders);
323 continuation = true;
324 } else {
325 frame = frameFactory.createContinuation(streamId, payload, endHeaders);
326 }
327 commitFrameInternal(frame);
328 }
329 }
330
331 private void streamDataFrame(
332 final int streamId,
333 final AtomicInteger streamOutputWindow,
334 final ByteBuffer payload,
335 final int chunk) throws IOException {
336 final RawFrame dataFrame = frameFactory.createData(streamId, payload, false);
337 if (streamListener != null) {
338 streamListener.onFrameOutput(this, streamId, dataFrame);
339 }
340 updateOutputWindow(0, connOutputWindow, -chunk);
341 updateOutputWindow(streamId, streamOutputWindow, -chunk);
342 outputBuffer.write(dataFrame, ioSession);
343 }
344
345 private int streamData(
346 final int streamId, final AtomicInteger streamOutputWindow, final ByteBuffer payload) throws IOException {
347 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
348 final int capacity = Math.min(connOutputWindow.get(), streamOutputWindow.get());
349 if (capacity <= 0) {
350 return 0;
351 }
352 final int maxPayloadSize = Math.min(capacity, remoteConfig.getMaxFrameSize());
353 final int chunk;
354 if (payload.remaining() <= maxPayloadSize) {
355 chunk = payload.remaining();
356 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
357 } else {
358 chunk = maxPayloadSize;
359 final int originalLimit = payload.limit();
360 try {
361 payload.limit(payload.position() + chunk);
362 streamDataFrame(streamId, streamOutputWindow, payload, chunk);
363 } finally {
364 payload.limit(originalLimit);
365 }
366 }
367 payload.position(payload.position() + chunk);
368 ioSession.setEvent(SelectionKey.OP_WRITE);
369 return chunk;
370 }
371 return 0;
372 }
373
374 private void incrementInputCapacity(
375 final int streamId, final AtomicInteger inputWindow, final int inputCapacity) throws IOException {
376 if (inputCapacity > 0) {
377 final int streamWinSize = inputWindow.get();
378 final int remainingCapacity = Integer.MAX_VALUE - streamWinSize;
379 final int chunk = Math.min(inputCapacity, remainingCapacity);
380 if (chunk != 0) {
381 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(streamId, chunk);
382 commitFrame(windowUpdateFrame);
383 updateInputWindow(streamId, inputWindow, chunk);
384 }
385 }
386 }
387
388 private void requestSessionOutput() {
389 outputRequests.incrementAndGet();
390 ioSession.setEvent(SelectionKey.OP_WRITE);
391 }
392
393 private void updateLastStreamId(final int streamId) {
394 final int currentId = lastStreamId.get();
395 if (streamId > currentId) {
396 lastStreamId.compareAndSet(currentId, streamId);
397 }
398 }
399
400 private int generateStreamId() {
401 for (;;) {
402 final int currentId = lastStreamId.get();
403 final int newStreamId = idGenerator.generate(currentId);
404 if (lastStreamId.compareAndSet(currentId, newStreamId)) {
405 return newStreamId;
406 }
407 }
408 }
409
410 public final void onConnect() throws HttpException, IOException {
411 connState = ConnectionHandshake.ACTIVE;
412 final RawFrame settingsFrame = frameFactory.createSettings(
413 new H2Setting(H2Param.HEADER_TABLE_SIZE, localConfig.getHeaderTableSize()),
414 new H2Setting(H2Param.ENABLE_PUSH, localConfig.isPushEnabled() ? 1 : 0),
415 new H2Setting(H2Param.MAX_CONCURRENT_STREAMS, localConfig.getMaxConcurrentStreams()),
416 new H2Setting(H2Param.INITIAL_WINDOW_SIZE, localConfig.getInitialWindowSize()),
417 new H2Setting(H2Param.MAX_FRAME_SIZE, localConfig.getMaxFrameSize()),
418 new H2Setting(H2Param.MAX_HEADER_LIST_SIZE, localConfig.getMaxHeaderListSize()));
419
420 commitFrame(settingsFrame);
421 localSettingState = SettingsHandshake.TRANSMITTED;
422 maximizeConnWindow(connInputWindow.get());
423
424 if (streamListener != null) {
425 final int initInputWindow = connInputWindow.get();
426 streamListener.onInputFlowControl(this, 0, initInputWindow, initInputWindow);
427 final int initOutputWindow = connOutputWindow.get();
428 streamListener.onOutputFlowControl(this, 0, initOutputWindow, initOutputWindow);
429 }
430 }
431
432 public final void onInput(final ByteBuffer src) throws HttpException, IOException {
433 if (connState == ConnectionHandshake.SHUTDOWN) {
434 ioSession.clearEvent(SelectionKey.OP_READ);
435 } else {
436 if (src != null) {
437 inputBuffer.put(src);
438 }
439 RawFrame frame;
440 while ((frame = inputBuffer.read(ioSession)) != null) {
441 if (streamListener != null) {
442 streamListener.onFrameInput(this, frame.getStreamId(), frame);
443 }
444 consumeFrame(frame);
445 }
446 }
447 }
448
449 public final void onOutput() throws HttpException, IOException {
450 ioSession.getLock().lock();
451 try {
452 if (!outputBuffer.isEmpty()) {
453 outputBuffer.flush(ioSession);
454 }
455 while (outputBuffer.isEmpty()) {
456 final RawFrame frame = outputQueue.poll();
457 if (frame != null) {
458 if (streamListener != null) {
459 streamListener.onFrameOutput(this, frame.getStreamId(), frame);
460 }
461 outputBuffer.write(frame, ioSession);
462 } else {
463 break;
464 }
465 }
466 } finally {
467 ioSession.getLock().unlock();
468 }
469
470 if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
471
472 if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
473 produceOutput();
474 }
475 final int pendingOutputRequests = outputRequests.get();
476 boolean outputPending = false;
477 if (!streamMap.isEmpty() && connOutputWindow.get() > 0) {
478 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
479 final Map.Entry<Integer, H2Stream> entry = it.next();
480 final H2Stream stream = entry.getValue();
481 if (!stream.isLocalClosed()
482 && stream.getOutputWindow().get() > 0
483 && stream.isOutputReady()) {
484 outputPending = true;
485 break;
486 }
487 }
488 }
489 ioSession.getLock().lock();
490 try {
491 if (!outputPending && outputBuffer.isEmpty() && outputQueue.isEmpty()
492 && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
493 ioSession.clearEvent(SelectionKey.OP_WRITE);
494 } else {
495 outputRequests.addAndGet(-pendingOutputRequests);
496 }
497 } finally {
498 ioSession.getLock().unlock();
499 }
500 }
501
502 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 && remoteSettingState == SettingsHandshake.ACKED) {
503 processPendingCommands();
504 }
505 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
506 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
507 final Map.Entry<Integer, H2Stream> entry = it.next();
508 final H2Stream stream = entry.getValue();
509 if (stream.isLocalClosed() && stream.isRemoteClosed()) {
510 stream.releaseResources();
511 it.remove();
512 }
513 }
514 if (streamMap.isEmpty()) {
515 connState = ConnectionHandshake.SHUTDOWN;
516 }
517 }
518 if (connState.compareTo(ConnectionHandshake.SHUTDOWN) >= 0) {
519 ioSession.getLock().lock();
520 try {
521 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
522 ioSession.close();
523 }
524 } finally {
525 ioSession.getLock().unlock();
526 }
527 }
528 }
529
530 public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
531 connState = ConnectionHandshake.SHUTDOWN;
532
533 final RawFrame goAway;
534 if (localSettingState != SettingsHandshake.ACKED) {
535 goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.SETTINGS_TIMEOUT,
536 "Setting timeout (" + timeout + ")");
537 } else {
538 goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR,
539 "Timeout due to inactivity (" + timeout + ")");
540 }
541 commitFrame(goAway);
542 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
543 final Map.Entry<Integer, H2Stream> entry = it.next();
544 final H2Stream stream = entry.getValue();
545 stream.reset(new H2StreamResetException(H2Error.NO_ERROR, "Timeout due to inactivity (" + timeout + ")"));
546 }
547 streamMap.clear();
548 }
549
550 public final void onDisconnect() {
551 for (;;) {
552 final AsyncPingHandler pingHandler = pingHandlers.poll();
553 if (pingHandler != null) {
554 pingHandler.cancel();
555 } else {
556 break;
557 }
558 }
559 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
560 final Map.Entry<Integer, H2Stream> entry = it.next();
561 final H2Stream stream = entry.getValue();
562 stream.cancel();
563 }
564 for (;;) {
565 final Command command = ioSession.poll();
566 if (command != null) {
567 if (command instanceof ExecutableCommand) {
568 ((ExecutableCommand) command).failed(new ConnectionClosedException());
569 } else {
570 command.cancel();
571 }
572 } else {
573 break;
574 }
575 }
576 }
577
578 private void processPendingCommands() throws IOException, HttpException {
579 while (streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
580 final Command command = ioSession.poll();
581 if (command == null) {
582 break;
583 }
584 if (command instanceof ShutdownCommand) {
585 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
586 if (shutdownCommand.getType() == CloseMode.IMMEDIATE) {
587 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
588 final Map.Entry<Integer, H2Stream> entry = it.next();
589 final H2Stream stream = entry.getValue();
590 stream.cancel();
591 }
592 streamMap.clear();
593 connState = ConnectionHandshake.SHUTDOWN;
594 } else {
595 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
596 final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR, "Graceful shutdown");
597 commitFrame(goAway);
598 connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
599 }
600 }
601 break;
602 } else if (command instanceof PingCommand) {
603 final PingCommand./../../../org/apache/hc/core5/http2/nio/command/PingCommand.html#PingCommand">PingCommand pingCommand = (PingCommand) command;
604 final AsyncPingHandler handler = pingCommand.getHandler();
605 pingHandlers.add(handler);
606 final RawFrame ping = frameFactory.createPing(handler.getData());
607 commitFrame(ping);
608 } else if (command instanceof ExecutableCommand) {
609 final int streamId = generateStreamId();
610 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
611 streamId, true, initInputWinSize, initOutputWinSize);
612 final ExecutableCommand executableCommand = (ExecutableCommand) command;
613 final H2StreamHandler streamHandler = createLocallyInitiatedStream(
614 executableCommand, channel, httpProcessor, connMetrics);
615
616 final H2Stream stream = new H2Stream(channel, streamHandler, false);
617 streamMap.put(streamId, stream);
618
619 if (streamListener != null) {
620 final int initInputWindow = stream.getInputWindow().get();
621 streamListener.onInputFlowControl(this, streamId, initInputWindow, initInputWindow);
622 final int initOutputWindow = stream.getOutputWindow().get();
623 streamListener.onOutputFlowControl(this, streamId, initOutputWindow, initOutputWindow);
624 }
625
626 if (stream.isOutputReady()) {
627 stream.produceOutput();
628 }
629 final CancellableDependency cancellableDependency = executableCommand.getCancellableDependency();
630 if (cancellableDependency != null) {
631 cancellableDependency.setDependency(new Cancellable() {
632
633 @Override
634 public boolean cancel() {
635 return stream.abort();
636 }
637
638 });
639 }
640 if (!outputQueue.isEmpty()) {
641 return;
642 }
643 }
644 }
645 }
646
647 public final void onException(final Exception cause) {
648 try {
649 for (;;) {
650 final AsyncPingHandler pingHandler = pingHandlers.poll();
651 if (pingHandler != null) {
652 pingHandler.failed(cause);
653 } else {
654 break;
655 }
656 }
657 for (;;) {
658 final Command command = ioSession.poll();
659 if (command != null) {
660 if (command instanceof ExecutableCommand) {
661 ((ExecutableCommand) command).failed(new ConnectionClosedException());
662 } else {
663 command.cancel();
664 }
665 } else {
666 break;
667 }
668 }
669 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
670 final Map.Entry<Integer, H2Stream> entry = it.next();
671 final H2Stream stream = entry.getValue();
672 stream.reset(cause);
673 }
674 streamMap.clear();
675 if (!(cause instanceof ConnectionClosedException)) {
676 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) <= 0) {
677 final H2Error errorCode;
678 if (cause instanceof H2ConnectionException) {
679 errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
680 } else if (cause instanceof ProtocolException){
681 errorCode = H2Error.PROTOCOL_ERROR;
682 } else {
683 errorCode = H2Error.INTERNAL_ERROR;
684 }
685 final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
686 commitFrame(goAway);
687 }
688 }
689 } catch (final IOException ignore) {
690 } finally {
691 connState = ConnectionHandshake.SHUTDOWN;
692 final CloseMode closeMode;
693 if (cause instanceof ConnectionClosedException) {
694 closeMode = CloseMode.GRACEFUL;
695 } else if (cause instanceof IOException) {
696 closeMode = CloseMode.IMMEDIATE;
697 } else {
698 closeMode = CloseMode.GRACEFUL;
699 }
700 ioSession.close(closeMode);
701 }
702 }
703
704 private H2Stream getValidStream(final int streamId) throws H2ConnectionException {
705 if (streamId == 0) {
706 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
707 }
708 final H2Stream stream = streamMap.get(streamId);
709 if (stream == null) {
710 if (streamId <= lastStreamId.get()) {
711 throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed");
712 } else {
713 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
714 }
715 }
716 return stream;
717 }
718
719 private void consumeFrame(final RawFrame frame) throws HttpException, IOException {
720 final FrameType frameType = FrameType.valueOf(frame.getType());
721 final int streamId = frame.getStreamId();
722 if (continuation != null && frameType != FrameType.CONTINUATION) {
723 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "CONTINUATION frame expected");
724 }
725 if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) >= 0) {
726 if (streamId > processedRemoteStreamId && !idGenerator.isSameSide(streamId)) {
727
728 return;
729 }
730 }
731 switch (frameType) {
732 case DATA: {
733 final H2Stream stream = getValidStream(streamId);
734 try {
735 consumeDataFrame(frame, stream);
736 } catch (final H2StreamResetException ex) {
737 stream.localReset(ex);
738 } catch (final HttpStreamResetException ex) {
739 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
740 }
741
742 if (stream.isTerminated()) {
743 streamMap.remove(streamId);
744 stream.releaseResources();
745 }
746 }
747 break;
748 case HEADERS: {
749 if (streamId == 0) {
750 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
751 }
752 H2Stream stream = streamMap.get(streamId);
753 if (stream == null) {
754 acceptHeaderFrame();
755
756 updateLastStreamId(streamId);
757
758 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
759 streamId, false, initInputWinSize, initOutputWinSize);
760 final H2StreamHandler streamHandler = createRemotelyInitiatedStream(
761 channel, httpProcessor, connMetrics, null);
762 stream = new H2Stream(channel, streamHandler, true);
763 if (stream.isOutputReady()) {
764 stream.produceOutput();
765 }
766 streamMap.put(streamId, stream);
767 }
768
769 try {
770 consumeHeaderFrame(frame, stream);
771
772 if (stream.isOutputReady()) {
773 stream.produceOutput();
774 }
775 } catch (final H2StreamResetException ex) {
776 stream.localReset(ex);
777 } catch (final HttpStreamResetException ex) {
778 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
779 } catch (final HttpException ex) {
780 stream.handle(ex);
781 }
782
783 if (stream.isTerminated()) {
784 streamMap.remove(streamId);
785 stream.releaseResources();
786 }
787 }
788 break;
789 case CONTINUATION: {
790 if (continuation == null) {
791 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION frame");
792 }
793 if (streamId != continuation.streamId) {
794 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION stream id: " + streamId);
795 }
796
797 final H2Stream stream = getValidStream(streamId);
798 try {
799
800 consumeContinuationFrame(frame, stream);
801 } catch (final H2StreamResetException ex) {
802 stream.localReset(ex);
803 } catch (final HttpStreamResetException ex) {
804 stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
805 }
806
807 if (stream.isTerminated()) {
808 streamMap.remove(streamId);
809 stream.releaseResources();
810 }
811 }
812 break;
813 case WINDOW_UPDATE: {
814 final ByteBuffer payload = frame.getPayload();
815 if (payload == null || payload.remaining() != 4) {
816 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid WINDOW_UPDATE frame payload");
817 }
818 final int delta = payload.getInt();
819 if (delta <= 0) {
820 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Invalid WINDOW_UPDATE delta");
821 }
822 if (streamId == 0) {
823 try {
824 updateOutputWindow(0, connOutputWindow, delta);
825 } catch (final ArithmeticException ex) {
826 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
827 }
828 } else {
829 final H2Stream stream = streamMap.get(streamId);
830 if (stream != null) {
831 try {
832 updateOutputWindow(streamId, stream.getOutputWindow(), delta);
833 } catch (final ArithmeticException ex) {
834 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
835 }
836 }
837 }
838 ioSession.setEvent(SelectionKey.OP_WRITE);
839 }
840 break;
841 case RST_STREAM: {
842 if (streamId == 0) {
843 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
844 }
845 final H2Stream stream = streamMap.get(streamId);
846 if (stream == null) {
847 if (streamId > lastStreamId.get()) {
848 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
849 }
850 } else {
851 final ByteBuffer payload = frame.getPayload();
852 if (payload == null || payload.remaining() != 4) {
853 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid RST_STREAM frame payload");
854 }
855 final int errorCode = payload.getInt();
856 stream.reset(new H2StreamResetException(errorCode, "Stream reset (" + errorCode + ")"));
857 streamMap.remove(streamId);
858 stream.releaseResources();
859 }
860 }
861 break;
862 case PING: {
863 if (streamId != 0) {
864 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
865 }
866 final ByteBuffer ping = frame.getPayloadContent();
867 if (ping == null || ping.remaining() != 8) {
868 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload");
869 }
870 if (frame.isFlagSet(FrameFlag.ACK)) {
871 final AsyncPingHandler pingHandler = pingHandlers.poll();
872 if (pingHandler != null) {
873 pingHandler.consumeResponse(ping);
874 }
875 } else {
876 final ByteBuffer pong = ByteBuffer.allocate(ping.remaining());
877 pong.put(ping);
878 pong.flip();
879 final RawFrame response = frameFactory.createPingAck(pong);
880 commitFrame(response);
881 }
882 }
883 break;
884 case SETTINGS: {
885 if (streamId != 0) {
886 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
887 }
888 if (frame.isFlagSet(FrameFlag.ACK)) {
889 if (localSettingState == SettingsHandshake.TRANSMITTED) {
890 localSettingState = SettingsHandshake.ACKED;
891 ioSession.setEvent(SelectionKey.OP_WRITE);
892 applyLocalSettings();
893 }
894 } else {
895 final ByteBuffer payload = frame.getPayload();
896 if (payload != null) {
897 if ((payload.remaining() % 6) != 0) {
898 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid SETTINGS payload");
899 }
900 consumeSettingsFrame(payload);
901 remoteSettingState = SettingsHandshake.TRANSMITTED;
902 }
903
904 final RawFrame response = frameFactory.createSettingsAck();
905 commitFrame(response);
906 remoteSettingState = SettingsHandshake.ACKED;
907 }
908 }
909 break;
910 case PRIORITY:
911
912 break;
913 case PUSH_PROMISE: {
914 acceptPushFrame();
915
916 if (!localConfig.isPushEnabled()) {
917 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Push is disabled");
918 }
919
920 final H2Stream stream = getValidStream(streamId);
921 if (stream.isRemoteClosed()) {
922 stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed"));
923 break;
924 }
925
926 final ByteBuffer payload = frame.getPayloadContent();
927 if (payload == null || payload.remaining() < 4) {
928 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PUSH_PROMISE payload");
929 }
930 final int promisedStreamId = payload.getInt();
931 if (promisedStreamId == 0 || idGenerator.isSameSide(promisedStreamId)) {
932 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal promised stream id: " + promisedStreamId);
933 }
934 if (streamMap.get(promisedStreamId) != null) {
935 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected promised stream id: " + promisedStreamId);
936 }
937
938 updateLastStreamId(promisedStreamId);
939
940 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
941 promisedStreamId, false, initInputWinSize, initOutputWinSize);
942 final H2StreamHandler streamHandler = createRemotelyInitiatedStream(
943 channel, httpProcessor, connMetrics, stream.getPushHandlerFactory());
944 final H2Stream promisedStream = new H2Stream(channel, streamHandler, true);
945 streamMap.put(promisedStreamId, promisedStream);
946
947 try {
948 consumePushPromiseFrame(frame, payload, promisedStream);
949 } catch (final H2StreamResetException ex) {
950 promisedStream.localReset(ex);
951 } catch (final HttpStreamResetException ex) {
952 promisedStream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.NO_ERROR);
953 }
954 }
955 break;
956 case GOAWAY: {
957 if (streamId != 0) {
958 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
959 }
960 final ByteBuffer payload = frame.getPayload();
961 if (payload == null || payload.remaining() < 8) {
962 throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid GOAWAY payload");
963 }
964 final int processedLocalStreamId = payload.getInt();
965 final int errorCode = payload.getInt();
966 if (errorCode == H2Error.NO_ERROR.getCode()) {
967 if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
968 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
969 final Map.Entry<Integer, H2Stream> entry = it.next();
970 final int activeStreamId = entry.getKey();
971 if (!idGenerator.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
972 final H2Stream stream = entry.getValue();
973 stream.cancel();
974 it.remove();
975 }
976 }
977 }
978 connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
979 } else {
980 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
981 final Map.Entry<Integer, H2Stream> entry = it.next();
982 final H2Stream stream = entry.getValue();
983 stream.reset(new H2StreamResetException(errorCode, "Connection terminated by the peer (" + errorCode + ")"));
984 }
985 streamMap.clear();
986 connState = ConnectionHandshake.SHUTDOWN;
987 }
988 }
989 ioSession.setEvent(SelectionKey.OP_WRITE);
990 break;
991 }
992 }
993
994 private void consumeDataFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
995 final int streamId = stream.getId();
996 final ByteBuffer payload = frame.getPayloadContent();
997 if (payload != null) {
998 final int frameLength = frame.getLength();
999 final int streamWinSize = updateInputWindow(streamId, stream.getInputWindow(), -frameLength);
1000 if (streamWinSize < lowMark && !stream.isRemoteClosed()) {
1001 stream.produceInputCapacityUpdate();
1002 }
1003 final int connWinSize = updateInputWindow(0, connInputWindow, -frameLength);
1004 if (connWinSize < CONNECTION_WINDOW_LOW_MARK) {
1005 maximizeConnWindow(connWinSize);
1006 }
1007 }
1008 if (stream.isRemoteClosed()) {
1009 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1010 }
1011 if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1012 stream.setRemoteEndStream();
1013 }
1014 if (stream.isLocalReset()) {
1015 return;
1016 }
1017 stream.consumeData(payload);
1018 }
1019
1020 private void maximizeConnWindow(final int connWinSize) throws IOException {
1021 final int delta = Integer.MAX_VALUE - connWinSize;
1022 if (delta > 0) {
1023 final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(0, delta);
1024 commitFrame(windowUpdateFrame);
1025 updateInputWindow(0, connInputWindow, delta);
1026 }
1027 }
1028
1029 private void consumePushPromiseFrame(final RawFrame frame, final ByteBuffer payload, final H2Stream promisedStream) throws HttpException, IOException {
1030 final int promisedStreamId = promisedStream.getId();
1031 if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1032 continuation = new Continuation(promisedStreamId, frame.getType(), true);
1033 }
1034 if (continuation == null) {
1035 final List<Header> headers = hPackDecoder.decodeHeaders(payload);
1036 if (promisedStreamId > processedRemoteStreamId) {
1037 processedRemoteStreamId = promisedStreamId;
1038 }
1039 if (streamListener != null) {
1040 streamListener.onHeaderInput(this, promisedStreamId, headers);
1041 }
1042 if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
1043 throw new H2StreamResetException(H2Error.REFUSED_STREAM, "Stream refused");
1044 }
1045 promisedStream.consumePromise(headers);
1046 } else {
1047 continuation.copyPayload(payload);
1048 }
1049 }
1050
1051 List<Header> decodeHeaders(final ByteBuffer payload) throws HttpException {
1052 return hPackDecoder.decodeHeaders(payload);
1053 }
1054
1055 private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1056 final int streamId = stream.getId();
1057 if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
1058 continuation = new Continuation(streamId, frame.getType(), frame.isFlagSet(FrameFlag.END_STREAM));
1059 }
1060 final ByteBuffer payload = frame.getPayloadContent();
1061 if (frame.isFlagSet(FrameFlag.PRIORITY)) {
1062
1063 payload.getInt();
1064 payload.get();
1065 }
1066 if (continuation == null) {
1067 final List<Header> headers = decodeHeaders(payload);
1068 if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1069 processedRemoteStreamId = streamId;
1070 }
1071 if (streamListener != null) {
1072 streamListener.onHeaderInput(this, streamId, headers);
1073 }
1074 if (stream.isRemoteClosed()) {
1075 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1076 }
1077 if (stream.isLocalReset()) {
1078 return;
1079 }
1080 if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
1081 throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, "Stream refused");
1082 }
1083 if (frame.isFlagSet(FrameFlag.END_STREAM)) {
1084 stream.setRemoteEndStream();
1085 }
1086 stream.consumeHeader(headers);
1087 } else {
1088 continuation.copyPayload(payload);
1089 }
1090 }
1091
1092 private void consumeContinuationFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
1093 final int streamId = frame.getStreamId();
1094 final ByteBuffer payload = frame.getPayload();
1095 continuation.copyPayload(payload);
1096 if (frame.isFlagSet(FrameFlag.END_HEADERS)) {
1097 final List<Header> headers = decodeHeaders(continuation.getContent());
1098 if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
1099 processedRemoteStreamId = streamId;
1100 }
1101 if (streamListener != null) {
1102 streamListener.onHeaderInput(this, streamId, headers);
1103 }
1104 if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
1105 throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, "Stream refused");
1106 }
1107 if (stream.isRemoteClosed()) {
1108 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
1109 }
1110 if (stream.isLocalReset()) {
1111 return;
1112 }
1113 if (continuation.endStream) {
1114 stream.setRemoteEndStream();
1115 }
1116 if (continuation.type == FrameType.PUSH_PROMISE.getValue()) {
1117 stream.consumePromise(headers);
1118 } else {
1119 stream.consumeHeader(headers);
1120 }
1121 continuation = null;
1122 }
1123 }
1124
1125 private void consumeSettingsFrame(final ByteBuffer payload) throws HttpException, IOException {
1126 final H2Config.Builder configBuilder = H2Config.initial();
1127 while (payload.hasRemaining()) {
1128 final int code = payload.getShort();
1129 final int value = payload.getInt();
1130 final H2Param param = H2Param.valueOf(code);
1131 if (param != null) {
1132 switch (param) {
1133 case HEADER_TABLE_SIZE:
1134 try {
1135 configBuilder.setHeaderTableSize(value);
1136 } catch (final IllegalArgumentException ex) {
1137 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1138 }
1139 break;
1140 case MAX_CONCURRENT_STREAMS:
1141 try {
1142 configBuilder.setMaxConcurrentStreams(value);
1143 } catch (final IllegalArgumentException ex) {
1144 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1145 }
1146 break;
1147 case ENABLE_PUSH:
1148 configBuilder.setPushEnabled(value == 1);
1149 break;
1150 case INITIAL_WINDOW_SIZE:
1151 try {
1152 configBuilder.setInitialWindowSize(value);
1153 } catch (final IllegalArgumentException ex) {
1154 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1155 }
1156 break;
1157 case MAX_FRAME_SIZE:
1158 try {
1159 configBuilder.setMaxFrameSize(value);
1160 } catch (final IllegalArgumentException ex) {
1161 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1162 }
1163 break;
1164 case MAX_HEADER_LIST_SIZE:
1165 try {
1166 configBuilder.setMaxHeaderListSize(value);
1167 } catch (final IllegalArgumentException ex) {
1168 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
1169 }
1170 break;
1171 }
1172 }
1173 }
1174 applyRemoteSettings(configBuilder.build());
1175 }
1176
1177 private void produceOutput() throws HttpException, IOException {
1178 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1179 final Map.Entry<Integer, H2Stream> entry = it.next();
1180 final H2Stream stream = entry.getValue();
1181 if (!stream.isLocalClosed() && stream.getOutputWindow().get() > 0) {
1182 stream.produceOutput();
1183 }
1184 if (stream.isTerminated()) {
1185 it.remove();
1186 stream.releaseResources();
1187 }
1188 if (!outputQueue.isEmpty()) {
1189 break;
1190 }
1191 }
1192 }
1193
1194 private void applyRemoteSettings(final H2Config config) throws H2ConnectionException {
1195 remoteConfig = config;
1196
1197 hPackEncoder.setMaxTableSize(remoteConfig.getHeaderTableSize());
1198 final int delta = remoteConfig.getInitialWindowSize() - initOutputWinSize;
1199 initOutputWinSize = remoteConfig.getInitialWindowSize();
1200
1201 if (delta != 0) {
1202 if (!streamMap.isEmpty()) {
1203 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1204 final Map.Entry<Integer, H2Stream> entry = it.next();
1205 final H2Stream stream = entry.getValue();
1206 try {
1207 updateOutputWindow(stream.getId(), stream.getOutputWindow(), delta);
1208 } catch (final ArithmeticException ex) {
1209 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1210 }
1211 }
1212 }
1213 }
1214 }
1215
1216 private void applyLocalSettings() throws H2ConnectionException {
1217 hPackDecoder.setMaxTableSize(localConfig.getHeaderTableSize());
1218 hPackDecoder.setMaxListSize(localConfig.getMaxHeaderListSize());
1219
1220 final int delta = localConfig.getInitialWindowSize() - initInputWinSize;
1221 initInputWinSize = localConfig.getInitialWindowSize();
1222
1223 if (delta != 0 && !streamMap.isEmpty()) {
1224 for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
1225 final Map.Entry<Integer, H2Stream> entry = it.next();
1226 final H2Stream stream = entry.getValue();
1227 try {
1228 updateInputWindow(stream.getId(), stream.getInputWindow(), delta);
1229 } catch (final ArithmeticException ex) {
1230 throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
1231 }
1232 }
1233 }
1234 lowMark = initInputWinSize / 2;
1235 }
1236
1237 @Override
1238 public void close() throws IOException {
1239 ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.IMMEDIATE);
1240 }
1241
1242 @Override
1243 public void close(final CloseMode closeMode) {
1244 ioSession.close(closeMode);
1245 }
1246
1247 @Override
1248 public boolean isOpen() {
1249 return connState == ConnectionHandshake.ACTIVE;
1250 }
1251
1252 @Override
1253 public void setSocketTimeout(final Timeout timeout) {
1254 ioSession.setSocketTimeout(timeout);
1255 }
1256
1257 @Override
1258 public SSLSession getSSLSession() {
1259 final TlsDetails tlsDetails = ioSession.getTlsDetails();
1260 return tlsDetails != null ? tlsDetails.getSSLSession() : null;
1261 }
1262
1263 @Override
1264 public EndpointDetails getEndpointDetails() {
1265 if (endpointDetails == null) {
1266 endpointDetails = new BasicEndpointDetails(
1267 ioSession.getRemoteAddress(),
1268 ioSession.getLocalAddress(),
1269 connMetrics,
1270 ioSession.getSocketTimeout());
1271 }
1272 return endpointDetails;
1273 }
1274
1275 @Override
1276 public Timeout getSocketTimeout() {
1277 return ioSession.getSocketTimeout();
1278 }
1279
1280 @Override
1281 public ProtocolVersion getProtocolVersion() {
1282 return HttpVersion.HTTP_2;
1283 }
1284
1285 @Override
1286 public SocketAddress getRemoteAddress() {
1287 return ioSession.getRemoteAddress();
1288 }
1289
1290 @Override
1291 public SocketAddress getLocalAddress() {
1292 return ioSession.getLocalAddress();
1293 }
1294
1295 void appendState(final StringBuilder buf) {
1296 buf.append("connState=").append(connState)
1297 .append(", connInputWindow=").append(connInputWindow)
1298 .append(", connOutputWindow=").append(connOutputWindow)
1299 .append(", outputQueue=").append(outputQueue.size())
1300 .append(", streamMap=").append(streamMap.size())
1301 .append(", processedRemoteStreamId=").append(processedRemoteStreamId);
1302 }
1303
1304 private static class Continuation {
1305
1306 final int streamId;
1307 final int type;
1308 final boolean endStream;
1309 final ByteArrayBuffer headerBuffer;
1310
1311 private Continuation(final int streamId, final int type, final boolean endStream) {
1312 this.streamId = streamId;
1313 this.type = type;
1314 this.endStream = endStream;
1315 this.headerBuffer = new ByteArrayBuffer(1024);
1316 }
1317
1318 void copyPayload(final ByteBuffer payload) {
1319 if (payload == null) {
1320 return;
1321 }
1322 headerBuffer.ensureCapacity(payload.remaining());
1323 payload.get(headerBuffer.array(), headerBuffer.length(), payload.remaining());
1324 }
1325
1326 ByteBuffer getContent() {
1327 return ByteBuffer.wrap(headerBuffer.array(), 0, headerBuffer.length());
1328 }
1329
1330 }
1331
1332 private class H2StreamChannelImpl implements H2StreamChannel {
1333
1334 private final int id;
1335 private final AtomicInteger inputWindow;
1336 private final AtomicInteger outputWindow;
1337
1338 private volatile boolean idle;
1339 private volatile boolean remoteEndStream;
1340 private volatile boolean localEndStream;
1341
1342 private volatile long deadline;
1343
1344 H2StreamChannelImpl(final int id, final boolean idle, final int initialInputWindowSize, final int initialOutputWindowSize) {
1345 this.id = id;
1346 this.idle = idle;
1347 this.inputWindow = new AtomicInteger(initialInputWindowSize);
1348 this.outputWindow = new AtomicInteger(initialOutputWindowSize);
1349 }
1350
1351 int getId() {
1352 return id;
1353 }
1354
1355 AtomicInteger getOutputWindow() {
1356 return outputWindow;
1357 }
1358
1359 AtomicInteger getInputWindow() {
1360 return inputWindow;
1361 }
1362
1363 @Override
1364 public void submit(final List<Header> headers, final boolean endStream) throws IOException {
1365 ioSession.getLock().lock();
1366 try {
1367 if (headers == null || headers.isEmpty()) {
1368 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
1369 }
1370 if (localEndStream) {
1371 return;
1372 }
1373 idle = false;
1374 commitHeaders(id, headers, endStream);
1375 if (endStream) {
1376 localEndStream = true;
1377 }
1378 } finally {
1379 ioSession.getLock().unlock();
1380 }
1381 }
1382
1383 @Override
1384 public void push(final List<Header> headers, final AsyncPushProducer pushProducer) throws HttpException, IOException {
1385 acceptPushRequest();
1386 final int promisedStreamId = generateStreamId();
1387 final H2StreamChannelImpl channel = new H2StreamChannelImpl(
1388 promisedStreamId,
1389 true,
1390 localConfig.getInitialWindowSize(),
1391 remoteConfig.getInitialWindowSize());
1392 final HttpCoreContext context = HttpCoreContext.create();
1393 context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
1394 context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
1395 final H2StreamHandler streamHandler = new ServerPushH2StreamHandler(
1396 channel, httpProcessor, connMetrics, pushProducer, context);
1397 final H2Stream stream = new H2Stream(channel, streamHandler, false);
1398 streamMap.put(promisedStreamId, stream);
1399
1400 ioSession.getLock().lock();
1401 try {
1402 if (localEndStream) {
1403 stream.releaseResources();
1404 return;
1405 }
1406 commitPushPromise(id, promisedStreamId, headers);
1407 idle = false;
1408 } finally {
1409 ioSession.getLock().unlock();
1410 }
1411 }
1412
1413 @Override
1414 public void update(final int increment) throws IOException {
1415 if (remoteEndStream) {
1416 return;
1417 }
1418 incrementInputCapacity(0, connInputWindow, increment);
1419 incrementInputCapacity(id, inputWindow, increment);
1420 }
1421
1422 @Override
1423 public int write(final ByteBuffer payload) throws IOException {
1424 ioSession.getLock().lock();
1425 try {
1426 if (localEndStream) {
1427 return 0;
1428 }
1429 return streamData(id, outputWindow, payload);
1430 } finally {
1431 ioSession.getLock().unlock();
1432 }
1433 }
1434
1435 @Override
1436 public void endStream(final List<? extends Header> trailers) throws IOException {
1437 ioSession.getLock().lock();
1438 try {
1439 if (localEndStream) {
1440 return;
1441 }
1442 localEndStream = true;
1443 if (trailers != null && !trailers.isEmpty()) {
1444 commitHeaders(id, trailers, true);
1445 } else {
1446 final RawFrame frame = frameFactory.createData(id, null, true);
1447 commitFrameInternal(frame);
1448 }
1449 } finally {
1450 ioSession.getLock().unlock();
1451 }
1452 }
1453
1454 @Override
1455 public void endStream() throws IOException {
1456 endStream(null);
1457 }
1458
1459 @Override
1460 public void requestOutput() {
1461 requestSessionOutput();
1462 }
1463
1464 boolean isRemoteClosed() {
1465 return remoteEndStream;
1466 }
1467
1468 void setRemoteEndStream() {
1469 remoteEndStream = true;
1470 }
1471
1472 boolean isLocalClosed() {
1473 return localEndStream;
1474 }
1475
1476 void setLocalEndStream() {
1477 localEndStream = true;
1478 }
1479
1480 boolean isLocalReset() {
1481 return deadline > 0;
1482 }
1483
1484 boolean isResetDeadline() {
1485 final long l = deadline;
1486 return l > 0 && l < System.currentTimeMillis();
1487 }
1488
1489 boolean localReset(final int code) throws IOException {
1490 ioSession.getLock().lock();
1491 try {
1492 if (localEndStream) {
1493 return false;
1494 }
1495 localEndStream = true;
1496 deadline = System.currentTimeMillis() + LINGER_TIME;
1497 if (!idle) {
1498 final RawFrame resetStream = frameFactory.createResetStream(id, code);
1499 commitFrameInternal(resetStream);
1500 return true;
1501 }
1502 return false;
1503 } finally {
1504 ioSession.getLock().unlock();
1505 }
1506 }
1507
1508 boolean localReset(final H2Error error) throws IOException {
1509 return localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1510 }
1511
1512 @Override
1513 public boolean cancel() {
1514 try {
1515 return localReset(H2Error.CANCEL);
1516 } catch (final IOException ignore) {
1517 return false;
1518 }
1519 }
1520
1521 void appendState(final StringBuilder buf) {
1522 buf.append("id=").append(id)
1523 .append(", connState=").append(connState)
1524 .append(", inputWindow=").append(inputWindow)
1525 .append(", outputWindow=").append(outputWindow)
1526 .append(", localEndStream=").append(localEndStream)
1527 .append(", idle=").append(idle);
1528 }
1529
1530 @Override
1531 public String toString() {
1532 final StringBuilder buf = new StringBuilder();
1533 buf.append("[");
1534 appendState(buf);
1535 buf.append("]");
1536 return buf.toString();
1537 }
1538
1539 }
1540
1541 static class H2Stream {
1542
1543 private final H2StreamChannelImpl channel;
1544 private final H2StreamHandler handler;
1545 private final boolean remoteInitiated;
1546
1547 private H2Stream(
1548 final H2StreamChannelImpl channel,
1549 final H2StreamHandler handler,
1550 final boolean remoteInitiated) {
1551 this.channel = channel;
1552 this.handler = handler;
1553 this.remoteInitiated = remoteInitiated;
1554 }
1555
1556 int getId() {
1557 return channel.getId();
1558 }
1559
1560 boolean isRemoteInitiated() {
1561 return remoteInitiated;
1562 }
1563
1564 AtomicInteger getOutputWindow() {
1565 return channel.getOutputWindow();
1566 }
1567
1568 AtomicInteger getInputWindow() {
1569 return channel.getInputWindow();
1570 }
1571
1572 boolean isTerminated() {
1573 return channel.isLocalClosed() && (channel.isRemoteClosed() || channel.isResetDeadline());
1574 }
1575
1576 boolean isRemoteClosed() {
1577 return channel.isRemoteClosed();
1578 }
1579
1580 boolean isLocalClosed() {
1581 return channel.isLocalClosed();
1582 }
1583
1584 boolean isLocalReset() {
1585 return channel.isLocalReset();
1586 }
1587
1588 void setRemoteEndStream() {
1589 channel.setRemoteEndStream();
1590 }
1591
1592 void consumePromise(final List<Header> headers) throws HttpException, IOException {
1593 try {
1594 handler.consumePromise(headers);
1595 channel.setLocalEndStream();
1596 } catch (final ProtocolException ex) {
1597 localReset(ex, H2Error.PROTOCOL_ERROR);
1598 }
1599 }
1600
1601 void consumeHeader(final List<Header> headers) throws HttpException, IOException {
1602 try {
1603 handler.consumeHeader(headers, channel.isRemoteClosed());
1604 } catch (final ProtocolException ex) {
1605 localReset(ex, H2Error.PROTOCOL_ERROR);
1606 }
1607 }
1608
1609 void consumeData(final ByteBuffer src) throws HttpException, IOException {
1610 try {
1611 handler.consumeData(src, channel.isRemoteClosed());
1612 } catch (final CharacterCodingException ex) {
1613 localReset(ex, H2Error.INTERNAL_ERROR);
1614 } catch (final ProtocolException ex) {
1615 localReset(ex, H2Error.PROTOCOL_ERROR);
1616 }
1617 }
1618
1619 boolean isOutputReady() {
1620 return handler.isOutputReady();
1621 }
1622
1623 void produceOutput() throws HttpException, IOException {
1624 try {
1625 handler.produceOutput();
1626 } catch (final ProtocolException ex) {
1627 localReset(ex, H2Error.PROTOCOL_ERROR);
1628 }
1629 }
1630
1631 void produceInputCapacityUpdate() throws IOException {
1632 handler.updateInputCapacity();
1633 }
1634
1635 void reset(final Exception cause) {
1636 channel.setRemoteEndStream();
1637 channel.setLocalEndStream();
1638 handler.failed(cause);
1639 }
1640
1641 void localReset(final Exception cause, final int code) throws IOException {
1642 channel.localReset(code);
1643 handler.failed(cause);
1644 }
1645
1646 void localReset(final Exception cause, final H2Error error) throws IOException {
1647 localReset(cause, error != null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
1648 }
1649
1650 void localReset(final H2StreamResetException ex) throws IOException {
1651 localReset(ex, ex.getCode());
1652 }
1653
1654 void handle(final HttpException ex) throws IOException, HttpException {
1655 handler.handle(ex, channel.isRemoteClosed());
1656 }
1657
1658 HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
1659 return handler.getPushHandlerFactory();
1660 }
1661
1662 void cancel() {
1663 reset(new CancellationException("HTTP/2 message exchange cancelled"));
1664 }
1665
1666 boolean abort() {
1667 final boolean cancelled = channel.cancel();
1668 handler.failed(new CancellationException("HTTP/2 message exchange cancelled"));
1669 return cancelled;
1670 }
1671
1672 void releaseResources() {
1673 handler.releaseResources();
1674 channel.requestOutput();
1675 }
1676
1677 void appendState(final StringBuilder buf) {
1678 buf.append("channel=[");
1679 channel.appendState(buf);
1680 buf.append("]");
1681 }
1682
1683 @Override
1684 public String toString() {
1685 final StringBuilder buf = new StringBuilder();
1686 buf.append("[");
1687 appendState(buf);
1688 buf.append("]");
1689 return buf.toString();
1690 }
1691
1692 }
1693
1694 }