/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace Kafka.Client.IntegrationTests { using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using Kafka.Client.Cfg; using Kafka.Client.Consumers; using Kafka.Client.Messages; using Kafka.Client.Producers; using Kafka.Client.Requests; using Kafka.Client.Serialization; using NUnit.Framework; [TestFixture] public class ProducerTests : IntegrationFixtureBase { /// /// Maximum amount of time to wait trying to get a specific test message from Kafka server (in miliseconds) /// private readonly int maxTestWaitTimeInMiliseconds = 5000; [Test] public void ProducerSends1Message() { var prodConfig = this.ConfigBasedSyncProdConfig; int totalWaitTimeInMiliseconds = 0; int waitSingle = 100; var originalMessage = new Message(Encoding.UTF8.GetBytes("TestData")); var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic); multipleBrokersHelper.GetCurrentOffsets( new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 }); using (var producer = new Producer(prodConfig)) { var producerData = new ProducerData( CurrentTestTopic, new List { originalMessage }); producer.Send(producerData); Thread.Sleep(waitSingle); } while ( !multipleBrokersHelper.CheckIfAnyBrokerHasChanged( new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 })) { totalWaitTimeInMiliseconds += waitSingle; Thread.Sleep(waitSingle); if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds) { Assert.Fail("None of the brokers changed their offset after sending a message"); } } totalWaitTimeInMiliseconds = 0; var consumerConfig = new ConsumerConfiguration( multipleBrokersHelper.BrokerThatHasChanged.Host, multipleBrokersHelper.BrokerThatHasChanged.Port); IConsumer consumer = new Consumer(consumerConfig); var request1 = new FetchRequest(CurrentTestTopic, multipleBrokersHelper.PartitionThatHasChanged, multipleBrokersHelper.OffsetFromBeforeTheChange); BufferedMessageSet response; while (true) { Thread.Sleep(waitSingle); response = consumer.Fetch(request1); if (response != null && response.Messages.Count() > 0) { break; } totalWaitTimeInMiliseconds += waitSingle; if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds) { break; } } Assert.NotNull(response); Assert.AreEqual(1, response.Messages.Count()); Assert.AreEqual(originalMessage.ToString(), response.Messages.First().ToString()); } [Test] public void ProducerSends3Messages() { var prodConfig = this.ConfigBasedSyncProdConfig; int totalWaitTimeInMiliseconds = 0; int waitSingle = 100; var originalMessage1 = new Message(Encoding.UTF8.GetBytes("TestData1")); var originalMessage2 = new Message(Encoding.UTF8.GetBytes("TestData2")); var originalMessage3 = new Message(Encoding.UTF8.GetBytes("TestData3")); var originalMessageList = new List { originalMessage1, originalMessage2, originalMessage3 }; var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic); multipleBrokersHelper.GetCurrentOffsets( new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 }); using (var producer = new Producer(prodConfig)) { var producerData = new ProducerData(CurrentTestTopic, originalMessageList); producer.Send(producerData); } Thread.Sleep(waitSingle); while ( !multipleBrokersHelper.CheckIfAnyBrokerHasChanged( new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 })) { totalWaitTimeInMiliseconds += waitSingle; Thread.Sleep(waitSingle); if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds) { Assert.Fail("None of the brokers changed their offset after sending a message"); } } totalWaitTimeInMiliseconds = 0; var consumerConfig = new ConsumerConfiguration( multipleBrokersHelper.BrokerThatHasChanged.Host, multipleBrokersHelper.BrokerThatHasChanged.Port); IConsumer consumer = new Consumer(consumerConfig); var request = new FetchRequest(CurrentTestTopic, multipleBrokersHelper.PartitionThatHasChanged, multipleBrokersHelper.OffsetFromBeforeTheChange); BufferedMessageSet response; while (true) { Thread.Sleep(waitSingle); response = consumer.Fetch(request); if (response != null && response.Messages.Count() > 2) { break; } totalWaitTimeInMiliseconds += waitSingle; if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds) { break; } } Assert.NotNull(response); Assert.AreEqual(3, response.Messages.Count()); Assert.AreEqual(originalMessage1.ToString(), response.Messages.First().ToString()); Assert.AreEqual(originalMessage2.ToString(), response.Messages.Skip(1).First().ToString()); Assert.AreEqual(originalMessage3.ToString(), response.Messages.Skip(2).First().ToString()); } [Test] public void ProducerSends1MessageUsingNotDefaultEncoder() { var prodConfig = this.ConfigBasedSyncProdConfig; int totalWaitTimeInMiliseconds = 0; int waitSingle = 100; string originalMessage = "TestData"; var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic); multipleBrokersHelper.GetCurrentOffsets(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 }); using (var producer = new Producer(prodConfig, null, new StringEncoder(), null)) { var producerData = new ProducerData( CurrentTestTopic, new List { originalMessage }); producer.Send(producerData); } Thread.Sleep(waitSingle); while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 })) { totalWaitTimeInMiliseconds += waitSingle; Thread.Sleep(waitSingle); if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds) { Assert.Fail("None of the brokers changed their offset after sending a message"); } } totalWaitTimeInMiliseconds = 0; var consumerConfig = new ConsumerConfiguration( multipleBrokersHelper.BrokerThatHasChanged.Host, multipleBrokersHelper.BrokerThatHasChanged.Port); IConsumer consumer = new Consumer(consumerConfig); var request = new FetchRequest(CurrentTestTopic, multipleBrokersHelper.PartitionThatHasChanged, multipleBrokersHelper.OffsetFromBeforeTheChange); BufferedMessageSet response; while (true) { Thread.Sleep(waitSingle); response = consumer.Fetch(request); if (response != null && response.Messages.Count() > 0) { break; } totalWaitTimeInMiliseconds += waitSingle; if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds) { break; } } Assert.NotNull(response); Assert.AreEqual(1, response.Messages.Count()); Assert.AreEqual(originalMessage, Encoding.UTF8.GetString(response.Messages.First().Payload)); } } }