-
Notifications
You must be signed in to change notification settings - Fork 43
feat(agent): emit status notifications for orchestrator streaming #2732
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
3c39135
8ec2712
f5caa29
6b7c0e8
b446d87
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -62,6 +62,16 @@ interface AnthropicMessageWithContent { | |
| }; | ||
| } | ||
|
|
||
| /** | ||
| * Accumulates assistant-text content emitted since the last tool_use so the | ||
| * orchestrator can render the agent's pre-tool prose as the upcoming step's | ||
| * description. The mutable wrapper threads through the chunk handlers; each | ||
| * tool_use emit reads and resets ``value``. | ||
| */ | ||
| export interface IntentBuffer { | ||
| value: string; | ||
| } | ||
|
|
||
| type ChunkHandlerContext = { | ||
| sessionId: string; | ||
| toolUseCache: ToolUseCache; | ||
|
|
@@ -73,6 +83,7 @@ type ChunkHandlerContext = { | |
| registerHooks?: boolean; | ||
| supportsTerminalOutput?: boolean; | ||
| cwd?: string; | ||
| intentBuffer?: IntentBuffer; | ||
| /** Raw MCP tool result from SDKUserMessage.tool_use_result (contains content, structuredContent, _meta) */ | ||
| mcpToolUseResult?: Record<string, unknown>; | ||
| /** Per-session task list (populated by createTaskHook + tool_result handler) */ | ||
|
|
@@ -109,6 +120,13 @@ export interface MessageHandlerContext { | |
| supportsTerminalOutput?: boolean; | ||
| /** Absent on replay, where the legacy drop-all text/thinking filter applies. */ | ||
| streamedAssistantBlocks?: StreamedAssistantBlocks; | ||
| /** | ||
| * Mutable buffer accumulating assistant-text deltas since the last tool_use. | ||
| * Read+reset on every ``_posthog/status`` emit so the orchestrator can use | ||
| * the agent's pre-tool prose as the upcoming step's description. Optional | ||
| * (absent on replay); the chunk handler tolerates ``undefined``. | ||
| */ | ||
| intentBuffer?: IntentBuffer; | ||
| } | ||
|
|
||
| function messageUpdateType(role: Role) { | ||
|
|
@@ -136,11 +154,72 @@ function bashCommandFromToolUse( | |
| return typeof command === "string" ? command : undefined; | ||
| } | ||
|
|
||
| const TOOL_ARGS_PREVIEW_LIMIT = 240; | ||
|
|
||
| const TOOL_ARGS_PREVIEW_KEYS = [ | ||
| "file_path", | ||
| "notebook_path", | ||
| "path", | ||
| "code", // MCP exec / hogql / sql payloads | ||
| "query", // search queries | ||
| "pattern", // grep / glob patterns | ||
| "url", | ||
| "description", | ||
| "prompt", // Task / Agent sub-agent prompt | ||
| "name", // schema lookups | ||
| "title", | ||
| ]; | ||
|
|
||
| function toolArgsPreview( | ||
| chunk: ToolUseCache[string], | ||
| bashCommand: string | undefined, | ||
| ): string { | ||
| const input = chunk.input as Record<string, unknown> | undefined; | ||
| const tryField = (key: string): string | undefined => { | ||
| const v = input?.[key]; | ||
| return typeof v === "string" && v ? v : undefined; | ||
| }; | ||
|
|
||
| let raw = bashCommand; | ||
| if (!raw) { | ||
| for (const key of TOOL_ARGS_PREVIEW_KEYS) { | ||
| const v = tryField(key); | ||
| if (v) { | ||
| raw = v; | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| // Fallback: take the first short-string arg of the input. Avoids returning | ||
| // the empty string when an MCP tool uses an arg name we don't enumerate | ||
| // above. Bound by ``TOOL_ARGS_PREVIEW_LIMIT`` after the truncation below. | ||
| if (!raw && input) { | ||
| for (const value of Object.values(input)) { | ||
| if (typeof value === "string" && value.trim()) { | ||
| raw = value; | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| if (!raw) return ""; | ||
| const oneLine = raw.replace(/\s+/g, " ").trim(); | ||
| return oneLine.length > TOOL_ARGS_PREVIEW_LIMIT | ||
| ? `${oneLine.slice(0, TOOL_ARGS_PREVIEW_LIMIT - 1)}…` | ||
| : oneLine; | ||
| } | ||
|
|
||
| function handleTextChunk( | ||
| chunk: { text: string }, | ||
| role: Role, | ||
| parentToolCallId?: string, | ||
| intentBuffer?: IntentBuffer, | ||
| ): SessionUpdate { | ||
| // Only top-level assistant prose feeds the upcoming tool's intent. Skip | ||
| // sub-agent (parentToolCallId) and user text; neither should drive a | ||
| // surrounding step's description. | ||
| if (role === "assistant" && !parentToolCallId && intentBuffer && chunk.text) { | ||
| intentBuffer.value += chunk.text; | ||
| } | ||
| const update: SessionUpdate = { | ||
| sessionUpdate: messageUpdateType(role), | ||
| content: text(chunk.text), | ||
|
|
@@ -249,6 +328,37 @@ function handleToolUseChunk( | |
| cwd: ctx.cwd, | ||
| }); | ||
|
|
||
| // Broadcast a live "agent is doing X" status when a tool first starts so | ||
| // downstream consumers (the Slack orchestrator) can render it as a status | ||
| // line in the thread. ``tool_name`` / ``tool_args_preview`` give the bare | ||
| // tool name and a short arg preview for the plan-block step; ``tool_intent`` | ||
| // carries the assistant prose accumulated since the previous tool — the | ||
| // orchestrator uses it as the step's description when it's short enough to | ||
| // fit Slack's task_update.details field, otherwise it stays in the message | ||
| // body. Always-emit (rather than agent-side threshold) keeps the protocol | ||
| // simple — the orchestrator owns the rendering decision. | ||
| if (!alreadyCached && toolInfo.title) { | ||
| const toolIntent = ctx.intentBuffer?.value ?? ""; | ||
| if (ctx.intentBuffer) { | ||
| ctx.intentBuffer.value = ""; | ||
| } | ||
| void ctx.client | ||
| .extNotification(POSTHOG_NOTIFICATIONS.STATUS, { | ||
| sessionId: ctx.sessionId, | ||
| status: "tool_use", | ||
| text: toolInfo.title, | ||
| tool_name: chunk.name, | ||
| tool_args_preview: toolArgsPreview( | ||
| chunk, | ||
| bashCommandFromToolUse(chunk), | ||
| ), | ||
| tool_intent: toolIntent, | ||
| }) | ||
| .catch(() => { | ||
| // Best-effort — a failed status broadcast must not break tool execution. | ||
| }); | ||
|
Comment on lines
+356
to
+359
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The Prompt To Fix With AIThis is a comment left during a code review.
Path: packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts
Line: 261-264
Comment:
**Silent error suppression makes systematic failures invisible**
The `.catch(() => {})` swallows all errors without logging. If the `extNotification` call is systematically failing (e.g. due to a schema mismatch, a disconnected client, or a downstream bug), there will be no signal in any log output. A single `ctx.logger.warn(...)` call in the catch handler would make these failures observable without changing the fire-and-forget semantics.
How can I resolve this? If you propose a fix, please make it concise.Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time! |
||
| } | ||
|
|
||
| const meta: Record<string, unknown> = { | ||
| ...toolMeta( | ||
| chunk.name, | ||
|
|
@@ -463,7 +573,12 @@ function processContentChunk( | |
| switch (chunk.type) { | ||
| case "text": | ||
| case "text_delta": { | ||
| const update = handleTextChunk(chunk, role, ctx.parentToolCallId); | ||
| const update = handleTextChunk( | ||
| chunk, | ||
| role, | ||
| ctx.parentToolCallId, | ||
| ctx.intentBuffer, | ||
| ); | ||
| return update ? [update] : []; | ||
| } | ||
|
|
||
|
|
@@ -541,8 +656,12 @@ function toAcpNotifications( | |
| mcpToolUseResult?: Record<string, unknown>, | ||
| enrichedReadCache?: EnrichedReadCache, | ||
| taskState?: TaskState, | ||
| intentBuffer?: IntentBuffer, | ||
| ): SessionNotification[] { | ||
| if (typeof content === "string") { | ||
| if (role === "assistant" && !parentToolCallId && intentBuffer && content) { | ||
| intentBuffer.value += content; | ||
| } | ||
| const update: SessionUpdate = { | ||
| sessionUpdate: messageUpdateType(role), | ||
| content: text(content), | ||
|
|
@@ -570,6 +689,7 @@ function toAcpNotifications( | |
| cwd, | ||
| mcpToolUseResult, | ||
| taskState, | ||
| intentBuffer, | ||
| }; | ||
| const output: SessionNotification[] = []; | ||
|
|
||
|
|
@@ -596,6 +716,7 @@ function streamEventToAcpNotifications( | |
| cwd?: string, | ||
| enrichedReadCache?: EnrichedReadCache, | ||
| taskState?: TaskState, | ||
| intentBuffer?: IntentBuffer, | ||
| ): SessionNotification[] { | ||
| const event = message.event; | ||
| switch (event.type) { | ||
|
|
@@ -622,6 +743,7 @@ function streamEventToAcpNotifications( | |
| undefined, | ||
| enrichedReadCache, | ||
| taskState, | ||
| intentBuffer, | ||
| ); | ||
| } | ||
| case "content_block_delta": { | ||
|
|
@@ -648,6 +770,7 @@ function streamEventToAcpNotifications( | |
| undefined, | ||
| enrichedReadCache, | ||
| taskState, | ||
| intentBuffer, | ||
| ); | ||
| } | ||
| case "content_block_stop": | ||
|
|
@@ -983,6 +1106,7 @@ export async function handleStreamEvent( | |
| context.session.cwd, | ||
| context.enrichedReadCache, | ||
| context.session.taskState, | ||
| context.intentBuffer, | ||
| )) { | ||
| await client.sessionUpdate(notification); | ||
| context.session.notificationHistory.push(notification); | ||
|
|
@@ -1188,6 +1312,7 @@ export async function handleUserAssistantMessage( | |
| undefined, | ||
| context.enrichedReadCache, | ||
| session.taskState, | ||
| context.intentBuffer, | ||
| )) { | ||
| await client.sessionUpdate(notification); | ||
| session.notificationHistory.push(notification); | ||
|
|
@@ -1234,6 +1359,7 @@ export async function handleUserAssistantMessage( | |
| mcpToolUseResult, | ||
| context.enrichedReadCache, | ||
| session.taskState, | ||
| context.intentBuffer, | ||
| )) { | ||
| await client.sessionUpdate(notification); | ||
| session.notificationHistory.push(notification); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the streaming path,
handleToolUseChunkis first called fromstreamEventToAcpNotifications → content_block_start. At that pointchunk.inputis always{}because the input arrives later viainput_json_deltadeltas (which go throughinputJsonDeltaToAcpNotifications, never through this function again). So the notification fires immediately with the fallback title —"Execute command"for Bash,"Read File"for Read,"Edit"for Edit — not the specific"Running tests"/"Reading api.py"values the PR describes.When the consolidated
SDKAssistantMessagearrives with the full input,alreadyCachedis alreadytrue, so the updated title is never sent. The only path that produces a specific title is a non-streaming scenario wherehandleUserAssistantMessageis the first call site for a given tool.Prompt To Fix With AI