/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// This schema uses proto2 constructs (required/optional labels and explicit
// defaults), so the dialect is pinned explicitly.
syntax = "proto2";

package Hedwig;

option java_package = "org.apache.hedwig.protocol";
option optimize_for = SPEED;

// Protocol version carried in every request and response.
enum ProtocolVersion {
  VERSION_ONE = 1;
}

// Common structure used to store a header or a set of properties as a list
// of key/value entries.
message Map {
  // A single key/value pair.
  message Entry {
    optional string key = 1;
    optional bytes value = 2;
  }

  repeated Entry entries = 1;
}

// Message header.
message MessageHeader {
  // User-customized fields used for message filtering.
  optional Map properties = 1;

  // The following are system properties in the message header.
  optional string messageType = 2;
}

// This is the structure that will be serialized.
message Message {
  required bytes body = 1;
  optional bytes srcRegion = 2;
  optional MessageSeqId msgId = 3;

  // Message header.
  optional MessageHeader header = 4;
}

// Pairs a region with a sequence id.
message RegionSpecificSeqId {
  required bytes region = 1;
  required uint64 seqId = 2;
}

// Sequence id of a message: one local component plus any number of
// region-specific remote components.
message MessageSeqId {
  optional uint64 localComponent = 1;
  repeated RegionSpecificSeqId remoteComponents = 2;
}

// Tells which kind of request a PubSubRequest carries.
enum OperationType {
  PUBLISH = 0;
  SUBSCRIBE = 1;
  CONSUME = 2;
  UNSUBSCRIBE = 3;

  // The following two are only used for the hedwig proxy.
  START_DELIVERY = 4;
  STOP_DELIVERY = 5;
  // End of the requests only used for the hedwig proxy.

  CLOSESUBSCRIPTION = 6;
}

// A PubSubRequest is just a union of the various request types, with an enum
// telling us which type it is. The same can also be done through extensions.
// We need one request type that we will deserialize into on the server side.
message PubSubRequest {
  required ProtocolVersion protocolVersion = 1;
  required OperationType type = 2;
  repeated bytes triedServers = 3;
  required uint64 txnId = 4;
  optional bool shouldClaim = 5;
  required bytes topic = 6;
  // Any authentication stuff and other general stuff goes here.

  // One entry for each type of request.
  optional PublishRequest publishRequest = 52;
  optional SubscribeRequest subscribeRequest = 53;
  optional ConsumeRequest consumeRequest = 54;
  optional UnsubscribeRequest unsubscribeRequest = 55;
  optional StopDeliveryRequest stopDeliveryRequest = 56;
  optional StartDeliveryRequest startDeliveryRequest = 57;
  optional CloseSubscriptionRequest closeSubscriptionRequest = 58;
}

// Body of a PUBLISH request.
message PublishRequest {
  required Message msg = 2;
}

// Records all preferences for a subscription; serialized to be stored in the
// meta store.
message SubscriptionPreferences {
  // User-customized subscription options.
  optional Map options = 1;

  //
  // System-defined options.
  //

  // Message bound.
  optional uint32 messageBound = 2;

  // Server-side message filter.
  optional string messageFilter = 3;

  // Message window size: the maximum number of messages which will be
  // delivered without being consumed.
  optional uint32 messageWindowSize = 4;
}

// Body of a SUBSCRIBE request.
message SubscribeRequest {
  required bytes subscriberId = 2;

  // Whether the subscription is created, attached to, or either.
  enum CreateOrAttach {
    CREATE = 0;
    ATTACH = 1;
    CREATE_OR_ATTACH = 2;
  }

  optional CreateOrAttach createOrAttach = 3 [default = CREATE_OR_ATTACH];

  // Wait for cross-regional subscriptions to be established before returning.
  optional bool synchronous = 4 [default = false];

  // @Deprecated. Set the message bound in SubscriptionPreferences instead.
  optional uint32 messageBound = 5;

  // Subscription options.
  optional SubscriptionPreferences preferences = 6;

  // Force attach the subscription, which would kill the existing channel.
  // This option doesn't need to be persisted.
  optional bool forceAttach = 7 [default = false];
}

// Used in the client only.
// Options are stored in the SubscriptionPreferences structure.
message SubscriptionOptions {
  // Force attach the subscription, which would kill the existing channel.
  // This option doesn't need to be persisted.
  optional bool forceAttach = 1 [default = false];

  optional SubscribeRequest.CreateOrAttach createOrAttach = 2
      [default = CREATE_OR_ATTACH];

  optional uint32 messageBound = 3 [default = 0];

  // User-customized subscription options.
  optional Map options = 4;

  // Server-side message filter.
  optional string messageFilter = 5;

  // Message window size: the maximum number of messages which will be
  // delivered without being consumed.
  optional uint32 messageWindowSize = 6;

  // Enable resubscribe.
  optional bool enableResubscribe = 7 [default = true];
}

// Body of a CONSUME request.
message ConsumeRequest {
  required bytes subscriberId = 2;

  // The msgId is cumulative: all messages up to this id are marked as
  // consumed.
  required MessageSeqId msgId = 3;
}

// Body of an UNSUBSCRIBE request.
message UnsubscribeRequest {
  required bytes subscriberId = 2;
}

// Body of a CLOSESUBSCRIPTION request.
message CloseSubscriptionRequest {
  required bytes subscriberId = 2;
}

// Body of a STOP_DELIVERY request.
message StopDeliveryRequest {
  required bytes subscriberId = 2;
}

// Body of a START_DELIVERY request.
message StartDeliveryRequest {
  required bytes subscriberId = 2;
}

// Identifies an event that happened for a subscription.
enum SubscriptionEvent {
  // Topic has changed ownership (hub server down or topic released).
  TOPIC_MOVED = 1;

  // Subscription is force closed by other subscribers.
  SUBSCRIPTION_FORCED_CLOSED = 2;
}

// A response carrying an event for a subscription, sent to the client.
message SubscriptionEventResponse {
  optional SubscriptionEvent event = 1;
}

// Response envelope sent back to the client.
message PubSubResponse {
  required ProtocolVersion protocolVersion = 1;
  required StatusCode statusCode = 2;
  required uint64 txnId = 3;

  // In case of a status code of NOT_RESPONSIBLE_FOR_TOPIC, the status
  // message will contain the name of the host actually responsible for the
  // topic.
  optional string statusMsg = 4;

  // The following fields are sent in delivered messages.
  optional Message message = 5;
  optional bytes topic = 6;
  optional bytes subscriberId = 7;

  // The following fields are sent by other requests.
  optional ResponseBody responseBody = 8;
}

// Response body for a publish request.
message PublishResponse {
  // If the request was a publish request, this was the message id of the
  // published message.
  required MessageSeqId publishedMsgId = 1;
}

// Response body for a subscribe request.
message SubscribeResponse {
  optional SubscriptionPreferences preferences = 2;
}

// Container for the request-specific response bodies.
message ResponseBody {
  optional PublishResponse publishResponse = 1;
  optional SubscribeResponse subscribeResponse = 2;
  optional SubscriptionEventResponse subscriptionEvent = 3;
}

// Status codes returned in a PubSubResponse.
enum StatusCode {
  SUCCESS = 0;

  // Client-side errors (4xx).
  MALFORMED_REQUEST = 401;
  NO_SUCH_TOPIC = 402;
  CLIENT_ALREADY_SUBSCRIBED = 403;
  CLIENT_NOT_SUBSCRIBED = 404;
  COULD_NOT_CONNECT = 405;
  TOPIC_BUSY = 406;
  RESUBSCRIBE_EXCEPTION = 407;

  // Server-side errors (5xx).
  NOT_RESPONSIBLE_FOR_TOPIC = 501;
  SERVICE_DOWN = 502;
  UNCERTAIN_STATE = 503;
  INVALID_MESSAGE_FILTER = 504;

  // Server-side meta manager errors (52x).
  BAD_VERSION = 520;
  NO_TOPIC_PERSISTENCE_INFO = 521;
  TOPIC_PERSISTENCE_INFO_EXISTS = 522;
  NO_SUBSCRIPTION_STATE = 523;
  SUBSCRIPTION_STATE_EXISTS = 524;
  NO_TOPIC_OWNER_INFO = 525;
  TOPIC_OWNER_INFO_EXISTS = 526;

  // For all unexpected error conditions.
  UNEXPECTED_CONDITION = 600;
  COMPOSITE = 700;
}

// What follows is not the server-client protocol, but server-internal
// structures that are serialized in ZK.
// They should eventually be moved into the server.

// State of a subscription.
message SubscriptionState {
  required MessageSeqId msgId = 1;

  // @Deprecated.
  // It is a bad idea to put fields that don't change frequently together
  // with fields that change frequently, so this was moved to the
  // subscription preferences structure.
  optional uint32 messageBound = 2;
}

// Bundles the state and the preferences of a subscription.
message SubscriptionData {
  optional SubscriptionState state = 1;
  optional SubscriptionPreferences preferences = 2;
}

// A ledger together with the (inclusive) sequence id bounds recorded for it.
message LedgerRange {
  required uint64 ledgerId = 1;
  optional MessageSeqId endSeqIdIncluded = 2;
  optional uint64 startSeqIdIncluded = 3;
}

// A list of ledger ranges.
message LedgerRanges {
  repeated LedgerRange ranges = 1;
}

// Manager implementation name and version.
message ManagerMeta {
  required string managerImpl = 2;
  required uint32 managerVersion = 3;
}

// Hostname and czxid recorded for a hub.
message HubInfoData {
  required string hostname = 2;
  required uint64 czxid = 3;
}

// Load recorded for a hub, expressed as a number of topics.
message HubLoadData {
  required uint64 numTopics = 2;
}