/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Threading; using log4net; using NUnit.Framework; using Apache.Qpid.Messaging; namespace Apache.Qpid.Client.Tests { [TestFixture] public class ServiceProvidingClient : BaseMessagingTestFixture { private static ILog _logger = LogManager.GetLogger(typeof(ServiceProvidingClient)); private int _messageCount; private string _replyToExchangeName; private string _replyToRoutingKey; const int PACK = 100; private IMessagePublisher _destinationPublisher; private IMessageConsumer _consumer; private string _serviceName = "ServiceQ1"; private string _selector = null; [SetUp] public override void Init() { base.Init(); _logger.Info("Starting..."); _logger.Info("Service (queue) name is '" + _serviceName + "'..."); _connection.ExceptionListener = new ExceptionListenerDelegate(OnConnectionException); _logger.Info("Message selector is <" + _selector + ">..."); _channel.DeclareQueue(_serviceName, false, false, false); _consumer = _channel.CreateConsumerBuilder(_serviceName) .WithPrefetchLow(100) .WithPrefetchHigh(500) .WithNoLocal(true) .Create(); _consumer.OnMessage = new MessageReceivedDelegate(OnMessage); } public override void Shutdown() { _consumer.Dispose(); base.Shutdown(); } private void OnConnectionException(Exception e) { _logger.Info("Connection exception occurred", e); // XXX: Test still doesn't shutdown when broker terminates. Is there no heartbeat? } [Test] public void Test() { _connection.Start(); _logger.Info("Waiting..."); ServiceRequestingClient client = new ServiceRequestingClient(); client.Init(); client.SendMessages(); } private void OnMessage(IMessage message) { // _logger.Info("Got message '" + message + "'"); ITextMessage tm = (ITextMessage)message; try { string replyToExchangeName = tm.ReplyToExchangeName; string replyToRoutingKey = tm.ReplyToRoutingKey; _replyToExchangeName = replyToExchangeName; _replyToRoutingKey = replyToRoutingKey; _logger.Debug("About to create a producer"); // Console.WriteLine("ReplyTo.ExchangeName = " + _replyToExchangeName); // Console.WriteLine("ReplyTo.RoutingKey = " + _replyToRoutingKey); _destinationPublisher = _channel.CreatePublisherBuilder() .WithExchangeName(_replyToExchangeName) .WithRoutingKey(_replyToRoutingKey) .WithDeliveryMode(DeliveryMode.NonPersistent) .Create(); _destinationPublisher.DisableMessageTimestamp = true; _logger.Debug("After create a producer"); } catch (QpidException e) { _logger.Error("Error creating destination", e); throw e; } _messageCount++; if (_messageCount % PACK == 0) { _logger.Info("Received message total: " + _messageCount); _logger.Info(string.Format("Sending response to '{0}:{1}'", _replyToExchangeName, _replyToRoutingKey)); } try { String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.Text; ITextMessage msg = _channel.CreateTextMessage(payload); if ( tm.Headers.Contains("timeSent") ) { msg.Headers["timeSent"] = tm.Headers["timeSent"]; } _destinationPublisher.Send(msg); } catch ( QpidException e ) { _logger.Error("Error sending message: " + e, e); throw e; } finally { _destinationPublisher.Dispose(); } } } }