/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Collections; using log4net; using Apache.Qpid.Client.Handler; using Apache.Qpid.Client.Protocol; using Apache.Qpid.Client.Protocol.Listener; using Apache.Qpid.Framing; namespace Apache.Qpid.Client.State { public class AMQStateManager : IAMQMethodListener { private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQStateManager)); const bool InfoLoggingHack = true; /// /// The current state /// private AMQState _currentState; /// /// Maps from an AMQState instance to a Map from Class to StateTransitionHandler. /// The class must be a subclass of AMQFrame. /// private readonly IDictionary _state2HandlersMap; private ArrayList _stateListeners; private object _syncLock; public AMQStateManager() { _syncLock = new object(); _state2HandlersMap = new Hashtable(); _stateListeners = ArrayList.Synchronized(new ArrayList(5)); _currentState = AMQState.CONNECTION_NOT_STARTED; RegisterListeners(); } private void RegisterListeners() { IStateAwareMethodListener connectionStart = new ConnectionStartMethodHandler(); IStateAwareMethodListener connectionClose = new ConnectionCloseMethodHandler(); IStateAwareMethodListener connectionCloseOk = new ConnectionCloseOkHandler(); IStateAwareMethodListener connectionTune = new ConnectionTuneMethodHandler(); IStateAwareMethodListener connectionSecure = new ConnectionSecureMethodHandler(); IStateAwareMethodListener connectionOpenOk = new ConnectionOpenOkMethodHandler(); IStateAwareMethodListener channelClose = new ChannelCloseMethodHandler(); IStateAwareMethodListener basicDeliver = new BasicDeliverMethodHandler(); IStateAwareMethodListener basicReturn = new BasicReturnMethodHandler(); IStateAwareMethodListener queueDeleteOk = new QueueDeleteOkMethodHandler(); IStateAwareMethodListener queuePurgeOk = new QueuePurgeOkMethodHandler(); // We need to register a map for the null (i.e. all state) handlers otherwise you get // a stack overflow in the handler searching code when you present it with a frame for which // no handlers are registered. _state2HandlersMap[AMQState.ALL] = new Hashtable(); { Hashtable notStarted = new Hashtable(); notStarted[typeof(ConnectionStartBody)] = connectionStart; notStarted[typeof(ConnectionCloseBody)] = connectionClose; _state2HandlersMap[AMQState.CONNECTION_NOT_STARTED] = notStarted; } { Hashtable notTuned = new Hashtable(); notTuned[typeof(ConnectionTuneBody)] = connectionTune; notTuned[typeof(ConnectionSecureBody)] = connectionSecure; notTuned[typeof(ConnectionCloseBody)] = connectionClose; _state2HandlersMap[AMQState.CONNECTION_NOT_TUNED] = notTuned; } { Hashtable notOpened = new Hashtable(); notOpened[typeof(ConnectionOpenOkBody)] = connectionOpenOk; notOpened[typeof(ConnectionCloseBody)] = connectionClose; _state2HandlersMap[AMQState.CONNECTION_NOT_OPENED] = notOpened; } { Hashtable open = new Hashtable(); open[typeof(ChannelCloseBody)] = channelClose; open[typeof(ConnectionCloseBody)] = connectionClose; open[typeof(BasicDeliverBody)] = basicDeliver; open[typeof(BasicReturnBody)] = basicReturn; open[typeof(QueueDeleteOkBody)] = queueDeleteOk; open[typeof(QueuePurgeOkBody)] = queuePurgeOk; _state2HandlersMap[AMQState.CONNECTION_OPEN] = open; } { Hashtable closing = new Hashtable(); closing[typeof(ConnectionCloseOkBody)] = connectionCloseOk; _state2HandlersMap[AMQState.CONNECTION_CLOSING] = closing; } } public AMQState CurrentState { get { return _currentState; } } /// /// Changes the state. /// /// The new state. /// if there is an error changing state public void ChangeState(AMQState newState) { if (InfoLoggingHack) { _logger.Debug("State changing to " + newState + " from old state " + _currentState); } _logger.Debug("State changing to " + newState + " from old state " + _currentState); AMQState oldState = _currentState; _currentState = newState; lock ( _syncLock ) { foreach ( IStateListener l in _stateListeners ) { l.StateChanged(oldState, newState); } } } public void Error(Exception e) { _logger.Debug("State manager receive error notification: " + e); lock ( _syncLock ) { foreach ( IStateListener l in _stateListeners ) { l.Error(e); } } } public bool MethodReceived(AMQMethodEvent evt) { _logger.Debug(String.Format("Finding method handler. currentState={0} type={1}", _currentState, evt.Method.GetType())); IStateAwareMethodListener handler = FindStateTransitionHandler(_currentState, evt.Method); if (handler != null) { handler.MethodReceived(this, evt); return true; } return false; } /// /// Finds the state transition handler. /// /// State of the current. /// The frame. /// /// if the state transition if not allowed private IStateAwareMethodListener FindStateTransitionHandler(AMQState currentState, AMQMethodBody frame) { Type clazz = frame.GetType(); if (_logger.IsDebugEnabled) { _logger.Debug("Looking for state transition handler for frame " + clazz); } IDictionary classToHandlerMap = (IDictionary) _state2HandlersMap[currentState]; if (classToHandlerMap == null) { // if no specialised per state handler is registered look for a // handler registered for "all" states return FindStateTransitionHandler(AMQState.ALL, frame); } IStateAwareMethodListener handler = (IStateAwareMethodListener) classToHandlerMap[clazz]; if (handler == null) { if (currentState == AMQState.ALL) { _logger.Debug("No state transition handler defined for receiving frame " + frame); return null; } else { // if no specialised per state handler is registered look for a // handler registered for "all" states return FindStateTransitionHandler(AMQState.ALL, frame); } } else { return handler; } } public void AddStateListener(IStateListener listener) { _logger.Debug("Adding state listener"); lock ( _syncLock ) { _stateListeners.Add(listener); } } public void RemoveStateListener(IStateListener listener) { lock ( _syncLock ) { _stateListeners.Remove(listener); } } public void AttainState(AMQState s) { if (_currentState != s) { StateWaiter sw = null; try { _logger.Debug("Adding state wait to reach state " + s); sw = new StateWaiter(s); AddStateListener(sw); sw.WaituntilStateHasChanged(); // at this point the state will have changed. } finally { RemoveStateListener(sw); } } } } }