sprockets.mixins.amqp

AMQP Publishing Mixin for Tornado RequestHandlers.

Version Downloads Build Status CodeCov Docs

Installation

sprockets.mixins.amqp is available on the Python Package Index and can be installed via pip:

pip3 install sprockets.mixins.amqp

Python Compatibility

  • Python 3.7
  • Python 3.8

Requirements

  • Pika==0.13.1
  • Tornado>=6,<7

Example

You may optionally install sprockets.mixins.correlation into your request handler to take advantage of automatic correlation_id fetching. Otherwise, be sure to set correlation_id as an instance variable on your request handler before sending AMQP messages.

This examples demonstrates the most basic usage of sprockets.mixins.amqp

export AMQP_URL="amqp://user:password@rabbitmq_host:5672/%2f"
python my-example-app.py
import json

from tornado import gen, web
from sprockets.mixins import amqp

def make_app(**settings):
    application = web.Application(
        [
            web.url(r'/', RequestHandler),
        ], **settings)

    amqp_settings = {
        "reconnect_delay": 5,
    }

    amqp.install(application, **amqp_settings)
    return application


class RequestHandler(amqp.PublishingMixin,
                     correlation.HandlerMixin,
                     web.RequestHandler):

    async def get(self, *args, **kwargs):
        body = {'request': self.request.path, 'args': args, 'kwargs': kwargs}
        await self.amqp_publish(
            'exchange',
            'routing.key',
            json.dumps(body),
            {'content_type': 'application/json'}
        )

AMQP Settings

url:The AMQP URL to connect to.
reconnect_delay:
 The optional time in seconds to wait before reconnecting on connection failure.
timeout:The optional maximum time to wait for a bad state to resolve before treating the failure as persistent.
connection_attempts:
 The optional number of connection attempts to make before giving up.
on_ready_callback:
 The optional callback to call when the connection to the AMQP server has been established and is ready.
on_unavailable_callback:
 The optional callback to call when the connection to the AMQP server becomes unavailable.
on_persistent_failure_callback:
 The optional callback to call when the connection failure does not resolve itself within the timeout.
on_message_returned_callback:
 The optional callback to call when the AMQP server returns a message.
ioloop:An optional IOLoop to override the default with.

Environment Variables

Any environment variables set will override the corresponding AMQP settings passed into install()

  • AMQP_URL
  • AMQP_TIMEOUT
  • AMQP_RECONNECT_DELAY
  • AMQP_CONNECTION_ATTEMPTS

Source

sprockets.mixins.amqp source is available on Github at https://github.com/sprockets/sprockets.mixins.amqp

Running Tests Locally

You’ll need to have python 3.7, Docker and Docker Compose installed.

$ python3 -m venv env
$ env/bin/activate
(env) $ pip3 install -r requires/testing.txt
(env) $ ./bootstrap.sh
(env) $ coverage run && coverage report

License

sprockets.mixins.amqp is released under the 3-Clause BSD license.

Issues

Please report any issues to the Github project at https://github.com/sprockets/sprockets.mixins.amqp/issues

API

The PublishingMixin adds RabbitMQ publishing capabilities to a request handler, with methods to speed the development of publishing RabbitMQ messages.

Configured using the following environment variables:

  • AMQP_URL - The AMQP URL to connect to.
  • AMQP_TIMEOUT - The optional maximum time to wait for a bad state to resolve before treating the failure as persistent.
  • AMQP_RECONNECT_DELAY - The optional time in seconds to wait before reconnecting on connection failure.
  • AMQP_CONNECTION_ATTEMPTS - The optional number of connection attempts to make before giving up.

The AMQP` prefix is interchangeable with RABBITMQ. For example, you can use AMQP_URL or RABBITMQ_URL.

exception sprockets.mixins.amqp.AMQPException(*args)

Base Class for the the AMQP client

class sprockets.mixins.amqp.Client(url, enable_confirmations=True, reconnect_delay=5, connection_attempts=3, default_app_id=None, on_ready_callback=None, on_unavailable_callback=None, on_return_callback=None, io_loop=None)

This class encompasses all of the AMQP/RabbitMQ specific behaviors.

If RabbitMQ closes the connection, it will reopen it. You should look at the output, as there are limited reasons why the connection may be closed, which usually are tied to permission related issues or socket timeouts.

If the channel is closed, it will indicate a problem with one of the commands that were issued and that should surface in the output as well.

blocked

Returns True if the connection is blocked by RabbitMQ.

Return type:bool
closable

Returns True if the connection to RabbitMQ can be closed

Return type:bool
close()

Cleanly shutdown the connection to RabbitMQ

Raises:sprockets.mixins.amqp.ConnectionStateError
closed

Returns True if the connection to RabbitMQ is closed.

Return type:bool
closing

Returns True if the connection to RabbitMQ is closing.

Return type:bool
connect()

This method connects to RabbitMQ, returning the connection handle. When the connection is established, the on_connection_open method will be invoked by pika.

Return type:pika.TornadoConnection
connecting

Returns True if the connection to RabbitMQ is open and a channel is in the process of connecting.

Return type:bool
idle

Returns True if the connection to RabbitMQ is closing.

Return type:bool
static on_back_pressure_detected(obj)

This method is called by pika if it believes that back pressure is being applied to the TCP socket.

Parameters:obj (unknown) – The connection where back pressure is being applied
on_basic_return(_channel, method, properties, body)

Invoke a registered callback or log the returned message.

Parameters:
  • _channel (pika.channel.Channel) – The channel the message was sent on
  • method (pika.spec.Basic.Return) – The method object
  • properties (pika.spec.BasicProperties) – The message properties
  • unicode, bytes body (str,) – The message body
on_channel_closed(channel, reply_code, reply_text)

Invoked by pika when RabbitMQ unexpectedly closes the channel.

Channels are usually closed if you attempt to do something that violates the protocol, such as re-declare an exchange or queue with different parameters.

In this case, we just want to log the error and create a new channel after setting the state back to connecting.

Parameters:
  • channel (pika.channel.Channel) – The closed channel
  • reply_code (int) – The numeric reason the channel was closed
  • reply_text (str) – The text reason the channel was closed
on_channel_flow(method)

When RabbitMQ indicates the connection is unblocked, set the state appropriately.

Parameters:method (pika.spec.Channel.Flow) – The Channel flow frame
on_channel_open(channel)

This method is invoked by pika when the channel has been opened. The channel object is passed in so we can make use of it.

Parameters:channel (pika.channel.Channel) – The channel object
on_connection_blocked(method_frame)

This method is called by pika if RabbitMQ sends a connection blocked method, to let us know we need to throttle our publishing.

Parameters:method_frame (pika.amqp_object.Method) – The blocked method frame
on_connection_closed(connection, reply_code, reply_text)

This method is invoked by pika when the connection to RabbitMQ is closed unexpectedly. Since it is unexpected, we will reconnect to RabbitMQ if it disconnects.

Parameters:
  • connection (pika.TornadoConnection) – Closed connection
  • reply_code (int) – The server provided reply_code if given
  • reply_text (str) – The server provided reply_text if given
on_connection_open(connection)

This method is called by pika once the connection to RabbitMQ has been established.

on_connection_open_error(connection, error)

Invoked if the connection to RabbitMQ can not be made.

Parameters:error (Exception) – The exception indicating failure
on_connection_unblocked(method_frame)

When RabbitMQ indicates the connection is unblocked, set the state appropriately.

Parameters:method_frame (pika.amqp_object.Method) – Unblocked method frame
on_delivery_confirmation(method_frame)

Invoked by pika when RabbitMQ responds to a Basic.Publish RPC command, passing in either a Basic.Ack or Basic.Nack frame with the delivery tag of the message that was published. The delivery tag is an integer counter indicating the message number that was sent on the channel via Basic.Publish. Here we’re just doing house keeping to keep track of stats and remove message numbers that we expect a delivery confirmation of from the list used to keep track of messages that are pending confirmation.

Parameters:method_frame (pika.frame.Method) – Basic.Ack or Basic.Nack frame
publish(exchange, routing_key, body, properties=None)

Publish a message to RabbitMQ. If the RabbitMQ connection is not established or is blocked, attempt to wait until sending is possible.

Parameters:
  • exchange (str) – The exchange to publish the message to.
  • routing_key (str) – The routing key to publish the message with.
  • body (bytes) – The message body to send.
  • properties (dict) – An optional dict of additional properties to append.
Return type:

tornado.concurrent.Future

Raises:

sprockets.mixins.amqp.NotReadyError

Raises:

sprockets.mixins.amqp.PublishingError

ready

Returns True if the connection to RabbitMQ is established and we can publish to it.

Return type:bool
state_description

Return the human understandable state description.

Return type:str
exception sprockets.mixins.amqp.ConnectionStateError(*args)

Invoked when reconnect is attempted but the state is incorrect

exception sprockets.mixins.amqp.NotReadyError(*args)

Raised if the Client.publish() is invoked and the connection is not ready for publishing.

exception sprockets.mixins.amqp.PublishingFailure(*args)

Raised if the Client.publish() is invoked and an error occurs or the message delivery is not confirmed.

class sprockets.mixins.amqp.PublishingMixin

This mixin adds publishing messages to RabbitMQ. It uses a persistent connection and channel opened when the application start up and automatically reopened if closed by RabbitMQ

amqp_publish(exchange, routing_key, body, properties=None)

Publish a message to RabbitMQ

Parameters:
  • exchange (str) – The exchange to publish the message to
  • routing_key (str) – The routing key to publish the message with
  • body (bytes) – The message body to send
  • properties (dict) – An optional dict of AMQP properties
Return type:

tornado.concurrent.Future

Raises:

sprockets.mixins.amqp.AMQPError

Raises:

sprockets.mixins.amqp.NotReadyError

Raises:

sprockets.mixins.amqp.PublishingError

sprockets.mixins.amqp.install(application, io_loop=None, **kwargs)

Call this to install AMQP for the Tornado application. Additional keyword arguments are passed through to the constructor of the AMQP object.

Parameters:
Return type:

bool

Version History

3.0.1 July 10, 2020

  • Exclude tests from being installed with package

3.0.0 Mar 5, 2020

  • Drop Python 3.5 & 3.6 support
  • Add Python 3.8 support
  • Bump Tornado to >= 6, <7
  • Pin pika to 0.13.1
  • Update tests to use async def/wait
  • Switch to using unittest for test runner instead of nose

2.2.0 Aug 8, 2019

  • Fix issue opening a channel is not checking if the conn is still open
  • Fix issue with publishing confirmation bookkeeping not reset when channel is reopened
  • Add bootstrap and docker-compose instead of using local rabbitmq
  • Update CI to run bootstrap before tests

2.1.5 July 3, 2019

  • Remove official support for python versions less than 3.5
  • Add support for tornado 5.X releases

2.1.4 Jan 24, 2019

  • Pin pika to 0.12.0, 0.11.0 has issues with Python 3.7

2.1.3 Jan 23, 2019

  • Pin pika to 0.11.0

2.1.2 May 3, 2017

  • Move setting default properties up one level to allow access when not ready

2.1.1 May 3, 2017

  • Fix overlogging

2.1.0 May 3, 2017

  • Fix intentional closing of an AMQP connection
  • New behavior for publishing that raises exception
  • Add publisher confirmations
  • Make sprockets.mixins.amqp.install() work with sprockets.http
  • Add support for environment variables prefixed with AMQP_ or RABBITMQ
  • Clean up AMQP message property behavior, make defaults, but don’t change already set values
  • Automatically create the default app_id AMQP message property
  • Split out tests into a mix of unit tests and integration tests
  • Update state behaviors, names, and transitions
  • All publishing is mandatory, returned messages are logged, a callback can be registered

2.0.0 Apr 24, 2017

  • Move Mixin and AMQP client to separate files
  • Replace AMQP connection handling code with latest internal version
  • Provide ability to register callbacks for ready, unavailable, and persistent failure states
  • Remove default AMQP URL from AMQP class, url is now a required parameter for install
  • Rename amqp_publish ‘message’ parameter to ‘body’
  • Add properties for all AMQP states
  • Provide mandatory AMQP properties (app_id, correlation_id, message_id, timestamp) automatically
    • Mandatory properties cannot be overridden
  • Add unit test coverage for new functionality
    • Test execution requires a running AMQP server

1.0.1 Feb 28, 2016

  • Fixed documentation links and generation.

1.0.0 Mar 15, 2016

  • Connect to AMQP in sprockets.mixins.amqp.install and maintain and persist connection
  • Change to use tornado locks.Condition vs locks.Event

0.1.4 Mar 09, 2016

  • Reconnect in connection close callback

0.1.3 Sept 28, 2015

  • Use packages instead of py_modules

0.1.2 Sept 25, 2015

  • Don’t log the message body

0.1.1 Sept 24, 2015

  • Clean up installation and testing environment

0.1.0 Sept 23, 2015

  • Initial implementation