Skip to content

Commit 2731f95

Browse files
committed
feat: training data pipeline, solver VL-JEPA, RPB deployment plan
- Rewrite PLAN.md to focus on RPB jurisdiction deployment (phases 1-4) - Add training_attestor and training_feed modules for agent-trace training pipeline - Update solver with VL-JEPA training support - Extend node service with governance bridge initialization - ATN runtime refinements: config, conversation export, session management - Add tests for training attestor, training feed, and solver VL-JEPA integration - Update governance integration tests
1 parent dc0d454 commit 2731f95

17 files changed

Lines changed: 2196 additions & 877 deletions

PLAN.md

Lines changed: 134 additions & 850 deletions
Large diffs are not rendered by default.

atn/autonet_service.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
from enum import Enum
2323
from typing import Any, TYPE_CHECKING
2424

25+
from .events import EventType
26+
2527
if TYPE_CHECKING:
2628
from .config import AutonetConfig
2729
from .events import EventBus
@@ -85,7 +87,8 @@ class AutonetBridge:
8587
task so it doesn't block the ATN event loop.
8688
"""
8789

88-
def __init__(self, config: AutonetConfig, event_bus: EventBus | None = None) -> None:
90+
def __init__(self, config: AutonetConfig, event_bus: EventBus | None = None,
91+
data_dir: str = "") -> None:
8992
self.config = config
9093
self.state = AutonetState()
9194
self._events = event_bus
@@ -96,6 +99,8 @@ def __init__(self, config: AutonetConfig, event_bus: EventBus | None = None) ->
9699
self._published_standards_hash: str = ""
97100
self._published_tx_hash: str = ""
98101
self._user_contract_address: str = ""
102+
# Training data feed (Story 3.2) — ATN data dir for JSONL files
103+
self._data_dir = data_dir
99104

100105
# Apply config values to state
101106
if config.wallet_address:
@@ -106,6 +111,13 @@ def __init__(self, config: AutonetConfig, event_bus: EventBus | None = None) ->
106111
if config.chain_id:
107112
self.state.chain_id = config.chain_id
108113

114+
# Subscribe to execution events for training data feed
115+
if self._events:
116+
self._events.subscribe(
117+
EventType.EXECUTION_COMPLETED,
118+
self._on_execution_completed,
119+
)
120+
109121
async def _emit(self, event_type_name: str, data: dict[str, Any] | None = None) -> None:
110122
"""Emit an event if the event bus is available."""
111123
if not self._events:
@@ -119,6 +131,20 @@ async def _emit(self, event_type_name: str, data: dict[str, Any] | None = None)
119131
data=data or self.state.to_dict(),
120132
))
121133

134+
async def _on_execution_completed(self, event) -> None:
135+
"""Handle EXECUTION_COMPLETED events from the EventBus.
136+
137+
Story 3.2: When an agent execution completes, notify the training
138+
service so it knows new training data is available on disk.
139+
"""
140+
data = event.data or {}
141+
agent_id = data.get("agent_id", "")
142+
execution_id = data.get("execution_id", "")
143+
status = data.get("status", "")
144+
145+
if self._service:
146+
self._service.notify_execution(agent_id, execution_id, status)
147+
122148
async def start(self) -> dict[str, Any]:
123149
"""Start the autonet training service.
124150
@@ -144,10 +170,12 @@ async def start(self) -> dict[str, Any]:
144170
self._autonet_config.blockchain.rpc_url = self.config.rpc_url
145171
if self.config.chain_id:
146172
self._autonet_config.blockchain.chain_id = self.config.chain_id
173+
if self.config.private_key:
174+
self._autonet_config.blockchain.private_key = self.config.private_key
147175

148176
# Create and start the service in a background thread
149177
# (AutonetService.start() is blocking)
150-
self._service = AutonetService(self._autonet_config)
178+
self._service = AutonetService(self._autonet_config, data_dir=self._data_dir)
151179
self._task = asyncio.create_task(self._run_service())
152180

153181
self.state.status = AutonetStatus.RUNNING

atn/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ class AutonetConfig:
106106
# Blockchain connection (overrides autonet.yaml if set)
107107
rpc_url: str = "" # e.g. "https://node.shadownet.etherlink.com"
108108
chain_id: int = 0 # e.g. 127823 for Etherlink Shadownet
109+
private_key: str = "" # Hex private key for signing attestation txns
109110
# Wallet is managed externally (MetaMask etc.) — we just track the address
110111
wallet_address: str = "" # Connected wallet address (empty = not connected)
111112

@@ -267,6 +268,7 @@ def load_config(path: Path | None = None) -> ATNConfig:
267268
config_path=resolved.get("config_path", ""),
268269
rpc_url=resolved.get("rpc_url", ""),
269270
chain_id=resolved.get("chain_id", 0),
271+
private_key=resolved.get("private_key", ""),
270272
wallet_address=resolved.get("wallet_address", ""),
271273
)
272274

atn/conversation.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,20 +120,27 @@ def get_turns_page(self, limit: int = 50, offset: int | None = None) -> tuple[li
120120
return [], total
121121
return list(self._turns[start:end]), total
122122

123-
def get_history_for_prompt(self) -> str:
123+
def get_history_for_prompt(self, exclude_last: int = 0) -> str:
124124
"""Build a conversation history string that fits within the token budget.
125125
126126
Returns a formatted string of previous turns, newest last, trimmed
127127
from the front (oldest) if the total exceeds the budget. Returns
128128
empty string if no history.
129+
130+
Args:
131+
exclude_last: Number of turns to exclude from the end (e.g. 1 to
132+
skip the current user message that was just recorded).
129133
"""
130-
if not self._turns:
134+
turns = self._turns
135+
if exclude_last > 0:
136+
turns = turns[:-exclude_last] if len(turns) > exclude_last else []
137+
if not turns:
131138
return ""
132139

133140
# Build formatted turns
134141
formatted: list[str] = []
135142
_ROLE_PREFIX = {"user": "User", "assistant": "Orchestrator", "system": "System"}
136-
for turn in self._turns:
143+
for turn in turns:
137144
prefix = _ROLE_PREFIX.get(turn.role, turn.role.title())
138145
formatted.append(f"{prefix}: {turn.content}")
139146

atn/orchestrator/tools.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1186,11 +1186,11 @@ async def _post_message(runtime: Runtime, input: dict[str, Any]) -> dict[str, An
11861186
priority = MessagePriority(input.get("priority", "normal"))
11871187
instruction = (input.get("data") or {}).get("instruction", "")
11881188

1189-
# For cognitive agents with an instruction, persist the message in
1190-
# the agent's conversation store so it appears in its chat thread.
1191-
if defn.mode == AgentMode.COGNITIVE and instruction:
1192-
store = runtime.get_agent_conversation_store(target)
1193-
store.add_user_turn(instruction)
1189+
# NOTE: Do NOT add_user_turn here. The execution engine records the
1190+
# user turn right before calling the LLM (execution_engine.py lines
1191+
# 624-636), which is the single place responsible for persisting turns
1192+
# for both the orchestrator and child agents. Adding it here caused
1193+
# duplicate user messages in the conversation store.
11941194

11951195
msg = InboxMessage(
11961196
id=InboxMessage.generate_id(),

atn/runtime/__init__.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,9 +194,13 @@ def __init__(self, event_bus: EventBus, data_dir: Path | None = None, config: AT
194194
# Voice service (lazy)
195195
self.voice = None # type: Any
196196

197-
# Autonet service
197+
# Autonet service (Story 3.2: pass data_dir for training data feed)
198198
from ..autonet_service import AutonetBridge
199-
self.autonet = AutonetBridge(self._config.autonet, event_bus=self.events)
199+
self.autonet = AutonetBridge(
200+
self._config.autonet,
201+
event_bus=self.events,
202+
data_dir=str(self._config.data_dir),
203+
)
200204

201205
# Tool registry
202206
from ..tool_registry import ToolRegistry

atn/runtime/execution_engine.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -613,19 +613,23 @@ async def _execute_cognitive_agent(
613613
# --- Session resume / history ---
614614
session_id = ""
615615
if is_orchestrator:
616-
# Build history from PRIOR turns before recording this one.
617616
session_id = getattr(sub_provider, '_session_id', "") or ""
618-
if not session_id:
619-
history = self.session_manager.conversation.get_history_for_prompt()
620-
else:
621-
history = ""
622-
# Store the raw user message (skip if already recorded by
623-
# ws_server or send_agent_message to avoid duplicates).
617+
# Record the user message (skip if already recorded by
618+
# ws_server to avoid duplicates in the conversation store).
624619
existing_turns = self.session_manager.conversation.get_turns()
625620
if (not existing_turns
626621
or existing_turns[-1].role != "user"
627622
or existing_turns[-1].content != user_message):
628623
self.session_manager.conversation.add_user_turn(user_message)
624+
# Build prior-turn history for non-session providers,
625+
# excluding the current user message (exclude_last=1) so
626+
# it doesn't appear in both history AND the prompt.
627+
if not session_id:
628+
history = self.session_manager.conversation.get_history_for_prompt(
629+
exclude_last=1,
630+
)
631+
else:
632+
history = ""
629633
# Prepend conversation history for non-session providers
630634
if history:
631635
user_message = history + "\n\nUser: " + user_message

atn/runtime/session_manager.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,17 +86,21 @@ async def send_agent_message(self, agent_id: str, text: str) -> dict:
8686
return {"error": f"Agent '{agent_id}' is not a cognitive agent"}
8787

8888
store = self.get_agent_conversation_store(agent_id)
89-
store.add_user_turn(text)
9089

9190
# Mid-session injection — only if actually running
9291
is_running = self.registry._running_count.get(agent_id, 0) > 0
9392
if is_running:
9493
provider = self.provider_manager._active_providers.get(agent_id)
9594
if provider is not None:
95+
# Record in conversation store here — the execution engine
96+
# won't see this message (it goes directly to the provider).
97+
store.add_user_turn(text)
9698
await provider.send_user_message(text)
9799
return {"status": "injected", "agent_id": agent_id}
98100

99-
# Not running — post to inbox
101+
# Not running — post to inbox. Do NOT add_user_turn here;
102+
# the execution engine records it when it drains the inbox
103+
# (avoiding duplicates in the conversation store).
100104
msg = InboxMessage(
101105
id=InboxMessage.generate_id(),
102106
type=MessageType.WORK,

atn/ws_server.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,10 @@ async def _do_oauth() -> None:
654654
if msg_type == "post_message" and "source" not in args:
655655
args["source"] = "user"
656656

657-
# Record user turn in conversation history when messaging the orchestrator
657+
# Record user turn in conversation history early so the UI can fetch
658+
# it immediately via get_conversation (the frontend does an optimistic
659+
# add, but a history reload would lose it without this).
660+
# The execution engine skips re-adding it (dedup check at line ~624).
658661
if (msg_type == "post_message"
659662
and args.get("target") == "orchestrator"
660663
and args.get("source") == "user"):

0 commit comments

Comments
 (0)