Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions packages/agent/src/utils/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ export function createTappedWritableStream(
const { onMessage, logger } = options;
const decoder = new TextDecoder();
let buffer = "";
let _messageCount = 0;
let messageCount = 0;
let droppedWriteCount = 0;

return new WritableStream({
async write(chunk: Uint8Array) {
Expand All @@ -133,7 +134,7 @@ export function createTappedWritableStream(

for (const line of lines) {
if (!line.trim()) continue;
_messageCount++;
messageCount++;

onMessage(line);
}
Expand All @@ -144,7 +145,13 @@ export function createTappedWritableStream(
writer.releaseLock();
} catch (err) {
// Stream may be closed if subprocess crashed - log but don't throw
logger?.error("ACP write error", err);
droppedWriteCount++;
logger?.error("ACP write error", {
error: err instanceof Error ? err.message : String(err),
messageCount,
droppedWriteCount,
droppedBytes: chunk.byteLength,
});
}
},
async close() {
Expand Down
117 changes: 108 additions & 9 deletions packages/core/src/cloud-task/cloud-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
type IAnalytics,
} from "@posthog/platform/analytics";
import type { StoredLogEntry } from "@posthog/shared";
import { TypedEventEmitter } from "@posthog/shared";
import { serializeError, TypedEventEmitter } from "@posthog/shared";
import { ANALYTICS_EVENTS } from "@posthog/shared/analytics-events";
import { inject, injectable, preDestroy } from "inversify";
import type { CloudTaskPermissionRequestUpdate } from "./cloud-task-types";
Expand Down Expand Up @@ -106,6 +106,9 @@ interface WatcherState {
lastErrorMessage: string | null;
lastBranch: string | null;
lastStatusUpdatedAt: string | null;
connStartedAt: number;
connSentLastEventId: string | null;
connDataEventsReceived: number;
isBootstrapping: boolean;
hasEmittedSnapshot: boolean;
bufferedLogBatches: StoredLogEntry[][];
Expand Down Expand Up @@ -158,6 +161,16 @@ function isPermissionRequestEvent(
);
}

function isKeepaliveEvent(event: SseEvent): boolean {
return (
event.event === "keepalive" ||
(typeof event.data === "object" &&
event.data !== null &&
"type" in event.data &&
event.data.type === "keepalive")
);
}

function createStreamStatusError(status: number): CloudTaskStreamError {
switch (status) {
case 401:
Expand Down Expand Up @@ -432,6 +445,9 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
lastErrorMessage: null,
lastBranch: null,
lastStatusUpdatedAt: null,
connStartedAt: 0,
connSentLastEventId: null,
connDataEventsReceived: 0,
isBootstrapping: false,
hasEmittedSnapshot: false,
bufferedLogBatches: [],
Expand Down Expand Up @@ -620,10 +636,15 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
const controller = new AbortController();
watcher.sseAbortController = controller;

watcher.connStartedAt = 0;
watcher.connSentLastEventId = watcher.lastEventId;
watcher.connDataEventsReceived = 0;
const startLatest = Boolean(options?.startLatest && !watcher.lastEventId);

const url = new URL(
`${watcher.apiHost}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/stream/`,
);
if (options?.startLatest && !watcher.lastEventId) {
if (startLatest) {
url.searchParams.set("start", "latest");
}
const headers: Record<string, string> = {
Expand All @@ -643,6 +664,9 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
// failed reconnect attempt (see SSE_HEALTHY_CONNECTION_MS).
let connectedAt = 0;
let streamWasEstablished = false;
let bytesReceived = 0;
let eventsReceived = 0;
let dataEventsReceived = 0;

try {
const response = await this.auth.authenticatedFetch(url.toString(), {
Expand All @@ -661,6 +685,21 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {

connectedAt = Date.now();
streamWasEstablished = true;
watcher.connStartedAt = connectedAt;

this.log.info("Cloud task SSE connected", {
key,
sentLastEventId: watcher.connSentLastEventId,
startLatest,
status: response.status,
server: response.headers.get("server"),
via: response.headers.get("via"),
cfRay: response.headers.get("cf-ray"),
cfCacheStatus: response.headers.get("cf-cache-status"),
xAccelBuffering: response.headers.get("x-accel-buffering"),
contentType: response.headers.get("content-type"),
requestId: response.headers.get("x-request-id"),
});

const reader = response.body.getReader();

Expand All @@ -674,9 +713,14 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
continue;
}

bytesReceived += value.byteLength;
const chunk = decoder.decode(value, { stream: true });
const events = parser.parse(chunk);
for (const event of events) {
eventsReceived += 1;
if (event.event !== "error" && !isKeepaliveEvent(event)) {
dataEventsReceived += 1;
}
this.handleSseEvent(key, event);
}
}
Expand All @@ -692,6 +736,14 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
return;
}

this.log.info("Cloud task stream closed cleanly", {
key,
connectionDurationMs: connectedAt ? Date.now() - connectedAt : 0,
bytesReceived,
eventsReceived,
dataEventsReceived,
lastEventId: this.watchers.get(key)?.lastEventId ?? null,
});
await this.handleStreamCompletion(key, { reconnectIfNonTerminal: true });
} catch (error) {
this.flushLogBatch(key);
Expand Down Expand Up @@ -729,8 +781,20 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
this.log.warn("Cloud task stream error", {
key,
error: errorMessage,
errorDetail: serializeError(error),
wasHealthyStream,
isBackendError,
streamWasEstablished,
connectionDurationMs: streamWasEstablished
? Date.now() - connectedAt
: 0,
bytesReceived,
eventsReceived,
dataEventsReceived,
lastEventId: watcher?.lastEventId ?? null,
reconnectAttempts: watcher?.reconnectAttempts ?? 0,
streamErrorAttempts: watcher?.streamErrorAttempts ?? 0,
cumulativeReconnectAttempts: watcher?.cumulativeReconnectAttempts ?? 0,
});
await this.handleStreamCompletion(key, {
reconnectIfNonTerminal: true,
Expand Down Expand Up @@ -766,13 +830,7 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
// data.
watcher.reconnectAttempts = 0;

if (
event.event === "keepalive" ||
(typeof event.data === "object" &&
event.data !== null &&
"type" in event.data &&
event.data.type === "keepalive")
) {
if (isKeepaliveEvent(event)) {
return;
}

Expand All @@ -781,6 +839,15 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
watcher.streamErrorAttempts = 0;
watcher.cumulativeReconnectAttempts = 0;

watcher.connDataEventsReceived += 1;
if (watcher.connDataEventsReceived === 1 && watcher.connSentLastEventId) {
this.log.info("Cloud task SSE resumed", {
key,
resumedFrom: watcher.connSentLastEventId,
firstEventIdAfterResume: event.id ?? null,
});
}

if (isTaskRunStateEvent(event.data)) {
if (this.applyTaskRunState(watcher, event.data)) {
if (!watcher.isBootstrapping && !isTerminalStatus(watcher.lastStatus)) {
Expand Down Expand Up @@ -992,6 +1059,18 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
const watcher = this.watchers.get(key);
if (!watcher) return;

this.log.warn("Cloud task watcher failed", {
key,
errorTitle: error.title,
retryable: error.retryable,
status: watcher.lastStatus,
wasBootstrapping: watcher.isBootstrapping,
reconnectAttempts: watcher.reconnectAttempts,
cumulativeReconnectAttempts: watcher.cumulativeReconnectAttempts,
totalEntryCount: watcher.totalEntryCount,
lastEventId: watcher.lastEventId,
});

this.analytics.track(ANALYTICS_EVENTS.CLOUD_STREAM_DISCONNECTED, {
task_id: watcher.taskId,
run_id: watcher.runId,
Expand Down Expand Up @@ -1173,13 +1252,33 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
this.log.warn("Cloud task stream ended before terminal status", {
key,
status: watcher.lastStatus,
stage: watcher.lastStage,
stateChanged,
lastEventId: watcher.lastEventId,
sentLastEventId: watcher.connSentLastEventId,
connDataEventsReceived: watcher.connDataEventsReceived,
reconnectAttempts: watcher.reconnectAttempts,
streamErrorAttempts: watcher.streamErrorAttempts,
cumulativeReconnectAttempts: watcher.cumulativeReconnectAttempts,
});
this.scheduleReconnect(key, options.reconnectError, {
countAttempt: options.countReconnectAttempt ?? false,
});
return;
}

this.log.info("Cloud task terminal stop", {
key,
status: watcher.lastStatus,
reachedViaError: options.reconnectError !== undefined,
lastEventId: watcher.lastEventId,
sentLastEventId: watcher.connSentLastEventId,
totalEntryCount: watcher.totalEntryCount,
connDataEventsReceived: watcher.connDataEventsReceived,
connDurationMs:
watcher.connStartedAt !== 0 ? Date.now() - watcher.connStartedAt : 0,
});

// Always emit the latest status before stopping. Terminal states are
// intentionally deferred until stream completion; clean EOFs can also mean
// the backend has no more stream events even when the run status remains active.
Expand Down
72 changes: 72 additions & 0 deletions packages/core/src/sessions/sessionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,28 @@ export function isPermissionRequestAlreadySurfaced(
);
}

function classifyTurnEventKind(
msg: AcpMessage["message"],
): "text" | "output" | "other" {
if (!("method" in msg) || msg.method !== "session/update") return "other";
const update = (msg as { params?: { update?: Record<string, unknown> } })
.params?.update;
if (!update) return "other";
const sessionUpdate = update.sessionUpdate;
if (sessionUpdate === "agent_message_chunk") {
const content = update.content as { type?: string } | undefined;
return content?.type === "text" ? "text" : "output";
}
if (
sessionUpdate === "agent_thought_chunk" ||
sessionUpdate === "tool_call" ||
sessionUpdate === "tool_call_update"
) {
return "output";
}
return "other";
}

export class SessionService {
private connectingTasks = new Map<string, Promise<void>>();
private reconcilingTasks = new Set<string>();
Expand Down Expand Up @@ -398,6 +420,10 @@ export class SessionService {
private cloudLogGapReconciler: CloudLogGapReconciler;
/** Maps toolCallId → cloud requestId for routing permission responses */
private cloudPermissionRequestIds = new Map<string, string>();
private liveTurnContent = new Map<
string,
{ startedAtTs: number; agentTextChunks: number; agentOutputEvents: number }
>();
private idleKilledSubscription: { unsubscribe: () => void } | null = null;
/**
* Cached preview-config-options responses keyed by `${apiHost}::${adapter}`.
Expand Down Expand Up @@ -1220,6 +1246,7 @@ export class SessionService {
subscription?.event.unsubscribe();
subscription?.permission?.unsubscribe();
this.subscriptions.delete(taskRunId);
this.liveTurnContent.delete(taskRunId);
}

/**
Expand Down Expand Up @@ -1247,6 +1274,7 @@ export class SessionService {
this.localRepoPaths.clear();
this.localRecoveryAttempts.clear();
this.cloudPermissionRequestIds.clear();
this.liveTurnContent.clear();
this.cloudLogGapReconciler.clear();
this.dispatchingCloudQueues.clear();
this.scheduledCloudQueueFlushes.clear();
Expand All @@ -1255,20 +1283,60 @@ export class SessionService {
this.idleKilledSubscription = null;
}

private finalizeTurnContent(
taskRunId: string,
trigger: "stop_reason" | "turn_complete",
endedAtTs: number,
): void {
const tally = this.liveTurnContent.get(taskRunId);
if (!tally) return;
this.liveTurnContent.delete(taskRunId);
const session = this.d.store.getSessions()[taskRunId];
const payload = {
taskRunId,
taskId: session?.taskId,
isCloud: session?.isCloud ?? false,
trigger,
agentTextChunks: tally.agentTextChunks,
agentOutputEvents: tally.agentOutputEvents,
durationMs: Math.max(0, endedAtTs - tally.startedAtTs),
};
if (tally.agentTextChunks === 0 && tally.agentOutputEvents === 0) {
this.d.log.warn("Turn completed with no agent output", payload);
} else {
this.d.log.debug("Turn completed", payload);
}
}

private updatePromptStateFromEvents(
taskRunId: string,
events: AcpMessage[],
{ isLive = false }: { isLive?: boolean } = {},
): void {
for (const acpMsg of events) {
const msg = acpMsg.message;
const turnTally = isLive
? this.liveTurnContent.get(taskRunId)
: undefined;
if (turnTally) {
const kind = classifyTurnEventKind(msg);
if (kind === "text") turnTally.agentTextChunks += 1;
else if (kind === "output") turnTally.agentOutputEvents += 1;
}
if (isJsonRpcRequest(msg) && msg.method === "session/prompt") {
this.d.store.updateSession(taskRunId, {
isPromptPending: true,
promptStartedAt: acpMsg.ts,
pausedDurationMs: 0,
currentPromptId: msg.id,
});
if (isLive) {
this.liveTurnContent.set(taskRunId, {
startedAtTs: acpMsg.ts,
agentTextChunks: 0,
agentOutputEvents: 0,
});
}
const promptSession = this.d.store.getSessions()[taskRunId];
if (promptSession?.isCloud) {
this.cloudRunIdleTracker.markBusy(promptSession);
Expand Down Expand Up @@ -1298,6 +1366,9 @@ export class SessionService {
promptStartedAt: null,
currentPromptId: null,
});
if (isLive) {
this.finalizeTurnContent(taskRunId, "stop_reason", acpMsg.ts);
}
}
if (isTurnCompleteEvent(acpMsg)) {
// Local sessions use the JSON-RPC response as the canonical turn-done
Expand All @@ -1320,6 +1391,7 @@ export class SessionService {
);
}
this.d.taskViewedApi.markActivity(session.taskId);
this.finalizeTurnContent(taskRunId, "turn_complete", acpMsg.ts);
}
}
}
Expand Down
Loading
Loading