1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import socket, struct, sys, time
21 from logging import getLogger, DEBUG
22 from qpid import compat
23 from qpid import sasl
24 from qpid.concurrency import synchronized
25 from qpid.datatypes import RangedSet, Serial
26 from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
27 FrameDecoder, SegmentDecoder, OpDecoder
28 from qpid.messaging import address, transports
29 from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
30 from qpid.messaging.exceptions import *
31 from qpid.messaging.message import get_codec, Disposition, Message
32 from qpid.ops import *
33 from qpid.selector import Selector
34 from qpid.util import URL, default
35 from qpid.validator import And, Context, List, Map, Types, Values
36 from threading import Condition, Thread
37
38 log = getLogger("qpid.messaging")
39 rawlog = getLogger("qpid.messaging.io.raw")
40 opslog = getLogger("qpid.messaging.io.ops")
43 name, subject, options = address.parse(addr)
44 if options:
45 type = options.get("node", {}).get("type")
46 else:
47 type = None
48
49 if type == "topic":
50 return ReplyTo(name, subject)
51 else:
52 return ReplyTo(None, name)
53
55 if reply_to.exchange in (None, ""):
56 return reply_to.routing_key
57 elif reply_to.routing_key is None:
58 return "%s; {node: {type: topic}}" % reply_to.exchange
59 else:
60 return "%s/%s; {node: {type: topic}}" % (reply_to.exchange, reply_to.routing_key)
61
66
67
68
69 DURABLE_DEFAULT=False
74 """
75 The pattern filter matches the supplied wildcard pattern against a
76 message subject.
77 """
78
81
82
83 - def _bind(self, sst, exchange, queue):
84 from qpid.ops import ExchangeBind
85
86 sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
87 binding_key=self.value.replace("*", "#")))
88
89 SUBJECT_DEFAULTS = {
90 "topic": "#"
91 }
92
93
94 ppid = 0
95 try:
96 ppid = os.getppid()
97 except:
98 pass
99
100 CLIENT_PROPERTIES = {"product": "qpid python client",
101 "version": "development",
102 "platform": os.name,
103 "qpid.client_process": os.path.basename(sys.argv[0]),
104 "qpid.client_pid": os.getpid(),
105 "qpid.client_ppid": ppid}
106
107 -def noop(): pass
109
111
112 - def __init__(self, driver, session, name, channel):
113 self.driver = driver
114 self.session = session
115 self.name = name
116 self.channel = channel
117 self.detached = False
118 self.committing = False
119 self.aborting = False
120
121
122 self.sent = Serial(0)
123 self.acknowledged = RangedSet()
124 self.actions = {}
125 self.min_completion = self.sent
126 self.max_completion = self.sent
127 self.results = {}
128 self.need_sync = False
129
130
131 self.received = None
132 self.executed = RangedSet()
133
134
135
136 self.destinations = {}
137
139 id = self.sent
140 self.write_cmd(query, lambda: handler(self.results.pop(id)))
141
143 for k, v in overrides.items():
144 cmd[k.replace('-', '_')] = v
145
146 - def write_cmd(self, cmd, action=noop, overrides=None, sync=True):
147 if overrides:
148 self.apply_overrides(cmd, overrides)
149
150 if action != noop:
151 cmd.sync = sync
152 if self.detached:
153 raise Exception("detached")
154 cmd.id = self.sent
155 self.sent += 1
156 self.actions[cmd.id] = action
157 self.max_completion = cmd.id
158 self.write_op(cmd)
159 self.need_sync = not cmd.sync
160
162 if cmds:
163 for cmd in cmds[:-1]:
164 self.write_cmd(cmd)
165 self.write_cmd(cmds[-1], action)
166 else:
167 action()
168
172
173 POLICIES = Values("always", "sender", "receiver", "never")
174 RELIABILITY = Values("unreliable", "at-most-once", "at-least-once",
175 "exactly-once")
176
177 DECLARE = Map({}, restricted=False)
178 BINDINGS = List(Map({
179 "exchange": Types(basestring),
180 "queue": Types(basestring),
181 "key": Types(basestring),
182 "arguments": Map({}, restricted=False)
183 }))
184
185 COMMON_OPTS = {
186 "create": POLICIES,
187 "delete": POLICIES,
188 "assert": POLICIES,
189 "node": Map({
190 "type": Values("queue", "topic"),
191 "durable": Types(bool),
192 "x-declare": DECLARE,
193 "x-bindings": BINDINGS
194 }),
195 "link": Map({
196 "name": Types(basestring),
197 "durable": Types(bool),
198 "reliability": RELIABILITY,
199 "x-declare": DECLARE,
200 "x-bindings": BINDINGS,
201 "x-subscribe": Map({}, restricted=False)
202 })
203 }
204
205 RECEIVE_MODES = Values("browse", "consume")
206
207 SOURCE_OPTS = COMMON_OPTS.copy()
208 SOURCE_OPTS.update({
209 "mode": RECEIVE_MODES
210 })
211
212 TARGET_OPTS = COMMON_OPTS.copy()
215
216 ADDR_NAME = "source"
217 DIR_NAME = "receiver"
218 VALIDATOR = Map(SOURCE_OPTS)
219
221 _rcv.destination = str(rcv.id)
222 sst.destinations[_rcv.destination] = _rcv
223 _rcv.draining = False
224 _rcv.bytes_open = False
225 _rcv.on_unlink = []
226
227 - def do_link(self, sst, rcv, _rcv, type, subtype, action):
228 link_opts = _rcv.options.get("link", {})
229 reliability = link_opts.get("reliability", "at-least-once")
230 declare = link_opts.get("x-declare", {})
231 subscribe = link_opts.get("x-subscribe", {})
232 acq_mode = acquire_mode.pre_acquired
233 if reliability in ("unreliable", "at-most-once"):
234 rcv._accept_mode = accept_mode.none
235 else:
236 rcv._accept_mode = accept_mode.explicit
237
238 if type == "topic":
239 default_name = "%s.%s" % (rcv.session.name, _rcv.destination)
240 _rcv._queue = link_opts.get("name", default_name)
241 sst.write_cmd(QueueDeclare(queue=_rcv._queue,
242 durable=link_opts.get("durable", False),
243 exclusive=True,
244 auto_delete=(reliability == "unreliable")),
245 overrides=declare)
246 _rcv.on_unlink = [QueueDelete(_rcv._queue)]
247 subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype)
248 bindings = get_bindings(link_opts, _rcv._queue, _rcv.name, subject)
249 if not bindings:
250 sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject))
251
252 elif type == "queue":
253 _rcv._queue = _rcv.name
254 if _rcv.options.get("mode", "consume") == "browse":
255 acq_mode = acquire_mode.not_acquired
256 bindings = get_bindings(link_opts, queue=_rcv._queue)
257
258
259 sst.write_cmds(bindings)
260 sst.write_cmd(MessageSubscribe(queue=_rcv._queue,
261 destination=_rcv.destination,
262 acquire_mode = acq_mode,
263 accept_mode = rcv._accept_mode),
264 overrides=subscribe)
265 sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action)
266
268 link_opts = _rcv.options.get("link", {})
269 reliability = link_opts.get("reliability")
270 cmds = [MessageCancel(_rcv.destination)]
271 cmds.extend(_rcv.on_unlink)
272 sst.write_cmds(cmds, action)
273
275 del sst.destinations[_rcv.destination]
276
278
279 ADDR_NAME = "target"
280 DIR_NAME = "sender"
281 VALIDATOR = Map(TARGET_OPTS)
282
284 _snd.closing = False
285 _snd.pre_ack = False
286
287 - def do_link(self, sst, snd, _snd, type, subtype, action):
288 link_opts = _snd.options.get("link", {})
289 reliability = link_opts.get("reliability", "at-least-once")
290 _snd.pre_ack = reliability in ("unreliable", "at-most-once")
291 if type == "topic":
292 _snd._exchange = _snd.name
293 _snd._routing_key = _snd.subject
294 bindings = get_bindings(link_opts, exchange=_snd.name, key=_snd.subject)
295 elif type == "queue":
296 _snd._exchange = ""
297 _snd._routing_key = _snd.name
298 bindings = get_bindings(link_opts, queue=_snd.name)
299 sst.write_cmds(bindings, action)
300
303
306
308
310 self.ttl = ttl
311 self.entries = {}
312
314 self.entries[key] = time.time(), value
315
317 tstamp, value = self.entries[key]
318 if time.time() - tstamp >= self.ttl:
319 del self.entries[key]
320 raise KeyError(key)
321 else:
322 return value
323
325 del self.entries[key]
326
327
328 HEADER="!4s4B"
329
330 EMPTY_DP = DeliveryProperties()
331 EMPTY_MP = MessageProperties()
332
333 SUBJECT = "qpid.subject"
334
335 CLOSED = "CLOSED"
336 READ_ONLY = "READ_ONLY"
337 WRITE_ONLY = "WRITE_ONLY"
338 OPEN = "OPEN"
341
343 self.connection = connection
344 self.log_id = "%x" % id(self.connection)
345 self._lock = self.connection._lock
346
347 self._selector = Selector.default()
348 self._attempts = 0
349 self._delay = self.connection.reconnect_interval_min
350 self._reconnect_log = self.connection.reconnect_log
351 self._host = 0
352 self._retrying = False
353 self._next_retry = None
354 self._transport = None
355
356 self._timeout = None
357
358 self.engine = None
359
361 urls = [URL(u) for u in self.connection.reconnect_urls]
362 hosts = [(self.connection.host, default(self.connection.port, 5672))] + \
363 [(u.host, default(u.port, 5672)) for u in urls]
364 if self._host >= len(hosts):
365 self._host = 0
366 result = hosts[self._host]
367 if self._host == 0:
368 self._attempts += 1
369 self._host = self._host + 1
370 return result
371
373 return len(self.connection.reconnect_urls) + 1
374
375 @synchronized
379
381 self._selector.register(self)
382
384 self._selector.unregister(self)
385 if self._transport:
386 self.st_closed()
387
389 return self._transport.fileno()
390
391 @synchronized
393 return self._transport is not None and \
394 self._transport.reading(True)
395
396 @synchronized
398 return self._transport is not None and \
399 self._transport.writing(self.engine.pending())
400
401 @synchronized
404
405 @synchronized
422
424 if self.connection.error:
425 self.connection._condition.gc()
426 self.connection._waiter.notifyAll()
427
429 if e is None:
430 e = ConnectionError(text="connection aborted")
431
432 if (self.connection.reconnect and
433 (self.connection.reconnect_limit is None or
434 self.connection.reconnect_limit <= 0 or
435 self._attempts <= self.connection.reconnect_limit)):
436 if self._host < self._num_hosts():
437 delay = 0
438 else:
439 delay = self._delay
440 self._delay = min(2*self._delay,
441 self.connection.reconnect_interval_max)
442 self._next_retry = time.time() + delay
443 if self._reconnect_log:
444 log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
445 if delay > 0:
446 log.warn("sleeping %s seconds" % delay)
447 self._retrying = True
448 self.engine.close()
449 else:
450 self.engine.close(e)
451
452 self.schedule()
453
457
459
460
461 self._transport.close()
462 self._transport = None
463 self.engine = None
464 return True
465
468
469 @synchronized
471 notify = False
472 try:
473 n = self._transport.send(self.engine.peek())
474 if n == 0: return
475 sent = self.engine.read(n)
476 rawlog.debug("SENT[%s]: %r", self.log_id, sent)
477 except socket.error, e:
478 self.close_engine(e)
479 notify = True
480
481 if self.update_status() or notify:
482 self._notify()
483
484 @synchronized
489
491 times = []
492 if self.connection.heartbeat:
493 times.append(time.time() + self.connection.heartbeat)
494 if self._next_retry:
495 times.append(self._next_retry)
496 if times:
497 self._timeout = min(times)
498 else:
499 self._timeout = None
500
502 try:
503 if self._transport is None:
504 if self.connection._connected and not self.connection.error:
505 self.connect()
506 else:
507 self.engine.dispatch()
508 except HeartbeatTimeout, e:
509 self.close_engine(e)
510 except:
511
512 msg = compat.format_exc()
513 self.connection.error = InternalError(text=msg)
514
516 if self._retrying and time.time() < self._next_retry:
517 return
518
519 try:
520
521 host, port = self._next_host()
522 if self._retrying and self._reconnect_log:
523 log.warn("trying: %s:%s", host, port)
524 self.engine = Engine(self.connection)
525 self.engine.open()
526 rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
527 trans = transports.TRANSPORTS.get(self.connection.transport)
528 if trans:
529 self._transport = trans(self.connection, host, port)
530 else:
531 raise ConnectError("no such transport: %s" % self.connection.transport)
532 if self._retrying and self._reconnect_log:
533 log.warn("reconnect succeeded: %s:%s", host, port)
534 self._next_retry = None
535 self._attempts = 0
536 self._host = 0
537 self._delay = self.connection.reconnect_interval_min
538 self._retrying = False
539 self.schedule()
540 except socket.error, e:
541 self.close_engine(ConnectError(text=str(e)))
542
543 DEFAULT_DISPOSITION = Disposition(None)
544
545 -def get_bindings(opts, queue=None, exchange=None, key=None):
546 bindings = opts.get("x-bindings", [])
547 cmds = []
548 for b in bindings:
549 exchange = b.get("exchange", exchange)
550 queue = b.get("queue", queue)
551 key = b.get("key", key)
552 args = b.get("arguments", {})
553 cmds.append(ExchangeBind(queue, exchange, key, args))
554 return cmds
555
556 CONNECTION_ERRS = {
557
558
559 }
560
561 SESSION_ERRS = {
562
563 error_code.unauthorized_access: UnauthorizedAccess,
564 error_code.not_found: NotFound,
565 error_code.resource_locked: ReceiverError,
566 error_code.resource_limit_exceeded: TargetCapacityExceeded,
567 error_code.internal_error: ServerError
568 }
571
573 self.connection = connection
574 self.log_id = "%x" % id(self.connection)
575 self._closing = False
576 self._connected = False
577 self._attachments = {}
578
579 self._in = LinkIn()
580 self._out = LinkOut()
581
582 self._channel_max = 65536
583 self._channels = 0
584 self._sessions = {}
585
586 self.address_cache = Cache(self.connection.address_ttl)
587
588 self._status = CLOSED
589 self._buf = ""
590 self._hdr = ""
591 self._last_in = None
592 self._last_out = None
593 self._op_enc = OpEncoder()
594 self._seg_enc = SegmentEncoder()
595 self._frame_enc = FrameEncoder()
596 self._frame_dec = FrameDecoder()
597 self._seg_dec = SegmentDecoder()
598 self._op_dec = OpDecoder()
599
600 self._sasl = sasl.Client()
601 if self.connection.username:
602 self._sasl.setAttr("username", self.connection.username)
603 if self.connection.password:
604 self._sasl.setAttr("password", self.connection.password)
605 if self.connection.host:
606 self._sasl.setAttr("host", self.connection.host)
607 self._sasl.setAttr("service", self.connection.sasl_service)
608 if self.connection.sasl_min_ssf is not None:
609 self._sasl.setAttr("minssf", self.connection.sasl_min_ssf)
610 if self.connection.sasl_max_ssf is not None:
611 self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf)
612 self._sasl.init()
613 self._sasl_encode = False
614 self._sasl_decode = False
615
617 self.connection._transport_connected = False
618
619 for ssn in self.connection.sessions.values():
620 for m in ssn.acked + ssn.unacked + ssn.incoming:
621 m._transfer_id = None
622 for snd in ssn.senders:
623 snd.linked = False
624 for rcv in ssn.receivers:
625 rcv.impending = rcv.received
626 rcv.linked = False
627
630
632 self._last_in = time.time()
633 try:
634 if self._sasl_decode:
635 data = self._sasl.decode(data)
636
637 if len(self._hdr) < 8:
638 r = 8 - len(self._hdr)
639 self._hdr += data[:r]
640 data = data[r:]
641
642 if len(self._hdr) == 8:
643 self.do_header(self._hdr)
644
645 self._frame_dec.write(data)
646 self._seg_dec.write(*self._frame_dec.read())
647 self._op_dec.write(*self._seg_dec.read())
648 for op in self._op_dec.read():
649 self.assign_id(op)
650 opslog.debug("RCVD[%s]: %r", self.log_id, op)
651 op.dispatch(self)
652 self.dispatch()
653 except MessagingError, e:
654 self.close(e)
655 except:
656 self.close(InternalError(text=compat.format_exc()))
657
658 - def close(self, e=None):
659 self._reset()
660 if e:
661 self.connection.error = e
662 self._status = CLOSED
663
665 if isinstance(op, Command):
666 sst = self.get_sst(op)
667 op.id = sst.received
668 sst.received += 1
669
671 return len(self._buf)
672
674 result = self._buf[:n]
675 self._buf = self._buf[n:]
676 return result
677
680
682 opslog.debug("SENT[%s]: %r", self.log_id, op)
683 self._op_enc.write(op)
684 self._seg_enc.write(*self._op_enc.read())
685 self._frame_enc.write(*self._seg_enc.read())
686 bytes = self._frame_enc.read()
687 if self._sasl_encode:
688 bytes = self._sasl.encode(bytes)
689 self._buf += bytes
690 self._last_out = time.time()
691
693 cli_major = 0; cli_minor = 10
694 magic, _, _, major, minor = struct.unpack(HEADER, hdr)
695 if major != cli_major or minor != cli_minor:
696 raise VersionError(text="client: %s-%s, server: %s-%s" %
697 (cli_major, cli_minor, major, minor))
698
700 if self.connection.sasl_mechanisms:
701 permitted = self.connection.sasl_mechanisms.split()
702 mechs = [m for m in start.mechanisms if m in permitted]
703 else:
704 mechs = start.mechanisms
705 try:
706 mech, initial = self._sasl.start(" ".join(mechs))
707 except sasl.SASLError, e:
708 raise AuthenticationFailure(text=str(e))
709 self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES,
710 mechanism=mech, response=initial))
711
713 resp = self._sasl.step(secure.challenge)
714 self.write_op(ConnectionSecureOk(response=resp))
715
717
718 if tune.channel_max is not None:
719 self.channel_max = tune.channel_max
720 self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat,
721 channel_max=self.channel_max))
722 self.write_op(ConnectionOpen())
723 self._sasl_encode = True
724
726 self.connection.auth_username = self._sasl.auth_username()
727 self._connected = True
728 self._sasl_decode = True
729 self.connection._transport_connected = True
730
733
739
740
741
742
743
744
747
750
752 sst = self.get_sst(cp)
753 sst.received = cp.command_id
754
756 sst = self.get_sst(sc)
757 for r in sc.commands:
758 sst.acknowledged.add(r.lower, r.upper)
759
760 if not sc.commands.empty():
761 while sst.min_completion in sc.commands:
762 if sst.actions.has_key(sst.min_completion):
763 sst.actions.pop(sst.min_completion)()
764 sst.min_completion += 1
765
767 sst = self.get_sst(kcmp)
768 executed = RangedSet()
769 for e in sst.executed.ranges:
770 for ke in kcmp.ranges:
771 if e.lower in ke and e.upper in ke:
772 break
773 else:
774 executed.add_range(e)
775 sst.executed = completed
776
778 sst = self.get_sst(sf)
779 if sf.expected:
780 if sst.received is None:
781 exp = None
782 else:
783 exp = RangedSet(sst.received)
784 sst.write_op(SessionExpected(exp))
785 if sf.confirmed:
786 sst.write_op(SessionConfirmed(sst.executed))
787 if sf.completed:
788 sst.write_op(SessionCompleted(sst.executed))
789
793
795 sst = self.get_sst(er)
796 sst.results[er.command_id] = er.value
797 sst.executed.add(er.id)
798
803
805 if not self.connection._connected and not self._closing and self._status != CLOSED:
806 self.disconnect()
807
808 if self._connected and not self._closing:
809 for ssn in self.connection.sessions.values():
810 self.attach(ssn)
811 self.process(ssn)
812
813 if self.connection.heartbeat and self._status != CLOSED:
814 now = time.time()
815 if self._last_in is not None and \
816 now - self._last_in > 2*self.connection.heartbeat:
817 raise HeartbeatTimeout(text="heartbeat timeout")
818 if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0:
819 self.write_op(ConnectionHeartbeat())
820
822 self._reset()
823 self._status = OPEN
824 self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
825
827 self.write_op(ConnectionClose(close_code.normal))
828 self._closing = True
829
831 sst = self._attachments.get(ssn)
832 if sst is None and not ssn.closed:
833 for i in xrange(0, self.channel_max):
834 if not self._sessions.has_key(i):
835 ch = i
836 break
837 else:
838 raise RuntimeError("all channels used")
839 sst = SessionState(self, ssn, ssn.name, ch)
840 sst.write_op(SessionAttach(name=ssn.name))
841 sst.write_op(SessionCommandPoint(sst.sent, 0))
842 sst.outgoing_idx = 0
843 sst.acked = []
844 sst.acked_idx = 0
845 if ssn.transactional:
846 sst.write_cmd(TxSelect())
847 self._attachments[ssn] = sst
848 self._sessions[sst.channel] = sst
849
850 for snd in ssn.senders:
851 self.link(snd, self._out, snd.target)
852 for rcv in ssn.receivers:
853 self.link(rcv, self._in, rcv.source)
854
855 if sst is not None and ssn.closing and not sst.detached:
856 sst.detached = True
857 sst.write_op(SessionDetach(name=ssn.name))
858
860 return self._sessions[op.channel]
861
863 sst = self._sessions.pop(dtc.channel)
864 ssn = sst.session
865 del self._attachments[ssn]
866 ssn.closed = True
867
872
873 - def link(self, lnk, dir, addr):
874 sst = self._attachments.get(lnk.session)
875 _lnk = self._attachments.get(lnk)
876
877 if _lnk is None and not lnk.closed:
878 _lnk = Attachment(lnk)
879 _lnk.closing = False
880 dir.init_link(sst, lnk, _lnk)
881
882 err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir)
883 if err:
884 lnk.error = err
885 lnk.closed = True
886 return
887
888 def linked():
889 lnk.linked = True
890
891 def resolved(type, subtype):
892 dir.do_link(sst, lnk, _lnk, type, subtype, linked)
893
894 self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved)
895 self._attachments[lnk] = _lnk
896
897 if lnk.linked and lnk.closing and not lnk.closed:
898 if not _lnk.closing:
899 def unlinked():
900 dir.del_link(sst, lnk, _lnk)
901 del self._attachments[lnk]
902 lnk.closed = True
903 if _lnk.options.get("delete") in ("always", dir.DIR_NAME):
904 dir.do_unlink(sst, lnk, _lnk)
905 self.delete(sst, _lnk.name, unlinked)
906 else:
907 dir.do_unlink(sst, lnk, _lnk, unlinked)
908 _lnk.closing = True
909 elif not lnk.linked and lnk.closing and not lnk.closed:
910 if lnk.error: lnk.closed = True
911
925
927 ctx = Context()
928 err = dir.VALIDATOR.validate(lnk.options, ctx)
929 if err: return InvalidOption(text="error in options: %s" % err)
930
932 declare = lnk.options.get("create") in ("always", dir)
933 assrt = lnk.options.get("assert") in ("always", dir)
934 def do_resolved(type, subtype):
935 err = None
936 if type is None:
937 if declare:
938 err = self.declare(sst, lnk, action)
939 else:
940 err = NotFound(text="no such queue: %s" % lnk.name)
941 else:
942 if assrt:
943 expected = lnk.options.get("node", {}).get("type")
944 if expected and type != expected:
945 err = AssertionFailed(text="expected %s, got %s" % (expected, type))
946 if err is None:
947 action(type, subtype)
948
949 if err:
950 tgt = lnk.target
951 tgt.error = err
952 del self._attachments[tgt]
953 tgt.closed = True
954 return
955 self.resolve(sst, lnk.name, do_resolved, force=declare)
956
957 - def resolve(self, sst, name, action, force=False):
958 if not force:
959 try:
960 type, subtype = self.address_cache[name]
961 action(type, subtype)
962 return
963 except KeyError:
964 pass
965
966 args = []
967 def do_result(r):
968 args.append(r)
969 def do_action(r):
970 do_result(r)
971 er, qr = args
972 if er.not_found and not qr.queue:
973 type, subtype = None, None
974 elif qr.queue:
975 type, subtype = "queue", None
976 else:
977 type, subtype = "topic", er.type
978 if type is not None:
979 self.address_cache[name] = (type, subtype)
980 action(type, subtype)
981 sst.write_query(ExchangeQuery(name), do_result)
982 sst.write_query(QueueQuery(name), do_action)
983
984 - def declare(self, sst, lnk, action):
985 name = lnk.name
986 props = lnk.options.get("node", {})
987 durable = props.get("durable", DURABLE_DEFAULT)
988 type = props.get("type", "queue")
989 declare = props.get("x-declare", {})
990
991 if type == "topic":
992 cmd = ExchangeDeclare(exchange=name, durable=durable)
993 bindings = get_bindings(props, exchange=name)
994 elif type == "queue":
995 cmd = QueueDeclare(queue=name, durable=durable)
996 bindings = get_bindings(props, queue=name)
997 else:
998 raise ValueError(type)
999
1000 sst.apply_overrides(cmd, declare)
1001
1002 if type == "topic":
1003 if cmd.type is None:
1004 cmd.type = "topic"
1005 subtype = cmd.type
1006 else:
1007 subtype = None
1008
1009 cmds = [cmd]
1010 cmds.extend(bindings)
1011
1012 def declared():
1013 self.address_cache[name] = (type, subtype)
1014 action(type, subtype)
1015
1016 sst.write_cmds(cmds, declared)
1017
1018 - def delete(self, sst, name, action):
1019 def deleted():
1020 del self.address_cache[name]
1021 action()
1022
1023 def do_delete(type, subtype):
1024 if type == "topic":
1025 sst.write_cmd(ExchangeDelete(name), deleted)
1026 elif type == "queue":
1027 sst.write_cmd(QueueDelete(name), deleted)
1028 elif type is None:
1029 action()
1030 else:
1031 raise ValueError(type)
1032 self.resolve(sst, name, do_delete, force=True)
1033
1035 if ssn.closed or ssn.closing: return
1036
1037 sst = self._attachments[ssn]
1038
1039 while sst.outgoing_idx < len(ssn.outgoing):
1040 msg = ssn.outgoing[sst.outgoing_idx]
1041 snd = msg._sender
1042
1043 _snd = self._attachments.get(snd)
1044 if _snd and snd.linked:
1045 self.send(snd, msg)
1046 sst.outgoing_idx += 1
1047 else:
1048 break
1049
1050 for snd in ssn.senders:
1051
1052 if snd.synced >= snd.queued and sst.need_sync:
1053 sst.write_cmd(ExecutionSync(), sync_noop)
1054
1055 for rcv in ssn.receivers:
1056 self.process_receiver(rcv)
1057
1058 if ssn.acked:
1059 messages = ssn.acked[sst.acked_idx:]
1060 if messages:
1061 ids = RangedSet()
1062
1063 disposed = [(DEFAULT_DISPOSITION, [])]
1064 acked = []
1065 for m in messages:
1066
1067
1068 if m._transfer_id is None:
1069 acked.append(m)
1070 continue
1071 ids.add(m._transfer_id)
1072 if m._receiver._accept_mode is accept_mode.explicit:
1073 disp = m._disposition or DEFAULT_DISPOSITION
1074 last, msgs = disposed[-1]
1075 if disp.type is last.type and disp.options == last.options:
1076 msgs.append(m)
1077 else:
1078 disposed.append((disp, [m]))
1079 else:
1080 acked.append(m)
1081
1082 for range in ids:
1083 sst.executed.add_range(range)
1084 sst.write_op(SessionCompleted(sst.executed))
1085
1086 def ack_acker(msgs):
1087 def ack_ack():
1088 for m in msgs:
1089 ssn.acked.remove(m)
1090 sst.acked_idx -= 1
1091
1092 if not ssn.transactional:
1093 sst.acked.remove(m)
1094 return ack_ack
1095
1096 for disp, msgs in disposed:
1097 if not msgs: continue
1098 if disp.type is None:
1099 op = MessageAccept
1100 elif disp.type is RELEASED:
1101 op = MessageRelease
1102 elif disp.type is REJECTED:
1103 op = MessageReject
1104 sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
1105 **disp.options),
1106 ack_acker(msgs))
1107 if log.isEnabledFor(DEBUG):
1108 for m in msgs:
1109 log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
1110
1111 sst.acked.extend(messages)
1112 sst.acked_idx += len(messages)
1113 ack_acker(acked)()
1114
1115 if ssn.committing and not sst.committing:
1116 def commit_ok():
1117 del sst.acked[:]
1118 ssn.committing = False
1119 ssn.committed = True
1120 ssn.aborting = False
1121 ssn.aborted = False
1122 sst.committing = False
1123 sst.write_cmd(TxCommit(), commit_ok)
1124 sst.committing = True
1125
1126 if ssn.aborting and not sst.aborting:
1127 sst.aborting = True
1128 def do_rb():
1129 messages = sst.acked + ssn.unacked + ssn.incoming
1130 ids = RangedSet(*[m._transfer_id for m in messages])
1131 for range in ids:
1132 sst.executed.add_range(range)
1133 sst.write_op(SessionCompleted(sst.executed))
1134 sst.write_cmd(MessageRelease(ids, True))
1135 sst.write_cmd(TxRollback(), do_rb_ok)
1136
1137 def do_rb_ok():
1138 del ssn.incoming[:]
1139 del ssn.unacked[:]
1140 del sst.acked[:]
1141
1142 for rcv in ssn.receivers:
1143 rcv.impending = rcv.received
1144 rcv.returned = rcv.received
1145
1146
1147 for rcv in ssn.receivers:
1148 self.process_receiver(rcv)
1149
1150 ssn.aborting = False
1151 ssn.aborted = True
1152 ssn.committing = False
1153 ssn.committed = False
1154 sst.aborting = False
1155
1156 for rcv in ssn.receivers:
1157 _rcv = self._attachments[rcv]
1158 sst.write_cmd(MessageStop(_rcv.destination))
1159 sst.write_cmd(ExecutionSync(), do_rb)
1160
1162 sst = self._attachments[rcv.session]
1163 _rcv = self._attachments.get(rcv)
1164 if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining:
1165 return
1166
1167 if rcv.granted is UNLIMITED:
1168 if rcv.impending is UNLIMITED:
1169 delta = 0
1170 else:
1171 delta = UNLIMITED
1172 elif rcv.impending is UNLIMITED:
1173 delta = -1
1174 else:
1175 delta = max(rcv.granted, rcv.received) - rcv.impending
1176
1177 if delta is UNLIMITED:
1178 if not _rcv.bytes_open:
1179 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
1180 _rcv.bytes_open = True
1181 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
1182 rcv.impending = UNLIMITED
1183 elif delta > 0:
1184 if not _rcv.bytes_open:
1185 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
1186 _rcv.bytes_open = True
1187 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
1188 rcv.impending += delta
1189 elif delta < 0 and not rcv.draining:
1190 _rcv.draining = True
1191 def do_stop():
1192 rcv.impending = rcv.received
1193 _rcv.draining = False
1194 _rcv.bytes_open = False
1195 self.grant(rcv)
1196 sst.write_cmd(MessageStop(_rcv.destination), do_stop)
1197
1198 if rcv.draining:
1199 _rcv.draining = True
1200 def do_flush():
1201 rcv.impending = rcv.received
1202 rcv.granted = rcv.impending
1203 _rcv.draining = False
1204 _rcv.bytes_open = False
1205 rcv.draining = False
1206 sst.write_cmd(MessageFlush(_rcv.destination), do_flush)
1207
1208
1210 if rcv.closed: return
1211 self.grant(rcv)
1212
1213 - def send(self, snd, msg):
1214 sst = self._attachments[snd.session]
1215 _snd = self._attachments[snd]
1216
1217 if msg.subject is None or _snd._exchange == "":
1218 rk = _snd._routing_key
1219 else:
1220 rk = msg.subject
1221
1222 if msg.subject is None:
1223 subject = _snd.subject
1224 else:
1225 subject = msg.subject
1226
1227
1228 if msg.reply_to:
1229 rt = addr2reply_to(msg.reply_to)
1230 else:
1231 rt = None
1232 content_encoding = msg.properties.get("x-amqp-0-10.content-encoding")
1233 dp = DeliveryProperties(routing_key=rk)
1234 mp = MessageProperties(message_id=msg.id,
1235 user_id=msg.user_id,
1236 reply_to=rt,
1237 correlation_id=msg.correlation_id,
1238 app_id = msg.properties.get("x-amqp-0-10.app-id"),
1239 content_type=msg.content_type,
1240 content_encoding=content_encoding,
1241 application_headers=msg.properties)
1242 if subject is not None:
1243 if mp.application_headers is None:
1244 mp.application_headers = {}
1245 mp.application_headers[SUBJECT] = subject
1246 if msg.durable is not None:
1247 if msg.durable:
1248 dp.delivery_mode = delivery_mode.persistent
1249 else:
1250 dp.delivery_mode = delivery_mode.non_persistent
1251 if msg.priority is not None:
1252 dp.priority = msg.priority
1253 if msg.ttl is not None:
1254 dp.ttl = long(msg.ttl*1000)
1255 enc, dec = get_codec(msg.content_type)
1256 body = enc(msg.content)
1257
1258
1259 def msg_acked():
1260
1261 snd.acked += 1
1262 m = snd.session.outgoing.pop(0)
1263 sst.outgoing_idx -= 1
1264 log.debug("RACK[%s]: %s", sst.session.log_id, msg)
1265 assert msg == m
1266
1267 xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
1268 payload=body)
1269
1270 if _snd.pre_ack:
1271 sst.write_cmd(xfr)
1272 else:
1273 sst.write_cmd(xfr, msg_acked, sync=msg._sync)
1274
1275 log.debug("SENT[%s]: %s", sst.session.log_id, msg)
1276
1277 if _snd.pre_ack:
1278 msg_acked()
1279
1281 sst = self.get_sst(xfr)
1282 ssn = sst.session
1283
1284 msg = self._decode(xfr)
1285 rcv = sst.destinations[xfr.destination].target
1286 msg._receiver = rcv
1287 if rcv.impending is not UNLIMITED:
1288 assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
1289 rcv.received += 1
1290 log.debug("RCVD[%s]: %s", ssn.log_id, msg)
1291 ssn.incoming.append(msg)
1292
1294 dp = EMPTY_DP
1295 mp = EMPTY_MP
1296
1297 for h in xfr.headers:
1298 if isinstance(h, DeliveryProperties):
1299 dp = h
1300 elif isinstance(h, MessageProperties):
1301 mp = h
1302
1303 ap = mp.application_headers
1304 enc, dec = get_codec(mp.content_type)
1305 content = dec(xfr.payload)
1306 msg = Message(content)
1307 msg.id = mp.message_id
1308 if ap is not None:
1309 msg.subject = ap.get(SUBJECT)
1310 msg.user_id = mp.user_id
1311 if mp.reply_to is not None:
1312 msg.reply_to = reply_to2addr(mp.reply_to)
1313 msg.correlation_id = mp.correlation_id
1314 if dp.delivery_mode is not None:
1315 msg.durable = dp.delivery_mode == delivery_mode.persistent
1316 msg.priority = dp.priority
1317 if dp.ttl is not None:
1318 msg.ttl = dp.ttl/1000.0
1319 msg.redelivered = dp.redelivered
1320 msg.properties = mp.application_headers or {}
1321 if mp.app_id is not None:
1322 msg.properties["x-amqp-0-10.app-id"] = mp.app_id
1323 if mp.content_encoding is not None:
1324 msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding
1325 if dp.routing_key is not None:
1326 msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key
1327 msg.content_type = mp.content_type
1328 msg._transfer_id = xfr.id
1329 return msg
1330