sprockets.mixins.amqp¶
AMQP Publishing Mixin for Tornado RequestHandlers.
Installation¶
sprockets.mixins.amqp
is available on the
Python Package Index
and can be installed via pip
:
pip3 install sprockets.mixins.amqp
Documentation¶
Python Compatibility¶
- Python 3.7
- Python 3.8
Requirements¶
- Pika==0.13.1
- Tornado>=6,<7
Example¶
You may optionally install sprockets.mixins.correlation
into your request handler to take advantage of automatic correlation_id fetching.
Otherwise, be sure to set correlation_id as an instance variable on your request handler before sending AMQP messages.
This examples demonstrates the most basic usage of sprockets.mixins.amqp
export AMQP_URL="amqp://user:password@rabbitmq_host:5672/%2f"
python my-example-app.py
import json
from tornado import gen, web
from sprockets.mixins import amqp
def make_app(**settings):
application = web.Application(
[
web.url(r'/', RequestHandler),
], **settings)
amqp_settings = {
"reconnect_delay": 5,
}
amqp.install(application, **amqp_settings)
return application
class RequestHandler(amqp.PublishingMixin,
correlation.HandlerMixin,
web.RequestHandler):
async def get(self, *args, **kwargs):
body = {'request': self.request.path, 'args': args, 'kwargs': kwargs}
await self.amqp_publish(
'exchange',
'routing.key',
json.dumps(body),
{'content_type': 'application/json'}
)
AMQP Settings¶
url: | The AMQP URL to connect to. |
---|---|
reconnect_delay: | |
The optional time in seconds to wait before reconnecting on connection failure. | |
timeout: | The optional maximum time to wait for a bad state to resolve before treating the failure as persistent. |
connection_attempts: | |
The optional number of connection attempts to make before giving up. | |
on_ready_callback: | |
The optional callback to call when the connection to the AMQP server has been established and is ready. | |
on_unavailable_callback: | |
The optional callback to call when the connection to the AMQP server becomes unavailable. | |
on_persistent_failure_callback: | |
The optional callback to call when the connection failure does not resolve itself within the timeout. | |
on_message_returned_callback: | |
The optional callback to call when the AMQP server returns a message. | |
ioloop: | An optional IOLoop to override the default with. |
Environment Variables¶
Any environment variables set will override the corresponding AMQP settings passed into install()
AMQP_URL
AMQP_TIMEOUT
AMQP_RECONNECT_DELAY
AMQP_CONNECTION_ATTEMPTS
Source¶
sprockets.mixins.amqp
source is available on Github at https://github.com/sprockets/sprockets.mixins.amqp
Running Tests Locally¶
You’ll need to have python 3.7, Docker and Docker Compose installed.
$ python3 -m venv env
$ env/bin/activate
(env) $ pip3 install -r requires/testing.txt
(env) $ ./bootstrap.sh
(env) $ coverage run && coverage report
License¶
sprockets.mixins.amqp
is released under the 3-Clause BSD license.
Issues¶
Please report any issues to the Github project at https://github.com/sprockets/sprockets.mixins.amqp/issues
API¶
The PublishingMixin adds RabbitMQ publishing capabilities to a request handler, with methods to speed the development of publishing RabbitMQ messages.
Configured using the following environment variables:
AMQP_URL
- The AMQP URL to connect to.AMQP_TIMEOUT
- The optional maximum time to wait for a bad state to resolve before treating the failure as persistent.AMQP_RECONNECT_DELAY
- The optional time in seconds to wait before reconnecting on connection failure.AMQP_CONNECTION_ATTEMPTS
- The optional number of connection attempts to make before giving up.
The AMQP`
prefix is interchangeable with RABBITMQ
. For example, you can
use AMQP_URL
or RABBITMQ_URL
.
-
exception
sprockets.mixins.amqp.
AMQPException
(*args)¶ Base Class for the the AMQP client
-
class
sprockets.mixins.amqp.
Client
(url, enable_confirmations=True, reconnect_delay=5, connection_attempts=3, default_app_id=None, on_ready_callback=None, on_unavailable_callback=None, on_return_callback=None, io_loop=None)¶ This class encompasses all of the AMQP/RabbitMQ specific behaviors.
If RabbitMQ closes the connection, it will reopen it. You should look at the output, as there are limited reasons why the connection may be closed, which usually are tied to permission related issues or socket timeouts.
If the channel is closed, it will indicate a problem with one of the commands that were issued and that should surface in the output as well.
-
close
()¶ Cleanly shutdown the connection to RabbitMQ
Raises: sprockets.mixins.amqp.ConnectionStateError
-
connect
()¶ This method connects to RabbitMQ, returning the connection handle. When the connection is established, the on_connection_open method will be invoked by pika.
Return type: pika.TornadoConnection
-
connecting
¶ Returns
True
if the connection to RabbitMQ is open and a channel is in the process of connecting.Return type: bool
-
static
on_back_pressure_detected
(obj)¶ This method is called by pika if it believes that back pressure is being applied to the TCP socket.
Parameters: obj (unknown) – The connection where back pressure is being applied
-
on_basic_return
(_channel, method, properties, body)¶ Invoke a registered callback or log the returned message.
Parameters: - _channel (pika.channel.Channel) – The channel the message was sent on
- method (pika.spec.Basic.Return) – The method object
- properties (pika.spec.BasicProperties) – The message properties
- unicode, bytes body (str,) – The message body
-
on_channel_closed
(channel, reply_code, reply_text)¶ Invoked by pika when RabbitMQ unexpectedly closes the channel.
Channels are usually closed if you attempt to do something that violates the protocol, such as re-declare an exchange or queue with different parameters.
In this case, we just want to log the error and create a new channel after setting the state back to connecting.
Parameters:
-
on_channel_flow
(method)¶ When RabbitMQ indicates the connection is unblocked, set the state appropriately.
Parameters: method (pika.spec.Channel.Flow) – The Channel flow frame
-
on_channel_open
(channel)¶ This method is invoked by pika when the channel has been opened. The channel object is passed in so we can make use of it.
Parameters: channel (pika.channel.Channel) – The channel object
-
on_connection_blocked
(method_frame)¶ This method is called by pika if RabbitMQ sends a connection blocked method, to let us know we need to throttle our publishing.
Parameters: method_frame (pika.amqp_object.Method) – The blocked method frame
-
on_connection_closed
(connection, reply_code, reply_text)¶ This method is invoked by pika when the connection to RabbitMQ is closed unexpectedly. Since it is unexpected, we will reconnect to RabbitMQ if it disconnects.
Parameters:
-
on_connection_open
(connection)¶ This method is called by pika once the connection to RabbitMQ has been established.
-
on_connection_open_error
(connection, error)¶ Invoked if the connection to RabbitMQ can not be made.
Parameters: error (Exception) – The exception indicating failure
-
on_connection_unblocked
(method_frame)¶ When RabbitMQ indicates the connection is unblocked, set the state appropriately.
Parameters: method_frame (pika.amqp_object.Method) – Unblocked method frame
-
on_delivery_confirmation
(method_frame)¶ Invoked by pika when RabbitMQ responds to a Basic.Publish RPC command, passing in either a Basic.Ack or Basic.Nack frame with the delivery tag of the message that was published. The delivery tag is an integer counter indicating the message number that was sent on the channel via Basic.Publish. Here we’re just doing house keeping to keep track of stats and remove message numbers that we expect a delivery confirmation of from the list used to keep track of messages that are pending confirmation.
Parameters: method_frame (pika.frame.Method) – Basic.Ack or Basic.Nack frame
-
publish
(exchange, routing_key, body, properties=None)¶ Publish a message to RabbitMQ. If the RabbitMQ connection is not established or is blocked, attempt to wait until sending is possible.
Parameters: Return type: Raises: Raises: sprockets.mixins.amqp.PublishingError
-
-
exception
sprockets.mixins.amqp.
ConnectionStateError
(*args)¶ Invoked when reconnect is attempted but the state is incorrect
-
exception
sprockets.mixins.amqp.
NotReadyError
(*args)¶ Raised if the
Client.publish()
is invoked and the connection is not ready for publishing.
-
exception
sprockets.mixins.amqp.
PublishingFailure
(*args)¶ Raised if the
Client.publish()
is invoked and an error occurs or the message delivery is not confirmed.
-
class
sprockets.mixins.amqp.
PublishingMixin
¶ This mixin adds publishing messages to RabbitMQ. It uses a persistent connection and channel opened when the application start up and automatically reopened if closed by RabbitMQ
-
amqp_publish
(exchange, routing_key, body, properties=None)¶ Publish a message to RabbitMQ
Parameters: Return type: Raises: sprockets.mixins.amqp.AMQPError
Raises: Raises: sprockets.mixins.amqp.PublishingError
-
-
sprockets.mixins.amqp.
install
(application, io_loop=None, **kwargs)¶ Call this to install AMQP for the Tornado application. Additional keyword arguments are passed through to the constructor of the AMQP object.
Parameters: - application (tornado.web.Application) – The tornado application
- io_loop (tornado.ioloop.IOLoop) – The current IOLoop.
Return type:
Version History¶
3.0.0 Mar 5, 2020¶
- Drop Python 3.5 & 3.6 support
- Add Python 3.8 support
- Bump Tornado to >= 6, <7
- Pin pika to 0.13.1
- Update tests to use async def/wait
- Switch to using unittest for test runner instead of nose
2.2.0 Aug 8, 2019¶
- Fix issue opening a channel is not checking if the conn is still open
- Fix issue with publishing confirmation bookkeeping not reset when channel is reopened
- Add bootstrap and docker-compose instead of using local rabbitmq
- Update CI to run bootstrap before tests
2.1.5 July 3, 2019¶
- Remove official support for python versions less than 3.5
- Add support for tornado 5.X releases
2.1.0 May 3, 2017¶
- Fix intentional closing of an AMQP connection
- New behavior for publishing that raises exception
- Add publisher confirmations
- Make
sprockets.mixins.amqp.install()
work with sprockets.http - Add support for environment variables prefixed with
AMQP_
orRABBITMQ
- Clean up AMQP message property behavior, make defaults, but don’t change already set values
- Automatically create the default
app_id
AMQP message property - Split out tests into a mix of unit tests and integration tests
- Update state behaviors, names, and transitions
- All publishing is mandatory, returned messages are logged, a callback can be registered
2.0.0 Apr 24, 2017¶
- Move Mixin and AMQP client to separate files
- Replace AMQP connection handling code with latest internal version
- Provide ability to register callbacks for ready, unavailable, and persistent failure states
- Remove default AMQP URL from AMQP class, url is now a required parameter for install
- Rename amqp_publish ‘message’ parameter to ‘body’
- Add properties for all AMQP states
- Provide mandatory AMQP properties (app_id, correlation_id, message_id, timestamp) automatically
- Mandatory properties cannot be overridden
- Add unit test coverage for new functionality
- Test execution requires a running AMQP server