#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import getopt import sys import socket import qpid import uuid from qpid.management import managementClient from qpid.peer import Closed from qpid.connection import Connection from qpid.util import connect def Usage (): print "Usage: qpid-route [OPTIONS] add " print " qpid-route [OPTIONS] del " print " qpid-route [OPTIONS] list " #print " qpid-route [OPTIONS] load " print " qpid-route [OPTIONS] flush " print print "Options:" print " -s [ --spec-file ] PATH (/usr/share/amqp/amqp.0-10.xml)" print " -v [ --verbose ] Verbose output" print " -q [ --quiet ] Quiet output, don't print duplicate warnings" print print " dest-broker and src-broker are in the form: hostname | ip-address [:]" print " ex: localhost, 10.1.1.7:10000, broker-host:10000" print #print " If loading the route configuration from a file, the input file has one line per route" #print " in the form:" #print #print " " #print sys.exit (1) _specpath = "/usr/share/amqp/amqp.0-10.xml" _verbose = False _quiet = False class Broker: def __init__ (self, text): colon = text.find (":") if colon == -1: host = text self.port = 5672 else: host = text[:colon] self.port = int (text[colon+1:]) self.host = socket.gethostbyname (host) def name (self): return self.host + ":" + str (self.port) class RouteManager: def __init__ (self, destBroker): self.dest = Broker (destBroker) self.src = None def ConnectToBroker (self): broker = self.dest if _verbose: print "Connecting to broker: %s:%d" % (broker.host, broker.port) try: self.spec = qpid.spec.load (_specpath) self.conn = Connection (connect (broker.host, broker.port), self.spec) self.conn.start () self.mclient = managementClient (self.spec) self.mch = self.mclient.addChannel (self.conn.session(str(uuid.uuid4()))) self.mclient.syncWaitForStable (self.mch) except socket.error, e: print "Connect Error:", e sys.exit (1) def getLink (self): links = self.mclient.syncGetObjects (self.mch, "link") for link in links: if link.address == self.src.name (): return link return None def AddRoute (self, srcBroker, exchange, routingKey): self.src = Broker (srcBroker) mc = self.mclient brokers = mc.syncGetObjects (self.mch, "broker") broker = brokers[0] link = self.getLink () if link == None: if _verbose: print "Inter-broker link not found, creating..." connectArgs = {} connectArgs["host"] = self.src.host connectArgs["port"] = self.src.port res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs) if _verbose: print "Connect method returned:", res.status, res.statusText link = self.getLink () if link == None: print "Protocol Error - Missing link ID" sys.exit (1) bridges = mc.syncGetObjects (self.mch, "bridge") for bridge in bridges: if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey: if not _quiet: print "Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey) sys.exit (1) sys.exit (0) if _verbose: print "Creating inter-broker binding..." bridgeArgs = {} bridgeArgs["src"] = exchange bridgeArgs["dest"] = exchange bridgeArgs["key"] = routingKey bridgeArgs["src_is_queue"] = 0 bridgeArgs["src_is_local"] = 0 res = mc.syncCallMethod (self.mch, link.id, link.classKey, "bridge", bridgeArgs) if _verbose: print "Bridge method returned:", res.status, res.statusText def DelRoute (self, srcBroker, exchange, routingKey): self.src = Broker (srcBroker) mc = self.mclient link = self.getLink () if link == None: if not _quiet: print "No link found from %s to %s" % (self.src.name(), self.dest.name()) sys.exit (1) sys.exit (0) bridges = mc.syncGetObjects (self.mch, "bridge") for bridge in bridges: if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey: if _verbose: print "Closing bridge..." res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close") if res.status != 0: print "Error closing bridge: %d - %s" % (res.status, res.statusText) sys.exit (1) if len (bridges) == 1: link = self.getLink () if link == None: sys.exit (0) if _verbose: print "Last bridge on link, closing link..." res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") if res.status != 0: print "Error closing link: %d - %s" % (res.status, res.statusText) sys.exit (1) sys.exit (0) if not _quiet: print "Route not found" sys.exit (1) def ListRoutes (self): mc = self.mclient links = mc.syncGetObjects (self.mch, "link") bridges = mc.syncGetObjects (self.mch, "bridge") for bridge in bridges: myLink = None for link in links: if bridge.linkRef == link.id: myLink = link break if myLink != None: print "%s %s %s %s" % (self.dest.name(), myLink.address, bridge.dest, bridge.key) def LoadRoutes (self, inFile): pass def ClearAllRoutes (self): mc = self.mclient links = mc.syncGetObjects (self.mch, "link") bridges = mc.syncGetObjects (self.mch, "bridge") for bridge in bridges: if _verbose: myLink = None for link in links: if bridge.linkRef == link.id: myLink = link break if myLink != None: print "Deleting Bridge: %s %s %s... " % (myLink.address, bridge.dest, bridge.key), res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close") if res.status != 0: print "Error: %d - %s" % (res.status, res.statusText) elif _verbose: print "Ok" links = mc.syncGetObjects (self.mch, "link") for link in links: if _verbose: print "Deleting Link: %s... " % link.address, res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") if res.status != 0: print "Error: %d - %s" % (res.status, res.statusText) elif _verbose: print "Ok" ## ## Main Program ## try: longOpts = ("verbose", "quiet", "spec-file=") (optlist, cargs) = getopt.gnu_getopt (sys.argv[1:], "s:vq", longOpts) except: Usage () for opt in optlist: if opt[0] == "-s" or opt[0] == "--spec-file": _specpath = opt[1] if opt[0] == "-v" or opt[0] == "--verbose": _verbose = True if opt[0] == "-q" or opt[0] == "--quiet": _quiet = True nargs = len (cargs) if nargs < 2: Usage () cmd = cargs[0] if cmd != "load": rm = RouteManager (cargs[1]) rm.ConnectToBroker () if cmd == "add" or cmd == "del": if nargs != 5: Usage () if cmd == "add": rm.AddRoute (cargs[2], cargs[3], cargs[4]) else: rm.DelRoute (cargs[2], cargs[3], cargs[4]) else: if nargs != 2: Usage () if cmd == "list": rm.ListRoutes () #elif cmd == "load": # rm.LoadRoutes (cargs[1]) elif cmd == "flush": rm.ClearAllRoutes () else: Usage ()