/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace Kafka.Client.IntegrationTests { using System; using System.Collections.Generic; using System.Reflection; using System.Threading; using Kafka.Client.Cfg; using Kafka.Client.Exceptions; using Kafka.Client.Utils; using Kafka.Client.ZooKeeperIntegration; using Kafka.Client.ZooKeeperIntegration.Events; using Kafka.Client.ZooKeeperIntegration.Listeners; using log4net; using NUnit.Framework; using ZooKeeperNet; [TestFixture] internal class ZooKeeperClientTests : IntegrationFixtureBase, IZooKeeperDataListener, IZooKeeperStateListener, IZooKeeperChildListener { private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); private readonly IList events = new List(); [SetUp] public void TestSetup() { this.events.Clear(); } [Test] public void ZooKeeperClientCreateWorkerThreadsOnBeingCreated() { var prodConfig = this.ZooKeeperBasedSyncProdConfig; using (IZooKeeperClient client = new ZooKeeperClient( prodConfig.ZooKeeper.ZkConnect, prodConfig.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer)) { client.Connect(); var eventWorker = ReflectionHelper.GetInstanceField("eventWorker", client); var zooKeeperWorker = ReflectionHelper.GetInstanceField("zooKeeperEventWorker", client); Assert.NotNull(eventWorker); Assert.NotNull(zooKeeperWorker); } } [Test] public void ZooKeeperClientFailsWhenCreatedWithWrongConnectionInfo() { var prodConfig = this.ZooKeeperBasedSyncProdConfig; using (IZooKeeperClient client = new ZooKeeperClient( "random text", prodConfig.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer)) { Assert.Throws(client.Connect); } } [Test] public void WhenStateChangedToConnectedStateListenerFires() { var prodConfig = this.ZooKeeperBasedSyncProdConfig; using (IZooKeeperClient client = new ZooKeeperClient( prodConfig.ZooKeeper.ZkConnect, prodConfig.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer)) { client.Subscribe(this); client.Connect(); WaitUntillIdle(client, 500); } Assert.AreEqual(1, this.events.Count); ZooKeeperEventArgs e = this.events[0]; Assert.AreEqual(ZooKeeperEventTypes.StateChanged, e.Type); Assert.IsInstanceOf(e); Assert.AreEqual(((ZooKeeperStateChangedEventArgs)e).State, KeeperState.SyncConnected); } [Test] public void WhenStateChangedToDisconnectedStateListenerFires() { var prodConfig = this.ZooKeeperBasedSyncProdConfig; using (IZooKeeperClient client = new ZooKeeperClient( prodConfig.ZooKeeper.ZkConnect, prodConfig.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer)) { client.Subscribe(this); client.Connect(); WaitUntillIdle(client, 500); client.Process(new WatchedEvent(KeeperState.Disconnected, EventType.None, null)); WaitUntillIdle(client, 500); } Assert.AreEqual(2, this.events.Count); ZooKeeperEventArgs e = this.events[1]; Assert.AreEqual(ZooKeeperEventTypes.StateChanged, e.Type); Assert.IsInstanceOf(e); Assert.AreEqual(((ZooKeeperStateChangedEventArgs)e).State, KeeperState.Disconnected); } [Test] public void WhenStateChangedToExpiredStateAndSessionListenersFire() { var prodConfig = this.ZooKeeperBasedSyncProdConfig; using (IZooKeeperClient client = new ZooKeeperClient( prodConfig.ZooKeeper.ZkConnect, prodConfig.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer)) { client.Subscribe(this); client.Connect(); WaitUntillIdle(client, 500); client.Process(new WatchedEvent(KeeperState.Expired, EventType.None, null)); WaitUntillIdle(client, 3000); } Assert.AreEqual(4, this.events.Count); ZooKeeperEventArgs e = this.events[1]; Assert.AreEqual(ZooKeeperEventTypes.StateChanged, e.Type); Assert.IsInstanceOf(e); Assert.AreEqual(((ZooKeeperStateChangedEventArgs)e).State, KeeperState.Expired); e = this.events[2]; Assert.AreEqual(ZooKeeperEventTypes.SessionCreated, e.Type); Assert.IsInstanceOf(e); e = this.events[3]; Assert.AreEqual(ZooKeeperEventTypes.StateChanged, e.Type); Assert.IsInstanceOf(e); Assert.AreEqual(((ZooKeeperStateChangedEventArgs)e).State, KeeperState.SyncConnected); } [Test] public void WhenSessionExpiredClientReconnects() { var prodConfig = this.ZooKeeperBasedSyncProdConfig; IZooKeeperConnection conn1; IZooKeeperConnection conn2; using (IZooKeeperClient client = new ZooKeeperClient( prodConfig.ZooKeeper.ZkConnect, prodConfig.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer)) { client.Connect(); conn1 = ReflectionHelper.GetInstanceField("connection", client); client.Process(new WatchedEvent(KeeperState.Expired, EventType.None, null)); WaitUntillIdle(client, 1000); conn2 = ReflectionHelper.GetInstanceField("connection", client); } Assert.AreNotEqual(conn1, conn2); } [Test] public void ZooKeeperClientChecksIfPathExists() { var prodConfig = this.ZooKeeperBasedSyncProdConfig; using (IZooKeeperClient client = new ZooKeeperClient( prodConfig.ZooKeeper.ZkConnect, prodConfig.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer)) { client.Connect(); Assert.IsTrue(client.Exists(ZooKeeperClient.DefaultBrokerTopicsPath, false)); } } [Test] public void ZooKeeperClientCreatesANewPathAndDeletesIt() { var prodConfig = this.ZooKeeperBasedSyncProdConfig; using (IZooKeeperClient client = new ZooKeeperClient( prodConfig.ZooKeeper.ZkConnect, prodConfig.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer)) { client.Connect(); string myPath = "/" + Guid.NewGuid(); client.CreatePersistent(myPath, false); Assert.IsTrue(client.Exists(myPath)); client.Delete(myPath); Assert.IsFalse(client.Exists(myPath)); } } [Test] public void WhenChildIsCreatedChilListenerOnParentFires() { var prodConfig = this.ZooKeeperBasedSyncProdConfig; string myPath = "/" + Guid.NewGuid(); using (IZooKeeperClient client = new ZooKeeperClient( prodConfig.ZooKeeper.ZkConnect, prodConfig.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer)) { client.Connect(); WaitUntillIdle(client, 500); client.Subscribe("/", this as IZooKeeperChildListener); client.CreatePersistent(myPath, true); WaitUntillIdle(client, 500); client.UnsubscribeAll(); client.Delete(myPath); } Assert.AreEqual(1, this.events.Count); ZooKeeperEventArgs e = this.events[0]; Assert.AreEqual(ZooKeeperEventTypes.ChildChanged, e.Type); Assert.IsInstanceOf(e); Assert.AreEqual(((ZooKeeperChildChangedEventArgs)e).Path, "/"); Assert.Greater(((ZooKeeperChildChangedEventArgs)e).Children.Count, 0); Assert.IsTrue(((ZooKeeperChildChangedEventArgs)e).Children.Contains(myPath.Replace("/", string.Empty))); } [Test] public void WhenChildIsDeletedChildListenerOnParentFires() { var prodConfig = this.ZooKeeperBasedSyncProdConfig; string myPath = "/" + Guid.NewGuid(); using (IZooKeeperClient client = new ZooKeeperClient( prodConfig.ZooKeeper.ZkConnect, prodConfig.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer)) { client.Connect(); client.CreatePersistent(myPath, true); WaitUntillIdle(client, 500); client.Subscribe("/", this as IZooKeeperChildListener); client.Delete(myPath); WaitUntillIdle(client, 500); } Assert.AreEqual(1, this.events.Count); ZooKeeperEventArgs e = this.events[0]; Assert.AreEqual(ZooKeeperEventTypes.ChildChanged, e.Type); Assert.IsInstanceOf(e); Assert.AreEqual(((ZooKeeperChildChangedEventArgs)e).Path, "/"); Assert.Greater(((ZooKeeperChildChangedEventArgs)e).Children.Count, 0); Assert.IsFalse(((ZooKeeperChildChangedEventArgs)e).Children.Contains(myPath.Replace("/", string.Empty))); } [Test] public void WhenZNodeIsDeletedChildAndDataDeletedListenersFire() { var prodConfig = this.ZooKeeperBasedSyncProdConfig; string myPath = "/" + Guid.NewGuid(); using (IZooKeeperClient client = new ZooKeeperClient( prodConfig.ZooKeeper.ZkConnect, prodConfig.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer)) { client.Connect(); client.CreatePersistent(myPath, true); WaitUntillIdle(client, 500); client.Subscribe(myPath, this as IZooKeeperChildListener); client.Subscribe(myPath, this as IZooKeeperDataListener); client.Delete(myPath); WaitUntillIdle(client, 500); } Assert.AreEqual(2, this.events.Count); ZooKeeperEventArgs e = this.events[0]; Assert.AreEqual(ZooKeeperEventTypes.ChildChanged, e.Type); Assert.IsInstanceOf(e); Assert.AreEqual(((ZooKeeperChildChangedEventArgs)e).Path, myPath); Assert.IsNull(((ZooKeeperChildChangedEventArgs)e).Children); e = this.events[1]; Assert.AreEqual(ZooKeeperEventTypes.DataChanged, e.Type); Assert.IsInstanceOf(e); Assert.AreEqual(((ZooKeeperDataChangedEventArgs)e).Path, myPath); Assert.IsNull(((ZooKeeperDataChangedEventArgs)e).Data); } [Test] public void ZooKeeperClientCreatesAChildAndGetsChildren() { var prodConfig = this.ZooKeeperBasedSyncProdConfig; using (IZooKeeperClient client = new ZooKeeperClient( prodConfig.ZooKeeper.ZkConnect, prodConfig.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer)) { client.Connect(); string child = Guid.NewGuid().ToString(); string myPath = "/" + child; client.CreatePersistent(myPath, false); IList children = client.GetChildren("/", false); int countChildren = client.CountChildren("/"); Assert.Greater(children.Count, 0); Assert.AreEqual(children.Count, countChildren); Assert.IsTrue(children.Contains(child)); client.Delete(myPath); } } [Test] public void WhenDataChangedDataListenerFires() { var prodConfig = this.ZooKeeperBasedSyncProdConfig; string myPath = "/" + Guid.NewGuid(); string sourceData = "my test data"; string resultData; using (IZooKeeperClient client = new ZooKeeperClient( prodConfig.ZooKeeper.ZkConnect, prodConfig.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer)) { client.Connect(); client.CreatePersistent(myPath, true); WaitUntillIdle(client, 500); client.Subscribe(myPath, this as IZooKeeperDataListener); client.Subscribe(myPath, this as IZooKeeperChildListener); client.WriteData(myPath, sourceData); WaitUntillIdle(client, 500); client.UnsubscribeAll(); resultData = client.ReadData(myPath); client.Delete(myPath); } Assert.IsTrue(!string.IsNullOrEmpty(resultData)); Assert.AreEqual(sourceData, resultData); Assert.AreEqual(1, this.events.Count); ZooKeeperEventArgs e = this.events[0]; Assert.AreEqual(ZooKeeperEventTypes.DataChanged, e.Type); Assert.IsInstanceOf(e); Assert.AreEqual(((ZooKeeperDataChangedEventArgs)e).Path, myPath); Assert.IsNotNull(((ZooKeeperDataChangedEventArgs)e).Data); Assert.AreEqual(((ZooKeeperDataChangedEventArgs)e).Data, sourceData); } [Test] [ExpectedException(typeof(ZooKeeperException))] public void WhenClientWillNotConnectWithinGivenTimeThrows() { var prodConfig = this.ZooKeeperBasedSyncProdConfig; using (IZooKeeperClient client = new ZooKeeperClient( prodConfig.ZooKeeper.ZkConnect, prodConfig.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer, 1)) { client.Connect(); } } public void HandleDataChange(ZooKeeperDataChangedEventArgs args) { Logger.Debug(args + " reach test event handler"); this.events.Add(args); } public void HandleDataDelete(ZooKeeperDataChangedEventArgs args) { Logger.Debug(args + " reach test event handler"); this.events.Add(args); } public void HandleStateChanged(ZooKeeperStateChangedEventArgs args) { Logger.Debug(args + " reach test event handler"); this.events.Add(args); } public void HandleSessionCreated(ZooKeeperSessionCreatedEventArgs args) { Logger.Debug(args + " reach test event handler"); this.events.Add(args); } public void HandleChildChange(ZooKeeperChildChangedEventArgs args) { Logger.Debug(args + " reach test event handler"); this.events.Add(args); } public void ResetState() { } } }