Move checkpoint persistence out of Task.current_context setter #286

Description

@FernandoCelmer

Labels: enhancement, significant

Context

dotflow/core/task.py:178-187 performs remote I/O inside a property setter:

@current_context.setter
def current_context(self, value: Context):
    self._current_context = Context(
        task_id=self.task_id, workflow_id=self.workflow_id, storage=value
    )
    self.config.storage.post(
        key=self.config.storage.key(task=self),
        context=self.current_context,
    )

Problems

  • A property setter is conventionally a cheap, predictable mutation.
    Here, every assignment triggers an HTTP PUT on the remote backends (StorageS3, StorageGCS), which is surprising and opaque to readers.
  • The engine assigns current_context at multiple points
    (engine.py:86, 115, 138, 175), so a single task can trigger several storage round-trips.
  • A storage failure raises inside the setter, after _current_context
    has already been assigned, so object state mutation and I/O failure
    are mixed in one operation.
  • Unit tests for Task cannot mutate current_context without a
    full storage mock.
  • Issue #281 (Idempotency policy for workflow re-submission with same workflow_id but different initial_context) needs to persist a fingerprint alongside the context;
    the setter has no clean place to receive it.
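
To make the third problem concrete, here is a minimal sketch of the failure mode. The Task and FailingStorage classes below are simplified, hypothetical stand-ins (the real setter wraps the value in Context and reads storage from self.config); only the ordering of "assign, then post" is taken from the snippet above.

```python
class FailingStorage:
    """Hypothetical stand-in for a remote backend whose write fails."""

    def key(self, task):
        return f"{task.workflow_id}-{task.task_id}"

    def post(self, key, context):
        raise ConnectionError("simulated network failure")


class Task:
    """Simplified stand-in that mirrors the shape of the current setter."""

    def __init__(self, task_id, workflow_id, storage):
        self.task_id = task_id
        self.workflow_id = workflow_id
        self.storage = storage
        self._current_context = None

    @property
    def current_context(self):
        return self._current_context

    @current_context.setter
    def current_context(self, value):
        # State is mutated first...
        self._current_context = value
        # ...then the I/O runs and may raise.
        self.storage.post(
            key=self.storage.key(task=self),
            context=self._current_context,
        )


task = Task(task_id=1, workflow_id="wf", storage=FailingStorage())
try:
    task.current_context = {"step": "done"}
    raised = False
except ConnectionError:
    raised = True

# The assignment raised, yet the in-memory state was already updated.
state_after_failure = task.current_context
```

In this sketch the caller sees an exception from what looks like a plain attribute assignment, while the object already holds the new context that was never persisted.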

Concept

Separate state mutation from persistence.

  1. The setter only updates _current_context.
  2. The engine commits the checkpoint once per task, after the
    step completes successfully.

Proposed change

# dotflow/core/task.py
@current_context.setter
def current_context(self, value: Context):
    self._current_context = Context(
        task_id=self.task_id,
        workflow_id=self.workflow_id,
        storage=value,
    )
    # no I/O

# dotflow/core/engine.py — inside execute_with_retry, after success
def execute_with_retry(self):
    ...
    for attempt in range(1, max_attempts + 1):
        try:
            result = self._execute_single()
            self.task.current_context = result
            self._commit_checkpoint()
            return result
        except Exception:
            ...

def _commit_checkpoint(self):
    storage = self.task.config.storage
    storage.post(
        key=storage.key(task=self.task),
        context=self.task.current_context,
        # in a follow-up: fingerprint=... for #281
    )
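
The snippet above elides the retry details, so here is a self-contained sketch of the same flow that can be run in isolation. Everything in it is an assumption for illustration: the Engine class, its step and max_attempts parameters, and the flaky_step helper are hypothetical, and a MagicMock stands in for the storage backend.

```python
from unittest.mock import MagicMock


class Task:
    """Simplified stand-in: the setter only updates in-memory state."""

    def __init__(self, storage):
        self.storage = storage
        self._current_context = None

    @property
    def current_context(self):
        return self._current_context

    @current_context.setter
    def current_context(self, value):
        self._current_context = value  # no I/O


class Engine:
    """Hypothetical engine: retries a step and commits once on success."""

    def __init__(self, task, step, max_attempts=3):
        self.task = task
        self.step = step
        self.max_attempts = max_attempts  # assumed to be >= 1

    def execute_with_retry(self):
        last_error = None
        for _attempt in range(1, self.max_attempts + 1):
            try:
                result = self.step()
                self.task.current_context = result
                self._commit_checkpoint()
                return result
            except Exception as error:
                last_error = error
        raise last_error

    def _commit_checkpoint(self):
        storage = self.task.storage
        storage.post(
            key=storage.key(task=self.task),
            context=self.task.current_context,
        )


calls = {"count": 0}


def flaky_step():
    """Fails twice, then succeeds on the third call."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return {"value": 42}


storage = MagicMock()
engine = Engine(Task(storage), flaky_step)
result = engine.execute_with_retry()
```

Because the commit sits inside the try block, as in the proposal, a storage error would count as a failed attempt and re-run the step. Whether that is the desired behavior is worth deciding explicitly during implementation.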

Acceptance criteria

  • current_context setter no longer calls storage.post
  • Engine commits checkpoint exactly once per successful attempt
  • Failed attempts do not write a checkpoint
  • Existing behavior preserved for StorageDefault, StorageFile,
    StorageS3, StorageGCS (verified by existing test suite)
  • Unit test: assigning current_context does not call any
    storage method when the storage is a fresh mock
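
The last criterion could be checked with a test along these lines. This is a sketch only: the Task class here is a simplified stand-in for the one in dotflow/core/task.py, and the test name is hypothetical. It relies on MagicMock.mock_calls, which records every call made to the mock and to any of its methods.

```python
from unittest.mock import MagicMock


class Task:
    """Simplified stand-in for the Task after the proposed change."""

    def __init__(self, storage):
        self.storage = storage
        self._current_context = None

    @property
    def current_context(self):
        return self._current_context

    @current_context.setter
    def current_context(self, value):
        self._current_context = value  # no I/O


def test_setter_does_not_touch_storage():
    storage = MagicMock()
    task = Task(storage=storage)

    task.current_context = {"value": 1}

    assert task.current_context == {"value": 1}
    # A fresh mock with no recorded calls means no storage method ran.
    assert storage.mock_calls == []


test_setter_does_not_touch_storage()
```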

Optional follow-up

A BatchStorage decorator that buffers writes for very high-volume pipelines. Out of scope here.
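
For reference, one possible shape for such a wrapper is sketched below. Nothing here is an existing dotflow API: BatchStorage, max_pending, flush, and RecordingStorage are all hypothetical names, and the wrapper only covers key and post, the two storage methods that appear in this issue.

```python
class BatchStorage:
    """Hypothetical wrapper: buffers post() calls and forwards them on flush()."""

    def __init__(self, inner, max_pending=100):
        self.inner = inner
        self.max_pending = max_pending
        # key -> latest context; a later write to the same key replaces the earlier one
        self._pending = {}

    def key(self, task):
        return self.inner.key(task=task)

    def post(self, key, context):
        self._pending[key] = context
        if len(self._pending) >= self.max_pending:
            self.flush()

    def flush(self):
        # Swap the buffer out first so a failing write does not re-send earlier entries.
        pending, self._pending = self._pending, {}
        for key, context in pending.items():
            self.inner.post(key=key, context=context)


class RecordingStorage:
    """Hypothetical in-memory backend that records every write."""

    def __init__(self):
        self.writes = []

    def key(self, task):
        return str(task)

    def post(self, key, context):
        self.writes.append((key, context))


inner = RecordingStorage()
batched = BatchStorage(inner, max_pending=2)

batched.post(key="a", context=1)
writes_before_flush = list(inner.writes)  # nothing forwarded yet

batched.post(key="a", context=2)  # same key: replaces the buffered value
batched.post(key="b", context=3)  # second distinct key: triggers a flush
```

A real implementation would also need a flush at workflow shutdown and a decision about what happens to buffered checkpoints if the process dies first.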

