
Source Code for Module qpid.messaging.driver

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
   20  import os, socket, struct, sys, time 
  21  from logging import getLogger, DEBUG 
  22  from qpid import compat 
  23  from qpid import sasl 
  24  from qpid.concurrency import synchronized 
  25  from qpid.datatypes import RangedSet, Serial 
  26  from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ 
  27      FrameDecoder, SegmentDecoder, OpDecoder 
  28  from qpid.messaging import address, transports 
  29  from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED 
  30  from qpid.messaging.exceptions import * 
  31  from qpid.messaging.message import get_codec, Disposition, Message 
  32  from qpid.ops import * 
  33  from qpid.selector import Selector 
  34  from qpid.util import URL, default 
  35  from qpid.validator import And, Context, List, Map, Types, Values 
  36  from threading import Condition, Thread 
  37   
  38  log = getLogger("qpid.messaging") 
  39  rawlog = getLogger("qpid.messaging.io.raw") 
  40  opslog = getLogger("qpid.messaging.io.ops") 
  41 
  42 -def addr2reply_to(addr): 
  43    name, subject, options = address.parse(addr) 
  44    if options: 
  45      type = options.get("node", {}).get("type") 
  46    else: 
  47      type = None 
  48 
  49    if type == "topic": 
  50      return ReplyTo(name, subject) 
  51    else: 
  52      return ReplyTo(None, name) 
53
54 -def reply_to2addr(reply_to):
  55    if reply_to.exchange in (None, ""): 
  56      return reply_to.routing_key 
  57    elif reply_to.routing_key is None: 
  58      return "%s; {node: {type: topic}}" % reply_to.exchange 
  59    else: 
  60      return "%s/%s; {node: {type: topic}}" % (reply_to.exchange, reply_to.routing_key) 
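A minimal sketch (not part of the module source) of how these two helpers relate, assuming ReplyTo takes (exchange, routing_key) in that order as the code above implies: a topic address maps to the exchange field, a plain queue name maps to the routing key.

    # Illustrative only: round-tripping a reply-to between the address string
    # form and the 0-10 ReplyTo struct.
    rt = addr2reply_to("amq.topic/news; {node: {type: topic}}")
    # rt.exchange == "amq.topic", rt.routing_key == "news"
    assert reply_to2addr(rt) == "amq.topic/news; {node: {type: topic}}"

    rt = addr2reply_to("my-queue")
    # a queue address ends up in routing_key with no exchange:
    # rt.exchange is None, rt.routing_key == "my-queue"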
61
62 -class Attachment:
63
64 - def __init__(self, target):
65 self.target = target
  66 
  67  # XXX 
  68 
  69  DURABLE_DEFAULT=False 
  70 
  71  # XXX 
  72 
  73 -class Pattern: 
74 """ 75 The pattern filter matches the supplied wildcard pattern against a 76 message subject. 77 """ 78
79 - def __init__(self, value):
80 self.value = value
  81 
  82    # XXX: this should become part of the driver 
83 - def _bind(self, sst, exchange, queue):
  84      from qpid.ops import ExchangeBind 
  85 
  86      sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, 
  87                                 binding_key=self.value.replace("*", "#"))) 
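The only transformation _bind applies is the wildcard rewrite from the messaging syntax ("*") to the 0-10 topic-exchange syntax ("#"). A sketch of the effect, where sst, "my-exchange" and "my-queue" are hypothetical names not defined in this module:

    # Illustrative only: this would issue an ExchangeBind whose binding_key
    # is "usa.#", binding "my-queue" to "my-exchange".
    Pattern("usa.*")._bind(sst, "my-exchange", "my-queue")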
  88 
  89  SUBJECT_DEFAULTS = { 
  90    "topic": "#" 
  91    } 
  92 
  93  # XXX 
  94  ppid = 0 
  95  try: 
  96    ppid = os.getppid() 
  97  except: 
  98    pass 
  99 
 100  CLIENT_PROPERTIES = {"product": "qpid python client", 
 101                       "version": "development", 
 102                       "platform": os.name, 
 103                       "qpid.client_process": os.path.basename(sys.argv[0]), 
 104                       "qpid.client_pid": os.getpid(), 
 105                       "qpid.client_ppid": ppid} 
 106 
 107 -def noop(): pass 
108 -def sync_noop(): pass
109
110 -class SessionState:
111
112 - def __init__(self, driver, session, name, channel):
 113      self.driver = driver 
 114      self.session = session 
 115      self.name = name 
 116      self.channel = channel 
 117      self.detached = False 
 118      self.committing = False 
 119      self.aborting = False 
 120 
 121      # sender state 
 122      self.sent = Serial(0) 
 123      self.acknowledged = RangedSet() 
 124      self.actions = {} 
 125      self.min_completion = self.sent 
 126      self.max_completion = self.sent 
 127      self.results = {} 
 128      self.need_sync = False 
 129 
 130      # receiver state 
 131      self.received = None 
 132      self.executed = RangedSet() 
 133 
 134      # XXX: need to periodically exchange completion/known_completion 
 135 
 136      self.destinations = {} 
137
138 - def write_query(self, query, handler):
 139      id = self.sent 
 140      self.write_cmd(query, lambda: handler(self.results.pop(id))) 
141
142 - def apply_overrides(self, cmd, overrides):
 143      for k, v in overrides.items(): 
 144        cmd[k.replace('-', '_')] = v 
145
146 - def write_cmd(self, cmd, action=noop, overrides=None, sync=True):
 147      if overrides: 
 148        self.apply_overrides(cmd, overrides) 
 149 
 150      if action != noop: 
 151        cmd.sync = sync 
 152      if self.detached: 
 153        raise Exception("detached") 
 154      cmd.id = self.sent 
 155      self.sent += 1 
 156      self.actions[cmd.id] = action 
 157      self.max_completion = cmd.id 
 158      self.write_op(cmd) 
 159      self.need_sync = not cmd.sync 
160
161 - def write_cmds(self, cmds, action=noop):
 162      if cmds: 
 163        for cmd in cmds[:-1]: 
 164          self.write_cmd(cmd) 
 165        self.write_cmd(cmds[-1], action) 
 166      else: 
 167        action() 
168
169 - def write_op(self, op):
 170      op.channel = self.channel 
 171      self.driver.write_op(op) 
 172 
 173  POLICIES = Values("always", "sender", "receiver", "never") 
 174  RELIABILITY = Values("unreliable", "at-most-once", "at-least-once", 
 175                       "exactly-once") 
 176 
 177  DECLARE = Map({}, restricted=False) 
 178  BINDINGS = List(Map({ 
 179      "exchange": Types(basestring), 
 180      "queue": Types(basestring), 
 181      "key": Types(basestring), 
 182      "arguments": Map({}, restricted=False) 
 183      })) 
 184 
 185  COMMON_OPTS = { 
 186      "create": POLICIES, 
 187      "delete": POLICIES, 
 188      "assert": POLICIES, 
 189      "node": Map({ 
 190          "type": Values("queue", "topic"), 
 191          "durable": Types(bool), 
 192          "x-declare": DECLARE, 
 193          "x-bindings": BINDINGS 
 194          }), 
 195      "link": Map({ 
 196          "name": Types(basestring), 
 197          "durable": Types(bool), 
 198          "reliability": RELIABILITY, 
 199          "x-declare": DECLARE, 
 200          "x-bindings": BINDINGS, 
 201          "x-subscribe": Map({}, restricted=False) 
 202          }) 
 203      } 
 204 
 205  RECEIVE_MODES = Values("browse", "consume") 
 206 
 207  SOURCE_OPTS = COMMON_OPTS.copy() 
 208  SOURCE_OPTS.update({ 
 209      "mode": RECEIVE_MODES 
 210      }) 
 211 
 212  TARGET_OPTS = COMMON_OPTS.copy() 
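For orientation, a sketch (not part of the module source) of an address string whose option map falls within SOURCE_OPTS; the queue and binding names are made up:

    # Illustrative only: an address whose options the validators above accept.
    addr = """my-queue; {
      create: receiver,
      node: {
        type: queue,
        durable: True,
        x-bindings: [{exchange: "amq.topic", key: "usa.#"}]
      },
      link: {reliability: at-least-once}
    }"""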
 213 
 214 -class LinkIn: 
 215 
 216    ADDR_NAME = "source" 
 217    DIR_NAME = "receiver" 
 218    VALIDATOR = Map(SOURCE_OPTS) 
 219  226  266  273 
276
277 -class LinkOut:
 278 
 279    ADDR_NAME = "target" 
 280    DIR_NAME = "sender" 
 281    VALIDATOR = Map(TARGET_OPTS) 
 282  286  300  303 
306
307 -class Cache:
308
309 - def __init__(self, ttl):
 310      self.ttl = ttl 
 311      self.entries = {} 
312
313 - def __setitem__(self, key, value):
314 self.entries[key] = time.time(), value
315
316 - def __getitem__(self, key):
 317      tstamp, value = self.entries[key] 
 318      if time.time() - tstamp >= self.ttl: 
 319        del self.entries[key] 
 320        raise KeyError(key) 
 321      else: 
 322        return value 
323
324 - def __delitem__(self, key):
325 del self.entries[key]
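A minimal usage sketch (not part of the module source; the three-second TTL is arbitrary): entries expire lazily on lookup rather than via a background sweep.

    # Illustrative only; "time" is already imported at the top of the module.
    cache = Cache(3)
    cache["my-queue"] = ("queue", None)   # remember a resolved node type
    cache["my-queue"]                     # ("queue", None) while fresh
    time.sleep(4)
    try:
      cache["my-queue"]                   # stale: entry deleted, KeyError raised
    except KeyError:
      pass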
 326 
 327  # XXX 
 328  HEADER="!4s4B" 
 329 
 330  EMPTY_DP = DeliveryProperties() 
 331  EMPTY_MP = MessageProperties() 
 332 
 333  SUBJECT = "qpid.subject" 
 334 
 335  CLOSED = "CLOSED" 
 336  READ_ONLY = "READ_ONLY" 
 337  WRITE_ONLY = "WRITE_ONLY" 
 338  OPEN = "OPEN" 
 339 
 340 -class Driver: 
341
342 - def __init__(self, connection):
 343      self.connection = connection 
 344      self.log_id = "%x" % id(self.connection) 
 345      self._lock = self.connection._lock 
 346 
 347      self._selector = Selector.default() 
 348      self._attempts = 0 
 349      self._delay = self.connection.reconnect_interval_min 
 350      self._reconnect_log = self.connection.reconnect_log 
 351      self._host = 0 
 352      self._retrying = False 
 353      self._next_retry = None 
 354      self._transport = None 
 355 
 356      self._timeout = None 
 357 
 358      self.engine = None 
359
360 - def _next_host(self):
 361      urls = [URL(u) for u in self.connection.reconnect_urls] 
 362      hosts = [(self.connection.host, default(self.connection.port, 5672))] + \ 
 363          [(u.host, default(u.port, 5672)) for u in urls] 
 364      if self._host >= len(hosts): 
 365        self._host = 0 
 366      result = hosts[self._host] 
 367      if self._host == 0: 
 368        self._attempts += 1 
 369      self._host = self._host + 1 
 370      return result 
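A sketch (not part of the module source) of the rotation this produces, assuming a connection whose host is "a" and whose reconnect_urls is ["b:5673"] (both made up): successive calls walk a, b, a, b, ..., and the attempt counter only increases when the rotation returns to the primary host.

    # Illustrative only; "drv" is a hypothetical Driver instance.
    drv._next_host()   # ("a", 5672)   _attempts -> 1
    drv._next_host()   # ("b", 5673)
    drv._next_host()   # ("a", 5672)   _attempts -> 2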
371
372 - def _num_hosts(self):
373 return len(self.connection.reconnect_urls) + 1
 374 
 375    @synchronized 
376 - def wakeup(self):
 377      self.dispatch() 
 378      self._selector.wakeup() 
379
380 - def start(self):
381 self._selector.register(self)
382
383 - def stop(self):
 384      self._selector.unregister(self) 
 385      if self._transport: 
 386        self.st_closed() 
387
388 - def fileno(self):
389 return self._transport.fileno()
 390 
 391    @synchronized 
392 - def reading(self):
 393      return self._transport is not None and \ 
 394             self._transport.reading(True) 
 395 
 396    @synchronized 
397 - def writing(self):
 398      return self._transport is not None and \ 
 399             self._transport.writing(self.engine.pending()) 
 400 
 401    @synchronized 
402 - def timing(self):
403 return self._timeout
 404 
 405    @synchronized 
406 - def readable(self):
 407      try: 
 408        data = self._transport.recv(64*1024) 
 409        if data is None: 
 410          return 
 411        elif data: 
 412          rawlog.debug("READ[%s]: %r", self.log_id, data) 
 413          self.engine.write(data) 
 414        else: 
 415          self.close_engine() 
 416      except socket.error, e: 
 417        self.close_engine(ConnectionError(text=str(e))) 
 418 
 419      self.update_status() 
 420 
 421      self._notify() 
422
423 - def _notify(self):
 424      if self.connection.error: 
 425        self.connection._condition.gc() 
 426      self.connection._waiter.notifyAll() 
427
428 - def close_engine(self, e=None):
 429      if e is None: 
 430        e = ConnectionError(text="connection aborted") 
 431 
 432      if (self.connection.reconnect and 
 433          (self.connection.reconnect_limit is None or 
 434           self.connection.reconnect_limit <= 0 or 
 435           self._attempts <= self.connection.reconnect_limit)): 
 436        if self._host < self._num_hosts(): 
 437          delay = 0 
 438        else: 
 439          delay = self._delay 
 440          self._delay = min(2*self._delay, 
 441                            self.connection.reconnect_interval_max) 
 442        self._next_retry = time.time() + delay 
 443        if self._reconnect_log: 
 444          log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e)) 
 445          if delay > 0: 
 446            log.warn("sleeping %s seconds" % delay) 
 447        self._retrying = True 
 448        self.engine.close() 
 449      else: 
 450        self.engine.close(e) 
 451 
 452      self.schedule() 
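A sketch (not part of the module source) of the retry schedule this yields, assuming reconnect_interval_min=1 and reconnect_interval_max=8 (values picked for illustration): retries within one pass over the host list are immediate, and each time the list is exhausted the delay doubles until capped.

    # Illustrative only: how the backoff delay evolves with the settings above.
    delay, delay_max = 1, 8
    schedule = []
    for attempt in range(5):
      schedule.append(delay)
      delay = min(2*delay, delay_max)
    # schedule == [1, 2, 4, 8, 8]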
453
454 - def update_status(self):
 455      status = self.engine.status() 
 456      return getattr(self, "st_%s" % status.lower())() 
457
458 - def st_closed(self):
 459      # XXX: this log statement seems to sometimes hit when the socket is not connected 
 460      # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername()) 
 461      self._transport.close() 
 462      self._transport = None 
 463      self.engine = None 
 464      return True 
465
466 - def st_open(self):
467 return False
 468 
 469    @synchronized 
470 - def writeable(self):
 471      notify = False 
 472      try: 
 473        n = self._transport.send(self.engine.peek()) 
 474        if n == 0: return 
 475        sent = self.engine.read(n) 
 476        rawlog.debug("SENT[%s]: %r", self.log_id, sent) 
 477      except socket.error, e: 
 478        self.close_engine(e) 
 479        notify = True 
 480 
 481      if self.update_status() or notify: 
 482        self._notify() 
 483 
 484    @synchronized 
485 - def timeout(self):
 486      self.dispatch() 
 487      self._notify() 
 488      self.schedule() 
489
490 - def schedule(self):
 491      times = [] 
 492      if self.connection.heartbeat: 
 493        times.append(time.time() + self.connection.heartbeat) 
 494      if self._next_retry: 
 495        times.append(self._next_retry) 
 496      if times: 
 497        self._timeout = min(times) 
 498      else: 
 499        self._timeout = None 
500
501 - def dispatch(self):
 502      try: 
 503        if self._transport is None: 
 504          if self.connection._connected and not self.connection.error: 
 505            self.connect() 
 506        else: 
 507          self.engine.dispatch() 
 508      except HeartbeatTimeout, e: 
 509        self.close_engine(e) 
 510      except: 
 511        # XXX: Does socket get leaked if this occurs? 
 512        msg = compat.format_exc() 
 513        self.connection.error = InternalError(text=msg) 
514
515 - def connect(self):
 516      if self._retrying and time.time() < self._next_retry: 
 517        return 
 518 
 519      try: 
 520        # XXX: should make this non blocking 
 521        host, port = self._next_host() 
 522        if self._retrying and self._reconnect_log: 
 523          log.warn("trying: %s:%s", host, port) 
 524        self.engine = Engine(self.connection) 
 525        self.engine.open() 
 526        rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port) 
 527        trans = transports.TRANSPORTS.get(self.connection.transport) 
 528        if trans: 
 529          self._transport = trans(self.connection, host, port) 
 530        else: 
 531          raise ConnectError("no such transport: %s" % self.connection.transport) 
 532        if self._retrying and self._reconnect_log: 
 533          log.warn("reconnect succeeded: %s:%s", host, port) 
 534        self._next_retry = None 
 535        self._attempts = 0 
 536        self._host = 0 
 537        self._delay = self.connection.reconnect_interval_min 
 538        self._retrying = False 
 539        self.schedule() 
 540      except socket.error, e: 
 541        self.close_engine(ConnectError(text=str(e))) 
 542 
 543  DEFAULT_DISPOSITION = Disposition(None) 
 544 
 545 -def get_bindings(opts, queue=None, exchange=None, key=None): 
 546    bindings = opts.get("x-bindings", []) 
 547    cmds = [] 
 548    for b in bindings: 
 549      exchange = b.get("exchange", exchange) 
 550      queue = b.get("queue", queue) 
 551      key = b.get("key", key) 
 552      args = b.get("arguments", {}) 
 553      cmds.append(ExchangeBind(queue, exchange, key, args)) 
 554    return cmds 
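A sketch (not part of the module source) of what this yields for a typical node option map; the queue and exchange names are made up. Each x-bindings entry falls back to the node being declared for its queue or exchange.

    # Illustrative only.
    opts = {"x-bindings": [{"exchange": "amq.topic", "key": "usa.#"}]}
    cmds = get_bindings(opts, queue="my-queue")
    # cmds holds one ExchangeBind for queue "my-queue" on exchange
    # "amq.topic" with key "usa.#" and empty arguments.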
 555 
 556  CONNECTION_ERRS = { 
 557    # anything not here (i.e. everything right now) will default to 
 558    # connection error 
 559    } 
 560 
 561  SESSION_ERRS = { 
 562    # anything not here will default to session error 
 563    error_code.unauthorized_access: UnauthorizedAccess, 
 564    error_code.not_found: NotFound, 
 565    error_code.resource_locked: ReceiverError, 
 566    error_code.resource_limit_exceeded: TargetCapacityExceeded, 
 567    error_code.internal_error: ServerError 
 568    } 
 569 
 570 -class Engine: 
571
572 - def __init__(self, connection):
 573      self.connection = connection 
 574      self.log_id = "%x" % id(self.connection) 
 575      self._closing = False 
 576      self._connected = False 
 577      self._attachments = {} 
 578 
 579      self._in = LinkIn() 
 580      self._out = LinkOut() 
 581 
 582      self._channel_max = 65536 
 583      self._channels = 0 
 584      self._sessions = {} 
 585 
 586      self.address_cache = Cache(self.connection.address_ttl) 
 587 
 588      self._status = CLOSED 
 589      self._buf = "" 
 590      self._hdr = "" 
 591      self._last_in = None 
 592      self._last_out = None 
 593      self._op_enc = OpEncoder() 
 594      self._seg_enc = SegmentEncoder() 
 595      self._frame_enc = FrameEncoder() 
 596      self._frame_dec = FrameDecoder() 
 597      self._seg_dec = SegmentDecoder() 
 598      self._op_dec = OpDecoder() 
 599 
 600      self._sasl = sasl.Client() 
 601      if self.connection.username: 
 602        self._sasl.setAttr("username", self.connection.username) 
 603      if self.connection.password: 
 604        self._sasl.setAttr("password", self.connection.password) 
 605      if self.connection.host: 
 606        self._sasl.setAttr("host", self.connection.host) 
 607      self._sasl.setAttr("service", self.connection.sasl_service) 
 608      if self.connection.sasl_min_ssf is not None: 
 609        self._sasl.setAttr("minssf", self.connection.sasl_min_ssf) 
 610      if self.connection.sasl_max_ssf is not None: 
 611        self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf) 
 612      self._sasl.init() 
 613      self._sasl_encode = False 
 614      self._sasl_decode = False 
615
616 - def _reset(self):
 617      self.connection._transport_connected = False 
 618 
 619      for ssn in self.connection.sessions.values(): 
 620        for m in ssn.acked + ssn.unacked + ssn.incoming: 
 621          m._transfer_id = None 
 622        for snd in ssn.senders: 
 623          snd.linked = False 
 624        for rcv in ssn.receivers: 
 625          rcv.impending = rcv.received 
 626          rcv.linked = False 
627
628 - def status(self):
629 return self._status
630
631 - def write(self, data):
 632      self._last_in = time.time() 
 633      try: 
 634        if self._sasl_decode: 
 635          data = self._sasl.decode(data) 
 636 
 637        if len(self._hdr) < 8: 
 638          r = 8 - len(self._hdr) 
 639          self._hdr += data[:r] 
 640          data = data[r:] 
 641 
 642          if len(self._hdr) == 8: 
 643            self.do_header(self._hdr) 
 644 
 645        self._frame_dec.write(data) 
 646        self._seg_dec.write(*self._frame_dec.read()) 
 647        self._op_dec.write(*self._seg_dec.read()) 
 648        for op in self._op_dec.read(): 
 649          self.assign_id(op) 
 650          opslog.debug("RCVD[%s]: %r", self.log_id, op) 
 651          op.dispatch(self) 
 652        self.dispatch() 
 653      except MessagingError, e: 
 654        self.close(e) 
 655      except: 
 656        self.close(InternalError(text=compat.format_exc())) 
657
658 - def close(self, e=None):
 659      self._reset() 
 660      if e: 
 661        self.connection.error = e 
 662      self._status = CLOSED 
663
664 - def assign_id(self, op):
 665      if isinstance(op, Command): 
 666        sst = self.get_sst(op) 
 667        op.id = sst.received 
 668        sst.received += 1 
669
670 - def pending(self):
671 return len(self._buf)
672
673 - def read(self, n):
 674      result = self._buf[:n] 
 675      self._buf = self._buf[n:] 
 676      return result 
677
678 - def peek(self):
679 return self._buf
680
681 - def write_op(self, op):
 682      opslog.debug("SENT[%s]: %r", self.log_id, op) 
 683      self._op_enc.write(op) 
 684      self._seg_enc.write(*self._op_enc.read()) 
 685      self._frame_enc.write(*self._seg_enc.read()) 
 686      bytes = self._frame_enc.read() 
 687      if self._sasl_encode: 
 688        bytes = self._sasl.encode(bytes) 
 689      self._buf += bytes 
 690      self._last_out = time.time() 
691
692 - def do_header(self, hdr):
 693      cli_major = 0; cli_minor = 10 
 694      magic, _, _, major, minor = struct.unpack(HEADER, hdr) 
 695      if major != cli_major or minor != cli_minor: 
 696        raise VersionError(text="client: %s-%s, server: %s-%s" % 
 697                           (cli_major, cli_minor, major, minor)) 
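For reference, a sketch (not part of the module source) of the 8-byte protocol header the HEADER format string describes, matching the pack in open() further down: the literal "AMQP", two protocol-id bytes (both 1 here), then the major/minor version.

    # Illustrative only.
    import struct
    hdr = struct.pack("!4s4B", "AMQP", 1, 1, 0, 10)
    magic, b1, b2, major, minor = struct.unpack("!4s4B", hdr)
    # magic == "AMQP", (major, minor) == (0, 10)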
698
699 - def do_connection_start(self, start):
 700      if self.connection.sasl_mechanisms: 
 701        permitted = self.connection.sasl_mechanisms.split() 
 702        mechs = [m for m in start.mechanisms if m in permitted] 
 703      else: 
 704        mechs = start.mechanisms 
 705      try: 
 706        mech, initial = self._sasl.start(" ".join(mechs)) 
 707      except sasl.SASLError, e: 
 708        raise AuthenticationFailure(text=str(e)) 
 709      self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES, 
 710                                      mechanism=mech, response=initial)) 
711
712 - def do_connection_secure(self, secure):
 713      resp = self._sasl.step(secure.challenge) 
 714      self.write_op(ConnectionSecureOk(response=resp)) 
715
716 - def do_connection_tune(self, tune):
 717      # XXX: is heartbeat protocol specific? 
 718      if tune.channel_max is not None: 
 719        self.channel_max = tune.channel_max 
 720      self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat, 
 721                                     channel_max=self.channel_max)) 
 722      self.write_op(ConnectionOpen()) 
 723      self._sasl_encode = True 
724
725 - def do_connection_open_ok(self, open_ok):
 726      self.connection.auth_username = self._sasl.auth_username() 
 727      self._connected = True 
 728      self._sasl_decode = True 
 729      self.connection._transport_connected = True 
730
731 - def do_connection_heartbeat(self, hrt):
732 pass
733
734 - def do_connection_close(self, close):
 735      self.write_op(ConnectionCloseOk()) 
 736      if close.reply_code != close_code.normal: 
 737        exc = CONNECTION_ERRS.get(close.reply_code, ConnectionError) 
 738        self.connection.error = exc(close.reply_code, close.reply_text) 
 739      # XXX: should we do a half shutdown on the socket here? 
 740      # XXX: we really need to test this, we may end up reporting a 
 741      # connection abort after this, if we were to do a shutdown on read 
 742      # and stop reading, then we wouldn't report the abort, that's 
 743      # probably the right thing to do 
 744 
745 - def do_connection_close_ok(self, close_ok):
746 self.close()
747
748 - def do_session_attached(self, atc):
749 pass
750
751 - def do_session_command_point(self, cp):
 752      sst = self.get_sst(cp) 
 753      sst.received = cp.command_id 
754
755 - def do_session_completed(self, sc):
 756      sst = self.get_sst(sc) 
 757      for r in sc.commands: 
 758        sst.acknowledged.add(r.lower, r.upper) 
 759 
 760      if not sc.commands.empty(): 
 761        while sst.min_completion in sc.commands: 
 762          if sst.actions.has_key(sst.min_completion): 
 763            sst.actions.pop(sst.min_completion)() 
 764          sst.min_completion += 1 
765
 766 - def do_session_known_completed(self, kcmp): 
 767      sst = self.get_sst(kcmp) 
 768      executed = RangedSet() 
 769      for e in sst.executed.ranges: 
 770        for ke in kcmp.ranges: 
 771          if e.lower in ke and e.upper in ke: 
 772            break 
 773        else: 
 774          executed.add_range(e) 
 775      sst.executed = executed 
776
777 - def do_session_flush(self, sf):
 778      sst = self.get_sst(sf) 
 779      if sf.expected: 
 780        if sst.received is None: 
 781          exp = None 
 782        else: 
 783          exp = RangedSet(sst.received) 
 784        sst.write_op(SessionExpected(exp)) 
 785      if sf.confirmed: 
 786        sst.write_op(SessionConfirmed(sst.executed)) 
 787      if sf.completed: 
 788        sst.write_op(SessionCompleted(sst.executed)) 
789
790 - def do_session_request_timeout(self, rt):
 791      sst = self.get_sst(rt) 
 792      sst.write_op(SessionTimeout(timeout=0)) 
793
794 - def do_execution_result(self, er):
 795      sst = self.get_sst(er) 
 796      sst.results[er.command_id] = er.value 
 797      sst.executed.add(er.id) 
798
799 - def do_execution_exception(self, ex):
 800      sst = self.get_sst(ex) 
 801      exc = SESSION_ERRS.get(ex.error_code, SessionError) 
 802      sst.session.error = exc(ex.error_code, ex.description) 
803
804 - def dispatch(self):
 805      if not self.connection._connected and not self._closing and self._status != CLOSED: 
 806        self.disconnect() 
 807 
 808      if self._connected and not self._closing: 
 809        for ssn in self.connection.sessions.values(): 
 810          self.attach(ssn) 
 811          self.process(ssn) 
 812 
 813      if self.connection.heartbeat and self._status != CLOSED: 
 814        now = time.time() 
 815        if self._last_in is not None and \ 
 816               now - self._last_in > 2*self.connection.heartbeat: 
 817          raise HeartbeatTimeout(text="heartbeat timeout") 
 818        if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0: 
 819          self.write_op(ConnectionHeartbeat()) 
820
821 - def open(self):
 822      self._reset() 
 823      self._status = OPEN 
 824      self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) 
825
826 - def disconnect(self):
 827      self.write_op(ConnectionClose(close_code.normal)) 
 828      self._closing = True 
829
830 - def attach(self, ssn):
 831      sst = self._attachments.get(ssn) 
 832      if sst is None and not ssn.closed: 
 833        for i in xrange(0, self.channel_max): 
 834          if not self._sessions.has_key(i): 
 835            ch = i 
 836            break 
 837        else: 
 838          raise RuntimeError("all channels used") 
 839        sst = SessionState(self, ssn, ssn.name, ch) 
 840        sst.write_op(SessionAttach(name=ssn.name)) 
 841        sst.write_op(SessionCommandPoint(sst.sent, 0)) 
 842        sst.outgoing_idx = 0 
 843        sst.acked = [] 
 844        sst.acked_idx = 0 
 845        if ssn.transactional: 
 846          sst.write_cmd(TxSelect()) 
 847        self._attachments[ssn] = sst 
 848        self._sessions[sst.channel] = sst 
 849 
 850      for snd in ssn.senders: 
 851        self.link(snd, self._out, snd.target) 
 852      for rcv in ssn.receivers: 
 853        self.link(rcv, self._in, rcv.source) 
 854 
 855      if sst is not None and ssn.closing and not sst.detached: 
 856        sst.detached = True 
 857        sst.write_op(SessionDetach(name=ssn.name)) 
858
859 - def get_sst(self, op):
860 return self._sessions[op.channel]
861
862 - def do_session_detached(self, dtc):
 863      sst = self._sessions.pop(dtc.channel) 
 864      ssn = sst.session 
 865      del self._attachments[ssn] 
 866      ssn.closed = True 
867
868 - def do_session_detach(self, dtc):
 869      sst = self.get_sst(dtc) 
 870      sst.write_op(SessionDetached(name=dtc.name)) 
 871      self.do_session_detached(dtc) 
 872 
 890 
 891          def resolved(type, subtype): 
 892            dir.do_link(sst, lnk, _lnk, type, subtype, linked) 
 893 
 894          self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved) 
 895        self._attachments[lnk] = _lnk 
 896 
 897      if lnk.linked and lnk.closing and not lnk.closed: 
 898        if not _lnk.closing: 
 899          def unlinked(): 
 900            dir.del_link(sst, lnk, _lnk) 
 901            del self._attachments[lnk] 
 902            lnk.closed = True 
 903          if _lnk.options.get("delete") in ("always", dir.DIR_NAME): 
 904            dir.do_unlink(sst, lnk, _lnk) 
 905            self.delete(sst, _lnk.name, unlinked) 
 906          else: 
 907            dir.do_unlink(sst, lnk, _lnk, unlinked) 
 908          _lnk.closing = True 
 909      elif not lnk.linked and lnk.closing and not lnk.closed: 
 910        if lnk.error: lnk.closed = True 
 911 
912 - def parse_address(self, lnk, dir, addr):
 913      if addr is None: 
 914        return MalformedAddress(text="%s is None" % dir.ADDR_NAME) 
 915      else: 
 916        try: 
 917          lnk.name, lnk.subject, lnk.options = address.parse(addr) 
 918          # XXX: subject 
 919          if lnk.options is None: 
 920            lnk.options = {} 
 921        except address.LexError, e: 
 922          return MalformedAddress(text=str(e)) 
 923        except address.ParseError, e: 
 924          return MalformedAddress(text=str(e)) 
925
926 - def validate_options(self, lnk, dir):
 927      ctx = Context() 
 928      err = dir.VALIDATOR.validate(lnk.options, ctx) 
 929      if err: return InvalidOption(text="error in options: %s" % err) 
930
931 - def resolve_declare(self, sst, lnk, dir, action):
 932      declare = lnk.options.get("create") in ("always", dir) 
 933      assrt = lnk.options.get("assert") in ("always", dir) 
 934      def do_resolved(type, subtype): 
 935        err = None 
 936        if type is None: 
 937          if declare: 
 938            err = self.declare(sst, lnk, action) 
 939          else: 
 940            err = NotFound(text="no such queue: %s" % lnk.name) 
 941        else: 
 942          if assrt: 
 943            expected = lnk.options.get("node", {}).get("type") 
 944            if expected and type != expected: 
 945              err = AssertionFailed(text="expected %s, got %s" % (expected, type)) 
 946          if err is None: 
 947            action(type, subtype) 
 948 
 949        if err: 
 950          tgt = lnk.target 
 951          tgt.error = err 
 952          del self._attachments[tgt] 
 953          tgt.closed = True 
 954          return 
 955      self.resolve(sst, lnk.name, do_resolved, force=declare) 
 956 
957 - def resolve(self, sst, name, action, force=False):
 958      if not force: 
 959        try: 
 960          type, subtype = self.address_cache[name] 
 961          action(type, subtype) 
 962          return 
 963        except KeyError: 
 964          pass 
 965 
 966      args = [] 
 967      def do_result(r): 
 968        args.append(r) 
 969      def do_action(r): 
 970        do_result(r) 
 971        er, qr = args 
 972        if er.not_found and not qr.queue: 
 973          type, subtype = None, None 
 974        elif qr.queue: 
 975          type, subtype = "queue", None 
 976        else: 
 977          type, subtype = "topic", er.type 
 978        if type is not None: 
 979          self.address_cache[name] = (type, subtype) 
 980        action(type, subtype) 
 981      sst.write_query(ExchangeQuery(name), do_result) 
 982      sst.write_query(QueueQuery(name), do_action) 
 983 
984 - def declare(self, sst, lnk, action):
 985      name = lnk.name 
 986      props = lnk.options.get("node", {}) 
 987      durable = props.get("durable", DURABLE_DEFAULT) 
 988      type = props.get("type", "queue") 
 989      declare = props.get("x-declare", {}) 
 990 
 991      if type == "topic": 
 992        cmd = ExchangeDeclare(exchange=name, durable=durable) 
 993        bindings = get_bindings(props, exchange=name) 
 994      elif type == "queue": 
 995        cmd = QueueDeclare(queue=name, durable=durable) 
 996        bindings = get_bindings(props, queue=name) 
 997      else: 
 998        raise ValueError(type) 
 999 
1000      sst.apply_overrides(cmd, declare) 
1001 
1002      if type == "topic": 
1003        if cmd.type is None: 
1004          cmd.type = "topic" 
1005        subtype = cmd.type 
1006      else: 
1007        subtype = None 
1008 
1009      cmds = [cmd] 
1010      cmds.extend(bindings) 
1011 
1012      def declared(): 
1013        self.address_cache[name] = (type, subtype) 
1014        action(type, subtype) 
1015 
1016      sst.write_cmds(cmds, declared) 
1017 
1018 - def delete(self, sst, name, action):
1019      def deleted(): 
1020        del self.address_cache[name] 
1021        action() 
1022 
1023      def do_delete(type, subtype): 
1024        if type == "topic": 
1025          sst.write_cmd(ExchangeDelete(name), deleted) 
1026        elif type == "queue": 
1027          sst.write_cmd(QueueDelete(name), deleted) 
1028        elif type is None: 
1029          action() 
1030        else: 
1031          raise ValueError(type) 
1032      self.resolve(sst, name, do_delete, force=True) 
1033 
1034 - def process(self, ssn):
1035      if ssn.closed or ssn.closing: return 
1036 
1037      sst = self._attachments[ssn] 
1038 
1039      while sst.outgoing_idx < len(ssn.outgoing): 
1040        msg = ssn.outgoing[sst.outgoing_idx] 
1041        snd = msg._sender 
1042        # XXX: should check for sender error here 
1043        _snd = self._attachments.get(snd) 
1044        if _snd and snd.linked: 
1045          self.send(snd, msg) 
1046          sst.outgoing_idx += 1 
1047        else: 
1048          break 
1049 
1050      for snd in ssn.senders: 
1051        # XXX: should include snd.acked in this 
1052        if snd.synced >= snd.queued and sst.need_sync: 
1053          sst.write_cmd(ExecutionSync(), sync_noop) 
1054 
1055      for rcv in ssn.receivers: 
1056        self.process_receiver(rcv) 
1057 
1058      if ssn.acked: 
1059        messages = ssn.acked[sst.acked_idx:] 
1060        if messages: 
1061          ids = RangedSet() 
1062 
1063          disposed = [(DEFAULT_DISPOSITION, [])] 
1064          acked = [] 
1065          for m in messages: 
1066            # XXX: we're ignoring acks that get lost when disconnected, 
1067            # could we deal with this via some message-id based purge? 
1068            if m._transfer_id is None: 
1069              acked.append(m) 
1070              continue 
1071            ids.add(m._transfer_id) 
1072            if m._receiver._accept_mode is accept_mode.explicit: 
1073              disp = m._disposition or DEFAULT_DISPOSITION 
1074              last, msgs = disposed[-1] 
1075              if disp.type is last.type and disp.options == last.options: 
1076                msgs.append(m) 
1077              else: 
1078                disposed.append((disp, [m])) 
1079            else: 
1080              acked.append(m) 
1081 
1082          for range in ids: 
1083            sst.executed.add_range(range) 
1084          sst.write_op(SessionCompleted(sst.executed)) 
1085 
1086          def ack_acker(msgs): 
1087            def ack_ack(): 
1088              for m in msgs: 
1089                ssn.acked.remove(m) 
1090                sst.acked_idx -= 1 
1091                # XXX: should this check accept_mode too? 
1092                if not ssn.transactional: 
1093                  sst.acked.remove(m) 
1094            return ack_ack 
1095 
1096          for disp, msgs in disposed: 
1097            if not msgs: continue 
1098            if disp.type is None: 
1099              op = MessageAccept 
1100            elif disp.type is RELEASED: 
1101              op = MessageRelease 
1102            elif disp.type is REJECTED: 
1103              op = MessageReject 
1104            sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]), 
1105                             **disp.options), 
1106                          ack_acker(msgs)) 
1107            if log.isEnabledFor(DEBUG): 
1108              for m in msgs: 
1109                log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition) 
1110 
1111          sst.acked.extend(messages) 
1112          sst.acked_idx += len(messages) 
1113          ack_acker(acked)() 
1114 
1115      if ssn.committing and not sst.committing: 
1116        def commit_ok(): 
1117          del sst.acked[:] 
1118          ssn.committing = False 
1119          ssn.committed = True 
1120          ssn.aborting = False 
1121          ssn.aborted = False 
1122          sst.committing = False 
1123        sst.write_cmd(TxCommit(), commit_ok) 
1124        sst.committing = True 
1125 
1126      if ssn.aborting and not sst.aborting: 
1127        sst.aborting = True 
1128        def do_rb(): 
1129          messages = sst.acked + ssn.unacked + ssn.incoming 
1130          ids = RangedSet(*[m._transfer_id for m in messages]) 
1131          for range in ids: 
1132            sst.executed.add_range(range) 
1133          sst.write_op(SessionCompleted(sst.executed)) 
1134          sst.write_cmd(MessageRelease(ids, True)) 
1135          sst.write_cmd(TxRollback(), do_rb_ok) 
1136 
1137        def do_rb_ok(): 
1138          del ssn.incoming[:] 
1139          del ssn.unacked[:] 
1140          del sst.acked[:] 
1141 
1142          for rcv in ssn.receivers: 
1143            rcv.impending = rcv.received 
1144            rcv.returned = rcv.received 
1145            # XXX: do we need to update granted here as well? 
1146 
1147          for rcv in ssn.receivers: 
1148            self.process_receiver(rcv) 
1149 
1150          ssn.aborting = False 
1151          ssn.aborted = True 
1152          ssn.committing = False 
1153          ssn.committed = False 
1154          sst.aborting = False 
1155 
1156        for rcv in ssn.receivers: 
1157          _rcv = self._attachments[rcv] 
1158          sst.write_cmd(MessageStop(_rcv.destination)) 
1159        sst.write_cmd(ExecutionSync(), do_rb) 
1160 
1161 - def grant(self, rcv):
1162      sst = self._attachments[rcv.session] 
1163      _rcv = self._attachments.get(rcv) 
1164      if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining: 
1165        return 
1166 
1167      if rcv.granted is UNLIMITED: 
1168        if rcv.impending is UNLIMITED: 
1169          delta = 0 
1170        else: 
1171          delta = UNLIMITED 
1172      elif rcv.impending is UNLIMITED: 
1173        delta = -1 
1174      else: 
1175        delta = max(rcv.granted, rcv.received) - rcv.impending 
1176 
1177      if delta is UNLIMITED: 
1178        if not _rcv.bytes_open: 
1179          sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) 
1180          _rcv.bytes_open = True 
1181        sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value)) 
1182        rcv.impending = UNLIMITED 
1183      elif delta > 0: 
1184        if not _rcv.bytes_open: 
1185          sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) 
1186          _rcv.bytes_open = True 
1187        sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta)) 
1188        rcv.impending += delta 
1189      elif delta < 0 and not rcv.draining: 
1190        _rcv.draining = True 
1191        def do_stop(): 
1192          rcv.impending = rcv.received 
1193          _rcv.draining = False 
1194          _rcv.bytes_open = False 
1195          self.grant(rcv) 
1196        sst.write_cmd(MessageStop(_rcv.destination), do_stop) 
1197 
1198      if rcv.draining: 
1199        _rcv.draining = True 
1200        def do_flush(): 
1201          rcv.impending = rcv.received 
1202          rcv.granted = rcv.impending 
1203          _rcv.draining = False 
1204          _rcv.bytes_open = False 
1205          rcv.draining = False 
1206        sst.write_cmd(MessageFlush(_rcv.destination), do_flush) 
1207 
1208 
1209 - def process_receiver(self, rcv):
1210      if rcv.closed: return 
1211      self.grant(rcv) 
1212
1213 - def send(self, snd, msg):
1214      sst = self._attachments[snd.session] 
1215      _snd = self._attachments[snd] 
1216 
1217      if msg.subject is None or _snd._exchange == "": 
1218        rk = _snd._routing_key 
1219      else: 
1220        rk = msg.subject 
1221 
1222      if msg.subject is None: 
1223        subject = _snd.subject 
1224      else: 
1225        subject = msg.subject 
1226 
1227      # XXX: do we need to query to figure out how to create the reply-to interoperably? 
1228      if msg.reply_to: 
1229        rt = addr2reply_to(msg.reply_to) 
1230      else: 
1231        rt = None 
1232      content_encoding = msg.properties.get("x-amqp-0-10.content-encoding") 
1233      dp = DeliveryProperties(routing_key=rk) 
1234      mp = MessageProperties(message_id=msg.id, 
1235                             user_id=msg.user_id, 
1236                             reply_to=rt, 
1237                             correlation_id=msg.correlation_id, 
1238                             app_id = msg.properties.get("x-amqp-0-10.app-id"), 
1239                             content_type=msg.content_type, 
1240                             content_encoding=content_encoding, 
1241                             application_headers=msg.properties) 
1242      if subject is not None: 
1243        if mp.application_headers is None: 
1244          mp.application_headers = {} 
1245        mp.application_headers[SUBJECT] = subject 
1246      if msg.durable is not None: 
1247        if msg.durable: 
1248          dp.delivery_mode = delivery_mode.persistent 
1249        else: 
1250          dp.delivery_mode = delivery_mode.non_persistent 
1251      if msg.priority is not None: 
1252        dp.priority = msg.priority 
1253      if msg.ttl is not None: 
1254        dp.ttl = long(msg.ttl*1000) 
1255      enc, dec = get_codec(msg.content_type) 
1256      body = enc(msg.content) 
1257 
1258      # XXX: this is not safe for out of order, can this be triggered by pre_ack? 
1259      def msg_acked(): 
1260        # XXX: should we log the ack somehow too? 
1261        snd.acked += 1 
1262        m = snd.session.outgoing.pop(0) 
1263        sst.outgoing_idx -= 1 
1264        log.debug("RACK[%s]: %s", sst.session.log_id, msg) 
1265        assert msg == m 
1266 
1267      xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp), 
1268                            payload=body) 
1269 
1270      if _snd.pre_ack: 
1271        sst.write_cmd(xfr) 
1272      else: 
1273        sst.write_cmd(xfr, msg_acked, sync=msg._sync) 
1274 
1275      log.debug("SENT[%s]: %s", sst.session.log_id, msg) 
1276 
1277      if _snd.pre_ack: 
1278        msg_acked() 
1279 
1280 - def do_message_transfer(self, xfr):
1281      sst = self.get_sst(xfr) 
1282      ssn = sst.session 
1283 
1284      msg = self._decode(xfr) 
1285      rcv = sst.destinations[xfr.destination].target 
1286      msg._receiver = rcv 
1287      if rcv.impending is not UNLIMITED: 
1288        assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) 
1289      rcv.received += 1 
1290      log.debug("RCVD[%s]: %s", ssn.log_id, msg) 
1291      ssn.incoming.append(msg) 
1292
1293 - def _decode(self, xfr):
1294      dp = EMPTY_DP 
1295      mp = EMPTY_MP 
1296 
1297      for h in xfr.headers: 
1298        if isinstance(h, DeliveryProperties): 
1299          dp = h 
1300        elif isinstance(h, MessageProperties): 
1301          mp = h 
1302 
1303      ap = mp.application_headers 
1304      enc, dec = get_codec(mp.content_type) 
1305      content = dec(xfr.payload) 
1306      msg = Message(content) 
1307      msg.id = mp.message_id 
1308      if ap is not None: 
1309        msg.subject = ap.get(SUBJECT) 
1310      msg.user_id = mp.user_id 
1311      if mp.reply_to is not None: 
1312        msg.reply_to = reply_to2addr(mp.reply_to) 
1313      msg.correlation_id = mp.correlation_id 
1314      if dp.delivery_mode is not None: 
1315        msg.durable = dp.delivery_mode == delivery_mode.persistent 
1316      msg.priority = dp.priority 
1317      if dp.ttl is not None: 
1318        msg.ttl = dp.ttl/1000.0 
1319      msg.redelivered = dp.redelivered 
1320      msg.properties = mp.application_headers or {} 
1321      if mp.app_id is not None: 
1322        msg.properties["x-amqp-0-10.app-id"] = mp.app_id 
1323      if mp.content_encoding is not None: 
1324        msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding 
1325      if dp.routing_key is not None: 
1326        msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key 
1327      msg.content_type = mp.content_type 
1328      msg._transfer_id = xfr.id 
1329      return msg 
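Taken together, send() and _decode() round-trip the 0-10 specific fields through message properties. A sketch (not part of the module source) of the mapping, using the property names that appear above:

    # Illustrative only: application-visible property <-> 0-10 header field.
    #   msg.properties["x-amqp-0-10.app-id"]           <-> MessageProperties.app_id
    #   msg.properties["x-amqp-0-10.content-encoding"] <-> MessageProperties.content_encoding
    #   msg.properties["x-amqp-0-10.routing-key"]      <-  DeliveryProperties.routing_key (inbound only)
    #   msg.subject                                     <-> application_headers["qpid.subject"]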
1330