From 0cbcf48958bd70328c5231374b5ce849eedd1bbc Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Wed, 8 Apr 2026 11:19:07 -0600 Subject: [PATCH 1/3] Add replay-safe logger for orchestrations Adds ReplaySafeLogger class and OrchestrationContext.create_replay_safe_logger() so orchestrators can log without generating duplicate messages during replay. The logger wraps a standard logging.Logger and suppresses all log calls when the orchestrator is replaying from history. All standard log levels are supported: debug, info, warning, error, critical, and exception. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- durabletask/task.py | 69 +++++++++++ .../test_orchestration_executor.py | 117 ++++++++++++++++++ 2 files changed, 186 insertions(+) diff --git a/durabletask/task.py b/durabletask/task.py index 2375f124..cd6172a3 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -4,6 +4,7 @@ # See https://peps.python.org/pep-0563/ from __future__ import annotations +import logging import math from abc import ABC, abstractmethod from datetime import datetime, timedelta, timezone @@ -279,6 +280,74 @@ def new_uuid(self) -> str: def _exit_critical_section(self) -> None: pass + def create_replay_safe_logger(self, logger: logging.Logger) -> ReplaySafeLogger: + """Create a replay-safe logger that suppresses log messages during orchestration replay. + + The returned logger wraps the provided logger and only emits log messages when + the orchestrator is not replaying. This prevents duplicate log messages from + appearing as a side effect of orchestration replay. + + Parameters + ---------- + logger : logging.Logger + The underlying logger to wrap. + + Returns + ------- + ReplaySafeLogger + A logger that only emits log messages when the orchestrator is not replaying. + """ + return ReplaySafeLogger(logger, lambda: self.is_replaying) + + +class ReplaySafeLogger: + """A logger wrapper that suppresses log messages during orchestration replay. + + This class wraps a standard :class:`logging.Logger` and only emits log + messages when the orchestrator is *not* replaying. Use this to avoid + duplicate log entries that would otherwise appear every time the + orchestrator replays its history. + + Obtain an instance by calling :meth:`OrchestrationContext.create_replay_safe_logger`. + """ + + def __init__(self, logger: logging.Logger, is_replaying: Callable[[], bool]) -> None: + self._logger = logger + self._is_replaying = is_replaying + + def _should_log(self) -> bool: + return not self._is_replaying() + + def debug(self, msg: str, *args: Any, **kwargs: Any) -> None: + """Log a DEBUG-level message if the orchestrator is not replaying.""" + if self._should_log(): + self._logger.debug(msg, *args, **kwargs) + + def info(self, msg: str, *args: Any, **kwargs: Any) -> None: + """Log an INFO-level message if the orchestrator is not replaying.""" + if self._should_log(): + self._logger.info(msg, *args, **kwargs) + + def warning(self, msg: str, *args: Any, **kwargs: Any) -> None: + """Log a WARNING-level message if the orchestrator is not replaying.""" + if self._should_log(): + self._logger.warning(msg, *args, **kwargs) + + def error(self, msg: str, *args: Any, **kwargs: Any) -> None: + """Log an ERROR-level message if the orchestrator is not replaying.""" + if self._should_log(): + self._logger.error(msg, *args, **kwargs) + + def critical(self, msg: str, *args: Any, **kwargs: Any) -> None: + """Log a CRITICAL-level message if the orchestrator is not replaying.""" + if self._should_log(): + self._logger.critical(msg, *args, **kwargs) + + def exception(self, msg: str, *args: Any, **kwargs: Any) -> None: + """Log an ERROR-level message with exception info if the orchestrator is not replaying.""" + if self._should_log(): + self._logger.exception(msg, *args, **kwargs) + class FailureDetails: def __init__(self, message: str, error_type: str, stack_trace: Optional[str]): diff --git a/tests/durabletask/test_orchestration_executor.py b/tests/durabletask/test_orchestration_executor.py index 14d5e14d..3c159b58 100644 --- a/tests/durabletask/test_orchestration_executor.py +++ b/tests/durabletask/test_orchestration_executor.py @@ -1152,6 +1152,123 @@ def orchestrator(ctx: task.OrchestrationContext, _): assert complete_action.result.value == encoded_output +def test_replay_safe_logger_suppresses_during_replay(): + """Validates that the replay-safe logger suppresses log messages during replay.""" + log_calls: list[str] = [] + + class _RecordingHandler(logging.Handler): + def emit(self, record: logging.LogRecord) -> None: + log_calls.append(record.getMessage()) + + inner_logger = logging.getLogger("test_replay_safe_logger") + inner_logger.setLevel(logging.DEBUG) + inner_logger.addHandler(_RecordingHandler()) + + activity_name = "say_hello" + + def say_hello(_, name: str) -> str: + return f"Hello, {name}!" + + def orchestrator(ctx: task.OrchestrationContext, _): + replay_logger = ctx.create_replay_safe_logger(inner_logger) + replay_logger.info("Starting orchestration") + result = yield ctx.call_activity(say_hello, input="World") + replay_logger.info("Activity completed: %s", result) + return result + + registry = worker._Registry() + activity_name = registry.add_activity(say_hello) + orchestrator_name = registry.add_orchestrator(orchestrator) + + # First execution: starts the orchestration. The orchestrator runs without + # replay, so both log calls should be emitted. + new_events = [ + helpers.new_orchestrator_started_event(datetime.now()), + helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + assert result.actions # should have scheduled the activity + + assert log_calls == ["Starting orchestration"] + log_calls.clear() + + # Second execution: the orchestrator replays from history and then processes the + # activity completion. The "Starting orchestration" message is emitted during + # replay and should be suppressed; "Activity completed" is emitted after replay + # ends and should appear exactly once. + old_events = new_events + [ + helpers.new_task_scheduled_event(1, activity_name), + ] + encoded_output = json.dumps(say_hello(None, "World")) + new_events = [helpers.new_task_completed_event(1, encoded_output)] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions) + assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED + + assert log_calls == ["Activity completed: Hello, World!"] + + +def test_replay_safe_logger_all_levels(): + """Validates that all log levels are suppressed during replay and emitted otherwise.""" + log_levels: list[str] = [] + + class _LevelRecorder(logging.Handler): + def emit(self, record: logging.LogRecord) -> None: + log_levels.append(record.levelname) + + inner_logger = logging.getLogger("test_replay_safe_logger_levels") + inner_logger.setLevel(logging.DEBUG) + inner_logger.addHandler(_LevelRecorder()) + + def orchestrator(ctx: task.OrchestrationContext, _): + replay_logger = ctx.create_replay_safe_logger(inner_logger) + replay_logger.debug("debug msg") + replay_logger.info("info msg") + replay_logger.warning("warning msg") + replay_logger.error("error msg") + replay_logger.critical("critical msg") + return "done" + + registry = worker._Registry() + orchestrator_name = registry.add_orchestrator(orchestrator) + + new_events = [ + helpers.new_orchestrator_started_event(datetime.now()), + helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions) + assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED + + assert log_levels == ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] + + +def test_replay_safe_logger_direct(): + """Unit test for ReplaySafeLogger — verifies suppression based on is_replaying flag.""" + log_calls: list[str] = [] + + class _RecordingHandler(logging.Handler): + def emit(self, record: logging.LogRecord) -> None: + log_calls.append(record.getMessage()) + + inner_logger = logging.getLogger("test_replay_safe_logger_direct") + inner_logger.setLevel(logging.DEBUG) + inner_logger.addHandler(_RecordingHandler()) + + replaying = True + replay_logger = task.ReplaySafeLogger(inner_logger, lambda: replaying) + + replay_logger.info("should be suppressed") + assert log_calls == [] + + replaying = False + replay_logger.info("should appear") + assert log_calls == ["should appear"] + + def test_when_any_with_retry(): """Tests that a when_any pattern works correctly with retries""" def dummy_activity(_, inp: str): From 151c50a56504f5a55bd45034e4982667888f11d5 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Wed, 8 Apr 2026 11:42:12 -0600 Subject: [PATCH 2/3] Refactor ReplaySafeLogger to use LoggerAdapter, add tests and docs - Rewrite ReplaySafeLogger as a logging.LoggerAdapter subclass with a single isEnabledFor() override instead of manual method delegation - Add log() and isEnabledFor() methods with tests - Add replay-safe logging section to docs/features.md - Add usage example in examples/activity_sequence.py - Add cross-package compatibility section to copilot-instructions.md --- .github/copilot-instructions.md | 15 ++++++ docs/features.md | 41 ++++++++++++++++ durabletask/task.py | 49 +++++-------------- examples/activity_sequence.py | 8 +++ .../test_orchestration_executor.py | 47 ++++++++++++++++++ 5 files changed, 124 insertions(+), 36 deletions(-) diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 1a0fdd7a..462f8c46 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -107,3 +107,18 @@ python -m pytest - `examples/` — example orchestrations (see `examples/README.md`) - `tests/` — test suite - `dev-requirements.txt` — development dependencies + +## Cross-Package Compatibility + +The `durabletask-azuremanaged` package extends the core `durabletask` +package (e.g. `DurableTaskSchedulerWorker` subclasses +`TaskHubGrpcWorker`). When adding or changing features in +`durabletask/`, always verify that `durabletask-azuremanaged` still +works correctly: + +- Check whether the azuremanaged worker, client, or tests override or + depend on the code you changed. +- Run the azuremanaged unit tests if they exist for the affected area. +- If a new public API is added to the core SDK (e.g. a method on + `OrchestrationContext`), confirm it is accessible through the + azuremanaged package and add a test or example if appropriate. diff --git a/docs/features.md b/docs/features.md index 85db052d..ea9bf36f 100644 --- a/docs/features.md +++ b/docs/features.md @@ -150,6 +150,47 @@ Orchestrations can be suspended using the `suspend_orchestration` client API and Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error. +### Replay-safe logging + +Orchestrator functions replay their history each time they are resumed, +which can cause duplicate log messages. The `create_replay_safe_logger` +method on `OrchestrationContext` returns a `ReplaySafeLogger` that wraps +a standard `logging.Logger` and automatically suppresses output while +the orchestrator is replaying. `ReplaySafeLogger` extends Python's +`logging.LoggerAdapter`, which is the idiomatic way to add context or +modify behavior on an existing logger. + +```python +import logging + +logger = logging.getLogger("my_orchestrator") + +def my_orchestrator(ctx: task.OrchestrationContext, input): + replay_logger = ctx.create_replay_safe_logger(logger) + replay_logger.info("Starting orchestration %s", ctx.instance_id) + result = yield ctx.call_activity(my_activity, input=input) + replay_logger.info("Activity returned: %s", result) + return result +``` + +> [!NOTE] +> Unlike the .NET SDK, where `CreateReplaySafeLogger` accepts a +> category name string and internally creates the logger via +> `ILoggerFactory`, the Python SDK requires you to pass an existing +> `logging.Logger` instance. This is because Python's +> `logging.getLogger(name)` already serves as the global factory and +> is the standard way to obtain loggers. + +The replay-safe logger supports all standard log levels: `debug`, +`info`, `warning`, `error`, `critical`, and `exception`, as well as +the generic `log(level, msg)` method. It also exposes `isEnabledFor` +which returns `False` during replay so callers can skip expensive +message formatting. + +> [!TIP] +> Create the replay-safe logger once at the start of your orchestrator +> and reuse it throughout the function. + ### Large payload externalization Orchestration inputs, outputs, and event data are transmitted through diff --git a/durabletask/task.py b/durabletask/task.py index cd6172a3..291529e6 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -300,10 +300,10 @@ def create_replay_safe_logger(self, logger: logging.Logger) -> ReplaySafeLogger: return ReplaySafeLogger(logger, lambda: self.is_replaying) -class ReplaySafeLogger: - """A logger wrapper that suppresses log messages during orchestration replay. +class ReplaySafeLogger(logging.LoggerAdapter): + """A logger adapter that suppresses log messages during orchestration replay. - This class wraps a standard :class:`logging.Logger` and only emits log + This class extends :class:`logging.LoggerAdapter` and only emits log messages when the orchestrator is *not* replaying. Use this to avoid duplicate log entries that would otherwise appear every time the orchestrator replays its history. @@ -312,41 +312,18 @@ class ReplaySafeLogger: """ def __init__(self, logger: logging.Logger, is_replaying: Callable[[], bool]) -> None: - self._logger = logger + super().__init__(logger, {}) self._is_replaying = is_replaying - def _should_log(self) -> bool: - return not self._is_replaying() - - def debug(self, msg: str, *args: Any, **kwargs: Any) -> None: - """Log a DEBUG-level message if the orchestrator is not replaying.""" - if self._should_log(): - self._logger.debug(msg, *args, **kwargs) - - def info(self, msg: str, *args: Any, **kwargs: Any) -> None: - """Log an INFO-level message if the orchestrator is not replaying.""" - if self._should_log(): - self._logger.info(msg, *args, **kwargs) - - def warning(self, msg: str, *args: Any, **kwargs: Any) -> None: - """Log a WARNING-level message if the orchestrator is not replaying.""" - if self._should_log(): - self._logger.warning(msg, *args, **kwargs) - - def error(self, msg: str, *args: Any, **kwargs: Any) -> None: - """Log an ERROR-level message if the orchestrator is not replaying.""" - if self._should_log(): - self._logger.error(msg, *args, **kwargs) - - def critical(self, msg: str, *args: Any, **kwargs: Any) -> None: - """Log a CRITICAL-level message if the orchestrator is not replaying.""" - if self._should_log(): - self._logger.critical(msg, *args, **kwargs) - - def exception(self, msg: str, *args: Any, **kwargs: Any) -> None: - """Log an ERROR-level message with exception info if the orchestrator is not replaying.""" - if self._should_log(): - self._logger.exception(msg, *args, **kwargs) + def isEnabledFor(self, level: int) -> bool: + """Return whether logging is enabled for the given level. + + Returns ``False`` while the orchestrator is replaying so that callers + can skip expensive message formatting during replay. + """ + if self._is_replaying(): + return False + return self.logger.isEnabledFor(level) class FailureDetails: diff --git a/examples/activity_sequence.py b/examples/activity_sequence.py index 420935d7..b4b92ea7 100644 --- a/examples/activity_sequence.py +++ b/examples/activity_sequence.py @@ -1,5 +1,6 @@ """End-to-end sample that demonstrates how to configure an orchestrator that calls an activity function in a sequence and prints the outputs.""" +import logging import os from azure.identity import DefaultAzureCredential @@ -8,6 +9,8 @@ from durabletask.azuremanaged.client import DurableTaskSchedulerClient from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker +logger = logging.getLogger("activity_sequence") + def hello(ctx: task.ActivityContext, name: str) -> str: """Activity function that returns a greeting""" @@ -16,10 +19,15 @@ def hello(ctx: task.ActivityContext, name: str) -> str: def sequence(ctx: task.OrchestrationContext, _): """Orchestrator function that calls the 'hello' activity function in a sequence""" + # Create a replay-safe logger to avoid duplicate log messages during replay + replay_logger = ctx.create_replay_safe_logger(logger) + + replay_logger.info("Starting activity sequence for instance %s", ctx.instance_id) # call "hello" activity function in a sequence result1 = yield ctx.call_activity(hello, input='Tokyo') result2 = yield ctx.call_activity(hello, input='Seattle') result3 = yield ctx.call_activity(hello, input='London') + replay_logger.info("All activities completed") # return an array of results return [result1, result2, result3] diff --git a/tests/durabletask/test_orchestration_executor.py b/tests/durabletask/test_orchestration_executor.py index 3c159b58..a6b205b9 100644 --- a/tests/durabletask/test_orchestration_executor.py +++ b/tests/durabletask/test_orchestration_executor.py @@ -1269,6 +1269,53 @@ def emit(self, record: logging.LogRecord) -> None: assert log_calls == ["should appear"] +def test_replay_safe_logger_log_method(): + """Validates the generic log() method respects the replay flag.""" + log_calls: list[str] = [] + + class _RecordingHandler(logging.Handler): + def emit(self, record: logging.LogRecord) -> None: + log_calls.append(record.getMessage()) + + inner_logger = logging.getLogger("test_replay_safe_logger_log_method") + inner_logger.setLevel(logging.DEBUG) + inner_logger.addHandler(_RecordingHandler()) + + replaying = True + replay_logger = task.ReplaySafeLogger(inner_logger, lambda: replaying) + + replay_logger.log(logging.WARNING, "suppressed warning") + assert log_calls == [] + + replaying = False + replay_logger.log(logging.WARNING, "visible warning") + assert log_calls == ["visible warning"] + + +def test_replay_safe_logger_is_enabled_for(): + """Validates isEnabledFor returns False during replay.""" + inner_logger = logging.getLogger("test_replay_safe_logger_enabled") + inner_logger.setLevel(logging.DEBUG) + + replaying = True + replay_logger = task.ReplaySafeLogger(inner_logger, lambda: replaying) + + # During replay, isEnabledFor should always return False + assert replay_logger.isEnabledFor(logging.DEBUG) is False + assert replay_logger.isEnabledFor(logging.INFO) is False + assert replay_logger.isEnabledFor(logging.CRITICAL) is False + + # After replay, delegates to the inner logger + replaying = False + assert replay_logger.isEnabledFor(logging.DEBUG) is True + assert replay_logger.isEnabledFor(logging.INFO) is True + + # If a level is below the inner logger's level, should return False + inner_logger.setLevel(logging.WARNING) + assert replay_logger.isEnabledFor(logging.DEBUG) is False + assert replay_logger.isEnabledFor(logging.WARNING) is True + + def test_when_any_with_retry(): """Tests that a when_any pattern works correctly with retries""" def dummy_activity(_, inp: str): From e61534310d1c7809ff7f278104197aa5cdf939b8 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Wed, 8 Apr 2026 12:28:02 -0600 Subject: [PATCH 3/3] PR feedback --- docs/features.md | 6 +- .../test_orchestration_executor.py | 206 ++++++++++-------- 2 files changed, 121 insertions(+), 91 deletions(-) diff --git a/docs/features.md b/docs/features.md index 4405edd4..123312d5 100644 --- a/docs/features.md +++ b/docs/features.md @@ -163,12 +163,14 @@ modify behavior on an existing logger. ```python import logging +from durabletask import task + logger = logging.getLogger("my_orchestrator") -def my_orchestrator(ctx: task.OrchestrationContext, input): +def my_orchestrator(ctx: task.OrchestrationContext, payload): replay_logger = ctx.create_replay_safe_logger(logger) replay_logger.info("Starting orchestration %s", ctx.instance_id) - result = yield ctx.call_activity(my_activity, input=input) + result = yield ctx.call_activity(my_activity, input=payload) replay_logger.info("Activity returned: %s", result) return result ``` diff --git a/tests/durabletask/test_orchestration_executor.py b/tests/durabletask/test_orchestration_executor.py index f4bc537a..0134b122 100644 --- a/tests/durabletask/test_orchestration_executor.py +++ b/tests/durabletask/test_orchestration_executor.py @@ -1509,54 +1509,61 @@ class _RecordingHandler(logging.Handler): def emit(self, record: logging.LogRecord) -> None: log_calls.append(record.getMessage()) + handler = _RecordingHandler() inner_logger = logging.getLogger("test_replay_safe_logger") inner_logger.setLevel(logging.DEBUG) - inner_logger.addHandler(_RecordingHandler()) - - activity_name = "say_hello" - - def say_hello(_, name: str) -> str: - return f"Hello, {name}!" - - def orchestrator(ctx: task.OrchestrationContext, _): - replay_logger = ctx.create_replay_safe_logger(inner_logger) - replay_logger.info("Starting orchestration") - result = yield ctx.call_activity(say_hello, input="World") - replay_logger.info("Activity completed: %s", result) - return result - - registry = worker._Registry() - activity_name = registry.add_activity(say_hello) - orchestrator_name = registry.add_orchestrator(orchestrator) - - # First execution: starts the orchestration. The orchestrator runs without - # replay, so both log calls should be emitted. - new_events = [ - helpers.new_orchestrator_started_event(datetime.now()), - helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None), - ] - executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - result = executor.execute(TEST_INSTANCE_ID, [], new_events) - assert result.actions # should have scheduled the activity - - assert log_calls == ["Starting orchestration"] - log_calls.clear() - - # Second execution: the orchestrator replays from history and then processes the - # activity completion. The "Starting orchestration" message is emitted during - # replay and should be suppressed; "Activity completed" is emitted after replay - # ends and should appear exactly once. - old_events = new_events + [ - helpers.new_task_scheduled_event(1, activity_name), - ] - encoded_output = json.dumps(say_hello(None, "World")) - new_events = [helpers.new_task_completed_event(1, encoded_output)] - executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) - complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions) - assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED - - assert log_calls == ["Activity completed: Hello, World!"] + original_propagate = inner_logger.propagate + inner_logger.propagate = False + inner_logger.addHandler(handler) + + try: + activity_name = "say_hello" + + def say_hello(_, name: str) -> str: + return f"Hello, {name}!" + + def orchestrator(ctx: task.OrchestrationContext, _): + replay_logger = ctx.create_replay_safe_logger(inner_logger) + replay_logger.info("Starting orchestration") + result = yield ctx.call_activity(say_hello, input="World") + replay_logger.info("Activity completed: %s", result) + return result + + registry = worker._Registry() + activity_name = registry.add_activity(say_hello) + orchestrator_name = registry.add_orchestrator(orchestrator) + + # First execution: starts the orchestration. The orchestrator runs without + # replay, emits the initial log message, and then schedules the activity. + new_events = [ + helpers.new_orchestrator_started_event(datetime.now()), + helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + assert result.actions # should have scheduled the activity + + assert log_calls == ["Starting orchestration"] + log_calls.clear() + + # Second execution: the orchestrator replays from history and then processes the + # activity completion. The "Starting orchestration" message is emitted during + # replay and should be suppressed; "Activity completed" is emitted after replay + # ends and should appear exactly once. + old_events = new_events + [ + helpers.new_task_scheduled_event(1, activity_name), + ] + encoded_output = json.dumps(say_hello(None, "World")) + new_events = [helpers.new_task_completed_event(1, encoded_output)] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions) + assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED + + assert log_calls == ["Activity completed: Hello, World!"] + finally: + inner_logger.removeHandler(handler) + inner_logger.propagate = original_propagate def test_replay_safe_logger_all_levels(): @@ -1567,32 +1574,39 @@ class _LevelRecorder(logging.Handler): def emit(self, record: logging.LogRecord) -> None: log_levels.append(record.levelname) + handler = _LevelRecorder() inner_logger = logging.getLogger("test_replay_safe_logger_levels") inner_logger.setLevel(logging.DEBUG) - inner_logger.addHandler(_LevelRecorder()) - - def orchestrator(ctx: task.OrchestrationContext, _): - replay_logger = ctx.create_replay_safe_logger(inner_logger) - replay_logger.debug("debug msg") - replay_logger.info("info msg") - replay_logger.warning("warning msg") - replay_logger.error("error msg") - replay_logger.critical("critical msg") - return "done" - - registry = worker._Registry() - orchestrator_name = registry.add_orchestrator(orchestrator) - - new_events = [ - helpers.new_orchestrator_started_event(datetime.now()), - helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None), - ] - executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - result = executor.execute(TEST_INSTANCE_ID, [], new_events) - complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions) - assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED - - assert log_levels == ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] + original_propagate = inner_logger.propagate + inner_logger.propagate = False + inner_logger.addHandler(handler) + + try: + def orchestrator(ctx: task.OrchestrationContext, _): + replay_logger = ctx.create_replay_safe_logger(inner_logger) + replay_logger.debug("debug msg") + replay_logger.info("info msg") + replay_logger.warning("warning msg") + replay_logger.error("error msg") + replay_logger.critical("critical msg") + return "done" + + registry = worker._Registry() + orchestrator_name = registry.add_orchestrator(orchestrator) + + new_events = [ + helpers.new_orchestrator_started_event(datetime.now()), + helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions) + assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED + + assert log_levels == ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] + finally: + inner_logger.removeHandler(handler) + inner_logger.propagate = original_propagate def test_replay_safe_logger_direct(): @@ -1603,19 +1617,26 @@ class _RecordingHandler(logging.Handler): def emit(self, record: logging.LogRecord) -> None: log_calls.append(record.getMessage()) + handler = _RecordingHandler() inner_logger = logging.getLogger("test_replay_safe_logger_direct") inner_logger.setLevel(logging.DEBUG) - inner_logger.addHandler(_RecordingHandler()) + original_propagate = inner_logger.propagate + inner_logger.propagate = False + inner_logger.addHandler(handler) - replaying = True - replay_logger = task.ReplaySafeLogger(inner_logger, lambda: replaying) + try: + replaying = True + replay_logger = task.ReplaySafeLogger(inner_logger, lambda: replaying) - replay_logger.info("should be suppressed") - assert log_calls == [] + replay_logger.info("should be suppressed") + assert log_calls == [] - replaying = False - replay_logger.info("should appear") - assert log_calls == ["should appear"] + replaying = False + replay_logger.info("should appear") + assert log_calls == ["should appear"] + finally: + inner_logger.removeHandler(handler) + inner_logger.propagate = original_propagate def test_replay_safe_logger_log_method(): @@ -1626,19 +1647,26 @@ class _RecordingHandler(logging.Handler): def emit(self, record: logging.LogRecord) -> None: log_calls.append(record.getMessage()) + handler = _RecordingHandler() inner_logger = logging.getLogger("test_replay_safe_logger_log_method") inner_logger.setLevel(logging.DEBUG) - inner_logger.addHandler(_RecordingHandler()) - - replaying = True - replay_logger = task.ReplaySafeLogger(inner_logger, lambda: replaying) - - replay_logger.log(logging.WARNING, "suppressed warning") - assert log_calls == [] - - replaying = False - replay_logger.log(logging.WARNING, "visible warning") - assert log_calls == ["visible warning"] + original_propagate = inner_logger.propagate + inner_logger.propagate = False + inner_logger.addHandler(handler) + + try: + replaying = True + replay_logger = task.ReplaySafeLogger(inner_logger, lambda: replaying) + + replay_logger.log(logging.WARNING, "suppressed warning") + assert log_calls == [] + + replaying = False + replay_logger.log(logging.WARNING, "visible warning") + assert log_calls == ["visible warning"] + finally: + inner_logger.removeHandler(handler) + inner_logger.propagate = original_propagate def test_replay_safe_logger_is_enabled_for():