/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Collections;
using log4net;
using Apache.Qpid.Client.Message;
using Apache.Qpid.Client.Transport;
using Apache.Qpid.Framing;
using Apache.Qpid.Sasl;
namespace Apache.Qpid.Client.Protocol
{
public class AMQProtocolSession
{
private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQProtocolSession));
private readonly IProtocolWriter _protocolWriter;
private readonly IConnectionCloser _connectionCloser;
/**
* Counter to ensure unique queue names
*/
private int _queueId = 1;
private readonly Object _queueIdLock = new Object();
///
/// Maps from the channel id to the AmqChannel that it represents.
///
//private ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
private Hashtable _channelId2SessionMap = Hashtable.Synchronized(new Hashtable());
//private ConcurrentMap _closingChannels = new ConcurrentHashMap();
private Hashtable _closingChannels = Hashtable.Synchronized(new Hashtable());
///
/// Maps from a channel id to an unprocessed message. This is used to tie together the
/// JmsDeliverBody (which arrives first) with the subsequent content header and content bodies.
///
//private ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
private Hashtable _channelId2UnprocessedMsgMap = Hashtable.Synchronized(new Hashtable());
private AMQConnection _connection;
public string ClientID { get { return _connection.ClientID; } }
public AMQProtocolSession(IProtocolWriter protocolWriter, IConnectionCloser connectionCloser, AMQConnection connection)
{
_protocolWriter = protocolWriter;
_connectionCloser = connectionCloser;
_connection = connection;
}
public void Init()
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
_protocolWriter.Write(new ProtocolInitiation());
}
public string Username
{
get
{
return AMQConnection.Username;
}
}
public string Password
{
get
{
return AMQConnection.Password;
}
}
ConnectionTuneParameters _connectionTuneParameters; // TODO: should be able to have this in the Java too.
public ConnectionTuneParameters ConnectionTuneParameters
{
get
{
return _connectionTuneParameters;
}
set
{
_connectionTuneParameters = value;
AMQConnection con = AMQConnection;
con.SetMaximumChannelCount(value.ChannelMax);
con.MaximumFrameSize = value.FrameMax;
}
}
private ISaslClient _saslClient;
public ISaslClient SaslClient
{
get { return _saslClient; }
set { _saslClient = value; }
}
///
/// Callback invoked from the BasicDeliverMethodHandler when a message has been received.
/// This is invoked on the MINA dispatcher thread.
///
/// the unprocessed message
/// if this was not expected
public void UnprocessedMessageReceived(UnprocessedMessage message)
{
_channelId2UnprocessedMsgMap[message.ChannelId] = message;
}
public void MessageContentHeaderReceived(ushort channelId, ContentHeaderBody contentHeader)
{
UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId];
if (msg == null)
{
throw new AMQException("Error: received content header without having received a JMSDeliver frame first");
}
if (msg.ContentHeader != null)
{
throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames");
}
msg.ContentHeader = contentHeader;
if (contentHeader.BodySize == 0)
{
DeliverMessageToAMQSession(channelId, msg);
}
}
public void MessageContentBodyReceived(ushort channelId, ContentBody contentBody)
{
UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId];
if (msg == null)
{
throw new AMQException("Error: received content body without having received a BasicDeliver frame first");
}
if (msg.ContentHeader == null)
{
_channelId2UnprocessedMsgMap.Remove(channelId);
throw new AMQException("Error: received content body without having received a ContentHeader frame first");
}
try
{
msg.ReceiveBody(contentBody);
}
catch (UnexpectedBodyReceivedException e)
{
_channelId2UnprocessedMsgMap.Remove(channelId);
throw e;
}
if (msg.IsAllBodyDataReceived())
{
DeliverMessageToAMQSession(channelId, msg);
}
}
///
/// Deliver a message to the appropriate session, removing the unprocessed message
/// from our map
/// the channel id the message should be delivered to
/// the message
private void DeliverMessageToAMQSession(ushort channelId, UnprocessedMessage msg)
{
AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId];
channel.MessageReceived(msg);
_channelId2UnprocessedMsgMap.Remove(channelId);
}
///
/// Convenience method that writes a frame to the protocol session. Equivalent
/// to calling getProtocolSession().write().
///
/// the frame to write
public void WriteFrame(IDataBlock frame)
{
_protocolWriter.Write(frame);
}
public void AddSessionByChannel(ushort channelId, AmqChannel channel)
{
if (channel == null)
{
throw new ArgumentNullException("Attempt to register a null channel");
}
_logger.Debug("Add channel with channel id " + channelId);
_channelId2SessionMap[channelId] = channel;
}
public void RemoveSessionByChannel(ushort channelId)
{
_logger.Debug("Removing session with channelId " + channelId);
_channelId2SessionMap.Remove(channelId);
}
///
/// Starts the process of closing a channel
///
///
public void CloseSession(AmqChannel channel)
{
_logger.Debug("closeSession called on protocol channel for channel " + channel.ChannelId);
ushort channelId = channel.ChannelId;
// we need to know when a channel is closing so that we can respond
// with a channel.close frame when we receive any other type of frame
// on that channel
_closingChannels[channelId] = channel;
}
///
/// Called from the ChannelClose handler when a channel close frame is received.
/// This method decides whether this is a response or an initiation. The latter
/// case causes the AmqChannel to be closed and an exception to be thrown if
/// appropriate.
///
/// the id of the channel (session)
/// true if the client must respond to the server, i.e. if the server
/// initiated the channel close, false if the channel close is just the server
/// responding to the client's earlier request to close the channel.
public bool ChannelClosed(ushort channelId, int code, string text)
{
// if this is not a response to an earlier request to close the channel
if (!_closingChannels.ContainsKey(channelId))
{
_closingChannels.Remove(channelId);
AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId];
channel.ClosedWithException(new AMQException(_logger, code, text));
return true;
}
else
{
return false;
}
}
public AMQConnection AMQConnection
{
get
{
return _connection;
}
}
public void CloseProtocolSession()
{
_logger.Debug("Closing protocol session");
_connectionCloser.Close();
}
internal string GenerateQueueName()
{
int id;
lock(_queueIdLock)
{
id = _queueId++;
}
return "tmp_" + _connection.Transport.LocalEndpoint + "_" + id;
}
}
}