CoolAMQP cluster

class coolamqp.clustering.Cluster(nodes, on_fail=None, extra_properties=None, log_frames=None, name=None, on_blocked=None, tracer=None)

Frontend for your AMQP needs.

This object has a ListenerThread.

Call .start() to connect to AMQP.

It is not safe to fork() after .start() is called, but it’s OK before.

Parameters:
  • nodes – list of nodes, or a single node. For now, only one is supported.

  • on_fail – zero-argument callable to call when the connection fails in an unclean way. This is a one-shot callback

  • extra_properties – refer to documentation in [/coolamqp/connection/connection.py] Connection.__init__

  • log_frames (tp.Optional[coolamqp.tracing.BaseFrameTracer]) – an object that supports logging each and every frame CoolAMQP sends and receives from the broker

  • name – name to appear in log items and prctl() for the listener thread

  • on_blocked – callable to call when ConnectionBlocked/ConnectionUnblocked is received. It will be called with a value of True if connection becomes blocked, and False upon an unblock

  • tracer – tracer, if opentracing is installed
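As a rough illustration of the on_fail and on_blocked parameters above, the sketch below builds both callbacks on top of threading.Event. ConnectionState is a hypothetical helper, not part of CoolAMQP; only the callback shapes (zero arguments for on_fail, a single bool for on_blocked) are taken from the parameter descriptions.

```python
import threading


class ConnectionState:
    """Hypothetical helper that records what the two callbacks report."""

    def __init__(self):
        self.failed = threading.Event()    # set by on_fail
        self._blocked = threading.Event()  # mirrors on_blocked

    def on_fail(self):
        # Zero-argument callable, as on_fail expects.
        self.failed.set()

    def on_blocked(self, is_blocked):
        # Called with True when the connection becomes blocked,
        # and with False when it is unblocked again.
        if is_blocked:
            self._blocked.set()
        else:
            self._blocked.clear()

    @property
    def blocked(self):
        return self._blocked.is_set()


state = ConnectionState()
# Assumed wiring (needs a real node list, so it is left as a comment):
# cluster = Cluster(nodes, on_fail=state.on_fail, on_blocked=state.on_blocked)
```

A publishing loop could then check state.blocked before sending, and state.failed.wait() gives another thread a way to notice an unclean failure.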

bind(queue, exchange, routing_key, persistent=False, span=None, dont_trace=False)

Bind a queue to an exchange

consume(queue, on_message=None, span=None, dont_trace=False, *args, **kwargs)

Start consuming from a queue.

args and kwargs will be passed to Consumer constructor (coolamqp.attaches.consumer.Consumer). Don’t use future_to_notify - it’s done here!

Take care not to lose the Consumer object - it’s the only way to cancel a consumer!

Parameters:
  • queue – Queue object, being consumed from right now. Note that name of anonymous queue might change at any time!

  • on_message – callable that will process incoming messages. If you leave it at None, messages will be .put into self.events

  • span – optional span, if opentracing is installed

  • dont_trace – if True, this won’t output a span

Return type:

Tuple[Consumer, Future]

Returns:

a tuple of (Consumer instance, Future); the Future tells when the consumer is ready
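A minimal sketch of how the returned pair might be used: wait on the Future, then keep the Consumer around so it can be cancelled later. start_consumer and FakeCluster are hypothetical names; FakeCluster only stands in for a started Cluster so the helper can be dry-run without a broker.

```python
from concurrent.futures import Future


def start_consumer(cluster, queue, handler, timeout=10.0):
    """Start consuming and block until the consumer reports ready."""
    consumer, ready = cluster.consume(queue, on_message=handler)
    # Raises if the consumer could not be set up, or on timeout.
    ready.result(timeout=timeout)
    # Keep this object: it is the only way to cancel the consumer.
    return consumer


class FakeCluster:
    """Stand-in used only to dry-run the helper without a broker."""

    def consume(self, queue, on_message=None):
        ready = Future()
        ready.set_result(None)  # pretend the consumer went online at once
        return ("consumer for " + queue, ready)


consumer = start_consumer(FakeCluster(), "demo-queue", print)
```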

declare(obj, persistent=False, span=None, dont_trace=False)

Declare a Queue/Exchange

Parameters:
  • obj (tp.Union[Queue, Exchange]) – Queue/Exchange object

  • persistent (bool) – should it be redefined upon reconnect?

  • span (tp.Optional[opentracing.Span]) – optional parent span, if opentracing is installed

  • dont_trace (bool) – if True, a span won’t be output

Return type:

concurrent.futures.Future

Returns:

Future
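Since declare() returns a concurrent.futures.Future, several declarations can be issued first and awaited together. The sketch below assumes exactly that and nothing more; declare_all and FakeCluster are hypothetical names, and the fake only records calls so the helper can run without a broker.

```python
from concurrent.futures import Future, wait


def declare_all(cluster, objects, timeout=10.0):
    """Declare several queues/exchanges and wait for every Future."""
    futures = [cluster.declare(obj, persistent=True) for obj in objects]
    done, not_done = wait(futures, timeout=timeout)
    if not_done:
        raise TimeoutError("%d declaration(s) still pending" % len(not_done))
    for future in futures:
        future.result()  # re-raises the first declaration error, if any


class FakeCluster:
    """Stand-in that records declare() calls and succeeds immediately."""

    def __init__(self):
        self.declared = []

    def declare(self, obj, persistent=False):
        self.declared.append((obj, persistent))
        future = Future()
        future.set_result(None)
        return future


fake = FakeCluster()
declare_all(fake, ["exchange", "queue"])
```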

delete_queue(queue)

Delete a queue.

Parameters:

queue (coolamqp.objects.Queue) – Queue instance that represents what to delete

Return type:

Future

Returns:

a Future (will succeed with None or fail with AMQPError)

drain(timeout, span=None, dont_trace=False)

Return an Event.

Parameters:
  • timeout – time to wait for an event. 0 means return immediately. None means block forever

  • span – optional parent span, if opentracing is installed

  • dont_trace – if True, this span won’t be traced

Return type:

Event

Returns:

an Event instance. NothingMuch is returned when there’s nothing within the given timeout
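One way to use drain() is a polling loop that stops after a few empty results. The sketch below is an assumption-laden illustration: pump_events, FakeIdle and FakeCluster are hypothetical, and the "nothing happened" class (NothingMuch in the text above) is passed in as idle_type so that no import path has to be guessed.

```python
def pump_events(cluster, handle, idle_type, timeout=1.0, max_idle=3):
    """Hand events to `handle` until `max_idle` consecutive empty drains."""
    idle = 0
    handled = 0
    while idle < max_idle:
        event = cluster.drain(timeout)
        if isinstance(event, idle_type):
            idle += 1  # nothing arrived within the timeout
            continue
        idle = 0
        handle(event)
        handled += 1
    return handled


class FakeIdle:
    """Stand-in for the 'nothing happened' event type."""


class FakeCluster:
    """Stand-in that replays a fixed list of events, then goes idle."""

    def __init__(self, events):
        self._events = list(events)

    def drain(self, timeout, span=None, dont_trace=False):
        return self._events.pop(0) if self._events else FakeIdle()


seen = []
count = pump_events(FakeCluster(["a", "b"]), seen.append, FakeIdle,
                    timeout=0, max_idle=2)
```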

publish(message, exchange=None, routing_key='', tx=None, confirm=None, span=None, dont_trace=False)

Publish a message.

Parameters:
  • message (Message) – Message to publish

  • exchange (tp.Union[Exchange, str, bytes]) – exchange to use. Default is the “direct” empty-name exchange.

  • routing_key (tp.Union[str, bytes]) – routing key to use

  • confirm (tp.Optional[bool]) – Whether to publish it using confirms/transactions. If you choose so, you will receive a Future that can be used to check if the broker took responsibility for this message. Note that if tx is False, and the message cannot be delivered to the broker at once, it will be discarded

  • tx (tp.Optional[bool]) – deprecated, alias for confirm

  • span (tp.Optional[opentracing.Span]) – optionally, current span, if opentracing is installed

  • dont_trace (bool) – if set to True, a span won’t be generated

Return type:

tp.Optional[Future]

Returns:

Future to be finished on completion, or None if confirm/tx was not chosen
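Because the return type is Optional[Future], a caller that wants delivery confirmation has to handle both cases. The following is a sketch under that assumption only; publish_and_wait and FakeCluster are hypothetical names, and the fake simply records what was published.

```python
from concurrent.futures import Future


def publish_and_wait(cluster, message, routing_key, exchange=None, timeout=5.0):
    """Publish with confirm=True and block until the broker answers."""
    future = cluster.publish(message, exchange=exchange,
                             routing_key=routing_key, confirm=True)
    if future is None:
        # The documented return type is Optional[Future]; be defensive.
        raise RuntimeError("publish() returned no Future despite confirm=True")
    return future.result(timeout=timeout)


class FakeCluster:
    """Stand-in that confirms every message immediately."""

    def __init__(self):
        self.sent = []

    def publish(self, message, exchange=None, routing_key="", confirm=None):
        self.sent.append((message, exchange, routing_key, confirm))
        future = Future()
        future.set_result(None)
        return future


fake = FakeCluster()
result = publish_and_wait(fake, "payload", "jobs")
```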

shutdown(wait=True)

Terminate all connections, release resources - finish the job.

Parameters:

wait (bool) – block until this is done

Raises:

RuntimeError – if called without start() being called first

Return type:

None

start(wait=True, timeout=10.0)

Connect to broker. Initialize Cluster.

Only after this call is Cluster usable. It is not safe to fork after this.

Parameters:
  • wait (bool) – block until connection is ready

  • timeout (float) – timeout to wait until the connection is ready. If it is not, a ConnectionDead error will be raised

Raises:
  • RuntimeError – called more than once

  • ConnectionDead – failed to connect within timeout

Return type:

None
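The start()/shutdown() pair lends itself to a context manager, so that shutdown() runs even when the body raises. This is only a sketch: running and FakeCluster are hypothetical names, and shutdown() is deliberately not called when start() itself fails, because the text above says shutdown() raises RuntimeError without a prior start().

```python
from contextlib import contextmanager


@contextmanager
def running(cluster, timeout=10.0):
    """Start the cluster on entry and shut it down on exit."""
    cluster.start(wait=True, timeout=timeout)
    try:
        yield cluster
    finally:
        cluster.shutdown(wait=True)


class FakeCluster:
    """Stand-in that records the order of lifecycle calls."""

    def __init__(self):
        self.calls = []

    def start(self, wait=True, timeout=10.0):
        self.calls.append("start")

    def shutdown(self, wait=True):
        self.calls.append("shutdown")


fake = FakeCluster()
with running(fake) as cluster:
    cluster.calls.append("work")
```

Any fork() would have to happen before entering the with block, in line with the warning above.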

Publisher

class coolamqp.attaches.publisher.Publisher(mode, cluster=None)

An object that is capable of attaching to a Connection and sending messages. Depending on its characteristic, it may process messages in:

  • non-ack mode (default) - messages will be dropped on the floor if there is no active uplink

  • Consumer Publish mode - requires broker support, each message will be ACK/NACKed by the broker; messages will survive broker reconnections.

    If you use this mode, it is your job to ensure that the broker supports publisher_confirms. If it doesn’t, this publisher will enter ST_OFFLINE and emit a warning.

Other modes may be added in the future.

Since this may be called by other threads than ListenerThread, this has locking.

_pub and on_fail are synchronized so that _pub doesn’t see a partially destroyed class.

Parameters:

mode

Publishing mode to use. One of:

  • MODE_NOACK - use non-ack mode

  • MODE_CNPUB - use consumer publishing mode. A switch to MODE_TXPUB will be made if broker does not support these.

Raises:

ValueError – mode invalid

MODE_CNPUB = 1
MODE_NOACK = 0
exception UnusablePublisher

This publisher will never work (eg. MODE_CNPUB on a broker not supporting publisher confirms)

attach(connection)

Attach this object to a live Connection.

Parameters:

connection – Connection instance to use

channel_id
on_connection_blocked(payload)
on_fail()
on_flow_control(payload)

Called on ChannelFlow

on_operational(operational)

[EXTEND ME] Called by internal methods (on_*) when channel has achieved (or lost) operational status.

If this is called with operational=True, then it is guaranteed to be called later with operational=False.

This will, therefore, get called an even number of times.

Called by Channeler, when:
  • Channeler.on_close gets called and state is ST_ONLINE

    on_close registers ChannelClose, ChannelCloseOk, BasicCancel

Parameters:

operational (bool) – True if channel has just become operational, False if it has just become useless.

Return type:

None

on_setup(payload)

[OVERRIDE ME!] Called with a method frame that signifies a part of setup.

You must be prepared to handle at least a payload of ChannelOpenOk

Parameters:

payload – AMQP method frame payload

publish(message, exchange=b'', routing_key=b'', span=None)

Schedule to have a message published.

If mode is MODE_CNPUB:

this function will return a Future. Future can end either with success (result will be None), or exception (a plain Exception instance). Exception will happen when broker NACKs the message: that, according to RabbitMQ, means an internal error in Erlang process.

The returned Future can be cancelled - this will prevent the message from being sent, if sending hasn’t commenced yet.

If mode is MODE_NOACK:

this function returns None. Messages are dropped on the floor if there’s no connection.

Parameters:
  • message – Message object to send

  • exchange (bytes, str or Exchange instance) – exchange name to use. Defaults to the default direct exchange. Can also be an Exchange object.

  • routing_key – routing key to use

  • span – optional span, if opentracing is installed

Returns:

a Future instance, or None

Raises:

Publisher.UnusablePublisher – this publisher will never work (eg. MODE_CNPUB on Non-RabbitMQ)
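The three outcomes described above (None in MODE_NOACK, success, or an exception on NACK) can be handled without blocking by attaching a done-callback. The sketch below uses only concurrent.futures; watch_confirm and the outcomes list are hypothetical, and the Futures are completed by hand to imitate the broker.

```python
from concurrent.futures import Future


def watch_confirm(future, on_ack, on_nack):
    """Route the outcome of a publish() Future to one of two callbacks."""
    if future is None:
        return False  # MODE_NOACK: there is nothing to watch

    def _done(finished):
        if finished.cancelled():
            return  # the caller cancelled before the message was sent
        error = finished.exception()
        if error is None:
            on_ack()
        else:
            on_nack(error)

    future.add_done_callback(_done)
    return True


outcomes = []

acked = Future()
watch_confirm(acked, lambda: outcomes.append("ack"),
              lambda error: outcomes.append("nack"))
acked.set_result(None)  # imitate a broker ACK

nacked = Future()
watch_confirm(nacked, lambda: outcomes.append("ack"),
              lambda error: outcomes.append("nack"))
nacked.set_exception(Exception("broker NACK"))  # imitate a broker NACK
```

The callbacks run in whichever thread completes the Future, so they should stay short and thread-safe.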

Consumers

class coolamqp.attaches.consumer.BodyReceiveMode
class coolamqp.attaches.consumer.Consumer(queue, on_message, span=None, no_ack=True, qos=None, cancel_on_failure=False, future_to_notify=None, fail_on_first_time_resource_locked=False, body_receive_mode=0)

This object represents a consumer in the system.

Consumer may reside on any AMQP broker, this is to be decided by CoolAMQP. Consumer, when created, has the state of ST_SYNCING. CoolAMQP will try to declare the consumer where it makes most sense for it to be.

If it succeeds, the consumer will enter state ST_ONLINE, and callables on_start will be called. This means that broker has confirmed that this consumer is operational and receiving messages.

Note that this does not attempt to cancel consumers explicitly. Having a channel per consumer gives you the unique possibility of simply closing the channel. Since this implies cancelling the consumer, here you go.

WARNING: READ DEFAULT VALUES IN CONSTRUCTOR! TAKE CARE WHAT YOUR CONSUMERS DO!

You can subscribe to be informed when the consumer is cancelled (for any reason, server or client side) with:

>>> con, fut = Cluster.consume(...)
>>> def im_called_on_cancel_for_any_reason():   # must have arity of 0
...     pass
>>> con.on_cancel.add(im_called_on_cancel_for_any_reason)
>>> con.cancel()

Or, if RabbitMQ is in use, you can be informed upon a Consumer Cancel Notification:

>>> con.on_broker_cancel.add(im_cancelled_by_broker)

Parameters:
  • queue (coolamqp.objects.Queue) – Queue object, being consumed from right now. Note that name of anonymous queue might change at any time!

  • on_message (callable(ReceivedMessage instance)) – callable that will process incoming messages

  • span – optional span, if opentracing is installed

  • no_ack (bool) – if True, messages delivered to this consumer do not need to be acknowledged; if False, they do

  • qos (tuple(int, int) or tuple(None, int) or int) – a tuple of (prefetch size, prefetch window) for this consumer, or an int (prefetch window only). If an int is passed, prefetch size will be set to 0 (which means undefined), and this int will be used for prefetch window

  • cancel_on_failure (bool) – Consumer will cancel itself when link goes down

  • future_to_notify (concurrent.futures.Future) – Future to succeed when this consumer goes online for the first time. This future can also fail with AMQPError if the consumer fails to go online.

  • fail_on_first_time_resource_locked (bool) – When consumer is declared for the first time, and RESOURCE_LOCKED is encountered, it will fail the future with ResourceLocked, and consumer will cancel itself. By default it will retry until success is made. If the consumer doesn’t get the chance to be declared - because of a connection fail - next reconnect will consider this to be SECOND declaration, ie. it will retry ad infinitum

  • body_receive_mode (a property of BodyReceiveMode) – how should message.body be received. This has a performance impact
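The accepted shapes of the qos parameter can be summarised by a small normalising function. This is not CoolAMQP's own code: normalize_qos is a hypothetical helper, the int case follows the description above, and treating a None prefetch size as 0 is an assumption.

```python
def normalize_qos(qos):
    """Return qos as a (prefetch size, prefetch window) tuple, or None."""
    if qos is None:
        return None  # no QoS requested
    if isinstance(qos, int):
        # An int means prefetch window only; size 0 means undefined.
        return (0, qos)
    prefetch_size, prefetch_window = qos
    if prefetch_size is None:
        prefetch_size = 0  # assumption: None is treated like "undefined"
    return (prefetch_size, prefetch_window)
```

For example, normalize_qos(10) and normalize_qos((None, 10)) both yield (0, 10) under these assumptions.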

attache_group
body_receive_mode
cancel()

Cancel the consumer.

.ack() or .nack() for messages from this consumer will have no effect.

Return type:

Future

Returns:

a Future to tell when it’s done. The future will always succeed - sooner, or later. NOTE: Future is OK’d when entire channel is destroyed

cancel_on_failure
cancelled

public, if this is True, it won’t be attached to next connection

channel_close_sent
consumer_tag
deliver_watch
fail_on_first_time_resource_locked
future_to_notify
future_to_notify_on_dead
hb_watch
no_ack
on_broker_cancel

public, called on Consumer Cancel Notification

on_cancel

public, called on cancel for any reason

on_close(payload=None)

Handle closing the channel. It sounds like an exception…

Return type:

None

This is done in two steps:

  1. self.state <- ST_OFFLINE, on_event(EV_OFFLINE) upon detecting that no more messages will be there

  2. self.channel_id <- None, channel is returned to Connection - channel has been physically torn down

Note, this can be called multiple times, and eventually with None.

on_delivery(sth)

Callback for delivery-related frames

Parameters:

sth – AMQPMethodFrame WITH basic-deliver, AMQPHeaderFrame or AMQPBodyFrame

on_message
on_operational(operational)

[EXTEND ME] Called by internal methods (on_*) when channel has achieved (or lost) operational status.

If this is called with operational=True, then it is guaranteed to be called later with operational=False.

This will, therefore, get called an even number of times.

Called by Channeler, when:
  • Channeler.on_close gets called and state is ST_ONLINE

    on_close registers ChannelClose, ChannelCloseOk, BasicCancel

Parameters:

operational (bool) – True if channel has just become operational, False if it has just become useless.

Return type:

None

on_setup(payload)

Called with different kinds of frames - during setup

Return type:

None

qos
qos_update_sent
queue
receiver
set_qos(prefetch_size, prefetch_count)

Set new QoS for this consumer.

Parameters:
  • prefetch_size (int) – prefetch in octets

  • prefetch_count (int) – prefetch in whole messages

Return type:

None

span