-*-org-*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

* Status of implementation

Meaning of priorities:
[#A] Essential for basic functioning.
[#B] Required for first release.
[#C] Can be addressed in a later release.

The existing prototype is bare bones to do performance benchmarks:
- Implements the publish and consumer locking protocol.
- Deferred delivery and asynchronous completion of messages.
- Optimizes the case where all consumers are on the same node.
- No new member updates, no failover updates, no transactions, no persistence etc.

Prototype code is on branch qpid-2920-active, in cpp/src/qpid/cluster/exp/

** Similarities to existing cluster.

/Active-active/: the new cluster can be a drop-in replacement for the
old; existing tests & customer deployment configurations are still
valid.

/Virtual synchrony/: uses corosync to co-ordinate activity of members.

/XML controls/: uses XML to define the primitives multicast to the
cluster.

** Differences with existing cluster.

/Report rather than predict consumption/: brokers explicitly tell each
other which messages have been acquired or dequeued. This removes the
major cause of bugs in the existing cluster.

/Queue consumer locking/: to avoid duplicates, only one broker can
acquire or dequeue messages at a time - the broker that holds the
consume-lock on the queue. If multiple brokers are consuming from the
same queue, the lock is passed around to time-share access to the
queue.

/Per-queue concurrency/: uses a fixed-size set of CPG groups
(reflecting the concurrency of the host) to allow concurrent processing
on different queues. Queues are hashed onto the groups.

* Completed tasks

** DONE [#A] Minimal POC: publish/acquire/dequeue protocol.
   CLOSED: [2011-10-05 Wed 16:03]

Defines the broker::Cluster interface and call points.
Initial interface committed.

Main classes:
- Core: central object holding cluster classes together (replaces cluster::Cluster).
- BrokerContext: implements the broker::Cluster interface.
- QueueContext: attached to a broker::Queue, holds cluster status.
- MessageHolder: holds local messages while they are being enqueued.

Implements multiple CPG groups for better concurrency.

** DONE [#A] Large message replication.
   CLOSED: [2011-10-05 Wed 17:22]

Multicast using fixed-size (64k) buffers, allowing fragmentation of
messages across buffers (frame by frame).

* Design Questions

** [[Queue sequence numbers vs. independant message IDs]]

The current prototype uses queue + sequence number to identify a
message. This is tricky for updating new members, as the sequence
numbers are only known on delivery.

Independent message IDs that can be generated and sent as part of the
message simplify this, and potentially allow performance benefits by
relaxing total ordering. However they require additional map lookups
that hurt performance.

- [X] Prototype independent message IDs, check performance.
      Throughput worse by 30% in the contended case, 10% in the uncontended case.
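To make the trade-off concrete, here is a minimal sketch of the two
identification schemes; the type and member names are hypothetical and
not taken from the prototype:

#+BEGIN_SRC C++
#include <cstdint>
#include <string>
#include <unordered_map>

// Queue + sequence number: cheap, but the sequence is only assigned on
// delivery to the queue, which complicates updating a joining member.
struct QueueSeqId {
    std::string queue;
    uint64_t    sequence;   // queue position, known only at enqueue time
};

// Independent message ID: can be generated by the publisher and multicast
// with the message, but needs an extra lookup to find the queue position.
using MsgId = uint64_t;

class MessageIndex {
  public:
    void add(MsgId id, uint64_t queuePosition) { index[id] = queuePosition; }
    // Extra lookup on every acquire/dequeue - the cost measured above.
    bool find(MsgId id, uint64_t& queuePosition) const {
        auto i = index.find(id);
        if (i == index.end()) return false;
        queuePosition = i->second;
        return true;
    }
  private:
    std::unordered_map<MsgId, uint64_t> index;
};
#+END_SRC

The extra MessageIndex lookup is where the 10-30% throughput cost
measured in the prototype would come from.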
* Tasks to match existing cluster

** TODO [#A] Review old cluster code for more tasks. 1

** TODO [#A] Put cluster enqueue after all policy & other checks.
gsim: we do the policy check after multicasting the enqueue, so we
could have an inconsistent outcome.
aconway: the multicast should be after the enqueue and any other code
that may decide to send/not send the message.
gsim: while later is better, is moving it that late the right thing?
That would mean, for example, that any dequeues triggered by the
enqueue (e.g. ring queue or LVQ) happen before the enqueue is
broadcast.

** TODO [#A] Defer and async completion of wiring commands. 5
Testing requirement: many tests assume wiring changes are visible
across the cluster once the wiring command completes.
Name clashes: avoid a race if the same queue/exchange name is declared
on 2 brokers simultaneously.
Ken's async accept work, never merged:
http://svn.apache.org/viewvc/qpid/branches/qpid-3079/
Clashes with non-replicated: see [[Allow non-replicated]] below.

** TODO [#A] Defer & async completion for explicit accept.
Explicit accept currently ignores the consume lock. Defer it and
complete it when the lock is acquired.

** TODO [#A] Update to new members joining. 10
Need to resolve [[Queue sequence numbers vs. independant message IDs]] first.
- Implicit sequence numbers are more tricky to replicate to a new member.

Update individual objects (queues and exchanges) independently:
- create queues first, then update all queues and exchanges in parallel.
- multiple updater threads, per queue/exchange.
- updater sends messages to special exchange(s) (not using extended AMQP controls).

Queue updater:
- marks the queue position at the sync point.
- sends messages starting from the sync point, working towards the head of the queue.
- sends a "done" message.
Note: the updater remains active throughout; consuming clients actually
reduce the size of the update.

Queue updatee:
- enqueues received from CPG: add to the back of the queue as normal.
- dequeues received from CPG: apply if found, else save to check at the end of the update.
- messages from the updater: add to the *front* of the queue.
- update complete: apply any saved dequeues.

Exchange updater:
- sends a snapshot of the exchange as it was at the sync point.

Exchange updatee:
- queues exchange operations received after the sync point.
- when the snapshot is received: applies the saved operations.

The updater remains active throughout. The updatee stalls clients until
the update completes.

Updating queue/exchange/binding objects uses the same encode/decode
that is used by the store. Should the updatee use the recovery
interfaces to recover?

** TODO [#A] Failover updates to client. 2
Implement the amq.failover exchange to notify clients of membership changes.

** TODO [#A] Passing all existing cluster tests. 5
The new cluster should be a drop-in replacement for the old, so it
should be able to pass all the existing tests.

** TODO [#B] Initial status protocol. 3
Handshake to give the status of each broker member to new members joining.
Status includes:
- cluster protocol version.
- persistent store state (clean, dirty).
- extensibility, so additional state can be added in new protocols.

The store is clean if the broker was the last man standing or was shut
down cleanly. Need to add multicast controls for shutdown.

** TODO [#B] Persistent cluster startup. 4
Based on existing code:
- Dirty/clean store state exchanged in the initial status handshake.
- Only one broker recovers from its store, the others update.

** TODO [#B] Replace boost::hash with our own hash function. 1
The hash function is effectively part of the cluster interface, so we
need to be sure it doesn't change underneath us.
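Because queues are hashed onto the fixed set of CPG groups, every
member must compute the same group for the same queue name regardless
of compiler or library version. A minimal sketch of a self-contained,
stable hash; FNV-1a is used here purely as an example, the actual
choice is open:

#+BEGIN_SRC C++
#include <cstdint>
#include <string>

// Example only: a fixed, well-known hash (FNV-1a) whose result never
// depends on the compiler or library version, unlike boost::hash.
inline uint64_t stableHash(const std::string& s) {
    uint64_t h = 14695981039346656037ULL;   // FNV offset basis
    for (unsigned char c : s) {
        h ^= c;
        h *= 1099511628211ULL;               // FNV prime
    }
    return h;
}

// Map a queue onto one of the fixed-size set of CPG groups.
inline size_t groupForQueue(const std::string& queueName, size_t numGroups) {
    return stableHash(queueName) % numGroups;
}
#+END_SRC

Note that, as the live-upgrade section below points out, changing
either the hash or the number of groups changes the queue-to-group
association and breaks compatibility.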
** TODO [#B] Management model. 3
Alerts for inconsistent message loss.

** TODO [#B] Management methods that modify queues. 5
Replicate management methods that modify queues - e.g. move, purge.
The target broker may not have all the messages held on other brokers
for purge/destroy.
- Queue::purge(): wait for the lock, purge locally, mcast the dequeues.
- Queue::move(): wait for the lock, move the messages (mcasts enqueues), mcast the dequeues.
- Queue::destroy(): send messages to the alternate exchange on all brokers.
- Queue::get(): ???
Need to add call points & mcast messages to replicate these?

** TODO [#B] TX transaction support. 5
Extend the broker::Cluster interface to capture transaction context and
completion.
Running brokers exchange TX information.
New broker updates include TX information.

// FIXME aconway 2010-10-18: As things stand the cluster is not
// compatible with transactions
// - enqueues occur after routing is complete
// - no call to Cluster::enqueue, should be in Queue::process?
// - no transaction context associated with messages in the Cluster interface.
// - no call to Cluster::accept in Queue::dequeueCommitted

Injecting holes into a queue:
- Multicast a 'non-message' that just consumes one queue position.
- Used to reserve a message ID (position) for a non-committed message.
- Could also allow non-replicated messages on a replicated queue if required.

** TODO [#B] DTX transaction support. 5
Extend the broker::Cluster interface to capture transaction context and
completion.
Running brokers exchange DTX information.
New broker updates include DTX information.

** TODO [#B] Replicate message groups?
Message groups may require additional state to be replicated.

** TODO [#B] Replicate state for Fairshare?
gsim: fairshare would need explicit code to keep it in sync across
nodes; that may not be required however.

** TODO [#B] Timed auto-delete queues?
gsim: may need specific attention?

** TODO [#B] Async completion of accept. 4
When this is fixed in the standalone broker, it should be fixed for the
cluster as well.

** TODO [#B] Network partitions and quorum. 2
Re-use the existing implementation.

** TODO [#B] Review error handling, put in a consistent model. 4
- [ ] Review all asserts for possible throws.
- [ ] Decide on fatal vs. non-fatal errors.

** TODO [#B] Implement inconsistent error handling policy. 5
What to do if a message is enqueued successfully on some broker(s) but
fails on other(s) - e.g. due to store limits?
- Failure on the local broker = possible message loss.
- Failure on a non-local broker = possible duplication.

We have more flexibility now; we don't *have* to crash - but we've lost
some of our redundancy guarantee, so how do we inform the user?

Options to respond to an inconsistent error (a sketch follows this
section):
- stop the broker
- reset the broker (exec a new qpidd)
- reset the queue
- log critical
- send a management event

Most important is to inform of the risk of message loss.
Current favourite: reset queue + log critical + management event.
Configurable choices?
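A minimal sketch of how a configurable response policy might look; the
enum values mirror the options listed above, but the names and the
handler are hypothetical and not part of the prototype:

#+BEGIN_SRC C++
#include <iostream>
#include <string>

// Hypothetical responses to an inconsistent enqueue outcome, mirroring
// the options listed above. None of this exists in the prototype.
enum class InconsistentErrorPolicy {
    STOP_BROKER,    // shut the broker down
    RESET_BROKER,   // exec a fresh qpidd
    RESET_QUEUE,    // current favourite, combined with logging + mgmt event
    LOG_ONLY
};

// Sketch of the point where a broker learns that an enqueue succeeded
// locally but failed elsewhere (or vice versa).
void onInconsistentEnqueue(const std::string& queue,
                           InconsistentErrorPolicy policy) {
    // Always inform of the risk of message loss.
    std::cerr << "critical: inconsistent enqueue outcome on queue "
              << queue << std::endl;
    // sendManagementEvent(queue);  // assumed hook, not shown here
    switch (policy) {
        case InconsistentErrorPolicy::RESET_QUEUE:  /* re-create the queue empty on all members */ break;
        case InconsistentErrorPolicy::RESET_BROKER: /* exec a new qpidd */ break;
        case InconsistentErrorPolicy::STOP_BROKER:  /* orderly shutdown */ break;
        case InconsistentErrorPolicy::LOG_ONLY:     break;
    }
}
#+END_SRC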
** TODO [#C] Allow non-replicated exchanges, queues. 5

Three levels, set in declare arguments:
- qpid.replicate=no: nothing is replicated.
- qpid.replicate=wiring: queues/exchanges are replicated but not messages.
- qpid.replicate=yes: queues, exchanges and messages are replicated.

Wiring use case: it's OK to lose some messages (up to the max depth of
the queue) but the queue/exchange structure must be highly available so
clients can resume communication after fail-over.

Configurable default? Default same as the old cluster?

Need to:
- save replicated status to the store (in arguments).
- support it in management tools.

Avoid name clashes between replicated and non-replicated: multicast
local-only names as well; all brokers keep a map and refuse to create
clashes.

** TODO [#C] Handling immediate messages in a cluster. 2
Include remote consumers in the decision to deliver an immediate message.

* Improvements over existing cluster

** TODO [#C] Remove old cluster hacks and workarounds.
The old cluster has workarounds in the broker code that can be removed.
- [ ] Drop code to replicate the management model.
- [ ] Drop timer workarounds for TTL, management, heartbeats.
- [ ] Drop "cluster-safe assertions" in broker code.
- [ ] Drop connections, sessions, management from the cluster update.
- [ ] Drop security workarounds: cluster code now operates after message decoding.
- [ ] Drop connection tracking in cluster code.
- [ ] Simpler inconsistent-error handling code, no need to stall.

** TODO [#C] Support for live upgrades.

Allow brokers in a running cluster to be replaced one-by-one with a new
version. (See new-cluster-design for design notes.)

The old cluster protocol was unstable because any change in broker
state caused changes to the cluster protocol. The new design should be
much more stable.

Points to implement in anticipation of live upgrade (a sketch of the
versioned message prefix appears at the end of this file):
- Prefix each CPG message with a version number and length. The version
  number determines how to decode the message.
- Brokers ignore messages that have a higher version number than they
  understand.
- Protocol version XML element in cluster.xml, on each control.
- Initial status protocol to include the protocol version number.

New member updates: use the store encode/decode for updates and the
same backward-compatibility strategy as the store. This allows adding
new elements to the end of structures, but not changing or removing
existing elements.

NOTE: any change to the association of CPG group names and queues will
break compatibility. How to work around this?

** TODO [#C] Refactoring of common concerns.

There are a bunch of things that act as "queue observers" with
intercept points in similar places:
- QueuePolicy
- QueuedEvents (async replication)
- MessageStore
- Cluster

Look for ways to capitalize on the similarity & simplify the code.

In particular QueuedEvents (async replication) strongly resembles
cluster replication, but over TCP rather than multicast.

** TODO [#C] Support for AMQP 1.0.

* Testing

** TODO [#A] Pass all existing cluster tests.
Requires [[Defer and async completion of wiring commands.]]

** TODO [#A] New cluster tests.
Stress tests & performance benchmarks focused on the changes in the new
cluster:
- concurrency by queues rather than connections.
- different handling of shared queues when consumers are on different
  brokers.
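* Appendix: sketch of a versioned CPG message prefix

To illustrate the version/length prefix described under "Support for
live upgrades" above, here is a minimal sketch. The struct layout and
names are hypothetical; only the idea (decode what you know, skip what
you don't) comes from the notes above:

#+BEGIN_SRC C++
#include <cstdint>
#include <cstring>

// Hypothetical fixed prefix multicast before every CPG message body.
// Layout and field names are illustrative, not an actual wire format.
struct FrameHeader {
    uint16_t version;   // determines how to decode the body
    uint32_t length;    // body length, lets old brokers skip unknown bodies
};

const uint16_t MY_PROTOCOL_VERSION = 1;  // assumed current version

// Returns the number of bytes consumed, skipping messages from a newer
// protocol version rather than failing.
size_t decodeFrame(const char* data, size_t size) {
    FrameHeader h;
    if (size < sizeof(h)) return 0;                 // wait for more data
    std::memcpy(&h, data, sizeof(h));               // real code must also fix endianness
    if (size < sizeof(h) + h.length) return 0;      // incomplete body
    if (h.version > MY_PROTOCOL_VERSION) {
        // Newer than we understand: ignore the body, stay compatible.
        return sizeof(h) + h.length;
    }
    // decodeBody(h.version, data + sizeof(h), h.length);  // assumed hook
    return sizeof(h) + h.length;
}
#+END_SRC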