Usage basics
First off, you need a Cluster object:
- class coolamqp.clustering.Cluster(nodes, on_fail=None, extra_properties=None, log_frames=None, name=None, on_blocked=None, tracer=None)
Frontend for your AMQP needs.
It has a ListenerThread.
Call .start() to connect to AMQP.
It is not safe to fork() after .start() is called, but it’s OK before.
- Parameters:
nodes – list of nodes, or a single node. For now, only one is supported.
on_fail – callable/0 to call when connection fails in an unclean way. This is a one-shot
extra_properties – refer to documentation in [/coolamqp/connection/connection.py] Connection.__init__
log_frames (tp.Optional[coolamqp.tracing.BaseFrameTracer]) – an object that supports logging each and every frame CoolAMQP sends and receives from the broker
name – name to appear in log items and prctl() for the listener thread
on_blocked – callable to call when ConnectionBlocked/ConnectionUnblocked is received. It will be called with a value of True if connection becomes blocked, and False upon an unblock
tracer – tracer, if opentracing is installed
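The on_blocked callback receives True when the connection becomes blocked and False when it is unblocked. One way to use that signal is sketched below with only the standard library. BlockGate and wait_until_writable are hypothetical names, not part of CoolAMQP; an instance would be passed as on_blocked when constructing the Cluster.

```python
import threading


class BlockGate:
    """Hypothetical helper: a callable suitable for on_blocked.

    It remembers whether the broker currently accepts publishes, so that
    publishing code can pause while the connection is blocked.
    """

    def __init__(self):
        self._writable = threading.Event()
        self._writable.set()  # assume the connection starts unblocked

    def __call__(self, blocked):
        # Called with True on ConnectionBlocked, False on ConnectionUnblocked.
        if blocked:
            self._writable.clear()
        else:
            self._writable.set()

    def wait_until_writable(self, timeout=None):
        """Return True once unblocked, or False if the timeout expires."""
        return self._writable.wait(timeout)
```

Publishing code could call wait_until_writable() before each publish. Whether pausing is the right reaction depends on the application.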
- bind(queue, exchange, routing_key, persistent=False, span=None, dont_trace=False)
Bind a queue to an exchange
- consume()
Start consuming from a queue.
args and kwargs will be passed to Consumer constructor (coolamqp.attaches.consumer.Consumer). Don’t use future_to_notify - it’s done here!
Take care not to lose the Consumer object - it’s the only way to cancel a consumer!
- Parameters:
queue – Queue object, being consumed from right now. Note that name of anonymous queue might change at any time!
on_message – callable that will process incoming messages. If you leave it at None, messages will be .put into self.events
span – optional span, if opentracing is installed
dont_trace – if True, this won’t output a span
- Return type:
Tuple[Consumer, Future]
- Returns:
a tuple (Consumer instance, Future); the Future tells when the consumer is ready
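A minimal sketch of using the returned tuple: wait on the Future, then keep the Consumer. start_consumer is a hypothetical helper. It assumes queue is accepted as the first positional argument, on_message as a keyword, and that the Future offers result(timeout=...) like concurrent.futures.Future.

```python
def start_consumer(cluster, queue, on_message, timeout=10.0):
    """Hypothetical helper: begin consuming and block until the consumer is ready.

    Returns the Consumer object. Keep a reference to it, because it is the
    only way to cancel the consumer later.
    """
    consumer, ready = cluster.consume(queue, on_message=on_message)
    # Assumption: result() raises if the consumer could not be set up
    # or if the timeout expires first.
    ready.result(timeout=timeout)
    return consumer
```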
- declare(obj, persistent=False, span=None, dont_trace=False)
Declare a Queue/Exchange
- Parameters:
obj (tp.Union[Queue, Exchange]) – Queue/Exchange object
persistent (bool) – should it be redefined upon reconnect?
span (tp.Optional[opentracing.Span]) – optional parent span, if opentracing is installed
dont_trace (bool) – if True, a span won’t be output
- Return type:
concurrent.futures.Future
- Returns:
Future
- delete_queue(queue)
Delete a queue.
- Parameters:
queue (coolamqp.objects.Queue) – Queue instance that represents what to delete
- Return type:
Future
- Returns:
a Future (will succeed with None or fail with AMQPError)
- drain()
Return an Event.
- Parameters:
timeout – time to wait for an event. 0 means return immediately. None means block forever
span – optional parent span, if opentracing is installed
dont_trace – if True, this span won’t be traced
- Return type:
Event
- Returns:
an Event instance. NothingMuch is returned when there’s nothing within the given timeout
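The timeout semantics above allow a non-blocking poll: with a timeout of 0, drain() returns immediately, and NothingMuch signals that nothing is pending. A rough sketch follows. drain_available is a hypothetical helper, and the NothingMuch class is passed in by the caller as idle_type because its import path is not shown in this section.

```python
def drain_available(cluster, idle_type, max_events=100):
    """Hypothetical helper: collect events that are already waiting.

    idle_type is the class returned when nothing is pending
    (CoolAMQP's NothingMuch, supplied by the caller).
    """
    events = []
    while len(events) < max_events:
        event = cluster.drain(timeout=0)  # 0 means return immediately
        if isinstance(event, idle_type):
            break  # nothing more is pending right now
        events.append(event)
    return events
```

The max_events cap keeps the loop bounded when messages arrive faster than they are collected.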
- publish(message, exchange=None, routing_key='', tx=None, confirm=None, span=None, dont_trace=False)
Publish a message.
- Parameters:
message (Message) – Message to publish
exchange (tp.Union[Exchange, str, bytes]) – exchange to use. Default is the “direct” empty-name exchange.
routing_key (tp.Union[str, bytes]) – routing key to use
confirm (tp.Optional[bool]) – Whether to publish it using confirms/transactions. If you choose so, you will receive a Future that can be used to check if the broker took responsibility for this message. Note that if tx is False, and the message cannot be delivered to the broker at once, it will be discarded
tx (tp.Optional[bool]) – deprecated, alias for confirm
span (tp.Optional[opentracing.Span]) – optionally, current span, if opentracing is installed
dont_trace (bool) – if set to True, a span won’t be generated
- Return type:
tp.Optional[Future]
- Returns:
Future to be finished on completion, or None if confirm/tx was not chosen
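A sketch of publishing with confirm=True and waiting for the result. publish_confirmed is a hypothetical helper; it assumes the returned Future behaves like concurrent.futures.Future, so that result() raises if the broker did not accept the message or the timeout expires.

```python
def publish_confirmed(cluster, message, routing_key, exchange=None, timeout=10.0):
    """Hypothetical helper: publish with confirms and wait for the outcome."""
    future = cluster.publish(
        message,
        exchange=exchange,        # None selects the default empty-name exchange
        routing_key=routing_key,
        confirm=True,             # request a Future instead of fire-and-forget
    )
    # Assumption: result() returns on success and raises otherwise.
    future.result(timeout=timeout)
```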
- shutdown(wait=True)
Terminate all connections, release resources - finish the job.
- Parameters:
wait (bool) – block until this is done
- Raises:
RuntimeError – if called without start() being called first
- Return type:
None
- start(wait=True, timeout=10.0)
Connect to broker. Initialize Cluster.
Only after this call is Cluster usable. It is not safe to fork after this.
- Parameters:
wait (bool) – block until connection is ready
timeout (float) – timeout to wait until the connection is ready. If it is not, a ConnectionDead error will be raised
- Raises:
RuntimeError – called more than once
ConnectionDead – failed to connect within timeout
- Return type:
None
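Since start() must precede any use and shutdown() releases resources, pairing the two in a context manager is one possible pattern. This is a sketch only: running is a hypothetical helper, not something CoolAMQP provides.

```python
from contextlib import contextmanager


@contextmanager
def running(cluster, timeout=10.0):
    """Hypothetical helper: start the cluster, then always shut it down."""
    # May raise ConnectionDead; in that case shutdown() is not called here.
    cluster.start(wait=True, timeout=timeout)
    try:
        yield cluster
    finally:
        cluster.shutdown(wait=True)
```

Code inside the with block can then declare, consume and publish without repeating the cleanup logic.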
You will need to initialize it with NodeDefinitions:
- class coolamqp.objects.NodeDefinition(*args, **kwargs)
Definition of a reachable AMQP node.
This object is hashable.
>>> a = NodeDefinition(host='192.168.0.1', user='admin', password='password', virtual_host='vhost')
or
>>> a = NodeDefinition('192.168.0.1', 'admin', 'password')
or
>>> a = NodeDefinition('amqp://user:password@host/virtual_host')
or
>>> a = NodeDefinition('amqp://user:password@host:port/virtual_host', heartbeat=20)
AMQP connection string may be either bytes or str/unicode
- Additional keyword parameters that can be specified:
heartbeat - heartbeat interval in seconds
port - TCP port to use. Default is 5672
- Raises:
ValueError – invalid parameters
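To illustrate how a connection string of the form shown above breaks down into components, here is a standard-library sketch. It is not CoolAMQP's own parser. split_amqp_url is a hypothetical name, and decoding bytes as UTF-8 is an assumption.

```python
from urllib.parse import urlsplit

DEFAULT_AMQP_PORT = 5672  # the default port stated above


def split_amqp_url(url):
    """Hypothetical helper: split an amqp:// URL into its components."""
    if isinstance(url, bytes):
        url = url.decode("utf-8")  # assumption: bytes URLs are UTF-8
    parts = urlsplit(url)
    if parts.scheme != "amqp":
        raise ValueError("expected an amqp:// URL, got %r" % (url,))
    return {
        "host": parts.hostname,
        "port": parts.port or DEFAULT_AMQP_PORT,
        "user": parts.username,
        "password": parts.password,
        "virtual_host": parts.path[1:],  # drop the leading slash
    }
```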
You can send messages:
- class coolamqp.objects.Message(body, properties=None)
An AMQP message. Has a binary body, and some properties.
Properties is a highly regularized class - see coolamqp.framing.definitions.BasicContentPropertyList for a list of possible properties.
- Parameters:
body (anything with a buffer interface) – stream of octets
properties (MessageProperties instance, None or a dict (SLOW!)) – AMQP properties to be sent along. Default is ‘no properties at all’. You can pass a dict - it will be passed to MessageProperties, but it’s slow - don’t do that.
and receive them:
- class coolamqp.objects.ReceivedMessage(body, exchange_name, routing_key, properties=None, delivery_tag=None, ack=None, nack=None)
A message that was received from the AMQP broker.
It additionally has an exchange name, the routing key used, its delivery tag, and methods for ack() or nack().
Note that if the consumer that generated this message was no_ack, .ack() and .nack() are no-ops.
- ack()
Acknowledge reception of this message.
This is a no-op if a Consumer was called with no_ack=True.
If called after an ack() or nack() was called, this will be a no-op.
- nack()
Negatively acknowledge reception of this message.
This is a no-op if a Consumer was called with no_ack=True. If no_ack was False, the message will be requeued and redelivered by the broker.
If called after an ack() or nack() was called, this will be a no-op.
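A common way to combine ack() and nack() is to acknowledge only after processing succeeds. The sketch below builds a callable that could serve as on_message. make_handler and process are hypothetical names, and it assumes the received message exposes its payload as a body attribute, matching the constructor argument.

```python
import logging

logger = logging.getLogger(__name__)


def make_handler(process):
    """Hypothetical helper: wrap process(body) into an on_message callable."""

    def on_message(msg):
        try:
            process(msg.body)
        except Exception:
            logger.exception("processing failed, message will be requeued")
            msg.nack()  # broker requeues and redelivers (no-op with no_ack=True)
        else:
            msg.ack()   # processing succeeded, acknowledge reception

    return on_message
```

Because nack() leads to redelivery, a message that always fails would be redelivered repeatedly. A real handler may want to cap retries.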
MessageProperties
- coolamqp.objects.MessageProperties
alias of
BasicContentPropertyList
- class coolamqp.framing.definitions.BasicContentPropertyList(**kwargs)
The basic class provides methods that support an industry-standard messaging model.
- FIELDS = [('content-type', 'shortstr', 'shortstr', False), ('content-encoding', 'shortstr', 'shortstr', False), ('headers', 'table', 'table', False), ('delivery-mode', 'octet', 'octet', False), ('priority', 'octet', 'octet', False), ('correlation-id', 'shortstr', 'shortstr', False), ('reply-to', 'shortstr', 'shortstr', False), ('expiration', 'shortstr', 'shortstr', False), ('message-id', 'shortstr', 'shortstr', False), ('timestamp', 'timestamp', 'timestamp', False), ('type', 'shortstr', 'shortstr', False), ('user-id', 'shortstr', 'shortstr', False), ('app-id', 'shortstr', 'shortstr', False), ('reserved', 'shortstr', 'shortstr', False)]
- PARTICULAR_CLASSES = {b'\x00\x00': <class 'coolamqp.framing.compilation.content_property.ParticularContentTypeList'>}
- static from_buffer(buf, offset)
Return a content property list instance unserialized from buffer, so that buf[offset] marks the start of property flags
- Return type:
BasicContentPropertyList
- static typize(*fields)
- Return type:
type
Take care, as MessageProperties will hash the keys found and store them within non-GCable memory. So each “variant” of message properties encountered will be compiled as a separate class.
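To get a feel for what counts as a “variant”, the sketch below counts distinct key sets across property dicts. It assumes a variant is determined by which keys are present, not by their values, which is one reading of the note above. count_variants and the example key names are hypothetical, and no CoolAMQP code is involved.

```python
def count_variants(property_dicts):
    """Hypothetical helper: count distinct sets of property keys.

    Assumption: two property sets with the same keys but different values
    share one variant, while adding or removing a key creates a new one.
    """
    return len({frozenset(props) for props in property_dicts})


# Illustrative data only; the key names are assumptions.
examples = [
    {"content_type": "text/plain"},
    {"content_type": "application/json"},               # same keys as above
    {"content_type": "text/plain", "delivery_mode": 2},  # new key set
]
variant_count = count_variants(examples)
```

Under that assumption, keeping the number of distinct key combinations small and fixed keeps the permanently stored data bounded.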