Skip to content

Commit 83a38b4

Browse files
EightRice and claude committed
refactor: unify orchestrator and cognitive agent execution paths
The orchestrator is now a cognitive-mode agent — identical in structure to any child agent. Key changes: - orchestrator/__init__.py: mode=COGNITIVE with provider fallback chain, cognitive_model, tools=["atn_full"], and system_prompt directly on AgentDefinition (no more pipeline steps wrapper) - runtime.py: Single _execute_cognitive_agent() handles ALL cognitive agents. Orchestrator vs child differences are configuration-driven: * Tool surface: "atn_full" vs "atn_core" (from defn.tools) * Provider lifecycle: orchestrator reuses; children create fresh * Conversation: orchestrator records to global store; children to per-agent * Session resume: orchestrator uses BridgeProvider session_id - runtime.py: New route_tool_call() method — universal tool router for all cognitive agents (shell tools, MCP connectors, ATN framework tools) - runtime.py: New _resolve_provider_by_name() properly maps provider names (claude_max, anthropic, gemini, openai, ollama) to provider instances with the correct model, fixing the provider fallback chain - runtime.py: interrupt_orchestrator() now delegates to interrupt_delegate() - runtime.py: _get_bridge_provider() now uses _active_providers uniformly - ws_server.py: orchestrator_message handler uses _active_providers directly Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6fcc3f3 commit 83a38b4

7 files changed

Lines changed: 437 additions & 286 deletions

File tree

atn/cli.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,20 @@ async def _input_loop(runtime: Runtime) -> None:
268268
loop = asyncio.get_event_loop()
269269
while True:
270270
try:
271-
line = await loop.run_in_executor(None, sys.stdin.readline)
271+
# Race between stdin input and shutdown signal
272+
read_task = asyncio.ensure_future(
273+
loop.run_in_executor(None, sys.stdin.readline)
274+
)
275+
shutdown_task = asyncio.ensure_future(runtime._shutdown_event.wait())
276+
done, pending = await asyncio.wait(
277+
[read_task, shutdown_task],
278+
return_when=asyncio.FIRST_COMPLETED,
279+
)
280+
for p in pending:
281+
p.cancel()
282+
if shutdown_task in done:
283+
break
284+
line = read_task.result()
272285
if not line:
273286
break
274287
keep_going = await _handle_command(line, runtime)
@@ -386,7 +399,8 @@ async def run_cli() -> None:
386399
console.print("\n[bold red]Shutting down...[/]")
387400
if ws_bridge:
388401
await ws_bridge.stop()
389-
await runtime.stop()
402+
if not runtime._shutdown_event.is_set():
403+
await runtime.stop()
390404
console.print("[dim]Bye.[/]")
391405

392406

atn/orchestrator/__init__.py

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
"""Orchestrator — the meta-agent that manages the fleet.
22
3-
The orchestrator is an AgentDefinition with a single multi-turn cognitive step
4-
configured with orchestrator tools. It's registered in the Runtime like any
5-
other agent and wakes on inbox messages (user input, alert escalations).
3+
The orchestrator is a cognitive-mode AgentDefinition — identical to any other
4+
cognitive agent, just with a full tool surface and its own system prompt.
5+
It's registered in the Runtime like any other agent and wakes on inbox
6+
messages (user input, alert escalations).
67
"""
78
from __future__ import annotations
89

910
from pathlib import Path
1011

1112
from ..config import OrchestratorConfig
12-
from ..models import AgentDefinition, StepDefinition, StepType
13+
from ..models import AgentDefinition, AgentMode
1314

1415
ORCHESTRATOR_ID = "orchestrator"
1516

@@ -294,9 +295,9 @@
294295
- **You ARE the orchestrator.** Never tell the user you're "just Claude Code" or \
295296
a different system. You are the ATN orchestrator daemon, running from c:\\code\\autonet.
296297
- **The daemon IS you.** When the user says "restart the daemon," they mean restart \
297-
YOU. You cannot restart yourself the user has to do it.
298-
- **The ATN daemon runs from c:\\code\\autonet.** The legacy c:\\code\\atn repo is \
299-
pre-unification — don't sync framework changes there. But you work across ALL \
298+
YOU. You can restart yourself using the restart_daemon tool — it performs a clean \
299+
shutdown and spawns a new process. Your conversation memory and all agent state persists.
300+
- **The ATN daemon runs from c:\\code\\autonet.** You work across ALL \
300301
of the user's repos as needed (c:\\code\\werule_new, c:\\code\\trustless_new, etc.).
301302
- **Check delegate_status sparingly.** Each check burns your tokens. The advantage \
302303
of delegation is that agents work independently. Check only when you need to make \
@@ -365,6 +366,11 @@ def create_orchestrator_agent(
365366
) -> AgentDefinition:
366367
"""Build the orchestrator AgentDefinition.
367368
369+
The orchestrator is a **cognitive-mode** agent — identical in structure
370+
to any child cognitive agent. The differences are configuration-level:
371+
full tool surface ("atn_full"), provider fallback chain, and the fleet
372+
management system prompt.
373+
368374
Args:
369375
config: Orchestrator config from ATNConfig (provider, model).
370376
system_prompt: Override the default system prompt.
@@ -393,7 +399,6 @@ def create_orchestrator_agent(
393399
)
394400

395401
# Provider fallback chain: primary first, then alternatives.
396-
# The cognitive step will try each in order if the previous one fails.
397402
# Providers that aren't configured are silently skipped at runtime.
398403
# Ollama is excluded — local models can't reliably handle 20+ tools,
399404
# multi-turn planning, and the complex reasoning the orchestrator needs.
@@ -403,30 +408,17 @@ def create_orchestrator_agent(
403408
if p not in provider_chain:
404409
provider_chain.append(p)
405410

406-
step_config = {
407-
"provider": provider_chain,
408-
"model": model,
409-
"system": system_prompt or _DEFAULT_SYSTEM_PROMPT,
410-
"prompt": "{inbox}",
411-
"max_tokens": 4096,
412-
"temperature": 0.0,
413-
"max_turns": max_turns,
414-
"tool_executors": "orchestrator",
415-
}
416-
417-
built_prompt = step_config["system"]
411+
built_prompt = system_prompt or _DEFAULT_SYSTEM_PROMPT
418412

419413
return AgentDefinition(
420414
id=ORCHESTRATOR_ID,
421415
name="Orchestrator",
416+
mode=AgentMode.COGNITIVE,
422417
description="Meta-agent that manages the fleet via multi-turn tool use.",
423418
system_prompt=built_prompt,
424-
steps=[
425-
StepDefinition(
426-
type=StepType.COGNITIVE,
427-
config=step_config,
428-
name="orchestrate",
429-
),
430-
],
419+
provider=provider_chain,
420+
cognitive_model=model,
421+
max_turns=max_turns,
422+
tools=["atn_full"],
431423
concurrency=1,
432424
)

atn/providers/bridge.py

Lines changed: 63 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -229,16 +229,19 @@ async def _stream_events() -> None:
229229
event = await asyncio.wait_for(self._event_queue.get(), timeout=120)
230230
except asyncio.TimeoutError:
231231
break
232-
if event.get("type") == "done":
233-
break
234-
if event.get("type") == "text_delta" and on_chunk:
235-
text = event.get("text", "")
236-
if text:
237-
await on_chunk(text)
238-
elif event.get("type") == "thinking" and on_thinking:
239-
text = event.get("text", "")
240-
if text:
241-
await on_thinking(text)
232+
try:
233+
if event.get("type") == "done":
234+
break
235+
if event.get("type") == "text_delta" and on_chunk:
236+
text = event.get("text", "")
237+
if text:
238+
await on_chunk(text)
239+
elif event.get("type") == "thinking" and on_thinking:
240+
text = event.get("text", "")
241+
if text:
242+
await on_thinking(text)
243+
except Exception:
244+
log.exception("Error processing stream event: %s", event)
242245

243246
async def _get_response() -> dict[str, Any]:
244247
"""Send request and get the final JSON response."""
@@ -386,65 +389,68 @@ async def _stream_events() -> None:
386389
)
387390
except asyncio.TimeoutError:
388391
break
389-
if event.get("type") == "done":
390-
break
391-
if event.get("type") == "text_delta":
392-
text = event.get("text", "")
393-
if text and on_chunk:
394-
await on_chunk(text)
395-
elif event.get("type") == "tool_use_start" and self.event_bus:
396-
await self.event_bus.emit(Event(
397-
type=EventType.AGENT_TOOL_USE_START,
398-
source=self.source_agent_id,
399-
data={
400-
"agent_id": self.source_agent_id,
401-
"tool_use_id": event.get("tool_use_id", ""),
402-
"tool_name": event.get("tool_name", ""),
403-
"input": event.get("input", {}),
404-
},
405-
))
406-
elif event.get("type") == "tool_use_result" and self.event_bus:
407-
await self.event_bus.emit(Event(
408-
type=EventType.AGENT_TOOL_USE_RESULT,
409-
source=self.source_agent_id,
410-
data={
411-
"agent_id": self.source_agent_id,
412-
"tool_use_id": event.get("tool_use_id", ""),
413-
"is_error": event.get("is_error", False),
414-
},
415-
))
416-
elif event.get("type") == "compaction":
417-
self._compaction_count += 1
418-
self._last_compaction_pre_tokens = event.get("pre_tokens", 0)
419-
logger.info(
420-
"Context compaction #%d (trigger=%s, pre_tokens=%d)",
421-
self._compaction_count,
422-
event.get("trigger", "auto"),
423-
self._last_compaction_pre_tokens,
424-
)
425-
if self.event_bus:
392+
try:
393+
if event.get("type") == "done":
394+
break
395+
if event.get("type") == "text_delta":
396+
text = event.get("text", "")
397+
if text and on_chunk:
398+
await on_chunk(text)
399+
elif event.get("type") == "tool_use_start" and self.event_bus:
426400
await self.event_bus.emit(Event(
427-
type=EventType.CUSTOM,
401+
type=EventType.AGENT_TOOL_USE_START,
428402
source=self.source_agent_id,
429403
data={
430-
"type": "context_compaction",
431404
"agent_id": self.source_agent_id,
432-
"compaction_count": self._compaction_count,
433-
"trigger": event.get("trigger", "auto"),
434-
"pre_tokens": self._last_compaction_pre_tokens,
405+
"tool_use_id": event.get("tool_use_id", ""),
406+
"tool_name": event.get("tool_name", ""),
407+
"input": event.get("input", {}),
435408
},
436409
))
437-
elif event.get("type") == "status":
438-
status = event.get("status", "idle")
439-
if status == "compacting" and self.event_bus:
410+
elif event.get("type") == "tool_use_result" and self.event_bus:
440411
await self.event_bus.emit(Event(
441-
type=EventType.CUSTOM,
412+
type=EventType.AGENT_TOOL_USE_RESULT,
442413
source=self.source_agent_id,
443414
data={
444-
"type": "context_compacting",
445415
"agent_id": self.source_agent_id,
416+
"tool_use_id": event.get("tool_use_id", ""),
417+
"is_error": event.get("is_error", False),
446418
},
447419
))
420+
elif event.get("type") == "compaction":
421+
self._compaction_count += 1
422+
self._last_compaction_pre_tokens = event.get("pre_tokens", 0)
423+
log.info(
424+
"Context compaction #%d (trigger=%s, pre_tokens=%d)",
425+
self._compaction_count,
426+
event.get("trigger", "auto"),
427+
self._last_compaction_pre_tokens,
428+
)
429+
if self.event_bus:
430+
await self.event_bus.emit(Event(
431+
type=EventType.CUSTOM,
432+
source=self.source_agent_id,
433+
data={
434+
"type": "context_compaction",
435+
"agent_id": self.source_agent_id,
436+
"compaction_count": self._compaction_count,
437+
"trigger": event.get("trigger", "auto"),
438+
"pre_tokens": self._last_compaction_pre_tokens,
439+
},
440+
))
441+
elif event.get("type") == "status":
442+
status = event.get("status", "idle")
443+
if status == "compacting" and self.event_bus:
444+
await self.event_bus.emit(Event(
445+
type=EventType.CUSTOM,
446+
source=self.source_agent_id,
447+
data={
448+
"type": "context_compacting",
449+
"agent_id": self.source_agent_id,
450+
},
451+
))
452+
except Exception:
453+
log.exception("Error processing stream event: %s", event)
448454

449455
stream_task = asyncio.create_task(_stream_events())
450456

0 commit comments

Comments (0)