CoolAMQP cluster

class coolamqp.clustering.Cluster(nodes, on_fail=None, extra_properties=None, log_frames=None, name=None, on_blocked=None, tracer=None)

Frontend for your AMQP needs.

The cluster runs its own ListenerThread.

Call .start() to connect to AMQP.

It is not safe to fork() after .start() is called, but it’s OK before.

  • nodes – list of nodes, or a single node. For now, only one is supported.

  • on_fail – zero-argument callable to call when the connection fails in an unclean way. This is a one-shot.

  • extra_properties – refer to documentation in [/coolamqp/connection/] Connection.__init__

  • log_frames – an object that supports logging each and every frame CoolAMQP sends and receives from the broker

  • name – name to appear in log items and prctl() for the listener thread

  • on_blocked – callable to call when ConnectionBlocked/ConnectionUnblocked is received. It will be called with a value of True if connection becomes blocked, and False upon an unblock

  • tracer – tracer, if opentracing is installed
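The on_blocked contract described above (True when the connection becomes blocked, False on unblock) can be sketched as a small gate object. This is a minimal sketch: BlockedGate and its methods are hypothetical names, and it assumes the callback may be invoked from a thread other than the one that publishes, which is why it relies on threading.Event.

```python
import threading


class BlockedGate:
    """Tracks broker flow control as reported through on_blocked (hypothetical helper)."""

    def __init__(self):
        # Set while publishing is allowed, cleared while the broker blocks us.
        self._open = threading.Event()
        self._open.set()

    def on_blocked(self, is_blocked):
        # Per the on_blocked description: True means the connection became
        # blocked, False means it was unblocked.
        if is_blocked:
            self._open.clear()
        else:
            self._open.set()

    @property
    def is_blocked(self):
        return not self._open.is_set()

    def wait_until_unblocked(self, timeout=None):
        # Returns True if publishing is allowed, False if the wait timed out.
        return self._open.wait(timeout)


gate = BlockedGate()
# Hypothetical wiring, not executed here:
#   cluster = Cluster(nodes, on_blocked=gate.on_blocked)
```

A publisher could call gate.wait_until_unblocked() before sending, so that it pauses while the broker applies flow control.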

bind(queue, exchange, routing_key, persistent=False, span=None, dont_trace=False)

Bind a queue to an exchange

consume(queue, on_message=None, span=None, dont_trace=False, *args, **kwargs)

Start consuming from a queue.

args and kwargs will be passed to Consumer constructor (coolamqp.attaches.consumer.Consumer). Don’t use future_to_notify - it’s done here!

Take care not to lose the Consumer object - it’s the only way to cancel a consumer!

  • queue – Queue object, being consumed from right now. Note that name of anonymous queue might change at any time!

  • on_message – callable that will process incoming messages. If you leave it at None, messages will be .put into

  • span – optional span, if opentracing is installed

  • dont_trace – if True, this won’t output a span

Return type

Tuple[Consumer, Future]


a tuple of (Consumer instance, Future); the Future tells when the consumer is ready
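Since consume() hands back both the Consumer and a readiness Future, a caller will usually want to wait on the Future and keep the Consumer. The sketch below shows one way to do that. consume_and_wait is a hypothetical helper, and it assumes the returned Future follows the concurrent.futures.Future API (result(timeout=...)).

```python
from concurrent.futures import TimeoutError as FutureTimeout


def consume_and_wait(cluster, queue, on_message, ready_timeout=10.0):
    """Start a consumer and wait until its readiness Future resolves.

    Returns the Consumer on success. Keep a reference to it, since it is the
    only handle through which the consumer can be cancelled.
    """
    consumer, ready = cluster.consume(queue, on_message=on_message)
    try:
        # Assumption: the Future follows the concurrent.futures.Future API.
        ready.result(timeout=ready_timeout)
    except FutureTimeout:
        raise RuntimeError(
            "consumer was not ready within %.1f s" % ready_timeout
        ) from None
    return consumer
```

Any error the Future fails with propagates out of result() unchanged; only the timeout is translated into a RuntimeError here.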

declare(obj, persistent=False, span=None, dont_trace=False)

Declare a Queue/Exchange

  • obj (tp.Union[Queue, Exchange]) – Queue/Exchange object

  • persistent (bool) – should it be redefined upon reconnect?

  • span (tp.Optional[opentracing.Span]) – optional parent span, if opentracing is installed

  • dont_trace (bool) – if True, a span won’t be output

Return type





Delete a queue.


queue (coolamqp.objects.Queue) – Queue instance that represents what to delete

Return type



a Future (will succeed with None or fail with AMQPError)

drain(timeout, span=None, dont_trace=False)

Return an Event.

  • timeout – time to wait for an event. 0 means return immediately. None means block forever

  • span – optional parent span, if opentracing is installed

  • dont_trace – if True, this span won’t be traced

Return type



an Event instance. NothingMuch is returned when there’s nothing within the given timeout
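The drain() behaviour above (an Event on each call, NothingMuch when nothing arrives in time) suggests a simple polling loop. The following is a sketch only: drain_until_idle, handle and max_events are hypothetical names, and the idle event class is passed in as idle_type so the code does not guess where NothingMuch is imported from.

```python
def drain_until_idle(cluster, idle_type, handle, timeout=1.0, max_events=1000):
    """Dispatch events from cluster.drain() until an idle marker arrives.

    idle_type is the class of the "nothing happened" event (NothingMuch in
    the text above). Returns the number of events passed to handle().
    """
    handled = 0
    while handled < max_events:
        event = cluster.drain(timeout)
        if isinstance(event, idle_type):
            break  # nothing arrived within the timeout
        handle(event)
        handled += 1
    return handled
```

The max_events cap keeps the loop bounded when events arrive faster than they are handled.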

publish(message, exchange=None, routing_key='', tx=None, confirm=None, span=None, dont_trace=False)

Publish a message.

  • message (Message) – Message to publish

  • exchange (tp.Union[Exchange, str, bytes]) – exchange to use. Default is the “direct” empty-name exchange.

  • routing_key (tp.Union[str, bytes]) – routing key to use

  • confirm (tp.Optional[bool]) – whether to publish it using confirms/transactions. If you choose so, you will receive a Future that can be used to check whether the broker took responsibility for this message. Note that if tx is False and the message cannot be delivered to the broker at once, it will be discarded

  • tx (tp.Optional[bool]) – deprecated, alias for confirm

  • span (tp.Optional[opentracing.Span]) – optionally, current span, if opentracing is installed

  • dont_trace (bool) – if set to True, a span won’t be generated

Return type



Future to be finished on completion, or None if confirm/tx was not chosen
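Because publish() returns a Future only when confirm is chosen, a confirmed publish can be wrapped so the caller gets a plain success flag. This is a minimal sketch: publish_confirmed is a hypothetical helper, and it assumes the returned Future follows the concurrent.futures.Future API.

```python
def publish_confirmed(cluster, message, exchange, routing_key, timeout=5.0):
    """Publish with confirm=True and report whether the broker accepted it."""
    future = cluster.publish(
        message, exchange=exchange, routing_key=routing_key, confirm=True
    )
    try:
        # Assumption: the returned Future follows concurrent.futures.Future,
        # so result() re-raises the error the publish failed with, or raises
        # a timeout error if no confirmation arrives in time.
        future.result(timeout=timeout)
    except Exception:
        # Both a failed publish and a timeout are reported as "not confirmed".
        return False
    return True
```

Collapsing every failure into False is a simplification; code that needs to tell a timeout from a broker-side rejection should catch the exceptions separately.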


Terminate all connections, release resources - finish the job.


wait (bool) – block until this is done


RuntimeError – if called without start() being called first

Return type


start(wait=True, timeout=10.0)

Connect to broker. Initialize Cluster.

Only after this call is Cluster usable. It is not safe to fork after this.

  • wait (bool) – block until connection is ready

  • timeout (float) – timeout to wait until the connection is ready. If it is not, a ConnectionDead error will be raised

  • RuntimeError – called more than once

  • ConnectionDead – failed to connect within timeout

Return type
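The start() contract above (block until ready, raise ConnectionDead if the timeout passes) can be wrapped so that a failed connection becomes a return value. This is only a sketch: try_start is a hypothetical helper, and the exception class is passed in as connection_error so the code does not guess the import path of ConnectionDead.

```python
def try_start(cluster, connection_error, timeout=10.0):
    """Call start() once and report whether the connection came up.

    connection_error is the exception class raised when the connection is not
    ready within the timeout (ConnectionDead in the text above).
    """
    try:
        cluster.start(wait=True, timeout=timeout)
    except connection_error:
        return False
    return True
```

A RuntimeError from calling start() more than once is deliberately left uncaught, since it signals a programming error, not a connectivity problem.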