1 from __future__ import absolute_import
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import logging, os, socket, time, types
21 from heapq import heappush, heappop, nsmallest
22 from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
23 from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
24 from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
25 from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
26 from select import select
27 from proton.handlers import OutgoingMessageHandler
28 from proton import unicode2utf8, utf82unicode
29
30 import traceback
31 from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
32 from .wrapper import Wrapper, PYCTX
33 from cproton import *
34 from . import _compat
35
36 try:
37 import Queue
38 except ImportError:
39 import queue as Queue
40
41 -class Task(Wrapper):
42
43 @staticmethod
45 if impl is None:
46 return None
47 else:
48 return Task(impl)
49
52
55
57 pn_task_cancel(self._impl)
58
60
63
def set_ssl_domain(self, ssl_domain):
    """
    Attach an SSL domain to this acceptor.

    Hands the ``_domain`` attribute of ``ssl_domain`` to
    ``pn_acceptor_set_ssl_domain`` together with this acceptor's
    underlying ``_impl`` object.
    """
    acceptor_impl = self._impl
    domain_impl = ssl_domain._domain
    pn_acceptor_set_ssl_domain(acceptor_impl, domain_impl)
66
68 pn_acceptor_close(self._impl)
69
71
72 @staticmethod
74 if impl is None:
75 return None
76 else:
77 record = pn_reactor_attachments(impl)
78 attrs = pn_void2py(pn_record_get(record, PYCTX))
79 if attrs and 'subclass' in attrs:
80 return attrs['subclass'](impl=impl)
81 else:
82 return Reactor(impl=impl)
83
84 - def __init__(self, *handlers, **kwargs):
88
91
93 self.errors.append(info)
94 self.yield_()
95
97 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
98
100 impl = _chandler(handler, self.on_error)
101 pn_reactor_set_global_handler(self._impl, impl)
102 pn_decref(impl)
103
104 global_handler = property(_get_global, _set_global)
105
107 return millis2timeout(pn_reactor_get_timeout(self._impl))
108
110 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
111
112 timeout = property(_get_timeout, _set_timeout)
113
115 pn_reactor_yield(self._impl)
116
118 return pn_reactor_mark(self._impl)
119
121 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
122
124 impl = _chandler(handler, self.on_error)
125 pn_reactor_set_handler(self._impl, impl)
126 pn_decref(impl)
127
128 handler = property(_get_handler, _set_handler)
129
138
140 n = pn_reactor_wakeup(self._impl)
141 if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
142
144 pn_reactor_start(self._impl)
145
146 @property
148 return pn_reactor_quiesced(self._impl)
149
151 if self.errors:
152 for exc, value, tb in self.errors[:-1]:
153 traceback.print_exception(exc, value, tb)
154 exc, value, tb = self.errors[-1]
155 _compat.raise_(exc, value, tb)
156
158 result = pn_reactor_process(self._impl)
159 self._check_errors()
160 return result
161
163 pn_reactor_stop(self._impl)
164 self._check_errors()
165
167 impl = _chandler(task, self.on_error)
168 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
169 pn_decref(impl)
170 return task
171
def acceptor(self, host, port, handler=None):
    """
    Create an acceptor listening on ``host``:``port`` for this reactor.

    @param host: interface to listen on; passed through unicode2utf8().
    @param port: port to listen on; converted with str().
    @param handler: optional handler, wrapped with _chandler() using this
    reactor's on_error callback and handed to the acceptor.

    @return: an Acceptor wrapping the object returned by
    pn_reactor_acceptor().

    @raise IOError: if pn_reactor_acceptor() returns a false value; the
    message carries the reactor's error text plus the host and port.
    """
    impl = _chandler(handler, self.on_error)
    aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
    # Drop our local reference to the wrapped handler once the call is made.
    pn_decref(impl)
    if aimpl:
        return Acceptor(aimpl)
    # The three values must be passed to % as ONE tuple; previously only the
    # error text was the operand, which raised TypeError instead of IOError.
    error_text = pn_error_text(pn_reactor_error(self._impl))
    raise IOError("%s (%s:%s)" % (error_text, host, port))
180
182 """Deprecated: use connection_to_host() instead
183 """
184 impl = _chandler(handler, self.on_error)
185 result = Connection.wrap(pn_reactor_connection(self._impl, impl))
186 if impl: pn_decref(impl)
187 return result
188
190 """Create an outgoing Connection that will be managed by the reactor.
191 The reactor's pn_iohandler will create a socket connection to the host
192 once the connection is opened.
193 """
194 conn = self.connection(handler)
195 self.set_connection_host(conn, host, port)
196 return conn
197
199 """Change the address used by the connection. The address is
200 used by the reactor's iohandler to create an outgoing socket
201 connection. This must be set prior to opening the connection.
202 """
203 pn_reactor_set_connection_host(self._impl,
204 connection._impl,
205 unicode2utf8(str(host)),
206 unicode2utf8(str(port)))
207
209 """This may be used to retrieve the remote peer address.
210 @return: string containing the address in URL format or None if no
211 address is available. Use the proton.Url class to create a Url object
212 from the returned value.
213 """
214 _url = pn_reactor_get_connection_address(self._impl, connection._impl)
215 return utf82unicode(_url)
216
218 impl = _chandler(handler, self.on_error)
219 result = Selectable.wrap(pn_reactor_selectable(self._impl))
220 if impl:
221 record = pn_selectable_attachments(result._impl)
222 pn_record_set_handler(record, impl)
223 pn_decref(impl)
224 return result
225
227 pn_reactor_update(self._impl, sel._impl)
228
230 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
231
232 from proton import wrappers as _wrappers
233 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
234 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
238 """
239 Can be added to a reactor to allow events to be triggered by an
240 external thread but handled on the event thread associated with
241 the reactor. An instance of this class can be passed to the
242 Reactor.selectable() method of the reactor in order to activate
243 it. The close() method should be called when it is no longer
244 needed, to allow the event loop to end if needed.
245 """
247 self.queue = Queue.Queue()
248 self.pipe = os.pipe()
249 self._closed = False
250
252 """
253 Request that the given event be dispatched on the event thread
254 of the reactor to which this EventInjector was added.
255 """
256 self.queue.put(event)
257 os.write(self.pipe[1], _compat.str2bin("!"))
258
260 """
261 Request that this EventInjector be closed. Existing events
262 will be dispatched on the reactor's event dispatch thread,
263 then this will be removed from the set of interest.
264 """
265 self._closed = True
266 os.write(self.pipe[1], _compat.str2bin("!"))
267
270
276
286
289 """
290 Application defined event, which can optionally be associated with
291 an engine object and or an arbitrary subject
292 """
293 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
306
310
312 """
313 Class to track state of an AMQP 1.0 transaction.
314 """
def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
    """
    Record the collaborators for this transaction and declare it.

    @param txn_ctrl: stored as self.txn_ctrl.
    @param handler: stored as self.handler.
    @param settle_before_discharge: stored as-is; defaults to False.
    """
    # Collaborators and configuration supplied by the caller.
    self.txn_ctrl, self.handler = txn_ctrl, handler
    self.settle_before_discharge = settle_before_discharge

    # Fresh bookkeeping: no id yet, nothing declared or discharged,
    # not failed, and no pending entries.
    self.id = None
    self._declare = self._discharge = None
    self.failed = False
    self._pending = []

    # Declare only after all state above is in place.
    self.declare()
325
328
331
333 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
334
338
343
344 - def send(self, sender, msg, tag=None):
349
356
357 - def update(self, delivery, state=None):
361
367
370
393
395 """
396 Abstract interface for link configuration options
397 """
399 """
400 Subclasses will implement any configuration logic in this
401 method
402 """
403 pass
404 - def test(self, link):
405 """
406 Subclasses can override this to selectively apply an option
407 e.g. based on some link criteria
408 """
409 return True
410
414
419
421 - def apply(self, sender): pass
423
425 - def apply(self, receiver): pass
427
442
445 self.filter_set = filter_set
446
447 - def apply(self, receiver):
449
451 """
452 Configures a link with a message selector filter
453 """
454 - def __init__(self, value, name='selector'):
456
458 - def apply(self, receiver):
461
462 -class Move(ReceiverOption):
463 - def apply(self, receiver):
465
466 -class Copy(ReceiverOption):
467 - def apply(self, receiver):
469
477
482
489
492 self._default_session = None
493
495 if not self._default_session:
496 self._default_session = _create_session(connection)
497 self._default_session.context = self
498 return self._default_session
499
503
505 """
506 Internal handler that triggers the necessary socket connect for an
507 opened connection.
508 """
511
513 if not self._override(event):
514 event.dispatch(self.base)
515
517 conn = event.connection
518 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
519
521 """
522 Internal handler that triggers the necessary socket connect for an
523 opened connection.
524 """
538
539 - def _connect(self, connection, reactor):
570
573
579
582
597
600
603
605 """
606 A reconnect strategy involving an increasing delay between
607 retries, up to a maximum of 10 seconds.
608 """
611
614
622
625 self.values = [Url(v) for v in values]
626 self.i = iter(self.values)
627
630
632 try:
633 return next(self.i)
634 except StopIteration:
635 self.i = iter(self.values)
636 return next(self.i)
637
650
653 """A representation of the AMQP concept of a 'container', which
654 loosely speaking is something that establishes links to or from
655 another container, over which messages are transferred. This is
656 an extension to the Reactor class that adds convenience methods
657 for creating connections and sender- or receiver- links.
658 """
659 - def __init__(self, *handlers, **kwargs):
675
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
    """
    Start establishing an AMQP connection and return the resulting
    proton.Connection instance.

    @param url: URL string of the process to connect to.

    @param urls: list of URL strings of processes to try to connect to.

    Only one of url or urls should be specified. If neither is given,
    address is used; if all three are empty a ValueError is raised.

    @param address: used as the connector's address as-is when neither
    url nor urls is supplied.

    @param handler: a connection scoped handler that will be called to
    process any events in the scope of this connection or its child
    links.

    @param reconnect: a true value is used as the reconnect strategy;
    None (the default) selects a Backoff() instance; a value of False
    prevents the library from automatically trying to reconnect if the
    underlying socket is disconnected before the connection has been
    closed.

    @param heartbeat: desired frequency of heartbeats used to test that
    the underlying socket is alive; described as a value in
    milliseconds (NOTE(review): confirm the unit against the connector).

    @param ssl_domain: SSL configuration in the form of an instance of
    proton.SSLDomain. When not given, the container's client domain is
    used if the container has SSL configured.

    @param kwargs: sasl_enabled, which determines whether a sasl layer
    is used for the connection; allowed_mechs, an optional list of SASL
    mechanisms to allow if sasl is enabled; allow_insecure_mechs, a flag
    indicating whether insecure mechanisms, such as PLAIN over a
    non-encrypted socket, are allowed; user and password; sni;
    offered_capabilities, desired_capabilities and properties, which are
    set on the connection; and virtual_host, the hostname to set in the
    Open performative used by the peer to determine the correct back-end
    service for the client. If virtual_host is not supplied the host
    field from the URL is used instead.
    """
    conn = self.connection(handler)
    conn.container = self.container_id or str(generate_uuid())
    conn.offered_capabilities = kwargs.get('offered_capabilities')
    conn.desired_capabilities = kwargs.get('desired_capabilities')
    conn.properties = kwargs.get('properties')

    connector = Connector(conn)
    # A per-call keyword wins over the container-wide default of the
    # same name.
    for option in ('allow_insecure_mechs', 'allowed_mechs', 'sasl_enabled', 'user', 'password'):
        setattr(connector, option, kwargs.get(option, getattr(self, option)))

    vhost = kwargs.get('virtual_host')
    connector.virtual_host = vhost
    if vhost:
        # Only override the connection hostname when one was requested.
        conn.hostname = vhost
    connector.ssl_sni = kwargs.get('sni')

    conn._overrides = connector

    # Pick the address source in priority order: url, urls, address.
    if url:
        connector.address = Urls([url])
    elif urls:
        connector.address = Urls(urls)
    elif address:
        connector.address = address
    else:
        raise ValueError("One of url, urls or address required")

    if heartbeat:
        connector.heartbeat = heartbeat

    if reconnect:
        connector.reconnect = reconnect
    elif reconnect is None:
        connector.reconnect = Backoff()

    # Fall back to the container's client domain when none was passed.
    if ssl_domain:
        connector.ssl_domain = ssl_domain
    else:
        connector.ssl_domain = self.ssl and self.ssl.client

    conn._session_policy = SessionPerConnection()
    conn.open()
    return conn
747
748 - def _get_id(self, container, remote, local):
749 if local and remote: "%s-%s-%s" % (container, remote, local)
750 elif local: return "%s-%s" % (container, local)
751 elif remote: return "%s-%s" % (container, remote)
752 else: return "%s-%s" % (container, str(generate_uuid()))
753
766
def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be sent. Returns an instance of proton.Sender.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the target address can be
    specified as the second argument (or as a keyword
    argument). The source address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the target is not, then the path of the URL is used as the
    target address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.

    @param handler: if not None, set as the sender's handler.
    @param tags: if true, set as the sender's tag_generator.
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not target:
        # Default the target address to the path of the URL.
        target = context.path
    session = self._get_session(context)
    snd = session.sender(name or self._get_id(session.connection.container, target, source))
    if source:
        snd.source.address = source
    if target:
        snd.target.address = target
    # Identity check: a handler must never be skipped (or kept) because of
    # how it happens to implement comparison operators.
    if handler is not None:
        snd.handler = handler
    if tags:
        snd.tag_generator = tags
    _apply_link_options(options, snd)
    snd.open()
    return snd
806
def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be received (aka a subscription). Returns an instance of
    proton.Receiver.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the source address can be
    specified as the second argument (or as a keyword
    argument). The target address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the source is not, then the path of the URL is used as the
    source address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.

    @param dynamic: if true, the receiver's source is marked dynamic.
    @param handler: if not None, set as the receiver's handler.
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not source:
        # Default the source address to the path of the URL.
        source = context.path
    session = self._get_session(context)
    rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
    if source:
        rcv.source.address = source
    if dynamic:
        rcv.source.dynamic = True
    if target:
        rcv.target.address = target
    # Identity check: a handler must never be skipped (or kept) because of
    # how it happens to implement comparison operators.
    if handler is not None:
        rcv.handler = handler
    _apply_link_options(options, rcv)
    rcv.open()
    return rcv
847
849 if not _get_attr(context, '_txn_ctrl'):
850 class InternalTransactionHandler(OutgoingMessageHandler):
851 def __init__(self):
852 super(InternalTransactionHandler, self).__init__(auto_settle=True)
853
854 def on_settled(self, event):
855 if hasattr(event.delivery, "transaction"):
856 event.transaction = event.delivery.transaction
857 event.delivery.transaction.handle_outcome(event)
858 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
859 context._txn_ctrl.target.type = Terminus.COORDINATOR
860 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
861 return Transaction(context._txn_ctrl, handler, settle_before_discharge)
862
def listen(self, url, ssl_domain=None):
    """
    Initiates a server socket, accepting incoming AMQP connections
    on the interface and port specified.

    @param url: address to listen on; converted with Url(). Its host and
    port select the interface, and an 'amqps' scheme requests SSL.
    @param ssl_domain: optional SSL configuration for the acceptor. If
    omitted and the scheme is 'amqps', the container's server domain
    (self.ssl.server) is used.

    @return: the acceptor created by self.acceptor().

    @raise SSLUnavailable: if the scheme is 'amqps', no ssl_domain was
    given and the container has no SSL support (self.ssl is false).
    """
    url = Url(url)
    ssl_config = ssl_domain
    # Resolve the SSL configuration BEFORE opening the acceptor, so that a
    # missing SSL library cannot leave behind an already-listening,
    # unreferenced and non-SSL acceptor when the exception is raised.
    if not ssl_config and url.scheme == 'amqps':
        if not self.ssl:
            raise SSLUnavailable("amqps: SSL libraries not found")
        ssl_config = self.ssl.server
    acceptor = self.acceptor(url.host, url.port)
    if ssl_config:
        acceptor.set_ssl_domain(ssl_config)
    return acceptor
880
885