////////////////////////////////////////////////////////////////////////////////
//
//  Licensed to the Apache Software Foundation (ASF) under one or more
//  contributor license agreements.  See the NOTICE file distributed with
//  this work for additional information regarding copyright ownership.
//  The ASF licenses this file to You under the Apache License, Version 2.0
//  (the "License"); you may not use this file except in compliance with
//  the License.  You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
//
////////////////////////////////////////////////////////////////////////////////

package mx.messaging.channels
{

import flash.events.Event;
import flash.events.TimerEvent;
import flash.utils.Timer;

import mx.core.mx_internal;
import mx.logging.Log;
import mx.messaging.Channel;
import mx.messaging.ChannelSet;
import mx.messaging.ConsumerMessageDispatcher;
import mx.messaging.MessageAgent;
import mx.messaging.MessageResponder;
import mx.messaging.events.ChannelFaultEvent;
import mx.messaging.messages.AcknowledgeMessage;
import mx.messaging.messages.CommandMessage;
import mx.messaging.messages.IMessage;
import mx.resources.IResourceManager;
import mx.resources.ResourceManager;
import mx.messaging.Consumer;

use namespace mx_internal;

[ResourceBundle("messaging")]

/**
 *  Base class that supplies the polling behavior shared by every polling
 *  channel in the messaging system.
 *
 *  @langversion 3.0
 *  @playerversion Flash 9
 *  @playerversion AIR 1.1
 *  @productversion BlazeDS 4
 *  @productversion LCDS 3
 */
public class PollingChannel extends Channel
{
    //--------------------------------------------------------------------------
    //
    // Protected Static Constants
    //
    //--------------------------------------------------------------------------

    /**
     *  @private
     *  Names of the channel configuration properties read by
     *  <code>applyPollingSettings()</code>.
     */
    protected static const POLLING_ENABLED:String = "polling-enabled";
    protected static const POLLING_INTERVAL_MILLIS:String = "polling-interval-millis";
    protected static const POLLING_INTERVAL_LEGACY:String = "polling-interval-seconds";
    protected static const PIGGYBACKING_ENABLED:String = "piggybacking-enabled";
    protected static const LOGIN_AFTER_DISCONNECT:String = "login-after-disconnect";

    //--------------------------------------------------------------------------
    //
    // Constructor
    //
    //--------------------------------------------------------------------------

    /**
     *  Creates a new PollingChannel instance with the specified id. Once a
     *  PollingChannel is connected and begins polling, it issues a poll request
     *  once every three seconds by default.
     *
     *  <p><b>Note</b>: The PollingChannel type should not be constructed directly.
     *  Instead create instances of protocol specific subclasses such as
     *  HTTPChannel or AMFChannel that extend it.</p>
     *
     *  @param id The id of this Channel.
     *
     *  @param uri The uri for this Channel.
     *
     *  @langversion 3.0
     *  @playerversion Flash 9
     *  @playerversion AIR 1.1
     *  @productversion BlazeDS 4
     *  @productversion LCDS 3
     */
    public function PollingChannel(id:String = null, uri:String = null)
    {
        super(id, uri);

        _pollingEnabled = true;
        _shouldPoll = false;

        if (timerRequired())
        {
            // Start from the default base interval. The Timer fires a single
            // event per run (repeatCount of 1); the next run is started only
            // after the response to the outstanding poll has been handled.
            _pollingInterval = DEFAULT_POLLING_INTERVAL;
            _timer = new Timer(_pollingInterval, 1);
            _timer.addEventListener(TimerEvent.TIMER, internalPoll);
        }
    }

    //--------------------------------------------------------------------------
    //
    // Variables
    //
    //--------------------------------------------------------------------------

    /**
     *  @private
     *  Base polling interval, used whenever a poll response does not carry an
     *  adaptive poll wait from the server.
     */
    mx_internal var _pollingInterval:int;

    /**
     *  @private
     *  True while polling is wanted, even if it is currently paused.
     */
    mx_internal var _shouldPoll:Boolean;

    /**
     *  @private
     *  Reference count maintained by enablePolling()/disablePolling();
     *  -1 means nobody has requested polling.
     */
    private var _pollingRef:int = -1;

    /**
     *  @private
     *  Guard that prevents poll requests from being stacked on top of each
     *  other, which could otherwise happen when poll() is invoked manually.
     */
    mx_internal var pollOutstanding:Boolean;

    /**
     *  @private
     *  Timer that drives interval polling. Left null when timerRequired()
     *  reports that the channel implementation does not need one.
     */
    mx_internal var _timer:Timer;

    /**
     *  @private
     */
    private var resourceManager:IResourceManager = ResourceManager.getInstance();

    //--------------------------------------------------------------------------
    //
    // Properties
    //
    //--------------------------------------------------------------------------

    //----------------------------------
    //  connected
    //----------------------------------

    /**
     *  @private
     *  When the channel transitions to connected, re-enables polling once for
     *  every subscribed Consumer found in the channel's ChannelSets, then
     *  forwards the state change to the superclass.
     *
     *  @param value The new connected state.
     */
    override protected function setConnected(value:Boolean):void
    {
        // Nothing to do when the state is not actually changing.
        if (connected == value)
            return;

        if (value)
        {
            // Possibly a transient reconnect; look for subscribed Consumers.
            for each (var owningSet:ChannelSet in channelSets)
            {
                for each (var candidate:MessageAgent in owningSet.messageAgents)
                {
                    var consumer:Consumer = candidate as Consumer;
                    if (consumer != null && consumer.subscribed)
                        enablePolling();
                }
            }
        }

        super.setConnected(value);
    }

    //----------------------------------
    //  loginAfterDisconnect
    //----------------------------------

    /**
     *  @private
     */
    protected var _loginAfterDisconnect:Boolean;

    mx_internal function get loginAfterDisconnect():Boolean
    {
        return _loginAfterDisconnect;
    }

    //----------------------------------
    //  piggybackingEnabled
    //----------------------------------

    /**
     *  @private
     */
    private var _piggybackingEnabled:Boolean;

    /**
     *  @private
     */
    protected function get internalPiggybackingEnabled():Boolean
    {
        return _piggybackingEnabled;
    }

    /**
     *  @private
     */
    protected function set internalPiggybackingEnabled(value:Boolean):void
    {
        _piggybackingEnabled = value;
    }

    //----------------------------------
    //  pollingEnabled
    //----------------------------------

    /**
     *  @private
     */
    private var _pollingEnabled:Boolean;

    /**
     *  @private
     */
    protected function get internalPollingEnabled():Boolean
    {
        return _pollingEnabled;
    }

    /**
     *  @private
     *  Stores the flag and starts or stops polling to match it.
     */
    protected function set internalPollingEnabled(value:Boolean):void
    {
        _pollingEnabled = value;

        if (value)
        {
            // Resume only when polling is wanted and no timer run is pending.
            if (_shouldPoll && !timerRunning)
                startPolling();
        }
        else if (timerRunning || _pollingInterval == 0)
        {
            // Stop when the timer is running, and also when the interval is 0:
            // with a 0 interval the timer is never on even though the channel
            // may be polling, so both situations have to be covered.
            stopPolling();
        }
    }

    //----------------------------------
    //  pollingInterval
    //----------------------------------

    /**
     *  @private
     *  The base polling interval, or 0 when the channel has no timer.
     */
    mx_internal function get internalPollingInterval():Number
    {
        if (_timer == null)
            return 0;

        return _pollingInterval;
    }

    /**
     *  @private
     *  Updates the base polling interval.
     *
     *  @throws ArgumentError If the value is neither 0 nor greater than 0.
     */
    mx_internal function set internalPollingInterval(value:Number):void
    {
        if (value > 0)
        {
            // A positive interval is only meaningful when a timer exists.
            if (_timer != null)
            {
                _timer.delay = _pollingInterval = value;
                if (_shouldPoll && !timerRunning)
                    startPolling();
            }
        }
        else if (value == 0)
        {
            // A Timer delay can not be set to 0, so remember the value in
            // _pollingInterval and stop the timer instead.
            _pollingInterval = value;
            if (_timer != null)
                _timer.stop();

            if (_shouldPoll)
                startPolling();
        }
        else
        {
            var errorText:String = resourceManager.getString(
                "messaging", "pollingIntervalNonPositive");
            throw new ArgumentError(errorText);
        }
    }

    //----------------------------------
    //  realtime
    //----------------------------------

    /**
     *  @private
     *  Reports whether the channel supports realtime behavior via server push
     *  or client poll. Piggybacking alone does not count, because no data
     *  arrives from the server unless the client explicitly sends a message.
     */
    override mx_internal function get realtime():Boolean
    {
        return _pollingEnabled;
    }

    //----------------------------------
    //  timerRunning
    //----------------------------------

    /**
     *  @private
     *  True when a timer exists and is currently running.
     */
    mx_internal function get timerRunning():Boolean
    {
        if (_timer == null)
            return false;

        return _timer.running;
    }

    //--------------------------------------------------------------------------
    //
    // Overridden Public Methods
    //
    //--------------------------------------------------------------------------

    /**
     *  Sends the specified message to its target destination.
     *  Subclasses must override the <code>internalSend()</code> method to
     *  perform the actual send.
     *  PollingChannel wraps outbound messages in poll requests if a poll
     *  is not currently outstanding.
     *
     *  @param agent The MessageAgent that is sending the message.
     *
     *  @param message The Message to send.
     *
     *  @throws mx.messaging.errors.InvalidDestinationError If neither the
     *  MessageAgent nor the message specify a destination.
     *
     *  @langversion 3.0
     *  @playerversion Flash 9
     *  @playerversion AIR 1.1
     *  @productversion BlazeDS 4
     *  @productversion LCDS 3
     */
    override public function send(agent:MessageAgent, message:IMessage):void
    {
        var wrapInPolls:Boolean = false;

        // Piggybacking applies to non-command messages only, and only while
        // no poll is already on the wire.
        var eligible:Boolean = _piggybackingEnabled && !pollOutstanding &&
                               !(message is CommandMessage);
        if (eligible)
        {
            wrapInPolls = _shouldPoll ||
                ConsumerMessageDispatcher.getInstance().isChannelUsedForSubscriptions(this);
        }

        if (wrapInPolls)
            internalPoll();

        super.send(agent, message);

        if (!wrapInPolls)
            return;

        // Build and send a terminal poll by hand so that any messages pushed as
        // a result of the send above are returned. Calling internalPoll() again
        // would do nothing because the leading poll is now outstanding.
        var trailingPoll:CommandMessage = new CommandMessage();
        trailingPoll.operation = CommandMessage.POLL_OPERATION;

        if (Log.isDebug())
            _log.debug("'{0}' channel sending poll message\n{1}\n", id, trailingPoll.toString());

        try
        {
            internalSend(new PollCommandMessageResponder(null, trailingPoll, this, _log));
        }
        catch(e:Error)
        {
            // A failed send shuts polling down before the error propagates.
            stopPolling();
            throw e;
        }
    }

    //--------------------------------------------------------------------------
    //
    // Overridden Protected Methods
    //
    //--------------------------------------------------------------------------

    /**
     *  @private
     *  Stops polling when the Channel can not connect, then delegates to the
     *  superclass.
     *
     *  @param event The ChannelFaultEvent.
     */
    override protected function connectFailed(event:ChannelFaultEvent):void
    {
        stopPolling();
        super.connectFailed(event);
    }

    /**
     *  @private
     *  Selects the responder for an outbound message: poll command messages
     *  get a PollCommandMessageResponder that handles the poll result or
     *  fault; every other message gets the responder supplied by
     *  <code>getDefaultMessageResponder()</code>.
     *
     *  @param agent MessageAgent that requested the message be sent.
     *
     *  @param msg Message to be sent.
     *
     *  @return A PollCommandMessageResponder for poll requests; otherwise the
     *  default message responder.
     */
    final override protected function getMessageResponder(agent:MessageAgent, msg:IMessage):MessageResponder
    {
        var command:CommandMessage = msg as CommandMessage;
        if (command != null && command.operation == CommandMessage.POLL_OPERATION)
            return new PollCommandMessageResponder(agent, msg, this, _log);

        return getDefaultMessageResponder(agent, msg);
    }

    /**
     *  @private
     *  Stops polling and then disconnects from the remote destination.
     */
    override protected function internalDisconnect(rejected:Boolean = false):void
    {
        stopPolling();
        super.internalDisconnect(rejected);
    }

    //--------------------------------------------------------------------------
    //
    // Methods
    //
    //--------------------------------------------------------------------------

    /**
     *  Enables polling based on the number of times <code>enablePolling()</code>
     *  and <code>disablePolling()</code> have been invoked. If the net result is
     *  to enable polling the channel polls the server on behalf of connected
     *  MessageAgents.
     *
     *  <p>Invoked automatically based upon subscribing or unsubscribing from a
     *  remote destination over a PollingChannel.</p>
     *
     *  @langversion 3.0
     *  @playerversion Flash 9
     *  @playerversion AIR 1.1
     *  @productversion BlazeDS 4
     *  @productversion LCDS 3
     */
    public function enablePolling():void
    {
        // The count starts at -1, so reaching 0 marks the first request.
        if (++_pollingRef == 0)
            startPolling();
    }

    /**
     *  Disables polling based on the number of times <code>enablePolling()</code>
     *  and <code>disablePolling()</code> have been invoked. If the net result is
     *  to disable polling the channel stops polling.
     *
     *  <p>Invoked automatically based upon subscribing or unsubscribing from a
     *  remote destination over a PollingChannel.</p>
     *
     *  @langversion 3.0
     *  @playerversion Flash 9
     *  @playerversion AIR 1.1
     *  @productversion BlazeDS 4
     *  @productversion LCDS 3
     */
    public function disablePolling():void
    {
        if (--_pollingRef < 0)
            stopPolling();
    }

    /**
     *  Initiates a poll operation if there are consumers subscribed to this
     *  channel, and polling is enabled for this channel.
     *
     *  Note that this method will not start a new poll if one is currently in
     *  progress.
     *
     *  @langversion 3.0
     *  @playerversion Flash 9
     *  @playerversion AIR 1.1
     *  @productversion BlazeDS 4
     *  @productversion LCDS 3
     */
    public function poll():void
    {
        internalPoll();
    }

    //--------------------------------------------------------------------------
    //
    // Internal Methods
    //
    //--------------------------------------------------------------------------

    /**
     *  @private
     *  Lets a PollCommandMessageResponder signal that the channel has lost its
     *  connectivity.
     *
     *  @param rejected Channel will be rejected and will not attempt to
     *  reconnect if this flag is true.
     */
    mx_internal function pollFailed(rejected:Boolean = false):void
    {
        internalDisconnect(rejected);
    }

    /**
     *  @private
     *  Halts the timer (if any) and resets all polling state. Called when
     *  disablePolling() drops the request count below zero.
     *
     *  mx_internal so that the poll responder can shut polling down when a
     *  general, fatal error occurs.
     */
    mx_internal function stopPolling():void
    {
        if (Log.isInfo())
            _log.info("'{0}' channel polling stopped.", id);

        if (_timer != null)
            _timer.stop();

        pollOutstanding = false;
        _shouldPoll = false;
        _pollingRef = -1;
    }

    //--------------------------------------------------------------------------
    //
    // Protected Methods
    //
    //--------------------------------------------------------------------------

    /**
     *  @private
     *  Reads the polling related entries from the channel settings and applies
     *  each one that is present.
     *
     *  @param settings The Channel settings.
     */
    protected function applyPollingSettings(settings:XML):void
    {
        if (settings.properties.length() == 0)
            return;

        var props:XML = settings.properties[0];

        var enabledNodes:XMLList = props[POLLING_ENABLED];
        if (enabledNodes.length() != 0)
            internalPollingEnabled = enabledNodes.toString() == TRUE;

        // The millisecond setting wins over the legacy seconds setting.
        var millisNodes:XMLList = props[POLLING_INTERVAL_MILLIS];
        if (millisNodes.length() != 0)
        {
            internalPollingInterval = parseInt(millisNodes.toString());
        }
        else
        {
            var legacyNodes:XMLList = props[POLLING_INTERVAL_LEGACY];
            if (legacyNodes.length() != 0)
                internalPollingInterval = parseInt(legacyNodes.toString()) * 1000;
        }

        var piggybackNodes:XMLList = props[PIGGYBACKING_ENABLED];
        if (piggybackNodes.length() != 0)
            internalPiggybackingEnabled = piggybackNodes.toString() == TRUE;

        var loginNodes:XMLList = props[LOGIN_AFTER_DISCONNECT];
        if (loginNodes.length() != 0)
            _loginAfterDisconnect = loginNodes.toString() == TRUE;
    }

    /**
     *  @private
     *  Returns the responder produced by the superclass implementation of
     *  getMessageResponder().
     */
    protected function getDefaultMessageResponder(agent:MessageAgent, msg:IMessage):MessageResponder
    {
        return super.getMessageResponder(agent, msg);
    }

    /**
     *  @private
     *  Asks the server to return any messages queued since the last poll
     *  request for this FlexClient. Does nothing but log when a poll is
     *  already outstanding.
     *
     *  @param event Event dispatched by the polling Timer.
     */
    protected function internalPoll(event:Event = null):void
    {
        if (pollOutstanding)
        {
            if (Log.isInfo())
                _log.info("'{0}' channel waiting for poll response.", id);
            return;
        }

        if (Log.isInfo())
            _log.info("'{0}' channel requesting queued messages.", id);

        // A direct invocation must not race with a pending Timer run.
        if (timerRunning)
            _timer.stop();

        // The responder below is created with a null agent: the poll is for
        // all of this client's subscriptions rather than a receive() on behalf
        // of one Consumer subscribed to a specific destination.
        var request:CommandMessage = new CommandMessage();
        request.operation = CommandMessage.POLL_OPERATION;

        if (Log.isDebug())
            _log.debug("'{0}' channel sending poll message\n{1}\n", id, request.toString());

        try
        {
            internalSend(new PollCommandMessageResponder(null, request, this, _log));
            pollOutstanding = true;
        }
        catch(e:Error)
        {
            // A failed send shuts polling down before the error propagates.
            stopPolling();
            throw e;
        }
    }

    /**
     *  @private
     *  Invoked when enablePolling() results in a net positive number of
     *  requests to poll. A no-op while polling is not enabled.
     */
    protected function startPolling():void
    {
        if (!_pollingEnabled)
            return;

        if (Log.isInfo())
            _log.info("'{0}' channel polling started.", id);

        _shouldPoll = true;
        // Poll right away; the next poll is scheduled once a result returns.
        poll();
    }

    /**
     *  @private
     *  Returns true if this channel requires a timer for polling.
     */
    protected function timerRequired():Boolean
    {
        return true;
    }

    //--------------------------------------------------------------------------
    //
    // Static Constants
    //
    //--------------------------------------------------------------------------

    /**
     *  Default polling interval: 3000ms.
     *
     *  @langversion 3.0
     *  @playerversion Flash 9
     *  @playerversion AIR 1.1
     *  @productversion BlazeDS 4
     *  @productversion LCDS 3
     */
    private static const DEFAULT_POLLING_INTERVAL:int = 3000;
}

}

//------------------------------------------------------------------------------
//
// Private Classes
//
//------------------------------------------------------------------------------

import flash.utils.Timer;

import mx.core.mx_internal;
import mx.events.PropertyChangeEvent;
import mx.logging.Log;
import mx.logging.ILogger;
import mx.messaging.MessageAgent;
import mx.messaging.MessageResponder;
import mx.messaging.channels.PollingChannel;
import mx.messaging.events.ChannelFaultEvent;
import mx.messaging.events.MessageEvent;
import mx.messaging.messages.IMessage;
import mx.messaging.messages.AcknowledgeMessage;
import mx.messaging.messages.CommandMessage;
import mx.messaging.messages.ErrorMessage;
import mx.messaging.messages.MessagePerformanceUtils;
import mx.resources.IResourceManager;
import mx.resources.ResourceManager;

use namespace mx_internal;

[ResourceBundle("messaging")]

/**
 *  @private
 *  Responder for a poll command message. Dispatches the batch of messages
 *  returned by the poll on the owning channel and schedules the next poll.
 */
class PollCommandMessageResponder extends MessageResponder
{
    //--------------------------------------------------------------------------
    //
    // Constructor
    //
    //--------------------------------------------------------------------------

    /**
     *  @private
     *  Initializes a responder that handles the (possibly multiple) messages
     *  received from a poll request made by a Channel.
     *
     *  @param agent The MessageAgent passed through to MessageResponder; the
     *  channel passes null for the polls it issues itself.
     *
     *  @param msg The poll command message being sent.
     *
     *  @param channel The PollingChannel issuing the poll.
     *
     *  @param log Logger of the associated Channel.
     */
    public function PollCommandMessageResponder(agent:MessageAgent, msg:IMessage,
                                                channel:PollingChannel, log:ILogger)
    {
        super(agent, msg, channel);
        _log = log;

        // Watch the channel's connected state: if it disconnects while this
        // poll is outstanding, result/fault handling is suppressed.
        channel.addEventListener(PropertyChangeEvent.PROPERTY_CHANGE, channelPropertyChangeHandler);
    }

    //--------------------------------------------------------------------------
    //
    // Variables
    //
    //--------------------------------------------------------------------------

    /**
     *  @private
     *  Reference to the logger for the associated Channel.
     */
    private var _log:ILogger;

    /**
     *  @private
     */
    private var resourceManager:IResourceManager = ResourceManager.getInstance();

    /**
     *  @private
     *  Set once the channel reports a disconnect while this poll is pending.
     */
    private var suppressHandlers:Boolean;

    //--------------------------------------------------------------------------
    //
    // Overridden Protected Methods
    //
    //--------------------------------------------------------------------------

    /**
     *  @private
     *  Handles a poll command result from the server, which is either an empty
     *  acknowledgement (no messages to deliver) or a command message whose body
     *  lists the messages to dispatch.
     *
     *  @param msg The result message.
     */
    override protected function resultHandler(msg:IMessage):void
    {
        var pollingChannel:PollingChannel = channel as PollingChannel;
        channel.removeEventListener(PropertyChangeEvent.PROPERTY_CHANGE, channelPropertyChangeHandler);

        if (suppressHandlers)
        {
            if (Log.isDebug())
            {
                _log.debug("'{0}' channel ignoring response for poll request preceeding most recent disconnect.\n", channel.id);
            }
            // If the channel has reconnected the polling loop may need a restart.
            doPoll();
            return;
        }

        if (msg is CommandMessage) // Poll response containing pushed messages.
        {
            pollingChannel.pollOutstanding = false;

            // A response tagged as a no-op poll ends processing here; no
            // follow-up poll is scheduled.
            if (msg.headers[CommandMessage.NO_OP_POLL_HEADER] == true)
                return;

            if (msg.body != null)
            {
                var messageList:Array = msg.body as Array;
                for each (var message:IMessage in messageList)
                {
                    if (Log.isDebug())
                    {
                        _log.debug("'{0}' channel got message\n{1}\n", channel.id, message.toString());
                        if (channel.mpiEnabled)
                        {
                            try
                            {
                                var mpiutil:MessagePerformanceUtils = new MessagePerformanceUtils(message);
                                _log.debug(mpiutil.prettyPrint());
                            }
                            catch (e:Error)
                            {
                                // Report the individual message whose performance
                                // data could not be read, not the whole poll response.
                                _log.debug("Could not get message performance information for: " + message.toString());
                            }
                        }
                    }
                    channel.dispatchEvent(MessageEvent.createEvent(MessageEvent.MESSAGE, message));
                }
            }
        }
        else if (msg is AcknowledgeMessage) // Empty response (no messages to push).
        {
            // Only the outstanding flag needs clearing for an empty ack.
            pollingChannel.pollOutstanding = false;
        }
        else // Generally, the result of a connection failure while the poll was on the network.
        {
            var errMsg:ErrorMessage = new ErrorMessage();
            errMsg.faultDetail = resourceManager.getString(
                "messaging", "receivedNull");
            status(errMsg);
            return;
        }

        // No errors: continue polling, honoring a server supplied wait if present.
        if (msg.headers[CommandMessage.POLL_WAIT_HEADER] != null)
        {
            doPoll(msg.headers[CommandMessage.POLL_WAIT_HEADER]);
        }
        else
        {
            doPoll();
        }
    }

    /**
     *  @private
     *  Handles a fault while attempting to poll: stops polling, dispatches a
     *  ChannelFaultEvent on the channel and reports the poll failure to it.
     *
     *  @param msg The ErrorMessage from the remote destination.
     */
    override protected function statusHandler(msg:IMessage):void
    {
        channel.removeEventListener(PropertyChangeEvent.PROPERTY_CHANGE, channelPropertyChangeHandler);

        if (suppressHandlers)
        {
            if (Log.isDebug())
            {
                _log.debug("'{0}' channel ignoring response for poll request preceeding most recent disconnect.\n", channel.id);
            }
            return;
        }

        var pollingChannel:PollingChannel = PollingChannel(channel);
        pollingChannel.stopPolling(); // Shut down all polling.

        var errMsg:ErrorMessage = msg as ErrorMessage;
        var details:String = (errMsg != null) ? errMsg.faultDetail : "";
        var faultEvent:ChannelFaultEvent = ChannelFaultEvent.createEvent
            (pollingChannel, false, "Channel.Polling.Error", "error", details);
        faultEvent.rootCause = msg;
        pollingChannel.dispatchEvent(faultEvent);

        // Reject this channel if the server does not support polling.
        if (errMsg != null && errMsg.faultCode == "Server.PollNotSupported")
        {
            pollingChannel.pollFailed(true);
        }
        else
        {
            pollingChannel.pollFailed(false);
        }
    }

    /**
     *  @private
     *  Watches for a 'connected' property change and, on a disconnect,
     *  suppresses poll result/fault handling.
     *
     *  @param event A PropertyChangeEvent dispatched by the underlying channel.
     */
    private function channelPropertyChangeHandler(event:PropertyChangeEvent):void
    {
        if (event.property == "connected" && !event.newValue)
        {
            suppressHandlers = true;
        }
    }

    /**
     *  @private
     *  Runs or schedules the next poll for the underlying channel.
     *
     *  @param adaptivePollWait The optional wait time before the next poll
     *  should be issued; 0 means use the channel's base polling interval.
     */
    private function doPoll(adaptivePollWait:int = 0):void
    {
        var pollingChannel:PollingChannel = PollingChannel(channel);

        // Only set up the next poll if the channel is still connected.
        // Subscription invalidation commands pushed by the server can cause the
        // channel to disconnect and no further poll should be issued then.
        // Likewise, a channel that is piggybacking but not polling on an
        // interval must not get a next poll scheduled.
        if (!(pollingChannel.connected && pollingChannel._shouldPoll))
            return;

        if (adaptivePollWait == 0)
        {
            if (pollingChannel.internalPollingInterval == 0)
            {
                // No Timer needed when polling immediately. This is also the
                // path taken by channels without a timer, whose interval
                // getter reports 0.
                pollingChannel.poll();
            }
            else if (!pollingChannel.timerRunning)
            {
                // Poll at the channel's base rate; no adaptive wait is defined.
                pollingChannel._timer.delay = pollingChannel._pollingInterval;
                pollingChannel._timer.start();
            }
        }
        else if (pollingChannel._timer != null)
        {
            // Use the adaptive poll wait.
            pollingChannel._timer.delay = adaptivePollWait;
            pollingChannel._timer.start();
        }
        else
        {
            // The channel has no timer, so the requested wait can not be
            // scheduled; poll immediately, as the zero-interval case does,
            // instead of dereferencing a null timer.
            pollingChannel.poll();
        }
    }
}