/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Specialized; using Apache.NMS.ActiveMQ.Util; using Apache.NMS.ActiveMQ.Transport; using Apache.NMS.Util; using Apache.NMS.Policies; namespace Apache.NMS.ActiveMQ { /// /// Represents a connection with a message broker /// public class ConnectionFactory : IConnectionFactory { public const string DEFAULT_BROKER_URL = "failover:tcp://localhost:61616"; public const string ENV_BROKER_URL = "ACTIVEMQ_BROKER_URL"; private static event ExceptionListener onException; private Uri brokerUri; private string connectionUserName; private string connectionPassword; private string clientId; private string clientIdPrefix; private IdGenerator clientIdGenerator; private bool useCompression; private bool copyMessageOnSend = true; private bool dispatchAsync = true; private bool asyncSend; private bool asyncClose; private bool alwaysSyncSend; private bool sendAcksAsync = true; private int producerWindowSize = 0; private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge; private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout; private bool messagePrioritySupported = true; private bool watchTopicAdvisories = true; private bool optimizeAcknowledge; private long optimizeAcknowledgeTimeOut = 300; private long optimizedAckScheduledAckInterval = 0; private bool useRetroactiveConsumer; private bool exclusiveConsumer; private long consumerFailoverRedeliveryWaitPeriod = 0; private bool checkForDuplicates = true; private bool transactedIndividualAck = false; private bool nonBlockingRedelivery = false; private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); private PrefetchPolicy prefetchPolicy = new PrefetchPolicy(); private ICompressionPolicy compressionPolicy = new CompressionPolicy(); static ConnectionFactory() { TransportFactory.OnException += ConnectionFactory.ExceptionHandler; } public static string GetDefaultBrokerUrl() { #if (PocketPC||NETCF||NETCF_2_0) return DEFAULT_BROKER_URL; #else return Environment.GetEnvironmentVariable(ENV_BROKER_URL) ?? DEFAULT_BROKER_URL; #endif } public ConnectionFactory() : this(GetDefaultBrokerUrl()) { } public ConnectionFactory(string brokerUri) : this(brokerUri, null) { } public ConnectionFactory(string brokerUri, string clientID) : this(URISupport.CreateCompatibleUri(brokerUri), clientID) { } public ConnectionFactory(Uri brokerUri) : this(brokerUri, null) { } public ConnectionFactory(Uri brokerUri, string clientID) { this.BrokerUri = brokerUri; this.ClientId = clientID; } public IConnection CreateConnection() { return CreateActiveMQConnection(); } public IConnection CreateConnection(string userName, string password) { return CreateActiveMQConnection(userName, password); } protected virtual Connection CreateActiveMQConnection() { return CreateActiveMQConnection(connectionUserName, connectionPassword); } protected virtual Connection CreateActiveMQConnection(string userName, string password) { Connection connection = null; try { Tracer.InfoFormat("Connecting to: {0}", brokerUri.ToString()); ITransport transport = TransportFactory.CreateTransport(brokerUri); connection = CreateActiveMQConnection(transport); ConfigureConnection(connection); connection.UserName = userName; connection.Password = password; if(this.clientId != null) { connection.DefaultClientId = this.clientId; } return connection; } catch(NMSException) { try { connection.Close(); } catch { } throw; } catch(Exception e) { try { connection.Close(); } catch { } throw NMSExceptionSupport.Create("Could not connect to broker URL: " + this.brokerUri + ". Reason: " + e.Message, e); } } protected virtual Connection CreateActiveMQConnection(ITransport transport) { return new Connection(this.brokerUri, transport, this.ClientIdGenerator); } #region ConnectionFactory Properties /// /// Get/or set the broker Uri. /// public Uri BrokerUri { get { return brokerUri; } set { Tracer.Info("BrokerUri set = " + value.OriginalString); if(value.OriginalString.StartsWith("activemqnettx:")) { brokerUri = new Uri(URISupport.StripPrefix(value.OriginalString, "activemqnettx:")); } else { brokerUri = new Uri(URISupport.StripPrefix(value.OriginalString, "activemq:")); } if(!String.IsNullOrEmpty(brokerUri.Query) && !brokerUri.OriginalString.EndsWith(")")) { // Since the Uri class will return the end of a Query string found in a Composite // URI we must ensure that we trim that off before we proceed. string query = brokerUri.Query.Substring(brokerUri.Query.LastIndexOf(")") + 1); StringDictionary properties = URISupport.ParseQuery(query); StringDictionary connection = URISupport.ExtractProperties(properties, "connection."); StringDictionary nms = URISupport.ExtractProperties(properties, "nms."); IntrospectionSupport.SetProperties(this, connection, "connection."); IntrospectionSupport.SetProperties(this, nms, "nms."); brokerUri = URISupport.CreateRemainingUri(brokerUri, properties); } } } public string UserName { get { return connectionUserName; } set { connectionUserName = value; } } public string Password { get { return connectionPassword; } set { connectionPassword = value; } } public string ClientId { get { return clientId; } set { clientId = value; } } public string ClientIdPrefix { get { return clientIdPrefix; } set { clientIdPrefix = value; } } public bool UseCompression { get { return this.useCompression; } set { this.useCompression = value; } } public bool CopyMessageOnSend { get { return copyMessageOnSend; } set { copyMessageOnSend = value; } } public bool AlwaysSyncSend { get { return alwaysSyncSend; } set { alwaysSyncSend = value; } } public bool AsyncClose { get { return asyncClose; } set { asyncClose = value; } } public bool SendAcksAsync { get { return sendAcksAsync; } set { sendAcksAsync = value; } } public bool AsyncSend { get { return asyncSend; } set { asyncSend = value; } } public bool DispatchAsync { get { return this.dispatchAsync; } set { this.dispatchAsync = value; } } public bool WatchTopicAdvisories { get { return this.watchTopicAdvisories; } set { this.watchTopicAdvisories = value; } } public bool MessagePrioritySupported { get { return this.messagePrioritySupported; } set { this.messagePrioritySupported = value; } } public int RequestTimeout { get { return (int)this.requestTimeout.TotalMilliseconds; } set { this.requestTimeout = TimeSpan.FromMilliseconds(value); } } public string AckMode { set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); } } public AcknowledgementMode AcknowledgementMode { get { return acknowledgementMode; } set { this.acknowledgementMode = value; } } public int ProducerWindowSize { get { return producerWindowSize; } set { producerWindowSize = value; } } public PrefetchPolicy PrefetchPolicy { get { return this.prefetchPolicy; } set { this.prefetchPolicy = value; } } public IRedeliveryPolicy RedeliveryPolicy { get { return this.redeliveryPolicy; } set { if(value != null) { this.redeliveryPolicy = value; } } } public ICompressionPolicy CompressionPolicy { get { return this.compressionPolicy; } set { if(value != null) { this.compressionPolicy = value; } } } public IdGenerator ClientIdGenerator { set { this.clientIdGenerator = value; } get { lock(this) { if(this.clientIdGenerator == null) { if(this.clientIdPrefix != null) { this.clientIdGenerator = new IdGenerator(this.clientIdPrefix); } else { this.clientIdGenerator = new IdGenerator(); } } return this.clientIdGenerator; } } } public event ExceptionListener OnException { add { onException += value; } remove { if(onException != null) { onException -= value; } } } private ConsumerTransformerDelegate consumerTransformer; /// /// A Delegate that is called each time a Message is dispatched to allow the client to do /// any necessary transformations on the received message before it is delivered. The /// ConnectionFactory sets the provided delegate instance on each Connection instance that /// is created from this factory, each connection in turn passes the delegate along to each /// Session it creates which then passes that along to the Consumers it creates. /// public ConsumerTransformerDelegate ConsumerTransformer { get { return this.consumerTransformer; } set { this.consumerTransformer = value; } } private ProducerTransformerDelegate producerTransformer; /// /// A delegate that is called each time a Message is sent from this Producer which allows /// the application to perform any needed transformations on the Message before it is sent. /// The ConnectionFactory sets the provided delegate instance on each Connection instance that /// is created from this factory, each connection in turn passes the delegate along to each /// Session it creates which then passes that along to the Producers it creates. /// public ProducerTransformerDelegate ProducerTransformer { get { return this.producerTransformer; } set { this.producerTransformer = value; } } public bool OptimizeAcknowledge { get { return this.optimizeAcknowledge; } set { this.optimizeAcknowledge = value; } } public long OptimizeAcknowledgeTimeOut { get { return this.optimizeAcknowledgeTimeOut; } set { this.optimizeAcknowledgeTimeOut = value; } } public long OptimizedAckScheduledAckInterval { get { return this.optimizedAckScheduledAckInterval; } set { this.optimizedAckScheduledAckInterval = value; } } public bool UseRetroactiveConsumer { get { return this.useRetroactiveConsumer; } set { this.useRetroactiveConsumer = value; } } public bool ExclusiveConsumer { get { return this.exclusiveConsumer; } set { this.exclusiveConsumer = value; } } public long ConsumerFailoverRedeliveryWaitPeriod { get { return this.consumerFailoverRedeliveryWaitPeriod; } set { this.consumerFailoverRedeliveryWaitPeriod = value; } } public bool CheckForDuplicates { get { return this.checkForDuplicates; } set { this.checkForDuplicates = value; } } public bool TransactedIndividualAck { get { return this.transactedIndividualAck; } set { this.transactedIndividualAck = value; } } public bool NonBlockingRedelivery { get { return this.nonBlockingRedelivery; } set { this.nonBlockingRedelivery = value; } } public int AuditDepth { get { return this.auditDepth; } set { this.auditDepth = value; } } public int AuditMaximumProducerNumber { get { return this.auditMaximumProducerNumber; } set { this.auditMaximumProducerNumber = value; } } #endregion protected virtual void ConfigureConnection(Connection connection) { connection.AsyncClose = this.AsyncClose; connection.AsyncSend = this.AsyncSend; connection.CopyMessageOnSend = this.CopyMessageOnSend; connection.AlwaysSyncSend = this.AlwaysSyncSend; connection.DispatchAsync = this.DispatchAsync; connection.SendAcksAsync = this.SendAcksAsync; connection.AcknowledgementMode = this.acknowledgementMode; connection.UseCompression = this.useCompression; connection.RequestTimeout = this.requestTimeout; connection.ProducerWindowSize = this.producerWindowSize; connection.MessagePrioritySupported = this.messagePrioritySupported; connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy; connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy; connection.CompressionPolicy = this.compressionPolicy.Clone() as ICompressionPolicy; connection.ConsumerTransformer = this.consumerTransformer; connection.ProducerTransformer = this.producerTransformer; connection.WatchTopicAdvisories = this.watchTopicAdvisories; connection.OptimizeAcknowledge = this.optimizeAcknowledge; connection.OptimizeAcknowledgeTimeOut = this.optimizeAcknowledgeTimeOut; connection.OptimizedAckScheduledAckInterval = this.optimizedAckScheduledAckInterval; connection.UseRetroactiveConsumer = this.useRetroactiveConsumer; connection.ExclusiveConsumer = this.exclusiveConsumer; connection.ConsumerFailoverRedeliveryWaitPeriod = this.consumerFailoverRedeliveryWaitPeriod; connection.CheckForDuplicates = this.checkForDuplicates; connection.TransactedIndividualAck = this.transactedIndividualAck; connection.NonBlockingRedelivery = this.nonBlockingRedelivery; connection.AuditDepth = this.auditDepth; connection.AuditMaximumProducerNumber = this.auditMaximumProducerNumber; } protected static void ExceptionHandler(Exception ex) { if(ConnectionFactory.onException != null) { ConnectionFactory.onException(ex); } } } }