/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.IntegrationTests
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Kafka.Client.Consumers;
using Kafka.Client.Messages;
using Kafka.Client.Producers.Async;
using Kafka.Client.Producers.Sync;
using Kafka.Client.Requests;
using NUnit.Framework;
///
/// Contains tests that go all the way to Kafka and back.
///
[TestFixture]
public class KafkaIntegrationTest : IntegrationFixtureBase
{
///
/// Maximum amount of time to wait trying to get a specific test message from Kafka server (in miliseconds)
///
private static readonly int MaxTestWaitTimeInMiliseconds = 5000;
///
/// Sends a pair of message to Kafka.
///
[Test]
public void ProducerSendsMessage()
{
var prodConfig = this.SyncProducerConfig1;
string payload1 = "kafka 1.";
byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
var msg1 = new Message(payloadData1);
string payload2 = "kafka 2.";
byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
var msg2 = new Message(payloadData2);
using (var producer = new SyncProducer(prodConfig))
{
var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List { msg1, msg2 });
producer.Send(producerRequest);
}
}
///
/// Sends a message with long topic to Kafka.
///
[Test]
public void ProducerSendsMessageWithLongTopic()
{
var prodConfig = this.SyncProducerConfig1;
var msg = new Message(Encoding.UTF8.GetBytes("test message"));
string topic = "ThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopic";
using (var producer = new SyncProducer(prodConfig))
{
var producerRequest = new ProducerRequest(topic, 0, new List { msg });
producer.Send(producerRequest);
}
}
///
/// Asynchronously sends many random messages to Kafka
///
[Test]
public void AsyncProducerSendsManyLongRandomMessages()
{
var prodConfig = this.AsyncProducerConfig1;
List messages = GenerateRandomTextMessages(50);
using (var producer = new AsyncProducer(prodConfig))
{
producer.Send(CurrentTestTopic, 0, messages);
}
}
///
/// Asynchronously sends few short fixed messages to Kafka
///
[Test]
public void AsyncProducerSendsFewShortFixedMessages()
{
var prodConfig = this.AsyncProducerConfig1;
var messages = new List
{
new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
new Message(Encoding.UTF8.GetBytes("Async Test Message 2")),
new Message(Encoding.UTF8.GetBytes("Async Test Message 3")),
new Message(Encoding.UTF8.GetBytes("Async Test Message 4"))
};
using (var producer = new AsyncProducer(prodConfig))
{
producer.Send(CurrentTestTopic, 0, messages);
}
}
///
/// Asynchronously sends few short fixed messages to Kafka in separate send actions
///
[Test]
public void AsyncProducerSendsFewShortFixedMessagesInSeparateSendActions()
{
var prodConfig = this.AsyncProducerConfig1;
using (var producer = new AsyncProducer(prodConfig))
{
var req1 = new ProducerRequest(
CurrentTestTopic,
0,
new List { new Message(Encoding.UTF8.GetBytes("Async Test Message 1")) });
producer.Send(req1);
var req2 = new ProducerRequest(
CurrentTestTopic,
0,
new List { new Message(Encoding.UTF8.GetBytes("Async Test Message 2")) });
producer.Send(req2);
var req3 = new ProducerRequest(
CurrentTestTopic,
0,
new List { new Message(Encoding.UTF8.GetBytes("Async Test Message 3")) });
producer.Send(req3);
}
}
[Test]
public void AsyncProducerSendsMessageWithCallbackClass()
{
var prodConfig = this.AsyncProducerConfig1;
var messages = new List
{
new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
};
var myHandler = new TestCallbackHandler();
using (var producer = new AsyncProducer(prodConfig, myHandler))
{
producer.Send(CurrentTestTopic, 0, messages);
}
Thread.Sleep(1000);
Assert.IsTrue(myHandler.WasRun);
}
[Test]
public void AsyncProducerSendsMessageWithCallback()
{
var prodConfig = this.AsyncProducerConfig1;
var messages = new List
{
new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
};
var myHandler = new TestCallbackHandler();
using (var producer = new AsyncProducer(prodConfig))
{
producer.Send(CurrentTestTopic, 0, messages, myHandler.Handle);
}
Thread.Sleep(1000);
Assert.IsTrue(myHandler.WasRun);
}
private class TestCallbackHandler : ICallbackHandler
{
public bool WasRun { get; private set; }
public void Handle(RequestContext context)
{
WasRun = true;
}
}
///
/// Send a multi-produce request to Kafka.
///
[Test]
public void ProducerSendMultiRequest()
{
var prodConfig = this.SyncProducerConfig1;
var requests = new List
{
new ProducerRequest(CurrentTestTopic, 0, new List { new Message(Encoding.UTF8.GetBytes("1: " + DateTime.UtcNow)) }),
new ProducerRequest(CurrentTestTopic, 0, new List { new Message(Encoding.UTF8.GetBytes("2: " + DateTime.UtcNow)) }),
new ProducerRequest(CurrentTestTopic, 0, new List { new Message(Encoding.UTF8.GetBytes("3: " + DateTime.UtcNow)) }),
new ProducerRequest(CurrentTestTopic, 0, new List { new Message(Encoding.UTF8.GetBytes("4: " + DateTime.UtcNow)) })
};
using (var producer = new SyncProducer(prodConfig))
{
producer.MultiSend(requests);
}
}
///
/// Generates messages for Kafka then gets them back.
///
[Test]
public void ConsumerFetchMessage()
{
var consumerConfig = this.ConsumerConfig1;
ProducerSendsMessage();
Thread.Sleep(1000);
IConsumer consumer = new Consumer(consumerConfig);
var request = new FetchRequest(CurrentTestTopic, 0, 0);
BufferedMessageSet response = consumer.Fetch(request);
Assert.NotNull(response);
int count = 0;
foreach (var message in response)
{
count++;
Console.WriteLine(message.Message);
}
Assert.AreEqual(2, count);
}
///
/// Generates multiple messages for Kafka then gets them back.
///
[Test]
public void ConsumerMultiFetchGetsMessage()
{
var config = this.ConsumerConfig1;
ProducerSendMultiRequest();
Thread.Sleep(2000);
IConsumer cons = new Consumer(config);
var request = new MultiFetchRequest(new List
{
new FetchRequest(CurrentTestTopic, 0, 0),
new FetchRequest(CurrentTestTopic, 0, 0),
new FetchRequest(CurrentTestTopic, 0, 0)
});
IList response = cons.MultiFetch(request);
Assert.AreEqual(3, response.Count);
for (int ix = 0; ix < response.Count; ix++)
{
IEnumerable messageSet = response[ix].Messages;
Assert.AreEqual(4, messageSet.Count());
Console.WriteLine(string.Format("Request #{0}-->", ix));
foreach (Message msg in messageSet)
{
Console.WriteLine(msg.ToString());
}
}
}
///
/// Gets offsets from Kafka.
///
[Test]
public void ConsumerGetsOffsets()
{
var consumerConfig = this.ConsumerConfig1;
var request = new OffsetRequest(CurrentTestTopic, 0, DateTime.Now.AddHours(-24).Ticks, 10);
IConsumer consumer = new Consumer(consumerConfig);
IList list = consumer.GetOffsetsBefore(request);
foreach (long l in list)
{
Console.Out.WriteLine(l);
}
}
///
/// Synchronous Producer sends a single simple message and then a consumer consumes it
///
[Test]
public void ProducerSendsAndConsumerReceivesSingleSimpleMessage()
{
var prodConfig = this.SyncProducerConfig1;
var consumerConfig = this.ConsumerConfig1;
var sourceMessage = new Message(Encoding.UTF8.GetBytes("test message"));
long currentOffset = TestHelper.GetCurrentKafkaOffset(CurrentTestTopic, consumerConfig);
using (var producer = new SyncProducer(prodConfig))
{
var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List { sourceMessage });
producer.Send(producerRequest);
}
IConsumer consumer = new Consumer(consumerConfig);
var request = new FetchRequest(CurrentTestTopic, 0, currentOffset);
BufferedMessageSet response;
int totalWaitTimeInMiliseconds = 0;
int waitSingle = 100;
while (true)
{
Thread.Sleep(waitSingle);
response = consumer.Fetch(request);
if (response != null && response.Messages.Count() > 0)
{
break;
}
totalWaitTimeInMiliseconds += waitSingle;
if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
{
break;
}
}
Assert.NotNull(response);
Assert.AreEqual(1, response.Messages.Count());
Message resultMessage = response.Messages.First();
Assert.AreEqual(sourceMessage.ToString(), resultMessage.ToString());
}
///
/// Asynchronous Producer sends a single simple message and then a consumer consumes it
///
[Test]
public void AsyncProducerSendsAndConsumerReceivesSingleSimpleMessage()
{
var prodConfig = this.AsyncProducerConfig1;
var consumerConfig = this.ConsumerConfig1;
var sourceMessage = new Message(Encoding.UTF8.GetBytes("test message"));
using (var producer = new AsyncProducer(prodConfig))
{
var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List { sourceMessage });
producer.Send(producerRequest);
}
long currentOffset = TestHelper.GetCurrentKafkaOffset(CurrentTestTopic, consumerConfig);
IConsumer consumer = new Consumer(consumerConfig);
var request = new FetchRequest(CurrentTestTopic, 0, currentOffset);
BufferedMessageSet response;
int totalWaitTimeInMiliseconds = 0;
int waitSingle = 100;
while (true)
{
Thread.Sleep(waitSingle);
response = consumer.Fetch(request);
if (response != null && response.Messages.Count() > 0)
{
break;
}
totalWaitTimeInMiliseconds += waitSingle;
if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
{
break;
}
}
Assert.NotNull(response);
Assert.AreEqual(1, response.Messages.Count());
Message resultMessage = response.Messages.First();
Assert.AreEqual(sourceMessage.ToString(), resultMessage.ToString());
}
///
/// Synchronous producer sends a multi request and a consumer receives it from to Kafka.
///
[Test]
public void ProducerSendsAndConsumerReceivesMultiRequest()
{
var prodConfig = this.SyncProducerConfig1;
var consumerConfig = this.ConsumerConfig1;
string testTopic1 = CurrentTestTopic + "1";
string testTopic2 = CurrentTestTopic + "2";
string testTopic3 = CurrentTestTopic + "3";
var sourceMessage1 = new Message(Encoding.UTF8.GetBytes("1: TestMessage"));
var sourceMessage2 = new Message(Encoding.UTF8.GetBytes("2: TestMessage"));
var sourceMessage3 = new Message(Encoding.UTF8.GetBytes("3: TestMessage"));
var sourceMessage4 = new Message(Encoding.UTF8.GetBytes("4: TestMessage"));
var requests = new List
{
new ProducerRequest(testTopic1, 0, new List { sourceMessage1 }),
new ProducerRequest(testTopic1, 0, new List { sourceMessage2 }),
new ProducerRequest(testTopic2, 0, new List { sourceMessage3 }),
new ProducerRequest(testTopic3, 0, new List { sourceMessage4 })
};
long currentOffset1 = TestHelper.GetCurrentKafkaOffset(testTopic1, consumerConfig);
long currentOffset2 = TestHelper.GetCurrentKafkaOffset(testTopic2, consumerConfig);
long currentOffset3 = TestHelper.GetCurrentKafkaOffset(testTopic3, consumerConfig);
using (var producer = new SyncProducer(prodConfig))
{
producer.MultiSend(requests);
}
IConsumer consumer = new Consumer(consumerConfig);
var request = new MultiFetchRequest(new List
{
new FetchRequest(testTopic1, 0, currentOffset1),
new FetchRequest(testTopic2, 0, currentOffset2),
new FetchRequest(testTopic3, 0, currentOffset3)
});
IList messageSets;
int totalWaitTimeInMiliseconds = 0;
int waitSingle = 100;
while (true)
{
Thread.Sleep(waitSingle);
messageSets = consumer.MultiFetch(request);
if (messageSets.Count > 2 && messageSets[0].Messages.Count() > 0 && messageSets[1].Messages.Count() > 0 && messageSets[2].Messages.Count() > 0)
{
break;
}
totalWaitTimeInMiliseconds += waitSingle;
if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
{
break;
}
}
Assert.AreEqual(3, messageSets.Count);
Assert.AreEqual(2, messageSets[0].Messages.Count());
Assert.AreEqual(1, messageSets[1].Messages.Count());
Assert.AreEqual(1, messageSets[2].Messages.Count());
Assert.AreEqual(sourceMessage1.ToString(), messageSets[0].Messages.First().ToString());
Assert.AreEqual(sourceMessage2.ToString(), messageSets[0].Messages.Skip(1).First().ToString());
Assert.AreEqual(sourceMessage3.ToString(), messageSets[1].Messages.First().ToString());
Assert.AreEqual(sourceMessage4.ToString(), messageSets[2].Messages.First().ToString());
}
///
/// Gererates a randome list of text messages.
///
/// The number of messages to generate.
/// A list of random text messages.
private static List GenerateRandomTextMessages(int numberOfMessages)
{
var messages = new List();
for (int ix = 0; ix < numberOfMessages; ix++)
{
////messages.Add(new Message(GenerateRandomBytes(10000)));
messages.Add(new Message(Encoding.UTF8.GetBytes(GenerateRandomMessage(10000))));
}
return messages;
}
///
/// Generate a random message text.
///
/// Length of the message string.
/// Random message string.
private static string GenerateRandomMessage(int length)
{
var builder = new StringBuilder();
var random = new Random();
for (int i = 0; i < length; i++)
{
char ch = Convert.ToChar(Convert.ToInt32(
Math.Floor((26 * random.NextDouble()) + 65)));
builder.Append(ch);
}
return builder.ToString();
}
}
}