/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections.Generic;
using System.Text;
using Kafka.Client.Request;
using Kafka.Client.Util;

namespace Kafka.Client
{
    /// <summary>
    /// Sends messages to Kafka.
    /// </summary>
    /// <remarks>
    /// Every send operation creates its own <see cref="KafkaConnection"/> to
    /// <see cref="Server"/>:<see cref="Port"/>; no connection is kept between calls.
    /// </remarks>
    public class Producer
    {
        /// <summary>
        /// Initializes a new instance of the Producer class.
        /// </summary>
        /// <param name="server">The server to connect to.</param>
        /// <param name="port">The port to connect to.</param>
        public Producer(string server, int port)
        {
            Server = server;
            Port = port;
        }

        /// <summary>
        /// Gets the server to which the connection is to be established.
        /// </summary>
        public string Server { get; private set; }

        /// <summary>
        /// Gets the port to which the connection is to be established.
        /// </summary>
        public int Port { get; private set; }

        /// <summary>
        /// Sends a message to Kafka.
        /// </summary>
        /// <param name="topic">The topic to publish to.</param>
        /// <param name="partition">The partition to publish to.</param>
        /// <param name="msg">The message to send.</param>
        public void Send(string topic, int partition, Message msg)
        {
            // Wrap the single message so the list-based overload does the work.
            Send(topic, partition, new List<Message> { msg });
        }

        /// <summary>
        /// Sends a list of messages to Kafka.
        /// </summary>
        /// <param name="topic">The topic to publish to.</param>
        /// <param name="partition">The partition to publish to.</param>
        /// <param name="messages">The list of messages to send.</param>
        public void Send(string topic, int partition, IList<Message> messages)
        {
            Send(new ProducerRequest(topic, partition, messages));
        }

        /// <summary>
        /// Sends a request to Kafka.
        /// </summary>
        /// <remarks>
        /// Nothing is sent, and no connection is opened, when
        /// <c>request.IsValid()</c> returns false.
        /// </remarks>
        /// <param name="request">The request to send to Kafka.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="request"/> is null.
        /// </exception>
        public void Send(ProducerRequest request)
        {
            if (request == null)
            {
                throw new ArgumentNullException("request");
            }

            if (!request.IsValid())
            {
                return;
            }

            // The connection lives only for this write and is disposed on exit.
            using (var connection = new KafkaConnection(Server, Port))
            {
                connection.Write(request);
            }
        }

        /// <summary>
        /// Sends a multi-producer request to Kafka.
        /// </summary>
        /// <remarks>
        /// Nothing is sent, and no connection is opened, when
        /// <c>request.IsValid()</c> returns false.
        /// </remarks>
        /// <param name="request">The request to send to Kafka.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="request"/> is null.
        /// </exception>
        public void Send(MultiProducerRequest request)
        {
            if (request == null)
            {
                throw new ArgumentNullException("request");
            }

            if (!request.IsValid())
            {
                return;
            }

            // The connection lives only for this write and is disposed on exit.
            using (var connection = new KafkaConnection(Server, Port))
            {
                connection.Write(request);
            }
        }

        // NOTE(review): the generic type arguments of List and IList had been lost from
        // this file; MessageSent may be a generic delegate that lost its argument in the
        // same way. Its declaration is not visible here - confirm and restore if needed.

        /// <summary>
        /// Sends a list of messages to Kafka asynchronously.
        /// </summary>
        /// <param name="topic">The topic to publish to.</param>
        /// <param name="partition">The partition to publish to.</param>
        /// <param name="messages">The list of messages to send.</param>
        /// <param name="callback">
        /// A block of code to execute once the request has been sent to Kafka.  This value may
        /// be set to null.
        /// </param>
        public void SendAsync(string topic, int partition, IList<Message> messages, MessageSent callback)
        {
            SendAsync(new ProducerRequest(topic, partition, messages), callback);
        }

        /// <summary>
        /// Sends a request to Kafka asynchronously.
        /// </summary>
        /// <remarks>
        /// If <paramref name="callback"/> is null the method behaves as a fire-and-forget
        /// call: only the request bytes are handed to the connection and no callback is
        /// passed along.  Nothing is sent when <c>request.IsValid()</c> returns false.
        /// The original remarks also state that by the time the callback is executed a
        /// resource will already have been closed by an internal call; the cross-references
        /// naming that resource and call were lost from this comment, so confirm against
        /// <see cref="KafkaConnection"/> before relying on it.
        /// </remarks>
        /// <param name="request">The request to send to Kafka.</param>
        /// <param name="callback">
        /// A block of code to execute once the request has been sent to Kafka.  This value may
        /// be set to null.
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="request"/> is null.
        /// </exception>
        public void SendAsync(ProducerRequest request, MessageSent callback)
        {
            if (request == null)
            {
                throw new ArgumentNullException("request");
            }

            if (!request.IsValid())
            {
                return;
            }

            // Deliberately not wrapped in a using block: the write is only started here
            // and presumably completes after this method returns.
            // NOTE(review): nothing visible in this class disposes the connection -
            // confirm that KafkaConnection.BeginWrite releases it when the write ends.
            var connection = new KafkaConnection(Server, Port);

            if (callback == null)
            {
                // fire and forget
                connection.BeginWrite(request.GetBytes());
            }
            else
            {
                // execute with callback
                connection.BeginWrite(request, callback);
            }
        }
    }
}