Reduce Saga Storage Overhead
Release notes: Reduce Saga Storage Overhead
Summary
This release reduces saga storage load by introducing checkpoint commits: each saga run uses one session and commits explicitly only at key points (after each step, after each compensation step, and so on) instead of committing after every storage call. The result is fewer commits, shorter lock hold times, and a lower risk of deadlocks when using SQLAlchemy.
What's New
Checkpoint commits & SagaStorageRun protocol
- New protocol `SagaStorageRun` (`src/cqrs/saga/storage/protocol.py`): a scoped "session" for a single saga. Methods do not commit; the caller calls `commit()` at checkpoints.
- New method `ISagaStorage.create_run()`: returns a context manager that yields a `SagaStorageRun`. Storages that don't support it can raise `NotImplementedError`, and execution falls back to the previous behaviour.
- `SagaTransaction`: when `create_run()` is available, the saga runs in one session and commits only at checkpoints (after create + RUNNING, after each step, after each compensation step, at completion/failure). Storages without `create_run()` keep working as before.
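
The checkpoint path described above can be pictured roughly as follows. This is a minimal sketch, not the library's code: only `create_run()`, `commit()` and `rollback()` are named in these notes. The classes `CountingRun` and `SketchStorage`, the methods `update_status`, `update_context` and `record_step`, and the helper `run_saga` are hypothetical, and it is an assumption that the context manager and run methods are async.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


class CountingRun:
    """Hypothetical run object: storage calls change state but never commit."""

    def __init__(self) -> None:
        self.storage_calls = 0
        self.commits = 0

    async def update_status(self, saga_id: str, status: str) -> None:
        self.storage_calls += 1  # no commit here

    async def update_context(self, saga_id: str, context: dict) -> None:
        self.storage_calls += 1  # no commit here

    async def record_step(self, saga_id: str, step: str) -> None:
        self.storage_calls += 1  # no commit here

    async def commit(self) -> None:
        self.commits += 1  # only the caller decides when this happens

    async def rollback(self) -> None:
        pass  # nothing to undo in this counting stand-in


class SketchStorage:
    """Hypothetical storage exposing create_run() as an async context manager."""

    @asynccontextmanager
    async def create_run(self) -> AsyncIterator[CountingRun]:
        run = CountingRun()
        try:
            yield run
        except Exception:
            await run.rollback()  # drop work done since the last checkpoint
            raise


async def run_saga(storage: SketchStorage, saga_id: str, steps: List[str]) -> CountingRun:
    async with storage.create_run() as run:
        await run.update_status(saga_id, "RUNNING")
        await run.commit()  # checkpoint: after create + RUNNING
        for step in steps:
            await run.update_context(saga_id, {"last_step": step})
            await run.record_step(saga_id, step)
            await run.commit()  # checkpoint: after each step
        await run.update_status(saga_id, "COMPLETED")
        await run.commit()  # checkpoint: completion
    return run


if __name__ == "__main__":
    finished = asyncio.run(run_saga(SketchStorage(), "saga-1", ["reserve", "charge", "ship"]))
    print(finished.storage_calls, finished.commits)
```

In this sketch a three-step saga makes eight storage calls but commits five times; committing after every call would have meant eight commits.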
Storage implementations
- Memory (`MemorySagaStorage`): implements `create_run()` via `_MemorySagaStorageRun`; commit/rollback are no-ops but the protocol matches SQLAlchemy.
- SQLAlchemy (`SqlAlchemySagaStorage`): implements `create_run()` via `_SqlAlchemySagaStorageRun` with one `AsyncSession` per run; mutations are committed only when the saga calls `run.commit()`.
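
To illustrate what "commit/rollback are no-ops" could look like for an in-memory storage, here is a small sketch. It is not the code of `_MemorySagaStorageRun`: the class names `MemoryRunSketch` and `MemoryStorageSketch` and the method `update_status` are hypothetical, and it assumes writes are applied to a shared dictionary immediately.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict


class MemoryRunSketch:
    """Hypothetical stand-in for an in-memory run object."""

    def __init__(self, statuses: Dict[str, str]) -> None:
        self._statuses = statuses  # shared with the owning storage

    async def update_status(self, saga_id: str, status: str) -> None:
        self._statuses[saga_id] = status  # applied immediately, nothing is buffered

    async def commit(self) -> None:
        return None  # no-op: there is no pending transaction

    async def rollback(self) -> None:
        return None  # no-op: in-memory writes are not undone


class MemoryStorageSketch:
    """Hypothetical in-memory storage with the same call shape as a SQL-backed one."""

    def __init__(self) -> None:
        self.statuses: Dict[str, str] = {}

    @asynccontextmanager
    async def create_run(self) -> AsyncIterator[MemoryRunSketch]:
        yield MemoryRunSketch(self.statuses)


async def demo() -> Dict[str, str]:
    storage = MemoryStorageSketch()
    async with storage.create_run() as run:
        await run.update_status("saga-1", "RUNNING")
        await run.commit()  # harmless here, required for a SQL-backed run
    return storage.statuses


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping the no-op methods means calling code can be written once against the run interface and exercised in tests without a database.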
Deadlock mitigation
`load_saga_state(..., read_for_update=True)`: when loading for recovery or exclusive update, the row can be locked (e.g. `SELECT ... FOR UPDATE` in SQLAlchemy). Together with checkpoint commits, this shortens lock duration and reduces deadlock risk.
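
As a rough picture of how a `read_for_update` flag can map to a locking read, the sketch below builds the query text by hand. The function `build_load_query` and the table and column names are invented for illustration; the actual storage presumably builds the statement through SQLAlchemy, where the equivalent is calling `with_for_update()` on a `select()`.

```python
def build_load_query(read_for_update: bool = False) -> str:
    """Return SQL text for loading one saga row (hypothetical schema)."""
    query = "SELECT status, context FROM saga_state WHERE saga_id = :saga_id"
    if read_for_update:
        # Lock the row until the surrounding transaction commits or rolls back.
        query += " FOR UPDATE"
    return query


if __name__ == "__main__":
    print(build_load_query())
    print(build_load_query(read_for_update=True))
```

The lock is held until the next commit, which is why pairing it with checkpoint commits keeps the locked window short.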
Compensation
`SagaCompensator`: new optional `on_after_compensate_step` callback, invoked after each successfully compensated step. When using a run, the saga passes `run.commit` so each compensation step is committed at a checkpoint.
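
The callback hook can be sketched as below. This is not `SagaCompensator` itself: the function `compensate`, its parameters other than `on_after_compensate_step`, and the reverse-order iteration are assumptions made for the example.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional

AsyncFn = Callable[[], Awaitable[None]]


async def compensate(
    undo_actions: List[AsyncFn],
    on_after_compensate_step: Optional[AsyncFn] = None,
) -> None:
    """Run undo actions in reverse order; call the hook after each success."""
    for undo in reversed(undo_actions):
        await undo()  # if this raises, the hook is not called for this step
        if on_after_compensate_step is not None:
            await on_after_compensate_step()  # e.g. run.commit as a checkpoint


async def demo() -> List[str]:
    log: List[str] = []

    async def undo_reserve() -> None:
        log.append("undo_reserve")

    async def undo_charge() -> None:
        log.append("undo_charge")

    async def commit() -> None:  # stands in for run.commit
        log.append("commit")

    await compensate([undo_reserve, undo_charge], on_after_compensate_step=commit)
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the hook is optional, callers that do not use a run can omit it and get no extra commits.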
Documentation & types
- Docstrings added/updated for the storage protocol, `SagaStorageRun`, memory and SQLAlchemy storage, `SagaTransaction`, execution and recovery managers, and compensator. "Strict Backward Recovery" and checkpoint behaviour are described in code.
Fixes
- Deadlocks: checkpoint commits and `read_for_update` reduce long-held locks and concurrent-update conflicts.
Tests & infrastructure
Unit tests
- New `tests/unit/test_saga/test_saga_storage_run.py`: tests `create_run()` and the checkpoint path (memory), and the fallback when storage does not implement `create_run()`.
Integration tests & databases
- PostgreSQL: integration tests now run against PostgreSQL (port 5433) in addition to MySQL.
- Test split: `test_saga_storage_sqlalchemy_postgres.py` / `_mysql.py`, `test_saga_mediator_sqlalchemy_postgres.py` / `_mysql.py`; conftest/fixtures extended with `DATABASE_DSN_POSTGRESQL`.
CI
- tests.yml: start and wait for PostgreSQL; set `DATABASE_DSN_POSTGRESQL`; run tests with MySQL and PostgreSQL.
- New workflow `codspeed.yml`: run benchmarks via CodSpeed (MySQL, PostgreSQL, Redis; `pytest tests/benchmarks/ --codspeed`).
- pytest-config.ini: added `DATABASE_DSN_POSTGRESQL` to env.
Docker & dependencies
- docker-compose-test.yml: new service postgres_tests (PostgreSQL 15.4, port 5433).
- pyproject.toml: version 4.9.0; dev dependency pytest-codspeed==4.2.0 added.
Benchmarks
- Benchmarks in `tests/benchmarks/` updated and extended (conftest, dataclasses/default for memory and SQLAlchemy) to measure the impact of the new storage behaviour.
Compatibility
- Saga examples (`saga.py`, `saga_recovery.py`, `saga_sqlalchemy_storage.py`, `saga_fallback.py`, `saga_recovery_scheduler.py`, etc.) updated to the current storage API and checkpoint path where applicable.
- Backward compatibility: code using `ISagaStorage` without `create_run()` continues to work. The new path is used only when storage implements `create_run()` (Memory and SQLAlchemy do by default).
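
The fallback can be sketched as follows. This is an illustration under stated assumptions, not the library's execution code: `LegacyStorage`, `RunStorage`, `SimpleRun`, `start_saga` and `update_status` are hypothetical names, and the sketch assumes `NotImplementedError` is raised when `create_run()` is called rather than when the context manager is entered.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Dict


class LegacyStorage:
    """Hypothetical custom storage that predates create_run()."""

    def __init__(self) -> None:
        self.statuses: Dict[str, str] = {}

    def create_run(self):
        raise NotImplementedError  # signals that there is no checkpoint path

    async def update_status(self, saga_id: str, status: str) -> None:
        self.statuses[saga_id] = status  # old behaviour: each call persists on its own


class SimpleRun:
    """Hypothetical run object for the storage below."""

    def __init__(self, statuses: Dict[str, str]) -> None:
        self._statuses = statuses

    async def update_status(self, saga_id: str, status: str) -> None:
        self._statuses[saga_id] = status

    async def commit(self) -> None:
        pass


class RunStorage(LegacyStorage):
    """Hypothetical storage that opts in to the new path."""

    @asynccontextmanager
    async def create_run(self):
        yield SimpleRun(self.statuses)


async def start_saga(storage, saga_id: str) -> str:
    try:
        run_cm = storage.create_run()
    except NotImplementedError:
        # Previous behaviour: talk to the storage directly.
        await storage.update_status(saga_id, "RUNNING")
        return "per-call commits"
    async with run_cm as run:
        await run.update_status(saga_id, "RUNNING")
        await run.commit()  # checkpoint
    return "checkpoint commits"


if __name__ == "__main__":
    print(asyncio.run(start_saga(LegacyStorage(), "saga-1")))
    print(asyncio.run(start_saga(RunStorage(), "saga-2")))
```

Only the call to `create_run()` sits inside the `try`, so a `NotImplementedError` raised by saga logic inside the run is not mistaken for missing run support.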
Miscellaneous
- README.md: updated for structure and usage.
- Post-review: ruff format, pre-commit, and "banchmarks" renamed to "benchmarks".
Summary of benefits
- Fewer DB commits per saga run
- Shorter lock hold time and lower deadlock risk
- Backward compatible; custom storages can add `create_run()` like Memory/SQLAlchemy