#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import os import getopt import sys import locale from qmf.console import Session _recursive = False _host = "localhost" _connTimeout = 10 _altern_ex = None _passive = False _durable = False _clusterDurable = False _if_empty = True _if_unused = True _fileCount = 8 _fileSize = 24 _maxQueueSize = None _maxQueueCount = None _limitPolicy = None _order = None _msgSequence = False _ive = False _eventGeneration = None FILECOUNT = "qpid.file_count" FILESIZE = "qpid.file_size" MAX_QUEUE_SIZE = "qpid.max_size" MAX_QUEUE_COUNT = "qpid.max_count" POLICY_TYPE = "qpid.policy_type" CLUSTER_DURABLE = "qpid.persist_last_node" LVQ = "qpid.last_value_queue" LVQNB = "qpid.last_value_queue_no_browse" MSG_SEQUENCE = "qpid.msg_sequence" IVE = "qpid.ive" QUEUE_EVENT_GENERATION = "qpid.queue_event_generation" def Usage (): print "Usage: qpid-config [OPTIONS]" print " qpid-config [OPTIONS] exchanges [filter-string]" print " qpid-config [OPTIONS] queues [filter-string]" print " qpid-config [OPTIONS] add exchange [AddExchangeOptions]" print " qpid-config [OPTIONS] del exchange " print " qpid-config [OPTIONS] add queue [AddQueueOptions]" print " qpid-config [OPTIONS] del queue [DelQueueOptions]" print " qpid-config [OPTIONS] bind [binding-key]" print " qpid-config [OPTIONS] unbind [binding-key]" print print "Options:" print " --timeout seconds (10) Maximum time to wait for broker connection" print " -b [ --bindings ] Show bindings in queue or exchange list" print " -a [ --broker-addr ] Address (localhost) Address of qpidd broker" print " broker-addr is in the form: [username/password@] hostname | ip-address [:]" print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" print print "Add Queue Options:" print " --alternate-exchange [name of the alternate exchange]" print " The alternate-exchange field specifies how messages on this queue should" print " be treated when they are rejected by a subscriber, or when they are" print " orphaned by queue deletion. When present, rejected or orphaned messages" print " MUST be routed to the alternate-exchange. In all cases the messages MUST" print " be removed from the queue." print " --passive Do not actually change the broker state (queue will not be created)" print " --durable Queue is durable" print " --cluster-durable Queue becomes durable if there is only one functioning cluster node" print " --file-count N (8) Number of files in queue's persistence journal" print " --file-size N (24) File size in pages (64Kib/page)" print " --max-queue-size N Maximum in-memory queue size as bytes" print " --max-queue-count N Maximum in-memory queue size as a number of messages" print " --limit-policy [none | reject | flow-to-disk | ring | ring-strict]" print " Action taken when queue limit is reached:" print " none (default) - Use broker's default policy" print " reject - Reject enqueued messages" print " flow-to-disk - Page messages to disk" print " ring - Replace oldest unacquired message with new" print " ring-strict - Replace oldest message, reject if oldest is acquired" print " --order [fifo | lvq | lvq-no-browse]" print " Set queue ordering policy:" print " fifo (default) - First in, first out" print " lvq - Last Value Queue ordering, allows queue browsing" print " lvq-no-browse - Last Value Queue ordering, browsing clients may lose data" print " --generate-queue-events N" print " If set to 1, every enqueue will generate an event that can be processed by" print " registered listeners (e.g. for replication). If set to 2, events will be" print " generated for enqueues and dequeues" print print "Del Queue Options:" print " --force Force delete of queue even if it's currently used or it's not empty" print " --force-if-not-empty Force delete of queue even if it's not empty" print " --force-if-used Force delete of queue even if it's currently used" print print "Add Exchange values:" print " direct Direct exchange for point-to-point communication" print " fanout Fanout exchange for broadcast communication" print " topic Topic exchange that routes messages using binding keys with wildcards" print " headers Headers exchange that matches header fields against the binding keys" print print "Add Exchange Options:" print " --alternate-exchange [name of the alternate exchange]" print " In the event that a message cannot be routed, this is the name of the exchange to" print " which the message will be sent. Messages transferred using message.transfer will" print " be routed to the alternate-exchange only if they are sent with the \"none\"" print " accept-mode, and the discard-unroutable delivery property is set to false, and" print " there is no queue to route to for the given message according to the bindings" print " on this exchange." print " --passive Do not actually change the broker state (exchange will not be created)" print " --durable Exchange is durable" print " --sequence Exchange will insert a 'qpid.msg_sequence' field in the message header" print " with a value that increments for each message forwarded." print " --ive Exchange will behave as an 'initial-value-exchange', keeping a reference" print " to the last message forwarded and enqueuing that message to newly bound" print " queues." print sys.exit (1) class BrokerManager: def __init__ (self): self.brokerName = None self.qmf = None self.broker = None def SetBroker (self, brokerUrl): self.url = brokerUrl self.qmf = Session() self.broker = self.qmf.addBroker(brokerUrl, _connTimeout) agents = self.qmf.getAgents() for a in agents: if a.getAgentBank() == 0: self.brokerAgent = a def Disconnect(self): if self.broker: self.qmf.delBroker(self.broker) def Overview (self): exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) print "Total Exchanges: %d" % len (exchanges) etype = {} for ex in exchanges: if ex.type not in etype: etype[ex.type] = 1 else: etype[ex.type] = etype[ex.type] + 1 for typ in etype: print "%15s: %d" % (typ, etype[typ]) print print " Total Queues: %d" % len (queues) _durable = 0 for queue in queues: if queue.durable: _durable = _durable + 1 print " durable: %d" % _durable print " non-durable: %d" % (len (queues) - _durable) def ExchangeList (self, filter): exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) caption1 = "Type " caption2 = "Exchange Name" maxNameLen = len(caption2) for ex in exchanges: if self.match(ex.name, filter): if len(ex.name) > maxNameLen: maxNameLen = len(ex.name) print "%s%-*s Attributes" % (caption1, maxNameLen, caption2) line = "" for i in range(((maxNameLen + len(caption1)) / 5) + 5): line += "=====" print line for ex in exchanges: if self.match (ex.name, filter): print "%-10s%-*s " % (ex.type, maxNameLen, ex.name), args = ex.arguments if ex.durable: print "--durable", if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence", if IVE in args and args[IVE] == 1: print "--ive", if ex.altExchange: print "--alternate-exchange=%s" % ex._altExchange_.name, print def ExchangeListRecurse (self, filter): exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) for ex in exchanges: if self.match (ex.name, filter): print "Exchange '%s' (%s)" % (ex.name, ex.type) for bind in bindings: if bind.exchangeRef == ex.getObjectId(): qname = "" queue = self.findById (queues, bind.queueRef) if queue != None: qname = queue.name print " bind [%s] => %s" % (bind.bindingKey, qname) def QueueList (self, filter): queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) caption = "Queue Name" maxNameLen = len(caption) for q in queues: if self.match (q.name, filter): if len(q.name) > maxNameLen: maxNameLen = len(q.name) print "%-*s Attributes" % (maxNameLen, caption) line = "" for i in range((maxNameLen / 5) + 5): line += "=====" print line for q in queues: if self.match (q.name, filter): print "%-*s " % (maxNameLen, q.name), args = q.arguments if q.durable: print "--durable", if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable", if q.autoDelete: print "auto-del", if q.exclusive: print "excl", if FILESIZE in args: print "--file-size=%d" % args[FILESIZE], if FILECOUNT in args: print "--file-count=%d" % args[FILECOUNT], if MAX_QUEUE_SIZE in args: print "--max-queue-size=%d" % args[MAX_QUEUE_SIZE], if MAX_QUEUE_COUNT in args: print "--max-queue-count=%d" % args[MAX_QUEUE_COUNT], if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"), if LVQ in args and args[LVQ] == 1: print "--order lvq", if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse", if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION], if q.altExchange: print "--alternate-exchange=%s" % q._altExchange_.name, print def QueueListRecurse (self, filter): exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) for queue in queues: if self.match (queue.name, filter): print "Queue '%s'" % queue.name for bind in bindings: if bind.queueRef == queue.getObjectId(): ename = "" ex = self.findById (exchanges, bind.exchangeRef) if ex != None: ename = ex.name if ename == "": ename = "''" print " bind [%s] => %s" % (bind.bindingKey, ename) def AddExchange (self, args): if len (args) < 2: Usage () etype = args[0] ename = args[1] declArgs = {} if _msgSequence: declArgs[MSG_SEQUENCE] = 1 if _ive: declArgs[IVE] = 1 if _altern_ex != None: self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs) else: self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, passive=_passive, durable=_durable, arguments=declArgs) def DelExchange (self, args): if len (args) < 1: Usage () ename = args[0] self.broker.getAmqpSession().exchange_delete (exchange=ename) def AddQueue (self, args): if len (args) < 1: Usage () qname = args[0] declArgs = {} if _durable: declArgs[FILECOUNT] = _fileCount declArgs[FILESIZE] = _fileSize if _maxQueueSize: declArgs[MAX_QUEUE_SIZE] = _maxQueueSize if _maxQueueCount: declArgs[MAX_QUEUE_COUNT] = _maxQueueCount if _limitPolicy: if _limitPolicy == "none": pass elif _limitPolicy == "reject": declArgs[POLICY_TYPE] = "reject" elif _limitPolicy == "flow-to-disk": declArgs[POLICY_TYPE] = "flow_to_disk" elif _limitPolicy == "ring": declArgs[POLICY_TYPE] = "ring" elif _limitPolicy == "ring-strict": declArgs[POLICY_TYPE] = "ring_strict" if _clusterDurable: declArgs[CLUSTER_DURABLE] = 1 if _order: if _order == "fifo": pass elif _order == "lvq": declArgs[LVQ] = 1 elif _order == "lvq-no-browse": declArgs[LVQNB] = 1 if _eventGeneration: declArgs[QUEUE_EVENT_GENERATION] = _eventGeneration if _altern_ex != None: self.broker.getAmqpSession().queue_declare (queue=qname, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs) else: self.broker.getAmqpSession().queue_declare (queue=qname, passive=_passive, durable=_durable, arguments=declArgs) def DelQueue (self, args): if len (args) < 1: Usage () qname = args[0] self.broker.getAmqpSession().queue_delete (queue=qname, if_empty=_if_empty, if_unused=_if_unused) def Bind (self, args): if len (args) < 2: Usage () ename = args[0] qname = args[1] key = "" if len (args) > 2: key = args[2] self.broker.getAmqpSession().exchange_bind (queue=qname, exchange=ename, binding_key=key) def Unbind (self, args): if len (args) < 2: Usage () ename = args[0] qname = args[1] key = "" if len (args) > 2: key = args[2] self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key) def findById (self, items, id): for item in items: if item.getObjectId() == id: return item return None def match (self, name, filter): if filter == "": return True if name.find (filter) == -1: return False return True def YN (bool): if bool: return 'Y' return 'N' ## ## Main Program ## try: longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=", "file-count=", "file-size=", "max-queue-size=", "max-queue-count=", "limit-policy=", "order=", "sequence", "ive", "generate-queue-events=", "force", "force-if-not-empty", "force_if_used", "alternate-exchange=", "passive", "timeout=") (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "a:b", longOpts) except: Usage () try: encoding = locale.getpreferredencoding() cargs = [a.decode(encoding) for a in encArgs] except: cargs = encArgs for opt in optlist: if opt[0] == "-b" or opt[0] == "--bindings": _recursive = True if opt[0] == "-a" or opt[0] == "--broker-addr": _host = opt[1] if opt[0] == "--timeout": _connTimeout = int(opt[1]) if _connTimeout == 0: _connTimeout = None if opt[0] == "--alternate-exchange": _altern_ex = opt[1] if opt[0] == "--passive": _passive = True if opt[0] == "--durable": _durable = True if opt[0] == "--cluster-durable": _clusterDurable = True if opt[0] == "--file-count": _fileCount = int (opt[1]) if opt[0] == "--file-size": _fileSize = int (opt[1]) if opt[0] == "--max-queue-size": _maxQueueSize = int (opt[1]) if opt[0] == "--max-queue-count": _maxQueueCount = int (opt[1]) if opt[0] == "--limit-policy": _limitPolicy = opt[1] if _limitPolicy not in ("none", "reject", "flow-to-disk", "ring", "ring-strict"): print "Error: Invalid --limit-policy argument" sys.exit(1) if opt[0] == "--order": _order = opt[1] if _order not in ("fifo", "lvq", "lvq-no-browse"): print "Error: Invalid --order argument" sys.exit(1) if opt[0] == "--sequence": _msgSequence = True if opt[0] == "--ive": _ive = True if opt[0] == "--generate-queue-events": _eventGeneration = int (opt[1]) if opt[0] == "--force": _if_empty = False _if_unused = False if opt[0] == "--force-if-not-empty": _if_empty = False if opt[0] == "--force-if-used": _if_unused = False nargs = len (cargs) bm = BrokerManager () try: bm.SetBroker(_host) if nargs == 0: bm.Overview () else: cmd = cargs[0] modifier = "" if nargs > 1: modifier = cargs[1] if cmd == "exchanges": if _recursive: bm.ExchangeListRecurse (modifier) else: bm.ExchangeList (modifier) elif cmd == "queues": if _recursive: bm.QueueListRecurse (modifier) else: bm.QueueList (modifier) elif cmd == "add": if modifier == "exchange": bm.AddExchange (cargs[2:]) elif modifier == "queue": bm.AddQueue (cargs[2:]) else: Usage () elif cmd == "del": if modifier == "exchange": bm.DelExchange (cargs[2:]) elif modifier == "queue": bm.DelQueue (cargs[2:]) else: Usage () elif cmd == "bind": bm.Bind (cargs[1:]) elif cmd == "unbind": bm.Unbind (cargs[1:]) else: Usage () except KeyboardInterrupt: print except Exception,e: print "Failed: %s: %s" % (e.__class__.__name__, e) sys.exit(1) bm.Disconnect()