Package proton :: Module reactor
[frames] | no frames]

Source Code for Module proton.reactor

  1  from __future__ import absolute_import 
  2  # 
  3  # Licensed to the Apache Software Foundation (ASF) under one 
  4  # or more contributor license agreements.  See the NOTICE file 
  5  # distributed with this work for additional information 
  6  # regarding copyright ownership.  The ASF licenses this file 
  7  # to you under the Apache License, Version 2.0 (the 
  8  # "License"); you may not use this file except in compliance 
  9  # with the License.  You may obtain a copy of the License at 
 10  # 
 11  #   http://www.apache.org/licenses/LICENSE-2.0 
 12  # 
 13  # Unless required by applicable law or agreed to in writing, 
 14  # software distributed under the License is distributed on an 
 15  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 16  # KIND, either express or implied.  See the License for the 
 17  # specific language governing permissions and limitations 
 18  # under the License. 
 19  # 
 20  import logging, os, socket, time, types 
 21  from heapq import heappush, heappop, nsmallest 
 22  from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch 
 23  from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message 
 24  from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol 
 25  from proton import Terminus, Timeout, Transport, TransportException, ulong, Url 
 26  from select import select 
 27  from proton.handlers import OutgoingMessageHandler 
 28  from proton import unicode2utf8, utf82unicode 
 29   
 30  import traceback 
 31  from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable 
 32  from .wrapper import Wrapper, PYCTX 
 33  from cproton import * 
 34  from . import _compat 
 35   
 36  try: 
 37      import Queue 
 38  except ImportError: 
 39      import queue as Queue 
40 41 -class Task(Wrapper):
42 43 @staticmethod
44 - def wrap(impl):
45 if impl is None: 46 return None 47 else: 48 return Task(impl)
49
50 - def __init__(self, impl):
51 Wrapper.__init__(self, impl, pn_task_attachments)
52
53 - def _init(self):
54 pass
55
56 - def cancel(self):
57 pn_task_cancel(self._impl)
58
59 -class Acceptor(Wrapper):
60
61 - def __init__(self, impl):
62 Wrapper.__init__(self, impl)
63
64 - def set_ssl_domain(self, ssl_domain):
65 pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
66
67 - def close(self):
68 pn_acceptor_close(self._impl)
69
70 -class Reactor(Wrapper):
71 72 @staticmethod
73 - def wrap(impl):
74 if impl is None: 75 return None 76 else: 77 record = pn_reactor_attachments(impl) 78 attrs = pn_void2py(pn_record_get(record, PYCTX)) 79 if attrs and 'subclass' in attrs: 80 return attrs['subclass'](impl=impl) 81 else: 82 return Reactor(impl=impl)
83
84 - def __init__(self, *handlers, **kwargs):
85 Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments) 86 for h in handlers: 87 self.handler.add(h)
88
89 - def _init(self):
90 self.errors = []
91
92 - def on_error(self, info):
93 self.errors.append(info) 94 self.yield_()
95
96 - def _get_global(self):
97 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
98
99 - def _set_global(self, handler):
100 impl = _chandler(handler, self.on_error) 101 pn_reactor_set_global_handler(self._impl, impl) 102 pn_decref(impl)
103 104 global_handler = property(_get_global, _set_global) 105
106 - def _get_timeout(self):
107 return millis2timeout(pn_reactor_get_timeout(self._impl))
108
109 - def _set_timeout(self, secs):
110 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
111 112 timeout = property(_get_timeout, _set_timeout) 113
114 - def yield_(self):
115 pn_reactor_yield(self._impl)
116
117 - def mark(self):
118 return pn_reactor_mark(self._impl)
119
120 - def _get_handler(self):
121 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
122
123 - def _set_handler(self, handler):
124 impl = _chandler(handler, self.on_error) 125 pn_reactor_set_handler(self._impl, impl) 126 pn_decref(impl)
127 128 handler = property(_get_handler, _set_handler) 129
130 - def run(self):
131 self.timeout = 3.14159265359 132 self.start() 133 while self.process(): pass 134 self.stop() 135 self.process() 136 self.global_handler = None 137 self.handler = None
138
139 - def wakeup(self):
140 n = pn_reactor_wakeup(self._impl) 141 if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
142
143 - def start(self):
144 pn_reactor_start(self._impl)
145 146 @property
147 - def quiesced(self):
148 return pn_reactor_quiesced(self._impl)
149
150 - def _check_errors(self):
151 if self.errors: 152 for exc, value, tb in self.errors[:-1]: 153 traceback.print_exception(exc, value, tb) 154 exc, value, tb = self.errors[-1] 155 _compat.raise_(exc, value, tb)
156
157 - def process(self):
158 result = pn_reactor_process(self._impl) 159 self._check_errors() 160 return result
161
162 - def stop(self):
163 pn_reactor_stop(self._impl) 164 self._check_errors()
165
166 - def schedule(self, delay, task):
167 impl = _chandler(task, self.on_error) 168 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl)) 169 pn_decref(impl) 170 return task
171
172 - def acceptor(self, host, port, handler=None):
173 impl = _chandler(handler, self.on_error) 174 aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl) 175 pn_decref(impl) 176 if aimpl: 177 return Acceptor(aimpl) 178 else: 179 raise IOError("%s (%s:%s)" % pn_error_text(pn_reactor_error(self._impl)), host, port)
180
181 - def connection(self, handler=None):
182 """Deprecated: use connection_to_host() instead 183 """ 184 impl = _chandler(handler, self.on_error) 185 result = Connection.wrap(pn_reactor_connection(self._impl, impl)) 186 if impl: pn_decref(impl) 187 return result
188
189 - def connection_to_host(self, host, port, handler=None):
190 """Create an outgoing Connection that will be managed by the reactor. 191 The reator's pn_iohandler will create a socket connection to the host 192 once the connection is opened. 193 """ 194 conn = self.connection(handler) 195 self.set_connection_host(conn, host, port) 196 return conn
197
198 - def set_connection_host(self, connection, host, port):
199 """Change the address used by the connection. The address is 200 used by the reactor's iohandler to create an outgoing socket 201 connection. This must be set prior to opening the connection. 202 """ 203 pn_reactor_set_connection_host(self._impl, 204 connection._impl, 205 unicode2utf8(str(host)), 206 unicode2utf8(str(port)))
207
208 - def get_connection_address(self, connection):
209 """This may be used to retrieve the remote peer address. 210 @return: string containing the address in URL format or None if no 211 address is available. Use the proton.Url class to create a Url object 212 from the returned value. 213 """ 214 _url = pn_reactor_get_connection_address(self._impl, connection._impl) 215 return utf82unicode(_url)
216
217 - def selectable(self, handler=None):
218 impl = _chandler(handler, self.on_error) 219 result = Selectable.wrap(pn_reactor_selectable(self._impl)) 220 if impl: 221 record = pn_selectable_attachments(result._impl) 222 pn_record_set_handler(record, impl) 223 pn_decref(impl) 224 return result
225
226 - def update(self, sel):
227 pn_reactor_update(self._impl, sel._impl)
228
229 - def push_event(self, obj, etype):
230 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
231 232 from proton import wrappers as _wrappers 233 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x)) 234 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
235 236 237 -class EventInjector(object):
238 """ 239 Can be added to a reactor to allow events to be triggered by an 240 external thread but handled on the event thread associated with 241 the reactor. An instance of this class can be passed to the 242 Reactor.selectable() method of the reactor in order to activate 243 it. The close() method should be called when it is no longer 244 needed, to allow the event loop to end if needed. 245 """
246 - def __init__(self):
247 self.queue = Queue.Queue() 248 self.pipe = os.pipe() 249 self._closed = False
250
251 - def trigger(self, event):
252 """ 253 Request that the given event be dispatched on the event thread 254 of the reactor to which this EventInjector was added. 255 """ 256 self.queue.put(event) 257 os.write(self.pipe[1], _compat.str2bin("!"))
258
259 - def close(self):
260 """ 261 Request that this EventInjector be closed. Existing events 262 will be dispctahed on the reactors event dispactch thread, 263 then this will be removed from the set of interest. 264 """ 265 self._closed = True 266 os.write(self.pipe[1], _compat.str2bin("!"))
267
268 - def fileno(self):
269 return self.pipe[0]
270
271 - def on_selectable_init(self, event):
272 sel = event.context 273 sel.fileno(self.fileno()) 274 sel.reading = True 275 event.reactor.update(sel)
276
277 - def on_selectable_readable(self, event):
278 os.read(self.pipe[0], 512) 279 while not self.queue.empty(): 280 requested = self.queue.get() 281 event.reactor.push_event(requested.context, requested.type) 282 if self._closed: 283 s = event.context 284 s.terminate() 285 event.reactor.update(s)
286
287 288 -class ApplicationEvent(EventBase):
289 """ 290 Application defined event, which can optionally be associated with 291 an engine object and or an arbitrary subject 292 """
293 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
294 super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename)) 295 self.connection = connection 296 self.session = session 297 self.link = link 298 self.delivery = delivery 299 if self.delivery: 300 self.link = self.delivery.link 301 if self.link: 302 self.session = self.link.session 303 if self.session: 304 self.connection = self.session.connection 305 self.subject = subject
306
307 - def __repr__(self):
308 objects = [self.connection, self.session, self.link, self.delivery, self.subject] 309 return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
310
311 -class Transaction(object):
312 """ 313 Class to track state of an AMQP 1.0 transaction. 314 """
315 - def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
316 self.txn_ctrl = txn_ctrl 317 self.handler = handler 318 self.id = None 319 self._declare = None 320 self._discharge = None 321 self.failed = False 322 self._pending = [] 323 self.settle_before_discharge = settle_before_discharge 324 self.declare()
325
326 - def commit(self):
327 self.discharge(False)
328
329 - def abort(self):
330 self.discharge(True)
331
332 - def declare(self):
333 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
334
335 - def discharge(self, failed):
336 self.failed = failed 337 self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
338
339 - def _send_ctrl(self, descriptor, value):
340 delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value))) 341 delivery.transaction = self 342 return delivery
343
344 - def send(self, sender, msg, tag=None):
345 dlv = sender.send(msg, tag=tag) 346 dlv.local.data = [self.id] 347 dlv.update(0x34) 348 return dlv
349
350 - def accept(self, delivery):
351 self.update(delivery, PN_ACCEPTED) 352 if self.settle_before_discharge: 353 delivery.settle() 354 else: 355 self._pending.append(delivery)
356
357 - def update(self, delivery, state=None):
358 if state: 359 delivery.local.data = [self.id, Described(ulong(state), [])] 360 delivery.update(0x34)
361
362 - def _release_pending(self):
363 for d in self._pending: 364 d.update(Delivery.RELEASED) 365 d.settle() 366 self._clear_pending()
367
368 - def _clear_pending(self):
369 self._pending = []
370
371 - def handle_outcome(self, event):
372 if event.delivery == self._declare: 373 if event.delivery.remote.data: 374 self.id = event.delivery.remote.data[0] 375 self.handler.on_transaction_declared(event) 376 elif event.delivery.remote_state == Delivery.REJECTED: 377 self.handler.on_transaction_declare_failed(event) 378 else: 379 logging.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state) 380 self.handler.on_transaction_declare_failed(event) 381 elif event.delivery == self._discharge: 382 if event.delivery.remote_state == Delivery.REJECTED: 383 if not self.failed: 384 self.handler.on_transaction_commit_failed(event) 385 self._release_pending() # make this optional? 386 else: 387 if self.failed: 388 self.handler.on_transaction_aborted(event) 389 self._release_pending() 390 else: 391 self.handler.on_transaction_committed(event) 392 self._clear_pending()
393
394 -class LinkOption(object):
395 """ 396 Abstract interface for link configuration options 397 """
398 - def apply(self, link):
399 """ 400 Subclasses will implement any configuration logic in this 401 method 402 """ 403 pass
404 - def test(self, link):
405 """ 406 Subclasses can override this to selectively apply an option 407 e.g. based on some link criteria 408 """ 409 return True
410
411 -class AtMostOnce(LinkOption):
412 - def apply(self, link):
414
415 -class AtLeastOnce(LinkOption):
416 - def apply(self, link):
419
420 -class SenderOption(LinkOption):
421 - def apply(self, sender): pass
422 - def test(self, link): return link.is_sender
423
424 -class ReceiverOption(LinkOption):
425 - def apply(self, receiver): pass
426 - def test(self, link): return link.is_receiver
427
428 -class DynamicNodeProperties(LinkOption):
429 - def __init__(self, props={}):
430 self.properties = {} 431 for k in props: 432 if isinstance(k, symbol): 433 self.properties[k] = props[k] 434 else: 435 self.properties[symbol(k)] = props[k]
436
437 - def apply(self, link):
442
443 -class Filter(ReceiverOption):
444 - def __init__(self, filter_set={}):
445 self.filter_set = filter_set
446
447 - def apply(self, receiver):
448 receiver.source.filter.put_dict(self.filter_set)
449
450 -class Selector(Filter):
451 """ 452 Configures a link with a message selector filter 453 """
454 - def __init__(self, value, name='selector'):
455 super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
456
457 -class DurableSubscription(ReceiverOption):
458 - def apply(self, receiver):
461
462 -class Move(ReceiverOption):
463 - def apply(self, receiver):
465
466 -class Copy(ReceiverOption):
467 - def apply(self, receiver):
469 477
478 -def _create_session(connection, handler=None):
479 session = connection.session() 480 session.open() 481 return session
482
483 484 -def _get_attr(target, name):
485 if hasattr(target, name): 486 return getattr(target, name) 487 else: 488 return None
489
490 -class SessionPerConnection(object):
491 - def __init__(self):
492 self._default_session = None
493
494 - def session(self, connection):
495 if not self._default_session: 496 self._default_session = _create_session(connection) 497 self._default_session.context = self 498 return self._default_session
499
500 - def on_session_remote_close(self, event):
501 event.connection.close() 502 self._default_session = None
503
504 -class GlobalOverrides(object):
505 """ 506 Internal handler that triggers the necessary socket connect for an 507 opened connection. 508 """
509 - def __init__(self, base):
510 self.base = base
511
512 - def on_unhandled(self, name, event):
513 if not self._override(event): 514 event.dispatch(self.base)
515
516 - def _override(self, event):
517 conn = event.connection 518 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
519
520 -class Connector(Handler):
521 """ 522 Internal handler that triggers the necessary socket connect for an 523 opened connection. 524 """
525 - def __init__(self, connection):
526 self.connection = connection 527 self.address = None 528 self.heartbeat = None 529 self.reconnect = None 530 self.ssl_domain = None 531 self.allow_insecure_mechs = True 532 self.allowed_mechs = None 533 self.sasl_enabled = True 534 self.user = None 535 self.password = None 536 self.virtual_host = None 537 self.ssl_sni = None
538
539 - def _connect(self, connection, reactor):
540 assert(reactor is not None) 541 url = self.address.next() 542 reactor.set_connection_host(connection, url.host, str(url.port)) 543 # if virtual-host not set, use host from address as default 544 if self.virtual_host is None: 545 connection.hostname = url.host 546 logging.debug("connecting to %s..." % url) 547 548 transport = Transport() 549 if self.sasl_enabled: 550 sasl = transport.sasl() 551 sasl.allow_insecure_mechs = self.allow_insecure_mechs 552 if url.username: 553 connection.user = url.username 554 elif self.user: 555 connection.user = self.user 556 if url.password: 557 connection.password = url.password 558 elif self.password: 559 connection.password = self.password 560 if self.allowed_mechs: 561 sasl.allowed_mechs(self.allowed_mechs) 562 transport.bind(connection) 563 if self.heartbeat: 564 transport.idle_timeout = self.heartbeat 565 if url.scheme == 'amqps': 566 if not self.ssl_domain: 567 raise SSLUnavailable("amqps: SSL libraries not found") 568 self.ssl = SSL(transport, self.ssl_domain) 569 self.ssl.peer_hostname = self.ssl_sni or self.virtual_host or url.host
570
571 - def on_connection_local_open(self, event):
572 self._connect(event.connection, event.reactor)
573
574 - def on_connection_remote_open(self, event):
575 logging.debug("connected to %s" % event.connection.hostname) 576 if self.reconnect: 577 self.reconnect.reset() 578 self.transport = None
579
580 - def on_transport_tail_closed(self, event):
581 self.on_transport_closed(event)
582
583 - def on_transport_closed(self, event):
584 if self.connection and self.connection.state & Endpoint.LOCAL_ACTIVE: 585 if self.reconnect: 586 event.transport.unbind() 587 delay = self.reconnect.next() 588 if delay == 0: 589 logging.info("Disconnected, reconnecting...") 590 self._connect(self.connection, event.reactor) 591 else: 592 logging.info("Disconnected will try to reconnect after %s seconds" % delay) 593 event.reactor.schedule(delay, self) 594 else: 595 logging.debug("Disconnected") 596 self.connection = None
597
598 - def on_timer_task(self, event):
599 self._connect(self.connection, event.reactor)
600
601 - def on_connection_remote_close(self, event):
602 self.connection = None
603
604 -class Backoff(object):
605 """ 606 A reconnect strategy involving an increasing delay between 607 retries, up to a maximum or 10 seconds. 608 """
609 - def __init__(self):
610 self.delay = 0
611
612 - def reset(self):
613 self.delay = 0
614
615 - def next(self):
616 current = self.delay 617 if current == 0: 618 self.delay = 0.1 619 else: 620 self.delay = min(10, 2*current) 621 return current
622
623 -class Urls(object):
624 - def __init__(self, values):
625 self.values = [Url(v) for v in values] 626 self.i = iter(self.values)
627
628 - def __iter__(self):
629 return self
630
631 - def next(self):
632 try: 633 return next(self.i) 634 except StopIteration: 635 self.i = iter(self.values) 636 return next(self.i)
637
638 -class SSLConfig(object):
639 - def __init__(self):
640 self.client = SSLDomain(SSLDomain.MODE_CLIENT) 641 self.server = SSLDomain(SSLDomain.MODE_SERVER)
642
643 - def set_credentials(self, cert_file, key_file, password):
644 self.client.set_credentials(cert_file, key_file, password) 645 self.server.set_credentials(cert_file, key_file, password)
646
647 - def set_trusted_ca_db(self, certificate_db):
648 self.client.set_trusted_ca_db(certificate_db) 649 self.server.set_trusted_ca_db(certificate_db)
650
651 652 -class Container(Reactor):
653 """A representation of the AMQP concept of a 'container', which 654 lossely speaking is something that establishes links to or from 655 another container, over which messages are transfered. This is 656 an extension to the Reactor class that adds convenience methods 657 for creating connections and sender- or receiver- links. 658 """
659 - def __init__(self, *handlers, **kwargs):
660 super(Container, self).__init__(*handlers, **kwargs) 661 if "impl" not in kwargs: 662 try: 663 self.ssl = SSLConfig() 664 except SSLUnavailable: 665 self.ssl = None 666 self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler)) 667 self.trigger = None 668 self.container_id = str(generate_uuid()) 669 self.allow_insecure_mechs = True 670 self.allowed_mechs = None 671 self.sasl_enabled = True 672 self.user = None 673 self.password = None 674 Wrapper.__setattr__(self, 'subclass', self.__class__)
675
676 - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
677 """ 678 Initiates the establishment of an AMQP connection. Returns an 679 instance of proton.Connection. 680 681 @param url: URL string of process to connect to 682 683 @param urls: list of URL strings of process to try to connect to 684 685 Only one of url or urls should be specified. 686 687 @param reconnect: A value of False will prevent the library 688 form automatically trying to reconnect if the underlying 689 socket is disconnected before the connection has been closed. 690 691 @param heartbeat: A value in milliseconds indicating the 692 desired frequency of heartbeats used to test the underlying 693 socket is alive. 694 695 @param ssl_domain: SSL configuration in the form of an 696 instance of proton.SSLdomain. 697 698 @param handler: a connection scoped handler that will be 699 called to process any events in the scope of this connection 700 or its child links 701 702 @param kwargs: sasl_enabled, which determines whether a sasl layer is 703 used for the connection; allowed_mechs an optional list of SASL 704 mechanisms to allow if sasl is enabled; allow_insecure_mechs a flag 705 indicating whether insecure mechanisms, such as PLAIN over a 706 non-encrypted socket, are allowed; 'virtual_host' the hostname to set 707 in the Open performative used by peer to determine the correct 708 back-end service for the client. If 'virtual_host' is not supplied the 709 host field from the URL is used instead." 710 711 """ 712 conn = self.connection(handler) 713 conn.container = self.container_id or str(generate_uuid()) 714 conn.offered_capabilities = kwargs.get('offered_capabilities') 715 conn.desired_capabilities = kwargs.get('desired_capabilities') 716 conn.properties = kwargs.get('properties') 717 718 connector = Connector(conn) 719 connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs) 720 connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs) 721 connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled) 722 connector.user = kwargs.get('user', self.user) 723 connector.password = kwargs.get('password', self.password) 724 connector.virtual_host = kwargs.get('virtual_host') 725 if connector.virtual_host: 726 # only set hostname if virtual-host is a non-empty string 727 conn.hostname = connector.virtual_host 728 connector.ssl_sni = kwargs.get('sni') 729 730 conn._overrides = connector 731 if url: connector.address = Urls([url]) 732 elif urls: connector.address = Urls(urls) 733 elif address: connector.address = address 734 else: raise ValueError("One of url, urls or address required") 735 if heartbeat: 736 connector.heartbeat = heartbeat 737 if reconnect: 738 connector.reconnect = reconnect 739 elif reconnect is None: 740 connector.reconnect = Backoff() 741 # use container's default client domain if none specified. This is 742 # only necessary of the URL specifies the "amqps:" scheme 743 connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client) 744 conn._session_policy = SessionPerConnection() #todo: make configurable 745 conn.open() 746 return conn
747
748 - def _get_id(self, container, remote, local):
749 if local and remote: "%s-%s-%s" % (container, remote, local) 750 elif local: return "%s-%s" % (container, local) 751 elif remote: return "%s-%s" % (container, remote) 752 else: return "%s-%s" % (container, str(generate_uuid()))
753
754 - def _get_session(self, context):
755 if isinstance(context, Url): 756 return self._get_session(self.connect(url=context)) 757 elif isinstance(context, Session): 758 return context 759 elif isinstance(context, Connection): 760 if hasattr(context, '_session_policy'): 761 return context._session_policy.session(context) 762 else: 763 return _create_session(context) 764 else: 765 return context.session()
766
767 - def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
768 """ 769 Initiates the establishment of a link over which messages can 770 be sent. Returns an instance of proton.Sender. 771 772 There are two patterns of use. (1) A connection can be passed 773 as the first argument, in which case the link is established 774 on that connection. In this case the target address can be 775 specified as the second argument (or as a keyword 776 argument). The source address can also be specified if 777 desired. (2) Alternatively a URL can be passed as the first 778 argument. In this case a new connection will be establised on 779 which the link will be attached. If a path is specified and 780 the target is not, then the path of the URL is used as the 781 target address. 782 783 The name of the link may be specified if desired, otherwise a 784 unique name will be generated. 785 786 Various LinkOptions can be specified to further control the 787 attachment. 788 """ 789 if isinstance(context, _compat.STRING_TYPES): 790 context = Url(context) 791 if isinstance(context, Url) and not target: 792 target = context.path 793 session = self._get_session(context) 794 snd = session.sender(name or self._get_id(session.connection.container, target, source)) 795 if source: 796 snd.source.address = source 797 if target: 798 snd.target.address = target 799 if handler != None: 800 snd.handler = handler 801 if tags: 802 snd.tag_generator = tags 803 _apply_link_options(options, snd) 804 snd.open() 805 return snd
806
807 - def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
808 """ 809 Initiates the establishment of a link over which messages can 810 be received (aka a subscription). Returns an instance of 811 proton.Receiver. 812 813 There are two patterns of use. (1) A connection can be passed 814 as the first argument, in which case the link is established 815 on that connection. In this case the source address can be 816 specified as the second argument (or as a keyword 817 argument). The target address can also be specified if 818 desired. (2) Alternatively a URL can be passed as the first 819 argument. In this case a new connection will be establised on 820 which the link will be attached. If a path is specified and 821 the source is not, then the path of the URL is used as the 822 target address. 823 824 The name of the link may be specified if desired, otherwise a 825 unique name will be generated. 826 827 Various LinkOptions can be specified to further control the 828 attachment. 829 """ 830 if isinstance(context, _compat.STRING_TYPES): 831 context = Url(context) 832 if isinstance(context, Url) and not source: 833 source = context.path 834 session = self._get_session(context) 835 rcv = session.receiver(name or self._get_id(session.connection.container, source, target)) 836 if source: 837 rcv.source.address = source 838 if dynamic: 839 rcv.source.dynamic = True 840 if target: 841 rcv.target.address = target 842 if handler != None: 843 rcv.handler = handler 844 _apply_link_options(options, rcv) 845 rcv.open() 846 return rcv
847
848 - def declare_transaction(self, context, handler=None, settle_before_discharge=False):
849 if not _get_attr(context, '_txn_ctrl'): 850 class InternalTransactionHandler(OutgoingMessageHandler): 851 def __init__(self): 852 super(InternalTransactionHandler, self).__init__(auto_settle=True)
853 854 def on_settled(self, event): 855 if hasattr(event.delivery, "transaction"): 856 event.transaction = event.delivery.transaction 857 event.delivery.transaction.handle_outcome(event)
858 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler()) 859 context._txn_ctrl.target.type = Terminus.COORDINATOR 860 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions')) 861 return Transaction(context._txn_ctrl, handler, settle_before_discharge) 862
863 - def listen(self, url, ssl_domain=None):
864 """ 865 Initiates a server socket, accepting incoming AMQP connections 866 on the interface and port specified. 867 """ 868 url = Url(url) 869 acceptor = self.acceptor(url.host, url.port) 870 ssl_config = ssl_domain 871 if not ssl_config and url.scheme == 'amqps': 872 # use container's default server domain 873 if self.ssl: 874 ssl_config = self.ssl.server 875 else: 876 raise SSLUnavailable("amqps: SSL libraries not found") 877 if ssl_config: 878 acceptor.set_ssl_domain(ssl_config) 879 return acceptor
880
881 - def do_work(self, timeout=None):
882 if timeout: 883 self.timeout = timeout 884 return self.process()
885