/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Kafka.Client.IntegrationTests
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Reflection;
    using System.Text;
    using System.Threading;
    using Kafka.Client.Consumers;
    using Kafka.Client.Exceptions;
    using Kafka.Client.Messages;
    using Kafka.Client.Producers.Sync;
    using Kafka.Client.Requests;
    using NUnit.Framework;

    /// <summary>
    /// Integration tests that produce messages with <see cref="SyncProducer"/> and read
    /// them back through <see cref="ZookeeperConsumerConnector"/>.
    /// Configuration and the per-test topic name are taken from the base fixture.
    /// </summary>
    [TestFixture]
    public class ConsumerTests : IntegrationFixtureBase
    {
        /// <summary>
        /// Creates a consumer connector and disposes it immediately; the test passes
        /// when neither construction nor disposal throws.
        /// </summary>
        [Test]
        public void ConsumerConnectorIsCreatedConnectsDisconnectsAndShutsDown()
        {
            var config = this.ZooKeeperBasedConsumerConfig;
            using (new ZookeeperConsumerConnector(config, true))
            {
            }
        }

        /// <summary>
        /// Sends two messages to partition 0 of the test topic in a single request and
        /// verifies that the consumer connector returns exactly those two, in order.
        /// </summary>
        [Test]
        public void SimpleSyncProducerSends2MessagesAndConsumerConnectorGetsThemBack()
        {
            var prodConfig = this.SyncProducerConfig1;
            var consumerConfig = this.ZooKeeperBasedConsumerConfig;

            // first producing
            string payload1 = "kafka 1.";
            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
            var msg1 = new Message(payloadData1);

            string payload2 = "kafka 2.";
            byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
            var msg2 = new Message(payloadData2);

            var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1, msg2 });
            using (var producer = new SyncProducer(prodConfig))
            {
                producer.Send(producerRequest);
            }

            // now consuming
            var resultMessages = new List<Message>();
            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
            {
                var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
                var messages = consumerConnector.CreateMessageStreams(topicCount);
                resultMessages.AddRange(ConsumeUntilTimeout(messages[CurrentTestTopic]));
            }

            Assert.AreEqual(2, resultMessages.Count);
            Assert.AreEqual(msg1.ToString(), resultMessages[0].ToString());
            Assert.AreEqual(msg2.ToString(), resultMessages[1].ToString());
        }

        /// <summary>
        /// Sends one message and reads it back, checks that further reads throw while
        /// the topic is empty, then resets the enumerator, sends a second message and
        /// verifies that it is received as well.
        /// </summary>
        [Test]
        public void OneMessageIsSentAndReceivedThenExceptionsWhenNoMessageThenAnotherMessageIsSentAndReceived()
        {
            var prodConfig = this.SyncProducerConfig1;
            var consumerConfig = this.ZooKeeperBasedConsumerConfig;

            // first producing
            string payload1 = "kafka 1.";
            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
            var msg1 = new Message(payloadData1);

            using (var producer = new SyncProducer(prodConfig))
            {
                var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1 });
                producer.Send(producerRequest);

                // now consuming
                using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
                {
                    var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
                    var messages = consumerConnector.CreateMessageStreams(topicCount);
                    var sets = messages[CurrentTestTopic];
                    KafkaMessageStream myStream = sets[0];
                    var enumerator = myStream.GetEnumerator();

                    Assert.IsTrue(enumerator.MoveNext());
                    Assert.AreEqual(msg1.ToString(), enumerator.Current.ToString());

                    // Nothing left to read: the same timeout the other tests catch is expected here.
                    Assert.Throws<ConsumerTimeoutException>(() => enumerator.MoveNext());

                    // iterator is in failed state
                    // NOTE(review): the exact exception type raised in the failed state is not
                    // visible from this file, so any exception is accepted - tighten if known.
                    Assert.Catch<Exception>(() => enumerator.MoveNext());

                    enumerator.Reset();

                    // producing again
                    string payload2 = "kafka 2.";
                    byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
                    var msg2 = new Message(payloadData2);

                    var producerRequest2 = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg2 });
                    producer.Send(producerRequest2);

                    // Presumably gives the consumer time to fetch the new message before reading.
                    Thread.Sleep(3000);

                    Assert.IsTrue(enumerator.MoveNext());
                    Assert.AreEqual(msg2.ToString(), enumerator.Current.ToString());
                }
            }
        }

        /// <summary>
        /// Sends one message to each of two topics and verifies that a single consumer
        /// connector exposes a stream list per topic, each yielding its own message.
        /// </summary>
        [Test]
        public void ConsumerConnectorConsumesTwoDifferentTopics()
        {
            var prodConfig = this.SyncProducerConfig1;
            var consumerConfig = this.ZooKeeperBasedConsumerConfig;

            string topic1 = CurrentTestTopic + "1";
            string topic2 = CurrentTestTopic + "2";

            // first producing
            string payload1 = "kafka 1.";
            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
            var msg1 = new Message(payloadData1);

            string payload2 = "kafka 2.";
            byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
            var msg2 = new Message(payloadData2);

            using (var producer = new SyncProducer(prodConfig))
            {
                var producerRequest1 = new ProducerRequest(topic1, 0, new List<Message> { msg1 });
                producer.Send(producerRequest1);
                var producerRequest2 = new ProducerRequest(topic2, 0, new List<Message> { msg2 });
                producer.Send(producerRequest2);
            }

            // now consuming
            var resultMessages1 = new List<Message>();
            var resultMessages2 = new List<Message>();
            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
            {
                var topicCount = new Dictionary<string, int> { { topic1, 1 }, { topic2, 1 } };
                var messages = consumerConnector.CreateMessageStreams(topicCount);

                Assert.IsTrue(messages.ContainsKey(topic1));
                Assert.IsTrue(messages.ContainsKey(topic2));

                // Each topic is drained independently; a timeout on one does not skip the other.
                resultMessages1.AddRange(ConsumeUntilTimeout(messages[topic1]));
                resultMessages2.AddRange(ConsumeUntilTimeout(messages[topic2]));
            }

            Assert.AreEqual(1, resultMessages1.Count);
            Assert.AreEqual(msg1.ToString(), resultMessages1[0].ToString());

            Assert.AreEqual(1, resultMessages2.Count);
            Assert.AreEqual(msg2.ToString(), resultMessages2[0].ToString());
        }

        /// <summary>
        /// Injects the connector's shutdown command into every internal queue via
        /// reflection and verifies that enumerating the streams then finishes without
        /// yielding any message.
        /// </summary>
        [Test]
        public void ConsumerConnectorReceivesAShutdownSignal()
        {
            var consumerConfig = this.ZooKeeperBasedConsumerConfig;

            // now consuming
            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
            {
                var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
                var messages = consumerConnector.CreateMessageStreams(topicCount);

                // putting the shutdown command into the queue
                FieldInfo fi = typeof(ZookeeperConsumerConnector).GetField(
                    "queues", BindingFlags.NonPublic | BindingFlags.Instance);

                // Fail with a readable message instead of a NullReferenceException if the
                // private field is ever renamed or removed.
                Assert.IsNotNull(fi, "Private instance field 'queues' was not found on ZookeeperConsumerConnector.");

                // NOTE(review): the key and element types of the queue map are assumed to be
                // Tuple<string, string> and FetchedDataChunk - confirm against the declaration
                // of ZookeeperConsumerConnector.queues.
                var value =
                    (IDictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>>)
                    fi.GetValue(consumerConnector);
                foreach (var topicConsumerQueueMap in value)
                {
                    topicConsumerQueueMap.Value.Add(ZookeeperConsumerConnector.ShutdownCommand);
                }

                // No timeout handling here on purpose: a ConsumerTimeoutException must fail the test.
                var sets = messages[CurrentTestTopic];
                var resultMessages = new List<Message>();
                foreach (var set in sets)
                {
                    foreach (var message in set)
                    {
                        resultMessages.Add(message);
                    }
                }

                Assert.AreEqual(0, resultMessages.Count);
            }
        }

        /// <summary>
        /// Sends one message to partition 0 and one to partition 1 of the test topic and
        /// verifies that a single consumer stream returns both.
        /// </summary>
        [Test]
        public void ProducersSendMessagesToDifferentPartitionsAndConsumerConnectorGetsThemBack()
        {
            var prodConfig = this.SyncProducerConfig1;
            var consumerConfig = this.ZooKeeperBasedConsumerConfig;

            // first producing
            string payload1 = "kafka 1.";
            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
            var msg1 = new Message(payloadData1);

            string payload2 = "kafka 2.";
            byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
            var msg2 = new Message(payloadData2);

            using (var producer = new SyncProducer(prodConfig))
            {
                var producerRequest1 = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1 });
                producer.Send(producerRequest1);
                var producerRequest2 = new ProducerRequest(CurrentTestTopic, 1, new List<Message> { msg2 });
                producer.Send(producerRequest2);
            }

            // now consuming
            var resultMessages = new List<Message>();
            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
            {
                var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
                var messages = consumerConnector.CreateMessageStreams(topicCount);
                resultMessages.AddRange(ConsumeUntilTimeout(messages[CurrentTestTopic]));
            }

            // NOTE(review): the assertions below rely on partition 0 being delivered before
            // partition 1 - confirm that the consumer guarantees this order.
            Assert.AreEqual(2, resultMessages.Count);
            Assert.AreEqual(msg1.ToString(), resultMessages[0].ToString());
            Assert.AreEqual(msg2.ToString(), resultMessages[1].ToString());
        }

        /// <summary>
        /// Reads every message from the given streams, in order, until the streams are
        /// exhausted or a <see cref="ConsumerTimeoutException"/> is thrown.
        /// </summary>
        /// <param name="streams">The message streams created for one topic.</param>
        /// <returns>The messages read before enumeration ended or timed out.</returns>
        private static List<Message> ConsumeUntilTimeout(IEnumerable<KafkaMessageStream> streams)
        {
            var received = new List<Message>();
            try
            {
                foreach (var stream in streams)
                {
                    foreach (var message in stream)
                    {
                        received.Add(message);
                    }
                }
            }
            catch (ConsumerTimeoutException)
            {
                // do nothing, this is expected: the timeout is how the tests detect
                // that no more messages are available
            }

            return received;
        }
    }
}