// $Id$
//
// Copyright 2007-2008 Cisco Systems Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy
// of the License at
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.

using System;
using Etch.Msg;
using Etch.Support;
using Etch.Util;

namespace Etch.Transport
{
    /// <summary>
    /// A plain implementation of a mailbox using a fixed size circular queue.
    /// All notify/alarm bookkeeping is guarded by a lock on the queue object.
    /// </summary>
    public class PlainMailbox : Mailbox, AlarmListener
    {
        /// <summary>Manager used to unregister this mailbox and to redeliver unread messages.</summary>
        private readonly MailboxManager mailboxManager;

        /// <summary>Message id supplied at construction; exposed by GetMessageId().</summary>
        private readonly long messageId;

        /// <summary>Holds delivered elements; also serves as the lock object for this mailbox.</summary>
        private readonly CircularQueue queue;

        /// <summary>True while this mailbox has added itself to the AlarmManager.</summary>
        private bool alarmSet;

        /// <summary>Currently registered status listener, or null when none is registered.</summary>
        private Notify notify;

        /// <summary>Opaque value handed back to the listener on every status callback.</summary>
        private Object state;

        /// <summary>
        /// Constructs the PlainMailbox.
        /// </summary>
        /// <param name="mailboxManager">The mailbox manager to use to unregister this mailbox
        /// and to deliver undelivered messages.</param>
        /// <param name="messageId">The message id reported by GetMessageId().</param>
        /// <exception cref="NullReferenceException">mailboxManager is null.</exception>
        public PlainMailbox(MailboxManager mailboxManager, long messageId)
        {
            // NOTE(review): an argument check that throws NullReferenceException is
            // unusual, but callers may catch this exact type, so it is kept.
            if (mailboxManager == null)
            {
                throw new NullReferenceException("mailboxManager == null");
            }

            this.mailboxManager = mailboxManager;
            this.messageId = messageId;

            // Constructed with 1 - presumably the queue capacity; confirm against CircularQueue.
            queue = new CircularQueue(1);
        }

        /// <summary>Returns the mailbox manager given to the constructor.</summary>
        public MailboxManager GetMailboxManager()
        {
            return mailboxManager;
        }

        /// <summary>Returns the message id given to the constructor.</summary>
        public long GetMessageId()
        {
            return messageId;
        }

        /// <summary>
        /// Wraps the sender and message in an Element and puts it on the queue,
        /// then fires the status notification when the put succeeded.
        /// </summary>
        /// <param name="sender">The sender of the message.</param>
        /// <param name="msg">The message to queue.</param>
        /// <returns>Whatever the queue's Put call returned.</returns>
        public bool Message(Who sender, Message msg)
        {
            // -1 is passed as the delay argument of Put; its exact meaning is
            // defined by CircularQueue (not visible here).
            if (!queue.Put(new Element(sender, msg), -1))
            {
                return false;
            }

            fireNotify();
            return true;
        }

        /// <summary>
        /// Snapshots the listener, its state and the closed flag under the queue
        /// lock, then invokes the listener (if any) after releasing the lock.
        /// </summary>
        private void fireNotify()
        {
            Notify listener;
            Object listenerState;
            bool closed;

            lock (queue)
            {
                listener = notify;
                listenerState = state;
                closed = queue.IsClosed();
            }

            // The callback runs outside the lock.
            if (listener != null)
            {
                listener.mailboxStatus(this, listenerState, closed);
            }
        }

        /// <summary>
        /// Gets the next element from the queue using the queue's no-argument Get.
        /// </summary>
        /// <returns>The element returned by the queue, or null when Get threw any exception.</returns>
        public Element Read()
        {
            try
            {
                return queue.Get();
            }
            catch (Exception)
            {
                // NOTE(review): every exception is swallowed and reported as null;
                // CloseRead() relies on a null result to end its drain loop.
                return null;
            }
        }

        /// <summary>
        /// Gets the next element from the queue, passing maxDelay through to the queue's Get.
        /// </summary>
        /// <param name="maxDelay">Delay argument forwarded unchanged to the queue.</param>
        /// <returns>The element returned by the queue, or null when Get threw any exception.</returns>
        public Element Read(int maxDelay)
        {
            try
            {
                return queue.Get(maxDelay);
            }
            catch (Exception)
            {
                // Same swallow-and-return-null policy as the no-argument overload.
                return null;
            }
        }

        /// <summary>
        /// Closes the queue: cancels any pending alarm, unregisters this mailbox from
        /// its manager, closes the queue and finally fires the status notification.
        /// </summary>
        /// <returns>False when the queue was already closed; true otherwise.</returns>
        public bool CloseDelivery()
        {
            lock (queue)
            {
                if (queue.IsClosed())
                {
                    return false;
                }

                cancelAlarm();
                mailboxManager.Unregister(this);
                queue.Close();
            }

            // Notify only after the lock has been released.
            fireNotify();
            return true;
        }

        /// <summary>
        /// Closes delivery and, when this call was the one that closed it, hands every
        /// element still readable back to the mailbox manager for redelivery.
        /// </summary>
        /// <returns>True when this call closed the mailbox; false when it was already closed.</returns>
        public bool CloseRead()
        {
            if (!CloseDelivery())
            {
                return false;
            }

            // Drain until Read() yields null.
            for (Element pending = Read(); pending != null; pending = Read())
            {
                mailboxManager.Redeliver(pending.sender, pending.msg);
            }

            return true;
        }

        /// <summary>
        /// Alarm callback: closes delivery on this mailbox.
        /// </summary>
        /// <param name="manager">Unused.</param>
        /// <param name="state">Unused.</param>
        /// <param name="due">Unused.</param>
        /// <returns>Always 0 (presumably "do not reschedule" - confirm against AlarmListener).</returns>
        public int Wakeup(AlarmManager manager, Object state, long due)
        {
            CloseDelivery();
            return 0;
        }

        /// <summary>
        /// Registers a status listener and, when maxDelay is positive, adds this
        /// mailbox to the AlarmManager with that delay. The listener is invoked
        /// immediately if the queue is already non-empty or closed.
        /// </summary>
        /// <param name="notify">The listener to register.</param>
        /// <param name="state">Value passed back to the listener on each callback.</param>
        /// <param name="maxDelay">Alarm delay; 0 means no alarm is set.</param>
        /// <exception cref="NullReferenceException">notify is null.</exception>
        /// <exception cref="ArgumentException">maxDelay is negative.</exception>
        /// <exception cref="Exception">A listener is already registered.</exception>
        public void RegisterNotify(Notify notify, Object state, int maxDelay)
        {
            if (notify == null)
            {
                throw new NullReferenceException("notify == null");
            }

            if (maxDelay < 0)
            {
                throw new ArgumentException("maxDelay < 0");
            }

            bool fireNow;

            lock (queue)
            {
                if (this.notify != null)
                {
                    throw new Exception("this.notify != null");
                }

                this.notify = notify;
                this.state = state;

                if (maxDelay > 0)
                {
                    // Flag is raised before the alarm is added, as in the original ordering.
                    alarmSet = true;
                    AlarmManager.staticAdd(this, null, maxDelay);
                }

                // Nothing to report only when the queue is both empty and still open.
                fireNow = !(queue.IsEmpty() && !queue.IsClosed());
            }

            if (fireNow)
            {
                fireNotify();
            }
        }

        /// <summary>
        /// Removes the registered listener, cancelling any pending alarm.
        /// </summary>
        /// <param name="notify">The listener to remove; must be the one currently registered.</param>
        /// <exception cref="NullReferenceException">notify is null, or is not the registered listener.</exception>
        public void UnregisterNotify(Notify notify)
        {
            if (notify == null)
            {
                throw new NullReferenceException("notify == null");
            }

            lock (queue)
            {
                if (notify != this.notify)
                {
                    throw new NullReferenceException("notify != this.notify");
                }

                cancelAlarm();
                this.notify = null;
                this.state = null;
            }
        }

        /// <summary>Returns the queue's IsEmpty() result.</summary>
        public bool IsEmpty()
        {
            return queue.IsEmpty();
        }

        /// <summary>Returns the queue's IsClosed() result.</summary>
        public bool IsClosed()
        {
            return queue.IsClosed();
        }

        /// <summary>Returns the queue's IsFull() result.</summary>
        public bool IsFull()
        {
            return queue.IsFull();
        }

        /// <summary>
        /// Clears the alarm flag and removes this mailbox from the AlarmManager when
        /// an alarm is pending. Both call sites invoke it while holding the queue lock.
        /// </summary>
        private void cancelAlarm()
        {
            if (!alarmSet)
            {
                return;
            }

            alarmSet = false;
            AlarmManager.staticRemove(this);
        }
    }
}