/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System;
using System.Collections;
using log4net;
using Apache.Qpid.Client.Message;
using Apache.Qpid.Client.Transport;
using Apache.Qpid.Framing;
using Apache.Qpid.Sasl;

namespace Apache.Qpid.Client.Protocol
{
    /// <summary>
    /// Per-connection protocol state: tracks the channels registered on the
    /// connection, the channels that are in the process of closing, and the
    /// partially received message (deliver + content header + content bodies)
    /// for each channel. Outgoing frames are written through the supplied
    /// <see cref="IProtocolWriter"/>.
    /// </summary>
    public class AMQProtocolSession
    {
        private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQProtocolSession));

        private readonly IProtocolWriter _protocolWriter;
        private readonly IConnectionCloser _connectionCloser;

        /// <summary>
        /// Counter to ensure unique queue names. Guarded by <see cref="_queueIdLock"/>.
        /// </summary>
        private int _queueId = 1;
        private readonly Object _queueIdLock = new Object();

        /// <summary>
        /// Maps from the channel id to the AmqChannel that it represents.
        /// (A ConcurrentMap in the Java client; a synchronized Hashtable here.)
        /// </summary>
        private Hashtable _channelId2SessionMap = Hashtable.Synchronized(new Hashtable());

        /// <summary>
        /// Channels for which the client has started a close, keyed by channel id.
        /// (A ConcurrentMap in the Java client; a synchronized Hashtable here.)
        /// </summary>
        private Hashtable _closingChannels = Hashtable.Synchronized(new Hashtable());

        /// <summary>
        /// Maps from a channel id to an unprocessed message. This is used to tie together the
        /// JmsDeliverBody (which arrives first) with the subsequent content header and content bodies.
        /// (A ConcurrentMap in the Java client; a synchronized Hashtable here.)
        /// </summary>
        private Hashtable _channelId2UnprocessedMsgMap = Hashtable.Synchronized(new Hashtable());

        private AMQConnection _connection;

        /// <summary>
        /// Client id of the owning connection.
        /// </summary>
        public string ClientID
        {
            get { return _connection.ClientID; }
        }

        /// <summary>
        /// Creates a protocol session bound to the given writer, closer and connection.
        /// </summary>
        /// <param name="protocolWriter">used to write every outgoing data block</param>
        /// <param name="connectionCloser">used by <see cref="CloseProtocolSession"/></param>
        /// <param name="connection">the connection this session belongs to</param>
        public AMQProtocolSession(IProtocolWriter protocolWriter, IConnectionCloser connectionCloser, AMQConnection connection)
        {
            _protocolWriter = protocolWriter;
            _connectionCloser = connectionCloser;
            _connection = connection;
        }

        /// <summary>
        /// Starts the process of setting up the connection by writing the
        /// protocol initiation block.
        /// </summary>
        public void Init()
        {
            // start the process of setting up the connection. This is the first place that
            // data is written to the server.
            _protocolWriter.Write(new ProtocolInitiation());
        }

        /// <summary>
        /// User name taken from the owning connection.
        /// </summary>
        public string Username
        {
            get { return AMQConnection.Username; }
        }

        /// <summary>
        /// Password taken from the owning connection.
        /// </summary>
        public string Password
        {
            get { return AMQConnection.Password; }
        }

        private ConnectionTuneParameters _connectionTuneParameters; // TODO: should be able to have this in the Java too.

        /// <summary>
        /// Tuning parameters for the connection. Setting the value also pushes
        /// the channel maximum and frame maximum onto the owning connection.
        /// </summary>
        public ConnectionTuneParameters ConnectionTuneParameters
        {
            get { return _connectionTuneParameters; }
            set
            {
                _connectionTuneParameters = value;
                AMQConnection con = AMQConnection;
                con.SetMaximumChannelCount(value.ChannelMax);
                con.MaximumFrameSize = value.FrameMax;
            }
        }

        private ISaslClient _saslClient;

        /// <summary>
        /// SASL client associated with this session.
        /// </summary>
        public ISaslClient SaslClient
        {
            get { return _saslClient; }
            set { _saslClient = value; }
        }

        /// <summary>
        /// Callback invoked from the BasicDeliverMethodHandler when a message has been received.
        /// Stores the message against its channel id (replacing any message already stored
        /// for that channel) until its content header and bodies arrive.
        /// </summary>
        /// <param name="message">the unprocessed message</param>
        public void UnprocessedMessageReceived(UnprocessedMessage message)
        {
            _channelId2UnprocessedMsgMap[message.ChannelId] = message;
        }

        /// <summary>
        /// Attaches a content header to the pending message on the channel. A header
        /// announcing a zero body size completes the message, which is delivered immediately.
        /// </summary>
        /// <param name="channelId">the channel the header arrived on</param>
        /// <param name="contentHeader">the received content header</param>
        /// <exception cref="AMQException">
        /// if no message is pending on the channel, or the pending message already has a header
        /// </exception>
        public void MessageContentHeaderReceived(ushort channelId, ContentHeaderBody contentHeader)
        {
            UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId];
            if (msg == null)
            {
                throw new AMQException("Error: received content header without having received a JMSDeliver frame first");
            }
            if (msg.ContentHeader != null)
            {
                throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames");
            }

            msg.ContentHeader = contentHeader;
            if (contentHeader.BodySize == 0)
            {
                DeliverMessageToAMQSession(channelId, msg);
            }
        }

        /// <summary>
        /// Adds a content body to the pending message on the channel and delivers the
        /// message once all of its body data has been received.
        /// </summary>
        /// <param name="channelId">the channel the body arrived on</param>
        /// <param name="contentBody">the received content body</param>
        /// <exception cref="AMQException">
        /// if no message is pending on the channel, or the pending message has no content header yet
        /// </exception>
        /// <exception cref="UnexpectedBodyReceivedException">
        /// propagated from the message when it rejects the body; the pending message is discarded first
        /// </exception>
        public void MessageContentBodyReceived(ushort channelId, ContentBody contentBody)
        {
            UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId];
            if (msg == null)
            {
                throw new AMQException("Error: received content body without having received a BasicDeliver frame first");
            }
            if (msg.ContentHeader == null)
            {
                _channelId2UnprocessedMsgMap.Remove(channelId);
                throw new AMQException("Error: received content body without having received a ContentHeader frame first");
            }

            try
            {
                msg.ReceiveBody(contentBody);
            }
            catch (UnexpectedBodyReceivedException)
            {
                _channelId2UnprocessedMsgMap.Remove(channelId);
                // Plain rethrow keeps the original stack trace ("throw e;" would reset it).
                throw;
            }

            if (msg.IsAllBodyDataReceived())
            {
                DeliverMessageToAMQSession(channelId, msg);
            }
        }

        /// <summary>
        /// Deliver a message to the appropriate session, removing the unprocessed message
        /// from our map.
        /// </summary>
        /// <param name="channelId">the channel id the message should be delivered to</param>
        /// <param name="msg">the message</param>
        private void DeliverMessageToAMQSession(ushort channelId, UnprocessedMessage msg)
        {
            AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId];
            channel.MessageReceived(msg);
            _channelId2UnprocessedMsgMap.Remove(channelId);
        }

        /// <summary>
        /// Convenience method that writes a frame through the protocol writer.
        /// </summary>
        /// <param name="frame">the frame to write</param>
        public void WriteFrame(IDataBlock frame)
        {
            _protocolWriter.Write(frame);
        }

        /// <summary>
        /// Registers a channel under the given channel id, replacing any channel
        /// previously registered under that id.
        /// </summary>
        /// <param name="channelId">the id to register the channel under</param>
        /// <param name="channel">the channel to register; must not be null</param>
        /// <exception cref="ArgumentNullException">if <paramref name="channel"/> is null</exception>
        public void AddSessionByChannel(ushort channelId, AmqChannel channel)
        {
            if (channel == null)
            {
                // (paramName, message) overload: the single-string constructor would
                // treat the message text as the parameter name.
                throw new ArgumentNullException("channel", "Attempt to register a null channel");
            }

            _logger.Debug("Add channel with channel id " + channelId);
            _channelId2SessionMap[channelId] = channel;
        }

        /// <summary>
        /// Unregisters the channel stored under the given channel id, if any.
        /// </summary>
        /// <param name="channelId">the id of the channel to remove</param>
        public void RemoveSessionByChannel(ushort channelId)
        {
            _logger.Debug("Removing session with channelId " + channelId);
            _channelId2SessionMap.Remove(channelId);
        }

        /// <summary>
        /// Starts the process of closing a channel by recording it as closing.
        /// </summary>
        /// <param name="channel">the channel being closed</param>
        public void CloseSession(AmqChannel channel)
        {
            _logger.Debug("closeSession called on protocol channel for channel " + channel.ChannelId);
            ushort channelId = channel.ChannelId;

            // we need to know when a channel is closing so that we can respond
            // with a channel.close frame when we receive any other type of frame
            // on that channel
            _closingChannels[channelId] = channel;
        }

        /// <summary>
        /// Called from the ChannelClose handler when a channel close frame is received.
        /// This method decides whether this is a response or an initiation. The latter
        /// case causes the AmqChannel to be closed with an exception.
        /// </summary>
        /// <param name="channelId">the id of the channel (session)</param>
        /// <param name="code">the close code, passed to the generated AMQException</param>
        /// <param name="text">the close text, passed to the generated AMQException</param>
        /// <returns>
        /// true if the client must respond to the server, i.e. if the server
        /// initiated the channel close, false if the channel close is just the server
        /// responding to the client's earlier request to close the channel.
        /// </returns>
        public bool ChannelClosed(ushort channelId, int code, string text)
        {
            bool closeRequestedByClient;

            // Test-and-remove as one step so the closing entry is consumed exactly once.
            // Previously the entry was only "removed" when it was absent, so a
            // client-requested close left its entry behind in _closingChannels.
            lock (_closingChannels.SyncRoot)
            {
                closeRequestedByClient = _closingChannels.ContainsKey(channelId);
                if (closeRequestedByClient)
                {
                    _closingChannels.Remove(channelId);
                }
            }

            if (closeRequestedByClient)
            {
                // The server is answering our earlier close request; nothing to send back.
                return false;
            }

            // Not a response to an earlier request: the server initiated the close.
            AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId];
            channel.ClosedWithException(new AMQException(_logger, code, text));
            return true;
        }

        /// <summary>
        /// The connection this protocol session belongs to.
        /// </summary>
        public AMQConnection AMQConnection
        {
            get { return _connection; }
        }

        /// <summary>
        /// Closes the underlying connection through the connection closer.
        /// </summary>
        public void CloseProtocolSession()
        {
            _logger.Debug("Closing protocol session");
            _connectionCloser.Close();
        }

        /// <summary>
        /// Builds a temporary queue name from the transport's local endpoint and a
        /// counter that is incremented under a lock on every call.
        /// </summary>
        /// <returns>a name of the form <c>tmp_{local endpoint}_{counter}</c></returns>
        internal string GenerateQueueName()
        {
            int id;
            lock (_queueIdLock)
            {
                id = _queueId++;
            }

            return "tmp_" + _connection.Transport.LocalEndpoint + "_" + id;
        }
    }
}