This page last changed on Sep 29, 2009 by ritchiem.

BasicFlow Synchronisation Design

The recently fixed QPID-1871/0.6 Java Client Dispatcher Changes raised the issue of QPID-2116 which is described here in more detail.

Problem

The initial problem under investigation(QPID-1871) was the Java client and when used with the Java Broker (0-8/9) protocol was resulting in messages returning out of order after a TxRollback. The initial analysis highlighted the Dispatcher as holding an incomming message and acking it late. The fix for QPID-1871 does not remove the potential for the Dispatcher to hold a message rather it attempts to clear all messages that the Dispatcher has to process so that none can be left behind. The change has fixed the related issue with the 0-10/CPP Broker code path but the 0-8/Java broker was still failing. On further investigation it appears that the reason the Dispatcher had a message to reject after completion in the Java case was due to the broker violating the Flow status of the Channel.

The log extracts from the RollbackOrderTest highlight that whilst the Client has requested Flow = false and the Broker acknowledges with a FlowOk, one last message is still sent to the Client.

// Client Requests Flow = false/suspended
main 2009-09-28 13:09:20,994 DEBUG [apache.qpid.client.AMQSession] Setting channel flow : suspended
...
// Broker Receives request acknowledges with FlowOk (not logged)
pool-1-thread-1 2009-09-28 13:09:21,020 DEBUG [qpid.server.protocol.AMQProtocolSession] Frame Received: Frame channelId: 1, bodyFrame: [ChannelFlowBodyImpl: active=false]
pool-1-thread-1 2009-09-28 13:09:21,020 INFO [qpid.message] MESSAGE [con:0(guest@anonymous(11875256)/test)/ch:1] [con:0(guest@anonymous(11875256)/test)/ch:1] CHN-1002 : Flow Stopped
pool-1-thread-1 2009-09-28 13:09:21,020 DEBUG [qpid.server.handler.ChannelFlowHandler] Channel.Flow for channel 1, active=false
...
// Asynchronous delivery thread can be seen to be delivering a message to the client
pool-1-thread-2 2009-09-28 13:09:21,046 DEBUG [qpid.server.subscription.SubscriptionImpl] (25976423) checking filters for message ((HC:32576775 ID:21 Ref:1)
pool-1-thread-2 2009-09-28 13:09:21,046 DEBUG [apache.qpid.server.AMQChannel] 1(8529229) Adding unacked message(Message[(HC:32576775 ID:21 Ref:1)]: 21; ref count: 1 DT:289) with a queue(org.apache.qpid.server.queue.SimpleAMQQueue@1c5ddd3) for [channel=[anonymous(11875256)(guest):1], consumerTag=1, session=anonymous(11875256)]
...
// Client receives FlowOk message
pool-1-thread-1 2009-09-28 13:09:21,086 DEBUG [qpid.client.protocol.AMQProtocolHandler] (2443549)Method frame received: [ChannelFlowOkBodyImpl: active=false]
pool-1-thread-1 2009-09-28 13:09:21,086 DEBUG [qpid.client.handler.ChannelFlowOkMethodHandler] Received Channel.Flow-Ok message, active = false
...
// Client receives additional message
pool-1-thread-4 2009-09-28 13:09:21,143 DEBUG [qpid.client.protocol.AMQProtocolHandler] (2443549)Method frame received: [BasicDeliverBodyImpl: consumerTag=1, deliveryTag=289, redelivered=true, exchange=amq.direct, routingKey=RollbackOrderTest-testOrderingAfterRollback]
pool-1-thread-4 2009-09-28 13:09:21,144 DEBUG [qpid.client.handler.BasicDeliverMethodHandler] New JmsDeliver method received:org.apache.qpid.client.protocol.AMQProtocolSession@c22a3b
pool-1-thread-4 2009-09-28 13:09:21,144 DEBUG [apache.qpid.client.AMQSession] Message[ContentHeader org.apache.qpid.framing.ContentHeaderBody@1e5ba24] received in session 

The additional message that is received causes the Dispatcher to awake from its _queue.take() call and hold until the connection is restarted. At this point the message is rejected as its delivery tag is old. However, the Java Broker has already resent the message with a new delivery tag.

Solution

What needs to happen is that when the the Channel is suspended we must ensure that all Subscriptions have noticed the change in state. This can be done by taking and releasing each of the Subscriptions send locks.

Document generated by Confluence on May 26, 2010 10:32