Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions dotflow/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@
from dotflow.abc.server import Server
from dotflow.abc.storage import Storage
from dotflow.abc.tracer import Tracer
from dotflow.core.events import EventBus
from dotflow.core.exception import NotCallableObject
from dotflow.core.subscribers import (
LogSubscriber,
MetricsSubscriber,
NotifySubscriber,
)
from dotflow.providers.log_default import LogDefault
from dotflow.providers.metrics_default import MetricsDefault
from dotflow.providers.notify_default import NotifyDefault
Expand Down Expand Up @@ -91,6 +97,11 @@ def __init__(

self._validate()

self.events = EventBus()
self.events.subscribe(LogSubscriber(self.log))
self.events.subscribe(NotifySubscriber(self.notify))
self.events.subscribe(MetricsSubscriber(self.metrics))

def _validate(self) -> None:
for name, abc in self._PROVIDERS.items():
value = getattr(self, name)
Expand Down
39 changes: 39 additions & 0 deletions dotflow/core/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Event bus"""

from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

from dotflow.logging import logger

if TYPE_CHECKING:
from dotflow.core.task import Task
from dotflow.core.types.status import TypeStatus


@dataclass
class StatusChanged:
"""Emitted when a task transitions to a new status."""

task: Task
old: TypeStatus
new: TypeStatus

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion]

Problem — The old field is typed as TypeStatus, but the very first status transition on every Task emits StatusChanged(old=None, ...) because Task._status is initialised to None in TaskInstance.__init__ and the status setter captures old = self._status before any value has been set.

The type annotation creates a false contract: anyone writing a custom subscriber who reads event.old and treats it as a guaranteed TypeStatus will get an AttributeError or silent None-comparison failure on the very first event.

Failure scenario

class AuditSubscriber:
    def __call__(self, event):
        if isinstance(event, StatusChanged):
            # Crashes with AttributeError on first task creation:
            # 'NoneType' object has no attribute 'value'
            print(f"Transition: {event.old.value}{event.new.value}")

Every Task(...) call immediately triggers this path (self.status = TypeStatus.NOT_STARTED in __init__), so the crash happens on construction, not during execution.

Fix — Narrow the annotation to reflect reality:

@dataclass
class StatusChanged:
    task: Task
    old: TypeStatus | None   # None on the very first transition
    new: TypeStatus

Alternatively, initialise Task._status to TypeStatus.NOT_STARTED in TaskInstance.__init__ so the field is never None, which would let the annotation stay as TypeStatus and also remove the if not self._status guard in the status getter.

ReferencesPEP 484 — Type Hints



class EventBus:
"""In-process pub/sub. Subscribers run in registration order."""

def __init__(self) -> None:
self._subs: list[Callable[[Any], None]] = []

def subscribe(self, handler: Callable[[Any], None]) -> None:
self._subs.append(handler)

def emit(self, event: Any) -> None:
for handler in self._subs:

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion]

Problememit passes event with a live reference to the Task object inside StatusChanged. Nothing prevents a subscriber from mutating task.status — which calls emit again — producing unbounded recursion that exhausts the call stack.

The three built-in subscribers (LogSubscriber, NotifySubscriber, MetricsSubscriber) are safe today, but the pattern is invisible to users implementing custom subscribers. The docstring says "Subscribers run in registration order" but says nothing about re-entrancy.

Failure scenario

class AutoRetrySubscriber:
    def __call__(self, event):
        if isinstance(event, StatusChanged):
            if event.new == TypeStatus.FAILED:
                # Attempting a retry — sets status → triggers emit → calls this again
                event.task.status = TypeStatus.RETRY  # RecursionError

Fix — Add a simple re-entrancy guard:

class EventBus:
    def __init__(self) -> None:
        self._subs: list[Callable[[Any], None]] = []
        self._emitting: bool = False

    def emit(self, event: Any) -> None:
        if self._emitting:
            return   # or raise, or queue for later — depends on desired semantics
        self._emitting = True
        try:
            for handler in self._subs:
                try:
                    handler(event)
                except Exception:
                    logger.exception("event handler failed")
        finally:
            self._emitting = False

Document the constraint clearly in the class docstring so that subscriber authors know status mutations inside a handler are not allowed (or are deferred).

ReferencesObserver pattern — Re-entrancy considerations

try:
handler(event)
except Exception:
logger.exception("event handler failed")
62 changes: 62 additions & 0 deletions dotflow/core/subscribers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Default subscribers for the internal event bus."""

from __future__ import annotations

from typing import TYPE_CHECKING

from dotflow.core.events import StatusChanged
from dotflow.core.types.status import TypeStatus

if TYPE_CHECKING:
from dotflow.abc.log import Log
from dotflow.abc.metrics import Metrics
from dotflow.abc.notify import Notify


class NotifySubscriber:
"""Forwards status changes to the configured notify provider."""

def __init__(self, notify: Notify):
self.notify = notify

def __call__(self, event):
if not isinstance(event, StatusChanged):
return

self.notify.hook_status_task(task=event.task)


class LogSubscriber:
"""Routes status changes to the configured log provider."""

def __init__(self, log: Log):
self.log = log

def __call__(self, event):
if not isinstance(event, StatusChanged):
return

if event.new == TypeStatus.FAILED:
self.log.error(task=event.task)
elif event.new == TypeStatus.RETRY:
self.log.warning(task=event.task)
else:
self.log.info(task=event.task)


class MetricsSubscriber:
"""Updates task counters on status transitions."""

def __init__(self, metrics: Metrics):
self.metrics = metrics

def __call__(self, event):
if not isinstance(event, StatusChanged):
return

if event.new == TypeStatus.FAILED:
self.metrics.task_failed(task=event.task)
elif event.new == TypeStatus.RETRY:
self.metrics.task_retried(task=event.task)
elif event.new == TypeStatus.COMPLETED:
self.metrics.task_completed(task=event.task)
16 changes: 3 additions & 13 deletions dotflow/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dotflow.core.action import Action
from dotflow.core.config import Config
from dotflow.core.context import Context
from dotflow.core.events import StatusChanged
from dotflow.core.exception import (
MissingActionDecorator,
NotCallableObject,
Expand Down Expand Up @@ -254,21 +255,10 @@ def status(self):

@status.setter
def status(self, value: TypeStatus) -> None:
old = self._status
self._status = value

self.config.notify.hook_status_task(task=self)

if value == TypeStatus.FAILED:
self.config.log.error(task=self)
self.config.metrics.task_failed(task=self)
elif value == TypeStatus.RETRY:
self.config.log.warning(task=self)
self.config.metrics.task_retried(task=self)
elif value == TypeStatus.COMPLETED:
self.config.log.info(task=self)
self.config.metrics.task_completed(task=self)
else:
self.config.log.info(task=self)
self.config.events.emit(StatusChanged(task=self, old=old, new=value))

@property
def config(self):
Expand Down
53 changes: 53 additions & 0 deletions tests/core/test_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Test EventBus and StatusChanged."""

import unittest
from unittest.mock import Mock

from dotflow.core.events import EventBus, StatusChanged


class TestEventBus(unittest.TestCase):
def test_emit_runs_every_subscriber(self):
bus = EventBus()

a = Mock()
b = Mock()

bus.subscribe(a)
bus.subscribe(b)
bus.emit("payload")

a.assert_called_once_with("payload")
b.assert_called_once_with("payload")

def test_subscriber_failure_does_not_block_others(self):
bus = EventBus()

bus.subscribe(Mock(side_effect=RuntimeError("boom")))
survivor = Mock()
bus.subscribe(survivor)

bus.emit("payload")

survivor.assert_called_once_with("payload")

def test_subscribers_run_in_registration_order(self):
bus = EventBus()
calls = []

bus.subscribe(lambda _e: calls.append("a"))
bus.subscribe(lambda _e: calls.append("b"))
bus.subscribe(lambda _e: calls.append("c"))

bus.emit("x")

self.assertEqual(calls, ["a", "b", "c"])


class TestStatusChanged(unittest.TestCase):
def test_carries_task_old_new(self):
event = StatusChanged(task="t", old="OLD", new="NEW")

self.assertEqual(event.task, "t")
self.assertEqual(event.old, "OLD")
self.assertEqual(event.new, "NEW")
90 changes: 90 additions & 0 deletions tests/core/test_subscribers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""Test default subscribers."""

import unittest
from unittest.mock import Mock

from dotflow.core.events import StatusChanged
from dotflow.core.subscribers import (
LogSubscriber,
MetricsSubscriber,
NotifySubscriber,
)
from dotflow.core.types.status import TypeStatus


def _event(new):
return StatusChanged(task="task", old=TypeStatus.IN_PROGRESS, new=new)


class TestNotifySubscriber(unittest.TestCase):
def test_calls_hook_status_task(self):
notify = Mock()
sub = NotifySubscriber(notify)

sub(_event(TypeStatus.COMPLETED))

notify.hook_status_task.assert_called_once_with(task="task")

def test_ignores_unknown_event(self):
notify = Mock()
sub = NotifySubscriber(notify)

sub("not a status change")

notify.hook_status_task.assert_not_called()


class TestLogSubscriber(unittest.TestCase):
def test_failed_uses_error(self):
log = Mock()
LogSubscriber(log)(_event(TypeStatus.FAILED))

log.error.assert_called_once_with(task="task")
log.warning.assert_not_called()
log.info.assert_not_called()

def test_retry_uses_warning(self):
log = Mock()
LogSubscriber(log)(_event(TypeStatus.RETRY))

log.warning.assert_called_once_with(task="task")

def test_completed_uses_info(self):
log = Mock()
LogSubscriber(log)(_event(TypeStatus.COMPLETED))

log.info.assert_called_once_with(task="task")

def test_other_status_uses_info(self):
log = Mock()
LogSubscriber(log)(_event(TypeStatus.IN_PROGRESS))

log.info.assert_called_once_with(task="task")


class TestMetricsSubscriber(unittest.TestCase):
def test_failed_calls_task_failed(self):
metrics = Mock()
MetricsSubscriber(metrics)(_event(TypeStatus.FAILED))

metrics.task_failed.assert_called_once_with(task="task")

def test_retry_calls_task_retried(self):
metrics = Mock()
MetricsSubscriber(metrics)(_event(TypeStatus.RETRY))

metrics.task_retried.assert_called_once_with(task="task")

def test_completed_calls_task_completed(self):
metrics = Mock()
MetricsSubscriber(metrics)(_event(TypeStatus.COMPLETED))

metrics.task_completed.assert_called_once_with(task="task")

def test_other_status_no_op(self):
metrics = Mock()
MetricsSubscriber(metrics)(_event(TypeStatus.IN_PROGRESS))

metrics.task_failed.assert_not_called()
metrics.task_retried.assert_not_called()
metrics.task_completed.assert_not_called()
Loading