Slow Consumer Disconnect (SCD) is a new feature in Qpid that provides a configurable mechanism to prevent a single slow consumer from causing a back up of unconsumed messages on the broker.
This is most relevant where Topics are in use, since a published message is not removed from the broker's memory until all subscribers have acknowledged that message.
Cases where a consumer is 'slow' can arise due to one of the following: poor network connectivity exists; a transient system issue affects a single client; a single subscriber written by a client team is behaving incorrectly and not acknowledging messages; a downstream resource such as a database is non-responsive.
SCD will enable the application owner to configure limits for a given consumer's queue and the behaviour to execute when those limits are reached.
SCD is only applicable to topics or durable subscriptions and can be configured on either a topic or a subscription name.
On triggering of a specified threshold the offending client will be disconnected from the broker with a 506 error code wrapped in a JMSException returned to the client via the ExceptionListener registered on the Connection object.
Note that it is essential that an ExceptionListener be specified by the client on creation of the connection and that exceptions coming back on that listener are handled correctly.
You can configure the frequency with which the SCD process will check for slow consumers, along with the unit of time used to specify that frequency.
The virtualhosts.virtualhost.hostname.slow-consumer-detection elements delay and timeunit are used to specify the frequency and timeunit respectively in the virtualhosts.xml file e.g.
<virtualhosts> <default>test</default> <virtualhost> <name>test</name> <test> <slow-consumer-detection> <delay>60<delay/> <timeunit>seconds<timeunit/> <slow-consumer-detection/> </test> </virtualhost> </virtualhosts>
When a Slow Consumer is disconnected, the client receives a 506 error from the broker wrapped in a JMSException and the Session and Connection are closed:
Dispatcher-Channel-1 2010-09-01 16:23:34,206 INFO [qpid.client.AMQSession.Dispatcher] Dispatcher-Channel-1 thread terminating for channel 1:org.apache.qpid.client.AMQSession_0_8@1de8aa8 pool-2-thread-3 2010-09-01 16:23:34,238 INFO [apache.qpid.client.AMQConnection] Closing AMQConnection due to :org.apache.qpid.AMQChannelClosedException: Error: Consuming to slow. [error code 506: resource error] javax.jms.JMSException: 506 at org.apache.qpid.client.AMQConnection.exceptionReceived(AMQConnection.java:1396) at org.apache.qpid.client.protocol.AMQProtocolHandler.exception(AMQProtocolHandler.java:329) at org.apache.qpid.client.protocol.AMQProtocolHandler.methodBodyReceived(AMQProtocolHandler.java:536) at org.apache.qpid.client.protocol.AMQProtocolSession.methodFrameReceived(AMQProtocolSession.java:453) at org.apache.qpid.framing.AMQMethodBodyImpl.handle(AMQMethodBodyImpl.java:93) at org.apache.qpid.client.protocol.AMQProtocolHandler$1.run(AMQProtocolHandler.java:462) at org.apache.qpid.pool.Job.processAll(Job.java:110) at org.apache.qpid.pool.Job.run(Job.java:149) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907) at java.lang.Thread.run(Thread.java:619) Caused by: org.apache.qpid.AMQChannelClosedException: Error: Consuming to slow. [error code 506: resource error] at org.apache.qpid.client.handler.ChannelCloseMethodHandler.methodReceived(ChannelCloseMethodHandler.java:96) at org.apache.qpid.client.handler.ClientMethodDispatcherImpl.dispatchChannelClose(ClientMethodDispatcherImpl.java:163) at org.apache.qpid.framing.amqp_8_0.ChannelCloseBodyImpl.execute(ChannelCloseBodyImpl.java:140) at org.apache.qpid.client.state.AMQStateManager.methodReceived(AMQStateManager.java:112) at org.apache.qpid.client.protocol.AMQProtocolHandler.methodBodyReceived(AMQProtocolHandler.java:511) ... 8 more main 2010-09-01 16:23:34,316 INFO [apache.qpid.client.AMQSession] Closing session: org.apache.qpid.client.AMQSession_0_8@ffeef1
One key feature of SCD is the disconnection of a consuming client when a specified threshold is exceeded. For a pub-sub model using topics, this means that messages will no longer be delivered to the private queue which was associated with that consuming client, thus reducing any associated backlog in the broker.
For durable subscriptions, simply disconnecting the consuming client will not suffice since the associated queue is by definition durable and messages would continue to flow to it after disconnection, potentially worsening any backing up of data on the broker.
The solution is to configure durable subscriptions to delete the underlying queue on disconnection. This means that messages will no longer be delivered to the private queue associated with the subscription, thus preventing any backlog.
Full details of how to configure the thresholds are provided below.
You can configure SCD to be triggered on a topic or subscription when the oldest message in the associated private queue for the consumer ages beyond the specified value, in milliseconds.
You can opt to use the depth of the queue in bytes as a threshold. SCD will be triggered by a queue depth greater than the threshold specified i.e. when a broker receives a message that takes the queue depth over the threshold.
You can use the message count for the consumer's queue as the trigger, where a count higher than that specified will trigger disconnection.
You can configure the policy you wish to apply in your broker configuration. There are currently 2 policies available:
Delete Temporary Queues Only
If you do not specify a <topicDelete/> element in your configuration, then only temporary queues associated with a topic subscription will be deleted on client disconnect. This is the default behaviour.
Delete Durable Subscription Queues
If you add the <topicDelete/> element with the sub-element <delete-persistent/> to your config, then the persistent queue which is associated with durable subscriptions to a topic will also be deleted. This is an important consideration since without deleting the underlying queue the client's unconsumed data will grow indefinitely while they will be unable to reconnect to that queue due to the SCD threshold configured, potentially having an adverse effect on the application or broker in use.
Example Topic Configuration
The following steps are required to configure SCD:
Enable SCD checking for your virtual host
Specify frequency for SCD checking
Define thresholds for the topic
Define the policy to apply on trigger
The example below shows a simple definition, with all three thresholds specified and a simple disconnection, with deletion of any temporary queue, defined.
For a durable subscription to this topic, no queue deletion would be applied on disconnect - which is likely to be undesirable (see section above).
<topics> <topic> <name>stocks.us.*</name> <slow-consumer-detection> <!-- The maximum depth before which --> <!-- the policy will be applied--> <depth>4235264</depth> <!-- The maximum message age before which --> <!-- the policy will be applied--> <messageAge>600000</messageAge> <!-- The maximum number of message before --> <!-- which the policy will be applied--> <messageCount>50</messageCount> <!-- Policy Selection --> <policy name="TopicDelete"/> </slow-consumer-detection> </topic> </topics>
Client application developers should be educated about how to correctly handle being disconnected with a 506 error code, to avoid them getting into a thrashing state where they continually attempt to connect, fail to consume fast enough and are disconnected again.
Clients affected by slow consumer disconnect configuration should always use transactions where duplicate processing of an incoming message would have adverse affects, since they may receive a message more than once if disconnected before acknowledging a message in flight.