|
18 | 18 | import hashlib |
19 | 19 | import json |
20 | 20 | import logging |
| 21 | +import threading |
21 | 22 | from dataclasses import dataclass, field |
22 | 23 | from enum import Enum |
23 | 24 | from typing import Any, TYPE_CHECKING |
@@ -140,6 +141,17 @@ def __init__(self, config: AutonetConfig, event_bus: EventBus | None = None, |
140 | 141 | self._on_execution_completed, |
141 | 142 | ) |
142 | 143 |
|
| 144 | + # P2P agent advertisement |
| 145 | + self._p2p_host = None # AutonetHost (lazy, trio-based) |
| 146 | + self._p2p_thread: threading.Thread | None = None |
| 147 | + self._p2p_stop = threading.Event() |
| 148 | + self._agent_registry = None # Set by Runtime after init |
| 149 | + # Subscribe to agent lifecycle events for p2p advertisement refresh |
| 150 | + if self._events: |
| 151 | + for evt in (EventType.AGENT_REGISTERED, EventType.AGENT_UNREGISTERED, |
| 152 | + EventType.AGENT_ACTIVATED, EventType.AGENT_DEACTIVATED): |
| 153 | + self._events.subscribe(evt, self._on_agent_changed) |
| 154 | + |
143 | 155 | # Constitution CID loaded lazily from on-chain Registry |
144 | 156 | self._constitution_loaded = False |
145 | 157 | # Raw constitution text (loaded once, cached for prompt injection) |
@@ -177,6 +189,107 @@ def _discover_jurisdiction(self) -> None: |
177 | 189 | log.warning("Failed to discover jurisdiction from %s: %s", |
178 | 190 | self.config.dao_address, e) |
179 | 191 |
|
| 192 | + # ------------------------------------------------------------------ |
| 193 | + # P2P agent advertisement |
| 194 | + # ------------------------------------------------------------------ |
| 195 | + |
| 196 | + def set_agent_registry(self, registry: Any) -> None: |
| 197 | + """Called by Runtime after init to provide agent registry reference.""" |
| 198 | + self._agent_registry = registry |
| 199 | + |
| 200 | + async def _on_agent_changed(self, event: Any) -> None: |
| 201 | + """Refresh p2p capability when agents are registered/unregistered.""" |
| 202 | + self._refresh_p2p_agents() |
| 203 | + |
| 204 | + def _refresh_p2p_agents(self) -> None: |
| 205 | + """Rebuild agent advertisements from registry and push to p2p host.""" |
| 206 | + if not self._p2p_host or not self._agent_registry: |
| 207 | + return |
| 208 | + try: |
| 209 | + ads = self._agent_registry.build_agent_advertisements() |
| 210 | + # Also include the connected wallet as a root agent if present |
| 211 | + if self.state.wallet_address and not any( |
| 212 | + a["address"].lower() == self.state.wallet_address.lower() for a in ads |
| 213 | + ): |
| 214 | + ads.insert(0, { |
| 215 | + "address": self.state.wallet_address, |
| 216 | + "name": "root", |
| 217 | + "description": "", |
| 218 | + "agent_type": "orchestrator", |
| 219 | + "model": "", |
| 220 | + "is_root": True, |
| 221 | + "parent_address": "", |
| 222 | + "registered_on_chain": False, |
| 223 | + }) |
| 224 | + self._p2p_host.update_capability(agents=ads) |
| 225 | + log.debug("P2P capability updated with %d agent(s)", len(ads)) |
| 226 | + except Exception: |
| 227 | + log.debug("Failed to refresh p2p agents", exc_info=True) |
| 228 | + |
| 229 | + def start_p2p(self) -> None: |
| 230 | + """Start the p2p host in a background thread for agent advertisement.""" |
| 231 | + if self._p2p_thread and self._p2p_thread.is_alive(): |
| 232 | + return |
| 233 | + try: |
| 234 | + from nodes.common.p2p import AutonetHost, NodeCapability |
| 235 | + from nodes.common.config import load_config as load_autonet_config |
| 236 | + except Exception: |
| 237 | + log.debug("P2P not available (nodes package not installed or import error)") |
| 238 | + return |
| 239 | + |
| 240 | + config_path = self.config.config_path or None |
| 241 | + try: |
| 242 | + cfg = load_autonet_config(config_path) |
| 243 | + except Exception: |
| 244 | + cfg = None |
| 245 | + |
| 246 | + listen_port = cfg.p2p.listen_port if cfg else 0 |
| 247 | + listen_host = cfg.p2p.listen_host if cfg else "0.0.0.0" |
| 248 | + bootstrap = cfg.p2p.bootstrap_peers if cfg else [] |
| 249 | + advertise_interval = cfg.p2p.capability_advertise_interval if cfg else 60 |
| 250 | + |
| 251 | + node_id = f"atn-{self.state.wallet_address[:8]}" if self.state.wallet_address else "atn-daemon" |
| 252 | + cap = NodeCapability(peer_id="", node_id=node_id) |
| 253 | + |
| 254 | + host = AutonetHost( |
| 255 | + node_id=node_id, |
| 256 | + listen_port=listen_port, |
| 257 | + listen_host=listen_host, |
| 258 | + bootstrap_peers=bootstrap, |
| 259 | + capability=cap, |
| 260 | + ) |
| 261 | + self._p2p_host = host |
| 262 | + self._p2p_stop.clear() |
| 263 | + |
| 264 | + def _run(): |
| 265 | + import trio |
| 266 | + async def _main(): |
| 267 | + async with host.run(): |
| 268 | + self._refresh_p2p_agents() |
| 269 | + await host.advertise_capability() |
| 270 | + log.info("P2P host running, advertising %d agent(s)", |
| 271 | + len(host._capability.agents)) |
| 272 | + while not self._p2p_stop.is_set(): |
| 273 | + await trio.sleep(advertise_interval) |
| 274 | + self._refresh_p2p_agents() |
| 275 | + await host.advertise_capability() |
| 276 | + try: |
| 277 | + trio.run(_main) |
| 278 | + except Exception: |
| 279 | + log.debug("P2P host stopped", exc_info=True) |
| 280 | + |
| 281 | + self._p2p_thread = threading.Thread(target=_run, name="p2p-host", daemon=True) |
| 282 | + self._p2p_thread.start() |
| 283 | + log.info("P2P agent advertisement started") |
| 284 | + |
| 285 | + def stop_p2p(self) -> None: |
| 286 | + """Stop the p2p background thread.""" |
| 287 | + self._p2p_stop.set() |
| 288 | + if self._p2p_thread: |
| 289 | + self._p2p_thread.join(timeout=5) |
| 290 | + self._p2p_thread = None |
| 291 | + self._p2p_host = None |
| 292 | + |
180 | 293 | async def _emit(self, event_type_name: str, data: dict[str, Any] | None = None) -> None: |
181 | 294 | """Emit an event if the event bus is available.""" |
182 | 295 | if not self._events: |
@@ -295,6 +408,8 @@ async def start(self) -> dict[str, Any]: |
295 | 408 |
|
296 | 409 | self.state.status = AutonetStatus.RUNNING |
297 | 410 | log.info("Autonet service started") |
| 411 | + # Start p2p agent advertisement alongside the training service |
| 412 | + self.start_p2p() |
298 | 413 | await self._emit("AUTONET_STARTED") |
299 | 414 | return {"status": "started"} |
300 | 415 |
|
@@ -338,6 +453,7 @@ async def stop(self) -> dict[str, Any]: |
338 | 453 | except asyncio.CancelledError: |
339 | 454 | pass |
340 | 455 |
|
| 456 | + self.stop_p2p() |
341 | 457 | self.state.status = AutonetStatus.STOPPED |
342 | 458 | log.info("Autonet service stopped") |
343 | 459 | await self._emit("AUTONET_STOPPED") |
|
0 commit comments