#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
Command-line tool that connects to a broker over QMF and prints one table of
statistics: brokers, connections, exchanges, queues or subscriptions.
"""

import os
from optparse import OptionParser, OptionGroup
from time import sleep ### debug
import sys
import locale
import socket
import re
from qmf.console import Session, Console
from qpid.disp import Display, Header, Sorter


class Config:
    """ Run-time settings; the defaults here are overwritten by OptionsAndArguments(). """

    def __init__(self):
        self._host = "localhost"        # broker address (first positional argument)
        self._connTimeout = 10          # seconds, from -t/--timeout
        self._types = ""                # one-letter table selector: b, c, e, q or u
        self._limit = 50                # row limit handed to Sorter, from -L/--limit
        self._increasing = False        # sort direction flag, from -I/--increasing
        self._sortcol = None            # column name to sort by, from -S/--sort-by
        self._cluster_detail = False    # from -C/--cluster
        self._sasl_mechanism = None     # from --sasl-mechanism


config = Config()


def OptionsAndArguments(argv):
    """ Set global variables for options, return arguments

    argv is the argument list handed to optparse; None makes optparse read
    sys.argv[1:].  Exits through parser.error() when none of the table
    selection options was given.
    """
    global config

    parser = OptionParser(usage="usage: %prog [options] BROKER",
                          description="Example: $ qpid-stat -q broker-host:10000")

    group1 = OptionGroup(parser, "General Options")
    group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="SECS",
                      help="Maximum time to wait for broker connection (in seconds)")
    group1.add_option("--sasl-mechanism", action="store", type="string", metavar="MECH",
                      help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
    parser.add_option_group(group1)

    group2 = OptionGroup(parser, "Display Options")
    # All selectors share dest="show", so the last one on the command line wins.
    group2.add_option("-b", "--broker", help="Show Brokers",
                      action="store_const", const="b", dest="show")
    group2.add_option("-c", "--connections", help="Show Connections",
                      action="store_const", const="c", dest="show")
    group2.add_option("-e", "--exchanges", help="Show Exchanges",
                      action="store_const", const="e", dest="show")
    group2.add_option("-q", "--queues", help="Show Queues",
                      action="store_const", const="q", dest="show")
    group2.add_option("-u", "--subscriptions", help="Show Subscriptions",
                      action="store_const", const="u", dest="show")
    group2.add_option("-S", "--sort-by", metavar="COLNAME",
                      help="Sort by column name")
    group2.add_option("-I", "--increasing", action="store_true", default=False,
                      help="Sort by increasing value (default = decreasing)")
    # type="int" so that a user-supplied limit is a number like the default,
    # not the raw command-line string.
    group2.add_option("-L", "--limit", type="int", default=50, metavar="N",
                      help="Limit output to n rows")
    group2.add_option("-C", "--cluster", action="store_true", default=False,
                      help="Display per-broker cluster detail.")
    parser.add_option_group(group2)

    opts, args = parser.parse_args(args=argv)

    if not opts.show:
        parser.error("You must specify one of these options: -b, -c, -e, -q. or -u. For details, try $ qpid-stat --help")

    config._types = opts.show
    config._sortcol = opts.sort_by
    config._connTimeout = opts.timeout
    config._increasing = opts.increasing
    config._limit = opts.limit
    config._cluster_detail = opts.cluster
    config._sasl_mechanism = opts.sasl_mechanism

    if args:
        config._host = args[0]

    return args


class IpAddr:
    """ Resolved IPv4 address (and port) parsed from "[auth@]host[:port]" text. """

    def __init__(self, text):
        """ Parse text, resolve the host name and store port, dottedQuad and addr.

        socket.gethostbyname() errors propagate to the caller.
        """
        if text.find("@") != -1:
            # drop the credentials in front of the host
            tokens = text.split("@")
            text = tokens[1]
        if text.find(":") != -1:
            tokens = text.split(":")
            text = tokens[0]
            self.port = int(tokens[1])
        else:
            self.port = 5672
        self.dottedQuad = socket.gethostbyname(text)
        nums = self.dottedQuad.split(".")
        # pack the four octets into one 32-bit integer
        self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])

    def bestAddr(self, addrPortList):
        """ Return the (addr, port) entry whose address is closest to this one.

        Closeness is the XOR of the two 32-bit addresses: the smaller the
        value, the longer the shared leading bit prefix.  The first entry with
        the smallest value wins.  Returns None only for an empty list.
        """
        bestDiff = 0xFFFFFFFF
        bestAddr = None
        for addrPort in addrPortList:
            diff = IpAddr(addrPort[0]).addr ^ self.addr
            # "bestAddr is None" guarantees that a non-empty list always
            # yields an entry, even when every distance equals the maximum.
            if bestAddr is None or diff < bestDiff:
                bestDiff = diff
                bestAddr = addrPort
        return bestAddr


class Broker(object):
    """ Snapshot of one broker's QMF objects, taken at construction time. """

    def __init__(self, qmf, broker):
        """ Query the session for the broker's management objects.

        qmf is the QMF console session, broker the handle returned by its
        addBroker().  Raises Exception when no agent with bank '0' is known
        to the session.
        """
        package = "org.apache.qpid.broker"

        self.broker = broker
        self.brokerAgent = None
        # No early exit: the last matching agent is used, as before.
        # NOTE(review): presumably that is the agent of the most recently
        # added broker when several are attached - confirm.
        for a in qmf.getAgents():
            if a.getAgentBank() == '0':
                self.brokerAgent = a
        if self.brokerAgent is None:
            raise Exception("No QMF broker agent (agent bank '0') found")

        bobj = qmf.getObjects(_class="broker", _package=package, _agent=self.brokerAgent)[0]
        self.currentTime = bobj.getTimestamps()[0]
        try:
            self.uptime = bobj.uptime
        except Exception:
            # the broker object did not supply an uptime value
            self.uptime = 0

        self.connections = {}
        self.sessions = {}
        self.exchanges = {}
        self.queues = {}
        self.subscriptions = {}

        # connections flagged as shadow are left out
        for conn in qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent):
            if not conn.shadow:
                self.connections[conn.getObjectId()] = conn

        # keep only sessions that belong to a retained connection
        for sess in qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent):
            if sess.connectionRef in self.connections:
                self.sessions[sess.getObjectId()] = sess

        for exchange in qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent):
            self.exchanges[exchange.getObjectId()] = exchange

        for queue in qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent):
            self.queues[queue.getObjectId()] = queue

        for subscription in qmf.getObjects(_class="subscription", _package=package, _agent=self.brokerAgent):
            self.subscriptions[subscription.getObjectId()] = subscription

    def getName(self):
        """ Return the URL of the underlying broker connection. """
        return self.broker.getUrl()

    def getCurrentTime(self):
        """ Return the first timestamp of the broker object read at construction. """
        return self.currentTime

    def getUptime(self):
        """ Return the broker's uptime, or 0 when it was not available. """
        return self.uptime


class BrokerManager(Console):
    """ Owns the QMF session and prints the statistics table selected in config. """

    def __init__(self):
        self.brokerName = None
        self.qmf = None
        self.broker = None       # initial connection; None once cluster members replace it
        self.brokers = []        # Broker snapshots used by the display methods
        self.cluster = None      # cluster object, set by _getCluster()
        # Assigned by SetBroker(); created here so that they always exist.
        self.url = None
        self.mechanism = None
        self.brokerAgent = None

    def SetBroker(self, brokerUrl, mechanism):
        """ Open the QMF session, connect to brokerUrl and locate its broker agent. """
        self.url = brokerUrl
        self.qmf = Session()
        self.mechanism = mechanism
        self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout, mechanism)
        for a in self.qmf.getAgents():
            if a.getAgentBank() == '0':
                self.brokerAgent = a

    def Disconnect(self):
        """ Release any allocated brokers. Ignore any failures as the tool is
        shutting down. """
        if not self.qmf:
            return      # the session was never created
        if self.broker:
            handles = [self.broker]
        else:
            handles = [b.broker for b in self.brokers]
        for handle in handles:
            # one try per broker: a failure must not keep the others allocated
            try:
                self.qmf.delBroker(handle)
            except Exception:
                pass

    def _getCluster(self):
        """ Store the first cluster object in self.cluster, if there is one.

        Always returns None.
        """
        packages = self.qmf.getPackages()
        if "org.apache.qpid.cluster" not in packages:
            return None

        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
        if len(clusters) == 0:
            print("Clustering is installed but not enabled on the broker.")
            return None

        self.cluster = clusters[0]

    def _getHostList(self, urlList):
        """ Return one "host:port" string per cluster member URL.

        Each URL must start with "amqp:" followed by a comma-separated list of
        three-field, colon-separated addresses; anything else raises Exception.
        """
        hosts = []
        hostAddr = IpAddr(config._host)
        for url in urlList:
            if url.find("amqp:") != 0:
                raise Exception("Invalid URL 1")
            url = url[5:]
            addrList = []
            for addr in str(url).split(","):
                tokens = addr.split(":")
                if len(tokens) != 3:
                    raise Exception("Invalid URL 2")
                # first field is skipped, the other two are address and port
                addrList.append((tokens[1], tokens[2]))

            # Find the address in the list that is most likely to be in the same subnet as the address
            # with which we made the original QMF connection. This increases the probability that we will
            # be able to reach the cluster member.
            best = hostAddr.bestAddr(addrList)
            hosts.append(best[0] + ":" + best[1])
        return hosts

    def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None):
        """ Recursively walk the selector string subs; only 'c' does any work.

        NOTE(review): self.printConnSub is not defined anywhere in this file
        and nothing in this file calls displaySubs.
        """
        if len(subs) == 0:
            return
        this = subs[0]
        remaining = subs[1:]
        newindent = indent + " "
        if this == 'b':
            pass
        elif this == 'c':
            if broker:
                for oid in broker.connections:
                    iconn = broker.connections[oid]
                    self.printConnSub(indent, broker.getName(), iconn)
                    self.displaySubs(remaining, newindent, broker=broker, conn=iconn,
                                     sess=sess, exchange=exchange, queue=queue)
        elif this == 's':
            pass
        elif this == 'e':
            pass
        elif this == 'q':
            pass
        print("")

    def _clusterTitle(self, title):
        """ Return title, with the cluster name appended when a cluster is known. """
        if self.cluster:
            title += " for cluster '%s'" % self.cluster.clusterName
        return title

    def _showTable(self, disp, title, heads, rows):
        """ Print rows as one table, sorted first when a sort column was requested. """
        if config._sortcol:
            sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
            rows = sorter.getSorted()
        disp.formattedTable(title, heads, rows)

    def displayBroker(self, subs):
        """ Print one row per broker: name, cluster, uptime and object counts. """
        disp = Display(prefix=" ")
        heads = [Header('broker'),
                 Header('cluster'),
                 Header('uptime', Header.DURATION),
                 Header('conn', Header.KMG),
                 Header('sess', Header.KMG),
                 Header('exch', Header.KMG),
                 Header('queue', Header.KMG)]
        rows = []
        for broker in self.brokers:
            if self.cluster:
                ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status)
            else:
                ctext = ""
            rows.append((broker.getName(), ctext, broker.getUptime(),
                         len(broker.connections), len(broker.sessions),
                         len(broker.exchanges), len(broker.queues)))
        self._showTable(disp, "Brokers", heads, rows)

    def displayConn(self, subs):
        """ Print one row per connection; a broker column is added for clusters. """
        disp = Display(prefix=" ")
        heads = []
        if self.cluster:
            heads.append(Header('broker'))
        heads.extend([Header('client-addr'),
                      Header('cproc'),
                      Header('cpid'),
                      Header('auth'),
                      Header('connected', Header.DURATION),
                      Header('idle', Header.DURATION),
                      Header('msgIn', Header.KMG),
                      Header('msgOut', Header.KMG)])
        rows = []
        for broker in self.brokers:
            now = broker.getCurrentTime()
            for oid in broker.connections:
                conn = broker.connections[oid]
                stamps = conn.getTimestamps()
                row = []
                if self.cluster:
                    row.append(broker.getName())
                row.extend([conn.address,
                            conn.remoteProcessName,
                            conn.remotePid,
                            conn.authIdentity,
                            now - stamps[1],     # 'connected' column
                            now - stamps[0],     # 'idle' column
                            conn.framesFromClient,
                            conn.framesToClient])
                rows.append(row)
        self._showTable(disp, self._clusterTitle("Connections"), heads, rows)

    def displaySession(self, subs):
        """ Placeholder: builds a Display and prints nothing. """
        disp = Display(prefix=" ")

    def displayExchange(self, subs):
        """ Print one row per exchange; a broker column is added for clusters. """
        disp = Display(prefix=" ")
        heads = []
        if self.cluster:
            heads.append(Header('broker'))
        heads.extend([Header("exchange"),
                      Header("type"),
                      Header("dur", Header.Y),
                      Header("bind", Header.KMG),
                      Header("msgIn", Header.KMG),
                      Header("msgOut", Header.KMG),
                      Header("msgDrop", Header.KMG),
                      Header("byteIn", Header.KMG),
                      Header("byteOut", Header.KMG),
                      Header("byteDrop", Header.KMG)])
        rows = []
        for broker in self.brokers:
            for oid in broker.exchanges:
                ex = broker.exchanges[oid]
                row = []
                if self.cluster:
                    row.append(broker.getName())
                row.extend([ex.name,
                            ex.type,
                            ex.durable,
                            ex.bindingCount,
                            ex.msgReceives,
                            ex.msgRoutes,
                            ex.msgDrops,
                            ex.byteReceives,
                            ex.byteRoutes,
                            ex.byteDrops])
                rows.append(row)
        self._showTable(disp, self._clusterTitle("Exchanges"), heads, rows)

    def displayQueue(self, subs):
        """ Print one row per queue; a broker column is added for clusters. """
        disp = Display(prefix=" ")
        heads = []
        if self.cluster:
            heads.append(Header('broker'))
        heads.extend([Header("queue"),
                      Header("dur", Header.Y),
                      Header("autoDel", Header.Y),
                      Header("excl", Header.Y),
                      Header("msg", Header.KMG),
                      Header("msgIn", Header.KMG),
                      Header("msgOut", Header.KMG),
                      Header("bytes", Header.KMG),
                      Header("bytesIn", Header.KMG),
                      Header("bytesOut", Header.KMG),
                      Header("cons", Header.KMG),
                      Header("bind", Header.KMG)])
        rows = []
        for broker in self.brokers:
            for oid in broker.queues:
                q = broker.queues[oid]
                row = []
                if self.cluster:
                    row.append(broker.getName())
                row.extend([q.name,
                            q.durable,
                            q.autoDelete,
                            q.exclusive,
                            q.msgDepth,
                            q.msgTotalEnqueues,
                            q.msgTotalDequeues,
                            q.byteDepth,
                            q.byteTotalEnqueues,
                            q.byteTotalDequeues,
                            q.consumerCount,
                            q.bindingCount])
                rows.append(row)
        self._showTable(disp, self._clusterTitle("Queues"), heads, rows)

    def displaySubscriptions(self, subs):
        """ Print one row per subscription; a broker column is added for clusters.

        A subscription whose queue, session or connection object cannot be
        looked up is left out of the table.
        """
        disp = Display(prefix=" ")
        heads = []
        if self.cluster:
            heads.append(Header('broker'))
        heads.extend([Header("subscription"),
                      Header("queue"),
                      Header("connection"),
                      Header("processName"),
                      Header("processId"),
                      Header("browsing", Header.Y),
                      Header("acknowledged", Header.Y),
                      Header("exclusive", Header.Y),
                      Header("creditMode"),
                      Header("delivered", Header.KMG)])
        rows = []
        for broker in self.brokers:
            for oid in broker.subscriptions:
                s = broker.subscriptions[oid]
                try:
                    row = []
                    if self.cluster:
                        row.append(broker.getName())
                    queue = self.qmf.getObjects(_objectId=s.queueRef)[0]
                    session = self.qmf.getObjects(_objectId=s.sessionRef)[0]
                    # fetched once and reused for the three connection columns
                    connection = self.qmf.getObjects(_objectId=session.connectionRef)[0]
                    row.extend([s.name,
                                queue.name,
                                connection.address,
                                connection.remoteProcessName,
                                connection.remotePid,
                                s.browsing,
                                s.acknowledged,
                                s.exclusive,
                                s.creditMode,
                                s.delivered])
                except Exception:
                    # best effort: skip rows whose related objects are missing
                    continue
                rows.append(row)
        self._showTable(disp, self._clusterTitle("Subscriptions"), heads, rows)

    def displayMain(self, main, subs):
        """ Call the display method for the one-letter selector main.

        Unknown selectors are ignored.
        """
        handlers = {'b': self.displayBroker,
                    'c': self.displayConn,
                    's': self.displaySession,
                    'e': self.displayExchange,
                    'q': self.displayQueue,
                    'u': self.displaySubscriptions}
        handler = handlers.get(main)
        if handler:
            handler(subs)

    def display(self):
        """ Collect Broker snapshots (one per cluster member when a cluster is
        in use) and print the table selected by config._types. """
        if config._cluster_detail or config._types[0] == 'b':
            # always show cluster detail when dumping broker stats
            self._getCluster()

        if self.cluster:
            memberList = self.cluster.members.split(";")
            hostList = self._getHostList(memberList)
            # the initial connection is replaced by one connection per member
            self.qmf.delBroker(self.broker)
            self.broker = None
            if config._host.find("@") > 0:
                authString = config._host.split("@")[0] + "@"
            else:
                authString = ""
            for host in hostList:
                # same timeout and SASL mechanism as the initial connection
                b = self.qmf.addBroker(authString + host, config._connTimeout, self.mechanism)
                self.brokers.append(Broker(self.qmf, b))
        else:
            self.brokers.append(Broker(self.qmf, self.broker))

        self.displayMain(config._types[0], config._types[1:])


def main(argv=None):
    """ Parse the command line, print the requested table, return the exit code.

    Returns 0 on success and 1 after an error or a keyboard interrupt.
    """
    OptionsAndArguments(argv)
    bm = BrokerManager()

    try:
        bm.SetBroker(config._host, config._sasl_mechanism)
        bm.display()
        bm.Disconnect()
        return 0
    except KeyboardInterrupt:
        print("")
    except Exception:
        # sys.exc_info() instead of "except Exception, e": the comma form is
        # a syntax error on Python 3.
        e = sys.exc_info()[1]
        print("Failed: %s - %s" % (e.__class__.__name__, e))

    bm.Disconnect()   # try to deallocate brokers
    return 1


if __name__ == "__main__":
    sys.exit(main())