Apache Qpid : 0.6 Java Client Dispatcher Changes
This page last changed on Sep 28, 2009 by ritchiem.
Java Client Dispatcher Changes

Investigation of QPID-1871 has highlighted a race condition between the Dispatcher and the client's request to roll back.

Problem Summary

The problem here is that the Dispatcher can hold on to a message, so when the rollback …

Operation Details

Due to the way that the AMQSession.Dispatcher is paused when a rollback operation is in progress, it is possible that the Dispatcher thread is 'holding' a message for dispatch. The main loop of AMQSession.Dispatcher is shown here:

    while (!_closed.get() && ((disp = (Dispatchable) _queue.take()) != null))
    {
        disp.dispatch(AMQSession.this);
    }

The problem is highlighted in the dispatchMessage() call below, which is the result of disp.dispatch() on an UnprocessedMessage. If the Dispatcher is in the process of dispatching messages when a second thread calls rollback, the connection will be stopped. The Dispatcher can then remove a message from _queue and block in the connectionStopped() wait loop inside dispatchMessage():

    private void dispatchMessage(UnprocessedMessage message)
    {
        long deliveryTag = message.getDeliveryTag();

        synchronized (_lock)
        {
            try
            {
                // Park here while the connection is stopped (e.g. during rollback)
                while (connectionStopped())
                {
                    _lock.wait();
                }
            }
            catch (InterruptedException e)
            {
                // pass
            }

            // Messages at or below the rollback mark are rejected, not delivered
            if (!(message instanceof CloseConsumerMessage)
                && tagLE(deliveryTag, _rollbackMark.get()))
            {
                rejectMessage(message, true);
            }
            else
            {
                synchronized (_messageDeliveryLock)
                {
                    notifyConsumer(message);
                }
            }
        }

        long current = _rollbackMark.get();
        if (updateRollbackMark(current, deliveryTag))
        {
            _rollbackMark.compareAndSet(current, deliveryTag);
        }
    }

When the connection is resumed, the deliveryTag of the held message will be 'less than or equal' to the _rollbackMark, because rollback has set the mark to the highest deliveryTag received prior to rollback:

    _rollbackMark.set(_highestDeliveryTag.get());

The held message is therefore rejected via rejectMessage(message, true) rather than delivered to the consumer.

There are no guards in the code to stop the IO layer from adding a new message to _queue whilst rollback is in progress. However, both 0-8 and 0-10 ensure that message flow has stopped whilst recovery is processed.
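The interleaving described above can be replayed with a small, single-threaded model. This is a hypothetical sketch, not Qpid code: RollbackRaceModel, receive(), dispatch() and replayRace() are illustrative names, a plain `<=` stands in for tagLE(), and the post-dispatch updateRollbackMark() step is deliberately omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, single-threaded model of the rollback-mark check in
// dispatchMessage(); it is not Qpid code and runs no real Dispatcher thread.
class RollbackRaceModel
{
    // Stands in for _highestDeliveryTag: highest tag received from the IO layer.
    final AtomicLong highestDeliveryTag = new AtomicLong(0);
    // Stands in for _rollbackMark: tags at or below it are rejected, not delivered.
    final AtomicLong rollbackMark = new AtomicLong(0);

    final List<Long> delivered = new ArrayList<>();
    final List<Long> rejected = new ArrayList<>();

    // The IO layer receives a message; tags are assumed to increase monotonically.
    void receive(long deliveryTag)
    {
        highestDeliveryTag.set(Math.max(highestDeliveryTag.get(), deliveryTag));
    }

    // Mirrors _rollbackMark.set(_highestDeliveryTag.get()) during rollback.
    void rollback()
    {
        rollbackMark.set(highestDeliveryTag.get());
    }

    // The decision dispatchMessage() makes once the connection is no longer stopped.
    // Assumption: a plain <= stands in for tagLE(); updateRollbackMark() is omitted.
    void dispatch(long deliveryTag)
    {
        if (deliveryTag <= rollbackMark.get())
        {
            rejected.add(deliveryTag);   // corresponds to rejectMessage(message, true)
        }
        else
        {
            delivered.add(deliveryTag);  // corresponds to notifyConsumer(message)
        }
    }

    // Replays the race one step at a time.
    static RollbackRaceModel replayRace()
    {
        RollbackRaceModel m = new RollbackRaceModel();
        m.receive(1);
        m.receive(2);
        m.dispatch(1);     // tag 1 is delivered before rollback is requested
        long held = 2;     // Dispatcher has taken tag 2 from _queue and waits on _lock
        m.rollback();      // second thread stops the connection; mark becomes 2
        m.dispatch(held);  // connection resumed: the held message is rejected
        m.receive(3);
        m.dispatch(3);     // a message received after rollback is delivered
        return m;
    }
}
```

replayRace() ends with tags 1 and 3 delivered and tag 2, the message the Dispatcher was holding when rollback was requested, rejected.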
To stop the flow, 0-8 sets ChannelFlow=false and waits for the Ok, while 0-10 stops the consumers and performs a sync.

Code Problem

The investigation of this problem has highlighted two areas which need to be addressed:
How the Dispatcher holds a message

The _queue.take() call is guaranteed never to return null, and once we have entered the take() call there is no way to stop the Dispatcher:

    while (!_closed.get() && ((disp = (Dispatchable) _queue.take()) != null))

Hence we perform the stop as soon as possible after the take(), but this results in us holding on to a message. Ideally we need to be able to stop the Dispatcher whilst it is in the take() method.

How the Dispatcher can keep processing

The Dispatcher currently uses the connectionStopped() call to suspend its activities when the connection has been marked as stopped. However, we need to know that the Dispatcher has actually reached this section; otherwise we need to guarantee that _queue is empty.

    synchronized (_lock)
    {
        try
        {
            while (connectionStopped())
            {
                _lock.wait();
            }
        }
        catch (InterruptedException e)
        {
            // pass
        }
        // ... remainder of dispatchMessage()
    }

Having the Dispatcher signal that it has stopped processing will let us know that it has reached the stopped state. However, this means that the Dispatcher may still process one extra message AFTER the rollback command has been requested.

Further Details

After a discussion with Rafi/Rob about the recent Python changes, expending effort on refactoring the client is probably not worthwhile. If the client message delivery were rewritten to mirror the approach taken in the Python codebase, it would be simpler and easier to reason about. As a result, I have devised a much smaller, though slightly ugly, approach that will address our immediate rollback issues. The approach can be found here.
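One possible way to tackle both Dispatcher problems together is sketched below, under stated assumptions: a timed poll() replaces take() so the loop regularly returns to a point where it can park without holding a message, and the loop signals the rolling-back thread once it has parked. StoppableDispatcher, stopAndAwait(), resume() and the 10 ms poll interval are hypothetical names and values; this is not the approach linked above and not the AMQSession.Dispatcher implementation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch of a stoppable dispatcher loop; it is not AMQSession.Dispatcher.
class StoppableDispatcher<T> implements Runnable
{
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final Consumer<T> consumer;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Object lock = new Object();
    private boolean stopRequested = false; // guarded by lock
    private boolean stopped = false;       // guarded by lock; true while the loop is parked

    StoppableDispatcher(Consumer<T> consumer)
    {
        this.consumer = consumer;
    }

    // Called by the IO layer to hand over work.
    void enqueue(T item)
    {
        queue.add(item);
    }

    // Called by the rolling-back thread. Returns once the loop has parked, at which
    // point the dispatcher is not holding any message.
    void stopAndAwait() throws InterruptedException
    {
        synchronized (lock)
        {
            stopRequested = true;
            while (!stopped && !closed.get())
            {
                lock.wait();
            }
        }
    }

    void resume()
    {
        synchronized (lock)
        {
            stopRequested = false;
            lock.notifyAll();
        }
    }

    void close()
    {
        closed.set(true);
        resume(); // wakes a parked loop and any thread blocked in stopAndAwait()
    }

    @Override
    public void run()
    {
        try
        {
            while (!closed.get())
            {
                parkIfStopRequested();
                // Unlike take(), a timed poll() returns even when the queue is empty,
                // so a stop request is noticed within one poll interval.
                T item = queue.poll(10, TimeUnit.MILLISECONDS);
                if (item != null)
                {
                    // A stop requested after poll() returned still lets this item
                    // through: the "one extra message" trade-off.
                    consumer.accept(item);
                }
            }
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
    }

    private void parkIfStopRequested() throws InterruptedException
    {
        synchronized (lock)
        {
            while (stopRequested && !closed.get())
            {
                stopped = true;
                lock.notifyAll(); // tell stopAndAwait() that the loop has parked
                lock.wait();
            }
            stopped = false;
        }
    }
}
```

The timed poll trades a little idle wake-up overhead for never blocking indefinitely while a stop is pending. As the text predicts, a stop requested just after poll() has returned still lets that one message through before the loop parks.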
Document generated by Confluence on May 26, 2010 10:33