/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* .
*
*/
package org.apache.http.impl.nio;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.http.ConnectionClosedException;
import org.apache.http.Header;
import org.apache.http.HttpConnectionMetrics;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpInetConnection;
import org.apache.http.HttpMessage;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.annotation.NotThreadSafe;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.entity.ContentLengthStrategy;
import org.apache.http.impl.HttpConnectionMetricsImpl;
import org.apache.http.impl.entity.LaxContentLengthStrategy;
import org.apache.http.impl.entity.StrictContentLengthStrategy;
import org.apache.http.impl.io.HttpTransportMetricsImpl;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.ContentEncoder;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.impl.nio.codecs.ChunkDecoder;
import org.apache.http.impl.nio.codecs.ChunkEncoder;
import org.apache.http.impl.nio.codecs.IdentityDecoder;
import org.apache.http.impl.nio.codecs.IdentityEncoder;
import org.apache.http.impl.nio.codecs.LengthDelimitedDecoder;
import org.apache.http.impl.nio.codecs.LengthDelimitedEncoder;
import org.apache.http.impl.nio.reactor.SessionInputBufferImpl;
import org.apache.http.impl.nio.reactor.SessionOutputBufferImpl;
import org.apache.http.io.HttpTransportMetrics;
import org.apache.http.nio.reactor.EventMask;
import org.apache.http.nio.reactor.IOSession;
import org.apache.http.nio.reactor.SessionBufferStatus;
import org.apache.http.nio.reactor.SessionInputBuffer;
import org.apache.http.nio.reactor.SessionOutputBuffer;
import org.apache.http.nio.reactor.SocketAccessor;
import org.apache.http.nio.util.ByteBufferAllocator;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
/**
* This class serves as a base for all {@link NHttpConnection} implementations
* and implements functionality common to both client and server
* HTTP connections.
*
* The following parameters can be used to customize the behavior of this
* class:
*
* - {@link org.apache.http.params.CoreProtocolPNames#HTTP_ELEMENT_CHARSET}
* - {@link org.apache.http.params.CoreConnectionPNames#SOCKET_BUFFER_SIZE}
*
*
* @since 4.0
*/
@NotThreadSafe
public class NHttpConnectionBase
implements NHttpConnection, HttpInetConnection, SessionBufferStatus, SocketAccessor {
protected final ContentLengthStrategy incomingContentStrategy;
protected final ContentLengthStrategy outgoingContentStrategy;
protected final SessionInputBufferImpl inbuf;
protected final SessionOutputBufferImpl outbuf;
protected final HttpTransportMetricsImpl inTransportMetrics;
protected final HttpTransportMetricsImpl outTransportMetrics;
protected final HttpConnectionMetricsImpl connMetrics;
protected HttpContext context;
protected IOSession session;
protected SocketAddress remote;
protected volatile ContentDecoder contentDecoder;
protected volatile boolean hasBufferedInput;
protected volatile ContentEncoder contentEncoder;
protected volatile boolean hasBufferedOutput;
protected volatile HttpRequest request;
protected volatile HttpResponse response;
protected volatile int status;
/**
* Creates a new instance of this class given the underlying I/O session.
*
* @param session the underlying I/O session.
* @param allocator byte buffer allocator.
* @param params HTTP parameters.
*/
public NHttpConnectionBase(
final IOSession session,
final ByteBufferAllocator allocator,
final HttpParams params) {
super();
if (session == null) {
throw new IllegalArgumentException("I/O session may not be null");
}
if (params == null) {
throw new IllegalArgumentException("HTTP params may not be null");
}
int buffersize = HttpConnectionParams.getSocketBufferSize(params);
if (buffersize <= 0) {
buffersize = 4096;
}
int linebuffersize = buffersize;
if (linebuffersize > 512) {
linebuffersize = 512;
}
this.inbuf = new SessionInputBufferImpl(buffersize, linebuffersize, allocator, params);
this.outbuf = new SessionOutputBufferImpl(buffersize, linebuffersize, allocator, params);
this.incomingContentStrategy = createIncomingContentStrategy();
this.outgoingContentStrategy = createOutgoingContentStrategy();
this.inTransportMetrics = createTransportMetrics();
this.outTransportMetrics = createTransportMetrics();
this.connMetrics = createConnectionMetrics(
this.inTransportMetrics,
this.outTransportMetrics);
setSession(session);
this.status = ACTIVE;
}
private void setSession(final IOSession session) {
this.session = session;
this.context = new SessionHttpContext(this.session);
this.session.setBufferStatus(this);
this.remote = this.session.getRemoteAddress();
}
/**
* Binds the connection to a different {@link IOSession}. This may be necessary
* when the underlying I/O session gets upgraded with SSL/TLS encryption.
*
* @since 4.2
*/
protected void bind(final IOSession session) {
if (session == null) {
throw new IllegalArgumentException("I/O session may not be null");
}
this.session.setBufferStatus(null);
setSession(session);
}
/**
* @since 4.2
*/
protected ContentLengthStrategy createIncomingContentStrategy() {
return new LaxContentLengthStrategy();
}
/**
* @since 4.2
*/
protected ContentLengthStrategy createOutgoingContentStrategy() {
return new StrictContentLengthStrategy();
}
/**
* @since 4.1
*/
protected HttpTransportMetricsImpl createTransportMetrics() {
return new HttpTransportMetricsImpl();
}
/**
* @since 4.1
*/
protected HttpConnectionMetricsImpl createConnectionMetrics(
final HttpTransportMetrics inTransportMetric,
final HttpTransportMetrics outTransportMetric) {
return new HttpConnectionMetricsImpl(inTransportMetric, outTransportMetric);
}
public int getStatus() {
return this.status;
}
public HttpContext getContext() {
return this.context;
}
public HttpRequest getHttpRequest() {
return this.request;
}
public HttpResponse getHttpResponse() {
return this.response;
}
public void requestInput() {
this.session.setEvent(EventMask.READ);
}
public void requestOutput() {
this.session.setEvent(EventMask.WRITE);
}
public void suspendInput() {
this.session.clearEvent(EventMask.READ);
}
public void suspendOutput() {
this.session.clearEvent(EventMask.WRITE);
}
/**
* Initializes a specific {@link ContentDecoder} implementation based on the
* properties of the given {@link HttpMessage} and generates an instance of
* {@link HttpEntity} matching the properties of the content decoder.
*
* @param message the HTTP message.
* @return HTTP entity.
* @throws HttpException in case of an HTTP protocol violation.
*/
protected HttpEntity prepareDecoder(final HttpMessage message) throws HttpException {
BasicHttpEntity entity = new BasicHttpEntity();
long len = this.incomingContentStrategy.determineLength(message);
this.contentDecoder = createContentDecoder(
len,
this.session.channel(),
this.inbuf,
this.inTransportMetrics);
if (len == ContentLengthStrategy.CHUNKED) {
entity.setChunked(true);
entity.setContentLength(-1);
} else if (len == ContentLengthStrategy.IDENTITY) {
entity.setChunked(false);
entity.setContentLength(-1);
} else {
entity.setChunked(false);
entity.setContentLength(len);
}
Header contentTypeHeader = message.getFirstHeader(HTTP.CONTENT_TYPE);
if (contentTypeHeader != null) {
entity.setContentType(contentTypeHeader);
}
Header contentEncodingHeader = message.getFirstHeader(HTTP.CONTENT_ENCODING);
if (contentEncodingHeader != null) {
entity.setContentEncoding(contentEncodingHeader);
}
return entity;
}
/**
* Factory method for {@link ContentDecoder} instances.
*
* @param len content length, if known, {@link ContentLengthStrategy#CHUNKED} or
* {@link ContentLengthStrategy#IDENTITY}, if unknown.
* @param channel the session channel.
* @param buffer the session buffer.
* @param metrics transport metrics.
*
* @return content decoder.
*
* @since 4.1
*/
protected ContentDecoder createContentDecoder(
final long len,
final ReadableByteChannel channel,
final SessionInputBuffer buffer,
final HttpTransportMetricsImpl metrics) {
if (len == ContentLengthStrategy.CHUNKED) {
return new ChunkDecoder(channel, buffer, metrics);
} else if (len == ContentLengthStrategy.IDENTITY) {
return new IdentityDecoder(channel, buffer, metrics);
} else {
return new LengthDelimitedDecoder(channel, buffer, metrics, len);
}
}
/**
* Initializes a specific {@link ContentEncoder} implementation based on the
* properties of the given {@link HttpMessage}.
*
* @param message the HTTP message.
* @throws HttpException in case of an HTTP protocol violation.
*/
protected void prepareEncoder(final HttpMessage message) throws HttpException {
long len = this.outgoingContentStrategy.determineLength(message);
this.contentEncoder = createContentEncoder(
len,
this.session.channel(),
this.outbuf,
this.outTransportMetrics);
}
/**
* Factory method for {@link ContentEncoder} instances.
*
* @param len content length, if known, {@link ContentLengthStrategy#CHUNKED} or
* {@link ContentLengthStrategy#IDENTITY}, if unknown.
* @param channel the session channel.
* @param buffer the session buffer.
* @param metrics transport metrics.
*
* @return content encoder.
*
* @since 4.1
*/
protected ContentEncoder createContentEncoder(
final long len,
final WritableByteChannel channel,
final SessionOutputBuffer buffer,
final HttpTransportMetricsImpl metrics) {
if (len == ContentLengthStrategy.CHUNKED) {
return new ChunkEncoder(channel, buffer, metrics);
} else if (len == ContentLengthStrategy.IDENTITY) {
return new IdentityEncoder(channel, buffer, metrics);
} else {
return new LengthDelimitedEncoder(channel, buffer, metrics, len);
}
}
public boolean hasBufferedInput() {
return this.hasBufferedInput;
}
public boolean hasBufferedOutput() {
return this.hasBufferedOutput;
}
/**
* Assets if the connection is still open.
*
* @throws ConnectionClosedException in case the connection has already
* been closed.
*/
protected void assertNotClosed() throws ConnectionClosedException {
if (this.status != ACTIVE) {
throw new ConnectionClosedException("Connection is closed");
}
}
public void close() throws IOException {
if (this.status != ACTIVE) {
return;
}
this.status = CLOSING;
if (this.outbuf.hasData()) {
this.session.setEvent(EventMask.WRITE);
} else {
this.session.close();
this.status = CLOSED;
}
}
public boolean isOpen() {
return this.status == ACTIVE && !this.session.isClosed();
}
public boolean isStale() {
return this.session.isClosed();
}
public InetAddress getLocalAddress() {
SocketAddress address = this.session.getLocalAddress();
if (address instanceof InetSocketAddress) {
return ((InetSocketAddress) address).getAddress();
} else {
return null;
}
}
public int getLocalPort() {
SocketAddress address = this.session.getLocalAddress();
if (address instanceof InetSocketAddress) {
return ((InetSocketAddress) address).getPort();
} else {
return -1;
}
}
public InetAddress getRemoteAddress() {
SocketAddress address = this.session.getRemoteAddress();
if (address instanceof InetSocketAddress) {
return ((InetSocketAddress) address).getAddress();
} else {
return null;
}
}
public int getRemotePort() {
SocketAddress address = this.session.getRemoteAddress();
if (address instanceof InetSocketAddress) {
return ((InetSocketAddress) address).getPort();
} else {
return -1;
}
}
public void setSocketTimeout(int timeout) {
this.session.setSocketTimeout(timeout);
}
public int getSocketTimeout() {
return this.session.getSocketTimeout();
}
public void shutdown() throws IOException {
this.status = CLOSED;
this.session.shutdown();
}
public HttpConnectionMetrics getMetrics() {
return this.connMetrics;
}
private static void formatAddress(final StringBuilder buffer, final SocketAddress socketAddress) {
if (socketAddress instanceof InetSocketAddress) {
InetSocketAddress addr = ((InetSocketAddress) socketAddress);
buffer.append(addr.getAddress() != null ? addr.getAddress().getHostAddress() :
addr.getAddress())
.append(':')
.append(addr.getPort());
} else {
buffer.append(socketAddress);
}
}
@Override
public String toString() {
StringBuilder buffer = new StringBuilder();
SocketAddress remoteAddress = this.session.getRemoteAddress();
SocketAddress localAddress = this.session.getLocalAddress();
if (remoteAddress != null && localAddress != null) {
formatAddress(buffer, localAddress);
buffer.append("<->");
formatAddress(buffer, remoteAddress);
}
buffer.append("[");
switch (this.status) {
case ACTIVE:
buffer.append("ACTIVE");
break;
case CLOSING:
buffer.append("CLOSING");
break;
case CLOSED:
buffer.append("CLOSED");
break;
}
buffer.append("]");
return buffer.toString();
}
public Socket getSocket() {
if (this.session instanceof SocketAccessor) {
return ((SocketAccessor) this.session).getSocket();
} else {
return null;
}
}
}