Tutorial¶
If you want to connect to an AMQP broker, you need:
* its address (and port)
* login and password
* name of the virtual host
An idea of a heartbeat interval would be good, but you can do without. Since CoolAMQP will support clusters in the future, you should define the nodes first. You can do it using _NodeDefinition_. See NodeDefinition’s documentation for alternative ways to do this, but here we will use the AMQP connection string.
- class coolamqp.objects.NodeDefinition(*args, **kwargs)¶
Definition of a reachable AMQP node.
This object is hashable.
>>> a = NodeDefinition(host='192.168.0.1', user='admin', password='password',
>>>                    virtual_host='vhost')
or
>>> a = NodeDefinition('192.168.0.1', 'admin', 'password')
or
>>> a = NodeDefinition('amqp://user:password@host/virtual_host')
or
>>> a = NodeDefinition('amqp://user:password@host:port/virtual_host', heartbeat=20)
AMQP connection string may be either bytes or str/unicode
- Additional keyword parameters that can be specified:
heartbeat - heartbeat interval in seconds
port - TCP port to use. Default is 5672
- Raises:
ValueError – invalid parameters
from coolamqp.objects import NodeDefinition
node = NodeDefinition('amqp://user:password@host/vhost')
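To see which part of the connection string ends up where, here is a minimal sketch that splits an amqp:// URL using only the Python standard library. It is an illustration, not CoolAMQP's own parser: split_amqp_url is a hypothetical helper, and the real NodeDefinition may handle edge cases (escaping, a missing virtual host) differently. The default port of 5672 is the one named in the NodeDefinition documentation.

```python
from urllib.parse import urlsplit, unquote

DEFAULT_AMQP_PORT = 5672  # default port named in the NodeDefinition docs


def split_amqp_url(url):
    """Split an AMQP connection string into its parts (illustration only)."""
    parts = urlsplit(url)
    if parts.scheme != 'amqp':
        raise ValueError('expected an amqp:// URL, got %r' % (url,))
    return {
        'host': parts.hostname,
        'port': parts.port or DEFAULT_AMQP_PORT,
        'user': parts.username,
        'password': parts.password,
        # the URL path carries the virtual host; drop the leading slash
        'virtual_host': unquote(parts.path[1:]),
    }


print(split_amqp_url('amqp://user:password@host/vhost'))
```

Note the order of the credentials: the login and password come first, separated by a colon, and the host follows the @ sign.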
A Cluster instance is used to interface with the cluster (or a single broker). It accepts a list of nodes:
from coolamqp.clustering import Cluster
cluster = Cluster([node], name='My Cluster')
cluster.start(wait=True)
wait=True will block until connection is completed. After this, you can use other methods.
name is optional. If you specify it, and have setproctitle installed, the listener thread will be labelled with the provided name, postfixed by AMQP listener thread.
- class coolamqp.clustering.Cluster(nodes, on_fail=None, extra_properties=None, log_frames=None, name=None, on_blocked=None, tracer=None)¶
Frontend for your AMQP needs.
This has ListenerThread.
Call .start() to connect to AMQP.
It is not safe to fork() after .start() is called, but it’s OK before.
- Parameters:
nodes – list of nodes, or a single node. For now, only one is supported.
on_fail – callable/0 to call when connection fails in an unclean way. This is a one-shot
extra_properties – refer to documentation in [/coolamqp/connection/connection.py] Connection.__init__
log_frames (tp.Optional[coolamqp.tracing.BaseFrameTracer]) – an object that supports logging each and every frame CoolAMQP sends and receives from the broker
name – name to appear in log items and prctl() for the listener thread
on_blocked – callable to call when ConnectionBlocked/ConnectionUnblocked is received. It will be called with a value of True if connection becomes blocked, and False upon an unblock
tracer – tracer, if opentracing is installed
- bind(queue, exchange, routing_key, persistent=False, span=None, dont_trace=False)¶
Bind a queue to an exchange
- consume(queue, on_message=None, span=None, dont_trace=False, *args, **kwargs)¶
Start consuming from a queue.
args and kwargs will be passed to Consumer constructor (coolamqp.attaches.consumer.Consumer). Don’t use future_to_notify - it’s done here!
Take care not to lose the Consumer object - it’s the only way to cancel a consumer!
- Parameters:
queue – Queue object, being consumed from right now. Note that name of anonymous queue might change at any time!
on_message – callable that will process incoming messages. If you leave it at None, messages will be .put into self.events
span – optional span, if opentracing is installed
dont_trace – if True, this won’t output a span
- Return type:
Tuple[Consumer, Future]
- Returns:
a tuple of (Consumer instance, Future); the Future tells when the consumer is ready
- declare(obj, persistent=False, span=None, dont_trace=False)¶
Declare a Queue/Exchange
- Parameters:
obj (tp.Union[Queue, Exchange]) – Queue/Exchange object
persistent (bool) – should it be redefined upon reconnect?
span (tp.Optional[opentracing.Span]) – optional parent span, if opentracing is installed
dont_trace (bool) – if True, a span won’t be output
- Return type:
concurrent.futures.Future
- Returns:
Future
- delete_queue(queue)¶
Delete a queue.
- Parameters:
queue (coolamqp.objects.Queue) – Queue instance that represents what to delete
- Return type:
Future
- Returns:
a Future (will succeed with None or fail with AMQPError)
- drain(timeout, span=None, dont_trace=False)¶
Return an Event.
- Parameters:
timeout – time to wait for an event. 0 means return immediately. None means block forever
span – optional parent span, if opentracing is installed
dont_trace – if True, this span won’t be traced
- Return type:
Event
- Returns:
an Event instance. NothingMuch is returned when there’s nothing within a given timeout
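The timeout rules above can be mimicked with a plain queue.Queue. This is only a sketch of the documented behaviour, not CoolAMQP code: the events queue, the drain function and the NothingMuch class below are local stand-ins defined for the example.

```python
import queue


class NothingMuch:
    """Stand-in for the 'nothing happened' event (hypothetical, local to this sketch)."""


events = queue.Queue()


def drain(timeout):
    """Mimic the documented rules: 0 = return at once, None = block forever."""
    try:
        if timeout == 0:
            return events.get_nowait()
        return events.get(block=True, timeout=timeout)
    except queue.Empty:
        # nothing arrived within the timeout
        return NothingMuch()


events.put('first event')
print(drain(0))                              # first event
print(isinstance(drain(0.01), NothingMuch))  # True
```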
- publish(message, exchange=None, routing_key='', tx=None, confirm=None, span=None, dont_trace=False)¶
Publish a message.
- Parameters:
message (Message) – Message to publish
exchange (tp.Union[Exchange, str, bytes]) – exchange to use. Default is the “direct” empty-name exchange.
routing_key (tp.Union[str, bytes]) – routing key to use
confirm (tp.Optional[bool]) – Whether to publish it using confirms/transactions. If you choose so, you will receive a Future that can be used to check if the broker took responsibility for this message. Note that if tx is False, and the message cannot be delivered to the broker at once, it will be discarded
tx (tp.Optional[bool]) – deprecated, alias for confirm
span (tp.Optional[opentracing.Span]) – optionally, current span, if opentracing is installed
dont_trace (bool) – if set to True, a span won’t be generated
- Return type:
tp.Optional[Future]
- Returns:
Future to be finished on completion, or None if confirm/tx was not chosen
- shutdown(wait=True)¶
Terminate all connections, release resources - finish the job.
- Parameters:
wait (bool) – block until this is done
- Raises:
RuntimeError – if called without start() being called first
- Return type:
None
- start(wait=True, timeout=10.0)¶
Connect to broker. Initialize Cluster.
Only after this call is Cluster usable. It is not safe to fork after this.
- Parameters:
wait (bool) – block until connection is ready
timeout (float) – timeout to wait until the connection is ready. If it is not, a ConnectionDead error will be raised
- Raises:
RuntimeError – called more than once
ConnectionDead – failed to connect within timeout
- Return type:
None
Publishing and consuming¶
Connecting is boring. After we do, we want to do something! Let’s try sending a message, and receiving it. To do that, you must first define a queue, and register a consumer.
from coolamqp.objects import Queue
queue = Queue(u'my_queue', auto_delete=True, exclusive=True)
consumer, consume_confirm = cluster.consume(queue, no_ack=False)
consume_confirm.result() # wait for consuming to start
This will create an auto-delete and exclusive queue. After that, a consumer will be registered for this queue. _no_ack=False_ will mean that we have to manually confirm messages.
You can specify a callback, that will be called with a message if one’s received by this consumer. Since we did not do that, this will go to a generic queue belonging to _Cluster_.
_consumer_ is a _Consumer_ object. This allows us to do some things with the consumer (such as setting QoS), but most importantly it allows us to cancel it later. _consume_confirm_ is a _Future_, that will succeed when AMQP _basic.consume-ok_ is received.
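If you have not worked with futures before, the sketch below shows what waiting on one looks like. It uses only concurrent.futures from the standard library; fake_consume is a hypothetical stand-in that resolves the Future from a timer thread, roughly the way a listener thread would once the broker answers. No broker is involved.

```python
import threading
from concurrent.futures import Future


def fake_consume():
    """Stand-in for cluster.consume(): returns a Future resolved by another thread."""
    fut = Future()
    # Pretend the listener thread receives basic.consume-ok after 50 ms.
    threading.Timer(0.05, fut.set_result, args=(None,)).start()
    return fut


consume_confirm = fake_consume()
# Callbacks fire once the future is resolved, in whichever thread resolves it.
consume_confirm.add_done_callback(lambda f: print('consumer is ready'))
# Blocks until resolved; raises a timeout error if nothing happens in 5 seconds.
consume_confirm.result(timeout=5)
print(consume_confirm.done())
```

Passing a timeout to .result() is a good habit in scripts, since a bare .result() will wait indefinitely if the future is never resolved.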
To send a message we need to construct it first, and later publish:
from coolamqp.objects import Message
msg = Message(b'hello world', properties=Message.Properties())
cluster.publish(msg, routing_key=u'my_queue')
- class coolamqp.objects.Message(body, properties=None)¶
An AMQP message. Has a binary body, and some properties.
Properties is a highly regularized class - see coolamqp.framing.definitions.BasicContentPropertyList for a list of possible properties.
- Parameters:
body (anything with a buffer interface) – stream of octets
properties (MessageProperties instance, None or a dict (SLOW!)) – AMQP properties to be sent along. Default is ‘no properties at all’. You can pass a dict - it will be passed to MessageProperties, but it’s slow - don’t do that.
- Properties¶
alias of
BasicContentPropertyList
This creates a message with no properties, and sends it through default (direct) exchange to our queue. Note that CoolAMQP simply considers your messages to be bags of bytes + properties. It will not modify them, nor decode, and will always expect and return bytes.
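Because bodies are plain bytes, any encoding is up to you. A minimal sketch of one common choice, JSON over UTF-8, using only the standard library (the payload contents are made up for the example):

```python
import json

payload = {'greeting': 'hello world', 'count': 1}

# Publishing side: turn the structure into bytes before building the message.
body = json.dumps(payload).encode('utf-8')
print(isinstance(body, bytes))

# Consuming side: the body arrives as bytes and has to be decoded again.
decoded = json.loads(body.decode('utf-8'))
print(decoded == payload)
```

The resulting body is what you would pass as the first argument of Message.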
To actually get our message, we need to start a consumer first. To do that, just invoke:
cons, fut = cluster.consume(Queue('name of the queue'), **kwargs)
Where kwargs are passed directly to the Consumer class. cons is a Consumer object, and fut is a Future that will complete when listening has been registered on the target server.
- class coolamqp.attaches.Consumer(queue, on_message, span=None, no_ack=True, qos=None, cancel_on_failure=False, future_to_notify=None, fail_on_first_time_resource_locked=False, body_receive_mode=0)¶
This object represents a consumer in the system.
Consumer may reside on any AMQP broker, this is to be decided by CoolAMQP. Consumer, when created, has the state of ST_SYNCING. CoolAMQP will try to declare the consumer where it makes most sense for it to be.
If it succeeds, the consumer will enter state ST_ONLINE, and callables on_start will be called. This means that broker has confirmed that this consumer is operational and receiving messages.
Note that it does not attempt to cancel consumers, or any of such nonsense. Having a channel per consumer gives you the unique possibility of simply closing the channel. Since this implies cancelling the consumer, here you go.
- WARNING: READ DEFAULT VALUES IN CONSTRUCTOR! TAKE CARE WHAT YOUR CONSUMERS DO!
You can subscribe to be informed when the consumer is cancelled (for any reason, server or client side) with:
>>> con, fut = Cluster.consume(...)
>>> def im_called_on_cancel_for_any_reason():   # must have arity of 0
>>>     ..
>>> con.on_cancel.add(im_called_on_cancel_for_any_reason)
>>> con.cancel()
Or, if RabbitMQ is in use, you can be informed upon a Consumer Cancel Notification:
>>> con.on_broker_cancel.add(im_cancelled_by_broker)
- Parameters:
queue (coolamqp.objects.Queue) – Queue object, being consumed from right now. Note that name of anonymous queue might change at any time!
on_message (callable(ReceivedMessage instance)) – callable that will process incoming messages
span – optional span, if opentracing is installed
no_ack (bool) – if True, messages delivered to this consumer do not need to be acknowledged; if False, every message has to be acknowledged manually
qos (tuple(int, int) or tuple(None, int) or int) – a tuple of (prefetch size, prefetch window) for this consumer, or an int (prefetch window only). If an int is passed, prefetch size will be set to 0 (which means undefined), and this int will be used for prefetch window
cancel_on_failure (bool) – Consumer will cancel itself when link goes down
future_to_notify (concurrent.futures.Future) – Future to succeed when this consumer goes online for the first time. This future can also raise with AMQPError if it fails to.
fail_on_first_time_resource_locked (bool) – When consumer is declared for the first time, and RESOURCE_LOCKED is encountered, it will fail the future with ResourceLocked, and consumer will cancel itself. By default it will retry until success is made. If the consumer doesn’t get the chance to be declared - because of a connection fail - next reconnect will consider this to be SECOND declaration, ie. it will retry ad infinitum
body_receive_mode (a property of BodyReceiveMode) – how should message.body be received. This has a performance impact
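The accepted shapes of qos can be summed up in a few lines. This is a sketch of the rule described above, not the library's code: normalize_qos is a hypothetical helper, and treating None in the first position of the tuple as 0 is an assumption based on 0 meaning "undefined".

```python
def normalize_qos(qos):
    """Return (prefetch_size, prefetch_count) for an int or a 2-tuple."""
    if qos is None:
        return None  # no QoS requested
    if isinstance(qos, int):
        # a bare int is the prefetch window; size 0 means undefined
        return 0, qos
    prefetch_size, prefetch_count = qos
    # assumption: None as the size is treated like 0 (undefined)
    return (prefetch_size or 0), prefetch_count


print(normalize_qos(10))         # (0, 10)
print(normalize_qos((None, 5)))  # (0, 5)
print(normalize_qos((1024, 5)))  # (1024, 5)
```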
- cancel()¶
Cancel the consumer.
.ack() or .nack() for messages from this consumer will have no effect.
- Return type:
Future
- Returns:
a Future to tell when it’s done. The future will always succeed - sooner, or later. NOTE: Future is OK’d when entire channel is destroyed
- cancelled¶
public, if this is True, it won’t be attached to next connection
- on_broker_cancel¶
public, called on Consumer Cancel Notification
- on_cancel¶
public, called on cancel for any reason
- on_close(payload=None)¶
Handle closing the channel. It sounds like an exception…
- Return type:
None
This is done in two steps:
1. self.state <- ST_OFFLINE, on_event(EV_OFFLINE) upon detecting that no more messages will be there
2. self.channel_id <- None, channel is returned to Connection - channel has been physically torn down
Note, this can be called multiple times, and eventually with None.
- on_delivery(sth)¶
Callback for delivery-related frames
- Parameters:
sth – AMQPMethodFrame WITH basic-deliver, AMQPHeaderFrame or AMQPBodyFrame
- on_operational(operational)¶
[EXTEND ME] Called by internal methods (on_*) when channel has achieved (or lost) operational status.
If this is called with operational=True, then for sure it will be called with operational=False.
This will, therefore, get called an even number of times.
- Called by Channeler, when:
- Channeler.on_close gets called and state is ST_ONLINE
on_close registers ChannelClose, ChannelCloseOk, BasicCancel
- Parameters:
operational (bool) – True if channel has just become operational, False if it has just become useless.
- Return type:
None
- on_setup(payload)¶
Called with different kinds of frames - during setup
- Return type:
None
- set_qos(prefetch_size, prefetch_count)¶
Set new QoS for this consumer.
- Parameters:
prefetch_size (int) – prefetch in octets
prefetch_count (int) – prefetch in whole messages
- Return type:
None