//////////////////////////////////////////////////////////////////////////////// // // Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // //////////////////////////////////////////////////////////////////////////////// package mx.messaging { import flash.events.TimerEvent; import flash.utils.Timer; import mx.collections.ArrayCollection; import mx.core.mx_internal; import mx.events.CollectionEvent; import mx.events.PropertyChangeEvent; import mx.messaging.errors.MessagingError; import mx.messaging.events.MessageEvent; import mx.messaging.messages.CommandMessage; import mx.messaging.messages.IMessage; use namespace mx_internal; /** * Dispatched when a message is received by the Consumer. * * @eventType mx.messaging.events.MessageEvent.MESSAGE * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ [Event(name="message", type="mx.messaging.events.MessageEvent")] /** * Like a Consumer, a MultiTopicConsumer subscribes to a destination with a single * clientId and delivers messages to a single event handler. Unlike a Consumer * it lets you register subscriptions for a list of subtopics and selector expressions * at the same time from a single message handler. Where Consumer has subtopic and selector properties, * this component has an addSubscription(subtopic, selector) method you use to * add a new subscription to the existing set of subscriptions. Alternatively, you can * populate the subscriptions property with a list of SubscriptionInfo instances that * define the subscriptions for this destination. *

* Like the regular Consumer, the MultiTopicConsumer sends subscribe and unsubscribe * messages which generate a MessageAckEvent or MessageFaultEvent depending upon whether the * operation was successful or not. * Once subscribed, a MultiTopicConsumer dispatches a MessageEvent for each message it receives.

* @mxml *

* The <mx:MultiTopicConsumer> tag has these properties: *

*
     *   <mx:Consumer
     *    Properties
     *    subscriptions=""an empty ArrayCollection of SubscriptionInfo objects"
     *    resubscribeAttempts="5"
     *    resubscribeInterval="5000"
     *    timestamp="No default."
     *  />
     *  
* * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public class MultiTopicConsumer extends AbstractConsumer { //-------------------------------------------------------------------------- // // Constructor // //-------------------------------------------------------------------------- /** * Constructor. * * * @example * * function initConsumer():void * { * var consumer:Consumer = new MultiTopicConsumer(); * consumer.destination = "NASDAQ"; * consumer.addEventListener(MessageEvent.MESSAGE, messageHandler); * consumer.addSubscription("myStock1", "operation IN ('BID', 'Ask')"); * consumer.addSubscription("myStock2", "operation IN ('BID', 'Ask')"); * consumer.subscribe(); * } * * function messageHandler(event:MessageEvent):void * { * var msg:IMessage = event.message; * var info:Object = msg.body; * trace("-App recieved message: " + msg.toString()); * } * * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function MultiTopicConsumer() { super(); _subscriptions.addEventListener(CollectionEvent.COLLECTION_CHANGE, subscriptionsChangeHandler); } //-------------------------------------------------------------------------- // // Properties // //-------------------------------------------------------------------------- /** * @private */ private var _subscriptions:ArrayCollection = new ArrayCollection(); /** * This is a map where the keys are string names of the form subtopic + separator + selector * registered with the boolean true. When we generate a subscription message and * send it to the server, we add/remove those subscriptions from this list. Thus this * list tracks the subscriptions we have sent to the server. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ private var _currentSubscriptions:Object = {}; /** * Used when the subscriptions property changes so we batch all changes made in one * frame into a single multi-subscription message * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ private var _subchangeTimer:Timer = null; [Bindable(event="propertyChange")] [Inspectable(category="General", verbose="1")] /** * Stores an Array of SubscriptionInfo objects. Each subscription * contains a subtopic and a selector each of which can be null. * A subscription with a non-null subtopic restricts the subscription * to messages delivered with only that subtopic. * If a subtopic is null, it uses the selector with no subtopic. * If the selector and the subtopic is null, the subscription receives * any messages targeted at the destination with no subtopic. * The subtopic can contain a wildcard specification. * *

Before a call to the subscribe() method, this property * can be set with no side effects. * After the MultiTopicConsumer has subscribed to its destination, changing this * value has the side effect of updating the MultiTopicConsumer's subscription to * include any new subscriptions and remove any subscriptions you deleted from * the ArrayCollection.

* *

The remote destination must understand the value of the selector * expression.

* * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function get subscriptions():ArrayCollection { return _subscriptions; } /** * Provide a new subscriptions array collection. This should be an ArrayCollection * containing SubscriptionInfo instances which define message topics and selectors * you want received by this consumer. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function set subscriptions(value:ArrayCollection):void { if (_subscriptions !== value) { var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "subscriptions", _subscriptions, value); if (subscribed) { unsubscribe(); _shouldBeSubscribed = true; } if (_subscriptions != null) _subscriptions.removeEventListener(CollectionEvent.COLLECTION_CHANGE, subscriptionsChangeHandler); _subscriptions = value; if (_subscriptions != null) _subscriptions.addEventListener(CollectionEvent.COLLECTION_CHANGE, subscriptionsChangeHandler); // Update an existing subscription to use the new selector. if (_shouldBeSubscribed) subscribe(clientId); dispatchEvent(event); } } /** * This is a convenience method for adding a new subscription. It just creates * a new SubscriptionInfo object and adds it to the subscriptions property. * To call this method, you provide the * subtopic and selector string for the new subscription. If the subtopic is null, * the subscription applies to messages which do not have a subtopic set in the * producer. If the selector string is null, all messages sent which match the * subtopic string are received by this consumer. * * @param subtopic The subtopic for the new subscription. * * @param selector The selector for the new subscription. * * @param maxFrequency The maximum number of messages per second the Consumer wants * to receive for the subscription. Note that this value overwrites the Consumer * wide maxFrequency. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function addSubscription(subtopic:String = null, selector:String = null, maxFrequency:uint = 0):void { subscriptions.addItem(new SubscriptionInfo(subtopic, selector, maxFrequency)); } /** * This method removes the subscription specified by the subtopic * and selector. * * @param subtopic The subtopic for the subscription. * * @param selector The selector for the subscription. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function removeSubscription(subtopic:String = null, selector:String = null):void { var n:int = subscriptions.length; for (var i:int = 0; i < n; ++i) { var si:SubscriptionInfo = SubscriptionInfo(subscriptions.getItemAt(i)); if (si.subtopic == subtopic && si.selector == selector) { subscriptions.removeItemAt(i); break; } } if (n == subscriptions.length) throw new MessagingError("Attempt to remove a subscription with subtopic: " + subtopic + " and selector: " + selector + " that this consumer does not have"); } //-------------------------------------------------------------------------- // // Protected Methods // //-------------------------------------------------------------------------- /** * Returns a subscribe message. * * @return The subscribe CommandMessage. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ override protected function buildSubscribeMessage():CommandMessage { var msg:CommandMessage = super.buildSubscribeMessage(); msg.operation = CommandMessage.MULTI_SUBSCRIBE_OPERATION; var subs:Object = getCurrentSubscriptions(); var toAdd:Array = []; var toRemove:Array = []; for (var s:String in subs) { if (_currentSubscriptions[s] == null) toAdd.push(s); } for (s in _currentSubscriptions) { if (subs[s] == null) toRemove.push(s); } if (toAdd.length > 0) msg.headers[CommandMessage.ADD_SUBSCRIPTIONS] = toAdd; if (toRemove.length > 0) msg.headers[CommandMessage.REMOVE_SUBSCRIPTIONS] = toRemove; _currentSubscriptions = subs; // Tell the ack handler to mark this guy as unsubscribed after the request if (_currentSubscriptions.length == 0) msg.headers.DSlastUnsub = true; return msg; } /** * Returns an unsubscribe mesage. * * @param preserveDurable When true, durable JMS subscriptions are * not destroyed, allowing consumers to later resubscribe and * receive missed messages. * * @return The unsubscribe CommandMessage. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ override protected function buildUnsubscribeMessage(preserveDurable:Boolean):CommandMessage { var msg:CommandMessage = super.buildUnsubscribeMessage(preserveDurable); msg.operation = CommandMessage.MULTI_SUBSCRIBE_OPERATION; var toRemove:Array = []; for (var s:String in _currentSubscriptions) { toRemove.push(s); } _currentSubscriptions = {}; if (toRemove.length > 0) msg.headers[CommandMessage.REMOVE_SUBSCRIPTIONS] = toRemove; msg.headers.DSlastUnsub = true; return msg; } /** * @private */ override protected function internalSend(message:IMessage, waitForClientId:Boolean = true):void { // If there is nothing to do with this message, do not send it - instead throw // an exception as this was an attempt to subscribe or unsubscribe with no // subscriptions. if (message.headers[CommandMessage.ADD_SUBSCRIPTIONS] != null || message.headers[CommandMessage.REMOVE_SUBSCRIPTIONS] != null) super.internalSend(message, waitForClientId); else { if (channelSet == null) initChannelSet(message); // If we are disconnected and are trying to subscribe, we still need // to send the message to force a reconnect if (channelSet != null && !channelSet.connected && message is CommandMessage && CommandMessage(message).operation == CommandMessage.MULTI_SUBSCRIBE_OPERATION) super.internalSend(message, waitForClientId); if (message.headers.DSlastUnsub != null) setSubscribed(false); // In this case, if we are not sending the message we can just say we are // subscribed but only if we are connected else if (channelSet != null && channelSet.connected) setSubscribed(true); } } /** * @private */ override protected function setSubscribed(value:Boolean):void { /* * Whenenver we get marked as being unsubscribed (i.e. the server is * disconnected), we clear out the subscriptions on the server. The * client will then resubscribe and send all subscriptions on the next * connect. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ if (!value) _currentSubscriptions = {}; super.setSubscribed(value); } //-------------------------------------------------------------------------- // // Private Methods // //-------------------------------------------------------------------------- private function getCurrentSubscriptions():Object { var subs:Object = {}; for (var i:int = 0; i < subscriptions.length; i++) { var si:SubscriptionInfo = SubscriptionInfo(subscriptions.getItemAt(i)); var temp:String = (si.subtopic == null ? "" : si.subtopic) + CommandMessage.SUBTOPIC_SEPARATOR + (si.selector == null ? "" : si.selector); // Add maxFrequency as another token. if (si.maxFrequency > 0) temp += CommandMessage.SUBTOPIC_SEPARATOR + si.maxFrequency; subs[temp] = true; } return subs; } private function subscriptionsChangeHandler(event:CollectionEvent):void { // process the changes on the next frame to be sure we are done with // all subscriptions. If we get a change event after we've subscribed // but before we've gotten the ack, _shouldBeSubscribed is true but // subscribed is only set when we have acked. if ((_shouldBeSubscribed || subscribed) && _subchangeTimer == null) { _subchangeTimer = new Timer(0, 1); _subchangeTimer.addEventListener(TimerEvent.TIMER, doResubscribe); _subchangeTimer.start(); } } private function doResubscribe(event:TimerEvent):void { _subchangeTimer.removeEventListener(TimerEvent.TIMER, doResubscribe); _subchangeTimer = null; // If the Consumer is not connected yet, the subscribe message will be queued. internalSend(buildSubscribeMessage()); } } }