Skip to content

Commit 4240573

Browse files
committed
refactor: defer AsyncModuleThread loop creation to start()
AsyncModuleThread no longer spawns the event loop thread in __init__. The loop is created on the first call to start(), which ModuleBase.start() now calls. This means module construction no longer has side effects — no threads are spawned until the module is explicitly started.
1 parent 218b72e commit 4240573

3 files changed

Lines changed: 26 additions & 5 deletions

File tree

dimos/core/module.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ def start(self) -> None:
123123
if state == "stopped":
124124
raise RuntimeError(f"{type(self).__name__} cannot be restarted after stop")
125125
self.mod_state.set("started")
126+
self._async_thread.start()
126127

127128
@rpc
128129
def stop(self) -> None:

dimos/utils/test_thread_utils.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,9 +339,10 @@ def stop_it() -> None:
339339
def test_close_timeout_respected(self) -> None:
340340
"""If the thread ignores the stop signal, stop() should return after close_timeout."""
341341
mod = FakeModule()
342+
cancel = threading.Event()
342343

343344
def stubborn_target() -> None:
344-
time.sleep(10) # ignores stopping signal
345+
cancel.wait(10) # blocks but can be released for cleanup
345346

346347
mt = ModuleThread(
347348
module=mod, target=stubborn_target, name="test-timeout", close_timeout=0.2
@@ -351,6 +352,9 @@ def stubborn_target() -> None:
351352
mt.stop()
352353
elapsed = time.monotonic() - start
353354
assert elapsed < 1.0, f"stop() took {elapsed}s, expected ~0.2s"
355+
# Release the thread so it doesn't leak
356+
cancel.set()
357+
mt._thread.join(timeout=1.0)
354358

355359
def test_stop_concurrent_with_dispose(self) -> None:
356360
"""Calling stop() and dispose() concurrently should not crash."""
@@ -382,6 +386,7 @@ class TestAsyncModuleThread:
382386
def test_creates_loop_and_thread(self) -> None:
383387
mod = FakeModule()
384388
amt = AsyncModuleThread(module=mod)
389+
amt.start()
385390
assert amt.loop is not None
386391
assert amt.loop.is_running()
387392
assert amt.is_alive
@@ -391,13 +396,15 @@ def test_creates_loop_and_thread(self) -> None:
391396
def test_stop_idempotent(self) -> None:
392397
mod = FakeModule()
393398
amt = AsyncModuleThread(module=mod)
399+
amt.start()
394400
amt.stop()
395401
amt.stop() # should not raise
396402
amt.stop()
397403

398404
def test_dispose_stops_loop(self) -> None:
399405
mod = FakeModule()
400406
amt = AsyncModuleThread(module=mod)
407+
amt.start()
401408
assert amt.is_alive
402409
mod.dispose()
403410
time.sleep(0.1)
@@ -406,6 +413,7 @@ def test_dispose_stops_loop(self) -> None:
406413
def test_can_schedule_coroutine(self) -> None:
407414
mod = FakeModule()
408415
amt = AsyncModuleThread(module=mod)
416+
amt.start()
409417
result = []
410418

411419
async def coro() -> None:
@@ -420,6 +428,7 @@ def test_stop_with_pending_work(self) -> None:
420428
"""Stop should succeed even with long-running tasks on the loop."""
421429
mod = FakeModule()
422430
amt = AsyncModuleThread(module=mod)
431+
amt.start()
423432
started = threading.Event()
424433

425434
async def slow_coro() -> None:
@@ -437,6 +446,7 @@ async def slow_coro() -> None:
437446
def test_concurrent_stop(self) -> None:
438447
mod = FakeModule()
439448
amt = AsyncModuleThread(module=mod)
449+
amt.start()
440450
errors = []
441451

442452
def stop_it() -> None:

dimos/utils/thread_utils.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,18 @@ def __init__(
201201
self._stopped = ThreadSafeVal(False)
202202
self._owns_loop = False
203203
self._thread: threading.Thread | None = None
204+
self._loop: asyncio.AbstractEventLoop | None = None
205+
self._module_name = type(module).__name__
206+
207+
module._disposables.add(Disposable(self.stop))
208+
209+
def start(self) -> None:
210+
"""Create (or reuse) the event loop and start the driver thread.
211+
212+
Safe to call multiple times — subsequent calls are no-ops.
213+
"""
214+
if self._loop is not None:
215+
return
204216

205217
try:
206218
self._loop = asyncio.get_running_loop()
@@ -211,12 +223,10 @@ def __init__(
211223
self._thread = threading.Thread(
212224
target=self._loop.run_forever,
213225
daemon=True,
214-
name=f"{type(module).__name__}-event-loop",
226+
name=f"{self._module_name}-event-loop",
215227
)
216228
self._thread.start()
217229

218-
module._disposables.add(Disposable(self.stop))
219-
220230
@property
221231
def loop(self) -> asyncio.AbstractEventLoop:
222232
"""The managed event loop."""
@@ -237,7 +247,7 @@ def stop(self) -> None:
237247
return
238248
self._stopped.set(True)
239249

240-
if self._owns_loop and self._loop.is_running():
250+
if self._owns_loop and self._loop is not None and self._loop.is_running():
241251
self._loop.call_soon_threadsafe(self._loop.stop)
242252

243253
if self._thread is not None and self._thread.is_alive():

0 commit comments

Comments
 (0)