1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """
21 A candidate high level messaging API for python.
22
23 Areas that still need work:
24
25 - definition of the arguments for L{Session.sender} and L{Session.receiver}
26 - standard L{Message} properties
27 - L{Message} content encoding
28 - protocol negotiation/multiprotocol impl
29 """
30
31 from logging import getLogger
32 from math import ceil
33 from qpid.codec010 import StringCodec
34 from qpid.concurrency import synchronized, Waiter, Condition
35 from qpid.datatypes import Serial, uuid4
36 from qpid.messaging.constants import *
37 from qpid.messaging.exceptions import *
38 from qpid.messaging.message import *
39 from qpid.ops import PRIMITIVE
40 from qpid.util import default, URL
41 from threading import Thread, RLock
42
43 log = getLogger("qpid.messaging")
44
45 static = staticmethod
48
49 - def _ecwait(self, predicate, timeout=None):
53
55
56 """
57 A Connection manages a group of L{Sessions<Session>} and connects
58 them with a remote endpoint.
59 """
60
61 @static
63 """
64 Constructs a L{Connection} with the supplied parameters and opens
65 it.
66 """
67 conn = Connection(url, **options)
68 conn.open()
69 return conn
70
71 - def __init__(self, url=None, **options):
72 """
73 Creates a connection. A newly created connection must be connected
74 with the Connection.connect() method before it can be used.
75
76 @type url: str
77 @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ]
78 @type host: str
79 @param host: the name or ip address of the remote host (overriden by url)
80 @type port: int
81 @param port: the port number of the remote host (overriden by url)
82 @type transport: str
83 @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls)
84 @type heartbeat: int
85 @param heartbeat: heartbeat interval in seconds
86
87 @type username: str
88 @param username: the username for authentication (overriden by url)
89 @type password: str
90 @param password: the password for authentication (overriden by url)
91
92 @type sasl_mechanisms: str
93 @param sasl_mechanisms: space separated list of permitted sasl mechanisms
94 @type sasl_service: str
95 @param sasl_service: ???
96 @type sasl_min_ssf: ???
97 @param sasl_min_ssf: ???
98 @type sasl_max_ssf: ???
99 @param sasl_max_ssf: ???
100
101 @type reconnect: bool
102 @param reconnect: enable/disable automatic reconnect
103 @type reconnect_timeout: float
104 @param reconnect_timeout: total time to attempt reconnect
105 @type reconnect_internal_min: float
106 @param reconnect_internal_min: minimum interval between reconnect attempts
107 @type reconnect_internal_max: float
108 @param reconnect_internal_max: maximum interval between reconnect attempts
109 @type reconnect_internal: float
110 @param reconnect_interval: set both min and max reconnect intervals
111 @type reconnect_limit: int
112 @param reconnect_limit: limit the total number of reconnect attempts
113 @type reconnect_urls: list[str]
114 @param reconnect_urls: list of backup hosts specified as urls
115
116 @type address_ttl: float
117 @param address_ttl: time until cached address resolution expires
118
119 @type ssl_keyfile: str
120 @param ssl_keyfile: file with client's private key (PEM format)
121 @type ssl_certfile: str
122 @param ssl_certfile: file with client's public (eventually priv+pub) key (PEM format)
123 @type ssl_trustfile: str
124 @param ssl_trustfile: file trusted certificates to validate the server
125
126 @rtype: Connection
127 @return: a disconnected Connection
128 """
129 if url is None:
130 url = options.get("host")
131 if isinstance(url, basestring):
132 url = URL(url)
133 self.host = url.host
134 if options.has_key("transport"):
135 self.transport = options.get("transport")
136 elif url.scheme == url.AMQP:
137 self.transport = "tcp"
138 elif url.scheme == url.AMQPS:
139 self.transport = "ssl"
140 else:
141 self.transport = "tcp"
142 if self.transport in ("ssl", "tcp+tls"):
143 self.port = default(url.port, options.get("port", AMQPS_PORT))
144 else:
145 self.port = default(url.port, options.get("port", AMQP_PORT))
146 self.heartbeat = options.get("heartbeat")
147 self.username = default(url.user, options.get("username", None))
148 self.password = default(url.password, options.get("password", None))
149 self.auth_username = None
150
151 self.sasl_mechanisms = options.get("sasl_mechanisms")
152 self.sasl_service = options.get("sasl_service", "qpidd")
153 self.sasl_min_ssf = options.get("sasl_min_ssf")
154 self.sasl_max_ssf = options.get("sasl_max_ssf")
155
156 self.reconnect = options.get("reconnect", False)
157 self.reconnect_timeout = options.get("reconnect_timeout")
158 reconnect_interval = options.get("reconnect_interval")
159 self.reconnect_interval_min = options.get("reconnect_interval_min",
160 default(reconnect_interval, 1))
161 self.reconnect_interval_max = options.get("reconnect_interval_max",
162 default(reconnect_interval, 2*60))
163 self.reconnect_limit = options.get("reconnect_limit")
164 self.reconnect_urls = options.get("reconnect_urls", [])
165 self.reconnect_log = options.get("reconnect_log", True)
166
167 self.address_ttl = options.get("address_ttl", 60)
168 self.tcp_nodelay = options.get("tcp_nodelay", False)
169
170 self.ssl_keyfile = options.get("ssl_keyfile", None)
171 self.ssl_certfile = options.get("ssl_certfile", None)
172 self.ssl_trustfile = options.get("ssl_trustfile", None)
173 self.client_properties = options.get("client_properties", {})
174
175 self.options = options
176
177
178 self.id = str(uuid4())
179 self.session_counter = 0
180 self.sessions = {}
181 self._open = False
182 self._connected = False
183 self._transport_connected = False
184 self._lock = RLock()
185 self._condition = Condition(self._lock)
186 self._waiter = Waiter(self._condition)
187 self._modcount = Serial(0)
188 self.error = None
189 from driver import Driver
190 self._driver = Driver(self)
191
192 - def _wait(self, predicate, timeout=None):
194
196 self._modcount += 1
197 self._driver.wakeup()
198
200 if self.error:
201 self._condition.gc()
202 raise self.error
203
206
207 - def _ewait(self, predicate, timeout=None):
211
213 if not self._connected:
214 self._condition.gc()
215 raise ConnectionClosed()
216
217 @synchronized
218 - def session(self, name=None, transactional=False):
219 """
220 Creates or retrieves the named session. If the name is omitted or
221 None, then a unique name is chosen based on a randomly generated
222 uuid.
223
224 @type name: str
225 @param name: the session name
226 @rtype: Session
227 @return: the named Session
228 """
229
230 if name is None:
231 name = "%s:%s" % (self.id, self.session_counter)
232 self.session_counter += 1
233 else:
234 name = "%s:%s" % (self.id, name)
235
236 if self.sessions.has_key(name):
237 return self.sessions[name]
238 else:
239 ssn = Session(self, name, transactional)
240 self.sessions[name] = ssn
241 self._wakeup()
242 return ssn
243
244 @synchronized
246 self.sessions.pop(ssn.name, 0)
247
248 @synchronized
250 """
251 Opens a connection.
252 """
253 if self._open:
254 raise ConnectionError("already open")
255 self._open = True
256 self.attach()
257
258 @synchronized
260 """
261 Return true if the connection is open, false otherwise.
262 """
263 return self._open
264
265 @synchronized
267 """
268 Attach to the remote endpoint.
269 """
270 if not self._connected:
271 self._connected = True
272 self._driver.start()
273 self._wakeup()
274 self._ewait(lambda: self._transport_connected and not self._unlinked())
275
277 return [l
278 for ssn in self.sessions.values()
279 if not (ssn.error or ssn.closed)
280 for l in ssn.senders + ssn.receivers
281 if not (l.linked or l.error or l.closed)]
282
283 @synchronized
284 - def detach(self, timeout=None):
285 """
286 Detach from the remote endpoint.
287 """
288 if self._connected:
289 self._connected = False
290 self._wakeup()
291 cleanup = True
292 else:
293 cleanup = False
294 try:
295 if not self._wait(lambda: not self._transport_connected, timeout=timeout):
296 raise Timeout("detach timed out")
297 finally:
298 if cleanup:
299 self._driver.stop()
300 self._condition.gc()
301
302 @synchronized
304 """
305 Return true if the connection is attached, false otherwise.
306 """
307 return self._connected
308
309 @synchronized
310 - def close(self, timeout=None):
311 """
312 Close the connection and all sessions.
313 """
314 try:
315 for ssn in self.sessions.values():
316 ssn.close(timeout=timeout)
317 finally:
318 self.detach(timeout=timeout)
319 self._open = False
320
322
323 """
324 Sessions provide a linear context for sending and receiving
325 L{Messages<Message>}. L{Messages<Message>} are sent and received
326 using the L{Sender.send} and L{Receiver.fetch} methods of the
327 L{Sender} and L{Receiver} objects associated with a Session.
328
329 Each L{Sender} and L{Receiver} is created by supplying either a
330 target or source address to the L{sender} and L{receiver} methods of
331 the Session. The address is supplied via a string syntax documented
332 below.
333
334 Addresses
335 =========
336
337 An address identifies a source or target for messages. In its
338 simplest form this is just a name. In general a target address may
339 also be used as a source address, however not all source addresses
340 may be used as a target, e.g. a source might additionally have some
341 filtering criteria that would not be present in a target.
342
343 A subject may optionally be specified along with the name. When an
344 address is used as a target, any subject specified in the address is
345 used as the default subject of outgoing messages for that target.
346 When an address is used as a source, any subject specified in the
347 address is pattern matched against the subject of available messages
348 as a filter for incoming messages from that source.
349
350 The options map contains additional information about the address
351 including:
352
353 - policies for automatically creating, and deleting the node to
354 which an address refers
355
356 - policies for asserting facts about the node to which an address
357 refers
358
359 - extension points that can be used for sender/receiver
360 configuration
361
362 Mapping to AMQP 0-10
363 --------------------
364 The name is resolved to either an exchange or a queue by querying
365 the broker.
366
367 The subject is set as a property on the message. Additionally, if
368 the name refers to an exchange, the routing key is set to the
369 subject.
370
371 Syntax
372 ------
373 The following regular expressions define the tokens used to parse
374 addresses::
375 LBRACE: \\{
376 RBRACE: \\}
377 LBRACK: \\[
378 RBRACK: \\]
379 COLON: :
380 SEMI: ;
381 SLASH: /
382 COMMA: ,
383 NUMBER: [+-]?[0-9]*\\.?[0-9]+
384 ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?
385 STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\'
386 ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]
387 SYM: [.#*%@$^!+-]
388 WSPACE: [ \\n\\r\\t]+
389
390 The formal grammar for addresses is given below::
391 address = name [ "/" subject ] [ ";" options ]
392 name = ( part | quoted )+
393 subject = ( part | quoted | "/" )*
394 quoted = STRING / ESC
395 part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM
396 options = map
397 map = "{" ( keyval ( "," keyval )* )? "}"
398 keyval = ID ":" value
399 value = NUMBER / STRING / ID / map / list
400 list = "[" ( value ( "," value )* )? "]"
401
402 This grammar resuls in the following informal syntax::
403
404 <name> [ / <subject> ] [ ; <options> ]
405
406 Where options is::
407
408 { <key> : <value>, ... }
409
410 And values may be:
411 - numbers
412 - single, double, or non quoted strings
413 - maps (dictionaries)
414 - lists
415
416 Options
417 -------
418 The options map permits the following parameters::
419
420 <name> [ / <subject> ] ; {
421 create: always | sender | receiver | never,
422 delete: always | sender | receiver | never,
423 assert: always | sender | receiver | never,
424 mode: browse | consume,
425 node: {
426 type: queue | topic,
427 durable: True | False,
428 x-declare: { ... <declare-overrides> ... },
429 x-bindings: [<binding_1>, ... <binding_n>]
430 },
431 link: {
432 name: <link-name>,
433 durable: True | False,
434 reliability: unreliable | at-most-once | at-least-once | exactly-once,
435 x-declare: { ... <declare-overrides> ... },
436 x-bindings: [<binding_1>, ... <binding_n>],
437 x-subscribe: { ... <subscribe-overrides> ... }
438 }
439 }
440
441 Bindings are specified as a map with the following options::
442
443 {
444 exchange: <exchange>,
445 queue: <queue>,
446 key: <key>,
447 arguments: <arguments>
448 }
449
450 The create, delete, and assert policies specify who should perfom
451 the associated action:
452
453 - I{always}: the action will always be performed
454 - I{sender}: the action will only be performed by the sender
455 - I{receiver}: the action will only be performed by the receiver
456 - I{never}: the action will never be performed (this is the default)
457
458 The node-type is one of:
459
460 - I{topic}: a topic node will default to the topic exchange,
461 x-declare may be used to specify other exchange types
462 - I{queue}: this is the default node-type
463
464 The x-declare map permits protocol specific keys and values to be
465 specified when exchanges or queues are declared. These keys and
466 values are passed through when creating a node or asserting facts
467 about an existing node.
468
469 Examples
470 --------
471 A simple name resolves to any named node, usually a queue or a
472 topic::
473
474 my-queue-or-topic
475
476 A simple name with a subject will also resolve to a node, but the
477 presence of the subject will cause a sender using this address to
478 set the subject on outgoing messages, and receivers to filter based
479 on the subject::
480
481 my-queue-or-topic/my-subject
482
483 A subject pattern can be used and will cause filtering if used by
484 the receiver. If used for a sender, the literal value gets set as
485 the subject::
486
487 my-queue-or-topic/my-*
488
489 In all the above cases, the address is resolved to an existing node.
490 If you want the node to be auto-created, then you can do the
491 following. By default nonexistent nodes are assumed to be queues::
492
493 my-queue; {create: always}
494
495 You can customize the properties of the queue::
496
497 my-queue; {create: always, node: {durable: True}}
498
499 You can create a topic instead if you want::
500
501 my-queue; {create: always, node: {type: topic}}
502
503 You can assert that the address resolves to a node with particular
504 properties::
505
506 my-transient-topic; {
507 assert: always,
508 node: {
509 type: topic,
510 durable: False
511 }
512 }
513 """
514
515 - def __init__(self, connection, name, transactional):
516 self.connection = connection
517 self.name = name
518 self.log_id = "%x" % id(self)
519
520 self.transactional = transactional
521
522 self.committing = False
523 self.committed = True
524 self.aborting = False
525 self.aborted = False
526
527 self.next_sender_id = 0
528 self.senders = []
529 self.next_receiver_id = 0
530 self.receivers = []
531 self.outgoing = []
532 self.incoming = []
533 self.unacked = []
534 self.acked = []
535
536 self.ack_capacity = UNLIMITED
537
538 self.error = None
539 self.closing = False
540 self.closed = False
541
542 self._lock = connection._lock
543
545 return "<Session %s>" % self.name
546
547 - def _wait(self, predicate, timeout=None):
549
552
554 self.connection.check_error()
555 if self.error:
556 raise self.error
557
559 err = self.connection.get_error()
560 if err:
561 return err
562 else:
563 return self.error
564
565 - def _ewait(self, predicate, timeout=None):
566 result = self.connection._ewait(lambda: self.error or predicate(), timeout)
567 self.check_error()
568 return result
569
573
574 @synchronized
575 - def sender(self, target, **options):
576 """
577 Creates a L{Sender} that may be used to send L{Messages<Message>}
578 to the specified target.
579
580 @type target: str
581 @param target: the target to which messages will be sent
582 @rtype: Sender
583 @return: a new Sender for the specified target
584 """
585 target = _mangle(target)
586 sender = Sender(self, self.next_sender_id, target, options)
587 self.next_sender_id += 1
588 self.senders.append(sender)
589 if not self.closed and self.connection._connected:
590 self._wakeup()
591 try:
592 sender._ewait(lambda: sender.linked)
593 except LinkError, e:
594 sender.close()
595 raise e
596 return sender
597
598 @synchronized
600 """
601 Creates a receiver that may be used to fetch L{Messages<Message>}
602 from the specified source.
603
604 @type source: str
605 @param source: the source of L{Messages<Message>}
606 @rtype: Receiver
607 @return: a new Receiver for the specified source
608 """
609 source = _mangle(source)
610 receiver = Receiver(self, self.next_receiver_id, source, options)
611 self.next_receiver_id += 1
612 self.receivers.append(receiver)
613 if not self.closed and self.connection._connected:
614 self._wakeup()
615 try:
616 receiver._ewait(lambda: receiver.linked)
617 except LinkError, e:
618 receiver.close()
619 raise e
620 return receiver
621
622 @synchronized
624 result = 0
625 for msg in self.incoming:
626 if predicate(msg):
627 result += 1
628 return result
629
630 - def _peek(self, receiver):
631 for msg in self.incoming:
632 if msg._receiver == receiver:
633 return msg
634
635 - def _pop(self, receiver):
636 i = 0
637 while i < len(self.incoming):
638 msg = self.incoming[i]
639 if msg._receiver == receiver:
640 del self.incoming[i]
641 return msg
642 else:
643 i += 1
644
645 @synchronized
646 - def _get(self, receiver, timeout=None):
647 if self._ewait(lambda: ((self._peek(receiver) is not None) or
648 self.closing or receiver.closed),
649 timeout):
650 msg = self._pop(receiver)
651 if msg is not None:
652 msg._receiver.returned += 1
653 self.unacked.append(msg)
654 log.debug("RETR[%s]: %s", self.log_id, msg)
655 return msg
656 return None
657
658 @synchronized
660 if self._ecwait(lambda: self.incoming, timeout):
661 return self.incoming[0]._receiver
662 else:
663 raise Empty
664
665 @synchronized
666 - def acknowledge(self, message=None, disposition=None, sync=True):
667 """
668 Acknowledge the given L{Message}. If message is None, then all
669 unacknowledged messages on the session are acknowledged.
670
671 @type message: Message
672 @param message: the message to acknowledge or None
673 @type sync: boolean
674 @param sync: if true then block until the message(s) are acknowledged
675 """
676 if message is None:
677 messages = self.unacked[:]
678 else:
679 messages = [message]
680
681 for m in messages:
682 if self.ack_capacity is not UNLIMITED:
683 if self.ack_capacity <= 0:
684
685 raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
686 self._wakeup()
687 self._ecwait(lambda: len(self.acked) < self.ack_capacity)
688 m._disposition = disposition
689 self.unacked.remove(m)
690 self.acked.append(m)
691
692 self._wakeup()
693 if sync:
694 self._ecwait(lambda: not [m for m in messages if m in self.acked])
695
696 @synchronized
698 """
699 Commit outstanding transactional work. This consists of all
700 message sends and receives since the prior commit or rollback.
701 """
702 if not self.transactional:
703 raise NontransactionalSession()
704 self.committing = True
705 self._wakeup()
706 self._ecwait(lambda: not self.committing)
707 if self.aborted:
708 raise TransactionAborted()
709 assert self.committed
710
711 @synchronized
713 """
714 Rollback outstanding transactional work. This consists of all
715 message sends and receives since the prior commit or rollback.
716 """
717 if not self.transactional:
718 raise NontransactionalSession()
719 self.aborting = True
720 self._wakeup()
721 self._ecwait(lambda: not self.aborting)
722 assert self.aborted
723
724 @synchronized
725 - def sync(self, timeout=None):
726 """
727 Sync the session.
728 """
729 for snd in self.senders:
730 snd.sync(timeout=timeout)
731 if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout):
732 raise Timeout("session sync timed out")
733
734 @synchronized
735 - def close(self, timeout=None):
753
755 if addr and addr.startswith("#"):
756 return str(uuid4()) + addr
757 else:
758 return addr
759
761
762 """
763 Sends outgoing messages.
764 """
765
766 - def __init__(self, session, id, target, options):
767 self.session = session
768 self.id = id
769 self.target = target
770 self.options = options
771 self.capacity = options.get("capacity", UNLIMITED)
772 self.threshold = 0.5
773 self.durable = options.get("durable")
774 self.queued = Serial(0)
775 self.synced = Serial(0)
776 self.acked = Serial(0)
777 self.error = None
778 self.linked = False
779 self.closing = False
780 self.closed = False
781 self._lock = self.session._lock
782
785
790
792 err = self.session.get_error()
793 if err:
794 return err
795 else:
796 return self.error
797
798 - def _ewait(self, predicate, timeout=None):
802
806
807 @synchronized
809 """
810 Returns the number of messages awaiting acknowledgment.
811 @rtype: int
812 @return: the number of unacknowledged messages
813 """
814 return self.queued - self.acked
815
816 @synchronized
822
823 @synchronized
824 - def send(self, object, sync=True, timeout=None):
825 """
826 Send a message. If the object passed in is of type L{unicode},
827 L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
828 L{Message} and sent. If it is of type L{Message}, it will be sent
829 directly. If the sender capacity is not L{UNLIMITED} then send
830 will block until there is available capacity to send the message.
831 If the timeout parameter is specified, then send will throw an
832 L{InsufficientCapacity} exception if capacity does not become
833 available within the specified time.
834
835 @type object: unicode, str, list, dict, Message
836 @param object: the message or content to send
837
838 @type sync: boolean
839 @param sync: if true then block until the message is sent
840
841 @type timeout: float
842 @param timeout: the time to wait for available capacity
843 """
844
845 if not self.session.connection._connected or self.session.closing:
846 raise Detached()
847
848 self._ecwait(lambda: self.linked)
849
850 if isinstance(object, Message):
851 message = object
852 else:
853 message = Message(object)
854
855 if message.durable is None:
856 message.durable = self.durable
857
858 if self.capacity is not UNLIMITED:
859 if self.capacity <= 0:
860 raise InsufficientCapacity("capacity = %s" % self.capacity)
861 if not self._ecwait(self.available, timeout=timeout):
862 raise InsufficientCapacity("capacity = %s" % self.capacity)
863
864
865 message._sender = self
866 if self.capacity is not UNLIMITED:
867 message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity))
868 else:
869 message._sync = sync
870 self.session.outgoing.append(message)
871 self.queued += 1
872
873 if sync:
874 self.sync()
875 assert message not in self.session.outgoing
876 else:
877 self._wakeup()
878
879 @synchronized
880 - def sync(self, timeout=None):
881 mno = self.queued
882 if self.synced < mno:
883 self.synced = mno
884 self._wakeup()
885 if not self._ewait(lambda: self.acked >= mno, timeout=timeout):
886 raise Timeout("sender sync timed out")
887
888 @synchronized
889 - def close(self, timeout=None):
890 """
891 Close the Sender.
892 """
893
894
895 if self.acked < self.queued:
896 self.sync(timeout=timeout)
897
898 if not self.closing:
899 self.closing = True
900 self._wakeup()
901
902 try:
903 if not self.session._ewait(lambda: self.closed, timeout=timeout):
904 raise Timeout("sender close timed out")
905 finally:
906 try:
907 self.session.senders.remove(self)
908 except ValueError:
909 pass
910
912
913 """
914 Receives incoming messages from a remote source. Messages may be
915 fetched with L{fetch}.
916 """
917
918 - def __init__(self, session, id, source, options):
919 self.session = session
920 self.id = id
921 self.source = source
922 self.options = options
923
924 self.granted = Serial(0)
925 self.draining = False
926 self.impending = Serial(0)
927 self.received = Serial(0)
928 self.returned = Serial(0)
929
930 self.error = None
931 self.linked = False
932 self.closing = False
933 self.closed = False
934 self._lock = self.session._lock
935 self._capacity = 0
936 self._set_capacity(options.get("capacity", 0), False)
937 self.threshold = 0.5
938
939 @synchronized
948
954
955 capacity = property(_get_capacity, _set_capacity)
956
959
964
966 err = self.session.get_error()
967 if err:
968 return err
969 else:
970 return self.error
971
972 - def _ewait(self, predicate, timeout=None):
976
980
981 @synchronized
983 """
984 Returns the number of acknowledged messages awaiting confirmation.
985 """
986 return len([m for m in self.acked if m._receiver is self])
987
988 @synchronized
990 """
991 Returns the number of messages available to be fetched by the
992 application.
993
994 @rtype: int
995 @return: the number of available messages
996 """
997 return self.received - self.returned
998
999 @synchronized
1000 - def fetch(self, timeout=None):
1001 """
1002 Fetch and return a single message. A timeout of None will block
1003 forever waiting for a message to arrive, a timeout of zero will
1004 return immediately if no messages are available.
1005
1006 @type timeout: float
1007 @param timeout: the time to wait for a message to be available
1008 """
1009
1010 self._ecwait(lambda: self.linked)
1011
1012 if self._capacity == 0:
1013 self.granted = self.returned + 1
1014 self._wakeup()
1015 self._ecwait(lambda: self.impending >= self.granted)
1016 msg = self.session._get(self, timeout=timeout)
1017 if msg is None:
1018 self.check_closed()
1019 self.draining = True
1020 self._wakeup()
1021 self._ecwait(lambda: not self.draining)
1022 msg = self.session._get(self, timeout=0)
1023 self._grant()
1024 self._wakeup()
1025 if msg is None:
1026 raise Empty()
1027 elif self._capacity not in (0, UNLIMITED.value):
1028 t = int(ceil(self.threshold * self._capacity))
1029 if self.received - self.returned <= t:
1030 self.granted = self.returned + self._capacity
1031 self._wakeup()
1032 return msg
1033
1035 if self._capacity == UNLIMITED.value:
1036 self.granted = UNLIMITED
1037 else:
1038 self.granted = self.returned + self._capacity
1039
1040 @synchronized
1041 - def close(self, timeout=None):
1042 """
1043 Close the receiver.
1044 """
1045 if not self.closing:
1046 self.closing = True
1047 self._wakeup()
1048
1049 try:
1050 if not self.session._ewait(lambda: self.closed, timeout=timeout):
1051 raise Timeout("receiver close timed out")
1052 finally:
1053 try:
1054 self.session.receivers.remove(self)
1055 except ValueError:
1056 pass
1057
1058 __all__ = ["Connection", "Session", "Sender", "Receiver"]
1059