////////////////////////////////////////////////////////////////////////////////
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////////
package mx.messaging
{
import flash.errors.IllegalOperationError;
import flash.events.TimerEvent;
import flash.utils.Timer;
import mx.core.mx_internal;
import mx.events.PropertyChangeEvent;
import mx.logging.Log;
import mx.messaging.config.ConfigMap;
import mx.messaging.config.ServerConfig;
import mx.messaging.errors.InvalidDestinationError;
import mx.messaging.errors.MessagingError;
import mx.messaging.events.ChannelEvent;
import mx.messaging.events.ChannelFaultEvent;
import mx.messaging.events.MessageAckEvent;
import mx.messaging.events.MessageEvent;
import mx.messaging.events.MessageFaultEvent;
import mx.messaging.messages.AbstractMessage;
import mx.messaging.messages.AcknowledgeMessage;
import mx.messaging.messages.AsyncMessage;
import mx.messaging.messages.CommandMessage;
import mx.messaging.messages.ErrorMessage;
import mx.messaging.messages.IMessage;
import mx.resources.IResourceManager;
import mx.resources.ResourceManager;
use namespace mx_internal;
[ResourceBundle("messaging")]
/**
* The AbstractProducer is the base class for the Producer and
* MultiTopicConsumer classes.
* You use these classes to push messages to the server.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
public class AbstractProducer extends MessageAgent
{
//--------------------------------------------------------------------------
//
// Constructor
//
//--------------------------------------------------------------------------
/**
* @private
*/
public function AbstractProducer()
{
super();
}
//--------------------------------------------------------------------------
//
// Variables
//
//--------------------------------------------------------------------------
/**
* @private
* A connect message to use for (re)connect attempts which allows the underlying
* ChannelSet to de-dupe if multiple reconnects queue up at the channel layer.
*/
private var _connectMsg:CommandMessage;
/**
* @private
* This is the current number of reconnect attempts that we've done.
*/
private var _currentAttempt:int;
/**
* @private
* The timer used for reconnect attempts.
*/
private var _reconnectTimer:Timer;
/**
* @private
* Indicates whether this agent should be connected or not.
*/
protected var _shouldBeConnected:Boolean;
/**
* @private
*/
private var resourceManager:IResourceManager =
ResourceManager.getInstance();
//--------------------------------------------------------------------------
//
// Properties
//
//--------------------------------------------------------------------------
//----------------------------------
// autoConnect
//----------------------------------
/**
* @private
*/
private var _autoConnect:Boolean = true;
[Bindable(event="propertyChange")]
/**
* If true
the Producer automatically connects to its destination the
* first time the send()
method is called.
* If false
then the connect()
method must be called explicitly to
* establish a connection to the destination.
* By default this property is true
, but applications that need to operate
* in an offline mode may set this to false
to prevent the send()
method
* from connecting implicitly.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
public function get autoConnect():Boolean
{
return _autoConnect;
}
/**
* @private
*/
public function set autoConnect(value:Boolean):void
{
if (_autoConnect != value)
{
var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "autoConnect", _autoConnect, value);
_autoConnect = value;
dispatchEvent(event);
}
}
//----------------------------------
// defaultHeaders
//----------------------------------
/**
* @private
*/
private var _defaultHeaders:Object;
[Bindable(event="propertyChange")]
/**
* The default headers to apply to messages sent by the Producer.
* Any default headers that do not exist in the message will be created.
* If the message already contains a matching header, the value in the
* message takes precedence and the default header value is ignored.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
public function get defaultHeaders():Object
{
return _defaultHeaders;
}
/**
* @private
*/
public function set defaultHeaders(value:Object):void
{
if (_defaultHeaders != value)
{
var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "defaultHeaders", _defaultHeaders, value);
_defaultHeaders = value;
dispatchEvent(event);
}
}
//----------------------------------
// priority
//----------------------------------
/**
* @private
*/
private var _priority:int = -1;
[Bindable(event="propertyChange")]
/**
* The default message priority for the messages sent by the Producer. The
* valid values are 0 to 9 (0 being lowest) and -1 means that the Producer
* does not have a priority set. Note that if the message already has a
* priority defined, that takes precedence over Producer's priority.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
public function get priority():int
{
return _priority;
}
/**
* @private
*/
public function set priority(value:int):void
{
if (_priority != value)
{
value = value < 0? 0 : value > 9? 9 : value;
var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "priority", _priority, value);
_priority = value;
dispatchEvent(event);
}
}
//----------------------------------
// reconnectAttempts
//----------------------------------
/**
* @private
*/
private var _reconnectAttempts:int;
[Bindable(event="propertyChange")]
/**
* The number of reconnect attempts that the Producer makes in the event
* that the destination is unavailable or the connection to the destination closes.
* A value of -1 enables infinite attempts.
* A value of zero disables reconnect attempts.
*
*
Reconnect attempts are made at a constant rate according to the reconnect interval * value. When a reconnect attempt is made if the underlying channel for the Producer is not * connected or attempting to connect the channel will start a connect attempt. * Subsequent Producer reconnect attempts that occur while the underlying * channel connect attempt is outstanding are effectively ignored until * the outstanding channel connect attempt succeeds or fails.
* * @see mx.messaging.Producer#reconnectInterval * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function get reconnectAttempts():int { return _reconnectAttempts; } /** * @private */ public function set reconnectAttempts(value:int):void { if (_reconnectAttempts != value) { if (value == 0) stopReconnectTimer(); var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "reconnectAttempts", _reconnectAttempts, value); _reconnectAttempts = value; dispatchEvent(event); } } //---------------------------------- // reconnectInterval //---------------------------------- /** * @private */ private var _reconnectInterval:int; [Bindable(event="propertyChange")] /** * The number of milliseconds between reconnect attempts. * If a Producer doesn't receive an acknowledgement for a connect * attempt, it will wait the specified number of milliseconds before * making a subsequent reconnect attempt. * Setting the value to zero disables reconnect attempts. * *Reconnect attempts are made at a constant rate according to this * value. When a reconnect attempt is made if the underlying channel for the Producer is not * connected or attempting to connect the channel will start a connect attempt. * Subsequent Producer reconnect attempts that occur while the underlying * channel connect attempt is outstanding are effectively ignored until * the outstanding channel connect attempt succeeds or fails.
* * @see mx.messaging.Producer#reconnectInterval * * @throws ArgumentError If the assigned value is negative. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function get reconnectInterval():int { return _reconnectInterval; } /** * @private */ public function set reconnectInterval(value:int):void { if (_reconnectInterval != value) { if (value < 0) { var message:String = resourceManager.getString( "messaging", "reconnectIntervalNegative"); throw new ArgumentError(message); } else if (value == 0) { stopReconnectTimer(); } else if (_reconnectTimer != null) { _reconnectTimer.delay = value; } var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "reconnectInterval", _reconnectInterval, value); _reconnectInterval = value; dispatchEvent(event); } } //-------------------------------------------------------------------------- // // Overridden Public Methods // //-------------------------------------------------------------------------- /** * @private * Custom processing for message acknowledgments. * Specifically, re/connect acknowledgements. * * @param ackMsg The AcknowledgeMessage. * * @param msg The original message. */ override public function acknowledge(ackMsg:AcknowledgeMessage, msg:IMessage):void { // Ignore acks for any outstanding messages that return after disconnect() is invoked. if (_disconnectBarrier) return; super.acknowledge(ackMsg, msg); if (msg is CommandMessage && CommandMessage(msg).operation == CommandMessage.TRIGGER_CONNECT_OPERATION) stopReconnectTimer(); } /** * @private * The Producer suppresses ErrorMessage processing if the fault is for a connect * attempt that is being retried. * * @param errMsg The ErrorMessage describing the fault. * * @param msg The original message. */ override public function fault(errMsg:ErrorMessage, msg:IMessage):void { internalFault(errMsg, msg); } /** * @private * Custom processing to start up a reconnect timer if our channel is * disconnected when we should be connected. * * @param event The ChannelEvent. */ override public function channelDisconnectHandler(event:ChannelEvent):void { super.channelDisconnectHandler(event); if (_shouldBeConnected && !event.rejected) startReconnectTimer(); } /** * @private * Custom processing to start up a reconnect timer if our channel faults * when we should be connected. * * @param event The ChannelFaultEvent. */ override public function channelFaultHandler(event:ChannelFaultEvent):void { super.channelFaultHandler(event); if (_shouldBeConnected && !event.rejected && !event.channel.connected) startReconnectTimer(); } /** * Disconnects the Producer from its remote destination. * This method does not wait for outstanding network operations to complete. * After invokingdisconnect()
, the Producer will report that it is not
* connected and it will not receive any outstanding message acknowledgements or faults.
* Disconnecting stops automatic reconnect attempts if they are running.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
override public function disconnect():void
{
_shouldBeConnected = false; // Prevent reconnect attempts.
stopReconnectTimer();
super.disconnect();
}
//--------------------------------------------------------------------------
//
// Public Methods
//
//--------------------------------------------------------------------------
/**
* Connects the Producer to its target destination.
* When a connection is established the connected
property will
* change to true
and this property is bindable and generates
* PropertyChangeEvent
s.
* The internal TRIGGER_CONNECT_OPERATION CommandMessage that is sent will result
* in an acknowledge or fault event depending upon whether the underlying channel
* establishes its connection.
*
* @throws mx.messaging.errors.InvalidDestinationError If no destination is set.
*
* @example
* * var producer:Producer = new Producer(); * producer.destination = "TestTopic"; * producer.addEventListener(PropertyChangeEvent.PROPERTY_CHANGE, handleConnect); * producer.connect(); ** * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function connect():void { if (!connected) { _shouldBeConnected = true; if (_connectMsg == null) _connectMsg = buildConnectMessage(); internalSend(_connectMsg, false); } } //-------------------------------------------------------------------------- // // Methods // //-------------------------------------------------------------------------- /** * Sends the specified message to its destination. * If the producer is being used for publish/subscribe messaging, only messages of type AsyncMessage * should be sent unless a custom message type is being used and the * message destination on the server has been configured to process the * custom message type. * * @param message The Message to send. * * @throws mx.messaging.errors.InvalidDestinationError If no destination is set. * * @example *
* var producer:Producer = new Producer(); * producer.destination = "TestTopic"; * var msg:AsyncMessage = new AsyncMessage(); * msg.body = "test message"; * producer.send(msg); ** * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ public function send(message:IMessage):void { if (!connected && autoConnect) _shouldBeConnected = true; if (defaultHeaders != null) { for (var header:String in defaultHeaders) { if (!message.headers.hasOwnProperty(header)) message.headers[header] = defaultHeaders[header]; } } if (!connected && !autoConnect) { _shouldBeConnected = false; var errMsg2:ErrorMessage = new ErrorMessage(); errMsg2.faultCode = "Client.Error.MessageSend"; errMsg2.faultString = resourceManager.getString( "messaging", "producerSendError"); errMsg2.faultDetail = resourceManager.getString( "messaging", "producerSendErrorDetails"); errMsg2.correlationId = message.messageId; internalFault(errMsg2, message, false, true); } else { if (Log.isInfo()) _log.info("'{0}' {1} sending message '{2}'", id, _agentType, message.messageId); internalSend(message); } } //-------------------------------------------------------------------------- // // Internal Methods // //-------------------------------------------------------------------------- /** * @private * The Producer suppresses ErrorMessage processing if the fault is for a connect * attempt that is being retried. * * @param errMsg The ErrorMessage describing the fault. * * @param msg The original message. * * @param routeToStore currently not used. Previously was a flag used to * indicate if the faulted message shoudl be stored offline to retry. * * @param ignoreDisconnectBarrier If true the message is faulted regardless * of whether disconnect() has been invoked. Generally a disconnect() will * suppress pending acks and faults. */ mx_internal function internalFault(errMsg:ErrorMessage, msg:IMessage, routeToStore:Boolean = true, ignoreDisconnectBarrier:Boolean = false):void { // Ignore faults for any outstanding messages that return after disconnect() is invoked. if (_disconnectBarrier && !ignoreDisconnectBarrier) return; if (msg is CommandMessage && CommandMessage(msg).operation == CommandMessage.TRIGGER_CONNECT_OPERATION) { if (_reconnectTimer == null) { // If this error correlates to our current connect message, // we should no longer be connected. if ((_connectMsg != null) && (errMsg.correlationId == _connectMsg.messageId)) { _shouldBeConnected = false; // Improve the messaging. var errMsg2:ErrorMessage = buildConnectErrorMessage(); errMsg2.rootCause = errMsg.rootCause; super.fault(errMsg2, msg); } else { super.fault(errMsg, msg); } } // Else, suppress the fault dispatch because the reconnect timer // is running and will generate a fault when it runs out of // allowed reconnect attempts. } else { super.fault(errMsg, msg); } } //-------------------------------------------------------------------------- // // Protected Methods // //-------------------------------------------------------------------------- /** * @private * Attempt to reconnect. This can be called directly or * from a Timer's event handler. * * @param event The timer event for reconnect attempts. */ protected function reconnect(event:TimerEvent):void { // If we're past our limit of attempts, fault out. if ((_reconnectAttempts != -1) && (_currentAttempt >= _reconnectAttempts)) { stopReconnectTimer(); _shouldBeConnected = false; fault(buildConnectErrorMessage(), _connectMsg); return; } if (Log.isDebug()) _log.debug("'{0}' {1} trying to reconnect.", id, _agentType); _reconnectTimer.delay = _reconnectInterval; _currentAttempt++; if (_connectMsg == null) _connectMsg = buildConnectMessage(); internalSend(_connectMsg, false); } /** * @private * This method will start a timer which attempts to reconnect * periodically. */ protected function startReconnectTimer():void { if (_shouldBeConnected && (_reconnectTimer == null)) { // If we're configured for reconnect set up the timer. if ((_reconnectAttempts != 0) && (_reconnectInterval > 0)) { if (Log.isDebug()) _log.debug("'{0}' {1} starting reconnect timer.", id, _agentType); /* * Initially, the timeout is set to 1 so we try to * reconnect immediately (perhaps to a different channel). * after that, it will poll at the configured time interval. * * @langversion 3.0 * @playerversion Flash 9 * @playerversion AIR 1.1 * @productversion BlazeDS 4 * @productversion LCDS 3 */ _reconnectTimer = new Timer(1); _reconnectTimer.addEventListener(TimerEvent.TIMER, reconnect); _reconnectTimer.start(); _currentAttempt = 0; } } } /** * @private * Stops a reconnect timer if one is running. */ protected function stopReconnectTimer():void { if (_reconnectTimer != null) { if (Log.isDebug()) _log.debug("'{0}' {1} stopping reconnect timer.", id, _agentType); _reconnectTimer.removeEventListener(TimerEvent.TIMER, reconnect); _reconnectTimer.reset(); _reconnectTimer = null; } } //-------------------------------------------------------------------------- // // Private Methods // //-------------------------------------------------------------------------- /** * @private * Builds an ErrorMessage for a failed connect attempt. * * @return The ErrorMessage. */ private function buildConnectErrorMessage():ErrorMessage { var errMsg:ErrorMessage = new ErrorMessage(); errMsg.faultCode = "Client.Error.Connect"; errMsg.faultString = resourceManager.getString( "messaging", "producerConnectError"); errMsg.faultDetail = resourceManager.getString( "messaging", "failedToConnect"); errMsg.correlationId = _connectMsg.messageId; return errMsg; } /** * @private * Builds a 'connect' message to use for a connect attempt. * * @return The 'connect' CommandMessage. */ private function buildConnectMessage():CommandMessage { var msg:CommandMessage = new CommandMessage(); msg.operation = CommandMessage.TRIGGER_CONNECT_OPERATION; msg.clientId = clientId; msg.destination = destination; return msg; } } }