#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import os import getopt import sys import locale import socket import re from qmf.console import Session class Config: def __init__(self): self._host = "localhost" self._connTimeout = 10 self._stopId = None self._stopAll = False self._force = False self._numeric = False self._showConn = False self._delConn = None def usage (): print "Usage: qpid-cluster [OPTIONS] [broker-addr]" print print " broker-addr is in the form: [username/password@] hostname | ip-address [:]" print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" print print "Options:" print " --timeout seconds (10) Maximum time to wait for broker connection" print " -C [--all-connections] View client connections to all cluster members" print " -c [--connections] ID View client connections to specified member" print " -d [--del-connection] HOST:PORT" print " Disconnect a client connection" print " -s [--stop] ID Stop one member of the cluster by its ID" print " -k [--all-stop] Shut down the whole cluster" print " -f [--force] Suppress the 'are-you-sure?' prompt" print " -n [--numeric] Don't resolve names" print class IpAddr: def __init__(self, text): if text.find("@") != -1: tokens = text.split("@") text = tokens[1] if text.find(":") != -1: tokens = text.split(":") text = tokens[0] self.port = int(tokens[1]) else: self.port = 5672 self.dottedQuad = socket.gethostbyname(text) nums = self.dottedQuad.split(".") self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) def bestAddr(self, addrPortList): bestDiff = 0xFFFFFFFFL bestAddr = None for addrPort in addrPortList: diff = IpAddr(addrPort[0]).addr ^ self.addr if diff < bestDiff: bestDiff = diff bestAddr = addrPort return bestAddr class BrokerManager: def __init__(self, config): self.config = config self.brokerName = None self.qmf = None self.broker = None def SetBroker(self, brokerUrl): self.url = brokerUrl self.qmf = Session() self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout) agents = self.qmf.getAgents() for a in agents: if a.getAgentBank() == 0: self.brokerAgent = a def Disconnect(self): if self.broker: self.qmf.delBroker(self.broker) def _getClusters(self): packages = self.qmf.getPackages() if "org.apache.qpid.cluster" not in packages: raise Exception("Clustering is not installed on the broker.") clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) if len(clusters) == 0: raise Exception("Clustering is installed but not enabled on the broker.") return clusters def _getHostList(self, urlList): hosts = [] hostAddr = IpAddr(self.config._host) for url in urlList: if url.find("amqp:") != 0: raise Exception("Invalid URL 1") url = url[5:] addrs = str(url).split(",") addrList = [] for addr in addrs: tokens = addr.split(":") if len(tokens) != 3: raise Exception("Invalid URL 2") addrList.append((tokens[1], tokens[2])) # Find the address in the list that is most likely to be in the same subnet as the address # with which we made the original QMF connection. This increases the probability that we will # be able to reach the cluster member. best = hostAddr.bestAddr(addrList) bestUrl = best[0] + ":" + best[1] hosts.append(bestUrl) return hosts def overview(self): clusters = self._getClusters() cluster = clusters[0] memberList = cluster.members.split(";") idList = cluster.memberIDs.split(";") print " Cluster Name: %s" % cluster.clusterName print "Cluster Status: %s" % cluster.status print " Cluster Size: %d" % cluster.clusterSize print " Members: ID=%s URL=%s" % (idList[0], memberList[0]) for idx in range(1,len(idList)): print " : ID=%s URL=%s" % (idList[idx], memberList[idx]) def stopMember(self, id): clusters = self._getClusters() cluster = clusters[0] idList = cluster.memberIDs.split(";") if id not in idList: raise Exception("No member with matching ID found") if not self.config._force: prompt = "Warning: " if len(idList) == 1: prompt += "This command will shut down the last running cluster member." else: prompt += "This command will shut down a cluster member." prompt += " Are you sure? [N]: " confirm = raw_input(prompt) if len(confirm) == 0 or confirm[0].upper() != 'Y': raise Exception("Operation canceled") cluster.stopClusterNode(id) def stopAll(self): clusters = self._getClusters() if not self.config._force: prompt = "Warning: This command will shut down the entire cluster." prompt += " Are you sure? [N]: " confirm = raw_input(prompt) if len(confirm) == 0 or confirm[0].upper() != 'Y': raise Exception("Operation canceled") cluster = clusters[0] cluster.stopFullCluster() def showConnections(self): clusters = self._getClusters() cluster = clusters[0] memberList = cluster.members.split(";") idList = cluster.memberIDs.split(";") displayList = [] hostList = self._getHostList(memberList) self.qmf.delBroker(self.broker) self.broker = None self.brokers = [] pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$") idx = 0 for host in hostList: if self.config._showConn == "all" or self.config._showConn == idList[idx] or self.config._delConn: self.brokers.append(self.qmf.addBroker(host, self.config._connTimeout)) displayList.append(idList[idx]) idx += 1 idx = 0 found = False for broker in self.brokers: if not self.config._delConn: print "Clients on Member: ID=%s:" % displayList[idx] connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker) for conn in connList: if pattern.match(conn.address): if self.config._numeric or self.config._delConn: a = conn.address else: tokens = conn.address.split(":") try: hostList = socket.gethostbyaddr(tokens[0]) host = hostList[0] except: host = tokens[0] a = host + ":" + tokens[1] if self.config._delConn: tokens = self.config._delConn.split(":") ip = socket.gethostbyname(tokens[0]) toDelete = ip + ":" + tokens[1] if a == toDelete: print "Closing connection from client: %s" % a conn.close() found = True else: print " %s" % a idx += 1 if not self.config._delConn: print if self.config._delConn and not found: print "Client connection '%s' not found" % self.config._delConn for broker in self.brokers: self.qmf.delBroker(broker) def main(argv=None): if argv is None: argv = sys.argv try: config = Config() try: longOpts = ("stop=", "all-stop", "force", "connections=", "all-connections" "del-connection=", "numeric", "timeout=") (optlist, encArgs) = getopt.gnu_getopt(argv[1:], "s:kfCc:d:n", longOpts) except: usage() return 1 try: encoding = locale.getpreferredencoding() cargs = [a.decode(encoding) for a in encArgs] except: cargs = encArgs count = 0 for opt in optlist: if opt[0] == "--timeout": config._connTimeout = int(opt[1]) if config._connTimeout == 0: config._connTimeout = None if opt[0] == "-s" or opt[0] == "--stop": config._stopId = opt[1] if len(config._stopId.split(":")) != 2: raise Exception("Member ID must be of form: :") count += 1 if opt[0] == "-k" or opt[0] == "--all-stop": config._stopAll = True count += 1 if opt[0] == "-f" or opt[0] == "--force": config._force = True if opt[0] == "-n" or opt[0] == "--numeric": config._numeric = True if opt[0] == "-C" or opt[0] == "--all-connections": config._showConn = "all" count += 1 if opt[0] == "-c" or opt[0] == "--connections": config._showConn = opt[1] if len(config._showConn.split(":")) != 2: raise Exception("Member ID must be of form: :") count += 1 if opt[0] == "-d" or opt[0] == "--del-connection": config._delConn = opt[1] if len(config._delConn.split(":")) != 2: raise Exception("Connection must be of form: :") count += 1 if count > 1: print "Only one command option may be supplied" print usage() return 1 nargs = len(cargs) bm = BrokerManager(config) if nargs == 1: config._host = cargs[0] try: bm.SetBroker(config._host) if config._stopId: bm.stopMember(config._stopId) elif config._stopAll: bm.stopAll() elif config._showConn or config._delConn: bm.showConnections() else: bm.overview() except KeyboardInterrupt: print except Exception,e: if str(e).find("connection aborted") > 0: # we expect this when asking the connected broker to shut down return 0 raise Exception("Failed: %s - %s" % (e.__class__.__name__, e)) bm.Disconnect() except Exception, e: print str(e) return 1 if __name__ == "__main__": sys.exit(main())