Skip to content

Commit c6f11da

Browse files
committed
Revert "add second fix"
This reverts commit 3a4c24e.
1 parent 3a4c24e commit c6f11da

3 files changed

Lines changed: 36 additions & 138 deletions

File tree

dimos/protocol/rpc/pubsubrpc.py

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -169,18 +169,16 @@ def stop(self) -> None:
169169
"""
170170
self._shutdown_thread_pool()
171171

172-
# Stop the LCM loop BEFORE unsubscribing. Doing it the other way
173-
# triggers a NULL-deref in LCM's C code. This order is safer because
174-
# unsub() is either a no-op (LCM destroyed) or safe (no concurrent
175-
# dispatch).
176-
if hasattr(super(), "stop"):
177-
super().stop() # type: ignore[misc]
178-
172+
# Cleanup shared response subscriptions
179173
with self._response_subs_lock:
180174
for unsub, _ in self._response_subs.values():
181175
unsub()
182176
self._response_subs.clear()
183177

178+
# Call parent stop if it exists
179+
if hasattr(super(), "stop"):
180+
super().stop() # type: ignore[misc]
181+
184182
def call(self, name: str, arguments: Args, cb: Callable | None): # type: ignore[no-untyped-def, type-arg]
185183
if cb is None:
186184
return self.call_nowait(name, arguments)
@@ -211,7 +209,9 @@ def shared_response_handler(msg: MsgT, _: TopicT) -> None:
211209
if res_id is None:
212210
return
213211

214-
callback = callbacks_dict.pop(res_id, None)
212+
# Look up callback for this msg_id
213+
with self._response_subs_lock:
214+
callback = callbacks_dict.pop(res_id, None)
215215

216216
if callback is None:
217217
return # No callback registered (already handled or timed out)

dimos/protocol/service/lcmservice.py

Lines changed: 16 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,7 @@ class LCMService(Service):
6363
config: LCMConfig
6464
l: lcm_mod.LCM | None
6565
_stop_event: threading.Event
66-
_l_lock: threading.RLock
66+
_l_lock: threading.Lock
6767
_thread: threading.Thread | None
6868
_call_thread_pool: ThreadPoolExecutor | None = None
6969
_call_thread_pool_lock: threading.RLock = threading.RLock()
@@ -77,7 +77,7 @@ def __init__(self, **kwargs: Any) -> None:
7777
else:
7878
self.l = lcm_mod.LCM(self.config.url) if self.config.url else lcm_mod.LCM()
7979

80-
self._l_lock = threading.RLock()
80+
self._l_lock = threading.Lock()
8181
self._stop_event = threading.Event()
8282
self._thread = None
8383

@@ -100,7 +100,7 @@ def __setstate__(self, state) -> None: # type: ignore[no-untyped-def]
100100
self.l = None
101101
self._stop_event = threading.Event()
102102
self._thread = None
103-
self._l_lock = threading.RLock()
103+
self._l_lock = threading.Lock()
104104
self._call_thread_pool = None
105105
self._call_thread_pool_lock = threading.RLock()
106106

@@ -120,44 +120,31 @@ def start(self) -> None:
120120
return
121121

122122
# Reinitialize LCM if it's None (e.g., after unpickling)
123-
l = self.l
124-
if l is None:
123+
if self.l is None:
125124
if self.config.lcm:
126-
l = self.config.lcm
125+
self.l = self.config.lcm
127126
else:
128-
l = lcm_mod.LCM(self.config.url) if self.config.url else lcm_mod.LCM()
129-
self.l = l
130-
131-
# Pre-warm LCM recv setup (including the up-to-10s self-test) now,
132-
# so _lcm_loop doesn't do it later while holding _l_lock, which
133-
# would block external publish/subscribe/unsubscribe calls.
134-
try:
135-
l.fileno()
136-
except Exception as e:
137-
logger.warning(f"LCM fileno pre-warm failed: {e}")
127+
self.l = lcm_mod.LCM(self.config.url) if self.config.url else lcm_mod.LCM()
138128

139129
self._stop_event.clear()
140130
self._thread = threading.Thread(target=self._lcm_loop, daemon=True)
141131
self._thread.start()
142132

143133
def _lcm_loop(self) -> None:
144-
"""LCM message handling loop.
145-
146-
Holds _l_lock during handle_timeout so that external callers
147-
(publish/subscribe/unsubscribe on this LCMService from other threads)
148-
are serialized with dispatch. This prevents a data race in the LCM
149-
Python binding that segfaults otherwise.
150-
"""
134+
"""LCM message handling loop."""
151135
try:
152136
while not self._stop_event.is_set():
153137
with self._l_lock:
154-
if self.l is None:
138+
l = self.l
139+
if l is None:
155140
break
156-
try:
157-
self.l.handle_timeout(_LCM_LOOP_TIMEOUT)
158-
except Exception as e:
159-
stack_trace = traceback.format_exc()
160-
print(f"Error in LCM handling: {e}\n{stack_trace}")
141+
try:
142+
# This doesn't have to be under a lock because the C
143+
# library has its own locking for this.
144+
l.handle_timeout(_LCM_LOOP_TIMEOUT)
145+
except Exception as e:
146+
stack_trace = traceback.format_exc()
147+
print(f"Error in LCM handling: {e}\n{stack_trace}")
161148
finally:
162149
self._cleanup_owned_lcm()
163150
with self._l_lock:

dimos/protocol/service/test_lcmservice.py

Lines changed: 12 additions & 101 deletions
Original file line number | Diff line number | Diff line change
@@ -52,12 +52,6 @@ def __init__(self) -> None:
5252
self.unsubscribe_called = threading.Event()
5353
self.subscription = BlockingSubscription()
5454

55-
def fileno(self) -> int:
56-
# Called by LCMService.start() to pre-warm LCM recv setup. A real LCM
57-
# sets up sockets and runs a self-test here; for the fake we just
58-
# return a sentinel fd since nothing will select() on it.
59-
return -1
60-
6155
def handle_timeout(self, _timeout: int) -> None:
6256
self.handle_entered.set()
6357
self.release_handle.wait(timeout=1.0)
@@ -329,7 +323,7 @@ def fake_pub_sub(fake_lcm):
329323
pubsub.stop()
330324

331325

332-
def test_publish_waits_for_handle_loop(fake_lcm, fake_pub_sub):
326+
def test_publish_proceeds_during_handle_loop(fake_lcm, fake_pub_sub):
333327
assert fake_lcm.handle_entered.wait(timeout=0.5)
334328

335329
publisher = threading.Thread(
@@ -338,20 +332,14 @@ def test_publish_waits_for_handle_loop(fake_lcm, fake_pub_sub):
338332
)
339333
publisher.start()
340334

341-
# Loop holds _l_lock inside handle_timeout; publish waits for the lock.
342-
assert not fake_lcm.publish_called.wait(timeout=0.1)
343-
assert publisher.is_alive()
344-
345-
# Releasing handle_timeout drops _l_lock; publish now proceeds.
346-
fake_lcm.release_handle.set()
347-
348-
assert fake_lcm.publish_called.wait(timeout=1.0)
335+
assert fake_lcm.publish_called.wait(timeout=0.5)
349336
publisher.join(timeout=1.0)
350337
assert not publisher.is_alive()
351338

339+
fake_lcm.release_handle.set()
352340

353-
def test_subscribe_waits_for_handle_loop(fake_lcm, fake_pub_sub):
354-
"""subscribe() must block while the loop thread is inside handle_timeout()."""
341+
342+
def test_subscribe_proceeds_during_handle_loop(fake_lcm, fake_pub_sub):
355343
assert fake_lcm.handle_entered.wait(timeout=0.5)
356344

357345
subscriber = threading.Thread(
@@ -360,27 +348,16 @@ def test_subscribe_waits_for_handle_loop(fake_lcm, fake_pub_sub):
360348
)
361349
subscriber.start()
362350

363-
assert not fake_lcm.subscribe_called.wait(timeout=0.1)
364-
assert subscriber.is_alive()
365-
366-
fake_lcm.release_handle.set()
367-
368-
assert fake_lcm.subscribe_called.wait(timeout=1.0)
351+
assert fake_lcm.subscribe_called.wait(timeout=0.5)
369352
subscriber.join(timeout=1.0)
370353
assert not subscriber.is_alive()
371354
assert fake_lcm.subscription.queue_capacity == 10000
372355

356+
fake_lcm.release_handle.set()
373357

374-
def test_unsubscribe_waits_for_handle_loop(fake_lcm, fake_pub_sub):
375-
"""unsubscribe() must block while the loop thread is inside handle_timeout().
376358

377-
This is the specific race whose resolution fixes the segfault in
378-
pylcm.c. Unsubscribing from another thread while dispatch is running
379-
would set subs_obj->lcm_obj = NULL under the nose of pylcm_msg_handler.
380-
"""
381-
# Let the first handle_timeout complete so we can subscribe cleanly.
359+
def test_unsubscribe_proceeds_during_handle_loop(fake_lcm, fake_pub_sub):
382360
assert fake_lcm.handle_entered.wait(timeout=0.5)
383-
fake_lcm.release_handle.set()
384361

385362
unsubscribe_holder: dict[str, object] = {}
386363

@@ -389,29 +366,20 @@ def do_subscribe() -> None:
389366

390367
subscriber = threading.Thread(target=do_subscribe, daemon=True)
391368
subscriber.start()
392-
assert fake_lcm.subscribe_called.wait(timeout=1.0)
369+
assert fake_lcm.subscribe_called.wait(timeout=0.5)
393370
subscriber.join(timeout=1.0)
394371
assert not subscriber.is_alive()
395372

396-
# Reset gates so the next handle_timeout iteration blocks again.
397-
fake_lcm.handle_entered.clear()
398-
fake_lcm.release_handle.clear()
399-
assert fake_lcm.handle_entered.wait(timeout=1.0)
400-
401373
unsubscribe = unsubscribe_holder["fn"]
402374
unsub_thread = threading.Thread(target=unsubscribe, daemon=True) # type: ignore[arg-type]
403375
unsub_thread.start()
404376

405-
# Loop holds _l_lock; unsubscribe waits for the lock.
406-
assert not fake_lcm.unsubscribe_called.wait(timeout=0.1)
407-
assert unsub_thread.is_alive()
408-
409-
fake_lcm.release_handle.set()
410-
411-
assert fake_lcm.unsubscribe_called.wait(timeout=1.0)
377+
assert fake_lcm.unsubscribe_called.wait(timeout=0.5)
412378
unsub_thread.join(timeout=1.0)
413379
assert not unsub_thread.is_alive()
414380

381+
fake_lcm.release_handle.set()
382+
415383

416384
def test_stop_from_within_lcm_thread(mocker):
417385
"""stop() called from inside handle_timeout must not deadlock and must
@@ -423,9 +391,6 @@ class SelfStoppingLCM:
423391
def __init__(self) -> None:
424392
self.done = threading.Event()
425393

426-
def fileno(self) -> int:
427-
return -1
428-
429394
def handle_timeout(self, _timeout: int) -> None:
430395
if not self.done.is_set():
431396
captured["thread"] = threading.current_thread()
@@ -455,57 +420,3 @@ def unsubscribe(self, *_args: object) -> None:
455420
assert not thread.is_alive()
456421
assert service.l is None
457422
assert service._thread is None
458-
459-
460-
def test_handler_can_publish_via_rlock_reentry(mocker):
461-
"""A message handler dispatched from handle_timeout runs on the loop
462-
thread while it already holds _l_lock. Reentry must work so the handler
463-
can call self.publish/subscribe/unsubscribe. This is why _l_lock is an
464-
RLock rather than a plain Lock.
465-
"""
466-
publish_calls: list[tuple[str, bytes]] = []
467-
handler_done = threading.Event()
468-
469-
class ReentrantLCM:
470-
def __init__(self) -> None:
471-
self._handler = None
472-
self._dispatched = False
473-
self._subscription = BlockingSubscription()
474-
475-
def fileno(self) -> int:
476-
return -1
477-
478-
def handle_timeout(self, _timeout: int) -> None:
479-
# Dispatch one fake message on the first call after a handler
480-
# has been registered. The handler will call self.publish, which
481-
# must reenter _l_lock recursively on the loop thread.
482-
if self._handler is not None and not self._dispatched:
483-
self._dispatched = True
484-
self._handler("/req", b"req-payload")
485-
handler_done.set()
486-
487-
def publish(self, channel: str, message: bytes) -> None:
488-
publish_calls.append((channel, message))
489-
490-
def subscribe(self, _channel, handler) -> BlockingSubscription:
491-
self._handler = handler
492-
return self._subscription
493-
494-
def unsubscribe(self, _subscription) -> None:
495-
pass
496-
497-
fake = ReentrantLCM()
498-
mocker.patch("dimos.protocol.service.lcmservice.lcm_mod.LCM", return_value=fake)
499-
500-
pubsub = LCMPubSubBase()
501-
pubsub.start()
502-
503-
def reentrant_callback(_msg: bytes, _topic: Topic) -> None:
504-
pubsub.publish(Topic("/res"), b"res-payload")
505-
506-
pubsub.subscribe(Topic("/req"), reentrant_callback)
507-
508-
assert handler_done.wait(timeout=2.0)
509-
pubsub.stop()
510-
511-
assert ("/res", b"res-payload") in publish_calls

0 commit comments

Comments
 (0)