-*-org-*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

* A new design for Qpid clustering.

** Issues with current design.

The cluster is based on virtual synchrony: each broker multicasts
events and the events from all brokers are serialized and delivered
in the same order to each broker.

In the current design raw byte buffers from client connections are
multicast, serialized and delivered in the same order to each broker.

Each broker has a replica of all queues, exchanges, bindings and also
all connections & sessions from every broker. Cluster code treats the
broker as a "black box": it "plays" the client data into the
connection objects and assumes that, given the same input, each
broker will reach the same state.

A new broker joining the cluster receives a snapshot of the current
cluster state, and then follows the multicast conversation.

*** Maintenance issues.

The entire state of each broker is replicated to every member:
connections, sessions, queues, messages, exchanges, management
objects etc. Any discrepancy in the state that affects how messages
are allocated to consumers can cause an inconsistency.

- Entire broker state must be faithfully updated to new members.
- Management model also has to be replicated.
- All queues are replicated, can't have unreplicated queues
  (e.g. for management).

Events that are not deterministically predictable from the client
input data stream can cause inconsistencies. In particular, the use
of timers/timestamps requires cluster workarounds to synchronize.

A member that encounters an error which is not encountered by all
other members is considered inconsistent and will shut itself
down. Such errors can come from any area of the broker code,
e.g. different ACL files can cause inconsistent errors.

The following areas required workarounds to work in a cluster:

- Timers/timestamps in broker code: management, heartbeats, TTL.
- Security: cluster must replicate *after* decryption by the security layer.
- Management: not initially included in the replicated model, source
  of many inconsistencies.

It is very easy for someone adding a feature or fixing a bug in the
standalone broker to break the cluster by:
- adding new state that needs to be replicated in cluster updates.
- doing something in a timer or other non-connection thread.

It's very hard to test for such breaks. We need a looser coupling and
a more explicitly defined interface between cluster and standalone
broker code.

*** Performance issues.

Virtual synchrony delivers all data from all clients in a single
stream to each broker. The cluster must play this data through the
full broker code stack (connections, sessions etc.) in a single
thread context in order to get identical behavior on each broker.
The cluster has a pipelined design to get some concurrency, but this
is a severe limitation on scalability on multi-core hosts compared to
the standalone broker, which processes each connection in a separate
thread context.

** A new cluster design.

Clearly defined interface between broker code and cluster plug-in.

Replicate queue events rather than client data.
- Broker behavior only needs to match per-queue.
- Smaller amount of code (the queue implementation) that must behave
  predictably.
- Events only need to be serialized per-queue, which allows
  concurrency between queues.

Use a moving queue ownership protocol to agree on the order of
dequeues. No longer relies on identical state and lock-step behavior
to cause identical dequeues on each broker.

Each queue has an associated thread context. Events for a queue are
executed in that queue's context, in parallel with events for other
queues.

*** Requirements

The cluster must provide these delivery guarantees:

- client sends transfer: message must be replicated and not lost even
  if the local broker crashes.
- client acquires a message: message must not be delivered on another
  broker while acquired.
- client accepts message: message is forgotten, will never be
  delivered or re-queued by any broker.
- client releases message: message must be re-queued on cluster and
  not lost.
- client rejects message: message must be dead-lettered or discarded
  and forgotten.
- client disconnects/broker crashes: acquired but not accepted
  messages must be re-queued on cluster.

Each guarantee takes effect when the client receives a *completion*
for the associated command (transfer, acquire, reject, accept).

*** Broker receiving messages

On receiving a message transfer, in the connection thread we:
- multicast a message-received event.
- enqueue and complete the transfer when it is self-delivered.

Other brokers enqueue the message when they receive the
message-received event. Enqueues are queued up with other queue
operations to be executed in the thread context associated with the
queue.

*** Broker sending messages: moving queue ownership

Each queue is *owned* by at most one cluster broker at a time. Only
that broker may acquire or dequeue messages. The owner multicasts
notification of messages it acquires/dequeues to the
cluster. Periodically the owner hands over ownership to another
interested broker, providing time-shared access to the queue among
all interested brokers.

We assume the same IO-driven dequeuing algorithm as the standalone
broker, with one modification: queues can be "locked". A locked queue
is not available for dequeuing messages and will be skipped by the
output algorithm.

At any given time only those queues owned by the local broker will be
unlocked.

As messages are acquired/dequeued from unlocked queues by the IO
threads, the broker multicasts acquire/dequeue events to the cluster.

When an unlocked queue has no more consumers with credit, or when a
time limit expires, the broker relinquishes ownership by multicasting
a release-queue event, allowing another interested broker to take
ownership.

*** Asynchronous completion of accept

In acknowledged mode a message is not forgotten until it is accepted,
to allow for requeue on rejection or crash. The accept should not be
completed until the message has been forgotten.

On receiving an accept the broker:
- dequeues the message from the local queue
- multicasts an "accept" event
- completes the accept asynchronously when the dequeue event is
  self-delivered.

NOTE: The message store does not currently implement asynchronous
completion of accept; this is a bug.
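
As a rough illustration of the flows above (enqueue on the
message-received event, asynchronous completion of transfer and accept
on self-delivery), here is a minimal C++ sketch. Multicaster,
AsyncCompletion, ClusterBroker and their members are hypothetical
stand-ins, not the actual cluster classes, and for brevity the sketch
applies both the enqueue and the dequeue when the event is
self-delivered on every broker.

#+begin_src c++
// Sketch only: hypothetical stand-ins, not the real broker/cluster code.
#include <deque>
#include <functional>
#include <map>
#include <string>

struct Message { std::string body; };

// Completion handle for a client command (transfer or accept).
struct AsyncCompletion { std::function<void()> complete; };

// Stand-in for CPG: a multicast event is later self-delivered to every
// broker (including the sender) in the same total order. Here delivery
// is immediate for simplicity; in reality it is asynchronous.
struct Multicaster {
    std::function<void(const std::string&, const Message&)> deliverEnqueue;
    std::function<void(const std::string&)> deliverDequeue;
    void mcastEnqueue(const std::string& q, const Message& m) { deliverEnqueue(q, m); }
    void mcastDequeue(const std::string& q) { deliverDequeue(q); }
};

class ClusterBroker {
  public:
    explicit ClusterBroker(Multicaster& m) : mcast(m) {}

    // Connection thread: multicast the event, defer completion of the command.
    void onTransfer(const std::string& q, const Message& m, AsyncCompletion done) {
        pendingTransfers[q].push_back(done);
        mcast.mcastEnqueue(q, m);
    }
    void onAccept(const std::string& q, AsyncCompletion done) {
        pendingAccepts[q].push_back(done);
        mcast.mcastDequeue(q);
    }

    // Queue's thread context: apply the event when it is self-delivered.
    void onDeliveredEnqueue(const std::string& q, const Message& m) {
        queues[q].push_back(m);             // every broker enqueues the message
        completeNext(pendingTransfers, q);  // only the sender has a pending transfer
    }
    void onDeliveredDequeue(const std::string& q) {
        if (!queues[q].empty()) queues[q].pop_front(); // forget the message
        completeNext(pendingAccepts, q);    // complete the accept asynchronously
    }

  private:
    using Completions = std::map<std::string, std::deque<AsyncCompletion> >;
    static void completeNext(Completions& pending, const std::string& q) {
        if (!pending[q].empty()) {
            pending[q].front().complete();
            pending[q].pop_front();
        }
    }
    Multicaster& mcast;
    std::map<std::string, std::deque<Message> > queues;
    Completions pendingTransfers, pendingAccepts;
};
#+end_src
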
** Inconsistent errors.

The new design eliminates most sources of inconsistent errors
(connections, sessions, security, management etc.). The only points
where inconsistent errors can occur are at enqueue and dequeue (most
likely store-related errors).

The new design can use the existing error-handling protocol with one
major improvement: since brokers are no longer required to maintain
identical state, they do not have to stall processing while an error
is being resolved.

#TODO: The only source of dequeue errors is probably an unrecoverable
journal failure.

** Updating new members

When a new member (the updatee) joins a cluster it needs to be
brought up to date with the rest of the cluster. An existing member
(the updater) sends an "update".

In the old cluster design the update is a snapshot of the entire
broker state. To ensure consistency of the snapshot, both the updatee
and the updater "stall" at the start of the update, i.e. they stop
processing multicast events and queue them up for processing when the
update is complete. This creates a backlog of work to get through,
which leaves them lagging behind the rest of the cluster until they
catch up (which is not guaranteed to happen in a bounded time).

With the new cluster design only exchanges, queues, bindings and
messages need to be replicated.

Update of wiring (exchanges, queues, bindings) is the same as in the
current design.

Update of messages is different:
- per-queue rather than per-broker, separate queues can be updated in
  parallel.
- updates queues in reverse order to eliminate unbounded catch-up.
- does not require updater & updatee to stall during update.

Replication events, multicast to cluster:
- enqueue(q,m): message m pushed on back of queue q.
- acquire(q,m): mark m acquired.
- dequeue(q,m): forget m.

Messages sent on the update connection:
- update_front(q,m): during update, receiver pushes m to *front* of q.
- update_done(q): during update, update of q is complete.

Updater:
- when updatee joins set iterator i = q.end()
- while i != q.begin(): --i; send update_front(q,*i) to updatee
- send update_done(q) to updatee

Updatee:
- q initially in locked state, can't dequeue locally.
- start processing replication events for q immediately (enqueue,
  dequeue, acquire etc.)
- receive update_front(q,m): q.push_front(m)
- receive update_done(q): q can be unlocked for local dequeuing.

(A small code sketch of the updater and updatee steps appears at the
end of this section.)

Benefits:
- Stall only for wiring update: updater & updatee can process
  multicast messages while messages are updated.
- No unbounded catch-up: update consists of at most N update_front()
  messages, where N = q.size() at the start of the update.
- During update, consumers actually help by removing messages before
  they need to be updated.
- Needs no separate "work to do" queue, only the broker queues
  themselves.

# TODO how can we recover from updater crashing before update complete?
# Clear queues that are not updated & send request for updates on those queues?

# TODO updatee may receive a dequeue for a message it has not yet seen, needs
# to hold on to that so it can drop the message when it is seen.
# Similar problem exists for wiring?
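
The updater and updatee steps above, as a minimal sketch. QueueRep,
Updatee and the send functions are hypothetical stand-ins (std::deque
plays the role of the broker queue), not the actual update-connection
code.

#+begin_src c++
// Sketch only: reverse-order queue update with hypothetical stand-ins.
#include <deque>
#include <string>

struct Message { std::string body; };

struct QueueRep {
    std::deque<Message> messages;
    bool locked = true;          // updatee's queue starts locked: no local dequeue
};

// Updater: walk the queue from back to front, sending update_front for
// each message, then update_done. At most q.size() messages are sent,
// so there is no unbounded catch-up.
template <class UpdateConnection>
void updateQueue(const QueueRep& q, UpdateConnection& send) {
    for (auto i = q.messages.end(); i != q.messages.begin(); ) {
        --i;
        send.update_front(*i);   // receiver pushes to the *front* of its queue
    }
    send.update_done();
}

// Updatee: replication events (multicast) are applied at the back as
// normal; update messages are pushed at the front, so queue order is
// preserved.
struct Updatee {
    QueueRep q;
    void onEnqueue(const Message& m)     { q.messages.push_back(m); }   // multicast event
    void onUpdateFront(const Message& m) { q.messages.push_front(m); }  // update connection
    void onUpdateDone()                  { q.locked = false; }          // allow local dequeue
};
#+end_src
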
** Cluster API

The new cluster API is similar to the MessageStore
interface. (Initially I thought it would be an extension of the
MessageStore interface, but as the design develops it seems better to
make it a separate interface.)

The cluster interface captures these events:
- wiring changes: queue/exchange declare/bind.
- message enqueued/acquired/released/rejected/dequeued.

The cluster will require some extensions to the Queue:
- Queues can be "locked"; locked queues are ignored by IO-driven
  output.
- Cluster must be able to apply queue events from the cluster to a
  queue.

These appear to fit into existing queue operations.

** Maintainability

This design gives us more robust code with clear and explicit
interfaces.

The cluster depends on specific events clearly defined by an explicit
interface. Provided the semantics of this interface are not violated,
the cluster will not be broken by changes to broker code.

The cluster no longer requires identical processing of the entire
broker stack on each broker. It is not affected by the details of how
the broker allocates messages. It is independent of the
protocol-specific state of connections and sessions and so is
protected from future protocol changes (e.g. AMQP 1.0).

A number of specific ways the code will be simplified:
- drop code to replicate the management model.
- drop timer workarounds for TTL, management, heartbeats.
- drop "cluster-safe assertions" in broker code.
- drop connections, sessions, management from cluster update.
- drop security workarounds: cluster code now operates after message
  decoding.
- drop connection tracking in cluster code.
- simpler inconsistent-error handling code, no need to stall.

** Performance

The only way to verify the relative performance of the new design is
to prototype & profile. The following points suggest the new design
may scale/perform better.

Some work is moved from the virtual synchrony thread to connection
threads:
- All connection/session logic moves to the connection thread.
- Exchange routing logic moves to the connection thread.
- On the local broker, dequeuing is done in the connection thread.
- Local broker dequeue is IO-driven as for a standalone broker.

For queues with all consumers on a single node, dequeue is entirely
IO-driven in the connection thread. We pay for time-sharing only if a
queue has consumers on multiple brokers.

Doing work for different queues in parallel scales on multi-core
boxes when there are multiple queues.

One difference works against performance: there is an extra
encode/decode. The old design multicasts raw client data and decodes
it in the virtual synchrony thread. The new design would decode
messages in the connection thread, re-encode them for multicast, and
decode (on non-local brokers) in the virtual synchrony thread. There
is extra work here, but only in the *connection* thread: on a
multi-core machine this happens in parallel for every connection, so
it probably is not a bottleneck. There may be scope to optimize
decode/re-encode by re-using some of the original encoded data; this
could also benefit the standalone broker.

** Asynchronous queue replication

The existing "asynchronous queue replication" feature maintains a
passive backup of queues on a remote broker over a TCP connection.

The new cluster replication protocol could be re-used to implement
asynchronous queue replication: it is just a special case where the
active broker is always the queue owner and the enqueue/dequeue
messages are sent over a TCP connection rather than multicast.

The new update mechanism could also work with asynchronous queue
replication, allowing such replication (over a TCP connection on a
WAN, say) to be initiated after the queue has already been created
and been in use (one of the key missing features).
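
As a concrete illustration of this reuse, here is a rough sketch of an
event interface capturing the events listed under "Cluster API" above;
either the multicast cluster code or a TCP replication link could
drive the same interface. The names are hypothetical, not the actual
Qpid classes.

#+begin_src c++
// Sketch only: a hypothetical interface for the events listed under
// "Cluster API". The same events could be carried by CPG multicast
// (active/active cluster) or by a TCP link to a passive backup broker
// (asynchronous queue replication).
#include <cstdint>
#include <string>

struct Message;  // broker message, details omitted

class QueueReplicationEvents {
  public:
    virtual ~QueueReplicationEvents() {}

    // Wiring changes.
    virtual void queueDeclared(const std::string& queue) = 0;
    virtual void exchangeDeclared(const std::string& exchange) = 0;
    virtual void bound(const std::string& exchange,
                       const std::string& queue,
                       const std::string& key) = 0;

    // Message life cycle; messages identified by queue + position
    // (see "Message identifiers" below).
    virtual void enqueued(const std::string& queue, const Message& m) = 0;
    virtual void acquired(const std::string& queue, std::uint64_t position) = 0;
    virtual void released(const std::string& queue, std::uint64_t position) = 0;
    virtual void rejected(const std::string& queue, std::uint64_t position) = 0;
    virtual void dequeued(const std::string& queue, std::uint64_t position) = 0;
};
#+end_src
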
** Increasing Concurrency and load sharing

The current cluster is bottlenecked by processing everything in the
CPG deliver thread. By removing the need for identical operation on
each broker, we open up the possibility of greater concurrency.

Handling multicast enqueue, acquire, accept, release etc. is
concurrent per queue: operations on different queues can be done in
different threads.

The new design does not force each broker to do all the work in the
CPG thread, so spreading load across cluster members should give some
scale-up.

** Misc outstanding issues & notes

Replicating wiring:
- Need async completion of wiring commands?
- qpid.sequence_counter: need extra work to support in new design, do
  we care?

Cluster + persistence:
- finish async completion: dequeue completion for store & cluster.
- cluster restart from store: clean stores are *not* identical; pick
  one, all others update.
- need to generate cluster ids for messages recovered from store.

Live updates: we don't need to stall brokers during an update!
- update on a queue-by-queue basis.
- updatee locks queues during update, no dequeue.
- update in reverse: don't update messages dequeued during update.
- updatee adds replicated messages at the back (as normal) and update
  messages at the front.
- updater starts from the back, sends "update done" when it hits the
  front of the queue.

Flow control: need to throttle multicasting.
1. bound the number of outstanding multicasts.
2. ensure the entire cluster keeps up, no unbounded "lag".
The existing design uses read-credit to solve 1, and does not solve 2.
Should the new design stop reading on all connections while a
flow-control condition exists?

Can federation also be unified, at least in configuration?

Consider queues (and exchanges?) as having "reliability" attributes:
- persistent: the message is stored on disk.
- backed-up (to another broker): active/passive async replication.
- replicated (to a cluster): active/active multicast replication to
  the cluster.
- federated: federation link to a queue/exchange on another broker.

"Reliability" seems right for the first three but not for federation;
is there a better term?

Clustering and scalability: the new design may give us the
flexibility to address scalability as part of the cluster
design. Think about the relationship to federation and the
"fragmented queues" idea.

* Design debates/decisions

** Active/active vs. active/passive

An active/active cluster can be used in an active/passive mode. In
this mode we would like the cluster to be as efficient as a strictly
active/passive implementation.

An active/passive implementation allows some simplifications over
active/active:
- drop queue ownership and locking.
- don't need to replicate message acquisition.
- can do immediate local enqueue and still guarantee order.

Active/passive introduces a few extra requirements:
- Exactly one broker has to take over if the primary fails.
- Passive members must refuse client connections.
- On failover, clients must re-try all known addresses until they
  find the active member.

Active/active benefits:
- A broker failure only affects the subset of clients connected to
  that broker.
- Clients can switch to any other broker on failover.
- Backup brokers are immediately available on failover.
- Some load sharing: reading from client + multicast is only done on
  the directly connected node.

Active/active drawbacks:
- Co-ordinating message acquisition may impact performance (not
  tested).
- Code may be more complex than active/passive.

Active/passive benefits:
- Don't need a message allocation strategy, can feed consumers at top
  speed.
- Code may be simpler than active/active.

Active/passive drawbacks:
- All clients are on one node, so a failure affects every client in
  the system.
- After a failure there is a "reconnect storm" as every client
  reconnects to the new active node.
- After a failure there is a period where no broker is active, until
  the other brokers realize the primary is gone and agree on the new
  primary.
- Clients must find the single active node, which may involve
  multiple connect attempts.

** Total ordering.

Initial thinking: allow message ordering to differ between brokers.

New thinking: use CPG total ordering, get identical ordering on all
brokers.
- Allowing variation in order introduces too much chance of
  unexpected behavior.
- Using total order allows other optimizations, see Message
  identifiers below.

** Message identifiers.

Initial thinking: message ID = CPG node id + 64-bit sequence
number. This involves a lot of mapping between cluster IDs and broker
messages.

New thinking: message ID = queue name + queue position. (A small
sketch appears at the end of this document.)
- Removes most of the mapping and memory management for cluster code.
- Requires total ordering of messages (see above).

** Message rejection

Initial thinking: add special reject/rejected points to the cluster
interface so rejected messages could be re-queued without multicast.

New thinking: treat re-queueing after reject as an entirely new
message.
- Simplifies cluster interface & implementation.
- Not on the critical path.
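
For completeness, a minimal sketch of the queue-name + queue-position
message identifier proposed under "Message identifiers" above; the
type and field names are hypothetical.

#+begin_src c++
// Sketch only: message ID = queue name + queue position. Names are
// hypothetical, not the actual cluster types. CPG total ordering gives
// every broker an identical queue, so (queue, position) identifies the
// same message everywhere without any per-broker ID mapping.
#include <cstdint>
#include <string>
#include <tuple>

struct MessageId {
    std::string   queue;     // name of the queue the message is on
    std::uint64_t position;  // position of the message in that queue

    bool operator==(const MessageId& other) const {
        return queue == other.queue && position == other.position;
    }
    bool operator<(const MessageId& other) const {  // e.g. for std::map keys
        return std::tie(queue, position) < std::tie(other.queue, other.position);
    }
};
#+end_src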