/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Collections; using System.IO; using System.Reflection; using System.Threading; using log4net; using Apache.Qpid.Client.Failover; using Apache.Qpid.Client.Protocol; using Apache.Qpid.Client.Qms; using Apache.Qpid.Client.State; using Apache.Qpid.Client.Transport; using Apache.Qpid.Client.Transport.Socket.Blocking; using Apache.Qpid.Collections; using Apache.Qpid.Framing; using Apache.Qpid.Messaging; namespace Apache.Qpid.Client { public class AMQConnection : Closeable, IConnection { private static readonly ILog _log = LogManager.GetLogger(typeof(AMQConnection)); IConnectionInfo _connectionInfo; private int _nextChannelId = 0; // _Connected should be refactored with a suitable wait object. private bool _connected; Thread _heartBeatThread; HeartBeatThread _heartBeatRunner; // The last error code that occured on the connection. Used to return the correct exception to the client private AMQException _lastAMQException = null; /** * This is the "root" mutex that must be held when doing anything that could be impacted by failover. * This must be held by any child objects of this connection such as the session, producers and consumers. */ private readonly Object _failoverMutex = new Object(); public object FailoverMutex { get { return _failoverMutex; } } /** * Policy dictating how to failover */ private FailoverPolicy _failoverPolicy; internal bool IsFailoverAllowed { get { if(!_connected) return false; else return _failoverPolicy.FailoverAllowed(); } } /// /// A channel is roughly analogous to a session. The server can negotiate the maximum number of channels /// per session and we must prevent the client from opening too many. Zero means unlimited. /// private ushort _maximumChannelCount; /// /// The maximum size of frame supported by the server /// private uint _maximumFrameSize; private AMQStateManager _stateManager; private AMQProtocolSession _protocolSession; public AMQProtocolSession ProtocolSession { get { return _protocolSession; } } /// /// Maps from session id (Integer) to AmqChannel instance /// private readonly IDictionary _sessions = new LinkedHashtable(); private ExceptionListenerDelegate _exceptionListener; private IConnectionListener _connectionListener; private ITransport _transport; public ITransport Transport { get { return _transport; } } /// /// Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for /// message publication. /// private bool _started; private AMQProtocolListener _protocolListener; public AMQProtocolListener ProtocolListener { get { return _protocolListener; } } public IProtocolWriter ProtocolWriter { get { return _transport.ProtocolWriter; } } ProtocolWriter _protocolWriter; public ProtocolWriter ConvenientProtocolWriter { get { return _protocolWriter; } } public AMQConnection(IConnectionInfo connectionInfo) { if (connectionInfo == null) { throw new ArgumentException("ConnectionInfo must be specified"); } _log.Debug("ConnectionInfo: " + connectionInfo); _connectionInfo = connectionInfo; _log.Debug("password = " + _connectionInfo.Password); _failoverPolicy = new FailoverPolicy(connectionInfo); // We are not currently connected. _connected = false; Exception lastException = null; do { try { IBrokerInfo brokerInfo = _failoverPolicy.GetNextBrokerInfo(); _log.Debug("Connecting to " + brokerInfo); MakeBrokerConnection(brokerInfo); break; } catch (Exception e) { lastException = e; _log.Error("Unable to connect to broker " + _failoverPolicy.GetCurrentBrokerInfo(), e); // XXX: Should perhaps break out of the do/while here if not a SocketException... } } while (!_connected && _failoverPolicy.FailoverAllowed()); _log.Debug("Are we connected:" + _connected); if (!_connected) { if ( lastException is AMQException ) { throw lastException; } else { throw new AMQConnectionException("Unable to connect", lastException); } } } /*private ITransport LoadTransportFromAssembly(string host, int port, String assemblyName, String transportType) { //Assembly assembly = Assembly.LoadFrom(assemblyName); Assembly assembly = Assembly.Load(assemblyName); foreach (Type type in assembly.GetTypes()) { _log.Debug(String.Format("type = {0}", type)); } Type transport = assembly.GetType(transportType); if (transport == null) { throw new ArgumentException( String.Format("Type is not found in assembly. Type={0} Assembly={1}", transportType, assemblyName)); } _log.Debug("transport = " + transport); _log.Debug("ctors = " + transport.GetConstructors()); ConstructorInfo info = transport.GetConstructors()[0]; ITransport result = (ITransport)info.Invoke(new object[] { host, port, this }); _log.Debug("transport = " + result); return result; }*/ public void Disconnect() { _transport.Close(); } #region IConnection Members public string ClientID { get { CheckNotClosed(); return _connectionInfo.ClientName; } set { CheckNotClosed(); _connectionInfo.ClientName = value; } } public override void Close() { lock (FailoverMutex) { // atomically set to closed and check the _previous value was NOT CLOSED if (Interlocked.Exchange(ref _closed, CLOSED) == NOT_CLOSED) { try { CloseAllSessions(null); CloseConnection(); } catch (AMQException e) { throw new QpidException("Error closing connection: " + e); } } } } private void CloseConnection() { _stateManager.ChangeState(AMQState.CONNECTION_CLOSING); AMQFrame frame = ConnectionCloseBody.CreateAMQFrame( 0, 200, "Qpid.NET client is closing the connection.", 0, 0); ProtocolWriter.Write(frame); _log.Debug("Blocking for connection close ok frame"); Disconnect(); } class CreateChannelFailoverSupport : FailoverSupport { private static readonly ILog _log = LogManager.GetLogger(typeof(CreateChannelFailoverSupport)); private bool _transacted; private AcknowledgeMode _acknowledgeMode; int _prefetchHigh; int _prefetchLow; AMQConnection _connection; public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow) { _connection = connection; _transacted = transacted; _acknowledgeMode = acknowledgeMode; _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; } protected override object operation() { ushort channelId = _connection.NextChannelId(); if (_log.IsDebugEnabled) { _log.Debug("Write channel open frame for channel id " + channelId); } // We must create the channel and register it before actually sending the frame to the server to // open it, so that there is no window where we could receive data on the channel and not be set // up to handle it appropriately. AmqChannel channel = new AmqChannel(_connection, channelId, _transacted, _acknowledgeMode, _prefetchHigh, _prefetchLow); _connection.ProtocolSession.AddSessionByChannel(channelId, channel); _connection.RegisterSession(channelId, channel); bool success = false; try { _connection.CreateChannelOverWire(channelId, _prefetchHigh, _prefetchLow, _transacted); success = true; } catch (AMQException e) { throw new QpidException("Error creating channel: " + e, e); } finally { if (!success) { _connection.ProtocolSession.RemoveSessionByChannel(channelId); _connection.DeregisterSession(channelId); } } if (_connection._started) { channel.Start(); } return channel; } } internal ushort NextChannelId() { return (ushort) Interlocked.Increment(ref _nextChannelId); } public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode) { return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH_HIGH_MARK); } public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetch) { return CreateChannel(transacted, acknowledgeMode, prefetch, prefetch); } public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow) { CheckNotClosed(); if (ChannelLimitReached()) { throw new ChannelLimitReachedException(_maximumChannelCount); } else { CreateChannelFailoverSupport operation = new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetchHigh, prefetchLow); return (IChannel)operation.execute(this); } } public void CloseSession(AmqChannel channel) { // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). _protocolSession.CloseSession(channel); AMQFrame frame = ChannelCloseBody.CreateAMQFrame( channel.ChannelId, 200, "JMS client closing channel", 0, 0); _log.Debug("Blocking for channel close frame for channel " + channel.ChannelId); _protocolWriter.SyncWrite(frame, typeof(ChannelCloseOkBody)); _log.Debug("Received channel close frame"); // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully } public ExceptionListenerDelegate ExceptionListener { get { CheckNotClosed(); return _exceptionListener; } set { CheckNotClosed(); _exceptionListener = value; } } /// /// Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread /// and is not thread safe (which is legal according to the JMS specification). /// @throws JMSException /// public void Start() { CheckNotClosed(); if (!_started) { foreach (DictionaryEntry lde in _sessions) { AmqChannel s = (AmqChannel)lde.Value; s.Start(); } _started = true; } } public void Stop() { CheckNotClosed(); if (_started) { foreach (DictionaryEntry lde in _sessions) { AmqChannel s = (AmqChannel) lde.Value; s.Stop(); } _started = false; } } public IConnectionListener ConnectionListener { get { return _connectionListener; } set { _connectionListener = value; } } #endregion #region IDisposable Members public void Dispose() { Close(); } #endregion private bool ChannelLimitReached() { return _maximumChannelCount != 0 && _sessions.Count == _maximumChannelCount; } /// /// Close all the sessions, either due to normal connection closure or due to an error occurring. /// @param cause if not null, the error that is causing this shutdown /// private void CloseAllSessions(Exception cause) { _log.Debug("Closing all session in connection " + this); ICollection sessions = new ArrayList(_sessions.Values); foreach (AmqChannel channel in sessions) { _log.Debug("Closing channel " + channel); if (cause != null) { channel.ClosedWithException(cause); } else { try { channel.Close(); } catch (QpidException e) { _log.Error("Error closing channel: " + e); } } } _log.Debug("Done closing all sessions in connection " + this); } public int MaximumChannelCount { get { CheckNotClosed(); return _maximumChannelCount; } } internal void SetMaximumChannelCount(ushort maximumChannelCount) { CheckNotClosed(); _maximumChannelCount = maximumChannelCount; } public uint MaximumFrameSize { get { return _maximumFrameSize; } set { _maximumFrameSize = value; } } public IDictionary Sessions { get { return _sessions; } } public string Host { get { return _failoverPolicy.GetCurrentBrokerInfo().Host; } } public int Port { get { return _failoverPolicy.GetCurrentBrokerInfo().Port; } } public string Username { get { return _connectionInfo.Username; } } public string Password { get { return _connectionInfo.Password; } } public string VirtualHost { get { return _connectionInfo.VirtualHost; } } /// /// Invoked by the AMQProtocolSession when a protocol session exception has occurred. /// This method sends the exception to a JMS exception listener, if configured, and /// propagates the exception to sessions, which in turn will propagate to consumers. /// This allows synchronous consumers to have exceptions thrown to them. /// /// the exception public void ExceptionReceived(Exception cause) { if (_exceptionListener != null) { // Listener expects one of these... QpidException xe; if (cause is QpidException) { xe = (QpidException) cause; } else { xe = new QpidException("Exception thrown against " + ToString() + ": " + cause, cause); } // in the case of an IOException, MINA has closed the protocol session so we set _closed to true // so that any generic client code that tries to close the connection will not mess up this error // handling sequence if (cause is IOException) { Interlocked.Exchange(ref _closed, CLOSED); } #if __MonoCS__ _exceptionListener(xe); #else _exceptionListener.Invoke(xe); #endif } else { _log.Error("Connection exception: " + cause); } // An undelivered is not fatal to the connections usability. if (!(cause is AMQUndeliveredException)) { Interlocked.Exchange(ref _closed, CLOSED); CloseAllSessions(cause); } else { ; } } internal void RegisterSession(int channelId, AmqChannel channel) { _sessions[channelId] = channel; } internal void DeregisterSession(int channelId) { _sessions.Remove(channelId); } /** * Fire the preFailover event to the registered connection listener (if any) * * @param redirect true if this is the result of a redirect request rather than a connection error * @return true if no listener or listener does not veto change */ public bool FirePreFailover(bool redirect) { bool proceed = true; if (_connectionListener != null) { proceed = _connectionListener.PreFailover(redirect); } return proceed; } /** * Fire the preResubscribe event to the registered connection listener (if any). If the listener * vetoes resubscription then all the sessions are closed. * * @return true if no listener or listener does not veto resubscription. * @throws JMSException */ public bool FirePreResubscribe() { if (_connectionListener != null) { bool resubscribe = _connectionListener.PreResubscribe(); if (!resubscribe) { MarkAllSessionsClosed(); } return resubscribe; } else { return true; } } /** * Marks all sessions and their children as closed without sending any protocol messages. Useful when * you need to mark objects "visible" in userland as closed after failover or other significant event that * impacts the connection. *

* The caller must hold the failover mutex before calling this method. */ private void MarkAllSessionsClosed() { //LinkedList sessionCopy = new LinkedList(_sessions.values()); ArrayList sessionCopy = new ArrayList(_sessions.Values); foreach (AmqChannel session in sessionCopy) { session.MarkClosed(); } _sessions.Clear(); } /** * Fires a failover complete event to the registered connection listener (if any). */ public void FireFailoverComplete() { if (_connectionListener != null) { _connectionListener.FailoverComplete(); } } public bool AttemptReconnection(String host, int port, SslOptions sslConfig) { IBrokerInfo bd = new AmqBrokerInfo("amqp", host, port, sslConfig); _failoverPolicy.setBroker(bd); try { MakeBrokerConnection(bd); return true; } catch (Exception e) { _log.Debug("Unable to connect to broker at " + bd, e); AttemptReconnection(); } return false; } private void MakeBrokerConnection(IBrokerInfo brokerDetail) { try { _stateManager = new AMQStateManager(); _protocolListener = new AMQProtocolListener(this, _stateManager); _protocolListener.AddFrameListener(_stateManager); /* // Currently there is only one transport option - BlockingSocket. String assemblyName = "Apache.Qpid.Client.Transport.Socket.Blocking.dll"; String transportType = "Apache.Qpid.Client.Transport.Socket.Blocking.BlockingSocketTransport"; // Load the transport assembly dynamically. _transport = LoadTransportFromAssembly(brokerDetail.getHost(), brokerDetail.getPort(), assemblyName, transportType); */ _transport = new BlockingSocketTransport(); // Connect. _transport.Connect(brokerDetail, this); _protocolWriter = new ProtocolWriter(_transport.ProtocolWriter, _protocolListener); _protocolSession = new AMQProtocolSession(_transport.ProtocolWriter, _transport, this); _protocolListener.ProtocolSession = _protocolSession; // Now start the connection "handshake". _transport.ProtocolWriter.Write(new ProtocolInitiation()); // Blocks until the connection has been opened. _stateManager.AttainState(AMQState.CONNECTION_OPEN); _failoverPolicy.attainedConnection(); // XXX: Again this should be changed to a suitable notify. _connected = true; } catch (AMQException e) { _lastAMQException = e; throw; // rethrow } } public bool AttemptReconnection() { while (_failoverPolicy.FailoverAllowed()) { try { MakeBrokerConnection(_failoverPolicy.GetNextBrokerInfo()); return true; } catch (Exception e) { if (!(e is AMQException)) { _log.Debug("Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo(), e); } else { _log.Debug(e.Message + ":Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo()); } } } // Connection unsuccessful. return false; } /** * For all channels, and for all consumers in those sessions, resubscribe. This is called during failover handling. * The caller must hold the failover mutex before calling this method. */ public void ResubscribeChannels() { ArrayList channels = new ArrayList(_sessions.Values); _log.Debug(String.Format("Resubscribing sessions = {0} sessions.size={1}", channels, channels.Count)); foreach (AmqChannel channel in channels) { _protocolSession.AddSessionByChannel(channel.ChannelId, channel); ReopenChannel( channel.ChannelId, channel.DefaultPrefetchHigh, channel.DefaultPrefetchLow, channel.Transacted ); channel.ReplayOnFailOver(); } } private void ReopenChannel(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted) { _log.Debug(string.Format("Reopening channel id={0} prefetchHigh={1} prefetchLow={2} transacted={3}", channelId, prefetchHigh, prefetchLow, transacted)); try { CreateChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); } catch (AMQException e) { _protocolSession.RemoveSessionByChannel(channelId); DeregisterSession(channelId); throw new AMQException("Error reopening channel " + channelId + " after failover: " + e); } } void CreateChannelOverWire(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted) { _protocolWriter.SyncWrite(ChannelOpenBody.CreateAMQFrame(channelId, null), typeof (ChannelOpenOkBody)); // Don't use the BasicQos frame if connecting to OpenAMQ (at it is not support). We // know this when we connection using AMQP 0.7 if (ProtocolInitiation.CURRENT_PROTOCOL_VERSION_MAJOR != 7) { // Basic.Qos frame appears to not be supported by OpenAMQ 1.0d. _protocolWriter.SyncWrite(BasicQosBody.CreateAMQFrame(channelId, 0, (ushort)prefetchHigh, false), typeof (BasicQosOkBody)); } if (transacted) { if (_log.IsDebugEnabled) { _log.Debug("Issuing TxSelect for " + channelId); } _protocolWriter.SyncWrite(TxSelectBody.CreateAMQFrame(channelId), typeof(TxSelectOkBody)); } } public String toURL() { return _connectionInfo.AsUrl(); } class HeartBeatThread { int _heartbeatMillis; IProtocolWriter _protocolWriter; bool _run = true; public HeartBeatThread(IProtocolWriter protocolWriter, int heartbeatMillis) { _protocolWriter = protocolWriter; _heartbeatMillis = heartbeatMillis; } public void Run() { while (_run) { Thread.Sleep(_heartbeatMillis); if (!_run) break; _log.Debug("Sending heartbeat"); // TODO: Can we optimise this so that heartbeats are only written when we haven't sent anything recently to the broker? _protocolWriter.Write(HeartbeatBody.FRAME); } _log.Debug("Heatbeat thread stopped"); } public void Stop() { _run = false; } } public void StartHeartBeatThread(int heartbeatSeconds) { _log.Debug("Starting new heartbeat thread"); _heartBeatRunner = new HeartBeatThread(ProtocolWriter, heartbeatSeconds * 1000); _heartBeatThread = new Thread(new ThreadStart(_heartBeatRunner.Run)); _heartBeatThread.Name = "HeartBeat"; _heartBeatThread.Start(); } public void StopHeartBeatThread() { if (_heartBeatRunner != null) { _log.Debug("Stopping old heartbeat thread"); _heartBeatRunner.Stop(); } } } }