Apache Qpid : Current Architecture
This page last changed on Sep 16, 2009 by aidan.
Current implementation

Inside Qpid, data is read from a socket and placed in a buffer. A separate thread then takes this buffer and attempts to parse it as an AMQP command. This AMQP command is then put on a second buffer. Finally, a third thread reads the command and processes it. Currently the two buffers between these three threads are unbounded. This means that data is read from the network as fast as possible, with no regard to whether the broker has the capacity to process it.

Queues are themselves a kind of buffer between client applications. From a queue, a message can be assigned to be sent to a client. At this point a delivery command is placed in another buffer to await sending on the network. When it is received by the client, a process similar to receiving on the broker occurs.

The whole process looks something like this:

Client App sends message -> (MINA Buffer)

Or, pictorially:

Of all the buffers above, only the TCP buffers are bounded. (The Delivery Queue Buffer in the client is potentially bounded by prefetch, although prefetch is set in messages rather than bytes, and messages may be of arbitrary size.) Every other buffer is a potential source of out-of-memory errors.

We need to consider where we may get unbounded growth, what scenarios will cause it, and what other ways we have to mitigate those risks. In general, the IO (MINA) buffers grow when sender and receiver (i.e. the client and the broker) operate at mismatched rates. The queue grows without bound if the sending client produces at a faster rate than the receiving client can process.

Issues
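
The difference between the unbounded buffers described above and a bounded one can be shown with a small sketch. This is not Qpid or MINA code: BoundedStage, tryEnqueue and tryDequeue are hypothetical names, and the capacity is counted in chunks rather than bytes purely for brevity. It assumes the reading thread is able to stop reading from the socket when the buffer refuses a chunk, so that the already bounded TCP buffer pushes back on the sender.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: BoundedStage is not a Qpid or MINA class.
final class BoundedStage {
    private final BlockingQueue<byte[]> buffer;

    BoundedStage(int capacity) {
        // ArrayBlockingQueue has a fixed capacity, unlike an unbounded buffer.
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Reader side: returns false instead of growing when the buffer is full.
    // A caller would then pause reading from the socket until space frees up.
    boolean tryEnqueue(byte[] chunk) {
        return buffer.offer(chunk);
    }

    // Processing side: returns the oldest chunk, or null if nothing is waiting.
    byte[] tryDequeue() {
        return buffer.poll();
    }

    public static void main(String[] args) {
        BoundedStage stage = new BoundedStage(2);
        System.out.println(stage.tryEnqueue(new byte[] {1}));
        System.out.println(stage.tryEnqueue(new byte[] {2}));
        System.out.println(stage.tryEnqueue(new byte[] {3})); // buffer is full here
        stage.tryDequeue();                                    // frees one slot
        System.out.println(stage.tryEnqueue(new byte[] {3}));
    }
}
```

With a capacity of 2, the third tryEnqueue call is refused and only succeeds after tryDequeue has freed a slot. Memory use is then capped by the capacity rather than by how fast the peer sends.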
Current implementation

Broker

Currently the broker decodes the incoming network data and adds the frames to a Job queue. They are then processed as Events by AMQPFastProtocolHandler, which passes the majority of the work to AMQMinaProtocolSession. Often this results in a FrameHandler being called. On the outbound route, frames are written to AMQMinaProtocolSession, which calls IoSession.writeFrame, which passes the data to MINA for writing to the wire.

Client connection creation

When the client creates a connection, it creates an AMQConnectionDelegate for the protocol version it requires and passes the new protocol handler to TransportConnection, which creates a socket of the requested type (new TCP socket, existing TCP socket or InVM). It then attaches the socket to the protocol handler, which init()s a new ProtocolSession, which in turn begins version negotiation with the broker.

Client processing

Once a socket has been opened, the client processes data similarly to the broker, decoding frames using AMQDecoder and passing them to AMQProtocolHandler, which normally calls a frame handler to perform the actual work. If the frame is one which has listeners waiting for it, those listeners are notified. Outgoing data is generated in AMQSession or its delegate and written to AMQProtocolHandler, optionally with a return frame to wait for. This is passed to MINA directly.

If the frame is a BasicDeliver containing message payload, an UnprocessedMessage is added to the session and waits for the ContentHeaderBody and ContentBody payloads to arrive. Once all the expected bodies have been received, the complete message is given to the AMQSession for that channel. The AMQSession instance adds the message to its internal delivery queue, and any locks waiting on the queue are notified. The Dispatcher thread takes the message and delivers it to one of the consumers.
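
The assembly of a delivered message from its frames, as described above, can be sketched roughly as follows. This is an illustration only and not the real UnprocessedMessage: PendingMessage and its methods are hypothetical names. It assumes that "all the expected bodies" is decided by comparing the accumulated body bytes with the body size announced in the content header, which is how AMQP 0-8/0-9 content framing works in general but may differ in detail from the Qpid client code.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical sketch: PendingMessage stands in for the role of UnprocessedMessage.
final class PendingMessage {
    private final long deliveryTag;                 // taken from the deliver frame
    private long expectedBodySize = -1;             // unknown until the header arrives
    private final ByteArrayOutputStream payload = new ByteArrayOutputStream();

    PendingMessage(long deliveryTag) {
        this.deliveryTag = deliveryTag;
    }

    // Content header: announces how many body bytes will follow.
    void receiveHeader(long bodySize) {
        if (bodySize < 0) throw new IllegalArgumentException("negative body size");
        if (expectedBodySize >= 0) throw new IllegalStateException("duplicate header");
        expectedBodySize = bodySize;
    }

    // Content body: one chunk of the payload; several may arrive per message.
    void receiveBody(byte[] chunk) {
        if (expectedBodySize < 0) throw new IllegalStateException("body before header");
        if (payload.size() + (long) chunk.length > expectedBodySize) {
            throw new IllegalStateException("more body bytes than the header announced");
        }
        payload.write(chunk, 0, chunk.length);
    }

    // True once the header has arrived and every announced byte has been received.
    boolean isComplete() {
        return expectedBodySize >= 0 && payload.size() == expectedBodySize;
    }

    long deliveryTag() {
        return deliveryTag;
    }

    byte[] payload() {
        return payload.toByteArray();
    }
}
```

In this sketch a caller would check isComplete() after each frame and hand the message to the session's delivery queue as soon as it returns true.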
The BasicMessageConsumer converts the UnprocessedMessage to an AbstractJMSMessage and then either delivers it to the MessageListener, if one has been set, or stores it on a queue which is popped when the application calls the consumer's receive() method.
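
The choice between asynchronous and synchronous delivery in the consumer can be sketched as below. SketchConsumer is a hypothetical stand-in, not BasicMessageConsumer: it uses java.util.function.Consumer in place of the JMS MessageListener so that the example has no dependencies, and it offers only a non-blocking receiveNoWait() for brevity. The internal queue is deliberately left unbounded, as an assumption about how such a queue might behave when the application never calls receive.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch: SketchConsumer is not a Qpid class.
final class SketchConsumer<M> {
    // Holds messages until the application asks for them (unbounded in this sketch).
    private final BlockingQueue<M> received = new LinkedBlockingQueue<>();
    // Stand-in for a JMS MessageListener; null means synchronous receive is in use.
    private volatile Consumer<M> listener;

    void setListener(Consumer<M> listener) {
        this.listener = listener;
    }

    // Called by the dispatching thread for each complete message.
    void deliver(M message) {
        Consumer<M> current = listener;
        if (current != null) {
            current.accept(message);   // asynchronous path: push to the listener
        } else {
            received.add(message);     // synchronous path: store until requested
        }
    }

    // Application side: returns the next stored message, or null if none is waiting.
    M receiveNoWait() {
        return received.poll();
    }
}
```

A message delivered before any listener is set stays on the internal queue until receiveNoWait() is called; once a listener is set, later messages bypass the queue entirely.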
Document generated by Confluence on May 26, 2010 10:33