feat: durable execution (suspend/resume + ctx.durable journal) #786

Open
andreahlert wants to merge 57 commits into apache:main from andreahlert:worktree-durable-execution

@andreahlert
Collaborator

Summary

Adds durable execution to Burr, unifying two approaches in one design:

  • A: action-boundary suspend/resume via __context.suspend(channel, schema=, metadata=). The signal propagates through _Suspended(BaseException), is persisted as a SuspensionRecord, and replayed by resume(...) / aresume(...).
  • B: __context.durable(key, fn, ...) / __context.adurable(...) sub-step journal. Memoizes side effects across re-runs, with DeterminismError raised fail-loud on mismatch.

Closes the gap where Burr could not natively pause for human input, external events, or crash recovery without manual state-fork workarounds.

What's new

  • burr/core/durable.py: _Suspended, SuspensionRecord, JournalEntry, DeterminismError, supports_durable_storage().
  • burr/core/resume.py: resume() and aresume() rebuild the app from graph + persister, deliver the payload, run to the next halt.
  • ApplicationContext.suspend / durable / adurable + _handle_suspension / _ahandle_suspension in the sync and async run loops.
  • 5 optional persister methods (save_suspension, load_suspension, save_journal_entry, load_journal, mark_suspension_resolved) with a bool contract on mark_*.
  • First-party persister overrides with dedicated tables/collections: in-memory, SQLite, psycopg2 (PostgreSQL), asyncpg, aiosqlite, redis (sync + async), pymongo. Deprecated shims inherit transparently.
  • Lifecycle hooks: PostActionSuspendHook + async, PreActionResumeHook + async. LocalTrackingClient emits SuspendEntryModel so the Burr UI can render status="suspended".
  • Persister status Literal["completed", "failed", "suspended"] propagated across the persistence layer.
  • Example at examples/durable-execution/ (runnable HITL workflow with notebook + statemachine.png) and concept docs at docs/concepts/durable-execution.rst.

Test plan

  • BURR_CI_INTEGRATION_TESTS=true POSTGRES_PORT=… REDIS_DB=… pytest tests/core/test_durable.py tests/core/test_durable_integration.py tests/core/test_durable_persisters.py: 90 passed with real Postgres, asyncpg, redis sync+async, pymongo, mongo shim backends.
  • pytest tests/core/ tests/lifecycle/ tests/tracking/ tests/integrations/persisters/ --ignore=tests/core/test_graphviz_display.py: 425 passed, no regressions. The 25 skipped are env-gated persister tests.
  • End-to-end real-LLM test with ollama (qwen2.5:1.5b) drives the HITL example, simulates a crash by deleting the in-memory app + persister, resumes from SQLite, and asserts the LLM is invoked exactly twice end-to-end — proving the journal prevents side-effect re-fire on replay.
  • Resume idempotency verified: second resume() call on the same suspension is a no-op (no LLM call, final state unchanged).
  • python examples/validate_examples.py accepts the new example directory.

Determinism contract (from the design)

  • key stable across re-runs per call site.
  • Same order, same set of durable calls across re-runs.
  • No durable behind a non-deterministic branch.
  • No suspend inside a durable fn.
  • Mismatch raises DeterminismError (fail-loud).
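
The contract above can be illustrated with a minimal sketch of a replaying journal. This is not Burr's implementation: `SketchJournal`, `SketchEntry` and the local `DeterminismError` are stand-ins written for this example, and the positional key check is an assumption about how a mismatch could be detected.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


class DeterminismError(RuntimeError):
    """Local stand-in: a re-run asked for a key the journal did not record here."""


@dataclass
class SketchEntry:
    key: str
    result: Any


class SketchJournal:
    """Hypothetical in-memory journal, for illustration only."""

    def __init__(self, loaded: Optional[List[SketchEntry]] = None):
        self.loaded = list(loaded or [])        # entries from a previous run
        self.recorded: List[SketchEntry] = []   # entries produced by this run
        self.index = 0                          # position of the next durable call

    def durable(self, key: str, fn: Callable[[], Any]) -> Any:
        if self.index < len(self.loaded):
            entry = self.loaded[self.index]
            if entry.key != key:
                # Same position, different key: the call order changed.
                raise DeterminismError(
                    f"call #{self.index} expected key {entry.key!r}, got {key!r}"
                )
            self.index += 1
            return entry.result                 # replay: fn is not invoked
        result = fn()                           # first run: execute and record
        self.recorded.append(SketchEntry(key, result))
        self.index += 1
        return result
```

On a re-run seeded with the recorded entries, a call with the same key at the same position returns the stored result without invoking `fn`; a different key at that position fails loudly instead of silently re-running the side effect.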

Notes

  • _Suspended inherits from BaseException — do not wrap __context.suspend() inside asyncio.shield(...) or task-cancellation guards that catch BaseException.
  • Custom persisters work through the in-state fallback (correct, not optimized for resume-once at concurrent scale).
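
The first note follows from ordinary Python exception semantics, which a small sketch can show. `SuspendedSketch` is a stand-in defined here, not Burr's `_Suspended`; the two guard functions are hypothetical wrappers a caller might write.

```python
class SuspendedSketch(BaseException):
    """Stand-in for a suspension signal that derives from BaseException."""


def action():
    # Simulates an action that asks the runtime to suspend.
    raise SuspendedSketch("waiting for approval")


def run_with_narrow_guard():
    # `except Exception` does not match BaseException subclasses,
    # so the signal propagates to the caller (the run loop).
    try:
        action()
    except Exception:
        return "swallowed"


def run_with_broad_guard():
    # `except BaseException` matches the signal and hides it from the run loop.
    try:
        action()
    except BaseException:
        return "swallowed"
```

Only the broad guard intercepts the signal, which is why wrappers that catch `BaseException` around a suspending call are the ones to avoid.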

…llback

Adds five durable-storage methods (save_suspension, load_suspension,
save_journal_entry, load_journal, mark_suspension_resolved) to
BaseStatePersister and AsyncBaseStatePersister with NotImplementedError
defaults, a real override on InMemoryPersister for tests, the
supports_durable_storage() capability helper, and an in-state fallback
codec in durable.py for persisters that do not override the methods.
Add docstring warnings about dataclasses.asdict not round-tripping nested
types in the in-state codec, document the all-or-nothing override contract
on supports_durable_storage, replace string annotations with real imports
in persistence.py (no circular import), and strengthen the
NotImplementedError test to call all 5 durable methods with real arguments.
Add four unit tests covering the in-state fallback codec functions
(suspension and journal round-trips, channel mismatch, JSON result
preservation). Correct the misleading load_suspension docstring on
BaseStatePersister and AsyncBaseStatePersister to reflect that the
method returns resolved and unresolved records alike. Add type
annotations to the five durable methods on InMemoryPersister to
match the base-class signatures.
…_journal return type

Add tests for mark_suspension_resolved (flag flip and unknown-id no-op), journal
insertion-order sorting, and tighten load_journal return annotation to list[JournalEntry]
on all three sites (requires-python >=3.9).
Docstring now accurately describes the dict-only coercion behavior. The
_context_factory method uses direct attribute access for all three durable
fields instead of mixing getattr with direct access. A comment marks the
intentional omission of _journal_call_index. New test covers schema_json
population on the first suspend call.
Wrap _handle_suspension calls in _step and _astep so that persister or
hook failures clear suspended_signal and fire post_run_step with the
real exception instead of falsely reporting a clean suspension. Also use
self._state in _astep's non-suspended finally branch to pick up state
mutations from delegated sync actions. Strengthen async suspension test
to assert persistence round-trip parity with the sync counterpart.
Remove the direct persister.save call inside _handle_suspension for the
in-state fallback branch. The post_run_step lifecycle hook (PersisterHook)
already saves the step row for every suspended step, so the inline save
was writing the same (partition_key, app_id, sequence_id, position) row
twice, causing an IntegrityError in SQLitePersister due to its UNIQUE
constraint. Remove the _UpsertSQLitePersister workaround subclass from
the test and use bare SQLitePersister directly to confirm the fix.
…jsonschema

- Remove dead `record.resolved = True` mutation in in-state fallback path of
  resume() and aresume(); replace with comment naming the no-durability rule.
- Expand docstrings on resume() and aresume() to distinguish durable-storage
  idempotency (no-op) from in-state fallback behavior (second call raises).
- Tighten no-record ValueError message to name the in-state fallback cause,
  distinguishing it from a wrong app_id.
- _validate_payload now emits a warnings.warn instead of silently skipping
  when jsonschema is absent; import warnings moved to module level.
- Add M5 deferral comment in application._handle_suspension.
- Add test_resume_in_state_fallback_second_call_raises to integration suite.
…aresume

Guard aresume load_suspension call with supports_durable_storage check,
mirroring the existing guard used for journal loading in the same function.
Without the guard, async persisters that do not override load_suspension
raised NotImplementedError instead of falling through to _load_suspension.
Also raise warnings.warn stacklevel from 2 to 3 in _validate_payload so
the warning points at the caller of resume/aresume, not the internal helper.
Replaces the silent broken path (TypeError: coroutine object is not
subscriptable) with an explicit NotImplementedError when aresume() is
called with an async persister that does not implement durable storage.
Updates the aresume() docstring to accurately describe the async/sync
paths and their idempotency guarantees. Adds a test to assert the guard.
…State

aresume() now raises NotImplementedError for any async persister upfront,
removing unreachable dead branches. Both resume() and aresume() return the
loaded State object directly instead of wrapping it in State() again.
Add test_journal_no_double_count_via_stream_result to verify that
step_a's journal entry is not duplicated when stream_result() fast-
forwards through a non-halt_after action then executes the target
non-streaming action directly.  Reverting the self._journal_sink = []
reset at line 1744 of application.py causes this test to observe 3
journal entries (a_calc, a_calc, b_calc) instead of the correct 2.
The pg_persister fixture was hardcoded to localhost:5432, which made
it impossible to run against a Postgres on a non-default port without
editing the file. Honor POSTGRES_HOST/PORT/USER/PASSWORD/DB env vars
(with the previous values as defaults), so CI and local Docker setups
both work.

Add a tiny test that confirms the deprecated postgresql.py shim
inherits durable-storage support from the canonical b_psycopg2
persister without re-declaring methods.
Spec-compliance pass left a few quality gaps in the Postgres durable
methods: parameter type hints were stripped, return types were loose
('list' vs 'list[JournalEntry]'), and 'serde', 'json',
'SuspensionRecord' and 'JournalEntry' were re-imported inside every
method body even though no circular import constraint requires it.

Lift the imports to module top, tighten signatures to match the SQLite
reference, and drop a misleading F401 type-reference comment that
never matched a real annotation. Also drop the persister's state table
in the test fixture teardown so future state-table writes can't leak
between runs.
Remove the NotImplementedError guard for async persisters and add
_aload_suspension, _aload_journal, _arebuild async helpers that handle
all four combos (durable/non-durable x async/sync). aresume() now
awaits async persister calls and branches to sync calls for sync
persisters throughout the load/journal/rebuild/mark-resolved path.
Remove the temporary try/except ValueError guards around post_action_suspend
in _handle_suspension and _ahandle_suspension now that the hooks are
registered. Extend resume()/aresume() with an optional hooks parameter,
thread it through _rebuild/_arebuild, and fire pre_action_resume before
re-running the action. Covers sync post_suspend, sync pre_resume and async
pre_resume with three new tests.
Adds SuspendEntryModel to the tracking models and implements
PostActionSuspendHook on SyncTrackingClient so that a suspend_entry
line is written to the JSONL log whenever an action suspends the run,
enabling the Burr UI to render the suspension status.
Adds examples/durable-execution/ with a draft-review-finalize workflow
demonstrating suspend/resume and durable() journaling. Includes
application.py, notebook.ipynb, README.md, requirements.txt, __init__.py,
and a real statemachine.png generated by graphviz. Extends
test_durable_integration.py with test_example_application_suspends_and_resumes
which loads the example module and exercises the full suspend/resume path
against a tmp_path SQLite DB.
@github-actions (bot) added labels on May 23, 2026: area/core (Application, State, Graph, Actions), area/storage (Persisters, state storage), area/hooks (Lifecycle hooks, interceptors), area/tracking (Telemetry, tracing, OpenTelemetry), area/integrations (External integrations (LLMs, frameworks)), area/website (burr.apache.org website), area/examples (Relates to /examples), pr/needs-rebase (Conflicts with main)
@andreahlert andreahlert self-assigned this May 23, 2026
@vaquarkhan
Contributor

Hey @andreahlert, I have spent some time going through this properly. I checked out the branch at ececa4b, built it, ran the suite, and wrote a few throwaway scripts to confirm the behavior rather than just eyeballing the diff.

Overall it's good work. The suspend/resume flow is clean, the DeterminismError guard is a smart safety net, and the mark_suspension_resolved on each persister is already atomic. I did find a handful of things worth fixing. I've noted how I confirmed each one so you can repro. Everything below is against ececa4b.

1. resume never uses the atomic claim (blocking)
burr/core/resume.py, resume() L157-184, same in aresume() L230-263.

resume() reads record.resolved at L157, runs the action at L179, then calls mark_suspension_resolved() at L184 and throws away the return. So two resumes racing (webhook retry, two workers on the same queue) both see resolved=False and both run the post-suspension action.

Every persister already returns the right thing for this:

SQLite: UPDATE ... WHERE suspension_id=? AND resolved=0, returns rowcount > 0
Postgres: WHERE suspension_id=%s AND resolved=false, returns rowcount > 0
pymongo: update_one({"_id": id, "resolved": False}, ...), returns modified_count == 1
redis: SETNX

The claim is built and correct everywhere. resume just marks it after the run and ignores whether it won. It should claim before the run.

Note: I read the code and confirmed the order plus that resume runs the action. I did not run two workers concurrently, so the race is from the code, not observed under load.

Fix, claim before running:

_validate_payload(record.schema_json, payload)

if supports_durable_storage(persister):
    if not persister.mark_suspension_resolved(record.suspension_id):
        loaded = persister.load(partition_key, app_id)
        return loaded["state"] if loaded else State(record.state)

app = _rebuild(...)
# deliver payload, then app.run(halt_after=[])
# drop the mark_suspension_resolved() at L184
Same in aresume(). No persister changes.
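
To make the claim-before-run ordering concrete, here is a self-contained sketch using only the standard-library `sqlite3` module. The `suspensions` table, `claim_suspension`, `resume_once` and `make_demo_db` are hypothetical names for this example, not the PR's schema or API; the conditional UPDATE mirrors the SQLite pattern quoted above.

```python
import sqlite3
from typing import Any, Callable, Optional


def claim_suspension(conn: sqlite3.Connection, suspension_id: str) -> bool:
    """Flip resolved 0 -> 1; True only for the caller whose UPDATE changed a row."""
    cur = conn.execute(
        "UPDATE suspensions SET resolved = 1 "
        "WHERE suspension_id = ? AND resolved = 0",
        (suspension_id,),
    )
    conn.commit()
    return cur.rowcount > 0


def resume_once(
    conn: sqlite3.Connection, suspension_id: str, run_action: Callable[[], Any]
) -> Optional[Any]:
    # Claim first; run the post-suspension action only if the claim was won.
    if not claim_suspension(conn, suspension_id):
        return None  # someone else already claimed this suspension
    return run_action()


def make_demo_db() -> sqlite3.Connection:
    # In-memory database with one unresolved suspension, for the demo only.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE suspensions ("
        "suspension_id TEXT PRIMARY KEY, resolved INTEGER NOT NULL DEFAULT 0)"
    )
    conn.execute("INSERT INTO suspensions (suspension_id) VALUES ('s1')")
    conn.commit()
    return conn
```

With this ordering a second `resume_once` call for the same id returns without running the action. One trade-off worth weighing: a crash after the claim but before the action finishes leaves the suspension marked resolved, so recovery from that window would need separate handling.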

2. durable() doesn't replay across a crash, only across resume() (blocking)
burr/core/resume.py L165 (resume()) and L241 (aresume()), the only places _loaded_journal gets real data.

_loaded_journal starts empty and only gets filled in resume/aresume. The normal crash path, initialize_from(..., resume_at_next_action=True), never touches it. So a process that dies mid-action and recovers the usual way re-runs every durable() sub-step. That's the cost the journal is supposed to kill.

Repro: two-action graph, crash the second action after its first durable() ran and journaled, recover in a fresh process via initialize_from:

loaded_journal on recovery:          []
manual load_journal(pk, app, seq):   [('step_a', 'A')]   # row is there, key matches
step_a re-fired on recovery?         True

The row is saved and the key lines up (I checked the sequence_id, it matches, my first guess that it'd be off was wrong). Runtime just never loads it outside resume().

Fix: load the journal on the recovery path too when the persister is durable:

if supports_durable_storage(persister):
    app._loaded_journal = persister.load_journal(partition_key, app_id, sequence_id)
Plus a test for exactly-once across initialize_from with no suspend/resume.

If the intent is that durable() only covers deliberate suspend/resume and not crashes, fine, but then the docs shouldn't call it crash recovery.

3. in-state journal grows and gets rewritten every step
burr/core/application.py, _step() L1064-1066 and _astep() L1358-1360.

merged = read_journal_from_state(self._state) + self._journal_sink
new_state = write_journal_into_state(new_state, merged)

Never cleared after an action finishes, so burr_durable.journal piles up for the whole run and the whole blob gets rewritten every step. With N journaled sub-steps that is on the order of N-squared entries written in total. Bad with large LLM state.

(The commit named "flush in-state journal on action completion" is the one that adds this merge-and-grow. It doesn't flush.)

Repro, three actions each with one durable() call, in-state persister:

after a1: journal has 1 -> ['k_a']
after a2: journal has 2 -> ['k_a', 'k_b']
after a3: journal has 3 -> ['k_a', 'k_b', 'k_c']
Fix: drop burr_durable.journal on successful completion, keep only what belongs to the suspended action.
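
A rough sketch of that pruning rule, using plain dicts rather than Burr's State: `finish_step` and `JOURNAL_KEY` are hypothetical names, and the assumption is that journal entries only need to survive in state while their action is suspended.

```python
from typing import Any, Dict, List

JOURNAL_KEY = "burr_durable.journal"  # stand-in key name for this sketch


def finish_step(
    state: Dict[str, Any], sink: List[dict], suspended: bool
) -> Dict[str, Any]:
    """Return the state to persist after a step, pruning the in-state journal."""
    new_state = dict(state)
    if suspended:
        # Keep entries for the suspended action so a resume can replay them.
        new_state[JOURNAL_KEY] = list(state.get(JOURNAL_KEY, [])) + list(sink)
    else:
        # Action completed: its journal is no longer needed, so drop it.
        new_state.pop(JOURNAL_KEY, None)
    return new_state
```

Under this rule the journal held in state is bounded by the sub-steps of a single suspended action instead of growing with the whole run.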

4. partial persister override treated as fully durable
burr/core/durable.py, supports_durable_storage() L95-113.

Detection only checks save_suspension (L113). Override save_suspension but not save_journal_entry and this returns True, then you hit NotImplementedError at journal-write time, after the expensive sub-step already ran. Docstring admits it. Bad edge for anyone writing the custom DynamoDB/S3 persisters this is built for.

Repro, persister overriding only save_suspension:

supports_durable_storage(partial) = True
partial.save_journal_entry(...)   -> NotImplementedError

Fix: check all five methods (save_suspension, load_suspension, save_journal_entry, load_journal, mark_suspension_resolved) at register/build time and fail clearly on partial override.
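
One way such an all-or-nothing check could look, as a sketch: `BasePersisterSketch` and `check_durable_overrides` are hypothetical, and the sketch assumes the base-class defaults raise NotImplementedError, as the commit notes describe. An override is detected by comparing the method found on the persister's class with the one on the base.

```python
DURABLE_METHODS = (
    "save_suspension",
    "load_suspension",
    "save_journal_entry",
    "load_journal",
    "mark_suspension_resolved",
)


class BasePersisterSketch:
    """Stand-in base class whose durable methods are unimplemented by default."""

    def save_suspension(self, *args, **kwargs):
        raise NotImplementedError

    def load_suspension(self, *args, **kwargs):
        raise NotImplementedError

    def save_journal_entry(self, *args, **kwargs):
        raise NotImplementedError

    def load_journal(self, *args, **kwargs):
        raise NotImplementedError

    def mark_suspension_resolved(self, *args, **kwargs):
        raise NotImplementedError


def check_durable_overrides(persister, base=BasePersisterSketch) -> bool:
    """True if all five are overridden, False if none, TypeError if partial."""
    missing = [
        name
        for name in DURABLE_METHODS
        if getattr(type(persister), name) is getattr(base, name)
    ]
    if len(missing) == len(DURABLE_METHODS):
        return False  # no overrides: caller falls back to in-state storage
    if missing:
        raise TypeError(
            f"{type(persister).__name__} overrides some durable methods "
            f"but not: {', '.join(missing)}"
        )
    return True
```

Running the check at build time surfaces a partial override before any expensive sub-step has executed.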

5. suspend(schema=...) skips validation for non-Pydantic types
burr/core/application.py, suspend() L617-623.

schema_json only computed if the schema has model_json_schema (L617-618). Pass a dataclass or TypedDict and it stays None, so _validate_payload does nothing and nobody is told. Confirmed a dataclass has no model_json_schema.

Fix: require a Pydantic model with a clear TypeError otherwise, or support dataclass/TypedDict, or at minimum warn so it isn't silent.
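
The strictest of those options (reject anything that cannot produce a JSON schema) might look like the sketch below. `schema_json_or_raise` is a hypothetical helper, not code from the PR; it relies only on the presence of a callable `model_json_schema`, which Pydantic v2 models expose as a classmethod.

```python
from typing import Any, Optional


def schema_json_or_raise(schema: Optional[Any]) -> Optional[dict]:
    """Return a JSON schema dict for `schema`, or fail loudly if none can be built."""
    if schema is None:
        return None  # no schema requested, so there is nothing to validate
    to_schema = getattr(schema, "model_json_schema", None)
    if not callable(to_schema):
        # Dataclasses and TypedDicts land here instead of being skipped silently.
        raise TypeError(
            f"suspend(schema=...) expected a type with model_json_schema(), "
            f"got {schema!r}"
        )
    return to_schema()
```

Raising at suspend time means a caller who passes a dataclass learns immediately that the payload would not be validated on resume.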

6. two shim tests fail instead of skip without the DB driver (minor)
tests/core/test_durable_persisters.py L544 and L559.

The test bodies import the persister module which does a plain import pymongo / import psycopg2. No driver means ModuleNotFoundError raised inside the test, so pytest marks them FAILED instead of skipping. Every other DB test in the file skips. Confirmed: ModuleNotFoundError: No module named 'psycopg2', reported as 1 failed.

Fix: pytest.importorskip("pymongo") / pytest.importorskip("psycopg2").

That's everything I caught. To be upfront about #1: I'm confident the race is real from reading the code, but I didn't actually run two workers against the same suspension to watch it happen, so treat that one as "proven on paper, not under load." Everything else I reproduced directly.

My take is that #1 and #2 are the ones to sort before this lands, since together they're what make the exactly-once story actually hold up under concurrency and after a crash.

The rest can follow whenever. Happy to send a small PR for #1 and #2 if that's easier than describing it, neither needs any persister changes. Either way, thanks for putting this together, it's a genuinely useful feature.

@andreahlert
Collaborator Author

Thanks for reviewing it @vaquarkhan. We are having a discussion about this implementation on the dev list. Could you take a look and give your opinion about it?
