/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Text;
using Kafka.Client.Request;
using Kafka.Client.Util;
namespace Kafka.Client
{
///
/// Sends message to Kafka.
///
public class Producer
{
///
/// Initializes a new instance of the Producer class.
///
/// The server to connect to.
/// The port to connect to.
public Producer(string server, int port)
{
Server = server;
Port = port;
}
///
/// Gets the server to which the connection is to be established.
///
public string Server { get; private set; }
///
/// Gets the port to which the connection is to be established.
///
public int Port { get; private set; }
///
/// Sends a message to Kafka.
///
/// The topic to publish to.
/// The partition to publish to.
/// The message to send.
public void Send(string topic, int partition, Message msg)
{
Send(topic, partition, new List { msg });
}
///
/// Sends a list of messages to Kafka.
///
/// The topic to publish to.
/// The partition to publish to.
/// The list of messages to send.
public void Send(string topic, int partition, IList messages)
{
Send(new ProducerRequest(topic, partition, messages));
}
///
/// Sends a request to Kafka.
///
/// The request to send to Kafka.
public void Send(ProducerRequest request)
{
if (request.IsValid())
{
using (KafkaConnection connection = new KafkaConnection(Server, Port))
{
connection.Write(request);
}
}
}
///
/// Sends a request to Kafka.
///
/// The request to send to Kafka.
public void Send(MultiProducerRequest request)
{
if (request.IsValid())
{
using (KafkaConnection connection = new KafkaConnection(Server, Port))
{
connection.Write(request);
}
}
}
///
/// Sends a list of messages to Kafka.
///
/// The topic to publish to.
/// The partition to publish to.
/// The list of messages to send.
///
/// A block of code to execute once the request has been sent to Kafka. This value may
/// be set to null.
///
public void SendAsync(string topic, int partition, IList messages, MessageSent callback)
{
SendAsync(new ProducerRequest(topic, partition, messages), callback);
}
///
/// Send a request to Kafka asynchronously.
///
///
/// If the callback is not specified then the method behaves as a fire-and-forget call
/// with the callback being ignored. By the time this callback is executed, the
/// will already have been closed given an
/// internal call .
///
/// The request to send to Kafka.
///
/// A block of code to execute once the request has been sent to Kafka. This value may
/// be set to null.
///
public void SendAsync(ProducerRequest request, MessageSent callback)
{
if (request.IsValid())
{
KafkaConnection connection = new KafkaConnection(Server, Port);
if (callback == null)
{
// fire and forget
connection.BeginWrite(request.GetBytes());
}
else
{
// execute with callback
connection.BeginWrite(request, callback);
}
}
}
}
}