
Flow to Disk Design

Overview

Currently, the Java Broker can do one of two things with a message it has to deliver:

  1. Keep transient messages in memory until delivered
  2. Write persistent messages to a message store (like BDB) and keep in memory until delivery complete or memory is full.

This means that the broker cannot avoid OutOfMemory (OoM) exceptions: send enough messages to the broker, particularly while consumers are inactive, and the broker will exhaust its available heap and fail.

This page pulls together the ideas from QPID-949.

Other Implementations

ActiveMQ uses the idea of a message cursor and has a number of different policies for performing 'Message Spooling': see Message Cursors.

Current Functionality

Currently the broker treats persistent and transient messages differently. Persistent messages are written to disk as they are received and their handles are created as WeakReferenceMessageHandles. This means that when an Out of Memory (OoM) condition occurs, all of the persistent message handles are GC'd. Performance then hits the floor, as all messages at the front of the queues must be read from disk whilst new messages, at the back of the queue, are kept in memory.

Transient messages are created as InMemoryMessageHandles and so cannot be purged from memory. When an OoM condition occurs the broker cannot recover.

Design

There are areas of the broker in need of improvement that would be affected by this implementation:

  1. MessageStore : Currently only persistent messages are written here, along with transactional data. The MessageStore should become a TransactionLog, so messages should not be retrieved from here during normal operation.
  2. Message Reference Counting : To minimise duplication of message data, references are used to record how many queues a message has been enqueued on. This count is currently maintained by the message itself and has been a large cause of runtime problems. If the TransactionLog instead maintains a list of (Message, Queue) tuples then we can remove the error-prone reference count integer that is currently used.

Approach Overview

The approach here is to reduce the overall complexity of the broker so that it is easier to reason about in smaller chunks. Focusing at the level of the Queue will make life easier as we move towards AMQP 1-0. To facilitate this we should move to handling persistent and transient messages in a similar way; to this end the *MessageHandle objects should be merged into AMQMessage. The current reference counting will become the responsibility of the new TransactionLog: the errors in, and testing of, the reference counting have been a tricky issue to tackle cleanly within the Java Broker. The queues will gain additional counts so that each queue can track its memory usage, based on the size of the message body + header + an object constant representing the in-memory data objects. This will allow the Queues to better reason about, and act upon, their memory usage.
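As a small illustration of that calculation (the class, method and constant below are illustrative assumptions, not part of the design), the per-message memory charge might be computed along these lines:

    // Sketch only: estimated in-memory footprint of a message, using an assumed
    // fixed overhead for the handle/entry objects (the constant's value is illustrative).
    public final class InMemorySizeEstimator
    {
        private static final long OBJECT_OVERHEAD = 256;

        public static long estimate(long bodySize, long headerSize)
        {
            return bodySize + headerSize + OBJECT_OVERHEAD;
        }
    }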

Approach Summary

  1. Remove shared state from the AMQMessage class, and move everything into QueueEntries (this allows for a message to be flowed to disk on one queue while staying in memory on another).
  2. Break apart the MessageStore interface, creating a new TransactionLog interface that covers only the logging of the durable transactional events (message data arriving, enqueues, dequeues).
  3. Move reference counting into the TransactionLog.
    At this point we will have removed our current (fragile) flow-to-disk capabilities on persistent messages... and all messages will be held in memory while live
  4. Add new properties to queues to keep track of memory usage.
  5. Create QueueBacking to enable storage and retrieval of flowed messages.
  6. Update QueueEntries / AMQMessage to use QueueBacking for flowing message data to and from disk.
  7. Add capabilities to queues to shrink their in-memory profile by flowing queue-entries to disk (from the tail upward) until they are under a given notional memory usage.
  8. Add a check on message enqueue to ensure the queue size does not grow beyond the defined limit. Mark the queue as flowed to disk when that occurs, immediately flowing new messages on that queue and potentially starting a Purger thread.
  9. Add a check on message send to potentially start an Inhaler thread to restore flowed messages.
  10. Add properties to QueueDeclare for the flow-to-disk control extensions as defined in the C++ broker.

Limitations

The current design of the broker, which uses a new queue per topic subscriber, does not lend itself well to this design. If a large number of subscribers fall behind due to high volume and start flowing to disk, then the amount of data flowing to disk will impair performance. A change in the broker to implement topics in a more AMQP 1-0 style, where new subscribers are actually browsers on a special TopicQueue, will alleviate the problem.

Future Enhancements

  1. Enable the flow to disk of the queue structure. This will remove the final constraints on memory and only limit the broker to the amount of disk space available.
  2. Dynamic sizing of queues, with policies for this sizing, e.g. fn(MessageSize, ConsumeRate), so that slow consumers have their queues flowed sooner with a smaller in-memory cache whilst fast consumers with large messages have a larger cache.
  3. Conversion of Topic implementation to use an AMQP 1-0 style single queue where each consumer is a browser.

Design Notes

Areas of Note

  1. Initially a Purger thread will not be required if we are to simply let messages be deleted on ack. However, if we do not have a small prefetch on the client then we will quickly OOM the broker. Ensuring the prefetch is set to a more sensible value (<100) is important here.
  2. The Priority Queue implementation will be trickier, as an IncomingMessage may be destined for the front of the queue, unlike the non-priority case where new messages go at the end. The implementation for this will require a Purger thread.
  3. The queueMinMemory value will be initially a fixed value or at least a calculation based on queueMaxMemory. However, it should be implemented with the future prospect of using a dynamic sizing method.
  4. Care must be taken for the NO_ACK mode as the dequeue is performed before delivery so the message must be in memory before that occurs or the data will be lost.

Areas for investigation

  1. When to perform an fsync?
  2. Will NFS be capable of supporting the backing?
    1. Sizing guidelines for users: what is the per-message overhead on disk?
  3. Raise awareness that the data will now end up on disk. If it is confidential it should be sent encrypted.

Validation Rules

  1. What checks to run on start-up
    1. The defined queues' queueMaxMemory values should sum to less than X * the Xmx value.
  2. How to calculate default values

Alerting

  1. New Alerts when Queue flows and recovers
  2. Suggestion to set queueDepth less than queueMaxMemory to get early warning that flow-to-disk may occur.

Testing

  1. What happens when the disk fills up?
  2. Currently the Java Broker's behaviour with WeakReferences is quite poor after it has dropped all the references.
    1. Generate a test that models the current behaviour and ensure that the new flow-to-disk implementation improves performance.

Message Flows

To show how these changes will affect the current message delivery flow, what follows is a series of message flow diagrams highlighting the changes:

M4 Flows

This is what message delivery via a queue looks like in the M4 broker. A message is received as an IncomingMessage and, if persistent, its data is written to the MessageStore. Once the message is fully received it becomes an AMQMessage and is routed to a Queue, where a QueueEntry is created that references that AMQMessage.

In the case of persistent message delivery the message data is held via a WeakReference, so when the broker reaches a full memory scenario it will purge ALL the references to the message data. New messages received by the broker are kept in memory, but for delivery a purged message must be fetched from the store, which may be an expensive operation.
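For illustration, a minimal sketch of the weak-reference pattern described above (the class and method names here are assumptions, not the broker's actual WeakReferenceMessageHandle code):

    import java.lang.ref.WeakReference;

    // Sketch of the weak-reference handle pattern: the content may be cleared by the
    // GC under memory pressure and must then be re-read from the store.
    class WeakContentHandle
    {
        private WeakReference<byte[]> _content;

        void setContent(byte[] content)
        {
            _content = new WeakReference<byte[]>(content);
        }

        byte[] getContent(MessageStoreReader store, long messageId)
        {
            byte[] data = (_content == null) ? null : _content.get();
            if (data == null)
            {
                // Reference was cleared under memory pressure: an expensive disk read.
                data = store.readContent(messageId);
                _content = new WeakReference<byte[]>(data);
            }
            return data;
        }
    }

    interface MessageStoreReader
    {
        byte[] readContent(long messageId);
    }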

The situation is slightly better for Topics. An AMQMessage that is routed to multiple Queues can be referenced by many QueueEntries so duplication of memory is not required.

When a full memory situation occurs then, as in the Queue case, message data is purged. However, the references continue to be shared and an unchecked race to repopulate the message data occurs. No data loss should result from this race, only the duplicated effort of reading from disk twice.

Flows after Flow-To-Disk

This design for Flow-To-Disk aims not to impact the normal message delivery flow. Here you can see that persistent data is written to the TransactionLog as it was in the MessageStore for M4.

The change occurs when the queue hits its defined memory limit. At this point ALL new messages to that queue are written to the QueueBacking. An Inhaler process ensures that the subscribers do not starve for data that has been flowed.

The topic scenario is slightly altered by the desire to keep each queue responsible for its own message flow to disk. AMQMessage references can be shared between queues during normal delivery, but when a message is flowed to disk for a given queue, that queue will restore the message solely for its own consumption.

The orange messages highlight the duplication of message data in memory. This duplication vastly simplifies the responsibilities for flowing to and from disk.

Of course, when the queue leaves a flowed state, newly received message data can again be shared between queues. Again note that the orange messages are paired vertically to represent the duplicated data from messages that have been restored from disk.

Design Details

To highlight the changes that will be required, let's look at the processing that is performed on an incoming message:

AMQChannel

When a persistent message is received, its headers and chunks are recorded in the new TransactionLog; the remainder of the current MessageStore interface will be moved to a new RoutingTable interface.

+- TransactionLog -+
| enqueue          |
| dequeue          |
| storeHeader      |
| storeChunk       |
|                  |
| startTransaction |
| commit           |
| abort            |
+------------------+

+--RoutingTable--+
| createQueue    |
| createExchange |
| createBinding  |
|                |
| deleteQueue    |
| deleteExchange |
| deleteBinding  |
+----------------+

The TransactionLog is a distilled version of the current MessageStore interfaces. The log is the persistent record of the state of the broker. On start-up this log is used to restore the routing and message states. It is not to be used as a lookup mechanism; the queues must now be responsible for remembering all their enqueued messages and not rely on the previous MessageStore. As no random access to the log file is needed it can be implemented as a write-ahead log. It can periodically clean up old state by writing a new log, but as its primary function is to ensure state is persisted to disk it need not maintain maps of the data, thus simplifying its implementation. The responsibility for remembering the message data is delegated to the Queue.

The TransactionLog shall absorb the current reference counting code and be responsible for deciding when to recoverably delete a message. Currently the reference counting is spread across a number of different classes and has a couple of serious problems. The TransactionLog will instead record a series of Queue/Message tuples so that it can pair enqueue/dequeue calls. When there are no more references to the message it can safely conclude that the message is no longer needed. By using a list of tuples rather than an integer count the TransactionLog is capable of safely interleaving transactions, as there is no shared count value.
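A minimal sketch of that tuple-based tracking, with class and method names assumed purely for illustration: each enqueue records a (queue, message) pair, and only when the last pair for a message is removed can the data be recoverably deleted.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Illustrative only: tracks (queue, message) pairs instead of a shared
    // integer reference count. Names are assumptions, not the final interface.
    class EnqueueTracker
    {
        private final Map<Long, Set<String>> _enqueuesByMessage = new HashMap<Long, Set<String>>();

        synchronized void recordEnqueue(long messageId, String queueName)
        {
            Set<String> queues = _enqueuesByMessage.get(messageId);
            if (queues == null)
            {
                queues = new HashSet<String>();
                _enqueuesByMessage.put(messageId, queues);
            }
            queues.add(queueName);
        }

        /** Returns true when the last (queue, message) pair has been removed,
            i.e. the message data can now be recoverably deleted. */
        synchronized boolean recordDequeue(long messageId, String queueName)
        {
            Set<String> queues = _enqueuesByMessage.get(messageId);
            if (queues == null)
            {
                return false;
            }
            queues.remove(queueName);
            if (queues.isEmpty())
            {
                _enqueuesByMessage.remove(messageId);
                return true;
            }
            return false;
        }
    }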

TransactionLog Recovery

Currently the MessageStore is responsible for providing unique MessageIDs. This is not strictly part of a TransactionLog, so it would not make sense to include it in that interface. What is recommended is that we unify our message creation as part of removing the *MessageHandle objects. Messages recovered directly from the store currently create the *MessageHandle directly with a Message ID, while messages delivered via the wire ask the MessageStore for an ID before creating an IncomingMessage, which in turn creates the *MessageHandle. As we will be removing the *MessageHandle objects it makes sense to unify our message creation through a MessageFactory. This will allow the factory to be responsible for the sequence of IDs. When recovery is in progress a call to createMessage(id) will take place and the factory need only:

  1. ensure the id is unique
  2. record the highest value seen to seed its sequence of IDs handed out by createMessage()
    +--MessageFactory---+
    | createMessage(id) |
    | createMessage()   |
    +-------------------+
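A minimal sketch of how the factory could seed its ID sequence during recovery (class and method names are assumptions; uniqueness checking of recovered IDs is omitted for brevity):

    import java.util.concurrent.atomic.AtomicLong;

    // Sketch only: records the highest recovered ID so that new IDs handed out
    // after recovery never collide with existing messages.
    class MessageIdSequence
    {
        private final AtomicLong _nextId = new AtomicLong(1);

        /** Recovery path: record the highest ID seen so far. */
        void recordRecoveredId(long recoveredId)
        {
            long next;
            do
            {
                next = _nextId.get();
            }
            while (recoveredId >= next && !_nextId.compareAndSet(next, recoveredId + 1));
        }

        /** Normal path: hand out the next ID for a message arriving over the wire. */
        long nextId()
        {
            return _nextId.getAndIncrement();
        }
    }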
    

AMQQueue/QueueEntry

When a message has been fully received it is then routed to the required Queues as before. Only persistent messages that are routed to persistent queues are written to the TransactionLog which is then responsible for the ultimate deletion of persistent message data.

For this to occur the existing model needs to be updated. The *Handle objects we currently have need to be merged into AMQMessage, and all the state about the message needs to be moved to the QueueEntry. This will allow us to null the AMQMessage reference as and when the message is flowed to disk. The QueueEntry interface will be augmented to allow the Queue to flow the data when required. When the data is recovered no attempt is made to restore the single instance of the message: i.e. if a single message is sent to 10 queues, initially there will be one AMQMessage and one copy of the data; when a queue is flowed it will lose the reference to that message, so on recovery a new message with a copy of the data will be created solely for that queue's use.

+-QueueEntry-+
| flow       |
| isFlowed   |
| recover    |
+------------+

Our existing Queue needs to be updated to record the additional state of the QueueEntries. Currently we have queueCount and queueSize, which represent the count and data size used by the queue. The Queue needs queueMaxMemory, queueMinMemory, queueInMemory and isFlowed added. It is proposed that only the data size is used for flow-to-disk calculations, as counting messages will not give us the control that we need over memory usage.
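As a sketch, the new values could be held as simple atomic counters on the queue implementation (the field names follow the design text; the class and example limits are illustrative assumptions):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative holder for the new per-queue values.
    class QueueMemoryState
    {
        final AtomicLong _queueMaxMemory = new AtomicLong(10 * 1024 * 1024); // configured limit, in bytes
        final AtomicLong _queueMinMemory = new AtomicLong(5 * 1024 * 1024);  // level at which the Inhaler is started
        final AtomicLong _queueInMemory = new AtomicLong(0);                 // bytes of message data currently in memory
        final AtomicBoolean _flowed = new AtomicBoolean(false);              // true while new messages go straight to disk
    }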

These new variables will be used to control two new threads: the Inhaler and the Purger.

Purger

When queueMaxMemory is reached the queue is set to flow and all new messages arriving on the queue are sent straight to disk. As messages are sent to a subscriber there are a couple of possibilities while the queue is in a flowed state:

  1. The messages are also flowed to disk.
  2. A number or percentage of the queueMaxMemory could be kept to handle rollbacks.

Using the first mechanism we do not need a Purger thread for the simple queue case. However, for the second case, and to handle queues where the position of the incoming message is not known, the Purger thread will be required. The Purger simply needs to start at the front of the queue and record the amount of data still in memory on the Queue; when queueMaxMemory is reached, all subsequent messages are flowed.
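A minimal sketch of such a Purger, reusing the QueueEntry operations shown above plus an assumed getSize() accessor (all other names are illustrative):

    // The QueueEntry methods follow the interface sketched above; getSize() is an
    // additional assumption used to total the in-memory data.
    interface QueueEntry
    {
        void flow();
        boolean isFlowed();
        void recover();
        long getSize();
    }

    // Sketch only: walks the queue from the head, keeping a running total of the
    // data still in memory, and flows every entry beyond queueMaxMemory.
    class Purger implements Runnable
    {
        private final Iterable<QueueEntry> _entriesHeadToTail;
        private final long _queueMaxMemory;

        Purger(Iterable<QueueEntry> entriesHeadToTail, long queueMaxMemory)
        {
            _entriesHeadToTail = entriesHeadToTail;
            _queueMaxMemory = queueMaxMemory;
        }

        public void run()
        {
            long inMemory = 0;
            for (QueueEntry entry : _entriesHeadToTail)
            {
                if (entry.isFlowed())
                {
                    continue; // already on disk
                }
                if (inMemory + entry.getSize() > _queueMaxMemory)
                {
                    entry.flow(); // over the limit: this and all later entries go to disk
                }
                else
                {
                    inMemory += entry.getSize();
                }
            }
        }
    }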

Inhaler

The Inhaler is an optimisation to ensure that the broker returns to peak performance after a flow-to-disk event. Lazily loading messages on demand would be quite slow, so on delivery to a subscriber a check can be performed to see if the current queueInMemory is less than queueMinMemory, which would indicate that there is room to reload older messages. The Inhaler can then begin to load messages from disk until queueMaxMemory has been reached or all messages are back in memory. If all the messages are back in memory then the queue's flow state can be reset, allowing incoming messages to stay in memory.
NOTE: as there are no locks a second check by the Inhaler is required to ensure a message was not flowed between the last load and the change of queue state.
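A minimal sketch of the Inhaler, reusing the QueueEntry and QueueMemoryState sketches above (recover() is assumed to restore the data and to update _queueInMemory elsewhere):

    // Sketch only: restores flowed entries until queueMaxMemory is reached; if every
    // entry is back in memory, clears the flowed flag and runs one more pass to catch
    // any message flowed in the meantime (there are no locks).
    class Inhaler implements Runnable
    {
        private final QueueMemoryState _state;        // counters sketched earlier
        private final Iterable<QueueEntry> _entries;  // head-to-tail view of the queue

        Inhaler(QueueMemoryState state, Iterable<QueueEntry> entries)
        {
            _state = state;
            _entries = entries;
        }

        public void run()
        {
            if (inhaleOnce())
            {
                _state._flowed.set(false);
                inhaleOnce(); // second pass, as required by the note above
            }
        }

        /** Returns true if every entry is back in memory. */
        private boolean inhaleOnce()
        {
            for (QueueEntry entry : _entries)
            {
                if (entry.isFlowed())
                {
                    if (_state._queueInMemory.get() >= _state._queueMaxMemory.get())
                    {
                        return false; // no room left; stay flowed
                    }
                    entry.recover(); // assumed to reload the data and update _queueInMemory
                }
            }
            return true;
        }
    }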

Delivery to the subscriber and to the queue is expected to be updated in the following ways:

Pseudo-Code - Delivery to Subscriber
while (message in queue)
  subscriber.deliver(message)

  // With a low prefetch or a more complex purging thread this should not be required.
  if (flowed)
    flowToDisk(message)

  if (_queueInMemory < _queueMinMemory)
    startInhaler
Pseudo-Code - Delivery to Queue
  addToQueue(message)

  if (flowed)
    flowToDisk(message)
  else
    if (_queueInMemory > _queueMaxMemory)
      setFlowed
      startPurger

The additional state checks are performed after the message deliveries and are simple calculations compared to the existing message flow paths. As a result, performance in the non-flowed state should not be affected.

Implementation details for queue*Memory and flow state updates

Here are some further details on how the new queue*Memory values will be calculated.

Implementation details for Delivery to Subscriber; this is taken from SimpleAMQQueue and is also used by AMQPriorityQueue.

    public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
    {
        // Current _queueDepth calculations
        decrementQueueCount();
        decrementQueueSize(entry);

        // Add update for _queueInMemory
        decrementQueueInMemorySize(entry);

        // ... snip complete entry clean up, including any TransactionLog dequeue and QueueBacking delete ...

        if (_queueInMemory.get() < _queueMinMemory.get())
        {
            _asyncInhaler.execute(inhaler);
        }
    }


When the Inhaler has completely restored all messages to the queue it can call setFlowed(false) to continue normal message delivery. NOTE: It is expected that the Inhaler will have to run once more over the queue to ensure that no new messages were flowed between it retrieving all messages on disk and the resetting of the flowed status.

Implementation details for Delivery to Queue; again this is taken from SimpleAMQQueue and is also used by AMQPriorityQueue.

 
    public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
    {
        // Current _queueDepth calculations
        incrementQueueCount();
        incrementQueueSize(message);

        // Add update for _queueInMemory
        incrementQueueInMemorySize(message);

        // Add check to see if we should be flowing.
        if (_queueInMemory.get() > _queueMaxMemory.get())
        {
            setFlowed(true);
            // If we are to have a purging thread then this is where it would be started.
            // _asyncPurger.execute(purger);
        }

        // ... snip section about adding to queue (creating the QueueEntry) and attempting deliveries ...

        // For the SimpleAMQQueue case messageAddedToEndOfQueue() will always return true.
        // The AMQPriorityQueue implementation will be more complicated.
        if (_flowed.get() && messageAddedToEndOfQueue())
        {
            entry.flow();
        }

        return entry;
    }

QueueBacking

The flow-to-disk implementation will operate on a queue-by-queue basis, so a new QueueBacking will be created to handle the flowing of messages to and from disk.

+-QueueBacking-+
| flow         |
| recover      |
| delete       |
+--------------+
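Expressed as a Java interface this might look like the following sketch; the parameter and exception types are assumptions, reusing the QueueEntry sketched earlier:

    import java.io.IOException;

    // Sketch of the QueueBacking operations listed above; signatures are assumptions.
    interface QueueBacking
    {
        /** Write the entry's message data to disk so the in-memory copy can be released. */
        void flow(QueueEntry entry) throws IOException;

        /** Read the message data back from disk into a fresh copy for this queue. */
        void recover(QueueEntry entry) throws IOException;

        /** Remove the on-disk data once the entry has been dequeued. */
        void delete(QueueEntry entry) throws IOException;
    }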

When a message is dequeued it must also be removed from the QueueBacking.
NOTE: care must be taken here for NO_ACK mode, as the dequeue is performed before delivery, so the message must be in memory before that occurs or the data will be lost.
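For illustration, a minimal sketch of that guard, reusing the QueueEntry and QueueBacking sketches above (the class and method names are assumptions):

    import java.io.IOException;

    // Sketch only: in NO_ACK mode the dequeue (and QueueBacking delete) happens before
    // delivery, so the data must be pulled back into memory first.
    class NoAckDelivery
    {
        void dequeueAndDeliver(QueueEntry entry, QueueBacking backing) throws IOException
        {
            if (entry.isFlowed())
            {
                entry.recover();   // bring the message data back into memory
            }
            backing.delete(entry); // now the on-disk copy can safely be removed
            // ... dequeue from the queue and deliver to the subscriber ...
        }
    }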

BackingFormat

The initial implementation of the QueueBacking will be FileQueueBacking. This will need a new configuration parameter, 'flow-to-disk-path', which will default to '$QPID_WORK/queueBacking'. In this directory a new directory will be created to match the name of the queue that this backing represents; it is in this directory that the queue contents will be written. Each message will be written to its own file, which will include the header and body. The resulting file structure will be as follows:

QPID_WORK/
         /queueBacking/
                      /<queueName>/
                                  /<bin>/<msgID>

Whilst NTFS can store over 4 million files per directory, FAT32 is limited to 65,534 and ext3 to only 32,000. Coupled with the fact that looking up a file in a large directory is not efficient, this makes it most sensible to implement a hash map on disk using the MessageID as the key. Using the least significant bits of the MessageID as the hashing function gives us 512 bins in which to evenly spread the messages on disk. This approach will improve look-up times and allow us to write over 16 million messages per queue before we hit any file system limits.
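For illustration, a sketch of the bin and path calculation implied by the layout above (the class and method names are assumptions; the 512-bin figure follows the text):

    import java.io.File;

    // Sketch only: maps a message ID to its backing file using the least
    // significant bits of the ID as the bin.
    class BackingPaths
    {
        private static final int BIN_COUNT = 512;

        static File fileFor(File flowToDiskPath, String queueName, long messageId)
        {
            long bin = messageId & (BIN_COUNT - 1); // least significant bits of the ID
            File queueDir = new File(flowToDiskPath, queueName);
            File binDir = new File(queueDir, String.valueOf(bin));
            return new File(binDir, String.valueOf(messageId));
        }
    }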


Copy-per-queue is incredibly expensive when dealing with topics, and changes our memory usage there from O(<QueueEntry>*subscribers) to O(<size of message>*subscribers). It also involves a lot (N = subscribers) of copies in memory, which aren't nearly as cheap as you'd hope.

We don't need to actually implement copy-per-queue to use that as a simplifying assumption when calculating queue usage, and I totally agree that trying to accurately gauge queue memory usage within the overall context of the broker is futile.

Posted by aidan at Feb 02, 2009 08:24