Skip to content

Commit 7fa805a

Browse files
committed
fix(lcm): segfault at cleanup
1 parent f39d615 commit 7fa805a

3 files changed

Lines changed: 395 additions & 235 deletions

File tree

dimos/protocol/pubsub/impl/lcmpubsub.py

Lines changed: 29 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -84,50 +84,53 @@ class LCMPubSubBase(LCMService, AllPubSub[Topic, Any]):
8484

8585
def publish(self, topic: Topic | str, message: bytes) -> None:
8686
"""Publish a message to the specified channel."""
87-
if self.l is None:
87+
topic_str = str(topic) if isinstance(topic, Topic) else topic
88+
with self._l_lock:
89+
l = self.l
90+
if l is None:
8891
logger.error("Tried to publish after LCM was closed")
8992
return
90-
91-
topic_str = str(topic) if isinstance(topic, Topic) else topic
92-
self.l.publish(topic_str, message)
93+
l.publish(topic_str, message)
9394

9495
def subscribe_all(self, callback: Callable[[bytes, Topic], Any]) -> Callable[[], None]:
9596
return self.subscribe(Topic(re.compile(".*")), callback)
9697

9798
def subscribe(
9899
self, topic: Topic, callback: Callable[[bytes, Topic], None]
99100
) -> Callable[[], None]:
100-
if self.l is None:
101-
logger.error("Tried to subscribe after LCM was closed")
101+
with self._l_lock:
102+
if self.l is None:
103+
logger.error("Tried to subscribe after LCM was closed")
102104

103-
def noop() -> None:
104-
pass
105+
def noop() -> None:
106+
pass
105107

106-
return noop
108+
return noop
107109

108-
if topic.is_pattern:
110+
if topic.is_pattern:
109111

110-
def handler(channel: str, msg: bytes) -> None:
111-
if channel == "LCM_SELF_TEST":
112-
return
113-
callback(msg, Topic.from_channel_str(channel, topic.lcm_type))
112+
def handler(channel: str, msg: bytes) -> None:
113+
if channel == "LCM_SELF_TEST":
114+
return
115+
callback(msg, Topic.from_channel_str(channel, topic.lcm_type))
114116

115-
pattern_str = str(topic)
116-
if not pattern_str.endswith("*"):
117-
pattern_str = f"{pattern_str}(#.*)?"
117+
pattern_str = str(topic)
118+
if not pattern_str.endswith("*"):
119+
pattern_str = f"{pattern_str}(#.*)?"
118120

119-
lcm_subscription = self.l.subscribe(pattern_str, handler)
120-
else:
121-
topic_str = str(topic)
122-
lcm_subscription = self.l.subscribe(topic_str, lambda _, msg: callback(msg, topic))
121+
lcm_subscription = self.l.subscribe(pattern_str, handler)
122+
else:
123+
topic_str = str(topic)
124+
lcm_subscription = self.l.subscribe(topic_str, lambda _, msg: callback(msg, topic))
123125

124-
# Set queue capacity to 10000 to handle high-volume bursts
125-
lcm_subscription.set_queue_capacity(10000)
126+
# Set queue capacity to 10000 to handle high-volume bursts
127+
lcm_subscription.set_queue_capacity(10000)
126128

127129
def unsubscribe() -> None:
128-
if self.l is None:
129-
return
130-
self.l.unsubscribe(lcm_subscription)
130+
with self._l_lock:
131+
if self.l is None:
132+
return
133+
self.l.unsubscribe(lcm_subscription)
131134

132135
return unsubscribe
133136

dimos/protocol/service/lcmservice.py

Lines changed: 51 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -104,47 +104,70 @@ def __setstate__(self, state) -> None: # type: ignore[no-untyped-def]
104104
self._call_thread_pool = None
105105
self._call_thread_pool_lock = threading.RLock()
106106

107+
def _cleanup_owned_lcm(self) -> None:
108+
"""Dispose of the internally-owned LCM instance."""
109+
if self.config.lcm:
110+
return
111+
112+
with self._l_lock:
113+
if self.l is not None:
114+
del self.l
115+
self.l = None
116+
107117
def start(self) -> None:
108-
# Reinitialize LCM if it's None (e.g., after unpickling)
109-
if self.l is None:
110-
if self.config.lcm:
111-
self.l = self.config.lcm
112-
else:
113-
self.l = lcm_mod.LCM(self.config.url) if self.config.url else lcm_mod.LCM()
114-
115-
self._stop_event.clear()
116-
self._thread = threading.Thread(target=self._lcm_loop)
117-
self._thread.daemon = True
118-
self._thread.start()
118+
with self._l_lock:
119+
if self._thread is not None and self._thread.is_alive():
120+
return
121+
122+
# Reinitialize LCM if it's None (e.g., after unpickling)
123+
if self.l is None:
124+
if self.config.lcm:
125+
self.l = self.config.lcm
126+
else:
127+
self.l = lcm_mod.LCM(self.config.url) if self.config.url else lcm_mod.LCM()
128+
129+
self._stop_event.clear()
130+
self._thread = threading.Thread(target=self._lcm_loop, daemon=True)
131+
self._thread.start()
119132

120133
def _lcm_loop(self) -> None:
121134
"""LCM message handling loop."""
122-
while not self._stop_event.is_set():
123-
try:
135+
try:
136+
while not self._stop_event.is_set():
124137
with self._l_lock:
125-
if self.l is None:
138+
l = self.l
139+
if l is None:
126140
break
127-
self.l.handle_timeout(_LCM_LOOP_TIMEOUT)
128-
except Exception as e:
129-
stack_trace = traceback.format_exc()
130-
print(f"Error in LCM handling: {e}\n{stack_trace}")
141+
try:
142+
# This doesn't have to be under a lock because the C
143+
# library has its own locking for this.
144+
l.handle_timeout(_LCM_LOOP_TIMEOUT)
145+
except Exception as e:
146+
stack_trace = traceback.format_exc()
147+
print(f"Error in LCM handling: {e}\n{stack_trace}")
148+
finally:
149+
self._cleanup_owned_lcm()
150+
with self._l_lock:
151+
if self._thread is threading.current_thread():
152+
self._thread = None
131153

132154
def stop(self) -> None:
133155
"""Stop the LCM loop."""
134156
self._stop_event.set()
135-
if self._thread is not None:
157+
thread = self._thread
158+
if thread is not None:
136159
# Only join if we're not the LCM thread (avoid "cannot join current thread")
137-
if threading.current_thread() != self._thread:
138-
self._thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT)
139-
if self._thread.is_alive():
160+
if threading.current_thread() != thread:
161+
thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT)
162+
if thread.is_alive():
140163
logger.warning("LCM thread did not stop cleanly within timeout")
141164

142-
# Clean up LCM instance if we created it
143-
if not self.config.lcm:
144-
with self._l_lock:
145-
if self.l is not None:
146-
del self.l
147-
self.l = None
165+
# If the thread is still alive, do not clean up now. _lcm_loop will
166+
# clean up when it exits. If we try to clean up here as well it could
167+
# race with the cleanup in _lcm_loop and segfault, and it would leave
168+
# the service half-stopped with a live thread but no LCM instance.
169+
if thread is None or not thread.is_alive():
170+
self._cleanup_owned_lcm()
148171

149172
with self._call_thread_pool_lock:
150173
if self._call_thread_pool:

0 commit comments

Comments
 (0)