//////////////////////////////////////////////////////////////////////////////// // // Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // //////////////////////////////////////////////////////////////////////////////// package mx.messaging { import flash.events.TimerEvent; import flash.utils.Timer; import mx.core.mx_internal; import mx.events.PropertyChangeEvent; import mx.logging.Log; import mx.messaging.channels.PollingChannel; import mx.messaging.events.ChannelEvent; import mx.messaging.events.ChannelFaultEvent; import mx.messaging.events.MessageEvent; import mx.messaging.events.MessageFaultEvent; import mx.messaging.messages.AcknowledgeMessage; import mx.messaging.messages.CommandMessage; import mx.messaging.messages.ErrorMessage; import mx.messaging.messages.IMessage; import mx.resources.IResourceManager; import mx.resources.ResourceManager; use namespace mx_internal; /** * Dispatched when a message is received by the Consumer. 
* * @eventType mx.messaging.events.MessageEvent.MESSAGE * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ [Event(name="message", type="mx.messaging.events.MessageEvent")] [ResourceBundle("messaging")] /** * The AbstractConsumer is the base class for both the Consumer and * MultiTopicConsumer classes. You use those classes to receive pushed * messages from the server. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public class AbstractConsumer extends MessageAgent { //-------------------------------------------------------------------------- // // Constructor // //-------------------------------------------------------------------------- /** * Constructs a Consumer. * * * @example *
* Resubscribe attempts are made at a constant rate according to the resubscribe interval * value. When a resubscribe attempt is made if the underlying channel for the Consumer is not * connected or attempting to connect the channel will start a connect attempt. * Subsequent Consumer resubscribe attempts that occur while the underlying * channel connect attempt is outstanding are effectively ignored until * the outstanding channel connect attempt succeeds or fails. *
* * @see mx.messaging.Consumer#resubscribeInterval * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function get resubscribeAttempts():int { return _resubscribeAttempts; } /** * @private */ public function set resubscribeAttempts(value:int):void { if (_resubscribeAttempts != value) { if (value == 0) stopResubscribeTimer(); var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "resubscribeAttempts", _resubscribeAttempts, value); _resubscribeAttempts = value; dispatchEvent(event); } } //---------------------------------- // resubscribeInterval //---------------------------------- /** * @private */ private var _resubscribeInterval:int = 5000; [Bindable(event="propertyChange")] /** * The number of milliseconds between resubscribe attempts. * If a Consumer doesn't receive an acknowledgement for a subscription * request, it will wait the specified number of milliseconds before * attempting to resubscribe. * Setting the value to zero disables resubscriptions. ** Resubscribe attempts are made at a constant rate according to this * value. When a resubscribe attempt is made if the underlying channel for the Consumer is not * connected or attempting to connect the channel will start a connect attempt. * Subsequent Consumer resubscribe attempts that occur while the underlying * channel connect attempt is outstanding are effectively ignored until * the outstanding channel connect attempt succeeds or fails. *
* * @see mx.messaging.Consumer#resubscribeInterval * * @throws ArgumentError If the assigned value is negative. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function get resubscribeInterval():int { return _resubscribeInterval; } /** * @private */ public function set resubscribeInterval(value:int):void { if (_resubscribeInterval != value) { if (value < 0) { var message:String = resourceManager.getString( "messaging", "resubscribeIntervalNegative"); throw new ArgumentError(message); } else if (value == 0) { stopResubscribeTimer(); } else if (_resubscribeTimer != null) { _resubscribeTimer.delay = value; } var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "resubscribeInterval", _resubscribeInterval, value); _resubscribeInterval = value; dispatchEvent(event); } } //---------------------------------- // subscribed //---------------------------------- /** * @private */ private var _subscribed:Boolean; [Bindable(event="propertyChange")] /** * Indicates whether the Consumer is currently subscribed. ThepropertyChange
* event is dispatched when this property changes.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
public function get subscribed():Boolean
{
    // Read-only accessor; the flag itself is updated by setSubscribed().
    return this._subscribed;
}
/**
 *  @private
 *  Updates the subscribed flag. When the value actually changes, the
 *  Consumer is registered with (or unregistered from) the shared
 *  ConsumerMessageDispatcher, polling is enabled (or disabled) on the
 *  current channel if it is a PollingChannel, and a
 *  <code>propertyChange</code> event is dispatched.
 *
 *  @param value The new subscription state.
 */
protected function setSubscribed(value:Boolean):void
{
    // Nothing to do when the state is unchanged.
    if (_subscribed == value)
        return;

    // Capture the old value in the event before overwriting it.
    var changeEvent:PropertyChangeEvent =
        PropertyChangeEvent.createUpdateEvent(this, "subscribed", _subscribed, value);
    _subscribed = value;

    // Register or unregister our subscription state with the singleton
    // ConsumerMessageDispatcher so it can start or stop listening for
    // messages on our behalf.
    var dispatcher:ConsumerMessageDispatcher = ConsumerMessageDispatcher.getInstance();
    if (value)
        dispatcher.registerSubscription(this);
    else
        dispatcher.unregisterSubscription(this);

    // The 'as' cast yields null when there is no current channel or when
    // the current channel is not a PollingChannel.
    var pollingChannel:PollingChannel =
        (channelSet != null) ? (channelSet.currentChannel as PollingChannel) : null;
    if (pollingChannel != null)
    {
        if (value)
            pollingChannel.enablePolling();
        else
            pollingChannel.disablePolling();
    }

    dispatchEvent(changeEvent);
}
//----------------------------------
// timestamp
//----------------------------------
/**
* @private
* Backing field for the <code>timestamp</code> property.
* Starts at -1. Besides the public setter, this field is also assigned
* directly (without dispatching a propertyChange event) when a subscribe
* acknowledgement or a received message carries a newer timestamp.
*/
private var _timestamp:Number = -1;
[Bindable(event="propertyChange")]
/**
* Contains the timestamp of the most recent message this Consumer
* has received.
* This value is passed to the destination in a <code>receive()</code> call
* to request that it deliver messages for the Consumer from the timestamp
* forward.
* All messages with a timestamp value greater than the
* <code>timestamp</code> value will be returned during a poll operation.
* Setting this value to -1 will retrieve all cached messages from the
* destination.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
public function get timestamp():Number
{
    // Plain accessor for the backing field.
    return this._timestamp;
}
/**
 *  @private
 *  Stores the new timestamp and dispatches a <code>propertyChange</code>
 *  event when the value differs from the current one.
 */
public function set timestamp(value:Number):void
{
    // Equal values need no update and no event.
    if (_timestamp == value)
        return;

    // Build the event first so it records the previous value.
    var changeEvent:PropertyChangeEvent =
        PropertyChangeEvent.createUpdateEvent(this, "timestamp", _timestamp, value);
    _timestamp = value;
    dispatchEvent(changeEvent);
}
//--------------------------------------------------------------------------
//
// Overridden Methods
//
//--------------------------------------------------------------------------
/**
 *  @private
 *  Custom processing for subscribe, unsubscribe and poll message
 *  acknowledgments. Error acknowledgments and acknowledgments for
 *  messages that are not CommandMessages are passed straight to the
 *  superclass.
 *
 *  @param ackMsg The AcknowledgeMessage.
 *
 *  @param msg The original subscribe, unsubscribe or poll message.
 */
override public function acknowledge(ackMsg:AcknowledgeMessage, msg:IMessage):void
{
    // Ignore acks for any outstanding messages that return after disconnect() is invoked.
    if (_disconnectBarrier)
        return;

    // Null when the original message is not a CommandMessage.
    var command:CommandMessage = msg as CommandMessage;

    // Consumer-specific processing only applies to non-error acks of command messages.
    if (ackMsg.headers[AcknowledgeMessage.ERROR_HINT_HEADER] || (command == null))
    {
        super.acknowledge(ackMsg, msg);
        return;
    }

    var operation:int = command.operation;

    // A multi-subscribe message is treated as an unsubscribe when it carries
    // the DSlastUnsub header (the message that completely unsubscribes the
    // client), and as a subscribe otherwise.
    if (operation == CommandMessage.MULTI_SUBSCRIBE_OPERATION)
    {
        operation = (msg.headers.DSlastUnsub != null)
            ? CommandMessage.UNSUBSCRIBE_OPERATION
            : CommandMessage.SUBSCRIBE_OPERATION;
    }

    if (operation == CommandMessage.UNSUBSCRIBE_OPERATION)
    {
        if (Log.isInfo())
            _log.info("'{0}' {1} acknowledge for unsubscribe.", id, _agentType);

        super.setClientId(null);
        // Stop listening for messages.
        setSubscribed(false);
        // Force the ack's clientId to null as well before ack'ing it.
        ackMsg.clientId = null;
        super.acknowledge(ackMsg, msg);
    }
    else if (operation == CommandMessage.SUBSCRIBE_OPERATION)
    {
        stopResubscribeTimer();

        // NOTE: the timestamp is deliberately stored as (ack timestamp - 1).
        // This works around a bug where a message sent by a Producer in the
        // same batch as the subscribe ends up with (likely) the same
        // timestamp as the consumer. Because that message is sent by the
        // client after the subscribe, it should still be delivered.
        // TODO: Improve solution here.
        if (ackMsg.timestamp > _timestamp)
            _timestamp = ackMsg.timestamp - 1;

        if (Log.isInfo())
        {
            _log.info("'{0}' {1} acknowledge for subscribe. Client id '{2}' new timestamp {3}",
                      id, _agentType, ackMsg.clientId, _timestamp);
        }

        super.setClientId(ackMsg.clientId);
        setSubscribed(true);
        super.acknowledge(ackMsg, msg);
    }
    else if (operation == CommandMessage.POLL_OPERATION)
    {
        // Result of a receive() invocation (a Consumer instance-specific
        // poll request): route each returned message through messageHandler().
        var polledMessages:Array = ackMsg.body as Array;
        if (polledMessages != null)
        {
            for each (var polledMessage:IMessage in polledMessages)
                messageHandler(MessageEvent.createEvent(MessageEvent.MESSAGE, polledMessage));
        }
        super.acknowledge(ackMsg, msg);
    }
    // Any other command operation falls through without further processing.
}
/**
* Disconnects the Consumer from its remote destination.
* This method should be invoked on a Consumer that is no longer
* needed by an application after unsubscribing.
* This method does not wait for outstanding network operations to complete
* and does not send an unsubscribe message to the server.
* After invoking disconnect(), the Consumer will report that it is in a
* disconnected, unsubscribed state because it will not receive any more
* messages until it has reconnected and resubscribed.
* Disconnecting stops automatic resubscription attempts if they are running.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
override public function disconnect():void
{
// We don't invoke unsubscribe() in this case because a Consumer subscribed to a
// JMS destination durably will blow away the durable subscription.
_shouldBeSubscribed = false; // Prevent resubscribe attempts.
// Tear down any running resubscribe timer before changing state.
stopResubscribeTimer();
// Marks the Consumer unsubscribed locally: unregisters it from the
// ConsumerMessageDispatcher and disables polling on a PollingChannel.
setSubscribed(false);
super.disconnect();
}
/**
 *  @private
 *  The Consumer suppresses ErrorMessage processing if the error is
 *  retryable and a resubscribe timer is currently running.
 *
 *  @param errMsg The ErrorMessage describing the fault.
 *
 *  @param msg The original message (generally a subscribe).
 */
override public function fault(errMsg:ErrorMessage, msg:IMessage):void
{
    // Ignore faults for any outstanding messages that return after disconnect() is invoked.
    if (_disconnectBarrier)
        return;

    var retryable:Boolean = Boolean(errMsg.headers[ErrorMessage.RETRYABLE_HINT_HEADER]);

    // Suppress the fault dispatch while the resubscribe timer is running;
    // it will generate a fault itself when it runs out of allowed
    // resubscribe attempts.
    if (retryable && (_resubscribeTimer != null))
        return;

    // A retryable error that correlates to our current subscribe message
    // means we should no longer be subscribed.
    if (retryable && (_subscribeMsg != null) &&
        (errMsg.correlationId == _subscribeMsg.messageId))
    {
        _shouldBeSubscribed = false;
    }

    super.fault(errMsg, msg);
}
/**
 *  @private
 *  Custom processing to warn the user if the consumer is connected over
 *  a non-realtime channel.
 *
 *  @param event The ChannelEvent.
 */
override public function channelConnectHandler(event:ChannelEvent):void
{
    super.channelConnectHandler(event);

    // Only a connected Consumer with a channel set is of interest.
    if (!connected || (channelSet == null))
        return;

    var current:Channel = channelSet.currentChannel;

    // Nothing to report for a missing or realtime channel, or when
    // warnings are not being logged.
    if ((current == null) || current.realtime || !Log.isWarn())
        return;

    _log.warn("'{0}' {1} connected over a non-realtime channel '{2}'"
              + " which means channel is not automatically receiving updates via polling or server push."
              , id, _agentType, current.id);
}
/**
 *  @private
 *  Marks the Consumer as unsubscribed when its channel disconnects and
 *  starts the resubscribe timer if the Consumer should still be
 *  subscribed and the event is not flagged as rejected.
 *
 *  @param event The ChannelEvent.
 */
override public function channelDisconnectHandler(event:ChannelEvent):void
{
    setSubscribed(false);
    super.channelDisconnectHandler(event);

    // No resubscribe attempts unless a subscription is still wanted and
    // the event was not rejected.
    if (!_shouldBeSubscribed || event.rejected)
        return;

    startResubscribeTimer();
}
/**
 *  @private
 *  Custom processing to start up a resubscribe timer if our channel faults
 *  and is no longer connected while we should be subscribed.
 *
 *  @param event The ChannelFaultEvent.
 */
override public function channelFaultHandler(event:ChannelFaultEvent):void
{
    // A fault that left the channel disconnected ends the subscription.
    if (!event.channel.connected)
        setSubscribed(false);

    super.channelFaultHandler(event);

    // The channel's connected state is read again here, after the
    // superclass has processed the fault.
    if (!_shouldBeSubscribed || event.rejected || event.channel.connected)
        return;

    startResubscribeTimer();
}
//--------------------------------------------------------------------------
//
// Methods
//
//--------------------------------------------------------------------------
/**
 *  Requests any messages that are queued for this Consumer on the server.
 *  This method should only be used for Consumers that subscribe over
 *  non-realtime, non-polling channels.
 *  This method is a no-op if the Consumer has no clientId assigned, which
 *  is normally the case while it is not subscribed.
 *
 *  @param timestamp This argument is deprecated and is ignored.
 *
 *  @langversion 3.0
 *  @playerversion Flash 9
 *  @playerversion AIR 1.1
 *  @productversion BlazeDS 4
 *  @productversion LCDS 3
 */
public function receive(timestamp:Number = 0):void
{
    // We need a clientId to distinguish this from a generic poll request
    // sent by a polling channel.
    if (clientId == null)
        return;

    var pollMsg:CommandMessage = new CommandMessage();
    pollMsg.destination = destination;
    pollMsg.operation = CommandMessage.POLL_OPERATION;
    internalSend(pollMsg);
}
/**
 *  Subscribes to the remote destination.
 *
 *  @param clientId The client id to subscribe with. Use null for
 *  non-durable Consumers. If the subscription is durable, a consistent
 *  value must be supplied every time the Consumer subscribes in order
 *  to reconnect to the correct durable subscription in the remote destination.
 *
 *  @throws mx.messaging.errors.InvalidDestinationError If no destination is set.
 *
 *  @langversion 3.0
 *  @playerversion Flash 9
 *  @playerversion AIR 1.1
 *  @productversion BlazeDS 4
 *  @productversion LCDS 3
 */
public function subscribe(clientId:String = null):void
{
    // True when a clientId was passed in and it differs from the one
    // currently assigned, meaning it has to be applied.
    var clientIdChanged:Boolean = (clientId != null) && (super.clientId != clientId);

    // Already subscribed under another clientId: drop that subscription
    // first so we can resubscribe under the new one.
    if (clientIdChanged && subscribed)
        unsubscribe();

    // Make sure any resubscribe timer is stopped.
    stopResubscribeTimer();

    _shouldBeSubscribed = true;

    if (clientIdChanged)
        super.setClientId(clientId);

    if (Log.isInfo())
        _log.info("'{0}' {1} subscribe.", id, _agentType);

    // Keep the subscribe message so resubscribe attempts and fault
    // correlation can refer back to it.
    _subscribeMsg = buildSubscribeMessage();
    internalSend(_subscribeMsg);
}
/**
 *  Unsubscribes from the remote destination. In the case of durable JMS
 *  subscriptions, this will destroy the durable subscription on the JMS server.
 *
 *  @param preserveDurable When true, durable JMS subscriptions are not
 *  destroyed, allowing consumers to later resubscribe and receive missed messages.
 *
 *  @langversion 3.0
 *  @playerversion Flash 9
 *  @playerversion AIR 1.1
 *  @productversion BlazeDS 4
 *  @productversion LCDS 3
 */
public function unsubscribe(preserveDurable:Boolean = false):void
{
    _shouldBeSubscribed = false;

    // Not subscribed: the only thing left to cancel is a pending
    // resubscribe timer.
    if (!subscribed)
    {
        stopResubscribeTimer();
        return;
    }

    // Stop listening now for any messages as we could be set to a new
    // channel before the ack comes back, and once the ack returns we
    // will no longer have a valid client id.
    if (channelSet != null)
        channelSet.removeEventListener(destination, messageHandler);

    if (Log.isInfo())
        _log.info("'{0}' {1} unsubscribe.", id, _agentType);

    internalSend(buildUnsubscribeMessage(preserveDurable));
}
//--------------------------------------------------------------------------
//
// Internal Methods
//
//--------------------------------------------------------------------------
/**
 *  @private
 *  Consumers subscribe for messages from a destination and this is the
 *  handler method that is invoked when a message for this Consumer is
 *  pushed or polled from the server.
 *
 *  @param event The MessageEvent.
 */
mx_internal function messageHandler(event:MessageEvent):void
{
    // NOTE: This method is invoked directly by the ConsumerMessageDispatcher.
    // The event flow for a pushed message is:
    // 1. The Channel receives a pushed/polled message and dispatches a message event.
    // 2. Any ChannelSets connected to the Channel handle these events in
    //    ChannelSet.messageHandler(), simply redispatching them.
    // 3. Consumers that subscribe to a destination trigger the internal use of a
    //    shared ConsumerMessageDispatcher that listens for message events from any
    //    ChannelSets that Consumers have subscribed over; this helper routes pushed
    //    messages to the proper Consumer instances.
    var incoming:IMessage = event.message;

    // Command messages are handled internally by the Consumer and are not
    // dispatched to message listeners via MessageEvents.
    var command:CommandMessage = incoming as CommandMessage;
    if (command != null)
    {
        if (command.operation == CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION)
        {
            // We've been unsubscribed but it wasn't the result of an unsubscribe
            // message this agent sent. Set subscribed to false, which will inform
            // the polling channel to stop polling if a polling channel is being used.
            setSubscribed(false);
        }
        else if (Log.isWarn())
        {
            _log.warn("'{0}' received a CommandMessage '{1}' that could not be handled.", id, CommandMessage.getOperationAsString(command.operation));
        }
        return;
    }

    // Track the most recent message timestamp seen by this Consumer.
    if (incoming.timestamp > _timestamp)
        _timestamp = incoming.timestamp;

    // Server might push out error messages (eg. during MessageClient.invalidate)
    // that need to be dispatched as message fault events.
    var errorMessage:ErrorMessage = incoming as ErrorMessage;
    if (errorMessage != null)
        dispatchEvent(MessageFaultEvent.createEvent(errorMessage));
    else
        dispatchEvent(MessageEvent.createEvent(MessageEvent.MESSAGE, incoming));
}
//--------------------------------------------------------------------------
//
// Protected Methods
//
//--------------------------------------------------------------------------
/**
 *  Builds the CommandMessage used to subscribe this Consumer.
 *  Subclasses that need custom subscribe messages should override
 *  this method.
 *
 *  @return The subscribe CommandMessage.
 *
 *  @langversion 3.0
 *  @playerversion Flash 9
 *  @playerversion AIR 1.1
 *  @productversion BlazeDS 4
 *  @productversion LCDS 3
 */
protected function buildSubscribeMessage():CommandMessage
{
    var subscribeMsg:CommandMessage = new CommandMessage();
    subscribeMsg.destination = destination;
    subscribeMsg.clientId = clientId;
    subscribeMsg.operation = CommandMessage.SUBSCRIBE_OPERATION;

    // The max frequency header is only sent when a positive value is configured.
    if (maxFrequency > 0)
        subscribeMsg.headers[CommandMessage.MAX_FREQUENCY_HEADER] = maxFrequency;

    return subscribeMsg;
}
/**
 *  Builds the CommandMessage used to unsubscribe this Consumer.
 *  Subclasses that need custom unsubscribe messages should override
 *  this method.
 *
 *  @param preserveDurable When true, durable JMS subscriptions are not
 *  destroyed, allowing consumers to later resubscribe and receive missed messages.
 *
 *  @return The unsubscribe CommandMessage.
 *
 *  @langversion 3.0
 *  @playerversion Flash 9
 *  @playerversion AIR 1.1
 *  @productversion BlazeDS 4
 *  @productversion LCDS 3
 */
protected function buildUnsubscribeMessage(preserveDurable:Boolean):CommandMessage
{
    var unsubscribeMsg:CommandMessage = new CommandMessage();
    unsubscribeMsg.destination = destination;
    unsubscribeMsg.clientId = clientId;
    unsubscribeMsg.operation = CommandMessage.UNSUBSCRIBE_OPERATION;

    // The PRESERVE_DURABLE_HEADER is only added to the message when the
    // flag is set, so its value is always true when present.
    if (preserveDurable)
        unsubscribeMsg.headers[CommandMessage.PRESERVE_DURABLE_HEADER] = true;

    return unsubscribeMsg;
}
/**
 *  @private
 *  Attempt to resubscribe.
 *  This can be called directly or from a Timer's event handler.
 *
 *  <p>Once the number of attempts made reaches
 *  <code>resubscribeAttempts</code> (unless that is -1, which means
 *  no limit), the timer is stopped and a "Client.Error.Subscribe" fault
 *  correlated to the current subscribe message is raised instead of
 *  sending another subscribe.</p>
 *
 *  @param event The timer event for resubscribe attempts.
 */
protected function resubscribe(event:TimerEvent):void
{
    // If we're past our limit of attempts, fault out.
    if ((_resubscribeAttempts != -1) &&
        (_currentAttempt >= _resubscribeAttempts))
    {
        // Stop the timer first: fault() suppresses retryable errors only
        // while a resubscribe timer exists.
        stopResubscribeTimer();
        _shouldBeSubscribed = false;

        var errMsg:ErrorMessage = new ErrorMessage();
        errMsg.faultCode = "Client.Error.Subscribe";
        errMsg.faultString = resourceManager.getString(
            "messaging", "consumerSubscribeError");
        errMsg.faultDetail = resourceManager.getString(
            "messaging", "failedToSubscribe");
        errMsg.correlationId = _subscribeMsg.messageId;
        fault(errMsg, _subscribeMsg);
        return;
    }

    if (Log.isDebug())
        _log.debug("'{0}' {1} trying to resubscribe.", id, _agentType);

    // The timer starts with a 1 ms delay for an immediate first attempt;
    // switch it to the configured interval for subsequent attempts.
    // The timer is null when this method is invoked directly rather than
    // from the timer's event handler, so guard against dereferencing it.
    if (_resubscribeTimer != null)
        _resubscribeTimer.delay = _resubscribeInterval;

    _currentAttempt++;

    // Send the resubscribe message, skipping the MessageAgent's queue that blocks
    // messages until the clientId is set.
    internalSend(_subscribeMsg, false);
}
/**
 *  @private
 *  Starts a timer that periodically attempts to resubscribe. Does nothing
 *  when no subscription is wanted, when a timer is already running, or
 *  when resubscription is disabled (zero attempts or a non-positive interval).
 */
protected function startResubscribeTimer():void
{
    // Only one timer at a time, and only while a subscription is wanted.
    if (!_shouldBeSubscribed || (_resubscribeTimer != null))
        return;

    // Resubscription is not configured.
    if ((_resubscribeAttempts == 0) || (_resubscribeInterval <= 0))
        return;

    if (Log.isDebug())
        _log.debug("'{0}' {1} starting resubscribe timer.", id, _agentType);

    // Initially, the timeout is set to 1 so we try to reconnect
    // immediately (perhaps to a different channel). After that,
    // resubscribe() switches the timer to the configured interval.
    _resubscribeTimer = new Timer(1);
    _resubscribeTimer.addEventListener(TimerEvent.TIMER, resubscribe);
    _resubscribeTimer.start();
    _currentAttempt = 0;
}
/**
 *  @private
 *  Stops and discards the resubscribe timer if one is running.
 */
protected function stopResubscribeTimer():void
{
    // No timer, nothing to stop.
    if (_resubscribeTimer == null)
        return;

    if (Log.isDebug())
        _log.debug("'{0}' {1} stopping resubscribe timer.", id, _agentType);

    // Detach the handler, stop the timer and drop the reference so a new
    // timer can be started later.
    _resubscribeTimer.removeEventListener(TimerEvent.TIMER, resubscribe);
    _resubscribeTimer.reset();
    _resubscribeTimer = null;
}
}
}