1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """
21 A candidate high level messaging API for python.
22
23 Areas that still need work:
24
25 - definition of the arguments for L{Session.sender} and L{Session.receiver}
26 - standard L{Message} properties
27 - L{Message} content encoding
28 - protocol negotiation/multiprotocol impl
29 """
30
31 from logging import getLogger
32 from math import ceil
33 from qpid.codec010 import StringCodec
34 from qpid.concurrency import synchronized, Waiter, Condition
35 from qpid.datatypes import Serial, uuid4
36 from qpid.messaging.constants import *
37 from qpid.messaging.exceptions import *
38 from qpid.messaging.message import *
39 from qpid.ops import PRIMITIVE
40 from qpid.util import default, URL
41 from threading import Thread, RLock
42
43 log = getLogger("qpid.messaging")
44
45 static = staticmethod
48
49 - def _ecwait(self, predicate, timeout=None):
53
55
56 """
57 A Connection manages a group of L{Sessions<Session>} and connects
58 them with a remote endpoint.
59 """
60
61 @static
63 """
64 Constructs a L{Connection} with the supplied parameters and opens
65 it.
66 """
67 conn = Connection(url, **options)
68 conn.open()
69 return conn
70
71 - def __init__(self, url=None, **options):
72 """
73 Creates a connection. A newly created connection must be connected
74 with the Connection.connect() method before it can be used.
75
76 @type url: str
77 @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ]
78 @type host: str
79 @param host: the name or ip address of the remote host (overriden by url)
80 @type port: int
81 @param port: the port number of the remote host (overriden by url)
82 @type transport: str
83 @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls)
84 @type heartbeat: int
85 @param heartbeat: heartbeat interval in seconds
86
87 @type username: str
88 @param username: the username for authentication (overriden by url)
89 @type password: str
90 @param password: the password for authentication (overriden by url)
91
92 @type sasl_mechanisms: str
93 @param sasl_mechanisms: space separated list of permitted sasl mechanisms
94 @type sasl_service: str
95 @param sasl_service: ???
96 @type sasl_min_ssf: ???
97 @param sasl_min_ssf: ???
98 @type sasl_max_ssf: ???
99 @param sasl_max_ssf: ???
100
101 @type reconnect: bool
102 @param reconnect: enable/disable automatic reconnect
103 @type reconnect_timeout: float
104 @param reconnect_timeout: total time to attempt reconnect
105 @type reconnect_internal_min: float
106 @param reconnect_internal_min: minimum interval between reconnect attempts
107 @type reconnect_internal_max: float
108 @param reconnect_internal_max: maximum interval between reconnect attempts
109 @type reconnect_internal: float
110 @param reconnect_interval: set both min and max reconnect intervals
111 @type reconnect_limit: int
112 @param reconnect_limit: limit the total number of reconnect attempts
113 @type reconnect_urls: list[str]
114 @param reconnect_urls: list of backup hosts specified as urls
115
116 @type address_ttl: float
117 @param address_ttl: time until cached address resolution expires
118
119 @rtype: Connection
120 @return: a disconnected Connection
121 """
122 if url is None:
123 url = options.get("host")
124 if isinstance(url, basestring):
125 url = URL(url)
126 self.host = url.host
127 if options.has_key("transport"):
128 self.transport = options.get("transport")
129 elif url.scheme == url.AMQP:
130 self.transport = "tcp"
131 elif url.scheme == url.AMQPS:
132 self.transport = "ssl"
133 else:
134 self.transport = "tcp"
135 if self.transport in ("ssl", "tcp+tls"):
136 self.port = default(url.port, options.get("port", AMQPS_PORT))
137 else:
138 self.port = default(url.port, options.get("port", AMQP_PORT))
139 self.heartbeat = options.get("heartbeat")
140 self.username = default(url.user, options.get("username", None))
141 self.password = default(url.password, options.get("password", None))
142 self.auth_username = None
143
144 self.sasl_mechanisms = options.get("sasl_mechanisms")
145 self.sasl_service = options.get("sasl_service", "qpidd")
146 self.sasl_min_ssf = options.get("sasl_min_ssf")
147 self.sasl_max_ssf = options.get("sasl_max_ssf")
148
149 self.reconnect = options.get("reconnect", False)
150 self.reconnect_timeout = options.get("reconnect_timeout")
151 reconnect_interval = options.get("reconnect_interval")
152 self.reconnect_interval_min = options.get("reconnect_interval_min",
153 default(reconnect_interval, 1))
154 self.reconnect_interval_max = options.get("reconnect_interval_max",
155 default(reconnect_interval, 2*60))
156 self.reconnect_limit = options.get("reconnect_limit")
157 self.reconnect_urls = options.get("reconnect_urls", [])
158 self.reconnect_log = options.get("reconnect_log", True)
159
160 self.address_ttl = options.get("address_ttl", 60)
161 self.tcp_nodelay = options.get("tcp_nodelay", False)
162
163 self.options = options
164
165
166 self.id = str(uuid4())
167 self.session_counter = 0
168 self.sessions = {}
169 self._open = False
170 self._connected = False
171 self._transport_connected = False
172 self._lock = RLock()
173 self._condition = Condition(self._lock)
174 self._waiter = Waiter(self._condition)
175 self._modcount = Serial(0)
176 self.error = None
177 from driver import Driver
178 self._driver = Driver(self)
179
180 - def _wait(self, predicate, timeout=None):
182
184 self._modcount += 1
185 self._driver.wakeup()
186
188 if self.error:
189 self._condition.gc()
190 raise self.error
191
194
195 - def _ewait(self, predicate, timeout=None):
199
201 if not self._connected:
202 self._condition.gc()
203 raise ConnectionClosed()
204
205 @synchronized
206 - def session(self, name=None, transactional=False):
207 """
208 Creates or retrieves the named session. If the name is omitted or
209 None, then a unique name is chosen based on a randomly generated
210 uuid.
211
212 @type name: str
213 @param name: the session name
214 @rtype: Session
215 @return: the named Session
216 """
217
218 if name is None:
219 name = "%s:%s" % (self.id, self.session_counter)
220 self.session_counter += 1
221 else:
222 name = "%s:%s" % (self.id, name)
223
224 if self.sessions.has_key(name):
225 return self.sessions[name]
226 else:
227 ssn = Session(self, name, transactional)
228 self.sessions[name] = ssn
229 self._wakeup()
230 return ssn
231
232 @synchronized
234 self.sessions.pop(ssn.name, 0)
235
236 @synchronized
238 """
239 Opens a connection.
240 """
241 if self._open:
242 raise ConnectionError("already open")
243 self._open = True
244 self.attach()
245
246 @synchronized
248 """
249 Return true if the connection is open, false otherwise.
250 """
251 return self._open
252
253 @synchronized
255 """
256 Attach to the remote endpoint.
257 """
258 if not self._connected:
259 self._connected = True
260 self._driver.start()
261 self._wakeup()
262 self._ewait(lambda: self._transport_connected and not self._unlinked())
263
265 return [l
266 for ssn in self.sessions.values()
267 if not (ssn.error or ssn.closed)
268 for l in ssn.senders + ssn.receivers
269 if not (l.linked or l.error or l.closed)]
270
271 @synchronized
272 - def detach(self, timeout=None):
273 """
274 Detach from the remote endpoint.
275 """
276 if self._connected:
277 self._connected = False
278 self._wakeup()
279 cleanup = True
280 else:
281 cleanup = False
282 try:
283 if not self._wait(lambda: not self._transport_connected, timeout=timeout):
284 raise Timeout("detach timed out")
285 finally:
286 if cleanup:
287 self._driver.stop()
288 self._condition.gc()
289
290 @synchronized
292 """
293 Return true if the connection is attached, false otherwise.
294 """
295 return self._connected
296
297 @synchronized
298 - def close(self, timeout=None):
299 """
300 Close the connection and all sessions.
301 """
302 try:
303 for ssn in self.sessions.values():
304 ssn.close(timeout=timeout)
305 finally:
306 self.detach(timeout=timeout)
307 self._open = False
308
310
311 """
312 Sessions provide a linear context for sending and receiving
313 L{Messages<Message>}. L{Messages<Message>} are sent and received
314 using the L{Sender.send} and L{Receiver.fetch} methods of the
315 L{Sender} and L{Receiver} objects associated with a Session.
316
317 Each L{Sender} and L{Receiver} is created by supplying either a
318 target or source address to the L{sender} and L{receiver} methods of
319 the Session. The address is supplied via a string syntax documented
320 below.
321
322 Addresses
323 =========
324
325 An address identifies a source or target for messages. In its
326 simplest form this is just a name. In general a target address may
327 also be used as a source address, however not all source addresses
328 may be used as a target, e.g. a source might additionally have some
329 filtering criteria that would not be present in a target.
330
331 A subject may optionally be specified along with the name. When an
332 address is used as a target, any subject specified in the address is
333 used as the default subject of outgoing messages for that target.
334 When an address is used as a source, any subject specified in the
335 address is pattern matched against the subject of available messages
336 as a filter for incoming messages from that source.
337
338 The options map contains additional information about the address
339 including:
340
341 - policies for automatically creating, and deleting the node to
342 which an address refers
343
344 - policies for asserting facts about the node to which an address
345 refers
346
347 - extension points that can be used for sender/receiver
348 configuration
349
350 Mapping to AMQP 0-10
351 --------------------
352 The name is resolved to either an exchange or a queue by querying
353 the broker.
354
355 The subject is set as a property on the message. Additionally, if
356 the name refers to an exchange, the routing key is set to the
357 subject.
358
359 Syntax
360 ------
361 The following regular expressions define the tokens used to parse
362 addresses::
363 LBRACE: \\{
364 RBRACE: \\}
365 LBRACK: \\[
366 RBRACK: \\]
367 COLON: :
368 SEMI: ;
369 SLASH: /
370 COMMA: ,
371 NUMBER: [+-]?[0-9]*\\.?[0-9]+
372 ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?
373 STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\'
374 ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]
375 SYM: [.#*%@$^!+-]
376 WSPACE: [ \\n\\r\\t]+
377
378 The formal grammar for addresses is given below::
379 address = name [ "/" subject ] [ ";" options ]
380 name = ( part | quoted )+
381 subject = ( part | quoted | "/" )*
382 quoted = STRING / ESC
383 part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM
384 options = map
385 map = "{" ( keyval ( "," keyval )* )? "}"
386 keyval = ID ":" value
387 value = NUMBER / STRING / ID / map / list
388 list = "[" ( value ( "," value )* )? "]"
389
390 This grammar resuls in the following informal syntax::
391
392 <name> [ / <subject> ] [ ; <options> ]
393
394 Where options is::
395
396 { <key> : <value>, ... }
397
398 And values may be:
399 - numbers
400 - single, double, or non quoted strings
401 - maps (dictionaries)
402 - lists
403
404 Options
405 -------
406 The options map permits the following parameters::
407
408 <name> [ / <subject> ] ; {
409 create: always | sender | receiver | never,
410 delete: always | sender | receiver | never,
411 assert: always | sender | receiver | never,
412 mode: browse | consume,
413 node: {
414 type: queue | topic,
415 durable: True | False,
416 x-declare: { ... <declare-overrides> ... },
417 x-bindings: [<binding_1>, ... <binding_n>]
418 },
419 link: {
420 name: <link-name>,
421 durable: True | False,
422 reliability: unreliable | at-most-once | at-least-once | exactly-once,
423 x-declare: { ... <declare-overrides> ... },
424 x-bindings: [<binding_1>, ... <binding_n>],
425 x-subscribe: { ... <subscribe-overrides> ... }
426 }
427 }
428
429 Bindings are specified as a map with the following options::
430
431 {
432 exchange: <exchange>,
433 queue: <queue>,
434 key: <key>,
435 arguments: <arguments>
436 }
437
438 The create, delete, and assert policies specify who should perfom
439 the associated action:
440
441 - I{always}: the action will always be performed
442 - I{sender}: the action will only be performed by the sender
443 - I{receiver}: the action will only be performed by the receiver
444 - I{never}: the action will never be performed (this is the default)
445
446 The node-type is one of:
447
448 - I{topic}: a topic node will default to the topic exchange,
449 x-declare may be used to specify other exchange types
450 - I{queue}: this is the default node-type
451
452 The x-declare map permits protocol specific keys and values to be
453 specified when exchanges or queues are declared. These keys and
454 values are passed through when creating a node or asserting facts
455 about an existing node.
456
457 Examples
458 --------
459 A simple name resolves to any named node, usually a queue or a
460 topic::
461
462 my-queue-or-topic
463
464 A simple name with a subject will also resolve to a node, but the
465 presence of the subject will cause a sender using this address to
466 set the subject on outgoing messages, and receivers to filter based
467 on the subject::
468
469 my-queue-or-topic/my-subject
470
471 A subject pattern can be used and will cause filtering if used by
472 the receiver. If used for a sender, the literal value gets set as
473 the subject::
474
475 my-queue-or-topic/my-*
476
477 In all the above cases, the address is resolved to an existing node.
478 If you want the node to be auto-created, then you can do the
479 following. By default nonexistent nodes are assumed to be queues::
480
481 my-queue; {create: always}
482
483 You can customize the properties of the queue::
484
485 my-queue; {create: always, node: {durable: True}}
486
487 You can create a topic instead if you want::
488
489 my-queue; {create: always, node: {type: topic}}
490
491 You can assert that the address resolves to a node with particular
492 properties::
493
494 my-transient-topic; {
495 assert: always,
496 node: {
497 type: topic,
498 durable: False
499 }
500 }
501 """
502
503 - def __init__(self, connection, name, transactional):
504 self.connection = connection
505 self.name = name
506 self.log_id = "%x" % id(self)
507
508 self.transactional = transactional
509
510 self.committing = False
511 self.committed = True
512 self.aborting = False
513 self.aborted = False
514
515 self.next_sender_id = 0
516 self.senders = []
517 self.next_receiver_id = 0
518 self.receivers = []
519 self.outgoing = []
520 self.incoming = []
521 self.unacked = []
522 self.acked = []
523
524 self.ack_capacity = UNLIMITED
525
526 self.error = None
527 self.closing = False
528 self.closed = False
529
530 self._lock = connection._lock
531
533 return "<Session %s>" % self.name
534
535 - def _wait(self, predicate, timeout=None):
537
540
542 self.connection.check_error()
543 if self.error:
544 raise self.error
545
547 err = self.connection.get_error()
548 if err:
549 return err
550 else:
551 return self.error
552
553 - def _ewait(self, predicate, timeout=None):
554 result = self.connection._ewait(lambda: self.error or predicate(), timeout)
555 self.check_error()
556 return result
557
561
562 @synchronized
563 - def sender(self, target, **options):
564 """
565 Creates a L{Sender} that may be used to send L{Messages<Message>}
566 to the specified target.
567
568 @type target: str
569 @param target: the target to which messages will be sent
570 @rtype: Sender
571 @return: a new Sender for the specified target
572 """
573 target = _mangle(target)
574 sender = Sender(self, self.next_sender_id, target, options)
575 self.next_sender_id += 1
576 self.senders.append(sender)
577 if not self.closed and self.connection._connected:
578 self._wakeup()
579 try:
580 sender._ewait(lambda: sender.linked)
581 except LinkError, e:
582 sender.close()
583 raise e
584 return sender
585
586 @synchronized
588 """
589 Creates a receiver that may be used to fetch L{Messages<Message>}
590 from the specified source.
591
592 @type source: str
593 @param source: the source of L{Messages<Message>}
594 @rtype: Receiver
595 @return: a new Receiver for the specified source
596 """
597 source = _mangle(source)
598 receiver = Receiver(self, self.next_receiver_id, source, options)
599 self.next_receiver_id += 1
600 self.receivers.append(receiver)
601 if not self.closed and self.connection._connected:
602 self._wakeup()
603 try:
604 receiver._ewait(lambda: receiver.linked)
605 except LinkError, e:
606 receiver.close()
607 raise e
608 return receiver
609
610 @synchronized
612 result = 0
613 for msg in self.incoming:
614 if predicate(msg):
615 result += 1
616 return result
617
618 - def _peek(self, receiver):
619 for msg in self.incoming:
620 if msg._receiver == receiver:
621 return msg
622
623 - def _pop(self, receiver):
624 i = 0
625 while i < len(self.incoming):
626 msg = self.incoming[i]
627 if msg._receiver == receiver:
628 del self.incoming[i]
629 return msg
630 else:
631 i += 1
632
633 @synchronized
634 - def _get(self, receiver, timeout=None):
635 if self._ewait(lambda: ((self._peek(receiver) is not None) or
636 self.closing or receiver.closed),
637 timeout):
638 msg = self._pop(receiver)
639 if msg is not None:
640 msg._receiver.returned += 1
641 self.unacked.append(msg)
642 log.debug("RETR[%s]: %s", self.log_id, msg)
643 return msg
644 return None
645
646 @synchronized
648 if self._ecwait(lambda: self.incoming, timeout):
649 return self.incoming[0]._receiver
650 else:
651 raise Empty
652
653 @synchronized
654 - def acknowledge(self, message=None, disposition=None, sync=True):
655 """
656 Acknowledge the given L{Message}. If message is None, then all
657 unacknowledged messages on the session are acknowledged.
658
659 @type message: Message
660 @param message: the message to acknowledge or None
661 @type sync: boolean
662 @param sync: if true then block until the message(s) are acknowledged
663 """
664 if message is None:
665 messages = self.unacked[:]
666 else:
667 messages = [message]
668
669 for m in messages:
670 if self.ack_capacity is not UNLIMITED:
671 if self.ack_capacity <= 0:
672
673 raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
674 self._wakeup()
675 self._ecwait(lambda: len(self.acked) < self.ack_capacity)
676 m._disposition = disposition
677 self.unacked.remove(m)
678 self.acked.append(m)
679
680 self._wakeup()
681 if sync:
682 self._ecwait(lambda: not [m for m in messages if m in self.acked])
683
684 @synchronized
686 """
687 Commit outstanding transactional work. This consists of all
688 message sends and receives since the prior commit or rollback.
689 """
690 if not self.transactional:
691 raise NontransactionalSession()
692 self.committing = True
693 self._wakeup()
694 self._ecwait(lambda: not self.committing)
695 if self.aborted:
696 raise TransactionAborted()
697 assert self.committed
698
699 @synchronized
701 """
702 Rollback outstanding transactional work. This consists of all
703 message sends and receives since the prior commit or rollback.
704 """
705 if not self.transactional:
706 raise NontransactionalSession()
707 self.aborting = True
708 self._wakeup()
709 self._ecwait(lambda: not self.aborting)
710 assert self.aborted
711
712 @synchronized
713 - def sync(self, timeout=None):
714 """
715 Sync the session.
716 """
717 for snd in self.senders:
718 snd.sync(timeout=timeout)
719 if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout):
720 raise Timeout("session sync timed out")
721
722 @synchronized
723 - def close(self, timeout=None):
741
743 if addr and addr.startswith("#"):
744 return str(uuid4()) + addr
745 else:
746 return addr
747
749
750 """
751 Sends outgoing messages.
752 """
753
754 - def __init__(self, session, id, target, options):
755 self.session = session
756 self.id = id
757 self.target = target
758 self.options = options
759 self.capacity = options.get("capacity", UNLIMITED)
760 self.threshold = 0.5
761 self.durable = options.get("durable")
762 self.queued = Serial(0)
763 self.synced = Serial(0)
764 self.acked = Serial(0)
765 self.error = None
766 self.linked = False
767 self.closing = False
768 self.closed = False
769 self._lock = self.session._lock
770
773
778
780 err = self.session.get_error()
781 if err:
782 return err
783 else:
784 return self.error
785
786 - def _ewait(self, predicate, timeout=None):
790
794
795 @synchronized
797 """
798 Returns the number of messages awaiting acknowledgment.
799 @rtype: int
800 @return: the number of unacknowledged messages
801 """
802 return self.queued - self.acked
803
804 @synchronized
810
811 @synchronized
812 - def send(self, object, sync=True, timeout=None):
813 """
814 Send a message. If the object passed in is of type L{unicode},
815 L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
816 L{Message} and sent. If it is of type L{Message}, it will be sent
817 directly. If the sender capacity is not L{UNLIMITED} then send
818 will block until there is available capacity to send the message.
819 If the timeout parameter is specified, then send will throw an
820 L{InsufficientCapacity} exception if capacity does not become
821 available within the specified time.
822
823 @type object: unicode, str, list, dict, Message
824 @param object: the message or content to send
825
826 @type sync: boolean
827 @param sync: if true then block until the message is sent
828
829 @type timeout: float
830 @param timeout: the time to wait for available capacity
831 """
832
833 if not self.session.connection._connected or self.session.closing:
834 raise Detached()
835
836 self._ecwait(lambda: self.linked)
837
838 if isinstance(object, Message):
839 message = object
840 else:
841 message = Message(object)
842
843 if message.durable is None:
844 message.durable = self.durable
845
846 if self.capacity is not UNLIMITED:
847 if self.capacity <= 0:
848 raise InsufficientCapacity("capacity = %s" % self.capacity)
849 if not self._ecwait(self.available, timeout=timeout):
850 raise InsufficientCapacity("capacity = %s" % self.capacity)
851
852
853 message._sender = self
854 if self.capacity is not UNLIMITED:
855 message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity))
856 else:
857 message._sync = sync
858 self.session.outgoing.append(message)
859 self.queued += 1
860
861 if sync:
862 self.sync()
863 assert message not in self.session.outgoing
864 else:
865 self._wakeup()
866
867 @synchronized
868 - def sync(self, timeout=None):
869 mno = self.queued
870 if self.synced < mno:
871 self.synced = mno
872 self._wakeup()
873 if not self._ewait(lambda: self.acked >= mno, timeout=timeout):
874 raise Timeout("sender sync timed out")
875
876 @synchronized
877 - def close(self, timeout=None):
878 """
879 Close the Sender.
880 """
881
882
883 if self.acked < self.queued:
884 self.sync(timeout=timeout)
885
886 if not self.closing:
887 self.closing = True
888 self._wakeup()
889
890 try:
891 if not self.session._ewait(lambda: self.closed, timeout=timeout):
892 raise Timeout("sender close timed out")
893 finally:
894 try:
895 self.session.senders.remove(self)
896 except ValueError:
897 pass
898
900
901 """
902 Receives incoming messages from a remote source. Messages may be
903 fetched with L{fetch}.
904 """
905
906 - def __init__(self, session, id, source, options):
907 self.session = session
908 self.id = id
909 self.source = source
910 self.options = options
911
912 self.granted = Serial(0)
913 self.draining = False
914 self.impending = Serial(0)
915 self.received = Serial(0)
916 self.returned = Serial(0)
917
918 self.error = None
919 self.linked = False
920 self.closing = False
921 self.closed = False
922 self._lock = self.session._lock
923 self._capacity = 0
924 self._set_capacity(options.get("capacity", 0), False)
925 self.threshold = 0.5
926
927 @synchronized
936
942
943 capacity = property(_get_capacity, _set_capacity)
944
947
952
954 err = self.session.get_error()
955 if err:
956 return err
957 else:
958 return self.error
959
960 - def _ewait(self, predicate, timeout=None):
964
968
969 @synchronized
971 """
972 Returns the number of acknowledged messages awaiting confirmation.
973 """
974 return len([m for m in self.acked if m._receiver is self])
975
976 @synchronized
978 """
979 Returns the number of messages available to be fetched by the
980 application.
981
982 @rtype: int
983 @return: the number of available messages
984 """
985 return self.received - self.returned
986
987 @synchronized
988 - def fetch(self, timeout=None):
989 """
990 Fetch and return a single message. A timeout of None will block
991 forever waiting for a message to arrive, a timeout of zero will
992 return immediately if no messages are available.
993
994 @type timeout: float
995 @param timeout: the time to wait for a message to be available
996 """
997
998 self._ecwait(lambda: self.linked)
999
1000 if self._capacity == 0:
1001 self.granted = self.returned + 1
1002 self._wakeup()
1003 self._ecwait(lambda: self.impending >= self.granted)
1004 msg = self.session._get(self, timeout=timeout)
1005 if msg is None:
1006 self.check_closed()
1007 self.draining = True
1008 self._wakeup()
1009 self._ecwait(lambda: not self.draining)
1010 self._grant()
1011 self._wakeup()
1012 msg = self.session._get(self, timeout=0)
1013 if msg is None:
1014 raise Empty()
1015 elif self._capacity not in (0, UNLIMITED.value):
1016 t = int(ceil(self.threshold * self._capacity))
1017 if self.received - self.returned <= t:
1018 self.granted = self.returned + self._capacity
1019 self._wakeup()
1020 return msg
1021
1023 if self._capacity == UNLIMITED.value:
1024 self.granted = UNLIMITED
1025 else:
1026 self.granted = self.returned + self._capacity
1027
1028 @synchronized
1029 - def close(self, timeout=None):
1030 """
1031 Close the receiver.
1032 """
1033 if not self.closing:
1034 self.closing = True
1035 self._wakeup()
1036
1037 try:
1038 if not self.session._ewait(lambda: self.closed, timeout=timeout):
1039 raise Timeout("receiver close timed out")
1040 finally:
1041 try:
1042 self.session.receivers.remove(self)
1043 except ValueError:
1044 pass
1045
1046 __all__ = ["Connection", "Session", "Sender", "Receiver"]
1047