#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import qmf.console, optparse, sys from qpid.management import managementChannel, managementClient from qpid.messaging import Connection from qpid.messaging import Message as QpidMessage try: from uuid import uuid4 except ImportError: from qpid.datatypes import uuid4 # Utility for doing fast qmf2 operations on a broker. class QmfBroker(object): def __init__(self, conn): self.conn = conn self.sess = self.conn.session() self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \ str(uuid4()) self.reply_rx = self.sess.receiver(self.reply_to) self.reply_rx.capacity = 10 self.tx = self.sess.sender("qmf.default.direct/broker") self.next_correlator = 1 def close(self): self.conn.close() def __repr__(self): return "Qpid Broker: %s" % self.url def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker"): props = {'method' : 'request', 'qmf.opcode' : '_method_request', 'x-amqp-0-10.app-id' : 'qmf2'} correlator = str(self.next_correlator) self.next_correlator += 1 content = {'_object_id' : {'_object_name' : addr}, '_method_name' : method, '_arguments' : arguments} message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator, properties=props, subject="broker") self.tx.send(message) response = self.reply_rx.fetch(10) if response.properties['qmf.opcode'] == '_exception': raise Exception("Exception from Agent: %r" % response.content['_values']) if response.properties['qmf.opcode'] != '_method_response': raise Exception("bad response: %r" % response.properties) return response.content['_arguments'] def _sendRequest(self, opcode, content): props = {'method' : 'request', 'qmf.opcode' : opcode, 'x-amqp-0-10.app-id' : 'qmf2'} correlator = str(self.next_correlator) self.next_correlator += 1 message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator, properties=props, subject="broker") self.tx.send(message) return correlator def _doClassQuery(self, class_name): query = {'_what' : 'OBJECT', '_schema_id' : {'_class_name' : class_name}} correlator = self._sendRequest('_query_request', query) response = self.reply_rx.fetch(10) if response.properties['qmf.opcode'] != '_query_response': raise Exception("bad response") items = [] done = False while not done: for item in response.content: items.append(item['_values']) if 'partial' in response.properties: response = self.reply_rx.fetch(10) else: done = True return items def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'): query = {'_what' : 'OBJECT', '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}} correlator = self._sendRequest('_query_request', query) response = self.reply_rx.fetch(10) if response.properties['qmf.opcode'] != '_query_response': raise Exception("bad response") items = [] done = False while not done: for item in response.content: items.append(item['_values']) if 'partial' in response.properties: response = self.reply_rx.fetch(10) else: done = True if len(items) == 1: return items[0] return None def _getAllBrokerObjects(self, cls): items = self._doClassQuery(cls.__name__.lower()) objs = [] for item in items: objs.append(cls(self, item)) return objs def _getBrokerObject(self, cls, name): obj = self._doNameQuery(cls.__name__.lower(), name) if obj: return cls(self, obj) return None def get_ha_broker(self): ha_brokers = self._doClassQuery("habroker") if (not ha_brokers): raise Exception("Broker does not have HA enabled.") return ha_brokers[0] HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker" class Command: commands = {} def __init__(self, name, help, args=[]): Command.commands[name] = self self.name = name self.args = args usage="%s [options] %s\n\n%s"%(name, " ".join(args), help) self.help = help self.op=optparse.OptionParser(usage) self.op.add_option("-b", "--broker", metavar="", help="Connect to broker at ") def execute(self, command): opts, args = self.op.parse_args(command) if len(args) != len(self.args)+1: self.op.print_help() print "Error: wrong number of arguments" return broker = opts.broker or "localhost:5672" # FIXME aconway 2012-02-23: enforce not doing primary-only operations on a backup & vice versa connection = Connection.establish(broker, client_properties={"qpid.ha-admin":1}) try: self.do_execute(QmfBroker(connection), opts, args) finally: connection.close() def do_execute(self, qmf_broker, opts, args): raise Exception("Command '%s' is not yet implemented"%self.name) def print_all_help(name): print "usage: %s []\n\nCommands are:\n"%name for c in Command.commands: help = Command.commands[c].help print " %-12s %s."%(c, help.split(".")[0]) print "\nFor help with a command: %s --help\n"%name class PromoteCmd(Command): def __init__(self): Command.__init__(self, "promote","Promote broker from backup to primary") def do_execute(self, qmf_broker, opts, args): qmf_broker._method("promote", {}, HA_BROKER) PromoteCmd() class ReadyCmd(Command): def __init__(self): Command.__init__(self, "ready", "Test if a backup broker is ready.\nReturn 0 if broker is a ready backup, non-0 otherwise.") self.op.add_option( "--wait", type="int", metavar="", help="Wait up to for broker to be ready. 0 means wait forever.") ReadyCmd() class ReplicateCmd(Command): def __init__(self): Command.__init__(self, "replicate", "Replicate from broker to the current broker.", ["", ""]) ReplicateCmd() class SetCmd(Command): def __init__(self): Command.__init__(self, "set", "Set HA configuration settings") def add(optname, metavar, type, help): self.op.add_option(optname, metavar=metavar, type=type, help=help, action="store") add("--brokers", "", "string", "HA brokers use to connect to each other") add("--public-brokers", "", "string", "Clients use to connect to HA brokers") add("--backups", "", "int", "Expect backups to be running") def do_execute(self, qmf_broker, opts, args): if (opts.brokers): qmf_broker._method("setBrokers", {"url":opts.brokers}, HA_BROKER) if (opts.public_brokers): qmf_broker._method("setPublicBrokers", {"url":opts.public_brokers}, HA_BROKER) if (opts.backups): qmf_broker._method("setExpectedBackups", {"expectedBackups":opts.backups}, HA_BROKER) SetCmd() class QueryCmd(Command): def __init__(self): Command.__init__(self, "query", "Print HA configuration settings") def do_execute(self, qmf_broker, opts, args): hb = qmf_broker.get_ha_broker() for x in [("Status:", "status"), ("Brokers URL:", "brokers"), ("Public URL:", "publicBrokers")]: print "%-16s%s"%(x[0], hb[x[1]]) QueryCmd() def main(argv): try: command=argv[1:] if command and command[0] == "--help-all": for c in Command.commands.itervalues(): c.op.print_help(); print return 1 if not command or not command[0] in Command.commands: print_all_help(argv[0]); return 1; Command.commands[command[0]].execute(command) except Exception, e: raise # FIXME aconway 2012-02-23: print e return 1 if __name__ == "__main__": sys.exit(main(sys.argv))