/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* .
*
*/
package org.apache.hc.core5.http.impl.nio;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpConnectionMetrics;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpMessage;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.config.ConnectionConfig;
import org.apache.hc.core5.http.config.H1Config;
import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
import org.apache.hc.core5.http.impl.ConnSupport;
import org.apache.hc.core5.http.impl.ConnectionListener;
import org.apache.hc.core5.http.nio.ContentDecoder;
import org.apache.hc.core5.http.nio.ContentEncoder;
import org.apache.hc.core5.http.nio.NHttpMessageParser;
import org.apache.hc.core5.http.nio.NHttpMessageWriter;
import org.apache.hc.core5.http.nio.ResourceHolder;
import org.apache.hc.core5.http.nio.SessionInputBuffer;
import org.apache.hc.core5.http.nio.SessionOutputBuffer;
import org.apache.hc.core5.http.nio.command.ExecutionCommand;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
import org.apache.hc.core5.http.nio.command.ShutdownType;
import org.apache.hc.core5.net.InetAddressUtils;
import org.apache.hc.core5.reactor.Command;
import org.apache.hc.core5.reactor.EventMask;
import org.apache.hc.core5.reactor.IOEventHandler;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.ssl.SSLBufferManagement;
import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
import org.apache.hc.core5.util.Args;
abstract class AbstractHttp1StreamDuplexer
implements ResourceHolder, UpgradeableHttpConnection {
private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
private final IOSession ioSession;
private final SessionInputBufferImpl inbuf;
private final SessionOutputBufferImpl outbuf;
private final BasicHttpTransportMetrics inTransportMetrics;
private final BasicHttpTransportMetrics outTransportMetrics;
private final BasicHttpConnectionMetrics connMetrics;
private final NHttpMessageParser incomingMessageParser;
private final NHttpMessageWriter outgoingMessageWriter;
private final ConnectionListener connectionListener;
private final Lock outputLock;
private final AtomicInteger outputRequests;
private volatile Message incomingMessage;
private volatile Message outgoingMessage;
private volatile ConnectionState connState = ConnectionState.READY;
private volatile ProtocolVersion version;
AbstractHttp1StreamDuplexer(
final IOSession ioSession,
final H1Config h1Config,
final ConnectionConfig connectionConfig,
final NHttpMessageParser incomingMessageParser,
final NHttpMessageWriter outgoingMessageWriter,
final ConnectionListener connectionListener) {
this.ioSession = Args.notNull(ioSession, "I/O session");
final int bufferSize = connectionConfig.getBufferSize();
this.inbuf = new SessionInputBufferImpl(bufferSize, bufferSize < 512 ? bufferSize : 512,
(h1Config != null ? h1Config : H1Config.DEFAULT).getMaxLineLength(),
ConnSupport.createDecoder(connectionConfig));
this.outbuf = new SessionOutputBufferImpl(bufferSize, bufferSize < 512 ? bufferSize : 512,
ConnSupport.createEncoder(connectionConfig));
this.inTransportMetrics = new BasicHttpTransportMetrics();
this.outTransportMetrics = new BasicHttpTransportMetrics();
this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
this.incomingMessageParser = incomingMessageParser;
this.outgoingMessageWriter = outgoingMessageWriter;
this.connectionListener = connectionListener;
this.outputLock = new ReentrantLock();
this.outputRequests = new AtomicInteger(0);
this.connState = ConnectionState.READY;
}
void doTerminate(final Exception exception) {
connState = ConnectionState.SHUTDOWN;
try {
terminate(exception);
} finally {
ioSession.close();
}
}
abstract void terminate(final Exception exception);
abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);
abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);
abstract void consumeHeader(IncomingMessage messageHead, boolean endStream) throws HttpException, IOException;
abstract ContentDecoder handleIncomingMessage(
IncomingMessage incomingMessage,
ReadableByteChannel channel,
SessionInputBuffer buffer,
BasicHttpTransportMetrics metrics) throws HttpException;
abstract ContentEncoder handleOutgoingMessage(
OutgoingMessage outgoingMessage,
WritableByteChannel channel,
SessionOutputBuffer buffer,
BasicHttpTransportMetrics metrics) throws HttpException;
abstract int consumeData(ContentDecoder contentDecoder) throws HttpException, IOException;
abstract boolean isOutputReady();
abstract void produceOutput() throws HttpException, IOException;
abstract void execute(ExecutionCommand executionCommand) throws HttpException, IOException;
abstract void inputEnd() throws HttpException, IOException;
abstract void outputEnd() throws HttpException, IOException;
abstract boolean inputIdle();
abstract boolean outputIdle();
abstract boolean handleTimeout();
private void processCommands() throws HttpException, IOException {
for (;;) {
final Command command = ioSession.getCommand();
if (command == null) {
return;
}
if (command instanceof ShutdownCommand) {
final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
requestShutdown(shutdownCommand.getType());
} else if (command instanceof ExecutionCommand) {
if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
command.cancel();
} else {
execute((ExecutionCommand) command);
return;
}
} else {
throw new HttpException("Unexpected command: " + command.getClass());
}
}
}
public final void onConnect() throws HttpException, IOException {
if (connectionListener != null) {
connectionListener.onConnect(this);
}
connState = ConnectionState.ACTIVE;
processCommands();
}
public final void onInput() throws HttpException, IOException {
while (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
int totalBytesRead = 0;
int messagesReceived = 0;
if (incomingMessage == null) {
if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle()) {
ioSession.clearEvent(SelectionKey.OP_READ);
return;
}
int bytesRead;
do {
bytesRead = inbuf.fill(ioSession.channel());
if (bytesRead > 0) {
totalBytesRead += bytesRead;
inTransportMetrics.incrementBytesTransferred(bytesRead);
}
final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, bytesRead == -1);
if (messageHead != null) {
messagesReceived++;
incomingMessageParser.reset();
this.version = messageHead.getVersion();
updateInputMetrics(messageHead, connMetrics);
final ContentDecoder contentDecoder = handleIncomingMessage(messageHead, ioSession.channel(), inbuf, inTransportMetrics);
consumeHeader(messageHead, contentDecoder == null);
if (contentDecoder != null) {
incomingMessage = new Message<>(messageHead, contentDecoder);
break;
} else {
inputEnd();
ioSession.setEvent(SelectionKey.OP_READ);
}
}
} while (bytesRead > 0);
if (bytesRead == -1 && !inbuf.hasData()) {
if (incomingMessage == null && outgoingMessage == null) {
requestShutdown(ShutdownType.IMMEDIATE);
} else {
doTerminate(new ConnectionClosedException("Connection closed by peer"));
}
return;
}
}
if (incomingMessage != null) {
final ContentDecoder contentDecoder = incomingMessage.getBody();
final int bytesRead = consumeData(contentDecoder);
if (bytesRead > 0) {
totalBytesRead += bytesRead;
}
if (contentDecoder.isCompleted()) {
incomingMessage = null;
inputEnd();
ioSession.setEvent(SelectionKey.OP_READ);
}
}
if (totalBytesRead == 0 && messagesReceived == 0) {
break;
}
}
}
public final void onOutput() throws IOException, HttpException {
outputLock.lock();
try {
if (outbuf.hasData()) {
final int bytesWritten = outbuf.flush(ioSession.channel());
if (bytesWritten > 0) {
outTransportMetrics.incrementBytesTransferred(bytesWritten);
}
}
} finally {
outputLock.unlock();
}
if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) < 0) {
if (isOutputReady()) {
produceOutput();
} else {
final int pendingOutputRequests = outputRequests.get();
final boolean outputPending;
outputLock.lock();
try {
outputPending = outbuf.hasData();
} finally {
outputLock.unlock();
}
if (!outputPending && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
ioSession.clearEvent(SelectionKey.OP_WRITE);
} else {
outputRequests.addAndGet(-pendingOutputRequests);
}
}
outputLock.lock();
final boolean outputEnd;
try {
outputEnd = outgoingMessage == null && !outbuf.hasData();
} finally {
outputLock.unlock();
}
if (outputEnd) {
outputEnd();
if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
processCommands();
} else if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
connState = ConnectionState.SHUTDOWN;
}
}
}
if (connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
ioSession.close();
cancelPendingCommands();
releaseResources();
}
}
public final void onTimeout() throws IOException, HttpException {
if (!handleTimeout()) {
doTerminate(new SocketTimeoutException());
}
}
public final void onException(final Exception ex) {
doTerminate(ex);
if (connectionListener != null) {
connectionListener.onError(this, ex);
}
}
public final void onDisconnect() {
cancelPendingCommands();
releaseResources();
if (connectionListener != null) {
connectionListener.onDisconnect(this);
}
}
private void cancelPendingCommands() {
for (;;) {
final Command command = ioSession.getCommand();
if (command != null) {
command.cancel();
} else {
break;
}
}
}
void requestShutdown(final ShutdownType shutdownType) {
switch (shutdownType) {
case GRACEFUL:
if (connState == ConnectionState.ACTIVE) {
connState = ConnectionState.GRACEFUL_SHUTDOWN;
}
break;
case IMMEDIATE:
connState = ConnectionState.SHUTDOWN;
break;
}
ioSession.setEvent(SelectionKey.OP_WRITE);
}
void commitMessageHead(final OutgoingMessage messageHead, final boolean endStream) throws HttpException, IOException {
outputLock.lock();
try {
outgoingMessageWriter.write(messageHead, outbuf);
updateOutputMetrics(messageHead, connMetrics);
if (!endStream) {
final ContentEncoder contentEncoder = handleOutgoingMessage(messageHead, ioSession.channel(), outbuf, outTransportMetrics);
if (contentEncoder != null) {
outgoingMessage = new Message<>(messageHead, contentEncoder);
}
}
outgoingMessageWriter.reset();
ioSession.setEvent(EventMask.WRITE);
} finally {
outputLock.unlock();
}
}
void requestSessionInput() {
ioSession.setEvent(SelectionKey.OP_READ);
}
void suspendSessionInput() {
ioSession.clearEvent(SelectionKey.OP_READ);
}
void requestSessionOutput() {
outputRequests.incrementAndGet();
ioSession.setEvent(SelectionKey.OP_WRITE);
}
void suspendSessionOutput() {
ioSession.clearEvent(SelectionKey.OP_WRITE);
}
int streamOutput(final ByteBuffer src) throws IOException {
outputLock.lock();
try {
if (outgoingMessage == null) {
throw new ClosedChannelException();
}
final ContentEncoder contentEncoder = outgoingMessage.getBody();
final int bytesWritten = contentEncoder.write(src);
if (bytesWritten > 0) {
ioSession.setEvent(SelectionKey.OP_WRITE);
}
return bytesWritten;
} finally {
outputLock.unlock();
}
}
enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
MessageDelineation endOutputStream(final List extends Header> trailers) throws IOException {
outputLock.lock();
try {
if (outgoingMessage == null) {
return MessageDelineation.NONE;
}
final ContentEncoder contentEncoder = outgoingMessage.getBody();
contentEncoder.complete(trailers);
ioSession.setEvent(SelectionKey.OP_WRITE);
outgoingMessage = null;
if (contentEncoder instanceof ChunkEncoder) {
return MessageDelineation.CHUNK_CODED;
} else {
return MessageDelineation.MESSAGE_HEAD;
}
} finally {
outputLock.unlock();
}
}
boolean isOutputCompleted() {
outputLock.lock();
try {
if (outgoingMessage == null) {
return true;
}
final ContentEncoder contentEncoder = outgoingMessage.getBody();
return contentEncoder.isCompleted();
} finally {
outputLock.unlock();
}
}
@Override
public void close() throws IOException {
ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
}
@Override
public void shutdown() throws IOException {
ioSession.addFirst(new ShutdownCommand(ShutdownType.IMMEDIATE));
}
@Override
public boolean isOpen() {
return connState == ConnectionState.ACTIVE;
}
@Override
public void setSocketTimeout(final int timeout) {
ioSession.setSocketTimeout(timeout);
}
@Override
public HttpConnectionMetrics getMetrics() {
return connMetrics;
}
@Override
public int getSocketTimeout() {
return ioSession.getSocketTimeout();
}
@Override
public ProtocolVersion getProtocolVersion() {
return version;
}
@Override
public SocketAddress getRemoteAddress() {
return ioSession.getRemoteAddress();
}
@Override
public SocketAddress getLocalAddress() {
return ioSession.getLocalAddress();
}
@Override
public SSLSession getSSLSession() {
if (ioSession instanceof TransportSecurityLayer) {
return ((TransportSecurityLayer) ioSession).getSSLSession();
} else {
return null;
}
}
@Override
public void start(
final SSLContext sslContext,
final SSLBufferManagement sslBufferManagement,
final SSLSessionInitializer initializer,
final SSLSessionVerifier verifier) throws UnsupportedOperationException {
if (ioSession instanceof TransportSecurityLayer) {
((TransportSecurityLayer) ioSession).start(sslContext, sslBufferManagement, initializer, verifier);
} else {
throw new UnsupportedOperationException();
}
}
@Override
public void upgrade(final IOEventHandler eventHandler) {
ioSession.setHandler(eventHandler);
}
@Override
public String toString() {
final SocketAddress remoteAddress = ioSession.getRemoteAddress();
final SocketAddress localAddress = ioSession.getLocalAddress();
final StringBuilder buffer = new StringBuilder();
InetAddressUtils.formatAddress(buffer, localAddress);
buffer.append("->");
InetAddressUtils.formatAddress(buffer, remoteAddress);
return buffer.toString();
}
}