-*-org-*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

* A new design for Qpid clustering.

** Issues with the old cluster design

See [[./old-cluster-issues.txt]]

** A new cluster design.

1. Clearly defined interface between broker code and the cluster plug-in.

2. Replicate queue events rather than client data.
   - Only requires a consistent enqueue order.
   - Events only need to be serialized per queue, which allows concurrency between queues.
   - Allows for replicated and non-replicated queues.

3. Use a lock protocol to agree on the order of dequeues: only the broker holding the lock can acquire & dequeue. This no longer relies on identical state and lock-step behavior to cause identical dequeues on each broker.

4. Use multiple CPG groups to process different queues in parallel. Use a fixed set of groups and hash queue names to choose the group for each queue.

*** Requirements

The cluster must provide these delivery guarantees:

- client sends transfer: the message must be replicated and not lost, even if the local broker crashes.
- client acquires a message: the message must not be delivered on another broker while acquired.
- client accepts a message: the message is forgotten and will never be delivered or re-queued by any broker.
- client releases a message: the message must be re-queued on the cluster and not lost.
- client rejects a message: the message must be dead-lettered or discarded and forgotten.
- client disconnects/broker crashes: acquired but not accepted messages must be re-queued on the cluster.

Each guarantee takes effect when the client receives a *completion* for the associated command (transfer, acquire, reject, accept).

*** Broker receiving messages

On receiving a message transfer, in the connection thread we:
- multicast a message-received event.
- enqueue and complete the transfer when it is self-delivered.

Other brokers enqueue the message when they receive the message-received event. Enqueues are queued up with other queue operations to be executed in the thread context associated with the queue.

*** Broker sending messages: moving queue ownership

Each queue is *owned* by at most one cluster broker at a time. Only that broker may acquire or dequeue messages. The owner multicasts notification of the messages it acquires/dequeues to the cluster. Periodically the owner hands over ownership to another interested broker, providing time-shared access to the queue among all interested brokers.

We assume the same IO-driven dequeuing algorithm as the standalone broker, with one modification: queues can be "locked". A locked queue is not available for dequeuing messages and will be skipped by the output algorithm.

At any given time only those queues owned by the local broker will be unlocked. As messages are acquired/dequeued from unlocked queues by the IO threads, the broker multicasts acquire/dequeue events to the cluster.

When an unlocked queue has no more consumers with credit, or when a time limit expires, the broker relinquishes ownership by multicasting a release-queue event, allowing another interested broker to take ownership.
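To make the ownership model above concrete, here is a minimal C++ sketch of the per-queue state a broker might keep. All names (=ClusterQueueState=, =Multicaster=, =maybeRelease=) are hypothetical, not existing Qpid classes; the sketch only illustrates that acquires/dequeues are multicast and allowed only while the queue is owned (unlocked), and that ownership is released when no consumer has credit or the time limit expires.

#+begin_src C++
// Minimal sketch of per-queue ownership state -- hypothetical names,
// not the actual Qpid cluster classes.
#include <chrono>
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>

// Stand-in for multicasting an event to the queue's CPG group.
struct Multicaster {
    void mcast(const std::string& event, const std::string& queue, uint64_t position) {
        std::cout << event << " " << queue << " @" << position << "\n";
    }
};

// Cluster state attached to one queue. Only the owner may acquire or
// dequeue; a locked (non-owned) queue is skipped by the output logic.
class ClusterQueueState {
  public:
    ClusterQueueState(std::string name, Multicaster& m, std::chrono::milliseconds ownershipLimit)
        : name_(std::move(name)), mcast_(m), limit_(ownershipLimit) {}

    // Called when a release-queue event names this broker as the new owner.
    void acquireOwnership() {
        owned_ = true;
        acquiredAt_ = std::chrono::steady_clock::now();
    }

    bool locked() const { return !owned_; }  // locked queues are skipped

    // An IO thread acquires a message for a local consumer; legal only
    // while we own the queue, and multicast so other brokers see it.
    bool acquire(uint64_t position) {
        if (locked()) return false;
        mcast_.mcast("acquire", name_, position);
        return true;
    }

    // Dequeue (e.g. after an accept); also multicast so other brokers
    // can remove the message from their replicas.
    bool dequeue(uint64_t position) {
        if (locked()) return false;
        mcast_.mcast("dequeue", name_, position);
        return true;
    }

    // Relinquish ownership when no local consumer has credit or the time
    // limit expires, so another interested broker can take over.
    void maybeRelease(bool consumersWithCredit) {
        if (!owned_) return;
        bool expired = std::chrono::steady_clock::now() - acquiredAt_ >= limit_;
        if (!consumersWithCredit || expired) {
            owned_ = false;
            mcast_.mcast("release-queue", name_, 0);
        }
    }

  private:
    std::string name_;
    Multicaster& mcast_;
    std::chrono::milliseconds limit_;
    bool owned_ = false;
    std::chrono::steady_clock::time_point acquiredAt_;
};
#+end_src

In the real broker the hand-off would be driven by CPG self-delivery of the release-queue event rather than a direct call; the sketch only shows the locking rule.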
*** Asynchronous completion of accept

In acknowledged mode a message is not forgotten until it is accepted, to allow for requeue on rejection or crash. The accept should not be completed until the message has been forgotten.

On receiving an accept, the broker:
- dequeues the message from the local queue
- multicasts an "accept" event
- completes the accept asynchronously when the dequeue event is self-delivered.

NOTE: The message store does not currently implement asynchronous completion of accept; this is a bug.

*** Multiple CPG groups.

The old cluster was bottlenecked by processing everything in a single CPG deliver thread. The new cluster uses a set of CPG groups, one per core. Queue names are hashed to give group indexes, so statistically queues are likely to be spread over the set of groups.

Operations on a given queue always use the same group, so we have order within each queue, but operations on different queues can use different groups, giving greater throughput sending to CPG and multiple handler threads to process CPG messages.

** Inconsistent errors.

An inconsistent error means that after multicasting an enqueue, accept or dequeue, some brokers succeed in processing it and others fail.

The new design eliminates most sources of inconsistent errors in the old broker: connections, sessions, security, management etc. Only store journal errors remain.

The new inconsistent-error protocol is similar to the old one, with one major improvement: brokers do not have to stall processing while an error is being resolved.

** Updating new members

When a new member (the updatee) joins a cluster it needs to be brought up to date with the rest of the cluster. An existing member (the updater) sends an "update".

In the old cluster design the update is a snapshot of the entire broker state. To ensure consistency of the snapshot, both the updatee and the updater "stall" at the start of the update, i.e. they stop processing multicast events and queue them up for processing when the update is complete. This creates a backlog of work to get through, which leaves them lagging behind the rest of the cluster until they catch up (which is not guaranteed to happen in bounded time).

With the new cluster design only exchanges, queues, bindings and messages need to be replicated. We update individual objects (queues and exchanges) independently:
- create queues first, then update all queues and exchanges in parallel.
- use multiple updater threads, per queue/exchange.

Queue updater:
- marks the queue position at the sync point.
- sends messages starting from the sync point, working towards the head of the queue.
- sends a "done" message.

Queue updatee (see the sketch at the end of this section):
- enqueues received from CPG: add to the back of the queue as normal.
- dequeues received from CPG: apply if found, else save to check at the end of the update.
- messages from the updater: add to the *front* of the queue.
- update complete: apply any saved dequeues.

Exchange updater:
- sends a snapshot of the exchange as it was at the sync point.

Exchange updatee:
- queues exchange operations received after the sync point.
- when the snapshot is received: applies the saved operations.

Note:
- The updater is active throughout; there is no stalling.
- Consuming clients actually reduce the size of the update.
- The updatee stalls clients until the update completes. (It may be possible to avoid the updatee stall as well; this needs thought.)
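As a rough illustration of the queue-updatee rules above, the sketch below shows how update traffic and live CPG traffic can be combined. The class and method names (=UpdateeQueue=, =cpgEnqueue=, etc.) are hypothetical, and message positions stand in for whole messages; this is not the actual Qpid update code.

#+begin_src C++
// Hypothetical sketch of the queue-updatee rules; message positions
// stand in for whole messages.
#include <cstdint>
#include <deque>
#include <set>

class UpdateeQueue {
  public:
    // Normal enqueue delivered via CPG during the update: back of the queue.
    void cpgEnqueue(uint64_t position) { messages_.push_back(position); }

    // Dequeue delivered via CPG: apply if we already have the message,
    // otherwise remember it and re-check when the update completes.
    void cpgDequeue(uint64_t position) {
        if (!erase(position)) pendingDequeues_.insert(position);
    }

    // Message sent by the updater (working from the sync point towards the
    // head of the queue): goes to the *front*, preserving original order.
    void updaterMessage(uint64_t position) { messages_.push_front(position); }

    // Updater sent "done": apply any dequeues that arrived before the
    // corresponding message did.
    void updateComplete() {
        for (uint64_t p : pendingDequeues_) erase(p);
        pendingDequeues_.clear();
    }

  private:
    bool erase(uint64_t position) {
        for (auto i = messages_.begin(); i != messages_.end(); ++i) {
            if (*i == position) { messages_.erase(i); return true; }
        }
        return false;
    }

    std::deque<uint64_t> messages_;       // queue contents, by position
    std::set<uint64_t> pendingDequeues_;  // dequeues seen before their message
};
#+end_src

The key point is that updater messages are prepended while live CPG enqueues are appended, so message order is preserved without stalling the updater.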
** Internal cluster interface

The new cluster interface is similar to the MessageStore interface, but provides more detail (message positions) and some additional call points (e.g. acquire).

The cluster interface captures these events:
- wiring changes: queue/exchange declare/bind.
- message enqueued/acquired/released/rejected/dequeued.
- transactional events.

** Maintainability

This design gives us more robust code with a clear and explicit interface. The cluster depends on specific events clearly defined by an explicit interface. Provided the semantics of this interface are not violated, the cluster will not be broken by changes to broker code.

The cluster no longer requires identical processing of the entire broker stack on each broker. It is not affected by the details of how the broker allocates messages. It is independent of the protocol-specific state of connections and sessions, and so is protected from future protocol changes (e.g. AMQP 1.0).

A number of specific ways the code will be simplified:
- drop code to replicate the management model.
- drop timer workarounds for TTL, management, heartbeats.
- drop "cluster-safe assertions" in broker code.
- drop connections, sessions and management from the cluster update.
- drop security workarounds: cluster code now operates after message decoding.
- drop connection tracking in cluster code.
- simpler inconsistent-error handling code, with no need to stall.

** Performance

The standalone broker processes _connections_ concurrently, so CPU usage increases as you add more connections. The new cluster processes _queues_ concurrently, so CPU usage increases as you add more queues. In both cases, CPU usage peaks when the number of "units of concurrency" (connections or queues) exceeds the number of cores.

When all consumers on a queue are connected to the same broker, the new cluster uses the same message allocation threading/logic as a standalone broker, with a little extra asynchronous book-keeping.

If a queue has multiple consumers connected to multiple brokers, the new cluster time-shares the queue, which is less efficient than having all consumers on a queue connected to the same broker.

** Flow control

The new design does not queue up CPG-delivered messages; they are processed immediately in the CPG deliver thread. This means that CPG's own flow control is sufficient for qpid.

** Live upgrades

Live upgrade refers to the ability to upgrade a cluster while it is running, with no downtime. Each broker in the cluster is shut down and then re-started with a new version of the broker code.

To achieve this:
- The cluster protocol XML file has a new element attached to each method: the version at which the method was added.
- New versions can only add methods; existing methods cannot be changed.
- The cluster handshake for new members includes the protocol version of each member.
- Each CPG message starts with a header: version, size (see the sketch below). This allows new encodings later.
- Brokers ignore messages of a higher version.
- The cluster's version is the lowest version among its members.
- A newer broker can join an older cluster. When it does, it must restrict itself to speaking the older version of the protocol.
- When the cluster version increases (because the lowest-version member has left), the remaining members may move up to the new version.
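The version,size header mentioned above might be encoded roughly as follows. The field widths, byte order and function names here are assumptions for illustration, not the actual wire format.

#+begin_src C++
// Assumed encoding of the version,size header on each CPG message.
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>
#include <vector>

struct FrameHeader {
    uint16_t version;  // cluster protocol version used to encode the body
    uint32_t size;     // size of the body in bytes
};

constexpr std::size_t HEADER_SIZE = sizeof(uint16_t) + sizeof(uint32_t);

// Prepend the header to a body, producing one CPG message buffer.
// (A real implementation would also fix the byte order.)
std::vector<uint8_t> encode(uint16_t version, const std::vector<uint8_t>& body) {
    FrameHeader h{version, static_cast<uint32_t>(body.size())};
    std::vector<uint8_t> buf(HEADER_SIZE + body.size());
    std::memcpy(buf.data(), &h.version, sizeof(h.version));
    std::memcpy(buf.data() + sizeof(h.version), &h.size, sizeof(h.size));
    if (!body.empty())
        std::memcpy(buf.data() + HEADER_SIZE, body.data(), body.size());
    return buf;
}

// Returns the body, or nothing if the message was encoded by a newer
// protocol version than this broker understands (it is then ignored).
std::optional<std::vector<uint8_t>> decode(const std::vector<uint8_t>& buf,
                                           uint16_t myVersion) {
    if (buf.size() < HEADER_SIZE) return std::nullopt;
    FrameHeader h;
    std::memcpy(&h.version, buf.data(), sizeof(h.version));
    std::memcpy(&h.size, buf.data() + sizeof(h.version), sizeof(h.size));
    if (h.version > myVersion) return std::nullopt;  // higher version: ignore
    return std::vector<uint8_t>(buf.begin() + HEADER_SIZE, buf.end());
}
#+end_src

Because brokers simply drop bodies with a higher version, a mixed-version cluster behaves as its lowest-version member, matching the rules above.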
* Design debates

** Active/active vs. active/passive

An active/active cluster can be used in an active/passive mode. In this mode we would like the cluster to be as efficient as a strictly active/passive implementation.

An active/passive implementation allows some simplifications over active/active:
- drop queue ownership and locking.
- no need to replicate message acquisition.
- can do immediate local enqueue and still guarantee order.

Active/passive introduces a few extra requirements:
- Exactly one broker has to take over if the primary fails.
- Passive members must refuse client connections.
- On failover, clients must re-try all known addresses until they find the active member (see the sketch at the end of this section).

Active/active benefits:
- A broker failure only affects the subset of clients connected to that broker.
- Clients can switch to any other broker on failover.
- Backup brokers are immediately available on failover.
- As long as a client can connect to any broker in the cluster, it can be served.

Active/passive benefits:
- No need to replicate message allocation, so consumers can be fed at top speed.

Active/passive drawbacks:
- All clients are on one node, so a failure affects every client in the system.
- After a failure there is a "reconnect storm" as every client reconnects to the new active node.
- After a failure there is a period where no broker is active, until the other brokers realize the primary is gone and agree on the new primary.
- Clients must find the single active node, which may involve multiple connect attempts.
- No service if a partition separates a client from the active broker, even if the client can see other brokers.
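For illustration only, the "re-try all known addresses" behaviour required of active/passive clients could be as simple as the loop below. =connectToActive= and =tryConnect= are hypothetical names; =tryConnect= stands in for whatever connect call the client library provides.

#+begin_src C++
// Hypothetical client-side failover loop for the active/passive case:
// keep cycling through the known broker addresses until one accepts.
#include <chrono>
#include <functional>
#include <string>
#include <thread>
#include <vector>

// 'tryConnect' should return true when the address is the active member.
std::string connectToActive(const std::vector<std::string>& knownAddresses,
                            const std::function<bool(const std::string&)>& tryConnect) {
    using namespace std::chrono_literals;
    for (;;) {
        for (const auto& addr : knownAddresses) {
            if (tryConnect(addr)) return addr;   // found the active member
        }
        std::this_thread::sleep_for(250ms);      // back off, then retry the list
    }
}
#+end_src

A real client library would add backoff limits and address shuffling; this only illustrates the retry-all-addresses requirement.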