/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using ZContext = ZMQ.Context;

namespace Apache.NMS.ZMQ
{
    /// <summary>
    /// Represents an NMS connection over ZMQ.
    /// </summary>
    public class Connection : IConnection
    {
        private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
        private IRedeliveryPolicy redeliveryPolicy;
        private ConnectionMetaData metaData = null;

        // A connection is considered stopped until Start() is called.
        private bool closed = true;
        private string clientId;
        private Uri brokerUri;

        // Backing store for RequestTimeout; starts at the NMS default so the
        // getter returns the same value as before until a caller overrides it.
        private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;

        private ConsumerTransformerDelegate consumerTransformer;
        private ProducerTransformerDelegate producerTransformer;

        /// <summary>
        /// ZMQ context. It is static, so every Connection instance shares it.
        /// </summary>
        // NOTE(review): the constructor argument (1) is presumably the number of
        // ZMQ I/O threads -- confirm against the ZMQ binding in use.
        private static readonly ZContext _context = new ZContext(1);

        /// <summary>
        /// Starts message delivery for this connection.
        /// </summary>
        public void Start()
        {
            closed = false;
        }

        /// <summary>
        /// This property determines if the asynchronous message delivery of incoming
        /// messages has been started for this connection.
        /// </summary>
        public bool IsStarted
        {
            get { return !closed; }
        }

        /// <summary>
        /// Stops message delivery for this connection.
        /// </summary>
        public void Stop()
        {
            closed = true;
        }

        /// <summary>
        /// Creates a new session on this connection using the connection's
        /// current <see cref="AcknowledgementMode"/>.
        /// </summary>
        /// <returns>The newly created session.</returns>
        public ISession CreateSession()
        {
            return CreateSession(acknowledgementMode);
        }

        /// <summary>
        /// Creates a new session on this connection with the given acknowledgement mode.
        /// </summary>
        /// <param name="mode">Acknowledgement mode passed to the new session.</param>
        /// <returns>The newly created session.</returns>
        public ISession CreateSession(AcknowledgementMode mode)
        {
            return new Session(this, mode);
        }

        /// <summary>
        /// Disposes the connection by closing it.
        /// </summary>
        public void Dispose()
        {
            Close();
        }

        /// <summary>
        /// Closes the connection. Currently this only stops message delivery.
        /// </summary>
        public void Close()
        {
            Stop();
        }

        /// <summary>
        /// Intentionally a no-op in this implementation.
        /// </summary>
        public void PurgeTempDestinations()
        {
        }

        /// <summary>
        /// The default timeout for network requests. Defaults to
        /// <c>NMSConstants.defaultRequestTimeout</c> until explicitly set.
        /// </summary>
        public TimeSpan RequestTimeout
        {
            get { return requestTimeout; }
            set { requestTimeout = value; }
        }

        /// <summary>
        /// Gets or sets the acknowledgement mode used by <see cref="CreateSession()"/>.
        /// </summary>
        public AcknowledgementMode AcknowledgementMode
        {
            get { return acknowledgementMode; }
            set { acknowledgementMode = value; }
        }

        /// <summary>
        /// Gets or sets the broker Uri.
        /// </summary>
        public Uri BrokerUri
        {
            get { return brokerUri; }
            set { brokerUri = value; }
        }

        /// <summary>
        /// Gets or sets the client Id.
        /// </summary>
        public string ClientId
        {
            get { return clientId; }
            set { clientId = value; }
        }

        /// <summary>
        /// Gets or sets the redelivery policy for this connection.
        /// </summary>
        public IRedeliveryPolicy RedeliveryPolicy
        {
            get { return this.redeliveryPolicy; }
            set { this.redeliveryPolicy = value; }
        }

        /// <summary>
        /// Gets or sets the consumer transformer delegate stored on this connection.
        /// </summary>
        public ConsumerTransformerDelegate ConsumerTransformer
        {
            get { return this.consumerTransformer; }
            set { this.consumerTransformer = value; }
        }

        /// <summary>
        /// Gets or sets the producer transformer delegate stored on this connection.
        /// </summary>
        public ProducerTransformerDelegate ProducerTransformer
        {
            get { return this.producerTransformer; }
            set { this.producerTransformer = value; }
        }

        /// <summary>
        /// Gets the shared ZMQ connection context.
        /// </summary>
        internal static ZContext Context
        {
            get { return _context; }
        }

        /// <summary>
        /// Gets the Meta Data for the NMS Connection instance. The instance is
        /// created on first access and reused afterwards.
        /// </summary>
        public IConnectionMetaData MetaData
        {
            get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
        }

        /// <summary>
        /// A delegate that can receive transport level exceptions.
        /// </summary>
        public event ExceptionListener ExceptionListener;

        /// <summary>
        /// An asynchronous listener that is notified when a Fault tolerant connection
        /// has been interrupted.
        /// </summary>
        public event ConnectionInterruptedListener ConnectionInterruptedListener;

        /// <summary>
        /// An asynchronous listener that is notified when a Fault tolerant connection
        /// has been resumed.
        /// </summary>
        public event ConnectionResumedListener ConnectionResumedListener;

        /// <summary>
        /// Forwards an exception to the registered <see cref="ExceptionListener"/> when
        /// one is subscribed and the connection is started; otherwise logs it.
        /// </summary>
        /// <param name="e">The exception to report.</param>
        public void HandleException(System.Exception e)
        {
            // Snapshot the delegate so an unsubscribe between the null check and
            // the invocation cannot cause a NullReferenceException.
            ExceptionListener listener = this.ExceptionListener;

            if(listener != null && !this.closed)
            {
                listener(e);
            }
            else
            {
                Tracer.Error(e);
            }
        }

        /// <summary>
        /// Notifies the <see cref="ConnectionInterruptedListener"/>, if any, that the
        /// transport was interrupted. Listener failures are logged, never propagated.
        /// </summary>
        public void HandleTransportInterrupted()
        {
            Tracer.Debug("Transport has been Interrupted.");

            ConnectionInterruptedListener listener = this.ConnectionInterruptedListener;

            if(listener != null && !this.closed)
            {
                try
                {
                    listener();
                }
                catch(Exception ex)
                {
                    // Best-effort notification: a faulty listener must not break the
                    // transport, but the failure should not vanish silently either.
                    Tracer.Error("ConnectionInterruptedListener threw an exception: " + ex);
                }
            }
        }

        /// <summary>
        /// Notifies the <see cref="ConnectionResumedListener"/>, if any, that the
        /// transport has resumed. Listener failures are logged, never propagated.
        /// </summary>
        public void HandleTransportResumed()
        {
            Tracer.Debug("Transport has resumed normal operation.");

            ConnectionResumedListener listener = this.ConnectionResumedListener;

            if(listener != null && !this.closed)
            {
                try
                {
                    listener();
                }
                catch(Exception ex)
                {
                    // Best-effort notification: log instead of silently swallowing.
                    Tracer.Error("ConnectionResumedListener threw an exception: " + ex);
                }
            }
        }
    }
}