#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import getopt
import sys
import socket
import os
import locale
from qmf.console import Session, BrokerURL


def Usage():
    """Print the command synopsis and option summary, then exit with status 1."""
    print("Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]")
    print(" qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>")
    print("")
    print(" qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list]")
    print(" qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>")
    print(" qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue>")
    print(" qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>")
    print(" qpid-route [OPTIONS] route list [<dest-broker>]")
    print(" qpid-route [OPTIONS] route flush [<dest-broker>]")
    print(" qpid-route [OPTIONS] route map [<broker>]")
    print("")
    print(" qpid-route [OPTIONS] link add <dest-broker> <src-broker>")
    print(" qpid-route [OPTIONS] link del <dest-broker> <src-broker>")
    print(" qpid-route [OPTIONS] link list [<dest-broker>]")
    print("")
    print("Options:")
    print(" --timeout seconds (10) Maximum time to wait for broker connection")
    print(" -v [ --verbose ] Verbose output")
    print(" -q [ --quiet ] Quiet output, don't print duplicate warnings")
    print(" -d [ --durable ] Added configuration shall be durable")
    print(" -e [ --del-empty-link ] Delete link after deleting last route on the link")
    print(" -s [ --src-local ] Make connection to source broker (push route)")
    print(" --ack N Acknowledge transfers over the bridge in batches of N")
    print(" -t <transport> [ --transport <transport> ]")
    print(" Specify transport to use for links, defaults to tcp")
    print("")
    print(" dest-broker and src-broker are in the form: [username/password@] hostname | ip-address [:<port>]")
    print(" ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost")
    print("")
    sys.exit(1)


# Command-line flags; overwritten by the option loop in the main program.
_verbose = False
_quiet = False
_durable = False
_dellink = False
_srclocal = False
_transport = "tcp"
_ack = 0
_connTimeout = 10


def _printPartial(text):
    """Write text plus one separating space without ending the line.

    Produces the same bytes as a Python 2 ``print text,`` that is later
    followed by another print on the same line, and flushes so the progress
    text is visible before any blocking call that follows.
    """
    sys.stdout.write(text + " ")
    sys.stdout.flush()


class RouteManager:
    """Create, delete and report inter-broker links and bridges on one broker.

    All work goes through a QMF console Session connected to localBroker.
    Several methods end the process with sys.exit(0) once their work is done
    or when a "not found"/"duplicate" condition is suppressed by --quiet.
    """

    def __init__(self, localBroker):
        """Connect a QMF session to localBroker, waiting up to _connTimeout."""
        self.local = BrokerURL(localBroker)
        self.remote = None
        self.qmf = Session()
        self.broker = self.qmf.addBroker(localBroker, _connTimeout)

    def disconnect(self):
        """Remove the local broker connection from the QMF session."""
        self.qmf.delBroker(self.broker)

    def getLink(self):
        """Return the first link whose host/port match self.remote, else None."""
        for link in self.qmf.getObjects(_class="link"):
            if self.remote.match(link.host, link.port):
                return link
        return None

    def addLink(self, remoteBroker):
        """Ask the local broker to connect to remoteBroker unless a link exists.

        Raises Exception if remoteBroker resolves to the local broker itself.
        """
        self.remote = BrokerURL(remoteBroker)
        if self.local.match(self.remote.host, self.remote.port):
            raise Exception("Linking broker to itself is not permitted")

        brokers = self.qmf.getObjects(_class="broker")
        broker = brokers[0]
        if self.getLink() is not None:
            return

        # No credentials (or the literal user "anonymous") selects ANONYMOUS.
        if not self.remote.authName or self.remote.authName == "anonymous":
            mech = "ANONYMOUS"
        else:
            mech = "PLAIN"
        res = broker.connect(self.remote.host, self.remote.port, _durable,
                             mech, self.remote.authName or "", self.remote.authPass or "",
                             _transport)
        if _verbose:
            print("Connect method returned: %s %s" % (res.status, res.text))

    def delLink(self, remoteBroker):
        """Close the link to remoteBroker; raise Exception if there is none."""
        self.remote = BrokerURL(remoteBroker)
        link = self.getLink()
        if link is None:
            raise Exception("Link not found")

        res = link.close()
        if _verbose:
            print("Close method returned: %s %s" % (res.status, res.text))

    def listLinks(self):
        """Print one row per link known to the session."""
        links = self.qmf.getObjects(_class="link")
        if not links:
            print("No Links Found")
            return

        print("")
        print("Host Port Transport Durable State Last Error")
        print("=============================================================================")
        for link in links:
            print("%-16s%-8d%-13s%c %-18s%s" %
                  (link.host, link.port, link.transport, YN(link.durable), link.state, link.lastError))

    def mapRoutes(self):
        """Print dynamic and static routes across all brokers reachable via links.

        Extra broker connections opened for the walk are removed again before
        returning, including when printing fails part-way through.
        """
        print("")
        print("Finding Linked Brokers:")
        brokerList = self._connectLinkedBrokers()
        try:
            print("")
            print("Dynamic Routes:")
            self._printDynamicRoutes()

            print("Static Routes:")
            self._printStaticRoutes()
            print("")
        finally:
            for name in brokerList:
                if name != self.local.name():
                    self.qmf.delBroker(brokerList[name])

    def _connectLinkedBrokers(self):
        """Connect to every broker named by a link; return {broker name: broker}."""
        qmf = self.qmf
        brokerList = {self.local.name(): self.broker}
        print(" %s... Ok" % self.local)

        # Each newly connected broker may expose further links, so keep
        # sweeping until a full pass adds nothing.
        added = True
        while added:
            added = False
            for link in qmf.getObjects(_class="link"):
                address = "%s:%d" % (link.host, link.port)
                name = BrokerURL(address).name()
                if name in brokerList:
                    continue
                _printPartial(" %s..." % name)
                try:
                    brokerList[name] = qmf.addBroker(address, _connTimeout)
                    added = True
                    print("Ok")
                except Exception as e:
                    # Best effort: report the failure and keep mapping.
                    print(e)
        return brokerList

    def _printDynamicRoutes(self):
        """Print, per source exchange, the broker pairs joined by dynamic bridges."""
        bridges = self.qmf.getObjects(_class="bridge", dynamic=True)
        fedExchanges = []
        for bridge in bridges:
            if bridge.src not in fedExchanges:
                fedExchanges.append(bridge.src)
        if not fedExchanges:
            print(" none found")
        print("")

        for ex in fedExchanges:
            print(" Exchange %s:" % ex)
            pairs = []
            for bridge in bridges:
                if bridge.src != ex:
                    continue
                link = bridge._linkRef_
                fromUrl = "%s:%s" % (link.host, link.port)
                toUrl = bridge.getBroker().getUrl()
                # matches() also flags an existing pair as bidirectional, so
                # it is called on every pair rather than stopping early.
                found = False
                for pair in pairs:
                    if pair.matches(fromUrl, toUrl):
                        found = True
                if not found:
                    pairs.append(RoutePair(fromUrl, toUrl))
            for pair in pairs:
                print(" %s" % pair)
            print("")

    def _printStaticRoutes(self):
        """Print one line per static bridge, showing direction and endpoints."""
        bridges = self.qmf.getObjects(_class="bridge", dynamic=False)
        if not bridges:
            print(" none found")
        print("")

        for bridge in bridges:
            link = bridge._linkRef_
            fromUrl = "%s:%s" % (link.host, link.port)
            toUrl = bridge.getBroker().getUrl()
            leftType = "ex"
            rightType = "ex"
            if bridge.srcIsLocal:
                arrow = "=>"
                left = bridge.src
                right = bridge.dest
                if bridge.srcIsQueue:
                    leftType = "queue"
            else:
                arrow = "<="
                left = bridge.dest
                right = bridge.src
                if bridge.srcIsQueue:
                    rightType = "queue"

            if bridge.srcIsQueue:
                print(" %s(%s=%s) %s %s(%s=%s)" %
                      (toUrl, leftType, left, arrow, fromUrl, rightType, right))
            else:
                print(" %s(%s=%s) %s %s(%s=%s) key=%s" %
                      (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key))

    def _ensureLink(self, remoteBroker):
        """Create the link to remoteBroker if needed and return it; raise if absent."""
        self.addLink(remoteBroker)
        link = self.getLink()
        if link is None:
            raise Exception("Link failed to create")
        return link

    def _findLinkOrExit(self, remoteBroker):
        """Return the existing link to remoteBroker.

        With no such link: raise Exception, or exit 0 silently under --quiet.
        """
        self.remote = BrokerURL(remoteBroker)
        link = self.getLink()
        if link is None:
            if not _quiet:
                raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
            sys.exit(0)
        return link

    def _closeBridge(self, bridge, bridges):
        """Close bridge and, with --del-empty-link, the link once it was the last one.

        Raises Exception when the broker reports a non-zero status.
        """
        if _verbose:
            print("Closing bridge...")
        res = bridge.close()
        if res.status != 0:
            raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))

        # NOTE(review): this counts every bridge the query returned, not only
        # the bridges on this link -- confirm that is the intended test.
        if len(bridges) == 1 and _dellink:
            link = self.getLink()
            if link is None:
                return
            if _verbose:
                print("Last bridge on link, closing link...")
            res = link.close()
            if res.status != 0:
                raise Exception("Error closing link: %d - %s" % (res.status, res.text))

    def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False):
        """Create an exchange route from remoteBroker, creating the link if needed.

        A route already present on the link raises Exception, or exits 0
        silently under --quiet. Raises Exception if --src-local is combined
        with a dynamic route or if the broker rejects the bridge.
        """
        if dynamic and _srclocal:
            raise Exception("--src-local is not permitted on dynamic routes")

        link = self._ensureLink(remoteBroker)

        for bridge in self.qmf.getObjects(_class="bridge"):
            if bridge.linkRef == link.getObjectId() and \
                    bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue:
                if not _quiet:
                    raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
                sys.exit(0)

        if _verbose:
            print("Creating inter-broker binding...")
        res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic, _ack)
        if res.status != 0:
            raise Exception(res.text)
        if _verbose:
            print("Bridge method returned: %s %s" % (res.status, res.text))

    def addQueueRoute(self, remoteBroker, exchange, queue):
        """Create a route that feeds exchange from queue on remoteBroker.

        Duplicate handling and errors are the same as for addRoute.
        """
        link = self._ensureLink(remoteBroker)

        for bridge in self.qmf.getObjects(_class="bridge"):
            if bridge.linkRef == link.getObjectId() and \
                    bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
                if not _quiet:
                    raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue))
                sys.exit(0)

        if _verbose:
            print("Creating inter-broker binding...")
        res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False, _ack)
        if res.status != 0:
            raise Exception(res.text)
        if _verbose:
            print("Bridge method returned: %s %s" % (res.status, res.text))

    def delQueueRoute(self, remoteBroker, exchange, queue):
        """Close the queue route for (exchange, queue) and exit 0.

        Raises Exception("Route not found") unless --quiet is set.
        """
        link = self._findLinkOrExit(remoteBroker)

        bridges = self.qmf.getObjects(_class="bridge")
        for bridge in bridges:
            if bridge.linkRef == link.getObjectId() and \
                    bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
                self._closeBridge(bridge, bridges)
                sys.exit(0)
        if not _quiet:
            raise Exception("Route not found")

    def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False):
        """Close the exchange route for (exchange, routingKey, dynamic) and exit 0.

        Raises Exception("Route not found") unless --quiet is set.
        """
        link = self._findLinkOrExit(remoteBroker)

        bridges = self.qmf.getObjects(_class="bridge")
        for bridge in bridges:
            if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and \
                    bridge.key == routingKey and bridge.dynamic == dynamic:
                self._closeBridge(bridge, bridges)
                sys.exit(0)
        if not _quiet:
            raise Exception("Route not found")

    def _linkForBridge(self, bridge, links):
        """Return the entry of links that bridge.linkRef refers to, or None."""
        for link in links:
            if bridge.linkRef == link.getObjectId():
                return link
        return None

    def listRoutes(self):
        """Print one line per bridge whose link is known to the session."""
        links = self.qmf.getObjects(_class="link")
        bridges = self.qmf.getObjects(_class="bridge")

        for bridge in bridges:
            myLink = self._linkForBridge(bridge, links)
            if myLink is None:
                continue
            # Dynamic routes are labelled instead of printing their key.
            if bridge.dynamic:
                keyText = "<dynamic>"
            else:
                keyText = bridge.key
            print("%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText))

    def clearAllRoutes(self):
        """Close every bridge and, with --del-empty-link, every link.

        A failed close is reported on stdout and does not stop the sweep.
        """
        links = self.qmf.getObjects(_class="link")
        bridges = self.qmf.getObjects(_class="bridge")

        for bridge in bridges:
            if _verbose:
                myLink = self._linkForBridge(bridge, links)
                if myLink is not None:
                    _printPartial("Deleting Bridge: %s:%d %s %s... " %
                                  (myLink.host, myLink.port, bridge.dest, bridge.key))
            res = bridge.close()
            if res.status != 0:
                print("Error: %d - %s" % (res.status, res.text))
            elif _verbose:
                print("Ok")

        if _dellink:
            for link in self.qmf.getObjects(_class="link"):
                if _verbose:
                    _printPartial("Deleting Link: %s:%d... " % (link.host, link.port))
                res = link.close()
                if res.status != 0:
                    print("Error: %d - %s" % (res.status, res.text))
                elif _verbose:
                    print("Ok")


class RoutePair:
    """A route between two broker URLs, flagged once seen in both directions."""

    def __init__(self, fromUrl, toUrl):
        self.fromUrl = fromUrl
        self.toUrl = toUrl
        self.bidir = False

    def __repr__(self):
        if self.bidir:
            delimit = "<=>"
        else:
            delimit = " =>"
        return "%s %s %s" % (self.fromUrl, delimit, self.toUrl)

    def matches(self, fromUrl, toUrl):
        """Return True if the URLs are this pair in either direction.

        A reverse-direction match also marks the pair as bidirectional.
        """
        if fromUrl == self.fromUrl and toUrl == self.toUrl:
            return True
        if toUrl == self.fromUrl and fromUrl == self.toUrl:
            self.bidir = True
            return True
        return False


def YN(val):
    """Return 'Y' when val equals 1, otherwise 'N'."""
    if val == 1:
        return 'Y'
    return 'N'


def _parseInt(text):
    """Return text as an int; print usage and exit if it is not a number."""
    try:
        return int(text)
    except ValueError:
        Usage()


##
## Main Program
##

try:
    longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=", "ack=", "timeout=")
    (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts)
except getopt.GetoptError:
    Usage()

# Best effort: decode byte-string arguments using the locale's encoding and
# fall back to the raw arguments if that is not possible.
try:
    encoding = locale.getpreferredencoding()
    cargs = [a.decode(encoding) for a in encArgs]
except Exception:
    cargs = encArgs

for (opt, value) in optlist:
    if opt == "--timeout":
        _connTimeout = _parseInt(value)
        if _connTimeout == 0:
            # A timeout of 0 is passed on as None.
            _connTimeout = None
    if opt in ("-v", "--verbose"):
        _verbose = True
    if opt in ("-q", "--quiet"):
        _quiet = True
    if opt in ("-d", "--durable"):
        _durable = True
    if opt in ("-e", "--del-empty-link"):
        _dellink = True
    if opt in ("-s", "--src-local"):
        _srclocal = True
    if opt in ("-t", "--transport"):
        _transport = value
    if opt == "--ack":
        _ack = _parseInt(value)

nargs = len(cargs)
if nargs < 2:
    Usage()

# Positional layout: <group> <cmd> [dest-broker [src-broker [...]]].
# With --src-local the tool connects to the source broker instead, which
# is only possible when both brokers were given.
localBroker = "localhost"
remoteBroker = None
if _srclocal and nargs > 3:
    localBroker = cargs[3]
    remoteBroker = cargs[2]
elif nargs > 2:
    localBroker = cargs[2]
    if nargs > 3:
        remoteBroker = cargs[3]

group = cargs[0]
cmd = cargs[1]

try:
    rm = RouteManager(localBroker)
except Exception as e:
    print("Failed: %s - %s" % (e.__class__.__name__, e))
    sys.exit(1)

try:
    if group == "link":
        if cmd == "add":
            if nargs != 4:
                Usage()
            rm.addLink(remoteBroker)
        elif cmd == "del":
            if nargs != 4:
                Usage()
            rm.delLink(remoteBroker)
        elif cmd == "list":
            rm.listLinks()
        else:
            Usage()

    elif group == "dynamic":
        if cmd == "add":
            if nargs < 5 or nargs > 7:
                Usage()
            tag = ""
            excludes = ""
            if nargs > 5:
                tag = cargs[5]
            if nargs > 6:
                excludes = cargs[6]
            rm.addRoute(remoteBroker, cargs[4], "", tag, excludes, dynamic=True)
        elif cmd == "del":
            if nargs != 5:
                Usage()
            rm.delRoute(remoteBroker, cargs[4], "", dynamic=True)
        else:
            Usage()

    elif group == "route":
        if cmd == "add":
            if nargs < 6 or nargs > 8:
                Usage()
            tag = ""
            excludes = ""
            if nargs > 6:
                tag = cargs[6]
            if nargs > 7:
                excludes = cargs[7]
            rm.addRoute(remoteBroker, cargs[4], cargs[5], tag, excludes, dynamic=False)
        elif cmd == "del":
            if nargs != 6:
                Usage()
            rm.delRoute(remoteBroker, cargs[4], cargs[5], dynamic=False)
        elif cmd == "map":
            rm.mapRoutes()
        elif cmd == "list":
            rm.listRoutes()
        elif cmd == "flush":
            rm.clearAllRoutes()
        else:
            Usage()

    elif group == "queue":
        if nargs != 6:
            Usage()
        if cmd == "add":
            rm.addQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5])
        elif cmd == "del":
            rm.delQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5])
        else:
            Usage()

    else:
        Usage()

except Exception as e:
    print("Failed: %s - %s" % (e.__class__.__name__, e))
    sys.exit(1)
finally:
    # Runs on success, on failure, and when a command ends via sys.exit().
    rm.disconnect()