Bound concurrency in the Parallel executor #293

Description

@FernandoCelmer

Labels: enhancement

Context

dotflow/core/workflow.py:459-476 spawns one multiprocessing.Process per task without any cap:

def run(self):
    previous_context = Context(workflow_id=self.workflow_id)
    for task in self.tasks:
        process = _mp.Process(target=self._run_task, args=(...))
        process.start()
        self._processes.append(process)
    for process in self._processes:
        process.join()

A workflow with 1,000 tasks therefore tries to launch 1,000 OS processes at once. Likely effects: out-of-memory kills, an exhausted database connection pool, and upstream API rate limits being hit.

Concept

Cap the number of in-flight workers via a process pool. Allow the cap to be set globally on Config, per call on workflow.start(), or per step on the @action decorator.

API sketch

# Global default
config = Config(max_concurrency=20)

# Per-call override
workflow.start(mode="parallel", max_workers=20)

# Per-step (limits this step regardless of pool)
@action(max_concurrency=5)
def write_to_db(): ...
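
A minimal sketch of how the three settings above might be combined, assuming the per-call value wins over the Config value and that a per-step limit can only narrow the pool, never widen it. The helper names resolve_pool_size and resolve_step_limit are hypothetical and not part of dotflow; the fallback to os.cpu_count() follows the default listed in the acceptance criteria.

```python
import os
from typing import Optional


def _check_cap(name: str, value: Optional[int]) -> None:
    """Reject caps that are not positive integers (None means 'not set')."""
    if value is None:
        return
    if isinstance(value, bool) or not isinstance(value, int) or value < 1:
        raise ValueError(f"{name} must be a positive integer, got {value!r}")


def resolve_pool_size(
    call_max_workers: Optional[int] = None,
    config_max_concurrency: Optional[int] = None,
) -> int:
    """Pick the pool size: per-call value, else Config value, else CPU count."""
    _check_cap("max_workers", call_max_workers)
    _check_cap("max_concurrency", config_max_concurrency)
    if call_max_workers is not None:
        return call_max_workers
    if config_max_concurrency is not None:
        return config_max_concurrency
    # os.cpu_count() returns None when the count cannot be determined.
    return os.cpu_count() or 1


def resolve_step_limit(
    pool_size: int, step_max_concurrency: Optional[int] = None
) -> int:
    """A step can never run wider than the pool that hosts it."""
    _check_cap("max_concurrency", step_max_concurrency)
    if step_max_concurrency is None:
        return pool_size
    return min(pool_size, step_max_concurrency)
```

Under these assumptions, resolve_pool_size(20, 5) yields 20, and resolve_step_limit(20, 5) yields 5.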

Implementation sketch

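Replace the ad-hoc Process(...).start() calls with multiprocessing.Pool (or concurrent.futures.ProcessPoolExecutor). For per-step limits, gate execution with one multiprocessing.Semaphore per step, keyed by step name. These semaphores have to be created in the parent and handed to the workers at start-up (for example through the pool initializer), because they cannot be pickled as ordinary task arguments.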
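
The pool-based replacement could look roughly like the sketch below. It is not dotflow code: run_bounded and its executor_cls parameter are hypothetical, and the real change would still need to pass the workflow's own task arguments. The executor class is injectable only so the same function can be exercised with threads in a quick test.

```python
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Any, Callable, Iterable, List, Optional


def run_bounded(
    func: Callable[[Any], Any],
    items: Iterable[Any],
    max_workers: Optional[int] = None,
    executor_cls: type = ProcessPoolExecutor,
) -> List[Any]:
    """Run func over items with at most max_workers workers alive at once.

    Results come back in input order. With the default ProcessPoolExecutor,
    func and every item must be picklable (func defined at module level).
    """
    work = list(items)
    workers = max_workers or os.cpu_count() or 1
    results: List[Any] = [None] * len(work)
    with executor_cls(max_workers=workers) as pool:
        # All tasks are queued up front, but only `workers` run concurrently.
        future_to_index = {
            pool.submit(func, item): index for index, item in enumerate(work)
        }
        for future in as_completed(future_to_index):
            # result() re-raises any exception raised inside the worker.
            results[future_to_index[future]] = future.result()
    return results
```

Submitting 1,000 items to run_bounded with max_workers=20 queues 1,000 tasks but keeps at most 20 worker processes alive, which is the behavior the issue asks for.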

Acceptance criteria

  • Config(max_concurrency=N) accepted and honored
  • workflow.start(mode="parallel", max_workers=N) honored
  • @action(max_concurrency=N) honored
  • Default cap is os.cpu_count() (or 1 if unknown)
  • Test: spawning 100 tasks with max_concurrency=4 never has
    more than 4 active subprocesses at any moment
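
One way the last criterion might be checked, sketched under assumptions: each task reports its own start and end timestamps, and the parent computes the largest number of overlapping intervals afterwards. This measures concurrently running tasks as a proxy for active subprocesses and relies on time.time() being comparable across processes on one machine. The names timed_task, peak_overlap, and measure_peak are hypothetical, and a stand-in executor is used instead of the dotflow workflow.

```python
import time
from concurrent.futures import ProcessPoolExecutor
from typing import Iterable, Tuple


def timed_task(duration: float) -> Tuple[float, float]:
    """Sleep for `duration` seconds and report when the task started and ended."""
    start = time.time()
    time.sleep(duration)
    return start, time.time()


def peak_overlap(intervals: Iterable[Tuple[float, float]]) -> int:
    """Return the largest number of intervals that are open at the same time."""
    events = []
    for start, end in intervals:
        events.append((start, 1))   # a task became active
        events.append((end, -1))    # a task finished
    # Sorting tuples puts -1 before +1 at equal timestamps, so a task that
    # ends exactly when another starts is not counted as overlapping.
    events.sort()
    active = peak = 0
    for _, delta in events:
        active += delta
        peak = max(peak, active)
    return peak


def measure_peak(
    task_count: int,
    max_workers: int,
    executor_cls: type = ProcessPoolExecutor,
    duration: float = 0.01,
) -> int:
    """Run task_count timed tasks and return the observed peak concurrency."""
    with executor_cls(max_workers=max_workers) as pool:
        intervals = list(pool.map(timed_task, [duration] * task_count))
    return peak_overlap(intervals)
```

The criterion would then read as an assertion that measure_peak(100, 4) is at most 4, called from under an if __name__ == "__main__": guard or from a test function so that spawned workers do not re-run it on import.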
