//////////////////////////////////////////////////////////////////////////////// // // Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // //////////////////////////////////////////////////////////////////////////////// package mx.messaging { import flash.events.TimerEvent; import flash.utils.Timer; import mx.core.mx_internal; import mx.events.PropertyChangeEvent; import mx.logging.Log; import mx.messaging.channels.PollingChannel; import mx.messaging.events.ChannelEvent; import mx.messaging.events.ChannelFaultEvent; import mx.messaging.events.MessageEvent; import mx.messaging.events.MessageFaultEvent; import mx.messaging.messages.AcknowledgeMessage; import mx.messaging.messages.CommandMessage; import mx.messaging.messages.ErrorMessage; import mx.messaging.messages.IMessage; import mx.resources.IResourceManager; import mx.resources.ResourceManager; use namespace mx_internal; /** * Dispatched when a message is received by the Consumer. * * @eventType mx.messaging.events.MessageEvent.MESSAGE * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ [Event(name="message", type="mx.messaging.events.MessageEvent")] [ResourceBundle("messaging")] /** * The AbstractConsumer is the base class for both the Consumer and * MultiTopicConsumer classes. You use those classes to receive pushed * messages from the server. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public class AbstractConsumer extends MessageAgent { //-------------------------------------------------------------------------- // // Constructor // //-------------------------------------------------------------------------- /** * Constructs a Consumer. * * * @example * * function initConsumer():void * { * var consumer:Consumer = new Consumer(); * consumer.destination = "NASDAQ"; * consumer.selector = "operation IN ('Bid','Ask')"; * consumer.addEventListener(MessageEvent.MESSAGE, messageHandler); * consumer.subscribe(); * } * * function messageHandler(event:MessageEvent):void * { * var msg:IMessage = event.message; * var info:Object = msg.body; * trace("-App recieved message: " + msg.toString()); * } * * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function AbstractConsumer() { super(); _log = Log.getLogger("mx.messaging.Consumer"); _agentType = "consumer"; } //-------------------------------------------------------------------------- // // Variables // //-------------------------------------------------------------------------- /** * @private * This is the current number of resubscribe attempts that we've done. */ private var _currentAttempt:int; /** * @private * The timer used for resubscribe attempts. */ private var _resubscribeTimer:Timer; /** * Flag indicating whether this consumer should be subscribed or not. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ protected var _shouldBeSubscribed:Boolean; /** * @private * Current subscribe message - used for resubscribe attempts. */ private var _subscribeMsg:CommandMessage; /** * @private */ private var resourceManager:IResourceManager = ResourceManager.getInstance(); //-------------------------------------------------------------------------- // // Properties // //-------------------------------------------------------------------------- //---------------------------------- // clientId //---------------------------------- /** * @private * If our clientId has changed we may need to unsubscribe() using the * current clientId and then resubscribe using the new clientId. * // TODO - remove this? * * @param value The clientId value. */ override mx_internal function setClientId(value:String):void { if (super.clientId != value) { var resetSubscription:Boolean = false; if (subscribed) { unsubscribe(); resetSubscription = true; } super.setClientId(value); if (resetSubscription) subscribe(value); } } //---------------------------------- // destination //---------------------------------- /** * @private * Updates the destination for this Consumer and resubscribes if the * Consumer is currently subscribed. */ override public function set destination(value:String):void { if (destination != value) { var resetSubscription:Boolean = false; if (subscribed) { unsubscribe(); resetSubscription = true; } super.destination = value; if (resetSubscription) subscribe(); } } //---------------------------------- // maxFrequency //---------------------------------- /** * @private */ private var _maxFrequency:uint = 0; [Bindable(event="propertyChange")] /** * Determines the maximum number of messages per second the Consumer wants * to receive. A server that understands this value will use it as an input * while it determines how fast to send messages to the Consumer. Default is 0 * which means Consumer does not have a preference for the message rate. * Note that this property should be set before the Consumer subscribes and * any changes after Consumer subscription will not have any effect until * Consumer unsubscribes and resubscribes. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function get maxFrequency():uint { return _maxFrequency; } /** * @private */ public function set maxFrequency(value:uint):void { var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "maxFrequency", _maxFrequency, value); _maxFrequency = value; dispatchEvent(event); } //---------------------------------- // resubscribeAttempts //---------------------------------- /** * @private */ private var _resubscribeAttempts:int = 5; [Bindable(event="propertyChange")] /** * The number of resubscribe attempts that the Consumer makes in the event * that the destination is unavailable or the connection to the destination fails. * A value of -1 enables infinite attempts. * A value of zero disables resubscribe attempts. *

* Resubscribe attempts are made at a constant rate according to the resubscribe interval * value. When a resubscribe attempt is made if the underlying channel for the Consumer is not * connected or attempting to connect the channel will start a connect attempt. * Subsequent Consumer resubscribe attempts that occur while the underlying * channel connect attempt is outstanding are effectively ignored until * the outstanding channel connect attempt succeeds or fails. *

* * @see mx.messaging.Consumer#resubscribeInterval * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function get resubscribeAttempts():int { return _resubscribeAttempts; } /** * @private */ public function set resubscribeAttempts(value:int):void { if (_resubscribeAttempts != value) { if (value == 0) stopResubscribeTimer(); var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "resubscribeAttempts", _resubscribeAttempts, value); _resubscribeAttempts = value; dispatchEvent(event); } } //---------------------------------- // resubscribeInterval //---------------------------------- /** * @private */ private var _resubscribeInterval:int = 5000; [Bindable(event="propertyChange")] /** * The number of milliseconds between resubscribe attempts. * If a Consumer doesn't receive an acknowledgement for a subscription * request, it will wait the specified number of milliseconds before * attempting to resubscribe. * Setting the value to zero disables resubscriptions. *

* Resubscribe attempts are made at a constant rate according to this * value. When a resubscribe attempt is made if the underlying channel for the Consumer is not * connected or attempting to connect the channel will start a connect attempt. * Subsequent Consumer resubscribe attempts that occur while the underlying * channel connect attempt is outstanding are effectively ignored until * the outstanding channel connect attempt succeeds or fails. *

* * @see mx.messaging.Consumer#resubscribeInterval * * @throws ArgumentError If the assigned value is negative. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function get resubscribeInterval():int { return _resubscribeInterval; } /** * @private */ public function set resubscribeInterval(value:int):void { if (_resubscribeInterval != value) { if (value < 0) { var message:String = resourceManager.getString( "messaging", "resubscribeIntervalNegative"); throw new ArgumentError(message); } else if (value == 0) { stopResubscribeTimer(); } else if (_resubscribeTimer != null) { _resubscribeTimer.delay = value; } var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "resubscribeInterval", _resubscribeInterval, value); _resubscribeInterval = value; dispatchEvent(event); } } //---------------------------------- // subscribed //---------------------------------- /** * @private */ private var _subscribed:Boolean; [Bindable(event="propertyChange")] /** * Indicates whether the Consumer is currently subscribed. The propertyChange * event is dispatched when this property changes. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function get subscribed():Boolean { return _subscribed; } /** * @private */ protected function setSubscribed(value:Boolean):void { if (_subscribed != value) { var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "subscribed", _subscribed, value); _subscribed = value; // Register or unregister our subscription state with the ConsumerMessageDispatcher. // This allows the singleton ConsumerMessageDispatcher to start or stop listening for // messages on our behalf. if (_subscribed) { ConsumerMessageDispatcher.getInstance().registerSubscription(this); if (channelSet != null && channelSet.currentChannel != null && channelSet.currentChannel is PollingChannel) PollingChannel(channelSet.currentChannel).enablePolling(); } else { ConsumerMessageDispatcher.getInstance().unregisterSubscription(this); if (channelSet != null && channelSet.currentChannel != null && channelSet.currentChannel is PollingChannel) PollingChannel(channelSet.currentChannel).disablePolling(); } dispatchEvent(event); } } //---------------------------------- // timestamp //---------------------------------- /** * @private */ private var _timestamp:Number = -1; [Bindable(event="propertyChange")] /** * Contains the timestamp of the most recent message this Consumer * has received. * This value is passed to the destination in a receive() call * to request that it deliver messages for the Consumer from the timestamp * forward. * All messages with a timestamp value greater than the * timestamp value will be returned during a poll operation. * Setting this value to -1 will retrieve all cached messages from the * destination. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function get timestamp():Number { return _timestamp; } /** * @private */ public function set timestamp(value:Number):void { if (_timestamp != value) { var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "timestamp", _timestamp, value); _timestamp = value; dispatchEvent(event); } } //-------------------------------------------------------------------------- // // Overridden Methods // //-------------------------------------------------------------------------- /** * @private * Custom processing for subscribe, unsubscribe and poll message * acknowledgments. * * @param ackMsg The AcknowledgeMessage. * * @param msg The original subscribe, unsubscribe or poll message. */ override public function acknowledge(ackMsg:AcknowledgeMessage, msg:IMessage):void { // Ignore acks for any outstanding messages that return after disconnect() is invoked. if (_disconnectBarrier) return; // Only run Consumer processing if this isn't an error. if (!ackMsg.headers[AcknowledgeMessage.ERROR_HINT_HEADER] && (msg is CommandMessage)) { var command:CommandMessage = msg as CommandMessage; var op:int = command.operation; // For MultiTopicConsumers, the message gets marked if this is the // message completely unsubscribes the client. if (op == CommandMessage.MULTI_SUBSCRIBE_OPERATION) { if (msg.headers.DSlastUnsub != null) op = CommandMessage.UNSUBSCRIBE_OPERATION; else op = CommandMessage.SUBSCRIBE_OPERATION; } switch (op) { case CommandMessage.UNSUBSCRIBE_OPERATION: if (Log.isInfo()) _log.info("'{0}' {1} acknowledge for unsubscribe.", id, _agentType); super.setClientId(null); setSubscribed(false); // Stop listening for messages. ackMsg.clientId = null; // Force the ack's clientId to null as well before ack'ing it. super.acknowledge(ackMsg, msg); break; case CommandMessage.SUBSCRIBE_OPERATION: stopResubscribeTimer(); // NOTE: the -1 in the timestamp assignment below. // This works around a bug where if a Producer sends // a message in the same batch as the subscribe, // it will end up with (likely) the same timestamp // as the consumer. Because the message is sent // by the client after the subscribe though, it // should still be delivered. // TODO: Improve solution here. if (ackMsg.timestamp > _timestamp) _timestamp = ackMsg.timestamp - 1; if (Log.isInfo()) _log.info("'{0}' {1} acknowledge for subscribe. Client id '{2}' new timestamp {3}", id, _agentType, ackMsg.clientId, _timestamp); super.setClientId(ackMsg.clientId); setSubscribed(true); super.acknowledge(ackMsg, msg); break; // Handle the result of a receive() invocation (a Consumer instance-specific poll request). case CommandMessage.POLL_OPERATION: if ((ackMsg.body != null) && (ackMsg.body is Array)) { var messageList:Array = ackMsg.body as Array; for each (var message:IMessage in messageList) messageHandler(MessageEvent.createEvent(MessageEvent.MESSAGE, message)); } super.acknowledge(ackMsg, msg); break; } } else { super.acknowledge(ackMsg, msg); } } /** * Disconnects the Consumer from its remote destination. * This method should be invoked on a Consumer that is no longer * needed by an application after unsubscribing. * This method does not wait for outstanding network operations to complete * and does not send an unsubscribe message to the server. * After invoking disconnect(), the Consumer will report that it is in an * disconnected, unsubscribed state because it will not receive any more * messages until it has reconnected and resubscribed. * Disconnecting stops automatic resubscription attempts if they are running. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ override public function disconnect():void { // We don't invoke unsubscribe() in this case because a Consumer subscribed to a // JMS destination durably will blow away the durable subscription. _shouldBeSubscribed = false; // Prevent resubscribe attempts. stopResubscribeTimer(); setSubscribed(false); super.disconnect(); } /** * @private * The Consumer supresses ErrorMessage processing if the error is * retryable and it is configured to resubscribe. * * @param errMsg The ErrorMessage describing the fault. * * @param msg The original message (generally a subscribe). */ override public function fault(errMsg:ErrorMessage, msg:IMessage):void { // Ignore faults for any outstanding messages that return after disconnect() is invoked. if (_disconnectBarrier) return; if (errMsg.headers[ErrorMessage.RETRYABLE_HINT_HEADER]) { if (_resubscribeTimer == null) { // If this error correlates to our current subscribe message, // we should no longer be subscribed. if ((_subscribeMsg != null) && (errMsg.correlationId == _subscribeMsg.messageId)) _shouldBeSubscribed = false; super.fault(errMsg, msg); } // Else, suppress the fault dispatch because the resubscribe // timer is running and will generate a fault when it runs out of // allowed resubscribe attempts. } else { super.fault(errMsg, msg); } } /** * @private * Custom processing to warn the user if the consumer is connected over * a non-real channel. * * @param event The ChannelEvent. */ override public function channelConnectHandler(event:ChannelEvent):void { super.channelConnectHandler(event); if (connected && channelSet != null && channelSet.currentChannel != null && !channelSet.currentChannel.realtime && Log.isWarn()) { _log.warn("'{0}' {1} connected over a non-realtime channel '{2}'" + " which means channel is not automatically receiving updates via polling or server push." , id, _agentType, channelSet.currentChannel.id); } } /** * @private * Custom processing to start up a resubscribe timer if our channel is * disconnected when we should be subscribed. * * @param event The ChannelEvent. */ override public function channelDisconnectHandler(event:ChannelEvent):void { setSubscribed(false); super.channelDisconnectHandler(event); if (_shouldBeSubscribed && !event.rejected) startResubscribeTimer(); } /** * @private * Custom processing to start up a resubscribe timer if our channel faults * when we should be subscribed. * * @param event The ChannelFaultEvent. */ override public function channelFaultHandler(event:ChannelFaultEvent):void { if (!event.channel.connected) setSubscribed(false); super.channelFaultHandler(event); if (_shouldBeSubscribed && !event.rejected && !event.channel.connected) startResubscribeTimer(); } //-------------------------------------------------------------------------- // // Methods // //-------------------------------------------------------------------------- /** * Requests any messages that are queued for this Consumer on the server. * This method should only be used for Consumers that subscribe over non-realtime, * non-polling channels. * This method is a no-op if the Consumer is not subscribed. * * @param timestamp This argument is deprecated and is ignored. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function receive(timestamp:Number = 0):void { if (clientId != null) // We need a clientId to distinguish this from a generic poll request sent by a polling channel. { var msg:CommandMessage = new CommandMessage(); msg.operation = CommandMessage.POLL_OPERATION; msg.destination = destination; internalSend(msg); } } /** * Subscribes to the remote destination. * * @param clientId The client id to subscribe with. Use null for non-durable Consumers. If the subscription is durable, a consistent * value must be supplied every time the Consumer subscribes in order * to reconnect to the correct durable subscription in the remote destination. * * @throws mx.messaging.errors.InvalidDestinationError If no destination is set. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function subscribe(clientId:String = null):void { // Set a flag to determine whether the passed clientId differs from the // current value and should be assigned. var resetClientId:Boolean = ((clientId != null) && (super.clientId != clientId)) ? true : false; if (subscribed && resetClientId) { // We're already subscribed, but we need to resubscribe under // the new clientId. unsubscribe(); } // Make sure any resubscribe timer is stopped. stopResubscribeTimer(); _shouldBeSubscribed = true; if (resetClientId) super.setClientId(clientId); if (Log.isInfo()) _log.info("'{0}' {1} subscribe.", id, _agentType); _subscribeMsg = buildSubscribeMessage(); internalSend(_subscribeMsg); } /** * Unsubscribes from the remote destination. In the case of durable JMS * subscriptions, this will destroy the durable subscription on the JMS server. * * @param preserveDurable - when true, durable JMS subscriptions are not destroyed * allowing consumers to later resubscribe and receive missed messages * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function unsubscribe(preserveDurable:Boolean = false):void { _shouldBeSubscribed = false; if (subscribed) { // Stop listening now for any messages as we could be set to a new // channel before the ack comes back, and once the ack returns we // will no longer have a valid client id. if (channelSet != null) channelSet.removeEventListener(destination, messageHandler); if (Log.isInfo()) _log.info("'{0}' {1} unsubscribe.", id, _agentType); internalSend(buildUnsubscribeMessage(preserveDurable)); } else { stopResubscribeTimer(); } } //-------------------------------------------------------------------------- // // Internal Methods // //-------------------------------------------------------------------------- /** * @private * Consumers subscribe for messages from a destination and this is the handler * method that is invoked when a message for this Consumer is pushed or polled * from the server. * * @param event The MessageEvent. */ mx_internal function messageHandler(event:MessageEvent):void { // NOTE: This method is invoked directly by the ConsumerMessageDispatcher. // The event flow for a pushed message is: // 1. Channel receives a pushed/polled message and dispatches a message event // 2. Any ChannelSets connected to the Channel will handle these events in ChannelSet.messageHandler(); // simply redispatching them. // 3. Consumers that subscribe to a destination trigger the internal use of a shared ConsumerMessageDispatcher // that listens for message events from any ChannelSets that Consumers have subscribed over and this helper routes pushed messages to the proper Consumer instances. var message:IMessage = event.message; if (message is CommandMessage) { var command:CommandMessage = message as CommandMessage; switch (command.operation) { case CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION: // We've been unsubscribed but it wasn't the result of an unsubscribe // message this agent sent. Set unsubscribe to false which will inform // the polling channel to stop polling if a polling channel is being used. setSubscribed(false); break; default: if (Log.isWarn()) _log.warn("'{0}' received a CommandMessage '{1}' that could not be handled.", id, CommandMessage.getOperationAsString(command.operation)); } /* * Command messages are handled internally by the Consumer and * are not dispatched to message listeners via MessageEvents. */ return; } if (message.timestamp > _timestamp) _timestamp = message.timestamp; // Server might push out error messages (eg. during MessageClient.invalidate) // that need to be dispatched as message fault events. if (message is ErrorMessage) dispatchEvent(MessageFaultEvent.createEvent(ErrorMessage(message))); else dispatchEvent(MessageEvent.createEvent(MessageEvent.MESSAGE, message)); } //-------------------------------------------------------------------------- // // Protected Methods // //-------------------------------------------------------------------------- /** * Returns a subscribe message. * This method should be overridden by subclasses if they need custom * subscribe messages. * * @return The subscribe CommandMessage. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ protected function buildSubscribeMessage():CommandMessage { var msg:CommandMessage = new CommandMessage(); msg.operation = CommandMessage.SUBSCRIBE_OPERATION; msg.clientId = clientId; msg.destination = destination; if (maxFrequency > 0) msg.headers[CommandMessage.MAX_FREQUENCY_HEADER] = maxFrequency; return msg; } /** * Returns an unsubscribe message. * This method should be overridden by subclasses if they need custom * unsubscribe messages. * * @param preserveDurable - when true, durable JMS subscriptions are not destroyed * allowing consumers to later resubscribe and receive missed messages * * @return The unsubscribe CommandMessage. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ protected function buildUnsubscribeMessage(preserveDurable:Boolean):CommandMessage { var msg:CommandMessage = new CommandMessage(); msg.operation = CommandMessage.UNSUBSCRIBE_OPERATION; msg.clientId = clientId; msg.destination = destination; // only include the PRESERVE_DURABLE_HEADER param in the message if // its value is true if (preserveDurable) msg.headers[CommandMessage.PRESERVE_DURABLE_HEADER] = preserveDurable; return msg; } /** * @private * Attempt to resubscribe. * This can be called directly or from a Timer's event handler. * * @param event The timer event for resubscribe attempts. */ protected function resubscribe(event:TimerEvent):void { // If we're past our limit of attempts, fault out. if ((_resubscribeAttempts != -1) && (_currentAttempt >= _resubscribeAttempts)) { stopResubscribeTimer(); _shouldBeSubscribed = false; var errMsg:ErrorMessage = new ErrorMessage(); errMsg.faultCode = "Client.Error.Subscribe"; errMsg.faultString = resourceManager.getString( "messaging", "consumerSubscribeError"); errMsg.faultDetail = resourceManager.getString( "messaging", "failedToSubscribe"); errMsg.correlationId = _subscribeMsg.messageId; fault(errMsg, _subscribeMsg); return; } if (Log.isDebug()) _log.debug("'{0}' {1} trying to resubscribe.", id, _agentType); _resubscribeTimer.delay = _resubscribeInterval; _currentAttempt++; // Send the resubscribe message, skipping the MessageAgent's queue that blocks // messages until the clientId is set. internalSend(_subscribeMsg, false); } /** * @private * This method will start a timer which attempts to resubscribe * periodically. */ protected function startResubscribeTimer():void { if (_shouldBeSubscribed && (_resubscribeTimer == null)) { // If we're configured for resubscribe start up the timer. if ((_resubscribeAttempts != 0) && (_resubscribeInterval > 0)) { if (Log.isDebug()) _log.debug("'{0}' {1} starting resubscribe timer.", id, _agentType); /* * Initially, the timeout is set to 1 so we try to * reconnect immediately (perhaps to a different channel). * after that, it will poll at the configured time interval. * */ _resubscribeTimer = new Timer(1); _resubscribeTimer.addEventListener(TimerEvent.TIMER, resubscribe); _resubscribeTimer.start(); _currentAttempt = 0; } } } /** * @private * Stops a resubscribe timer if one is running. */ protected function stopResubscribeTimer():void { if (_resubscribeTimer != null) { if (Log.isDebug()) _log.debug("'{0}' {1} stopping resubscribe timer.", id, _agentType); _resubscribeTimer.removeEventListener(TimerEvent.TIMER, resubscribe); _resubscribeTimer.reset(); _resubscribeTimer = null; } } } }