/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Threading; using log4net; using Apache.Qpid.Buffer; using Apache.Qpid.Client.Message; using Apache.Qpid.Messaging; using Apache.Qpid.Framing; namespace Apache.Qpid.Client { public class BasicMessageProducer : Closeable, IMessagePublisher { protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer)); /// /// If true, messages will not get a timestamp. /// private bool _disableTimestamps; /// /// Priority of messages created by this producer. /// private int _messagePriority; /// /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. /// private long _timeToLive; /// /// Delivery mode used for this producer. /// private DeliveryMode _deliveryMode; private bool _immediate; private bool _mandatory; string _exchangeName; string _routingKey; /// /// Default encoding used for messages produced by this producer. /// private string _encoding; /// /// Default encoding used for message produced by this producer. /// private string _mimeType; /// /// True if this producer was created from a transacted session /// private bool _transacted; private ushort _channelId; /// /// This is an id generated by the session and is used to tie individual producers to the session. This means we /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers /// to the session so that when an error is propagated to the session it can close the producer (meaning that /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently). /// private long _producerId; /// /// The session used to create this producer /// private AmqChannel _channel; public BasicMessageProducer(string exchangeName, string routingKey, bool transacted, ushort channelId, AmqChannel channel, long producerId, DeliveryMode deliveryMode, long timeToLive, bool immediate, bool mandatory, int priority) { _exchangeName = exchangeName; _routingKey = routingKey; _transacted = transacted; _channelId = channelId; _channel = channel; _producerId = producerId; _deliveryMode = deliveryMode; _timeToLive = timeToLive; _immediate = immediate; _mandatory = mandatory; _messagePriority = priority; _channel.RegisterProducer(producerId, this); } #region IMessagePublisher Members public DeliveryMode DeliveryMode { get { CheckNotClosed(); return _deliveryMode; } set { CheckNotClosed(); _deliveryMode = value; } } public string ExchangeName { get { return _exchangeName; } } public string RoutingKey { get { return _routingKey; } } public bool DisableMessageID { get { throw new Exception("The method or operation is not implemented."); } set { throw new Exception("The method or operation is not implemented."); } } public bool DisableMessageTimestamp { get { CheckNotClosed(); return _disableTimestamps; } set { CheckNotClosed(); _disableTimestamps = value; } } public int Priority { get { CheckNotClosed(); return _messagePriority; } set { CheckNotClosed(); if ( value < 0 || value > 9 ) { throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9"); } _messagePriority = value; } } public override void Close() { _logger.Debug("Closing producer " + this); Interlocked.Exchange(ref _closed, CLOSED); _channel.DeregisterProducer(_producerId); } public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive) { CheckNotClosed(); SendImpl( _exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority, (uint)timeToLive, _mandatory, _immediate ); } public void Send(IMessage msg) { CheckNotClosed(); SendImpl( _exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive, _mandatory, _immediate ); } // This is a short-term hack (knowing that this code will be re-vamped sometime soon) // to facilitate publishing messages to potentially non-existent recipients. public void Send(IMessage msg, bool mandatory) { CheckNotClosed(); SendImpl( _exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive, mandatory, _immediate ); } public long TimeToLive { get { CheckNotClosed(); return _timeToLive; } set { CheckNotClosed(); if ( value < 0 ) { throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value); } _timeToLive = value; } } #endregion public string MimeType { get { CheckNotClosed(); return _mimeType; } set { CheckNotClosed(); _mimeType = value; } } public string Encoding { get { CheckNotClosed(); return _encoding; } set { CheckNotClosed(); _encoding = value; } } public void Dispose() { Close(); } #region Message Publishing private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate) { // todo: handle session access ticket AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame( _channel.ChannelId, 0, exchangeName, routingKey, mandatory, immediate ); // fix message properties if ( !_disableTimestamps ) { message.Timestamp = DateTime.UtcNow.Ticks; if (timeToLive != 0) { message.Expiration = message.Timestamp + timeToLive; } } else { message.Expiration = 0; } message.DeliveryMode = deliveryMode; message.Priority = (byte)priority; ByteBuffer payload = message.Data; int payloadLength = payload.Limit; ContentBody[] contentBodies = CreateContentBodies(payload); AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length]; for ( int i = 0; i < contentBodies.Length; i++ ) { frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]); } if ( contentBodies.Length > 0 && _logger.IsDebugEnabled ) { _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey)); } // weight argument of zero indicates no child content headers, just bodies AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame( _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0, message.ContentHeaderProperties, (uint)payloadLength ); if ( _logger.IsDebugEnabled ) { _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey)); } frames[0] = publishFrame; frames[1] = contentHeaderFrame; CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); lock ( _channel.Connection.FailoverMutex ) { _channel.Connection.ProtocolWriter.Write(compositeFrame); } } /// /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated /// maximum frame size. /// /// /// return the array of content bodies private ContentBody[] CreateContentBodies(ByteBuffer payload) { if ( payload == null ) { return null; } else if ( payload.Remaining == 0 ) { return new ContentBody[0]; } // we substract one from the total frame maximum size to account for the end of frame marker in a body frame // (0xCE byte). int framePayloadMax = (int)(_channel.Connection.MaximumFrameSize - 1); int frameCount = CalculateContentBodyFrames(payload); ContentBody[] bodies = new ContentBody[frameCount]; for ( int i = 0; i < frameCount; i++ ) { int length = (payload.Remaining >= framePayloadMax) ? framePayloadMax : payload.Remaining; bodies[i] = new ContentBody(payload, (uint)length); } return bodies; } private int CalculateContentBodyFrames(ByteBuffer payload) { // we substract one from the total frame maximum size to account // for the end of frame marker in a body frame // (0xCE byte). int frameCount; if ( (payload == null) || (payload.Remaining == 0) ) { frameCount = 0; } else { int dataLength = payload.Remaining; int framePayloadMax = (int)_channel.Connection.MaximumFrameSize - 1; int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; frameCount = (int)(dataLength / framePayloadMax) + lastFrame; } return frameCount; } #endregion // Message Publishing } }