Skip to content

Commit 3a4c24e

Browse files
committed
add second fix
1 parent ea6b398 commit 3a4c24e

3 files changed

Lines changed: 138 additions & 36 deletions

File tree

dimos/protocol/rpc/pubsubrpc.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -169,16 +169,18 @@ def stop(self) -> None:
169169
"""
170170
self._shutdown_thread_pool()
171171

172-
# Cleanup shared response subscriptions
172+
# Stop the LCM loop BEFORE unsubscribing. Unsubscribing while the loop
173+
# is still dispatching triggers a NULL-deref in LCM's C code. With the
174+
# loop stopped first, unsub() is either a no-op (LCM destroyed) or safe
175+
# (no concurrent dispatch).
176+
if hasattr(super(), "stop"):
177+
super().stop() # type: ignore[misc]
178+
173179
with self._response_subs_lock:
174180
for unsub, _ in self._response_subs.values():
175181
unsub()
176182
self._response_subs.clear()
177183

178-
# Call parent stop if it exists
179-
if hasattr(super(), "stop"):
180-
super().stop() # type: ignore[misc]
181-
182184
def call(self, name: str, arguments: Args, cb: Callable | None): # type: ignore[no-untyped-def, type-arg]
183185
if cb is None:
184186
return self.call_nowait(name, arguments)
@@ -209,9 +211,7 @@ def shared_response_handler(msg: MsgT, _: TopicT) -> None:
209211
if res_id is None:
210212
return
211213

212-
# Look up callback for this msg_id
213-
with self._response_subs_lock:
214-
callback = callbacks_dict.pop(res_id, None)
214+
callback = callbacks_dict.pop(res_id, None)
215215

216216
if callback is None:
217217
return # No callback registered (already handled or timed out)

dimos/protocol/service/lcmservice.py

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class LCMService(Service):
6363
config: LCMConfig
6464
l: lcm_mod.LCM | None
6565
_stop_event: threading.Event
66-
_l_lock: threading.Lock
66+
_l_lock: threading.RLock
6767
_thread: threading.Thread | None
6868
_call_thread_pool: ThreadPoolExecutor | None = None
6969
_call_thread_pool_lock: threading.RLock = threading.RLock()
@@ -77,7 +77,7 @@ def __init__(self, **kwargs: Any) -> None:
7777
else:
7878
self.l = lcm_mod.LCM(self.config.url) if self.config.url else lcm_mod.LCM()
7979

80-
self._l_lock = threading.Lock()
80+
self._l_lock = threading.RLock()
8181
self._stop_event = threading.Event()
8282
self._thread = None
8383

@@ -100,7 +100,7 @@ def __setstate__(self, state) -> None: # type: ignore[no-untyped-def]
100100
self.l = None
101101
self._stop_event = threading.Event()
102102
self._thread = None
103-
self._l_lock = threading.Lock()
103+
self._l_lock = threading.RLock()
104104
self._call_thread_pool = None
105105
self._call_thread_pool_lock = threading.RLock()
106106

@@ -120,31 +120,44 @@ def start(self) -> None:
120120
return
121121

122122
# Reinitialize LCM if it's None (e.g., after unpickling)
123-
if self.l is None:
123+
l = self.l
124+
if l is None:
124125
if self.config.lcm:
125-
self.l = self.config.lcm
126+
l = self.config.lcm
126127
else:
127-
self.l = lcm_mod.LCM(self.config.url) if self.config.url else lcm_mod.LCM()
128+
l = lcm_mod.LCM(self.config.url) if self.config.url else lcm_mod.LCM()
129+
self.l = l
130+
131+
# Pre-warm LCM recv setup (including the up-to-10s self-test) now,
132+
# so _lcm_loop doesn't do it later while holding _l_lock, which
133+
# would block external publish/subscribe/unsubscribe calls.
134+
try:
135+
l.fileno()
136+
except Exception as e:
137+
logger.warning(f"LCM fileno pre-warm failed: {e}")
128138

129139
self._stop_event.clear()
130140
self._thread = threading.Thread(target=self._lcm_loop, daemon=True)
131141
self._thread.start()
132142

133143
def _lcm_loop(self) -> None:
134-
"""LCM message handling loop."""
144+
"""LCM message handling loop.
145+
146+
Holds _l_lock during handle_timeout so that external callers
147+
(publish/subscribe/unsubscribe on this LCMService from other threads)
148+
are serialized with dispatch. This prevents a data race in the LCM
149+
Python binding that segfaults otherwise.
150+
"""
135151
try:
136152
while not self._stop_event.is_set():
137153
with self._l_lock:
138-
l = self.l
139-
if l is None:
154+
if self.l is None:
140155
break
141-
try:
142-
# This doesn't have to be under a lock because the C
143-
# library has its own locking for this.
144-
l.handle_timeout(_LCM_LOOP_TIMEOUT)
145-
except Exception as e:
146-
stack_trace = traceback.format_exc()
147-
print(f"Error in LCM handling: {e}\n{stack_trace}")
156+
try:
157+
self.l.handle_timeout(_LCM_LOOP_TIMEOUT)
158+
except Exception as e:
159+
stack_trace = traceback.format_exc()
160+
print(f"Error in LCM handling: {e}\n{stack_trace}")
148161
finally:
149162
self._cleanup_owned_lcm()
150163
with self._l_lock:

dimos/protocol/service/test_lcmservice.py

Lines changed: 101 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ def __init__(self) -> None:
5252
self.unsubscribe_called = threading.Event()
5353
self.subscription = BlockingSubscription()
5454

55+
def fileno(self) -> int:
56+
# Called by LCMService.start() to pre-warm LCM recv setup. A real LCM
57+
# sets up sockets and runs a self-test here; for the fake we just
58+
# return a sentinel fd since nothing will select() on it.
59+
return -1
60+
5561
def handle_timeout(self, _timeout: int) -> None:
5662
self.handle_entered.set()
5763
self.release_handle.wait(timeout=1.0)
@@ -323,7 +329,7 @@ def fake_pub_sub(fake_lcm):
323329
pubsub.stop()
324330

325331

326-
def test_publish_proceeds_during_handle_loop(fake_lcm, fake_pub_sub):
332+
def test_publish_waits_for_handle_loop(fake_lcm, fake_pub_sub):
327333
assert fake_lcm.handle_entered.wait(timeout=0.5)
328334

329335
publisher = threading.Thread(
@@ -332,14 +338,20 @@ def test_publish_proceeds_during_handle_loop(fake_lcm, fake_pub_sub):
332338
)
333339
publisher.start()
334340

335-
assert fake_lcm.publish_called.wait(timeout=0.5)
336-
publisher.join(timeout=1.0)
337-
assert not publisher.is_alive()
341+
# Loop holds _l_lock inside handle_timeout; publish waits for the lock.
342+
assert not fake_lcm.publish_called.wait(timeout=0.1)
343+
assert publisher.is_alive()
338344

345+
# Releasing handle_timeout drops _l_lock; publish now proceeds.
339346
fake_lcm.release_handle.set()
340347

348+
assert fake_lcm.publish_called.wait(timeout=1.0)
349+
publisher.join(timeout=1.0)
350+
assert not publisher.is_alive()
341351

342-
def test_subscribe_proceeds_during_handle_loop(fake_lcm, fake_pub_sub):
352+
353+
def test_subscribe_waits_for_handle_loop(fake_lcm, fake_pub_sub):
354+
"""subscribe() must block while the loop thread is inside handle_timeout()."""
343355
assert fake_lcm.handle_entered.wait(timeout=0.5)
344356

345357
subscriber = threading.Thread(
@@ -348,16 +360,27 @@ def test_subscribe_proceeds_during_handle_loop(fake_lcm, fake_pub_sub):
348360
)
349361
subscriber.start()
350362

351-
assert fake_lcm.subscribe_called.wait(timeout=0.5)
363+
assert not fake_lcm.subscribe_called.wait(timeout=0.1)
364+
assert subscriber.is_alive()
365+
366+
fake_lcm.release_handle.set()
367+
368+
assert fake_lcm.subscribe_called.wait(timeout=1.0)
352369
subscriber.join(timeout=1.0)
353370
assert not subscriber.is_alive()
354371
assert fake_lcm.subscription.queue_capacity == 10000
355372

356-
fake_lcm.release_handle.set()
357373

374+
def test_unsubscribe_waits_for_handle_loop(fake_lcm, fake_pub_sub):
375+
"""unsubscribe() must block while the loop thread is inside handle_timeout().
358376
359-
def test_unsubscribe_proceeds_during_handle_loop(fake_lcm, fake_pub_sub):
377+
This is the specific race behind the segfault in pylcm.c:
378+
unsubscribing from another thread while dispatch is running
379+
would set subs_obj->lcm_obj = NULL under the nose of pylcm_msg_handler.
380+
"""
381+
# Let the first handle_timeout complete so we can subscribe cleanly.
360382
assert fake_lcm.handle_entered.wait(timeout=0.5)
383+
fake_lcm.release_handle.set()
361384

362385
unsubscribe_holder: dict[str, object] = {}
363386

@@ -366,20 +389,29 @@ def do_subscribe() -> None:
366389

367390
subscriber = threading.Thread(target=do_subscribe, daemon=True)
368391
subscriber.start()
369-
assert fake_lcm.subscribe_called.wait(timeout=0.5)
392+
assert fake_lcm.subscribe_called.wait(timeout=1.0)
370393
subscriber.join(timeout=1.0)
371394
assert not subscriber.is_alive()
372395

396+
# Reset gates so the next handle_timeout iteration blocks again.
397+
fake_lcm.handle_entered.clear()
398+
fake_lcm.release_handle.clear()
399+
assert fake_lcm.handle_entered.wait(timeout=1.0)
400+
373401
unsubscribe = unsubscribe_holder["fn"]
374402
unsub_thread = threading.Thread(target=unsubscribe, daemon=True) # type: ignore[arg-type]
375403
unsub_thread.start()
376404

377-
assert fake_lcm.unsubscribe_called.wait(timeout=0.5)
378-
unsub_thread.join(timeout=1.0)
379-
assert not unsub_thread.is_alive()
405+
# Loop holds _l_lock; unsubscribe waits for the lock.
406+
assert not fake_lcm.unsubscribe_called.wait(timeout=0.1)
407+
assert unsub_thread.is_alive()
380408

381409
fake_lcm.release_handle.set()
382410

411+
assert fake_lcm.unsubscribe_called.wait(timeout=1.0)
412+
unsub_thread.join(timeout=1.0)
413+
assert not unsub_thread.is_alive()
414+
383415

384416
def test_stop_from_within_lcm_thread(mocker):
385417
"""stop() called from inside handle_timeout must not deadlock and must
@@ -391,6 +423,9 @@ class SelfStoppingLCM:
391423
def __init__(self) -> None:
392424
self.done = threading.Event()
393425

426+
def fileno(self) -> int:
427+
return -1
428+
394429
def handle_timeout(self, _timeout: int) -> None:
395430
if not self.done.is_set():
396431
captured["thread"] = threading.current_thread()
@@ -420,3 +455,57 @@ def unsubscribe(self, *_args: object) -> None:
420455
assert not thread.is_alive()
421456
assert service.l is None
422457
assert service._thread is None
458+
459+
460+
def test_handler_can_publish_via_rlock_reentry(mocker):
461+
"""A message handler dispatched from handle_timeout runs on the loop
462+
thread while it already holds _l_lock. Reentry must work so the handler
463+
can call self.publish/subscribe/unsubscribe. This is why _l_lock is an
464+
RLock rather than a plain Lock.
465+
"""
466+
publish_calls: list[tuple[str, bytes]] = []
467+
handler_done = threading.Event()
468+
469+
class ReentrantLCM:
470+
def __init__(self) -> None:
471+
self._handler = None
472+
self._dispatched = False
473+
self._subscription = BlockingSubscription()
474+
475+
def fileno(self) -> int:
476+
return -1
477+
478+
def handle_timeout(self, _timeout: int) -> None:
479+
# Dispatch one fake message on the first call after a handler
480+
# has been registered. The handler will call self.publish, which
481+
# must reenter _l_lock recursively on the loop thread.
482+
if self._handler is not None and not self._dispatched:
483+
self._dispatched = True
484+
self._handler("/req", b"req-payload")
485+
handler_done.set()
486+
487+
def publish(self, channel: str, message: bytes) -> None:
488+
publish_calls.append((channel, message))
489+
490+
def subscribe(self, _channel, handler) -> BlockingSubscription:
491+
self._handler = handler
492+
return self._subscription
493+
494+
def unsubscribe(self, _subscription) -> None:
495+
pass
496+
497+
fake = ReentrantLCM()
498+
mocker.patch("dimos.protocol.service.lcmservice.lcm_mod.LCM", return_value=fake)
499+
500+
pubsub = LCMPubSubBase()
501+
pubsub.start()
502+
503+
def reentrant_callback(_msg: bytes, _topic: Topic) -> None:
504+
pubsub.publish(Topic("/res"), b"res-payload")
505+
506+
pubsub.subscribe(Topic("/req"), reentrant_callback)
507+
508+
assert handler_done.wait(timeout=2.0)
509+
pubsub.stop()
510+
511+
assert ("/res", b"res-payload") in publish_calls

0 commit comments

Comments
 (0)