diff --git a/apps/code/scripts/download-binaries.mjs b/apps/code/scripts/download-binaries.mjs index 26a3e10f6a..4b1a049f2d 100644 --- a/apps/code/scripts/download-binaries.mjs +++ b/apps/code/scripts/download-binaries.mjs @@ -7,6 +7,7 @@ import { existsSync, mkdirSync, realpathSync, + renameSync, rmSync, } from "node:fs"; import { dirname, join } from "node:path"; @@ -47,6 +48,40 @@ const BINARIES = [ return target; }, }, + { + name: "codex", + version: "0.140.0", + getUrl: (version, target) => { + if (target.includes("windows")) { + return `https://github.com/openai/codex/releases/download/rust-v${version}/codex-${target}.exe.zip`; + } + return `https://github.com/openai/codex/releases/download/rust-v${version}/codex-${target}.tar.gz`; + }, + getTarget: () => { + const { platform, arch } = process; + const targets = { + darwin: { arm64: "aarch64-apple-darwin", x64: "x86_64-apple-darwin" }, + linux: { + arm64: "aarch64-unknown-linux-musl", + x64: "x86_64-unknown-linux-musl", + }, + win32: { + arm64: "aarch64-pc-windows-msvc", + x64: "x86_64-pc-windows-msvc", + }, + }; + const platformTargets = targets[platform]; + if (!platformTargets) + throw new Error(`Unsupported platform: ${platform}`); + const target = platformTargets[arch]; + if (!target) throw new Error(`Unsupported arch: ${arch}`); + return target; + }, + // The codex release archive contains a target-suffixed binary + // (e.g. `codex-aarch64-apple-darwin`); rename it to `codex` after extract. + archiveBinaryName: (target) => + process.platform === "win32" ? 
`codex-${target}.exe` : `codex-${target}`, + }, { name: "rg", version: "15.0.0", @@ -159,6 +194,13 @@ async function downloadBinary(binary) { await extractArchive(archivePath, DEST_DIR); rmSync(archivePath); + if (binary.archiveBinaryName) { + const extractedPath = join(DEST_DIR, binary.archiveBinaryName(target)); + if (extractedPath !== binaryPath && existsSync(extractedPath)) { + renameSync(extractedPath, binaryPath); + } + } + if (!existsSync(binaryPath)) { throw new Error(`Binary not found after extraction: ${binaryPath}`); } diff --git a/apps/code/vite.main.config.mts b/apps/code/vite.main.config.mts index 5dfe558118..6abd311c19 100644 --- a/apps/code/vite.main.config.mts +++ b/apps/code/vite.main.config.mts @@ -566,6 +566,7 @@ function copyCodexAcpBinaries(): Plugin { const sourceDir = join(__dirname, "resources/codex-acp"); const binaries = [ { name: "codex-acp", winName: "codex-acp.exe" }, + { name: "codex", winName: "codex.exe" }, { name: "rg", winName: "rg.exe" }, ]; diff --git a/packages/agent/src/adapters/acp-connection.ts b/packages/agent/src/adapters/acp-connection.ts index c271e03b6c..405c94fd65 100644 --- a/packages/agent/src/adapters/acp-connection.ts +++ b/packages/agent/src/adapters/acp-connection.ts @@ -10,6 +10,8 @@ import { import { ClaudeAcpAgent } from "./claude/claude-agent"; import { CodexAcpAgent } from "./codex/codex-agent"; import type { CodexProcessOptions } from "./codex/spawn"; +import { nativeCodexBinaryPath } from "./codex-app-server/binary-path"; +import { CodexAppServerAgent } from "./codex-app-server/codex-app-server-agent"; type AgentAdapter = "claude" | "codex"; @@ -199,10 +201,33 @@ function createCodexConnection(config: AcpConnectionConfig): AcpConnection { const agentStream = ndJsonStream(agentWritable, streams.agent.readable); - let agent: CodexAcpAgent | null = null; + let agent: CodexAcpAgent | CodexAppServerAgent | null = null; const agentConnection = new AgentSideConnection((client) => { + const codexOptions = 
/** JSON-RPC error object carried by a failed response. */
interface RpcErrorBody {
  code: number;
  message: string;
}

/**
 * Loose JSON-RPC envelope used by the fake server in these tests. Every field
 * is optional so the one shape covers requests (`id` + `method`), responses
 * (`id` + `result` or `error`) and notifications (`method` only).
 */
interface RpcMessage {
  id?: number;
  method?: string;
  params?: unknown;
  result?: unknown;
  error?: RpcErrorBody;
}
/**
 * Drives the "server" end of a {@link StreamPair}: reads newline-delimited
 * JSON-RPC the client sent and writes framed responses/notifications back.
 *
 * @param transport - Stream pair whose `readable` yields the bytes the client
 *   wrote and whose `writable` feeds bytes to the client.
 * @returns `readMessage()` to pull the next framed message and `send()` to
 *   push one.
 */
function makeFakeServer(transport: StreamPair) {
  const writer = transport.writable.getWriter();
  const reader = transport.readable.getReader();
  const encoder = new TextEncoder();
  const decoder = new TextDecoder();
  // Bytes decoded so far that have not yet been returned as a whole line.
  let buffer = "";

  return {
    /**
     * Returns the next non-blank line parsed as JSON.
     *
     * @throws Error("server stream closed") when the stream ends first, or
     *   Error("no message read") if the iteration guard is exhausted.
     */
    async readMessage(): Promise<RpcMessage> {
      // Bounded loop so a misbehaving stream fails the test instead of hanging.
      for (let guard = 0; guard < 10_000; guard++) {
        const newlineIndex = buffer.indexOf("\n");
        if (newlineIndex !== -1) {
          const line = buffer.slice(0, newlineIndex).trim();
          buffer = buffer.slice(newlineIndex + 1);
          if (line) return JSON.parse(line) as RpcMessage;
          // Blank line: keep scanning what is already buffered.
          continue;
        }
        const { value, done } = await reader.read();
        if (done) throw new Error("server stream closed");
        // stream: true keeps multi-byte characters split across chunks intact.
        buffer += decoder.decode(value, { stream: true });
      }
      throw new Error("no message read");
    },
    /** Writes one message as a single newline-terminated JSON line. */
    async send(message: RpcMessage): Promise<void> {
      await writer.write(encoder.encode(`${JSON.stringify(message)}\n`));
    },
  };
}
makeFakeServer(streams.agent); + + const pending = client.request("turn/start", {}); + const request = await server.readMessage(); + await server.send({ + id: request.id as number, + error: { code: -32001, message: "Server overloaded; retry later." }, + }); + + await expect(pending).rejects.toThrow("Server overloaded; retry later."); + await client.close(); + }); + + it("dispatches server notifications to the handler in order", async () => { + const streams = createBidirectionalStreams(); + const received: string[] = []; + const client = new AppServerClient(streams.client, { + logger: silentLogger, + onNotification: (method, params) => { + if (method === "item/agentMessage/delta") { + received.push((params as { delta: string }).delta); + } + }, + }); + const server = makeFakeServer(streams.agent); + + await server.send({ + method: "item/agentMessage/delta", + params: { delta: "Hel" }, + }); + await server.send({ + method: "item/agentMessage/delta", + params: { delta: "lo" }, + }); + + await vi.waitFor(() => expect(received.length).toBe(2)); + expect(received.join("")).toBe("Hello"); + await client.close(); + }); + + it("answers server-initiated requests via onRequest", async () => { + const streams = createBidirectionalStreams(); + const client = new AppServerClient(streams.client, { + logger: silentLogger, + onRequest: async (method) => ({ + decision: method === "applyPatchApproval" ? 
"approved" : "denied", + }), + }); + const server = makeFakeServer(streams.agent); + + await server.send({ + id: 99, + method: "applyPatchApproval", + params: {}, + }); + + const response = await server.readMessage(); + expect(response.id).toBe(99); + expect(response.result).toEqual({ decision: "approved" }); + await client.close(); + }); + + it("rejects in-flight requests when closed", async () => { + const streams = createBidirectionalStreams(); + const client = new AppServerClient(streams.client, { + logger: silentLogger, + }); + + const pending = client.request("thread/start", {}); + await client.close(); + + await expect(pending).rejects.toThrow(/closed/i); + }); +}); diff --git a/packages/agent/src/adapters/codex-app-server/app-server-client.ts b/packages/agent/src/adapters/codex-app-server/app-server-client.ts new file mode 100644 index 0000000000..1fc5564ced --- /dev/null +++ b/packages/agent/src/adapters/codex-app-server/app-server-client.ts @@ -0,0 +1,209 @@ +import { Logger } from "../../utils/logger"; +import type { StreamPair } from "../../utils/streams"; +import type { JsonRpcMessage, JsonRpcResponse } from "./protocol"; + +export interface AppServerClientHandlers { + /** Server-pushed notification (no id), e.g. `item/agentMessage/delta`. */ + onNotification?: (method: string, params: unknown) => void; + /** + * Server-initiated request (has an id), e.g. an approval. The resolved value + * is returned to the server as the JSON-RPC result. + */ + onRequest?: (method: string, params: unknown) => Promise; + /** Fired once when the stream ends without an explicit close() (process exit). */ + onClose?: () => void; + logger?: Logger; +} + +interface PendingCall { + resolve: (value: unknown) => void; + reject: (reason: unknown) => void; +} + +/** The subset of the client the agent depends on, so it can be faked in tests. 
/** The subset of the client the agent depends on, so it can be faked in tests. */
export interface AppServerRpc {
  /** Sends a request; settles with the server's `result` or rejects on `error`. */
  request<T = unknown>(method: string, params?: unknown): Promise<T>;
  /** Sends a notification (no id, no response expected). */
  notify(method: string, params?: unknown): void;
  /** Rejects in-flight requests and releases the underlying streams. */
  close(): Promise<void>;
}

/**
 * Bidirectional newline-delimited JSON-RPC client for the native Codex
 * `app-server` subprocess. Unlike the codex-acp adapter this speaks Codex's
 * own protocol rather than ACP, so it cannot reuse the ACP SDK connection.
 *
 * Transport-agnostic: it is given a {@link StreamPair} so tests can drive it
 * over in-memory streams without spawning a process.
 */
export class AppServerClient implements AppServerRpc {
  private readonly writer: WritableStreamDefaultWriter<Uint8Array>;
  private readonly encoder = new TextEncoder();
  /** In-flight requests keyed by the id we assigned to them. */
  private readonly pending = new Map<number, PendingCall>();
  private readonly handlers: AppServerClientHandlers;
  private readonly logger: Logger;
  private reader?: ReadableStreamDefaultReader<Uint8Array>;
  private nextId = 1;
  private closed = false;
  /** Decoded text received so far that does not yet end in a newline. */
  private buffer = "";

  /**
   * @param transport - Byte streams to and from the app-server.
   * @param handlers - Optional callbacks for notifications, server-initiated
   *   requests and unexpected stream end, plus an optional logger.
   */
  constructor(transport: StreamPair, handlers: AppServerClientHandlers = {}) {
    this.handlers = handlers;
    this.logger =
      handlers.logger ??
      new Logger({ debug: false, prefix: "[AppServerClient]" });
    this.writer = transport.writable.getWriter();
    // The loop handles its own errors, so it is deliberately not awaited.
    void this.readLoop(transport.readable);
  }

  /**
   * Sends a JSON-RPC request and resolves with the matching response's result.
   *
   * Rejects when the server answers with an error, when the client is (or
   * becomes) closed, or when the request could not be written to the stream.
   */
  request<T = unknown>(method: string, params?: unknown): Promise<T> {
    if (this.closed) {
      // Nothing will ever read a response, so fail now instead of hanging.
      return Promise.reject(new Error("AppServerClient closed"));
    }
    const id = this.nextId++;
    const promise = new Promise<T>((resolve, reject) => {
      this.pending.set(id, {
        resolve: resolve as (value: unknown) => void,
        reject,
      });
    });
    this.send({ id, method, params }, (err) => this.failCall(id, err));
    return promise;
  }

  notify(method: string, params?: unknown): void {
    this.send({ method, params });
  }

  /** Idempotent. Rejects every in-flight request, then cancels the reader and closes the writer. */
  async close(): Promise<void> {
    if (this.closed) return;
    this.closed = true;
    this.rejectAllPending(new Error("AppServerClient closed"));
    try {
      await this.reader?.cancel();
    } catch {
      // reader already released
    }
    try {
      await this.writer.close();
    } catch {
      // writable already closed
    }
  }

  /**
   * Writes one message as a newline-terminated JSON line.
   *
   * @param onFailure - Invoked if the asynchronous write rejects, so a request
   *   waiting on this message can be failed instead of left pending.
   */
  private send(
    message: JsonRpcMessage,
    onFailure?: (err: unknown) => void,
  ): void {
    const line = `${JSON.stringify(message)}\n`;
    this.writer.write(this.encoder.encode(line)).catch((err: unknown) => {
      if (!this.closed) {
        this.logger.error("Failed to write app-server message", err);
      }
      onFailure?.(err);
    });
  }

  /** Rejects and forgets a single in-flight request, if it is still pending. */
  private failCall(id: number, err: unknown): void {
    const call = this.pending.get(id);
    if (!call) return;
    this.pending.delete(id);
    call.reject(err instanceof Error ? err : new Error(String(err)));
  }

  /** Rejects every in-flight request with `reason` and empties the table. */
  private rejectAllPending(reason: Error): void {
    for (const call of this.pending.values()) {
      call.reject(reason);
    }
    this.pending.clear();
  }

  /** Reads chunks until the stream ends or the client is closed, dispatching each complete line. */
  private async readLoop(readable: StreamPair["readable"]): Promise<void> {
    const reader = readable.getReader();
    this.reader = reader;
    const decoder = new TextDecoder();
    try {
      while (!this.closed) {
        const { value, done } = await reader.read();
        if (done) break;
        this.buffer += decoder.decode(value, { stream: true });
        let newlineIndex = this.buffer.indexOf("\n");
        while (newlineIndex !== -1) {
          const line = this.buffer.slice(0, newlineIndex).trim();
          this.buffer = this.buffer.slice(newlineIndex + 1);
          if (line) this.dispatch(line);
          newlineIndex = this.buffer.indexOf("\n");
        }
      }
    } catch (err) {
      if (!this.closed) {
        this.logger.error("App-server read loop failed", err);
      }
    } finally {
      try {
        reader.releaseLock();
      } catch {
        // lock already released by cancel()
      }
      if (!this.closed) {
        // The stream ended without an explicit close() (the process exited).
        // Fail in-flight calls and notify the owner so a pending turn does not
        // hang forever.
        this.closed = true;
        this.rejectAllPending(new Error("codex app-server stream closed"));
        try {
          this.handlers.onClose?.();
        } catch (err) {
          // This loop is not awaited, so a throw here would surface as an
          // unhandled rejection.
          this.logger.error("App-server onClose handler threw", err);
        }
      }
    }
  }

  /** Parses one line and routes it as a response, a server request, or a notification. */
  private dispatch(line: string): void {
    let message: JsonRpcMessage;
    try {
      message = JSON.parse(line) as JsonRpcMessage;
    } catch (err) {
      this.logger.warn("Ignoring non-JSON app-server line", { line, err });
      return;
    }

    const id = (message as { id?: unknown }).id;
    const method = (message as { method?: unknown }).method;
    const params = (message as { params?: unknown }).params;

    // No method: this can only be a response to one of our requests.
    if (typeof method !== "string") {
      if (typeof id === "number") {
        this.handleResponse(message as JsonRpcResponse);
      }
      return;
    }

    // Method plus id: the server is asking us something and expects a reply.
    if (typeof id === "number") {
      void this.handleIncomingRequest(id, method, params);
      return;
    }

    try {
      this.handlers.onNotification?.(method, params);
    } catch (err) {
      // A faulty handler must not escape into readLoop, where it would be
      // treated as a transport failure and close the whole connection.
      this.logger.error("App-server notification handler threw", {
        method,
        err,
      });
    }
  }

  /** Settles the pending request that matches `message.id`. */
  private handleResponse(message: JsonRpcResponse): void {
    const call = this.pending.get(message.id);
    if (!call) {
      this.logger.warn("Response for unknown request id", { id: message.id });
      return;
    }
    this.pending.delete(message.id);
    if (message.error) {
      call.reject(new Error(message.error.message));
    } else {
      call.resolve(message.result);
    }
  }

  /**
   * Answers a server-initiated request: the `onRequest` result becomes the
   * JSON-RPC result, a thrown error becomes a -32000 error, and a missing
   * handler yields -32601.
   */
  private async handleIncomingRequest(
    id: number,
    method: string,
    params: unknown,
  ): Promise<void> {
    if (!this.handlers.onRequest) {
      this.send({
        id,
        error: { code: -32601, message: `Method not handled: ${method}` },
      });
      return;
    }
    try {
      const result = await this.handlers.onRequest(method, params);
      this.send({ id, result });
    } catch (err) {
      this.send({
        id,
        error: {
          code: -32000,
          message: err instanceof Error ? err.message : String(err),
        },
      });
    }
  }
}
/**
 * Locates the native codex CLI that is bundled in the same directory as the
 * codex-acp binary (`codex`, or `codex.exe` on Windows).
 *
 * @param codexAcpPath - Path of the codex-acp binary; may be omitted.
 * @returns The sibling binary's path when that file exists, otherwise
 *   `undefined` (also when no codex-acp path was given), in which case the
 *   caller keeps using the codex-acp adapter.
 */
export function nativeCodexBinaryPath(
  codexAcpPath?: string,
): string | undefined {
  if (!codexAcpPath) {
    return undefined;
  }
  const bundleDir = dirname(codexAcpPath);
  const siblingPath = join(
    bundleDir,
    process.platform === "win32" ? "codex.exe" : "codex",
  );
  if (existsSync(siblingPath)) {
    return siblingPath;
  }
  return undefined;
}
/**
 * Builds a minimal stand-in for the host-side ACP connection.
 *
 * @param outcome - Value returned under the `outcome` key by every
 *   `requestPermission()` call; defaults to selecting the "allow" option.
 * @returns The fake connection plus the array that collects every
 *   notification passed to `sessionUpdate()`, in call order.
 */
function makeFakeClient(
  outcome: unknown = { outcome: "selected", optionId: "allow" },
) {
  const sessionUpdates: unknown[] = [];
  const fake = {
    async sessionUpdate(notification: unknown) {
      sessionUpdates.push(notification);
    },
    async requestPermission() {
      return { outcome };
    },
  };
  // Only the two methods the agent calls are implemented, hence the cast.
  return { client: fake as unknown as AgentSideConnection, sessionUpdates };
}
expect(result.stopReason).toBe("end_turn"); + expect(sessionUpdates).toContainEqual({ + sessionId: "thr_1", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "Hi there" }, + }, + }); + + const turnStart = stub.requests.find((r) => r.method === "turn/start"); + expect(turnStart?.params).toMatchObject({ + threadId: "thr_1", + input: [{ type: "text", text: "hello" }], + }); + }); + + it("maps a failed turn to a refusal stop reason", async () => { + const stub = makeStubRpc({ "thread/start": { thread: { id: "t" } } }); + const { client } = makeFakeClient(); + const agent = new CodexAppServerAgent(client, { + processOptions: { binaryPath: "/x/codex" }, + rpcFactory: stub.factory, + }); + + await agent.newSession({ cwd: "/r" } as unknown as NewSessionRequest); + const done = agent.prompt({ + sessionId: "t", + prompt: [], + } as unknown as PromptRequest); + stub.emit("turn/completed", { turn: { status: "failed" } }); + + expect((await done).stopReason).toBe("refusal"); + }); + + it("routes command approvals to the host and maps allow to accept", async () => { + const stub = makeStubRpc({ "thread/start": { thread: { id: "t" } } }); + const { client } = makeFakeClient(); + const agent = new CodexAppServerAgent(client, { + processOptions: { binaryPath: "/x/codex" }, + rpcFactory: stub.factory, + }); + + await agent.newSession({ cwd: "/r" } as unknown as NewSessionRequest); + const decision = await stub.invokeRequest( + "item/commandExecution/requestApproval", + { itemId: "i", command: "ls -la" }, + ); + + expect(decision).toBe("accept"); + }); + + it("rejects the pending turn when the app-server stream closes", async () => { + const stub = makeStubRpc({ "thread/start": { thread: { id: "t" } } }); + const { client } = makeFakeClient(); + const agent = new CodexAppServerAgent(client, { + processOptions: { binaryPath: "/x/codex" }, + rpcFactory: stub.factory, + }); + + await agent.newSession({ cwd: "/r" } as unknown as NewSessionRequest); + 
const done = agent.prompt({ + sessionId: "t", + prompt: [{ type: "text", text: "hi" }], + } as unknown as PromptRequest); + + stub.triggerClose(); + + await expect(done).rejects.toThrow(/exited before the turn completed/); + }); + + it("interrupts by sending turn/interrupt before reporting cancelled", async () => { + const stub = makeStubRpc({ "thread/start": { thread: { id: "t" } } }); + const { client } = makeFakeClient(); + const agent = new CodexAppServerAgent(client, { + processOptions: { binaryPath: "/x/codex" }, + rpcFactory: stub.factory, + }); + + await agent.newSession({ cwd: "/r" } as unknown as NewSessionRequest); + const done = agent.prompt({ + sessionId: "t", + prompt: [], + } as unknown as PromptRequest); + + await agent.cancel({ sessionId: "t" }); + + expect((await done).stopReason).toBe("cancelled"); + expect(stub.requests.some((r) => r.method === "turn/interrupt")).toBe(true); + }); + + it("rejects a concurrent prompt while a turn is in progress", async () => { + const stub = makeStubRpc({ "thread/start": { thread: { id: "t" } } }); + const { client } = makeFakeClient(); + const agent = new CodexAppServerAgent(client, { + processOptions: { binaryPath: "/x/codex" }, + rpcFactory: stub.factory, + }); + + await agent.newSession({ cwd: "/r" } as unknown as NewSessionRequest); + const first = agent.prompt({ + sessionId: "t", + prompt: [], + } as unknown as PromptRequest); + + await expect( + agent.prompt({ sessionId: "t", prompt: [] } as unknown as PromptRequest), + ).rejects.toThrow(/already in progress/); + + stub.emit("turn/completed", { turn: { status: "completed" } }); + await first; + }); + + it("runs sequential turns on the same session", async () => { + const stub = makeStubRpc({ "thread/start": { thread: { id: "t" } } }); + const { client } = makeFakeClient(); + const agent = new CodexAppServerAgent(client, { + processOptions: { binaryPath: "/x/codex" }, + rpcFactory: stub.factory, + }); + + await agent.newSession({ cwd: "/r" } as unknown as 
NewSessionRequest); + + const first = agent.prompt({ + sessionId: "t", + prompt: [{ type: "text", text: "one" }], + } as unknown as PromptRequest); + stub.emit("turn/completed", { turn: { status: "completed" } }); + expect((await first).stopReason).toBe("end_turn"); + + const second = agent.prompt({ + sessionId: "t", + prompt: [{ type: "text", text: "two" }], + } as unknown as PromptRequest); + stub.emit("turn/completed", { turn: { status: "completed" } }); + expect((await second).stopReason).toBe("end_turn"); + }); + + it("maps a rejected approval to decline", async () => { + const stub = makeStubRpc({ "thread/start": { thread: { id: "t" } } }); + const { client } = makeFakeClient({ + outcome: "selected", + optionId: "reject", + }); + const agent = new CodexAppServerAgent(client, { + processOptions: { binaryPath: "/x/codex" }, + rpcFactory: stub.factory, + }); + + await agent.newSession({ cwd: "/r" } as unknown as NewSessionRequest); + expect( + await stub.invokeRequest("item/fileChange/requestApproval", { + itemId: "i", + }), + ).toBe("decline"); + }); + + it("maps a cancelled approval to cancel", async () => { + const stub = makeStubRpc({ "thread/start": { thread: { id: "t" } } }); + const { client } = makeFakeClient({ outcome: "cancelled" }); + const agent = new CodexAppServerAgent(client, { + processOptions: { binaryPath: "/x/codex" }, + rpcFactory: stub.factory, + }); + + await agent.newSession({ cwd: "/r" } as unknown as NewSessionRequest); + expect( + await stub.invokeRequest("item/commandExecution/requestApproval", { + itemId: "i", + command: "ls", + }), + ).toBe("cancel"); + }); +}); diff --git a/packages/agent/src/adapters/codex-app-server/codex-app-server-agent.ts b/packages/agent/src/adapters/codex-app-server/codex-app-server-agent.ts new file mode 100644 index 0000000000..88797060eb --- /dev/null +++ b/packages/agent/src/adapters/codex-app-server/codex-app-server-agent.ts @@ -0,0 +1,298 @@ +import type { + AgentSideConnection, + ContentBlock, + 
// The native app-server owns its own configuration, so there is nothing for the
// host to manage. BaseAcpAgent only calls dispose() on this.
/**
 * Settings manager that only remembers the working directory it was given;
 * `initialize()` and `dispose()` do nothing.
 */
class NoopSettingsManager implements BaseSettingsManager {
  /** @param cwd - Initial working directory reported by {@link getCwd}. */
  constructor(private cwd: string) {}

  dispose(): void {}

  /** Returns the most recently stored working directory. */
  getCwd(): string {
    return this.cwd;
  }

  /** Stores `cwd`; nothing else is updated. */
  async setCwd(cwd: string): Promise<void> {
    this.cwd = cwd;
  }

  async initialize(): Promise<void> {}
}
/** Settlers for the turn that is currently running, if any. */
interface PendingTurn {
  resolve: (reason: StopReason) => void;
  reject: (err: Error) => void;
}

/**
 * Maps the `status` carried by a `turn/completed` notification to an ACP stop
 * reason: `failed` becomes `refusal`, `interrupted` becomes `cancelled`, and
 * anything else (including a missing status) becomes `end_turn`.
 */
function stopReasonForTurnStatus(status: string | undefined): StopReason {
  if (status === "failed") return "refusal";
  if (status === "interrupted") return "cancelled";
  return "end_turn";
}

/**
 * ACP Agent backed by the native Codex `app-server` protocol. Presents the same
 * ACP surface to PostHog Code as the codex-acp adapter, but talks to Codex's own
 * JSON-RPC protocol underneath instead of going through the Zed translation layer.
 *
 * Spike scope: covers the core lifecycle (initialize, thread/start, turn/start
 * with streamed agent messages, interrupt, approvals). Resume/fork, tool-call
 * rendering, structured output and usage accounting are follow-ups.
 */
export class CodexAppServerAgent extends BaseAcpAgent {
  readonly adapterName = "codex";
  private readonly rpc: AppServerRpc;
  /** Set only when this agent spawned the process itself (no `rpcFactory`). */
  private readonly proc?: CodexAppServerProcess;
  private readonly model: string;
  private readonly reasoningEffort?: string;
  private threadId?: string;
  /** Single slot: at most one turn runs at a time (enforced in `prompt`). */
  private pendingTurn?: PendingTurn;

  /**
   * @param client - Host-side ACP connection that receives session updates
   *   and permission requests.
   * @param options - Process, model and logging options; `rpcFactory`
   *   replaces the spawned process with a caller-supplied RPC client.
   */
  constructor(
    client: AgentSideConnection,
    options: CodexAppServerAgentOptions,
  ) {
    super(client);
    this.logger =
      options.logger ??
      new Logger({ debug: true, prefix: "[CodexAppServerAgent]" });
    this.model = options.model ?? DEFAULT_CODEX_MODEL;
    this.reasoningEffort = options.reasoningEffort;

    const handlers: AppServerClientHandlers = {
      logger: this.logger,
      onNotification: (method, params) =>
        this.handleNotification(method, params),
      onRequest: (method, params) => this.handleApproval(method, params),
      onClose: () => this.handleServerClosed(),
    };

    if (options.rpcFactory) {
      this.rpc = options.rpcFactory(handlers);
    } else {
      this.proc = spawnCodexAppServerProcess({
        ...options.processOptions,
        logger: this.logger,
        processCallbacks: options.processCallbacks,
      });
      this.rpc = new AppServerClient(
        {
          readable: nodeReadableToWebReadable(this.proc.stdout),
          writable: nodeWritableToWebWritable(this.proc.stdin),
        },
        handlers,
      );
    }

    this.session = {
      abortController: new AbortController(),
      settingsManager: new NoopSettingsManager(
        options.processOptions.cwd ?? process.cwd(),
      ),
      notificationHistory: [],
      cancelled: false,
    };
  }

  /**
   * Performs the app-server handshake (`initialize` request followed by the
   * `initialized` notification) and echoes the caller's protocol version.
   */
  async initialize(request: InitializeRequest): Promise<InitializeResponse> {
    await this.rpc.request(APP_SERVER_METHODS.INITIALIZE, {
      clientInfo: {
        name: "posthog-code",
        title: "PostHog Code",
        version: "0.1.0",
      },
      capabilities: { experimentalApi: false },
    });
    this.rpc.notify(APP_SERVER_NOTIFICATIONS.INITIALIZED, {});
    return {
      protocolVersion: request.protocolVersion,
      agentInfo: {
        name: "codex",
        title: "Codex (app-server)",
        version: "0.1.0",
      },
    };
  }

  /**
   * Starts a thread and uses its id as the ACP session id.
   *
   * @throws Error when the `thread/start` result carries no thread id.
   */
  async newSession(params: NewSessionRequest): Promise<NewSessionResponse> {
    const result = await this.rpc.request<{ thread?: { id?: string } }>(
      APP_SERVER_METHODS.THREAD_START,
      { model: this.model, cwd: params.cwd },
    );
    const threadId = result?.thread?.id;
    if (!threadId) {
      throw new Error("codex app-server thread/start returned no thread id");
    }
    this.threadId = threadId;
    this.sessionId = threadId;
    this.logger.info("Codex app-server session created", { threadId });
    return { sessionId: threadId };
  }

  /**
   * Starts a turn with the text blocks of `params.prompt` and resolves once
   * the turn completes, is cancelled, or the session is closed.
   *
   * @throws Error when called before `newSession()`, while another turn is in
   *   progress, when `turn/start` fails, or when the app-server exits first.
   */
  async prompt(params: PromptRequest): Promise<PromptResponse> {
    if (!this.threadId) {
      throw new Error("prompt() called before newSession()");
    }
    if (this.pendingTurn) {
      // The host serializes turns; a concurrent prompt would clobber the
      // single pendingTurn slot, so fail fast rather than corrupt it.
      throw new Error("prompt() called while a turn is already in progress");
    }
    this.session.cancelled = false;
    const input = toTurnInput(params.prompt);
    const dropped = params.prompt.length - input.length;
    if (dropped > 0) {
      this.logger.warn("Dropped non-text prompt blocks", { dropped });
    }

    // Register the settlers before sending turn/start so a completion that
    // arrives ahead of the turn/start response is not lost.
    let turn!: PendingTurn;
    const completion = new Promise<StopReason>((resolve, reject) => {
      turn = { resolve, reject };
    });
    this.pendingTurn = turn;
    // If the server exits while turn/start is still in flight, `completion`
    // is rejected but never awaited because the request throws first. This
    // no-op handler keeps that from surfacing as an unhandled rejection; the
    // `await completion` below still observes the rejection normally.
    completion.catch(() => {});

    try {
      await this.rpc.request(APP_SERVER_METHODS.TURN_START, {
        threadId: this.threadId,
        input,
        ...(this.reasoningEffort ? { effort: this.reasoningEffort } : {}),
      });
      return { stopReason: await completion };
    } finally {
      // Only clear the slot if it still belongs to this turn.
      if (this.pendingTurn === turn) {
        this.pendingTurn = undefined;
      }
    }
  }

  /** Asks the server to interrupt the turn, then reports it as cancelled. */
  protected async interrupt(): Promise<void> {
    // Tell the server to stop first, then report the turn cancelled, so the
    // caller never sees "cancelled" while Codex is still running.
    if (this.threadId) {
      await this.rpc
        .request(APP_SERVER_METHODS.TURN_INTERRUPT, { threadId: this.threadId })
        .catch((err) => this.logger.warn("turn/interrupt failed", err));
    }
    this.pendingTurn?.resolve("cancelled");
    this.pendingTurn = undefined;
  }

  /** Cancels any running turn, kills the spawned process (if any) and closes the RPC client. */
  async closeSession(): Promise<void> {
    this.session.abortController.abort();
    this.pendingTurn?.resolve("cancelled");
    this.pendingTurn = undefined;
    this.session.settingsManager.dispose();
    this.proc?.kill();
    await this.rpc.close();
  }

  /**
   * Forwards a mappable notification to the host as a session update and
   * settles the pending turn on `turn/completed`.
   */
  private handleNotification(method: string, params: unknown): void {
    if (this.sessionId && !this.session.cancelled) {
      const notification = mapAppServerNotification(
        this.sessionId,
        method,
        params,
      );
      if (notification) {
        void this.client
          .sessionUpdate(notification)
          .catch((err) => this.logger.warn("sessionUpdate failed", err));
        this.appendNotification(this.sessionId, notification);
      }
    }

    if (method === APP_SERVER_NOTIFICATIONS.TURN_COMPLETED) {
      const status = (params as { turn?: { status?: string } } | undefined)
        ?.turn?.status;
      this.pendingTurn?.resolve(stopReasonForTurnStatus(status));
      this.pendingTurn = undefined;
    }
  }

  /** The app-server stream ended on its own: fail the running turn, if any. */
  private handleServerClosed(): void {
    this.pendingTurn?.reject(
      new Error("codex app-server exited before the turn completed"),
    );
    this.pendingTurn = undefined;
  }

  /**
   * Routes a command or file-change approval to the host and translates the
   * answer: the "allow" option yields `"accept"`, a cancelled prompt yields
   * `"cancel"`, and everything else (including errors and unrecognized
   * request methods) yields `"decline"`.
   *
   * NOTE(review): the value is returned as a bare string; confirm against the
   * app-server protocol whether the result must instead be wrapped as
   * `{ decision }`.
   */
  private async handleApproval(
    method: string,
    params: unknown,
  ): Promise<unknown> {
    if (
      method !== APP_SERVER_REQUESTS.COMMAND_APPROVAL &&
      method !== APP_SERVER_REQUESTS.FILE_CHANGE_APPROVAL
    ) {
      this.logger.warn("Unrecognized server request; declining", { method });
      return "decline";
    }
    const detail = params as { itemId?: string; command?: string };
    const title =
      detail.command ??
      (method === APP_SERVER_REQUESTS.FILE_CHANGE_APPROVAL
        ? "Apply file changes"
        : "Run command");
    try {
      const response = await this.client.requestPermission({
        sessionId: this.sessionId,
        toolCall: { toolCallId: detail.itemId ?? "codex-approval", title },
        options: [
          { optionId: "allow", name: "Allow", kind: "allow_once" },
          { optionId: "reject", name: "Reject", kind: "reject_once" },
        ],
      });
      if (
        response.outcome.outcome === "selected" &&
        response.outcome.optionId === "allow"
      ) {
        return "accept";
      }
      if (response.outcome.outcome === "cancelled") {
        return "cancel";
      }
      return "decline";
    } catch (err) {
      this.logger.warn("requestPermission failed; declining", err);
      return "decline";
    }
  }
}

/**
 * Keeps only the text blocks of an ACP prompt, in order, in the shape
 * `turn/start` is given as `input`. Non-text blocks are omitted.
 */
function toTurnInput(
  prompt: ContentBlock[],
): Array<{ type: "text"; text: string }> {
  return prompt.flatMap((block) =>
    block.type === "text" ? [{ type: "text" as const, text: block.text }] : [],
  );
}
// Unit tests for the app-server -> ACP notification mapping.
describe("mapAppServerNotification", () => {
  const SESSION_ID = "s-1";
  const { AGENT_MESSAGE_DELTA, TURN_COMPLETED } = APP_SERVER_NOTIFICATIONS;

  it("maps an agent message delta to an ACP agent_message_chunk", () => {
    const mapped = mapAppServerNotification(SESSION_ID, AGENT_MESSAGE_DELTA, {
      itemId: "item_1",
      text: "Hello",
    });

    expect(mapped).toEqual({
      sessionId: "s-1",
      update: {
        sessionUpdate: "agent_message_chunk",
        content: { type: "text", text: "Hello" },
      },
    });
  });

  it("returns null when the text is missing or empty", () => {
    // First payload has no `text` at all, second has an empty one.
    const payloads: unknown[] = [{}, { itemId: "item_1", text: "" }];
    for (const payload of payloads) {
      expect(
        mapAppServerNotification(SESSION_ID, AGENT_MESSAGE_DELTA, payload),
      ).toBeNull();
    }
  });

  it("returns null for notifications not yet mapped in the spike", () => {
    const mapped = mapAppServerNotification(SESSION_ID, TURN_COMPLETED, {
      usage: { input_tokens: 10 },
    });
    expect(mapped).toBeNull();
  });
});
/**
 * Translates a native app-server notification into an ACP SessionNotification.
 *
 * Only `item/agentMessage/delta` is mapped: a non-empty string `text` becomes
 * an `agent_message_chunk` update. Every other method, and a delta whose
 * `text` is missing, empty or not a string, yields `null`.
 *
 * @param sessionId ACP session id stamped onto the resulting notification.
 * @param method App-server notification method name.
 * @param params Raw, unvalidated notification params.
 * @returns The mapped notification, or `null` when there is nothing to forward.
 */
export function mapAppServerNotification(
  sessionId: string,
  method: string,
  params: unknown,
): SessionNotification | null {
  switch (method) {
    case APP_SERVER_NOTIFICATIONS.AGENT_MESSAGE_DELTA: {
      // `item/agentMessage/delta` carries { itemId, text }.
      const text = readStringField(params, "text");
      if (!text) return null;
      return {
        sessionId,
        update: {
          sessionUpdate: "agent_message_chunk",
          content: { type: "text", text },
        },
      };
    }
    default:
      return null;
  }
}

/**
 * Reads `params[key]` when `params` is a non-null object that has `key` and
 * the value is a string; returns `null` in every other case.
 */
function readStringField(params: unknown, key: string): string | null {
  if (typeof params !== "object" || params === null || !(key in params)) {
    return null;
  }
  const value = (params as Record<string, unknown>)[key];
  return typeof value === "string" ? value : null;
}
/** Client-to-server request methods of the Codex app-server protocol. */
export const APP_SERVER_METHODS = {
  // Handshake.
  INITIALIZE: "initialize",
  // Thread lifecycle.
  THREAD_START: "thread/start",
  THREAD_RESUME: "thread/resume",
  THREAD_FORK: "thread/fork",
  // Turn lifecycle.
  TURN_START: "turn/start",
  TURN_INTERRUPT: "turn/interrupt",
} as const;

/** Notification method names (messages that carry no `id`). */
export const APP_SERVER_NOTIFICATIONS = {
  INITIALIZED: "initialized",
  THREAD_STARTED: "thread/started",
  ITEM_STARTED: "item/started",
  ITEM_COMPLETED: "item/completed",
  AGENT_MESSAGE_DELTA: "item/agentMessage/delta",
  TURN_COMPLETED: "turn/completed",
  TOKEN_USAGE_UPDATED: "thread/tokenUsage/updated",
} as const;

/** Server-initiated requests the client must answer (approvals). */
export const APP_SERVER_REQUESTS = {
  COMMAND_APPROVAL: "item/commandExecution/requestApproval",
  FILE_CHANGE_APPROVAL: "item/fileChange/requestApproval",
} as const;

/** A call that expects a response correlated by `id`. */
export interface JsonRpcRequest {
  id: number;
  method: string;
  params?: unknown;
}

/** A one-way message: same shape as a request but without an `id`. */
export interface JsonRpcNotification {
  method: string;
  params?: unknown;
}

/** Error payload carried by a failed response. */
export interface JsonRpcError {
  code: number;
  message: string;
  data?: unknown;
}

/** Reply to the request with the same `id`; `result` and `error` are both optional here. */
export interface JsonRpcResponse {
  id: number;
  result?: unknown;
  error?: JsonRpcError;
}

/** Any message that can appear on the wire. */
export type JsonRpcMessage = JsonRpcRequest | JsonRpcNotification | JsonRpcResponse;
// Unit tests for the argv passed to `codex app-server`.
describe("buildAppServerArgs", () => {
  const binaryPath = "/bundle/codex";

  it("launches the app-server subcommand routed through the PostHog gateway", () => {
    const args = buildAppServerArgs({
      binaryPath,
      apiBaseUrl: "https://gateway.example/v1",
    });

    expect(args[0]).toBe("app-server");

    // Every provider override must be present as its own argv entry.
    const expectedOverrides = [
      'model_provider="posthog"',
      'model_providers.posthog.base_url="https://gateway.example/v1"',
      'model_providers.posthog.wire_api="responses"',
      'model_providers.posthog.env_key="POSTHOG_GATEWAY_API_KEY"',
    ];
    for (const override of expectedOverrides) {
      expect(args).toContain(override);
    }
  });

  it("passes guidance via developer_instructions, never the replacing key", () => {
    const args = buildAppServerArgs({
      binaryPath,
      developerInstructions: "Follow PostHog rules.",
    });

    expect(args).toContain('developer_instructions="Follow PostHog rules."');
    const usesReplacingKey = args.some((arg) => arg.startsWith("instructions="));
    expect(usesReplacingKey).toBe(false);
  });
});
/** Options for launching the native `codex app-server` process. */
export interface CodexAppServerProcessOptions {
  /** Path to the native `codex` CLI binary (the one that exposes `app-server`). */
  binaryPath: string;
  cwd?: string;
  apiBaseUrl?: string;
  apiKey?: string;
  /** Guidance appended to Codex's base prompt via `developer_instructions`. */
  developerInstructions?: string;
  logger?: Logger;
  processCallbacks?: ProcessSpawnedCallback;
}

/** Handle to a spawned app-server: the child, its stdio pipes, and a kill hook. */
export interface CodexAppServerProcess {
  process: ChildProcess;
  stdin: Writable;
  stdout: Readable;
  kill: () => void;
}

/** Short escapes defined for TOML basic strings. */
const TOML_SHORT_ESCAPES: Readonly<Record<string, string>> = {
  "\\": "\\\\",
  '"': '\\"',
  "\b": "\\b",
  "\t": "\\t",
  "\n": "\\n",
  "\f": "\\f",
  "\r": "\\r",
};

/**
 * Encodes a value as a quoted TOML basic string: backslash, double quote and
 * control characters are escaped so the `-c key="value"` override stays a
 * single well-formed string whatever the input contains.
 */
function toTomlBasicString(value: string): string {
  const escaped = value.replace(
    /[\\"\u0000-\u001f\u007f]/g,
    (ch) =>
      TOML_SHORT_ESCAPES[ch] ??
      `\\u${ch.charCodeAt(0).toString(16).padStart(4, "0")}`,
  );
  return `"${escaped}"`;
}

/**
 * Builds the argv (without the binary itself) for `codex app-server`.
 *
 * Always disables `features.remote_models`. When `apiBaseUrl` is set, adds the
 * `posthog` model provider overrides; when `developerInstructions` is set,
 * passes it as `developer_instructions`. Both free-form values are TOML-escaped.
 *
 * @param options Process options; only `apiBaseUrl` and `developerInstructions` are read.
 * @returns Arguments starting with the `app-server` subcommand.
 */
export function buildAppServerArgs(
  options: CodexAppServerProcessOptions,
): string[] {
  const args: string[] = ["app-server"];

  args.push("-c", "features.remote_models=false");

  if (options.apiBaseUrl) {
    args.push("-c", `model_provider="posthog"`);
    args.push("-c", `model_providers.posthog.name="PostHog Gateway"`);
    args.push(
      "-c",
      `model_providers.posthog.base_url=${toTomlBasicString(options.apiBaseUrl)}`,
    );
    args.push("-c", `model_providers.posthog.wire_api="responses"`);
    args.push(
      "-c",
      `model_providers.posthog.env_key="POSTHOG_GATEWAY_API_KEY"`,
    );
  }

  if (options.developerInstructions) {
    args.push(
      "-c",
      `developer_instructions=${toTomlBasicString(options.developerInstructions)}`,
    );
  }

  return args;
}

/**
 * Spawns `codex app-server` with piped stdio and returns a handle to it.
 *
 * The child inherits the current environment minus `ELECTRON_RUN_AS_NODE` and
 * `ELECTRON_NO_ASAR`, gets `POSTHOG_GATEWAY_API_KEY` when `apiKey` is set, and
 * has the binary's directory prepended to `PATH`. stderr is logged as warnings;
 * spawn/exit lifecycle is reported through `processCallbacks` when provided.
 *
 * @throws Error when the binary does not exist or the stdio pipes are missing.
 */
export function spawnCodexAppServerProcess(
  options: CodexAppServerProcessOptions,
): CodexAppServerProcess {
  const logger =
    options.logger ?? new Logger({ debug: true, prefix: "[CodexAppServer]" });

  if (!existsSync(options.binaryPath)) {
    throw new Error(
      `codex binary not found at ${options.binaryPath}. Run "node apps/code/scripts/download-binaries.mjs" to download it.`,
    );
  }

  const env: NodeJS.ProcessEnv = { ...process.env };
  delete env.ELECTRON_RUN_AS_NODE;
  delete env.ELECTRON_NO_ASAR;
  if (options.apiKey) {
    env.POSTHOG_GATEWAY_API_KEY = options.apiKey;
  }
  env.PATH = `${dirname(options.binaryPath)}${delimiter}${env.PATH ?? ""}`;

  const args = buildAppServerArgs(options);

  logger.info("Spawning codex app-server process", {
    command: options.binaryPath,
    args,
    cwd: options.cwd,
  });

  const child = spawn(options.binaryPath, args, {
    cwd: options.cwd,
    env,
    stdio: ["pipe", "pipe", "pipe"],
    detached: process.platform !== "win32",
  });

  child.stderr?.on("data", (data: Buffer) => {
    logger.warn("codex app-server stderr:", data.toString());
  });

  child.on("error", (err) => {
    logger.error("codex app-server process error:", err);
  });

  child.on("exit", (code, signal) => {
    logger.info("codex app-server process exited", { code, signal });
    if (child.pid && options.processCallbacks?.onProcessExited) {
      options.processCallbacks.onProcessExited(child.pid);
    }
  });

  if (!child.stdin || !child.stdout) {
    throw new Error(
      "Failed to get stdio streams from codex app-server process",
    );
  }

  if (child.pid && options.processCallbacks?.onProcessSpawned) {
    options.processCallbacks.onProcessSpawned({
      pid: child.pid,
      command: options.binaryPath,
    });
  }

  return {
    process: child,
    stdin: child.stdin,
    stdout: child.stdout,
    kill: () => {
      logger.info("Killing codex app-server process", { pid: child.pid });
      // Tear the pipes down before signalling the child.
      child.stdin?.destroy();
      child.stdout?.destroy();
      child.stderr?.destroy();
      child.kill("SIGTERM");
    },
  };
}