/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Collections; using System.IO; using System.Threading; using Apache.Qpid.Client.Qms; using Apache.Qpid.Client.Protocol; using Apache.Qpid.Codec; using Apache.Qpid.Framing; namespace Apache.Qpid.Client.Transport.Socket.Blocking { /// /// TCP Socket transport supporting both /// SSL and non-SSL connections. /// public class BlockingSocketTransport : ITransport { // Configuration variables. IProtocolListener _protocolListener; // Runtime variables. private ISocketConnector _connector; private IoHandler _ioHandler; private AmqpChannel _amqpChannel; private ManualResetEvent _stopEvent; public IProtocolWriter ProtocolWriter { get { return _amqpChannel; } } public string LocalEndpoint { get { return _connector.LocalEndpoint; } } /// /// Connect to the specified broker /// /// The broker to connect to /// The AMQ connection public void Connect(IBrokerInfo broker, AMQConnection connection) { _stopEvent = new ManualResetEvent(false); _protocolListener = connection.ProtocolListener; _ioHandler = MakeBrokerConnection(broker, connection); // todo: get default read size from config! IProtocolDecoderOutput decoderOutput = new ProtocolDecoderOutput(_protocolListener); _amqpChannel = new AmqpChannel(new ByteChannel(_ioHandler), decoderOutput); // post an initial async read _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this); } /// /// Close the broker connection /// public void Close() { StopReading(); CloseBrokerConnection(); } private void StopReading() { _stopEvent.Set(); } private void CloseBrokerConnection() { if ( _ioHandler != null ) { _ioHandler.Dispose(); _ioHandler = null; } if ( _connector != null ) { _connector.Dispose(); _connector = null; } } private IoHandler MakeBrokerConnection(IBrokerInfo broker, AMQConnection connection) { if ( broker.UseSSL ) { _connector = new SslSocketConnector(); } else { _connector = new SocketConnector(); } Stream stream = _connector.Connect(broker); return new IoHandler(stream, connection.ProtocolListener); } private void OnAsyncReadDone(IAsyncResult result) { try { _amqpChannel.EndRead(result); bool stopping = _stopEvent.WaitOne(0, false); if ( !stopping ) _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null); } catch ( Exception e ) { // ignore any errors during closing bool stopping = _stopEvent.WaitOne(0, false); if ( !stopping ) _protocolListener.OnException(e); } } #region IProtocolDecoderOutput Members public void Write(object message) { _protocolListener.OnMessage((IDataBlock)message); } #endregion } }