There are two classes of metadata that need to be managed in Hedwig: one is the list of available hubs, which is used to track server availability (ZooKeeper is designed naturally for this); while the other is for data structures to track topic states and subscription states. This second class can be handled by any key/value store which provides ah CAS (Compare And Set) operation. The metadata in this class are:
Topic Ownership
: tracks which hub server is assigned to serve requests for a specific topic.Topic Persistence Info
: records what bookkeeper ledgers are used to store messages for a specific topic and their message id ranges.Subscription Data
: records the preferences and subscription state for a specific subscription (topic, subscriber).Each kind of metadata is handled by a specific metadata manager. They are TopicOwnershipManager, TopicPersistenceManager and SubscriptionDataManager.
There are two ways to management topic ownership. One is leveraging ZooKeeper's ephemeral znodes to record the topic's owner info as a child ephemeral znode under its topic znode. When a hub server, owning a specific topic, crashes, the ephemeral znode which signifies topic ownership will be deleted due to the loss of the zookeeper session. Other hubs can then be assigned the ownership of the topic. The other one is to leverage the CAS operation provided by key/value stores to do leader election. CAS doesn't require the underlying key/value store to provide functionality similar to ZooKeeper's ephemeral nodes. With CAS it is possible to guarantee that only one hub server gains the ownership for a specific topic, which is more scalable and generic solution.
The implementation of a TopicOwnershipManager is required to implement following methods:
public void readOwnerInfo(ByteString topic, Callback<Versioned<HubInfo>> callback, Object ctx);
public void writeOwnerInfo(ByteString topic, HubInfo owner, Version version,
Callback<Version> callback, Object ctx);
public void deleteOwnerInfo(ByteString topic, Version version,
Callback<Void> callback, Object ctx);
readOwnerInfo
: Read the owner info from the underlying key/value store. The implementation should take the responsibility of deserializing the metadata into a HubInfo object identifying a hub server. Also, its current version needs to be returned for future updates. If there is no owner info found for a topic, null value is returned.writeOwnerInfo
: Write the owner info into the underlying key/value store with the given version. If the current version in underlying key/value store doesn't equal to the provided version, the write should be rejected with BadVersionException. The new version should be returned for a successful write. NoTopicOwnerInfoException is returned if no owner info found for a topic.deleteOwnerInfo
: Delete the owner info from key/value store with the given version. The owner info should be removed if the current version in key/value store is equal to the provided version. Otherwise, the deletion should be rejected with BadVersionException. NoTopicOwnerInfoException is returned if no owner info is found for the topic.Similar as TopicOwnershipManager, an implementation of TopicPersistenceManager is required to implement READ/WRITE/DELETE interfaces as below:
public void readTopicPersistenceInfo(ByteString topic,
Callback<Versioned<LedgerRanges>> callback, Object ctx);
public void writeTopicPersistenceInfo(ByteString topic, LedgerRanges ranges, Version version,
Callback<Version> callback, Object ctx);
public void deleteTopicPersistenceInfo(ByteString topic, Version version,
Callback<Void> callback, Object ctx);
readTopicPersistenceInfo
: Read the persistence info from the underlying key/value store. The implementation should take the responsibility of deserializing the metadata into a LedgerRanges object includes the ledgers used to store messages. Also, its current version needs to be returned for future updates. If there is no persistence info found for a topic, a null value is returned.writeTopicPersistenceInfo
: Write the persistence info into the underlying key/value store with the given version. If the current version in the underlying key/value store doesn't equal the provided version, the write should be rejected with BadVersionException. The new version should be returned on a successful write. NoTopicPersistenceInfoException is returned if no persistence info is found for a topic.deleteTopicPersistenceInfo
: Delete the persistence info from the key/value store with the given version. The owner info should be removed if the current version in the key/value store equals the provided version. Otherwise, the deletion should be rejected with BadVersionException. NoTopicPersistenceInfoException is returned if no persistence info is found for a topic.SubscriptionDataManager has similar READ/CREATE/WRITE/DELETE interfaces as other managers. Besides that, the implementation needs to implement READ SUBSCRIPTIONS interface, which is to fetch all the subscriptions for a given topic.
public void createSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data,
Callback<Version> callback, Object ctx);
public boolean isPartialUpdateSupported();
public void updateSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData dataToUpdate,
Version version, Callback<Version> callback, Object ctx);
public void replaceSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData dataToReplace,
Version version, Callback<Version> callback, Object ctx);
public void deleteSubscriptionData(ByteString topic, ByteString subscriberId, Version version,
Callback<Void> callback, Object ctx);
public void readSubscriptionData(ByteString topic, ByteString subscriberId,
Callback<Versioned<SubscriptionData>> callback, Object ctx);
public void readSubscriptions(ByteString topic, Callback<Map<ByteString, Versioned<SubscriptionData>>> cb,
Object ctx);
The metadata for a subscription includes two parts, one is preferences and the other one is subscription state. SubscriptionPreferences tracks all the preferences for a subscriber (etc. Application could store its customized preferences for message filtering), while SubscriptionState is used internally to track the message consumption state for a given subscriber. These two kinds of metadata are quite different: SubscriptionPreferences is not updated
frequently while SubscriptionState is be updated frequently when messages are consumed. If the underlying key/value store supports independent field update for a given key (subscription), SubscriptionPreferences and SubscriptionState could be stored as two different fields for a given subscription. In this case isPartialUpdateSupported should return true. Otherwise, isPartialUpdateSupported should return false and the implementation should serialize/deserialize SubscriptionData as an opaque blob.
createSubscriptionData
: Create a subscription entry for a given topic. The initial version would be returned for a success creation. SubscriptionStateExistsException is returned if the subscription entry already exists.updateSubscriptionData/replaceSubscriptionData
: Update/replace the subscription data in the underlying key/value store with the given version. If the current version in underlying key/value store doesn't equal to the provided version, the update should be rejected with BadVersionException. The new version should be returned for a successful write. NoSubscriptionStateException is returned if no subscription entry is found for a subscription (topic, subscriber).readSubscriptionData
: Read the subscription data from the underlying key/value store. The implementation should take the responsibility of deserializing the metadata into a SubscriptionData object including its preferences and subscription state. Also, its current version needs to be returned for future updates. If there is no subscription data found for a subscription, a null value is returned.readSubscriptions
: Read all the subscription data from key/value store for a given topic. The implementation should take the responsibility of managing all subscription for a topic for efficient access. An empty map is returned if there are no subscriptions found for a given topic.deleteSubscriptionData
: Delete the subscription data from the key/value store with given version for a specific subscription (topic, subscriber). The subscription info should be removed if current version in key/value store equals the provided version. Otherwise, the deletion should be rejected with BadVersionException. NoSubscriptionStateException is returned if no subscription data is found for a subscription (topic, subscriber).From the interface, several requirements needs to meet before picking up a key/value store for Hedwig:
CAS
: The ability to do strict updates according to specific condition, i.e. a specific version (ZooKeeper) and same content (HBase).Optimized for Writes
: The metadata access pattern for Hedwig is read first and continuous updates.Optimized for retrieving all subscriptions for a topic
: Either hierarchical structures to maintain such relationships (ZooKeeper), or ordered key/value storage to cluster the subscription for a topic together, would provide efficient subscription data management.ZooKeeper is the default implementation for Hedwig metadata management, which holds data in memory and provides filesystem-like namespace, meeting the above requirements. ZooKeeper is suitable for most Hedwig usecases. However, if your application needs to manage millions of topics/subscriptions, a more scalable solution would be HBase, which also meet the above requirements.