rabbitmq
module¶
This module contains utilities for processing RabbitMQ messages. It requires Pika to be installed.
Classes¶
-
class
flask_signalbus.rabbitmq.
Publisher
(app=None, *, url_config_key='SIGNALBUS_RABBITMQ_URL')¶ A RabbitMQ message publisher
Each instance maintains a separate RabbitMQ connection in every thread. If a connection has not been used for longer than the heartbeat interval set for the connection, it will be automatically closed. A new connection will be open when needed.
Parameters: - app – Optional Flask app object. If not provided
init_app
must be called later, providing the Flask app object. - url_config_key – Optional configuration key for the RabbitMQ’s connection URL
Example:
from flask import Flask from flask_sqlalchemy import SQLAlchemy from flask_signalbus import rabbitmq app = Flask(__name__) headers = {'header1': 'value1', 'header2': 'value2'} properties = rabbitmq.MessageProperties( delivery_mode=2, # This makes the message persistent! app_id='example-publisher', content_type='application/json', headers=headers, ) m1 = rabbitmq.Message(exchange='', routing_key='test', body='Message 1', properties=properties) m2 = rabbitmq.Message(exchange='', routing_key='test', body='Message 2', properties=properties, mandatory=True) mq = rabbitmq.Publisher(app) mq.publish_messages([m1, m2])
-
init_app
(app)¶ Bind the instance to a Flask app object.
Parameters: app – A Flask app object
-
publish_messages
(messages: Iterable[flask_signalbus.rabbitmq.publisher.Message], *, timeout: Optional[int] = None, allow_retry: bool = True)¶ Publishes messages, waiting for delivery confirmations.
This method will block until a confirmation from the RabbitMQ broker has been received for each of the messages.
Parameters: - messages – The messages to publish
- timeout – Optional timeout in seconds
- app – Optional Flask app object. If not provided
-
class
flask_signalbus.rabbitmq.
Message
¶ A
typing.NamedTuple
representing a RabbitMQ message to be sendParameters: - exchange – RabbitMQ exchange name
- routing_key – RabbitMQ routing key
- body – The message’s body
- properties – Message properties (see
pika.BasicProperties
) - mandatory – If
True
, requires the message to be added to at least one queue.
-
body
¶ Alias for field number 2
-
exchange
¶ Alias for field number 0
-
mandatory
¶ Alias for field number 4
-
properties
¶ Alias for field number 3
-
routing_key
¶ Alias for field number 1
-
class
flask_signalbus.rabbitmq.
MessageProperties
¶ Basic message properties
This is an alias for
pika.BasicProperties
.
-
class
flask_signalbus.rabbitmq.
Consumer
(app=None, *, config_prefix='SIGNALBUS_RABBITMQ', url=None, queue=None, threads=None, prefetch_size=None, prefetch_count=None)¶ A RabbitMQ message consumer
Parameters: - app – Optional Flask app object. If not provided
init_app
must be called later, providing the Flask app object. - config_prefix – A prefix for the Flask configuration settings for this consumer instance.
- url – RabbitMQ’s connection URL. If not passed, the value of
the
{config_prefix}_URL
Flask configuration setting will be used. - queue – The name of the RabbitMQ queue to consume from. If
not passed, the value of the
{config_prefix}_QUEUE
Flask configuration setting will be used. - threads – The number of worker threads in the pool. If not
passed, the value of the
{config_prefix}_THREADS
Flask configuration setting will be used (the default is 1). - prefetch_size – Specifies the prefetch window size. RabbitMQ
will send a message in advance if it is equal to or smaller in
size than the available prefetch size (and also falls into other
prefetch limits). If not passed, the value of the
{config_prefix}_PREFETCH_SIZE
Flask configuration setting will be used (the default is 0, meaning “no specific limit”). - prefetch_count – Specifies a prefetch window in terms of
whole messages. This field may be used in combination with the
prefetch_size field. A message will only be sent in advance if
both prefetch windows allow it. Setting a bigger value may give
a performance improvement. If not passed, the value of the
{config_prefix}_PREFETCH_COUNT
Flask configuration setting will be used (the default is 1).
The received messages will be processed by a pool of worker threads, created by the consumer instance, after the
start
method is called. Each consumer instance maintains a separate RabbitMQ connection. If the connection has been closed for some reason, thestart
method will throw an exception. To continue consuming, thestart
method can be called again.This class is meant to be subclassed. For example:
from flask_signalbus import rabbitmq class ExampleConsumer(rabbitmq.Consumer): def process_message(self, body, properties): if len(body) == 0: return False # Malformed (empty) message # Process the message here. return True # Successfully processed
-
init_app
(app)¶ Bind the instance to a Flask app object.
Parameters: app – A Flask app object
-
process_message
(body: bytes, properties: flask_signalbus.rabbitmq.common.MessageProperties) → bool¶ This method must be implemented by the sub-classes.
Parameters: - body – message body
- properties – message properties
The method should return
True
if the message has been successfully processed, and can be removed from the queue. IfFalse
is returned, this means that the message is malformed, and can not be processed. (Usually, malformed messages will be send to a “dead letter queue”.)
-
start
()¶ Opens a RabbitMQ connection and starts processing messages until one of the following things happen:
- The connection has been lost.
- An error has occurred during message processing.
- The
stop
method has been called on the consumer instance.
This method blocks and never returns normally. If one of the previous things happen an
TerminatedConsumtion
exception will be raised. Also, this method may raisepika.exceptions.AMQPError
when, for some reason, a proper RabbitMQ connection can not be established.
-
stop
(signum=None, frame=None)¶ Orders the consumer to stop.
This is useful for properly handling process termination. For example:
import signal consumer = Consumer(...) # creates the instance signal.signal(signal.SIGINT, consumer.stop) signal.signal(signal.SIGTERM, consumer.stop) consumer.start()
- app – Optional Flask app object. If not provided
Exceptions¶
-
class
flask_signalbus.rabbitmq.
DeliveryError
¶ Bases:
Exception
A failed attempt to deliver messages.
-
class
flask_signalbus.rabbitmq.
ConnectionError
¶ Bases:
flask_signalbus.rabbitmq.publisher.DeliveryError
Can not connect to the server.
-
class
flask_signalbus.rabbitmq.
TimeoutError
¶ Bases:
flask_signalbus.rabbitmq.publisher.DeliveryError
The attempt to deliver messages has timed out.