From bafbafab35f5a9bccaec416e7bcb7d573a55439b Mon Sep 17 00:00:00 2001 From: Fernando Celmer Date: Sat, 2 May 2026 23:31:25 -0300 Subject: [PATCH 1/6] =?UTF-8?q?=E2=9A=99=EF=B8=8F=20FEATURE-#290:=20Add=20?= =?UTF-8?q?EventBus=20and=20StatusChanged=20event?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dotflow/core/events.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 dotflow/core/events.py diff --git a/dotflow/core/events.py b/dotflow/core/events.py new file mode 100644 index 00000000..6892664a --- /dev/null +++ b/dotflow/core/events.py @@ -0,0 +1,39 @@ +"""Event bus""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from dotflow.logging import logger + +if TYPE_CHECKING: + from dotflow.core.task import Task + from dotflow.core.types.status import TypeStatus + + +@dataclass +class StatusChanged: + """Emitted when a task transitions to a new status.""" + + task: Task + old: TypeStatus + new: TypeStatus + + +class EventBus: + """In-process pub/sub. Subscribers run in registration order.""" + + def __init__(self) -> None: + self._subs: list[Callable[[Any], None]] = [] + + def subscribe(self, handler: Callable[[Any], None]) -> None: + self._subs.append(handler) + + def emit(self, event: Any) -> None: + for handler in self._subs: + try: + handler(event) + except Exception: + logger.exception("event handler failed") From 29fd121f7ad418498e48bd6f4f0765d3af5e8a36 Mon Sep 17 00:00:00 2001 From: Fernando Celmer Date: Sat, 2 May 2026 23:31:25 -0300 Subject: [PATCH 2/6] =?UTF-8?q?=E2=9A=99=EF=B8=8F=20FEATURE-#290:=20Add=20?= =?UTF-8?q?default=20Log=20Notify=20Metrics=20subscribers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dotflow/core/subscribers.py | 62 +++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 dotflow/core/subscribers.py diff --git a/dotflow/core/subscribers.py b/dotflow/core/subscribers.py new file mode 100644 index 00000000..dfe3634d --- /dev/null +++ b/dotflow/core/subscribers.py @@ -0,0 +1,62 @@ +"""Default subscribers for the internal event bus.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from dotflow.core.events import StatusChanged +from dotflow.core.types.status import TypeStatus + +if TYPE_CHECKING: + from dotflow.abc.log import Log + from dotflow.abc.metrics import Metrics + from dotflow.abc.notify import Notify + + +class NotifySubscriber: + """Forwards status changes to the configured notify provider.""" + + def __init__(self, notify: Notify): + self.notify = notify + + def __call__(self, event): + if not isinstance(event, StatusChanged): + return + + self.notify.hook_status_task(task=event.task) + + +class LogSubscriber: + """Routes status changes to the configured log provider.""" + + def __init__(self, log: Log): + self.log = log + + def __call__(self, event): + if not isinstance(event, StatusChanged): + return + + if event.new == TypeStatus.FAILED: + self.log.error(task=event.task) + elif event.new == TypeStatus.RETRY: + self.log.warning(task=event.task) + else: + self.log.info(task=event.task) + + +class MetricsSubscriber: + """Updates task counters on status transitions.""" + + def __init__(self, metrics: Metrics): + self.metrics = metrics + + def __call__(self, event): + if not isinstance(event, StatusChanged): + return + + if event.new == TypeStatus.FAILED: + self.metrics.task_failed(task=event.task) + elif event.new == TypeStatus.RETRY: + self.metrics.task_retried(task=event.task) + elif event.new == TypeStatus.COMPLETED: + self.metrics.task_completed(task=event.task) From 4812cd624596a9ff5fff1a8fbb68a6bd18d8253d Mon Sep 17 00:00:00 2001 From: Fernando Celmer Date: Sat, 2 May 2026 23:31:25 -0300 Subject: [PATCH 3/6] =?UTF-8?q?=E2=9A=99=EF=B8=8F=20FEATURE-#290:=20Wire?= =?UTF-8?q?=20EventBus=20and=20default=20subscribers=20in=20Config?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dotflow/core/config.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/dotflow/core/config.py b/dotflow/core/config.py index f10733ed..a89c4cef 100644 --- a/dotflow/core/config.py +++ b/dotflow/core/config.py @@ -9,7 +9,13 @@ from dotflow.abc.server import Server from dotflow.abc.storage import Storage from dotflow.abc.tracer import Tracer +from dotflow.core.events import EventBus from dotflow.core.exception import NotCallableObject +from dotflow.core.subscribers import ( + LogSubscriber, + MetricsSubscriber, + NotifySubscriber, +) from dotflow.providers.log_default import LogDefault from dotflow.providers.metrics_default import MetricsDefault from dotflow.providers.notify_default import NotifyDefault @@ -91,6 +97,11 @@ def __init__( self._validate() + self.events = EventBus() + self.events.subscribe(LogSubscriber(self.log)) + self.events.subscribe(NotifySubscriber(self.notify)) + self.events.subscribe(MetricsSubscriber(self.metrics)) + def _validate(self) -> None: for name, abc in self._PROVIDERS.items(): value = getattr(self, name) From a0b9c3932050027ae64bb6ae00f857e49bff5189 Mon Sep 17 00:00:00 2001 From: Fernando Celmer Date: Sat, 2 May 2026 23:31:25 -0300 Subject: [PATCH 4/6] =?UTF-8?q?=E2=9A=99=EF=B8=8F=20FEATURE-#290:=20Emit?= =?UTF-8?q?=20StatusChanged=20from=20Task.status=20setter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dotflow/core/task.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/dotflow/core/task.py b/dotflow/core/task.py index 7ef2f1c9..7534005c 100644 --- a/dotflow/core/task.py +++ b/dotflow/core/task.py @@ -13,6 +13,7 @@ from dotflow.core.action import Action from dotflow.core.config import Config from dotflow.core.context import Context +from dotflow.core.events import StatusChanged from dotflow.core.exception import ( MissingActionDecorator, NotCallableObject, @@ -254,21 +255,10 @@ def status(self): @status.setter def status(self, value: TypeStatus) -> None: + old = self._status self._status = value - self.config.notify.hook_status_task(task=self) - - if value == TypeStatus.FAILED: - self.config.log.error(task=self) - self.config.metrics.task_failed(task=self) - elif value == TypeStatus.RETRY: - self.config.log.warning(task=self) - self.config.metrics.task_retried(task=self) - elif value == TypeStatus.COMPLETED: - self.config.log.info(task=self) - self.config.metrics.task_completed(task=self) - else: - self.config.log.info(task=self) + self.config.events.emit(StatusChanged(task=self, old=old, new=value)) @property def config(self): From 06c3ef069ffda65bba8238239333f1a87e162826 Mon Sep 17 00:00:00 2001 From: Fernando Celmer Date: Sat, 2 May 2026 23:31:25 -0300 Subject: [PATCH 5/6] =?UTF-8?q?=E2=9D=A4=EF=B8=8F=20TEST-#290:=20Cover=20E?= =?UTF-8?q?ventBus=20emit=20and=20handler=20isolation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/core/test_events.py | 53 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 tests/core/test_events.py diff --git a/tests/core/test_events.py b/tests/core/test_events.py new file mode 100644 index 00000000..5ca8344f --- /dev/null +++ b/tests/core/test_events.py @@ -0,0 +1,53 @@ +"""Test EventBus and StatusChanged.""" + +import unittest +from unittest.mock import Mock + +from dotflow.core.events import EventBus, StatusChanged + + +class TestEventBus(unittest.TestCase): + def test_emit_runs_every_subscriber(self): + bus = EventBus() + + a = Mock() + b = Mock() + + bus.subscribe(a) + bus.subscribe(b) + bus.emit("payload") + + a.assert_called_once_with("payload") + b.assert_called_once_with("payload") + + def test_subscriber_failure_does_not_block_others(self): + bus = EventBus() + + bus.subscribe(Mock(side_effect=RuntimeError("boom"))) + survivor = Mock() + bus.subscribe(survivor) + + bus.emit("payload") + + survivor.assert_called_once_with("payload") + + def test_subscribers_run_in_registration_order(self): + bus = EventBus() + calls = [] + + bus.subscribe(lambda _e: calls.append("a")) + bus.subscribe(lambda _e: calls.append("b")) + bus.subscribe(lambda _e: calls.append("c")) + + bus.emit("x") + + self.assertEqual(calls, ["a", "b", "c"]) + + +class TestStatusChanged(unittest.TestCase): + def test_carries_task_old_new(self): + event = StatusChanged(task="t", old="OLD", new="NEW") + + self.assertEqual(event.task, "t") + self.assertEqual(event.old, "OLD") + self.assertEqual(event.new, "NEW") From e9f10f436c3f35d25753ed7f1de22b42ea69cf36 Mon Sep 17 00:00:00 2001 From: Fernando Celmer Date: Sat, 2 May 2026 23:31:25 -0300 Subject: [PATCH 6/6] =?UTF-8?q?=E2=9D=A4=EF=B8=8F=20TEST-#290:=20Cover=20d?= =?UTF-8?q?efault=20subscribers=20status=20routing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/core/test_subscribers.py | 90 ++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 tests/core/test_subscribers.py diff --git a/tests/core/test_subscribers.py b/tests/core/test_subscribers.py new file mode 100644 index 00000000..668093dd --- /dev/null +++ b/tests/core/test_subscribers.py @@ -0,0 +1,90 @@ +"""Test default subscribers.""" + +import unittest +from unittest.mock import Mock + +from dotflow.core.events import StatusChanged +from dotflow.core.subscribers import ( + LogSubscriber, + MetricsSubscriber, + NotifySubscriber, +) +from dotflow.core.types.status import TypeStatus + + +def _event(new): + return StatusChanged(task="task", old=TypeStatus.IN_PROGRESS, new=new) + + +class TestNotifySubscriber(unittest.TestCase): + def test_calls_hook_status_task(self): + notify = Mock() + sub = NotifySubscriber(notify) + + sub(_event(TypeStatus.COMPLETED)) + + notify.hook_status_task.assert_called_once_with(task="task") + + def test_ignores_unknown_event(self): + notify = Mock() + sub = NotifySubscriber(notify) + + sub("not a status change") + + notify.hook_status_task.assert_not_called() + + +class TestLogSubscriber(unittest.TestCase): + def test_failed_uses_error(self): + log = Mock() + LogSubscriber(log)(_event(TypeStatus.FAILED)) + + log.error.assert_called_once_with(task="task") + log.warning.assert_not_called() + log.info.assert_not_called() + + def test_retry_uses_warning(self): + log = Mock() + LogSubscriber(log)(_event(TypeStatus.RETRY)) + + log.warning.assert_called_once_with(task="task") + + def test_completed_uses_info(self): + log = Mock() + LogSubscriber(log)(_event(TypeStatus.COMPLETED)) + + log.info.assert_called_once_with(task="task") + + def test_other_status_uses_info(self): + log = Mock() + LogSubscriber(log)(_event(TypeStatus.IN_PROGRESS)) + + log.info.assert_called_once_with(task="task") + + +class TestMetricsSubscriber(unittest.TestCase): + def test_failed_calls_task_failed(self): + metrics = Mock() + MetricsSubscriber(metrics)(_event(TypeStatus.FAILED)) + + metrics.task_failed.assert_called_once_with(task="task") + + def test_retry_calls_task_retried(self): + metrics = Mock() + MetricsSubscriber(metrics)(_event(TypeStatus.RETRY)) + + metrics.task_retried.assert_called_once_with(task="task") + + def test_completed_calls_task_completed(self): + metrics = Mock() + MetricsSubscriber(metrics)(_event(TypeStatus.COMPLETED)) + + metrics.task_completed.assert_called_once_with(task="task") + + def test_other_status_no_op(self): + metrics = Mock() + MetricsSubscriber(metrics)(_event(TypeStatus.IN_PROGRESS)) + + metrics.task_failed.assert_not_called() + metrics.task_retried.assert_not_called() + metrics.task_completed.assert_not_called()