#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import os from optparse import OptionParser, OptionGroup import sys import locale import socket import re from qpid.messaging import Connection home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools")) sys.path.append(os.path.join(home, "python")) from qpidtoollibs import BrokerAgent from qpidtoollibs import Display, Header, Sorter, YN, Commas, TimeLong class Config: def __init__(self): self._host = "localhost" self._connTimeout = 10 self._types = "" self._limit = 50 self._increasing = False self._sortcol = None self._sasl_mechanism = None self._ha_admin = False config = Config() def OptionsAndArguments(argv): """ Set global variables for options, return arguments """ global config parser = OptionParser(usage="usage: %prog [options] -[gcequm] [object-name]") group1 = OptionGroup(parser, "General Options") group1.add_option("-b", "--broker", action="store", type="string", default="localhost", metavar="", help="URL of the broker to query") group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="", help="Maximum time to wait for broker connection (in seconds)") group1.add_option("--sasl-mechanism", action="store", type="string", metavar="", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") group1.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.") parser.add_option_group(group1) group2 = OptionGroup(parser, "Display Options") group2.add_option("-g", "--general", help="Show General Broker Stats", action="store_const", const="g", dest="show") group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show") group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show") group2.add_option("-q", "--queues", help="Show Queues", action="store_const", const="q", dest="show") group2.add_option("-u", "--subscriptions", help="Show Subscriptions", action="store_const", const="u", dest="show") group2.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show") group2.add_option( "--acl", help="Show Access Control List Stats", action="store_const", const="acl", dest="show") group2.add_option("-S", "--sort-by", metavar="", help="Sort by column name") group2.add_option("-I", "--increasing", action="store_true", default=False, help="Sort by increasing value (default = decreasing)") group2.add_option("-L", "--limit", type="int", default=50, metavar="", help="Limit output to n rows") parser.add_option_group(group2) opts, args = parser.parse_args(args=argv) if not opts.show: parser.error("You must specify one of these options: -g, -c, -e, -q, -m, or -u. For details, try $ qpid-stat --help") config._types = opts.show config._sortcol = opts.sort_by config._host = opts.broker config._connTimeout = opts.timeout config._increasing = opts.increasing config._limit = opts.limit config._sasl_mechanism = opts.sasl_mechanism config._ha_admin = opts.ha_admin return args class IpAddr: def __init__(self, text): if text.find("@") != -1: tokens = text.split("@") text = tokens[1] if text.find(":") != -1: tokens = text.split(":") text = tokens[0] self.port = int(tokens[1]) else: self.port = 5672 self.dottedQuad = socket.gethostbyname(text) nums = self.dottedQuad.split(".") self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) def bestAddr(self, addrPortList): bestDiff = 0xFFFFFFFFL bestAddr = None for addrPort in addrPortList: diff = IpAddr(addrPort[0]).addr ^ self.addr if diff < bestDiff: bestDiff = diff bestAddr = addrPort return bestAddr class BrokerManager: def __init__(self): self.brokerName = None self.connection = None self.broker = None self.cluster = None def SetBroker(self, brokerUrl, mechanism): self.url = brokerUrl client_properties={} if config._ha_admin: client_properties["qpid.ha-admin"] = 1 self.connection = Connection.establish(self.url, sasl_mechanisms=mechanism, client_properties=client_properties) self.broker = BrokerAgent(self.connection) def Disconnect(self): """ Release any allocated brokers. Ignore any failures as the tool is shutting down. """ try: connection.close() except: pass def _getCluster(self): packages = self.qmf.getPackages() if "org.apache.qpid.cluster" not in packages: return None clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) if len(clusters) == 0: print "Clustering is installed but not enabled on the broker." return None self.cluster = clusters[0] def _getHostList(self, urlList): hosts = [] hostAddr = IpAddr(config._host) for url in urlList: if url.find("amqp:") != 0: raise Exception("Invalid URL 1") url = url[5:] addrs = str(url).split(",") addrList = [] for addr in addrs: tokens = addr.split(":") if len(tokens) != 3: raise Exception("Invalid URL 2") addrList.append((tokens[1], tokens[2])) # Find the address in the list that is most likely to be in the same subnet as the address # with which we made the original QMF connection. This increases the probability that we will # be able to reach the cluster member. best = hostAddr.bestAddr(addrList) bestUrl = best[0] + ":" + best[1] hosts.append(bestUrl) return hosts def displayBroker(self): disp = Display(prefix=" ") heads = [] heads.append(Header('uptime', Header.DURATION)) heads.append(Header('connections', Header.COMMAS)) heads.append(Header('sessions', Header.COMMAS)) heads.append(Header('exchanges', Header.COMMAS)) heads.append(Header('queues', Header.COMMAS)) rows = [] broker = self.broker.getBroker() connections = self.getConnectionMap() sessions = self.getSessionMap() exchanges = self.getExchangeMap() queues = self.getQueueMap() row = (broker.getUpdateTime() - broker.getCreateTime(), len(connections), len(sessions), len(exchanges), len(queues)) rows.append(row) disp.formattedTable('Broker Summary:', heads, rows) if 'queueCount' not in broker.values: return print heads = [] heads.append(Header('Statistic')) heads.append(Header('Messages', Header.COMMAS)) heads.append(Header('Bytes', Header.COMMAS)) rows = [] rows.append(['queue-depth', broker.msgDepth, broker.byteDepth]) rows.append(['total-enqueues', broker.msgTotalEnqueues, broker.byteTotalEnqueues]) rows.append(['total-dequeues', broker.msgTotalDequeues, broker.byteTotalDequeues]) rows.append(['persistent-enqueues', broker.msgPersistEnqueues, broker.bytePersistEnqueues]) rows.append(['persistent-dequeues', broker.msgPersistDequeues, broker.bytePersistDequeues]) rows.append(['transactional-enqueues', broker.msgTxnEnqueues, broker.byteTxnEnqueues]) rows.append(['transactional-dequeues', broker.msgTxnDequeues, broker.byteTxnDequeues]) rows.append(['flow-to-disk-depth', broker.msgFtdDepth, broker.byteFtdDepth]) rows.append(['flow-to-disk-enqueues', broker.msgFtdEnqueues, broker.byteFtdEnqueues]) rows.append(['flow-to-disk-dequeues', broker.msgFtdDequeues, broker.byteFtdDequeues]) rows.append(['acquires', broker.acquires, None]) rows.append(['releases', broker.releases, None]) rows.append(['discards-no-route', broker.discardsNoRoute, None]) rows.append(['discards-ttl-expired', broker.discardsTtl, None]) rows.append(['discards-limit-overflow', broker.discardsOverflow, None]) rows.append(['discards-ring-overflow', broker.discardsRing, None]) rows.append(['discards-lvq-replace', broker.discardsLvq, None]) rows.append(['discards-subscriber-reject', broker.discardsSubscriber, None]) rows.append(['discards-purged', broker.discardsPurge, None]) rows.append(['reroutes', broker.reroutes, None]) rows.append(['abandoned', broker.abandoned, None]) rows.append(['abandoned-via-alt', broker.abandonedViaAlt, None]) disp.formattedTable('Aggregate Broker Statistics:', heads, rows) def displayConn(self): disp = Display(prefix=" ") heads = [] heads.append(Header('client-addr')) heads.append(Header('cproc')) heads.append(Header('cpid')) heads.append(Header('auth')) heads.append(Header('connected', Header.DURATION)) heads.append(Header('idle', Header.DURATION)) heads.append(Header('msgIn', Header.KMG)) heads.append(Header('msgOut', Header.KMG)) rows = [] connections = self.broker.getAllConnections() broker = self.broker.getBroker() for conn in connections: row = [] row.append(conn.address) row.append(conn.remoteProcessName) row.append(conn.remotePid) row.append(conn.authIdentity) row.append(broker.getUpdateTime() - conn.getCreateTime()) row.append(broker.getUpdateTime() - conn.getUpdateTime()) row.append(conn.msgsFromClient) row.append(conn.msgsToClient) rows.append(row) title = "Connections" if config._sortcol: sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() else: dispRows = rows disp.formattedTable(title, heads, dispRows) def displaySession(self): disp = Display(prefix=" ") def displayExchange(self): disp = Display(prefix=" ") heads = [] heads.append(Header("exchange")) heads.append(Header("type")) heads.append(Header("dur", Header.Y)) heads.append(Header("bind", Header.KMG)) heads.append(Header("msgIn", Header.KMG)) heads.append(Header("msgOut", Header.KMG)) heads.append(Header("msgDrop", Header.KMG)) heads.append(Header("byteIn", Header.KMG)) heads.append(Header("byteOut", Header.KMG)) heads.append(Header("byteDrop", Header.KMG)) rows = [] exchanges = self.broker.getAllExchanges() for ex in exchanges: row = [] row.append(ex.name) row.append(ex.type) row.append(ex.durable) row.append(ex.bindingCount) row.append(ex.msgReceives) row.append(ex.msgRoutes) row.append(ex.msgDrops) row.append(ex.byteReceives) row.append(ex.byteRoutes) row.append(ex.byteDrops) rows.append(row) title = "Exchanges" if config._sortcol: sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() else: dispRows = rows disp.formattedTable(title, heads, dispRows) def displayQueues(self): disp = Display(prefix=" ") heads = [] heads.append(Header("queue")) heads.append(Header("dur", Header.Y)) heads.append(Header("autoDel", Header.Y)) heads.append(Header("excl", Header.Y)) heads.append(Header("msg", Header.KMG)) heads.append(Header("msgIn", Header.KMG)) heads.append(Header("msgOut", Header.KMG)) heads.append(Header("bytes", Header.KMG)) heads.append(Header("bytesIn", Header.KMG)) heads.append(Header("bytesOut", Header.KMG)) heads.append(Header("cons", Header.KMG)) heads.append(Header("bind", Header.KMG)) rows = [] queues = self.broker.getAllQueues() for q in queues: row = [] row.append(q.name) row.append(q.durable) row.append(q.autoDelete) row.append(q.exclusive) row.append(q.msgDepth) row.append(q.msgTotalEnqueues) row.append(q.msgTotalDequeues) row.append(q.byteDepth) row.append(q.byteTotalEnqueues) row.append(q.byteTotalDequeues) row.append(q.consumerCount) row.append(q.bindingCount) rows.append(row) title = "Queues" if config._sortcol: sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() else: dispRows = rows disp.formattedTable(title, heads, dispRows) def displayQueue(self, name): queue = self.broker.getQueue(name) if not queue: print "Queue '%s' not found" % name return disp = Display(prefix=" ") heads = [] heads.append(Header('Name')) heads.append(Header('Durable', Header.YN)) heads.append(Header('AutoDelete', Header.YN)) heads.append(Header('Exclusive', Header.YN)) heads.append(Header('FlowStopped', Header.YN)) heads.append(Header('FlowStoppedCount', Header.COMMAS)) heads.append(Header('Consumers', Header.COMMAS)) heads.append(Header('Bindings', Header.COMMAS)) rows = [] rows.append([queue.name, queue.durable, queue.autoDelete, queue.exclusive, queue.flowStopped, queue.flowStoppedCount, queue.consumerCount, queue.bindingCount]) disp.formattedTable("Properties:", heads, rows) print heads = [] heads.append(Header('Property')) heads.append(Header('Value')) rows = [] rows.append(['arguments', queue.arguments]) rows.append(['alt-exchange', queue.altExchange]) disp.formattedTable("Optional Properties:", heads, rows) print heads = [] heads.append(Header('Statistic')) heads.append(Header('Messages', Header.COMMAS)) heads.append(Header('Bytes', Header.COMMAS)) rows = [] rows.append(['queue-depth', queue.msgDepth, queue.byteDepth]) rows.append(['total-enqueues', queue.msgTotalEnqueues, queue.byteTotalEnqueues]) rows.append(['total-dequeues', queue.msgTotalDequeues, queue.byteTotalDequeues]) rows.append(['persistent-enqueues', queue.msgPersistEnqueues, queue.bytePersistEnqueues]) rows.append(['persistent-dequeues', queue.msgPersistDequeues, queue.bytePersistDequeues]) rows.append(['transactional-enqueues', queue.msgTxnEnqueues, queue.byteTxnEnqueues]) rows.append(['transactional-dequeues', queue.msgTxnDequeues, queue.byteTxnDequeues]) rows.append(['flow-to-disk-depth', queue.msgFtdDepth, queue.byteFtdDepth]) rows.append(['flow-to-disk-enqueues', queue.msgFtdEnqueues, queue.byteFtdEnqueues]) rows.append(['flow-to-disk-dequeues', queue.msgFtdDequeues, queue.byteFtdDequeues]) rows.append(['acquires', queue.acquires, None]) rows.append(['releases', queue.releases, None]) rows.append(['discards-ttl-expired', queue.discardsTtl, None]) rows.append(['discards-limit-overflow', queue.discardsOverflow, None]) rows.append(['discards-ring-overflow', queue.discardsRing, None]) rows.append(['discards-lvq-replace', queue.discardsLvq, None]) rows.append(['discards-subscriber-reject', queue.discardsSubscriber, None]) rows.append(['discards-purged', queue.discardsPurge, None]) rows.append(['reroutes', queue.reroutes, None]) disp.formattedTable("Statistics:", heads, rows) def displaySubscriptions(self): disp = Display(prefix=" ") heads = [] heads.append(Header("subscr")) heads.append(Header("queue")) heads.append(Header("conn")) heads.append(Header("procName")) heads.append(Header("procId")) heads.append(Header("browse", Header.Y)) heads.append(Header("acked", Header.Y)) heads.append(Header("excl", Header.Y)) heads.append(Header("creditMode")) heads.append(Header("delivered", Header.KMG)) rows = [] subscriptions = self.broker.getAllSubscriptions() sessions = self.getSessionMap() connections = self.getConnectionMap() for s in subscriptions: row = [] try: row.append(s.name) row.append(s.queueRef) session = sessions[s.sessionRef] connection = connections[session.connectionRef] row.append(connection.address) row.append(connection.remoteProcessName) row.append(connection.remotePid) row.append(s.browsing) row.append(s.acknowledged) row.append(s.exclusive) row.append(s.creditMode) row.append(s.delivered) rows.append(row) except: pass title = "Subscriptions" if config._sortcol: sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() else: dispRows = rows disp.formattedTable(title, heads, dispRows) def displayMemory(self): disp = Display(prefix=" ") heads = [Header('Statistic'), Header('Value', Header.COMMAS)] rows = [] memory = self.broker.getMemory() for k,v in memory.values.items(): if k != 'name': rows.append([k, v]) disp.formattedTable('Broker Memory Statistics:', heads, rows) def displayAcl(self): acl = self.broker.getAcl() if not acl: print "ACL Policy Module is not installed" return disp = Display(prefix=" ") heads = [Header('Statistic'), Header('Value')] rows = [] rows.append(['policy-file', acl.policyFile]) rows.append(['enforcing', YN(acl.enforcingAcl)]) rows.append(['has-transfer-acls', YN(acl.transferAcl)]) rows.append(['last-acl-load', TimeLong(acl.lastAclLoad)]) rows.append(['acl-denials', Commas(acl.aclDenyCount)]) disp.formattedTable('ACL Policy Statistics:', heads, rows) def getExchangeMap(self): exchanges = self.broker.getAllExchanges() emap = {} for e in exchanges: emap[e.name] = e return emap def getQueueMap(self): queues = self.broker.getAllQueues() qmap = {} for q in queues: qmap[q.name] = q return qmap def getSessionMap(self): sessions = self.broker.getAllSessions() smap = {} for s in sessions: smap[s.name] = s return smap def getConnectionMap(self): connections = self.broker.getAllConnections() cmap = {} for c in connections: cmap[c.address] = c return cmap def displayMain(self, names, main): if main == 'g': self.displayBroker() elif main == 'c': self.displayConn() elif main == 's': self.displaySession() elif main == 'e': self.displayExchange() elif main == 'q': if len(names) >= 1: self.displayQueue(names[0]) else: self.displayQueues() elif main == 'u': self.displaySubscriptions() elif main == 'm': self.displayMemory() elif main == 'acl': self.displayAcl() def display(self, names): self.displayMain(names, config._types) def main(argv=None): args = OptionsAndArguments(argv) bm = BrokerManager() try: bm.SetBroker(config._host, config._sasl_mechanism) bm.display(args) bm.Disconnect() return 0 except KeyboardInterrupt: print except Exception,e: print "Failed: %s - %s" % (e.__class__.__name__, e) bm.Disconnect() # try to deallocate brokers return 1 if __name__ == "__main__": sys.exit(main())