#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

"""Command-line tool that manages inter-broker links and routes over QMF.

The syntax used here is valid on both Python 2.6+ and Python 3: print is
always called with a single argument and exceptions are bound with "as".
"""

from optparse import OptionParser, OptionGroup, IndentedHelpFormatter
import sys
import socket
import os
import locale
from qmf.console import Session, BrokerURL

# Positional arguments, as consumed by main(): args[2] and args[3] are the two
# broker addresses, args[4] the exchange, args[5] the routing key or queue.
usage = """
Usage:  qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list] [mechanism]
        qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>

        qpid-route [OPTIONS] route add   <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list] [mechanism]
        qpid-route [OPTIONS] route del   <dest-broker> <src-broker> <exchange> <routing-key>
        qpid-route [OPTIONS] queue add   <dest-broker> <src-broker> <exchange> <queue> [mechanism]
        qpid-route [OPTIONS] queue del   <dest-broker> <src-broker> <exchange> <queue>
        qpid-route [OPTIONS] route list  [<dest-broker>]
        qpid-route [OPTIONS] route flush [<dest-broker>]
        qpid-route [OPTIONS] route map   [<broker>]

        qpid-route [OPTIONS] link add  <dest-broker> <src-broker> [mechanism]
        qpid-route [OPTIONS] link del  <dest-broker> <src-broker>
        qpid-route [OPTIONS] link list [<dest-broker>]"""

description = """
ADDRESS syntax:

      [username/password@] hostname
      ip-address [:<port>]"""


def Usage():
    """Print the command synopsis to standard output."""
    print(usage)


class Config:
    """Run-time settings; filled in from the command line by OptionsAndArguments."""

    def __init__(self):
        self._verbose = False
        self._quiet = False
        self._durable = False
        self._dellink = False
        self._srclocal = False
        self._transport = "tcp"
        self._ack = 0
        self._credit = 0xFFFFFFFF   # unlimited
        self._connTimeout = 10
        self._conn_options = {}     # extra keyword arguments for Session.addBroker


config = Config()


class JHelpFormatter(IndentedHelpFormatter):
    """Format usage and description without stripping newlines from usage strings
    """

    def format_usage(self, usage):
        return usage

    def format_description(self, description):
        if description:
            return description + "\n"
        return ""


def OptionsAndArguments(argv):
    """Parse the command line, store the options in `config`, return the positionals.

    argv is handed to OptionParser.parse_args(); None makes optparse read
    sys.argv[1:].  The positional arguments are decoded with the locale's
    preferred encoding when they are byte strings; if decoding fails they are
    returned undecoded.  An --ssl-key without --ssl-certificate is reported
    through parser.error(), which exits the program.
    """
    parser = OptionParser(usage=usage,
                          description=description,
                          formatter=JHelpFormatter())

    parser.add_option("--timeout", action="store", type="int", default=10, metavar="",
                      help="Maximum time to wait for broker connection (in seconds)")
    parser.add_option("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_option("-q", "--quiet", action="store_true",
                      help="Quiet output, don't print duplicate warnings")
    parser.add_option("-d", "--durable", action="store_true",
                      help="Added configuration shall be durable")
    parser.add_option("-e", "--del-empty-link", action="store_true",
                      help="Delete link after deleting last route on the link")
    parser.add_option("-s", "--src-local", action="store_true",
                      help="Make connection to source broker (push route)")
    parser.add_option("--ack", action="store", type="int", metavar="",
                      help="Acknowledge transfers over the bridge in batches of N")
    parser.add_option("--credit", action="store", type="int", default=0xFFFFFFFF, metavar="",
                      help="Maximum number of messages a sender can have outstanding (0=unlimited)")
    parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="",
                      help="Transport to use for links, defaults to tcp")
    parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="",
                      help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, "
                           "CRAM-MD5, DIGEST-MD5, GSSAPI). Used when the client connects to the "
                           "destination broker (not for authentication between the source and "
                           "destination brokers - that is specified using the [mechanisms] "
                           "argument to 'add route'). SASL automatically picks the most secure "
                           "available mechanism - use this option to override.")
    parser.add_option("--ssl-certificate", action="store", type="string", metavar="",
                      help="Client SSL certificate (PEM Format)")
    parser.add_option("--ssl-key", action="store", type="string", metavar="",
                      help="Client SSL private key (PEM Format)")
    parser.add_option("--ha-admin", action="store_true",
                      help="Allow connection to a HA backup broker.")

    opts, encArgs = parser.parse_args(args=argv)

    # Best effort: fall back to the raw arguments if they cannot be decoded.
    try:
        encoding = locale.getpreferredencoding()
        args = [a.decode(encoding) if isinstance(a, bytes) else a for a in encArgs]
    except Exception:
        args = encArgs

    if opts.timeout is not None:
        # "--timeout 0" is stored as None.  The original test for 0 was nested
        # under "if opts.timeout:" and therefore could never fire.
        # NOTE(review): presumably None means "no timeout" to addBroker - confirm.
        config._connTimeout = opts.timeout or None

    if opts.verbose:
        config._verbose = True
    if opts.quiet:
        config._quiet = True
    if opts.durable:
        config._durable = True
    if opts.del_empty_link:
        config._dellink = True
    if opts.src_local:
        config._srclocal = True
    if opts.transport:
        config._transport = opts.transport
    if opts.ha_admin:
        config._conn_options['client_properties'] = {'qpid.ha-admin': 1}
    if opts.ack:
        config._ack = opts.ack

    config._credit = opts.credit

    if opts.client_sasl_mechanism:
        config._conn_options['mechanisms'] = opts.client_sasl_mechanism
    if opts.ssl_certificate:
        config._conn_options['ssl_certfile'] = opts.ssl_certificate
    if opts.ssl_key:
        if not opts.ssl_certificate:
            parser.error("missing '--ssl-certificate' (required by '--ssl-key')")
        config._conn_options['ssl_keyfile'] = opts.ssl_key

    return args


def _printPartial(text):
    """Write text followed by one space and no newline (progress prefix)."""
    sys.stdout.write(text + " ")
    sys.stdout.flush()


class RouteManager:
    """Manages links and bridges on one broker through a QMF console session."""

    def __init__(self, localBroker):
        """Connect to localBroker and look up its broker agent.

        Uses config._connTimeout and config._conn_options for the connection.
        Whatever Session.addBroker raises on failure propagates to the caller.
        """
        self.brokerList = {}
        self.local = BrokerURL(localBroker)
        self.remote = None
        self.qmf = Session()
        self.broker = self.qmf.addBroker(localBroker, config._connTimeout, **config._conn_options)
        self.broker._waitForStable()
        self.agent = self.broker.getBrokerAgent()

    def _releaseBrokers(self):
        """Disconnect every broker recorded in brokerList except the local one."""
        localName = self.local.name()
        while self.brokerList:
            name, broker = self.brokerList.popitem()
            if name != localName:
                self.qmf.delBroker(broker)

    def _findLink(self, links, bridge):
        """Return the link in links that bridge refers to, or None."""
        for link in links:
            if bridge.linkRef == link.getObjectId():
                return link
        return None

    def _closeBridge(self, bridge, bridgeCount):
        """Close bridge; also close its link if it was the last bridge and -e was given.

        bridgeCount is the number of bridges that were listed before closing.
        Raises Exception if the broker reports a non-zero status.
        """
        if config._verbose:
            print("Closing bridge...")
        res = bridge.close()
        if res.status != 0:
            raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
        if bridgeCount == 1 and config._dellink:
            link = self.getLink()
            if link is None:
                return
            if config._verbose:
                print("Last bridge on link, closing link...")
            res = link.close()
            if res.status != 0:
                raise Exception("Error closing link: %d - %s" % (res.status, res.text))

    def disconnect(self):
        """Drop all broker connections held by this manager; never raises."""
        try:
            if self.broker:
                self.qmf.delBroker(self.broker)
                self.broker = None
            self._releaseBrokers()
        except Exception:
            pass    # ignore errors while shutting down

    def getLink(self):
        """Return the link whose host/port match self.remote, or None."""
        links = self.agent.getObjects(_class="link")
        for link in links:
            if self.remote.match(link.host, link.port):
                return link
        return None

    def addLink(self, remoteBroker, interbroker_mechanism=""):
        """Ask the broker to connect to remoteBroker unless such a link exists.

        Raises Exception when remoteBroker is the local broker itself.  The
        result of the connect call is only printed (with --verbose); it is
        not checked here.
        """
        self.remote = BrokerURL(remoteBroker)
        if self.local.match(self.remote.host, self.remote.port):
            raise Exception("Linking broker to itself is not permitted")

        brokers = self.agent.getObjects(_class="broker")
        broker = brokers[0]
        link = self.getLink()
        if link is None:
            res = broker.connect(self.remote.host, self.remote.port, config._durable,
                                 interbroker_mechanism, self.remote.authName or "",
                                 self.remote.authPass or "", config._transport)
            if config._verbose:
                print("Connect method returned: %s %s" % (res.status, res.text))

    def delLink(self, remoteBroker):
        """Close the link to remoteBroker; raises Exception if there is none."""
        self.remote = BrokerURL(remoteBroker)
        link = self.getLink()
        if link is None:
            raise Exception("Link not found")

        res = link.close()
        if config._verbose:
            print("Close method returned: %s %s" % (res.status, res.text))

    def listLinks(self):
        """Print a table of the broker's links, or "No Links Found"."""
        links = self.agent.getObjects(_class="link")
        if not links:
            print("No Links Found")
            return

        print("")
        # The header uses the same column positions as the row format below:
        # 16 (host), 8 (port), 13 (transport), 6 (durable flag), 18 (state).
        print("%-16s%-8s%-10s%-9s%-18s%s" %
              ("Host", "Port", "Transport", "Durable", "State", "Last Error"))
        print("=============================================================================")
        for link in links:
            print("%-16s%-8d%-13s%c     %-18s%s" %
                  (link.host, link.port, link.transport, YN(link.durable),
                   link.state, link.lastError))

    def mapRoutes(self):
        """Connect to every broker reachable over links and print all routes.

        Brokers are discovered repeatedly until a pass adds no new one.
        Connection failures are printed and do not abort the mapping.  The
        extra broker connections are dropped again before returning.
        """
        print("")
        print("Finding Linked Brokers:")

        self.brokerList[self.local.name()] = self.broker
        print(" %s... Ok" % self.local)

        added = True
        while added:
            added = False
            links = self.qmf.getObjects(_class="link")
            for link in links:
                url = BrokerURL(host=link.host, port=link.port)
                if url.name() not in self.brokerList:
                    _printPartial(" %s..." % url.name())
                    try:
                        # Reuse the credentials given for the local broker.
                        url.authName = self.local.authName
                        url.authPass = self.local.authPass
                        b = self.qmf.addBroker(url, config._connTimeout, **config._conn_options)
                        self.brokerList[url.name()] = b
                        added = True
                        print("Ok")
                    except Exception as e:
                        print(e)

        print("")
        print("Dynamic Routes:")
        bridges = self.qmf.getObjects(_class="bridge", dynamic=True)
        fedExchanges = []
        for bridge in bridges:
            if bridge.src not in fedExchanges:
                fedExchanges.append(bridge.src)
        if not fedExchanges:
            print(" none found")
        print("")

        for ex in fedExchanges:
            print(" Exchange %s:" % ex)
            pairs = []
            for bridge in bridges:
                if bridge.src == ex:
                    link = bridge._linkRef_
                    fromUrl = BrokerURL(host=link.host, port=link.port)
                    toUrl = bridge.getBroker().getUrl()
                    found = False
                    # matches() also marks an existing reverse pair as
                    # bidirectional, so every pair is offered the new route.
                    for pair in pairs:
                        if pair.matches(fromUrl, toUrl):
                            found = True
                    if not found:
                        pairs.append(RoutePair(fromUrl, toUrl))
            for pair in pairs:
                print(" %s" % pair)
            print("")

        print("Static Routes:")
        bridges = self.qmf.getObjects(_class="bridge", dynamic=False)
        if not bridges:
            print(" none found")
        print("")

        for bridge in bridges:
            link = bridge._linkRef_
            fromUrl = "%s:%s" % (link.host, link.port)
            toUrl = bridge.getBroker().getUrl()
            leftType = "ex"
            rightType = "ex"
            if bridge.srcIsLocal:
                arrow = "=>"
                left = bridge.src
                right = bridge.dest
                if bridge.srcIsQueue:
                    leftType = "queue"
            else:
                arrow = "<="
                left = bridge.dest
                right = bridge.src
                if bridge.srcIsQueue:
                    rightType = "queue"

            if bridge.srcIsQueue:
                print(" %s(%s=%s) %s %s(%s=%s)" %
                      (toUrl, leftType, left, arrow, fromUrl, rightType, right))
            else:
                print(" %s(%s=%s) %s %s(%s=%s) key=%s" %
                      (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key))
        print("")

        self._releaseBrokers()

    def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes,
                 interbroker_mechanism="", dynamic=False):
        """Create an exchange route (bridge) from remoteBroker, adding the link if needed.

        Raises Exception when --src-local is combined with a dynamic route,
        when the link cannot be found after addLink, when the broker rejects
        the bridge, or (unless --quiet) when the same route already exists.
        With --quiet a duplicate route is silently accepted.
        """
        if dynamic and config._srclocal:
            raise Exception("--src-local is not permitted on dynamic routes")

        self.addLink(remoteBroker, interbroker_mechanism)
        link = self.getLink()
        if link is None:
            raise Exception("Link failed to create")

        linkId = link.getObjectId()
        bridges = self.agent.getObjects(_class="bridge")
        for bridge in bridges:
            if bridge.linkRef == linkId and bridge.dest == exchange and \
                    bridge.key == routingKey and not bridge.srcIsQueue:
                if not config._quiet:
                    raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
                return

        if config._verbose:
            print("Creating inter-broker binding...")
        res = link.bridge(config._durable, exchange, exchange, routingKey, tag, excludes,
                          False, config._srclocal, dynamic, config._ack,
                          credit=config._credit)
        if res.status != 0:
            raise Exception(res.text)
        if config._verbose:
            print("Bridge method returned: %s %s" % (res.status, res.text))

    def addQueueRoute(self, remoteBroker, interbroker_mechanism, exchange, queue):
        """Create a queue route (queue on the source to exchange), adding the link if needed.

        Raises Exception when the link cannot be found after addLink, when the
        broker rejects the bridge, or (unless --quiet) when the same route
        already exists.  With --quiet a duplicate route is silently accepted.
        """
        self.addLink(remoteBroker, interbroker_mechanism)
        link = self.getLink()
        if link is None:
            raise Exception("Link failed to create")

        linkId = link.getObjectId()
        bridges = self.agent.getObjects(_class="bridge")
        for bridge in bridges:
            if bridge.linkRef == linkId and bridge.dest == exchange and \
                    bridge.src == queue and bridge.srcIsQueue:
                if not config._quiet:
                    raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue))
                return

        if config._verbose:
            print("Creating inter-broker binding...")
        res = link.bridge(config._durable, queue, exchange, "", "", "", True,
                          config._srclocal, False, config._ack, credit=config._credit)
        if res.status != 0:
            raise Exception(res.text)
        if config._verbose:
            print("Bridge method returned: %s %s" % (res.status, res.text))

    def delQueueRoute(self, remoteBroker, exchange, queue):
        """Close the queue route for (exchange, queue) on the link to remoteBroker.

        Unless --quiet is set, raises Exception when the link or the route
        does not exist.  Also raises when the broker fails to close the bridge
        or (with -e) the emptied link.
        """
        self.remote = BrokerURL(remoteBroker)
        link = self.getLink()
        if link is None:
            if not config._quiet:
                raise Exception("No link found from %s to %s" %
                                (self.remote.name(), self.local.name()))
            return

        linkId = link.getObjectId()
        bridges = self.agent.getObjects(_class="bridge")
        for bridge in bridges:
            if bridge.linkRef == linkId and bridge.dest == exchange and \
                    bridge.src == queue and bridge.srcIsQueue:
                self._closeBridge(bridge, len(bridges))
                return
        if not config._quiet:
            raise Exception("Route not found")

    def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False):
        """Close the exchange route for (exchange, routingKey) on the link to remoteBroker.

        Unless --quiet is set, raises Exception when the link or the route
        does not exist.  Also raises when the broker fails to close the bridge
        or (with -e) the emptied link.
        """
        self.remote = BrokerURL(remoteBroker)
        link = self.getLink()
        if link is None:
            if not config._quiet:
                raise Exception("No link found from %s to %s" %
                                (self.remote.name(), self.local.name()))
            return

        linkId = link.getObjectId()
        bridges = self.agent.getObjects(_class="bridge")
        for bridge in bridges:
            if bridge.linkRef == linkId and bridge.dest == exchange and \
                    bridge.key == routingKey and bridge.dynamic == dynamic:
                self._closeBridge(bridge, len(bridges))
                return
        if not config._quiet:
            raise Exception("Route not found")

    def listRoutes(self):
        """Print one line per bridge whose link is known: local, remote, dest, key."""
        links = self.qmf.getObjects(_class="link")
        bridges = self.qmf.getObjects(_class="bridge")

        for bridge in bridges:
            myLink = self._findLink(links, bridge)
            if myLink is not None:
                if bridge.dynamic:
                    keyText = ""
                else:
                    keyText = bridge.key
                print("%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port,
                                          bridge.dest, keyText))

    def clearAllRoutes(self):
        """Close every bridge and, with -e, every link; errors are printed, not raised."""
        links = self.qmf.getObjects(_class="link")
        bridges = self.qmf.getObjects(_class="bridge")

        for bridge in bridges:
            if config._verbose:
                myLink = self._findLink(links, bridge)
                if myLink is not None:
                    _printPartial("Deleting Bridge: %s:%d %s %s... " %
                                  (myLink.host, myLink.port, bridge.dest, bridge.key))
            res = bridge.close()
            if res.status != 0:
                print("Error: %d - %s" % (res.status, res.text))
            elif config._verbose:
                print("Ok")

        if config._dellink:
            links = self.qmf.getObjects(_class="link")
            for link in links:
                if config._verbose:
                    _printPartial("Deleting Link: %s:%d... " % (link.host, link.port))
                res = link.close()
                if res.status != 0:
                    print("Error: %d - %s" % (res.status, res.text))
                elif config._verbose:
                    print("Ok")


class RoutePair:
    """A route between two broker URLs that may turn out to be bidirectional."""

    def __init__(self, fromUrl, toUrl):
        self.fromUrl = fromUrl
        self.toUrl = toUrl
        self.bidir = False

    def __repr__(self):
        if self.bidir:
            delimit = "<=>"
        else:
            delimit = " =>"
        return "%s %s %s" % (self.fromUrl, delimit, self.toUrl)

    def matches(self, fromUrl, toUrl):
        """Return True if the given route is this pair in either direction.

        A match in the reverse direction also marks the pair bidirectional.
        """
        if fromUrl == self.fromUrl and toUrl == self.toUrl:
            return True
        if toUrl == self.fromUrl and fromUrl == self.toUrl:
            self.bidir = True
            return True
        return False


def YN(val):
    """Return 'Y' when val equals 1, otherwise 'N'."""
    if val == 1:
        return 'Y'
    return 'N'


def _optArg(args, index):
    """Return args[index], or "" when that optional argument was not given."""
    if len(args) > index:
        return args[index]
    return ""


def _usageError():
    """Print the usage text and return the exit status for a bad command line."""
    Usage()
    return -1


def _runCommand(rm, group, cmd, args, remoteBroker):
    """Check the argument count for "group cmd" and call the matching rm method.

    Returns 0 on success, or -1 after printing the usage text when the
    command is unknown or has the wrong number of arguments.
    """
    nargs = len(args)

    if group == "link":
        if cmd == "add":
            # Both broker addresses are required; the mechanism is optional.
            if nargs < 4 or nargs > 5:
                return _usageError()
            rm.addLink(remoteBroker, _optArg(args, 4))
        elif cmd == "del":
            if nargs != 4:
                return _usageError()
            rm.delLink(remoteBroker)
        elif cmd == "list":
            rm.listLinks()
        else:
            return _usageError()

    elif group == "dynamic":
        if cmd == "add":
            if nargs < 5 or nargs > 8:
                return _usageError()
            rm.addRoute(remoteBroker, args[4], "", _optArg(args, 5), _optArg(args, 6),
                        _optArg(args, 7), dynamic=True)
        elif cmd == "del":
            if nargs != 5:
                return _usageError()
            rm.delRoute(remoteBroker, args[4], "", dynamic=True)
        else:
            return _usageError()

    elif group == "route":
        if cmd == "add":
            if nargs < 6 or nargs > 9:
                return _usageError()
            rm.addRoute(remoteBroker, args[4], args[5], _optArg(args, 6), _optArg(args, 7),
                        _optArg(args, 8), dynamic=False)
        elif cmd == "del":
            if nargs != 6:
                return _usageError()
            rm.delRoute(remoteBroker, args[4], args[5], dynamic=False)
        elif cmd == "map":
            rm.mapRoutes()
        elif cmd == "list":
            rm.listRoutes()
        elif cmd == "flush":
            rm.clearAllRoutes()
        else:
            return _usageError()

    elif group == "queue":
        if nargs < 6 or nargs > 7:
            return _usageError()
        if cmd == "add":
            rm.addQueueRoute(remoteBroker, _optArg(args, 6), exchange=args[4], queue=args[5])
        elif cmd == "del":
            rm.delQueueRoute(remoteBroker, exchange=args[4], queue=args[5])
        else:
            return _usageError()

    else:
        return _usageError()

    return 0


def main(argv=None):
    """Run one qpid-route command and return the process exit status.

    Returns 0 on success, -1 for a malformed command line (usage is printed)
    and 1 when the command fails with an exception (a "Failed:" line is
    printed).  The broker session is always released before returning.
    """
    args = OptionsAndArguments(argv)
    nargs = len(args)
    if nargs < 2:
        return _usageError()

    remoteBroker = None
    if nargs == 2:
        # No broker given: use this host's name if it resolves, else localhost.
        try:
            socket.gethostbyname(socket.gethostname())
            localBroker = socket.gethostname()
        except socket.gaierror:
            localBroker = "localhost"
    elif config._srclocal:
        # --src-local swaps the roles of the two addresses, so both are needed.
        if nargs < 4:
            return _usageError()
        localBroker = args[3]
        remoteBroker = args[2]
    else:
        localBroker = args[2]
        if nargs > 3:
            remoteBroker = args[3]

    group = args[0]
    cmd = args[1]

    rm = None
    try:
        rm = RouteManager(localBroker)
        return _runCommand(rm, group, cmd, args, remoteBroker)
    except Exception as e:
        print("Failed: %s - %s" % (e.__class__.__name__, e))
        return 1
    finally:
        # Runs on every exit path, including usage errors and SystemExit.
        if rm:
            rm.disconnect()     # try to release broker resources


if __name__ == "__main__":
    sys.exit(main())