Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 30 additions & 27 deletions dimos/protocol/pubsub/impl/lcmpubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,50 +84,53 @@ class LCMPubSubBase(LCMService, AllPubSub[Topic, Any]):

def publish(self, topic: Topic | str, message: bytes) -> None:
"""Publish a message to the specified channel."""
if self.l is None:
logger.error("Tried to publish after LCM was closed")
return

topic_str = str(topic) if isinstance(topic, Topic) else topic
self.l.publish(topic_str, message)
with self._l_lock:
l = self.l
if l is None:
logger.error("Tried to publish after LCM was closed")
return
l.publish(topic_str, message)

def subscribe_all(self, callback: Callable[[bytes, Topic], Any]) -> Callable[[], None]:
return self.subscribe(Topic(re.compile(".*")), callback)

def subscribe(
self, topic: Topic, callback: Callable[[bytes, Topic], None]
) -> Callable[[], None]:
if self.l is None:
logger.error("Tried to subscribe after LCM was closed")
with self._l_lock:
if self.l is None:
logger.error("Tried to subscribe after LCM was closed")

def noop() -> None:
pass
def noop() -> None:
pass

return noop
return noop

if topic.is_pattern:
if topic.is_pattern:

def handler(channel: str, msg: bytes) -> None:
if channel == "LCM_SELF_TEST":
return
callback(msg, Topic.from_channel_str(channel, topic.lcm_type))
def handler(channel: str, msg: bytes) -> None:
if channel == "LCM_SELF_TEST":
return
callback(msg, Topic.from_channel_str(channel, topic.lcm_type))

pattern_str = str(topic)
if not pattern_str.endswith("*"):
pattern_str = f"{pattern_str}(#.*)?"
pattern_str = str(topic)
if not pattern_str.endswith("*"):
pattern_str = f"{pattern_str}(#.*)?"

lcm_subscription = self.l.subscribe(pattern_str, handler)
else:
topic_str = str(topic)
lcm_subscription = self.l.subscribe(topic_str, lambda _, msg: callback(msg, topic))
lcm_subscription = self.l.subscribe(pattern_str, handler)
else:
topic_str = str(topic)
lcm_subscription = self.l.subscribe(topic_str, lambda _, msg: callback(msg, topic))

# Set queue capacity to 10000 to handle high-volume bursts
lcm_subscription.set_queue_capacity(10000)
# Set queue capacity to 10000 to handle high-volume bursts
lcm_subscription.set_queue_capacity(10000)

def unsubscribe() -> None:
if self.l is None:
return
self.l.unsubscribe(lcm_subscription)
with self._l_lock:
if self.l is None:
return
self.l.unsubscribe(lcm_subscription)

return unsubscribe

Expand Down
79 changes: 51 additions & 28 deletions dimos/protocol/service/lcmservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,47 +104,70 @@ def __setstate__(self, state) -> None: # type: ignore[no-untyped-def]
self._call_thread_pool = None
self._call_thread_pool_lock = threading.RLock()

def _cleanup_owned_lcm(self) -> None:
"""Dispose of the internally-owned LCM instance."""
if self.config.lcm:
return

with self._l_lock:
if self.l is not None:
del self.l
self.l = None

def start(self) -> None:
# Reinitialize LCM if it's None (e.g., after unpickling)
if self.l is None:
if self.config.lcm:
self.l = self.config.lcm
else:
self.l = lcm_mod.LCM(self.config.url) if self.config.url else lcm_mod.LCM()

self._stop_event.clear()
self._thread = threading.Thread(target=self._lcm_loop)
self._thread.daemon = True
self._thread.start()
with self._l_lock:
if self._thread is not None and self._thread.is_alive():
return

# Reinitialize LCM if it's None (e.g., after unpickling)
if self.l is None:
if self.config.lcm:
self.l = self.config.lcm
else:
self.l = lcm_mod.LCM(self.config.url) if self.config.url else lcm_mod.LCM()

self._stop_event.clear()
self._thread = threading.Thread(target=self._lcm_loop, daemon=True)
self._thread.start()

def _lcm_loop(self) -> None:
"""LCM message handling loop."""
while not self._stop_event.is_set():
try:
try:
while not self._stop_event.is_set():
with self._l_lock:
if self.l is None:
l = self.l
if l is None:
break
self.l.handle_timeout(_LCM_LOOP_TIMEOUT)
except Exception as e:
stack_trace = traceback.format_exc()
print(f"Error in LCM handling: {e}\n{stack_trace}")
try:
# This doesn't have to be under a lock because the C
# library has its own locking for this.
l.handle_timeout(_LCM_LOOP_TIMEOUT)
except Exception as e:
stack_trace = traceback.format_exc()
print(f"Error in LCM handling: {e}\n{stack_trace}")
finally:
self._cleanup_owned_lcm()
with self._l_lock:
if self._thread is threading.current_thread():
self._thread = None

def stop(self) -> None:
"""Stop the LCM loop."""
self._stop_event.set()
if self._thread is not None:
thread = self._thread
if thread is not None:
# Only join if we're not the LCM thread (avoid "cannot join current thread")
if threading.current_thread() != self._thread:
self._thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT)
if self._thread.is_alive():
if threading.current_thread() != thread:
thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT)
if thread.is_alive():
logger.warning("LCM thread did not stop cleanly within timeout")

# Clean up LCM instance if we created it
if not self.config.lcm:
with self._l_lock:
if self.l is not None:
del self.l
self.l = None
# If the thread is still alive, do not clean up now. _lcm_loop will
# clean up when it exits. If we try to clean up here as well it could
# race with the cleanup in _lcm_loop and segfault, and it would leave
# the service half-stopped with a live thread but no LCM instance.
if thread is None or not thread.is_alive():
self._cleanup_owned_lcm()

with self._call_thread_pool_lock:
if self._call_thread_pool:
Expand Down
Loading