Add replay-safe logger support for orchestrations #129

andystaples wants to merge 4 commits into microsoft:main
Conversation
Adds ReplaySafeLogger class and OrchestrationContext.create_replay_safe_logger() so orchestrators can log without generating duplicate messages during replay. The logger wraps a standard logging.Logger and suppresses all log calls when the orchestrator is replaying from history. All standard log levels are supported: debug, info, warning, error, critical, and exception. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Rewrite ReplaySafeLogger as a logging.LoggerAdapter subclass with a single isEnabledFor() override instead of manual method delegation
- Add log() and isEnabledFor() methods with tests
- Add replay-safe logging section to docs/features.md
- Add usage example in examples/activity_sequence.py
- Add cross-package compatibility section to copilot-instructions.md
Pull request overview
Note: Copilot was unable to run its full agentic suite in this review.
Adds replay-safe logging for orchestrator functions to prevent duplicate log emission during orchestrator replay, aligning behavior with other SDKs.
Changes:
- Introduces `ReplaySafeLogger(logging.LoggerAdapter)` and `OrchestrationContext.create_replay_safe_logger()`.
- Adds tests validating suppression behavior across levels and APIs.
- Updates documentation and an example to demonstrate replay-safe logging usage.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| durabletask/task.py | Adds ReplaySafeLogger and context helper to create replay-safe loggers. |
| tests/durabletask/test_orchestration_executor.py | Adds unit/integration tests for replay-safe logging behavior. |
| docs/features.md | Documents replay-safe logging, usage, and SDK differences. |
| examples/activity_sequence.py | Demonstrates replay-safe logging usage in an orchestrator example. |
| .github/copilot-instructions.md | Adds guidance to verify cross-package compatibility when changing core SDK APIs. |
tests/durabletask/test_orchestration_executor.py:

```python
inner_logger.addHandler(_RecordingHandler())

activity_name = "say_hello"

def say_hello(_, name: str) -> str:
    return f"Hello, {name}!"

def orchestrator(ctx: task.OrchestrationContext, _):
    replay_logger = ctx.create_replay_safe_logger(inner_logger)
    replay_logger.info("Starting orchestration")
    result = yield ctx.call_activity(say_hello, input="World")
    replay_logger.info("Activity completed: %s", result)
    return result

registry = worker._Registry()
activity_name = registry.add_activity(say_hello)
orchestrator_name = registry.add_orchestrator(orchestrator)

# First execution: starts the orchestration. The orchestrator runs without
# replay, so both log calls should be emitted.
new_events = [
    helpers.new_orchestrator_started_event(datetime.now()),
    helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None),
]
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
assert result.actions  # should have scheduled the activity

assert log_calls == ["Starting orchestration"]
log_calls.clear()

# Second execution: the orchestrator replays from history and then processes the
# activity completion. The "Starting orchestration" message is emitted during
# replay and should be suppressed; "Activity completed" is emitted after replay
# ends and should appear exactly once.
old_events = new_events + [
    helpers.new_task_scheduled_event(1, activity_name),
]
encoded_output = json.dumps(say_hello(None, "World"))
new_events = [helpers.new_task_completed_event(1, encoded_output)]
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions)
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED

assert log_calls == ["Activity completed: Hello, World!"]
```
These tests attach handlers to globally-registered logging loggers but never detach them. Because logging.getLogger(...) returns a singleton per name, the handler (and its closure over log_calls) will remain attached after the test finishes, which can cause memory leaks and/or unexpected output if that logger is ever used later in the same test run. Consider setting inner_logger.propagate = False and removing the handler in a finally block (or using pytest’s caplog fixture) to keep tests isolated. This same pattern appears in the other newly-added replay-safe logger tests as well.
Suggested change (attach the handler with `propagate` disabled and detach it in a `finally` block):

```python
handler = _RecordingHandler()
original_propagate = inner_logger.propagate
inner_logger.propagate = False
inner_logger.addHandler(handler)
try:
    activity_name = "say_hello"

    def say_hello(_, name: str) -> str:
        return f"Hello, {name}!"

    def orchestrator(ctx: task.OrchestrationContext, _):
        replay_logger = ctx.create_replay_safe_logger(inner_logger)
        replay_logger.info("Starting orchestration")
        result = yield ctx.call_activity(say_hello, input="World")
        replay_logger.info("Activity completed: %s", result)
        return result

    registry = worker._Registry()
    activity_name = registry.add_activity(say_hello)
    orchestrator_name = registry.add_orchestrator(orchestrator)

    # First execution: starts the orchestration. The orchestrator runs without
    # replay, so both log calls should be emitted.
    new_events = [
        helpers.new_orchestrator_started_event(datetime.now()),
        helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None),
    ]
    executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
    result = executor.execute(TEST_INSTANCE_ID, [], new_events)
    assert result.actions  # should have scheduled the activity

    assert log_calls == ["Starting orchestration"]
    log_calls.clear()

    # Second execution: the orchestrator replays from history and then processes the
    # activity completion. The "Starting orchestration" message is emitted during
    # replay and should be suppressed; "Activity completed" is emitted after replay
    # ends and should appear exactly once.
    old_events = new_events + [
        helpers.new_task_scheduled_event(1, activity_name),
    ]
    encoded_output = json.dumps(say_hello(None, "World"))
    new_events = [helpers.new_task_completed_event(1, encoded_output)]
    executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
    complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions)
    assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED

    assert log_calls == ["Activity completed: Hello, World!"]
finally:
    inner_logger.removeHandler(handler)
    inner_logger.propagate = original_propagate
```
```python
orchestrator_name = registry.add_orchestrator(orchestrator)

# First execution: starts the orchestration. The orchestrator runs without
# replay, so both log calls should be emitted.
```
The comment says "both log calls should be emitted" on the first execution, but the test asserts only "Starting orchestration" is logged in that pass (the second log happens after the activity result is delivered in the next execution). Please update the comment to match the actual control flow/assertions.
Suggested change:

```diff
- # replay, so both log calls should be emitted.
+ # replay, emits the initial log message, and then schedules the activity.
```
docs/features.md (outdated):

```python
logger = logging.getLogger("my_orchestrator")

def my_orchestrator(ctx: task.OrchestrationContext, input):
    replay_logger = ctx.create_replay_safe_logger(logger)
    replay_logger.info("Starting orchestration %s", ctx.instance_id)
    result = yield ctx.call_activity(my_activity, input=input)
```
The snippet uses task.OrchestrationContext but doesn’t show importing task, which makes the example non-copy-pasteable as written. Consider adding the missing import in the snippet (e.g., from durabletask import task if that’s the intended module) or removing the module-qualified type annotation in the example. Also, using input as a parameter name shadows Python’s built-in input(); renaming it (e.g., inp/payload) would reduce confusion.
Suggested change:

```python
from durabletask import task

logger = logging.getLogger("my_orchestrator")

def my_orchestrator(ctx: task.OrchestrationContext, payload):
    replay_logger = ctx.create_replay_safe_logger(logger)
    replay_logger.info("Starting orchestration %s", ctx.instance_id)
    result = yield ctx.call_activity(my_activity, input=payload)
```
Summary
Adds replay-safe logging support for orchestrations, bringing the Python SDK in line with the .NET and JavaScript SDKs. Orchestrator functions replay from the beginning on each new event, which causes log statements to fire repeatedly. A replay-safe logger suppresses log output during replay so that each message is emitted exactly once.
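The suppression mechanism can be sketched roughly as follows. This is a minimal sketch based on the description above, not the shipped class: the constructor signature and the `is_replaying` attribute name on the context are assumptions.

```python
import logging

class ReplaySafeLogger(logging.LoggerAdapter):
    """Sketch: suppress log records while the orchestration is replaying."""

    def __init__(self, logger: logging.Logger, ctx) -> None:
        super().__init__(logger, extra=None)
        self._ctx = ctx  # assumed to expose an is_replaying flag

    def isEnabledFor(self, level: int) -> bool:
        # LoggerAdapter.log() consults isEnabledFor() before forwarding, so
        # returning False here silences debug/info/warning/error/critical/log
        # during replay without overriding each method individually.
        if self._ctx.is_replaying:
            return False
        return self.logger.isEnabledFor(level)
```

Because every convenience method on `LoggerAdapter` funnels through `log()`, a single `isEnabledFor()` override is enough, which is exactly the simplification the commit history mentions.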
What's included
- `ReplaySafeLogger` — a `logging.LoggerAdapter` subclass that overrides `isEnabledFor()` to return `False` during replay. Since `LoggerAdapter.log()` checks `isEnabledFor()` before forwarding, all log methods (`debug`, `info`, `warning`, `error`, `critical`, `log`) are automatically suppressed during replay with no manual delegation needed.
- `OrchestrationContext.create_replay_safe_logger(logger)` — a concrete method on the `OrchestrationContext` ABC that wraps any standard `logging.Logger` in a `ReplaySafeLogger`. Because it's defined on the base class, all context implementations (including `durabletask-azuremanaged`) inherit support automatically.
- 5 unit/integration tests covering:
  - `log()` method
  - `isEnabledFor()` behavior (returns `False` during replay, delegates when live, respects inner logger level)
- Documentation in `docs/features.md` with usage examples and notes
- Updated example in `examples/activity_sequence.py`

Usage
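A minimal usage sketch, consistent with the docs snippet above. The `say_hello` activity and the untyped signatures are illustrative; in real code the context is supplied by the durabletask worker and the functions are registered with it.

```python
import logging

logger = logging.getLogger("my_orchestrator")

def say_hello(ctx, name: str) -> str:
    return f"Hello, {name}!"

def my_orchestrator(ctx, payload):
    # The wrapper suppresses these calls whenever ctx reports it is
    # replaying, so each message is emitted exactly once per orchestration
    # even though the function body re-runs on every new event.
    replay_logger = ctx.create_replay_safe_logger(logger)
    replay_logger.info("Starting orchestration %s", ctx.instance_id)
    result = yield ctx.call_activity(say_hello, input=payload)
    replay_logger.info("Activity completed: %s", result)
    return result
```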
.NET vs Python API distinction
In the .NET SDK, `CreateReplaySafeLogger` accepts a category name string rather than a logger instance. This works because .NET uses a dependency-injected `ILoggerFactory` to create the logger internally. Python's `logging` module has no equivalent DI pattern — `logging.getLogger(name)` is already the trivial global factory — so the Python API accepts a `Logger` instance directly.

Proposal: string name overload
To improve ergonomics and provide a .NET-like experience, we could add support for also accepting a string name in place of a `Logger` instance.
Implementation would be straightforward:
This would let users write `ctx.create_replay_safe_logger("my_orchestrator")` without needing a separate `logging.getLogger()` call, matching the .NET SDK's single-argument-string pattern. The existing `Logger` instance overload would continue to work unchanged.

Feedback welcome on whether this overload is worth adding in this PR or as a follow-up.