//////////////////////////////////////////////////////////////////////////////// // // Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // //////////////////////////////////////////////////////////////////////////////// package mx.messaging { import flash.utils.Dictionary; import mx.core.mx_internal; import mx.logging.Log; import mx.messaging.events.MessageEvent; use namespace mx_internal; /** * @private * * Helper class that listens for MessageEvents dispatched by ChannelSets that Consumers are subscribed over. * This class is necessary because the server maintains queues of messages to push to this Flex client on a * per-endpoint basis but the client may create more than one Channel that connects to a single server endpoint. * In this scenario, messages can be pushed/polled to the client over a different channel instance than the one * that the target Consumer subscribed over. The server isn't aware of this difference because both channels are * pointed at the same endpoint. Here's a diagram to illustrate. * * Client: * Consumer 1 Consumer 2 Consumer 3 * | | / * ChannelSet 1 ChannelSet 2 * | | * Channel 1 Channel 2 <- The endpoint URIs for these two channels are identical * | | * \_______________________/ * Server: | * | * Endpoint (that the two channels point to) * | * FlexClientOutboundQueue (for this endpoint for this FlexClient) * \-- Outbound messages for the three Consumer subscriptions * * When the endpoint receives a poll request from Channel 1 it will return queued messages for all three subscriptions * but back on the client when Channel 1 dispatches message events for Consumer 2 and 3's subscriptions they won't see * them because they're directly connected to the separate Channel2/ChannelSet2. * This helper class keeps track of Consumer subscriptions and watches all ChannelSets for message events to * ensure they're dispatched to the proper Consumer even when the client has been manually (miss)configured as the * diagram illustrates. * * This class is a singleton that maintains a table of all subscribed Consumers and ref-counts the number of active * subscriptions per ChannelSet to determine whether it needs to be listening for message events from a given * ChannelSet or not; it dispatches message events from these ChannelSets to the proper Consumer instance * by invoking the Consumer's messageHandler() method directly. */ public class ConsumerMessageDispatcher { //-------------------------------------------------------------------------- // // Class variables // //-------------------------------------------------------------------------- /** * @private * The sole instance of this singleton class. */ private static var _instance:ConsumerMessageDispatcher; //-------------------------------------------------------------------------- // // Class methods // //-------------------------------------------------------------------------- /** * Returns the sole instance of this singleton class, * creating it if it does not already exist. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public static function getInstance():ConsumerMessageDispatcher { if (!_instance) _instance = new ConsumerMessageDispatcher(); return _instance; } //-------------------------------------------------------------------------- // // Constructor // //-------------------------------------------------------------------------- /** * Constructor. * Use getInstance() instead of "new" to create. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function ConsumerMessageDispatcher() { super(); } //-------------------------------------------------------------------------- // // Variables // //-------------------------------------------------------------------------- /** * Lookup table for subscribed Consumer instances; Object * This is used to dispatch pushed/polled messages to the proper Consumer instance. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ private const _consumers:Object = {}; /** * Table of ref-counts per ChannelSet that subscribed Consumer instances are using; Dictionary (non-weak keys). * The ref-count is the number of subscribed Consumers for the ChannelSet. * When we add a new ChannelSet we need to start listening on it for MessageEvents to redispatch to subscribed Consumers. * When the ref-count drops to zero we need to stop listening on it for MessageEvents and remove it from the table. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ private const _channelSetRefCounts:Dictionary = new Dictionary(/* strong keys */); /** * Table used to prevent duplicate delivery of messages to a Consumer when multiple ChannelSets are * connected to the same server endpoint over a single, underlying shared Channel. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ private const _consumerDuplicateMessageBarrier:Object = {}; //-------------------------------------------------------------------------- // // Public Methods // //-------------------------------------------------------------------------- /** * Determines whether any subscriptions are using the specified channel. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function isChannelUsedForSubscriptions(channel:Channel):Boolean { var memberOfChannelSets:Array = channel.channelSets; var cs:ChannelSet = null; var n:int = memberOfChannelSets.length; for (var i:int = 0; i < n; i++) { cs = memberOfChannelSets[i]; if ((_channelSetRefCounts[cs] != null) && (cs.currentChannel == channel)) return true; } return false; } /** * Registers a Consumer subscription. * This will cause the ConsumerMessageDispatcher to start listening for MessageEvents * from the underlying ChannelSet used to subscribe and redispatch messages to Consumers. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function registerSubscription(consumer:AbstractConsumer):void { _consumers[consumer.clientId] = consumer; if (_channelSetRefCounts[consumer.channelSet] == null) { // If this is the first time we've seen this ChannelSet start listening for message events // and initialize its ref-count. consumer.channelSet.addEventListener(MessageEvent.MESSAGE, messageHandler); _channelSetRefCounts[consumer.channelSet] = 1; } else { // We're already listening for message events; just increment the ref-count. _channelSetRefCounts[consumer.channelSet]++; } } /** * Unregisters a Consumer subscription. * The ConsumerMessageDispatcher will stop monitoring underlying channels for messages for * this Consumer. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function unregisterSubscription(consumer:AbstractConsumer):void { delete _consumers[consumer.clientId]; var refCount:int = _channelSetRefCounts[consumer.channelSet]; if (--refCount == 0) { // If this was the last Consumer using this ChannelSet stop listening for message events // and blow away the ref-count. consumer.channelSet.removeEventListener(MessageEvent.MESSAGE, messageHandler); delete _channelSetRefCounts[consumer.channelSet]; // And clean up the duplicate message delivery barrier if necessary. if (_consumerDuplicateMessageBarrier[consumer.id] != null) delete _consumerDuplicateMessageBarrier[consumer.id]; } else { // Save the decremented ref-count. _channelSetRefCounts[consumer.channelSet] = refCount; } } //-------------------------------------------------------------------------- // // Private Methods // //-------------------------------------------------------------------------- /** * Handles message events from ChannelSets that Consumers are subscribed over. * We just need to redirect the event to the proper Consumer instance. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ private function messageHandler(event:MessageEvent):void { var consumer:AbstractConsumer = _consumers[event.message.clientId]; if (consumer == null) { if (Log.isDebug()) Log.getLogger("mx.messaging.Consumer").debug("'{0}' received pushed message for consumer but no longer subscribed: {1}", event.message.clientId, event.message); return; } // Determine how many of these will actually redispatch the same event from the shared underlying channel. if (event.target.currentChannel.channelSets.length > 1) { var count:int = 0; for each (var cs:ChannelSet in event.target.currentChannel.channelSets) { if (_channelSetRefCounts[cs] != null) ++count; } if (count > 1) { // We need to dispatch this message to the target Consumer only once and filter out // the duplicate events. if (_consumerDuplicateMessageBarrier[consumer.id] == null) { // Record the number of times we will receive a message event for this message. _consumerDuplicateMessageBarrier[consumer.id] = [event.messageId, count]; // Dispatch once - only the first time we see this message for this Consumer. consumer.messageHandler(event); } // Cleanup. var duplicateDispatchGuard:Array = _consumerDuplicateMessageBarrier[consumer.id]; if (duplicateDispatchGuard[0] == event.messageId) { if (--duplicateDispatchGuard[1] == 0) delete _consumerDuplicateMessageBarrier[consumer.id]; } return; // Exit early. } } // Only one ChannelSet so we don't need to worry about this. consumer.messageHandler(event); } } }