Package qpid :: Package messaging :: Module endpoints
[hide private]
[frames] | no frames]

Source Code for Module qpid.messaging.endpoints

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  A candidate high level messaging API for python. 
  22   
  23  Areas that still need work: 
  24   
  25    - definition of the arguments for L{Session.sender} and L{Session.receiver} 
  26    - standard L{Message} properties 
  27    - L{Message} content encoding 
  28    - protocol negotiation/multiprotocol impl 
  29  """ 
  30   
  31  from logging import getLogger 
  32  from math import ceil 
  33  from qpid.codec010 import StringCodec 
  34  from qpid.concurrency import synchronized, Waiter, Condition 
  35  from qpid.datatypes import Serial, uuid4 
  36  from qpid.messaging.constants import * 
  37  from qpid.messaging.exceptions import * 
  38  from qpid.messaging.message import * 
  39  from qpid.ops import PRIMITIVE 
  40  from qpid.util import default, URL 
  41  from threading import Thread, RLock 
  42   
  43  log = getLogger("qpid.messaging") 
  44   
  45  static = staticmethod 
46 47 -class Endpoint:
48
49 - def _ecwait(self, predicate, timeout=None):
50 result = self._ewait(lambda: self.closed or predicate(), timeout) 51 self.check_closed() 52 return result
53
54 -class Connection(Endpoint):
55 56 """ 57 A Connection manages a group of L{Sessions<Session>} and connects 58 them with a remote endpoint. 59 """ 60 61 @static
62 - def establish(url=None, **options):
63 """ 64 Constructs a L{Connection} with the supplied parameters and opens 65 it. 66 """ 67 conn = Connection(url, **options) 68 conn.open() 69 return conn
70
71 - def __init__(self, url=None, **options):
72 """ 73 Creates a connection. A newly created connection must be connected 74 with the Connection.connect() method before it can be used. 75 76 @type url: str 77 @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ] 78 @type host: str 79 @param host: the name or ip address of the remote host (overriden by url) 80 @type port: int 81 @param port: the port number of the remote host (overriden by url) 82 @type transport: str 83 @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls) 84 @type heartbeat: int 85 @param heartbeat: heartbeat interval in seconds 86 87 @type username: str 88 @param username: the username for authentication (overriden by url) 89 @type password: str 90 @param password: the password for authentication (overriden by url) 91 92 @type sasl_mechanisms: str 93 @param sasl_mechanisms: space separated list of permitted sasl mechanisms 94 @type sasl_service: str 95 @param sasl_service: ??? 96 @type sasl_min_ssf: ??? 97 @param sasl_min_ssf: ??? 98 @type sasl_max_ssf: ??? 99 @param sasl_max_ssf: ??? 100 101 @type reconnect: bool 102 @param reconnect: enable/disable automatic reconnect 103 @type reconnect_timeout: float 104 @param reconnect_timeout: total time to attempt reconnect 105 @type reconnect_internal_min: float 106 @param reconnect_internal_min: minimum interval between reconnect attempts 107 @type reconnect_internal_max: float 108 @param reconnect_internal_max: maximum interval between reconnect attempts 109 @type reconnect_internal: float 110 @param reconnect_interval: set both min and max reconnect intervals 111 @type reconnect_limit: int 112 @param reconnect_limit: limit the total number of reconnect attempts 113 @type reconnect_urls: list[str] 114 @param reconnect_urls: list of backup hosts specified as urls 115 116 @type address_ttl: float 117 @param address_ttl: time until cached address resolution expires 118 119 @rtype: Connection 120 @return: a disconnected Connection 121 """ 122 if url is None: 123 url = options.get("host") 124 if isinstance(url, basestring): 125 url = URL(url) 126 self.host = url.host 127 if options.has_key("transport"): 128 self.transport = options.get("transport") 129 elif url.scheme == url.AMQP: 130 self.transport = "tcp" 131 elif url.scheme == url.AMQPS: 132 self.transport = "ssl" 133 else: 134 self.transport = "tcp" 135 if self.transport in ("ssl", "tcp+tls"): 136 self.port = default(url.port, options.get("port", AMQPS_PORT)) 137 else: 138 self.port = default(url.port, options.get("port", AMQP_PORT)) 139 self.heartbeat = options.get("heartbeat") 140 self.username = default(url.user, options.get("username", None)) 141 self.password = default(url.password, options.get("password", None)) 142 self.auth_username = None 143 144 self.sasl_mechanisms = options.get("sasl_mechanisms") 145 self.sasl_service = options.get("sasl_service", "qpidd") 146 self.sasl_min_ssf = options.get("sasl_min_ssf") 147 self.sasl_max_ssf = options.get("sasl_max_ssf") 148 149 self.reconnect = options.get("reconnect", False) 150 self.reconnect_timeout = options.get("reconnect_timeout") 151 reconnect_interval = options.get("reconnect_interval") 152 self.reconnect_interval_min = options.get("reconnect_interval_min", 153 default(reconnect_interval, 1)) 154 self.reconnect_interval_max = options.get("reconnect_interval_max", 155 default(reconnect_interval, 2*60)) 156 self.reconnect_limit = options.get("reconnect_limit") 157 self.reconnect_urls = options.get("reconnect_urls", []) 158 self.reconnect_log = options.get("reconnect_log", True) 159 160 self.address_ttl = options.get("address_ttl", 60) 161 self.tcp_nodelay = options.get("tcp_nodelay", False) 162 163 self.options = options 164 165 166 self.id = str(uuid4()) 167 self.session_counter = 0 168 self.sessions = {} 169 self._open = False 170 self._connected = False 171 self._transport_connected = False 172 self._lock = RLock() 173 self._condition = Condition(self._lock) 174 self._waiter = Waiter(self._condition) 175 self._modcount = Serial(0) 176 self.error = None 177 from driver import Driver 178 self._driver = Driver(self)
179
180 - def _wait(self, predicate, timeout=None):
181 return self._waiter.wait(predicate, timeout=timeout)
182
183 - def _wakeup(self):
184 self._modcount += 1 185 self._driver.wakeup()
186
187 - def check_error(self):
188 if self.error: 189 self._condition.gc() 190 raise self.error
191
192 - def get_error(self):
193 return self.error
194
195 - def _ewait(self, predicate, timeout=None):
196 result = self._wait(lambda: self.error or predicate(), timeout) 197 self.check_error() 198 return result
199
200 - def check_closed(self):
201 if not self._connected: 202 self._condition.gc() 203 raise ConnectionClosed()
204 205 @synchronized
206 - def session(self, name=None, transactional=False):
207 """ 208 Creates or retrieves the named session. If the name is omitted or 209 None, then a unique name is chosen based on a randomly generated 210 uuid. 211 212 @type name: str 213 @param name: the session name 214 @rtype: Session 215 @return: the named Session 216 """ 217 218 if name is None: 219 name = "%s:%s" % (self.id, self.session_counter) 220 self.session_counter += 1 221 else: 222 name = "%s:%s" % (self.id, name) 223 224 if self.sessions.has_key(name): 225 return self.sessions[name] 226 else: 227 ssn = Session(self, name, transactional) 228 self.sessions[name] = ssn 229 self._wakeup() 230 return ssn
231 232 @synchronized
233 - def _remove_session(self, ssn):
234 self.sessions.pop(ssn.name, 0)
235 236 @synchronized
237 - def open(self):
238 """ 239 Opens a connection. 240 """ 241 if self._open: 242 raise ConnectionError("already open") 243 self._open = True 244 self.attach()
245 246 @synchronized
247 - def opened(self):
248 """ 249 Return true if the connection is open, false otherwise. 250 """ 251 return self._open
252 253 @synchronized
254 - def attach(self):
255 """ 256 Attach to the remote endpoint. 257 """ 258 if not self._connected: 259 self._connected = True 260 self._driver.start() 261 self._wakeup() 262 self._ewait(lambda: self._transport_connected and not self._unlinked())
263
264 - def _unlinked(self):
265 return [l 266 for ssn in self.sessions.values() 267 if not (ssn.error or ssn.closed) 268 for l in ssn.senders + ssn.receivers 269 if not (l.linked or l.error or l.closed)]
270 271 @synchronized
272 - def detach(self, timeout=None):
273 """ 274 Detach from the remote endpoint. 275 """ 276 if self._connected: 277 self._connected = False 278 self._wakeup() 279 cleanup = True 280 else: 281 cleanup = False 282 try: 283 if not self._wait(lambda: not self._transport_connected, timeout=timeout): 284 raise Timeout("detach timed out") 285 finally: 286 if cleanup: 287 self._driver.stop() 288 self._condition.gc()
289 290 @synchronized
291 - def attached(self):
292 """ 293 Return true if the connection is attached, false otherwise. 294 """ 295 return self._connected
296 297 @synchronized
298 - def close(self, timeout=None):
299 """ 300 Close the connection and all sessions. 301 """ 302 try: 303 for ssn in self.sessions.values(): 304 ssn.close(timeout=timeout) 305 finally: 306 self.detach(timeout=timeout) 307 self._open = False
308
309 -class Session(Endpoint):
310 311 """ 312 Sessions provide a linear context for sending and receiving 313 L{Messages<Message>}. L{Messages<Message>} are sent and received 314 using the L{Sender.send} and L{Receiver.fetch} methods of the 315 L{Sender} and L{Receiver} objects associated with a Session. 316 317 Each L{Sender} and L{Receiver} is created by supplying either a 318 target or source address to the L{sender} and L{receiver} methods of 319 the Session. The address is supplied via a string syntax documented 320 below. 321 322 Addresses 323 ========= 324 325 An address identifies a source or target for messages. In its 326 simplest form this is just a name. In general a target address may 327 also be used as a source address, however not all source addresses 328 may be used as a target, e.g. a source might additionally have some 329 filtering criteria that would not be present in a target. 330 331 A subject may optionally be specified along with the name. When an 332 address is used as a target, any subject specified in the address is 333 used as the default subject of outgoing messages for that target. 334 When an address is used as a source, any subject specified in the 335 address is pattern matched against the subject of available messages 336 as a filter for incoming messages from that source. 337 338 The options map contains additional information about the address 339 including: 340 341 - policies for automatically creating, and deleting the node to 342 which an address refers 343 344 - policies for asserting facts about the node to which an address 345 refers 346 347 - extension points that can be used for sender/receiver 348 configuration 349 350 Mapping to AMQP 0-10 351 -------------------- 352 The name is resolved to either an exchange or a queue by querying 353 the broker. 354 355 The subject is set as a property on the message. Additionally, if 356 the name refers to an exchange, the routing key is set to the 357 subject. 358 359 Syntax 360 ------ 361 The following regular expressions define the tokens used to parse 362 addresses:: 363 LBRACE: \\{ 364 RBRACE: \\} 365 LBRACK: \\[ 366 RBRACK: \\] 367 COLON: : 368 SEMI: ; 369 SLASH: / 370 COMMA: , 371 NUMBER: [+-]?[0-9]*\\.?[0-9]+ 372 ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])? 373 STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\' 374 ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F] 375 SYM: [.#*%@$^!+-] 376 WSPACE: [ \\n\\r\\t]+ 377 378 The formal grammar for addresses is given below:: 379 address = name [ "/" subject ] [ ";" options ] 380 name = ( part | quoted )+ 381 subject = ( part | quoted | "/" )* 382 quoted = STRING / ESC 383 part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM 384 options = map 385 map = "{" ( keyval ( "," keyval )* )? "}" 386 keyval = ID ":" value 387 value = NUMBER / STRING / ID / map / list 388 list = "[" ( value ( "," value )* )? "]" 389 390 This grammar resuls in the following informal syntax:: 391 392 <name> [ / <subject> ] [ ; <options> ] 393 394 Where options is:: 395 396 { <key> : <value>, ... } 397 398 And values may be: 399 - numbers 400 - single, double, or non quoted strings 401 - maps (dictionaries) 402 - lists 403 404 Options 405 ------- 406 The options map permits the following parameters:: 407 408 <name> [ / <subject> ] ; { 409 create: always | sender | receiver | never, 410 delete: always | sender | receiver | never, 411 assert: always | sender | receiver | never, 412 mode: browse | consume, 413 node: { 414 type: queue | topic, 415 durable: True | False, 416 x-declare: { ... <declare-overrides> ... }, 417 x-bindings: [<binding_1>, ... <binding_n>] 418 }, 419 link: { 420 name: <link-name>, 421 durable: True | False, 422 reliability: unreliable | at-most-once | at-least-once | exactly-once, 423 x-declare: { ... <declare-overrides> ... }, 424 x-bindings: [<binding_1>, ... <binding_n>], 425 x-subscribe: { ... <subscribe-overrides> ... } 426 } 427 } 428 429 Bindings are specified as a map with the following options:: 430 431 { 432 exchange: <exchange>, 433 queue: <queue>, 434 key: <key>, 435 arguments: <arguments> 436 } 437 438 The create, delete, and assert policies specify who should perfom 439 the associated action: 440 441 - I{always}: the action will always be performed 442 - I{sender}: the action will only be performed by the sender 443 - I{receiver}: the action will only be performed by the receiver 444 - I{never}: the action will never be performed (this is the default) 445 446 The node-type is one of: 447 448 - I{topic}: a topic node will default to the topic exchange, 449 x-declare may be used to specify other exchange types 450 - I{queue}: this is the default node-type 451 452 The x-declare map permits protocol specific keys and values to be 453 specified when exchanges or queues are declared. These keys and 454 values are passed through when creating a node or asserting facts 455 about an existing node. 456 457 Examples 458 -------- 459 A simple name resolves to any named node, usually a queue or a 460 topic:: 461 462 my-queue-or-topic 463 464 A simple name with a subject will also resolve to a node, but the 465 presence of the subject will cause a sender using this address to 466 set the subject on outgoing messages, and receivers to filter based 467 on the subject:: 468 469 my-queue-or-topic/my-subject 470 471 A subject pattern can be used and will cause filtering if used by 472 the receiver. If used for a sender, the literal value gets set as 473 the subject:: 474 475 my-queue-or-topic/my-* 476 477 In all the above cases, the address is resolved to an existing node. 478 If you want the node to be auto-created, then you can do the 479 following. By default nonexistent nodes are assumed to be queues:: 480 481 my-queue; {create: always} 482 483 You can customize the properties of the queue:: 484 485 my-queue; {create: always, node: {durable: True}} 486 487 You can create a topic instead if you want:: 488 489 my-queue; {create: always, node: {type: topic}} 490 491 You can assert that the address resolves to a node with particular 492 properties:: 493 494 my-transient-topic; { 495 assert: always, 496 node: { 497 type: topic, 498 durable: False 499 } 500 } 501 """ 502
503 - def __init__(self, connection, name, transactional):
504 self.connection = connection 505 self.name = name 506 self.log_id = "%x" % id(self) 507 508 self.transactional = transactional 509 510 self.committing = False 511 self.committed = True 512 self.aborting = False 513 self.aborted = False 514 515 self.next_sender_id = 0 516 self.senders = [] 517 self.next_receiver_id = 0 518 self.receivers = [] 519 self.outgoing = [] 520 self.incoming = [] 521 self.unacked = [] 522 self.acked = [] 523 # XXX: I hate this name. 524 self.ack_capacity = UNLIMITED 525 526 self.error = None 527 self.closing = False 528 self.closed = False 529 530 self._lock = connection._lock
531
532 - def __repr__(self):
533 return "<Session %s>" % self.name
534
535 - def _wait(self, predicate, timeout=None):
536 return self.connection._wait(predicate, timeout=timeout)
537
538 - def _wakeup(self):
539 self.connection._wakeup()
540
541 - def check_error(self):
542 self.connection.check_error() 543 if self.error: 544 raise self.error
545
546 - def get_error(self):
547 err = self.connection.get_error() 548 if err: 549 return err 550 else: 551 return self.error
552
553 - def _ewait(self, predicate, timeout=None):
554 result = self.connection._ewait(lambda: self.error or predicate(), timeout) 555 self.check_error() 556 return result
557
558 - def check_closed(self):
559 if self.closed: 560 raise SessionClosed()
561 562 @synchronized
563 - def sender(self, target, **options):
564 """ 565 Creates a L{Sender} that may be used to send L{Messages<Message>} 566 to the specified target. 567 568 @type target: str 569 @param target: the target to which messages will be sent 570 @rtype: Sender 571 @return: a new Sender for the specified target 572 """ 573 target = _mangle(target) 574 sender = Sender(self, self.next_sender_id, target, options) 575 self.next_sender_id += 1 576 self.senders.append(sender) 577 if not self.closed and self.connection._connected: 578 self._wakeup() 579 try: 580 sender._ewait(lambda: sender.linked) 581 except LinkError, e: 582 sender.close() 583 raise e 584 return sender
585 586 @synchronized
587 - def receiver(self, source, **options):
588 """ 589 Creates a receiver that may be used to fetch L{Messages<Message>} 590 from the specified source. 591 592 @type source: str 593 @param source: the source of L{Messages<Message>} 594 @rtype: Receiver 595 @return: a new Receiver for the specified source 596 """ 597 source = _mangle(source) 598 receiver = Receiver(self, self.next_receiver_id, source, options) 599 self.next_receiver_id += 1 600 self.receivers.append(receiver) 601 if not self.closed and self.connection._connected: 602 self._wakeup() 603 try: 604 receiver._ewait(lambda: receiver.linked) 605 except LinkError, e: 606 receiver.close() 607 raise e 608 return receiver
609 610 @synchronized
611 - def _count(self, predicate):
612 result = 0 613 for msg in self.incoming: 614 if predicate(msg): 615 result += 1 616 return result
617
618 - def _peek(self, receiver):
619 for msg in self.incoming: 620 if msg._receiver == receiver: 621 return msg
622
623 - def _pop(self, receiver):
624 i = 0 625 while i < len(self.incoming): 626 msg = self.incoming[i] 627 if msg._receiver == receiver: 628 del self.incoming[i] 629 return msg 630 else: 631 i += 1
632 633 @synchronized
634 - def _get(self, receiver, timeout=None):
635 if self._ewait(lambda: ((self._peek(receiver) is not None) or 636 self.closing or receiver.closed), 637 timeout): 638 msg = self._pop(receiver) 639 if msg is not None: 640 msg._receiver.returned += 1 641 self.unacked.append(msg) 642 log.debug("RETR[%s]: %s", self.log_id, msg) 643 return msg 644 return None
645 646 @synchronized
647 - def next_receiver(self, timeout=None):
648 if self._ecwait(lambda: self.incoming, timeout): 649 return self.incoming[0]._receiver 650 else: 651 raise Empty
652 653 @synchronized
654 - def acknowledge(self, message=None, disposition=None, sync=True):
655 """ 656 Acknowledge the given L{Message}. If message is None, then all 657 unacknowledged messages on the session are acknowledged. 658 659 @type message: Message 660 @param message: the message to acknowledge or None 661 @type sync: boolean 662 @param sync: if true then block until the message(s) are acknowledged 663 """ 664 if message is None: 665 messages = self.unacked[:] 666 else: 667 messages = [message] 668 669 for m in messages: 670 if self.ack_capacity is not UNLIMITED: 671 if self.ack_capacity <= 0: 672 # XXX: this is currently a SendError, maybe it should be a SessionError? 673 raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) 674 self._wakeup() 675 self._ecwait(lambda: len(self.acked) < self.ack_capacity) 676 m._disposition = disposition 677 self.unacked.remove(m) 678 self.acked.append(m) 679 680 self._wakeup() 681 if sync: 682 self._ecwait(lambda: not [m for m in messages if m in self.acked])
683 684 @synchronized
685 - def commit(self):
686 """ 687 Commit outstanding transactional work. This consists of all 688 message sends and receives since the prior commit or rollback. 689 """ 690 if not self.transactional: 691 raise NontransactionalSession() 692 self.committing = True 693 self._wakeup() 694 self._ecwait(lambda: not self.committing) 695 if self.aborted: 696 raise TransactionAborted() 697 assert self.committed
698 699 @synchronized
700 - def rollback(self):
701 """ 702 Rollback outstanding transactional work. This consists of all 703 message sends and receives since the prior commit or rollback. 704 """ 705 if not self.transactional: 706 raise NontransactionalSession() 707 self.aborting = True 708 self._wakeup() 709 self._ecwait(lambda: not self.aborting) 710 assert self.aborted
711 712 @synchronized
713 - def sync(self, timeout=None):
714 """ 715 Sync the session. 716 """ 717 for snd in self.senders: 718 snd.sync(timeout=timeout) 719 if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout): 720 raise Timeout("session sync timed out")
721 722 @synchronized
723 - def close(self, timeout=None):
724 """ 725 Close the session. 726 """ 727 self.sync(timeout=timeout) 728 729 for link in self.receivers + self.senders: 730 link.close(timeout=timeout) 731 732 if not self.closing: 733 self.closing = True 734 self._wakeup() 735 736 try: 737 if not self._ewait(lambda: self.closed, timeout=timeout): 738 raise Timeout("session close timed out") 739 finally: 740 self.connection._remove_session(self)
741
742 -def _mangle(addr):
743 if addr and addr.startswith("#"): 744 return str(uuid4()) + addr 745 else: 746 return addr
747
748 -class Sender(Endpoint):
749 750 """ 751 Sends outgoing messages. 752 """ 753
754 - def __init__(self, session, id, target, options):
755 self.session = session 756 self.id = id 757 self.target = target 758 self.options = options 759 self.capacity = options.get("capacity", UNLIMITED) 760 self.threshold = 0.5 761 self.durable = options.get("durable") 762 self.queued = Serial(0) 763 self.synced = Serial(0) 764 self.acked = Serial(0) 765 self.error = None 766 self.linked = False 767 self.closing = False 768 self.closed = False 769 self._lock = self.session._lock
770
771 - def _wakeup(self):
772 self.session._wakeup()
773
774 - def check_error(self):
775 self.session.check_error() 776 if self.error: 777 raise self.error
778
779 - def get_error(self):
780 err = self.session.get_error() 781 if err: 782 return err 783 else: 784 return self.error
785
786 - def _ewait(self, predicate, timeout=None):
787 result = self.session._ewait(lambda: self.error or predicate(), timeout) 788 self.check_error() 789 return result
790
791 - def check_closed(self):
792 if self.closed: 793 raise LinkClosed()
794 795 @synchronized
796 - def unsettled(self):
797 """ 798 Returns the number of messages awaiting acknowledgment. 799 @rtype: int 800 @return: the number of unacknowledged messages 801 """ 802 return self.queued - self.acked
803 804 @synchronized
805 - def available(self):
806 if self.capacity is UNLIMITED: 807 return UNLIMITED 808 else: 809 return self.capacity - self.unsettled()
810 811 @synchronized
812 - def send(self, object, sync=True, timeout=None):
813 """ 814 Send a message. If the object passed in is of type L{unicode}, 815 L{str}, L{list}, or L{dict}, it will automatically be wrapped in a 816 L{Message} and sent. If it is of type L{Message}, it will be sent 817 directly. If the sender capacity is not L{UNLIMITED} then send 818 will block until there is available capacity to send the message. 819 If the timeout parameter is specified, then send will throw an 820 L{InsufficientCapacity} exception if capacity does not become 821 available within the specified time. 822 823 @type object: unicode, str, list, dict, Message 824 @param object: the message or content to send 825 826 @type sync: boolean 827 @param sync: if true then block until the message is sent 828 829 @type timeout: float 830 @param timeout: the time to wait for available capacity 831 """ 832 833 if not self.session.connection._connected or self.session.closing: 834 raise Detached() 835 836 self._ecwait(lambda: self.linked) 837 838 if isinstance(object, Message): 839 message = object 840 else: 841 message = Message(object) 842 843 if message.durable is None: 844 message.durable = self.durable 845 846 if self.capacity is not UNLIMITED: 847 if self.capacity <= 0: 848 raise InsufficientCapacity("capacity = %s" % self.capacity) 849 if not self._ecwait(self.available, timeout=timeout): 850 raise InsufficientCapacity("capacity = %s" % self.capacity) 851 852 # XXX: what if we send the same message to multiple senders? 853 message._sender = self 854 if self.capacity is not UNLIMITED: 855 message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity)) 856 else: 857 message._sync = sync 858 self.session.outgoing.append(message) 859 self.queued += 1 860 861 if sync: 862 self.sync() 863 assert message not in self.session.outgoing 864 else: 865 self._wakeup()
866 867 @synchronized
868 - def sync(self, timeout=None):
869 mno = self.queued 870 if self.synced < mno: 871 self.synced = mno 872 self._wakeup() 873 if not self._ewait(lambda: self.acked >= mno, timeout=timeout): 874 raise Timeout("sender sync timed out")
875 876 @synchronized
877 - def close(self, timeout=None):
878 """ 879 Close the Sender. 880 """ 881 # avoid erroring out when closing a sender that was never 882 # established 883 if self.acked < self.queued: 884 self.sync(timeout=timeout) 885 886 if not self.closing: 887 self.closing = True 888 self._wakeup() 889 890 try: 891 if not self.session._ewait(lambda: self.closed, timeout=timeout): 892 raise Timeout("sender close timed out") 893 finally: 894 try: 895 self.session.senders.remove(self) 896 except ValueError: 897 pass
898
899 -class Receiver(Endpoint, object):
900 901 """ 902 Receives incoming messages from a remote source. Messages may be 903 fetched with L{fetch}. 904 """ 905
906 - def __init__(self, session, id, source, options):
907 self.session = session 908 self.id = id 909 self.source = source 910 self.options = options 911 912 self.granted = Serial(0) 913 self.draining = False 914 self.impending = Serial(0) 915 self.received = Serial(0) 916 self.returned = Serial(0) 917 918 self.error = None 919 self.linked = False 920 self.closing = False 921 self.closed = False 922 self._lock = self.session._lock 923 self._capacity = 0 924 self._set_capacity(options.get("capacity", 0), False) 925 self.threshold = 0.5
926 927 @synchronized
928 - def _set_capacity(self, c, wakeup=True):
929 if c is UNLIMITED: 930 self._capacity = c.value 931 else: 932 self._capacity = c 933 self._grant() 934 if wakeup: 935 self._wakeup()
936
937 - def _get_capacity(self):
938 if self._capacity == UNLIMITED.value: 939 return UNLIMITED 940 else: 941 return self._capacity
942 943 capacity = property(_get_capacity, _set_capacity) 944
945 - def _wakeup(self):
946 self.session._wakeup()
947
948 - def check_error(self):
949 self.session.check_error() 950 if self.error: 951 raise self.error
952
953 - def get_error(self):
954 err = self.session.get_error() 955 if err: 956 return err 957 else: 958 return self.error
959
960 - def _ewait(self, predicate, timeout=None):
961 result = self.session._ewait(lambda: self.error or predicate(), timeout) 962 self.check_error() 963 return result
964
965 - def check_closed(self):
966 if self.closed: 967 raise LinkClosed()
968 969 @synchronized
970 - def unsettled(self):
971 """ 972 Returns the number of acknowledged messages awaiting confirmation. 973 """ 974 return len([m for m in self.acked if m._receiver is self])
975 976 @synchronized
977 - def available(self):
978 """ 979 Returns the number of messages available to be fetched by the 980 application. 981 982 @rtype: int 983 @return: the number of available messages 984 """ 985 return self.received - self.returned
986 987 @synchronized
988 - def fetch(self, timeout=None):
989 """ 990 Fetch and return a single message. A timeout of None will block 991 forever waiting for a message to arrive, a timeout of zero will 992 return immediately if no messages are available. 993 994 @type timeout: float 995 @param timeout: the time to wait for a message to be available 996 """ 997 998 self._ecwait(lambda: self.linked) 999 1000 if self._capacity == 0: 1001 self.granted = self.returned + 1 1002 self._wakeup() 1003 self._ecwait(lambda: self.impending >= self.granted) 1004 msg = self.session._get(self, timeout=timeout) 1005 if msg is None: 1006 self.check_closed() 1007 self.draining = True 1008 self._wakeup() 1009 self._ecwait(lambda: not self.draining) 1010 self._grant() 1011 self._wakeup() 1012 msg = self.session._get(self, timeout=0) 1013 if msg is None: 1014 raise Empty() 1015 elif self._capacity not in (0, UNLIMITED.value): 1016 t = int(ceil(self.threshold * self._capacity)) 1017 if self.received - self.returned <= t: 1018 self.granted = self.returned + self._capacity 1019 self._wakeup() 1020 return msg
1021
1022 - def _grant(self):
1023 if self._capacity == UNLIMITED.value: 1024 self.granted = UNLIMITED 1025 else: 1026 self.granted = self.returned + self._capacity
1027 1028 @synchronized
1029 - def close(self, timeout=None):
1030 """ 1031 Close the receiver. 1032 """ 1033 if not self.closing: 1034 self.closing = True 1035 self._wakeup() 1036 1037 try: 1038 if not self.session._ewait(lambda: self.closed, timeout=timeout): 1039 raise Timeout("receiver close timed out") 1040 finally: 1041 try: 1042 self.session.receivers.remove(self) 1043 except ValueError: 1044 pass
1045 1046 __all__ = ["Connection", "Session", "Sender", "Receiver"] 1047