/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Collections;
using System.Threading;
using log4net;
using Qpid.Client.Failover;
using Qpid.Client.Protocol.Listener;
using Qpid.Client.State;
using Qpid.Framing;
namespace Qpid.Client.Protocol
{
public class AMQProtocolListener : IProtocolListener
{
private static readonly ILog _log = LogManager.GetLogger(typeof(AMQProtocolListener));
/**
* We create the failover handler when the session is created since it needs a reference to the IoSession in order
* to be able to send errors during failover back to the client application. The session won't be available in the
* case where we failing over due to a Connection.Redirect message from the broker.
*/
private FailoverHandler _failoverHandler;
/**
* This flag is used to track whether failover is being attempted. It is used to prevent the application constantly
* attempting failover where it is failing.
*/
internal FailoverState _failoverState = FailoverState.NOT_STARTED;
internal FailoverState FailoverState
{
get { return _failoverState; }
set { _failoverState = value; }
}
internal ManualResetEvent FailoverLatch;
AMQConnection _connection;
AMQStateManager _stateManager;
public AMQStateManager StateManager
{
get { return _stateManager; }
set { _stateManager = value; }
}
//private readonly CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
private readonly ArrayList _frameListeners = ArrayList.Synchronized(new ArrayList());
AMQProtocolSession _protocolSession = null; // FIXME
public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } } // FIXME: can this be fixed?
private readonly Object _lock = new Object();
public AMQProtocolListener(AMQConnection connection, AMQStateManager stateManager)
{
_connection = connection;
_stateManager = stateManager;
_failoverHandler = new FailoverHandler(connection);
}
public void OnMessage(IDataBlock message)
{
// Handle incorrect protocol version.
if (message is ProtocolInitiation)
{
string error = String.Format("Protocol mismatch - {0}", message.ToString());
AMQException e = new AMQProtocolHeaderException(error);
_log.Error("Closing connection because of protocol mismatch", e);
//_protocolSession.CloseProtocolSession();
_stateManager.Error(e);
return;
}
AMQFrame frame = (AMQFrame)message;
if (frame.BodyFrame is AMQMethodBody)
{
if (_log.IsDebugEnabled)
{
_log.Debug("Method frame received: " + frame);
}
AMQMethodEvent evt = new AMQMethodEvent(frame.Channel, (AMQMethodBody)frame.BodyFrame, _protocolSession);
try
{
bool wasAnyoneInterested = false;
lock (_frameListeners.SyncRoot)
{
foreach (IAMQMethodListener listener in _frameListeners)
{
wasAnyoneInterested = listener.MethodReceived(evt) || wasAnyoneInterested;
}
}
if (!wasAnyoneInterested)
{
throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
}
}
catch (Exception e)
{
foreach (IAMQMethodListener listener in _frameListeners)
{
listener.Error(e);
}
}
}
else if (frame.BodyFrame is ContentHeaderBody)
{
_protocolSession.MessageContentHeaderReceived(frame.Channel,
(ContentHeaderBody)frame.BodyFrame);
}
else if (frame.BodyFrame is ContentBody)
{
_protocolSession.MessageContentBodyReceived(frame.Channel,
(ContentBody)frame.BodyFrame);
}
else if (frame.BodyFrame is HeartbeatBody)
{
_log.Debug("HeartBeat received");
}
//_connection.BytesReceived(_protocolSession.Channel.ReadBytes); // XXX: is this really useful?
}
public void OnException(Exception cause)
{
_log.Warn("Protocol Listener received exception", cause);
lock (_lock)
{
if (_failoverState == FailoverState.NOT_STARTED)
{
if (!(cause is AMQUndeliveredException))
{
WhenClosed();
}
}
// We reach this point if failover was attempted and failed therefore we need to let the calling app
// know since we cannot recover the situation.
else if (_failoverState == FailoverState.FAILED)
{
// we notify the state manager of the error in case we have any clients waiting on a state
// change. Those "waiters" will be interrupted and can handle the exception
AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
PropagateExceptionToWaiters(amqe);
_connection.ExceptionReceived(cause);
}
}
}
/**
* When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by
* sessionClosed() depending on whether we were trying to send data at the time of failure.
*
* @param session
* @throws Exception
*/
void WhenClosed()
{
_connection.StopHeartBeatThread();
// TODO: Server just closes session with no warning if auth fails.
if (_connection.Closed)
{
_log.Info("Channel closed called by client");
}
else
{
_log.Info("Channel closed called with failover state currently " + _failoverState);
// Reconnectablility was introduced here so as not to disturb the client as they have made their intentions
// known through the policy settings.
if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.IsFailoverAllowed)
{
_log.Info("FAILOVER STARTING");
if (_failoverState == FailoverState.NOT_STARTED)
{
_failoverState = FailoverState.IN_PROGRESS;
startFailoverThread();
}
else
{
_log.Info("Not starting failover as state currently " + _failoverState);
}
}
else
{
_log.Info("Failover not allowed by policy.");
if (_failoverState != FailoverState.IN_PROGRESS)
{
_log.Info("sessionClose() not allowed to failover");
_connection.ExceptionReceived(
new AMQDisconnectedException("Server closed connection and reconnection not permitted."));
}
else
{
_log.Info("sessionClose() failover in progress");
}
}
}
_log.Info("Protocol Channel [" + this + "] closed");
}
///
/// There are two cases where we have other threads potentially blocking for events to be handled by this
/// class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a
/// particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can
/// react appropriately.
///
/// the exception to propagate
///
public void PropagateExceptionToWaiters(Exception e)
{
// FIXME: not sure if required as StateManager is in _frameListeners. Probably something to do with fail-over.
_stateManager.Error(e);
foreach (IAMQMethodListener listener in _frameListeners)
{
listener.Error(e);
}
}
public void AddFrameListener(IAMQMethodListener listener)
{
_frameListeners.Add(listener);
}
public void RemoveFrameListener(IAMQMethodListener listener)
{
if (_log.IsDebugEnabled)
{
_log.Debug("Removing frame listener: " + listener.ToString());
}
_frameListeners.Remove(listener);
}
public void BlockUntilNotFailingOver()
{
if (FailoverLatch != null)
{
FailoverLatch.WaitOne();
}
}
///
/// "Failover" for redirection.
///
///
///
public void Failover(string host, int port)
{
_failoverHandler.setHost(host);
_failoverHandler.setPort(port);
// see javadoc for FailoverHandler to see rationale for separate thread
startFailoverThread();
}
private void startFailoverThread()
{
Thread failoverThread = new Thread(new ThreadStart(_failoverHandler.Run));
failoverThread.Name = "Failover";
// Do not inherit daemon-ness from current thread as this can be a daemon
// thread such as a AnonymousIoService thread.
failoverThread.IsBackground = false;
failoverThread.Start();
}
}
}