/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using Apache.NMS.Policies;
using Apache.NMS.Util;
namespace Apache.NMS.ZMQ
{
///
/// A Factory that can estbalish NMS connections to ZMQ subscriber
///
public class ConnectionFactory : IConnectionFactory
{
private Uri brokerUri;
private string clientId;
private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
private const string DEFAULT_BROKER_URL = "tcp://localhost:5556";
private const string ENV_BROKER_URL = "ZMQ_BROKER_URL";
public ConnectionFactory()
: this(GetDefaultBrokerUrl())
{
}
public ConnectionFactory(string rawBrokerUri)
: this(rawBrokerUri, null)
{
}
public ConnectionFactory(string rawBrokerUri, string clientID)
: this(URISupport.CreateCompatibleUri(rawBrokerUri), clientID)
{
}
public ConnectionFactory(Uri rawBrokerUri)
: this(rawBrokerUri, null)
{
}
public ConnectionFactory(Uri rawBrokerUri, string clientID)
{
this.BrokerUri = rawBrokerUri;
if(this.BrokerUri.Port < 1)
{
throw new NMSConnectionException("Missing connection port number.");
}
if(null == clientID)
{
clientID = Guid.NewGuid().ToString();
}
this.ClientId = clientID;
}
///
/// Get the default connection Uri if none is specified.
/// The environment variable is checked first.
///
///
private static string GetDefaultBrokerUrl()
{
string brokerUrl = Environment.GetEnvironmentVariable(ENV_BROKER_URL);
if(string.IsNullOrEmpty(brokerUrl))
{
brokerUrl = DEFAULT_BROKER_URL;
}
return brokerUrl;
}
#region IConnectionFactory Members
///
/// Creates a new connection to ZMQ.
///
public IConnection CreateConnection()
{
return CreateConnection(string.Empty, string.Empty, false);
}
///
/// Creates a new connection to ZMQ.
///
public IConnection CreateConnection(string userName, string password)
{
return CreateConnection(userName, password, false);
}
///
/// Creates a new connection to ZMQ.
///
public IConnection CreateConnection(string userName, string password, bool useLogging)
{
Connection connection = new Connection(this.BrokerUri);
connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
connection.ConsumerTransformer = this.consumerTransformer;
connection.ProducerTransformer = this.producerTransformer;
connection.ClientId = this.ClientId;
return connection;
}
///
/// Get/or set the broker Uri.
///
public Uri BrokerUri
{
get { return this.brokerUri; }
set
{
Tracer.InfoFormat("BrokerUri set {0}", value.OriginalString);
this.brokerUri = new Uri(URISupport.StripPrefix(value.OriginalString, "zmq:"));
}
}
public string ClientId
{
get { return this.clientId; }
set { this.clientId = value; }
}
///
/// Get/or set the redelivery policy that new IConnection objects are
/// assigned upon creation.
///
public IRedeliveryPolicy RedeliveryPolicy
{
get { return this.redeliveryPolicy; }
set
{
if(value != null)
{
this.redeliveryPolicy = value;
}
}
}
private ConsumerTransformerDelegate consumerTransformer;
public ConsumerTransformerDelegate ConsumerTransformer
{
get { return this.consumerTransformer; }
set { this.consumerTransformer = value; }
}
private ProducerTransformerDelegate producerTransformer;
public ProducerTransformerDelegate ProducerTransformer
{
get { return this.producerTransformer; }
set { this.producerTransformer = value; }
}
#endregion
}
}