Welcome to Flask-SignalBus’s documentation!
Release v0.5.20. (Command Line Interface, API Reference, Source Code)
Flask-SignalBus adds to Flask-SQLAlchemy the capability to atomically send messages (signals) over a message bus.
Important note: Flask-SignalBus works with Flask-SQLAlchemy 2.5, but does not work with Flask-SQLAlchemy 3.0 or later. For this reason, SQLAlchemy 2.0 or later is not supported.
The Problem
In microservices, the temptation to do distributed transactions pops up all the time.
- Distributed transaction: any situation where a single event results in the mutation of two separate sources of data which cannot be committed atomically.
One practical and popular solution is to pick one of the services to be the primary handler for some particular event. This service will handle the original event with a single commit, and then take responsibility for asynchronously communicating the secondary effects to other services via a message bus of some sort (RabbitMQ, Kafka, etc.).
Thus, the processing of each “distributed” event involves three steps:
- As part of the original event transaction, one or more messages are recorded in the SQL database of the primary handler service (as rows in tables).
- The messages are sent over the message bus.
- The messages’ corresponding table rows are deleted.
Flask-SignalBus automates this process and makes it less error-prone. It can automatically send the recorded messages after each transaction commit (steps 2 and 3). The sending of the recorded messages can also be triggered explicitly with a method call, or through the Flask Command Line Interface.
Usage
Each type of message (signal) that we plan to send over the message bus should have its own database model class defined. For example:
```python
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_signalbus import SignalBus

app = Flask(__name__)
db = SQLAlchemy(app)
signalbus = SignalBus(db)


class MySignal(db.Model):
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    message_text = db.Column(db.Text, nullable=False)

    def send_signalbus_message(self):
        # Write some code here, that sends
        # the message over the message bus!
        raise NotImplementedError
```
Here, MySignal represents one particular type of message that we will be sending over the message bus.
Auto-flushing
Each time we add a new object of type MySignal to db.session, Flask-SignalBus will take note of that, and finally, when the database transaction is committed, it will call the MySignal.send_signalbus_message method and delete the corresponding row from the database table. All this happens automatically, so the only thing we need to do as a part of the database transaction is to add our message to db.session:
```python
# =========== Our transaction begins here. ===========

# We may insert/delete/update some database rows here!!!

# Here we add our message to the database session:
db.session.add(MySignal(message_text='Message in a Bottle'))

# We may insert/delete/update some database rows here too!!!

db.session.commit()

# Our transaction is committed. The message has been sent
# over the message bus. The corresponding row in the
# database table has been deleted. Auto-magically!
```
Within one database transaction we can add many messages (signals) of many different types. As long as they have a send_signalbus_message method defined, they will all be processed and sent automatically (flushed).
This auto-flushing behavior can be disabled if it is not desired. In this case, the sending of the recorded messages needs to be triggered explicitly.
Pending Signals
When auto-flushing is disabled, or when the program stops before a message has been sent over the message bus, the row representing the message remains in the database for some time. We call this a pending signal.
To make sure that pending signals are processed in time, even when the application that generated them is offline, it is recommended that pending signals be flushed periodically, independently of the application that generates them. This can be done in a cron job, for example (see Command Line Interface).
Application Factory Pattern
If you want to use the Flask application factory pattern with Flask-SignalBus, you should subclass the SQLAlchemy class, adding the SignalBusMixin mixin to it. For example:
```python
from flask_sqlalchemy import SQLAlchemy
from flask_signalbus import SignalBusMixin


class CustomSQLAlchemy(SignalBusMixin, SQLAlchemy):
    pass


db = CustomSQLAlchemy()
```
Note that SignalBusMixin should always come before SQLAlchemy in the list of base classes.
Message Ordering, Message Duplication
Normally, Flask-SignalBus does not give guarantees about the order in which the messages are sent over the message bus. Also, sometimes a single message can be sent more than once. Keep that in mind while designing your system.
When you want to guarantee that messages are sent in a particular order, you should disable auto-flushing, define a signalbus_order_by attribute on the model class, and always use the flushordered CLI command to flush the messages explicitly. (Messages can still be sent more than once, though.)
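Because messages may arrive out of order and more than once, it helps to design receivers so that reordering and re-delivery are harmless. Below is a minimal sketch of one common technique, under assumptions that are not part of Flask-SignalBus: each message carries the full new state of an entity (not a delta) together with a per-entity sequence number, and the receiver ignores any message whose sequence number is not greater than the last one it applied. `BalanceUpdate` and `BalanceView` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass(frozen=True)
class BalanceUpdate:
    account_id: int
    seqnum: int   # hypothetical: increases with every change of this account
    balance: int  # the full new state, not a delta


@dataclass
class BalanceView:
    applied_seqnum: Dict[int, int] = field(default_factory=dict)
    balances: Dict[int, int] = field(default_factory=dict)

    def apply(self, update: BalanceUpdate) -> bool:
        """Apply the update unless it is a duplicate or older than the current state."""
        if update.seqnum <= self.applied_seqnum.get(update.account_id, -1):
            return False  # duplicate or stale (out-of-order) message: ignore it
        self.applied_seqnum[update.account_id] = update.seqnum
        self.balances[update.account_id] = update.balance
        return True


view = BalanceView()
# The messages arrive out of order, and one of them arrives twice.
arrivals = [BalanceUpdate(1, 2, 150), BalanceUpdate(1, 1, 120), BalanceUpdate(1, 2, 150)]
results = [view.apply(u) for u in arrivals]
```

Carrying full state rather than deltas is what makes dropping a stale message safe here; with deltas, the receiver would instead need to buffer and reorder them.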
Transaction Management Utilities
As a bonus, Flask-SignalBus offers some utilities for transaction management. See AtomicProceduresMixin for details.
Command Line Interface
Flask-SignalBus will register a group of Flask CLI commands, starting with the prefix signalbus. To see all available commands, use:
$ flask signalbus --help
To flush pending signals which have failed to auto-flush, use:
$ flask signalbus flush
To send a potentially huge number of pending signals, use:
$ flask signalbus flushmany
To send all pending signals in predictable order, use:
$ flask signalbus flushordered
For each of these commands, you can specify the exact type of signals on which to operate.
API Reference
Signal Model
A signal model is an otherwise normal database model class (a subclass of db.Model), which however has a send_signalbus_message method defined. For example:
```python
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
db = SQLAlchemy(app)


class MySignal(db.Model):
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    message_text = db.Column(db.Text, nullable=False)
    signalbus_autoflush = False
    signalbus_order_by = (id, db.desc(message_text))

    def send_signalbus_message(self):
        # Send the message to the message bus.
        print(MySignal.__marshmallow_schema__.dumps(self))
```
The send_signalbus_message method should be implemented in such a way that when it returns, the message is guaranteed to be successfully sent and stored by the broker. Normally, this means that an acknowledgement has been received for the message from the broker.

The signal model class may have a send_signalbus_messages class method which accepts one positional argument: an iterable of instances of the class. The method should be implemented in such a way that when it returns, all messages for the passed instances are guaranteed to be successfully sent and stored by the broker. Implementing a send_signalbus_messages class method can greatly improve performance, because message brokers are usually optimized to process messages in batches much more efficiently.

The signal model class may have a signalbus_burst_count integer attribute defined, which determines how many individual signals can be sent and deleted at once, as a part of one database transaction. This can greatly improve performance in some cases when auto-flushing is disabled, especially when the send_signalbus_messages class method is implemented efficiently. If not defined, it defaults to 1.

The signal model class may have a signalbus_autoflush boolean attribute defined, which determines if signals of that type will be automatically sent over the message bus after each transaction commit. If not defined, it defaults to True.

The signal model class may have a signalbus_order_by tuple attribute defined, which determines the order in which signals will be sent over the network by the flushordered CLI command. If not defined, signals will not be ordered.

Flask-SignalBus will automatically (after sqlalchemy.orm.configure_mappers() is invoked) add the following attributes to each signal model class. These are useful when serializing instances, before sending them over the network:

- __marshmallow__: An auto-generated Marshmallow schema class for serializing and deserializing instances of the model class. It is a subclass of marshmallow.Schema. This attribute will be automatically added to all model classes (signal or non-signal). If the __marshmallow__ attribute happens to be defined in the model class, it will not be overridden.
- __marshmallow_schema__: An instance of the class referred to by the __marshmallow__ attribute.
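To show how signalbus_burst_count and a batch-sending class method fit together, here is a standard-library sketch that splits pending signals into bursts and sends each burst with one batch call. The `bursts` helper and the `FakeBroker` class are hypothetical; Flask-SignalBus's own flushing code additionally selects, locks, and deletes the database rows inside a transaction, which this sketch only marks with a comment.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def bursts(items: Iterable[T], burst_count: int) -> Iterator[List[T]]:
    """Yield consecutive chunks of at most `burst_count` items."""
    if burst_count < 1:
        raise ValueError("burst_count must be at least 1")
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, burst_count))
        if not chunk:
            return
        yield chunk


class FakeBroker:
    """Hypothetical broker client that records each batch it receives."""

    def __init__(self) -> None:
        self.batches: List[List[str]] = []

    def publish_batch(self, messages: List[str]) -> None:
        # A real client would block here until the broker confirms the whole batch.
        self.batches.append(list(messages))


broker = FakeBroker()
pending = [f"signal {i}" for i in range(7)]
for burst in bursts(pending, burst_count=3):
    broker.publish_batch(burst)
    # ...the rows for `burst` would be deleted here, in the same database transaction.
```

With the default burst count of 1, every signal costs one broker round trip and one database transaction; larger bursts spread both costs over many signals.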
Classes
class flask_signalbus.SignalBus(db, init_app=True)

Instances of this class automatically send signal messages that have been recorded in the SQL database, over a message bus. Normally, the sending of the recorded messages (if there are any) is done after each transaction commit, but it can also be triggered explicitly by a command.

Parameters: db – The flask_sqlalchemy.SQLAlchemy instance

For example:
```python
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_signalbus import SignalBus

app = Flask(__name__)
db = SQLAlchemy(app)
signalbus = SignalBus(db)
signalbus.flush()
```
autoflush

Setting this property to False instructs the SignalBus instance not to automatically flush pending signals after each transaction commit. Setting it back to True restores the default behavior.
flush(models=None, wait=3.0)

Send pending signals over the message bus.

This method assumes that auto-flushing is enabled for the given signal types, and therefore the number of pending signals is not too big. Having multiple processes that run this method in parallel is generally not a good idea.

Parameters:
- models (list(Signal Model) or None) – If passed, flushes only signals of the specified types.
- wait (float) – The number of seconds the method will wait after obtaining the list of pending signals, to allow auto-flushing senders to complete.

Returns: The total number of signals that have been sent
flushmany(models=None)

Send a potentially huge number of pending signals over the message bus.

This method assumes that the number of pending signals might be huge. Using SignalBus.flushmany when auto-flushing is enabled for the given signal types is not recommended, because it may result in multiple delivery of messages.

SignalBus.flushmany can be very useful when recovering from long periods of disconnectedness from the message bus, or when auto-flushing is disabled. If your database (and its SQLAlchemy dialect) supports FOR UPDATE SKIP LOCKED, multiple processes will be able to run this method in parallel, without stepping on each other’s toes.

Parameters: models (list(Signal Model) or None) – If passed, flushes only signals of the specified types.

Returns: The total number of signals that have been sent
flushordered(models=None)

Send all pending messages in predictable order.

The order is defined by the signalbus_order_by attribute of the model class. When auto-flushing is disabled for the given signal types, this method guarantees that messages will be sent in the correct order. Having multiple processes that run this method in parallel is generally not a good idea.

Parameters: models (list(Signal Model) or None) – If passed, flushes only signals of the specified types.

Returns: The total number of signals that have been sent
get_signal_models()

Return all signal types in a list.

Return type: list(Signal Model)
Mixins
class flask_signalbus.SignalBusMixin(*args, **kwargs)

A mixin class that can be used to extend SQLAlchemy to handle signals.

For example:
```python
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_signalbus import SignalBusMixin


class CustomSQLAlchemy(SignalBusMixin, SQLAlchemy):
    pass


app = Flask(__name__)
db = CustomSQLAlchemy(app)
db.signalbus.flush()
```
init_app(app, *args, **kwargs)

Bind the instance to a Flask app object.

Parameters: app – A Flask app object
class
flask_signalbus.
AtomicProceduresMixin
¶ A mixin class that adds utility functions to
flask_sqlalchemy.SQLAlchemy
and the declarative base.For example:
```python
from flask_sqlalchemy import SQLAlchemy
from flask_signalbus import AtomicProceduresMixin


class CustomSQLAlchemy(AtomicProceduresMixin, SQLAlchemy):
    pass


db = CustomSQLAlchemy()

# Now `AtomicProceduresMixin` methods are available in `db`.
```
Note that when subclassing, AtomicProceduresMixin should always come before flask_sqlalchemy.SQLAlchemy. Adding AtomicProceduresMixin has several useful results:

- AtomicProceduresMixin methods will be available in db.
- The classmethods from _ModelUtilitiesMixin will be available in the declarative base class (db.Model), and therefore in every model class.
- Database isolation level will be set to REPEATABLE_READ.
atomic(func)

A decorator that wraps a function in an atomic block.

Example:
```python
db = CustomSQLAlchemy()


@db.atomic
def f():
    # `write_to_db` stands for any code that modifies the database.
    write_to_db('a message')
    return 'OK'


assert f() == 'OK'
```
This code defines the function f, which is wrapped in an atomic block. Wrapping a function in an atomic block gives several guarantees:

- The database transaction will be automatically committed if the function returns normally, and automatically rolled back if the function raises an unhandled exception.
- When the transaction is committed, all objects in db.session will be expunged. This means that no lazy loading will be performed on them.
- If a transaction serialization error occurs during the execution of the function, the function will be re-executed. (It might be re-executed several times.)

Atomic blocks can be nested, but in this case the outermost block takes full control of the transaction’s life-cycle, and inner blocks do nothing.
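The control flow behind these guarantees can be approximated in plain Python: commit on normal return, roll back on an exception, retry on serialization errors, and let only the outermost block manage the transaction. This is a hypothetical sketch, not AtomicProceduresMixin's code; `FakeSession`, the locally defined `DBSerializationError`, and the `max_retries` limit are stand-ins, and expunging objects after commit is not modeled.

```python
import functools
import threading
from typing import Any, Callable, Dict, List, TypeVar

F = TypeVar("F", bound=Callable[..., Any])


class DBSerializationError(Exception):
    """Local stand-in for a transaction serialization failure."""


class FakeSession:
    """Records what happens to the transaction instead of talking to a database."""

    def __init__(self) -> None:
        self.log: List[str] = []

    def commit(self) -> None:
        self.log.append("commit")

    def rollback(self) -> None:
        self.log.append("rollback")


session = FakeSession()
_state = threading.local()  # tracks whether we are already inside an atomic block


def atomic(func: F, max_retries: int = 5) -> F:
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        if getattr(_state, "depth", 0) > 0:
            return func(*args, **kwargs)  # inner block: the outer block owns the transaction
        attempt = 0
        while True:
            _state.depth = 1
            try:
                result = func(*args, **kwargs)
                session.commit()
                return result
            except DBSerializationError:
                session.rollback()
                attempt += 1
                if attempt > max_retries:
                    raise  # give up after too many conflicts
            except BaseException:
                session.rollback()
                raise
            finally:
                _state.depth = 0

    return wrapper  # type: ignore[return-value]


calls: Dict[str, int] = {"n": 0}


@atomic
def inner() -> str:
    return "inner"


@atomic
def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise DBSerializationError("simulated conflict")
    return inner()  # the nested atomic block neither commits nor rolls back


result = flaky()
```

The first two attempts are rolled back and re-executed; the third succeeds, and only the outermost block commits.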
execute_atomic(func)

A decorator that executes a function in an atomic block (see atomic()).

Example:
```python
db = CustomSQLAlchemy()


@db.execute_atomic
def result():
    # `write_to_db` stands for any code that modifies the database.
    write_to_db('a message')
    return 'OK'


assert result == 'OK'
```
This code defines and executes the function result in an atomic block. At the end, the name result holds the value returned from the function.
retry_on_integrity_error()

Re-raise IntegrityError as DBSerializationError.

This is mainly useful to handle race conditions in atomic blocks. For example, even if prior to a database INSERT we have verified that there is no existing row with the given primary key, we still may get an IntegrityError if another transaction inserted a row with this primary key in the meantime. But if we do (within an atomic block):

```python
with db.retry_on_integrity_error():
    db.session.add(instance)
```

then if the aforementioned race condition occurs, DBSerializationError will be raised instead of IntegrityError, so that the transaction will be retried (by the atomic block), and the second time our prior-to-INSERT check will correctly detect a primary key collision.

Note: retry_on_integrity_error() triggers a session flush.
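The exception translation described above can be sketched as a context manager using sqlite3 from the standard library. The local `DBSerializationError` class and the `users` table are assumptions made for illustration; the real method lives on the `db` object, works with SQLAlchemy's `IntegrityError`, and also flushes the session, none of which this sketch reproduces.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator, Optional


class DBSerializationError(Exception):
    """Local stand-in: an error that an atomic block would retry."""


@contextmanager
def retry_on_integrity_error() -> Iterator[None]:
    try:
        yield
    except sqlite3.IntegrityError as e:
        # Translate the constraint violation into a retryable error,
        # keeping the original exception as the cause.
        raise DBSerializationError(str(e)) from e


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'alice')")  # plays the "concurrent" insert

caught: Optional[DBSerializationError] = None
try:
    with retry_on_integrity_error():
        conn.execute("INSERT INTO users VALUES (1, 'bob')")  # same primary key
except DBSerializationError as e:
    caught = e
```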
class flask_signalbus.atomic._ModelUtilitiesMixin

classmethod get_instance(instance_or_pk, *options)

Return a model instance in db.session or None.

Parameters:
- instance_or_pk – An instance of this model class, or a primary key. A composite primary key can be passed as a tuple.
- options – Arguments to be passed to options().

Example:
```python
@db.atomic
def increase_account_balance(account, amount):
    # Here `Account` is a subclass of `db.Model`.
    account = Account.get_instance(account)
    account.balance += amount


# Now `increase_account_balance` can be
# called with an account instance:
increase_account_balance(my_account, 100.00)

# or with an account primary key (1234):
increase_account_balance(1234, 100.00)
```
classmethod get_pk_values(instance_or_pk)

Return a primary key as a tuple.

Parameters: instance_or_pk – An instance of this model class, or a primary key. A composite primary key can be passed as a tuple.
classmethod lock_instance(instance_or_pk, *options, **kw)

Return a locked model instance in db.session or None.

Parameters:
- instance_or_pk – An instance of this model class, or a primary key. A composite primary key can be passed as a tuple.
- options – Arguments to be passed to options().
- kw – Arguments to be passed to with_for_update().
rabbitmq module
This module contains utilities for processing RabbitMQ messages. It requires Pika to be installed.
Classes
class flask_signalbus.rabbitmq.Publisher(app=None, *, url_config_key='SIGNALBUS_RABBITMQ_URL')

A RabbitMQ message publisher.

Each instance maintains a separate RabbitMQ connection in every thread. If a connection has not been used for longer than the heartbeat interval set for the connection, it will be automatically closed. A new connection will be opened when needed.

Parameters:
- app – Optional Flask app object. If not provided, init_app must be called later, providing the Flask app object.
- url_config_key – Optional configuration key for the RabbitMQ’s connection URL.

Example:
```python
from flask import Flask
from flask_signalbus import rabbitmq

app = Flask(__name__)
# The connection URL is read from this config key (the default `url_config_key`);
# the value below assumes a local broker with default credentials.
app.config['SIGNALBUS_RABBITMQ_URL'] = 'amqp://guest:guest@localhost:5672/%2F'

headers = {'header1': 'value1', 'header2': 'value2'}
properties = rabbitmq.MessageProperties(
    delivery_mode=2,  # This makes the message persistent!
    app_id='example-publisher',
    content_type='application/json',
    headers=headers,
)
m1 = rabbitmq.Message(exchange='', routing_key='test', body='Message 1', properties=properties)
m2 = rabbitmq.Message(exchange='', routing_key='test', body='Message 2', properties=properties, mandatory=True)

mq = rabbitmq.Publisher(app)
mq.publish_messages([m1, m2])
```
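The "one connection per thread, replaced after an idle period" behavior described for Publisher can be illustrated with `threading.local`. This is an assumption-laden sketch, not Publisher's internals or Pika's API: `FakeConnection`, `PerThreadConnections`, `idle_timeout`, and the injectable `clock` are hypothetical, and the sketch closes an idle connection lazily, the next time its thread asks for one.

```python
import threading
import time
from typing import Callable, List, Optional


class FakeConnection:
    """Hypothetical stand-in for a broker connection."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class PerThreadConnections:
    def __init__(self, idle_timeout: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.idle_timeout = idle_timeout
        self.clock = clock
        self._local = threading.local()  # each thread sees its own attributes
        self._lock = threading.Lock()
        self.opened: List[FakeConnection] = []  # every connection ever opened, for inspection

    def get(self) -> FakeConnection:
        now = self.clock()
        conn: Optional[FakeConnection] = getattr(self._local, "conn", None)
        last_used = getattr(self._local, "last_used", now)
        if conn is not None and now - last_used > self.idle_timeout:
            conn.close()  # idle for too long: drop it
            conn = None
        if conn is None:
            conn = FakeConnection()  # lazily open a new connection for this thread
            with self._lock:
                self.opened.append(conn)
            self._local.conn = conn
        self._local.last_used = now
        return conn


fake_now = [0.0]
pool = PerThreadConnections(idle_timeout=60.0, clock=lambda: fake_now[0])
a = pool.get()
b = pool.get()      # same thread, still fresh: the same connection
fake_now[0] = 120.0
c = pool.get()      # idle for longer than 60 s: replaced by a new connection
other: List[FakeConnection] = []
t = threading.Thread(target=lambda: other.append(pool.get()))
t.start()
t.join()            # a different thread always gets its own connection
```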
init_app(app)

Bind the instance to a Flask app object.

Parameters: app – A Flask app object
publish_messages(messages: Iterable[flask_signalbus.rabbitmq.publisher.Message], *, timeout: Optional[int] = None, allow_retry: bool = True)

Publishes messages, waiting for delivery confirmations.

This method will block until a confirmation from the RabbitMQ broker has been received for each of the messages.

Parameters:
- messages – The messages to publish
- timeout – Optional timeout in seconds
class flask_signalbus.rabbitmq.Message

A typing.NamedTuple representing a RabbitMQ message to be sent.

Parameters:
- exchange – RabbitMQ exchange name
- routing_key – RabbitMQ routing key
- body – The message’s body
- properties – Message properties (see pika.BasicProperties)
- mandatory – If True, requires the message to be added to at least one queue.
- body – Alias for field number 2
- exchange – Alias for field number 0
- mandatory – Alias for field number 4
- properties – Alias for field number 3
- routing_key – Alias for field number 1
class flask_signalbus.rabbitmq.MessageProperties

Basic message properties.

This is an alias for pika.BasicProperties.
class flask_signalbus.rabbitmq.Consumer(app=None, *, config_prefix='SIGNALBUS_RABBITMQ', url=None, queue=None, threads=None, prefetch_size=None, prefetch_count=None)

A RabbitMQ message consumer.

Parameters:
- app – Optional Flask app object. If not provided, init_app must be called later, providing the Flask app object.
- config_prefix – A prefix for the Flask configuration settings for this consumer instance.
- url – RabbitMQ’s connection URL. If not passed, the value of the {config_prefix}_URL Flask configuration setting will be used.
- queue – The name of the RabbitMQ queue to consume from. If not passed, the value of the {config_prefix}_QUEUE Flask configuration setting will be used.
- threads – The number of worker threads in the pool. If not passed, the value of the {config_prefix}_THREADS Flask configuration setting will be used (the default is 1).
- prefetch_size – Specifies the prefetch window size. RabbitMQ will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls into other prefetch limits). If not passed, the value of the {config_prefix}_PREFETCH_SIZE Flask configuration setting will be used (the default is 0, meaning “no specific limit”).
- prefetch_count – Specifies a prefetch window in terms of whole messages. This field may be used in combination with the prefetch_size field. A message will only be sent in advance if both prefetch windows allow it. Setting a bigger value may give a performance improvement. If not passed, the value of the {config_prefix}_PREFETCH_COUNT Flask configuration setting will be used (the default is 1).
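The fallback rule above (an explicit constructor argument wins, otherwise the `{config_prefix}_…` Flask setting, otherwise the documented default) can be sketched as follows. A plain dict stands in for `app.config`, and the helper name `resolve_consumer_settings` and the example URL and queue name are hypothetical; this is not Flask-SignalBus code.

```python
from typing import Any, Dict, Optional

# Documented defaults; URL and QUEUE have none.
DEFAULTS: Dict[str, Any] = {"THREADS": 1, "PREFETCH_SIZE": 0, "PREFETCH_COUNT": 1}


def resolve_consumer_settings(
    config: Dict[str, Any],
    config_prefix: str = "SIGNALBUS_RABBITMQ",
    **explicit: Optional[Any],
) -> Dict[str, Any]:
    settings: Dict[str, Any] = {}
    for name in ("URL", "QUEUE", "THREADS", "PREFETCH_SIZE", "PREFETCH_COUNT"):
        value = explicit.get(name.lower())  # e.g. the `threads=` constructor argument
        if value is None:
            # Not passed explicitly: fall back to the Flask setting, then the default.
            value = config.get(f"{config_prefix}_{name}", DEFAULTS.get(name))
        settings[name.lower()] = value
    return settings


config = {
    "SIGNALBUS_RABBITMQ_URL": "amqp://guest:guest@localhost:5672/%2F",
    "SIGNALBUS_RABBITMQ_QUEUE": "my_queue",
    "SIGNALBUS_RABBITMQ_THREADS": 4,
}
settings = resolve_consumer_settings(config, prefetch_count=10)
other = resolve_consumer_settings({"OTHER_QUEUE": "q2"}, config_prefix="OTHER")
```

A distinct `config_prefix` per consumer lets several consumers in one application read separate settings from the same Flask config.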
The received messages will be processed by a pool of worker threads, created by the consumer instance, after the start method is called. Each consumer instance maintains a separate RabbitMQ connection. If the connection has been closed for some reason, the start method will raise an exception. To continue consuming, the start method can be called again.

This class is meant to be subclassed. For example:
```python
from flask_signalbus import rabbitmq


class ExampleConsumer(rabbitmq.Consumer):
    def process_message(self, body, properties):
        if len(body) == 0:
            return False  # Malformed (empty) message

        # Process the message here.

        return True  # Successfully processed
```
init_app(app)

Bind the instance to a Flask app object.

Parameters: app – A Flask app object
process_message(body: bytes, properties: flask_signalbus.rabbitmq.common.MessageProperties) → bool

This method must be implemented by the subclasses.

Parameters:
- body – message body
- properties – message properties

The method should return True if the message has been successfully processed, and can be removed from the queue. If False is returned, this means that the message is malformed, and cannot be processed. (Usually, malformed messages will be sent to a “dead letter queue”.)
start()

Opens a RabbitMQ connection and starts processing messages until one of the following things happens:

- The connection has been lost.
- An error has occurred during message processing.
- The stop method has been called on the consumer instance.

This method blocks and never returns normally. If one of the previous things happens, a TerminatedConsumtion exception will be raised. Also, this method may raise pika.exceptions.AMQPError when, for some reason, a proper RabbitMQ connection cannot be established.
stop(signum=None, frame=None)

Orders the consumer to stop.

This is useful for properly handling process termination. For example:
```python
import signal

consumer = Consumer(...)  # creates the instance

signal.signal(signal.SIGINT, consumer.stop)
signal.signal(signal.SIGTERM, consumer.stop)
consumer.start()
```
Exceptions
class flask_signalbus.rabbitmq.DeliveryError

Bases: Exception

A failed attempt to deliver messages.

class flask_signalbus.rabbitmq.ConnectionError

Bases: flask_signalbus.rabbitmq.publisher.DeliveryError

Cannot connect to the server.

class flask_signalbus.rabbitmq.TimeoutError

Bases: flask_signalbus.rabbitmq.publisher.DeliveryError

The attempt to deliver messages has timed out.