rabbitmq module

This module contains utilities for processing RabbitMQ messages. It requires Pika to be installed.

Classes

class flask_signalbus.rabbitmq.Publisher(app=None, *, url_config_key='SIGNALBUS_RABBITMQ_URL')

A RabbitMQ message publisher

Each instance maintains a separate RabbitMQ connection in every thread. If a connection has not been used for longer than the heartbeat interval set for the connection, it will be automatically closed. A new connection will be open when needed.

Parameters:
  • app – Optional Flask app object. If not provided init_app must be called later, providing the Flask app object.
  • url_config_key – Optional configuration key for the RabbitMQ’s connection URL

Example:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_signalbus import rabbitmq

app = Flask(__name__)

headers = {'header1': 'value1', 'header2': 'value2'}
properties = rabbitmq.MessageProperties(
    delivery_mode=2,  # This makes the message persistent!
    app_id='example-publisher',
    content_type='application/json',
    headers=headers,
)
m1 = rabbitmq.Message(exchange='', routing_key='test',
                      body='Message 1', properties=properties)
m2 = rabbitmq.Message(exchange='', routing_key='test',
                      body='Message 2', properties=properties,
                      mandatory=True)

mq = rabbitmq.Publisher(app)
mq.publish_messages([m1, m2])
init_app(app)

Bind the instance to a Flask app object.

Parameters:app – A Flask app object
publish_messages(messages: Iterable[flask_signalbus.rabbitmq.publisher.Message], *, timeout: Optional[int] = None, allow_retry: bool = True)

Publishes messages, waiting for delivery confirmations.

This method will block until a confirmation from the RabbitMQ broker has been received for each of the messages.

Parameters:
  • messages – The messages to publish
  • timeout – Optional timeout in seconds
class flask_signalbus.rabbitmq.Message

A typing.NamedTuple representing a RabbitMQ message to be send

Parameters:
  • exchange – RabbitMQ exchange name
  • routing_key – RabbitMQ routing key
  • body – The message’s body
  • properties – Message properties (see pika.BasicProperties)
  • mandatory – If True, requires the message to be added to at least one queue.
body

Alias for field number 2

exchange

Alias for field number 0

mandatory

Alias for field number 4

properties

Alias for field number 3

routing_key

Alias for field number 1

class flask_signalbus.rabbitmq.MessageProperties

Basic message properties

This is an alias for pika.BasicProperties.

class flask_signalbus.rabbitmq.Consumer(app=None, *, config_prefix='SIGNALBUS_RABBITMQ', url=None, queue=None, threads=None, prefetch_size=None, prefetch_count=None)

A RabbitMQ message consumer

Parameters:
  • app – Optional Flask app object. If not provided init_app must be called later, providing the Flask app object.
  • config_prefix – A prefix for the Flask configuration settings for this consumer instance.
  • url – RabbitMQ’s connection URL. If not passed, the value of the {config_prefix}_URL Flask configuration setting will be used.
  • queue – The name of the RabbitMQ queue to consume from. If not passed, the value of the {config_prefix}_QUEUE Flask configuration setting will be used.
  • threads – The number of worker threads in the pool. If not passed, the value of the {config_prefix}_THREADS Flask configuration setting will be used (the default is 1).
  • prefetch_size – Specifies the prefetch window size. RabbitMQ will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls into other prefetch limits). If not passed, the value of the {config_prefix}_PREFETCH_SIZE Flask configuration setting will be used (the default is 0, meaning “no specific limit”).
  • prefetch_count – Specifies a prefetch window in terms of whole messages. This field may be used in combination with the prefetch_size field. A message will only be sent in advance if both prefetch windows allow it. Setting a bigger value may give a performance improvement. If not passed, the value of the {config_prefix}_PREFETCH_COUNT Flask configuration setting will be used (the default is 1).

The received messages will be processed by a pool of worker threads, created by the consumer instance, after the start method is called. Each consumer instance maintains a separate RabbitMQ connection. If the connection has been closed for some reason, the start method will throw an exception. To continue consuming, the start method can be called again.

This class is meant to be subclassed. For example:

from flask_signalbus import rabbitmq

class ExampleConsumer(rabbitmq.Consumer):
    def process_message(self, body, properties):
        if len(body) == 0:
            return False  # Malformed (empty) message

        # Process the  message here.

        return True  # Successfully processed
init_app(app)

Bind the instance to a Flask app object.

Parameters:app – A Flask app object
process_message(body: bytes, properties: flask_signalbus.rabbitmq.common.MessageProperties) → bool

This method must be implemented by the sub-classes.

Parameters:
  • body – message body
  • properties – message properties

The method should return True if the message has been successfully processed, and can be removed from the queue. If False is returned, this means that the message is malformed, and can not be processed. (Usually, malformed messages will be send to a “dead letter queue”.)

start()

Opens a RabbitMQ connection and starts processing messages until one of the following things happen:

  • The connection has been lost.
  • An error has occurred during message processing.
  • The stop method has been called on the consumer instance.

This method blocks and never returns normally. If one of the previous things happen an TerminatedConsumtion exception will be raised. Also, this method may raise pika.exceptions.AMQPError when, for some reason, a proper RabbitMQ connection can not be established.

stop(signum=None, frame=None)

Orders the consumer to stop.

This is useful for properly handling process termination. For example:

import signal

consumer = Consumer(...)  # creates the instance

signal.signal(signal.SIGINT, consumer.stop)
signal.signal(signal.SIGTERM, consumer.stop)
consumer.start()

Exceptions

class flask_signalbus.rabbitmq.DeliveryError

Bases: Exception

A failed attempt to deliver messages.

class flask_signalbus.rabbitmq.ConnectionError

Bases: flask_signalbus.rabbitmq.publisher.DeliveryError

Can not connect to the server.

class flask_signalbus.rabbitmq.TimeoutError

Bases: flask_signalbus.rabbitmq.publisher.DeliveryError

The attempt to deliver messages has timed out.

class flask_signalbus.rabbitmq.TerminatedConsumtion

Bases: Exception

The consumption has been terminated.