Labels: enhancement
Context
dotflow/core/workflow.py:459-476 spawns one multiprocessing.Process per task without any cap:
    def run(self):
        previous_context = Context(workflow_id=self.workflow_id)
        for task in self.tasks:
            process = _mp.Process(target=self._run_task, args=(...))
            process.start()
            self._processes.append(process)
        for process in self._processes:
            process.join()
A workflow with 1,000 tasks attempts to launch 1,000 OS processes. Effects: OOM, exhausted database connection pool, upstream API rate-limit exhaustion.
Concept
Cap the number of in-flight workers via a process pool. Allow the cap to be set globally on Config, per call on workflow.start(), or per step on the decorator.
API sketch
    # Global default
    config = Config(max_concurrency=20)

    # Per-call override
    workflow.start(mode="parallel", max_workers=20)

    # Per-step (limits this step regardless of pool)
    @action(max_concurrency=5)
    def write_to_db(): ...
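A minimal sketch of how the effective pool size could be resolved from these settings. It assumes the per-call value wins over the Config default and that os.cpu_count() is the fallback; resolve_max_workers and its parameter names are hypothetical, not existing dotflow API.

```python
import os
from typing import Optional


def resolve_max_workers(per_call: Optional[int] = None,
                        config_default: Optional[int] = None) -> int:
    """Pick the effective cap (hypothetical helper, not part of dotflow).

    Assumed precedence: per-call value, then Config default, then CPU count.
    """
    for candidate in (per_call, config_default):
        if candidate is not None:
            if candidate < 1:
                raise ValueError("concurrency cap must be >= 1")
            return candidate
    # os.cpu_count() returns None when the count cannot be determined.
    return os.cpu_count() or 1
```

With this ordering, resolve_max_workers(20, 5) yields 20, resolve_max_workers(None, 5) yields 5, and a call with no arguments falls back to the CPU count.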
Implementation sketch
Replace the ad-hoc Process(...).start() loop with multiprocessing.Pool (or concurrent.futures.ProcessPoolExecutor). For per-step limits, gate execution with one multiprocessing.Semaphore per step, keyed by step name.
Acceptance criteria
- Config(max_concurrency=N) accepted and honored
- workflow.start(mode="parallel", max_workers=N) honored
- @action(max_concurrency=N) honored
- Default cap is os.cpu_count() (or 1 if unknown)
- A run with max_concurrency=4 never has more than 4 active subprocesses at any moment
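The last criterion could be checked along these lines. This is only a sketch: it uses a plain ProcessPoolExecutor as a stand-in for a dotflow workflow, and timed_task and peak_overlap are hypothetical helpers. Each task reports its wall-clock start and end, and the test computes the peak number of overlapping intervals.

```python
import time
from concurrent.futures import ProcessPoolExecutor


def timed_task(_):
    """Stand-in task: report when it started and finished (wall-clock seconds)."""
    start = time.time()
    time.sleep(0.05)
    return (start, time.time())


def peak_overlap(intervals):
    """Return the largest number of intervals that are open at the same instant."""
    events = []
    for start, end in intervals:
        events.append((start, 1))
        events.append((end, -1))
    # At equal timestamps, process ends (-1) before starts (+1) so that
    # back-to-back tasks are not counted as overlapping.
    events.sort(key=lambda e: (e[0], e[1]))
    active = peak = 0
    for _, delta in events:
        active += delta
        peak = max(peak, active)
    return peak


if __name__ == "__main__":
    # Stand-in for running a workflow with max_concurrency=4.
    with ProcessPoolExecutor(max_workers=4) as pool:
        intervals = list(pool.map(timed_task, range(20)))
    assert peak_overlap(intervals) <= 4
```

Wall-clock timestamps are used because they are comparable across processes; a real test may prefer a shared counter if clock adjustments are a concern.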