#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import os import optparse import sys import socket import locale from types import * from cmd import Cmd from shlex import split from threading import Lock from time import strftime, gmtime from qpid.disp import Display from qpid.peer import Closed from qmf.console import Session, Console, SchemaClass, ObjectId class Mcli(Cmd): """ Management Command Interpreter """ def __init__(self, dataObject, dispObject): Cmd.__init__(self) self.dataObject = dataObject self.dispObject = dispObject self.dataObject.setCli(self) self.prompt = "qpid: " def emptyline(self): pass def setPromptMessage(self, p): if p == None: self.prompt = "qpid: " else: self.prompt = "qpid[%s]: " % p def do_help(self, data): print "Management Tool for QPID" print print "Commands:" print " agents - Print a list of the known Agents" print " list - Print summary of existing objects by class" print " list - Print list of objects of the specified class" print " list active - Print list of non-deleted objects of the specified class" # print " show - Print contents of all objects of specified class" # print " show active - Print contents of all non-deleted objects of specified class" print " show - Print contents of an object (infer className)" # print " show - Print contents of one or more objects" # print " list is space-separated, ranges may be specified (i.e. 1004-1010)" print " call [] - Invoke a method on an object" print " schema - Print summary of object classes seen on the target" print " schema - Print details of an object class" print " set time-format short - Select short timestamp format (default)" print " set time-format long - Select long timestamp format" print " quit or ^D - Exit the program" print def complete_set(self, text, line, begidx, endidx): """ Command completion for the 'set' command """ tokens = split(line) if len(tokens) < 2: return ["time-format "] elif tokens[1] == "time-format": if len(tokens) == 2: return ["long", "short"] elif len(tokens) == 3: if "long".find(text) == 0: return ["long"] elif "short".find(text) == 0: return ["short"] elif "time-format".find(text) == 0: return ["time-format "] return [] def do_set(self, data): tokens = split(data) try: if tokens[0] == "time-format": self.dispObject.do_setTimeFormat(tokens[1]) except: pass def complete_schema(self, text, line, begidx, endidx): tokens = split(line) if len(tokens) > 2: return [] return self.dataObject.classCompletions(text) def do_schema(self, data): try: self.dataObject.do_schema(data) except Exception, e: print "Exception in do_schema: %r" % e def do_agents(self, data): try: self.dataObject.do_agents(data) except Exception, e: print "Exception in do_agents: %r" % e def do_id(self, data): try: self.dataObject.do_id(data) except Exception, e: print "Exception in do_id: %r" % e def complete_list(self, text, line, begidx, endidx): tokens = split(line) if len(tokens) > 2: return [] return self.dataObject.classCompletions(text) def do_list(self, data): try: self.dataObject.do_list(data) except Exception, e: print "Exception in do_list: %r" % e def do_show(self, data): try: self.dataObject.do_show(data) except Exception, e: print "Exception in do_show: %r" % e def do_call(self, data): try: self.dataObject.do_call(data) except Exception, e: print "Exception in do_call: %r" % e def do_EOF(self, data): print "quit" try: self.dataObject.do_exit() except: pass return True def do_quit(self, data): try: self.dataObject.do_exit() except: pass return True def postcmd(self, stop, line): return stop def postloop(self): print "Exiting..." self.dataObject.close() #====================================================================================================== # QmfData #====================================================================================================== class QmfData(Console): """ """ def __init__(self, disp, url, conn_options): self.disp = disp self.url = url self.session = Session(self, manageConnections=True) self.broker = self.session.addBroker(self.url, **conn_options) self.lock = Lock() self.connected = None self.closing = None self.first_connect = True self.cli = None self.idRegistry = IdRegistry() self.objects = {} #======================= # Methods to support CLI #======================= def setCli(self, cli): self.cli = cli def close(self): try: self.closing = True if self.session and self.broker: self.session.delBroker(self.broker) except: pass # we're shutting down - ignore any errors def classCompletions(self, text): pass def do_schema(self, data): if data == "": self.schemaSummary() else: self.schemaTable(data) def do_agents(self, data): agents = self.session.getAgents() rows = [] for agent in agents: version = 1 if agent.isV2: version = 2 rows.append(("%d.%s" % (agent.getBrokerBank(), agent.getAgentBank()), agent.label, agent.epoch, version)) self.disp.table("QMF Agents:", ("Agent Name", "Label", "Epoch", "QMF Version"), rows) def do_id(self, data): tokens = data.split() for token in tokens: if not token.isdigit(): print "Value %s is non-numeric" % token return title = "Translation of Display IDs:" heads = ('DisplayID', 'Epoch', 'Agent', 'ObjectName') if len(tokens) == 0: tokens = self.idRegistry.getDisplayIds() rows = [] for token in tokens: rows.append(self.idRegistry.getIdInfo(int(token))) self.disp.table(title, heads, rows) def do_list(self, data): tokens = data.split() if len(tokens) == 0: self.listClasses() else: self.listObjects(tokens) def do_show(self, data): tokens = data.split() if len(tokens) == 0: print "Missing Class or ID" return keys = self.classKeysByToken(tokens[0]) if keys: self.showObjectsByKey(keys) elif tokens[0].isdigit(): self.showObjectById(int(tokens[0])) def _build_object_name(self, obj): values = [] for p,v in obj.getProperties(): if p.name != "vhostRef" and p.index == 1: if p.name == "brokerRef": # reference to broker values.append('org.apache.qpid.broker:broker:amqp-broker') else: values.append(str(v)) object_key = ",".join(values) class_key = obj.getClassKey(); return class_key.getPackageName() + ":" + class_key.getClassName() + ":" + object_key def do_call(self, data): tokens = data.split() if len(tokens) < 2: print "Not enough arguments supplied" return displayId = long(tokens[0]) methodName = tokens[1] args = [] for arg in tokens[2:]: ## ## If the argument is a map, list, boolean, integer, or floating (one decimal point), ## run it through the Python evaluator so it is converted to the correct type. ## ## TODO: use a regex for this instead of this convoluted logic, ## or even consider passing all args through eval() [which would ## be a minor change to the interface as string args would then ## always need to be quoted as strings within a map/list would ## now] if arg[0] == '{' or arg[0] == '[' or arg[0] == '"' or arg[0] == '\'' or arg == "True" or arg == "False" or \ ((arg.count('.') < 2 and (arg.count('-') == 0 or \ (arg.count('-') == 1 and arg[0] == '-')) and \ arg.replace('.','').replace('-','').isdigit())): args.append(eval(arg)) else: args.append(arg) obj = None try: self.lock.acquire() if displayId not in self.objects: print "Unknown ID" return obj = self.objects[displayId] finally: self.lock.release() object_id = obj.getObjectId(); if not object_id.isV2 and obj.getAgent().isV2: object_name = self._build_object_name(obj) object_id = ObjectId.create(object_id.agentName, object_name) self.session._sendMethodRequest(self.broker, obj.getClassKey(), object_id, methodName, args) def do_exit(self): pass #==================== # Sub-Command Methods #==================== def schemaSummary(self, package_filter=None): rows = [] packages = self.session.getPackages() for package in packages: if package_filter and package_filter != package: continue keys = self.session.getClasses(package) for key in keys: kind = "object" schema = self.session.getSchema(key) if schema: if schema.kind == SchemaClass.CLASS_KIND_EVENT: kind = "event" if schema.kind == SchemaClass.CLASS_KIND_TABLE: # # Don't display event schemata. This will be a future feature. # rows.append((package, key.getClassName(), kind)) self.disp.table("QMF Classes:", ("Package", "Name", "Kind"), rows) def schemaTable(self, text): packages = self.session.getPackages() if text in packages: self.schemaSummary(package_filter=text) for package in packages: keys = self.session.getClasses(package) for key in keys: if text == key.getClassName() or text == package + ":" + key.getClassName(): schema = self.session.getSchema(key) if schema.kind == SchemaClass.CLASS_KIND_TABLE: self.schemaObject(schema) else: self.schemaEvent(schema) def schemaObject(self, schema): rows = [] title = "Object Class: %s" % schema.__repr__() heads = ("Element", "Type", "Access", "Unit", "Notes", "Description") for prop in schema.getProperties(): notes = "" if prop.index : notes += "index " if prop.optional : notes += "optional " row = (prop.name, self.typeName(prop.type), self.accessName(prop.access), self.notNone(prop.unit), notes, self.notNone(prop.desc)) rows.append(row) for stat in schema.getStatistics(): row = (stat.name, self.typeName(stat.type), "", self.notNone(stat.unit), "", self.notNone(stat.desc)) rows.append(row) self.disp.table(title, heads, rows) for method in schema.methods: rows = [] heads = ("Argument", "Type", "Direction", "Unit", "Description") title = " Method: %s" % method.name for arg in method.arguments: row = (arg.name, self.typeName(arg.type), arg.dir, self.notNone(arg.unit), self.notNone(arg.desc)) rows.append(row) print self.disp.table(title, heads, rows) def schemaEvent(self, schema): rows = [] title = "Event Class: %s" % schema.__repr__() heads = ("Element", "Type", "Unit", "Description") for arg in schema.arguments: row = (arg.name, self.typeName(arg.type), self.notNone(arg.unit), self.notNone(arg.desc)) rows.append(row) self.disp.table(title, heads, rows) def listClasses(self): title = "Summary of Objects by Type:" heads = ("Package", "Class", "Active", "Deleted") rows = [] totals = {} try: self.lock.acquire() for dispId in self.objects: obj = self.objects[dispId] key = obj.getClassKey() index = (key.getPackageName(), key.getClassName()) if index in totals: stats = totals[index] else: stats = (0, 0) if obj.isDeleted(): stats = (stats[0], stats[1] + 1) else: stats = (stats[0] + 1, stats[1]) totals[index] = stats finally: self.lock.release() for index in totals: stats = totals[index] rows.append((index[0], index[1], stats[0], stats[1])) self.disp.table(title, heads, rows) def listObjects(self, tokens): ckeys = self.classKeysByToken(tokens[0]) show_deleted = True if len(tokens) > 1 and tokens[1] == 'active': show_deleted = None heads = ("ID", "Created", "Destroyed", "Index") rows = [] try: self.lock.acquire() for dispId in self.objects: obj = self.objects[dispId] if obj.getClassKey() in ckeys: utime, ctime, dtime = obj.getTimestamps() dtimestr = self.disp.timestamp(dtime) if dtime == 0: dtimestr = "-" if dtime == 0 or (dtime > 0 and show_deleted): row = (dispId, self.disp.timestamp(ctime), dtimestr, self.objectIndex(obj)) rows.append(row) finally: self.lock.release() self.disp.table("Object Summary:", heads, rows) def showObjectsByKey(self, key): pass def showObjectById(self, dispId): heads = ("Attribute", str(dispId)) rows = [] try: self.lock.acquire() if dispId in self.objects: obj = self.objects[dispId] caption = "Object of type: %r" % obj.getClassKey() for prop in obj.getProperties(): row = (prop[0].name, self.valueByType(prop[0].type, prop[1])) rows.append(row) for stat in obj.getStatistics(): row = (stat[0].name, self.valueByType(stat[0].type, stat[1])) rows.append(row) else: print "No object found with ID %d" % dispId return finally: self.lock.release() self.disp.table(caption, heads, rows) def classKeysByToken(self, token): """ Given a token, return a list of matching class keys (if found): token formats: : """ pname = None cname = None parts = token.split(':') if len(parts) == 1: cname = parts[0] elif len(parts) == 2: pname = parts[0] cname = parts[1] else: raise ValueError("Invalid Class Name: %s" % token) keys = [] packages = self.session.getPackages() for p in packages: if pname == None or pname == p: classes = self.session.getClasses(p) for key in classes: if key.getClassName() == cname: keys.append(key) return keys def typeName (self, typecode): """ Convert type-codes to printable strings """ if typecode == 1: return "uint8" elif typecode == 2: return "uint16" elif typecode == 3: return "uint32" elif typecode == 4: return "uint64" elif typecode == 5: return "bool" elif typecode == 6: return "short-string" elif typecode == 7: return "long-string" elif typecode == 8: return "abs-time" elif typecode == 9: return "delta-time" elif typecode == 10: return "reference" elif typecode == 11: return "boolean" elif typecode == 12: return "float" elif typecode == 13: return "double" elif typecode == 14: return "uuid" elif typecode == 15: return "field-table" elif typecode == 16: return "int8" elif typecode == 17: return "int16" elif typecode == 18: return "int32" elif typecode == 19: return "int64" elif typecode == 20: return "object" elif typecode == 21: return "list" elif typecode == 22: return "array" else: raise ValueError ("Invalid type code: %s" % str(typecode)) def valueByType(self, typecode, val): if type(val) is type(None): return "absent" if typecode == 1: return "%d" % val elif typecode == 2: return "%d" % val elif typecode == 3: return "%d" % val elif typecode == 4: return "%d" % val elif typecode == 6: return val elif typecode == 7: return val elif typecode == 8: return strftime("%c", gmtime(val / 1000000000)) elif typecode == 9: if val < 0: val = 0 sec = val / 1000000000 min = sec / 60 hour = min / 60 day = hour / 24 result = "" if day > 0: result = "%dd " % day if hour > 0 or result != "": result += "%dh " % (hour % 24) if min > 0 or result != "": result += "%dm " % (min % 60) result += "%ds" % (sec % 60) return result elif typecode == 10: return str(self.idRegistry.displayId(val)) elif typecode == 11: if val: return "True" else: return "False" elif typecode == 12: return "%f" % val elif typecode == 13: return "%f" % val elif typecode == 14: return "%r" % val elif typecode == 15: return "%r" % val elif typecode == 16: return "%d" % val elif typecode == 17: return "%d" % val elif typecode == 18: return "%d" % val elif typecode == 19: return "%d" % val elif typecode == 20: return "%r" % val elif typecode == 21: return "%r" % val elif typecode == 22: return "%r" % val else: raise ValueError ("Invalid type code: %s" % str(typecode)) def accessName (self, code): """ Convert element access codes to printable strings """ if code == '1': return "ReadCreate" elif code == '2': return "ReadWrite" elif code == '3': return "ReadOnly" else: raise ValueError ("Invalid access code: %s" % str(code)) def notNone (self, text): if text == None: return "" else: return text def objectIndex(self, obj): if obj._objectId.isV2: return obj._objectId.getObject() result = "" first = True props = obj.getProperties() for prop in props: if prop[0].index: if not first: result += "." result += self.valueByType(prop[0].type, prop[1]) first = None return result #===================== # Methods from Console #===================== def brokerConnected(self, broker): """ Invoked when a connection is established to a broker """ try: self.lock.acquire() self.connected = True finally: self.lock.release() if not self.first_connect: print "Broker connected:", broker self.first_connect = None def brokerDisconnected(self, broker): """ Invoked when the connection to a broker is lost """ try: self.lock.acquire() self.connected = None finally: self.lock.release() if not self.closing: print "Broker disconnected:", broker def objectProps(self, broker, record): """ Invoked when an object is updated. """ oid = record.getObjectId() dispId = self.idRegistry.displayId(oid) try: self.lock.acquire() if dispId in self.objects: self.objects[dispId].mergeUpdate(record) else: self.objects[dispId] = record finally: self.lock.release() def objectStats(self, broker, record): """ Invoked when an object is updated. """ oid = record.getObjectId() dispId = self.idRegistry.displayId(oid) try: self.lock.acquire() if dispId in self.objects: self.objects[dispId].mergeUpdate(record) finally: self.lock.release() def event(self, broker, event): """ Invoked when an event is raised. """ pass def methodResponse(self, broker, seq, response): print response #====================================================================================================== # IdRegistry #====================================================================================================== class IdRegistry(object): """ """ def __init__(self): self.next_display_id = 101 self.oid_to_display = {} self.display_to_oid = {} self.lock = Lock() def displayId(self, oid): try: self.lock.acquire() if oid in self.oid_to_display: return self.oid_to_display[oid] newId = self.next_display_id self.next_display_id += 1 self.oid_to_display[oid] = newId self.display_to_oid[newId] = oid return newId finally: self.lock.release() def objectId(self, displayId): try: self.lock.acquire() if displayId in self.display_to_oid: return self.display_to_oid[displayId] return None finally: self.lock.release() def getDisplayIds(self): result = [] for displayId in self.display_to_oid: result.append(str(displayId)) return result def getIdInfo(self, displayId): """ Given a display ID, return a tuple of (displayID, bootSequence/Durable, AgentBank/Name, ObjectName) """ oid = self.objectId(displayId) if oid == None: return (displayId, "?", "unknown", "unknown") bootSeq = oid.getSequence() if bootSeq == 0: bootSeq = '' agent = oid.getAgentBank() if agent == '0': agent = 'Broker' return (displayId, bootSeq, agent, oid.getObject()) #========================================================= # Option Parsing #========================================================= def parse_options( argv ): _usage = """qpid-tool [OPTIONS] [[/@][:]]""" parser = optparse.OptionParser(usage=_usage) parser.add_option("-b", "--broker", action="store", type="string", metavar="
", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:]") parser.add_option("--sasl-mechanism", action="store", type="string", metavar="", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") parser.add_option("--ssl-certificate", action="store", type="string", metavar="", help="SSL certificate for client authentication") parser.add_option("--ssl-key", action="store", type="string", metavar="", help="Private key (if not contained in certificate)") opts, encArgs = parser.parse_args(args=argv) try: encoding = locale.getpreferredencoding() args = [a.decode(encoding) for a in encArgs] except: args = encArgs conn_options = {} broker_option = None if opts.broker: broker_option = opts.broker if opts.ssl_certificate: conn_options['ssl_certfile'] = opts.ssl_certificate if opts.ssl_key: if not opts.ssl_certificate: parser.error("missing '--ssl-certificate' (required by '--ssl-key')") conn_options['ssl_keyfile'] = opts.ssl_key if opts.sasl_mechanism: conn_options['mechanisms'] = opts.sasl_mechanism return broker_option, conn_options, args[1:] #========================================================= # Main Program #========================================================= # Get options specified on the command line broker_option, conn_options, cargs = parse_options(sys.argv) _host = "localhost" if broker_option is not None: _host = broker_option elif len(cargs) > 0: _host = cargs[0] # note: prior to supporting options, qpid-tool assumed positional parameters. # the first argument was assumed to be the broker address. The second argument # was optional, and, if supplied, was assumed to be the path to the # certificate. To preserve backward compatibility, accept the certificate if # supplied via the second parameter. # if 'ssl_certfile' not in conn_options: if len(cargs) > 1: conn_options['ssl_certfile'] = cargs[1] disp = Display() # Attempt to make a connection to the target broker try: data = QmfData(disp, _host, conn_options) except Exception, e: if str(e).find("Exchange not found") != -1: print "Management not enabled on broker: Use '-m yes' option on broker startup." else: print "Failed: %s - %s" % (e.__class__.__name__, e) sys.exit(1) # Instantiate the CLI interpreter and launch it. cli = Mcli(data, disp) print("Management Tool for QPID") try: cli.cmdloop() except KeyboardInterrupt: print print "Exiting..." except Exception, e: print "Failed: %s - %s" % (e.__class__.__name__, e) # alway attempt to cleanup broker resources data.close()