1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import socket, struct, sys, time
21 from logging import getLogger, DEBUG
22 from qpid import compat
23 from qpid import sasl
24 from qpid.concurrency import synchronized
25 from qpid.datatypes import RangedSet, Serial
26 from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
27 FrameDecoder, SegmentDecoder, OpDecoder
28 from qpid.messaging import address, transports
29 from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
30 from qpid.messaging.exceptions import *
31 from qpid.messaging.message import get_codec, Disposition, Message
32 from qpid.ops import *
33 from qpid.selector import Selector
34 from qpid.util import URL, default
35 from qpid.validator import And, Context, List, Map, Types, Values
36 from threading import Condition, Thread
37
38 log = getLogger("qpid.messaging")
39 rawlog = getLogger("qpid.messaging.io.raw")
40 opslog = getLogger("qpid.messaging.io.ops")
43 name, subject, options = address.parse(addr)
44 if options:
45 type = options.get("node", {}).get("type")
46 else:
47 type = None
48
49 if type == "topic":
50 return ReplyTo(name, subject)
51 else:
52 return ReplyTo(None, name)
53
55 if reply_to.exchange in (None, ""):
56 return reply_to.routing_key
57 elif reply_to.routing_key is None:
58 return "%s; {node: {type: topic}}" % reply_to.exchange
59 else:
60 return "%s/%s; {node: {type: topic}}" % (reply_to.exchange, reply_to.routing_key)
61
66
67
68
69 DURABLE_DEFAULT=False
74 """
75 The pattern filter matches the supplied wildcard pattern against a
76 message subject.
77 """
78
81
82
def _bind(self, sst, exchange, queue):
    """Bind *queue* to *exchange* using this filter's value as binding key.

    Every "*" in ``self.value`` is replaced by "#" before the value is
    used as the binding key of the ExchangeBind command.

    :param sst: session state object; the command is issued through its
        ``write_cmd`` method
    :param exchange: exchange name passed to ExchangeBind
    :param queue: queue name passed to ExchangeBind
    """
    from qpid.ops import ExchangeBind

    binding_key = self.value.replace("*", "#")
    bind_cmd = ExchangeBind(exchange=exchange, queue=queue,
                            binding_key=binding_key)
    sst.write_cmd(bind_cmd)
88
89 SUBJECT_DEFAULTS = {
90 "topic": "#"
91 }
92
93
# ``os`` does not appear in the visible import block of this file; import it
# explicitly so this section does not depend on a star-import leaking the name.
import os

# Parent process id advertised in the client properties. It stays 0 when the
# platform cannot report it (os.getppid is not available everywhere).
ppid = 0
try:
    ppid = os.getppid()
except Exception:
    # Best effort only: keep the 0 default, but unlike a bare ``except`` do
    # not swallow KeyboardInterrupt / SystemExit.
    pass

# Default properties describing this client process.
CLIENT_PROPERTIES = {"product": "qpid python client",
                     "version": "development",
                     "platform": os.name,
                     "qpid.client_process": os.path.basename(sys.argv[0]),
                     "qpid.client_pid": os.getpid(),
                     "qpid.client_ppid": ppid}
106
def noop():
    """Do nothing and return None; placeholder callback for commands."""
    return None
109
111
def __init__(self, driver, session, name, channel):
    """Set up the bookkeeping kept for one session on one channel.

    :param driver: stored as ``self.driver``
    :param session: stored as ``self.session``
    :param name: stored as ``self.name``
    :param channel: stored as ``self.channel``
    """
    self.driver, self.session = driver, session
    self.name, self.channel = name, channel

    # lifecycle flags, all initially off
    self.detached = False
    self.committing = False
    self.aborting = False

    # bookkeeping for commands written by this side
    self.sent = Serial(0)
    # both completion marks start out as the very same Serial object
    self.min_completion = self.max_completion = self.sent
    self.acknowledged = RangedSet()
    self.actions = {}
    self.results = {}
    self.need_sync = False

    # bookkeeping for commands coming from the peer
    self.received = None
    self.executed = RangedSet()

    # destination name -> link attachment
    self.destinations = {}
137
139 id = self.sent
140 self.write_cmd(query, lambda: handler(self.results.pop(id)))
141
143 for k, v in overrides.items():
144 cmd[k.replace('-', '_')] = v
145
def write_cmd(self, cmd, action=noop, overrides=None, sync=True):
    """Number *cmd*, register its completion action and write it out.

    :param cmd: command object; receives ``id`` and possibly ``sync``
    :param action: callable stored in ``self.actions`` under the command id
    :param overrides: optional mapping applied to *cmd* via
        ``apply_overrides`` before anything else happens
    :param sync: value given to ``cmd.sync``, but only when *action* is
        something other than ``noop``
    :raises Exception: with text "detached" when ``self.detached`` is set
    """
    if overrides:
        self.apply_overrides(cmd, overrides)

    # Only commands with a real completion action get the sync flag.
    if action != noop:
        cmd.sync = sync

    if self.detached:
        raise Exception("detached")

    # Take the next command id and advance the counter.
    cmd.id = self.sent
    self.sent += 1

    self.actions[cmd.id] = action
    self.max_completion = cmd.id
    self.write_op(cmd)

    # Remember whether the most recent command went out without sync.
    self.need_sync = not cmd.sync
160
162 if cmds:
163 for cmd in cmds[:-1]:
164 self.write_cmd(cmd)
165 self.write_cmd(cmds[-1], action)
166 else:
167 action()
168
172
173 POLICIES = Values("always", "sender", "receiver", "never")
174 RELIABILITY = Values("unreliable", "at-most-once", "at-least-once",
175 "exactly-once")
176
177 DECLARE = Map({}, restricted=False)
178 BINDINGS = List(Map({
179 "exchange": Types(basestring),
180 "queue": Types(basestring),
181 "key": Types(basestring),
182 "arguments": Map({}, restricted=False)
183 }))
184
185 COMMON_OPTS = {
186 "create": POLICIES,
187 "delete": POLICIES,
188 "assert": POLICIES,
189 "node": Map({
190 "type": Values("queue", "topic"),
191 "durable": Types(bool),
192 "x-declare": DECLARE,
193 "x-bindings": BINDINGS
194 }),
195 "link": Map({
196 "name": Types(basestring),
197 "durable": Types(bool),
198 "reliability": RELIABILITY,
199 "x-declare": DECLARE,
200 "x-bindings": BINDINGS,
201 "x-subscribe": Map({}, restricted=False)
202 })
203 }
204
205 RECEIVE_MODES = Values("browse", "consume")
206
207 SOURCE_OPTS = COMMON_OPTS.copy()
208 SOURCE_OPTS.update({
209 "mode": RECEIVE_MODES
210 })
211
212 TARGET_OPTS = COMMON_OPTS.copy()
215
216 ADDR_NAME = "source"
217 DIR_NAME = "receiver"
218 VALIDATOR = Map(SOURCE_OPTS)
219
221 _rcv.destination = str(rcv.id)
222 sst.destinations[_rcv.destination] = _rcv
223 _rcv.draining = False
224 _rcv.bytes_open = False
225 _rcv.on_unlink = []
226
def do_link(self, sst, rcv, _rcv, type, subtype, action):
    """Write the commands that subscribe receiver *rcv* to its source.

    For a "topic" node a subscription queue is declared (named by the
    link "name" option or "<session name>.<destination>"), a QueueDelete
    is queued for unlink time, and the queue is bound to the exchange
    either through the link's x-bindings or, when there are none, with
    the receiver's subject. For a "queue" node the queue itself is
    subscribed to; "browse" mode selects the not_acquired acquire mode.

    :param sst: session state used to write commands
    :param rcv: receiver object; its ``_accept_mode`` is set here
    :param _rcv: attachment for the receiver (options, name, destination)
    :param type: resolved node type, "topic" or "queue"
    :param subtype: resolved subtype, used to pick a default subject
    :param action: passed as the completion action of the final command
    """
    link_opts = _rcv.options.get("link", {})
    declare_overrides = link_opts.get("x-declare", {})
    subscribe_overrides = link_opts.get("x-subscribe", {})

    # Reliability defaults depend on the kind of node being linked.
    if type == "topic":
        fallback_reliability = "unreliable"
    else:
        fallback_reliability = "at-least-once"
    reliability = link_opts.get("reliability", fallback_reliability)

    if reliability in ("unreliable", "at-most-once"):
        rcv._accept_mode = accept_mode.none
    else:
        rcv._accept_mode = accept_mode.explicit

    acq_mode = acquire_mode.pre_acquired

    if type == "topic":
        generated_name = "%s.%s" % (rcv.session.name, _rcv.destination)
        _rcv._queue = link_opts.get("name", generated_name)
        queue_declare = QueueDeclare(
            queue=_rcv._queue,
            durable=link_opts.get("durable", False),
            exclusive=True,
            auto_delete=(reliability == "unreliable"))
        sst.write_cmd(queue_declare, overrides=declare_overrides)
        _rcv.on_unlink = [QueueDelete(_rcv._queue)]
        subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype)
        bindings = get_bindings(link_opts, _rcv._queue, _rcv.name, subject)
        if not bindings:
            # no explicit bindings: bind on the subject directly
            sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject))
    elif type == "queue":
        _rcv._queue = _rcv.name
        if _rcv.options.get("mode", "consume") == "browse":
            acq_mode = acquire_mode.not_acquired
        bindings = get_bindings(link_opts, queue=_rcv._queue)

    sst.write_cmds(bindings)
    subscribe = MessageSubscribe(queue=_rcv._queue,
                                 destination=_rcv.destination,
                                 acquire_mode=acq_mode,
                                 accept_mode=rcv._accept_mode)
    sst.write_cmd(subscribe, overrides=subscribe_overrides)
    flow_cmd = MessageSetFlowMode(_rcv.destination, flow_mode.credit)
    sst.write_cmd(flow_cmd, action)
270
272 link_opts = _rcv.options.get("link", {})
273 reliability = link_opts.get("reliability")
274 cmds = [MessageCancel(_rcv.destination)]
275 cmds.extend(_rcv.on_unlink)
276 sst.write_cmds(cmds, action)
277
279 del sst.destinations[_rcv.destination]
280
282
283 ADDR_NAME = "target"
284 DIR_NAME = "sender"
285 VALIDATOR = Map(TARGET_OPTS)
286
288 _snd.closing = False
289 _snd.pre_ack = False
290
def do_link(self, sst, snd, _snd, type, subtype, action):
    """Record where sender *snd* publishes to and write its bindings.

    For a "topic" node messages go to the exchange named ``_snd.name``
    with ``_snd.subject`` as routing key; for a "queue" node they go to
    the "" exchange with the queue name as routing key. ``_snd.pre_ack``
    is set when the link reliability is "unreliable" or "at-most-once".

    :param sst: session state used to write commands
    :param snd: sender object (not read here)
    :param _snd: attachment for the sender
    :param type: resolved node type, "topic" or "queue"
    :param subtype: resolved subtype (not read here)
    :param action: completion action handed to ``sst.write_cmds``
    """
    link_opts = _snd.options.get("link", {})
    reliability = link_opts.get("reliability", "at-least-once")
    _snd.pre_ack = reliability in ("unreliable", "at-most-once")

    if type == "topic":
        _snd._exchange, _snd._routing_key = _snd.name, _snd.subject
        bindings = get_bindings(link_opts, exchange=_snd.name,
                                key=_snd.subject)
    elif type == "queue":
        _snd._exchange, _snd._routing_key = "", _snd.name
        bindings = get_bindings(link_opts, queue=_snd.name)

    sst.write_cmds(bindings, action)
304
307
310
312
314 self.ttl = ttl
315 self.entries = {}
316
318 self.entries[key] = time.time(), value
319
321 tstamp, value = self.entries[key]
322 if time.time() - tstamp >= self.ttl:
323 del self.entries[key]
324 raise KeyError(key)
325 else:
326 return value
327
329 del self.entries[key]
330
331
332 HEADER="!4s4B"
333
334 EMPTY_DP = DeliveryProperties()
335 EMPTY_MP = MessageProperties()
336
337 SUBJECT = "qpid.subject"
338
339 CLOSED = "CLOSED"
340 READ_ONLY = "READ_ONLY"
341 WRITE_ONLY = "WRITE_ONLY"
342 OPEN = "OPEN"
345
347 self.connection = connection
348 self.log_id = "%x" % id(self.connection)
349 self._lock = self.connection._lock
350
351 self._selector = Selector.default()
352 self._attempts = 0
353 self._delay = self.connection.reconnect_interval_min
354 self._reconnect_log = self.connection.reconnect_log
355 self._host = 0
356 self._retrying = False
357 self._next_retry = None
358 self._transport = None
359
360 self._timeout = None
361
362 self.engine = None
363
365 urls = [URL(u) for u in self.connection.reconnect_urls]
366 hosts = [(self.connection.host, default(self.connection.port, 5672))] + \
367 [(u.host, default(u.port, 5672)) for u in urls]
368 if self._host >= len(hosts):
369 self._host = 0
370 result = hosts[self._host]
371 if self._host == 0:
372 self._attempts += 1
373 self._host = self._host + 1
374 return result
375
377 return len(self.connection.reconnect_urls) + 1
378
379 @synchronized
383
385 self._selector.register(self)
386
388 self._selector.unregister(self)
389 if self._transport:
390 self.st_closed()
391
393 return self._transport.fileno()
394
395 @synchronized
397 return self._transport is not None and \
398 self._transport.reading(True)
399
400 @synchronized
402 return self._transport is not None and \
403 self._transport.writing(self.engine.pending())
404
405 @synchronized
408
409 @synchronized
426
428 if self.connection.error:
429 self.connection._condition.gc()
430 self.connection._waiter.notifyAll()
431
433 if e is None:
434 e = ConnectionError(text="connection aborted")
435
436 if (self.connection.reconnect and
437 (self.connection.reconnect_limit is None or
438 self.connection.reconnect_limit <= 0 or
439 self._attempts <= self.connection.reconnect_limit)):
440 if self._host < self._num_hosts():
441 delay = 0
442 else:
443 delay = self._delay
444 self._delay = min(2*self._delay,
445 self.connection.reconnect_interval_max)
446 self._next_retry = time.time() + delay
447 if self._reconnect_log:
448 log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
449 if delay > 0:
450 log.warn("sleeping %s seconds" % delay)
451 self._retrying = True
452 self.engine.close()
453 else:
454 self.engine.close(e)
455
456 self.schedule()
457
461
463
464
465 self._transport.close()
466 self._transport = None
467 self.engine = None
468 return True
469
472
473 @synchronized
475 notify = False
476 try:
477 n = self._transport.send(self.engine.peek())
478 if n == 0: return
479 sent = self.engine.read(n)
480 rawlog.debug("SENT[%s]: %r", self.log_id, sent)
481 except socket.error, e:
482 self.close_engine(e)
483 notify = True
484
485 if self.update_status() or notify:
486 self._notify()
487
488 @synchronized
493
495 times = []
496 if self.connection.heartbeat:
497 times.append(time.time() + self.connection.heartbeat)
498 if self._next_retry:
499 times.append(self._next_retry)
500 if times:
501 self._timeout = min(times)
502 else:
503 self._timeout = None
504
506 try:
507 if self._transport is None:
508 if self.connection._connected and not self.connection.error:
509 self.connect()
510 else:
511 self.engine.dispatch()
512 except HeartbeatTimeout, e:
513 self.close_engine(e)
514 except:
515
516 msg = compat.format_exc()
517 self.connection.error = InternalError(text=msg)
518
520 if self._retrying and time.time() < self._next_retry:
521 return
522
523 try:
524
525 host, port = self._next_host()
526 if self._retrying and self._reconnect_log:
527 log.warn("trying: %s:%s", host, port)
528 self.engine = Engine(self.connection)
529 self.engine.open()
530 rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
531 trans = transports.TRANSPORTS.get(self.connection.transport)
532 if trans:
533 self._transport = trans(self.connection, host, port)
534 else:
535 raise ConnectError("no such transport: %s" % self.connection.transport)
536 if self._retrying and self._reconnect_log:
537 log.warn("reconnect succeeded: %s:%s", host, port)
538 self._next_retry = None
539 self._attempts = 0
540 self._delay = self.connection.reconnect_interval_min
541 self._retrying = False
542 self.schedule()
543 except socket.error, e:
544 self.close_engine(ConnectError(text=str(e)))
545
546 DEFAULT_DISPOSITION = Disposition(None)
547
def get_bindings(opts, queue=None, exchange=None, key=None):
    """Build one ExchangeBind command per entry of ``opts["x-bindings"]``.

    Each entry is a mapping that may supply "exchange", "queue", "key" and
    "arguments". Fields an entry omits fall back to the *queue*, *exchange*
    and *key* arguments of this call ("arguments" falls back to ``{}``).

    The fallbacks are the same for every entry: a value given explicitly
    in one binding does not carry over to the bindings that follow it.
    (Previously the loop rebound the parameters themselves, so an earlier
    entry silently changed the defaults used for later entries.)

    :param opts: options mapping; a missing "x-bindings" means no bindings
    :param queue: default queue name
    :param exchange: default exchange name
    :param key: default binding key
    :returns: list of ExchangeBind commands, in the order of the entries
    """
    cmds = []
    for binding in opts.get("x-bindings", []):
        cmds.append(ExchangeBind(binding.get("queue", queue),
                                 binding.get("exchange", exchange),
                                 binding.get("key", key),
                                 binding.get("arguments", {})))
    return cmds
558
559 CONNECTION_ERRS = {
560
561
562 }
563
564 SESSION_ERRS = {
565
566 error_code.unauthorized_access: UnauthorizedAccess,
567 error_code.not_found: NotFound,
568 error_code.resource_locked: ReceiverError,
569 error_code.resource_limit_exceeded: TargetCapacityExceeded,
570 error_code.internal_error: ServerError
571 }
574
576 self.connection = connection
577 self.log_id = "%x" % id(self.connection)
578 self._closing = False
579 self._connected = False
580 self._attachments = {}
581
582 self._in = LinkIn()
583 self._out = LinkOut()
584
585 self._channel_max = 65536
586 self._channels = 0
587 self._sessions = {}
588
589 self.address_cache = Cache(self.connection.address_ttl)
590
591 self._status = CLOSED
592 self._buf = ""
593 self._hdr = ""
594 self._last_in = None
595 self._last_out = None
596 self._op_enc = OpEncoder()
597 self._seg_enc = SegmentEncoder()
598 self._frame_enc = FrameEncoder()
599 self._frame_dec = FrameDecoder()
600 self._seg_dec = SegmentDecoder()
601 self._op_dec = OpDecoder()
602
603 self._sasl = sasl.Client()
604 if self.connection.username:
605 self._sasl.setAttr("username", self.connection.username)
606 if self.connection.password:
607 self._sasl.setAttr("password", self.connection.password)
608 if self.connection.host:
609 self._sasl.setAttr("host", self.connection.host)
610 self._sasl.setAttr("service", self.connection.sasl_service)
611 if self.connection.sasl_min_ssf is not None:
612 self._sasl.setAttr("minssf", self.connection.sasl_min_ssf)
613 if self.connection.sasl_max_ssf is not None:
614 self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf)
615 self._sasl.init()
616 self._sasl_encode = False
617 self._sasl_decode = False
618
620 self.connection._transport_connected = False
621
622 for ssn in self.connection.sessions.values():
623 for m in ssn.acked + ssn.unacked + ssn.incoming:
624 m._transfer_id = None
625 for snd in ssn.senders:
626 snd.linked = False
627 for rcv in ssn.receivers:
628 rcv.impending = rcv.received
629 rcv.linked = False
630
633
635 self._last_in = time.time()
636 try:
637 if self._sasl_decode:
638 data = self._sasl.decode(data)
639
640 if len(self._hdr) < 8:
641 r = 8 - len(self._hdr)
642 self._hdr += data[:r]
643 data = data[r:]
644
645 if len(self._hdr) == 8:
646 self.do_header(self._hdr)
647
648 self._frame_dec.write(data)
649 self._seg_dec.write(*self._frame_dec.read())
650 self._op_dec.write(*self._seg_dec.read())
651 for op in self._op_dec.read():
652 self.assign_id(op)
653 opslog.debug("RCVD[%s]: %r", self.log_id, op)
654 op.dispatch(self)
655 self.dispatch()
656 except MessagingError, e:
657 self.close(e)
658 except:
659 self.close(InternalError(text=compat.format_exc()))
660
def close(self, e=None):
    """Reset this object and mark it CLOSED.

    :param e: optional error; when truthy it is stored as
        ``self.connection.error``
    """
    self._reset()
    if e:
        # keep the reason around for whoever inspects the connection
        self.connection.error = e
    self._status = CLOSED
666
668 if isinstance(op, Command):
669 sst = self.get_sst(op)
670 op.id = sst.received
671 sst.received += 1
672
674 return len(self._buf)
675
677 result = self._buf[:n]
678 self._buf = self._buf[n:]
679 return result
680
683
685 opslog.debug("SENT[%s]: %r", self.log_id, op)
686 self._op_enc.write(op)
687 self._seg_enc.write(*self._op_enc.read())
688 self._frame_enc.write(*self._seg_enc.read())
689 bytes = self._frame_enc.read()
690 if self._sasl_encode:
691 bytes = self._sasl.encode(bytes)
692 self._buf += bytes
693 self._last_out = time.time()
694
696 cli_major = 0; cli_minor = 10
697 magic, _, _, major, minor = struct.unpack(HEADER, hdr)
698 if major != cli_major or minor != cli_minor:
699 raise VersionError(text="client: %s-%s, server: %s-%s" %
700 (cli_major, cli_minor, major, minor))
701
703 if self.connection.sasl_mechanisms:
704 permitted = self.connection.sasl_mechanisms.split()
705 mechs = [m for m in start.mechanisms if m in permitted]
706 else:
707 mechs = start.mechanisms
708 try:
709 mech, initial = self._sasl.start(" ".join(mechs))
710 except sasl.SASLError, e:
711 raise AuthenticationFailure(text=str(e))
712
713 client_properties = CLIENT_PROPERTIES.copy()
714 client_properties.update(self.connection.client_properties)
715 self.write_op(ConnectionStartOk(client_properties=client_properties,
716 mechanism=mech, response=initial))
717
719 resp = self._sasl.step(secure.challenge)
720 self.write_op(ConnectionSecureOk(response=resp))
721
723
724 if tune.channel_max is not None:
725 self.channel_max = tune.channel_max
726 self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat,
727 channel_max=self.channel_max))
728 self.write_op(ConnectionOpen())
729 self._sasl_encode = True
730
732 self.connection.auth_username = self._sasl.auth_username()
733 self._connected = True
734 self._sasl_decode = True
735 self.connection._transport_connected = True
736
739
745
746
747
748
749
750
753
756
758 sst = self.get_sst(cp)
759 sst.received = cp.command_id
760
762 sst = self.get_sst(sc)
763 for r in sc.commands:
764 sst.acknowledged.add(r.lower, r.upper)
765
766 if not sc.commands.empty():
767 while sst.min_completion in sc.commands:
768 if sst.actions.has_key(sst.min_completion):
769 sst.actions.pop(sst.min_completion)()
770 sst.min_completion += 1
771
# (handler body; its def line is not part of this listing)
# Keep only those executed ranges that are NOT wholly contained in one of
# the ranges carried by kcmp, and store the result on the session state.
sst = self.get_sst(kcmp)
remaining = RangedSet()
for e in sst.executed.ranges:
    for ke in kcmp.ranges:
        if e.lower in ke and e.upper in ke:
            # fully covered by a range from kcmp: drop it
            break
    else:
        remaining.add_range(e)
# The original assigned the undefined name ``completed`` here (NameError)
# and never used the set it had just built.
sst.executed = remaining
782
784 sst = self.get_sst(sf)
785 if sf.expected:
786 if sst.received is None:
787 exp = None
788 else:
789 exp = RangedSet(sst.received)
790 sst.write_op(SessionExpected(exp))
791 if sf.confirmed:
792 sst.write_op(SessionConfirmed(sst.executed))
793 if sf.completed:
794 sst.write_op(SessionCompleted(sst.executed))
795
799
801 sst = self.get_sst(er)
802 sst.results[er.command_id] = er.value
803 sst.executed.add(er.id)
804
809
811 if not self.connection._connected and not self._closing and self._status != CLOSED:
812 self.disconnect()
813
814 if self._connected and not self._closing:
815 for ssn in self.connection.sessions.values():
816 self.attach(ssn)
817 self.process(ssn)
818
819 if self.connection.heartbeat and self._status != CLOSED:
820 now = time.time()
821 if self._last_in is not None and \
822 now - self._last_in > 2*self.connection.heartbeat:
823 raise HeartbeatTimeout(text="heartbeat timeout")
824 if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0:
825 self.write_op(ConnectionHeartbeat())
826
828 self._reset()
829 self._status = OPEN
830 self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
831
833 self.write_op(ConnectionClose(close_code.normal))
834 self._closing = True
835
837 if ssn.closed: return
838 sst = self._attachments.get(ssn)
839 if sst is None:
840 for i in xrange(0, self.channel_max):
841 if not self._sessions.has_key(i):
842 ch = i
843 break
844 else:
845 raise RuntimeError("all channels used")
846 sst = SessionState(self, ssn, ssn.name, ch)
847 sst.write_op(SessionAttach(name=ssn.name))
848 sst.write_op(SessionCommandPoint(sst.sent, 0))
849 sst.outgoing_idx = 0
850 sst.acked = []
851 sst.acked_idx = 0
852 if ssn.transactional:
853 sst.write_cmd(TxSelect())
854 self._attachments[ssn] = sst
855 self._sessions[sst.channel] = sst
856
857 for snd in ssn.senders:
858 self.link(snd, self._out, snd.target)
859 for rcv in ssn.receivers:
860 self.link(rcv, self._in, rcv.source)
861
862 if sst is not None and ssn.closing and not sst.detached:
863 sst.detached = True
864 sst.write_op(SessionDetach(name=ssn.name))
865
867 return self._sessions[op.channel]
868
870 sst = self._sessions.pop(dtc.channel)
871 ssn = sst.session
872 del self._attachments[ssn]
873 ssn.closed = True
874
879
def link(self, lnk, dir, addr):
    """Advance the attach / detach handshake for one sender or receiver.

    When *lnk* has no attachment yet and is not closed, an Attachment is
    created, the address is parsed, the options validated, and node
    resolution is started; ``lnk.linked`` becomes True once
    ``dir.do_link`` completes. When *lnk* is linked and closing, the
    unlink commands are written once, followed by deletion of the node
    if the "delete" option asks for it. A link that is closing without
    ever being linked is closed only if it carries an error.

    :param lnk: the sender or receiver being linked
    :param dir: direction helper providing init_link / do_link /
        do_unlink / del_link and DIR_NAME
    :param addr: address string handed to ``parse_address``
    """
    sst = self._attachments.get(lnk.session)
    _lnk = self._attachments.get(lnk)

    if _lnk is None and not lnk.closed:
        _lnk = Attachment(lnk)
        _lnk.closing = False
        dir.init_link(sst, lnk, _lnk)

        problem = (self.parse_address(_lnk, dir, addr) or
                   self.validate_options(_lnk, dir))
        if problem:
            lnk.error = problem
            lnk.closed = True
            return

        def on_linked():
            lnk.linked = True

        def on_resolved(type, subtype):
            dir.do_link(sst, lnk, _lnk, type, subtype, on_linked)

        self.resolve_declare(sst, _lnk, dir.DIR_NAME, on_resolved)
        # registered only after resolve_declare has been started
        self._attachments[lnk] = _lnk

    if lnk.linked and lnk.closing and not lnk.closed:
        if not _lnk.closing:
            def on_unlinked():
                dir.del_link(sst, lnk, _lnk)
                del self._attachments[lnk]
                lnk.closed = True

            delete_node = _lnk.options.get("delete") in ("always",
                                                         dir.DIR_NAME)
            if delete_node:
                dir.do_unlink(sst, lnk, _lnk)
                self.delete(sst, _lnk.name, on_unlinked)
            else:
                dir.do_unlink(sst, lnk, _lnk, on_unlinked)
            _lnk.closing = True
    elif not lnk.linked and lnk.closing and not lnk.closed:
        if lnk.error:
            lnk.closed = True
918
932
934 ctx = Context()
935 err = dir.VALIDATOR.validate(lnk.options, ctx)
936 if err: return InvalidOption(text="error in options: %s" % err)
937
939 declare = lnk.options.get("create") in ("always", dir)
940 assrt = lnk.options.get("assert") in ("always", dir)
941 def do_resolved(type, subtype):
942 err = None
943 if type is None:
944 if declare:
945 err = self.declare(sst, lnk, action)
946 else:
947 err = NotFound(text="no such queue: %s" % lnk.name)
948 else:
949 if assrt:
950 expected = lnk.options.get("node", {}).get("type")
951 if expected and type != expected:
952 err = AssertionFailed(text="expected %s, got %s" % (expected, type))
953 if err is None:
954 action(type, subtype)
955
956 if err:
957 tgt = lnk.target
958 tgt.error = err
959 del self._attachments[tgt]
960 tgt.closed = True
961 return
962 self.resolve(sst, lnk.name, do_resolved, force=declare)
963
def resolve(self, sst, name, action, force=False):
    """Find out what kind of node *name* is and report it to *action*.

    Unless *force* is set, ``self.address_cache`` is consulted first and
    *action* is invoked immediately on a hit. Otherwise an ExchangeQuery
    and a QueueQuery are written; when both results are in, *action* is
    called as ``action(type, subtype)`` with ("queue", None),
    ("topic", <exchange type>) or (None, None), and any non-None type is
    stored in the cache.

    :param sst: session state used to write the queries
    :param name: node name to look up
    :param action: callable taking ``(type, subtype)``
    :param force: skip the cache lookup when true
    """
    if not force:
        try:
            # NOTE: action runs inside this try, so a KeyError escaping
            # from it is handled exactly like a cache miss and falls
            # through to the queries below.
            cached_type, cached_subtype = self.address_cache[name]
            action(cached_type, cached_subtype)
            return
        except KeyError:
            pass

    replies = []

    def on_exchange_result(result):
        replies.append(result)

    def on_queue_result(result):
        replies.append(result)
        # results arrive in the order the queries were written
        exchange_info, queue_info = replies
        if exchange_info.not_found and not queue_info.queue:
            node_type, node_subtype = None, None
        elif queue_info.queue:
            node_type, node_subtype = "queue", None
        else:
            node_type, node_subtype = "topic", exchange_info.type
        if node_type is not None:
            self.address_cache[name] = (node_type, node_subtype)
        action(node_type, node_subtype)

    sst.write_query(ExchangeQuery(name), on_exchange_result)
    sst.write_query(QueueQuery(name), on_queue_result)
990
def declare(self, sst, lnk, action):
    """Declare the node named by *lnk* and then report its type.

    The "node" options of the link decide what is declared: an exchange
    for type "topic", a queue for type "queue" (the default). The
    "x-declare" options are applied to the declare command as overrides,
    and the node's x-bindings are written after it. Once the commands
    complete, the (type, subtype) pair is cached under the node name and
    passed to *action*.

    :param sst: session state used to write commands
    :param lnk: link attachment providing ``name`` and ``options``
    :param action: callable taking ``(type, subtype)``
    :raises ValueError: if the node type is neither "topic" nor "queue"
    """
    name = lnk.name
    node_opts = lnk.options.get("node", {})
    durable = node_opts.get("durable", DURABLE_DEFAULT)
    node_type = node_opts.get("type", "queue")
    declare_overrides = node_opts.get("x-declare", {})

    if node_type == "topic":
        cmd = ExchangeDeclare(exchange=name, durable=durable)
        bindings = get_bindings(node_opts, exchange=name)
    elif node_type == "queue":
        cmd = QueueDeclare(queue=name, durable=durable)
        bindings = get_bindings(node_opts, queue=name)
    else:
        raise ValueError(node_type)

    sst.apply_overrides(cmd, declare_overrides)

    # Only exchanges have a subtype; it defaults to "topic" when the
    # overrides did not set one.
    subtype = None
    if node_type == "topic":
        if cmd.type is None:
            cmd.type = "topic"
        subtype = cmd.type

    def on_declared():
        self.address_cache[name] = (node_type, subtype)
        action(node_type, subtype)

    cmds = [cmd]
    cmds.extend(bindings)
    sst.write_cmds(cmds, on_declared)
1024
def delete(self, sst, name, action):
    """Delete the node called *name*, then invoke *action*.

    The node is resolved with ``force=True``. A "topic" is removed with
    ExchangeDelete and a "queue" with QueueDelete; once that command
    completes the cache entry for *name* is dropped and *action* is
    called. If the node does not resolve (type None), *action* is called
    right away.

    :param sst: session state used to write commands
    :param name: node name
    :param action: zero-argument callable
    :raises ValueError: for any other resolved type
    """
    def on_deleted():
        del self.address_cache[name]
        action()

    def on_resolved(node_type, node_subtype):
        if node_type is None:
            # nothing to delete
            action()
        elif node_type == "topic":
            sst.write_cmd(ExchangeDelete(name), on_deleted)
        elif node_type == "queue":
            sst.write_cmd(QueueDelete(name), on_deleted)
        else:
            raise ValueError(node_type)

    self.resolve(sst, name, on_resolved, force=True)
1040
1042 if ssn.closed or ssn.closing: return
1043
1044 sst = self._attachments[ssn]
1045
1046 while sst.outgoing_idx < len(ssn.outgoing):
1047 msg = ssn.outgoing[sst.outgoing_idx]
1048 snd = msg._sender
1049
1050 _snd = self._attachments.get(snd)
1051 if _snd and snd.linked:
1052 self.send(snd, msg)
1053 sst.outgoing_idx += 1
1054 else:
1055 break
1056
1057 for snd in ssn.senders:
1058
1059 if snd.synced >= snd.queued and sst.need_sync:
1060 sst.write_cmd(ExecutionSync(), sync_noop)
1061
1062 for rcv in ssn.receivers:
1063 self.process_receiver(rcv)
1064
1065 if ssn.acked:
1066 messages = ssn.acked[sst.acked_idx:]
1067 if messages:
1068 ids = RangedSet()
1069
1070 disposed = [(DEFAULT_DISPOSITION, [])]
1071 acked = []
1072 for m in messages:
1073
1074
1075 if m._transfer_id is None:
1076 acked.append(m)
1077 continue
1078 ids.add(m._transfer_id)
1079 if m._receiver._accept_mode is accept_mode.explicit:
1080 disp = m._disposition or DEFAULT_DISPOSITION
1081 last, msgs = disposed[-1]
1082 if disp.type is last.type and disp.options == last.options:
1083 msgs.append(m)
1084 else:
1085 disposed.append((disp, [m]))
1086 else:
1087 acked.append(m)
1088
1089 for range in ids:
1090 sst.executed.add_range(range)
1091 sst.write_op(SessionCompleted(sst.executed))
1092
1093 def ack_acker(msgs):
1094 def ack_ack():
1095 for m in msgs:
1096 ssn.acked.remove(m)
1097 sst.acked_idx -= 1
1098
1099 if not ssn.transactional:
1100 sst.acked.remove(m)
1101 return ack_ack
1102
1103 for disp, msgs in disposed:
1104 if not msgs: continue
1105 if disp.type is None:
1106 op = MessageAccept
1107 elif disp.type is RELEASED:
1108 op = MessageRelease
1109 elif disp.type is REJECTED:
1110 op = MessageReject
1111 sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
1112 **disp.options),
1113 ack_acker(msgs))
1114 if log.isEnabledFor(DEBUG):
1115 for m in msgs:
1116 log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
1117
1118 sst.acked.extend(messages)
1119 sst.acked_idx += len(messages)
1120 ack_acker(acked)()
1121
1122 if ssn.committing and not sst.committing:
1123 def commit_ok():
1124 del sst.acked[:]
1125 ssn.committing = False
1126 ssn.committed = True
1127 ssn.aborting = False
1128 ssn.aborted = False
1129 sst.committing = False
1130 sst.write_cmd(TxCommit(), commit_ok)
1131 sst.committing = True
1132
1133 if ssn.aborting and not sst.aborting:
1134 sst.aborting = True
1135 def do_rb():
1136 messages = sst.acked + ssn.unacked + ssn.incoming
1137 ids = RangedSet(*[m._transfer_id for m in messages])
1138 for range in ids:
1139 sst.executed.add_range(range)
1140 sst.write_op(SessionCompleted(sst.executed))
1141 sst.write_cmd(MessageRelease(ids, True))
1142 sst.write_cmd(TxRollback(), do_rb_ok)
1143
1144 def do_rb_ok():
1145 del ssn.incoming[:]
1146 del ssn.unacked[:]
1147 del sst.acked[:]
1148
1149 for rcv in ssn.receivers:
1150 rcv.impending = rcv.received
1151 rcv.returned = rcv.received
1152
1153
1154 for rcv in ssn.receivers:
1155 self.process_receiver(rcv)
1156
1157 ssn.aborting = False
1158 ssn.aborted = True
1159 ssn.committing = False
1160 ssn.committed = False
1161 sst.aborting = False
1162
1163 for rcv in ssn.receivers:
1164 _rcv = self._attachments[rcv]
1165 sst.write_cmd(MessageStop(_rcv.destination))
1166 sst.write_cmd(ExecutionSync(), do_rb)
1167
1169 sst = self._attachments[rcv.session]
1170 _rcv = self._attachments.get(rcv)
1171 if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining:
1172 return
1173
1174 if rcv.granted is UNLIMITED:
1175 if rcv.impending is UNLIMITED:
1176 delta = 0
1177 else:
1178 delta = UNLIMITED
1179 elif rcv.impending is UNLIMITED:
1180 delta = -1
1181 else:
1182 delta = max(rcv.granted, rcv.received) - rcv.impending
1183
1184 if delta is UNLIMITED:
1185 if not _rcv.bytes_open:
1186 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
1187 _rcv.bytes_open = True
1188 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
1189 rcv.impending = UNLIMITED
1190 elif delta > 0:
1191 if not _rcv.bytes_open:
1192 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
1193 _rcv.bytes_open = True
1194 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
1195 rcv.impending += delta
1196 elif delta < 0 and not rcv.draining:
1197 _rcv.draining = True
1198 def do_stop():
1199 rcv.impending = rcv.received
1200 _rcv.draining = False
1201 _rcv.bytes_open = False
1202 self.grant(rcv)
1203 sst.write_cmd(MessageStop(_rcv.destination), do_stop)
1204
1205 if rcv.draining:
1206 _rcv.draining = True
1207 def do_flush():
1208 rcv.impending = rcv.received
1209 rcv.granted = rcv.impending
1210 _rcv.draining = False
1211 _rcv.bytes_open = False
1212 rcv.draining = False
1213 sst.write_cmd(MessageFlush(_rcv.destination), do_flush)
1214
1215
1217 if rcv.closed: return
1218 self.grant(rcv)
1219
def send(self, snd, msg):
    """Turn *msg* into a MessageTransfer and write it for sender *snd*.

    The routing key is the sender's own routing key when the message has
    no subject or the sender publishes to the "" exchange; otherwise it
    is the message subject. The effective subject (message subject, else
    the sender's subject) is stored in the application headers under
    SUBJECT. When the sender is not in pre-ack mode the message is
    counted as acknowledged when the transfer command completes; in
    pre-ack mode that happens immediately after the command is written.

    :param snd: sender the message was queued on
    :param msg: message to transmit
    """
    sst = self._attachments[snd.session]
    _snd = self._attachments[snd]

    has_subject = msg.subject is not None

    if has_subject and _snd._exchange != "":
        routing_key = msg.subject
    else:
        routing_key = _snd._routing_key

    if has_subject:
        subject = msg.subject
    else:
        subject = _snd.subject

    reply_to = None
    if msg.reply_to:
        reply_to = addr2reply_to(msg.reply_to)

    content_encoding = msg.properties.get("x-amqp-0-10.content-encoding")
    dp = DeliveryProperties(routing_key=routing_key)
    mp = MessageProperties(message_id=msg.id,
                           user_id=msg.user_id,
                           reply_to=reply_to,
                           correlation_id=msg.correlation_id,
                           app_id=msg.properties.get("x-amqp-0-10.app-id"),
                           content_type=msg.content_type,
                           content_encoding=content_encoding,
                           application_headers=msg.properties)

    if subject is not None:
        if mp.application_headers is None:
            mp.application_headers = {}
        mp.application_headers[SUBJECT] = subject

    if msg.durable is not None:
        if msg.durable:
            dp.delivery_mode = delivery_mode.persistent
        else:
            dp.delivery_mode = delivery_mode.non_persistent

    if msg.priority is not None:
        dp.priority = msg.priority

    if msg.ttl is not None:
        # scaled by 1000 (presumably seconds -> milliseconds)
        dp.ttl = long(msg.ttl*1000)

    encode, _decode = get_codec(msg.content_type)
    payload = encode(msg.content)

    def msg_acked():
        # The acknowledged message must be the oldest outgoing one.
        snd.acked += 1
        oldest = snd.session.outgoing.pop(0)
        sst.outgoing_idx -= 1
        log.debug("RACK[%s]: %s", sst.session.log_id, msg)
        assert msg == oldest

    xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
                          payload=payload)

    if _snd.pre_ack:
        sst.write_cmd(xfr)
    else:
        sst.write_cmd(xfr, msg_acked, sync=msg._sync)

    log.debug("SENT[%s]: %s", sst.session.log_id, msg)

    # pre-ack links do not wait for the broker before acknowledging
    if _snd.pre_ack:
        msg_acked()
1286
1288 sst = self.get_sst(xfr)
1289 ssn = sst.session
1290
1291 msg = self._decode(xfr)
1292 rcv = sst.destinations[xfr.destination].target
1293 msg._receiver = rcv
1294 if rcv.impending is not UNLIMITED:
1295 assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
1296 rcv.received += 1
1297 log.debug("RCVD[%s]: %s", ssn.log_id, msg)
1298 ssn.incoming.append(msg)
1299
1301 dp = EMPTY_DP
1302 mp = EMPTY_MP
1303
1304 for h in xfr.headers:
1305 if isinstance(h, DeliveryProperties):
1306 dp = h
1307 elif isinstance(h, MessageProperties):
1308 mp = h
1309
1310 ap = mp.application_headers
1311 enc, dec = get_codec(mp.content_type)
1312 content = dec(xfr.payload)
1313 msg = Message(content)
1314 msg.id = mp.message_id
1315 if ap is not None:
1316 msg.subject = ap.get(SUBJECT)
1317 msg.user_id = mp.user_id
1318 if mp.reply_to is not None:
1319 msg.reply_to = reply_to2addr(mp.reply_to)
1320 msg.correlation_id = mp.correlation_id
1321 if dp.delivery_mode is not None:
1322 msg.durable = dp.delivery_mode == delivery_mode.persistent
1323 msg.priority = dp.priority
1324 if dp.ttl is not None:
1325 msg.ttl = dp.ttl/1000.0
1326 msg.redelivered = dp.redelivered
1327 msg.properties = mp.application_headers or {}
1328 if mp.app_id is not None:
1329 msg.properties["x-amqp-0-10.app-id"] = mp.app_id
1330 if mp.content_encoding is not None:
1331 msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding
1332 if dp.routing_key is not None:
1333 msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key
1334 if dp.timestamp is not None:
1335 msg.properties["x-amqp-0-10.timestamp"] = dp.timestamp
1336 msg.content_type = mp.content_type
1337 msg._transfer_id = xfr.id
1338 return msg
1339