using System; using System.Messaging; using System.Threading; /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using Apache.NMS.Util; namespace Apache.NMS.MSMQ { /// /// An object capable of receiving messages from some destination /// public class MessageConsumer : IMessageConsumer { protected TimeSpan zeroTimeout = new TimeSpan(0); private readonly Session session; private readonly AcknowledgementMode acknowledgementMode; private MessageQueue messageQueue; private event MessageListener listener; private AtomicBoolean asyncDelivery = new AtomicBoolean(false); public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, MessageQueue messageQueue) { this.session = session; this.acknowledgementMode = acknowledgementMode; this.messageQueue = messageQueue; } public event MessageListener Listener { add { listener += value; StartAsyncDelivery(); } remove { listener -= value; } } public IMessage Receive() { Message message = messageQueue.Receive(); return ToNmsMessage(message); } public IMessage Receive(TimeSpan timeout) { Message message = messageQueue.Receive(timeout); return ToNmsMessage(message); } public IMessage ReceiveNoWait() { Message message = messageQueue.Receive(zeroTimeout); return ToNmsMessage(message); } public void Dispose() { throw new NotImplementedException(); } public void Close() { StopAsyncDelivery(); Dispose(); } public void StopAsyncDelivery() { asyncDelivery.Value = false; } protected virtual void StartAsyncDelivery() { if(asyncDelivery.CompareAndSet(false, true)) { Thread thread = new Thread(new ThreadStart(DispatchLoop)); thread.IsBackground = true; thread.Start(); } } protected virtual void DispatchLoop() { Tracer.Info("Starting dispatcher thread consumer: " + this); while(asyncDelivery.Value) { IMessage message = Receive(); if(message != null) { try { listener(message); } catch(Exception e) { HandleAsyncException(e); } } } Tracer.Info("Stopping dispatcher thread consumer: " + this); } protected virtual void HandleAsyncException(Exception e) { session.Connection.HandleException(e); } protected virtual IMessage ToNmsMessage(Message message) { if(message == null) { return null; } return session.MessageConverter.ToNmsMessage(message); } } }