Reduce Saga Storage Overhead

@vadikko2 vadikko2 released this 20 Feb 11:46
· 10 commits to master since this release
37e723a

🚀 Release notes: Reduce Saga Storage Overhead

📋 Summary

This release reduces saga storage load by introducing checkpoint commits: each saga run uses one session and commits explicitly only at key points (after each step, after each compensation step, etc.) instead of committing after every storage call. The result is fewer commits, shorter lock hold times, and a lower risk of deadlocks when using SQLAlchemy.


✨ What's New

🎯 Checkpoint commits & SagaStorageRun protocol

  • New protocol SagaStorageRun (src/cqrs/saga/storage/protocol.py): a scoped "session" for a single saga. Its methods do not commit; the caller calls commit() at checkpoints.
  • New method ISagaStorage.create_run(): returns a context manager that yields a SagaStorageRun. Storages that don't support it can raise NotImplementedError, and execution falls back to the previous behaviour.
  • SagaTransaction: when create_run() is available, the saga runs in one session and commits only at checkpoints (after create + RUNNING, after each step, after each compensation step, at completion/failure). Storages without create_run() keep working as before.
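The checkpoint pattern described above can be sketched roughly as follows. This is an illustration only, not the library's code: apart from commit(), the method names on the protocol (update_status, log_step), the CountingRun class, and the run_saga helper are assumptions made for the example.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, Protocol


class SagaStorageRun(Protocol):
    """Hypothetical subset of the run protocol; only commit() is named in the notes."""

    async def update_status(self, saga_id: str, status: str) -> None: ...
    async def log_step(self, saga_id: str, step_name: str) -> None: ...
    async def commit(self) -> None: ...


class CountingRun:
    """Toy run: writes are buffered until commit(), and commits are counted."""

    def __init__(self) -> None:
        self.pending: list[tuple[str, str]] = []
        self.committed: list[tuple[str, str]] = []
        self.commit_count = 0

    async def update_status(self, saga_id: str, status: str) -> None:
        self.pending.append((saga_id, f"status:{status}"))

    async def log_step(self, saga_id: str, step_name: str) -> None:
        self.pending.append((saga_id, f"step:{step_name}"))

    async def commit(self) -> None:
        self.committed.extend(self.pending)
        self.pending.clear()
        self.commit_count += 1


Step = Callable[[], Awaitable[None]]


async def run_saga(run: SagaStorageRun, saga_id: str, steps: dict[str, Step]) -> None:
    await run.update_status(saga_id, "RUNNING")
    await run.commit()  # checkpoint: saga created and marked RUNNING
    for name, step in steps.items():
        await step()
        await run.log_step(saga_id, name)
        await run.commit()  # checkpoint: after each step
    await run.update_status(saga_id, "COMPLETED")
    await run.commit()  # checkpoint: completion


async def _noop() -> None:
    return None


def demo() -> CountingRun:
    run = CountingRun()
    asyncio.run(run_saga(run, "saga-1", {"reserve": _noop, "charge": _noop}))
    return run
```

In this toy, a saga with two steps commits four times (RUNNING, two steps, completion), regardless of how many storage calls each checkpoint groups together.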

📦 Storage implementations

  • Memory (MemorySagaStorage): implements create_run() via _MemorySagaStorageRun; commit/rollback are no-ops, but the protocol matches SQLAlchemy.
  • SQLAlchemy (SqlAlchemySagaStorage): implements create_run() via _SqlAlchemySagaStorageRun with one AsyncSession per run; mutations are committed only when the saga calls run.commit().

🔒 Deadlock mitigation

  • load_saga_state(..., read_for_update=True): when loading for recovery or exclusive update, the row can be locked (e.g. SELECT ... FOR UPDATE in SQLAlchemy). Together with checkpoint commits, this shortens lock duration and reduces deadlock risk.
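To illustrate what a read_for_update flag might translate to, here is a small SQLAlchemy Core sketch. The saga_state table and the load_saga_state_stmt helper are hypothetical; the library's actual schema and query construction may differ. Only select(), with_for_update(), and compile() are real SQLAlchemy calls.

```python
from sqlalchemy import Column, MetaData, String, Table, select
from sqlalchemy.dialects import postgresql

metadata = MetaData()

# Hypothetical table for the example; not the library's real schema.
saga_state = Table(
    "saga_state",
    metadata,
    Column("id", String, primary_key=True),
    Column("status", String, nullable=False),
)


def load_saga_state_stmt(saga_id: str, read_for_update: bool = False):
    """Build the SELECT, adding a row lock only when requested."""
    stmt = select(saga_state).where(saga_state.c.id == saga_id)
    if read_for_update:
        # Renders as SELECT ... FOR UPDATE on dialects that support it.
        stmt = stmt.with_for_update()
    return stmt


def render(stmt) -> str:
    """Compile against the PostgreSQL dialect without opening a connection."""
    return str(stmt.compile(dialect=postgresql.dialect()))
```

The row lock lasts until the surrounding transaction ends, which is why pairing it with frequent checkpoint commits keeps the lock window short.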

♻️ Compensation

  • SagaCompensator: new optional on_after_compensate_step callback, invoked after each successfully compensated step. When using a run, the saga passes run.commit so each compensation step is committed at a checkpoint.
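A minimal sketch of how such a callback could be wired, assuming a zero-argument async callback and compensation in reverse order of execution. The compensate function and FakeRun class are hypothetical stand-ins, not the SagaCompensator implementation.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, Optional, Sequence

AsyncCallback = Callable[[], Awaitable[None]]


async def compensate(
    completed_steps: Sequence[tuple[str, AsyncCallback]],
    on_after_compensate_step: Optional[AsyncCallback] = None,
) -> list[str]:
    """Undo completed steps newest-first, firing the callback after each one."""
    compensated: list[str] = []
    for name, undo in reversed(completed_steps):
        await undo()
        compensated.append(name)
        if on_after_compensate_step is not None:
            await on_after_compensate_step()  # e.g. run.commit as a checkpoint
    return compensated


class FakeRun:
    """Stand-in for a storage run that only counts commits."""

    def __init__(self) -> None:
        self.commits = 0

    async def commit(self) -> None:
        self.commits += 1


def demo() -> tuple[list[str], int]:
    run = FakeRun()

    async def undo_reserve() -> None:
        return None

    async def undo_charge() -> None:
        return None

    steps = [("reserve", undo_reserve), ("charge", undo_charge)]
    order = asyncio.run(compensate(steps, on_after_compensate_step=run.commit))
    return order, run.commits
```

Passing the bound method run.commit as the callback means the compensator needs no knowledge of the storage layer; it simply signals that a step has been undone.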

📝 Documentation & types

  • Docstrings added/updated for the storage protocol, SagaStorageRun, memory and SQLAlchemy storage, SagaTransaction, execution and recovery managers, and compensator. "Strict Backward Recovery" and checkpoint behaviour are described in code.

🐛 Fixes

  • Deadlocks: checkpoint commits and read_for_update reduce long-held locks and concurrent-update conflicts.

🧪 Tests & infrastructure

Unit tests

  • New tests/unit/test_saga/test_saga_storage_run.py: tests create_run() and the checkpoint path (memory), and the fallback when storage does not implement create_run().

Integration tests & databases

  • PostgreSQL: integration tests now run against PostgreSQL (port 5433) in addition to MySQL.
  • Test split: test_saga_storage_sqlalchemy_postgres.py / _mysql.py, test_saga_mediator_sqlalchemy_postgres.py / _mysql.py; conftest/fixtures extended with DATABASE_DSN_POSTGRESQL.

🔧 CI

  • tests.yml: start and wait for PostgreSQL; set DATABASE_DSN_POSTGRESQL; run tests with MySQL and PostgreSQL.
  • New workflow codspeed.yml: run benchmarks via CodSpeed (MySQL, PostgreSQL, Redis; pytest tests/benchmarks/ --codspeed).
  • pytest-config.ini: added DATABASE_DSN_POSTGRESQL to env.

📦 Docker & dependencies

  • docker-compose-test.yml: new service postgres_tests (PostgreSQL 15.4, port 5433).
  • pyproject.toml: version 4.9.0; dev dependency pytest-codspeed==4.2.0 added.

⚡ Benchmarks

  • Benchmarks in tests/benchmarks/ updated and extended (conftest, dataclasses/default for memory and SQLAlchemy) to measure the impact of the new storage behaviour.

✅ Compatibility

  • Saga examples (saga.py, saga_recovery.py, saga_sqlalchemy_storage.py, saga_fallback.py, saga_recovery_scheduler.py, etc.) updated to the current storage API and checkpoint path where applicable.
  • Backward compatibility: code using ISagaStorage without create_run() continues to work. The new path is used only when storage implements create_run() (Memory and SQLAlchemy do by default).
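The fallback could look something like the sketch below. It assumes that an unsupported storage raises NotImplementedError as soon as create_run() is called; the storage classes and the execute helper are hypothetical and exist only to show the two paths.

```python
import asyncio
from contextlib import asynccontextmanager


class LegacyStorage:
    """Hypothetical storage without run support: every call commits on its own."""

    def __init__(self) -> None:
        self.commits = 0

    def create_run(self):
        raise NotImplementedError

    async def log_step(self, saga_id: str, step: str) -> None:
        self.commits += 1  # implicit commit per storage call


class _Run:
    """Run whose methods never commit; the caller commits at checkpoints."""

    def __init__(self, storage: "RunStorage") -> None:
        self._storage = storage

    async def log_step(self, saga_id: str, step: str) -> None:
        return None

    async def commit(self) -> None:
        self._storage.commits += 1


class RunStorage:
    """Hypothetical storage that supports create_run()."""

    def __init__(self) -> None:
        self.commits = 0

    @asynccontextmanager
    async def create_run(self):
        yield _Run(self)


async def execute(storage, saga_id: str, steps: list) -> str:
    try:
        run_cm = storage.create_run()
    except NotImplementedError:
        # Previous behaviour: the storage commits inside each call.
        for step in steps:
            await storage.log_step(saga_id, step)
        return "legacy"
    async with run_cm as run:
        for step in steps:
            await run.log_step(saga_id, step)
            await run.commit()  # explicit checkpoint
    return "checkpoint"
```

A custom storage opts into the new path simply by providing a create_run() that returns an async context manager; nothing else in the calling code changes.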

📌 Miscellaneous

  • README.md: updated for structure and usage.
  • Post-review: ruff format, pre-commit, and "banchmarks" → benchmarks rename.

🎉 Summary of benefits

⚡ Fewer DB commits per saga run
🔒 Shorter lock hold time and lower deadlock risk
✅ Backward compatible; custom storages can add create_run() like Memory/SQLAlchemy