#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import os import optparse import sys import socket from types import * from cmd import Cmd from shlex import split from threading import Lock from time import strftime, gmtime from qpid.disp import Display from qpid.peer import Closed from qmf.console import Session, Console, SchemaClass, ObjectId class Mcli(Cmd): """ Management Command Interpreter """ def __init__(self, dataObject, dispObject): Cmd.__init__(self) self.dataObject = dataObject self.dispObject = dispObject self.dataObject.setCli(self) self.prompt = "qpid: " def emptyline(self): pass def setPromptMessage(self, p): if p == None: self.prompt = "qpid: " else: self.prompt = "qpid[%s]: " % p def do_help(self, data): print "Management Tool for QPID" print print "Commands:" print " agents - Print a list of the known Agents" print " list - Print summary of existing objects by class" print " list - Print list of objects of the specified class" print " list active - Print list of non-deleted objects of the specified class" # print " show - Print contents of all objects of specified class" # print " show active - Print contents of all non-deleted objects of specified class" print " show - Print contents of an object (infer className)" # print " show - Print contents of one or more objects" # print " list is space-separated, ranges may be specified (i.e. 1004-1010)" print " call [] - Invoke a method on an object" print " schema - Print summary of object classes seen on the target" print " schema - Print details of an object class" print " set time-format short - Select short timestamp format (default)" print " set time-format long - Select long timestamp format" print " quit or ^D - Exit the program" print def complete_set(self, text, line, begidx, endidx): """ Command completion for the 'set' command """ tokens = split(line) if len(tokens) < 2: return ["time-format "] elif tokens[1] == "time-format": if len(tokens) == 2: return ["long", "short"] elif len(tokens) == 3: if "long".find(text) == 0: return ["long"] elif "short".find(text) == 0: return ["short"] elif "time-format".find(text) == 0: return ["time-format "] return [] def do_set(self, data): tokens = split(data) try: if tokens[0] == "time-format": self.dispObject.do_setTimeFormat(tokens[1]) except: pass def complete_schema(self, text, line, begidx, endidx): tokens = split(line) if len(tokens) > 2: return [] return self.dataObject.classCompletions(text) def do_schema(self, data): try: self.dataObject.do_schema(data) except Exception, e: print "Exception in do_schema: %r" % e def do_agents(self, data): try: self.dataObject.do_agents(data) except Exception, e: print "Exception in do_agents: %r" % e def do_id(self, data): try: self.dataObject.do_id(data) except Exception, e: print "Exception in do_id: %r" % e def complete_list(self, text, line, begidx, endidx): tokens = split(line) if len(tokens) > 2: return [] return self.dataObject.classCompletions(text) def do_list(self, data): try: self.dataObject.do_list(data) except Exception, e: print "Exception in do_list: %r" % e def do_show(self, data): try: self.dataObject.do_show(data) except Exception, e: print "Exception in do_show: %r" % e def do_call(self, data): try: self.dataObject.do_call(data) except Exception, e: print "Exception in do_call: %r", e def do_EOF(self, data): print "quit" try: self.dataObject.do_exit() except: pass return True def do_quit(self, data): try: self.dataObject.do_exit() except: pass return True def postcmd(self, stop, line): return stop def postloop(self): print "Exiting..." self.dataObject.close() #====================================================================================================== # QmfData #====================================================================================================== class QmfData(Console): """ """ def __init__(self, disp, url): self.disp = disp self.url = url self.session = Session(self, manageConnections=True) self.broker = self.session.addBroker(self.url) self.lock = Lock() self.connected = None self.closing = None self.first_connect = True self.cli = None self.idRegistry = IdRegistry() self.objects = {} #======================= # Methods to support CLI #======================= def setCli(self, cli): self.cli = cli def close(self): try: self.closing = True if self.session and self.broker: self.session.delBroker(self.broker) except: pass # we're shutting down - ignore any errors def classCompletions(self, text): pass def do_schema(self, data): if data == "": self.schemaSummary() else: self.schemaTable(data) def do_agents(self, data): agents = self.session.getAgents() rows = [] for agent in agents: version = 1 if agent.isV2: version = 2 rows.append(("%d.%s" % (agent.getBrokerBank(), agent.getAgentBank()), agent.label, agent.epoch, version)) self.disp.table("QMF Agents:", ("Agent Name", "Label", "Epoch", "QMF Version"), rows) def do_id(self, data): tokens = data.split() for token in tokens: if not token.isdigit(): print "Value %s is non-numeric" % token return title = "Translation of Display IDs:" heads = ('DisplayID', 'Epoch', 'Agent', 'ObjectName') if len(tokens) == 0: tokens = self.idRegistry.getDisplayIds() rows = [] for token in tokens: rows.append(self.idRegistry.getIdInfo(int(token))) self.disp.table(title, heads, rows) def do_list(self, data): tokens = data.split() if len(tokens) == 0: self.listClasses() else: self.listObjects(tokens) def do_show(self, data): tokens = data.split() if len(tokens) == 0: print "Missing Class or ID" return keys = self.classKeysByToken(tokens[0]) if keys: self.showObjectsByKey(keys) elif tokens[0].isdigit(): self.showObjectById(int(tokens[0])) def do_call(self, data): tokens = data.split() if len(tokens) < 2: print "Not enough arguments supplied" return displayId = long(tokens[0]) methodName = tokens[1] args = [] for arg in tokens[2:]: ## ## If the argument is a map, list, boolean, integer, or floating (one decimal point), ## run it through the Python evaluator so it is converted to the correct type. ## ## TODO: use a regex for this instead of this convoluted logic, ## or even consider passing all args through eval() [which would ## be a minor change to the interface as string args would then ## always need to be quoted as strings within a map/list would ## now] if arg[0] == '{' or arg[0] == '[' or arg[0] == '"' or arg[0] == '\'' or arg == "True" or arg == "False" or \ ((arg.count('.') < 2 and (arg.count('-') == 0 or \ (arg.count('-') == 1 and arg[0] == '-')) and \ arg.replace('.','').replace('-','').isdigit())): args.append(eval(arg)) else: args.append(arg) obj = None try: self.lock.acquire() if displayId not in self.objects: print "Unknown ID" return obj = self.objects[displayId] finally: self.lock.release() object_id = obj.getObjectId(); if not object_id.isV2 and obj.getAgent().isV2: object_key = ",".join([str(v) for p, v in obj.getProperties() if p.name != "vhostRef" and p.index == 1]) class_key = obj.getClassKey(); object_name = class_key.getPackageName() + ":" + class_key.getClassName() + ":" + object_key object_id = ObjectId.create(object_id.agentName, object_name) self.session._sendMethodRequest(self.broker, obj.getClassKey(), object_id, methodName, args) def do_exit(self): pass #==================== # Sub-Command Methods #==================== def schemaSummary(self, package_filter=None): rows = [] packages = self.session.getPackages() for package in packages: if package_filter and package_filter != package: continue keys = self.session.getClasses(package) for key in keys: kind = "object" schema = self.session.getSchema(key) if schema: if schema.kind == SchemaClass.CLASS_KIND_EVENT: kind = "event" if schema.kind == SchemaClass.CLASS_KIND_TABLE: # # Don't display event schemata. This will be a future feature. # rows.append((package, key.getClassName(), kind)) self.disp.table("QMF Classes:", ("Package", "Name", "Kind"), rows) def schemaTable(self, text): packages = self.session.getPackages() if text in packages: self.schemaSummary(package_filter=text) for package in packages: keys = self.session.getClasses(package) for key in keys: if text == key.getClassName() or text == package + ":" + key.getClassName(): schema = self.session.getSchema(key) if schema.kind == SchemaClass.CLASS_KIND_TABLE: self.schemaObject(schema) else: self.schemaEvent(schema) def schemaObject(self, schema): rows = [] title = "Object Class: %s" % schema.__repr__() heads = ("Element", "Type", "Access", "Unit", "Notes", "Description") for prop in schema.getProperties(): notes = "" if prop.index : notes += "index " if prop.optional : notes += "optional " row = (prop.name, self.typeName(prop.type), self.accessName(prop.access), self.notNone(prop.unit), notes, self.notNone(prop.desc)) rows.append(row) for stat in schema.getStatistics(): row = (stat.name, self.typeName(stat.type), "", self.notNone(stat.unit), "", self.notNone(stat.desc)) rows.append(row) self.disp.table(title, heads, rows) for method in schema.methods: rows = [] heads = ("Argument", "Type", "Direction", "Unit", "Description") title = " Method: %s" % method.name for arg in method.arguments: row = (arg.name, self.typeName(arg.type), arg.dir, self.notNone(arg.unit), self.notNone(arg.desc)) rows.append(row) print self.disp.table(title, heads, rows) def schemaEvent(self, schema): rows = [] title = "Event Class: %s" % schema.__repr__() heads = ("Element", "Type", "Unit", "Description") for arg in schema.arguments: row = (arg.name, self.typeName(arg.type), self.notNone(arg.unit), self.notNone(arg.desc)) rows.append(row) self.disp.table(title, heads, rows) def listClasses(self): title = "Summary of Objects by Type:" heads = ("Package", "Class", "Active", "Deleted") rows = [] totals = {} try: self.lock.acquire() for dispId in self.objects: obj = self.objects[dispId] key = obj.getClassKey() index = (key.getPackageName(), key.getClassName()) if index in totals: stats = totals[index] else: stats = (0, 0) if obj.isDeleted(): stats = (stats[0], stats[1] + 1) else: stats = (stats[0] + 1, stats[1]) totals[index] = stats finally: self.lock.release() for index in totals: stats = totals[index] rows.append((index[0], index[1], stats[0], stats[1])) self.disp.table(title, heads, rows) def listObjects(self, tokens): ckeys = self.classKeysByToken(tokens[0]) show_deleted = True if len(tokens) > 1 and tokens[1] == 'active': show_deleted = None heads = ("ID", "Created", "Destroyed", "Index") rows = [] try: self.lock.acquire() for dispId in self.objects: obj = self.objects[dispId] if obj.getClassKey() in ckeys: utime, ctime, dtime = obj.getTimestamps() dtimestr = self.disp.timestamp(dtime) if dtime == 0: dtimestr = "-" if dtime == 0 or (dtime > 0 and show_deleted): row = (dispId, self.disp.timestamp(ctime), dtimestr, self.objectIndex(obj)) rows.append(row) finally: self.lock.release() self.disp.table("Object Summary:", heads, rows) def showObjectsByKey(self, key): pass def showObjectById(self, dispId): heads = ("Attribute", str(dispId)) rows = [] try: self.lock.acquire() if dispId in self.objects: obj = self.objects[dispId] caption = "Object of type: %r" % obj.getClassKey() for prop in obj.getProperties(): row = (prop[0].name, self.valueByType(prop[0].type, prop[1])) rows.append(row) for stat in obj.getStatistics(): row = (stat[0].name, self.valueByType(stat[0].type, stat[1])) rows.append(row) else: print "No object found with ID %d" % dispId finally: self.lock.release() self.disp.table(caption, heads, rows) def classKeysByToken(self, token): """ Given a token, return a list of matching class keys (if found): token formats: : """ pname = None cname = None parts = token.split(':') if len(parts) == 1: cname = parts[0] elif len(parts) == 2: pname = parts[0] cname = parts[1] else: raise ValueError("Invalid Class Name: %s" % token) keys = [] packages = self.session.getPackages() for p in packages: if pname == None or pname == p: classes = self.session.getClasses(p) for key in classes: if key.getClassName() == cname: keys.append(key) return keys def typeName (self, typecode): """ Convert type-codes to printable strings """ if typecode == 1: return "uint8" elif typecode == 2: return "uint16" elif typecode == 3: return "uint32" elif typecode == 4: return "uint64" elif typecode == 5: return "bool" elif typecode == 6: return "short-string" elif typecode == 7: return "long-string" elif typecode == 8: return "abs-time" elif typecode == 9: return "delta-time" elif typecode == 10: return "reference" elif typecode == 11: return "boolean" elif typecode == 12: return "float" elif typecode == 13: return "double" elif typecode == 14: return "uuid" elif typecode == 15: return "field-table" elif typecode == 16: return "int8" elif typecode == 17: return "int16" elif typecode == 18: return "int32" elif typecode == 19: return "int64" elif typecode == 20: return "object" elif typecode == 21: return "list" elif typecode == 22: return "array" else: raise ValueError ("Invalid type code: %s" % str(typecode)) def valueByType(self, typecode, val): if type(val) is type(None): return "absent" if typecode == 1: return "%d" % val elif typecode == 2: return "%d" % val elif typecode == 3: return "%d" % val elif typecode == 4: return "%d" % val elif typecode == 6: return val elif typecode == 7: return val elif typecode == 8: return strftime("%c", gmtime(val / 1000000000)) elif typecode == 9: if val < 0: val = 0 sec = val / 1000000000 min = sec / 60 hour = min / 60 day = hour / 24 result = "" if day > 0: result = "%dd " % day if hour > 0 or result != "": result += "%dh " % (hour % 24) if min > 0 or result != "": result += "%dm " % (min % 60) result += "%ds" % (sec % 60) return result elif typecode == 10: return str(self.idRegistry.displayId(val)) elif typecode == 11: if val: return "True" else: return "False" elif typecode == 12: return "%f" % val elif typecode == 13: return "%f" % val elif typecode == 14: return "%r" % val elif typecode == 15: return "%r" % val elif typecode == 16: return "%d" % val elif typecode == 17: return "%d" % val elif typecode == 18: return "%d" % val elif typecode == 19: return "%d" % val elif typecode == 20: return "%r" % val elif typecode == 21: return "%r" % val elif typecode == 22: return "%r" % val else: raise ValueError ("Invalid type code: %s" % str(typecode)) def accessName (self, code): """ Convert element access codes to printable strings """ if code == '1': return "ReadCreate" elif code == '2': return "ReadWrite" elif code == '3': return "ReadOnly" else: raise ValueError ("Invalid access code: %s" % str(code)) def notNone (self, text): if text == None: return "" else: return text def objectIndex(self, obj): if obj._objectId.isV2: return obj._objectId.getObject() result = "" first = True props = obj.getProperties() for prop in props: if prop[0].index: if not first: result += "." result += self.valueByType(prop[0].type, prop[1]) first = None return result #===================== # Methods from Console #===================== def brokerConnected(self, broker): """ Invoked when a connection is established to a broker """ try: self.lock.acquire() self.connected = True finally: self.lock.release() if not self.first_connect: print "Broker connected:", broker self.first_connect = None def brokerDisconnected(self, broker): """ Invoked when the connection to a broker is lost """ try: self.lock.acquire() self.connected = None finally: self.lock.release() if not self.closing: print "Broker disconnected:", broker def objectProps(self, broker, record): """ Invoked when an object is updated. """ oid = record.getObjectId() dispId = self.idRegistry.displayId(oid) try: self.lock.acquire() if dispId in self.objects: self.objects[dispId].mergeUpdate(record) else: self.objects[dispId] = record finally: self.lock.release() def objectStats(self, broker, record): """ Invoked when an object is updated. """ oid = record.getObjectId() dispId = self.idRegistry.displayId(oid) try: self.lock.acquire() if dispId in self.objects: self.objects[dispId].mergeUpdate(record) finally: self.lock.release() def event(self, broker, event): """ Invoked when an event is raised. """ pass def methodResponse(self, broker, seq, response): print response #====================================================================================================== # IdRegistry #====================================================================================================== class IdRegistry(object): """ """ def __init__(self): self.next_display_id = 101 self.oid_to_display = {} self.display_to_oid = {} self.lock = Lock() def displayId(self, oid): try: self.lock.acquire() if oid in self.oid_to_display: return self.oid_to_display[oid] newId = self.next_display_id self.next_display_id += 1 self.oid_to_display[oid] = newId self.display_to_oid[newId] = oid return newId finally: self.lock.release() def objectId(self, displayId): try: self.lock.acquire() if displayId in self.display_to_oid: return self.display_to_oid[displayId] return None finally: self.lock.release() def getDisplayIds(self): result = [] for displayId in self.display_to_oid: result.append(str(displayId)) return result def getIdInfo(self, displayId): """ Given a display ID, return a tuple of (displayID, bootSequence/Durable, AgentBank/Name, ObjectName) """ oid = self.objectId(displayId) if oid == None: return (displayId, "?", "unknown", "unknown") bootSeq = oid.getSequence() if bootSeq == 0: bootSeq = '' agent = oid.getAgentBank() if agent == '0': agent = 'Broker' return (displayId, bootSeq, agent, oid.getObject()) def Usage(): print "Usage: qpid-tool [[/@][:]]" print #========================================================= # Main Program #========================================================= # Get host name and port if specified on the command line cargs = sys.argv[1:] _host = "localhost" if len(cargs) > 0: _host = cargs[0] if _host[0] == '-': Usage() if _host != '-h' and _host != "--help": print "qpid-tool: error: no such option:", _host sys.exit(1) disp = Display() # Attempt to make a connection to the target broker try: data = QmfData(disp, _host) except Exception, e: if str(e).find("Exchange not found") != -1: print "Management not enabled on broker: Use '-m yes' option on broker startup." else: print "Failed: %s - %s" % (e.__class__.__name__, e) sys.exit(1) # Instantiate the CLI interpreter and launch it. cli = Mcli(data, disp) print("Management Tool for QPID") try: cli.cmdloop() except KeyboardInterrupt: print print "Exiting..." except Exception, e: print "Failed: %s - %s" % (e.__class__.__name__, e) # alway attempt to cleanup broker resources data.close()