// $Id$
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
namespace Org.Apache.Etch.Bindings.Csharp.Util
{
///
/// Constants and methods related to tcp transport.
///
abstract public class TcpTransport : Connection, TransportData
{
///
/// Constructs the TcpTransport. Pulls common parameters off the uri.
///
///
///
protected TcpTransport(URL uri, Resources resources)
{
options = new TcpOptions(uri, resources);
}
private readonly TcpOptions options;
protected override void Stop0()
{
try
{
Close(false);
}
catch
{
// ignore
}
base.Stop0();
}
protected Socket socket;
///
/// Checks the connection socket for being open.
///
/// the connection socket
protected Socket CheckSocket()
{
Socket s = socket;
if (s == null)
throw new IOException("socket closed");
return s;
}
public override void Close(bool reset)
{
Socket s = socket;
if (s != null)
{
try
{
try
{
if (reset)
s.LingerState = new LingerOption(false, 0);
else
{
Flush();
ShutdownOutput();
}
}
finally
{
s.Close();
}
}
catch
{
// ignore.
}
finally
{
stream = null;
socket = null;
}
}
}
///
/// Sends some data to the remote end. The output data is buffered
/// until the buffer is full or the buffer is flushed.
///
/// the bytes to be sent
/// Exception:
/// throws Exception if there is a problem transmitting the
/// data. Such a problem causes the current connection to be
/// reset.
///
///
public void Send(byte[] buf)
{
Send(buf, 0, buf.Length);
}
///
/// Sends some data to the remote end. The output data is buffered
/// until the buffer is full or the buffer is flushed.
///
/// the bytes to be sent
/// the offset into buf of the first byte to send
/// the number of bytes to send
public void Send(byte[] buf, int off, int len)
{
try
{
Stream s = checkStream();
s.Write(buf, off, len);
if (options.autoFlush)
{
s.Flush();
}
}
catch (Exception)
{
Close(true);
throw;
}
}
public void Flush()
{
try
{
checkStream().Flush();
}
catch (Exception)
{
Close(true);
throw;
}
}
protected Stream checkStream()
{
Stream ns = stream;
if (ns == null)
throw new IOException("net stream closed");
return ns;
}
private void FireData(FlexBuffer buf)
{
session.SessionData(null, buf);
}
protected Stream stream;
public override EndPoint LocalAddress()
{
return CheckSocket().LocalEndPoint;
}
public override EndPoint RemoteAddress()
{
return CheckSocket().RemoteEndPoint;
}
public void TransportData(Who recipient, FlexBuffer buf)
{
Send(buf.GetBuf(), buf.Index(), buf.Avail());
}
protected override void SetUpSocket()
{
Socket s = CheckSocket();
s.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.KeepAlive, options.keepAlive);
s.LingerState = new LingerOption(options.lingerTime >= 0, options.lingerTime >= 0 ? options.lingerTime : 0);
s.NoDelay = options.noDelay;
//s.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TypeOfService, trafficClass);
stream = new NetworkStream(socket);
// TODO do something about buffering this stream.
}
public void ShutdownInput()
{
CheckSocket().Shutdown(SocketShutdown.Receive);
}
public void ShutdownOutput()
{
CheckSocket().Shutdown(SocketShutdown.Send);
}
abstract protected Socket NewSocket();
abstract protected bool IsServer();
public override object TransportQuery(object query)
{
if (query.Equals(TransportConsts.IS_SERVER))
return IsServer();
return base.TransportQuery(query);
}
[MethodImpl(MethodImplOptions.Synchronized)]
protected override bool OpenSocket(bool reconnect)
{
// if a one time connection from a server socket listener, just
// return the existing socket.
if (!reconnect && socket != null)
return true;
// if a one time connection from a server socket listener, and
// this is a reconnect, then bail.
if (reconnect && IsServer())
return false;
// if a reconnect but no retries allowed, then bail.
if (reconnect && options.reconnectDelay == 0)
return false;
// ok, we don't have an existing socket, and this is either the first
// connection attempt or a reconnect with delay > 0.
bool first = true;
while (IsStarted())
{
// if reconnect is false and first is true, this is our
// very first attempt to connect. otherwise, we are trying
// to reconnect a broken link or establish a link where we
// have already failed at least once.
if (reconnect || !first)
{
if (options.reconnectDelay == 0)
return false;
System.Threading.Monitor.Wait(this, options.reconnectDelay);
if (!IsStarted())
break;
}
// try to open a socket.
try
{
socket = NewSocket();
return true;
}
catch (Exception e)
{
if (first)
{
first = false;
FireException("open", e);
}
}
}
return false;
}
protected override void ReadSocket()
{
Stream ns = checkStream();
FlexBuffer buf = new FlexBuffer(new byte[8192]);
try
{
while (IsStarted())
{
int n = ns.Read(buf.GetBuf(), 0, buf.Length());
if (n <= 0)
break;
buf.SetLength(n);
buf.SetIndex(0);
FireData(buf);
}
}
catch (Exception e)
{
if (e.Message == null)
throw;
if (e.Message.Contains("connection was aborted"))
return;
if (e.Message.Contains("blocking operation"))
return;
if (e.Message.Contains("socket closed"))
return;
if (e.Message.Contains("read operation failed"))
return;
throw;
}
}
}
}