#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
from optparse import OptionParser

from cproton import *


class Options(object):
    """Command-line options for the fetch client.

    Attributes set by the constructor:
      mailbox -- the single positional argument, or None when zero or more
                 than one positional argument was supplied
      server  -- the raw value of --server
      host    -- the part of ``server`` before the last ':'
      port    -- the part of ``server`` after the last ':' (a string), or
                 "5672" when no port was supplied
      count   -- number of messages to fetch (int)
      verbose -- truthy when -v/--verbose was given
    """

    def __init__(self):
        """Parse sys.argv[1:] and populate the option attributes."""
        # The usage line names the positional <mailbox> argument, because
        # main() refuses to run without one.
        parser = OptionParser(usage="usage: %prog [options] <mailbox>")
        parser.add_option("-s", "--server", action="store", type="string",
                          default="0.0.0.0:5672", metavar="<server-address>",
                          help="Address of the server with syntax: "
                               "hostname | ip-address [:<port>]")
        parser.add_option("-c", "--count", action="store", type="int",
                          default=1, metavar="<#messages>",
                          help="Number of messages to read from mailbox")
        parser.add_option("-v", "--verbose", action="store_true",
                          help="Turn on extra trace messages.")

        opts, mailboxes = parser.parse_args()   # uses sys.argv[1:]

        # Exactly one mailbox name is accepted; anything else leaves it unset
        # so that the caller can report the problem.
        self.mailbox = None
        if len(mailboxes) == 1:
            self.mailbox = str(mailboxes[0])
        self.server = opts.server

        # Split on the LAST ':' only, so that the host part may itself
        # contain colons.
        addr = opts.server.rsplit(":", 1)
        self.host = addr[0]
        # Fall back to the default port both when no ':' is present and when
        # the port part is empty (e.g. "myhost:"), rather than handing an
        # empty port string to the driver.
        if len(addr) == 2 and addr[1]:
            self.port = addr[1]
        else:
            self.port = "5672"

        self.count = opts.count
        self.verbose = opts.verbose


class FetchClient(object):
    """Receiver that fetches messages from a mailbox on a server.

    setup() creates the driver, connector, SASL layer, connection, session
    and receiving link and stores them on the instance as ``driver``,
    ``cxtr``, ``sasl``, ``conn``, ``ssn`` and ``link``.
    """

    def __init__(self, host, port, mailbox):
        """ Initialize the client by supplying the address of the server, and
        the name of the mailbox to fetch from.
        """
        self.host = host
        self.port = port
        self.mailbox = mailbox
        self.logging = False

    def setup(self):
        """ Setup and configure the connection to the server.
        """
        # setup a driver connection to the server
        self.log("Connecting to server host = %s:%s" % (self.host, self.port))
        self.driver = pn_driver()
        self.cxtr = pn_connector(self.driver, self.host, self.port, None)

        # configure SASL
        self.sasl = pn_connector_sasl(self.cxtr)
        pn_sasl_mechanisms(self.sasl, "ANONYMOUS")
        pn_sasl_client(self.sasl)

        # inform the engine about the connection, and link the driver to it.
        self.conn = pn_connection()
        pn_connection_set_container(self.conn, "fetch")
        pn_connector_set_connection(self.cxtr, self.conn)

        # create a session, and Link for receiving from the mailbox
        self.log("Fetching from mailbox = %s" % self.mailbox)
        self.ssn = pn_session(self.conn)
        self.link = pn_receiver(self.ssn, "receiver")
        pn_set_source(self.link, self.mailbox)

        # now open all the engine endpoints
        pn_connection_open(self.conn)
        pn_session_open(self.ssn)
        pn_link_open(self.link)

    def wait(self):
        """ Wait for an event to process.

        Blocks with no timeout until the driver reports activity.
        """
        self.log("Waiting for events...")

        # prepare pending outbound data for the network
        pn_connector_process(self.cxtr)

        # wait forever for network event(s)
        pn_driver_wait(self.driver, -1)

        # process any data that arrived
        pn_connector_process(self.cxtr)

        self.log("...waiting done!")

    def settle(self):
        """ In order to be sure that the remote has seen that we accepted the
        message, we need to wait until the message's delivery has been
        remotely settled.  Once that occurs, we can release the delivery by
        settling it.
        """
        # locally settle any remotely settled deliveries
        d = pn_unsettled_head(self.link)
        while d and not pn_readable(d):
            # can stop when we hit first delivery that has not yet been read
            # fetch the successor BEFORE settling, since settling frees d
            _next = pn_unsettled_next(d)
            if pn_remote_settled(d):
                # remote has seen our ack (or doesn't care)
                pn_settle(d)    # now free it
            d = _next

    def enableLogging(self):
        """Turn on the trace messages emitted through log()."""
        self.logging = True

    def log(self, msg):
        """Print ``msg`` if logging has been enabled; otherwise do nothing."""
        if self.logging:
            print("%s" % msg)


##################################################
##################################################
##################################################


def main():
    """Fetch the requested number of messages from a mailbox and print them.

    Returns the process exit status:
       0 -- success
      -1 -- no mailbox name was given, or authentication failed
      -2 -- the server did not echo back the requested mailbox as the source
      -3 -- receiving a message failed
    """
    options = Options()

    if not options.mailbox:
        print("No mailbox name given!")
        return -1

    receiver = FetchClient(options.host, options.port, options.mailbox)
    if options.verbose:
        receiver.enableLogging()

    receiver.setup()

    # wait until we authenticate with the server
    while pn_sasl_state(receiver.sasl) not in (PN_SASL_PASS, PN_SASL_FAIL):
        receiver.wait()

    if pn_sasl_state(receiver.sasl) == PN_SASL_FAIL:
        print("Error: Authentication failure")
        return -1

    # wait until the server has opened the connection
    while not (pn_link_state(receiver.link) & PN_REMOTE_ACTIVE):
        receiver.wait()

    # check if the server recognizes the mailbox, fail if it does not
    if pn_remote_source(receiver.link) != options.mailbox:
        print("Error: mailbox %s does not exist!" % options.mailbox)
        return -2

    # Allow the server to send "count" messages to the receiver by setting
    # the credit to the expected count
    pn_flow(receiver.link, options.count)

    # main loop: continue fetching messages until 'count' messages have been
    # retrieved
    while pn_credit(receiver.link) > 0:     # while all msgs have not arrived
        if pn_queued(receiver.link) == 0:   # wait for some to arrive
            receiver.wait()

        # read all queued deliveries
        while pn_queued(receiver.link):
            delivery = pn_current(receiver.link)

            # read all bytes of message
            rc, msg = pn_recv(receiver.link, pn_pending(delivery))
            receiver.log("Received count/status=%d" % rc)
            if rc < 0:
                print("Error: Receive failed (%s), exiting..." % rc)
                return -3

            print("%s" % msg)

            # let the server know we accept the message
            pn_disposition(delivery, PN_ACCEPTED)
            pn_advance(receiver.link)   # next delivery

        # settle any deliveries that the server has seen
        receiver.settle()

    # block until any leftover deliveries are settled
    while pn_unsettled(receiver.link) > 0:
        receiver.wait()
        receiver.settle()

    # we're done, close and wait for the remote to close also
    pn_connection_close(receiver.conn)
    while not (pn_connection_state(receiver.conn) & PN_REMOTE_CLOSED):
        receiver.wait()

    return 0


if __name__ == "__main__":
    sys.exit(main())