Henson-AMQP¶
A library for interacting with AMQP with a Henson application.
Quickstart¶
# settings.py
AMQP_INBOUND_QUEUE = 'incoming'
AMQP_INBOUND_EXCHANGE = 'incoming'
AMQP_OUTBOUND_EXCHANGE = 'outgoing'
AMQP_INBOUND_ROUTING_KEY = 'outgoing'
AMQP_OUTBOUND_ROUTING_KEY = 'outgoing'
# app.py
from henson import Application
from henson_amqp import AMQP
from . import settings
from .callback import run
app = Application('app', callback=run)
app.config.from_object(settings)
amqp = AMQP(app)
app.consumer = amqp.consumer()
# Enable optional Retry support
from henson.contrib.retry import Retry
app.settings['RETRY_CALLBACK'] = app.consumer.retry
Retry(app)
$ henson run app
Contents:
Settings¶
Connection Settings¶
AMQP_HOST |
The hostname or IP address of the AMQP
server to connect to. Defaults to
'localhost' . |
AMQP_PORT |
The port of the AMQP server to connect
to. Defaults to 5672 . |
AMQP_USERNAME |
The username to authenticate with.
Defaults to 'guest' . |
AMQP_PASSWORD |
The password to authenticate with.
Defaults to 'guest' . |
AMQP_VIRTUAL_HOST |
The virtual host to use. Defaults to
'/' . |
AMQP_HEARTBEAT_INTERVAL |
The heartbeat interval to use for
connections. Defaults to 60 . |
AMQP_CONNECTION_KWARGS |
Additional arguments to pass to
aioamqp.connect() . Defaults to
{} . |
Consumer Settings¶
REGISTER_CONSUMER |
If True , a consumer will be
automatically created and assigned to
the application. Defaults to False . |
AMQP_INBOUND_EXCHANGE |
The name of the exchange that the
consumer should read from. Defaults to
'' (the AMQP default exchange). |
AMQP_INBOUND_EXCHANGE_DURABLE |
The durability setting of the exchange
that the consumer reads from. Defaults
to False . |
AMQP_INBOUND_EXCHANGE_TYPE |
The type of the inbound exchange.
Defaults to 'direct' . |
AMQP_INBOUND_EXCHANGE_KWARGS |
Additonal arguments to pass to
aioamqp.channel.exchange_declare() .
Defaults to {} . |
AMQP_INBOUND_QUEUE |
The name of the queue that the
consumer should read from. Defaults to
'' (the AMQP default queue). |
AMQP_INBOUND_QUEUE_DURABLE |
The durability setting of the queue
the consumer reads from. Defaults to
False . |
AMQP_INBOUND_ROUTING_KEY |
The routing key used to bind the
inbound exchange and queue. Defaults
to '' . |
AMQP_DISPATCH_METHOD |
Reserved for future use. |
Producer Settings¶
AMQP_OUTBOUND_EXCHANGE |
The name of the exchange used by the
producer to send messages. Defaults to
'' . |
AMQP_OUTBOUND_EXCHANGE_DURABLE |
The durability setting of the outbound
exchange. Defaults to False . |
AMQP_OUTBOUND_EXCHANGE_TYPE |
The type of the outbound exchange.
Defaults to 'direct' . |
AMQP_OUTBOUND_EXCHANGE_KWARGS |
Additonal arguments to pass to
aioamqp.channel.exchange_declare() .
Defaults to {} . |
AMQP_OUTBOUND_ROUTING_KEY |
The default routing key used when
sending messages to the outbound
exchange if the routing_key argument
is not provided. Defaults to '' . |
AMQP_PREFETCH_LIMIT |
The maximum number of messages to keep
in the internal queue waiting to be
processed. If set to 0 , the
consumer will fetch all available
messages from the AMQP queue. Defaults
to 0 . |
AMQP_DELIVERY_MODE |
The mode used when sending messages.
By default, messages are
non-persistent.
Defaults to
henson_amqp.DeliveryMode.NONPERSISTENT |
API¶
The public API of Henson-AMQP.
AMQP¶
-
class
henson_amqp.
AMQP
(app=None)[source]¶ An interface to interact with an AMQP broker.
-
consumer
()[source]¶ Return a new AMQP consumer.
Returns: - A new consumer object that can be used to read
- from the AMQP broker and queue specified the Application’s settings.
Return type: Consumer
-
Consumer¶
-
class
henson_amqp.
Consumer
(app)[source]¶ A consumer of an AMQP queue.
Parameters: app (henson.base.Application) – The application for which this consumer consumes. -
read
()[source]¶ Read a single message from the message queue.
If the consumer has not yet begun reading from the AMQP broker, that process is initiated before yielding from the queue.
Returns: The next available message. Return type: Message Raises: aioamqp.exceptions.AioamqpException
– The exception raised on connection close.
-
retry
(app, message)[source]¶ Requeue a message to be processed again.
This coroutine is meant for use with the
henson.contrib.retry.Retry
extension.Parameters: - app (henson.base.Application) – The application processing the message.
- message (dict) – A copy of the message read from the AMQP server.
Note
This function assumes that messages are JSON serializeable. If they are not, a custom function may be used in its place.
-
Message¶
Changelog¶
Version 0.6.0¶
Released 2018-06-25
- Support both
int
values and existinghenson_amqp.DeliveryMode
enums
for theAMQP_DELIVERY_MODE
setting - Stop declaring the exchange with every message sent
Version 0.5.0¶
Released 2016-06-24
- Support passing
arguments
to inbound and outbound exchange declaration
Version 0.2.0¶
Released 2016-03-11
- If a connection is closed by
aioamqp
while reading, raise that exception during calls toConsumer.read
after all previously read messages have been returned.