/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ // TODO: flow control // timeout handling // transactions // check if should split into separate input and output classes (little overlap) namespace Apache.Qpid.Channel { using System; using System.Collections; using System.Collections.Generic; using System.ServiceModel; using System.ServiceModel.Channels; using System.Text; using System.Threading; using System.Globalization; using System.Web; using System.Xml; // the thin interop layer that provides access to the Qpid AMQP client libraries using Apache.Qpid.Interop; using Apache.Qpid.AmqpTypes; /// /// WCF client transport channel for accessing AMQP brokers using the Qpid C++ library /// public class AmqpTransportChannel : ChannelBase, IOutputChannel, IInputChannel { private static readonly EndpointAddress AnonymousAddress = new EndpointAddress("http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous"); private EndpointAddress remoteAddress; private MessageEncoder encoder; private AmqpChannelProperties factoryChannelProperties; private bool shared; private int prefetchLimit; private string encoderContentType; // AMQP subject/routing key private string subject; // Qpid addressing value for "qpid.subject" property private string qpidSubject; private BufferManager bufferManager; private AmqpProperties outputMessageProperties; private InputLink inputLink; private OutputLink outputLink; private bool isInputChannel; private bool streamed; private AsyncTimeSpanCaller asyncOpenCaller; private AsyncTimeSpanCaller asyncCloseCaller; internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection, int prefetchLimit) : base(factory) { this.isInputChannel = (factory is ChannelListenerBase) || (factory is AmqpChannelFactory); if (remoteAddress == null) { throw new ArgumentException("Null Endpoint Address"); } this.factoryChannelProperties = channelProperties; this.shared = sharedConnection; this.prefetchLimit = prefetchLimit; this.remoteAddress = remoteAddress; // pull out host, port, queue, and connection arguments string qpidAddress = this.UriToQpidAddress(remoteAddress.Uri, out subject); this.encoder = msgEncoder; string ct = String.Empty; if (this.encoder != null) { ct = this.encoder.ContentType; if (ct != null) { int pos = ct.IndexOf(';'); if (pos != -1) { ct = ct.Substring(0, pos).Trim(); } } else { ct = "application/octet-stream"; } } this.encoderContentType = ct; if (this.factoryChannelProperties.TransferMode == TransferMode.Streamed) { this.streamed = true; } else { if (!(this.factoryChannelProperties.TransferMode == TransferMode.Buffered)) { throw new ArgumentException("TransferMode mode must be \"Streamed\" or \"Buffered\""); } this.streamed = false; } this.bufferManager = BufferManager.CreateBufferManager(maxBufferPoolSize, int.MaxValue); this.asyncOpenCaller = new AsyncTimeSpanCaller(this.OnOpen); this.asyncCloseCaller = new AsyncTimeSpanCaller(this.OnClose); if (this.isInputChannel) { this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, qpidAddress); this.inputLink.PrefetchLimit = this.prefetchLimit; } else { this.outputLink = ConnectionManager.GetOutputLink(this.factoryChannelProperties, shared, false, qpidAddress); this.subject = this.outputLink.DefaultSubject; this.qpidSubject = this.outputLink.QpidSubject; } } private delegate bool AsyncTryReceiveCaller(TimeSpan timeout, out Message message); private delegate void AsyncTimeSpanCaller(TimeSpan timeout); EndpointAddress IOutputChannel.RemoteAddress { get { return this.remoteAddress; } } // i.e what you would insert into a ReplyTo header to reach // here. Presumably should be exchange/link and routing info, // rather than the actual input queue name. EndpointAddress IInputChannel.LocalAddress { get { // TODO: something better return AnonymousAddress; } } AmqpProperties OutputMessageProperties { get { if (this.outputMessageProperties == null) { this.outputMessageProperties = this.factoryChannelProperties.DefaultMessageProperties; if (this.outputMessageProperties == null) { this.outputMessageProperties = new AmqpProperties(); } } return this.outputMessageProperties; } } Uri IOutputChannel.Via { get { return this.remoteAddress.Uri; } } public override T GetProperty() { if (typeof(T) == typeof(IInputChannel)) { if (this.isInputChannel) { return (T)(object)this; } } else if (typeof(T) == typeof(IOutputChannel)) { if (!this.isInputChannel) { return (T)(object)this; } } return base.GetProperty(); } public void Send(Message message, TimeSpan timeout) { this.ThrowIfDisposedOrNotOpen(); AmqpChannelHelpers.ValidateTimeout(timeout); try { using (AmqpMessage amqpMessage = this.WcfToQpid(message)) { this.outputLink.Send(amqpMessage, timeout); } } finally { message.Close(); } } public void Send(Message message) { this.Send(message, this.DefaultSendTimeout); } public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state) { this.ThrowIfDisposedOrNotOpen(); AmqpChannelHelpers.ValidateTimeout(timeout); try { using (AmqpMessage amqpMessage = this.WcfToQpid(message)) { return this.outputLink.BeginSend(amqpMessage, timeout, callback, state); } } finally { message.Close(); } } public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state) { return this.BeginSend(message, this.DefaultSendTimeout, callback, state); } public void EndSend(IAsyncResult result) { this.outputLink.EndSend(result); } public Message Receive(TimeSpan timeout) { Message message; if (this.TryReceive(timeout, out message)) { return message; } else { throw new TimeoutException("Receive"); } } public Message Receive() { return this.Receive(this.DefaultReceiveTimeout); } public bool TryReceive(TimeSpan timeout, out Message message) { AmqpMessage amqpMessage; message = null; if (this.inputLink.TryReceive(timeout, out amqpMessage)) { message = this.QpidToWcf(amqpMessage); return true; } return false; } public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state) { return this.inputLink.BeginTryReceive(timeout, callback, state); } public bool EndTryReceive(IAsyncResult result, out Message message) { AmqpMessage amqpMessage = null; if (!this.inputLink.EndTryReceive(result, out amqpMessage)) { message = null; return false; } message = QpidToWcf(amqpMessage); return true; } public bool WaitForMessage(TimeSpan timeout) { return this.inputLink.WaitForMessage(timeout); } public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state) { return this.inputLink.BeginTryReceive(timeout, callback, state); } public IAsyncResult BeginReceive(AsyncCallback callback, object state) { return this.BeginReceive(this.DefaultReceiveTimeout, callback, state); } public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state) { return this.inputLink.BeginWaitForMessage(timeout, callback, state); } public Message EndReceive(IAsyncResult result) { Message message; if (this.EndTryReceive(result, out message)) { return message; } else { throw new TimeoutException("EndReceive"); } } public bool EndWaitForMessage(IAsyncResult result) { return this.inputLink.EndWaitForMessage(result); } public void CloseEndPoint() { if (this.inputLink != null) { this.inputLink.Close(); } if (this.outputLink != null) { this.outputLink.Close(); } } /// /// Open connection to Broker /// protected override void OnOpen(TimeSpan timeout) { // TODO: move open logic to here from constructor } protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) { return this.asyncOpenCaller.BeginInvoke(timeout, callback, state); } protected override void OnEndOpen(IAsyncResult result) { this.asyncOpenCaller.EndInvoke(result); } protected override void OnAbort() { //// TODO: check for network-less qpid teardown or launch special thread this.CloseEndPoint(); this.Cleanup(); } /// /// Shutdown gracefully /// protected override void OnClose(TimeSpan timeout) { this.CloseEndPoint(); this.Cleanup(); } protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) { return this.asyncCloseCaller.BeginInvoke(timeout, callback, state); } protected override void OnEndClose(IAsyncResult result) { this.asyncCloseCaller.EndInvoke(result); } private AmqpMessage WcfToQpid(Message wcfMessage) { object obj; AmqpProperties applicationProperties = null; bool success = false; AmqpMessage amqpMessage = null; if (wcfMessage.Properties.TryGetValue("AmqpProperties", out obj)) { applicationProperties = obj as AmqpProperties; } try { AmqpProperties outgoingProperties = new AmqpProperties(); // Start with AMQP properties from the binding and the URI if (this.factoryChannelProperties.DefaultMessageProperties != null) { outgoingProperties.MergeFrom(this.factoryChannelProperties.DefaultMessageProperties); } if (this.subject != null) { outgoingProperties.RoutingKey = this.subject; } if (this.qpidSubject != null) { outgoingProperties.PropertyMap["qpid.subject"] = new AmqpString(this.qpidSubject); } // Add the Properties set by the application on this particular message. // Application properties trump channel properties if (applicationProperties != null) { outgoingProperties.MergeFrom(applicationProperties); } amqpMessage = this.outputLink.CreateMessage(); amqpMessage.Properties = outgoingProperties; // copy the WCF message body to the AMQP message body if (this.streamed) { this.encoder.WriteMessage(wcfMessage, amqpMessage.BodyStream); } else { ArraySegment encodedBody = this.encoder.WriteMessage(wcfMessage, int.MaxValue, this.bufferManager); try { amqpMessage.BodyStream.Write(encodedBody.Array, encodedBody.Offset, encodedBody.Count); } finally { this.bufferManager.ReturnBuffer(encodedBody.Array); } } success = true; } finally { if (!success && (amqpMessage != null)) { amqpMessage.Dispose(); } } return amqpMessage; } private Message QpidToWcf(AmqpMessage amqpMessage) { if (amqpMessage == null) { return null; } Message wcfMessage = null; byte[] managedBuffer = null; try { if (this.streamed) { wcfMessage = this.encoder.ReadMessage(amqpMessage.BodyStream, int.MaxValue); } else { int count = (int)amqpMessage.BodyStream.Length; managedBuffer = this.bufferManager.TakeBuffer(count); int nr = amqpMessage.BodyStream.Read(managedBuffer, 0, count); ArraySegment bufseg = new ArraySegment(managedBuffer, 0, count); wcfMessage = this.encoder.ReadMessage(bufseg, this.bufferManager); // set to null for finally{} block, since the encoder is now responsible for // returning the BufferManager memory managedBuffer = null; } // This message will be discarded unless the "To" header matches // the WCF endpoint dispatcher's address filter (or the service is // AddressFilterMode=AddressFilterMode.Any). this.remoteAddress.ApplyTo(wcfMessage); if (amqpMessage.Properties != null) { wcfMessage.Properties.Add("AmqpProperties", amqpMessage.Properties); } } catch (XmlException xmlException) { throw new ProtocolException( "There is a problem with the XML that was received from the network. See inner exception for more details.", xmlException); } catch (Exception e) { // TODO: logging Console.WriteLine("TX channel encoder exception " + e); } finally { // close the amqpMessage unless the body will be read at a later time. if (!this.streamed || wcfMessage == null) { amqpMessage.Close(); } // the handoff to the encoder failed if (managedBuffer != null) { this.bufferManager.ReturnBuffer(managedBuffer); } } return wcfMessage; } private void Cleanup() { this.bufferManager.Clear(); } private string UriToQpidAddress(Uri uri, out string subject) { if (uri.Scheme != AmqpConstants.Scheme) { throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, "The scheme {0} specified in address is not supported.", uri.Scheme), "uri"); } subject = ""; string path = uri.LocalPath; string query = uri.Query; // legacy... convert old style myqueue?routingkey=key to myqueue/key if (query.Length > 0) { if (!query.StartsWith("?")) { throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, "Invalid query argument."), "uri"); } string routingParseKey = "routingkey="; string subjectParseKey = "subject="; char[] charSeparators = new char[] { '?', ';' }; string[] args = uri.Query.Split(charSeparators, StringSplitOptions.RemoveEmptyEntries); foreach (string s in args) { if (s.StartsWith(routingParseKey)) { subject = s.Substring(routingParseKey.Length); } else if (s.StartsWith(subjectParseKey)) { subject = s.Substring(subjectParseKey.Length); } else { if (s.Length > 0) { throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, "Invalid query argument {0}.", s), "uri"); } } } if (path.Contains("/")) { throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, "Invalid queue name {0}.", path), "uri"); } if (path.Length == 0) { // special case, user wants default exchange return "//" + subject; } return path + "/" + subject; } // find subject in "myqueue/mysubject;{mode:browse}" int pos = path.IndexOf('/'); if ((pos > -1) && (pos < path.Length + 1)) { subject = path.Substring(pos); pos = subject.IndexOf(';'); if (pos == 0) { throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, "Empty subject in address {0}.", path), "uri"); } if (pos > 0) { subject = subject.Substring(0, pos); } } if (subject.Length > 0) { subject = HttpUtility.UrlDecode(subject); } return HttpUtility.UrlDecode(path); } } }