#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
from optparse import OptionParser

from cproton import *


class Options(object):
    """Command line options for the mailbox post client.

    After construction the following attributes are available:

    * ``messages`` - the positional arguments; each one is posted as a message.
    * ``server``   - the raw ``--server`` value.
    * ``host``     - the part of ``server`` before the last ``:``.
    * ``port``     - the part after the last ``:`` as a string, or ``"5672"``
                     when ``server`` contains no ``:``.
    * ``mailbox``  - the ``--mailbox`` value.
    * ``verbose``  - truthy when ``--verbose`` was given.
    """

    def __init__(self):
        parser = OptionParser(
            usage="usage: %prog [options] <message> [<message> ...]")
        parser.add_option(
            "-s", "--server", action="store", type="string",
            default="0.0.0.0:5672", metavar="",
            help="Address of the server with syntax: "
                 "hostname | ip-address [:<port>]")
        parser.add_option(
            "-m", "--mailbox", action="store", type="string",
            default="mailbox", metavar="",
            help="Name of the mailbox on the server.")
        parser.add_option(
            "-v", "--verbose", action="store_true",
            help="Turn on extra trace messages.")

        opts, self.messages = parser.parse_args()   # uses sys.argv[1:]

        self.server = opts.server
        self.mailbox = opts.mailbox

        # Split on the LAST colon only, so at most two parts are produced.
        addr = opts.server.rsplit(":", 1)
        self.host = addr[0]
        if len(addr) == 2:
            self.port = addr[1]
        else:
            self.port = "5672"

        self.verbose = opts.verbose


class PostClient(object):
    """Client that posts messages to a mailbox over a single sender link."""

    def __init__(self, host, port, mailbox):
        """ Initialize the client by supplying the address of the server, and
        the name of the mailbox to post to.
        """
        self.host = host
        self.port = port
        self.mailbox = mailbox
        self.logging = False

    def setup(self):
        """ Setup and configure the connection to the server.

        Creates the driver, connector, SASL layer, connection, session and
        sender link (stored on ``self``) and opens the engine endpoints.
        """
        # setup a driver connection to the server
        self.log("Connecting to server host = %s:%s" % (self.host, self.port))
        self.driver = pn_driver()
        self.cxtr = pn_connector(self.driver, self.host, self.port, None)

        # configure SASL
        self.sasl = pn_connector_sasl(self.cxtr)
        pn_sasl_mechanisms(self.sasl, "ANONYMOUS")
        pn_sasl_client(self.sasl)

        # inform the engine about the connection, and link the driver to it.
        self.conn = pn_connection()
        pn_connector_set_connection(self.cxtr, self.conn)

        # create a session, and a sender Link targeting the mailbox
        self.log("Posting to mailbox = %s" % self.mailbox)
        self.ssn = pn_session(self.conn)
        self.link = pn_sender(self.ssn, "sender")
        pn_set_target(self.link, self.mailbox)

        # now open all the engine endpoints
        pn_connection_open(self.conn)
        pn_session_open(self.ssn)
        pn_link_open(self.link)

    def wait(self):
        """ Wait for an event to process.
        """
        self.log("Waiting for events...")

        # prepare pending outbound data for the network
        pn_connector_process(self.cxtr)

        # wait forever for network event(s)
        pn_driver_wait(self.driver, -1)

        # process any data that arrived
        pn_connector_process(self.cxtr)

        self.log("...waiting done!")

    def settle(self):
        """ In order to be sure that the remote has accepted the message, we
        need to wait until the message's delivery has been remotely settled.
        Once that occurs, we can release the delivery by settling it.
        """
        d = pn_unsettled_head(self.link)
        while d:
            # Fetch the successor before possibly settling the current
            # delivery.
            _next = pn_unsettled_next(d)

            # if the remote has either settled this delivery OR set the
            # disposition, we consider the message received.
            disp = pn_remote_disposition(d)
            if disp and disp != PN_ACCEPTED:
                print("Warning: message was not accepted by the remote!")
            if disp or pn_remote_settled(d):
                pn_settle(d)

            d = _next

    def enableLogging(self):
        """Turn on trace output from log()."""
        self.logging = True

    def log(self, msg):
        """Print msg, but only when logging has been enabled."""
        if self.logging:
            print("%s" % msg)


##################################################
##################################################
##################################################


def main():
    """Post every message given on the command line to the mailbox.

    Returns the process exit status: 0 on success, -1 when no message was
    given or authentication failed, -2 when sending a message failed.
    """
    options = Options()

    if len(options.messages) == 0:
        print("No message data given!")
        return -1

    sender = PostClient(options.host, options.port, options.mailbox)
    if options.verbose:
        sender.enableLogging()

    sender.setup()

    # wait until we authenticate with the server
    while pn_sasl_state(sender.sasl) not in (PN_SASL_PASS, PN_SASL_FAIL):
        sender.wait()

    if pn_sasl_state(sender.sasl) == PN_SASL_FAIL:
        print("Error: Authentication failure")
        return -1

    # main loop: send each message
    pendingSends = list(options.messages)
    while pendingSends:
        # wait until the server grants us some send credit
        if pn_credit(sender.link) == 0:
            sender.log("wait for credit")
            sender.wait()

        # Stop as soon as the queue is drained: the available credit may
        # exceed the number of messages left, and popping from an empty list
        # would raise IndexError.
        while pendingSends and pn_credit(sender.link) > 0:
            msg = pendingSends.pop(0)
            sender.log("sending %s" % msg)
            # The tag uses the number of messages still pending, which
            # decreases with every pop, so each delivery gets a distinct tag.
            pn_delivery(sender.link, "post-delivery-%s" % len(pendingSends))
            rc = pn_send(sender.link, msg)
            if rc < 0:
                print("Error: sending message: %s" % rc)
                return -2
            assert rc == len(msg)
            pn_advance(sender.link)  # deliver the message

        # settle any deliveries that the server has accepted
        sender.settle()

    # done sending, now block until any pending deliveries are settled
    while pn_unsettled(sender.link) > 0:
        sender.wait()
        sender.settle()

    # we're done, close and wait for the remote to close also
    pn_connection_close(sender.conn)
    while not (pn_connection_state(sender.conn) & PN_REMOTE_CLOSED):
        sender.wait()

    return 0


if __name__ == "__main__":
    sys.exit(main())