/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.IO; using System.ServiceModel; using System.ServiceModel.Channels; using org.apache.qpid.client; using org.apache.qpid.transport; using org.apache.qpid.transport.util; namespace org.apache.qpid.wcf.model { internal sealed class QpidOutputChannel : QpidOutputChannelBase { private readonly MessageEncoder _encoder; private readonly ClientSession _session; private readonly string _queueName; public QpidOutputChannel(BindingContext context, ClientSession session, EndpointAddress address) : base(context, address) { var encoderElement = context.Binding.Elements.Find(); if (encoderElement != null) { _encoder = encoderElement.CreateMessageEncoderFactory().Encoder; } _queueName = address.Uri.ToString(); _session = session; } public override void Send(System.ServiceModel.Channels.Message message, TimeSpan timeout) { if (message.State != MessageState.Closed) { byte[] body; using (var str = new MemoryStream()) { _encoder.WriteMessage(message, str); body = str.ToArray(); } _session.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, new Header(new DeliveryProperties().setRoutingKey(_queueName), new transport.MessageProperties().setMessageId(UUID.randomUUID())), body); } } public override void Close(TimeSpan timeout) { if (State == CommunicationState.Closed || State == CommunicationState.Closing) return; // Ignore the call, we're already closing. OnClosing(); OnClosed(); } public override void Open(TimeSpan timeout) { if (State != CommunicationState.Created && State != CommunicationState.Closed) throw new InvalidOperationException(string.Format("Cannot open the channel from the {0} state.", State)); OnOpening(); var qr = (QueueQueryResult) _session.queueQuery(_queueName).Result; if (qr.getQueue() == null) { // create the queue _session.queueDeclare(_queueName, null, null); } OnOpened(); } } }