diff --git a/packages/agent/src/utils/streams.ts b/packages/agent/src/utils/streams.ts index 803a1eb667..ff67712875 100644 --- a/packages/agent/src/utils/streams.ts +++ b/packages/agent/src/utils/streams.ts @@ -122,7 +122,8 @@ export function createTappedWritableStream( const { onMessage, logger } = options; const decoder = new TextDecoder(); let buffer = ""; - let _messageCount = 0; + let messageCount = 0; + let droppedWriteCount = 0; return new WritableStream({ async write(chunk: Uint8Array) { @@ -133,7 +134,7 @@ export function createTappedWritableStream( for (const line of lines) { if (!line.trim()) continue; - _messageCount++; + messageCount++; onMessage(line); } @@ -144,7 +145,13 @@ export function createTappedWritableStream( writer.releaseLock(); } catch (err) { // Stream may be closed if subprocess crashed - log but don't throw - logger?.error("ACP write error", err); + droppedWriteCount++; + logger?.error("ACP write error", { + error: err instanceof Error ? err.message : String(err), + messageCount, + droppedWriteCount, + droppedBytes: chunk.byteLength, + }); } }, async close() { diff --git a/packages/core/src/cloud-task/cloud-task.ts b/packages/core/src/cloud-task/cloud-task.ts index 4919738972..555b3bafbd 100644 --- a/packages/core/src/cloud-task/cloud-task.ts +++ b/packages/core/src/cloud-task/cloud-task.ts @@ -8,7 +8,7 @@ import { type IAnalytics, } from "@posthog/platform/analytics"; import type { StoredLogEntry } from "@posthog/shared"; -import { TypedEventEmitter } from "@posthog/shared"; +import { serializeError, TypedEventEmitter } from "@posthog/shared"; import { ANALYTICS_EVENTS } from "@posthog/shared/analytics-events"; import { inject, injectable, preDestroy } from "inversify"; import type { CloudTaskPermissionRequestUpdate } from "./cloud-task-types"; @@ -106,6 +106,9 @@ interface WatcherState { lastErrorMessage: string | null; lastBranch: string | null; lastStatusUpdatedAt: string | null; + connStartedAt: number; + connSentLastEventId: string | null; + connDataEventsReceived: number; isBootstrapping: boolean; hasEmittedSnapshot: boolean; bufferedLogBatches: StoredLogEntry[][]; @@ -158,6 +161,16 @@ function isPermissionRequestEvent( ); } +function isKeepaliveEvent(event: SseEvent): boolean { + return ( + event.event === "keepalive" || + (typeof event.data === "object" && + event.data !== null && + "type" in event.data && + event.data.type === "keepalive") + ); +} + function createStreamStatusError(status: number): CloudTaskStreamError { switch (status) { case 401: @@ -432,6 +445,9 @@ export class CloudTaskService extends TypedEventEmitter { lastErrorMessage: null, lastBranch: null, lastStatusUpdatedAt: null, + connStartedAt: 0, + connSentLastEventId: null, + connDataEventsReceived: 0, isBootstrapping: false, hasEmittedSnapshot: false, bufferedLogBatches: [], @@ -620,10 +636,15 @@ export class CloudTaskService extends TypedEventEmitter { const controller = new AbortController(); watcher.sseAbortController = controller; + watcher.connStartedAt = 0; + watcher.connSentLastEventId = watcher.lastEventId; + watcher.connDataEventsReceived = 0; + const startLatest = Boolean(options?.startLatest && !watcher.lastEventId); + const url = new URL( `${watcher.apiHost}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/stream/`, ); - if (options?.startLatest && !watcher.lastEventId) { + if (startLatest) { url.searchParams.set("start", "latest"); } const headers: Record = { @@ -643,6 +664,9 @@ export class CloudTaskService extends TypedEventEmitter { // failed reconnect attempt (see SSE_HEALTHY_CONNECTION_MS). let connectedAt = 0; let streamWasEstablished = false; + let bytesReceived = 0; + let eventsReceived = 0; + let dataEventsReceived = 0; try { const response = await this.auth.authenticatedFetch(url.toString(), { @@ -661,6 +685,21 @@ export class CloudTaskService extends TypedEventEmitter { connectedAt = Date.now(); streamWasEstablished = true; + watcher.connStartedAt = connectedAt; + + this.log.info("Cloud task SSE connected", { + key, + sentLastEventId: watcher.connSentLastEventId, + startLatest, + status: response.status, + server: response.headers.get("server"), + via: response.headers.get("via"), + cfRay: response.headers.get("cf-ray"), + cfCacheStatus: response.headers.get("cf-cache-status"), + xAccelBuffering: response.headers.get("x-accel-buffering"), + contentType: response.headers.get("content-type"), + requestId: response.headers.get("x-request-id"), + }); const reader = response.body.getReader(); @@ -674,9 +713,14 @@ export class CloudTaskService extends TypedEventEmitter { continue; } + bytesReceived += value.byteLength; const chunk = decoder.decode(value, { stream: true }); const events = parser.parse(chunk); for (const event of events) { + eventsReceived += 1; + if (event.event !== "error" && !isKeepaliveEvent(event)) { + dataEventsReceived += 1; + } this.handleSseEvent(key, event); } } @@ -692,6 +736,14 @@ export class CloudTaskService extends TypedEventEmitter { return; } + this.log.info("Cloud task stream closed cleanly", { + key, + connectionDurationMs: connectedAt ? Date.now() - connectedAt : 0, + bytesReceived, + eventsReceived, + dataEventsReceived, + lastEventId: this.watchers.get(key)?.lastEventId ?? null, + }); await this.handleStreamCompletion(key, { reconnectIfNonTerminal: true }); } catch (error) { this.flushLogBatch(key); @@ -729,8 +781,20 @@ export class CloudTaskService extends TypedEventEmitter { this.log.warn("Cloud task stream error", { key, error: errorMessage, + errorDetail: serializeError(error), wasHealthyStream, isBackendError, + streamWasEstablished, + connectionDurationMs: streamWasEstablished + ? Date.now() - connectedAt + : 0, + bytesReceived, + eventsReceived, + dataEventsReceived, + lastEventId: watcher?.lastEventId ?? null, + reconnectAttempts: watcher?.reconnectAttempts ?? 0, + streamErrorAttempts: watcher?.streamErrorAttempts ?? 0, + cumulativeReconnectAttempts: watcher?.cumulativeReconnectAttempts ?? 0, }); await this.handleStreamCompletion(key, { reconnectIfNonTerminal: true, @@ -766,13 +830,7 @@ export class CloudTaskService extends TypedEventEmitter { // data. watcher.reconnectAttempts = 0; - if ( - event.event === "keepalive" || - (typeof event.data === "object" && - event.data !== null && - "type" in event.data && - event.data.type === "keepalive") - ) { + if (isKeepaliveEvent(event)) { return; } @@ -781,6 +839,15 @@ export class CloudTaskService extends TypedEventEmitter { watcher.streamErrorAttempts = 0; watcher.cumulativeReconnectAttempts = 0; + watcher.connDataEventsReceived += 1; + if (watcher.connDataEventsReceived === 1 && watcher.connSentLastEventId) { + this.log.info("Cloud task SSE resumed", { + key, + resumedFrom: watcher.connSentLastEventId, + firstEventIdAfterResume: event.id ?? null, + }); + } + if (isTaskRunStateEvent(event.data)) { if (this.applyTaskRunState(watcher, event.data)) { if (!watcher.isBootstrapping && !isTerminalStatus(watcher.lastStatus)) { @@ -992,6 +1059,18 @@ export class CloudTaskService extends TypedEventEmitter { const watcher = this.watchers.get(key); if (!watcher) return; + this.log.warn("Cloud task watcher failed", { + key, + errorTitle: error.title, + retryable: error.retryable, + status: watcher.lastStatus, + wasBootstrapping: watcher.isBootstrapping, + reconnectAttempts: watcher.reconnectAttempts, + cumulativeReconnectAttempts: watcher.cumulativeReconnectAttempts, + totalEntryCount: watcher.totalEntryCount, + lastEventId: watcher.lastEventId, + }); + this.analytics.track(ANALYTICS_EVENTS.CLOUD_STREAM_DISCONNECTED, { task_id: watcher.taskId, run_id: watcher.runId, @@ -1173,6 +1252,14 @@ export class CloudTaskService extends TypedEventEmitter { this.log.warn("Cloud task stream ended before terminal status", { key, status: watcher.lastStatus, + stage: watcher.lastStage, + stateChanged, + lastEventId: watcher.lastEventId, + sentLastEventId: watcher.connSentLastEventId, + connDataEventsReceived: watcher.connDataEventsReceived, + reconnectAttempts: watcher.reconnectAttempts, + streamErrorAttempts: watcher.streamErrorAttempts, + cumulativeReconnectAttempts: watcher.cumulativeReconnectAttempts, }); this.scheduleReconnect(key, options.reconnectError, { countAttempt: options.countReconnectAttempt ?? false, @@ -1180,6 +1267,18 @@ export class CloudTaskService extends TypedEventEmitter { return; } + this.log.info("Cloud task terminal stop", { + key, + status: watcher.lastStatus, + reachedViaError: options.reconnectError !== undefined, + lastEventId: watcher.lastEventId, + sentLastEventId: watcher.connSentLastEventId, + totalEntryCount: watcher.totalEntryCount, + connDataEventsReceived: watcher.connDataEventsReceived, + connDurationMs: + watcher.connStartedAt !== 0 ? Date.now() - watcher.connStartedAt : 0, + }); + // Always emit the latest status before stopping. Terminal states are // intentionally deferred until stream completion; clean EOFs can also mean // the backend has no more stream events even when the run status remains active. diff --git a/packages/core/src/sessions/sessionService.ts b/packages/core/src/sessions/sessionService.ts index 0eb24e96dc..40c22facf5 100644 --- a/packages/core/src/sessions/sessionService.ts +++ b/packages/core/src/sessions/sessionService.ts @@ -361,6 +361,28 @@ export function isPermissionRequestAlreadySurfaced( ); } +function classifyTurnEventKind( + msg: AcpMessage["message"], +): "text" | "output" | "other" { + if (!("method" in msg) || msg.method !== "session/update") return "other"; + const update = (msg as { params?: { update?: Record } }) + .params?.update; + if (!update) return "other"; + const sessionUpdate = update.sessionUpdate; + if (sessionUpdate === "agent_message_chunk") { + const content = update.content as { type?: string } | undefined; + return content?.type === "text" ? "text" : "output"; + } + if ( + sessionUpdate === "agent_thought_chunk" || + sessionUpdate === "tool_call" || + sessionUpdate === "tool_call_update" + ) { + return "output"; + } + return "other"; +} + export class SessionService { private connectingTasks = new Map>(); private reconcilingTasks = new Set(); @@ -398,6 +420,10 @@ export class SessionService { private cloudLogGapReconciler: CloudLogGapReconciler; /** Maps toolCallId → cloud requestId for routing permission responses */ private cloudPermissionRequestIds = new Map(); + private liveTurnContent = new Map< + string, + { startedAtTs: number; agentTextChunks: number; agentOutputEvents: number } + >(); private idleKilledSubscription: { unsubscribe: () => void } | null = null; /** * Cached preview-config-options responses keyed by `${apiHost}::${adapter}`. @@ -1220,6 +1246,7 @@ export class SessionService { subscription?.event.unsubscribe(); subscription?.permission?.unsubscribe(); this.subscriptions.delete(taskRunId); + this.liveTurnContent.delete(taskRunId); } /** @@ -1247,6 +1274,7 @@ export class SessionService { this.localRepoPaths.clear(); this.localRecoveryAttempts.clear(); this.cloudPermissionRequestIds.clear(); + this.liveTurnContent.clear(); this.cloudLogGapReconciler.clear(); this.dispatchingCloudQueues.clear(); this.scheduledCloudQueueFlushes.clear(); @@ -1255,6 +1283,31 @@ export class SessionService { this.idleKilledSubscription = null; } + private finalizeTurnContent( + taskRunId: string, + trigger: "stop_reason" | "turn_complete", + endedAtTs: number, + ): void { + const tally = this.liveTurnContent.get(taskRunId); + if (!tally) return; + this.liveTurnContent.delete(taskRunId); + const session = this.d.store.getSessions()[taskRunId]; + const payload = { + taskRunId, + taskId: session?.taskId, + isCloud: session?.isCloud ?? false, + trigger, + agentTextChunks: tally.agentTextChunks, + agentOutputEvents: tally.agentOutputEvents, + durationMs: Math.max(0, endedAtTs - tally.startedAtTs), + }; + if (tally.agentTextChunks === 0 && tally.agentOutputEvents === 0) { + this.d.log.warn("Turn completed with no agent output", payload); + } else { + this.d.log.debug("Turn completed", payload); + } + } + private updatePromptStateFromEvents( taskRunId: string, events: AcpMessage[], @@ -1262,6 +1315,14 @@ export class SessionService { ): void { for (const acpMsg of events) { const msg = acpMsg.message; + const turnTally = isLive + ? this.liveTurnContent.get(taskRunId) + : undefined; + if (turnTally) { + const kind = classifyTurnEventKind(msg); + if (kind === "text") turnTally.agentTextChunks += 1; + else if (kind === "output") turnTally.agentOutputEvents += 1; + } if (isJsonRpcRequest(msg) && msg.method === "session/prompt") { this.d.store.updateSession(taskRunId, { isPromptPending: true, @@ -1269,6 +1330,13 @@ export class SessionService { pausedDurationMs: 0, currentPromptId: msg.id, }); + if (isLive) { + this.liveTurnContent.set(taskRunId, { + startedAtTs: acpMsg.ts, + agentTextChunks: 0, + agentOutputEvents: 0, + }); + } const promptSession = this.d.store.getSessions()[taskRunId]; if (promptSession?.isCloud) { this.cloudRunIdleTracker.markBusy(promptSession); @@ -1298,6 +1366,9 @@ export class SessionService { promptStartedAt: null, currentPromptId: null, }); + if (isLive) { + this.finalizeTurnContent(taskRunId, "stop_reason", acpMsg.ts); + } } if (isTurnCompleteEvent(acpMsg)) { // Local sessions use the JSON-RPC response as the canonical turn-done @@ -1320,6 +1391,7 @@ export class SessionService { ); } this.d.taskViewedApi.markActivity(session.taskId); + this.finalizeTurnContent(taskRunId, "turn_complete", acpMsg.ts); } } } diff --git a/packages/shared/src/errors.ts b/packages/shared/src/errors.ts index 37a9c727d9..6d93d4ce28 100644 --- a/packages/shared/src/errors.ts +++ b/packages/shared/src/errors.ts @@ -29,6 +29,49 @@ export function getErrorMessage(error: unknown): string { return ""; } +export interface SerializedError { + name?: string; + message: string; + code?: string | number; + cause?: SerializedError; +} + +/** + * Flatten an error and its `cause` chain into a plain, log-friendly object. + * undici surfaces the real socket-level reason for `TypeError: terminated` + * (e.g. ECONNRESET, UND_ERR_SOCKET, UND_ERR_HEADERS_TIMEOUT) on `error.cause`, + * which a bare `error.message` throws away. Depth-bounded to avoid cycles. + */ +export function serializeError(error: unknown, maxDepth = 5): SerializedError { + if (typeof error === "object" && error !== null) { + const source = error as { + name?: unknown; + message?: unknown; + code?: unknown; + cause?: unknown; + }; + const result: SerializedError = { + message: + typeof source.message === "string" + ? source.message + : error instanceof Error + ? error.message + : String(error), + }; + if (typeof source.name === "string") { + result.name = source.name; + } + if (typeof source.code === "string" || typeof source.code === "number") { + result.code = source.code; + } + if (source.cause != null && maxDepth > 0) { + result.cause = serializeError(source.cause, maxDepth - 1); + } + return result; + } + return { message: String(error) }; +} + export function isAuthError(error: unknown): boolean { const message = getErrorMessage(error).toLowerCase(); if (!message) return false; diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index a47492e6f2..8ecefc4d5e 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -49,6 +49,8 @@ export { isNotAuthenticatedError, isRateLimitError, NotAuthenticatedError, + type SerializedError, + serializeError, } from "./errors"; export type { ExecutionMode } from "./exec-types"; export * from "./flags"; diff --git a/packages/workspace-server/src/services/agent/agent.ts b/packages/workspace-server/src/services/agent/agent.ts index 9a79db7ff3..9a49bff306 100644 --- a/packages/workspace-server/src/services/agent/agent.ts +++ b/packages/workspace-server/src/services/agent/agent.ts @@ -1367,6 +1367,14 @@ For git operations while detached: private async cleanupSession(taskRunId: string): Promise { const session = this.sessions.get(taskRunId); if (session) { + if (session.promptPending || session.inFlightMcpToolCalls.size > 0) { + this.log.warn("Cleaning up session with in-flight work", { + taskRunId, + taskId: session.taskId, + promptPending: session.promptPending, + inFlightMcpToolCalls: session.inFlightMcpToolCalls.size, + }); + } this.cancelInFlightMcpToolCalls(session); this.sleepService.release(taskRunId); try { diff --git a/packages/workspace-server/src/services/auth-proxy/auth-proxy.ts b/packages/workspace-server/src/services/auth-proxy/auth-proxy.ts index 9e316397fb..b2a73f1f6f 100644 --- a/packages/workspace-server/src/services/auth-proxy/auth-proxy.ts +++ b/packages/workspace-server/src/services/auth-proxy/auth-proxy.ts @@ -4,8 +4,12 @@ import { type RootLogger, type ScopedLogger, } from "@posthog/di/logger"; +import { serializeError } from "@posthog/shared"; import { inject, injectable } from "inversify"; -import { streamBodyToResponse } from "../proxy-stream/proxy-stream"; +import { + type StreamProgress, + streamBodyToResponse, +} from "../proxy-stream/proxy-stream"; import { AUTH_PROXY_AUTH } from "./identifiers"; import type { AuthProxyAuth } from "./ports"; @@ -178,8 +182,12 @@ export class AuthProxyService { options: RequestInit, res: http.ServerResponse, ): Promise { + const startedAt = Date.now(); + const progress: StreamProgress = { bytesWritten: 0 }; + let status = 0; try { const response = await this.auth.authenticatedFetch(url, options); + status = response.status; const responseHeaders: Record = {}; const stripHeaders = new Set([ @@ -194,14 +202,39 @@ export class AuthProxyService { res.writeHead(response.status, responseHeaders); - await streamBodyToResponse(response.body, res); + await streamBodyToResponse(response.body, res, progress); + + this.log.info("Auth proxy forward completed", { + url, + method: options.method, + status, + durationMs: Date.now() - startedAt, + bytesStreamed: progress.bytesWritten, + }); } catch (err) { + // headersSent distinguishes a pre-response failure (connection setup) + // from a mid-stream termination (the body died after status was sent). + // With durationMs + bytesStreamed this reveals whether an upstream + // timeout is cutting streamed LLM responses, and serializeError surfaces + // the real undici socket cause (ECONNRESET, UND_ERR_SOCKET) behind the + // generic "terminated" message. if (options.signal?.aborted) { this.log.debug("Upstream fetch aborted after client disconnect", { url, + durationMs: Date.now() - startedAt, + bytesStreamed: progress.bytesWritten, }); } else { - this.log.error("Proxy forward error", { url, err }); + this.log.error("Proxy forward error", { + url, + method: options.method, + status, + headersSent: res.headersSent, + durationMs: Date.now() - startedAt, + bytesStreamed: progress.bytesWritten, + stack: err instanceof Error ? err.stack : undefined, + errorDetail: serializeError(err), + }); } if (!res.headersSent) { res.writeHead(502); diff --git a/packages/workspace-server/src/services/proxy-stream/proxy-stream.ts b/packages/workspace-server/src/services/proxy-stream/proxy-stream.ts index edf3703aaf..38db9cde8d 100644 --- a/packages/workspace-server/src/services/proxy-stream/proxy-stream.ts +++ b/packages/workspace-server/src/services/proxy-stream/proxy-stream.ts @@ -1,5 +1,9 @@ import type http from "node:http"; +export interface StreamProgress { + bytesWritten: number; +} + function waitForDrainOrClose(res: http.ServerResponse): Promise { return new Promise((resolve) => { const settle = () => { @@ -21,6 +25,7 @@ function waitForDrainOrClose(res: http.ServerResponse): Promise { export async function streamBodyToResponse( body: ReadableStream | null, res: http.ServerResponse, + progress?: StreamProgress, ): Promise { if (!body) { res.end(); @@ -36,6 +41,9 @@ export async function streamBodyToResponse( res.end(); return; } + if (progress) { + progress.bytesWritten += value.byteLength; + } if (!res.write(value)) { await waitForDrainOrClose(res); }