CoolAMQP cluster

class coolamqp.clustering.Cluster(nodes, on_fail=None, extra_properties=None, log_frames=None, name=None, on_blocked=None, tracer=None)

Frontend for your AMQP needs.

It has its own ListenerThread.

Call .start() to connect to AMQP.

It is not safe to fork() after .start() is called, but it’s OK before.

Parameters
  • nodes – list of nodes, or a single node. For now, only one is supported.

  • on_fail – callable/0 to call when the connection fails in an unclean way. This is a one-shot callback

  • extra_properties – refer to documentation in [/coolamqp/connection/connection.py] Connection.__init__

  • log_frames – an object that supports logging each and every frame CoolAMQP sends to and receives from the broker

  • name – name to appear in log items and prctl() for the listener thread

  • on_blocked – callable to call when ConnectionBlocked/ConnectionUnblocked is received. It will be called with a value of True if the connection becomes blocked, and False upon an unblock

  • tracer – tracer, if opentracing is installed
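
The on_blocked contract above (True on block, False on unblock) can be used to pause publishing while the broker applies back-pressure. A minimal sketch follows; BlockedGate is a hypothetical helper and not part of CoolAMQP, and it assumes the callback may be invoked from a thread other than the one that publishes.

```python
import threading


class BlockedGate:
    """Hypothetical helper (not part of CoolAMQP): remembers whether the
    broker has blocked the connection."""

    def __init__(self):
        self._unblocked = threading.Event()
        self._unblocked.set()  # assume the connection starts out unblocked

    def __call__(self, is_blocked):
        # Matches the documented on_blocked contract:
        # True when the connection becomes blocked, False upon an unblock.
        if is_blocked:
            self._unblocked.clear()
        else:
            self._unblocked.set()

    def wait_until_unblocked(self, timeout=None):
        """Return True if publishing may proceed, False if the timeout expired."""
        return self._unblocked.wait(timeout)


gate = BlockedGate()
# Intended use: Cluster(nodes, on_blocked=gate), then call
# gate.wait_until_unblocked() before each publish.
```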

bind(queue, exchange, routing_key, persistent=False, span=None, dont_trace=False)

Bind a queue to an exchange

consume(queue, on_message=None, span=None, dont_trace=False, *args, **kwargs)

Start consuming from a queue.

args and kwargs will be passed to Consumer constructor (coolamqp.attaches.consumer.Consumer). Don’t use future_to_notify - it’s done here!

Take care not to lose the Consumer object - it’s the only way to cancel a consumer!

Parameters
  • queue – Queue object, being consumed from right now. Note that the name of an anonymous queue might change at any time!

  • on_message – callable that will process incoming messages. If you leave it at None, messages will be .put into self.events

  • span – optional span, if opentracing is installed

  • dont_trace – if True, this won’t output a span

Return type

Tuple[Consumer, Future]

Returns

a tuple of (Consumer instance, Future); the Future tells when the consumer is ready
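
A minimal sketch of using the returned tuple: wait for the Future, then hold on to the Consumer. start_consumer is a hypothetical helper name, cluster is assumed to be an already started Cluster, and the Future is assumed to behave like concurrent.futures.Future.

```python
def start_consumer(cluster, queue, handler, timeout=10.0):
    """Start consuming from `queue` and block until the consumer is ready.

    `cluster` is assumed to be a started Cluster; `handler` is the callable
    passed as on_message.
    """
    consumer, ready = cluster.consume(queue, on_message=handler)
    # Assumption: `ready` behaves like concurrent.futures.Future, so
    # result() blocks and re-raises any error from setting up the consumer.
    ready.result(timeout)
    # Keep the Consumer: it is the only way to cancel this consumer later.
    return consumer
```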

declare(obj, persistent=False, span=None, dont_trace=False)

Declare a Queue/Exchange

Parameters
  • obj (tp.Union[Queue, Exchange]) – Queue/Exchange object

  • persistent (bool) – should it be redefined upon reconnect?

  • span (tp.Optional[opentracing.Span]) – optional parent span, if opentracing is installed

  • dont_trace (bool) – if True, a span won’t be output

Return type

concurrent.futures.Future

Returns

Future
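
Because declare() returns a concurrent.futures.Future, several declarations can be issued at once and awaited together. A minimal sketch, assuming cluster is a started Cluster; declare_all is a hypothetical helper name, and persistent=True is chosen here only for illustration.

```python
from concurrent.futures import wait


def declare_all(cluster, objects, timeout=10.0):
    """Declare every Queue/Exchange in `objects` and wait for all of them."""
    # persistent=True asks for the objects to be redefined upon reconnect.
    futures = [cluster.declare(obj, persistent=True) for obj in objects]
    done, not_done = wait(futures, timeout=timeout)
    if not_done:
        raise TimeoutError("%d declaration(s) did not finish in time" % len(not_done))
    for future in futures:
        future.result()  # re-raises the first declaration error, if any
```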

delete_queue(queue)

Delete a queue.

Parameters

queue (coolamqp.objects.Queue) – Queue instance that represents what to delete

Return type

Future

Returns

a Future (will succeed with None or fail with AMQPError)

drain(timeout, span=None, dont_trace=False)

Return an Event.

Parameters
  • timeout – time to wait for an event. 0 means return immediately. None means block forever

  • span – optional parent span, if opentracing is installed

  • dont_trace – if True, this span won’t be traced

Return type

Event

Returns

an Event instance. NothingMuch is returned when there’s nothing within the given timeout
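
A minimal sketch of a non-blocking drain loop built on the behaviour described above. drain_pending is a hypothetical helper name, cluster is assumed to be a started Cluster, and the NothingMuch class is passed in by the caller because its import path is not given in this section.

```python
def drain_pending(cluster, nothing_much_type, limit=100):
    """Collect events that are already waiting, without blocking.

    `nothing_much_type` is expected to be CoolAMQP's NothingMuch event class.
    `limit` bounds the loop so a busy connection cannot starve the caller.
    """
    events = []
    while len(events) < limit:
        event = cluster.drain(0)  # timeout 0 means return immediately
        if isinstance(event, nothing_much_type):
            break  # nothing more is pending
        events.append(event)
    return events
```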

publish(message, exchange=None, routing_key='', tx=None, confirm=None, span=None, dont_trace=False)

Publish a message.

Parameters
  • message (Message) – Message to publish

  • exchange (tp.Union[Exchange, str, bytes]) – exchange to use. Default is the “direct” empty-name exchange.

  • routing_key (tp.Union[str, bytes]) – routing key to use

  • confirm (tp.Optional[bool]) – Whether to publish it using confirms/transactions. If you choose so, you will receive a Future that can be used to check if the broker took responsibility for this message. Note that if tx is False, and the message cannot be delivered to the broker at once, it will be discarded

  • tx (tp.Optional[bool]) – deprecated, alias for confirm

  • span (tp.Optional[opentracing.Span]) – optionally, current span, if opentracing is installed

  • dont_trace (bool) – if set to True, a span won’t be generated

Return type

tp.Optional[Future]

Returns

Future to be finished on completion, or None if confirm/tx was not chosen
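
A minimal sketch of a confirmed publish that waits for the returned Future. publish_confirmed is a hypothetical helper name, cluster is assumed to be a started Cluster, and the Future is assumed to behave like concurrent.futures.Future.

```python
def publish_confirmed(cluster, message, routing_key, exchange=None, timeout=5.0):
    """Publish `message` with confirms and block until the broker responds."""
    future = cluster.publish(
        message,
        exchange=exchange,        # None selects the default exchange
        routing_key=routing_key,
        confirm=True,             # request a Future instead of fire-and-forget
    )
    # Assumption: result() blocks until completion and re-raises any failure
    # reported for this message, or raises a timeout error.
    future.result(timeout)
```

Without confirm=True, publish() returns None and there is nothing to wait on.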

shutdown(wait=True)

Terminate all connections, release resources - finish the job.

Parameters

wait (bool) – block until this is done

Raises

RuntimeError – if called without start() being called first

Return type

None

start(wait=True, timeout=10.0)

Connect to broker. Initialize Cluster.

Only after this call is Cluster usable. It is not safe to fork after this.

Parameters
  • wait (bool) – block until connection is ready

  • timeout (float) – timeout to wait until the connection is ready. If it is not, a ConnectionDead error will be raised

Raises
  • RuntimeError – called more than once

  • ConnectionDead – failed to connect within timeout

Return type

None
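
Since start() and shutdown() bracket the usable lifetime of a Cluster, the pair can be wrapped in a context manager. A minimal sketch follows; running is a hypothetical helper name and cluster is assumed to be a not yet started Cluster. If start() itself raises, shutdown() is deliberately not called here.

```python
from contextlib import contextmanager


@contextmanager
def running(cluster, timeout=10.0):
    """Start `cluster`, hand it to the with-block, and shut it down afterwards."""
    # Raises ConnectionDead if the connection is not ready within `timeout`.
    cluster.start(wait=True, timeout=timeout)
    try:
        yield cluster
    finally:
        # Runs even if the with-block raises; blocks until shutdown is done.
        cluster.shutdown(wait=True)
```

Intended use: `with running(Cluster(nodes)) as cluster: ...`. Remember that it is not safe to fork() inside the with-block.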