Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 306 additions & 2 deletions apps/code/src/main/services/agent/service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import fs, { mkdirSync, symlinkSync } from "node:fs";
import fs, { promises as fsPromises, mkdirSync, symlinkSync } from "node:fs";
import { homedir, tmpdir } from "node:os";
import { isAbsolute, join, relative, resolve, sep } from "node:path";
import {
Expand All @@ -18,7 +18,10 @@ import {
POSTHOG_NOTIFICATIONS,
} from "@posthog/agent";
import type { McpToolApprovals } from "@posthog/agent/adapters/claude/mcp/tool-metadata";
import { hydrateSessionJsonl } from "@posthog/agent/adapters/claude/session/jsonl-hydration";
import {
getSessionJsonlPath,
hydrateSessionJsonl,
} from "@posthog/agent/adapters/claude/session/jsonl-hydration";
import { getReasoningEffortOptions } from "@posthog/agent/adapters/reasoning-effort";
import { Agent } from "@posthog/agent/agent";
import {
Expand All @@ -38,10 +41,12 @@ import { getLlmGatewayUrl } from "@posthog/agent/posthog-api";
import { extractCreatedPrUrl } from "@posthog/agent/pr-url-detector";
import type * as AgentTypes from "@posthog/agent/types";
import { getCurrentBranch } from "@posthog/git/queries";
import { CaptureCheckpointSaga } from "@posthog/git/sagas/checkpoint";
import type { IAppMeta } from "@posthog/platform/app-meta";
import type { IBundledResources } from "@posthog/platform/bundled-resources";
import type { IPowerManager } from "@posthog/platform/power-manager";
import type { IStoragePaths } from "@posthog/platform/storage-paths";
import { DATA_DIR } from "@shared/constants";
import { isAuthError } from "@shared/errors";
import type { AcpMessage } from "@shared/types/session-events";
import { inject, injectable, preDestroy } from "inversify";
Expand Down Expand Up @@ -286,6 +291,13 @@ export class AgentService extends TypedEventEmitter<AgentServiceEvents> {

private sessions = new Map<string, ManagedSession>();
private pendingPermissions = new Map<string, PendingPermission>();
/** taskRunIds that need force-refetch of JSONL on next reconnect (checkpoint restore). */
private checkpointRestoreTaskRunIds = new Set<string>();
/** Checkpoint notifications captured per taskRunId, in capture order. Survives session reconnect. */
private sessionCheckpoints = new Map<
string,
Array<{ checkpointId: string; ts: number; promptId: number | undefined }>
>();
private mockNodeReady = false;
private idleTimeouts = new Map<
string,
Expand Down Expand Up @@ -758,6 +770,11 @@ When creating pull requests, add the following footer at the end of the PR descr
if (adapter !== "codex") {
const posthogAPI = agent.getPosthogAPI();
if (posthogAPI) {
const forceRefetch =
this.checkpointRestoreTaskRunIds.has(taskRunId);
if (forceRefetch) {
this.checkpointRestoreTaskRunIds.delete(taskRunId);
}
const hasSession = await hydrateSessionJsonl({
sessionId: existingSessionId,
cwd: repoPath,
Expand All @@ -766,6 +783,7 @@ When creating pull requests, add the following footer at the end of the PR descr
permissionMode: config.permissionMode,
posthogAPI,
log,
forceRefetch,
});
if (!hasSession) {
log.info(
Expand All @@ -780,6 +798,11 @@ When creating pull requests, add the following footer at the end of the PR descr

if (isReconnect && config.sessionId) {
const existingSessionId = config.sessionId;
log.info("Reconnecting with existing sessionId", {
taskId,
taskRunId,
sessionId: existingSessionId,
});

// Both adapters implement resumeSession:
// - Claude: delegates to SDK's resumeSession with JSONL hydration
Expand Down Expand Up @@ -993,6 +1016,106 @@ When creating pull requests, add the following footer at the end of the PR descr
return this.sessions.get(taskRunId);
}

getSessionInfo(taskRunId: string):
| {
sessionId: string;
repoPath: string;
taskId: string;
apiHost: string;
projectId: number;
adapter: "claude" | "codex" | undefined;
}
| undefined {
const session = this.sessions.get(taskRunId);
if (!session?.config.sessionId) return undefined;
return {
sessionId: session.config.sessionId,
repoPath: session.repoPath,
taskId: session.config.taskId,
apiHost: session.config.credentials.apiHost,
projectId: session.config.credentials.projectId,
adapter: session.config.adapter,
};
}

/**
* Re-emit stored checkpoint notifications through the SessionEvent channel.
* Called by the renderer after its subscription is set up so no events are lost.
*/
replayCheckpoints(taskRunId: string): number {
const checkpoints = this.sessionCheckpoints.get(taskRunId) ?? [];
if (checkpoints.length === 0) return 0;

log.info("Replaying stored checkpoints via SessionEvent", {
taskRunId,
count: checkpoints.length,
});

for (const { checkpointId, ts, promptId } of checkpoints) {
this.emit(AgentServiceEvent.SessionEvent, {
taskRunId,
payload: {
type: "acp_message",
ts,
message: {
jsonrpc: "2.0" as const,
method: POSTHOG_NOTIFICATIONS.GIT_CHECKPOINT,
// Mark as replay so renderer doesn't re-sync to S3
params: { checkpointId, promptId, replay: true },
},
},
});
}

return checkpoints.length;
}

/**
* Get the promptId for a checkpoint. Used when truncating S3 log so the
* backend can find the correct turn boundary by promptId.
*/
getCheckpointPromptId(
taskRunId: string,
checkpointId: string,
): number | undefined {
const checkpoints = this.sessionCheckpoints.get(taskRunId) ?? [];
const cp = checkpoints.find((c) => c.checkpointId === checkpointId);
return cp?.promptId;
}

/**
* Remove stored checkpoints that come AFTER the given checkpointId (inclusive
* of the target). Called after a restore so replayCheckpoints only re-emits
* the surviving checkpoints and not orphaned ones whose git refs were deleted.
*/
truncateCheckpoints(taskRunId: string, keepUpToCheckpointId: string): number {
const checkpoints = this.sessionCheckpoints.get(taskRunId) ?? [];
const idx = checkpoints.findIndex(
(cp) => cp.checkpointId === keepUpToCheckpointId,
);
if (idx === -1) return 0;
this.sessionCheckpoints.set(taskRunId, checkpoints.slice(0, idx + 1));
log.info("Truncated stored checkpoints after restore", {
taskRunId,
keepUpTo: keepUpToCheckpointId,
kept: idx + 1,
removed: checkpoints.length - idx - 1,
});
return idx + 1;
}

/**
* Mark a taskRunId so the next reconnect forces JSONL re-hydration from S3
* instead of reusing an existing (stale) JSONL file. Called before cancelSession
* during checkpoint restore so hydrateSessionJsonl skips the "already exists" guard.
*/
markCheckpointRestore(taskRunId: string): void {
this.checkpointRestoreTaskRunIds.add(taskRunId);
log.info("Marked taskRunId for force JSONL refetch on reconnect", {
taskRunId,
});
}

async setSessionConfigOption(
sessionId: string,
configId: string,
Expand Down Expand Up @@ -1263,6 +1386,10 @@ For git operations while detached:
});
};

// Track the most recent session/prompt request ID so the checkpoint
// notification can be tagged with the turn it belongs to.
let latestPromptId: number | undefined;

const onAcpMessage = (message: unknown) => {
const acpMessage: AcpMessage = {
type: "acp_message",
Expand All @@ -1271,8 +1398,25 @@ For git operations while detached:
};
emitToRenderer(acpMessage);

// Track session/prompt request IDs for turn-tagging
const raw = message as { method?: string; id?: number };
if (raw.method === "session/prompt" && raw.id !== undefined) {
latestPromptId = raw.id;
log.debug("Tracked session/prompt id", { taskRunId, promptId: raw.id });
}

// Inspect tool call updates for PR URLs and file activity
this.handleToolCallUpdate(taskRunId, message as AcpMessage["message"]);

// Capture a local git checkpoint when a turn completes.
// Intercepted here (raw stream tap) rather than extNotification because
// the ACP SDK does not reliably route _posthog/ notifications to that callback.
this.handleTurnCompleteForCheckpoint(
taskRunId,
message,
latestPromptId,
emitToRenderer,
);
};

const tappedReadable = createTappedReadableStream(
Expand Down Expand Up @@ -1729,6 +1873,166 @@ For git operations while detached:
});
}

private handleTurnCompleteForCheckpoint(
taskRunId: string,
message: unknown,
promptId: number | undefined,
emitToRenderer: (payload: unknown) => void,
): void {
const msg = message as { method?: string };
if (!isNotification(msg.method, POSTHOG_NOTIFICATIONS.TURN_COMPLETE))
return;

const session = this.sessions.get(taskRunId);
if (!session?.config.repoPath) {
log.debug("TURN_COMPLETE in stream — no repoPath, skipping checkpoint", {
taskRunId,
});
return;
}

log.info("TURN_COMPLETE in stream — capturing local checkpoint", {
taskRunId,
repoPath: session.config.repoPath,
promptId,
});

this.captureLocalCheckpoint(
taskRunId,
session.config.repoPath,
session.config.sessionId,
promptId,
emitToRenderer,
).catch((err) => {
log.warn("Local checkpoint capture failed", {
taskRunId,
error: err instanceof Error ? err.message : String(err),
});
});
}

/**
* Capture a local git checkpoint after a turn completes, emit the
* `_posthog/git_checkpoint` notification to the renderer, and append it to
* the session JSONL so it survives page reload.
*/
private async captureLocalCheckpoint(
taskRunId: string,
repoPath: string,
sessionId: string | undefined,
promptId: number | undefined,
emitToRenderer: (payload: unknown) => void,
): Promise<void> {
log.info("Capturing local checkpoint after turn", { taskRunId, repoPath });

const saga = new CaptureCheckpointSaga();
const sagaResult = await saga.run({ baseDir: repoPath });
if (!sagaResult.success) {
log.warn("CaptureCheckpointSaga failed — no checkpoint for this turn", {
taskRunId,
error: sagaResult.error,
});
return;
}

const result = sagaResult.data;
log.info("Local checkpoint captured", {
taskRunId,
checkpointId: result.checkpointId,
commit: result.commit,
branch: result.branch,
});

// Persist mapping so we can re-inject on reconnect, with promptId for
// correct turn association regardless of when the notification arrives.
const ts = Date.now();
const existing = this.sessionCheckpoints.get(taskRunId) ?? [];
existing.push({ checkpointId: result.checkpointId, ts, promptId });
this.sessionCheckpoints.set(taskRunId, existing);
log.info("Stored checkpoint for reconnect replay", {
taskRunId,
checkpointId: result.checkpointId,
promptId,
totalStored: existing.length,
});

const notification = {
jsonrpc: "2.0" as const,
method: POSTHOG_NOTIFICATIONS.GIT_CHECKPOINT,
params: { checkpointId: result.checkpointId, promptId },
};

// Emit to renderer so the restore button activates on the completed turn
const acpMessage: AcpMessage = {
type: "acp_message",
ts: Date.now(),
message: notification as AcpMessage["message"],
};
emitToRenderer(acpMessage);

log.info("Emitted GIT_CHECKPOINT notification to renderer", {
taskRunId,
checkpointId: result.checkpointId,
});

// Append to the session JSONL so restore can find the checkpoint on reload
if (sessionId) {
try {
const jsonlPath = getSessionJsonlPath(sessionId, repoPath);
const line = `${JSON.stringify({ notification })}\n`;
await fsPromises.appendFile(jsonlPath, line, "utf-8");
log.info("Checkpoint appended to JSONL", {
taskRunId,
checkpointId: result.checkpointId,
jsonlPath,
});
} catch (err) {
log.warn(
"Failed to append checkpoint to JSONL (restore may not survive reload)",
{
taskRunId,
error: err instanceof Error ? err.message : String(err),
},
);
}
} else {
log.warn("No sessionId yet — checkpoint not written to JSONL", {
taskRunId,
});
}

// Also append to the local logs.ndjson cache. The renderer's fetchSessionLogs
// reads logs.ndjson first (before S3), and the in-memory sessionCheckpoints
// map is lost when the main process restarts. Without this, the checkpoint
// notification lives only in S3 + the in-memory map, so after the app is
// reopened the cold load reads a checkpoint-less local cache and every
// restore icon goes disabled. This append (matching the SessionLogWriter
// tap's line-by-line model) keeps checkpoints visible across restarts.
try {
const sessionDir = join(homedir(), DATA_DIR, "sessions", taskRunId);
await fsPromises.mkdir(sessionDir, { recursive: true });
const entry = {
type: "notification" as const,
timestamp: new Date().toISOString(),
notification,
};
await fsPromises.appendFile(
join(sessionDir, "logs.ndjson"),
`${JSON.stringify(entry)}\n`,
"utf-8",
);
log.info("Checkpoint appended to local logs.ndjson", {
taskRunId,
checkpointId: result.checkpointId,
});
} catch (err) {
log.warn("Failed to append checkpoint to local logs.ndjson", {
taskRunId,
error: err instanceof Error ? err.message : String(err),
});
}
}

async getGatewayModels(apiHost: string) {
const gatewayUrl = getLlmGatewayUrl(apiHost);
const models = await fetchGatewayModels({ gatewayUrl });
Expand Down
Loading