-
Notifications
You must be signed in to change notification settings - Fork 8
⚙️ FEATURE-#290: Decouple Task status side effects via EventBus #299
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
bafbafa
29fd121
4812cd6
a0b9c39
06c3ef0
e9f10f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| """Event bus""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Callable | ||
| from dataclasses import dataclass | ||
| from typing import TYPE_CHECKING, Any | ||
|
|
||
| from dotflow.logging import logger | ||
|
|
||
| if TYPE_CHECKING: | ||
| from dotflow.core.task import Task | ||
| from dotflow.core.types.status import TypeStatus | ||
|
|
||
|
|
||
| @dataclass | ||
| class StatusChanged: | ||
| """Emitted when a task transitions to a new status.""" | ||
|
|
||
| task: Task | ||
| old: TypeStatus | ||
| new: TypeStatus | ||
|
|
||
|
|
||
| class EventBus: | ||
| """In-process pub/sub. Subscribers run in registration order.""" | ||
|
|
||
| def __init__(self) -> None: | ||
| self._subs: list[Callable[[Any], None]] = [] | ||
|
|
||
| def subscribe(self, handler: Callable[[Any], None]) -> None: | ||
| self._subs.append(handler) | ||
|
|
||
| def emit(self, event: Any) -> None: | ||
| for handler in self._subs: | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Suggestion] Problem — The three built-in subscribers ( Failure scenario class AutoRetrySubscriber:
def __call__(self, event):
if isinstance(event, StatusChanged):
if event.new == TypeStatus.FAILED:
# Attempting a retry — sets status → triggers emit → calls this again
event.task.status = TypeStatus.RETRY # RecursionErrorFix — Add a simple re-entrancy guard: class EventBus:
def __init__(self) -> None:
self._subs: list[Callable[[Any], None]] = []
self._emitting: bool = False
def emit(self, event: Any) -> None:
if self._emitting:
return # or raise, or queue for later — depends on desired semantics
self._emitting = True
try:
for handler in self._subs:
try:
handler(event)
except Exception:
logger.exception("event handler failed")
finally:
self._emitting = FalseDocument the constraint clearly in the class docstring so that subscriber authors know status mutations inside a handler are not allowed (or are deferred). References — Observer pattern — Re-entrancy considerations |
||
| try: | ||
| handler(event) | ||
| except Exception: | ||
| logger.exception("event handler failed") | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| """Default subscribers for the internal event bus.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from typing import TYPE_CHECKING | ||
|
|
||
| from dotflow.core.events import StatusChanged | ||
| from dotflow.core.types.status import TypeStatus | ||
|
|
||
| if TYPE_CHECKING: | ||
| from dotflow.abc.log import Log | ||
| from dotflow.abc.metrics import Metrics | ||
| from dotflow.abc.notify import Notify | ||
|
|
||
|
|
||
| class NotifySubscriber: | ||
| """Forwards status changes to the configured notify provider.""" | ||
|
|
||
| def __init__(self, notify: Notify): | ||
| self.notify = notify | ||
|
|
||
| def __call__(self, event): | ||
| if not isinstance(event, StatusChanged): | ||
| return | ||
|
|
||
| self.notify.hook_status_task(task=event.task) | ||
|
|
||
|
|
||
| class LogSubscriber: | ||
| """Routes status changes to the configured log provider.""" | ||
|
|
||
| def __init__(self, log: Log): | ||
| self.log = log | ||
|
|
||
| def __call__(self, event): | ||
| if not isinstance(event, StatusChanged): | ||
| return | ||
|
|
||
| if event.new == TypeStatus.FAILED: | ||
| self.log.error(task=event.task) | ||
| elif event.new == TypeStatus.RETRY: | ||
| self.log.warning(task=event.task) | ||
| else: | ||
| self.log.info(task=event.task) | ||
|
|
||
|
|
||
| class MetricsSubscriber: | ||
| """Updates task counters on status transitions.""" | ||
|
|
||
| def __init__(self, metrics: Metrics): | ||
| self.metrics = metrics | ||
|
|
||
| def __call__(self, event): | ||
| if not isinstance(event, StatusChanged): | ||
| return | ||
|
|
||
| if event.new == TypeStatus.FAILED: | ||
| self.metrics.task_failed(task=event.task) | ||
| elif event.new == TypeStatus.RETRY: | ||
| self.metrics.task_retried(task=event.task) | ||
| elif event.new == TypeStatus.COMPLETED: | ||
| self.metrics.task_completed(task=event.task) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| """Test EventBus and StatusChanged.""" | ||
|
|
||
| import unittest | ||
| from unittest.mock import Mock | ||
|
|
||
| from dotflow.core.events import EventBus, StatusChanged | ||
|
|
||
|
|
||
| class TestEventBus(unittest.TestCase): | ||
| def test_emit_runs_every_subscriber(self): | ||
| bus = EventBus() | ||
|
|
||
| a = Mock() | ||
| b = Mock() | ||
|
|
||
| bus.subscribe(a) | ||
| bus.subscribe(b) | ||
| bus.emit("payload") | ||
|
|
||
| a.assert_called_once_with("payload") | ||
| b.assert_called_once_with("payload") | ||
|
|
||
| def test_subscriber_failure_does_not_block_others(self): | ||
| bus = EventBus() | ||
|
|
||
| bus.subscribe(Mock(side_effect=RuntimeError("boom"))) | ||
| survivor = Mock() | ||
| bus.subscribe(survivor) | ||
|
|
||
| bus.emit("payload") | ||
|
|
||
| survivor.assert_called_once_with("payload") | ||
|
|
||
| def test_subscribers_run_in_registration_order(self): | ||
| bus = EventBus() | ||
| calls = [] | ||
|
|
||
| bus.subscribe(lambda _e: calls.append("a")) | ||
| bus.subscribe(lambda _e: calls.append("b")) | ||
| bus.subscribe(lambda _e: calls.append("c")) | ||
|
|
||
| bus.emit("x") | ||
|
|
||
| self.assertEqual(calls, ["a", "b", "c"]) | ||
|
|
||
|
|
||
| class TestStatusChanged(unittest.TestCase): | ||
| def test_carries_task_old_new(self): | ||
| event = StatusChanged(task="t", old="OLD", new="NEW") | ||
|
|
||
| self.assertEqual(event.task, "t") | ||
| self.assertEqual(event.old, "OLD") | ||
| self.assertEqual(event.new, "NEW") |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| """Test default subscribers.""" | ||
|
|
||
| import unittest | ||
| from unittest.mock import Mock | ||
|
|
||
| from dotflow.core.events import StatusChanged | ||
| from dotflow.core.subscribers import ( | ||
| LogSubscriber, | ||
| MetricsSubscriber, | ||
| NotifySubscriber, | ||
| ) | ||
| from dotflow.core.types.status import TypeStatus | ||
|
|
||
|
|
||
| def _event(new): | ||
| return StatusChanged(task="task", old=TypeStatus.IN_PROGRESS, new=new) | ||
|
|
||
|
|
||
| class TestNotifySubscriber(unittest.TestCase): | ||
| def test_calls_hook_status_task(self): | ||
| notify = Mock() | ||
| sub = NotifySubscriber(notify) | ||
|
|
||
| sub(_event(TypeStatus.COMPLETED)) | ||
|
|
||
| notify.hook_status_task.assert_called_once_with(task="task") | ||
|
|
||
| def test_ignores_unknown_event(self): | ||
| notify = Mock() | ||
| sub = NotifySubscriber(notify) | ||
|
|
||
| sub("not a status change") | ||
|
|
||
| notify.hook_status_task.assert_not_called() | ||
|
|
||
|
|
||
| class TestLogSubscriber(unittest.TestCase): | ||
| def test_failed_uses_error(self): | ||
| log = Mock() | ||
| LogSubscriber(log)(_event(TypeStatus.FAILED)) | ||
|
|
||
| log.error.assert_called_once_with(task="task") | ||
| log.warning.assert_not_called() | ||
| log.info.assert_not_called() | ||
|
|
||
| def test_retry_uses_warning(self): | ||
| log = Mock() | ||
| LogSubscriber(log)(_event(TypeStatus.RETRY)) | ||
|
|
||
| log.warning.assert_called_once_with(task="task") | ||
|
|
||
| def test_completed_uses_info(self): | ||
| log = Mock() | ||
| LogSubscriber(log)(_event(TypeStatus.COMPLETED)) | ||
|
|
||
| log.info.assert_called_once_with(task="task") | ||
|
|
||
| def test_other_status_uses_info(self): | ||
| log = Mock() | ||
| LogSubscriber(log)(_event(TypeStatus.IN_PROGRESS)) | ||
|
|
||
| log.info.assert_called_once_with(task="task") | ||
|
|
||
|
|
||
| class TestMetricsSubscriber(unittest.TestCase): | ||
| def test_failed_calls_task_failed(self): | ||
| metrics = Mock() | ||
| MetricsSubscriber(metrics)(_event(TypeStatus.FAILED)) | ||
|
|
||
| metrics.task_failed.assert_called_once_with(task="task") | ||
|
|
||
| def test_retry_calls_task_retried(self): | ||
| metrics = Mock() | ||
| MetricsSubscriber(metrics)(_event(TypeStatus.RETRY)) | ||
|
|
||
| metrics.task_retried.assert_called_once_with(task="task") | ||
|
|
||
| def test_completed_calls_task_completed(self): | ||
| metrics = Mock() | ||
| MetricsSubscriber(metrics)(_event(TypeStatus.COMPLETED)) | ||
|
|
||
| metrics.task_completed.assert_called_once_with(task="task") | ||
|
|
||
| def test_other_status_no_op(self): | ||
| metrics = Mock() | ||
| MetricsSubscriber(metrics)(_event(TypeStatus.IN_PROGRESS)) | ||
|
|
||
| metrics.task_failed.assert_not_called() | ||
| metrics.task_retried.assert_not_called() | ||
| metrics.task_completed.assert_not_called() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Suggestion]
Problem — The
oldfield is typed asTypeStatus, but the very first status transition on everyTaskemitsStatusChanged(old=None, ...)becauseTask._statusis initialised toNoneinTaskInstance.__init__and the status setter capturesold = self._statusbefore any value has been set.The type annotation creates a false contract: anyone writing a custom subscriber who reads
event.oldand treats it as a guaranteedTypeStatuswill get anAttributeErroror silentNone-comparison failure on the very first event.Failure scenario
Every
Task(...)call immediately triggers this path (self.status = TypeStatus.NOT_STARTEDin__init__), so the crash happens on construction, not during execution.Fix — Narrow the annotation to reflect reality:
Alternatively, initialise
Task._statustoTypeStatus.NOT_STARTEDinTaskInstance.__init__so the field is neverNone, which would let the annotation stay asTypeStatusand also remove theif not self._statusguard in thestatusgetter.References — PEP 484 — Type Hints