diff --git a/dotflow/core/config.py b/dotflow/core/config.py index f10733ed..a89c4cef 100644 --- a/dotflow/core/config.py +++ b/dotflow/core/config.py @@ -9,7 +9,13 @@ from dotflow.abc.server import Server from dotflow.abc.storage import Storage from dotflow.abc.tracer import Tracer +from dotflow.core.events import EventBus from dotflow.core.exception import NotCallableObject +from dotflow.core.subscribers import ( + LogSubscriber, + MetricsSubscriber, + NotifySubscriber, +) from dotflow.providers.log_default import LogDefault from dotflow.providers.metrics_default import MetricsDefault from dotflow.providers.notify_default import NotifyDefault @@ -91,6 +97,11 @@ def __init__( self._validate() + self.events = EventBus() + self.events.subscribe(LogSubscriber(self.log)) + self.events.subscribe(NotifySubscriber(self.notify)) + self.events.subscribe(MetricsSubscriber(self.metrics)) + def _validate(self) -> None: for name, abc in self._PROVIDERS.items(): value = getattr(self, name) diff --git a/dotflow/core/events.py b/dotflow/core/events.py new file mode 100644 index 00000000..6892664a --- /dev/null +++ b/dotflow/core/events.py @@ -0,0 +1,39 @@ +"""Event bus""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from dotflow.logging import logger + +if TYPE_CHECKING: + from dotflow.core.task import Task + from dotflow.core.types.status import TypeStatus + + +@dataclass +class StatusChanged: + """Emitted when a task transitions to a new status.""" + + task: Task + old: TypeStatus + new: TypeStatus + + +class EventBus: + """In-process pub/sub. Subscribers run in registration order.""" + + def __init__(self) -> None: + self._subs: list[Callable[[Any], None]] = [] + + def subscribe(self, handler: Callable[[Any], None]) -> None: + self._subs.append(handler) + + def emit(self, event: Any) -> None: + for handler in self._subs: + try: + handler(event) + except Exception: + logger.exception("event handler failed") diff --git a/dotflow/core/subscribers.py b/dotflow/core/subscribers.py new file mode 100644 index 00000000..dfe3634d --- /dev/null +++ b/dotflow/core/subscribers.py @@ -0,0 +1,62 @@ +"""Default subscribers for the internal event bus.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from dotflow.core.events import StatusChanged +from dotflow.core.types.status import TypeStatus + +if TYPE_CHECKING: + from dotflow.abc.log import Log + from dotflow.abc.metrics import Metrics + from dotflow.abc.notify import Notify + + +class NotifySubscriber: + """Forwards status changes to the configured notify provider.""" + + def __init__(self, notify: Notify): + self.notify = notify + + def __call__(self, event): + if not isinstance(event, StatusChanged): + return + + self.notify.hook_status_task(task=event.task) + + +class LogSubscriber: + """Routes status changes to the configured log provider.""" + + def __init__(self, log: Log): + self.log = log + + def __call__(self, event): + if not isinstance(event, StatusChanged): + return + + if event.new == TypeStatus.FAILED: + self.log.error(task=event.task) + elif event.new == TypeStatus.RETRY: + self.log.warning(task=event.task) + else: + self.log.info(task=event.task) + + +class MetricsSubscriber: + """Updates task counters on status transitions.""" + + def __init__(self, metrics: Metrics): + self.metrics = metrics + + def __call__(self, event): + if not isinstance(event, StatusChanged): + return + + if event.new == TypeStatus.FAILED: + self.metrics.task_failed(task=event.task) + elif event.new == TypeStatus.RETRY: + self.metrics.task_retried(task=event.task) + elif event.new == TypeStatus.COMPLETED: + self.metrics.task_completed(task=event.task) diff --git a/dotflow/core/task.py b/dotflow/core/task.py index 7ef2f1c9..7534005c 100644 --- a/dotflow/core/task.py +++ b/dotflow/core/task.py @@ -13,6 +13,7 @@ from dotflow.core.action import Action from dotflow.core.config import Config from dotflow.core.context import Context +from dotflow.core.events import StatusChanged from dotflow.core.exception import ( MissingActionDecorator, NotCallableObject, @@ -254,21 +255,10 @@ def status(self): @status.setter def status(self, value: TypeStatus) -> None: + old = self._status self._status = value - self.config.notify.hook_status_task(task=self) - - if value == TypeStatus.FAILED: - self.config.log.error(task=self) - self.config.metrics.task_failed(task=self) - elif value == TypeStatus.RETRY: - self.config.log.warning(task=self) - self.config.metrics.task_retried(task=self) - elif value == TypeStatus.COMPLETED: - self.config.log.info(task=self) - self.config.metrics.task_completed(task=self) - else: - self.config.log.info(task=self) + self.config.events.emit(StatusChanged(task=self, old=old, new=value)) @property def config(self): diff --git a/tests/core/test_events.py b/tests/core/test_events.py new file mode 100644 index 00000000..5ca8344f --- /dev/null +++ b/tests/core/test_events.py @@ -0,0 +1,53 @@ +"""Test EventBus and StatusChanged.""" + +import unittest +from unittest.mock import Mock + +from dotflow.core.events import EventBus, StatusChanged + + +class TestEventBus(unittest.TestCase): + def test_emit_runs_every_subscriber(self): + bus = EventBus() + + a = Mock() + b = Mock() + + bus.subscribe(a) + bus.subscribe(b) + bus.emit("payload") + + a.assert_called_once_with("payload") + b.assert_called_once_with("payload") + + def test_subscriber_failure_does_not_block_others(self): + bus = EventBus() + + bus.subscribe(Mock(side_effect=RuntimeError("boom"))) + survivor = Mock() + bus.subscribe(survivor) + + bus.emit("payload") + + survivor.assert_called_once_with("payload") + + def test_subscribers_run_in_registration_order(self): + bus = EventBus() + calls = [] + + bus.subscribe(lambda _e: calls.append("a")) + bus.subscribe(lambda _e: calls.append("b")) + bus.subscribe(lambda _e: calls.append("c")) + + bus.emit("x") + + self.assertEqual(calls, ["a", "b", "c"]) + + +class TestStatusChanged(unittest.TestCase): + def test_carries_task_old_new(self): + event = StatusChanged(task="t", old="OLD", new="NEW") + + self.assertEqual(event.task, "t") + self.assertEqual(event.old, "OLD") + self.assertEqual(event.new, "NEW") diff --git a/tests/core/test_subscribers.py b/tests/core/test_subscribers.py new file mode 100644 index 00000000..668093dd --- /dev/null +++ b/tests/core/test_subscribers.py @@ -0,0 +1,90 @@ +"""Test default subscribers.""" + +import unittest +from unittest.mock import Mock + +from dotflow.core.events import StatusChanged +from dotflow.core.subscribers import ( + LogSubscriber, + MetricsSubscriber, + NotifySubscriber, +) +from dotflow.core.types.status import TypeStatus + + +def _event(new): + return StatusChanged(task="task", old=TypeStatus.IN_PROGRESS, new=new) + + +class TestNotifySubscriber(unittest.TestCase): + def test_calls_hook_status_task(self): + notify = Mock() + sub = NotifySubscriber(notify) + + sub(_event(TypeStatus.COMPLETED)) + + notify.hook_status_task.assert_called_once_with(task="task") + + def test_ignores_unknown_event(self): + notify = Mock() + sub = NotifySubscriber(notify) + + sub("not a status change") + + notify.hook_status_task.assert_not_called() + + +class TestLogSubscriber(unittest.TestCase): + def test_failed_uses_error(self): + log = Mock() + LogSubscriber(log)(_event(TypeStatus.FAILED)) + + log.error.assert_called_once_with(task="task") + log.warning.assert_not_called() + log.info.assert_not_called() + + def test_retry_uses_warning(self): + log = Mock() + LogSubscriber(log)(_event(TypeStatus.RETRY)) + + log.warning.assert_called_once_with(task="task") + + def test_completed_uses_info(self): + log = Mock() + LogSubscriber(log)(_event(TypeStatus.COMPLETED)) + + log.info.assert_called_once_with(task="task") + + def test_other_status_uses_info(self): + log = Mock() + LogSubscriber(log)(_event(TypeStatus.IN_PROGRESS)) + + log.info.assert_called_once_with(task="task") + + +class TestMetricsSubscriber(unittest.TestCase): + def test_failed_calls_task_failed(self): + metrics = Mock() + MetricsSubscriber(metrics)(_event(TypeStatus.FAILED)) + + metrics.task_failed.assert_called_once_with(task="task") + + def test_retry_calls_task_retried(self): + metrics = Mock() + MetricsSubscriber(metrics)(_event(TypeStatus.RETRY)) + + metrics.task_retried.assert_called_once_with(task="task") + + def test_completed_calls_task_completed(self): + metrics = Mock() + MetricsSubscriber(metrics)(_event(TypeStatus.COMPLETED)) + + metrics.task_completed.assert_called_once_with(task="task") + + def test_other_status_no_op(self): + metrics = Mock() + MetricsSubscriber(metrics)(_event(TypeStatus.IN_PROGRESS)) + + metrics.task_failed.assert_not_called() + metrics.task_retried.assert_not_called() + metrics.task_completed.assert_not_called()