Pluggable retry policy (jitter, cap, retry-on, circuit breaker) #289

@FernandoCelmer

Labels: enhancement, discovery

Context

dotflow/core/engine.py:118-160 implements a fixed retry strategy:

current_delay = retry_delay
for attempt in range(1, max_attempts + 1):
    try:
        ...
    except Exception:
        sleep(current_delay)
        if backoff:
            current_delay *= 2

Limitations:

  • Doubles retry_delay without an upper cap.
  • No jitter — N tasks failing simultaneously will retry in lockstep
    ("thundering herd").
  • No way to declare which exceptions are retryable; a ValueError
    bug retries the same way as a transient ConnectionError.
  • No circuit breaker — when an upstream is down, every task burns
    its full retry budget before failing.
  • sleep() blocks the worker thread.
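
To make the missing cap concrete, here is a small sketch that reproduces the delay schedule of the loop above. It assumes every attempt fails and that a sleep follows each failure, as the excerpt suggests; `doubling_delays` is a hypothetical helper, not part of dotflow.

```python
def doubling_delays(retry_delay: float, max_attempts: int) -> list[float]:
    """Delays slept after each failed attempt when the delay doubles every time."""
    delays = []
    current = retry_delay
    for _ in range(max_attempts):
        delays.append(current)
        current *= 2  # no upper bound, mirroring `current_delay *= 2`
    return delays


delays = doubling_delays(retry_delay=1.0, max_attempts=10)
print(delays[-1])   # 512.0 seconds before the last failure is reported
print(sum(delays))  # 1023.0 seconds spent sleeping in total
```

With retry_delay=1 and max_attempts=10, a single task can spend about 17 minutes sleeping, which is the case a cap is meant to bound.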

Concept

Introduce a RetryPolicy protocol. The @action decorator accepts a policy instance. A few default policies cover the common cases.

API sketch

from typing import Protocol

class RetryPolicy(Protocol):
    def should_retry(self, attempt: int, error: BaseException) -> bool: ...
    def next_delay(self, attempt: int) -> float: ...

class FixedRetry:
    def __init__(self, max_attempts: int, delay: float = 1.0,
                 retry_on: tuple = (Exception,)):
        ...

class ExponentialBackoff:
    def __init__(self, max_attempts: int = 5, base: float = 2,
                 cap: float = 60, jitter: bool = True,
                 retry_on: tuple = (Exception,)):
        ...

class CircuitBreaker:
    """Opens after `threshold` failures within `window` seconds.
    While open, retries fail fast for `reset_after` seconds; the
    breaker then goes half-open on the next attempt."""
    def __init__(self, threshold: int = 5, window: float = 60,
                 reset_after: float = 60): ...
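
As a rough illustration of how ExponentialBackoff could satisfy the protocol, here is a minimal sketch. Several details are assumptions, not decisions from this issue: `attempt` is 1-based, `base` is the base of the exponent (so the first delay is `base` seconds), `max_attempts` counts total attempts, and jitter is applied after the cap.

```python
import random


class ExponentialBackoff:
    """Sketch only: capped exponential delay with multiplicative jitter."""

    def __init__(self, max_attempts: int = 5, base: float = 2,
                 cap: float = 60, jitter: bool = True,
                 retry_on: tuple = (Exception,)):
        self.max_attempts = max_attempts
        self.base = base
        self.cap = cap
        self.jitter = jitter
        self.retry_on = retry_on

    def should_retry(self, attempt: int, error: BaseException) -> bool:
        # Retry only matching exceptions, and only while attempts remain.
        return attempt < self.max_attempts and isinstance(error, self.retry_on)

    def next_delay(self, attempt: int) -> float:
        # Assumption: attempt is 1-based, so delays are base, base**2, ...
        delay = min(self.cap, self.base ** attempt)
        if self.jitter:
            # Spread retries over 50%-150% of the nominal delay.
            delay *= random.uniform(0.5, 1.5)
        return delay
```

Because jitter is applied after the cap here, a delay can reach 1.5 × cap. If `cap` should be a hard ceiling, clamping after the jitter step is the alternative.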

Usage:

@action(retry=ExponentialBackoff(max_attempts=5, jitter=True,
                                  retry_on=(ConnectionError, TimeoutError)))
def call_api(): ...

@action(retry=CircuitBreaker(threshold=10, reset_after=30))
def flaky_dependency(): ...
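
The closed / open / half-open transitions described in the CircuitBreaker docstring could look roughly like the sketch below. The `clock` parameter, the `state` property, `record_success`, and the zero `next_delay` are hypothetical additions made for illustration; the issue does not specify them.

```python
import time
from collections import deque


class CircuitBreaker:
    """Sketch only: opens after `threshold` failures within `window` seconds."""

    def __init__(self, threshold: int = 5, window: float = 60,
                 reset_after: float = 60, clock=time.monotonic):
        self.threshold = threshold
        self.window = window
        self.reset_after = reset_after
        self._clock = clock        # hypothetical injection point for tests
        self._failures = deque()   # timestamps of recent failures
        self._opened_at = None     # None means the circuit is closed

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.reset_after:
            return "half-open"
        return "open"

    def record_success(self) -> None:
        # Any success, including a half-open probe, closes the circuit.
        self._failures.clear()
        self._opened_at = None

    def should_retry(self, attempt: int, error: BaseException) -> bool:
        now = self._clock()
        state = self.state
        if state == "open":
            return False           # fail fast while open
        if state == "half-open":
            self._opened_at = now  # the probe failed, so re-open
            return False
        self._failures.append(now)
        # Forget failures that fell out of the sliding window.
        while self._failures and now - self._failures[0] > self.window:
            self._failures.popleft()
        if len(self._failures) >= self.threshold:
            self._opened_at = now
            return False
        return True

    def next_delay(self, attempt: int) -> float:
        return 0.0  # assumption: the breaker itself adds no delay
```

To fail fast across tasks, a single breaker instance would have to be shared by every task that calls the same upstream, and a real implementation would need locking around the shared state.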

Backward compatibility

Passing an integer to the decorator (for example retry=3) continues to work and is mapped internally to FixedRetry(max_attempts=3).
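
A minimal sketch of that mapping, assuming a hypothetical `as_policy` helper called by the decorator. The FixedRetry here is a stand-in with just enough behavior to run, and the sketch does not cover how the existing retry_delay and backoff options would carry over.

```python
class FixedRetry:
    """Stand-in for the proposed class, just enough to show the mapping."""

    def __init__(self, max_attempts: int, delay: float = 1.0,
                 retry_on: tuple = (Exception,)):
        self.max_attempts = max_attempts
        self.delay = delay
        self.retry_on = retry_on

    def should_retry(self, attempt: int, error: BaseException) -> bool:
        return attempt < self.max_attempts and isinstance(error, self.retry_on)

    def next_delay(self, attempt: int) -> float:
        return self.delay


def as_policy(retry):
    """Hypothetical helper: normalize `retry` into a policy object."""
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(retry, bool):
        raise TypeError("retry must be an int or a RetryPolicy, not bool")
    if isinstance(retry, int):
        return FixedRetry(max_attempts=retry)
    # Duck-typed check against the two protocol methods.
    if (callable(getattr(retry, "should_retry", None))
            and callable(getattr(retry, "next_delay", None))):
        return retry
    raise TypeError(f"unsupported retry value: {retry!r}")
```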

Acceptance criteria

  • RetryPolicy protocol defined
  • FixedRetry, ExponentialBackoff, CircuitBreaker
    implemented in dotflow/core/retry.py
  • Action decorator accepts retry: int | RetryPolicy
  • TaskEngine.execute_with_retry delegates to the policy
  • Jitter is real: each delay is multiplied by random.uniform(0.5, 1.5)
  • retry_on short-circuits the loop for non-matching exceptions
  • Tests: jitter spread, retry-on filter, circuit open/half-open
    transitions
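
To show what "delegates to the policy" might mean in practice, here is a sketch of the retry loop as a free function. The real TaskEngine.execute_with_retry signature is not shown in this issue, so the parameters (including the injectable `sleep`) and the demo policy are assumptions.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def execute_with_retry(func: Callable[[], T], policy,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `func`, asking `policy` whether and when to retry on failure."""
    attempt = 1
    while True:
        try:
            return func()
        except Exception as error:  # KeyboardInterrupt etc. are never retried
            # Non-retryable errors and exhausted budgets propagate immediately.
            if not policy.should_retry(attempt, error):
                raise
            sleep(policy.next_delay(attempt))
            attempt += 1


class RetryOnConnectionError:
    """Demo policy: up to 3 attempts, ConnectionError only."""

    def should_retry(self, attempt: int, error: BaseException) -> bool:
        return attempt < 3 and isinstance(error, ConnectionError)

    def next_delay(self, attempt: int) -> float:
        return 0.1 * attempt
```

Passing `sleep` as a parameter keeps the retry-on and jitter tests fast, since a test can record the requested delays instead of actually sleeping.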

Future work

Non-blocking (async) sleep when running under an async engine. This is out of scope for this issue.

