Apache Qpid : Java Broker Design - Flow to Disk
This page last changed on Feb 18, 2009 by ritchiem.
Flow to Disk Design

Overview

Currently, the Java Broker can do one of two things with a message it has to deliver:

- keep the message entirely in memory (transient messages), or
- write the message to disk and hold it in memory only via a weak reference (persistent messages).
This means that the broker is not able to avoid OutOfMemory (OoM) exceptions: send enough messages to the broker, especially while your consumers are not active, and you can bring the broker down once it exhausts its available heap. This page pulls together the ideas from QPID-949.

Other Implementations

ActiveMQ uses the idea of a message cursor and has a number of different policies for performing 'Message Spooling': see Message Cursors.

Current Functionality

Currently the broker treats persistent and transient messages differently. Persistent messages are written to disk as they are received, and their handles are created as WeakReferenceMessageHandles. This means that when an Out of Memory (OoM) condition approaches, all the persistent message handles are GC'd. Performance hits the floor because all messages at the front of the queues must be read back from disk, while new messages at the back of the queue are kept in memory. Transient messages are created as InMemoryMessageHandles and so cannot be purged from memory; when an OoM condition occurs the broker cannot recover.

Design

There are areas of the broker in need of improvement that could be affected by this implementation; they are covered in the design details below.
Approach Overview

The approach here is to reduce the overall complexity of the broker so that it is easier to reason about in smaller chunks. Focusing at the level of individual queues will make life easier as we move towards AMQP 1-0. To facilitate this we should move to handling persistent and transient messages in a similar way. To this end the *MessageHandle objects should be merged into AMQMessage. The current reference counting will become the responsibility of the new TransactionLog; the errors in, and testing of, the reference counting are a tricky issue to tackle cleanly within the Java Broker. The queues will gain additional counts so that each can track its memory usage, based on the size of the message body + header + a fixed object-overhead constant representing the in-memory data objects. This will allow the queues to better reason and act upon their memory usage.

Approach Summary
Limitations

The current design of the broker, which uses a new queue per topic subscriber, does not lend itself well to this design. If a large number of subscribers fall behind due to high volume and start flowing to disk, the amount of data flowing to disk will impair performance. A change in the broker to implement topics in a more AMQP 1-0 style, where new subscribers are actually browsers on a special TopicQueue, will alleviate the problem.

Future Enhancements
Design Notes

Areas of Note
Areas for investigation
Validation Rules
Alerting
Testing
Message Flows

To highlight how these changes will affect the current message delivery flow, what follows is a series of message flow diagrams highlighting the changes:

M4 Flows

This is what message delivery via a queue looks like in the M4 broker. A message is received as an IncomingMessage and, if persistent, its data is written to the MessageStore. Once the message is fully received it becomes an AMQMessage and is routed to a Queue, where a QueueEntry is created that references that AMQMessage. In the case of persistent message delivery the message data is held via a WeakReference, so when the broker reaches a full-memory scenario it will purge ALL the references to the message data. New messages received by the broker are kept in memory, but for delivery of older messages the data must be fetched from the store, which may be an expensive operation.

The situation is slightly better for Topics. An AMQMessage that is routed to multiple Queues can be referenced by many QueueEntries, so duplication of memory is not required. When a full-memory situation occurs then, as in the Queue case, message data is purged. However, the references continue to be shared and an unchecked race to repopulate the message data occurs. No data loss should occur in the race, only duplicated effort in reading from disk twice.

Flows after Flow-To-Disk

This design for Flow-To-Disk aims not to impact the normal message delivery flow. Persistent data is written to the TransactionLog just as it was to the MessageStore in M4. The change occurs when the queue hits its defined memory limit: at this point ALL new messages to that queue are written to the QueueBacking. An Inhaler process ensures that the subscribers do not starve for data that has been flowed.

The topic scenario is slightly altered by the desire to keep each queue responsible for its own message flow to disk. AMQMessage references can be shared between queues in normal delivery, but when a message is flowed to disk for a given queue, that queue will restore the message only for its own consumption. The orange messages in the diagrams highlight the duplication of message data in memory. This duplication vastly simplifies the responsibilities for flowing to and from disk. Of course, when the queue leaves a flowed state, newly received message data can again be shared between queues. Again note that the orange messages are paired vertically to represent the duplicated data from messages that have been restored from disk.

Design Details

To highlight the changes that will be required, let's look at the processing that is performed on an incoming message:

AMQChannel

When a persistent message is received, the headers and chunks are recorded in the new TransactionLog; the remainder of the current MessageStore will be moved to a new RoutingTable interface.

+- TransactionLog -+
| enqueue          |
| dequeue          |
| storeHeader      |
| storeChunk       |
|                  |
| startTransaction |
| commit           |
| abort            |
+------------------+

+--RoutingTable--+
| createQueue    |
| createExchange |
| createBinding  |
|                |
| deleteQueue    |
| deleteExchange |
| deleteBinding  |
+----------------+

The TransactionLog is a distilled version of the current MessageStore interfaces. The log is the persistent record of the state of the broker. On start-up this log is used to restore the routing and message states. It is not to be used as a lookup mechanism; the queues must now be responsible for remembering all their enqueued messages and not rely on the previous MessageStore.
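As a sketch only, the two interfaces might look as follows in Java. The method names come from the boxes above; the parameter and exception types are illustrative assumptions, not a confirmed API.

public interface TransactionLog
{
    void enqueue(StoreContext context, AMQShortString queueName, Long messageId) throws AMQException;
    void dequeue(StoreContext context, AMQShortString queueName, Long messageId) throws AMQException;

    void storeHeader(StoreContext context, Long messageId, ContentHeaderBody header) throws AMQException;
    void storeChunk(StoreContext context, Long messageId, ContentChunk chunk) throws AMQException;

    void startTransaction(StoreContext context) throws AMQException;
    void commit(StoreContext context) throws AMQException;
    void abort(StoreContext context) throws AMQException;
}

public interface RoutingTable
{
    void createQueue(AMQQueue queue) throws AMQException;
    void createExchange(Exchange exchange) throws AMQException;
    void createBinding(Exchange exchange, AMQQueue queue, AMQShortString routingKey) throws AMQException;

    void deleteQueue(AMQQueue queue) throws AMQException;
    void deleteExchange(Exchange exchange) throws AMQException;
    void deleteBinding(Exchange exchange, AMQQueue queue, AMQShortString routingKey) throws AMQException;
}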
As no random access to the log file is needed, it can be implemented as a write-ahead log. It can periodically clean up old state by writing a new log, but as its primary function is to ensure state is persisted to disk, it need not maintain maps of the data, thus simplifying its implementation. The responsibility for remembering the message data is delegated to the Queue.

The TransactionLog shall absorb the current reference-counting code and be responsible for deciding when to recoverably delete a message. Currently the reference counting is spread across a number of different classes and has a couple of serious problems. The TransactionLog will record a series of Queue/Message tuples so that it can pair enqueue/dequeue calls. When there are no more references to the message, it can safely know that the message is no longer needed. By using a list of tuples rather than an integer count, the TransactionLog is capable of safely interleaving transactions, as there is no shared count value.

TransactionLog Recovery

Currently the MessageStore is responsible for providing unique MessageIDs. This is not strictly part of a TransactionLog, so it would not make sense to include it in the interface. What is recommended is that we unify our message creation as part of removing the *MessageHandle objects. Messages recovered directly from the store currently create the *MessageHandle directly with a Message ID, while messages delivered via the wire ask the MessageStore for an ID before creating an IncomingMessage, which in turn creates the *MessageHandle. As we will be removing the *MessageHandle objects, it makes sense to unify our message creation through a factory, MessageFactory. This will allow the factory to be responsible for the sequence of IDs. When recovery is in progress, a call to createMessage(id) will take place and the factory need only:

- create the message with the recovered ID, and
- ensure its internal ID sequence is advanced past that ID, so that IDs assigned to new messages after recovery remain unique.
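A minimal sketch of such a factory, assuming message IDs are longs and that AMQMessage can be constructed from an ID (both assumptions for illustration, not the final API):

import java.util.concurrent.atomic.AtomicLong;

public class MessageFactory
{
    private final AtomicLong _messageId = new AtomicLong(0L);

    // Wire path: hand out the next ID in the sequence.
    public AMQMessage createMessage()
    {
        return new AMQMessage(_messageId.incrementAndGet());
    }

    // Recovery path: reuse the recovered ID and make sure the
    // sequence never falls behind it.
    public AMQMessage createMessage(long recoveredId)
    {
        long current = _messageId.get();
        while (recoveredId > current && !_messageId.compareAndSet(current, recoveredId))
        {
            current = _messageId.get();
        }
        return new AMQMessage(recoveredId);
    }
}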
AMQQueue/QueueEntry

When a message has been fully received it is routed to the required Queues as before. Only persistent messages that are routed to persistent queues are written to the TransactionLog, which is then responsible for the ultimate deletion of persistent message data. For this to occur the existing model needs to be updated. The *Handle objects we currently have need to be merged into AMQMessage, and all the state about the message needs to be moved to the QueueEntry. This will allow us to null the AMQMessage reference as and when the message is flowed to disk. The QueueEntry interface will be augmented to allow the Queue to flow the data when required. When the data is recovered, no attempt is made to restore the single instance of the message; i.e. if a single message is sent to 10 queues, initially there will be one AMQMessage and one copy of the data. When a queue is flowed it will lose the reference to that message, so on recover a new message with a copy of the data will be created solely for that queue's use.

+-QueueEntry-+
| flow       |
| isFlowed   |
| recover    |
+------------+

Our existing Queue needs to be updated to record the additional state of its QueueEntries. Currently we have queueCount and queueSize, which represent the count and data size used by the queue. The Queue needs queueMaxMemory, queueMinMemory, queueInMemory and isFlowed added. It is proposed that only the data size is used for flow-to-disk calculations, as counting messages will not give us the control that we need over memory usage. These new variables will be used to control two new threads, Inhaler and Purger.

Purger

When queueMaxMemory is reached, the queue is set to flow and all new messages on to the queue are sent straight to disk. As messages are sent to a subscriber there are a couple of possibilities when the queue is in a flowed state:

- flow each message to disk as it is delivered to the subscriber, so the amount of in-memory data keeps falling (this is what the pseudo-code below does), or
- run a dedicated Purger thread over the queue to flow any messages beyond the memory limit.
Using the first mechanism we do not need a Purger thread for the simple queue case. However, for the second case, and to handle queues where the position of the incoming message is not known, the Purger thread will be required. The Purger simply needs to start at the front of the queue and record the amount of data still in memory on the Queue; when queueMaxMemory is reached, all subsequent messages are flowed.

Inhaler

The Inhaler is an optimisation to ensure that the broker returns to peak performance after a flow-to-disk event. Lazily loading messages on demand would be quite slow, so on delivery to a subscriber a check can be performed to see if the current queueInMemory is less than the queueMinMemory, which would indicate that there is room to reload older messages. The Inhaler can then begin to load messages from disk until queueMaxMemory has been reached or all messages are back in memory. If all the messages are back in memory then the queue flow state can be reset, allowing incoming messages to stay in memory. The delivery paths to the queue and to the subscriber are expected to be updated in the following ways:

Pseudo-Code - Delivery to Subscriber

while (message in queue)
    subscriber.deliver(message)

    // With a low prefetch or a more complex purging thread this should not be required.
    if (flowed)
        flowToDisk(message)

    if (_queueInMemory < _queueMinMemory)
        startInhaler

Pseudo-Code - Delivery to Queue

addToQueue(message)

if (flowed)
    flowToDisk(message)
else if (_queueInMemory > _queueMaxMemory)
    setFlowed
    startPurger

The additional overhead of checking state is incurred after the message deliveries have been performed, and these are simple calculations compared to the existing message flow paths. As a result, performance in the non-flowed state should not be affected.

Implementation details for queue*Memory and flow state updates

Here are some further details on how the new queue*Memory values will be calculated.

Implementation details for Delivery to Subscriber; this is taken from SimpleAMQQueue and is also used by AMQPriorityQueue.

public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
{
    // Current _queueDepth calculations
    decrementQueueCount();
    decrementQueueSize(entry);

    // Add update for _queueInMemory
    decrementQueueInMemorySize(entry);

    ... snip complete entry clean-up, including any TransactionLog dequeue and QueueBacking delete ...

    if (_queueInMemory.get() < _queueMinMemory.get())
    {
        _asyncInhaler.execute(inhaler);
    }
}

When the Inhaler has completely restored all messages to the queue it can call setFlowed(false) to continue normal message delivery. NOTE: It is expected that the Inhaler will have to run once more over the queue to ensure that no new messages were flowed between it retrieving all messages on disk and the resetting of the flowed status.
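As a sketch only, the Inhaler might look something like the following, using the QueueEntry flow/isFlowed/recover methods proposed above; getQueueEntries() and the memory fields are assumed helpers, not part of the confirmed design.

private final Runnable _inhaler = new Runnable()
{
    public void run()
    {
        boolean allInMemory = true;

        for (QueueEntry entry : getQueueEntries()) // hypothetical accessor over the queue's entries
        {
            if (entry.isFlowed())
            {
                if (_queueInMemory.get() >= _queueMaxMemory.get())
                {
                    // No room left; the queue must stay in the flowed state.
                    allInMemory = false;
                    break;
                }
                // Reload the message data from the QueueBacking; this is
                // expected to update _queueInMemory as a side effect.
                entry.recover();
            }
        }

        // Reset the flow state only once everything is back in memory.
        // As noted above, one further pass is still needed to catch
        // messages flowed while the Inhaler was running.
        if (allInMemory)
        {
            setFlowed(false);
        }
    }
};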
Implementation details for Delivery to Queue; again this is taken from SimpleAMQQueue and is also used by AMQPriorityQueue.

public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
{
    // Current _queueDepth calculations
    incrementQueueCount();
    incrementQueueSize(message);

    // Add update for _queueInMemory
    incrementQueueInMemorySize(message);

    // Add check to see if we should be flowing.
    if (_queueInMemory.get() > _queueMaxMemory.get())
    {
        setFlowed(true);
        // If we are to have a purging thread then this is where it would be started.
        // _asyncPurger.execute(purger);
    }

    ... snip section about adding to queue and attempting deliveries ...

    // For the SimpleAMQQueue case messageAddedToEndOfQueue() will always return true.
    // The AMQPriorityQueue implementation will be more complicated.
    if (_flowed.get() && messageAddedToEndOfQueue())
    {
        entry.flow();
    }
}

QueueBacking

The flow-to-disk implementation will be performed on a queue-by-queue basis, so a new QueueBacking will be created to handle the flowing of messages to and from disk.

+-QueueBacking-+
| flow         |
| recover      |
| delete       |
+--------------+

When a message is dequeued it must also be removed from the QueueBacking.

BackingFormat

The initial implementation of the QueueBacking will be FileQueueBacking. This will need a new configuration parameter, 'flow-to-disk-path', which will default to '$QPID_WORK/queueBacking'. In this directory a new directory will be created to match the name of the queue that this backing represents, and it is in this directory that the queue contents will be written. Each message will be written to its own file, which will include the header and body. The resulting file structure will be as follows:

QPID_WORK/
    queueBacking/
        <queueName>/
            <bin>/<msgID>

Whilst NTFS can store over 4M files per directory, FAT32 is limited to 65534 and ext3 to only 32000. Coupled with the fact that looking up a file in a large directory is not efficient, it makes most sense to implement a hash map on disk using the MessageID as the key. Using the least significant 9 bits of the MessageID as the hashing function gives us 512 bins to evenly spread the messages on disk. This approach will improve lookup times and allow us to write over 16M messages per queue before we hit any file system limits.
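A minimal sketch of the bin layout described above, assuming the 9-bit hash; everything beyond the FileQueueBacking name is an illustrative assumption.

import java.io.File;

public class FileQueueBacking
{
    // Least significant 9 bits of the MessageID -> 512 bins.
    private static final long BIN_MASK = 0x1FFL;

    private final File _queueDirectory;

    public FileQueueBacking(File flowToDiskPath, String queueName)
    {
        // e.g. $QPID_WORK/queueBacking/<queueName>
        _queueDirectory = new File(flowToDiskPath, queueName);
    }

    // Resolve <queueName>/<bin>/<msgID> for a given message.
    File fileForMessage(long messageId)
    {
        File bin = new File(_queueDirectory, String.valueOf(messageId & BIN_MASK));
        return new File(bin, String.valueOf(messageId));
    }
}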