Chapter 4. Using the Qpid WCF client

Table of Contents

4.1. XML and Binary Bindings
4.2. Endpoints
4.3. Message Headers
4.4. Security
4.5. Transactions

4.1. XML and Binary Bindings

The Qpid WCF client provides two bindings, each with support for Windows .NET transactions.

The AmqpBinding is suitable for communication between two WCF applications. By default it uses the WCF binary .NET XML encoder (BinaryMessageEncodingBindingElement) for efficient message transmission, but it can also use the text and Message Transmission Optimization Mechanism (MTOM) encoders. Here is a traditional service model sample program using the AmqpBinding. It assumes that the queue "hello_service_node" has been created and configured on the AMQP broker.

Example 4.1. Traditional service model "Hello world!" example

namespace Apache.Qpid.Documentation.HelloService
{
  using System;
  using System.ServiceModel;
  using System.ServiceModel.Channels;
  using System.Threading;
  using Apache.Qpid.Channel;

  [ServiceContract]
  public interface IHelloService
  {
    [OperationContract(IsOneWay = true, Action = "*")]
    void SayHello(string greeting);
  }

  public class HelloService : IHelloService
  {
    private static int greetingCount;

    public static int GreetingCount
    {
      get { return greetingCount; }
    }

    public void SayHello(string greeting)
    {
      Console.WriteLine("Service received: " + greeting);
      greetingCount++;
    }
    static void Main(string[] args)
    {
      try
      {
        AmqpBinding amqpBinding = new AmqpBinding();
        amqpBinding.BrokerHost = "localhost";
        amqpBinding.BrokerPort = 5672;

        ServiceHost serviceHost = new ServiceHost(typeof(HelloService));
        serviceHost.AddServiceEndpoint(typeof(IHelloService),
          amqpBinding, "amqp:hello_service_node");
        serviceHost.Open();

        // Send the service a test greeting
        Uri amqpClientUri=new Uri("amqp:amq.direct?routingkey=hello_service_node");
        EndpointAddress clientEndpoint = new EndpointAddress(amqpClientUri);
        ChannelFactory<IHelloService> channelFactory =
          new ChannelFactory<IHelloService>(amqpBinding, clientEndpoint);
        IHelloService clientProxy = channelFactory.CreateChannel();

        clientProxy.SayHello("Greetings from WCF client");

        // wait for service to process the greeting
        while (HelloService.GreetingCount == 0)
        {
          Thread.Sleep(100);
        }
        channelFactory.Close();
        serviceHost.Close();
      }
      catch (Exception e)
      {
        Console.WriteLine("Exception: {0}", e);
      }
    }
  }
}
      

The second binding, AmqpBinaryBinding, is suitable for WCF applications that need to inter-operate with non-WCF clients or that wish to have direct access to the raw wire representation of the message body. It relies on a custom encoder to read and write raw (binary) content which operates similarly to the ByteStream encoder (introduced in .NET 4.0). The encoder presents an abstract XML infoset view of the raw message content on input. On output, the encoder does the reverse and peels away the XML infoset layer exposing the raw content to the wire representation of the message body. The application must do the inverse of what the encoder does to allow the XML infoset wrapper to cancel properly. This is demonstrated in the following sample code (using the channel programming model) which directly manipulates or provides callbacks to the WCF message readers and writers when the content is consumed. In contrast to the AmqpBinding sample where the simple greeting is encapsulated in a compressed SOAP envelope, the wire representation of the message contains the raw content and is identical and fully interoperable with the Qpid C++ "Hello world!" example.

Example 4.2. Binary "Hello world!" example using the channel model

namespace Apache.Qpid.Samples.Channel.HelloWorld
{
  using System;
  using System.ServiceModel;
  using System.ServiceModel.Channels;
  using System.ServiceModel.Description;
  using System.Text;
  using System.Xml;
  using Apache.Qpid.Channel;

  public class HelloWorld
  {
    static void Main(string[] args)
    {
      String broker = "localhost";
      int port = 5672;
      String target = "amq.topic";
      String source = "my_topic_node";

      if (args.Length > 0)
      {
        broker = args[0];
      }

      if (args.Length > 1)
      {
        port = int.Parse(args[1]);
      }

      if (args.Length > 2)
      {
        target = args[2];
      }

      if (args.Length > 3)
      {
        source = args[3];
      }

      AmqpBinaryBinding binding = new AmqpBinaryBinding();
      binding.BrokerHost = broker;
      binding.BrokerPort = port;

      IChannelFactory<IInputChannel> receiverFactory = binding.BuildChannelFactory<IInputChannel>();
      receiverFactory.Open();
      IInputChannel receiver = receiverFactory.CreateChannel(new EndpointAddress("amqp:" + source));
      receiver.Open();

      IChannelFactory<IOutputChannel> senderFactory = binding.BuildChannelFactory<IOutputChannel>();
      senderFactory.Open();
      IOutputChannel sender = senderFactory.CreateChannel(new EndpointAddress("amqp:" + target));
      sender.Open();

      sender.Send(Message.CreateMessage(MessageVersion.None, "", new HelloWorldBinaryBodyWriter()));

      Message message = receiver.Receive();
      XmlDictionaryReader reader = message.GetReaderAtBodyContents();
      while (!reader.HasValue)
      {
        reader.Read();
      }

      byte[] binaryContent = reader.ReadContentAsBase64();
      string text = Encoding.UTF8.GetString(binaryContent);

      Console.WriteLine(text);

      senderFactory.Close();
      receiverFactory.Close();
    }
  }

  public class HelloWorldBinaryBodyWriter : BodyWriter
  {
    public HelloWorldBinaryBodyWriter() : base (true) {}

    protected override void OnWriteBodyContents(XmlDictionaryWriter writer)
    {
      byte[] binaryContent = Encoding.UTF8.GetBytes("Hello world!");

      // wrap the content:
      writer.WriteStartElement("Binary");
      writer.WriteBase64(binaryContent, 0, binaryContent.Length);
    }
  }
}

Bindings define ChannelFactories and ChannelListeners associated with an AMQP Broker. WCF will frequently automatically create and manage the life cycle of a these and the resulting IChannel objects used in message transfer. The binding parameters that can be set are:

Table 4.1. WCF Binding Parameters

ParameterDefaultDescription
BrokerHost localhost The broker's server name. Currently the WCF channel only supports connections with a single broker. Failover to multiple brokers will be provided in the future.
BrokerPort 5672 The port the broker is listening on.
PrefetchLimit 0 The number of messages to prefetch from the amqp broker before the application actually consumes them. Increasing this number can dramatically increase the read performance in some circumstances.
Shared false Indicates if separate channels to the same broker can share an underlying AMQP tcp connection (provided they also share the same authentication credentials).
TransferMode buffered Indicates whether the channel's encoder uses the WCF BufferManager cache to temporarily store message content during the encoding/decoding phase. For small to medium sized SOAP based messages, buffered is usually the preferred choice. For binary messages, streamed TransferMode is the more efficient mode.