#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
from optparse import OptionParser

from cproton import *

# Values stored as each connector's context (pn_connector_set_context) to
# track where the connection is in its lifecycle.
FAILED = 0
CONNECTION_UP = 1
AUTHENTICATING = 2

# Module-level state kept for backward compatibility only: MailboxServer
# tracks its own counter and mailboxes per instance and never reads these.
counter = 0
mailboxes = {}


def _check(ok, what):
    """Raise AssertionError(what) unless ``ok`` is truthy.

    Used instead of the ``assert`` statement for runtime validation:
    ``assert`` is removed when Python runs with -O, which would silently
    skip the check.  AssertionError is kept as the exception type so a
    failure looks the same to anything that already handled it.
    """
    if not ok:
        raise AssertionError(what)


class Options(object):
    """Command-line configuration for the mailbox server.

    After construction the instance exposes: ``verbose``, ``host``,
    ``port``, ``certificate_file``, ``certificate_key_file``,
    ``key_file_password``, ``require_encryption``, ``ca_database`` and
    ``server`` (the raw list of positional arguments).
    """

    def __init__(self):
        """Parse sys.argv[1:]; exits with -1 if a certificate is given
        without its key file."""
        parser = OptionParser(usage="usage: %prog [options] ")
        parser.add_option("-v", "--verbose", action="store_true",
                          dest="verbose",
                          help="print status messages to stdout")

        # SSL configuration
        parser.add_option("--ssl-cert-file", type="str", metavar="",
                          help="PEM file containing identifying certificate.")
        parser.add_option("--ssl-key-file", type="str", metavar="",
                          help="PEM file containing private key of certificate.")
        parser.add_option("--ssl-key-pw", type="str", metavar='"password"',
                          help="key file password (if key encrypted).")
        parser.add_option("--require-encryption", action="store_true",
                          help="Do not accept connections from clients that do not use encryption.")
        parser.add_option("--ssl-cert-db", type="str", metavar="",
                          help="database of trusted certificates. Used to authenticate clients.")

        # For client authentication using SSL:
        #parser.add_option("-c", "--ssl-require-client-authentication", action="store_true", help="force client to authenticate itself.")
        #parser.add_option("-t", "--ssl-trusted-CA-file", type="str", metavar="", help="file holding certificates of CAs to advertise")

        opts, self.server = parser.parse_args()   # uses sys.argv[1:]

        self.verbose = opts.verbose

        # a certificate is useless without its private key
        if opts.ssl_cert_file and not opts.ssl_key_file:
            print("Error: if --ssl-cert-file given, --ssl-key-file must be supplied!")
            sys.exit(-1)

        self.certificate_file = opts.ssl_cert_file
        self.certificate_key_file = opts.ssl_key_file
        self.key_file_password = opts.ssl_key_pw
        self.require_encryption = opts.require_encryption
        self.ca_database = opts.ssl_cert_db

        # Optional first positional argument is "host" or "host:port";
        # the port is kept as a string and defaults to "5672".
        if self.server:
            addr = self.server[0].rsplit(":", 1)
            self.host = addr[0]
            if len(addr) == 2:
                self.port = addr[1]
            else:
                self.port = "5672"
        else:
            self.host = "0.0.0.0"
            self.port = "5672"


class MailboxServer(object):
    """A simple mailbox server built on the proton driver and engine.

    Clients that attach a receiving link (from the server's point of view)
    write messages into the mailbox named by the link's target; clients
    that attach a sending link read messages from the mailbox named by the
    link's source.  Mailboxes are held in memory in ``self.mailboxes``.
    """

    def __init__(self, host, port,
                 require_encryption=False,
                 certificate_file=None,
                 certificate_key_file=None,
                 key_file_password=None,
                 ca_database=None):
        """ Initialize the server to wait on the given address for inbound
        connection requests.
        """
        self.host = host
        self.port = port
        self.certificate_file = certificate_file
        self.certificate_key_file = certificate_key_file
        self.key_file_password = key_file_password
        self.ca_database = ca_database      # stored, not used by this class
        self.require_encryption = require_encryption
        self.mailboxes = {}                 # mailbox name -> list of messages
        self.logging = False
        self.counter = 0                    # suffix for outgoing delivery tags

    def setup(self):
        """ Setup and configure the server.

        Creates the driver and the listener.  Returns True on success and
        False (after printing an error) if the listener is None.
        """
        self.log("Server started, listening on %s:%s" % (self.host, self.port))
        self.driver = pn_driver()
        self.listener = pn_listener(self.driver, self.host, self.port, None)
        if self.listener is None:
            print("Error: could not listen on %s:%s" % (self.host, self.port))
            return False

        if self.certificate_file:
            if not self.require_encryption:
                self.log("Allowing both encrypted AND unencrypted connections!")
        return True

    def wait(self):
        """ Wait for a network event from the driver.
        """
        self.log("Driver sleep...")
        pn_driver_wait(self.driver, -1)
        self.log("...Driver wakeup.")

    def acceptConnectionRequests(self):
        """ Accept connection request coming from remote clients.  Create a
        connector to track each accepted network connection.

        When a certificate file is configured, each new connector's
        transport is set up as an SSL server.  Raises AssertionError if any
        SSL setup call fails.
        """
        l = pn_driver_listener(self.driver)
        while l:
            self.log("Accepting Connection.")
            cxtr = pn_listener_accept(l)
            pn_connector_set_context(cxtr, AUTHENTICATING)

            # configure SSL
            if self.certificate_file:
                transport = pn_connector_transport(cxtr)
                _check(transport, "connector has no transport")
                ssl_server = pn_ssl(transport)
                _check(ssl_server, "could not obtain SSL object for transport")
                rc = pn_ssl_init(ssl_server, PN_SSL_MODE_SERVER)
                _check(rc == 0, "pn_ssl_init failed: %s" % rc)
                rc = pn_ssl_set_credentials(ssl_server,
                                            self.certificate_file,
                                            self.certificate_key_file,
                                            self.key_file_password)
                _check(rc == 0, "pn_ssl_set_credentials failed: %s" % rc)
                if not self.require_encryption:
                    rc = pn_ssl_allow_unsecured_client(ssl_server)
                    _check(rc == 0,
                           "pn_ssl_allow_unsecured_client failed: %s" % rc)

                # TODO: client authentication incomplete, for now skip peer
                # verification
                rc = pn_ssl_set_peer_authentication(ssl_server,
                                                    PN_SSL_NO_VERIFY_PEER,
                                                    None)
                _check(rc == 0,
                       "pn_ssl_set_peer_authentication failed: %s" % rc)

            l = pn_driver_listener(self.driver)

    def processConnections(self):
        """ Check each connector for pending work.

        Dispatches on the connector's context value: AUTHENTICATING runs
        SASL, CONNECTION_UP services the engine, FAILED closes the
        connector.  Connectors reported closed are freed.
        """
        cxtr = pn_driver_connector(self.driver)
        while cxtr:
            self.log("Process Connector")

            # process any data coming from the network, this will update the
            # engine's view of the state of the remote clients
            pn_connector_process(cxtr)

            state = pn_connector_context(cxtr)
            if state == AUTHENTICATING:
                # connection has not passed SASL authentication yet
                self.authenticateConnector(cxtr)
            elif state == CONNECTION_UP:
                # active connection, service any engine events
                self.serviceConnector(cxtr)
            elif state == FAILED:
                # Authentication was rejected on an earlier pass.  Nothing
                # else in this server closes such a connector, so close it
                # here; it is freed by the pn_connector_closed() check below.
                self.log("Closing connector that failed authentication")
                pn_connector_close(cxtr)
            else:
                print("Error: Unknown Connection state=%s" % state)

            # now generate any outbound network data generated in response to
            # any work done by the engine.
            pn_connector_process(cxtr)

            if pn_connector_closed(cxtr):
                self.log("Closing connector")
                pn_connector_free(cxtr)

            cxtr = pn_driver_connector(self.driver)

    def enableLogging(self):
        """ Turn on status messages printed by log(). """
        self.logging = True

    def log(self, msg):
        """ Print ``msg`` to stdout if logging has been enabled. """
        if self.logging:
            print("%s" % msg)

    def authenticateConnector(self, cxtr):
        """ Authenticate the remote client.

        Run the SASL algorithm until it passes, fails, or needs more data
        from the remote.  Only the ANONYMOUS mechanism is offered and
        accepted.  On pass, a new connection is bound to the connector and
        its context becomes CONNECTION_UP; on failure it becomes FAILED;
        otherwise the context is left unchanged.
        """
        self.log("Authenticating...")

        sasl = pn_connector_sasl(cxtr)
        state = pn_sasl_state(sasl)
        while state == PN_SASL_CONF or state == PN_SASL_STEP:
            if state == PN_SASL_CONF:
                self.log("Authenticating-CONF...")
                pn_sasl_mechanisms(sasl, "ANONYMOUS")
                pn_sasl_server(sasl)
            elif state == PN_SASL_STEP:
                self.log("Authenticating-STEP...")
                mech = pn_sasl_remote_mechanisms(sasl)
                if mech == "ANONYMOUS":
                    pn_sasl_done(sasl, PN_SASL_OK)
                else:
                    pn_sasl_done(sasl, PN_SASL_AUTH)
            state = pn_sasl_state(sasl)

        if state == PN_SASL_PASS:
            pn_connector_set_connection(cxtr, pn_connection())
            pn_connector_set_context(cxtr, CONNECTION_UP)
            self.log("Authentication-PASSED")
        elif state == PN_SASL_FAIL:
            pn_connector_set_context(cxtr, FAILED)
            self.log("Authentication-FAILED")
        else:
            self.log("Authentication-PENDING")

    def setupLink(self, link):
        """ Configure a link coming from a client.

        For a sender link the remote source names the mailbox to read; if
        it does not exist the local source is set to None.  For a receiver
        link the remote target names the mailbox to write, which is created
        if missing.  The link is then opened.
        """
        r_tgt = pn_remote_target(link)
        r_src = pn_remote_source(link)

        if pn_is_sender(link):
            self.log("Opening Link to read from mailbox: %s" % r_src)
            if r_src not in self.mailboxes:
                print("Error: mailbox %s does not exist!" % r_src)
                r_src = None  # indicate to remote the mailbox does not exist
        else:
            self.log("Opening Link to write to mailbox: %s" % r_tgt)
            if r_tgt not in self.mailboxes:
                self.mailboxes[r_tgt] = []  # create a new mailbox

        pn_set_target(link, r_tgt)
        pn_set_source(link, r_src)

        if pn_is_sender(link):
            # grant a delivery to the link - it will become "writable" when
            # the driver can accept messages for the sender.
            pn_delivery(link, "server-delivery-%d" % self.counter)
            self.counter += 1
        else:
            # Grant enough credit to the receiver to allow one inbound message
            pn_flow(link, 1)
        pn_link_open(link)

    def serviceConnector(self, cxtr):
        """ Process any pending I/O events on the given connector once it
        has been authenticated.
        """
        self.log("I/O processing start.")

        # get the engine's connection from the driver
        conn = pn_connector_connection(cxtr)

        ## Step 1: setup the engine's connection, and any sessions and links
        ## that may be pending.

        # initialize the connection if it's new
        if pn_connection_state(conn) & PN_LOCAL_UNINIT:
            self.log("Connection Opened.")
            pn_connection_open(conn)

        # open all pending sessions
        ssn = pn_session_head(conn, PN_LOCAL_UNINIT)
        while ssn:
            pn_session_open(ssn)
            self.log("Session Opened.")
            ssn = pn_session_next(ssn, PN_LOCAL_UNINIT)

        # configure and open any pending links
        link = pn_link_head(conn, PN_LOCAL_UNINIT)
        while link:
            self.setupLink(link)
            link = pn_link_next(link, PN_LOCAL_UNINIT)

        ## Step 2: Now drain all the pending deliveries from the connection's
        ## work queue and process them

        delivery = pn_work_head(conn)
        while delivery:
            self.log("Process delivery %s." % pn_delivery_tag(delivery))

            if pn_readable(delivery):    # inbound data available
                self.processReceive(delivery)
            elif pn_writable(delivery):  # can send a message
                self.sendMessage(delivery)

            if pn_updated(delivery):
                # check to see if the remote has accepted message we sent
                self.log("remote disposition for %s: %s " %
                         (str(pn_delivery_tag(delivery)),
                          str(pn_remote_disposition(delivery))))
                if pn_remote_disposition(delivery):
                    # once we know the remote has seen the message, we can
                    # release the delivery.
                    pn_settle(delivery)

            delivery = pn_work_next(delivery)

        ## Step 3: Clean up any links or sessions that have been closed by
        ## the remote.  If the connection has been closed remotely, clean
        ## that up also.

        # teardown any terminating links
        link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)
        while link:
            pn_link_close(link)
            self.log("Link Closed")
            link = pn_link_next(link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)

        # teardown any terminating sessions
        ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)
        while ssn:
            pn_session_close(ssn)
            self.log("Session Closed")
            ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)

        # teardown the connection if it's terminating
        if pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED):
            self.log("Connection Closed")
            pn_connection_close(conn)

    def processReceive(self, delivery):
        """ A delivery has indicated that message data is available.

        Read the message data, process it, then accept the message.  Data
        is read in pieces of at most 1024 bytes; each piece read is
        appended to the target mailbox as its own entry.
        """
        link = pn_link(delivery)
        mbox = pn_remote_target(link)
        rc, msg = pn_recv(link, 1024)
        self.log("Msg Received %d" % rc)
        # a negative return code from pn_recv ends the read loop
        while rc >= 0:
            if mbox not in self.mailboxes:
                print("Error: cannot sent to mailbox %s - dropping message." % mbox)
            else:
                self.mailboxes[mbox].append(msg)
                self.log("Mailbox %s contains: %s" %
                         (mbox, str(self.mailboxes[mbox])))
            rc, msg = pn_recv(link, 1024)

        # now that we hit the end of the message, update the
        # disposition to let the remote know we accepted it.
        self.log("Msg Accepted.")
        pn_disposition(delivery, PN_ACCEPTED)

        # since we no longer have any more work to do on this delivery,
        # finish it and move to the next.
        pn_settle(delivery)
        pn_advance(link)

        # if more credit is needed, grant it
        if pn_credit(link) == 0:
            pn_flow(link, 1)

    def sendMessage(self, delivery):
        """ A delivery has indicated that the link is able to accept a
        message.

        Send a message over the link, but do not settle it until the remote
        accepts it (and updates the delivery's disposition).  If the
        mailbox is missing or empty an empty message is sent.  Raises
        AssertionError if pn_send reports a length other than the message
        length.
        """
        link = pn_link(delivery)
        mbox = pn_remote_source(link)
        self.log("Request for Mailbox=%s" % str(mbox))
        if mbox in self.mailboxes and self.mailboxes[mbox]:
            msg = self.mailboxes[mbox].pop(0)   # oldest message first
            self.log("Fetching message %s" % str(msg))
        else:
            print("Warning: mailbox %s is empty, sending empty message." % mbox)
            msg = ""

        sent = pn_send(link, msg)
        _check(sent == len(msg),
               "pn_send sent %s of %d bytes" % (sent, len(msg)))
        self.log("Msg Sent %d" % sent)

        # If pn_advance() returns a true value, queue another delivery on
        # the link.  NOTE(review): the original author was unsure whether
        # this is the right condition for "the link can accept more".
        if pn_advance(link):
            pn_delivery(link, "server-delivery-%d" % self.counter)
            self.counter += 1

        # do not settle the delivery now - wait until the remote sets the
        # disposition.


##################################################
##################################################
##################################################


def main():
    """Run the mailbox server.

    Returns -1 if the server could not be set up; otherwise loops forever
    waiting for driver events, accepting connections and servicing them.
    """
    options = Options()

    server = MailboxServer(options.host, options.port,
                           options.require_encryption,
                           options.certificate_file,
                           options.certificate_key_file,
                           options.key_file_password,
                           options.ca_database)
    if options.verbose:
        server.enableLogging()

    if not server.setup():
        return -1

    while True:
        # wait for a driver event
        server.wait()
        # accept all pending connection requests
        server.acceptConnectionRequests()
        # process all connectors with pending events
        server.processConnections()


if __name__ == "__main__":
    sys.exit(main())