#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import os
import getopt
import sys
import locale
import socket
import re
from qmf.console import Session, Console
from qpid.disp import Display, Header, Sorter

# Command-line settings.  The option loop in the main program overwrites these
# and the classes below read them as module globals.
_host = "localhost"
_connTimeout = 10
_types = ""
_limit = 50
_increasing = False
_sortcol = None
_numeric = False  # set by --numeric; not read anywhere in this file

# Connection addresses of the form "a.b.c.d:port".
pattern = re.compile(r"^\d+\.\d+\.\d+\.\d+:\d+$")


def Usage():
    """Print the command-line help text and exit with status 1."""
    print("Usage: qpid-stat [OPTIONS] [broker-addr]")
    print("")
    print(" broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]")
    print(" ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost")
    print("")
    print("General Options:")
    print(" --timeout seconds (10) Maximum time to wait for broker connection")
    # print " -n [--numeric] Don't resolve names"
    print("")
    print("Display Options:")
    print("")
    print(" -b Show Brokers")
    print(" -c Show Connections")
    # print " -s Show Sessions"
    print(" -e Show Exchanges")
    print(" -q Show Queues")
    print("")
    print(" -S [--sort-by] COLNAME Sort by column name")
    print(" -I [--increasing] Sort by increasing value (default = decreasing)")
    print(" -L [--limit] NUM Limit output to NUM rows (default = 50)")
    print("")
    sys.exit(1)


def _parseInt(text):
    """Return text converted to int; print usage and exit if it is not an integer."""
    try:
        return int(text)
    except ValueError:
        Usage()


class IpAddr:
    """An IPv4 address and port parsed from '[user/pass@]host[:port]'.

    Attributes set by the constructor:
      port       -- the port given in the text, or 5672 when none is given
      dottedQuad -- the address returned by socket.gethostbyname() for the host
      addr       -- dottedQuad packed into a single 32-bit integer
    """

    def __init__(self, text):
        # Drop any credentials that precede the host.
        if "@" in text:
            text = text.split("@")[1]
        if ":" in text:
            tokens = text.split(":")
            text = tokens[0]
            self.port = int(tokens[1])
        else:
            self.port = 5672
        self.dottedQuad = socket.gethostbyname(text)
        nums = self.dottedQuad.split(".")
        self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])

    def bestAddr(self, addrPortList):
        """Return the (host, port) entry whose address is bitwise-closest to this one.

        Closeness is the XOR of the two 32-bit addresses; the smallest value
        wins and the first entry wins a tie.  Returns None only when
        addrPortList is empty.
        """
        bestDiff = None
        bestAddr = None
        for addrPort in addrPortList:
            diff = IpAddr(addrPort[0]).addr ^ self.addr
            # Compare against "no candidate yet" rather than a numeric
            # sentinel, so an entry at the maximum possible distance is still
            # selected when it is the only choice.
            if bestDiff is None or diff < bestDiff:
                bestDiff = diff
                bestAddr = addrPort
        return bestAddr


class Broker(object):
    """Snapshot of one broker's management objects, taken at construction time.

    The connections, sessions, exchanges and queues dictionaries are keyed by
    each object's getObjectId().
    """

    def __init__(self, qmf, broker):
        """Query qmf (a QMF console session) for the objects of interest.

        broker is the handle this snapshot is associated with; it is stored
        and used by getName().
        """
        self.broker = broker
        # Use the agent whose bank is 0.  If several match, the last one wins.
        for a in qmf.getAgents():
            if a.getAgentBank() == 0:
                self.brokerAgent = a

        package = "org.apache.qpid.broker"
        bobj = qmf.getObjects(_class="broker", _package=package, _agent=self.brokerAgent)[0]
        self.currentTime = bobj.getTimestamps()[0]
        try:
            self.uptime = bobj.uptime
        except Exception:
            # The broker object did not provide an uptime; report zero.
            self.uptime = 0

        self.connections = {}
        self.sessions = {}
        self.exchanges = {}
        self.queues = {}

        # Keep only connections whose address looks like "a.b.c.d:port".
        for conn in qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent):
            if pattern.match(conn.address):
                self.connections[conn.getObjectId()] = conn

        # Keep only sessions that belong to one of the retained connections.
        for sess in qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent):
            if sess.connectionRef in self.connections:
                self.sessions[sess.getObjectId()] = sess

        for exchange in qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent):
            self.exchanges[exchange.getObjectId()] = exchange

        for queue in qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent):
            self.queues[queue.getObjectId()] = queue

    def getName(self):
        """Return the URL of the underlying broker handle."""
        return self.broker.getUrl()

    def getCurrentTime(self):
        """Return the first timestamp of the broker object read at construction."""
        return self.currentTime

    def getUptime(self):
        """Return the broker's uptime, or 0 if it was not available."""
        return self.uptime


class BrokerManager(Console):
    """Connects to a broker (or to every member of its cluster) and prints tables."""

    def __init__(self):
        self.brokerName = None
        self.qmf = None
        self.broker = None
        self.brokers = []
        self.cluster = None

    def SetBroker(self, brokerUrl):
        """Open a QMF session to brokerUrl and remember its bank-0 agent."""
        self.url = brokerUrl
        self.qmf = Session()
        self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
        for a in self.qmf.getAgents():
            if a.getAgentBank() == 0:
                self.brokerAgent = a

    def Disconnect(self):
        """Release every broker connection this manager opened.

        Failures are ignored: this runs while the tool is shutting down,
        possibly right after another error has already been reported.
        """
        try:
            if self.broker:
                self.qmf.delBroker(self.broker)
            else:
                # Cluster mode: display() dropped the original connection and
                # opened one per cluster member instead.
                for b in self.brokers:
                    self.qmf.delBroker(b.broker)
        except Exception:
            pass

    def _getCluster(self):
        """Store the broker's cluster object in self.cluster, if there is one.

        Leaves self.cluster untouched when the cluster package is absent or no
        cluster object exists.  Always returns None.
        """
        packages = self.qmf.getPackages()
        if "org.apache.qpid.cluster" not in packages:
            return None

        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
        if len(clusters) == 0:
            print("Clustering is installed but not enabled on the broker.")
            return None

        self.cluster = clusters[0]

    def _getHostList(self, urlList):
        """Return one 'host:port' string per cluster-member URL in urlList.

        Each URL must start with 'amqp:' followed by a comma-separated list of
        three-part, colon-separated addresses; the last two parts are used as
        host and port.  Raises Exception on any other form.
        """
        hosts = []
        hostAddr = IpAddr(_host)
        for url in urlList:
            if not url.startswith("amqp:"):
                raise Exception("Invalid URL 1")
            url = url[5:]
            addrList = []
            for addr in str(url).split(","):
                tokens = addr.split(":")
                if len(tokens) != 3:
                    raise Exception("Invalid URL 2")
                addrList.append((tokens[1], tokens[2]))

            # Find the address in the list that is most likely to be in the same subnet as the address
            # with which we made the original QMF connection. This increases the probability that we will
            # be able to reach the cluster member.
            best = hostAddr.bestAddr(addrList)
            hosts.append(best[0] + ":" + best[1])
        return hosts

    def _clusterTitle(self, base):
        """Return base, suffixed with the cluster name when connected to a cluster."""
        if self.cluster:
            return base + " for cluster '%s'" % self.cluster.clusterName
        return base

    def _showTable(self, title, heads, rows):
        """Print rows as a table; sort and limit them only when a sort column was given."""
        if _sortcol:
            sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
            dispRows = sorter.getSorted()
        else:
            dispRows = rows
        Display(prefix=" ").formattedTable(title, heads, dispRows)

    def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None):
        """Recursively print sub-listings for the type letters in subs.

        Only 'c' (connections) does anything; 'b', 's', 'e' and 'q' are
        accepted and ignored.
        NOTE(review): nothing in this file calls this method, and it relies on
        self.printConnSub, which is not defined in the visible source.
        """
        if not subs:
            return
        this = subs[0]
        remaining = subs[1:]
        newindent = indent + " "
        if this == 'c' and broker:
            for oid in broker.connections:
                iconn = broker.connections[oid]
                self.printConnSub(indent, broker.getName(), iconn)
                self.displaySubs(remaining, newindent, broker=broker, conn=iconn,
                                 sess=sess, exchange=exchange, queue=queue)
        print("")

    def displayBroker(self, subs):
        """Print one row per broker with its uptime and object counts."""
        heads = [
            Header('broker'),
            Header('cluster'),
            Header('uptime', Header.DURATION),
            Header('conn', Header.KMG),
            Header('sess', Header.KMG),
            Header('exch', Header.KMG),
            Header('queue', Header.KMG),
        ]
        rows = []
        for broker in self.brokers:
            if self.cluster:
                ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status)
            else:
                ctext = ""
            rows.append((broker.getName(), ctext, broker.getUptime(),
                         len(broker.connections), len(broker.sessions),
                         len(broker.exchanges), len(broker.queues)))
        self._showTable("Brokers", heads, rows)

    def displayConn(self, subs):
        """Print one row per client connection across all brokers."""
        heads = []
        if self.cluster:
            heads.append(Header('broker'))
        heads.extend([
            Header('client-addr'),
            Header('cproc'),
            Header('cpid'),
            Header('auth'),
            Header('connected', Header.DURATION),
            Header('idle', Header.DURATION),
            Header('msgIn', Header.KMG),
            Header('msgOut', Header.KMG),
        ])
        rows = []
        for broker in self.brokers:
            now = broker.getCurrentTime()
            for oid in broker.connections:
                conn = broker.connections[oid]
                stamps = conn.getTimestamps()
                # 'connected' is measured from timestamp [1] and 'idle' from
                # timestamp [0], both relative to the broker's current time.
                connected = now - stamps[1]
                idle = now - stamps[0]
                row = []
                if self.cluster:
                    row.append(broker.getName())
                row.extend([conn.address, conn.remoteProcessName, conn.remotePid,
                            conn.authIdentity, connected, idle,
                            conn.framesFromClient, conn.framesToClient])
                rows.append(row)
        self._showTable(self._clusterTitle("Connections"), heads, rows)

    def displaySession(self, subs):
        """Session display is not implemented; only a Display object is created."""
        disp = Display(prefix=" ")

    def displayExchange(self, subs):
        """Print one row per exchange across all brokers."""
        heads = []
        if self.cluster:
            heads.append(Header('broker'))
        heads.extend([
            Header("exchange"),
            Header("type"),
            Header("dur", Header.Y),
            Header("bind", Header.KMG),
            Header("msgIn", Header.KMG),
            Header("msgOut", Header.KMG),
            Header("msgDrop", Header.KMG),
            Header("byteIn", Header.KMG),
            Header("byteOut", Header.KMG),
            Header("byteDrop", Header.KMG),
        ])
        rows = []
        for broker in self.brokers:
            for oid in broker.exchanges:
                ex = broker.exchanges[oid]
                row = []
                if self.cluster:
                    row.append(broker.getName())
                row.extend([ex.name, ex.type, ex.durable, ex.bindingCount,
                            ex.msgReceives, ex.msgRoutes, ex.msgDrops,
                            ex.byteReceives, ex.byteRoutes, ex.byteDrops])
                rows.append(row)
        self._showTable(self._clusterTitle("Exchanges"), heads, rows)

    def displayQueue(self, subs):
        """Print one row per queue across all brokers."""
        heads = []
        if self.cluster:
            heads.append(Header('broker'))
        heads.extend([
            Header("queue"),
            Header("dur", Header.Y),
            Header("autoDel", Header.Y),
            Header("excl", Header.Y),
            Header("msg", Header.KMG),
            Header("msgIn", Header.KMG),
            Header("msgOut", Header.KMG),
            Header("bytes", Header.KMG),
            Header("bytesIn", Header.KMG),
            Header("bytesOut", Header.KMG),
            Header("cons", Header.KMG),
            Header("bind", Header.KMG),
        ])
        rows = []
        for broker in self.brokers:
            for oid in broker.queues:
                q = broker.queues[oid]
                row = []
                if self.cluster:
                    row.append(broker.getName())
                row.extend([q.name, q.durable, q.autoDelete, q.exclusive,
                            q.msgDepth, q.msgTotalEnqueues, q.msgTotalDequeues,
                            q.byteDepth, q.byteTotalEnqueues, q.byteTotalDequeues,
                            q.consumerCount, q.bindingCount])
                rows.append(row)
        self._showTable(self._clusterTitle("Queues"), heads, rows)

    def displayMain(self, main, subs):
        """Dispatch on the type letter main; unknown letters are ignored."""
        if main == 'b':
            self.displayBroker(subs)
        elif main == 'c':
            self.displayConn(subs)
        elif main == 's':
            self.displaySession(subs)
        elif main == 'e':
            self.displayExchange(subs)
        elif main == 'q':
            self.displayQueue(subs)

    def display(self):
        """Collect a snapshot from the broker or its cluster and print the first requested table.

        When the broker belongs to a cluster, the original connection is
        replaced by one connection per cluster member, reusing any credentials
        given in _host.
        """
        self._getCluster()
        if self.cluster:
            memberList = self.cluster.members.split(";")
            hostList = self._getHostList(memberList)
            self.qmf.delBroker(self.broker)
            self.broker = None
            if _host.find("@") > 0:
                authString = _host.split("@")[0] + "@"
            else:
                authString = ""
            for host in hostList:
                b = self.qmf.addBroker(authString + host, _connTimeout)
                self.brokers.append(Broker(self.qmf, b))
        else:
            self.brokers.append(Broker(self.qmf, self.broker))

        self.displayMain(_types[0], _types[1:])


##
## Main Program
##

try:
    longOpts = ("top", "numeric", "sort-by=", "limit=", "increasing", "timeout=")
    (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "bceqS:L:I", longOpts)
except getopt.GetoptError:
    Usage()

# Decode positional arguments with the locale's encoding when possible;
# otherwise use them exactly as given.
try:
    encoding = locale.getpreferredencoding()
    cargs = [a.decode(encoding) for a in encArgs]
except Exception:
    cargs = encArgs

for opt in optlist:
    if opt[0] == "--timeout":
        _connTimeout = _parseInt(opt[1])
        # A timeout of 0 is passed on as None.
        if _connTimeout == 0:
            _connTimeout = None
    elif opt[0] == "-n" or opt[0] == "--numeric":
        _numeric = True
    elif opt[0] == "-S" or opt[0] == "--sort-by":
        _sortcol = opt[1]
    elif opt[0] == "-I" or opt[0] == "--increasing":
        _increasing = True
    elif opt[0] == "-L" or opt[0] == "--limit":
        _limit = _parseInt(opt[1])
    elif len(opt[0]) == 2:
        # Any other single-letter option selects a table to display.
        char = opt[0][1]
        if char in "bcseq":
            _types += char
        else:
            Usage()
    else:
        Usage()

if not _types:
    Usage()

bm = BrokerManager()

if len(cargs) == 1:
    _host = cargs[0]

exitCode = 0
try:
    bm.SetBroker(_host)
    bm.display()
except KeyboardInterrupt:
    print("")
except Exception:
    # sys.exc_info() keeps this valid under both Python 2 and Python 3 syntax.
    e = sys.exc_info()[1]
    print("Failed: %s - %s" % (e.__class__.__name__, e))
    exitCode = 1

# Always release broker connections, including after a failure.
bm.Disconnect()
if exitCode:
    sys.exit(exitCode)