#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import sys from optparse import OptionParser from cproton import * class Options(object): def __init__(self): parser = OptionParser(usage="usage: %prog [options] ") parser.add_option("-s", "--server", action="store", type="string", default="0.0.0.0:5672", metavar="", help="Address of the server with syntax: hostname | ip-address [:]") parser.add_option("-c", "--count", action="store", type="int", default=1, metavar="<#messages>", help="Number of messages to read from mailbox") parser.add_option("-v", "--verbose", action="store_true", help="Turn on extra trace messages.") # SSL configuration parser.add_option("--ssl-cert-db", type="str", metavar="", help="database of trusted certificates. Enables use of SSL.") # if server wants authentication: #parser.add_option("--ssl-cert-file") #parser.add_option("--ssl-key-file") #parser.add_option("--ssl-key-pw") opts, mailboxes = parser.parse_args() # uses sys.argv[1:] self.mailbox = None if len(mailboxes) == 1: self.mailbox = str(mailboxes[0]) self.server = opts.server self.ca_database = opts.ssl_cert_db addr = opts.server.rsplit(":", 1) self.host = addr[0] if len(addr) == 2: self.port = addr[1] else: self.port = "5672" self.count = opts.count self.verbose = opts.verbose class FetchClient(object): def __init__(self, host, port, mailbox, ca_database=None): """ Initialize the client by supplying the address of the server, and the name of the mailbox to fetch from. """ self.host = host self.port = port self.mailbox = mailbox self.logging = False self.ca_database = ca_database self.ssl_client = None def setup(self): """ Setup and configure the connection to the server. """ # setup a driver connection to the server self.log("Connecting to server host = %s:%s" % (self.host, self.port)) self.driver = pn_driver(); self.cxtr = pn_connector(self.driver, self.host, self.port, None) # Enable SSL if database of trusted CAs given if self.ca_database: self.log("Using SSL, CA database = %s" % self.ca_database) transport = pn_connector_transport(self.cxtr); assert(transport); ssl_client = pn_ssl(transport) assert(ssl_client) rc = pn_ssl_set_trusted_ca_db(ssl_client, self.ca_database) assert(rc == 0) # we want to fail if the server's certificate is invalid: rc = pn_ssl_set_peer_authentication(ssl_client, PN_SSL_VERIFY_PEER, None) assert(rc == 0) # configure SASL self.sasl = pn_connector_sasl(self.cxtr) pn_sasl_mechanisms(self.sasl, "ANONYMOUS") pn_sasl_client(self.sasl) # inform the engine about the connection, and link the driver to it. self.conn = pn_connection() pn_connector_set_connection(self.cxtr, self.conn) # create a session, and Link for receiving from the mailbox self.log("Fetching from mailbox = %s" % self.mailbox) self.ssn = pn_session(self.conn) self.link = pn_receiver(self.ssn, "receiver") src = pn_link_source(self.link) pn_terminus_set_address(src, self.mailbox) # now open all the engine endpoints pn_connection_open(self.conn) pn_session_open(self.ssn) pn_link_open(self.link) def teardown(self): """ Perform a clean disconnect from the server, and release the resources created in setup() """ self.log("Shutting down the connection cleanly...") pn_connection_close(self.conn) # now wait for the connector to close while not (pn_connector_closed(self.cxtr)): self.wait() #pn_sasl_free(self.sasl); pn_link_free(self.link); pn_session_free(self.ssn); pn_connection_free(self.conn); pn_connector_free(self.cxtr); self.log("...Shutdown complete!") def wait(self): """ Wait for an event to process. """ self.log("Waiting for events...") # prepare pending outbound data for the network pn_connector_process(self.cxtr) # wait forever for network event(s) pn_driver_wait(self.driver, -1) # process any data that arrived pn_connector_process(self.cxtr) self.log("...waiting done!") def settle(self): """ In order to be sure that the remote has seen that we accepted the message, we need to wait until the message's delivery has been remotely settled. Once that occurs, we can release the delivery by settling it. """ # locally settle any remotely settled deliveries d = pn_unsettled_head(self.link) while d and not pn_delivery_readable(d): # can stop when we hit first # delivery that has not yet been read _next = pn_unsettled_next(d) if pn_delivery_settled(d): # remote has seen our ack (or doesn't care) pn_delivery_settle(d) # now free it d = _next def closed(self): return self.cxtr == None or pn_connector_closed(self.cxtr) def enableLogging(self): self.logging = True def log(self, msg): if self.logging: print("%s" % msg) ################################################## ################################################## ################################################## def main(): options = Options() if not options.mailbox: print("No mailbox name given!") return -1 receiver = FetchClient(options.host, options.port, options.mailbox, options.ca_database) if options.verbose: receiver.enableLogging() receiver.setup() # wait until we authenticate with the server while pn_sasl_state(receiver.sasl) not in (PN_SASL_PASS, PN_SASL_FAIL): receiver.wait() if receiver.closed(): receiver.log("connection failed") return -1; if pn_sasl_state(receiver.sasl) == PN_SASL_FAIL: print("Error: Authentication failure") return -1 # wait until the server has opened the connection while not (pn_link_state(receiver.link) & PN_REMOTE_ACTIVE): receiver.wait() if receiver.closed(): receiver.log("connection failed") return -1; # check if the server recognizes the mailbox, fail if it does not r_tgt = pn_link_remote_source(receiver.link) if pn_terminus_get_address(r_tgt) != options.mailbox: print("Error: mailbox %s does not exist!" % options.mailbox) return -2 # Allow the server to send "count" messages to the receiver by setting # the credit to the expected count pn_link_flow(receiver.link, options.count) # main loop: continue fetching messages until 'count' messages have been # retrieved while pn_link_credit(receiver.link) > 0 and not receiver.closed(): # while all msgs have not arrived if pn_link_queued(receiver.link) == 0: # wait for some to arrive receiver.wait() # read all queued deliveries while pn_link_queued(receiver.link): delivery = pn_link_current(receiver.link) # read all bytes of message rc, msg = pn_link_recv(receiver.link, pn_delivery_pending(delivery)) receiver.log("Received count/status=%d" % rc) if rc < 0: print("Error: Receive failed (%s), exiting..." % rc) return -3 print("%s" % msg) # let the server know we accept the message pn_delivery_update(delivery, PN_ACCEPTED) pn_link_advance(receiver.link) # next delivery # settle any deliveries that the server has seen receiver.settle() # block until any leftover deliveries are settled while pn_link_unsettled(receiver.link) > 0 and not receiver.closed(): receiver.wait() receiver.settle() # we're done, now clean up the connection: receiver.teardown() return 0 if __name__ == "__main__": sys.exit(main())