1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """
21 The proton module defines a suite of APIs that implement the AMQP 1.0
22 protocol.
23
24 The proton APIs consist of the following classes:
25
26 - L{Messenger} -- A messaging endpoint.
27 - L{Message} -- A class for creating and/or accessing AMQP message content.
28 - L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded
29 data.
30
31 """
32 from __future__ import absolute_import
33
34 from cproton import *
35 from .wrapper import Wrapper
36 from proton import _compat
37
38 import weakref, socket, sys, threading
39
40 try:
41 import uuid
45
46 except ImportError:
47 """
48 No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases.
49 """
50 import struct
def __init__(self, hex=None, bytes=None):
    """
    Build the UUID from exactly one of a hexadecimal string or a packed
    byte string.

    @type hex: string
    @param hex: dash-separated hexadecimal text made of five groups
    @type bytes: string
    @param bytes: the packed value; it is stored as given
    @raise TypeError: if both or neither of hex and bytes are supplied
    """
    if (hex, bytes).count(None) != 1:
        raise TypeError("need one of hex or bytes")
    if bytes is not None:
        self.bytes = bytes
        return
    groups = hex.split("-")
    # The last group is packed as a 16-bit value followed by a 32-bit
    # value, so cut it after four digits to get the six "!LHHHHL" fields.
    tail = groups[4]
    groups[4:5] = [tail[:4], tail[4:]]
    numbers = [int(group, 16) for group in groups]
    self.bytes = struct.pack("!LHHHHL", *numbers)
62
64 if isinstance(other, uuid.UUID):
65 return cmp(self.bytes, other.bytes)
66 else:
67 return -1
68
70 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
71
73 return "UUID(%r)" % str(self)
74
77
78 import os, random, time
79 rand = random.Random()
80 rand.seed((os.getpid(), time.time(), socket.gethostname()))
82 data = [rand.randint(0, 255) for i in xrange(16)]
83
84
85 data[6] &= 0x0F
86 data[6] |= 0x40
87
88
89 data[8] &= 0x3F
90 data[8] |= 0x80
91 return "".join(map(chr, data))
92
94 return uuid.UUID(bytes=random_uuid())
95
98
99
100
101
# Python 2/3 compatibility shims. Each builtin is probed by calling it; when
# the name does not exist on the running interpreter the lookup raises
# NameError and the name is bound to the closest available type instead.
try:
    bytes()
except NameError:
    # No bytes builtin: plain str stands in for it.
    bytes = str
try:
    long()
except NameError:
    # No long builtin: int stands in for it.
    long = int
try:
    unicode()
except NameError:
    # No unicode builtin: str stands in for it.
    unicode = str
114
115
# Library version, re-exported under shorter names. The PN_VERSION_* values
# are not defined in this section; presumably they come from the star import
# of cproton -- TODO confirm.
VERSION_MAJOR = PN_VERSION_MAJOR
VERSION_MINOR = PN_VERSION_MINOR
VERSION_POINT = PN_VERSION_POINT
# (major, minor, point) tuple for convenient comparison.
VERSION = (VERSION_MAJOR, VERSION_MINOR, VERSION_POINT)
# Language identifiers for this binding's API and implementation.
API_LANGUAGE = "C"
IMPLEMENTATION_LANGUAGE = "C"
130
132 """
133 The root of the proton exception hierarchy. All proton exception
134 classes derive from this exception.
135 """
136 pass
137
139 """
140 A timeout exception indicates that a blocking operation has timed
141 out.
142 """
143 pass
144
146 """
147 An interrupt exception indicates that a blocking operation was interrupted.
148 """
149 pass
150
152 """
153 The root of the messenger exception hierarchy. All exceptions
154 generated by the messenger class derive from this exception.
155 """
156 pass
157
159 """
160 The MessageException class is the root of the message exception
161 hierarchy. All exceptions generated by the Message class derive from
162 this exception.
163 """
164 pass
165
# Maps error codes to the exception class raised for them. The _check
# helpers look codes up with EXCEPTIONS.get(err, <default>), so any code not
# listed here falls back to the caller's own exception class.
EXCEPTIONS = {
    PN_TIMEOUT: Timeout,
    PN_INTR: Interrupt
}

# Symbolic delivery states handed back to callers in place of raw codes.
PENDING = Constant("PENDING")
ACCEPTED = Constant("ACCEPTED")
REJECTED = Constant("REJECTED")
RELEASED = Constant("RELEASED")
MODIFIED = Constant("MODIFIED")
ABORTED = Constant("ABORTED")
SETTLED = Constant("SETTLED")

# Translates PN_STATUS_* codes into the constants above; the unknown status
# maps to None.
STATUSES = {
    PN_STATUS_ABORTED: ABORTED,
    PN_STATUS_ACCEPTED: ACCEPTED,
    PN_STATUS_REJECTED: REJECTED,
    PN_STATUS_RELEASED: RELEASED,
    PN_STATUS_MODIFIED: MODIFIED,
    PN_STATUS_PENDING: PENDING,
    PN_STATUS_SETTLED: SETTLED,
    PN_STATUS_UNKNOWN: None
}

# NOTE(review): no use of these two constants is visible in this section;
# confirm their meaning against the code that consumes them.
AUTOMATIC = Constant("AUTOMATIC")
MANUAL = Constant("MANUAL")
194 """
195 The L{Messenger} class defines a high level interface for sending
196 and receiving L{Messages<Message>}. Every L{Messenger} contains a
197 single logical queue of incoming messages and a single logical queue
198 of outgoing messages. The messages in these queues may be destined
199 for, or originate from, a variety of addresses.
200
201 The messenger interface is single-threaded. All methods
202 except one (L{interrupt}) are intended to be used from within
203 the messenger thread.
204
205
206 Address Syntax
207 ==============
208
209 An address has the following form::
210
211 [ amqp[s]:// ] [user[:password]@] domain [/[name]]
212
213 Where domain can be one of::
214
215 host | host:port | ip | ip:port | name
216
217 The following are valid examples of addresses:
218
219 - example.org
220 - example.org:1234
221 - amqp://example.org
222 - amqps://example.org
223 - example.org/incoming
224 - amqps://example.org/outgoing
225 - amqps://fred:trustno1@example.org
226 - 127.0.0.1:1234
227 - amqps://127.0.0.1:1234
228
229 Sending & Receiving Messages
230 ============================
231
232 The L{Messenger} class works in conjunction with the L{Message} class. The
233 L{Message} class is a mutable holder of message content.
234
235 The L{put} method copies its L{Message} to the outgoing queue, and may
236 send queued messages if it can do so without blocking. The L{send}
237 method blocks until it has sent the requested number of messages,
238 or until a timeout interrupts the attempt.
239
240
241 >>> message = Message()
242 >>> for i in range(3):
243 ... message.address = "amqp://host/queue"
244 ... message.subject = "Hello World %i" % i
245 ... messenger.put(message)
246 >>> messenger.send()
247
248 Similarly, the L{recv} method receives messages into the incoming
249 queue, and may block as it attempts to receive the requested number
250 of messages, or until timeout is reached. It may receive fewer
251 than the requested number. The L{get} method pops the
252 eldest L{Message} off the incoming queue and copies it into the L{Message}
253 object that you supply. It will not block.
254
255
256 >>> message = Message()
257 >>> messenger.recv(10)
258 >>> while messenger.incoming > 0:
259 ... messenger.get(message)
260 ... print message.subject
261 Hello World 0
262 Hello World 1
263 Hello World 2
264
265 The blocking flag allows you to turn off blocking behavior entirely,
266 in which case L{send} and L{recv} will do whatever they can without
267 blocking, and then return. You can then look at the number
268 of incoming and outgoing messages to see how much outstanding work
269 still remains.
270 """
271
273 """
274 Construct a new L{Messenger} with the given name. The name has
275 global scope. If None is supplied as the name, a UUID based name will
276 be chosen.
277
278 @type name: string
279 @param name: the name of the messenger or None
280
281 """
282 self._mng = pn_messenger(name)
283 self._selectables = {}
284
286 """
287 Destroy the L{Messenger}. This will close all connections that
288 are managed by the L{Messenger}. Call the L{stop} method before
289 destroying the L{Messenger}.
290 """
291 if hasattr(self, "_mng"):
292 pn_messenger_free(self._mng)
293 del self._mng
294
296 if err < 0:
297 if (err == PN_INPROGRESS):
298 return
299 exc = EXCEPTIONS.get(err, MessengerException)
300 raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng))))
301 else:
302 return err
303
304 @property
306 """
307 The name of the L{Messenger}.
308 """
309 return pn_messenger_name(self._mng)
310
312 return pn_messenger_get_certificate(self._mng)
313
315 self._check(pn_messenger_set_certificate(self._mng, value))
316
317 certificate = property(_get_certificate, _set_certificate,
318 doc="""
319 Path to a certificate file for the L{Messenger}. This certificate is
320 used when the L{Messenger} accepts or establishes SSL/TLS connections.
321 This property must be specified for the L{Messenger} to accept
322 incoming SSL/TLS connections and to establish client authenticated
323 outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
324 connections do not require this property.
325 """)
326
328 return pn_messenger_get_private_key(self._mng)
329
331 self._check(pn_messenger_set_private_key(self._mng, value))
332
333 private_key = property(_get_private_key, _set_private_key,
334 doc="""
335 Path to a private key file for the L{Messenger's<Messenger>}
336 certificate. This property must be specified for the L{Messenger} to
337 accept incoming SSL/TLS connections and to establish client
338 authenticated outgoing SSL/TLS connection. Non client authenticated
339 SSL/TLS connections do not require this property.
340 """)
341
343 return pn_messenger_get_password(self._mng)
344
346 self._check(pn_messenger_set_password(self._mng, value))
347
348 password = property(_get_password, _set_password,
349 doc="""
350 This property contains the password for the L{Messenger.private_key}
351 file, or None if the file is not encrypted.
352 """)
353
355 return pn_messenger_get_trusted_certificates(self._mng)
356
358 self._check(pn_messenger_set_trusted_certificates(self._mng, value))
359
360 trusted_certificates = property(_get_trusted_certificates,
361 _set_trusted_certificates,
362 doc="""
363 A path to a database of trusted certificates for use in verifying the
364 peer on an SSL/TLS connection. If this property is None, then the peer
365 will not be verified.
366 """)
367
369 t = pn_messenger_get_timeout(self._mng)
370 if t == -1:
371 return None
372 else:
373 return millis2secs(t)
374
376 if value is None:
377 t = -1
378 else:
379 t = secs2millis(value)
380 self._check(pn_messenger_set_timeout(self._mng, t))
381
382 timeout = property(_get_timeout, _set_timeout,
383 doc="""
384 The timeout property contains the default timeout for blocking
385 operations performed by the L{Messenger}.
386 """)
387
389 return pn_messenger_is_blocking(self._mng)
390
392 self._check(pn_messenger_set_blocking(self._mng, b))
393
394 blocking = property(_is_blocking, _set_blocking,
395 doc="""
396 Enable or disable blocking behavior during L{Message} sending
397 and receiving. This affects every blocking call, with the
398 exception of L{work}. Currently, the affected calls are
399 L{send}, L{recv}, and L{stop}.
400 """)
401
403 return pn_messenger_is_passive(self._mng)
404
406 self._check(pn_messenger_set_passive(self._mng, b))
407
408 passive = property(_is_passive, _set_passive,
409 doc="""
410 When passive is set to true, Messenger will not attempt to perform I/O
411 internally. In this mode it is necessary to use the selectables API to
412 drive any I/O needed to perform requested actions. In this mode
413 Messenger will never block.
414 """)
415
417 return pn_messenger_get_incoming_window(self._mng)
418
420 self._check(pn_messenger_set_incoming_window(self._mng, window))
421
422 incoming_window = property(_get_incoming_window, _set_incoming_window,
423 doc="""
424 The incoming tracking window for the messenger. The messenger will
425 track the remote status of this many incoming deliveries after they
426 have been accepted or rejected. Defaults to zero.
427
428 L{Messages<Message>} enter this window only when you take them into your application
429 using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>}
430 without explicitly accepting or rejecting the oldest message, then the
431 message that passes beyond the edge of the incoming window will be assigned
432 the default disposition of its link.
433 """)
434
436 return pn_messenger_get_outgoing_window(self._mng)
437
439 self._check(pn_messenger_set_outgoing_window(self._mng, window))
440
441 outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
442 doc="""
443 The outgoing tracking window for the messenger. The messenger will
444 track the remote status of this many outgoing deliveries after calling
445 send. Defaults to zero.
446
447 A L{Message} enters this window when you call the put() method with the
448 message. If your outgoing window size is I{n}, and you call L{put} I{n}+1
449 times, status information will no longer be available for the
450 first message.
451 """)
452
454 """
455 Currently a no-op placeholder.
456 For future compatibility, do not L{send} or L{recv} messages
457 before starting the L{Messenger}.
458 """
459 self._check(pn_messenger_start(self._mng))
460
462 """
463 Transitions the L{Messenger} to an inactive state. An inactive
464 L{Messenger} will not send or receive messages from its internal
465 queues. A L{Messenger} should be stopped before being discarded to
466 ensure a clean shutdown handshake occurs on any internally managed
467 connections.
468 """
469 self._check(pn_messenger_stop(self._mng))
470
471 @property
473 """
474 Returns true iff a L{Messenger} is in the stopped state.
475 This function does not block.
476 """
477 return pn_messenger_stopped(self._mng)
478
480 """
481 Subscribes the L{Messenger} to messages originating from the
482 specified source. The source is an address as specified in the
483 L{Messenger} introduction with the following addition. If the
484 domain portion of the address begins with the '~' character, the
485 L{Messenger} will interpret the domain as host/port, bind to it,
486 and listen for incoming messages. For example "~0.0.0.0",
487 "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
488 local interface and listen for incoming messages with the last
489 variant only permitting incoming SSL connections.
490
491 @type source: string
492 @param source: the source of messages to subscribe to
493 """
494 sub_impl = pn_messenger_subscribe(self._mng, source)
495 if not sub_impl:
496 self._check(pn_error_code(pn_messenger_error(self._mng)))
497 raise MessengerException("Cannot subscribe to %s"%source)
498 return Subscription(sub_impl)
499
def put(self, message):
    """
    Copy the content of a L{Message} onto the outgoing queue of the
    L{Messenger} and return a tracker for it.

    This method never blocks. Any L{Messages<Message>} in the outgoing
    queue that are not blocked are sent immediately; blocked ones stay
    queued. Use L{send} to block until the outgoing queue is empty, and
    the L{outgoing} property to inspect the depth of the queue.

    Because the content is copied, the L{Message} object may be modified
    or discarded afterwards without affecting what was queued.

    The returned tracker can be used to determine the delivery status of
    the L{Message}.

    @type message: Message
    @param message: the message to place in the outgoing queue
    @return: a tracker
    """
    # Let the message push its Python-side state into the underlying
    # message object before that object is handed to the messenger.
    message._pre_encode()
    status = pn_messenger_put(self._mng, message._msg)
    self._check(status)
    tracker = pn_messenger_outgoing_tracker(self._mng)
    return tracker
524
526 """
527 Gets the last known remote state of the delivery associated with
528 the given tracker.
529
530 @type tracker: tracker
531 @param tracker: the tracker whose status is to be retrieved
532
533 @return: one of None, PENDING, REJECTED, MODIFIED, or ACCEPTED
534 """
535 disp = pn_messenger_status(self._mng, tracker);
536 return STATUSES.get(disp, disp)
537
539 """
540 Checks if the delivery associated with the given tracker is still
541 waiting to be sent.
542
543 @type tracker: tracker
544 @param tracker: the tracker whose status is to be retrieved
545
546 @return: true if delivery is still buffered
547 """
548 return pn_messenger_buffered(self._mng, tracker);
549
def settle(self, tracker=None):
    """
    Stop the L{Messenger} from tracking the status associated with a
    tracker.

    When no tracker is given, the most recent outgoing tracker is used
    together with the cumulative flag, so that all outgoing
    L{messages<Message>} up to the most recent are settled.

    @type tracker: tracker
    @param tracker: the tracker to settle, or None for all outgoing
      messages up to the most recent
    """
    settle_all = tracker is None
    if settle_all:
        tracker = pn_messenger_outgoing_tracker(self._mng)
    flags = PN_CUMULATIVE if settle_all else 0
    self._check(pn_messenger_settle(self._mng, tracker, flags))
562
def send(self, n=-1):
    """
    Block until the indicated number of L{messages<Message>} have been
    sent, or until the operation times out.

    @type n: int
    @param n: the number of messages to send; -1 blocks until all
      outgoing messages have been sent, 0 sends whatever can be sent
      without blocking
    """
    result = pn_messenger_send(self._mng, n)
    self._check(result)
571
def recv(self, n=None):
    """
    Receive up to I{n} L{messages<Message>} into the incoming queue.

    When I{n} is omitted, -1 is passed down and the call receives as many
    L{messages<Message>} as it can buffer internally. If the L{Messenger}
    is in blocking mode, this call blocks until at least one L{Message}
    is available in the incoming queue.

    @type n: int
    @param n: the maximum number of messages to receive, or None
    """
    limit = -1 if n is None else n
    self._check(pn_messenger_recv(self._mng, limit))
583
def work(self, timeout=None):
    """
    Send or receive any outstanding L{messages<Message>} queued for the
    L{Messenger}, blocking for at most the indicated timeout.

    This method may also do I/O work other than sending and receiving
    L{messages<Message>}, for example closing connections after
    messenger.L{stop}() has been called.

    @type timeout: number
    @param timeout: how long to block, in seconds; None is passed down
      as -1
    @return: False if the call timed out, True otherwise
    """
    millis = -1 if timeout is None else secs2millis(timeout)
    err = pn_messenger_work(self._mng, millis)
    # A timeout is reported through the return value rather than raised.
    if err == PN_TIMEOUT:
        return False
    self._check(err)
    return True
602
603 @property
605 return pn_messenger_receiving(self._mng)
606
608 """
609 The L{Messenger} interface is single-threaded.
610 This is the only L{Messenger} function intended to be called
611 from outside of the L{Messenger} thread.
612 Call this from a non-messenger thread to interrupt
613 a L{Messenger} that is blocking.
614 This will cause any in-progress blocking call to throw
615 the L{Interrupt} exception. If there is no currently blocking
616 call, then the next blocking call will be affected, even if it
617 is within the same thread that interrupt was called from.
618 """
619 self._check(pn_messenger_interrupt(self._mng))
620
def get(self, message=None):
    """
    Move the message at the head of the incoming queue into the supplied
    L{Message} object, overwriting any content it already holds.

    A tracker for the incoming L{Message} is returned. The tracker can
    later be used to communicate your acceptance or rejection of the
    L{Message}.

    If None is passed in place of a L{Message} object, the message popped
    from the head of the queue is discarded.

    @type message: Message
    @param message: the destination message object, or None
    @return: a tracker
    """
    discard = message is None
    target = None if discard else message._msg
    self._check(pn_messenger_get(self._mng, target))
    if not discard:
        # Let the message refresh its Python-side state from the
        # underlying message object that was just filled in.
        message._post_decode()
    return pn_messenger_incoming_tracker(self._mng)
646
def accept(self, tracker=None):
    """
    Signal the sender that you have acted on the L{Message} pointed to
    by the tracker.

    When no tracker is given, the most recent incoming tracker is used
    together with the cumulative flag, so that all messages returned by
    the L{get} method are accepted, except those that have already been
    auto-settled by passing beyond your incoming window size.

    @type tracker: tracker
    @param tracker: a tracker as returned by get
    """
    accept_all = tracker is None
    if accept_all:
        tracker = pn_messenger_incoming_tracker(self._mng)
    flags = PN_CUMULATIVE if accept_all else 0
    self._check(pn_messenger_accept(self._mng, tracker, flags))
664
def reject(self, tracker=None):
    """
    Reject the L{Message} indicated by the tracker.

    When no tracker is given, the most recent incoming tracker is used
    together with the cumulative flag, so that all messages returned by
    the L{get} method are rejected, except those that have already been
    auto-settled by passing beyond your incoming window size.

    @type tracker: tracker
    @param tracker: a tracker as returned by get
    """
    if tracker is None:
        # No tracker supplied: apply the rejection cumulatively, up to
        # and including the most recent incoming delivery.
        tracker = pn_messenger_incoming_tracker(self._mng)
        flags = PN_CUMULATIVE
    else:
        flags = 0
    self._check(pn_messenger_reject(self._mng, tracker, flags))
681
682 @property
684 """
685 The outgoing queue depth.
686 """
687 return pn_messenger_outgoing(self._mng)
688
689 @property
691 """
692 The incoming queue depth.
693 """
694 return pn_messenger_incoming(self._mng)
695
def route(self, pattern, address):
    """
    Append a routing rule to the internal routing table of the
    L{Messenger}.

    Routing rules influence how a L{Messenger} internally treats a given
    address or class of addresses. Each call appends one rule to the
    table.

    Whenever a L{Message} is presented to a L{Messenger} for delivery,
    its address is matched against the rules in order. The first rule
    that matches is triggered, and the L{Messenger} routes based on the
    address supplied in that rule instead of the address presented in
    the message.

    The pattern syntax supports two kinds of match: a '%' matches any
    character except a '/', and a '*' matches any character including
    a '/'.

    A routing address is written as a normal AMQP address, and may
    additionally use substitution variables from the pattern match that
    triggered the rule.

    Any message sent to "foo" will be routed to "amqp://foo.com":

       >>> messenger.route("foo", "amqp://foo.com");

    Any message sent to "foobar" will be routed to
    "amqp://foo.com/bar":

       >>> messenger.route("foobar", "amqp://foo.com/bar");

    Any message sent to bar/<path> will be routed to the corresponding
    path within the amqp://bar.com domain:

       >>> messenger.route("bar/*", "amqp://bar.com/$1");

    Route all L{messages<Message>} over TLS:

       >>> messenger.route("amqp:*", "amqps:$1")

    Supply credentials for foo.com:

       >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1");

    Supply credentials for all domains:

       >>> messenger.route("amqp://*", "amqp://user:password@$1");

    Route all addresses through a single proxy while preserving the
    original destination:

       >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2");

    Route any address through a single broker:

       >>> messenger.route("*", "amqp://user:password@broker/$1");

    @type pattern: string
    @param pattern: the pattern that message addresses are matched against
    @type address: string
    @param address: the address to route matching messages to
    """
    encoded_pattern = unicode2utf8(pattern)
    encoded_address = unicode2utf8(address)
    result = pn_messenger_route(self._mng, encoded_pattern, encoded_address)
    self._check(result)
755
def rewrite(self, pattern, address):
    """
    Add a rewrite rule. This is similar to route(), except that the
    destination of the L{Message} is determined before the message
    address is rewritten.

    The outgoing address is only rewritten after routing has been
    finalized. If a message has an outgoing address of
    "amqp://0.0.0.0:5678", and a rewriting rule changes its outgoing
    address to "foo", it will still arrive at the peer that is listening
    on "amqp://0.0.0.0:5678", but when it arrives there the receiver
    will see its outgoing address as "foo".

    The default rewrite rule removes username and password from
    addresses before they are transmitted.

    @type pattern: string
    @param pattern: the pattern that outgoing addresses are matched against
    @type address: string
    @param address: the address that replaces a matching one
    """
    encoded_pattern = unicode2utf8(pattern)
    encoded_address = unicode2utf8(address)
    result = pn_messenger_rewrite(self._mng, encoded_pattern, encoded_address)
    self._check(result)
772
774 return Selectable.wrap(pn_messenger_selectable(self._mng))
775
776 @property
778 tstamp = pn_messenger_deadline(self._mng)
779 if tstamp:
780 return millis2secs(tstamp)
781 else:
782 return None
783
785 """The L{Message} class is a mutable holder of message content.
786
787 @ivar instructions: delivery instructions for the message
788 @type instructions: dict
789 @ivar annotations: infrastructure defined message annotations
790 @type annotations: dict
791 @ivar properties: application defined message properties
792 @type properties: dict
793 @ivar body: message body
794 @type body: bytes | unicode | dict | list | int | long | float | UUID
795 """
796
797 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
798
def __init__(self, body=None, **kwargs):
    """
    Create a message, optionally with a body and initial attribute values.

    @param body: the initial message body
    @param kwargs: Message property name/value pairs to initialise the Message
    @raise AttributeError: if a keyword does not name an existing
      attribute of the message
    """
    msg = pn_message()
    self._msg = msg
    self._id = Data(pn_message_id(msg))
    self._correlation_id = Data(pn_message_correlation_id(msg))
    self.instructions = self.annotations = self.properties = None
    self.body = body
    for name, value in _compat.iteritems(kwargs):
        # Read the attribute first so that an unknown name raises
        # instead of silently creating a new attribute.
        getattr(self, name)
        setattr(self, name, value)
813
815 if hasattr(self, "_msg"):
816 pn_message_free(self._msg)
817 del self._msg
818
820 if err < 0:
821 exc = EXCEPTIONS.get(err, MessageException)
822 raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
823 else:
824 return err
825
844
def _post_decode(self):
    """
    Refresh the instructions, annotations, properties and body attributes
    from the corresponding sections of the underlying message.

    Each attribute is set to the decoded object of its section, or to
    None when the section has no value to advance to.
    """
    msg = self._msg
    # Wrap all four sections first, then read them in the same order.
    sections = [
        ("instructions", Data(pn_message_instructions(msg))),
        ("annotations", Data(pn_message_annotations(msg))),
        ("properties", Data(pn_message_properties(msg))),
        ("body", Data(pn_message_body(msg))),
    ]
    for attr, section in sections:
        if section.next():
            decoded = section.get_object()
        else:
            decoded = None
        setattr(self, attr, decoded)
867
869 """
870 Clears the contents of the L{Message}. All fields will be reset to
871 their default values.
872 """
873 pn_message_clear(self._msg)
874 self.instructions = None
875 self.annotations = None
876 self.properties = None
877 self.body = None
878
880 return pn_message_is_inferred(self._msg)
881
883 self._check(pn_message_set_inferred(self._msg, bool(value)))
884
885 inferred = property(_is_inferred, _set_inferred, doc="""
886 The inferred flag for a message indicates how the message content
887 is encoded into AMQP sections. If inferred is true then binary and
888 list values in the body of the message will be encoded as AMQP DATA
889 and AMQP SEQUENCE sections, respectively. If inferred is false,
890 then all values in the body of the message will be encoded as AMQP
891 VALUE sections regardless of their type.
892 """)
893
895 return pn_message_is_durable(self._msg)
896
898 self._check(pn_message_set_durable(self._msg, bool(value)))
899
900 durable = property(_is_durable, _set_durable,
901 doc="""
902 The durable property indicates that the message should be held durably
903 by any intermediaries taking responsibility for the message.
904 """)
905
907 return pn_message_get_priority(self._msg)
908
910 self._check(pn_message_set_priority(self._msg, value))
911
912 priority = property(_get_priority, _set_priority,
913 doc="""
914 The priority of the message.
915 """)
916
918 return millis2secs(pn_message_get_ttl(self._msg))
919
921 self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
922
923 ttl = property(_get_ttl, _set_ttl,
924 doc="""
925 The time to live of the message measured in seconds. Expired messages
926 may be dropped.
927 """)
928
930 return pn_message_is_first_acquirer(self._msg)
931
933 self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
934
935 first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
936 doc="""
937 True iff the recipient is the first to acquire the message.
938 """)
939
941 return pn_message_get_delivery_count(self._msg)
942
944 self._check(pn_message_set_delivery_count(self._msg, value))
945
946 delivery_count = property(_get_delivery_count, _set_delivery_count,
947 doc="""
948 The number of delivery attempts made for this message.
949 """)
950
951
959 id = property(_get_id, _set_id,
960 doc="""
961 The id of the message.
962 """)
963
965 return pn_message_get_user_id(self._msg)
966
968 self._check(pn_message_set_user_id(self._msg, value))
969
970 user_id = property(_get_user_id, _set_user_id,
971 doc="""
972 The user id of the message creator.
973 """)
974
976 return utf82unicode(pn_message_get_address(self._msg))
977
979 self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
980
981 address = property(_get_address, _set_address,
982 doc="""
983 The address of the message.
984 """)
985
987 return utf82unicode(pn_message_get_subject(self._msg))
988
990 self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))
991
992 subject = property(_get_subject, _set_subject,
993 doc="""
994 The subject of the message.
995 """)
996
998 return utf82unicode(pn_message_get_reply_to(self._msg))
999
1001 self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
1002
1003 reply_to = property(_get_reply_to, _set_reply_to,
1004 doc="""
1005 The reply-to address for the message.
1006 """)
1007
1015
1016 correlation_id = property(_get_correlation_id, _set_correlation_id,
1017 doc="""
1018 The correlation-id for the message.
1019 """)
1020
1022 return symbol(utf82unicode(pn_message_get_content_type(self._msg)))
1023
def _set_content_type(self, value):
    """
    Set the content-type of the message.

    @param value: the new content-type; it is passed through
      unicode2utf8 before being handed to the underlying message
    """
    encoded = unicode2utf8(value)
    result = pn_message_set_content_type(self._msg, encoded)
    self._check(result)
1026
1027 content_type = property(_get_content_type, _set_content_type,
1028 doc="""
1029 The content-type of the message.
1030 """)
1031
1033 return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))
1034
def _set_content_encoding(self, value):
    """
    Set the content-encoding of the message.

    @param value: the new content-encoding; it is passed through
      unicode2utf8 before being handed to the underlying message
    """
    encoded = unicode2utf8(value)
    result = pn_message_set_content_encoding(self._msg, encoded)
    self._check(result)
1037
1038 content_encoding = property(_get_content_encoding, _set_content_encoding,
1039 doc="""
1040 The content-encoding of the message.
1041 """)
1042
1044 return millis2secs(pn_message_get_expiry_time(self._msg))
1045
1047 self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
1048
1049 expiry_time = property(_get_expiry_time, _set_expiry_time,
1050 doc="""
1051 The expiry time of the message.
1052 """)
1053
1055 return millis2secs(pn_message_get_creation_time(self._msg))
1056
1058 self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
1059
1060 creation_time = property(_get_creation_time, _set_creation_time,
1061 doc="""
1062 The creation time of the message.
1063 """)
1064
1066 return utf82unicode(pn_message_get_group_id(self._msg))
1067
1069 self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))
1070
1071 group_id = property(_get_group_id, _set_group_id,
1072 doc="""
1073 The group id of the message.
1074 """)
1075
1077 return pn_message_get_group_sequence(self._msg)
1078
1080 self._check(pn_message_set_group_sequence(self._msg, value))
1081
1082 group_sequence = property(_get_group_sequence, _set_group_sequence,
1083 doc="""
1084 The sequence of the message within its group.
1085 """)
1086
1088 return utf82unicode(pn_message_get_reply_to_group_id(self._msg))
1089
1091 self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))
1092
1093 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
1094 doc="""
1095 The group-id for any replies.
1096 """)
1097
1099 self._pre_encode()
1100 sz = 16
1101 while True:
1102 err, data = pn_message_encode(self._msg, sz)
1103 if err == PN_OVERFLOW:
1104 sz *= 2
1105 continue
1106 else:
1107 self._check(err)
1108 return data
1109
1111 self._check(pn_message_decode(self._msg, data))
1112 self._post_decode()
1113
1114 - def send(self, sender, tag=None):
1122
def recv(self, link):
    """
    Receive and decode the message content for the current delivery on
    the link.

    On success the current delivery for the link is returned. None is
    returned if the link is a sender, if there is no current delivery,
    or if the current delivery is incomplete.

    @type link: Link
    @param link: the link to receive a message from
    @return: the delivery associated with the decoded message (or None)
    """
    if link.is_sender:
        return None
    delivery = link.current
    if not delivery:
        return None
    if delivery.partial:
        return None
    # Pull the pending bytes off the link and keep them on the delivery.
    delivery.encoded = link.recv(delivery.pending)
    link.advance()
    # When the remote sender settle mode is SND_SETTLED, settle the
    # delivery locally as well.
    if link.remote_snd_settle_mode == Link.SND_SETTLED:
        delivery.settle()
    self.decode(delivery.encoded)
    return delivery
1147
1149 props = []
1150 for attr in ("inferred", "address", "reply_to", "durable", "ttl",
1151 "priority", "first_acquirer", "delivery_count", "id",
1152 "correlation_id", "user_id", "group_id", "group_sequence",
1153 "reply_to_group_id", "instructions", "annotations",
1154 "properties", "body"):
1155 value = getattr(self, attr)
1156 if value: props.append("%s=%r" % (attr, value))
1157 return "Message(%s)" % ", ".join(props)
1158
1160 tmp = pn_string(None)
1161 err = pn_inspect(self._msg, tmp)
1162 result = pn_string_get(tmp)
1163 pn_free(tmp)
1164 self._check(err)
1165 return result
1166
1168
1171
1172 @property
1174 return pn_subscription_address(self._impl)
1175
1176 _DEFAULT = object()
1179
1180 @staticmethod
1182 if impl is None:
1183 return None
1184 else:
1185 return Selectable(impl)
1186
1189
1192
1194 if fd is _DEFAULT:
1195 return pn_selectable_get_fd(self._impl)
1196 elif fd is None:
1197 pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET)
1198 else:
1199 pn_selectable_set_fd(self._impl, fd)
1200
1202 return pn_selectable_is_reading(self._impl)
1203
1205 pn_selectable_set_reading(self._impl, bool(val))
1206
1207 reading = property(_is_reading, _set_reading)
1208
1210 return pn_selectable_is_writing(self._impl)
1211
1213 pn_selectable_set_writing(self._impl, bool(val))
1214
1215 writing = property(_is_writing, _set_writing)
1216
1218 tstamp = pn_selectable_get_deadline(self._impl)
1219 if tstamp:
1220 return millis2secs(tstamp)
1221 else:
1222 return None
1223
1225 pn_selectable_set_deadline(self._impl, secs2millis(deadline))
1226
1227 deadline = property(_get_deadline, _set_deadline)
1228
1230 pn_selectable_readable(self._impl)
1231
1233 pn_selectable_writable(self._impl)
1234
1236 pn_selectable_expired(self._impl)
1237
1239 return pn_selectable_is_registered(self._impl)
1240
1242 pn_selectable_set_registered(self._impl, registered)
1243
1244 registered = property(_is_registered, _set_registered,
1245 doc="""
1246 The registered property may be get/set by an I/O polling system to
1247 indicate whether the fd has been registered or not.
1248 """)
1249
1250 @property
1252 return pn_selectable_is_terminal(self._impl)
1253
1255 pn_selectable_terminate(self._impl)
1256
1258 pn_selectable_release(self._impl)
1259
1261 """
1262 The DataException class is the root of the Data exception hierarchy.
1263 All exceptions raised by the Data class extend this exception.
1264 """
1265 pass
1266
1268
1271
1273 return "UnmappedType(%s)" % self.msg
1274
1276
1278 return "ulong(%s)" % long.__repr__(self)
1279
1281
1283 return "timestamp(%s)" % long.__repr__(self)
1284
1286
1288 return "symbol(%s)" % unicode.__repr__(self)
1289
1290 -class char(unicode):
1291
1293 return "char(%s)" % unicode.__repr__(self)
1294
1299
1304
1309
1314
1319
1321
1323 return "uint(%s)" % long.__repr__(self)
1324
1326
1328 return "float32(%s)" % float.__repr__(self)
1329
1334
1336
1338 return "decimal64(%s)" % long.__repr__(self)
1339
1341
1343 return "decimal128(%s)" % bytes.__repr__(self)
1344
1346
1347 - def __init__(self, descriptor, value):
1348 self.descriptor = descriptor
1349 self.value = value
1350
1352 return "Described(%r, %r)" % (self.descriptor, self.value)
1353
1355 if isinstance(o, Described):
1356 return self.descriptor == o.descriptor and self.value == o.value
1357 else:
1358 return False
1359
1360 UNDESCRIBED = Constant("UNDESCRIBED")
1361
class Array(object):
    """
    Python-side value object for an AMQP array: a descriptor, an element
    type code and the element values.

    Instances are iterable over their elements and compare equal when
    descriptor, type and elements all match.
    """

    def __init__(self, descriptor, type, *elements):
        """
        @param descriptor: the array descriptor (callers in this module
          pass UNDESCRIBED when the array has none)
        @param type: the element type code
        @param elements: the element values, stored as a tuple
        """
        self.descriptor = descriptor
        self.type = type
        self.elements = elements

    def __iter__(self):
        """Iterate over the element values."""
        return iter(self.elements)

    def __repr__(self):
        # Elements are appended after the type, each preceded by ", ";
        # an empty array renders as just "Array(descriptor, type)".
        if self.elements:
            els = ", %s" % (", ".join(map(repr, self.elements)))
        else:
            els = ""
        return "Array(%r, %r%s)" % (self.descriptor, self.type, els)

    def __eq__(self, o):
        """Equal only to another Array with the same descriptor, type and elements."""
        if isinstance(o, Array):
            return self.descriptor == o.descriptor and \
                self.type == o.type and self.elements == o.elements
        else:
            return False

    def __ne__(self, o):
        # Python 2 does not derive != from __eq__; without this, two equal
        # Arrays would still compare as "not equal" via identity.
        return not self.__eq__(o)
1385
1387 """
1388 The L{Data} class provides an interface for decoding, extracting,
1389 creating, and encoding arbitrary AMQP data. A L{Data} object
1390 contains a tree of AMQP values. Leaf nodes in this tree correspond
1391 to scalars in the AMQP type system such as L{ints<INT>} or
1392 L{strings<STRING>}. Non-leaf nodes in this tree correspond to
1393 compound values in the AMQP type system such as L{lists<LIST>},
1394 L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
1395 The root node of the tree is the L{Data} object itself and can have
1396 an arbitrary number of children.
1397
1398 A L{Data} object maintains the notion of the current sibling node
1399 and a current parent node. Siblings are ordered within their parent.
1400 Values are accessed and/or added by using the L{next}, L{prev},
1401 L{enter}, and L{exit} methods to navigate to the desired location in
1402 the tree and using the supplied variety of put_*/get_* methods to
1403 access or add a value of the desired type.
1404
1405 The put_* methods will always add a value I{after} the current node
1406 in the tree. If the current node has a next sibling the put_* method
1407 will overwrite the value on this node. If there is no current node
1408 or the current node has no next sibling then one will be added. The
1409 put_* methods always set the added/modified node to the current
1410 node. The get_* methods read the value of the current node and do
1411 not change which node is current.
1412
1413 The following types of scalar values are supported:
1414
1415 - L{NULL}
1416 - L{BOOL}
1417 - L{UBYTE}
1418 - L{USHORT}
1419 - L{SHORT}
1420 - L{UINT}
1421 - L{INT}
1422 - L{ULONG}
1423 - L{LONG}
1424 - L{FLOAT}
1425 - L{DOUBLE}
1426 - L{BINARY}
1427 - L{STRING}
1428 - L{SYMBOL}
1429
1430 The following types of compound values are supported:
1431
1432 - L{DESCRIBED}
1433 - L{ARRAY}
1434 - L{LIST}
1435 - L{MAP}
1436 """
1437
1438 NULL = PN_NULL; "A null value."
1439 BOOL = PN_BOOL; "A boolean value."
1440 UBYTE = PN_UBYTE; "An unsigned byte value."
1441 BYTE = PN_BYTE; "A signed byte value."
1442 USHORT = PN_USHORT; "An unsigned short value."
1443 SHORT = PN_SHORT; "A short value."
1444 UINT = PN_UINT; "An unsigned int value."
1445 INT = PN_INT; "A signed int value."
1446 CHAR = PN_CHAR; "A character value."
1447 ULONG = PN_ULONG; "An unsigned long value."
1448 LONG = PN_LONG; "A signed long value."
1449 TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
1450 FLOAT = PN_FLOAT; "A float value."
1451 DOUBLE = PN_DOUBLE; "A double value."
1452 DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
1453 DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
1454 DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
1455 UUID = PN_UUID; "A UUID value."
1456 BINARY = PN_BINARY; "A binary string."
1457 STRING = PN_STRING; "A unicode string."
1458 SYMBOL = PN_SYMBOL; "A symbolic string."
1459 DESCRIBED = PN_DESCRIBED; "A described value."
1460 ARRAY = PN_ARRAY; "An array value."
1461 LIST = PN_LIST; "A list value."
1462 MAP = PN_MAP; "A map value."
1463
1464 type_names = {
1465 NULL: "null",
1466 BOOL: "bool",
1467 BYTE: "byte",
1468 UBYTE: "ubyte",
1469 SHORT: "short",
1470 USHORT: "ushort",
1471 INT: "int",
1472 UINT: "uint",
1473 CHAR: "char",
1474 LONG: "long",
1475 ULONG: "ulong",
1476 TIMESTAMP: "timestamp",
1477 FLOAT: "float",
1478 DOUBLE: "double",
1479 DECIMAL32: "decimal32",
1480 DECIMAL64: "decimal64",
1481 DECIMAL128: "decimal128",
1482 UUID: "uuid",
1483 BINARY: "binary",
1484 STRING: "string",
1485 SYMBOL: "symbol",
1486 DESCRIBED: "described",
1487 ARRAY: "array",
1488 LIST: "list",
1489 MAP: "map"
1490 }
1491
1492 @classmethod
1494
1502
1504 if self._free and hasattr(self, "_data"):
1505 pn_data_free(self._data)
1506 del self._data
1507
1509 if err < 0:
1510 exc = EXCEPTIONS.get(err, DataException)
1511 raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
1512 else:
1513 return err
1514
1516 """
1517 Clears the data object.
1518 """
1519 pn_data_clear(self._data)
1520
1522 """
1523 Clears current node and sets the parent to the root node. Clearing the
1524 current node sets it _before_ the first node, calling next() will advance to
1525 the first node.
1526 """
1527 assert self._data is not None
1528 pn_data_rewind(self._data)
1529
1531 """
1532 Advances the current node to its next sibling and returns its
1533 type. If there is no next sibling the current node remains
1534 unchanged and None is returned.
1535 """
1536 found = pn_data_next(self._data)
1537 if found:
1538 return self.type()
1539 else:
1540 return None
1541
1543 """
1544 Advances the current node to its previous sibling and returns its
1545 type. If there is no previous sibling the current node remains
1546 unchanged and None is returned.
1547 """
1548 found = pn_data_prev(self._data)
1549 if found:
1550 return self.type()
1551 else:
1552 return None
1553
1555 """
1556 Sets the parent node to the current node and clears the current node.
1557 Clearing the current node sets it _before_ the first child,
1558 calling next() advances to the first child.
1559 """
1560 return pn_data_enter(self._data)
1561
1563 """
1564 Sets the current node to the parent node and the parent node to
1565 its own parent.
1566 """
1567 return pn_data_exit(self._data)
1568
1570 return pn_data_lookup(self._data, name)
1571
1573 pn_data_narrow(self._data)
1574
1576 pn_data_widen(self._data)
1577
1579 """
1580 Returns the type of the current node.
1581 """
1582 dtype = pn_data_type(self._data)
1583 if dtype == -1:
1584 return None
1585 else:
1586 return dtype
1587
1589 """
1590 Returns the size in bytes needed to encode the data in AMQP format.
1591 """
1592 return pn_data_encoded_size(self._data)
1593
1595 """
1596 Returns a representation of the data encoded in AMQP format.
1597 """
1598 size = 1024
1599 while True:
1600 cd, enc = pn_data_encode(self._data, size)
1601 if cd == PN_OVERFLOW:
1602 size *= 2
1603 elif cd >= 0:
1604 return enc
1605 else:
1606 self._check(cd)
1607
1609 """
1610 Decodes the first value from supplied AMQP data and returns the
1611 number of bytes consumed.
1612
1613 @type encoded: binary
1614 @param encoded: AMQP encoded binary data
1615 """
1616 return self._check(pn_data_decode(self._data, encoded))
1617
1619 """
1620 Puts a list value. Elements may be filled by entering the list
1621 node and putting element values.
1622
1623 >>> data = Data()
1624 >>> data.put_list()
1625 >>> data.enter()
1626 >>> data.put_int(1)
1627 >>> data.put_int(2)
1628 >>> data.put_int(3)
1629 >>> data.exit()
1630 """
1631 self._check(pn_data_put_list(self._data))
1632
1634 """
1635 Puts a map value. Elements may be filled by entering the map node
1636 and putting alternating key value pairs.
1637
1638 >>> data = Data()
1639 >>> data.put_map()
1640 >>> data.enter()
1641 >>> data.put_string("key")
1642 >>> data.put_string("value")
1643 >>> data.exit()
1644 """
1645 self._check(pn_data_put_map(self._data))
1646
def put_array(self, described, element_type):
    """
    Put an array node. Its elements are supplied afterwards by entering
    the array node and putting each element value; every element must be
    of the element type given here. For a described array the first
    child put is the descriptor, which may be of any type.

      >>> data = Data()
      >>>
      >>> data.put_array(False, Data.INT)
      >>> data.enter()
      >>> data.put_int(1)
      >>> data.put_int(2)
      >>> data.put_int(3)
      >>> data.exit()
      >>>
      >>> data.put_array(True, Data.DOUBLE)
      >>> data.enter()
      >>> data.put_symbol("array-descriptor")
      >>> data.put_double(1.1)
      >>> data.put_double(1.2)
      >>> data.put_double(1.3)
      >>> data.exit()

    @type described: bool
    @param described: specifies whether the array is described
    @type element_type: int
    @param element_type: the type of the array elements
    """
    status = pn_data_put_array(self._data, described, element_type)
    # _check raises for a negative status code.
    self._check(status)
1678
1680 """
1681 Puts a described value. A described node has two children, the
1682 descriptor and the value. These are specified by entering the node
1683 and putting the desired values.
1684
1685 >>> data = Data()
1686 >>> data.put_described()
1687 >>> data.enter()
1688 >>> data.put_symbol("value-descriptor")
1689 >>> data.put_string("the value")
1690 >>> data.exit()
1691 """
1692 self._check(pn_data_put_described(self._data))
1693
1695 """
1696 Puts a null value.
1697 """
1698 self._check(pn_data_put_null(self._data))
1699
1701 """
1702 Puts a boolean value.
1703
1704 @param b: a boolean value
1705 """
1706 self._check(pn_data_put_bool(self._data, b))
1707
1709 """
1710 Puts an unsigned byte value.
1711
1712 @param ub: an integral value
1713 """
1714 self._check(pn_data_put_ubyte(self._data, ub))
1715
1717 """
1718 Puts a signed byte value.
1719
1720 @param b: an integral value
1721 """
1722 self._check(pn_data_put_byte(self._data, b))
1723
1725 """
1726 Puts an unsigned short value.
1727
1728 @param us: an integral value.
1729 """
1730 self._check(pn_data_put_ushort(self._data, us))
1731
1733 """
1734 Puts a signed short value.
1735
1736 @param s: an integral value
1737 """
1738 self._check(pn_data_put_short(self._data, s))
1739
1741 """
1742 Puts an unsigned int value.
1743
1744 @param ui: an integral value
1745 """
1746 self._check(pn_data_put_uint(self._data, ui))
1747
1749 """
1750 Puts a signed int value.
1751
1752 @param i: an integral value
1753 """
1754 self._check(pn_data_put_int(self._data, i))
1755
1757 """
1758 Puts a char value.
1759
1760 @param c: a single character
1761 """
1762 self._check(pn_data_put_char(self._data, ord(c)))
1763
1765 """
1766 Puts an unsigned long value.
1767
1768 @param ul: an integral value
1769 """
1770 self._check(pn_data_put_ulong(self._data, ul))
1771
1773 """
1774 Puts a signed long value.
1775
1776 @param l: an integral value
1777 """
1778 self._check(pn_data_put_long(self._data, l))
1779
1781 """
1782 Puts a timestamp value.
1783
1784 @param t: an integral value
1785 """
1786 self._check(pn_data_put_timestamp(self._data, t))
1787
1789 """
1790 Puts a float value.
1791
1792 @param f: a floating point value
1793 """
1794 self._check(pn_data_put_float(self._data, f))
1795
1797 """
1798 Puts a double value.
1799
1800 @param d: a floating point value.
1801 """
1802 self._check(pn_data_put_double(self._data, d))
1803
1805 """
1806 Puts a decimal32 value.
1807
1808 @param d: a decimal32 value
1809 """
1810 self._check(pn_data_put_decimal32(self._data, d))
1811
1813 """
1814 Puts a decimal64 value.
1815
1816 @param d: a decimal64 value
1817 """
1818 self._check(pn_data_put_decimal64(self._data, d))
1819
1821 """
1822 Puts a decimal128 value.
1823
1824 @param d: a decimal128 value
1825 """
1826 self._check(pn_data_put_decimal128(self._data, d))
1827
1829 """
1830 Puts a UUID value.
1831
1832 @param u: a uuid value
1833 """
1834 self._check(pn_data_put_uuid(self._data, u.bytes))
1835
1837 """
1838 Puts a binary value.
1839
1840 @type b: binary
1841 @param b: a binary value
1842 """
1843 self._check(pn_data_put_binary(self._data, bytes(b)))
1844
1846 """
1847 Puts a unicode value.
1848
1849 @type s: unicode
1850 @param s: a unicode value
1851 """
1852 self._check(pn_data_put_string(self._data, s.encode("utf8")))
1853
1855 """
1856 Puts a symbolic value.
1857
1858 @type s: string
1859 @param s: the symbol name
1860 """
1861 self._check(pn_data_put_symbol(self._data, s.encode('ascii')))
1862
1864 """
1865 If the current node is a list, return the number of elements,
1866 otherwise return zero. List elements can be accessed by entering
1867 the list.
1868
1869 >>> count = data.get_list()
1870 >>> data.enter()
1871 >>> for i in range(count):
1872 ... type = data.next()
1873 ... if type == Data.STRING:
1874 ... print data.get_string()
1875 ... elif type == ...:
1876 ... ...
1877 >>> data.exit()
1878 """
1879 return pn_data_get_list(self._data)
1880
1882 """
1883 If the current node is a map, return the number of child elements,
1884 otherwise return zero. Key value pairs can be accessed by entering
1885 the map.
1886
1887 >>> count = data.get_map()
1888 >>> data.enter()
1889 >>> for i in range(count/2):
1890 ... type = data.next()
1891 ... if type == Data.STRING:
1892 ... print data.get_string()
1893 ... elif type == ...:
1894 ... ...
1895 >>> data.exit()
1896 """
1897 return pn_data_get_map(self._data)
1898
1900 """
1901 If the current node is an array, return a tuple of the element
1902 count, a boolean indicating whether the array is described, and
1903 the type of each element, otherwise return (0, False, None). Array
1904 data can be accessed by entering the array.
1905
1906 >>> # read an array of strings with a symbolic descriptor
1907 >>> count, described, type = data.get_array()
1908 >>> data.enter()
1909 >>> data.next()
1910 >>> print "Descriptor:", data.get_symbol()
1911 >>> for i in range(count):
1912 ... data.next()
1913 ... print "Element:", data.get_string()
1914 >>> data.exit()
1915 """
1916 count = pn_data_get_array(self._data)
1917 described = pn_data_is_array_described(self._data)
1918 type = pn_data_get_array_type(self._data)
1919 if type == -1:
1920 type = None
1921 return count, described, type
1922
1924 """
1925 Checks if the current node is a described value. The descriptor
1926 and value may be accessed by entering the described value.
1927
1928 >>> # read a symbolically described string
1929 >>> assert data.is_described() # will error if the current node is not described
1930 >>> data.enter()
1931 >>> data.next()
1932 >>> print data.get_symbol()
1933 >>> data.next()
1934 >>> print data.get_string()
1935 >>> data.exit()
1936 """
1937 return pn_data_is_described(self._data)
1938
1940 """
1941 Checks if the current node is a null.
1942 """
1943 return pn_data_is_null(self._data)
1944
1946 """
1947 If the current node is a boolean, returns its value, returns False
1948 otherwise.
1949 """
1950 return pn_data_get_bool(self._data)
1951
1953 """
1954 If the current node is an unsigned byte, returns its value,
1955 returns 0 otherwise.
1956 """
1957 return ubyte(pn_data_get_ubyte(self._data))
1958
1960 """
1961 If the current node is a signed byte, returns its value, returns 0
1962 otherwise.
1963 """
1964 return byte(pn_data_get_byte(self._data))
1965
1967 """
1968 If the current node is an unsigned short, returns its value,
1969 returns 0 otherwise.
1970 """
1971 return ushort(pn_data_get_ushort(self._data))
1972
1974 """
1975 If the current node is a signed short, returns its value, returns
1976 0 otherwise.
1977 """
1978 return short(pn_data_get_short(self._data))
1979
1981 """
1982 If the current node is an unsigned int, returns its value, returns
1983 0 otherwise.
1984 """
1985 return uint(pn_data_get_uint(self._data))
1986
1988 """
1989 If the current node is a signed int, returns its value, returns 0
1990 otherwise.
1991 """
1992 return int32(pn_data_get_int(self._data))
1993
1995 """
1996 If the current node is a char, returns its value, returns 0
1997 otherwise.
1998 """
1999 return char(_compat.unichar(pn_data_get_char(self._data)))
2000
2002 """
2003 If the current node is an unsigned long, returns its value,
2004 returns 0 otherwise.
2005 """
2006 return ulong(pn_data_get_ulong(self._data))
2007
2009 """
2010 If the current node is a signed long, returns its value, returns
2011 0 otherwise.
2012 """
2013 return long(pn_data_get_long(self._data))
2014
2016 """
2017 If the current node is a timestamp, returns its value, returns 0
2018 otherwise.
2019 """
2020 return timestamp(pn_data_get_timestamp(self._data))
2021
2023 """
2024 If the current node is a float, returns its value, returns 0
2025 otherwise.
2026 """
2027 return float32(pn_data_get_float(self._data))
2028
2030 """
2031 If the current node is a double, returns its value, returns 0
2032 otherwise.
2033 """
2034 return pn_data_get_double(self._data)
2035
2036
2038 """
2039 If the current node is a decimal32, returns its value, returns 0
2040 otherwise.
2041 """
2042 return decimal32(pn_data_get_decimal32(self._data))
2043
2044
2046 """
2047 If the current node is a decimal64, returns its value, returns 0
2048 otherwise.
2049 """
2050 return decimal64(pn_data_get_decimal64(self._data))
2051
2052
2054 """
2055 If the current node is a decimal128, returns its value, returns 0
2056 otherwise.
2057 """
2058 return decimal128(pn_data_get_decimal128(self._data))
2059
2061 """
2062 If the current node is a UUID, returns its value, returns None
2063 otherwise.
2064 """
2065 if pn_data_type(self._data) == Data.UUID:
2066 return uuid.UUID(bytes=pn_data_get_uuid(self._data))
2067 else:
2068 return None
2069
2071 """
2072 If the current node is binary, returns its value, returns ""
2073 otherwise.
2074 """
2075 return pn_data_get_binary(self._data)
2076
2078 """
2079 If the current node is a string, returns its value, returns ""
2080 otherwise.
2081 """
2082 return pn_data_get_string(self._data).decode("utf8")
2083
2085 """
2086 If the current node is a symbol, returns its value, returns ""
2087 otherwise.
2088 """
2089 return symbol(pn_data_get_symbol(self._data).decode('ascii'))
2090
def copy(self, src):
    """
    Copy the contents of another data object into this one.

    @param src: the object whose underlying data is handed to
      pn_data_copy as the source
    """
    status = pn_data_copy(self._data, src._data)
    # _check raises for a negative status code.
    self._check(status)
2093
2104
2106 pn_data_dump(self._data)
2107
2117
2119 if self.enter():
2120 try:
2121 result = {}
2122 while self.next():
2123 k = self.get_object()
2124 if self.next():
2125 v = self.get_object()
2126 else:
2127 v = None
2128 result[k] = v
2129 finally:
2130 self.exit()
2131 return result
2132
2141
2143 if self.enter():
2144 try:
2145 result = []
2146 while self.next():
2147 result.append(self.get_object())
2148 finally:
2149 self.exit()
2150 return result
2151
2162
2171
2173 """
2174 If the current node is an array, return an Array object
2175 representing the array and its contents. Otherwise return None.
2176 This is a convenience wrapper around get_array, enter, etc.
2177 """
2178
2179 count, described, type = self.get_array()
2180 if type is None: return None
2181 if self.enter():
2182 try:
2183 if described:
2184 self.next()
2185 descriptor = self.get_object()
2186 else:
2187 descriptor = UNDESCRIBED
2188 elements = []
2189 while self.next():
2190 elements.append(self.get_object())
2191 finally:
2192 self.exit()
2193 return Array(descriptor, type, *elements)
2194
2206
2207 put_mappings = {
2208 None.__class__: lambda s, _: s.put_null(),
2209 bool: put_bool,
2210 ubyte: put_ubyte,
2211 ushort: put_ushort,
2212 uint: put_uint,
2213 ulong: put_ulong,
2214 byte: put_byte,
2215 short: put_short,
2216 int32: put_int,
2217 long: put_long,
2218 float32: put_float,
2219 float: put_double,
2220 decimal32: put_decimal32,
2221 decimal64: put_decimal64,
2222 decimal128: put_decimal128,
2223 char: put_char,
2224 timestamp: put_timestamp,
2225 uuid.UUID: put_uuid,
2226 bytes: put_binary,
2227 unicode: put_string,
2228 symbol: put_symbol,
2229 list: put_sequence,
2230 tuple: put_sequence,
2231 dict: put_dict,
2232 Described: put_py_described,
2233 Array: put_py_array
2234 }
2235
2236
# Where long is just an alias for int the table above already has an entry
# for int; otherwise plain ints need their own mapping.
if int not in put_mappings:
    put_mappings[int] = put_int

# Register whichever buffer-like builtins this interpreter provides.
# The names are looked up directly instead of via getattr(__builtins__, ...):
# __builtins__ is only the builtins module inside __main__, and is a plain
# dict in imported modules, where getattr() always returned None and these
# mappings were silently never installed.
# NOTE(review): put_binary passes the value through bytes(); confirm that
# yields the buffer contents for memoryview on every supported interpreter.
try:
    put_mappings[memoryview] = put_binary
except NameError:
    pass
try:
    put_mappings[buffer] = put_binary
except NameError:
    pass
2244
2245 get_mappings = {
2246 NULL: lambda s: None,
2247 BOOL: get_bool,
2248 BYTE: get_byte,
2249 UBYTE: get_ubyte,
2250 SHORT: get_short,
2251 USHORT: get_ushort,
2252 INT: get_int,
2253 UINT: get_uint,
2254 CHAR: get_char,
2255 LONG: get_long,
2256 ULONG: get_ulong,
2257 TIMESTAMP: get_timestamp,
2258 FLOAT: get_float,
2259 DOUBLE: get_double,
2260 DECIMAL32: get_decimal32,
2261 DECIMAL64: get_decimal64,
2262 DECIMAL128: get_decimal128,
2263 UUID: get_uuid,
2264 BINARY: get_binary,
2265 STRING: get_string,
2266 SYMBOL: get_symbol,
2267 DESCRIBED: get_py_described,
2268 ARRAY: get_py_array,
2269 LIST: get_sequence,
2270 MAP: get_dict
2271 }
2272
2273
2275 putter = self.put_mappings[obj.__class__]
2276 putter(self, obj)
2277
2279 type = self.type()
2280 if type is None: return None
2281 getter = self.get_mappings.get(type)
2282 if getter:
2283 return getter(self)
2284 else:
2285 return UnmappedType(str(type))
2286
2289
2291
2292 LOCAL_UNINIT = PN_LOCAL_UNINIT
2293 REMOTE_UNINIT = PN_REMOTE_UNINIT
2294 LOCAL_ACTIVE = PN_LOCAL_ACTIVE
2295 REMOTE_ACTIVE = PN_REMOTE_ACTIVE
2296 LOCAL_CLOSED = PN_LOCAL_CLOSED
2297 REMOTE_CLOSED = PN_REMOTE_CLOSED
2298
2301
2303 obj2cond(self.condition, self._get_cond_impl())
2304
2305 @property
2307 return cond2obj(self._get_remote_cond_impl())
2308
2309
2311 assert False, "Subclass must override this!"
2312
2314 assert False, "Subclass must override this!"
2315
2325
2337
2338 handler = property(_get_handler, _set_handler)
2339
2340 @property
2343
2345
2346 - def __init__(self, name, description=None, info=None):
2347 self.name = name
2348 self.description = description
2349 self.info = info
2350
2352 return "Condition(%s)" % ", ".join([repr(x) for x in
2353 (self.name, self.description, self.info)
2354 if x])
2355
2357 if not isinstance(o, Condition): return False
2358 return self.name == o.name and \
2359 self.description == o.description and \
2360 self.info == o.info
2361
2363 pn_condition_clear(cond)
2364 if obj:
2365 pn_condition_set_name(cond, str(obj.name))
2366 pn_condition_set_description(cond, obj.description)
2367 info = Data(pn_condition_info(cond))
2368 if obj.info:
2369 info.put_object(obj.info)
2370
2372 if pn_condition_is_set(cond):
2373 return Condition(pn_condition_get_name(cond),
2374 pn_condition_get_description(cond),
2375 dat2obj(pn_condition_info(cond)))
2376 else:
2377 return None
2378
2387
2392
2394 return long(secs*1000)
2395
2397 return float(millis)/1000.0
2398
2400 if secs is None: return PN_MILLIS_MAX
2401 return secs2millis(secs)
2402
2404 if millis == PN_MILLIS_MAX: return None
2405 return millis2secs(millis)
2406
2408 """Some Proton APIs expect a null terminated string. Convert python text
2409 types to UTF8 to avoid zero bytes introduced by other multi-byte encodings.
2410 This method will throw if the string cannot be converted.
2411 """
2412 if string is None:
2413 return None
2414 if _compat.IS_PY2:
2415 if isinstance(string, unicode):
2416 return string.encode('utf-8')
2417 elif isinstance(string, str):
2418 return string
2419 else:
2420
2421 if isinstance(string, str):
2422 string = string.encode('utf-8')
2423
2424 if isinstance(string, bytes):
2425 return string.decode('utf-8')
2426 raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string)))
2427
2429 """Convert C strings returned from proton-c into python unicode"""
2430 if string is None:
2431 return None
2432 if isinstance(string, _compat.TEXT_TYPES):
2433
2434 return string
2435 elif isinstance(string, _compat.BINARY_TYPES):
2436 return string.decode('utf8')
2437 else:
2438 raise TypeError("Unrecognized string type")
2439
2441 """
2442 A representation of an AMQP connection
2443 """
2444
2445 @staticmethod
2447 if impl is None:
2448 return None
2449 else:
2450 return Connection(impl)
2451
2452 - def __init__(self, impl = pn_connection):
2454
2456 Endpoint._init(self)
2457 self.offered_capabilities = None
2458 self.desired_capabilities = None
2459 self.properties = None
2460
2462 return pn_connection_attachments(self._impl)
2463
2464 @property
2467
2468 @property
2471
def _check(self, err):
    """
    Turn a negative status code into an exception.

    @param err: status code to inspect
    @return: err unchanged when it is zero or positive
    @raise ConnectionException: (or the class registered for this code
      in EXCEPTIONS) when err is negative
    """
    if err < 0:
        exc = EXCEPTIONS.get(err, ConnectionException)
        # Report the error's text, as the other _check implementations in
        # this module do, rather than formatting the raw error object.
        raise exc("[%s]: %s" % (err, pn_error_text(pn_connection_error(self._impl))))
    else:
        return err
2478
2480 return pn_connection_condition(self._impl)
2481
2483 return pn_connection_remote_condition(self._impl)
2484
2486 if collector is None:
2487 pn_connection_collect(self._impl, None)
2488 else:
2489 pn_connection_collect(self._impl, collector._impl)
2490 self._collector = weakref.ref(collector)
2491
2493 return utf82unicode(pn_connection_get_container(self._impl))
2495 return pn_connection_set_container(self._impl, unicode2utf8(name))
2496
2497 container = property(_get_container, _set_container)
2498
2500 return utf82unicode(pn_connection_get_hostname(self._impl))
2502 return pn_connection_set_hostname(self._impl, unicode2utf8(name))
2503
2504 hostname = property(_get_hostname, _set_hostname,
2505 doc="""
2506 Set the name of the host (either fully qualified or relative) to which this
2507 connection is connecting. This information may be used by the remote
2508 peer to determine the correct back-end service to connect the client to.
2509 This value will be sent in the Open performative, and will be used by SSL
2510 and SASL layers to identify the peer.
2511 """)
2512
2514 return utf82unicode(pn_connection_get_user(self._impl))
2516 return pn_connection_set_user(self._impl, unicode2utf8(name))
2517
2518 user = property(_get_user, _set_user)
2519
2523 return pn_connection_set_password(self._impl, unicode2utf8(name))
2524
2525 password = property(_get_password, _set_password)
2526
2527 @property
2529 """The container identifier specified by the remote peer for this connection."""
2530 return pn_connection_remote_container(self._impl)
2531
2532 @property
2534 """The hostname specified by the remote peer for this connection."""
2535 return pn_connection_remote_hostname(self._impl)
2536
2537 @property
2539 """The capabilities offered by the remote peer for this connection."""
2540 return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
2541
2542 @property
2544 """The capabilities desired by the remote peer for this connection."""
2545 return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
2546
2547 @property
2549 """The properties specified by the remote peer for this connection."""
2550 return dat2obj(pn_connection_remote_properties(self._impl))
2551
2553 """
2554 Opens the connection.
2555
2556 In more detail, this moves the local state of the connection to
2557 the ACTIVE state and triggers an open frame to be sent to the
2558 peer. A connection is fully active once both peers have opened it.
2559 """
2560 obj2dat(self.offered_capabilities,
2561 pn_connection_offered_capabilities(self._impl))
2562 obj2dat(self.desired_capabilities,
2563 pn_connection_desired_capabilities(self._impl))
2564 obj2dat(self.properties, pn_connection_properties(self._impl))
2565 pn_connection_open(self._impl)
2566
2568 """
2569 Closes the connection.
2570
2571 In more detail, this moves the local state of the connection to
2572 the CLOSED state and triggers a close frame to be sent to the
2573 peer. A connection is fully closed once both peers have closed it.
2574 """
2575 self._update_cond()
2576 pn_connection_close(self._impl)
2577
2578 @property
2580 """
2581 The state of the connection as a bit field. The state has a local
2582 and a remote component. Each of these can be in one of three
2583 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
2584 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
2585 REMOTE_ACTIVE and REMOTE_CLOSED.
2586 """
2587 return pn_connection_state(self._impl)
2588
2590 """
2591 Returns a new session on this connection.
2592 """
2593 ssn = pn_session(self._impl)
2594 if ssn is None:
2595 raise(SessionException("Session allocation failed."))
2596 else:
2597 return Session(ssn)
2598
2600 return Session.wrap(pn_session_head(self._impl, mask))
2601
2603 return Link.wrap(pn_link_head(self._impl, mask))
2604
2605 @property
2608
2609 @property
2611 return pn_error_code(pn_connection_error(self._impl))
2612
2614 pn_connection_release(self._impl)
2615
2618
2620
2621 @staticmethod
2623 if impl is None:
2624 return None
2625 else:
2626 return Session(impl)
2627
2630
2632 return pn_session_attachments(self._impl)
2633
2635 return pn_session_condition(self._impl)
2636
2638 return pn_session_remote_condition(self._impl)
2639
2641 return pn_session_get_incoming_capacity(self._impl)
2642
2644 pn_session_set_incoming_capacity(self._impl, capacity)
2645
2646 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
2647
2649 return pn_session_get_outgoing_window(self._impl)
2650
2652 pn_session_set_outgoing_window(self._impl, window)
2653
2654 outgoing_window = property(_get_outgoing_window, _set_outgoing_window)
2655
2656 @property
2658 return pn_session_outgoing_bytes(self._impl)
2659
2660 @property
2662 return pn_session_incoming_bytes(self._impl)
2663
2665 pn_session_open(self._impl)
2666
2668 self._update_cond()
2669 pn_session_close(self._impl)
2670
def next(self, mask):
    """Return the session that follows this one for the given state mask.

    The native ``pn_session_next`` lookup is performed on this object's
    handle and the result is passed through ``Session.wrap``.
    """
    following = pn_session_next(self._impl, mask)
    return Session.wrap(following)
2673
2674 @property
2676 return pn_session_state(self._impl)
2677
2678 @property
2681
2683 return Sender(pn_sender(self._impl, unicode2utf8(name)))
2684
2686 return Receiver(pn_receiver(self._impl, unicode2utf8(name)))
2687
2689 pn_session_free(self._impl)
2690
2693
2694 -class Link(Wrapper, Endpoint):
2695 """
2696 A representation of an AMQP link, of which there are two concrete
2697 implementations, Sender and Receiver.
2698 """
2699
2700 SND_UNSETTLED = PN_SND_UNSETTLED
2701 SND_SETTLED = PN_SND_SETTLED
2702 SND_MIXED = PN_SND_MIXED
2703
2704 RCV_FIRST = PN_RCV_FIRST
2705 RCV_SECOND = PN_RCV_SECOND
2706
2707 @staticmethod
2709 if impl is None: return None
2710 if pn_link_is_sender(impl):
2711 return Sender(impl)
2712 else:
2713 return Receiver(impl)
2714
2717
2719 return pn_link_attachments(self._impl)
2720
2722 if err < 0:
2723 exc = EXCEPTIONS.get(err, LinkException)
2724 raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl))))
2725 else:
2726 return err
2727
2729 return pn_link_condition(self._impl)
2730
2732 return pn_link_remote_condition(self._impl)
2733
2735 """
2736 Opens the link.
2737
2738 In more detail, this moves the local state of the link to the
2739 ACTIVE state and triggers an attach frame to be sent to the
2740 peer. A link is fully active once both peers have attached it.
2741 """
2742 pn_link_open(self._impl)
2743
2745 """
2746 Closes the link.
2747
2748 In more detail, this moves the local state of the link to the
2749 CLOSED state and triggers a detach frame (with the closed flag
2750 set) to be sent to the peer. A link is fully closed once both
2751 peers have detached it.
2752 """
2753 self._update_cond()
2754 pn_link_close(self._impl)
2755
2756 @property
2758 """
2759 The state of the link as a bit field. The state has a local
2760 and a remote component. Each of these can be in one of three
2761 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
2762 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
2763 REMOTE_ACTIVE and REMOTE_CLOSED.
2764 """
2765 return pn_link_state(self._impl)
2766
2767 @property
2769 """The source of the link as described by the local peer."""
2770 return Terminus(pn_link_source(self._impl))
2771
2772 @property
2774 """The target of the link as described by the local peer."""
2775 return Terminus(pn_link_target(self._impl))
2776
2777 @property
2779 """The source of the link as described by the remote peer."""
2780 return Terminus(pn_link_remote_source(self._impl))
2781 @property
2783 """The target of the link as described by the remote peer."""
2784 return Terminus(pn_link_remote_target(self._impl))
2785
2786 @property
2789
2790 @property
2792 """The connection on which this link was attached."""
2793 return self.session.connection
2794
2797
2798 @property
2801
2803 return pn_link_advance(self._impl)
2804
2805 @property
2807 return pn_link_unsettled(self._impl)
2808
2809 @property
2811 """The amount of outstanding credit on this link."""
2812 return pn_link_credit(self._impl)
2813
2814 @property
2816 return pn_link_available(self._impl)
2817
2818 @property
2820 return pn_link_queued(self._impl)
2821
def next(self, mask):
    """Return the link that follows this one for the given state mask.

    The native ``pn_link_next`` lookup is performed on this object's
    handle and the result is passed through ``Link.wrap``.
    """
    following = pn_link_next(self._impl, mask)
    return Link.wrap(following)
2824
2825 @property
2827 """Returns the name of the link"""
2828 return utf82unicode(pn_link_name(self._impl))
2829
2830 @property
2832 """Returns true if this link is a sender."""
2833 return pn_link_is_sender(self._impl)
2834
2835 @property
2837 """Returns true if this link is a receiver."""
2838 return pn_link_is_receiver(self._impl)
2839
2840 @property
2842 return pn_link_remote_snd_settle_mode(self._impl)
2843
2844 @property
2846 return pn_link_remote_rcv_settle_mode(self._impl)
2847
2849 return pn_link_snd_settle_mode(self._impl)
2851 pn_link_set_snd_settle_mode(self._impl, mode)
2852 snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)
2853
2855 return pn_link_rcv_settle_mode(self._impl)
2857 pn_link_set_rcv_settle_mode(self._impl, mode)
2858 rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
2859
2861 return pn_link_get_drain(self._impl)
2862
2864 pn_link_set_drain(self._impl, bool(b))
2865
2866 drain_mode = property(_get_drain, _set_drain)
2867
2869 return pn_link_drained(self._impl)
2870
2871 @property
2873 return pn_link_remote_max_message_size(self._impl)
2874
2876 return pn_link_max_message_size(self._impl)
2878 pn_link_set_max_message_size(self._impl, mode)
2879 max_message_size = property(_get_max_message_size, _set_max_message_size)
2880
2882 return pn_link_detach(self._impl)
2883
2885 pn_link_free(self._impl)
2886
2888
2889 UNSPECIFIED = PN_UNSPECIFIED
2890 SOURCE = PN_SOURCE
2891 TARGET = PN_TARGET
2892 COORDINATOR = PN_COORDINATOR
2893
2894 NONDURABLE = PN_NONDURABLE
2895 CONFIGURATION = PN_CONFIGURATION
2896 DELIVERIES = PN_DELIVERIES
2897
2898 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
2899 DIST_MODE_COPY = PN_DIST_MODE_COPY
2900 DIST_MODE_MOVE = PN_DIST_MODE_MOVE
2901
2902 EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK
2903 EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION
2904 EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION
2905 EXPIRE_NEVER = PN_EXPIRE_NEVER
2906
2909
2911 if err < 0:
2912 exc = EXCEPTIONS.get(err, LinkException)
2913 raise exc("[%s]" % err)
2914 else:
2915 return err
2916
2918 return pn_terminus_get_type(self._impl)
2920 self._check(pn_terminus_set_type(self._impl, type))
2921 type = property(_get_type, _set_type)
2922
2924 """The address that identifies the source or target node"""
2925 return utf82unicode(pn_terminus_get_address(self._impl))
2927 self._check(pn_terminus_set_address(self._impl, unicode2utf8(address)))
2928 address = property(_get_address, _set_address)
2929
2931 return pn_terminus_get_durability(self._impl)
2933 self._check(pn_terminus_set_durability(self._impl, seconds))
2934 durability = property(_get_durability, _set_durability)
2935
2937 return pn_terminus_get_expiry_policy(self._impl)
2939 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
2940 expiry_policy = property(_get_expiry_policy, _set_expiry_policy)
2941
2943 return pn_terminus_get_timeout(self._impl)
2945 self._check(pn_terminus_set_timeout(self._impl, seconds))
2946 timeout = property(_get_timeout, _set_timeout)
2947
2949 """Indicates whether the source or target node was dynamically
2950 created"""
2951 return pn_terminus_is_dynamic(self._impl)
2953 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
2954 dynamic = property(_is_dynamic, _set_dynamic)
2955
2957 return pn_terminus_get_distribution_mode(self._impl)
2959 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
2960 distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
2961
2962 @property
2964 """Properties of a dynamic source or target."""
2965 return Data(pn_terminus_properties(self._impl))
2966
2967 @property
2969 """Capabilities of the source or target."""
2970 return Data(pn_terminus_capabilities(self._impl))
2971
2972 @property
2974 return Data(pn_terminus_outcomes(self._impl))
2975
2976 @property
2978 """A filter on a source allows the set of messages transferred over
2979 the link to be restricted"""
2980 return Data(pn_terminus_filter(self._impl))
2981
def copy(self, src):
    """Copy the contents of terminus ``src`` into this terminus.

    The status code returned by ``pn_terminus_copy`` is validated with
    ``self._check``, which raises on a negative code. Nothing is returned.
    """
    status = pn_terminus_copy(self._impl, src._impl)
    self._check(status)
2984
2986 """
2987 A link over which messages are sent.
2988 """
2989
2991 pn_link_offered(self._impl, n)
2992
2994 """
2995 Send specified data as part of the current delivery
2996
2997 @type data: binary
2998 @param data: data to send
2999 """
3000 return self._check(pn_link_send(self._impl, data))
3001
def send(self, obj, tag=None):
    """
    Send ``obj`` over this sender.

    If the object exposes a ``send`` attribute (as a Message does), the
    work is delegated to ``obj.send(self, tag=tag)`` and its result is
    returned. Any other object is handed to ``self.stream()`` unchanged,
    in which case ``tag`` is not used.
    """
    if not hasattr(obj, 'send'):
        # No send() protocol on the object: pass it straight to stream().
        return self.stream(obj)
    return obj.send(self, tag=tag)
3016
3018 if not hasattr(self, 'tag_generator'):
3019 def simple_tags():
3020 count = 1
3021 while True:
3022 yield str(count)
3023 count += 1
3024 self.tag_generator = simple_tags()
3025 return next(self.tag_generator)
3026
3028 """
3029 A link over which messages are received.
3030 """
3031
def flow(self, n):
    """Grant the remote sender ``n`` additional messages' worth of credit.

    Thin wrapper over ``pn_link_flow``; nothing is returned.
    """
    pn_link_flow(self._impl, n)
3035
def recv(self, limit):
    """Read up to ``limit`` from the link via ``pn_link_recv``.

    Returns None when the native call reports ``PN_EOS``. Otherwise the
    status is validated with ``self._check`` (which raises on a negative
    code) and the received data is returned.
    """
    status, payload = pn_link_recv(self._impl, limit)
    if status != PN_EOS:
        self._check(status)
        return payload
    return None
3043
3045 pn_link_drain(self._impl, n)
3046
3048 return pn_link_draining(self._impl)
3049
3051
3052 values = {}
3053
3055 ni = super(NamedInt, cls).__new__(cls, i)
3056 cls.values[i] = ni
3057 return ni
3058
3061
3064
3067
3068 @classmethod
3070 return cls.values.get(i, i)
3071
3074
3076
3077 RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
3078 ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
3079 REJECTED = DispositionType(PN_REJECTED, "REJECTED")
3080 RELEASED = DispositionType(PN_RELEASED, "RELEASED")
3081 MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")
3082
3084 self._impl = impl
3085 self.local = local
3086 self._data = None
3087 self._condition = None
3088 self._annotations = None
3089
3090 @property
3092 return DispositionType.get(pn_disposition_type(self._impl))
3093
3095 return pn_disposition_get_section_number(self._impl)
3097 pn_disposition_set_section_number(self._impl, n)
3098 section_number = property(_get_section_number, _set_section_number)
3099
3101 return pn_disposition_get_section_offset(self._impl)
3103 pn_disposition_set_section_offset(self._impl, n)
3104 section_offset = property(_get_section_offset, _set_section_offset)
3105
3107 return pn_disposition_is_failed(self._impl)
3109 pn_disposition_set_failed(self._impl, b)
3110 failed = property(_get_failed, _set_failed)
3111
3113 return pn_disposition_is_undeliverable(self._impl)
3115 pn_disposition_set_undeliverable(self._impl, b)
3116 undeliverable = property(_get_undeliverable, _set_undeliverable)
3117
3119 if self.local:
3120 return self._data
3121 else:
3122 return dat2obj(pn_disposition_data(self._impl))
3124 if self.local:
3125 self._data = obj
3126 else:
3127 raise AttributeError("data attribute is read-only")
3128 data = property(_get_data, _set_data)
3129
3131 if self.local:
3132 return self._annotations
3133 else:
3134 return dat2obj(pn_disposition_annotations(self._impl))
3136 if self.local:
3137 self._annotations = obj
3138 else:
3139 raise AttributeError("annotations attribute is read-only")
3140 annotations = property(_get_annotations, _set_annotations)
3141
3143 if self.local:
3144 return self._condition
3145 else:
3146 return cond2obj(pn_disposition_condition(self._impl))
3148 if self.local:
3149 self._condition = obj
3150 else:
3151 raise AttributeError("condition attribute is read-only")
3152 condition = property(_get_condition, _set_condition)
3153
3155 """
3156 Tracks and/or records the delivery of a message over a link.
3157 """
3158
3159 RECEIVED = Disposition.RECEIVED
3160 ACCEPTED = Disposition.ACCEPTED
3161 REJECTED = Disposition.REJECTED
3162 RELEASED = Disposition.RELEASED
3163 MODIFIED = Disposition.MODIFIED
3164
3165 @staticmethod
3167 if impl is None:
3168 return None
3169 else:
3170 return Delivery(impl)
3171
3174
3176 self.local = Disposition(pn_delivery_local(self._impl), True)
3177 self.remote = Disposition(pn_delivery_remote(self._impl), False)
3178
3179 @property
3181 """The identifier for the delivery."""
3182 return pn_delivery_tag(self._impl)
3183
3184 @property
3186 """Returns true for an outgoing delivery to which data can now be written."""
3187 return pn_delivery_writable(self._impl)
3188
3189 @property
3191 """Returns true for an incoming delivery that has data to read."""
3192 return pn_delivery_readable(self._impl)
3193
3194 @property
3196 """Returns true if the state of the delivery has been updated
3197 (e.g. it has been settled and/or accepted, rejected etc)."""
3198 return pn_delivery_updated(self._impl)
3199
3201 """
3202 Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED.
3203 """
3204 obj2dat(self.local._data, pn_disposition_data(self.local._impl))
3205 obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
3206 obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
3207 pn_delivery_update(self._impl, state)
3208
3209 @property
3211 return pn_delivery_pending(self._impl)
3212
3213 @property
3215 """
3216 Returns true for an incoming delivery if not all the data is
3217 yet available.
3218 """
3219 return pn_delivery_partial(self._impl)
3220
3221 @property
3223 """Returns the local state of the delivery."""
3224 return DispositionType.get(pn_delivery_local_state(self._impl))
3225
3226 @property
3228 """
3229 Returns the state of the delivery as indicated by the remote
3230 peer.
3231 """
3232 return DispositionType.get(pn_delivery_remote_state(self._impl))
3233
3234 @property
3236 """
3237 Returns true if the delivery has been settled by the remote peer.
3238 """
3239 return pn_delivery_settled(self._impl)
3240
3242 """
3243 Settles the delivery locally. This indicates the application
3244 considers the delivery complete and does not wish to receive any
3245 further events about it. Every delivery should be settled locally.
3246 """
3247 pn_delivery_settle(self._impl)
3248
3249 @property
3252
3253 @property
3255 """
3256 Returns the link on which the delivery was sent or received.
3257 """
3258 return Link.wrap(pn_delivery_link(self._impl))
3259
3260 @property
3262 """
3263 Returns the session over which the delivery was sent or received.
3264 """
3265 return self.link.session
3266
3267 @property
3269 """
3270 Returns the connection over which the delivery was sent or received.
3271 """
3272 return self.session.connection
3273
3274 @property
3277
3280
3282
3285
3286 - def __call__(self, trans_impl, message):
3288
3290
3291 TRACE_OFF = PN_TRACE_OFF
3292 TRACE_DRV = PN_TRACE_DRV
3293 TRACE_FRM = PN_TRACE_FRM
3294 TRACE_RAW = PN_TRACE_RAW
3295
3296 CLIENT = 1
3297 SERVER = 2
3298
3299 @staticmethod
3301 if impl is None:
3302 return None
3303 else:
3304 return Transport(_impl=impl)
3305
3306 - def __init__(self, mode=None, _impl = pn_transport):
3314
3316 self._sasl = None
3317 self._ssl = None
3318
3320 if err < 0:
3321 exc = EXCEPTIONS.get(err, TransportException)
3322 raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl))))
3323 else:
3324 return err
3325
3327 pn_transport_set_pytracer(self._impl, TraceAdapter(tracer));
3328
3330 adapter = pn_transport_get_pytracer(self._impl)
3331 if adapter:
3332 return adapter.tracer
3333 else:
3334 return None
3335
3336 tracer = property(_get_tracer, _set_tracer,
3337 doc="""
3338 A callback for trace logging. The callback is passed the transport and log message.
3339 """)
3340
def log(self, message):
    """Hand ``message`` to the transport's native logger (``pn_transport_log``)."""
    pn_transport_log(self._impl, message)
3343
3345 pn_transport_require_auth(self._impl, bool)
3346
3347 @property
3349 return pn_transport_is_authenticated(self._impl)
3350
3352 pn_transport_require_encryption(self._impl, bool)
3353
3354 @property
3356 return pn_transport_is_encrypted(self._impl)
3357
3358 @property
3360 return pn_transport_get_user(self._impl)
3361
def bind(self, connection):
    """Assign a connection to the transport.

    The status from ``pn_transport_bind`` is validated with
    ``self._check``, which raises on a negative code.
    """
    status = pn_transport_bind(self._impl, connection._impl)
    self._check(status)
3365
3367 """Release the connection"""
3368 self._check(pn_transport_unbind(self._impl))
3369
3371 pn_transport_trace(self._impl, n)
3372
def tick(self, now):
    """Process any timed events (like heartbeat generation).

    ``now`` is the current time in seconds since the epoch (float). It is
    converted to milliseconds for ``pn_transport_tick`` and the value that
    call returns is converted back to seconds before being returned.
    """
    now_ms = secs2millis(now)
    result_ms = pn_transport_tick(self._impl, now_ms)
    return millis2secs(result_ms)
3378
3380 c = pn_transport_capacity(self._impl)
3381 if c >= PN_EOS:
3382 return c
3383 else:
3384 return self._check(c)
3385
def push(self, binary):
    """Feed ``binary`` into the transport via ``pn_transport_push``.

    The native status is validated with ``self._check`` (raises on a
    negative code). If the accepted count differs from ``len(binary)``
    an ``OverflowError`` is raised.
    """
    accepted = self._check(pn_transport_push(self._impl, binary))
    if accepted == len(binary):
        return
    raise OverflowError("unable to process all bytes: %s, %s" % (accepted, len(binary)))
3390
3392 self._check(pn_transport_close_tail(self._impl))
3393
3395 p = pn_transport_pending(self._impl)
3396 if p >= PN_EOS:
3397 return p
3398 else:
3399 return self._check(p)
3400
def peek(self, size):
    """Look at up to ``size`` of the transport's output via ``pn_transport_peek``.

    Returns None when the native call reports ``PN_EOS``. Otherwise the
    status is validated with ``self._check`` (raises on a negative code)
    and the peeked data is returned.
    """
    status, chunk = pn_transport_peek(self._impl, size)
    if status != PN_EOS:
        self._check(status)
        return chunk
    return None
3408
def pop(self, size):
    """Discard ``size`` from the transport's output via ``pn_transport_pop``."""
    pn_transport_pop(self._impl, size)
3411
3413 self._check(pn_transport_close_head(self._impl))
3414
3415 @property
3417 return pn_transport_closed(self._impl)
3418
3419
3421 return pn_transport_get_max_frame(self._impl)
3422
3424 pn_transport_set_max_frame(self._impl, value)
3425
3426 max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
3427 doc="""
3428 Sets the maximum size for received frames (in bytes).
3429 """)
3430
3431 @property
3433 return pn_transport_get_remote_max_frame(self._impl)
3434
3436 return pn_transport_get_channel_max(self._impl)
3437
3439 if pn_transport_set_channel_max(self._impl, value):
3440 raise SessionException("Too late to change channel max.")
3441
3442 channel_max = property(_get_channel_max, _set_channel_max,
3443 doc="""
3444 Sets the maximum channel that may be used on the transport.
3445 """)
3446
3447 @property
3449 return pn_transport_remote_channel_max(self._impl)
3450
3451
3453 return millis2secs(pn_transport_get_idle_timeout(self._impl))
3454
3456 pn_transport_set_idle_timeout(self._impl, secs2millis(sec))
3457
3458 idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
3459 doc="""
3460 The idle timeout of the connection (float, in seconds).
3461 """)
3462
3463 @property
3465 return millis2secs(pn_transport_get_remote_idle_timeout(self._impl))
3466
3467 @property
3469 return pn_transport_get_frames_output(self._impl)
3470
3471 @property
3474
3477
def ssl(self, domain=None, session_details=None):
    """Return the SSL object attached to this transport, creating it on first use.

    ``domain`` and ``session_details`` are only consulted when no SSL
    object has been cached in ``self._ssl`` yet; they are then forwarded
    to the ``SSL`` constructor.
    """
    cached = self._ssl
    if cached:
        return cached
    self._ssl = SSL(self, domain, session_details)
    return self._ssl
3483
3484 @property
3486 return cond2obj(pn_transport_condition(self._impl))
3487
3488 @property
3491
3494
3495 -class SASL(Wrapper):
3496
3497 OK = PN_SASL_OK
3498 AUTH = PN_SASL_AUTH
3499 SYS = PN_SASL_SYS
3500 PERM = PN_SASL_PERM
3501 TEMP = PN_SASL_TEMP
3502
3503 @staticmethod
3505 return pn_sasl_extended()
3506
3510
3512 if err < 0:
3513 exc = EXCEPTIONS.get(err, SASLException)
3514 raise exc("[%s]" % (err))
3515 else:
3516 return err
3517
3518 @property
3520 return pn_sasl_get_user(self._sasl)
3521
3522 @property
3524 return pn_sasl_get_mech(self._sasl)
3525
3526 @property
3528 outcome = pn_sasl_outcome(self._sasl)
3529 if outcome == PN_SASL_NONE:
3530 return None
3531 else:
3532 return outcome
3533
3535 pn_sasl_allowed_mechs(self._sasl, mechs)
3536
3538 return pn_sasl_get_allow_insecure_mechs(self._sasl)
3539
3541 pn_sasl_set_allow_insecure_mechs(self._sasl, insecure)
3542
3543 allow_insecure_mechs = property(_get_allow_insecure_mechs, _set_allow_insecure_mechs,
3544 doc="""
3545 Allow unencrypted cleartext passwords (PLAIN mech)
3546 """)
3547
3548 - def done(self, outcome):
3549 pn_sasl_done(self._sasl, outcome)
3550
3552 pn_sasl_config_name(self._sasl, name)
3553
3555 pn_sasl_config_path(self._sasl, path)
3556
3559
3562
3563 -class SSLDomain(object):
3564
3565 MODE_CLIENT = PN_SSL_MODE_CLIENT
3566 MODE_SERVER = PN_SSL_MODE_SERVER
3567 VERIFY_PEER = PN_SSL_VERIFY_PEER
3568 VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
3569 ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER
3570
def __init__(self, mode):
    """Allocate a native SSL domain for ``mode``.

    Raises ``SSLUnavailable`` when ``pn_ssl_domain`` yields None. The
    handle (even if None) is stored on ``self._domain`` first.
    """
    handle = pn_ssl_domain(mode)
    self._domain = handle
    if handle is None:
        raise SSLUnavailable()
3575
def _check(self, err):
    """Validate a native status code.

    Non-negative codes are returned unchanged. A negative code raises the
    exception class registered for it in ``EXCEPTIONS`` (falling back to
    ``SSLException``) with the message "SSL failure.".
    """
    if not err < 0:
        return err
    failure = EXCEPTIONS.get(err, SSLException)
    raise failure("SSL failure.")
3582
def set_credentials(self, cert_file, key_file, password):
    """Install the certificate, key and key password on this domain.

    Returns the status from ``pn_ssl_domain_set_credentials`` after it
    has been validated by ``self._check`` (raises on a negative code).
    """
    status = pn_ssl_domain_set_credentials(self._domain,
                                           cert_file, key_file,
                                           password)
    return self._check(status)
def set_trusted_ca_db(self, certificate_db):
    """Set the trusted CA database for this domain.

    Returns the status from ``pn_ssl_domain_set_trusted_ca_db`` after it
    has been validated by ``self._check`` (raises on a negative code).
    """
    status = pn_ssl_domain_set_trusted_ca_db(self._domain, certificate_db)
    return self._check(status)
def set_peer_authentication(self, verify_mode, trusted_CAs=None):
    """Configure how the peer is authenticated on this domain.

    ``verify_mode`` and the optional ``trusted_CAs`` are forwarded to
    ``pn_ssl_domain_set_peer_authentication``. The status is validated by
    ``self._check`` (raises on a negative code) and returned.
    """
    status = pn_ssl_domain_set_peer_authentication(self._domain,
                                                   verify_mode,
                                                   trusted_CAs)
    return self._check(status)
3594
3596 return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )
3597
def __del__(self):
    """Free the native SSL domain, if one was actually allocated.

    ``__init__`` stores None in ``_domain`` before raising
    ``SSLUnavailable``, and ``_domain`` may be missing entirely if the
    allocation call itself raised. In both cases the finalizer still runs,
    so only a real handle is passed to ``pn_ssl_domain_free``.
    """
    domain = getattr(self, '_domain', None)
    if domain is not None:
        pn_ssl_domain_free(domain)
3600
3602
3603 @staticmethod
3605 return pn_ssl_present()
3606
3613
def __new__(cls, transport, domain, session_details=None):
    """Enforce a singleton SSL object per Transport.

    If the transport already carries an SSL object it is returned as-is,
    unless a different ``domain`` or ``session_details`` is supplied, in
    which case ``SSLException`` is raised. Otherwise a new object is
    created, bound to the transport's native handle, initialised with the
    domain (when given) and cached on ``transport._ssl``.

    Raises ``SSLUnavailable`` when ``pn_ssl`` yields None.
    """
    existing = transport._ssl
    if existing:
        # Identity comparison: re-using the very same domain/details is
        # fine, anything else is an attempt to reconfigure.
        if ((domain and (existing._domain is not domain)) or
                (session_details and
                 (existing._session_details is not session_details))):
            raise SSLException("Cannot re-configure existing SSL object!")
        return transport._ssl

    obj = super(SSL, cls).__new__(cls)
    obj._domain = domain
    obj._session_details = session_details
    if session_details:
        session_id = session_details.get_session_id()
    else:
        session_id = None
    obj._ssl = pn_ssl(transport._impl)
    if obj._ssl is None:
        raise SSLUnavailable()
    if domain:
        pn_ssl_init(obj._ssl, domain._domain, session_id)
    transport._ssl = obj
    return transport._ssl
3638
3640 rc, name = pn_ssl_get_cipher_name( self._ssl, 128 )
3641 if rc:
3642 return name
3643 return None
3644
3646 rc, name = pn_ssl_get_protocol_name( self._ssl, 128 )
3647 if rc:
3648 return name
3649 return None
3650
3651 SHA1 = PN_SSL_SHA1
3652 SHA256 = PN_SSL_SHA256
3653 SHA512 = PN_SSL_SHA512
3654 MD5 = PN_SSL_MD5
3655
3656 CERT_COUNTRY_NAME = PN_SSL_CERT_SUBJECT_COUNTRY_NAME
3657 CERT_STATE_OR_PROVINCE = PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE
3658 CERT_CITY_OR_LOCALITY = PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY
3659 CERT_ORGANIZATION_NAME = PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME
3660 CERT_ORGANIZATION_UNIT = PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT
3661 CERT_COMMON_NAME = PN_SSL_CERT_SUBJECT_COMMON_NAME
3662
3664 subfield_value = pn_ssl_get_remote_subject_subfield(self._ssl, subfield_name)
3665 return subfield_value
3666
3668 subject = pn_ssl_get_remote_subject(self._ssl)
3669 return subject
3670
3674
3675
3678
3681
3684
3687
3690
3693
3695 rc, fingerprint_str = pn_ssl_get_cert_fingerprint(self._ssl, fingerprint_length, digest_name)
3696 if rc == PN_OK:
3697 return fingerprint_str
3698 return None
3699
3700
3703
3706
3710
3714
3717
3718 @property
3720 return pn_ssl_get_remote_subject( self._ssl )
3721
3722 RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
3723 RESUME_NEW = PN_SSL_RESUME_NEW
3724 RESUME_REUSED = PN_SSL_RESUME_REUSED
3725
3727 return pn_ssl_resume_status( self._ssl )
3728
3730 self._check(pn_ssl_set_peer_hostname( self._ssl, unicode2utf8(hostname) ))
3732 err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 )
3733 self._check(err)
3734 return utf82unicode(name)
3735 peer_hostname = property(_get_peer_hostname, _set_peer_hostname,
3736 doc="""
3737 Manage the expected name of the remote peer. Used to authenticate the remote.
3738 """)
3739
3742 """ Unique identifier for the SSL session. Used to resume previous session on a new
3743 SSL connection.
3744 """
3745
3747 self._session_id = session_id
3748
3750 return self._session_id
3751
3752
3753 wrappers = {
3754 "pn_void": lambda x: pn_void2py(x),
3755 "pn_pyref": lambda x: pn_void2py(x),
3756 "pn_connection": lambda x: Connection.wrap(pn_cast_pn_connection(x)),
3757 "pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)),
3758 "pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)),
3759 "pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)),
3760 "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)),
3761 "pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x))
3762 }
3765
3767 self._impl = pn_collector()
3768
def put(self, obj, etype):
    """Queue ``obj`` on the collector as a Python-reference event.

    ``etype.number`` is used as the event type number.
    """
    boxed = pn_py2void(obj)
    pn_collector_put(self._impl, PN_PYREF, boxed, etype.number)
3771
3773 return Event.wrap(pn_collector_peek(self._impl))
3774
3776 ev = self.peek()
3777 pn_collector_pop(self._impl)
3778
3780 pn_collector_free(self._impl)
3781 del self._impl
3782
3783 if "TypeExtender" not in globals():
3786 self.number = number
3788 try:
3789 return self.number
3790 finally:
3791 self.number += 1
3792
3794
3795 _lock = threading.Lock()
3796 _extended = TypeExtender(10000)
3797 TYPES = {}
3798
def __init__(self, name=None, number=None, method=None):
    """Create an event type and register it in ``TYPES`` under its number.

    @param name: event name; when omitted it is looked up from ``number``
        with ``pn_event_type_name``.
    @param number: event type number; when omitted the next value is
        taken from the shared ``_extended`` allocator.
    @param method: handler method name; defaults to ``"on_<name>"``.
    @raise TypeError: if neither ``name`` nor ``number`` is given.
    """
    if name is None and number is None:
        raise TypeError("extended events require a name")
    # The lock is taken with ``with`` so that it is released only if it
    # was actually acquired; number allocation and registration happen
    # while it is held.
    with self._lock:
        if name is None:
            name = pn_event_type_name(number)

        if number is None:
            number = self._extended.next()

        if method is None:
            method = "on_%s" % name

        self.name = name
        self.number = number
        self.method = method

        self.TYPES[number] = self
3820
3823
3830
3832
3833 - def __init__(self, clazz, context, type):
3837
3840
def _none(x):
    """Ignore the argument and return None (fallback wrapper)."""
    return None
3842
3843 DELEGATED = Constant("DELEGATED")
3844
def _core(number, method):
    """Build the ``EventType`` for a built-in event number and handler method name."""
    core_type = EventType(number=number, method=method)
    return core_type
3847
3848 -class Event(Wrapper, EventBase):
3849
3850 REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init")
3851 REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced")
3852 REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final")
3853
3854 TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task")
3855
3856 CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init")
3857 CONNECTION_BOUND = _core(PN_CONNECTION_BOUND, "on_connection_bound")
3858 CONNECTION_UNBOUND = _core(PN_CONNECTION_UNBOUND, "on_connection_unbound")
3859 CONNECTION_LOCAL_OPEN = _core(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open")
3860 CONNECTION_LOCAL_CLOSE = _core(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close")
3861 CONNECTION_REMOTE_OPEN = _core(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open")
3862 CONNECTION_REMOTE_CLOSE = _core(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close")
3863 CONNECTION_FINAL = _core(PN_CONNECTION_FINAL, "on_connection_final")
3864
3865 SESSION_INIT = _core(PN_SESSION_INIT, "on_session_init")
3866 SESSION_LOCAL_OPEN = _core(PN_SESSION_LOCAL_OPEN, "on_session_local_open")
3867 SESSION_LOCAL_CLOSE = _core(PN_SESSION_LOCAL_CLOSE, "on_session_local_close")
3868 SESSION_REMOTE_OPEN = _core(PN_SESSION_REMOTE_OPEN, "on_session_remote_open")
3869 SESSION_REMOTE_CLOSE = _core(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close")
3870 SESSION_FINAL = _core(PN_SESSION_FINAL, "on_session_final")
3871
3872 LINK_INIT = _core(PN_LINK_INIT, "on_link_init")
3873 LINK_LOCAL_OPEN = _core(PN_LINK_LOCAL_OPEN, "on_link_local_open")
3874 LINK_LOCAL_CLOSE = _core(PN_LINK_LOCAL_CLOSE, "on_link_local_close")
3875 LINK_LOCAL_DETACH = _core(PN_LINK_LOCAL_DETACH, "on_link_local_detach")
3876 LINK_REMOTE_OPEN = _core(PN_LINK_REMOTE_OPEN, "on_link_remote_open")
3877 LINK_REMOTE_CLOSE = _core(PN_LINK_REMOTE_CLOSE, "on_link_remote_close")
3878 LINK_REMOTE_DETACH = _core(PN_LINK_REMOTE_DETACH, "on_link_remote_detach")
3879 LINK_FLOW = _core(PN_LINK_FLOW, "on_link_flow")
3880 LINK_FINAL = _core(PN_LINK_FINAL, "on_link_final")
3881
3882 DELIVERY = _core(PN_DELIVERY, "on_delivery")
3883
3884 TRANSPORT = _core(PN_TRANSPORT, "on_transport")
3885 TRANSPORT_ERROR = _core(PN_TRANSPORT_ERROR, "on_transport_error")
3886 TRANSPORT_HEAD_CLOSED = _core(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed")
3887 TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed")
3888 TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed")
3889
3890 SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init")
3891 SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated")
3892 SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable")
3893 SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable")
3894 SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired")
3895 SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error")
3896 SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final")
3897
3898 @staticmethod
def wrap(impl, number=None):
    """Wrap a native event handle.

    Returns None for a None handle. When ``number`` is not supplied it is
    read from the handle with ``pn_event_type``. If the event's class is
    ``PN_PYREF`` and its context is itself an ``EventBase``, that context
    object is returned instead of the freshly built ``Event``.
    """
    if impl is None:
        return None

    event_number = pn_event_type(impl) if number is None else number
    event = Event(impl, event_number)

    is_pyref = pn_event_class(impl) == PN_PYREF
    if is_pyref and isinstance(event.context, EventBase):
        return event.context
    return event
3915
3919
3922
3926
3927 @property
3929 cls = pn_event_class(self._impl)
3930 if cls:
3931 return pn_class_name(cls)
3932 else:
3933 return None
3934
3935 @property
3937 return WrappedHandler.wrap(pn_event_root(self._impl))
3938
3939 @property
def context(self):
    """Return the context object associated with the event.

    The type of the result depends on the event: the converter is picked
    from the ``wrappers`` table using ``self.clazz``.
    """
    convert = wrappers[self.clazz]
    return convert(pn_event_context(self._impl))
3943
3944 - def dispatch(self, handler, type=None):
3953
3954
3955 @property
3957 """Returns the reactor associated with the event."""
3958 return wrappers.get("pn_reactor", _none)(pn_event_reactor(self._impl))
3959
3961 r = self.reactor
3962 if r and hasattr(r, 'subclass') and r.subclass.__name__.lower() == name:
3963 return r
3964 else:
3965 return super(Event, self).__getattr__(name)
3966
3967 @property
3969 """Returns the transport associated with the event, or null if none is associated with it."""
3970 return Transport.wrap(pn_event_transport(self._impl))
3971
3972 @property
3974 """Returns the connection associated with the event, or null if none is associated with it."""
3975 return Connection.wrap(pn_event_connection(self._impl))
3976
3977 @property
3979 """Returns the session associated with the event, or null if none is associated with it."""
3980 return Session.wrap(pn_event_session(self._impl))
3981
3982 @property
3984 """Returns the link associated with the event, or null if none is associated with it."""
3985 return Link.wrap(pn_event_link(self._impl))
3986
3987 @property
3989 """Returns the sender link associated with the event, or null if
3990 none is associated with it. This is essentially an alias for
3991 link(), that does an additional check on the type of the
3992 link."""
3993 l = self.link
3994 if l and l.is_sender:
3995 return l
3996 else:
3997 return None
3998
3999 @property
4001 """Returns the receiver link associated with the event, or null if
4002 none is associated with it. This is essentially an alias for
4003 link(), that does an additional check on the type of the link."""
4004 l = self.link
4005 if l and l.is_receiver:
4006 return l
4007 else:
4008 return None
4009
4010 @property
4012 """Returns the delivery associated with the event, or null if none is associated with it."""
4013 return Delivery.wrap(pn_event_delivery(self._impl))
4014
4017
4020 if obj is None:
4021 return self
4022 ret = []
4023 obj.__dict__['handlers'] = ret
4024 return ret
4025
4031
4033
4034 - def __init__(self, handler, on_error=None):
4037
4041
4047
4050 self.handlers = []
4051 self.delegate = weakref.ref(delegate)
4052
4054 delegate = self.delegate()
4055 if delegate:
4056 dispatch(delegate, method, event)
4057
class WrappedHandlersProperty(object):
    """Descriptor that keeps an object's ``handlers`` on a surrogate.

    Reads and writes of the attribute are redirected to the ``handlers``
    attribute of a WrappedHandlersChildSurrogate, which is created on
    first use and cached in the owner's __dict__ under '_surrogate'.
    """

    def __get__(self, obj, clazz):
        """Return the surrogate's handlers, or None for class-level access."""
        if obj is not None:
            return self.surrogate(obj).handlers
        return None

    def __set__(self, obj, value):
        """Replace the surrogate's handlers with value."""
        holder = self.surrogate(obj)
        holder.handlers = value

    def surrogate(self, obj):
        """Return obj's surrogate, creating and registering it if absent.

        A newly created surrogate is cached before being passed to
        obj.add().
        """
        state = obj.__dict__
        existing = state.get("_surrogate", None)
        if existing is not None:
            return existing
        created = WrappedHandlersChildSurrogate(obj)
        state["_surrogate"] = created
        obj.add(created)
        return created
4076
4078
4079 handlers = WrappedHandlersProperty()
4080
4081 @classmethod
4082 - def wrap(cls, impl, on_error=None):
4089
def __init__(self, impl_or_constructor):
    """Initialise the wrapper and, for deeper subclasses, touch ``handlers``.

    @param impl_or_constructor: passed straight to Wrapper.__init__.
    """
    Wrapper.__init__(self, impl_or_constructor)
    depth = self.__class__.__mro__.index(WrappedHandler)
    if depth > 1:
        # At least two classes precede WrappedHandler in the MRO.
        # NOTE(review): the no-op extend presumably exists only to force
        # the handlers descriptor to build its surrogate now -- confirm.
        self.handlers.extend([])
4095
4102
def add(self, handler):
    """Register handler as a child of the wrapped handler.

    @param handler: handler object to add; None is silently ignored.
    """
    if handler is not None:
        child = _chandler(handler, self._on_error)
        pn_handler_add(self._impl, child)
        # Release this function's reference once the child has been added.
        pn_decref(child)
4108
def clear(self):
    """Call pn_handler_clear on the wrapped handler."""
    impl = self._impl
    pn_handler_clear(impl)
4111
def _chandler(obj, on_error=None):
    """Return a C-level handler for obj.

    @param obj: None, a WrappedHandler, or any other handler object.
    @param on_error: passed to _cadapter when obj needs adapting.
    @return: None for None; for a WrappedHandler its own _impl after
      pn_incref has been called on it; otherwise the result of
      pn_pyhandler(_cadapter(obj, on_error)).
    """
    if obj is None:
        return None
    if not isinstance(obj, WrappedHandler):
        return pn_pyhandler(_cadapter(obj, on_error))
    wrapped = obj._impl
    pn_incref(wrapped)
    return wrapped
4121
4123 """
4124 Simple URL parser/constructor, handles URLs of the form:
4125
4126 <scheme>://<user>:<password>@<host>:<port>/<path>
4127
All components can be None if not specified in the URL string.
4129
4130 The port can be specified as a service name, e.g. 'amqp' in the
4131 URL string but Url.port always gives the integer value.
4132
4133 @ivar scheme: Url scheme e.g. 'amqp' or 'amqps'
@ivar username: Username
4135 @ivar password: Password
4136 @ivar host: Host name, ipv6 literal or ipv4 dotted quad.
4137 @ivar port: Integer port.
4138 @ivar host_port: Returns host:port
4139 """
4140
4141 AMQPS = "amqps"
4142 AMQP = "amqp"
4143
4145 """An integer port number that can be constructed from a service name string"""
4146
def __new__(cls, value):
    """Build a port from a number or a service name.

    @param value: integer port number or string service name.
    @return: new instance whose integer value comes from cls._port_int(value)
      and whose ``name`` attribute is str(value).
    """
    number = cls._port_int(value)
    port = super(Url.Port, cls).__new__(cls, number)
    port.name = str(value)
    return port
4152
def __eq__(self, x):
    """Equal when x matches either the string form or the integer value."""
    return str(self) == x or int(self) == x

def __ne__(self, x):
    """Inverse of __eq__."""
    return not self == x

def __hash__(self):
    """Hash as the underlying integer.

    On Python 3 a class that defines __eq__ without __hash__ becomes
    unhashable; restoring int's hash keeps instances usable as dict keys
    and set members, as they are on Python 2.
    """
    return int.__hash__(self)
4156
@staticmethod
def _port_int(value):
    """Convert an integer or a service name into an integer port number.

    int(value) is tried first; if that raises ValueError the name is looked
    up with socket.getservbyname. If the lookup raises socket.error, the
    names Url.AMQPS and Url.AMQP map to 5671 and 5672 respectively.

    @raise ValueError: value is neither a number nor a known service name.
    """
    try:
        return int(value)
    except ValueError:
        try:
            return socket.getservbyname(value)
        except socket.error:
            # Built-in fallbacks for when the service lookup fails.
            well_known = ((Url.AMQPS, 5671), (Url.AMQP, 5672))
            for service, number in well_known:
                if value == service:
                    return number
            raise ValueError("Not a valid port number or service name: '%s'" % value)
4171
def __init__(self, url=None, defaults=True, **kwargs):
    """
    @param url: URL string to parse; when empty or None a blank URL is created.
    @param defaults: If true, fill in missing default values in the URL.
        If false, you can fill them in later by calling self.defaults()
    @param kwargs: attribute names of this object, e.g. scheme, username,
        password, host, port, path. Each value replaces the corresponding
        part of the parsed url.
    @raise ValueError: url could not be parsed.
    @raise AttributeError: a keyword does not name an existing attribute.
    """
    if not url:
        self._url = pn_url()
    else:
        self._url = pn_url_parse(unicode2utf8(str(url)))
        if not self._url:
            raise ValueError("Invalid URL '%s'" % url)
    for name, value in kwargs.items():
        # Reading the attribute first rejects unknown keyword names.
        getattr(self, name)
        setattr(self, name, value)
    if defaults:
        self.defaults()
4189
class PartDescriptor(object):
    """Descriptor exposing one string component of the owner's ``_url``.

    The accessors are looked up in this module's globals by name, as
    ``pn_url_get_<part>`` and ``pn_url_set_<part>``.
    """

    def __init__(self, part):
        """@param part: component name used to locate the accessor pair."""
        self.getter = globals()["pn_url_get_%s" % part]
        self.setter = globals()["pn_url_set_%s" % part]

    def __get__(self, obj, type=None):
        """Return the component, or the descriptor itself on class access."""
        if obj is None:
            # Class-level access has no _url to read from.
            return self
        return self.getter(obj._url)

    def __set__(self, obj, value):
        """Store value as a string; None is passed through unchanged.

        Passing None through (rather than the text "None") matches how the
        port setter clears its component.
        """
        if value is not None:
            value = str(value)
        return self.setter(obj._url, value)
4196
4197 scheme = PartDescriptor('scheme')
4198 username = PartDescriptor('username')
4199 password = PartDescriptor('password')
4200 host = PartDescriptor('host')
4201 path = PartDescriptor('path')
4202
def _get_port(self):
    """Return the port as a Url.Port, or the raw falsy value if unset."""
    raw = pn_url_get_port(self._url)
    if not raw:
        return raw
    return Url.Port(raw)

def _set_port(self, value):
    """Set the port from a number or service name; None clears it."""
    encoded = None if value is None else str(Url.Port(value))
    pn_url_set_port(self._url, encoded)

port = property(_get_port, _set_port)
4212
def __str__(self):
    """Return the text produced by pn_url_str for the underlying URL."""
    return pn_url_str(self._url)

def __repr__(self):
    """Return ``Url(<repr of the string form>)``."""
    text = str(self)
    return "Url(%r)" % text
4216
def __eq__(self, x):
    """Equal when the string forms of self and x are identical."""
    mine, theirs = str(self), str(x)
    return mine == theirs

def __ne__(self, x):
    """Inverse of __eq__."""
    return not (self == x)
4219
def __del__(self):
    """Free the underlying URL object and remove the ``_url`` attribute."""
    handle = self._url
    pn_url_free(handle)
    del self._url
4223
def defaults(self):
    """
    Fill in missing values (scheme, host or port) with defaults.

    A missing scheme becomes self.AMQP, a missing host becomes '0.0.0.0'
    and a missing port is derived from the (possibly just defaulted) scheme.
    @return: self
    """
    current_scheme = self.scheme
    self.scheme = current_scheme if current_scheme else self.AMQP
    current_host = self.host
    self.host = current_host if current_host else '0.0.0.0'
    current_port = self.port
    # The scheme is re-read here so a scheme defaulted above is honoured.
    self.port = current_port if current_port else self.Port(self.scheme)
    return self
4233
# Names exported by ``from proton import *``.
__all__ = [
    "API_LANGUAGE", "IMPLEMENTATION_LANGUAGE",
    "ABORTED", "ACCEPTED", "AUTOMATIC", "PENDING", "MANUAL",
    "REJECTED", "RELEASED", "MODIFIED", "SETTLED", "UNDESCRIBED",
    "Array", "Collector", "Condition", "Connection", "Data",
    "Delivery", "Disposition", "Described", "Endpoint",
    "Event", "EventType", "Handler", "Link",
    "Message", "MessageException",
    "Messenger", "MessengerException",
    "ProtonException",
    "VERSION_MAJOR", "VERSION_MINOR",
    "Receiver", "SASL", "Sender",
    "Session", "SessionException",
    "SSL", "SSLDomain", "SSLSessionDetails", "SSLUnavailable", "SSLException",
    "Terminus", "Timeout", "Interrupt",
    "Transport", "TransportException",
    "Url",
    "char", "dispatch", "symbol", "timestamp", "ulong",
    "byte", "short", "int32", "ubyte", "ushort", "uint",
    "float32", "decimal32", "decimal64", "decimal128",
]
4299