1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import socket, struct, sys, time
21 from logging import getLogger, DEBUG
22 from qpid import compat
23 from qpid import sasl
24 from qpid.concurrency import synchronized
25 from qpid.datatypes import RangedSet, Serial
26 from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
27 FrameDecoder, SegmentDecoder, OpDecoder
28 from qpid.messaging import address, transports
29 from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
30 from qpid.messaging.exceptions import *
31 from qpid.messaging.message import get_codec, Disposition, Message
32 from qpid.ops import *
33 from qpid.selector import Selector
34 from qpid.util import URL, default
35 from qpid.validator import And, Context, List, Map, Types, Values
36 from threading import Condition, Thread
37
38 log = getLogger("qpid.messaging")
39 rawlog = getLogger("qpid.messaging.io.raw")
40 opslog = getLogger("qpid.messaging.io.ops")
43 name, subject, options = address.parse(addr)
44 if options:
45 type = options.get("node", {}).get("type")
46 else:
47 type = None
48
49 if type == "topic":
50 return ReplyTo(name, subject)
51 else:
52 return ReplyTo(None, name)
53
55 if reply_to.exchange in (None, ""):
56 return reply_to.routing_key
57 elif reply_to.routing_key is None:
58 return "%s; {node: {type: topic}}" % reply_to.exchange
59 else:
60 return "%s/%s; {node: {type: topic}}" % (reply_to.exchange, reply_to.routing_key)
61
66
67
68
69 DURABLE_DEFAULT=False
74 """
75 The pattern filter matches the supplied wildcard pattern against a
76 message subject.
77 """
78
81
82
83 - def _bind(self, sst, exchange, queue):
84 from qpid.ops import ExchangeBind
85
86 sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
87 binding_key=self.value.replace("*", "#")))
88
89 SUBJECT_DEFAULTS = {
90 "topic": "#"
91 }
92
93
94 ppid = 0
95 try:
96 ppid = os.getppid()
97 except:
98 pass
99
100 CLIENT_PROPERTIES = {"product": "qpid python client",
101 "version": "development",
102 "platform": os.name,
103 "qpid.client_process": os.path.basename(sys.argv[0]),
104 "qpid.client_pid": os.getpid(),
105 "qpid.client_ppid": ppid}
106
107 -def noop(): pass
109
111
112 - def __init__(self, driver, session, name, channel):
113 self.driver = driver
114 self.session = session
115 self.name = name
116 self.channel = channel
117 self.detached = False
118 self.committing = False
119 self.aborting = False
120
121
122 self.sent = Serial(0)
123 self.acknowledged = RangedSet()
124 self.actions = {}
125 self.min_completion = self.sent
126 self.max_completion = self.sent
127 self.results = {}
128 self.need_sync = False
129
130
131 self.received = None
132 self.executed = RangedSet()
133
134
135
136 self.destinations = {}
137
139 id = self.sent
140 self.write_cmd(query, lambda: handler(self.results.pop(id)))
141
143 for k, v in overrides.items():
144 cmd[k.replace('-', '_')] = v
145
146 - def write_cmd(self, cmd, action=noop, overrides=None, sync=True):
147 if overrides:
148 self.apply_overrides(cmd, overrides)
149
150 if action != noop:
151 cmd.sync = sync
152 if self.detached:
153 raise Exception("detached")
154 cmd.id = self.sent
155 self.sent += 1
156 self.actions[cmd.id] = action
157 self.max_completion = cmd.id
158 self.write_op(cmd)
159 self.need_sync = not cmd.sync
160
162 if cmds:
163 for cmd in cmds[:-1]:
164 self.write_cmd(cmd)
165 self.write_cmd(cmds[-1], action)
166 else:
167 action()
168
172
173 POLICIES = Values("always", "sender", "receiver", "never")
174 RELIABILITY = Values("unreliable", "at-most-once", "at-least-once",
175 "exactly-once")
176
177 DECLARE = Map({}, restricted=False)
178 BINDINGS = List(Map({
179 "exchange": Types(basestring),
180 "queue": Types(basestring),
181 "key": Types(basestring),
182 "arguments": Map({}, restricted=False)
183 }))
184
185 COMMON_OPTS = {
186 "create": POLICIES,
187 "delete": POLICIES,
188 "assert": POLICIES,
189 "node": Map({
190 "type": Values("queue", "topic"),
191 "durable": Types(bool),
192 "x-declare": DECLARE,
193 "x-bindings": BINDINGS
194 }),
195 "link": Map({
196 "name": Types(basestring),
197 "durable": Types(bool),
198 "reliability": RELIABILITY,
199 "x-declare": DECLARE,
200 "x-bindings": BINDINGS,
201 "x-subscribe": Map({}, restricted=False)
202 })
203 }
204
205 RECEIVE_MODES = Values("browse", "consume")
206
207 SOURCE_OPTS = COMMON_OPTS.copy()
208 SOURCE_OPTS.update({
209 "mode": RECEIVE_MODES
210 })
211
212 TARGET_OPTS = COMMON_OPTS.copy()
215
216 ADDR_NAME = "source"
217 DIR_NAME = "receiver"
218 VALIDATOR = Map(SOURCE_OPTS)
219
221 _rcv.destination = str(rcv.id)
222 sst.destinations[_rcv.destination] = _rcv
223 _rcv.draining = False
224 _rcv.bytes_open = False
225 _rcv.on_unlink = []
226
227 - def do_link(self, sst, rcv, _rcv, type, subtype, action):
228 link_opts = _rcv.options.get("link", {})
229 reliability = link_opts.get("reliability", "at-least-once")
230 declare = link_opts.get("x-declare", {})
231 subscribe = link_opts.get("x-subscribe", {})
232 acq_mode = acquire_mode.pre_acquired
233 if reliability in ("unreliable", "at-most-once"):
234 rcv._accept_mode = accept_mode.none
235 else:
236 rcv._accept_mode = accept_mode.explicit
237
238 if type == "topic":
239 default_name = "%s.%s" % (rcv.session.name, _rcv.destination)
240 _rcv._queue = link_opts.get("name", default_name)
241 sst.write_cmd(QueueDeclare(queue=_rcv._queue,
242 durable=link_opts.get("durable", False),
243 exclusive=True,
244 auto_delete=(reliability == "unreliable")),
245 overrides=declare)
246 _rcv.on_unlink = [QueueDelete(_rcv._queue)]
247 subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype)
248 bindings = get_bindings(link_opts, _rcv._queue, _rcv.name, subject)
249 if not bindings:
250 sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject))
251
252 elif type == "queue":
253 _rcv._queue = _rcv.name
254 if _rcv.options.get("mode", "consume") == "browse":
255 acq_mode = acquire_mode.not_acquired
256 bindings = get_bindings(link_opts, queue=_rcv._queue)
257
258
259 sst.write_cmds(bindings)
260 sst.write_cmd(MessageSubscribe(queue=_rcv._queue,
261 destination=_rcv.destination,
262 acquire_mode = acq_mode,
263 accept_mode = rcv._accept_mode),
264 overrides=subscribe)
265 sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action)
266
268 link_opts = _rcv.options.get("link", {})
269 reliability = link_opts.get("reliability")
270 cmds = [MessageCancel(_rcv.destination)]
271 cmds.extend(_rcv.on_unlink)
272 sst.write_cmds(cmds, action)
273
275 del sst.destinations[_rcv.destination]
276
278
279 ADDR_NAME = "target"
280 DIR_NAME = "sender"
281 VALIDATOR = Map(TARGET_OPTS)
282
284 _snd.closing = False
285 _snd.pre_ack = False
286
287 - def do_link(self, sst, snd, _snd, type, subtype, action):
288 link_opts = _snd.options.get("link", {})
289 reliability = link_opts.get("reliability", "at-least-once")
290 _snd.pre_ack = reliability in ("unreliable", "at-most-once")
291 if type == "topic":
292 _snd._exchange = _snd.name
293 _snd._routing_key = _snd.subject
294 bindings = get_bindings(link_opts, exchange=_snd.name, key=_snd.subject)
295 elif type == "queue":
296 _snd._exchange = ""
297 _snd._routing_key = _snd.name
298 bindings = get_bindings(link_opts, queue=_snd.name)
299 sst.write_cmds(bindings, action)
300
303
306
308
310 self.ttl = ttl
311 self.entries = {}
312
314 self.entries[key] = time.time(), value
315
317 tstamp, value = self.entries[key]
318 if time.time() - tstamp >= self.ttl:
319 del self.entries[key]
320 raise KeyError(key)
321 else:
322 return value
323
325 del self.entries[key]
326
327
328 HEADER="!4s4B"
329
330 EMPTY_DP = DeliveryProperties()
331 EMPTY_MP = MessageProperties()
332
333 SUBJECT = "qpid.subject"
334
335 CLOSED = "CLOSED"
336 READ_ONLY = "READ_ONLY"
337 WRITE_ONLY = "WRITE_ONLY"
338 OPEN = "OPEN"
341
343 self.connection = connection
344 self.log_id = "%x" % id(self.connection)
345 self._lock = self.connection._lock
346
347 self._selector = Selector.default()
348 self._attempts = 0
349 self._delay = self.connection.reconnect_interval_min
350 self._reconnect_log = self.connection.reconnect_log
351 self._host = 0
352 self._retrying = False
353 self._next_retry = None
354 self._transport = None
355
356 self._timeout = None
357
358 self.engine = None
359
361 urls = [URL(u) for u in self.connection.reconnect_urls]
362 hosts = [(self.connection.host, default(self.connection.port, 5672))] + \
363 [(u.host, default(u.port, 5672)) for u in urls]
364 if self._host >= len(hosts):
365 self._host = 0
366 result = hosts[self._host]
367 if self._host == 0:
368 self._attempts += 1
369 self._host = self._host + 1
370 return result
371
373 return len(self.connection.reconnect_urls) + 1
374
375 @synchronized
379
381 self._selector.register(self)
382
384 self._selector.unregister(self)
385 if self._transport:
386 self.st_closed()
387
389 return self._transport.fileno()
390
391 @synchronized
393 return self._transport is not None and \
394 self._transport.reading(True)
395
396 @synchronized
398 return self._transport is not None and \
399 self._transport.writing(self.engine.pending())
400
401 @synchronized
404
405 @synchronized
422
424 if self.connection.error:
425 self.connection._condition.gc()
426 self.connection._waiter.notifyAll()
427
429 if e is None:
430 e = ConnectionError(text="connection aborted")
431
432 if (self.connection.reconnect and
433 (self.connection.reconnect_limit is None or
434 self.connection.reconnect_limit <= 0 or
435 self._attempts <= self.connection.reconnect_limit)):
436 if self._host < self._num_hosts():
437 delay = 0
438 else:
439 delay = self._delay
440 self._delay = min(2*self._delay,
441 self.connection.reconnect_interval_max)
442 self._next_retry = time.time() + delay
443 if self._reconnect_log:
444 log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
445 if delay > 0:
446 log.warn("sleeping %s seconds" % delay)
447 self._retrying = True
448 self.engine.close()
449 else:
450 self.engine.close(e)
451
452 self.schedule()
453
457
459
460
461 self._transport.close()
462 self._transport = None
463 self.engine = None
464 return True
465
468
469 @synchronized
471 notify = False
472 try:
473 n = self._transport.send(self.engine.peek())
474 if n == 0: return
475 sent = self.engine.read(n)
476 rawlog.debug("SENT[%s]: %r", self.log_id, sent)
477 except socket.error, e:
478 self.close_engine(e)
479 notify = True
480
481 if self.update_status() or notify:
482 self._notify()
483
484 @synchronized
489
491 times = []
492 if self.connection.heartbeat:
493 times.append(time.time() + self.connection.heartbeat)
494 if self._next_retry:
495 times.append(self._next_retry)
496 if times:
497 self._timeout = min(times)
498 else:
499 self._timeout = None
500
502 try:
503 if self._transport is None:
504 if self.connection._connected and not self.connection.error:
505 self.connect()
506 else:
507 self.engine.dispatch()
508 except HeartbeatTimeout, e:
509 self.close_engine(e)
510 except:
511
512 msg = compat.format_exc()
513 self.connection.error = InternalError(text=msg)
514
516 if self._retrying and time.time() < self._next_retry:
517 return
518
519 try:
520
521 host, port = self._next_host()
522 if self._retrying and self._reconnect_log:
523 log.warn("trying: %s:%s", host, port)
524 self.engine = Engine(self.connection)
525 self.engine.open()
526 rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
527 trans = transports.TRANSPORTS.get(self.connection.transport)
528 if trans:
529 self._transport = trans(self.connection, host, port)
530 else:
531 raise ConnectError("no such transport: %s" % self.connection.transport)
532 if self._retrying and self._reconnect_log:
533 log.warn("reconnect succeeded: %s:%s", host, port)
534 self._next_retry = None
535 self._attempts = 0
536 self._host = 0
537 self._delay = self.connection.reconnect_interval_min
538 self._retrying = False
539 self.schedule()
540 except socket.error, e:
541 self.close_engine(ConnectError(text=str(e)))
542
543 DEFAULT_DISPOSITION = Disposition(None)
544
545 -def get_bindings(opts, queue=None, exchange=None, key=None):
546 bindings = opts.get("x-bindings", [])
547 cmds = []
548 for b in bindings:
549 exchange = b.get("exchange", exchange)
550 queue = b.get("queue", queue)
551 key = b.get("key", key)
552 args = b.get("arguments", {})
553 cmds.append(ExchangeBind(queue, exchange, key, args))
554 return cmds
555
556 CONNECTION_ERRS = {
557
558
559 }
560
561 SESSION_ERRS = {
562
563 error_code.unauthorized_access: UnauthorizedAccess,
564 error_code.not_found: NotFound,
565 error_code.resource_locked: ReceiverError,
566 error_code.resource_limit_exceeded: TargetCapacityExceeded,
567 error_code.internal_error: ServerError
568 }
571
573 self.connection = connection
574 self.log_id = "%x" % id(self.connection)
575 self._closing = False
576 self._connected = False
577 self._attachments = {}
578
579 self._in = LinkIn()
580 self._out = LinkOut()
581
582 self._channel_max = 65536
583 self._channels = 0
584 self._sessions = {}
585
586 self.address_cache = Cache(self.connection.address_ttl)
587
588 self._status = CLOSED
589 self._buf = ""
590 self._hdr = ""
591 self._last_in = None
592 self._last_out = None
593 self._op_enc = OpEncoder()
594 self._seg_enc = SegmentEncoder()
595 self._frame_enc = FrameEncoder()
596 self._frame_dec = FrameDecoder()
597 self._seg_dec = SegmentDecoder()
598 self._op_dec = OpDecoder()
599
600 self._sasl = sasl.Client()
601 if self.connection.username:
602 self._sasl.setAttr("username", self.connection.username)
603 if self.connection.password:
604 self._sasl.setAttr("password", self.connection.password)
605 if self.connection.host:
606 self._sasl.setAttr("host", self.connection.host)
607 self._sasl.setAttr("service", self.connection.sasl_service)
608 if self.connection.sasl_min_ssf is not None:
609 self._sasl.setAttr("minssf", self.connection.sasl_min_ssf)
610 if self.connection.sasl_max_ssf is not None:
611 self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf)
612 self._sasl.init()
613 self._sasl_encode = False
614 self._sasl_decode = False
615
617 self.connection._transport_connected = False
618
619 for ssn in self.connection.sessions.values():
620 for m in ssn.acked + ssn.unacked + ssn.incoming:
621 m._transfer_id = None
622 for snd in ssn.senders:
623 snd.linked = False
624 for rcv in ssn.receivers:
625 rcv.impending = rcv.received
626 rcv.linked = False
627
630
632 self._last_in = time.time()
633 try:
634 if self._sasl_decode:
635 data = self._sasl.decode(data)
636
637 if len(self._hdr) < 8:
638 r = 8 - len(self._hdr)
639 self._hdr += data[:r]
640 data = data[r:]
641
642 if len(self._hdr) == 8:
643 self.do_header(self._hdr)
644
645 self._frame_dec.write(data)
646 self._seg_dec.write(*self._frame_dec.read())
647 self._op_dec.write(*self._seg_dec.read())
648 for op in self._op_dec.read():
649 self.assign_id(op)
650 opslog.debug("RCVD[%s]: %r", self.log_id, op)
651 op.dispatch(self)
652 self.dispatch()
653 except MessagingError, e:
654 self.close(e)
655 except:
656 self.close(InternalError(text=compat.format_exc()))
657
658 - def close(self, e=None):
659 self._reset()
660 if e:
661 self.connection.error = e
662 self._status = CLOSED
663
665 if isinstance(op, Command):
666 sst = self.get_sst(op)
667 op.id = sst.received
668 sst.received += 1
669
671 return len(self._buf)
672
674 result = self._buf[:n]
675 self._buf = self._buf[n:]
676 return result
677
680
682 opslog.debug("SENT[%s]: %r", self.log_id, op)
683 self._op_enc.write(op)
684 self._seg_enc.write(*self._op_enc.read())
685 self._frame_enc.write(*self._seg_enc.read())
686 bytes = self._frame_enc.read()
687 if self._sasl_encode:
688 bytes = self._sasl.encode(bytes)
689 self._buf += bytes
690 self._last_out = time.time()
691
693 cli_major = 0; cli_minor = 10
694 magic, _, _, major, minor = struct.unpack(HEADER, hdr)
695 if major != cli_major or minor != cli_minor:
696 raise VersionError(text="client: %s-%s, server: %s-%s" %
697 (cli_major, cli_minor, major, minor))
698
700 if self.connection.sasl_mechanisms:
701 permitted = self.connection.sasl_mechanisms.split()
702 mechs = [m for m in start.mechanisms if m in permitted]
703 else:
704 mechs = start.mechanisms
705 try:
706 mech, initial = self._sasl.start(" ".join(mechs))
707 except sasl.SASLError, e:
708 raise AuthenticationFailure(text=str(e))
709 self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES,
710 mechanism=mech, response=initial))
711
713 resp = self._sasl.step(secure.challenge)
714 self.write_op(ConnectionSecureOk(response=resp))
715
717
718 if tune.channel_max is not None:
719 self.channel_max = tune.channel_max
720 self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat,
721 channel_max=self.channel_max))
722 self.write_op(ConnectionOpen())
723 self._sasl_encode = True
724
726 self.connection.auth_username = self._sasl.auth_username()
727 self._connected = True
728 self._sasl_decode = True
729 self.connection._transport_connected = True
730
733
739
740
741
742
743
744
747
750
752 sst = self.get_sst(cp)
753 sst.received = cp.command_id
754
756 sst = self.get_sst(sc)
757 for r in sc.commands:
758 sst.acknowledged.add(r.lower, r.upper)
759
760 if not sc.commands.empty():
761 while sst.min_completion in sc.commands:
762 if sst.actions.has_key(sst.min_completion):
763 sst.actions.pop(sst.min_completion)()
764 sst.min_completion += 1
765
767 sst = self.get_sst(kcmp)
768 executed = RangedSet()
769 for e in sst.executed.ranges:
770 for ke in kcmp.ranges:
771 if e.lower in ke and e.upper in ke:
772 break
773 else:
774 executed.add_range(e)
775 sst.executed = completed
776
778 sst = self.get_sst(sf)
779 if sf.expected:
780 if sst.received is None:
781 exp = None
782 else:
783 exp = RangedSet(sst.received)
784 sst.write_op(SessionExpected(exp))
785 if sf.confirmed:
786 sst.write_op(SessionConfirmed(sst.executed))
787 if sf.completed:
788 sst.write_op(SessionCompleted(sst.executed))
789
793
795 sst = self.get_sst(er)
796 sst.results[er.command_id] = er.value
797 sst.executed.add(er.id)
798
803
805 if not self.connection._connected and not self._closing and self._status != CLOSED:
806 self.disconnect()
807
808 if self._connected and not self._closing:
809 for ssn in self.connection.sessions.values():
810 self.attach(ssn)
811 self.process(ssn)
812
813 if self.connection.heartbeat and self._status != CLOSED:
814 now = time.time()
815 if self._last_in is not None and \
816 now - self._last_in > 2*self.connection.heartbeat:
817 raise HeartbeatTimeout(text="heartbeat timeout")
818 if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0:
819 self.write_op(ConnectionHeartbeat())
820
822 self._reset()
823 self._status = OPEN
824 self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
825
827 self.write_op(ConnectionClose(close_code.normal))
828 self._closing = True
829
831 if ssn.closed: return
832 sst = self._attachments.get(ssn)
833 if sst is None:
834 for i in xrange(0, self.channel_max):
835 if not self._sessions.has_key(i):
836 ch = i
837 break
838 else:
839 raise RuntimeError("all channels used")
840 sst = SessionState(self, ssn, ssn.name, ch)
841 sst.write_op(SessionAttach(name=ssn.name))
842 sst.write_op(SessionCommandPoint(sst.sent, 0))
843 sst.outgoing_idx = 0
844 sst.acked = []
845 sst.acked_idx = 0
846 if ssn.transactional:
847 sst.write_cmd(TxSelect())
848 self._attachments[ssn] = sst
849 self._sessions[sst.channel] = sst
850
851 for snd in ssn.senders:
852 self.link(snd, self._out, snd.target)
853 for rcv in ssn.receivers:
854 self.link(rcv, self._in, rcv.source)
855
856 if sst is not None and ssn.closing and not sst.detached:
857 sst.detached = True
858 sst.write_op(SessionDetach(name=ssn.name))
859
861 return self._sessions[op.channel]
862
864 sst = self._sessions.pop(dtc.channel)
865 ssn = sst.session
866 del self._attachments[ssn]
867 ssn.closed = True
868
873
874 - def link(self, lnk, dir, addr):
875 sst = self._attachments.get(lnk.session)
876 _lnk = self._attachments.get(lnk)
877
878 if _lnk is None and not lnk.closed:
879 _lnk = Attachment(lnk)
880 _lnk.closing = False
881 dir.init_link(sst, lnk, _lnk)
882
883 err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir)
884 if err:
885 lnk.error = err
886 lnk.closed = True
887 return
888
889 def linked():
890 lnk.linked = True
891
892 def resolved(type, subtype):
893 dir.do_link(sst, lnk, _lnk, type, subtype, linked)
894
895 self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved)
896 self._attachments[lnk] = _lnk
897
898 if lnk.linked and lnk.closing and not lnk.closed:
899 if not _lnk.closing:
900 def unlinked():
901 dir.del_link(sst, lnk, _lnk)
902 del self._attachments[lnk]
903 lnk.closed = True
904 if _lnk.options.get("delete") in ("always", dir.DIR_NAME):
905 dir.do_unlink(sst, lnk, _lnk)
906 self.delete(sst, _lnk.name, unlinked)
907 else:
908 dir.do_unlink(sst, lnk, _lnk, unlinked)
909 _lnk.closing = True
910 elif not lnk.linked and lnk.closing and not lnk.closed:
911 if lnk.error: lnk.closed = True
912
926
928 ctx = Context()
929 err = dir.VALIDATOR.validate(lnk.options, ctx)
930 if err: return InvalidOption(text="error in options: %s" % err)
931
933 declare = lnk.options.get("create") in ("always", dir)
934 assrt = lnk.options.get("assert") in ("always", dir)
935 def do_resolved(type, subtype):
936 err = None
937 if type is None:
938 if declare:
939 err = self.declare(sst, lnk, action)
940 else:
941 err = NotFound(text="no such queue: %s" % lnk.name)
942 else:
943 if assrt:
944 expected = lnk.options.get("node", {}).get("type")
945 if expected and type != expected:
946 err = AssertionFailed(text="expected %s, got %s" % (expected, type))
947 if err is None:
948 action(type, subtype)
949
950 if err:
951 tgt = lnk.target
952 tgt.error = err
953 del self._attachments[tgt]
954 tgt.closed = True
955 return
956 self.resolve(sst, lnk.name, do_resolved, force=declare)
957
958 - def resolve(self, sst, name, action, force=False):
959 if not force:
960 try:
961 type, subtype = self.address_cache[name]
962 action(type, subtype)
963 return
964 except KeyError:
965 pass
966
967 args = []
968 def do_result(r):
969 args.append(r)
970 def do_action(r):
971 do_result(r)
972 er, qr = args
973 if er.not_found and not qr.queue:
974 type, subtype = None, None
975 elif qr.queue:
976 type, subtype = "queue", None
977 else:
978 type, subtype = "topic", er.type
979 if type is not None:
980 self.address_cache[name] = (type, subtype)
981 action(type, subtype)
982 sst.write_query(ExchangeQuery(name), do_result)
983 sst.write_query(QueueQuery(name), do_action)
984
985 - def declare(self, sst, lnk, action):
986 name = lnk.name
987 props = lnk.options.get("node", {})
988 durable = props.get("durable", DURABLE_DEFAULT)
989 type = props.get("type", "queue")
990 declare = props.get("x-declare", {})
991
992 if type == "topic":
993 cmd = ExchangeDeclare(exchange=name, durable=durable)
994 bindings = get_bindings(props, exchange=name)
995 elif type == "queue":
996 cmd = QueueDeclare(queue=name, durable=durable)
997 bindings = get_bindings(props, queue=name)
998 else:
999 raise ValueError(type)
1000
1001 sst.apply_overrides(cmd, declare)
1002
1003 if type == "topic":
1004 if cmd.type is None:
1005 cmd.type = "topic"
1006 subtype = cmd.type
1007 else:
1008 subtype = None
1009
1010 cmds = [cmd]
1011 cmds.extend(bindings)
1012
1013 def declared():
1014 self.address_cache[name] = (type, subtype)
1015 action(type, subtype)
1016
1017 sst.write_cmds(cmds, declared)
1018
1019 - def delete(self, sst, name, action):
1020 def deleted():
1021 del self.address_cache[name]
1022 action()
1023
1024 def do_delete(type, subtype):
1025 if type == "topic":
1026 sst.write_cmd(ExchangeDelete(name), deleted)
1027 elif type == "queue":
1028 sst.write_cmd(QueueDelete(name), deleted)
1029 elif type is None:
1030 action()
1031 else:
1032 raise ValueError(type)
1033 self.resolve(sst, name, do_delete, force=True)
1034
1036 if ssn.closed or ssn.closing: return
1037
1038 sst = self._attachments[ssn]
1039
1040 while sst.outgoing_idx < len(ssn.outgoing):
1041 msg = ssn.outgoing[sst.outgoing_idx]
1042 snd = msg._sender
1043
1044 _snd = self._attachments.get(snd)
1045 if _snd and snd.linked:
1046 self.send(snd, msg)
1047 sst.outgoing_idx += 1
1048 else:
1049 break
1050
1051 for snd in ssn.senders:
1052
1053 if snd.synced >= snd.queued and sst.need_sync:
1054 sst.write_cmd(ExecutionSync(), sync_noop)
1055
1056 for rcv in ssn.receivers:
1057 self.process_receiver(rcv)
1058
1059 if ssn.acked:
1060 messages = ssn.acked[sst.acked_idx:]
1061 if messages:
1062 ids = RangedSet()
1063
1064 disposed = [(DEFAULT_DISPOSITION, [])]
1065 acked = []
1066 for m in messages:
1067
1068
1069 if m._transfer_id is None:
1070 acked.append(m)
1071 continue
1072 ids.add(m._transfer_id)
1073 if m._receiver._accept_mode is accept_mode.explicit:
1074 disp = m._disposition or DEFAULT_DISPOSITION
1075 last, msgs = disposed[-1]
1076 if disp.type is last.type and disp.options == last.options:
1077 msgs.append(m)
1078 else:
1079 disposed.append((disp, [m]))
1080 else:
1081 acked.append(m)
1082
1083 for range in ids:
1084 sst.executed.add_range(range)
1085 sst.write_op(SessionCompleted(sst.executed))
1086
1087 def ack_acker(msgs):
1088 def ack_ack():
1089 for m in msgs:
1090 ssn.acked.remove(m)
1091 sst.acked_idx -= 1
1092
1093 if not ssn.transactional:
1094 sst.acked.remove(m)
1095 return ack_ack
1096
1097 for disp, msgs in disposed:
1098 if not msgs: continue
1099 if disp.type is None:
1100 op = MessageAccept
1101 elif disp.type is RELEASED:
1102 op = MessageRelease
1103 elif disp.type is REJECTED:
1104 op = MessageReject
1105 sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
1106 **disp.options),
1107 ack_acker(msgs))
1108 if log.isEnabledFor(DEBUG):
1109 for m in msgs:
1110 log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
1111
1112 sst.acked.extend(messages)
1113 sst.acked_idx += len(messages)
1114 ack_acker(acked)()
1115
1116 if ssn.committing and not sst.committing:
1117 def commit_ok():
1118 del sst.acked[:]
1119 ssn.committing = False
1120 ssn.committed = True
1121 ssn.aborting = False
1122 ssn.aborted = False
1123 sst.committing = False
1124 sst.write_cmd(TxCommit(), commit_ok)
1125 sst.committing = True
1126
1127 if ssn.aborting and not sst.aborting:
1128 sst.aborting = True
1129 def do_rb():
1130 messages = sst.acked + ssn.unacked + ssn.incoming
1131 ids = RangedSet(*[m._transfer_id for m in messages])
1132 for range in ids:
1133 sst.executed.add_range(range)
1134 sst.write_op(SessionCompleted(sst.executed))
1135 sst.write_cmd(MessageRelease(ids, True))
1136 sst.write_cmd(TxRollback(), do_rb_ok)
1137
1138 def do_rb_ok():
1139 del ssn.incoming[:]
1140 del ssn.unacked[:]
1141 del sst.acked[:]
1142
1143 for rcv in ssn.receivers:
1144 rcv.impending = rcv.received
1145 rcv.returned = rcv.received
1146
1147
1148 for rcv in ssn.receivers:
1149 self.process_receiver(rcv)
1150
1151 ssn.aborting = False
1152 ssn.aborted = True
1153 ssn.committing = False
1154 ssn.committed = False
1155 sst.aborting = False
1156
1157 for rcv in ssn.receivers:
1158 _rcv = self._attachments[rcv]
1159 sst.write_cmd(MessageStop(_rcv.destination))
1160 sst.write_cmd(ExecutionSync(), do_rb)
1161
1163 sst = self._attachments[rcv.session]
1164 _rcv = self._attachments.get(rcv)
1165 if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining:
1166 return
1167
1168 if rcv.granted is UNLIMITED:
1169 if rcv.impending is UNLIMITED:
1170 delta = 0
1171 else:
1172 delta = UNLIMITED
1173 elif rcv.impending is UNLIMITED:
1174 delta = -1
1175 else:
1176 delta = max(rcv.granted, rcv.received) - rcv.impending
1177
1178 if delta is UNLIMITED:
1179 if not _rcv.bytes_open:
1180 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
1181 _rcv.bytes_open = True
1182 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
1183 rcv.impending = UNLIMITED
1184 elif delta > 0:
1185 if not _rcv.bytes_open:
1186 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
1187 _rcv.bytes_open = True
1188 sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
1189 rcv.impending += delta
1190 elif delta < 0 and not rcv.draining:
1191 _rcv.draining = True
1192 def do_stop():
1193 rcv.impending = rcv.received
1194 _rcv.draining = False
1195 _rcv.bytes_open = False
1196 self.grant(rcv)
1197 sst.write_cmd(MessageStop(_rcv.destination), do_stop)
1198
1199 if rcv.draining:
1200 _rcv.draining = True
1201 def do_flush():
1202 rcv.impending = rcv.received
1203 rcv.granted = rcv.impending
1204 _rcv.draining = False
1205 _rcv.bytes_open = False
1206 rcv.draining = False
1207 sst.write_cmd(MessageFlush(_rcv.destination), do_flush)
1208
1209
1211 if rcv.closed: return
1212 self.grant(rcv)
1213
1214 - def send(self, snd, msg):
1215 sst = self._attachments[snd.session]
1216 _snd = self._attachments[snd]
1217
1218 if msg.subject is None or _snd._exchange == "":
1219 rk = _snd._routing_key
1220 else:
1221 rk = msg.subject
1222
1223 if msg.subject is None:
1224 subject = _snd.subject
1225 else:
1226 subject = msg.subject
1227
1228
1229 if msg.reply_to:
1230 rt = addr2reply_to(msg.reply_to)
1231 else:
1232 rt = None
1233 content_encoding = msg.properties.get("x-amqp-0-10.content-encoding")
1234 dp = DeliveryProperties(routing_key=rk)
1235 mp = MessageProperties(message_id=msg.id,
1236 user_id=msg.user_id,
1237 reply_to=rt,
1238 correlation_id=msg.correlation_id,
1239 app_id = msg.properties.get("x-amqp-0-10.app-id"),
1240 content_type=msg.content_type,
1241 content_encoding=content_encoding,
1242 application_headers=msg.properties)
1243 if subject is not None:
1244 if mp.application_headers is None:
1245 mp.application_headers = {}
1246 mp.application_headers[SUBJECT] = subject
1247 if msg.durable is not None:
1248 if msg.durable:
1249 dp.delivery_mode = delivery_mode.persistent
1250 else:
1251 dp.delivery_mode = delivery_mode.non_persistent
1252 if msg.priority is not None:
1253 dp.priority = msg.priority
1254 if msg.ttl is not None:
1255 dp.ttl = long(msg.ttl*1000)
1256 enc, dec = get_codec(msg.content_type)
1257 body = enc(msg.content)
1258
1259
1260 def msg_acked():
1261
1262 snd.acked += 1
1263 m = snd.session.outgoing.pop(0)
1264 sst.outgoing_idx -= 1
1265 log.debug("RACK[%s]: %s", sst.session.log_id, msg)
1266 assert msg == m
1267
1268 xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
1269 payload=body)
1270
1271 if _snd.pre_ack:
1272 sst.write_cmd(xfr)
1273 else:
1274 sst.write_cmd(xfr, msg_acked, sync=msg._sync)
1275
1276 log.debug("SENT[%s]: %s", sst.session.log_id, msg)
1277
1278 if _snd.pre_ack:
1279 msg_acked()
1280
1282 sst = self.get_sst(xfr)
1283 ssn = sst.session
1284
1285 msg = self._decode(xfr)
1286 rcv = sst.destinations[xfr.destination].target
1287 msg._receiver = rcv
1288 if rcv.impending is not UNLIMITED:
1289 assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
1290 rcv.received += 1
1291 log.debug("RCVD[%s]: %s", ssn.log_id, msg)
1292 ssn.incoming.append(msg)
1293
1295 dp = EMPTY_DP
1296 mp = EMPTY_MP
1297
1298 for h in xfr.headers:
1299 if isinstance(h, DeliveryProperties):
1300 dp = h
1301 elif isinstance(h, MessageProperties):
1302 mp = h
1303
1304 ap = mp.application_headers
1305 enc, dec = get_codec(mp.content_type)
1306 content = dec(xfr.payload)
1307 msg = Message(content)
1308 msg.id = mp.message_id
1309 if ap is not None:
1310 msg.subject = ap.get(SUBJECT)
1311 msg.user_id = mp.user_id
1312 if mp.reply_to is not None:
1313 msg.reply_to = reply_to2addr(mp.reply_to)
1314 msg.correlation_id = mp.correlation_id
1315 if dp.delivery_mode is not None:
1316 msg.durable = dp.delivery_mode == delivery_mode.persistent
1317 msg.priority = dp.priority
1318 if dp.ttl is not None:
1319 msg.ttl = dp.ttl/1000.0
1320 msg.redelivered = dp.redelivered
1321 msg.properties = mp.application_headers or {}
1322 if mp.app_id is not None:
1323 msg.properties["x-amqp-0-10.app-id"] = mp.app_id
1324 if mp.content_encoding is not None:
1325 msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding
1326 if dp.routing_key is not None:
1327 msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key
1328 msg.content_type = mp.content_type
1329 msg._transfer_id = xfr.id
1330 return msg
1331