Apache Qpid : Java Broker Design - Message Representation
This page last changed on Jun 16, 2009 by ritchiem.
Message Representation

Following on from the high level design, this page provides a more detailed design of the changes to the broker.

Overview

Currently the broker uses two implementations of AMQMessageHandle, one transient and one persistent, which actively store and retrieve the underlying data. The AMQMessage class should provide immutable access to the message data (header and body) stored within the broker, and there should be a 1:1 relationship between AMQMessage objects and messages delivered through the broker.

Currently the AMQMessage is responsible for holding a count of references, but the responsibility for maintaining this value is spread throughout the code base. This responsibility needs to be given to a single class so that we can more easily reason about and test its functionality. This work is part of the MessageStore refactoring.

Design

As part of the initial high level design the following interfaces were designed.

Message Representation Interface

Message

    public interface Message<Header extends MessageHeader>
    {
        /**
         * Add additional content data to this message.
         *
         * @param buffer The data that is to be added.
         */
        void addContent(ByteBuffer buffer);

        /**
         * Signal that all content has been received.
         *
         * The returned CompleteFuture can then be used to ensure that all
         * persistent data has been safely persisted.
         *
         * @return CompleteFuture to ensure data is persisted.
         */
        CompleteFuture contentComplete();

        /**
         * Get the Message ID for this Message
         *
         * @return the message id
         */
        long getMessageID();

        /**
         * Retrieve the Header for this Message
         * The return is parameterised so the caller does not need to immediately
         * cast from the {@link MessageHeader} that the MessageStore knows about.
         *
         * @return the Header for this Message
         */
        Header getHeader();

        /**
         * Write the Message to the specified ByteBuffer.
         *
         * If the Message does not fit into the specified ByteBuffer then a
         * subsequent call must be made using the offset value.
         *
         * The offset value is the offset into the underlying data of the Message.
         * It is needed on subsequent calls if the first ByteBuffer provided was
         * too small.
         *
         * The method will return zero when no data has been written to the Buffer.
         *
         * @param offset position in the underlying data from which to start
         *               writing to the ByteBuffer.
         * @param length the length of data to write into the ByteBuffer
         * @param buffer the buffer to write into
         *
         * @return the amount of data written to the buffer
         */
        long writeContentToBuffer(long offset, long length, ByteBuffer buffer);

        /**
         * Create a new Reference to this Message
         *
         * @param referer the object that is referring to this Message.
         *
         * @return a MessageReference.
         */
        MessageReference newReference(Object referer);

        /**
         * Signal to the MessageStore that the requestor is interested in this
         * Message.
         *
         * This means that the requestor is likely to use the Message shortly so
         * the MessageStore would be advised to keep this Message in memory.
         *
         * @param requestor that is interested in this Message
         *
         * @return a MessageInterest
         */
        MessageInterest newInterest(Object requestor);
    }

The new Message interface unifies the current IncomingMessage, AMQMessage and the Store operations, becoming a container for the ByteBuffers of data that make up the content of the Message. The Message becomes a proxy for the MessageStore: incoming ByteBuffers of content can be added to it, and a future can be used to check that the data has been safely stored by the MessageStore.

CompleteFuture

    public interface CompleteFuture
    {
        /** Block the calling thread until the task that created this future has completed. */
        void complete();

        /**
         * Non-Blocking call to test that the task that created this future
         * has completed.
         *
         * @return true if the task is complete
         */
        boolean isComplete();
    }

To enable support for multiple protocols, the type of MessageHeader will depend on the protocol of the publishing connection.
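
The offset contract of writeContentToBuffer described above can be illustrated with a small sketch. It is not broker code: InMemoryContent and drain are hypothetical names introduced here, the content is assumed to live in a plain byte array, and the sketch assumes that a short write is signalled only through the return value. The caller keeps passing the running offset until the method returns zero.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ChunkedWriteSketch {

    /** Hypothetical in-memory stand-in for the content side of Message. */
    static final class InMemoryContent {
        private final byte[] data;

        InMemoryContent(byte[] data) {
            this.data = data.clone();
        }

        /**
         * Copies up to length bytes, starting at offset in the underlying
         * data, into buffer. Returns the number of bytes written, or zero
         * when nothing could be written.
         */
        long writeContentToBuffer(long offset, long length, ByteBuffer buffer) {
            if (offset < 0 || offset >= data.length || length <= 0) {
                return 0;
            }
            long available = data.length - offset;
            // Never write more than the caller asked for or the buffer can hold.
            int toWrite = (int) Math.min(Math.min(available, length), buffer.remaining());
            buffer.put(data, (int) offset, toWrite);
            return toWrite;
        }
    }

    /** Drains the content through one reusable buffer, as a framing layer might. */
    static byte[] drain(InMemoryContent content, int chunkSize) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
        long offset = 0;
        while (true) {
            buffer.clear();
            long written = content.writeContentToBuffer(offset, chunkSize, buffer);
            if (written == 0) {
                break; // zero means no more data was written
            }
            buffer.flip();
            out.write(buffer.array(), 0, buffer.remaining());
            offset += written; // the next call resumes where this one stopped
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] body = "hello world".getBytes(StandardCharsets.UTF_8);
        byte[] copy = drain(new InMemoryContent(body), 4);
        System.out.println(new String(copy, StandardCharsets.UTF_8));
    }
}
```

In this sketch the caller owns the buffer and the running offset, so the content object itself keeps no per-reader state. That is one reading of the interface, chosen because it lets several consumers read the same immutable content independently.
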
In addition, the Message will be responsible for writing itself to a given ByteBuffer. This allows the Framing layer to be responsible for setting up the ByteBuffer and then requesting that the Message fill in its data.

Unified Reference Counting

Reference counting is performed in a number of locations in the code, so the new Message interface provides a new approach to handling these references. The previous approach used an AtomicInteger count to know when a Message was no longer required. This made debugging difficult, so the newReference call takes the object that is referring to this Message. This makes it possible to provide a list of what is referencing this Message. The returned MessageReference object will have a release() method to relinquish the reference. However, by also using a finalizer to release the reference automatically at GC time, callers do not need to call release() explicitly. This pattern is also going to be used for the new concept of Interest.

MessageReference/Interest

    public abstract class MessageReference  // MessageInterest has the same shape
    {
        /** Release the reference in the underlying Message. */
        abstract void release();

        /**
         * Finalise - AutoRelease
         *
         * @throws Throwable
         */
        protected final void finalize() throws Throwable
        {
            try
            {
                release();
            }
            finally
            {
                // Always run the superclass finalizer, even if release() throws.
                super.finalize();
            }
        }
    }

Message Interest

As mentioned above in Reference Counting, Message Interest is a new concept for the Java Broker. It allows the running broker to hint to the MessageStore that it is interested in the Message and is likely to use it shortly. This will enable the MessageStore to better control its memory usage. Messages that have no interested parties could be safely removed from memory, for example by a Flow to Disk routine. However, if Interest is requested then it would be beneficial to load this message back into memory or performance will suffer.
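
As a rough illustration of why passing the referer helps debugging, the following sketch keeps a registry of live references instead of a bare counter. TrackedMessage, TrackedReference, referenceCount and referers are hypothetical names introduced for this example and are not part of the proposed interfaces. The sketch deliberately leaves out the finalizer: Object.finalize() has been deprecated since Java 9, and a registry that holds its references strongly, as this one does, would prevent them from being collected, so release() is explicit here.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public class ReferenceTrackingSketch {

    /** Hypothetical handle returned by newReference; release() is idempotent. */
    static final class TrackedReference {
        private final TrackedMessage message;
        private boolean released;

        TrackedReference(TrackedMessage message) {
            this.message = message;
        }

        synchronized void release() {
            if (!released) {
                released = true;
                message.remove(this);
            }
        }
    }

    /** Hypothetical message that records who is holding each reference. */
    static final class TrackedMessage {
        // Identity map: two references are distinct even if their referers are equal.
        private final Map<TrackedReference, Object> live = new IdentityHashMap<>();

        synchronized TrackedReference newReference(Object referer) {
            TrackedReference ref = new TrackedReference(this);
            live.put(ref, referer);
            return ref;
        }

        private synchronized void remove(TrackedReference ref) {
            live.remove(ref);
        }

        /** Equivalent of the old AtomicInteger count. */
        synchronized int referenceCount() {
            return live.size();
        }

        /** The debugging aid: a snapshot of what currently refers to this message. */
        synchronized List<Object> referers() {
            return new ArrayList<>(live.values());
        }
    }

    public static void main(String[] args) {
        TrackedMessage message = new TrackedMessage();
        TrackedReference queueRef = message.newReference("queue-A");
        TrackedReference subRef = message.newReference("subscription-1");
        queueRef.release();
        System.out.println(message.referenceCount() + " reference(s) held by " + message.referers());
        subRef.release();
    }
}
```

The same shape would apply to Interest: a second registry keyed by interest handles, which a store could consult before deciding whether a message is a candidate for removal from memory.
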
Message Interests will follow the same pattern as MessageReference: interest will be requested by an Object, and the resulting MessageInterest will allow that interest to be explicitly released.

MessageHeader

The current header (ContentHeaderBody) is used to access the header properties and protocol frame values; as a result it is focused on a single protocol version. The introduction of a number of MessageHeader interfaces will allow the various layers to have their own MessageHeader interface and, in so doing, limit the required access to the header. The main interface will contain all the existing common property accessors (CommonContentHeaderProperties). However, the interface should only provide accessors to the data, as one of the goals of this change is to create immutable messages.

There is an argument that the broker may wish to add additional data to the message, such as routing details. This could be performed in a similar way to SMTP, by the addition of properties to the Message. An alternative approach would be to encapsulate the original message in a new message. As the broker does not currently need to add any additional properties to messages that it delivers, we can delay the decision on, and inclusion of, this feature until the AMQP-WG decide what approach they want implementers to use.

MessageHeader

    public interface MessageHeader
    {
        /*
         * The size in bytes of the header.
         */
        long getHeaderSize();

        /*
         * The size in bytes of the message body content.
         */
        long getBodySize();

        /*
         * General getter to allow arbitrary properties to be
         * used by JMS Selectors
         */
        Object getProperty(String key);

        /*
         * Getters for Common Header properties primarily used
         * by JMS Selectors
         */
        AMQShortString getAppId();
        AMQShortString getContentType();
        AMQShortString getCorrelationId();
        byte getDeliveryMode();
        AMQShortString getEncoding();
        long getExpiration();
        AMQShortString getMessageId();
        byte getPriority();
        AMQShortString getReplyTo();
        AMQShortString getTimestamp();
        AMQShortString getMessageType();
        AMQShortString getUserId();
    }

The changes to the storage classes allow the introduction of a MessageHeader interface that is specific to the store classes. This interface will allow third-party storage plugins written in the storage layer to be independent of any future protocol changes.

store.MessageHeader

    public interface MessageHeader
    {
        /**
         * Is this Message a persistent Message?
         *
         * @return true if it is persistent
         */
        boolean isPersistent();

        /**
         * Write the MessageHeader to the specified ByteBuffer in the correct
         * format for the specified Session.
         *
         * If the MessageHeader does not fit into the specified ByteBuffer then a
         * subsequent call must be made using the offset value.
         *
         * The offset value is the offset into the underlying data of the
         * MessageHeader. It is needed on subsequent calls if the first ByteBuffer
         * provided was too small.
         *
         * The method will return zero when no data has been written to the Buffer.
         *
         * @param session the target session for which this Header should be
         *                formatted.
         * @param offset  position in the underlying data from which to start
         *                writing to the ByteBuffer.
         * @param length  the length of data to write into the ByteBuffer
         * @param buffer  the buffer to write into
         *
         * @return the amount of data written to the buffer
         */
        long writeContentToBuffer(Session session, long offset, long length, ByteBuffer buffer);
    }

In a similar vein, a new Framing layer MessageHeader will allow access to only the properties and methods that are needed by that layer. This approach allows the Framing layer to be independent of the message header and data it is transporting and to focus on the correct formatting of frames for the given consumer.

framing.MessageHeader

    public interface MessageHeader
    {
        long getHeaderSize();

        long getBodySize();

        long writeContentToBuffer(Session session, long offset, long length, ByteBuffer buffer);
    }

As we will eventually have clients connected on a variety of protocol versions, using the MessageHeader interface to isolate the framing layer from any transformations that must be performed should reduce complexity at this layer. It does require that the MessageHeader is capable of writing itself to a ByteBuffer for the given session.

MessageHeaderFactory

    public interface MessageHeaderFactory<Header extends MessageHeader>
    {
        /**
         * Convert the given ByteBuffer into a MessageHeader
         *
         * @param buffer containing the data to convert into a MessageHeader
         * @return the MessageHeader that was in the buffer
         */
        Header createMessageHeaderFromBuffer(ByteBuffer buffer);
    }

Technical Design

TBC
Document generated by Confluence on May 26, 2010 10:32