Usage basics

First off, you need a Cluster object:

class coolamqp.clustering.Cluster(nodes, on_fail=None, extra_properties=None, log_frames=None, name=None, on_blocked=None, tracer=None)

Frontend for your AMQP needs.

This has ListenerThread.

Call .start() to connect to AMQP.

It is not safe to fork() after .start() is called, but it’s OK before.

Parameters:
  • nodes – list of nodes, or a single node. For now, only one is supported.

  • on_fail – callable/0 to call when connection fails in an unclean way. This is a one-shot

  • extra_properties – refer to documentation in [/coolamqp/connection/connection.py] Connection.__init__

  • log_frames (tp.Optional[coolamqp.tracing.BaseFrameTracer]) – an object that supports logging each and every frame CoolAMQP sends and receives from the broker

  • name – name to appear in log items and prctl() for the listener thread

  • on_blocked – callable to call when ConnectionBlocked/ConnectionUnblocked is received. It will be called with a value of True if connection becomes blocked, and False upon an unblock

  • tracer – tracer, if opentracing is installed

bind(queue, exchange, routing_key, persistent=False, span=None, dont_trace=False)

Bind a queue to an exchange

consume()

Start consuming from a queue.

args and kwargs will be passed to Consumer constructor (coolamqp.attaches.consumer.Consumer). Don’t use future_to_notify - it’s done here!

Take care not to lose the Consumer object - it’s the only way to cancel a consumer!

Parameters:
  • queue – Queue object, being consumed from right now. Note that name of anonymous queue might change at any time!

  • on_message – callable that will process incoming messages if you leave it at None, messages will be .put into self.events

  • span – optional span, if opentracing is installed

  • dont_trace – if True, this won’t output a span

Return type:

Tuple[Consumer, Future]

Returns:

a tuple (Consumer instance, and a Future), that tells, when consumer is ready

declare(obj, persistent=False, span=None, dont_trace=False)

Declare a Queue/Exchange

Parameters:
  • obj (tp.Union[Queue, Exchange]) – Queue/Exchange object

  • persistent (bool) – should it be redefined upon reconnect?

  • span (tp.Optional[opentracing.Span]) – optional parent span, if opentracing is installed

  • dont_trace (bool) – if True, a span won’t be output

Return type:

concurrent.futures.Future

Returns:

Future

delete_queue(queue)

Delete a queue.

Parameters:

queue (coolamqp.objects.Queue) – Queue instance that represents what to delete

Return type:

Future

Returns:

a Future (will succeed with None or fail with AMQPError)

drain()

Return an Event.

Parameters:
  • timeout – time to wait for an event. 0 means return immediately. None means block forever

  • span – optional parent span, if opentracing is installed

  • dont_trace – if True, this span won’t be traced

Return type:

Event

Returns:

an Event instance. NothingMuch is returned when there’s nothing within a given timoeout

publish(message, exchange=None, routing_key='', tx=None, confirm=None, span=None, dont_trace=False)

Publish a message.

Parameters:
  • message (Message) – Message to publish

  • exchange (tp.Union[Exchange, str, bytes]) – exchange to use. Default is the “direct” empty-name exchange.

  • routing_key (tp.Union[str, bytes]) – routing key to use

  • confirm (tp.Optional[bool]) – Whether to publish it using confirms/transactions. If you choose so, you will receive a Future that can be used to check it broker took responsibility for this message. Note that if tx if False, and message cannot be delivered to broker at once, it will be discarded

  • tx (tp.Optional[bool]) – deprecated, alias for confirm

  • span (tp.Optional[opentracing.Span]) – optionally, current span, if opentracing is installed

  • dont_trace (bool) – if set to True, a span won’t be generated

Return type:

tp.Optional[Future]

Returns:

Future to be finished on completion or None, is confirm/tx was not chosen

shutdown(wait=True)

Terminate all connections, release resources - finish the job.

Parameters:

wait (bool) – block until this is done

Raises:

RuntimeError – if called without start() being called first

Return type:

None

start(wait=True, timeout=10.0)

Connect to broker. Initialize Cluster.

Only after this call is Cluster usable. It is not safe to fork after this.

Parameters:
  • wait (bool) – block until connection is ready

  • timeout (float) – timeout to wait until the connection is ready. If it is not, a ConnectionDead error will be raised

Raises:
  • RuntimeError – called more than once

  • ConnectionDead – failed to connect within timeout

Return type:

None

You will need to initialize it with NodeDefinitions:

class coolamqp.objects.NodeDefinition(*args, **kwargs)

Definition of a reachable AMQP node.

This object is hashable.

>>> a = NodeDefinition(host='192.168.0.1', user='admin', password='password',
>>>                   virtual_host='vhost')

or

>>> a = NodeDefinition('192.168.0.1', 'admin', 'password')

or

>>> a = NodeDefinition('amqp://user:password@host/virtual_host')

or

>>> a = NodeDefinition('amqp://user:password@host:port/virtual_host', hearbeat=20)

AMQP connection string may be either bytes or str/unicode

Additional keyword parameters that can be specified:

heartbeat - heartbeat interval in seconds port - TCP port to use. Default is 5672

Raises:

ValueError – invalid parameters

You can send messages:

class coolamqp.objects.Message(body, properties=None)

An AMQP message. Has a binary body, and some properties.

Properties is a highly regularized class - see coolamqp.framing.definitions.BasicContentPropertyList for a list of possible properties.

Parameters:
  • body (anything with a buffer interface) – stream of octets

  • properties (MessageProperties instance, None or a dict (SLOW!)) – AMQP properties to be sent along. default is ‘no properties at all’ You can pass a dict - it will be passed to MessageProperties, but it’s slow - don’t do that.

and receive them

class coolamqp.objects.ReceivedMessage(body, exchange_name, routing_key, properties=None, delivery_tag=None, ack=None, nack=None)

A message that was received from the AMQP broker.

It additionally has an exchange name, routing key used, it’s delivery tag, and methods for ack() or nack().

Note that if the consumer that generated this message was no_ack, .ack() and .nack() are no-ops.

ack()

Acknowledge reception of this message.

This is a no-op if a Consumer was called with no_ack=True.

If called after an ack() or nack() was called, this will be a no-op.

nack()

Negatively acknowledge reception of this message.

This is a no-op if a Consumer was called with no_ack=True. If no_ack was False, the message will be requeued and redelivered by the broker

If called after an ack() or nack() was called, this will be a no-op.

MessageProperties

coolamqp.objects.MessageProperties

alias of BasicContentPropertyList

class coolamqp.framing.definitions.BasicContentPropertyList(**kwargs)

The basic class provides methods that support an industry-standard messaging model.

FIELDS = [('content-type', 'shortstr', 'shortstr', False), ('content-encoding', 'shortstr', 'shortstr', False), ('headers', 'table', 'table', False), ('delivery-mode', 'octet', 'octet', False), ('priority', 'octet', 'octet', False), ('correlation-id', 'shortstr', 'shortstr', False), ('reply-to', 'shortstr', 'shortstr', False), ('expiration', 'shortstr', 'shortstr', False), ('message-id', 'shortstr', 'shortstr', False), ('timestamp', 'timestamp', 'timestamp', False), ('type', 'shortstr', 'shortstr', False), ('user-id', 'shortstr', 'shortstr', False), ('app-id', 'shortstr', 'shortstr', False), ('reserved', 'shortstr', 'shortstr', False)]
PARTICULAR_CLASSES = {b'\x00\x00': <class 'coolamqp.framing.compilation.content_property.ParticularContentTypeList'>}
static from_buffer(buf, offset)

Return a content property list instance unserialized from buffer, so that buf[offset] marks the start of property flags

Return type:

BasicContentPropertyList

static typize(*fields)
Return type:

type

Take care, as MessageProperties will hash the keys found and store it within non-GCable memory. So each “variant” of message properties encountered will be compiled as a separate class.