#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import os import getopt import sys import locale import socket import re from qmf.console import Session, SchemaClass _host = "localhost" _connTimeout = 10 _verbose = 0 _del_test = False; pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$") _debug_recursion = 0 def Usage (): print "Usage: verify_cluster_objects [OPTIONS] [broker-addr]" print print " broker-addr is in the form: [username/password@] hostname | ip-address [:]" print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" print print " This program contacts every node of a cluster, loads all manageable objects from" print " those nodes and verifies that the management data is identical across the clusters." print print "Options:" print " --timeout seconds (10) Maximum time to wait for broker connection" print " --verbose level (0) Show details of objects and their IDs" print " --delete Delete some objects after creation, to test synchup" print sys.exit (1) class IpAddr: def __init__(self, text): if text.find("@") != -1: tokens = text.split("@") text = tokens[1] if text.find(":") != -1: tokens = text.split(":") text = tokens[0] self.port = int(tokens[1]) else: self.port = 5672 self.dottedQuad = socket.gethostbyname(text) nums = self.dottedQuad.split(".") self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) def bestAddr(self, addrPortList): bestDiff = 0xFFFFFFFFL bestAddr = None for addrPort in addrPortList: diff = IpAddr(addrPort[0]).addr ^ self.addr if diff < bestDiff: bestDiff = diff bestAddr = addrPort return bestAddr class ObjectId: """Object identity, use for dictionaries by object id""" def __init__(self, object): self.object = object def __eq__(self, other): return self.object is other.object def __hash__(self): return hash(id(self.object)) class Broker(object): def __init__(self, qmf, broker): self.broker = broker self.qmf = qmf agents = qmf.getAgents() for a in agents: if a.getAgentBank() == 0: self.brokerAgent = a bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0] self.currentTime = bobj.getTimestamps()[0] try: self.uptime = bobj.uptime except: self.uptime = 0 self.tablesByName = {} self.package = "org.apache.qpid.broker" self.id_cache = {} # Cache for getAbstractId def getUrl(self): return self.broker.getUrl() def getData(self): if _verbose > 1: print "Broker:", self.broker classList = self.qmf.getClasses(self.package) for cls in classList: if self.qmf.getSchema(cls).kind == SchemaClass.CLASS_KIND_TABLE: self.loadTable(cls) # # this should be a method on an object, but is kept here for now, until # we finish sorting out the treatment of names in qmfv2 # def getAbstractId(self, object): """ return a string the of the hierarchical name """ if (ObjectId(object) in self.id_cache): return self.id_cache[ObjectId(object)] global _debug_recursion result = u"" valstr = u"" _debug_recursion += 1 debug_prefix = _debug_recursion if (_verbose > 9): print debug_prefix, " enter gai: props ", object._properties for property, value in object._properties: # we want to recurse on things which are refs. we tell by # asking each property if it's an index. I think... if (_verbose > 9): print debug_prefix, " prop ", property, " val " , value, " idx ", property.index, " type ", property.type # property is an instance, you can ask its type, name, etc. # special case system refs, as they will never be the same on # distinct cluster nodes. later we probably want a different # way of representing these objects, like for instance don't # include the system ref in the hierarchy. if property.name == "systemRef": _debug_recursion -= 1 self.id_cache[ObjectId(object)] = "" return "" if property.index: if result != u"": result += u":" if property.type == 10: try: recursive_objects = object._session.getObjects(_objectId = value, _broker=object._broker) if (_verbose > 9): print debug_prefix, " r ", recursive_objects[0] for rp, rv in recursive_objects[0]._properties: print debug_prefix, " rrr ", rp, " idx-p ", rp.index, " v ", rv print debug_prefix, " recursing on ", recursive_objects[0] valstr = self.getAbstractId(recursive_objects[0]) if (_verbose > 9): print debug_prefix, " recursing on ", recursive_objects[0], " -> ", valstr except Exception, e: if (_verbose > 9): print debug_prefix, " except ", e valstr = u"" else: # this yields UUID-blah. not good. try something else # valstr = value.__repr__() # print debug_prefix, " val ", value # yetch. this needs to be abstracted someplace? I don't # think we have the infrastructure we need to make these id # strings be sensible in the general case if property.name == "systemId": # special case. try to do something sensible about systemref objects valstr = object.nodeName else: valstr = value.__repr__() # I think... result += valstr if (_verbose > 9): print debug_prefix, " id ", self, " -> ", result _debug_recursion -= 1 self.id_cache[ObjectId(object)] = result return result def loadTable(self, cls): if _verbose > 1: print " Class:", cls.getClassName() list = self.qmf.getObjects(_class=cls.getClassName(), _package=cls.getPackageName(), _agent=self.brokerAgent) # tables-by-name maps class name to a table by object-name of # objects. ie use the class name ("broker", "queue", etc) to # index tables-by-name, returning a second table, use the # object name to index that to get an object. self.tablesByName[cls.getClassName()] = {} for obj in list: # make sure we aren't colliding on name. it's an internal # error (ie, the name-generation code is busted) if we do key = self.getAbstractId(obj) if key in self.tablesByName[cls.getClassName()]: raise Exception("internal error: collision for %s on key %s\n" % (obj, key)) self.tablesByName[cls.getClassName()][key] = obj if _verbose > 1: print " ", obj.getObjectId(), " ", obj.getIndex(), " ", key class BrokerManager: def __init__(self): self.brokerName = None self.qmf = None self.broker = None self.brokers = [] self.cluster = None def SetBroker(self, brokerUrl): self.url = brokerUrl self.qmf = Session() self.broker = self.qmf.addBroker(brokerUrl, _connTimeout) agents = self.qmf.getAgents() for a in agents: if a.getAgentBank() == 0: self.brokerAgent = a def Disconnect(self): if self.broker: self.qmf.delBroker(self.broker) def _getCluster(self): packages = self.qmf.getPackages() if "org.apache.qpid.cluster" not in packages: return None clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) if len(clusters) == 0: print "Clustering is installed but not enabled on the broker." return None self.cluster = clusters[0] def _getHostList(self, urlList): hosts = [] hostAddr = IpAddr(_host) for url in urlList: if url.find("amqp:") != 0: raise Exception("Invalid URL 1") url = url[5:] addrs = str(url).split(",") addrList = [] for addr in addrs: tokens = addr.split(":") if len(tokens) != 3: raise Exception("Invalid URL 2") addrList.append((tokens[1], tokens[2])) # Find the address in the list that is most likely to be # in the same subnet as the address with which we made the # original QMF connection. This increases the probability # that we will be able to reach the cluster member. best = hostAddr.bestAddr(addrList) bestUrl = best[0] + ":" + best[1] hosts.append(bestUrl) return hosts # the main fun which tests for broker state "identity". now that # we're using qmf2 style object names across the board, that test # means that we are ensuring that for all objects of a given # class, an object of that class with the same object name exists # on the peer broker. def verify(self): if _verbose > 0: print "Connecting to the cluster..." self._getCluster() if self.cluster: memberList = self.cluster.members.split(";") hostList = self._getHostList(memberList) self.qmf.delBroker(self.broker) self.broker = None for host in hostList: b = self.qmf.addBroker(host, _connTimeout) self.brokers.append(Broker(self.qmf, b)) if _verbose > 0: print " ", b else: raise Exception("Failed - Not a cluster") failures = [] # Wait until connections to all nodes are established before # loading the management data. This will ensure that the # objects are all stable and the same. if _verbose > 0: print "Loading management data from nodes..." for broker in self.brokers: broker.getData() # If we're testing delete-some-objects functionality, create a # few widgets here and then delete them. if _del_test: if _verbose > 0: print "Running delete test" # just stick 'em in the first broker b = self.brokers[0] session = b.qmf.brokers[0].getAmqpSession() session.queue_declare(queue="foo", exclusive=True, auto_delete=True) session.exchange_bind(exchange="amq.direct", queue="foo", binding_key="foo") session.queue_declare(queue="bar", exclusive=True, auto_delete=True) session.exchange_bind(exchange="amq.direct", queue="bar", binding_key="bar") # now delete 'em session.exchange_unbind(queue="foo", exchange="amq.direct", binding_key="foo") session.exchange_unbind(queue="bar", exchange="amq.direct", binding_key="bar") session.queue_delete("bar") session.queue_delete("foo") # Verify that each node has the same set of objects (based on # object name). if _verbose > 0: print "Verifying objects based on object name..." base = self.brokers[0] for broker in self.brokers[1:]: # walk over the class names, for each class (with some # exceptions) walk over the objects of that class, making # sure they match between broker A and broker B for className in base.tablesByName: if className in ["broker", "system", "connection"]: continue tab1 = base.tablesByName[className] tab2 = broker.tablesByName[className] for key in tab1: if key not in tab2: failures.append("%s key %s not found on node %s" % (className, key, broker.getUrl())) for key in tab2: if key not in tab1: failures.append("%s key %s not found on node %s" % (className, key, base.getUrl())) if len(failures) > 0: print "Failures:" for failure in failures: print " %s" % failure raise Exception("Failures") if _verbose > 0: print "Success" ## ## Main Program ## try: longOpts = ("verbose=", "timeout=", "delete") (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "", longOpts) except: Usage() try: encoding = locale.getpreferredencoding() cargs = [a.decode(encoding) for a in encArgs] except: cargs = encArgs for opt in optlist: if opt[0] == "--timeout": _connTimeout = int(opt[1]) if _connTimeout == 0: _connTimeout = None elif opt[0] == "--verbose": _verbose = int(opt[1]) elif opt[0] == "--delete": _del_test = True; else: Usage() nargs = len(cargs) bm = BrokerManager() if nargs == 1: _host = cargs[0] try: bm.SetBroker(_host) bm.verify() except KeyboardInterrupt: print except Exception,e: print "Failed: %s - %s" % (e.__class__.__name__, e) sys.exit(1) bm.Disconnect()