/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace Kafka.Client.Utils { using System.Collections.Generic; using System.Globalization; using System.Reflection; using Kafka.Client.Cluster; using Kafka.Client.ZooKeeperIntegration; using log4net; using ZooKeeperNet; internal class ZkUtils { private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); internal static void UpdatePersistentPath(IZooKeeperClient zkClient, string path, string data) { try { zkClient.WriteData(path, data); } catch (KeeperException.NoNodeException) { CreateParentPath(zkClient, path); try { zkClient.CreatePersistent(path, data); } catch (KeeperException.NodeExistsException) { zkClient.WriteData(path, data); } } } internal static void CreateParentPath(IZooKeeperClient zkClient, string path) { string parentDir = path.Substring(0, path.LastIndexOf('/')); if (parentDir.Length != 0) { zkClient.CreatePersistent(parentDir, true); } } internal static void DeletePath(IZooKeeperClient zkClient, string path) { try { zkClient.Delete(path); } catch (KeeperException.NoNodeException) { Logger.InfoFormat(CultureInfo.CurrentCulture, "{0} deleted during connection loss; this is ok", path); } } internal static IDictionary> GetPartitionsForTopics(IZooKeeperClient zkClient, IEnumerable topics) { var result = new Dictionary>(); foreach (string topic in topics) { var partList = new List(); var brokers = zkClient.GetChildrenParentMayNotExist(ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic); foreach (var broker in brokers) { var numberOfParts = int.Parse( zkClient.ReadData(ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic + "/" + broker), CultureInfo.CurrentCulture); for (int i = 0; i < numberOfParts; i++) { partList.Add(broker + "-" + i); } } partList.Sort(); result.Add(topic, partList); } return result; } internal static void CreateEphemeralPathExpectConflict(IZooKeeperClient zkClient, string path, string data) { try { CreateEphemeralPath(zkClient, path, data); } catch (KeeperException.NodeExistsException) { string storedData; try { storedData = zkClient.ReadData(path); } catch (KeeperException.NoNodeException) { // the node disappeared; treat as if node existed and let caller handles this throw; } if (storedData == null || storedData != data) { Logger.InfoFormat(CultureInfo.CurrentCulture, "conflict in {0} data: {1} stored data: {2}", path, data, storedData); throw; } else { // otherwise, the creation succeeded, return normally Logger.InfoFormat(CultureInfo.CurrentCulture, "{0} exits with value {1} during connection loss; this is ok", path, data); } } } internal static void CreateEphemeralPath(IZooKeeperClient zkClient, string path, string data) { try { zkClient.CreateEphemeral(path, data); } catch (KeeperException.NoNodeException) { ZkUtils.CreateParentPath(zkClient, path); zkClient.CreateEphemeral(path, data); } } } }