// $Id$
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.CompilerServices;

namespace Org.Apache.Etch.Bindings.Csharp.Util
{
    /// <summary>
    /// Constants and methods related to tcp transport.
    /// </summary>
    public abstract class TcpTransport : Connection, TransportData
    {
        /// <summary>
        /// Constructs the TcpTransport. Pulls common parameters off the uri.
        /// </summary>
        /// <param name="uri">the uri handed to <c>TcpOptions</c> to extract the tcp options</param>
        /// <param name="resources">the resources handed to <c>TcpOptions</c> together with the uri</param>
        protected TcpTransport(URL uri, Resources resources)
        {
            options = new TcpOptions(uri, resources);
        }

        /// <summary>
        /// The tcp options built from the uri and resources given to the constructor.
        /// </summary>
        private readonly TcpOptions options;

        /// <summary>
        /// Closes the connection (without reset) on a best-effort basis and then
        /// delegates to the base class.
        /// </summary>
        protected override void Stop0()
        {
            try
            {
                Close(false);
            }
            catch
            {
                // ignore: closing here is best effort, the base Stop0 must still run.
            }

            base.Stop0();
        }

        /// <summary>
        /// The current connection socket, or null when there is none.
        /// </summary>
        protected Socket socket;

        /// <summary>
        /// Checks the connection socket for being open.
        /// </summary>
        /// <returns>the connection socket</returns>
        /// <exception cref="IOException">if there is currently no socket.</exception>
        protected Socket CheckSocket()
        {
            // Snapshot the field so the null check and the return see the same value.
            Socket s = socket;
            if (s == null)
            {
                throw new IOException("socket closed");
            }

            return s;
        }

        /// <summary>
        /// Closes the current socket, if any. Every exception raised while closing
        /// is swallowed, and the <c>stream</c> and <c>socket</c> fields are always
        /// cleared afterwards.
        /// </summary>
        /// <param name="reset">if true, the linger state is set to
        /// <c>LingerOption(false, 0)</c> before closing; if false, pending output is
        /// flushed and the send side is shut down before closing.</param>
        public override void Close(bool reset)
        {
            Socket s = socket;
            if (s == null)
            {
                return;
            }

            try
            {
                try
                {
                    if (reset)
                    {
                        // NOTE(review): LingerOption(false, 0) disables lingering, it does
                        // not force an abortive close; a hard reset would normally be
                        // LingerOption(true, 0). Left unchanged - confirm the intent.
                        s.LingerState = new LingerOption(false, 0);
                    }
                    else
                    {
                        Flush();
                        ShutdownOutput();
                    }
                }
                finally
                {
                    // The socket is closed even when the graceful steps above fail.
                    s.Close();
                }
            }
            catch
            {
                // ignore.
            }
            finally
            {
                stream = null;
                socket = null;
            }
        }

        /// <summary>
        /// Sends some data to the remote end. The data is written to the stream and
        /// the stream is flushed immediately only when the autoFlush option is set.
        /// </summary>
        /// <param name="buf">the bytes to be sent</param>
        /// <exception cref="Exception">if there is a problem transmitting the data.
        /// Such a problem causes the current connection to be closed with
        /// <c>Close(true)</c> before the exception is rethrown.</exception>
        public void Send(byte[] buf)
        {
            Send(buf, 0, buf.Length);
        }

        /// <summary>
        /// Sends some data to the remote end. The data is written to the stream and
        /// the stream is flushed immediately only when the autoFlush option is set.
        /// </summary>
        /// <param name="buf">the bytes to be sent</param>
        /// <param name="off">the offset into buf of the first byte to send</param>
        /// <param name="len">the number of bytes to send</param>
        /// <exception cref="Exception">if the stream is closed or the write/flush
        /// fails; the connection is closed with <c>Close(true)</c> before the
        /// exception is rethrown.</exception>
        public void Send(byte[] buf, int off, int len)
        {
            try
            {
                Stream s = checkStream();
                s.Write(buf, off, len);
                if (options.autoFlush)
                {
                    s.Flush();
                }
            }
            catch (Exception)
            {
                Close(true);
                throw;
            }
        }

        /// <summary>
        /// Flushes the output stream.
        /// </summary>
        /// <exception cref="Exception">if the stream is closed or the flush fails;
        /// the connection is closed with <c>Close(true)</c> before the exception is
        /// rethrown.</exception>
        public void Flush()
        {
            try
            {
                checkStream().Flush();
            }
            catch (Exception)
            {
                Close(true);
                throw;
            }
        }

        /// <summary>
        /// Checks the connection stream for being open.
        /// </summary>
        /// <returns>the connection stream</returns>
        /// <exception cref="IOException">if there is currently no stream.</exception>
        protected Stream checkStream()
        {
            // Snapshot the field so the null check and the return see the same value.
            Stream ns = stream;
            if (ns == null)
            {
                throw new IOException("net stream closed");
            }

            return ns;
        }

        /// <summary>
        /// Hands received data to the session, passing null as the first argument.
        /// </summary>
        /// <param name="buf">the buffer holding the received bytes</param>
        private void FireData(FlexBuffer buf)
        {
            session.SessionData(null, buf);
        }

        /// <summary>
        /// The stream wrapped around the current socket, or null when there is none.
        /// </summary>
        protected Stream stream;

        /// <summary>
        /// Gets the local end point of the connection socket.
        /// </summary>
        /// <returns>the local end point</returns>
        /// <exception cref="IOException">if there is currently no socket.</exception>
        public override EndPoint LocalAddress()
        {
            return CheckSocket().LocalEndPoint;
        }

        /// <summary>
        /// Gets the remote end point of the connection socket.
        /// </summary>
        /// <returns>the remote end point</returns>
        /// <exception cref="IOException">if there is currently no socket.</exception>
        public override EndPoint RemoteAddress()
        {
            return CheckSocket().RemoteEndPoint;
        }

        /// <summary>
        /// Sends the available bytes of the buffer (from its index, Avail() bytes)
        /// to the remote end.
        /// </summary>
        /// <param name="recipient">not used by this transport</param>
        /// <param name="buf">the buffer holding the bytes to send</param>
        public void TransportData(Who recipient, FlexBuffer buf)
        {
            Send(buf.GetBuf(), buf.Index(), buf.Avail());
        }

        /// <summary>
        /// Applies the keep-alive, linger and no-delay options to the current
        /// socket and wraps it in a network stream.
        /// </summary>
        /// <exception cref="IOException">if there is currently no socket.</exception>
        protected override void SetUpSocket()
        {
            Socket s = CheckSocket();

            // KeepAlive is a socket-level option; setting it at SocketOptionLevel.Tcp
            // addresses a different option number (or fails outright).
            s.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, options.keepAlive);

            // A negative lingerTime means "do not linger"; LingerOption needs a
            // non-negative time, so 0 is substituted in that case.
            bool linger = options.lingerTime >= 0;
            s.LingerState = new LingerOption(linger, linger ? options.lingerTime : 0);
            s.NoDelay = options.noDelay;
            //s.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TypeOfService, trafficClass);

            // Use the checked local rather than the field, which a concurrent
            // Close() may have set to null in the meantime.
            stream = new NetworkStream(s);
            // TODO do something about buffering this stream.
        }

        /// <summary>
        /// Shuts down the receive side of the connection socket.
        /// </summary>
        /// <exception cref="IOException">if there is currently no socket.</exception>
        public void ShutdownInput()
        {
            CheckSocket().Shutdown(SocketShutdown.Receive);
        }

        /// <summary>
        /// Shuts down the send side of the connection socket.
        /// </summary>
        /// <exception cref="IOException">if there is currently no socket.</exception>
        public void ShutdownOutput()
        {
            CheckSocket().Shutdown(SocketShutdown.Send);
        }

        /// <summary>
        /// Creates the socket for a connection attempt. Called by
        /// <c>OpenSocket</c>; an exception thrown here counts as a failed attempt.
        /// </summary>
        /// <returns>the new socket</returns>
        protected abstract Socket NewSocket();

        /// <summary>
        /// Tells whether this transport is the server side of a connection. The
        /// value answers the <c>TransportConsts.IS_SERVER</c> query and, when true,
        /// makes <c>OpenSocket</c> refuse to reconnect.
        /// </summary>
        /// <returns>true if this is a server side transport</returns>
        protected abstract bool IsServer();

        /// <summary>
        /// Answers the <c>TransportConsts.IS_SERVER</c> query with
        /// <c>IsServer()</c>; every other query is passed to the base class.
        /// </summary>
        /// <param name="query">the query to answer</param>
        /// <returns>the answer to the query</returns>
        public override object TransportQuery(object query)
        {
            if (query.Equals(TransportConsts.IS_SERVER))
            {
                return IsServer();
            }

            return base.TransportQuery(query);
        }

        /// <summary>
        /// Opens the socket of the connection, retrying with the configured
        /// reconnect delay for as long as the transport is started.
        /// </summary>
        /// <param name="reconnect">true if this is an attempt to re-establish a
        /// broken connection, false for the first connection attempt</param>
        /// <returns>true if a socket is available, false if no socket could be
        /// (or may be) opened</returns>
        [MethodImpl(MethodImplOptions.Synchronized)]
        protected override bool OpenSocket(bool reconnect)
        {
            // if a one time connection from a server socket listener, just
            // return the existing socket.
            if (!reconnect && socket != null)
            {
                return true;
            }

            // if a one time connection from a server socket listener, and
            // this is a reconnect, then bail.
            if (reconnect && IsServer())
            {
                return false;
            }

            // if a reconnect but no retries allowed, then bail.
            if (reconnect && options.reconnectDelay == 0)
            {
                return false;
            }

            // ok, we don't have an existing socket, and this is either the first
            // connection attempt or a reconnect with delay > 0.

            bool first = true;

            while (IsStarted())
            {
                // if reconnect is false and first is true, this is our
                // very first attempt to connect. otherwise, we are trying
                // to reconnect a broken link or establish a link where we
                // have already failed at least once.

                if (reconnect || !first)
                {
                    if (options.reconnectDelay == 0)
                    {
                        return false;
                    }

                    // The method is synchronized on this, so the monitor is held
                    // here; Wait releases it for the duration of the delay.
                    System.Threading.Monitor.Wait(this, options.reconnectDelay);

                    if (!IsStarted())
                    {
                        break;
                    }
                }

                // try to open a socket.

                try
                {
                    socket = NewSocket();
                    return true;
                }
                catch (Exception e)
                {
                    // Only the first failure of this call is reported; later
                    // failures are retried silently.
                    if (first)
                    {
                        first = false;
                        FireException("open", e);
                    }
                }
            }

            return false;
        }

        /// <summary>
        /// Reads data from the stream and hands each chunk to the session until
        /// the transport is stopped, the remote end closes the connection, or a
        /// read fails. Failures that indicate the connection was torn down return
        /// quietly; every other exception is rethrown.
        /// </summary>
        /// <exception cref="IOException">if there is currently no stream.</exception>
        protected override void ReadSocket()
        {
            Stream ns = checkStream();
            FlexBuffer buf = new FlexBuffer(new byte[8192]);

            try
            {
                while (IsStarted())
                {
                    // Read into the whole backing array. Sizing the read with
                    // buf.Length() would cap it at the value given to SetLength(n)
                    // by the previous iteration (presumably what Length() reports -
                    // FlexBuffer is not visible here), so one short read would
                    // shrink all later reads.
                    byte[] data = buf.GetBuf();
                    int n = ns.Read(data, 0, data.Length);

                    // Zero bytes means the remote end closed the connection.
                    if (n <= 0)
                    {
                        break;
                    }

                    buf.SetLength(n);
                    buf.SetIndex(0);
                    FireData(buf);
                }
            }
            catch (Exception e)
            {
                // Locale-independent detection first: exception messages are
                // localized, socket error codes are not.
                if (IsConnectionTeardown(e))
                {
                    return;
                }

                if (e.Message == null)
                {
                    throw;
                }

                // Message based detection kept for the cases that carry no
                // socket error code (e.g. the IOException from CheckSocket).
                if (e.Message.Contains("connection was aborted"))
                {
                    return;
                }

                if (e.Message.Contains("blocking operation"))
                {
                    return;
                }

                if (e.Message.Contains("socket closed"))
                {
                    return;
                }

                if (e.Message.Contains("read operation failed"))
                {
                    return;
                }

                throw;
            }
        }

        /// <summary>
        /// Tells whether the exception, or one of its inner exceptions, is a
        /// socket error reporting an aborted connection or an interrupted
        /// blocking call.
        /// </summary>
        /// <param name="e">the exception caught while reading</param>
        /// <returns>true if such a socket error is found in the exception chain</returns>
        private static bool IsConnectionTeardown(Exception e)
        {
            for (Exception x = e; x != null; x = x.InnerException)
            {
                SocketException se = x as SocketException;
                if (se == null)
                {
                    continue;
                }

                if (se.SocketErrorCode == SocketError.ConnectionAborted ||
                    se.SocketErrorCode == SocketError.Interrupted)
                {
                    return true;
                }
            }

            return false;
        }
    }
}