// $Id$
//
// Copyright 2007-2008 Cisco Systems Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy
// of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
namespace Etch.Util
{
/// <summary>
/// Constants and methods related to tcp transport.
/// </summary>
public abstract class TcpTransport : Connection, TransportData
{
    /// <summary>
    /// Term on the uri which specifies the auto flush flag. The term string is
    /// "TcpTransport.autoFlush". The value is "true" or "false". The default
    /// is "false".
    /// </summary>
    public const String AUTO_FLUSH = "TcpTransport.autoFlush";

    /// <summary>
    /// Term on the uri which specifies the buffer size in bytes. The term
    /// string is "TcpTransport.bufferSize". The value is an integer between
    /// 0 and 65536. The default is 0.
    /// </summary>
    public const String BUFFER_SIZE = "TcpTransport.bufferSize";

    /// <summary>
    /// Term on the uri which specifies the keep alive flag. The term string is
    /// "TcpTransport.keepAlive". The value is "true" or "false". The default is
    /// "false".
    /// </summary>
    public const String KEEP_ALIVE = "TcpTransport.keepAlive";

    /// <summary>
    /// Term on the uri which specifies the linger time in seconds. The term
    /// string is "TcpTransport.lingerTime". The value is an integer between -1
    /// and 240. The default is 30. -1 means disable.
    /// </summary>
    public const String LINGER_TIME = "TcpTransport.lingerTime";

    /// <summary>
    /// Term on the uri which specifies the no delay flag. The term string is
    /// "TcpTransport.noDelay". The value is "true" or "false". The default is
    /// "true".
    /// </summary>
    public const String NO_DELAY = "TcpTransport.noDelay";

    /// <summary>
    /// Term on the uri which specifies the reconnect delay in milliseconds. The
    /// term string is "TcpTransport.reconnectDelay". The value is an integer
    /// greater than or equal to 0. The default is 0.
    /// </summary>
    public const String RECONNECT_DELAY = "TcpTransport.reconnectDelay";

    /// <summary>
    /// Term on the uri which specifies the traffic class. The term string is
    /// "TcpTransport.trafficClass". The value is an integer between 0 and 255.
    /// The default is 0.
    /// </summary>
    public const String TRAFFIC_CLASS = "TcpTransport.trafficClass";

    /// <summary>
    /// Constructs the TcpTransport. Pulls common parameters off the uri.
    /// </summary>
    /// <param name="uri">uri whose terms supply the tcp settings; each term is
    /// read with the default documented on its term constant.</param>
    /// <param name="resources">not used by this constructor.</param>
    /// <exception cref="ArgumentException">if a numeric term is outside the
    /// range accepted by its setter.</exception>
    protected TcpTransport(URL uri, Resources resources)
    {
        SetDefaultAutoFlush(uri.GetBooleanTerm(AUTO_FLUSH, false));
        SetDefaultBufferSize((int) uri.GetIntegerTerm(BUFFER_SIZE, 0));
        SetDefaultKeepAlive(uri.GetBooleanTerm(KEEP_ALIVE, false));
        SetDefaultLingerTime((int) uri.GetIntegerTerm(LINGER_TIME, 30));
        SetDefaultNoDelay(uri.GetBooleanTerm(NO_DELAY, true));
        SetDefaultReconnectDelay((int) uri.GetIntegerTerm(RECONNECT_DELAY, 0));
        SetDefaultTrafficClass((int) uri.GetIntegerTerm(TRAFFIC_CLASS, 0));
    }

    /// <summary>
    /// Stores the auto flush setting.
    /// </summary>
    /// <param name="autoFlush">the new auto flush setting.</param>
    private void SetDefaultAutoFlush(bool autoFlush)
    {
        this.autoFlush = autoFlush;
    }

    /// <summary>
    /// The auto flush setting for this connection. If true, each call to send
    /// must automatically call flush.
    /// </summary>
    protected bool autoFlush;

    /// <summary>
    /// Validates and stores the output buffer size.
    /// </summary>
    /// <param name="bufferSize">buffer size in bytes, 0 to 65536 inclusive.</param>
    /// <exception cref="ArgumentException">if bufferSize is out of range.</exception>
    private void SetDefaultBufferSize(int bufferSize)
    {
        if (bufferSize < 0 || bufferSize > 65536)
        {
            throw new ArgumentException(
                "bufferSize < 0 || bufferSize > 65536");
        }

        this.bufferSize = bufferSize;
    }

    /// <summary>
    /// The output buffer size to use for this connection. Bytes, 0 means
    /// unbuffered output. If using buffered output, you'll want to disable
    /// auto flush and call flush manually only when needed.
    /// </summary>
    protected int bufferSize;

    /// <summary>
    /// Stores the tcp keep alive setting.
    /// </summary>
    /// <param name="keepAlive">the new keep alive setting.</param>
    private void SetDefaultKeepAlive(bool keepAlive)
    {
        this.keepAlive = keepAlive;
    }

    /// <summary>
    /// The tcp keep alive setting for this connection.
    /// </summary>
    protected bool keepAlive;

    /// <summary>
    /// Validates and stores the linger time.
    /// </summary>
    /// <param name="lingerTime">linger time in seconds, -1 to 240 inclusive.</param>
    /// <exception cref="ArgumentException">if lingerTime is out of range.</exception>
    private void SetDefaultLingerTime(int lingerTime)
    {
        if (lingerTime < -1 || lingerTime > 240)
        {
            throw new ArgumentException(
                "lingerTime < -1 || lingerTime > 240");
        }

        this.lingerTime = lingerTime;
    }

    /// <summary>
    /// The tcp linger time setting for this connection. Time in seconds, -1
    /// means disable.
    /// </summary>
    protected int lingerTime;

    /// <summary>
    /// Stores the tcp no delay setting.
    /// </summary>
    /// <param name="noDelay">the new no delay setting.</param>
    private void SetDefaultNoDelay(bool noDelay)
    {
        this.noDelay = noDelay;
    }

    /// <summary>
    /// The tcp no delay setting for this connection. True disables nagle's
    /// algorithm and causes all sends to be made asap.
    /// </summary>
    protected bool noDelay;

    /// <summary>
    /// Validates and stores the reconnect delay.
    /// </summary>
    /// <param name="reconnectDelay">delay in milliseconds, must not be negative.</param>
    /// <exception cref="ArgumentException">if reconnectDelay is negative.</exception>
    private void SetDefaultReconnectDelay(int reconnectDelay)
    {
        if (reconnectDelay < 0)
        {
            throw new ArgumentException(
                "reconnectDelay < 0");
        }

        this.reconnectDelay = reconnectDelay;
    }

    /// <summary>
    /// The reconnect delay for this connection. Time in milliseconds, 0 means
    /// do not reconnect.
    /// </summary>
    protected int reconnectDelay;

    /// <summary>
    /// Validates and stores the traffic class.
    /// </summary>
    /// <param name="trafficClass">traffic class, 0 to 255 inclusive.</param>
    /// <exception cref="ArgumentException">if trafficClass is out of range.</exception>
    private void SetDefaultTrafficClass(int trafficClass)
    {
        if (trafficClass < 0 || trafficClass > 255)
        {
            throw new ArgumentException(
                "trafficClass < 0 || trafficClass > 255");
        }

        this.trafficClass = trafficClass;
    }

    /// <summary>
    /// The traffic class for this connection. 0-255, 0 means normal handling.
    /// Also called type of service or dscp.
    /// </summary>
    protected int trafficClass;

    /// <summary>
    /// Closes the connection without reset, ignoring any exception that
    /// raises, and then calls the base implementation.
    /// </summary>
    protected override void Stop0()
    {
        try
        {
            Close(false);
        }
        catch (Exception)
        {
            // ignore
        }

        base.Stop0();
    }

    /// <summary>
    /// The socket of the current connection, or null when there is none.
    /// </summary>
    protected Socket socket;

    /// <summary>
    /// Checks the connection socket for being open.
    /// </summary>
    /// <returns>the connection socket</returns>
    /// <exception cref="IOException">if there is no socket.</exception>
    protected Socket CheckSocket()
    {
        Socket s = socket;
        if (s == null)
        {
            throw new IOException("socket closed");
        }

        return s;
    }

    /// <summary>
    /// Closes the current socket, if any, and clears the socket and stream
    /// fields. Exceptions raised while closing are swallowed. Does nothing
    /// when there is no socket.
    /// </summary>
    /// <param name="reset">if true, the socket is closed without flushing or
    /// shutting down output; if false, pending output is flushed and the
    /// output side is shut down before the socket is closed.</param>
    public override void Close(bool reset)
    {
        Socket s = socket;
        if (s == null)
        {
            return;
        }

        try
        {
            try
            {
                if (reset)
                {
                    // NOTE(review): lingering is disabled here rather than
                    // enabled with a zero timeout; confirm this is the
                    // intended way to abandon unsent data.
                    s.LingerState = new LingerOption(false, 0);
                }
                else
                {
                    Flush();
                    ShutdownOutput();
                }
            }
            finally
            {
                // the socket is closed even if the flush or shutdown failed.
                s.Close();
            }
        }
        catch (Exception)
        {
            // ignore.
        }
        finally
        {
            stream = null;
            socket = null;
        }
    }

    /// <summary>
    /// Sends some data to the remote end. Equivalent to
    /// Send(buf, 0, buf.Length).
    /// </summary>
    /// <param name="buf">the bytes to be sent</param>
    /// <exception cref="ArgumentNullException">if buf is null.</exception>
    /// <exception cref="Exception">if there is a problem transmitting the
    /// data. Such a problem causes the current connection to be reset.</exception>
    public void Send(byte[] buf)
    {
        if (buf == null)
        {
            throw new ArgumentNullException("buf");
        }

        Send(buf, 0, buf.Length);
    }

    /// <summary>
    /// Sends some data to the remote end by writing it to the connection
    /// stream. If autoFlush is set, the stream is flushed after the write.
    /// </summary>
    /// <param name="buf">the bytes to be sent</param>
    /// <param name="off">the offset into buf of the first byte to send</param>
    /// <param name="len">the number of bytes to send</param>
    /// <exception cref="Exception">if the stream is closed or the write
    /// fails. The connection is closed with reset before the exception is
    /// rethrown.</exception>
    public void Send(byte[] buf, int off, int len)
    {
        try
        {
            Stream s = checkStream();
            s.Write(buf, off, len);
            if (autoFlush)
            {
                s.Flush();
            }
        }
        catch (Exception)
        {
            Close(true);
            // rethrow without resetting the original stack trace.
            throw;
        }
    }

    /// <summary>
    /// Flushes the connection stream.
    /// </summary>
    /// <exception cref="Exception">if the stream is closed or the flush
    /// fails. The connection is closed with reset before the exception is
    /// rethrown.</exception>
    public void Flush()
    {
        try
        {
            checkStream().Flush();
        }
        catch (Exception)
        {
            Close(true);
            // rethrow without resetting the original stack trace.
            throw;
        }
    }

    /// <summary>
    /// Checks the connection stream for being open.
    /// </summary>
    /// <returns>the connection stream</returns>
    /// <exception cref="IOException">if there is no stream.</exception>
    protected Stream checkStream()
    {
        Stream ns = stream;
        if (ns == null)
        {
            throw new IOException("net stream closed");
        }

        return ns;
    }

    /// <summary>
    /// Hands received data to the session, with a null sender.
    /// </summary>
    /// <param name="buf">the buffer holding the received data.</param>
    private void FireData(FlexBuffer buf)
    {
        session.SessionData(null, buf);
    }

    /// <summary>
    /// The stream over the current socket, or null when there is none.
    /// </summary>
    protected Stream stream;

    /// <summary>
    /// Gets the local end point of the connection socket.
    /// </summary>
    /// <returns>the local end point</returns>
    /// <exception cref="IOException">if there is no socket.</exception>
    public override EndPoint LocalAddress()
    {
        return CheckSocket().LocalEndPoint;
    }

    /// <summary>
    /// Gets the remote end point of the connection socket.
    /// </summary>
    /// <returns>the remote end point</returns>
    /// <exception cref="IOException">if there is no socket.</exception>
    public override EndPoint RemoteAddress()
    {
        return CheckSocket().RemoteEndPoint;
    }

    /// <summary>
    /// Sends the available bytes of the buffer, starting at its current
    /// index, to the remote end.
    /// </summary>
    /// <param name="recipient">not used by this transport.</param>
    /// <param name="buf">the buffer holding the bytes to send.</param>
    public void TransportData(Who recipient, FlexBuffer buf)
    {
        Send(buf.GetBuf(), buf.Index(), buf.Avail());
    }

    /// <summary>
    /// Applies the keep alive, linger and no delay settings to the
    /// connection socket and creates the stream used for sending and
    /// receiving.
    /// </summary>
    /// <exception cref="IOException">if there is no socket.</exception>
    protected override void SetUpSocket()
    {
        Socket s = CheckSocket();

        // KeepAlive is a socket-level option, not a tcp-level one.
        s.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, keepAlive);

        // a negative linger time means lingering is disabled.
        bool linger = lingerTime >= 0;
        s.LingerState = new LingerOption(linger, linger ? lingerTime : 0);

        s.NoDelay = noDelay;

        // NOTE: trafficClass is validated and stored but is not applied to
        // the socket here.

        // build the stream from the socket that was just checked, so that a
        // concurrent Close cannot hand a null socket to the constructor.
        stream = new NetworkStream(s);
        // TODO do something about buffering this stream.
    }

    /// <summary>
    /// Shuts down the receiving side of the connection socket.
    /// </summary>
    /// <exception cref="IOException">if there is no socket.</exception>
    public void ShutdownInput()
    {
        CheckSocket().Shutdown(SocketShutdown.Receive);
    }

    /// <summary>
    /// Shuts down the sending side of the connection socket.
    /// </summary>
    /// <exception cref="IOException">if there is no socket.</exception>
    public void ShutdownOutput()
    {
        CheckSocket().Shutdown(SocketShutdown.Send);
    }

    /// <summary>
    /// Creates a new socket for this connection. Called by OpenSocket for
    /// each connection attempt.
    /// </summary>
    /// <returns>the new socket</returns>
    protected abstract Socket NewSocket();

    /// <summary>
    /// Reports whether this transport is the server side of a connection.
    /// </summary>
    /// <returns>true if this is a server side transport.</returns>
    protected abstract bool IsServer();

    /// <summary>
    /// Answers the IS_SERVER query with the value of IsServer(); every other
    /// query is passed to the base implementation.
    /// </summary>
    /// <param name="query">the query to answer; must not be null.</param>
    /// <returns>the answer to the query.</returns>
    public override object TransportQuery(object query)
    {
        if (query.Equals(TransportConsts.IS_SERVER))
        {
            return IsServer();
        }

        return base.TransportQuery(query);
    }

    /// <summary>
    /// Makes a socket available for the connection, retrying with the
    /// reconnect delay between attempts for as long as the transport is
    /// started. Only the first failed attempt is reported through
    /// FireException.
    /// </summary>
    /// <param name="reconnect">true if this is an attempt to reconnect after
    /// a previous connection was lost.</param>
    /// <returns>true if a socket is available, false if no connection
    /// should or could be made.</returns>
    [MethodImpl(MethodImplOptions.Synchronized)]
    protected override bool OpenSocket(bool reconnect)
    {
        // if a one time connection from a server socket listener, just
        // return the existing socket.
        if (!reconnect && socket != null)
        {
            return true;
        }

        // if a one time connection from a server socket listener, and
        // this is a reconnect, then bail.
        if (reconnect && IsServer())
        {
            return false;
        }

        // if a reconnect but no retries allowed, then bail.
        if (reconnect && reconnectDelay == 0)
        {
            return false;
        }

        // ok, we don't have an existing socket, and this is either the first
        // connection attempt or a reconnect with delay > 0.
        bool first = true;

        while (IsStarted())
        {
            // if reconnect is false and first is true, this is our
            // very first attempt to connect. otherwise, we are trying
            // to reconnect a broken link or establish a link where we
            // have already failed at least once.
            if (reconnect || !first)
            {
                if (reconnectDelay == 0)
                {
                    return false;
                }

                // the method is synchronized, so the monitor of this
                // instance is held here as Monitor.Wait requires.
                System.Threading.Monitor.Wait(this, reconnectDelay);

                if (!IsStarted())
                {
                    break;
                }
            }

            // try to open a socket.
            try
            {
                socket = NewSocket();
                return true;
            }
            catch (Exception e)
            {
                // only the first failure is reported.
                if (first)
                {
                    first = false;
                    FireException("open", e);
                }
            }
        }

        return false;
    }

    /// <summary>
    /// Reads data from the connection stream and hands each chunk to the
    /// session until the transport is stopped or the stream reports end of
    /// data. Exceptions whose message identifies an expected shutdown
    /// condition end the loop silently; all others are rethrown.
    /// </summary>
    /// <exception cref="IOException">if there is no stream.</exception>
    protected override void ReadSocket()
    {
        Stream ns = checkStream();
        FlexBuffer buf = new FlexBuffer(new byte[8192]);

        try
        {
            while (IsStarted())
            {
                // read into the whole backing array. Sizing the read by
                // buf.Length() would cap each read at the length set by
                // SetLength(n) on the previous pass, so the read size could
                // only ever shrink.
                // NOTE(review): assumes FlexBuffer.Length() reports the
                // length last passed to SetLength - confirm.
                byte[] data = buf.GetBuf();
                int n = ns.Read(data, 0, data.Length);
                if (n <= 0)
                {
                    break;
                }

                buf.SetLength(n);
                buf.SetIndex(0);
                FireData(buf);
            }
        }
        catch (Exception e)
        {
            string msg = e.Message;
            if (msg == null)
            {
                throw;
            }

            // these messages are treated as a normal end of the connection.
            if (msg.Contains("connection was aborted"))
            {
                return;
            }

            if (msg.Contains("blocking operation"))
            {
                return;
            }

            if (msg.Contains("socket closed"))
            {
                return;
            }

            if (msg.Contains("read operation failed"))
            {
                return;
            }

            // rethrow without resetting the original stack trace.
            throw;
        }
    }
}
}