API Reference¶
Signal Model¶
A signal model is an otherwise normal database model class (a
subclass of db.Model), which however has a
send_signalbus_message method defined. For example:
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
db = SQLAlchemy(app)
class MySignal(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
message_text = db.Column(db.Text, nullable=False)
signalbus_autoflush = False
signalbus_order_by = (id, db.desc(message_text))
def send_signalbus_message(self):
# Send the message to the message bus.
print(MySignal.__marshmallow_schema__.dumps(self))
The
send_signalbus_messagemethod should be implemented in such a way that when it returns, the message is guaranteed to be successfully sent and stored by the broker. Normally, this means that an acknowledge has been received for the message from the broker.The signal model class may have a
send_signalbus_messagesclass method which accepts one positional argument: an iterable of instances of the class. The method should be implemented in such a way that when it returns, all messages for the passed instances are guaranteed to be successfully sent and stored by the broker. Implementing asend_signalbus_messagesclass method can greatly improve performance, because message brokers are usually optimized to process messages in batches much more efficiently.The signal model class may have a
signalbus_burst_countinteger attribute defined, which determines how many individual signals can be sent and deleted at once, as a part of one database transaction. This can greatly improve performace in some cases when auto-flushing is disabled, especially when thesend_signalbus_messagesclass method is implemented efficiently. If not defined, it defaults to1.The signal model class may have a
signalbus_autoflushboolean attribute defined, which determines if signals of that type will be automatically sent over the message bus after each transaction commit. If not defined, it defaults toTrue.The signal model class may have a
signalbus_order_bytuple attribute defined, which determines the order in which signals will be send over the network by theflushorderedCLI command. If not defined, signals will not be ordered.Flask-SignalBus will automatically (after
sqlalchemy.orm.configure_mappers()is invoked) add a bunch of attributes on each signal model class. These are useful when serializing instances, before sending them over the network:__marshmallow__An auto-generated Marshmallow schema class for serializing and deserializing instances of the model class. It is a subclass of
marshmallow.Schema. This attribute will be automatically added to all model classes (signal or non-signal). If the__marshmallow__attribute happens to be defined in the model class, it will not be overridden.__marshmallow_schema__An instance of the class referred by the
__marshmallow__attribute.
Classes¶
-
class
flask_signalbus.SignalBus(db, init_app=True)¶ Instances of this class automatically send signal messages that have been recorded in the SQL database, over a message bus. Normally, the sending of the recorded messages (if there are any) is done after each transaction commit, but it also can be triggered explicitly by a command.
Parameters: db – The flask_sqlalchemy.SQLAlchemyinstanceFor example:
from flask_sqlalchemy import SQLAlchemy from flask_signalbus import SignalBus app = Flask(__name__) db = SQLAlchemy(app) signalbus = SignalBus(db) signalbus.flush()
-
autoflush¶ Setting this property to
Falseinstructs theSignalBusinstance to not automatically flush pending signals after each transaction commit. Setting it back toTruerestores the default behavior.
-
flush(models=None, wait=3.0)¶ Send pending signals over the message bus.
This method assumes that auto-flushing is enabled for the given signal types, and therefore the number of pending signals is not too big. Having multiple processes that run this method in parallel is generally not a good idea.
Parameters: - models (list(Signal Model) or
None) – If passed, flushes only signals of the specified types. - wait (float) – The number of seconds the method will wait after obtaining the list of pending signals, to allow auto-flushing senders to complete
Returns: The total number of signals that have been sent
- models (list(Signal Model) or
-
flushmany(models=None)¶ Send a potentially huge number of pending signals over the message bus.
This method assumes that the number of pending signals might be huge. Using
SignalBus.flushmanywhen auto-flushing is enabled for the given signal types is not recommended, because it may result in multiple delivery of messages.SignalBus.flushmanycan be very useful when recovering from long periods of disconnectedness from the message bus, or when auto-flushing is disabled. If your database (and its SQLAlchemy dialect) supportsFOR UPDATE SKIP LOCKED, multiple processes will be able to run this method in parallel, without stepping on each others’ toes.Parameters: models (list(Signal Model) or None) – If passed, flushes only signals of the specified types.Returns: The total number of signals that have been sent
-
flushordered(models=None)¶ Send all pending messages in predictable order.
The order is defined by the
signalbus_order_byattribute of the model class. When auto-flushing is disabled for the given signal types, this method guarantes that messages will be sent in the correct order. Having multiple processes that run this method in parallel is generally not a good idea.Parameters: models (list(Signal Model) or None) – If passed, flushes only signals of the specified types.Returns: The total number of signals that have been sent
-
get_signal_models()¶ Return all signal types in a list.
Return type: list(Signal Model)
-
Mixins¶
-
class
flask_signalbus.SignalBusMixin(*args, **kwargs)¶ A mixin class that can be used to extend
SQLAlchemyto handle signals.For example:
from flask import Flask from flask_sqlalchemy import SQLAlchemy from flask_signalbus import SignalBusMixin class CustomSQLAlchemy(SignalBusMixin, SQLAlchemy): pass app = Flask(__name__) db = CustomSQLAlchemy(app) db.signalbus.flush()
-
class
flask_signalbus.AtomicProceduresMixin¶ A mixin class that adds utility functions to
flask_sqlalchemy.SQLAlchemyand the declarative base.For example:
from flask_sqlalchemy import SQLAlchemy from flask_signalbus import AtomicProceduresMixin class CustomSQLAlchemy(AtomicProceduresMixin, SQLAlchemy): pass db = CustomSQLAlchemy() # Now `AtomicProceduresMixin` method are available in `db`.
Note that when subclassing,
AtomicProceduresMixinshould always come beforeflask_sqlalchemy.SQLAlchemy. AddingAtomicProceduresMixinhas several useful results:AtomicProceduresMixinmethod will be available indb.- The classmethods from
_ModelUtilitiesMixinwill be available in the declarative base class (db.Model), and therefore in every model class. - Database isolation level will be set to
REPEATABLE_READ.
-
atomic(func)¶ A decorator that wraps a function in an atomic block.
Example:
db = CustomSQLAlchemy() @db.atomic def f(): write_to_db('a message') return 'OK' assert f() == 'OK'
This code defines the function
f, which is wrapped in an atomic block. Wrapping a function in an atomic block gives several guarantees:- The database transaction will be automatically committed if the function returns normally, and automatically rolled back if the function raises unhandled exception.
- When the transaction is committed, all objects in
db.sessionwill be expunged. This means that no lazy loading will be performed on them. - If a transaction serialization error occurs during the execution of the function, the function will be re-executed. (It might be re-executed several times.)
Atomic blocks can be nested, but in this case the outermost block takes full control of transaction’s life-cycle, and inner blocks do nothing.
-
execute_atomic(func)¶ A decorator that executes a function in an atomic block (see
atomic()).Example:
db = CustomSQLAlchemy() @db.execute_atomic def result(): write_to_db('a message') return 'OK' assert result == 'OK'
This code defines and executes the function
resultin an atomic block. At the end, the nameresultholds the value returned from the function.
-
retry_on_integrity_error()¶ Re-raise
IntegrityErrorasDBSerializationError.This is mainly useful to handle race conditions in atomic blocks. For example, even if prior to a database INSERT we have verified that there is no existing row with the given primary key, we still may get an
IntegrityErrorif another transaction inserted a row with this primary key in the meantime. But if we do (within an atomic block):with db.retry_on_integrity_error(): db.session.add(instance)
then if the before-mentioned race condition occurs,
DBSerializationErrorwill be raised instead ofIntegrityError, so that the transaction will be retried (by the atomic block), and the second time our prior-to-INSERT check will correctly detect a primary key collision.Note:
retry_on_integrity_error()triggers a session flush.
-
class
flask_signalbus.atomic._ModelUtilitiesMixin¶ -
classmethod
get_instance(instance_or_pk, *options)¶ Return a model instance in
db.sessionorNone.Parameters: - instance_or_pk – An instance of this model class, or a primary key. A composite primary key can be passed as a tuple.
- options – Arguments to be passed to
options().
Example:
@db.atomic def increase_account_balance(account, amount): # Here `Account` is a subclass of `db.Model`. account = Account.get_instance(account) account.balance += amount # Now `increase_account_balance` can be # called with an account instance: increase_account_balance(my_account, 100.00) # or with an account primary key (1234): increase_account_balance(1234, 100.00)
-
classmethod
get_pk_values(instance_or_pk)¶ Return a primary key as a tuple.
Parameters: instance_or_pk – An instance of this model class, or a primary key. A composite primary key can be passed as a tuple.
-
classmethod
lock_instance(instance_or_pk, *options, **kw)¶ Return a locked model instance in
db.sessionorNone.Parameters: - instance_or_pk – An instance of this model class, or a primary key. A composite primary key can be passed as a tuple.
- options – Arguments to be passed to
options(). - kw – Arguments to be passed to
with_for_update().
-
classmethod