/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.coyote.http2;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.http.WebConnection;
import org.apache.coyote.Adapter;
import org.apache.coyote.ProtocolException;
import org.apache.coyote.Request;
import org.apache.coyote.Response;
import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
import org.apache.coyote.http2.HpackDecoder.HeaderEmitter;
import org.apache.coyote.http2.HpackEncoder.State;
import org.apache.coyote.http2.Http2Parser.Input;
import org.apache.coyote.http2.Http2Parser.Output;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.codec.binary.Base64;
import org.apache.tomcat.util.http.FastHttpDateFormat;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.net.SSLSupport;
import org.apache.tomcat.util.net.SocketEvent;
import org.apache.tomcat.util.net.SocketWrapperBase;
import org.apache.tomcat.util.res.StringManager;
/**
* This represents an HTTP/2 connection from a client to Tomcat. It is designed
* on the basis that there will never be more than one thread performing I/O at
* a time.
*
* For reading, this implementation is blocking within frames and non-blocking
* between frames.
*
* Note:
*
* - Tomcat needs to be configured with honorCipherOrder="false" otherwise
* Tomcat will prefer a cipher suite that is blacklisted by HTTP/2.
* - You will need to nest an <UpgradeProtocol
* className="org.apache.coyote.http2.Http2Protocol" /> element inside
* a TLS enabled Connector element in server.xml to enable HTTP/2 support.
*
*
*/
public class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeHandler,
Input, Output {
private static final Log log = LogFactory.getLog(Http2UpgradeHandler.class);
private static final StringManager sm = StringManager.getManager(Http2UpgradeHandler.class);
private static final AtomicInteger connectionIdGenerator = new AtomicInteger(0);
private static final Integer STREAM_ID_ZERO = Integer.valueOf(0);
private static final int FLAG_END_OF_STREAM = 1;
private static final int FLAG_END_OF_HEADERS = 4;
private static final byte[] PING = { 0x00, 0x00, 0x08, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00};
private static final byte[] PING_ACK = { 0x00, 0x00, 0x08, 0x06, 0x01, 0x00, 0x00, 0x00, 0x00 };
private static final byte[] SETTINGS_ACK = { 0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00 };
private static final byte[] GOAWAY = { 0x07, 0x00, 0x00, 0x00, 0x00, 0x00 };
private static final String HTTP2_SETTINGS_HEADER = "HTTP2-Settings";
private static final HeaderSink HEADER_SINK = new HeaderSink();
private final String connectionId;
private final Adapter adapter;
private volatile SocketWrapperBase> socketWrapper;
private volatile SSLSupport sslSupport;
private volatile Http2Parser parser;
// Simple state machine (sequence of states)
private AtomicReference connectionState =
new AtomicReference<>(ConnectionState.NEW);
private volatile long pausedNanoTime = Long.MAX_VALUE;
/**
* Remote settings are settings defined by the client and sent to Tomcat
* that Tomcat must use when communicating with the client.
*/
private final ConnectionSettingsRemote remoteSettings;
/**
* Local settings are settings defined by Tomcat and sent to the client that
* the client must use when communicating with Tomcat.
*/
private final ConnectionSettingsLocal localSettings;
private HpackDecoder hpackDecoder;
private HpackEncoder hpackEncoder;
// All timeouts in milliseconds
private long readTimeout = Http2Protocol.DEFAULT_READ_TIMEOUT;
private long keepAliveTimeout = Http2Protocol.DEFAULT_KEEP_ALIVE_TIMEOUT;
private long writeTimeout = Http2Protocol.DEFAULT_WRITE_TIMEOUT;
private final Map streams = new HashMap<>();
private final AtomicInteger activeRemoteStreamCount = new AtomicInteger(0);
private volatile int maxRemoteStreamId = 0;
// Start at -1 so the 'add 2' logic in closeIdleStreams() works
private volatile int maxActiveRemoteStreamId = -1;
private volatile int maxProcessedStreamId;
private final AtomicInteger nextLocalStreamId = new AtomicInteger(2);
private final PingManager pingManager = new PingManager();
private volatile int newStreamsSinceLastPrune = 0;
// Tracking for when the connection is blocked (windowSize < 1)
private final Map backLogStreams = new ConcurrentHashMap<>();
private long backLogSize = 0;
public Http2UpgradeHandler(Adapter adapter, Request coyoteRequest) {
super (STREAM_ID_ZERO);
this.adapter = adapter;
this.connectionId = Integer.toString(connectionIdGenerator.getAndIncrement());
remoteSettings = new ConnectionSettingsRemote(connectionId);
localSettings = new ConnectionSettingsLocal(connectionId);
// Initial HTTP request becomes stream 1.
if (coyoteRequest != null) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.upgrade", connectionId));
}
Integer key = Integer.valueOf(1);
Stream stream = new Stream(key, this, coyoteRequest);
streams.put(key, stream);
maxRemoteStreamId = 1;
maxActiveRemoteStreamId = 1;
activeRemoteStreamCount.set(1);
maxProcessedStreamId = 1;
}
}
@Override
public void init(WebConnection webConnection) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.init", connectionId, connectionState.get()));
}
if (!connectionState.compareAndSet(ConnectionState.NEW, ConnectionState.CONNECTED)) {
return;
}
parser = new Http2Parser(connectionId, this, this);
Stream stream = null;
socketWrapper.setReadTimeout(getReadTimeout());
socketWrapper.setWriteTimeout(getWriteTimeout());
if (webConnection != null) {
// HTTP/2 started via HTTP upgrade.
// The initial HTTP/1.1 request is available as Stream 1.
try {
// Process the initial settings frame
stream = getStream(1, true);
String base64Settings = stream.getCoyoteRequest().getHeader(HTTP2_SETTINGS_HEADER);
byte[] settings = Base64.decodeBase64(base64Settings);
// Settings are only valid on stream 0
FrameType.SETTINGS.check(0, settings.length);
for (int i = 0; i < settings.length % 6; i++) {
int id = ByteUtil.getTwoBytes(settings, i * 6);
long value = ByteUtil.getFourBytes(settings, (i * 6) + 2);
remoteSettings.set(Setting.valueOf(id), value);
}
} catch (Http2Exception e) {
throw new ProtocolException(
sm.getString("upgradeHandler.upgrade.fail", connectionId));
}
}
// Send the initial settings frame
try {
byte[] settings = localSettings.getSettingsFrameForPending();
socketWrapper.write(true, settings, 0, settings.length);
socketWrapper.flush(true);
} catch (IOException ioe) {
String msg = sm.getString("upgradeHandler.sendPrefaceFail", connectionId);
if (log.isDebugEnabled()) {
log.debug(msg);
}
throw new ProtocolException(msg, ioe);
}
// Make sure the client has sent a valid connection preface before we
// send the response to the original request over HTTP/2.
try {
parser.readConnectionPreface();
} catch (Http2Exception e) {
String msg = sm.getString("upgradeHandler.invalidPreface", connectionId);
if (log.isDebugEnabled()) {
log.debug(msg);
}
throw new ProtocolException(msg);
}
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.prefaceReceived", connectionId));
}
// Send a ping to get an idea of round trip time as early as possible
try {
pingManager.sendPing(true);
} catch (IOException ioe) {
throw new ProtocolException(sm.getString("upgradeHandler.pingFailed"), ioe);
}
if (webConnection != null) {
// Process the initial request on a container thread
StreamProcessor streamProcessor = new StreamProcessor(stream, adapter, socketWrapper);
streamProcessor.setSslSupport(sslSupport);
socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
}
}
@Override
public void setSocketWrapper(SocketWrapperBase> wrapper) {
this.socketWrapper = wrapper;
}
@Override
public void setSslSupport(SSLSupport sslSupport) {
this.sslSupport = sslSupport;
}
@Override
public SocketState upgradeDispatch(SocketEvent status) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.upgradeDispatch.entry", connectionId, status));
}
// WebConnection is not used so passing null here is fine
// Might not be necessary. init() will handle that.
init(null);
SocketState result = SocketState.CLOSED;
try {
pingManager.sendPing(false);
checkPauseState();
switch(status) {
case OPEN_READ:
try {
// There is data to read so use the read timeout while
// reading frames.
socketWrapper.setReadTimeout(getReadTimeout());
while (true) {
try {
if (!parser.readFrame(false)) {
break;
}
} catch (StreamException se) {
// Stream errors are not fatal to the connection so
// continue reading frames
resetStream(se);
}
}
// No more frames to read so switch to the keep-alive
// timeout.
socketWrapper.setReadTimeout(getKeepAliveTimeout());
} catch (Http2Exception ce) {
// Really ConnectionException
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.connectionError"), ce);
}
closeConnection(ce);
break;
}
result = SocketState.UPGRADED;
break;
case OPEN_WRITE:
processWrites();
result = SocketState.UPGRADED;
break;
case DISCONNECT:
case ERROR:
case TIMEOUT:
case STOP:
close();
break;
}
} catch (IOException ioe) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.ioerror", connectionId), ioe);
}
close();
}
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.upgradeDispatch.exit", connectionId, result));
}
return result;
}
ConnectionSettingsRemote getRemoteSettings() {
return remoteSettings;
}
ConnectionSettingsLocal getLocalSettings() {
return localSettings;
}
@Override
public void pause() {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.pause.entry", connectionId));
}
if (connectionState.compareAndSet(ConnectionState.CONNECTED, ConnectionState.PAUSING)) {
pausedNanoTime = System.nanoTime();
// Write a GOAWAY frame.
byte[] fixedPayload = new byte[8];
ByteUtil.set31Bits(fixedPayload, 0, (1 << 31) - 1);
ByteUtil.setFourBytes(fixedPayload, 4, Http2Error.NO_ERROR.getCode());
byte[] payloadLength = new byte[3];
ByteUtil.setThreeBytes(payloadLength, 0, 8);
try {
synchronized (socketWrapper) {
socketWrapper.write(true, payloadLength, 0, payloadLength.length);
socketWrapper.write(true, GOAWAY, 0, GOAWAY.length);
socketWrapper.write(true, fixedPayload, 0, 8);
socketWrapper.flush(true);
}
} catch (IOException ioe) {
// This is fatal for the connection. Ignore it here. There will be
// further attempts at I/O in upgradeDispatch() and it can better
// handle the IO errors.
}
}
}
@Override
public void destroy() {
// NO-OP
}
private void checkPauseState() throws IOException {
if (connectionState.get() == ConnectionState.PAUSING) {
if (pausedNanoTime + pingManager.getRoundTripTimeNano() < System.nanoTime()) {
connectionState.compareAndSet(ConnectionState.PAUSING, ConnectionState.PAUSED);
// Write a GOAWAY frame.
byte[] fixedPayload = new byte[8];
ByteUtil.set31Bits(fixedPayload, 0, maxProcessedStreamId);
ByteUtil.setFourBytes(fixedPayload, 4, Http2Error.NO_ERROR.getCode());
byte[] payloadLength = new byte[3];
ByteUtil.setThreeBytes(payloadLength, 0, 8);
synchronized (socketWrapper) {
socketWrapper.write(true, payloadLength, 0, payloadLength.length);
socketWrapper.write(true, GOAWAY, 0, GOAWAY.length);
socketWrapper.write(true, fixedPayload, 0, 8);
socketWrapper.flush(true);
}
}
}
}
void resetStream(StreamException se) throws IOException {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.rst.debug", connectionId,
Integer.toString(se.getStreamId()), se.getError()));
}
// Write a RST frame
byte[] rstFrame = new byte[13];
// Length
ByteUtil.setThreeBytes(rstFrame, 0, 4);
// Type
rstFrame[3] = FrameType.RST.getIdByte();
// No flags
// Stream ID
ByteUtil.set31Bits(rstFrame, 5, se.getStreamId());
// Payload
ByteUtil.setFourBytes(rstFrame, 9, se.getError().getCode());
synchronized (socketWrapper) {
socketWrapper.write(true, rstFrame, 0, rstFrame.length);
socketWrapper.flush(true);
}
}
void closeConnection(Http2Exception ce) {
// Write a GOAWAY frame.
byte[] fixedPayload = new byte[8];
ByteUtil.set31Bits(fixedPayload, 0, maxProcessedStreamId);
ByteUtil.setFourBytes(fixedPayload, 4, ce.getError().getCode());
byte[] debugMessage = ce.getMessage().getBytes(StandardCharsets.UTF_8);
byte[] payloadLength = new byte[3];
ByteUtil.setThreeBytes(payloadLength, 0, debugMessage.length + 8);
try {
synchronized (socketWrapper) {
socketWrapper.write(true, payloadLength, 0, payloadLength.length);
socketWrapper.write(true, GOAWAY, 0, GOAWAY.length);
socketWrapper.write(true, fixedPayload, 0, 8);
socketWrapper.write(true, debugMessage, 0, debugMessage.length);
socketWrapper.flush(true);
}
} catch (IOException ioe) {
// Ignore. GOAWAY is sent on a best efforts basis and the original
// error has already been logged.
}
close();
}
void writeHeaders(Stream stream, Response coyoteResponse, int payloadSize)
throws IOException {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.writeHeaders", connectionId,
stream.getIdentifier()));
}
prepareHeaders(coyoteResponse);
// This ensures the Stream processing thread has control of the socket.
synchronized (socketWrapper) {
byte[] header = new byte[9];
ByteBuffer target = ByteBuffer.allocate(payloadSize);
boolean first = true;
State state = null;
while (state != State.COMPLETE) {
state = getHpackEncoder().encode(coyoteResponse.getMimeHeaders(), target);
target.flip();
ByteUtil.setThreeBytes(header, 0, target.limit());
if (first) {
first = false;
header[3] = FrameType.HEADERS.getIdByte();
if (stream.getOutputBuffer().hasNoBody()) {
header[4] = FLAG_END_OF_STREAM;
}
} else {
header[3] = FrameType.CONTINUATION.getIdByte();
}
if (state == State.COMPLETE) {
header[4] += FLAG_END_OF_HEADERS;
}
if (log.isDebugEnabled()) {
log.debug(target.limit() + " bytes");
}
ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue());
try {
socketWrapper.write(true, header, 0, header.length);
socketWrapper.write(true, target.array(), target.arrayOffset(), target.limit());
socketWrapper.flush(true);
} catch (IOException ioe) {
handleAppInitiatedIOException(ioe);
}
}
}
}
private void prepareHeaders(Response coyoteResponse) {
MimeHeaders headers = coyoteResponse.getMimeHeaders();
int statusCode = coyoteResponse.getStatus();
// Add the pseudo header for status
headers.addValue(":status").setString(Integer.toString(statusCode));
// Check to see if a response body is present
if (!(statusCode < 200 || statusCode == 205 || statusCode == 304)) {
String contentType = coyoteResponse.getContentType();
if (contentType != null) {
headers.setValue("content-type").setString(contentType);
}
String contentLanguage = coyoteResponse.getContentLanguage();
if (contentLanguage != null) {
headers.setValue("content-language").setString(contentLanguage);
}
}
// Add date header unless the application has already set one
if (headers.getValue("date") == null) {
headers.setValue("date").setString(FastHttpDateFormat.getCurrentDate());
}
}
void writePushHeaders(Stream stream, int pushedStreamId, Request coyoteRequest, int payloadSize)
throws IOException {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.writePushHeaders", connectionId,
stream.getIdentifier(), Integer.toString(pushedStreamId)));
}
// This ensures the Stream processing thread has control of the socket.
synchronized (socketWrapper) {
byte[] header = new byte[9];
ByteBuffer target = ByteBuffer.allocate(payloadSize);
boolean first = true;
State state = null;
byte[] pushedStreamIdBytes = new byte[4];
ByteUtil.set31Bits(pushedStreamIdBytes, 0, pushedStreamId);
target.put(pushedStreamIdBytes);
while (state != State.COMPLETE) {
state = getHpackEncoder().encode(coyoteRequest.getMimeHeaders(), target);
target.flip();
ByteUtil.setThreeBytes(header, 0, target.limit());
if (first) {
first = false;
header[3] = FrameType.PUSH_PROMISE.getIdByte();
} else {
header[3] = FrameType.CONTINUATION.getIdByte();
}
if (state == State.COMPLETE) {
header[4] += FLAG_END_OF_HEADERS;
}
if (log.isDebugEnabled()) {
log.debug(target.limit() + " bytes");
}
ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue());
socketWrapper.write(true, header, 0, header.length);
socketWrapper.write(true, target.array(), target.arrayOffset(), target.limit());
socketWrapper.flush(true);
}
}
}
private HpackEncoder getHpackEncoder() {
if (hpackEncoder == null) {
hpackEncoder = new HpackEncoder(localSettings.getHeaderTableSize());
}
return hpackEncoder;
}
void writeBody(Stream stream, ByteBuffer data, int len, boolean finished) throws IOException {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.writeBody", connectionId, stream.getIdentifier(),
Integer.toString(len)));
}
synchronized (socketWrapper) {
byte[] header = new byte[9];
ByteUtil.setThreeBytes(header, 0, len);
header[3] = FrameType.DATA.getIdByte();
if (finished) {
header[4] = FLAG_END_OF_STREAM;
stream.sentEndOfStream();
if (!stream.isActive()) {
activeRemoteStreamCount.decrementAndGet();
}
}
ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue());
try {
socketWrapper.write(true, header, 0, header.length);
socketWrapper.write(true, data.array(), data.arrayOffset() + data.position(),
len);
socketWrapper.flush(true);
} catch (IOException ioe) {
handleAppInitiatedIOException(ioe);
}
}
}
/*
* Handles an I/O error on the socket underlying the HTTP/2 connection when
* it is triggered by application code (usually reading the request or
* writing the response). Such I/O errors are fatal so the connection is
* closed. The exception is re-thrown to make the client code aware of the
* problem.
*
* Note: We can not rely on this exception reaching the socket processor
* since the application code may swallow it.
*/
private void handleAppInitiatedIOException(IOException ioe) throws IOException {
close();
throw ioe;
}
/*
* Needs to know if this was application initiated since that affects the
* error handling.
*/
void writeWindowUpdate(Stream stream, int increment, boolean applicationInitiated)
throws IOException {
synchronized (socketWrapper) {
// Build window update frame for stream 0
byte[] frame = new byte[13];
ByteUtil.setThreeBytes(frame, 0, 4);
frame[3] = FrameType.WINDOW_UPDATE.getIdByte();
ByteUtil.set31Bits(frame, 9, increment);
socketWrapper.write(true, frame, 0, frame.length);
// Change stream Id and re-use
ByteUtil.set31Bits(frame, 5, stream.getIdentifier().intValue());
try {
socketWrapper.write(true, frame, 0, frame.length);
socketWrapper.flush(true);
} catch (IOException ioe) {
if (applicationInitiated) {
handleAppInitiatedIOException(ioe);
} else {
throw ioe;
}
}
}
}
private void processWrites() throws IOException {
synchronized (socketWrapper) {
if (socketWrapper.flush(false)) {
socketWrapper.registerWriteInterest();
return;
}
}
}
int reserveWindowSize(Stream stream, int reservation) throws IOException {
// Need to be holding the stream lock so releaseBacklog() can't notify
// this thread until after this thread enters wait()
int allocation = 0;
synchronized (stream) {
do {
synchronized (this) {
if (!stream.canWrite()) {
throw new IOException(sm.getString("upgradeHandler.stream.notWritable",
stream.getConnectionId(), stream.getIdentifier()));
}
long windowSize = getWindowSize();
if (windowSize < 1 || backLogSize > 0) {
// Has this stream been granted an allocation
int[] value = backLogStreams.get(stream);
if (value == null) {
value = new int[] { reservation, 0 };
backLogStreams.put(stream, value);
backLogSize += reservation;
// Add the parents as well
AbstractStream parent = stream.getParentStream();
while (parent != null && backLogStreams.putIfAbsent(parent, new int[2]) == null) {
parent = parent.getParentStream();
}
} else {
if (value[1] > 0) {
allocation = value[1];
decrementWindowSize(allocation);
if (value[0] == 0) {
// The reservation has been fully allocated
// so this stream can be removed from the
// backlog.
backLogStreams.remove(stream);
} else {
// This allocation has been used. Reset the
// allocation to zero. Leave the stream on
// the backlog as it still has more bytes to
// write.
value[1] = 0;
}
}
}
} else if (windowSize < reservation) {
allocation = (int) windowSize;
decrementWindowSize(allocation);
} else {
allocation = reservation;
decrementWindowSize(allocation);
}
}
if (allocation == 0) {
try {
stream.wait();
} catch (InterruptedException e) {
throw new IOException(sm.getString(
"upgradeHandler.windowSizeReservationInterrupted", connectionId,
stream.getIdentifier(), Integer.toString(reservation)), e);
}
}
} while (allocation == 0);
}
return allocation;
}
@SuppressWarnings("sync-override") // notifyAll() needs to be outside sync
// to avoid deadlock
@Override
protected void incrementWindowSize(int increment) throws Http2Exception {
Set streamsToNotify = null;
synchronized (this) {
long windowSize = getWindowSize();
if (windowSize < 1 && windowSize + increment > 0) {
streamsToNotify = releaseBackLog((int) (windowSize +increment));
}
super.incrementWindowSize(increment);
}
if (streamsToNotify != null) {
for (AbstractStream stream : streamsToNotify) {
synchronized (stream) {
stream.notifyAll();
}
}
}
}
private synchronized Set releaseBackLog(int increment) {
Set result = new HashSet<>();
if (backLogSize < increment) {
// Can clear the whole backlog
result.addAll(backLogStreams.keySet());
backLogStreams.clear();
backLogSize = 0;
} else {
int leftToAllocate = increment;
while (leftToAllocate > 0) {
leftToAllocate = allocate(this, leftToAllocate);
}
for (Entry entry : backLogStreams.entrySet()) {
int allocation = entry.getValue()[1];
if (allocation > 0) {
backLogSize -= allocation;
result.add(entry.getKey());
}
}
}
return result;
}
@Override
protected synchronized void doNotifyAll() {
this.notifyAll();
}
private int allocate(AbstractStream stream, int allocation) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.allocate.debug", getConnectionId(),
stream.getIdentifier(), Integer.toString(allocation)));
}
// Allocate to the specified stream
int[] value = backLogStreams.get(stream);
if (value[0] >= allocation) {
value[0] -= allocation;
value[1] += allocation;
return 0;
}
// There was some left over so allocate that to the children of the
// stream.
int leftToAllocate = allocation;
value[1] = value[0];
value[0] = 0;
leftToAllocate -= value[1];
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.allocate.left",
getConnectionId(), stream.getIdentifier(), Integer.toString(leftToAllocate)));
}
// Recipients are children of the current stream that are in the
// backlog.
Set recipients = new HashSet<>();
recipients.addAll(stream.getChildStreams());
recipients.retainAll(backLogStreams.keySet());
// Loop until we run out of allocation or recipients
while (leftToAllocate > 0) {
if (recipients.size() == 0) {
backLogStreams.remove(stream);
return leftToAllocate;
}
int totalWeight = 0;
for (AbstractStream recipient : recipients) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.allocate.recipient",
getConnectionId(), stream.getIdentifier(), recipient.getIdentifier(),
Integer.toString(recipient.getWeight())));
}
totalWeight += recipient.getWeight();
}
// Use an Iterator so fully allocated children/recipients can be
// removed.
Iterator iter = recipients.iterator();
int allocated = 0;
while (iter.hasNext()) {
AbstractStream recipient = iter.next();
int share = leftToAllocate * recipient.getWeight() / totalWeight;
if (share == 0) {
// This is to avoid rounding issues triggering an infinite
// loop. It will cause a very slight over allocation but
// HTTP/2 should cope with that.
share = 1;
}
int remainder = allocate(recipient, share);
// Remove recipients that receive their full allocation so that
// they are excluded from the next allocation round.
if (remainder > 0) {
iter.remove();
}
allocated += (share - remainder);
}
leftToAllocate -= allocated;
}
return 0;
}
private Stream getStream(int streamId, boolean unknownIsError) throws ConnectionException {
Integer key = Integer.valueOf(streamId);
Stream result = streams.get(key);
if (result == null && unknownIsError) {
// Stream has been closed and removed from the map
throw new ConnectionException(sm.getString("upgradeHandler.stream.closed", key),
Http2Error.PROTOCOL_ERROR);
}
return result;
}
private Stream createRemoteStream(int streamId) throws ConnectionException {
Integer key = Integer.valueOf(streamId);
if (streamId %2 != 1) {
throw new ConnectionException(
sm.getString("upgradeHandler.stream.even", key), Http2Error.PROTOCOL_ERROR);
}
if (streamId <= maxRemoteStreamId) {
throw new ConnectionException(sm.getString("upgradeHandler.stream.old", key,
Integer.valueOf(maxRemoteStreamId)), Http2Error.PROTOCOL_ERROR);
}
pruneClosedStreams();
Stream result = new Stream(key, this);
streams.put(key, result);
maxRemoteStreamId = streamId;
return result;
}
private Stream createLocalStream(Request request) {
int streamId = nextLocalStreamId.getAndAdd(2);
Integer key = Integer.valueOf(streamId);
Stream result = new Stream(key, this, request);
streams.put(key, result);
maxRemoteStreamId = streamId;
return result;
}
private void close() {
connectionState.set(ConnectionState.CLOSED);
try {
socketWrapper.close();
} catch (IOException ioe) {
log.debug(sm.getString("upgradeHandler.socketCloseFailed"), ioe);
}
}
private void pruneClosedStreams() {
// Only prune every 10 new streams
if (newStreamsSinceLastPrune < 9) {
// Not atomic. Increments may be lost. Not a problem.
newStreamsSinceLastPrune++;
return;
}
// Reset counter
newStreamsSinceLastPrune = 0;
// RFC 7540, 5.3.4 endpoints should maintain state for at least the
// maximum number of concurrent streams
long max = localSettings.getMaxConcurrentStreams();
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.pruneStart", connectionId,
Long.toString(max), Integer.toString(streams.size())));
}
// Allow an additional 10% for closed streams that are used in the
// priority tree
max = max + max / 10;
if (max > Integer.MAX_VALUE) {
max = Integer.MAX_VALUE;
}
int toClose = streams.size() - (int) max;
if (toClose < 1) {
return;
}
// Need to try and close some streams.
// Use this Set to keep track of streams that might be part of the
// priority tree. Only remove these if we absolutely have to.
TreeSet additionalCandidates = new TreeSet<>();
Iterator> entryIter = streams.entrySet().iterator();
while (entryIter.hasNext() && toClose > 0) {
Entry entry = entryIter.next();
Stream stream = entry.getValue();
// Never remove active streams or streams with children
if (stream.isActive() || stream.getChildStreams().size() > 0) {
continue;
}
if (stream.isClosedFinal()) {
// This stream went from IDLE to CLOSED and is likely to have
// been created by the client as part of the priority tree. Keep
// it if possible.
additionalCandidates.add(entry.getKey());
} else {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.pruned", connectionId, entry.getKey()));
}
entryIter.remove();
toClose--;
}
}
while (toClose > 0 && additionalCandidates.size() > 0) {
Integer pruned = additionalCandidates.pollLast();
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.prunedPriority", connectionId, pruned));
}
toClose++;
}
if (toClose > 0) {
log.warn(sm.getString("upgradeHandler.pruneIncomplete", connectionId,
Integer.toString(toClose)));
}
}
void push(Request request, Stream associatedStream) throws IOException {
Stream pushStream = createLocalStream(request);
// TODO: Is 1k the optimal value?
writePushHeaders(associatedStream, pushStream.getIdentifier().intValue(), request, 1024);
pushStream.sentPushPromise();
// Process this stream on a container thread
StreamProcessor streamProcessor = new StreamProcessor(pushStream, adapter, socketWrapper);
streamProcessor.setSslSupport(sslSupport);
socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
}
String getProperty(String key) {
return socketWrapper.getEndpoint().getProperty(key);
}
@Override
protected final String getConnectionId() {
return connectionId;
}
@Override
protected final int getWeight() {
return 0;
}
// ------------------------------------------- Configuration getters/setters
public long getReadTimeout() {
return readTimeout;
}
public void setReadTimeout(long readTimeout) {
this.readTimeout = readTimeout;
}
public long getKeepAliveTimeout() {
return keepAliveTimeout;
}
public void setKeepAliveTimeout(long keepAliveTimeout) {
this.keepAliveTimeout = keepAliveTimeout;
}
public long getWriteTimeout() {
return writeTimeout;
}
public void setWriteTimeout(long writeTimeout) {
this.writeTimeout = writeTimeout;
}
public void setMaxConcurrentStreams(long maxConcurrentStreams) {
localSettings.set(Setting.MAX_CONCURRENT_STREAMS, maxConcurrentStreams);
}
public void setInitialWindowSize(int initialWindowSize) {
localSettings.set(Setting.INITIAL_WINDOW_SIZE, initialWindowSize);
}
// ----------------------------------------------- Http2Parser.Input methods
@Override
public boolean fill(boolean block, byte[] data, int offset, int length) throws IOException {
int len = length;
int pos = offset;
boolean nextReadBlock = block;
int thisRead = 0;
while (len > 0) {
thisRead = socketWrapper.read(nextReadBlock, data, pos, len);
if (thisRead == 0) {
if (nextReadBlock) {
// Should never happen
throw new IllegalStateException();
} else {
return false;
}
} else if (thisRead == -1) {
if (connectionState.get().isNewStreamAllowed()) {
throw new EOFException();
} else {
return false;
}
} else {
pos += thisRead;
len -= thisRead;
nextReadBlock = true;
}
}
return true;
}
@Override
public int getMaxFrameSize() {
return localSettings.getMaxFrameSize();
}
// ---------------------------------------------- Http2Parser.Output methods
@Override
public HpackDecoder getHpackDecoder() {
if (hpackDecoder == null) {
hpackDecoder = new HpackDecoder(remoteSettings.getHeaderTableSize());
}
return hpackDecoder;
}
@Override
public ByteBuffer startRequestBodyFrame(int streamId, int payloadSize) throws Http2Exception {
Stream stream = getStream(streamId, true);
stream.checkState(FrameType.DATA);
return stream.getInputByteBuffer();
}
@Override
public void endRequestBodyFrame(int streamId) throws Http2Exception {
Stream stream = getStream(streamId, true);
stream.getInputBuffer().onDataAvailable();
}
@Override
public void receiveEndOfStream(int streamId) throws ConnectionException {
Stream stream = getStream(streamId, connectionState.get().isNewStreamAllowed());
if (stream != null) {
stream.receivedEndOfStream();
if (!stream.isActive()) {
activeRemoteStreamCount.decrementAndGet();
}
}
}
@Override
public void swallowedPadding(int streamId, int paddingLength) throws
ConnectionException, IOException {
Stream stream = getStream(streamId, true);
// +1 is for the payload byte used to define the padding length
writeWindowUpdate(stream, paddingLength + 1, false);
}
@Override
public HeaderEmitter headersStart(int streamId) throws Http2Exception {
if (connectionState.get().isNewStreamAllowed()) {
Stream stream = getStream(streamId, false);
if (stream == null) {
stream = createRemoteStream(streamId);
}
stream.checkState(FrameType.HEADERS);
stream.receivedStartOfHeaders();
closeIdleStreams(streamId);
if (localSettings.getMaxConcurrentStreams() < activeRemoteStreamCount.incrementAndGet()) {
activeRemoteStreamCount.decrementAndGet();
throw new StreamException(sm.getString("upgradeHandler.tooManyRemoteStreams",
Long.toString(localSettings.getMaxConcurrentStreams())),
Http2Error.REFUSED_STREAM, streamId);
}
return stream;
} else {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.noNewStreams",
connectionId, Integer.toString(streamId)));
}
// Stateless so a static can be used to save on GC
return HEADER_SINK;
}
}
private void closeIdleStreams(int newMaxActiveRemoteStreamId) throws Http2Exception {
for (int i = maxActiveRemoteStreamId + 2; i < newMaxActiveRemoteStreamId; i += 2) {
Stream stream = getStream(i, false);
if (stream != null) {
stream.closeIfIdle();
}
}
maxActiveRemoteStreamId = newMaxActiveRemoteStreamId;
}
@Override
public void reprioritise(int streamId, int parentStreamId,
boolean exclusive, int weight) throws Http2Exception {
Stream stream = getStream(streamId, false);
if (stream == null) {
stream = createRemoteStream(streamId);
}
stream.checkState(FrameType.PRIORITY);
AbstractStream parentStream = getStream(parentStreamId, false);
if (parentStream == null) {
parentStream = this;
}
stream.rePrioritise(parentStream, exclusive, weight);
}
@Override
public void headersEnd(int streamId) throws ConnectionException {
setMaxProcessedStream(streamId);
Stream stream = getStream(streamId, connectionState.get().isNewStreamAllowed());
if (stream != null) {
// Process this stream on a container thread
StreamProcessor streamProcessor = new StreamProcessor(stream, adapter, socketWrapper);
streamProcessor.setSslSupport(sslSupport);
socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
}
}
private void setMaxProcessedStream(int streamId) {
if (maxProcessedStreamId < streamId) {
maxProcessedStreamId = streamId;
}
}
@Override
public void reset(int streamId, long errorCode) throws Http2Exception {
Stream stream = getStream(streamId, true);
stream.checkState(FrameType.RST);
stream.reset(errorCode);
}
@Override
public void setting(Setting setting, long value) throws ConnectionException {
// Special handling required
if (setting == Setting.INITIAL_WINDOW_SIZE) {
long oldValue = remoteSettings.getInitialWindowSize();
// Do this first in case new value is invalid
remoteSettings.set(setting, value);
int diff = (int) (value - oldValue);
for (Stream stream : streams.values()) {
try {
stream.incrementWindowSize(diff);
} catch (Http2Exception h2e) {
try {
resetStream(new StreamException(sm.getString(
"upgradeHandler.windowSizeTooBig", connectionId,
stream.getIdentifier()),
h2e.getError(), stream.getIdentifier().intValue()));
} catch (IOException ioe) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.socketCloseFailed"), ioe);
}
}
}
}
} else {
remoteSettings.set(setting, value);
}
}
@Override
public void settingsEnd(boolean ack) throws IOException {
if (ack) {
if (!localSettings.ack()) {
// Ack was unexpected
log.warn(sm.getString(
"upgradeHandler.unexpectedAck", connectionId, getIdentifier()));
}
} else {
synchronized (socketWrapper) {
socketWrapper.write(true, SETTINGS_ACK, 0, SETTINGS_ACK.length);
socketWrapper.flush(true);
}
}
}
@Override
public void pingReceive(byte[] payload, boolean ack) throws IOException {
pingManager.receivePing(payload, ack);
}
@Override
public void goaway(int lastStreamId, long errorCode, String debugData) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.goaway.debug", connectionId,
Integer.toString(lastStreamId), Long.toHexString(errorCode), debugData));
}
}
@Override
public void incrementWindowSize(int streamId, int increment) throws Http2Exception {
if (streamId == 0) {
incrementWindowSize(increment);
} else {
Stream stream = getStream(streamId, true);
stream.checkState(FrameType.WINDOW_UPDATE);
stream.incrementWindowSize(increment);
}
}
@Override
public void swallowed(int streamId, FrameType frameType, int flags, int size)
throws IOException {
// NO-OP.
}
private class PingManager {
// 10 seconds
private final long pingIntervalNano = 10000000000L;
private int sequence = 0;
private long lastPingNanoTime = Long.MIN_VALUE;
private Queue inflightPings = new ConcurrentLinkedQueue<>();
private Queue roundTripTimes = new ConcurrentLinkedQueue<>();
/**
* Check to see if a ping was sent recently and, if not, send one.
*
* @param force Send a ping, even if one was sent recently
*
* @throws IOException If an I/O issue prevents the ping from being sent
*/
public void sendPing(boolean force) throws IOException {
long now = System.nanoTime();
if (force || now - lastPingNanoTime > pingIntervalNano) {
lastPingNanoTime = now;
byte[] payload = new byte[8];
synchronized (socketWrapper) {
int sentSequence = ++sequence;
PingRecord pingRecord = new PingRecord(sentSequence, now);
inflightPings.add(pingRecord);
ByteUtil.set31Bits(payload, 4, sentSequence);
socketWrapper.write(true, PING, 0, PING.length);
socketWrapper.write(true, payload, 0, payload.length);
socketWrapper.flush(true);
}
}
}
public void receivePing(byte[] payload, boolean ack) throws IOException {
if (ack) {
// Extract the sequence from the payload
int receivedSequence = ByteUtil.get31Bits(payload, 4);
PingRecord pingRecord = inflightPings.poll();
while (pingRecord != null && pingRecord.getSequence() < receivedSequence) {
pingRecord = inflightPings.poll();
}
if (pingRecord == null) {
// Unexpected ACK. Log it.
} else {
long roundTripTime = System.nanoTime() - pingRecord.getSentNanoTime();
roundTripTimes.add(Long.valueOf(roundTripTime));
while (roundTripTimes.size() > 3) {
roundTripTimes.poll();
}
if (log.isDebugEnabled()) {
log.debug(sm.getString("pingManager.roundTripTime",
connectionId, Long.valueOf(roundTripTime)));
}
}
} else {
// Client originated ping. Echo it back.
synchronized (socketWrapper) {
socketWrapper.write(true, PING_ACK, 0, PING_ACK.length);
socketWrapper.write(true, payload, 0, payload.length);
socketWrapper.flush(true);
}
}
}
public long getRoundTripTimeNano() {
return (long) roundTripTimes.stream().mapToLong(x -> x.longValue()).average().orElse(0);
}
}
private static class PingRecord {
private final int sequence;
private final long sentNanoTime;
public PingRecord(int sequence, long sentNanoTime) {
this.sequence = sequence;
this.sentNanoTime = sentNanoTime;
}
public int getSequence() {
return sequence;
}
public long getSentNanoTime() {
return sentNanoTime;
}
}
private enum ConnectionState {
NEW(true),
CONNECTED(true),
PAUSING(true),
PAUSED(false),
CLOSED(false);
private final boolean newStreamsAllowed;
private ConnectionState(boolean newStreamsAllowed) {
this.newStreamsAllowed = newStreamsAllowed;
}
public boolean isNewStreamAllowed() {
return newStreamsAllowed;
}
}
}