API

The PublishingMixin adds RabbitMQ publishing capabilities to a request handler and provides methods that simplify publishing RabbitMQ messages.

Configured using the following environment variables:

  • AMQP_URL - The AMQP URL to connect to.
  • AMQP_TIMEOUT - The optional maximum time to wait for a bad state to resolve before treating the failure as persistent.
  • AMQP_RECONNECT_DELAY - The optional time in seconds to wait before reconnecting on connection failure.
  • AMQP_CONNECTION_ATTEMPTS - The optional number of connection attempts to make before giving up.

The AMQP prefix is interchangeable with RABBITMQ. For example, you can use either AMQP_URL or RABBITMQ_URL.
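
As a rough illustration of the interchangeable prefixes, the lookup could be sketched as follows. The helper name env_setting and the precedence (AMQP_ checked before RABBITMQ_) are assumptions made for this example, not documented behavior of the library. The defaults of 5 and 3 mirror the Client constructor signature.

```python
import os


def env_setting(name, default=None, environ=None):
    """Return the value of AMQP_<name> or RABBITMQ_<name>, else default.

    Hypothetical helper: the precedence between the two prefixes is an
    assumption of this sketch, not documented library behavior.
    """
    environ = os.environ if environ is None else environ
    for prefix in ('AMQP', 'RABBITMQ'):
        value = environ.get('{}_{}'.format(prefix, name))
        if value is not None:
            return value
    return default


# Example lookups against an explicit mapping instead of the real environment.
settings = {'RABBITMQ_URL': 'amqp://guest:guest@localhost:5672/%2f',
            'AMQP_RECONNECT_DELAY': '5'}
url = env_setting('URL', environ=settings)
reconnect_delay = int(env_setting('RECONNECT_DELAY', 5, environ=settings))
connection_attempts = int(env_setting('CONNECTION_ATTEMPTS', 3, environ=settings))
```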

exception sprockets.mixins.amqp.AMQPException(*args)

Base class for the exceptions raised by the AMQP client.

class sprockets.mixins.amqp.Client(url, enable_confirmations=True, reconnect_delay=5, connection_attempts=3, default_app_id=None, on_ready_callback=None, on_unavailable_callback=None, on_return_callback=None, io_loop=None)

This class encompasses all of the AMQP/RabbitMQ specific behaviors.

If RabbitMQ closes the connection, the client will reopen it. Check the output when this happens: connections are closed for only a limited number of reasons, usually permission-related issues or socket timeouts.

If the channel is closed, that indicates a problem with one of the commands that was issued; the cause should surface in the output as well.

blocked

Returns True if the connection is blocked by RabbitMQ.

Return type:bool
closable

Returns True if the connection to RabbitMQ can be closed.

Return type:bool
close()

Cleanly shut down the connection to RabbitMQ.

Raises:sprockets.mixins.amqp.ConnectionStateError
closed

Returns True if the connection to RabbitMQ is closed.

Return type:bool
closing

Returns True if the connection to RabbitMQ is closing.

Return type:bool
connect()

This method connects to RabbitMQ, returning the connection handle. When the connection is established, the on_connection_open method will be invoked by pika.

Return type:pika.TornadoConnection
connecting

Returns True if the connection to RabbitMQ is open and a channel is in the process of connecting.

Return type:bool
idle

Returns True if the connection to RabbitMQ is idle.

Return type:bool
static on_back_pressure_detected(obj)

This method is called by pika if it believes that back pressure is being applied to the TCP socket.

Parameters:obj (unknown) – The connection where back pressure is being applied
on_basic_return(_channel, method, properties, body)

Invoke a registered callback or log the returned message.

Parameters:
  • _channel (pika.channel.Channel) – The channel the message was sent on
  • method (pika.spec.Basic.Return) – The method object
  • properties (pika.spec.BasicProperties) – The message properties
  • body (str, unicode, bytes) – The message body
on_channel_closed(channel, reply_code, reply_text)

Invoked by pika when RabbitMQ unexpectedly closes the channel.

Channels are usually closed if you attempt to do something that violates the protocol, such as re-declare an exchange or queue with different parameters.

In this case, we just want to log the error and create a new channel after setting the state back to connecting.

Parameters:
  • channel (pika.channel.Channel) – The closed channel
  • reply_code (int) – The numeric reason the channel was closed
  • reply_text (str) – The text reason the channel was closed
on_channel_flow(method)

When RabbitMQ sends a Channel.Flow frame, set the state appropriately.

Parameters:method (pika.spec.Channel.Flow) – The Channel flow frame
on_channel_open(channel)

This method is invoked by pika when the channel has been opened. The channel object is passed in so we can make use of it.

Parameters:channel (pika.channel.Channel) – The channel object
on_connection_blocked(method_frame)

This method is called by pika if RabbitMQ sends a connection blocked method, to let us know we need to throttle our publishing.

Parameters:method_frame (pika.amqp_object.Method) – The blocked method frame
on_connection_closed(connection, reply_code, reply_text)

This method is invoked by pika when the connection to RabbitMQ is closed unexpectedly. Since it is unexpected, we will reconnect to RabbitMQ if it disconnects.

Parameters:
  • connection (pika.TornadoConnection) – Closed connection
  • reply_code (int) – The server provided reply_code if given
  • reply_text (str) – The server provided reply_text if given
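
The reconnect behavior described above can be pictured with the following sketch. It assumes the reconnect is scheduled after reconnect_delay seconds (the constructor argument of Client). ReconnectSketch and FakeLoop are hypothetical stand-ins written for this example, not the library's implementation.

```python
class ReconnectSketch:
    """Hypothetical stand-in showing a delayed reconnect after a close."""

    def __init__(self, call_later, reconnect_delay=5):
        self.call_later = call_later  # e.g. an IOLoop's call_later method
        self.reconnect_delay = reconnect_delay
        self.state = 'ready'
        self.connect_calls = 0

    def connect(self):
        self.state = 'connecting'
        self.connect_calls += 1

    def on_connection_closed(self, connection, reply_code, reply_text):
        # An unexpected close: record it, then schedule a reconnect.
        self.state = 'closed'
        self.call_later(self.reconnect_delay, self.connect)


class FakeLoop:
    """Collects scheduled callbacks so the sketch runs without an IOLoop."""

    def __init__(self):
        self.scheduled = []

    def call_later(self, delay, callback):
        self.scheduled.append((delay, callback))

    def run_scheduled(self):
        while self.scheduled:
            _delay, callback = self.scheduled.pop(0)
            callback()


loop = FakeLoop()
client = ReconnectSketch(loop.call_later, reconnect_delay=5)
client.on_connection_closed(None, 320, 'CONNECTION_FORCED')
delays = [delay for delay, _ in loop.scheduled]
loop.run_scheduled()
```

Injecting call_later keeps the sketch independent of any particular event loop, which also makes the delay easy to check.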
on_connection_open(connection)

This method is called by pika once the connection to RabbitMQ has been established.

on_connection_open_error(connection, error)

Invoked if the connection to RabbitMQ cannot be made.

Parameters:error (Exception) – The exception indicating failure
on_connection_unblocked(method_frame)

When RabbitMQ indicates the connection is unblocked, set the state appropriately.

Parameters:method_frame (pika.amqp_object.Method) – Unblocked method frame
on_delivery_confirmation(method_frame)

Invoked by pika when RabbitMQ responds to a Basic.Publish RPC command, passing in either a Basic.Ack or Basic.Nack frame with the delivery tag of the message that was published. The delivery tag is an integer counter indicating the message number that was sent on the channel via Basic.Publish. This method only does housekeeping: it keeps track of stats and removes the confirmed message number from the list of messages that are pending confirmation.

Parameters:method_frame (pika.frame.Method) – Basic.Ack or Basic.Nack frame
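
The housekeeping described above might look roughly like the sketch below. ConfirmationTracker is a hypothetical class with a simplified signature: the real callback receives a method frame, and this sketch passes the confirmation type and delivery tag directly. It also ignores the case where a single frame confirms multiple messages.

```python
from concurrent.futures import Future


class ConfirmationTracker:
    """Hypothetical bookkeeping for publisher confirmations."""

    def __init__(self):
        self.message_number = 0
        self.pending = {}  # delivery tag -> Future awaiting confirmation

    def track_publish(self):
        """Record a newly published message and return its Future."""
        self.message_number += 1
        future = Future()
        self.pending[self.message_number] = future
        return future

    def on_delivery_confirmation(self, confirmation, delivery_tag):
        """Resolve the pending Future for delivery_tag.

        confirmation is 'ack' or 'nack'; unknown tags are ignored.
        """
        future = self.pending.pop(delivery_tag, None)
        if future is None:
            return
        if confirmation == 'ack':
            future.set_result(True)
        else:
            future.set_exception(RuntimeError('message was nacked'))


tracker = ConfirmationTracker()
first = tracker.track_publish()
second = tracker.track_publish()
tracker.on_delivery_confirmation('ack', 1)
tracker.on_delivery_confirmation('nack', 2)
```

Keying the pending Futures by delivery tag lets each confirmation be matched to its message in constant time.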
publish(exchange, routing_key, body, properties=None)

Publish a message to RabbitMQ. If the RabbitMQ connection is not established or is blocked, attempt to wait until sending is possible.

Parameters:
  • exchange (str) – The exchange to publish the message to.
  • routing_key (str) – The routing key to publish the message with.
  • body (bytes) – The message body to send.
  • properties (dict) – An optional dict of additional properties to append.
Return type:tornado.concurrent.Future
Raises:
  • sprockets.mixins.amqp.NotReadyError
  • sprockets.mixins.amqp.PublishingFailure
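
A caller that wants to tolerate a briefly unavailable connection could wrap publish() along the lines of the sketch below. To keep the example runnable without RabbitMQ, NotReadyError here is a local stand-in for sprockets.mixins.amqp.NotReadyError, and flaky_publish is a fake publisher. The attempts and delay parameters, the exchange name and the routing key are all hypothetical.

```python
import asyncio


class NotReadyError(Exception):
    """Stand-in for sprockets.mixins.amqp.NotReadyError."""


async def publish_with_retry(publish, exchange, routing_key, body,
                             properties=None, attempts=3, delay=0.01):
    """Call publish, retrying while it raises NotReadyError.

    publish is any callable with the Client.publish() argument order that
    returns an awaitable. attempts and delay are hypothetical knobs.
    """
    for attempt in range(1, attempts + 1):
        try:
            return await publish(exchange, routing_key, body, properties)
        except NotReadyError:
            if attempt == attempts:
                raise
            await asyncio.sleep(delay)


calls = []


async def flaky_publish(exchange, routing_key, body, properties=None):
    """Fake publisher that is not ready on the first call."""
    calls.append(routing_key)
    if len(calls) == 1:
        raise NotReadyError('connection not ready')
    return True


result = asyncio.run(
    publish_with_retry(flaky_publish, 'events', 'event.created', b'{}'))
```

Because the call and the await both sit inside the try block, the wrapper behaves the same whether the error is raised immediately or delivered through the returned Future.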

ready

Returns True if the connection to RabbitMQ is established and we can publish to it.

Return type:bool
state_description

Return the human understandable state description.

Return type:str
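
The boolean properties above can be read as views onto a single connection state. The following sketch illustrates that idea only: the state constants, their descriptions, and the rule that a connection is closable when it is ready or blocked are assumptions of this example, not taken from the library.

```python
class StateSketch:
    """Hypothetical model of the connection states behind the properties."""

    IDLE, CONNECTING, READY, BLOCKED, CLOSING, CLOSED = range(6)
    DESCRIPTIONS = {
        IDLE: 'Idle', CONNECTING: 'Connecting', READY: 'Ready',
        BLOCKED: 'Blocked', CLOSING: 'Closing', CLOSED: 'Closed',
    }

    def __init__(self):
        self.state = self.IDLE

    @property
    def idle(self):
        return self.state == self.IDLE

    @property
    def connecting(self):
        return self.state == self.CONNECTING

    @property
    def ready(self):
        return self.state == self.READY

    @property
    def blocked(self):
        return self.state == self.BLOCKED

    @property
    def closing(self):
        return self.state == self.CLOSING

    @property
    def closed(self):
        return self.state == self.CLOSED

    @property
    def closable(self):
        # Assumption: only an open connection (ready or blocked) can close.
        return self.state in (self.READY, self.BLOCKED)

    @property
    def state_description(self):
        return self.DESCRIPTIONS[self.state]


sketch = StateSketch()
initially_idle = sketch.idle
sketch.state = StateSketch.BLOCKED
```

With a single state attribute, at most one of idle, connecting, ready, blocked, closing and closed is True at any time.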
exception sprockets.mixins.amqp.ConnectionStateError(*args)

Raised when reconnect is attempted but the state is incorrect.

exception sprockets.mixins.amqp.NotReadyError(*args)

Raised if Client.publish() is invoked and the connection is not ready for publishing.

exception sprockets.mixins.amqp.PublishingFailure(*args)

Raised if Client.publish() is invoked and an error occurs or the message delivery is not confirmed.

class sprockets.mixins.amqp.PublishingMixin

This mixin adds the ability to publish messages to RabbitMQ. It uses a persistent connection and channel that are opened when the application starts up and automatically reopened if closed by RabbitMQ.

amqp_publish(exchange, routing_key, body, properties=None)

Publish a message to RabbitMQ

Parameters:
  • exchange (str) – The exchange to publish the message to
  • routing_key (str) – The routing key to publish the message with
  • body (bytes) – The message body to send
  • properties (dict) – An optional dict of AMQP properties
Return type:tornado.concurrent.Future
Raises:
  • sprockets.mixins.amqp.AMQPException
  • sprockets.mixins.amqp.NotReadyError
  • sprockets.mixins.amqp.PublishingFailure

sprockets.mixins.amqp.install(application, io_loop=None, **kwargs)

Call this to install AMQP for the Tornado application. Additional keyword arguments are passed through to the constructor of the AMQP object.

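Parameters:
  • application (tornado.web.Application) – The Tornado application
  • io_loop (tornado.ioloop.IOLoop) – The optional IOLoop to use
Return type:bool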
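
Putting the pieces together, a handler that publishes from a request might be wired up roughly as follows. This is a sketch under several assumptions: Tornado and sprockets.mixins.amqp are installed, AMQP_URL (or RABBITMQ_URL) is set in the environment, and the handler name, route, exchange and routing key are hypothetical. The imports are done inside make_app so the module can be loaded without those packages; encode_event is a helper invented for this example.

```python
import json


def encode_event(event):
    """Serialize a dict to the bytes body expected by amqp_publish."""
    return json.dumps(event, sort_keys=True).encode('utf-8')


def make_app():
    """Build a Tornado application with AMQP publishing installed."""
    # Imported lazily so this sketch loads without Tornado installed.
    from tornado import web
    from sprockets.mixins import amqp

    class EventHandler(amqp.PublishingMixin, web.RequestHandler):
        """Hypothetical handler that publishes one message per POST."""

        async def post(self):
            body = encode_event({'path': self.request.path})
            # Exchange and routing key are placeholders for this sketch.
            await self.amqp_publish('events', 'event.created', body,
                                    {'content_type': 'application/json'})
            self.set_status(204)

    application = web.Application([(r'/events', EventHandler)])
    # Extra keyword arguments would be passed to the AMQP client.
    amqp.install(application)
    return application
```

Calling make_app() and then listening on a port starts the application. The mixin is listed before web.RequestHandler so that its methods are available on the handler.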