// $Id$
//
// Copyright 2007-2008 Cisco Systems Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy
// of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
using System;
using Etch.Msg;
using Etch.Support;
using Etch.Util;
namespace Etch.Transport
{
/// <summary>
/// A plain implementation of a mailbox using a fixed size circular queue.
/// </summary>
/// <remarks>
/// The queue instance doubles as the lock object that guards the notification
/// registration (<c>notify</c>, <c>state</c>) and the <c>alarmSet</c> flag.
/// </remarks>
public class PlainMailbox : Mailbox, AlarmListener
{
    // Manager used to unregister this mailbox when it closes and to redeliver
    // messages that were still queued when reading was closed.
    private readonly MailboxManager mailboxManager;

    // Message id supplied at construction; exposed through GetMessageId().
    private readonly long messageId;

    // Backing queue. Also used as the monitor for every lock in this class.
    private readonly CircularQueue queue;

    // True while this mailbox has an alarm registered with AlarmManager.
    private bool alarmSet;

    // Currently registered status listener (null when none) and its opaque state.
    private Notify notify;
    private Object state;

    /// <summary>
    /// Constructs the PlainMailbox.
    /// </summary>
    /// <param name="mailboxManager">The mailbox manager to use to unregister this mailbox
    /// and to deliver undelivered messages.</param>
    /// <param name="messageId">The message id associated with this mailbox.</param>
    /// <exception cref="NullReferenceException"><paramref name="mailboxManager"/> is null.</exception>
    public PlainMailbox(MailboxManager mailboxManager, long messageId)
    {
        // NullReferenceException (rather than ArgumentNullException) is kept on
        // purpose: existing callers may already catch this exact type.
        if (mailboxManager == null)
            throw new NullReferenceException("mailboxManager == null");

        this.mailboxManager = mailboxManager;
        this.messageId = messageId;

        // The queue is created with a constructor argument of 1
        // (presumably its capacity -- confirm against CircularQueue).
        queue = new CircularQueue(1);
    }

    /// <summary>
    /// Gets the mailbox manager supplied to the constructor.
    /// </summary>
    /// <returns>The mailbox manager of this mailbox.</returns>
    public MailboxManager GetMailboxManager()
    {
        return mailboxManager;
    }

    /// <summary>
    /// Gets the message id supplied to the constructor.
    /// </summary>
    /// <returns>The message id of this mailbox.</returns>
    public long GetMessageId()
    {
        return messageId;
    }

    /// <summary>
    /// Offers a message to this mailbox. When the queue accepts it, the
    /// registered status listener (if any) is notified.
    /// </summary>
    /// <param name="sender">The sender of the message.</param>
    /// <param name="msg">The message to queue.</param>
    /// <returns>true if the queue accepted the element; false otherwise.</returns>
    public bool Message(Who sender, Message msg)
    {
        // -1 is passed as the second argument of Put (presumably "do not wait
        // for room" -- confirm against CircularQueue.Put).
        bool accepted = queue.Put(new Element(sender, msg), -1);
        if (accepted)
            FireNotify();
        return accepted;
    }

    /// <summary>
    /// Invokes the registered status listener, if there is one, with the
    /// registered state object and the current closed status of the queue.
    /// </summary>
    private void FireNotify()
    {
        Notify listener;
        Object listenerState;
        bool closed;

        // Take a consistent snapshot under the lock, then call the listener
        // outside of it so the callback never runs while the lock is held.
        lock (queue)
        {
            listener = notify;
            listenerState = state;
            closed = queue.IsClosed();
        }

        if (listener != null)
            listener.mailboxStatus(this, listenerState, closed);
    }

    /// <summary>
    /// Reads the next element from the queue using the queue's parameterless Get.
    /// </summary>
    /// <returns>The element returned by the queue, or null if the queue returned
    /// null or threw any exception.</returns>
    public Element Read()
    {
        try
        {
            return queue.Get();
        }
        catch (Exception)
        {
            // Deliberately swallowed: callers (including CloseRead) treat a
            // null result as "nothing was read".
            // NOTE(review): this also hides thread interruption from the caller.
            return null;
        }
    }

    /// <summary>
    /// Reads the next element from the queue, passing <paramref name="maxDelay"/>
    /// through to the queue's Get.
    /// </summary>
    /// <param name="maxDelay">Delay value forwarded unchanged to the queue
    /// (units and special values are defined by CircularQueue.Get).</param>
    /// <returns>The element returned by the queue, or null if the queue returned
    /// null or threw any exception.</returns>
    public Element Read(int maxDelay)
    {
        try
        {
            return queue.Get(maxDelay);
        }
        catch (Exception)
        {
            // Deliberately swallowed; see Read().
            return null;
        }
    }

    /// <summary>
    /// Closes the mailbox for delivery: removes any pending alarm, unregisters
    /// the mailbox from its manager, closes the queue and then notifies the
    /// registered status listener.
    /// </summary>
    /// <returns>true if this call closed the queue; false if it was already closed.</returns>
    public bool CloseDelivery()
    {
        lock (queue)
        {
            if (queue.IsClosed())
                return false;

            if (alarmSet)
            {
                alarmSet = false;
                AlarmManager.staticRemove(this);
            }

            mailboxManager.Unregister(this);
            queue.Close();
        }

        // Notify after releasing the lock so the listener observes the closed state.
        FireNotify();
        return true;
    }

    /// <summary>
    /// Closes the mailbox for delivery and, if this call performed the close,
    /// drains every element still readable and hands it to the mailbox manager
    /// for redelivery.
    /// </summary>
    /// <returns>true if this call closed the mailbox; false if it was already closed.</returns>
    public bool CloseRead()
    {
        if (!CloseDelivery())
            return false;

        // Read() returns null once nothing more can be read, which ends the drain.
        Element pending;
        while ((pending = Read()) != null)
            mailboxManager.Redeliver(pending.sender, pending.msg);

        return true;
    }

    /// <summary>
    /// Alarm callback: closes the mailbox for delivery.
    /// </summary>
    /// <param name="manager">The alarm manager raising the alarm (unused).</param>
    /// <param name="state">The state registered with the alarm (unused).</param>
    /// <param name="due">The time the alarm was due (unused).</param>
    /// <returns>Always 0 (presumably "do not reschedule" -- confirm against AlarmListener).</returns>
    public int Wakeup(AlarmManager manager, Object state, long due)
    {
        CloseDelivery();
        return 0;
    }

    /// <summary>
    /// Registers a status listener for this mailbox. Only one listener may be
    /// registered at a time. If the queue is already non-empty or closed, the
    /// listener is notified before this method returns.
    /// </summary>
    /// <param name="notify">The listener to register.</param>
    /// <param name="state">Opaque state object passed back to the listener.</param>
    /// <param name="maxDelay">When greater than 0, an alarm is registered with this
    /// delay which closes the mailbox for delivery when it fires; 0 registers no alarm.</param>
    /// <exception cref="NullReferenceException"><paramref name="notify"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="maxDelay"/> is negative.</exception>
    /// <exception cref="InvalidOperationException">A listener is already registered.</exception>
    public void RegisterNotify(Notify notify, Object state, int maxDelay)
    {
        // NullReferenceException is kept for compatibility with existing callers.
        if (notify == null)
            throw new NullReferenceException("notify == null");

        if (maxDelay < 0)
            throw new ArgumentException("maxDelay < 0");

        bool notifyNow;
        lock (queue)
        {
            // A specific exception type instead of the bare System.Exception;
            // it derives from Exception, so catch (Exception) handlers still work.
            if (this.notify != null)
                throw new InvalidOperationException("this.notify != null");

            this.notify = notify;
            this.state = state;

            if (maxDelay > 0)
            {
                alarmSet = true;
                AlarmManager.staticAdd(this, null, maxDelay);
            }

            notifyNow = !queue.IsEmpty() || queue.IsClosed();
        }

        // Fire outside the lock; FireNotify takes its own snapshot.
        if (notifyNow)
            FireNotify();
    }

    /// <summary>
    /// Unregisters the given status listener and removes any pending alarm.
    /// </summary>
    /// <param name="notify">The listener to unregister; must be the one currently registered.</param>
    /// <exception cref="NullReferenceException"><paramref name="notify"/> is null, or it is
    /// not the currently registered listener.</exception>
    public void UnregisterNotify(Notify notify)
    {
        if (notify == null)
            throw new NullReferenceException("notify == null");

        lock (queue)
        {
            // Exception type kept as-is for compatibility with existing callers.
            if (notify != this.notify)
                throw new NullReferenceException("notify != this.notify");

            if (alarmSet)
            {
                alarmSet = false;
                AlarmManager.staticRemove(this);
            }

            this.notify = null;
            this.state = null;
        }
    }

    /// <summary>
    /// Reports whether the underlying queue is empty.
    /// </summary>
    /// <returns>The result of the queue's IsEmpty.</returns>
    public bool IsEmpty()
    {
        return queue.IsEmpty();
    }

    /// <summary>
    /// Reports whether the underlying queue is closed.
    /// </summary>
    /// <returns>The result of the queue's IsClosed.</returns>
    public bool IsClosed()
    {
        return queue.IsClosed();
    }

    /// <summary>
    /// Reports whether the underlying queue is full.
    /// </summary>
    /// <returns>The result of the queue's IsFull.</returns>
    public bool IsFull()
    {
        return queue.IsFull();
    }
}
}