/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using Apache.NMS.Stomp.Util; using Apache.NMS.Stomp.Commands; using Apache.NMS.Stomp.Transport; using Apache.NMS; using Apache.NMS.Util; using Apache.NMS.Policies; namespace Apache.NMS.Stomp { /// /// Represents a connection with a message broker /// public class ConnectionFactory : IConnectionFactory { public const string DEFAULT_BROKER_URL = "tcp://localhost:61613"; public const string ENV_BROKER_URL = "ACTIVEMQ_BROKER_URL"; private static event ExceptionListener onException; private Uri brokerUri; private string connectionUserName; private string connectionPassword; private string clientId; private string clientIdPrefix; private IdGenerator clientIdGenerator; private bool copyMessageOnSend = true; private bool asyncSend; private bool alwaysSyncSend; private bool sendAcksAsync=true; private bool dispatchAsync=true; private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge; private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); private PrefetchPolicy prefetchPolicy = new PrefetchPolicy(); static ConnectionFactory() { TransportFactory.OnException += ConnectionFactory.ExceptionHandler; } public static string GetDefaultBrokerUrl() { #if (PocketPC||NETCF||NETCF_2_0) return DEFAULT_BROKER_URL; #else return Environment.GetEnvironmentVariable(ENV_BROKER_URL) ?? DEFAULT_BROKER_URL; #endif } public ConnectionFactory() : this(GetDefaultBrokerUrl()) { } public ConnectionFactory(string brokerUri) : this(brokerUri, null) { } public ConnectionFactory(string brokerUri, string clientID) : this(new Uri(brokerUri), clientID) { } public ConnectionFactory(Uri brokerUri) : this(brokerUri, null) { } public ConnectionFactory(Uri brokerUri, string clientID) { this.BrokerUri = brokerUri; this.ClientId = clientID; } public IConnection CreateConnection() { return CreateConnection(connectionUserName, connectionPassword); } public IConnection CreateConnection(string userName, string password) { Connection connection = null; try { // Strip off the activemq prefix, if it exists. Uri uri = new Uri(URISupport.stripPrefix(brokerUri.OriginalString, "stomp:")); Tracer.InfoFormat("Connecting to: {0}", uri.ToString()); ITransport transport = TransportFactory.CreateTransport(uri); connection = new Connection(uri, transport, this.ClientIdGenerator); connection.UserName = userName; connection.Password = password; ConfigureConnection(connection); if(this.clientId != null) { connection.DefaultClientId = this.clientId; } connection.ITransport.Start(); return connection; } catch(NMSException e) { try { connection.Close(); } catch { } throw e; } catch(Exception e) { try { connection.Close(); } catch { } throw NMSExceptionSupport.Create("Could not connect to broker URL: " + this.brokerUri + ". Reason: " + e.Message, e); } } #region ConnectionFactory Properties /// /// Get/or set the broker Uri. /// public Uri BrokerUri { get { return brokerUri; } set { brokerUri = value; Uri uri = new Uri(URISupport.stripPrefix(brokerUri.OriginalString, "stomp:")); URISupport.CompositeData c = URISupport.parseComposite(uri); URISupport.SetProperties(this, c.Parameters, "connection."); URISupport.SetProperties(this.PrefetchPolicy, c.Parameters, "nms.PrefetchPolicy."); URISupport.SetProperties(this.RedeliveryPolicy, c.Parameters, "nms.RedeliveryPolicy."); } } public string UserName { get { return connectionUserName; } set { connectionUserName = value; } } public string Password { get { return connectionPassword; } set { connectionPassword = value; } } public string ClientId { get { return clientId; } set { clientId = value; } } public string ClientIdPrefix { get { return clientIdPrefix; } set { clientIdPrefix = value; } } public bool CopyMessageOnSend { get { return copyMessageOnSend; } set { copyMessageOnSend = value; } } public bool AlwaysSyncSend { get { return alwaysSyncSend; } set { alwaysSyncSend = value; } } public bool SendAcksAsync { get { return sendAcksAsync; } set { sendAcksAsync = value; } } public bool AsyncSend { get { return asyncSend; } set { asyncSend = value; } } public string AckMode { set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); } } public bool DispatchAsync { get { return this.dispatchAsync; } set { this.dispatchAsync = value; } } public AcknowledgementMode AcknowledgementMode { get { return acknowledgementMode; } set { this.acknowledgementMode = value; } } public PrefetchPolicy PrefetchPolicy { get { return this.prefetchPolicy; } set { this.prefetchPolicy = value; } } public IRedeliveryPolicy RedeliveryPolicy { get { return this.redeliveryPolicy; } set { if(value != null) { this.redeliveryPolicy = value; } } } public IdGenerator ClientIdGenerator { set { this.clientIdGenerator = value; } get { lock(this) { if(this.clientIdGenerator == null) { if(this.clientIdPrefix != null) { this.clientIdGenerator = new IdGenerator(this.clientIdPrefix); } else { this.clientIdGenerator = new IdGenerator(); } } return this.clientIdGenerator; } } } public event ExceptionListener OnException { add { onException += value; } remove { if(onException != null) { onException -= value; } } } #endregion protected virtual void ConfigureConnection(Connection connection) { connection.AsyncSend = this.AsyncSend; connection.CopyMessageOnSend = this.CopyMessageOnSend; connection.AlwaysSyncSend = this.AlwaysSyncSend; connection.SendAcksAsync = this.SendAcksAsync; connection.DispatchAsync = this.DispatchAsync; connection.AcknowledgementMode = this.acknowledgementMode; connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy; connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy; } protected static void ExceptionHandler(Exception ex) { if(ConnectionFactory.onException != null) { ConnectionFactory.onException(ex); } } } }