
Add replay-safe logger support for orchestrations #129

Open
andystaples wants to merge 4 commits into microsoft:main from andystaples:andystaples/add-replay-safe-logging

Conversation

@andystaples
Contributor

Summary

Adds replay-safe logging support for orchestrations, bringing the Python SDK in line with the .NET and JavaScript SDKs. Orchestrator functions replay from the beginning on each new event, which causes log statements to fire repeatedly. A replay-safe logger suppresses log output during replay so that each message is emitted exactly once.
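As a toy illustration of the problem (plain Python, not SDK code): an orchestrator body re-runs from the top on every replay, so an unguarded log statement fires once per execution, while a replay-aware guard fires exactly once.

```python
emitted_unguarded = []
emitted_guarded = []

def orchestrator_body(is_replaying: bool):
    # An unguarded log fires on the initial pass AND on every replay.
    emitted_unguarded.append("started")
    # A replay-safe log is suppressed while replaying from history.
    if not is_replaying:
        emitted_guarded.append("started")

# One live execution followed by two replays of the same history:
orchestrator_body(is_replaying=False)
orchestrator_body(is_replaying=True)
orchestrator_body(is_replaying=True)
# emitted_unguarded now holds three entries; emitted_guarded holds one.
```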

What's included

  • ReplaySafeLogger — a logging.LoggerAdapter subclass that overrides isEnabledFor() to return False during replay. Since LoggerAdapter.log() checks isEnabledFor() before forwarding, all log methods (debug, info, warning, error, critical, log) are automatically suppressed during replay with no manual delegation needed.

  • OrchestrationContext.create_replay_safe_logger(logger) — a concrete method on the OrchestrationContext ABC that wraps any standard logging.Logger in a ReplaySafeLogger. Because it's defined on the base class, all context implementations (including durabletask-azuremanaged) inherit support automatically.

  • 5 unit/integration tests covering:

    • Log suppression during replay (integration with activity replay)
    • All log levels (DEBUG through CRITICAL)
    • Direct unit test with a mock context
    • The generic log() method
    • isEnabledFor() behavior (returns False during replay, delegates when live, respects inner logger level)
  • Documentation in docs/features.md with usage examples and notes

  • Updated example in examples/activity_sequence.py
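The adapter approach in the first bullet can be sketched roughly as follows (the constructor signature and the `is_replaying` callback are assumptions for illustration, not necessarily the PR's exact code):

```python
import logging
from typing import Callable

class ReplaySafeLogger(logging.LoggerAdapter):
    """Wraps a standard Logger and suppresses output during orchestration replay."""

    def __init__(self, logger: logging.Logger, is_replaying: Callable[[], bool]):
        super().__init__(logger, None)
        self._is_replaying = is_replaying

    def isEnabledFor(self, level: int) -> bool:
        # LoggerAdapter's log() consults isEnabledFor() before delegating, so
        # returning False here silences debug/info/warning/error/critical/log
        # in one place, with no per-method delegation.
        if self._is_replaying():
            return False
        return self.logger.isEnabledFor(level)
```

When live, the adapter still honors the inner logger's configured level, matching the isEnabledFor() behavior the tests cover.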

Usage

import logging

from durabletask import task

logger = logging.getLogger("my_orchestrator")

def my_orchestrator(ctx: task.OrchestrationContext, _):
    replay_logger = ctx.create_replay_safe_logger(logger)
    replay_logger.info("This only logs once, not on every replay")
    # my_activity is an activity function registered elsewhere
    result = yield ctx.call_activity(my_activity, input="hello")
    replay_logger.info(f"Activity returned: {result}")
    return result

.NET vs Python API distinction

In the .NET SDK, CreateReplaySafeLogger accepts a category name string rather than a logger instance:

var logger = ctx.CreateReplaySafeLogger("MyOrchestrator");

This works because .NET uses a dependency-injected ILoggerFactory to create the logger internally. Python's logging module has no equivalent DI pattern — logging.getLogger(name) is already the trivial global factory — so the Python API accepts a Logger instance directly.
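Concretely, logging.getLogger returns a per-name singleton, which is why a separate factory abstraction adds nothing in Python:

```python
import logging

a = logging.getLogger("my_orchestrator")
b = logging.getLogger("my_orchestrator")
assert a is b  # getLogger is already a global, name-keyed factory
```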

Proposal: string name overload

To improve ergonomics and provide a .NET-like experience, we could also accept a string name:

# Current API (Logger instance)
replay_logger = ctx.create_replay_safe_logger(my_logger)

# Proposed: also accept a string name (.NET style)
replay_logger = ctx.create_replay_safe_logger("my_orchestrator")

Implementation would be straightforward:

def create_replay_safe_logger(
    self, logger: Union[logging.Logger, str]
) -> ReplaySafeLogger:
    if isinstance(logger, str):
        logger = logging.getLogger(logger)
    return ReplaySafeLogger(logger, lambda: self.is_replaying)

This would let users write ctx.create_replay_safe_logger("my_orchestrator") without needing a separate logging.getLogger() call, matching the .NET SDK's single-argument-string pattern. The existing Logger instance overload would continue to work unchanged.

Feedback welcome on whether this overload is worth adding in this PR or as a follow-up.

andystaples and others added 2 commits April 8, 2026 11:19
Adds ReplaySafeLogger class and OrchestrationContext.create_replay_safe_logger()
so orchestrators can log without generating duplicate messages during replay.

The logger wraps a standard logging.Logger and suppresses all log calls
when the orchestrator is replaying from history. All standard log levels
are supported: debug, info, warning, error, critical, and exception.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Rewrite ReplaySafeLogger as a logging.LoggerAdapter subclass with a
  single isEnabledFor() override instead of manual method delegation
- Add log() and isEnabledFor() methods with tests
- Add replay-safe logging section to docs/features.md
- Add usage example in examples/activity_sequence.py
- Add cross-package compatibility section to copilot-instructions.md
Copilot AI review requested due to automatic review settings April 8, 2026 17:46
Contributor

Copilot AI left a comment


Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Adds replay-safe logging for orchestrator functions to prevent duplicate log emission during orchestrator replay, aligning behavior with other SDKs.

Changes:

  • Introduces ReplaySafeLogger (logging.LoggerAdapter) and OrchestrationContext.create_replay_safe_logger().
  • Adds tests validating suppression behavior across levels and APIs.
  • Updates documentation and an example to demonstrate replay-safe logging usage.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Summary per file:

  • durabletask/task.py: Adds ReplaySafeLogger and context helper to create replay-safe loggers.
  • tests/durabletask/test_orchestration_executor.py: Adds unit/integration tests for replay-safe logging behavior.
  • docs/features.md: Documents replay-safe logging, usage, and SDK differences.
  • examples/activity_sequence.py: Demonstrates replay-safe logging usage in an orchestrator example.
  • .github/copilot-instructions.md: Adds guidance to verify cross-package compatibility when changing core SDK APIs.

Comment on lines +1514 to +1559
inner_logger.addHandler(_RecordingHandler())

activity_name = "say_hello"

def say_hello(_, name: str) -> str:
    return f"Hello, {name}!"

def orchestrator(ctx: task.OrchestrationContext, _):
    replay_logger = ctx.create_replay_safe_logger(inner_logger)
    replay_logger.info("Starting orchestration")
    result = yield ctx.call_activity(say_hello, input="World")
    replay_logger.info("Activity completed: %s", result)
    return result

registry = worker._Registry()
activity_name = registry.add_activity(say_hello)
orchestrator_name = registry.add_orchestrator(orchestrator)

# First execution: starts the orchestration. The orchestrator runs without
# replay, so both log calls should be emitted.
new_events = [
    helpers.new_orchestrator_started_event(datetime.now()),
    helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None),
]
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
assert result.actions  # should have scheduled the activity

assert log_calls == ["Starting orchestration"]
log_calls.clear()

# Second execution: the orchestrator replays from history and then processes the
# activity completion. The "Starting orchestration" message is emitted during
# replay and should be suppressed; "Activity completed" is emitted after replay
# ends and should appear exactly once.
old_events = new_events + [
    helpers.new_task_scheduled_event(1, activity_name),
]
encoded_output = json.dumps(say_hello(None, "World"))
new_events = [helpers.new_task_completed_event(1, encoded_output)]
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions)
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED

assert log_calls == ["Activity completed: Hello, World!"]

Copilot AI Apr 8, 2026


These tests attach handlers to globally-registered logging loggers but never detach them. Because logging.getLogger(...) returns a singleton per name, the handler (and its closure over log_calls) will remain attached after the test finishes, which can cause memory leaks and/or unexpected output if that logger is ever used later in the same test run. Consider setting inner_logger.propagate = False and removing the handler in a finally block (or using pytest’s caplog fixture) to keep tests isolated. This same pattern appears in the other newly-added replay-safe logger tests as well.
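The try/finally cleanup the reviewer suggests can be sketched as a reusable helper (names here are hypothetical, mirroring the test's _RecordingHandler):

```python
import logging

class _RecordingHandler(logging.Handler):
    """Collects formatted messages so a test can assert on them."""

    def __init__(self, sink):
        super().__init__()
        self._sink = sink

    def emit(self, record):
        self._sink.append(record.getMessage())

def with_recording_handler(logger: logging.Logger, body):
    """Run `body` with a recording handler attached, then detach it.

    Restores `propagate` and removes the handler even if `body` raises, so the
    globally-registered logger is left clean for later tests in the same run.
    """
    messages = []
    handler = _RecordingHandler(messages)
    original_propagate = logger.propagate
    logger.propagate = False
    logger.addHandler(handler)
    try:
        body()
    finally:
        logger.removeHandler(handler)
        logger.propagate = original_propagate
    return messages
```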

Suggested change

handler = _RecordingHandler()
original_propagate = inner_logger.propagate
inner_logger.propagate = False
inner_logger.addHandler(handler)
try:
    activity_name = "say_hello"

    def say_hello(_, name: str) -> str:
        return f"Hello, {name}!"

    def orchestrator(ctx: task.OrchestrationContext, _):
        replay_logger = ctx.create_replay_safe_logger(inner_logger)
        replay_logger.info("Starting orchestration")
        result = yield ctx.call_activity(say_hello, input="World")
        replay_logger.info("Activity completed: %s", result)
        return result

    registry = worker._Registry()
    activity_name = registry.add_activity(say_hello)
    orchestrator_name = registry.add_orchestrator(orchestrator)

    # First execution: starts the orchestration. The orchestrator runs without
    # replay, so both log calls should be emitted.
    new_events = [
        helpers.new_orchestrator_started_event(datetime.now()),
        helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None),
    ]
    executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
    result = executor.execute(TEST_INSTANCE_ID, [], new_events)
    assert result.actions  # should have scheduled the activity

    assert log_calls == ["Starting orchestration"]
    log_calls.clear()

    # Second execution: the orchestrator replays from history and then processes the
    # activity completion. The "Starting orchestration" message is emitted during
    # replay and should be suppressed; "Activity completed" is emitted after replay
    # ends and should appear exactly once.
    old_events = new_events + [
        helpers.new_task_scheduled_event(1, activity_name),
    ]
    encoded_output = json.dumps(say_hello(None, "World"))
    new_events = [helpers.new_task_completed_event(1, encoded_output)]
    executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
    complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions)
    assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED

    assert log_calls == ["Activity completed: Hello, World!"]
finally:
    inner_logger.removeHandler(handler)
    inner_logger.propagate = original_propagate

orchestrator_name = registry.add_orchestrator(orchestrator)

# First execution: starts the orchestration. The orchestrator runs without
# replay, so both log calls should be emitted.

Copilot AI Apr 8, 2026


The comment says "both log calls should be emitted" on the first execution, but the test asserts only "Starting orchestration" is logged in that pass (the second log happens after the activity result is delivered in the next execution). Please update the comment to match the actual control flow/assertions.

Suggested change
# replay, so both log calls should be emitted.
# replay, emits the initial log message, and then schedules the activity.

docs/features.md Outdated
Comment on lines +165 to +171

logger = logging.getLogger("my_orchestrator")

def my_orchestrator(ctx: task.OrchestrationContext, input):
    replay_logger = ctx.create_replay_safe_logger(logger)
    replay_logger.info("Starting orchestration %s", ctx.instance_id)
    result = yield ctx.call_activity(my_activity, input=input)

Copilot AI Apr 8, 2026


The snippet uses task.OrchestrationContext but doesn’t show importing task, which makes the example non-copy-pasteable as written. Consider adding the missing import in the snippet (e.g., from durabletask import task if that’s the intended module) or removing the module-qualified type annotation in the example. Also, using input as a parameter name shadows Python’s built-in input(); renaming it (e.g., inp/payload) would reduce confusion.

Suggested change

from durabletask import task

logger = logging.getLogger("my_orchestrator")

def my_orchestrator(ctx: task.OrchestrationContext, payload):
    replay_logger = ctx.create_replay_safe_logger(logger)
    replay_logger.info("Starting orchestration %s", ctx.instance_id)
    result = yield ctx.call_activity(my_activity, input=payload)
