Tutorial

If you want to connect to an AMQP broker, you need: * its address (and port) * login and password * name of the virtual host

An idea of a heartbeat interval would be good, but you can do without. Since CoolAMQP will support clusters in the future, you should define the nodes first. You can do it using _NodeDefinition_. See NodeDefinition’s documentation for alternative ways to do this, but here we will use the AMQP connection string.

class coolamqp.objects.NodeDefinition(*args, **kwargs)

Definition of a reachable AMQP node.

This object is hashable.

>>> a = NodeDefinition(host='192.168.0.1', user='admin', password='password',
>>>                   virtual_host='vhost')

or

>>> a = NodeDefinition('192.168.0.1', 'admin', 'password')

or

>>> a = NodeDefinition('amqp://user:password@host/virtual_host')

or

>>> a = NodeDefinition('amqp://user:password@host:port/virtual_host', hearbeat=20)

AMQP connection string may be either bytes or str/unicode

Additional keyword parameters that can be specified:

heartbeat - heartbeat interval in seconds port - TCP port to use. Default is 5672

Raises:

ValueError – invalid parameters

from coolamqp.objects import NodeDefinition

node = NodeDefinition('amqp://user@password:host/vhost')

Cluster instances are used to interface with the cluster (or a single broker). It accepts a list of nodes:

from coolamqp.clustering import Cluster
cluster = Cluster([node], name='My Cluster')
cluster.start(wait=True)

wait=True will block until connection is completed. After this, you can use other methods.

name is optional. If you specify it, and have setproctitle installed, the thread will receive a provided label, postfixed by AMQP listener thread.

class coolamqp.clustering.Cluster(nodes, on_fail=None, extra_properties=None, log_frames=None, name=None, on_blocked=None, tracer=None)

Frontend for your AMQP needs.

This has ListenerThread.

Call .start() to connect to AMQP.

It is not safe to fork() after .start() is called, but it’s OK before.

Parameters:
  • nodes – list of nodes, or a single node. For now, only one is supported.

  • on_fail – callable/0 to call when connection fails in an unclean way. This is a one-shot

  • extra_properties – refer to documentation in [/coolamqp/connection/connection.py] Connection.__init__

  • log_frames (tp.Optional[coolamqp.tracing.BaseFrameTracer]) – an object that supports logging each and every frame CoolAMQP sends and receives from the broker

  • name – name to appear in log items and prctl() for the listener thread

  • on_blocked – callable to call when ConnectionBlocked/ConnectionUnblocked is received. It will be called with a value of True if connection becomes blocked, and False upon an unblock

  • tracer – tracer, if opentracing is installed

bind(queue, exchange, routing_key, persistent=False, span=None, dont_trace=False)

Bind a queue to an exchange

consume(queue, on_message=None, span=None, dont_trace=False, *args, **kwargs)

Start consuming from a queue.

args and kwargs will be passed to Consumer constructor (coolamqp.attaches.consumer.Consumer). Don’t use future_to_notify - it’s done here!

Take care not to lose the Consumer object - it’s the only way to cancel a consumer!

Parameters:
  • queue – Queue object, being consumed from right now. Note that name of anonymous queue might change at any time!

  • on_message – callable that will process incoming messages if you leave it at None, messages will be .put into self.events

  • span – optional span, if opentracing is installed

  • dont_trace – if True, this won’t output a span

Return type:

Tuple[Consumer, Future]

Returns:

a tuple (Consumer instance, and a Future), that tells, when consumer is ready

declare(obj, persistent=False, span=None, dont_trace=False)

Declare a Queue/Exchange

Parameters:
  • obj (tp.Union[Queue, Exchange]) – Queue/Exchange object

  • persistent (bool) – should it be redefined upon reconnect?

  • span (tp.Optional[opentracing.Span]) – optional parent span, if opentracing is installed

  • dont_trace (bool) – if True, a span won’t be output

Return type:

concurrent.futures.Future

Returns:

Future

delete_queue(queue)

Delete a queue.

Parameters:

queue (coolamqp.objects.Queue) – Queue instance that represents what to delete

Return type:

Future

Returns:

a Future (will succeed with None or fail with AMQPError)

drain(timeout, span=None, dont_trace=False)

Return an Event.

Parameters:
  • timeout – time to wait for an event. 0 means return immediately. None means block forever

  • span – optional parent span, if opentracing is installed

  • dont_trace – if True, this span won’t be traced

Return type:

Event

Returns:

an Event instance. NothingMuch is returned when there’s nothing within a given timoeout

publish(message, exchange=None, routing_key='', tx=None, confirm=None, span=None, dont_trace=False)

Publish a message.

Parameters:
  • message (Message) – Message to publish

  • exchange (tp.Union[Exchange, str, bytes]) – exchange to use. Default is the “direct” empty-name exchange.

  • routing_key (tp.Union[str, bytes]) – routing key to use

  • confirm (tp.Optional[bool]) – Whether to publish it using confirms/transactions. If you choose so, you will receive a Future that can be used to check it broker took responsibility for this message. Note that if tx if False, and message cannot be delivered to broker at once, it will be discarded

  • tx (tp.Optional[bool]) – deprecated, alias for confirm

  • span (tp.Optional[opentracing.Span]) – optionally, current span, if opentracing is installed

  • dont_trace (bool) – if set to True, a span won’t be generated

Return type:

tp.Optional[Future]

Returns:

Future to be finished on completion or None, is confirm/tx was not chosen

shutdown(wait=True)

Terminate all connections, release resources - finish the job.

Parameters:

wait (bool) – block until this is done

Raises:

RuntimeError – if called without start() being called first

Return type:

None

start(wait=True, timeout=10.0)

Connect to broker. Initialize Cluster.

Only after this call is Cluster usable. It is not safe to fork after this.

Parameters:
  • wait (bool) – block until connection is ready

  • timeout (float) – timeout to wait until the connection is ready. If it is not, a ConnectionDead error will be raised

Raises:
  • RuntimeError – called more than once

  • ConnectionDead – failed to connect within timeout

Return type:

None

Publishing and consuming

Connecting is boring. After we do, we want to do something! Let’s try sending a message, and receiving it. To do that, you must first define a queue, and register a consumer.

from coolamqp.objects import Queue

queue = Queue(u'my_queue', auto_delete=True, exclusive=True)

consumer, consume_confirm = cluster.consume(queue, no_ack=False)
consume_confirm.result()    # wait for consuming to start

This will create an auto-delete and exclusive queue. After than, a consumer will be registered for this queue. _no_ack=False_ will mean that we have to manually confirm messages.

You can specify a callback, that will be called with a message if one’s received by this consumer. Since we did not do that, this will go to a generic queue belonging to _Cluster_.

_consumer_ is a _Consumer_ object. This allows us to do some things with the consumer (such as setting QoS), but most importantly it allows us to cancel it later. _consume_confirm_ is a _Future_, that will succeed when AMQP _basic.consume-ok_ is received.

To send a message we need to construct it first, and later publish:

from coolamqp.objects import Message

msg = Message(b'hello world', properties=Message.Properties())
cluster.publish(msg, routing_key=u'my_queue')
class coolamqp.objects.Message(body, properties=None)

An AMQP message. Has a binary body, and some properties.

Properties is a highly regularized class - see coolamqp.framing.definitions.BasicContentPropertyList for a list of possible properties.

Parameters:
  • body (anything with a buffer interface) – stream of octets

  • properties (MessageProperties instance, None or a dict (SLOW!)) – AMQP properties to be sent along. default is ‘no properties at all’ You can pass a dict - it will be passed to MessageProperties, but it’s slow - don’t do that.

Properties

alias of BasicContentPropertyList

This creates a message with no properties, and sends it through default (direct) exchange to our queue. Note that CoolAMQP simply considers your messages to be bags of bytes + properties. It will not modify them, nor decode, and will always expect and return bytes.

To actually get our message, we need to start a consumer first. To do that, just invoke:

cons, fut = cluster.consume(Queue('name of the queue'), **kwargs)

Where kwargs are passed directly to Consumer class. cons is a Consumer object, and fut is a Future that will happen when listening has been registered on target server.

class coolamqp.attaches.Consumer(queue, on_message, span=None, no_ack=True, qos=None, cancel_on_failure=False, future_to_notify=None, fail_on_first_time_resource_locked=False, body_receive_mode=0)

This object represents a consumer in the system.

Consumer may reside on any AMQP broker, this is to be decided by CoolAMQP. Consumer, when created, has the state of ST_SYNCING. CoolAMQP will try to declare the consumer where it makes most sense for it to be.

If it succeeds, the consumer will enter state ST_ONLINE, and callables on_start will be called. This means that broker has confirmed that this consumer is operational and receiving messages.

Note that does not attempt to cancel consumers, or any of such nonsense. Having a channel per consumer gives you the unique possibility of simply closing the channel. Since this implies cancelling the consumer, here you go.

WARNING: READ DEFAULT VALUES IN CONSTRUCTOR! TAKE CARE WHAT YOUR CONSUMERS

DO!

You can subscribe to be informed when the consumer is cancelled (for any reason, server or client side) with:

>>> con, fut = Cluster.consume(...)
>>> def im_called_on_cancel_for_any_reason():   # must have arity of 0
>>>     ..
>>> con.on_cancel.add(im_called_on_cancel_for_any_reason)
>>> con.cancel()

Or, if RabbitMQ is in use, you can be informed upon a Consumer Cancel Notification:

>>> con.on_broker_cancel.add(im_cancelled_by_broker)
Parameters:
  • queue (coolamqp.objects.Queue) – Queue object, being consumed from right now. Note that name of anonymous queue might change at any time!

  • on_message (callable(ReceivedMessage instance)) – callable that will process incoming messages

  • span – optional span, if opentracing is installed

  • no_ack (bool) – Will this consumer require acknowledges from messages?

  • qos (tuple(int, int) or tuple(None, int) or int) – a tuple of (prefetch size, prefetch window) for this consumer, or an int (prefetch window only). If an int is passed, prefetch size will be set to 0 (which means undefined), and this int will be used for prefetch window

  • cancel_on_failure (bool) – Consumer will cancel itself when link goes down

  • future_to_notify (concurrent.futures.Future) – Future to succeed when this consumer goes online for the first time. This future can also raise with AMQPError if it fails to.

  • fail_on_first_time_resource_locked (bool) – When consumer is declared for the first time, and RESOURCE_LOCKED is encountered, it will fail the future with ResourceLocked, and consumer will cancel itself. By default it will retry until success is made. If the consumer doesn’t get the chance to be declared - because of a connection fail - next reconnect will consider this to be SECOND declaration, ie. it will retry ad infinitum

  • body_receive_mode (a property of BodyReceiveMode) – how should message.body be received. This has a performance impact

cancel()

Cancel the customer.

.ack() or .nack() for messages from this customer will have no effect.

Return type:

Future

Returns:

a Future to tell when it’s done. The future will always succeed - sooner, or later. NOTE: Future is OK’d when entire channel is destroyed

cancelled

public, if this is True, it won’t be attached to next connection

on_broker_cancel

public, called on Customer Cancel Notification

on_cancel

public, called on cancel for any reason

on_close(payload=None)

Handle closing the channel. It sounds like an exception…

Return type:

None

This is done in two steps: 1. self.state <- ST_OFFLINE, on_event(EV_OFFLINE) upon detecting

that no more messages will be there

  1. self.channel_id <- None, channel is returned to Connection - c hannel has been physically torn down

Note, this can be called multiple times, and eventually with None.

on_delivery(sth)

Callback for delivery-related shit

Parameters:

sth – AMQPMethodFrame WITH basic-deliver, AMQPHeaderFrame or AMQPBodyFrame

on_operational(operational)

[EXTEND ME] Called by internal methods (on_*) when channel has achieved (or lost) operational status.

If this is called with operational=True, then for sure it will be called with operational=False.

This will, therefore, get called an even number of times.

Called by Channeler, when:
  • Channeler.on_close gets called and state is ST_ONLINE

    on_close registers ChannelClose, ChannelCloseOk, BasicCancel

Parameters:

operational (bool) – True if channel has just become operational, False if it has just become useless.

Return type:

None

on_setup(payload)

Called with different kinds of frames - during setup

Return type:

None

set_qos(prefetch_size, prefetch_count)

Set new QoS for this consumer.

Parameters:
  • prefetch_size (int) – prefetch in octets

  • prefetch_count (int) – prefetch in whole messages

Return type:

None