Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions apps/code/scripts/download-binaries.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
existsSync,
mkdirSync,
realpathSync,
renameSync,
rmSync,
} from "node:fs";
import { dirname, join } from "node:path";
Expand Down Expand Up @@ -47,6 +48,40 @@ const BINARIES = [
return target;
},
},
{
name: "codex",
version: "0.140.0",
getUrl: (version, target) => {
if (target.includes("windows")) {
return `https://github.com/openai/codex/releases/download/rust-v${version}/codex-${target}.exe.zip`;
}
return `https://github.com/openai/codex/releases/download/rust-v${version}/codex-${target}.tar.gz`;
},
getTarget: () => {
const { platform, arch } = process;
const targets = {
darwin: { arm64: "aarch64-apple-darwin", x64: "x86_64-apple-darwin" },
linux: {
arm64: "aarch64-unknown-linux-musl",
x64: "x86_64-unknown-linux-musl",
},
win32: {
arm64: "aarch64-pc-windows-msvc",
x64: "x86_64-pc-windows-msvc",
},
};
const platformTargets = targets[platform];
if (!platformTargets)
throw new Error(`Unsupported platform: ${platform}`);
const target = platformTargets[arch];
if (!target) throw new Error(`Unsupported arch: ${arch}`);
return target;
},
// The codex release archive contains a target-suffixed binary
// (e.g. `codex-aarch64-apple-darwin`); rename it to `codex` after extract.
archiveBinaryName: (target) =>
process.platform === "win32" ? `codex-${target}.exe` : `codex-${target}`,
},
{
name: "rg",
version: "15.0.0",
Expand Down Expand Up @@ -159,6 +194,13 @@ async function downloadBinary(binary) {
await extractArchive(archivePath, DEST_DIR);
rmSync(archivePath);

if (binary.archiveBinaryName) {
const extractedPath = join(DEST_DIR, binary.archiveBinaryName(target));
if (extractedPath !== binaryPath && existsSync(extractedPath)) {
renameSync(extractedPath, binaryPath);
}
}

if (!existsSync(binaryPath)) {
throw new Error(`Binary not found after extraction: ${binaryPath}`);
}
Expand Down
1 change: 1 addition & 0 deletions apps/code/vite.main.config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ function copyCodexAcpBinaries(): Plugin {
const sourceDir = join(__dirname, "resources/codex-acp");
const binaries = [
{ name: "codex-acp", winName: "codex-acp.exe" },
{ name: "codex", winName: "codex.exe" },
{ name: "rg", winName: "rg.exe" },
];

Expand Down
29 changes: 27 additions & 2 deletions packages/agent/src/adapters/acp-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import {
import { ClaudeAcpAgent } from "./claude/claude-agent";
import { CodexAcpAgent } from "./codex/codex-agent";
import type { CodexProcessOptions } from "./codex/spawn";
import { nativeCodexBinaryPath } from "./codex-app-server/binary-path";
import { CodexAppServerAgent } from "./codex-app-server/codex-app-server-agent";

type AgentAdapter = "claude" | "codex";

Expand Down Expand Up @@ -199,10 +201,33 @@ function createCodexConnection(config: AcpConnectionConfig): AcpConnection {

const agentStream = ndJsonStream(agentWritable, streams.agent.readable);

let agent: CodexAcpAgent | null = null;
let agent: CodexAcpAgent | CodexAppServerAgent | null = null;
const agentConnection = new AgentSideConnection((client) => {
const codexOptions = config.codexOptions ?? {};
const nativeBinary = nativeCodexBinaryPath(codexOptions.binaryPath);

// The native app-server is the default Codex harness. Fall back to the
// codex-acp (Zed) adapter only when the codex binary isn't bundled or when
// POSTHOG_CODEX_USE_ACP is set as an escape hatch.
if (nativeBinary && process.env.POSTHOG_CODEX_USE_ACP !== "1") {
agent = new CodexAppServerAgent(client, {
processOptions: {
binaryPath: nativeBinary,
cwd: codexOptions.cwd,
apiBaseUrl: codexOptions.apiBaseUrl,
apiKey: codexOptions.apiKey,
developerInstructions: codexOptions.developerInstructions,
},
model: codexOptions.model,
reasoningEffort: codexOptions.reasoningEffort,
processCallbacks: config.processCallbacks,
logger: config.logger?.child("CodexAppServerAgent"),
});
return agent;
}

agent = new CodexAcpAgent(client, {
codexProcessOptions: config.codexOptions ?? {},
codexProcessOptions: codexOptions,
processCallbacks: config.processCallbacks,
posthogApiConfig: resolveEnricherApiConfig(config),
onStructuredOutput: config.onStructuredOutput,
Expand Down
156 changes: 156 additions & 0 deletions packages/agent/src/adapters/codex-app-server/app-server-client.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import { describe, expect, it, vi } from "vitest";
import { Logger } from "../../utils/logger";
import {
createBidirectionalStreams,
type StreamPair,
} from "../../utils/streams";
import { AppServerClient } from "./app-server-client";

interface RpcMessage {
id?: number;
method?: string;
params?: unknown;
result?: unknown;
error?: { code: number; message: string };
}

/**
* Drives the "server" end of a {@link StreamPair}: reads newline-delimited
* JSON-RPC the client sent and writes framed responses/notifications back.
*/
function makeFakeServer(transport: StreamPair) {
const writer = transport.writable.getWriter();
const reader = transport.readable.getReader();
const encoder = new TextEncoder();
const decoder = new TextDecoder();
let buffer = "";

return {
async readMessage(): Promise<RpcMessage> {
for (let guard = 0; guard < 10_000; guard++) {
const newlineIndex = buffer.indexOf("\n");
if (newlineIndex !== -1) {
const line = buffer.slice(0, newlineIndex).trim();
buffer = buffer.slice(newlineIndex + 1);
if (line) return JSON.parse(line) as RpcMessage;
continue;
}
const { value, done } = await reader.read();
if (done) throw new Error("server stream closed");
buffer += decoder.decode(value, { stream: true });
}
throw new Error("no message read");
},
async send(message: RpcMessage): Promise<void> {
await writer.write(encoder.encode(`${JSON.stringify(message)}\n`));
},
};
}

const silentLogger = new Logger({ debug: false });

describe("AppServerClient", () => {
it("resolves a request when the server returns a matching response", async () => {
const streams = createBidirectionalStreams();
const client = new AppServerClient(streams.client, {
logger: silentLogger,
});
const server = makeFakeServer(streams.agent);

const pending = client.request("initialize", {
clientInfo: { name: "posthog-code" },
});

const request = await server.readMessage();
expect(request.method).toBe("initialize");
expect(typeof request.id).toBe("number");
expect(request.params).toEqual({ clientInfo: { name: "posthog-code" } });

await server.send({
id: request.id as number,
result: { userAgent: "codex" },
});

await expect(pending).resolves.toEqual({ userAgent: "codex" });
await client.close();
});

it("rejects a request when the server returns an error", async () => {
const streams = createBidirectionalStreams();
const client = new AppServerClient(streams.client, {
logger: silentLogger,
});
const server = makeFakeServer(streams.agent);

const pending = client.request("turn/start", {});
const request = await server.readMessage();
await server.send({
id: request.id as number,
error: { code: -32001, message: "Server overloaded; retry later." },
});

await expect(pending).rejects.toThrow("Server overloaded; retry later.");
await client.close();
});

it("dispatches server notifications to the handler in order", async () => {
const streams = createBidirectionalStreams();
const received: string[] = [];
const client = new AppServerClient(streams.client, {
logger: silentLogger,
onNotification: (method, params) => {
if (method === "item/agentMessage/delta") {
received.push((params as { delta: string }).delta);
}
},
});
const server = makeFakeServer(streams.agent);

await server.send({
method: "item/agentMessage/delta",
params: { delta: "Hel" },
});
await server.send({
method: "item/agentMessage/delta",
params: { delta: "lo" },
});

await vi.waitFor(() => expect(received.length).toBe(2));
expect(received.join("")).toBe("Hello");
await client.close();
});

it("answers server-initiated requests via onRequest", async () => {
const streams = createBidirectionalStreams();
const client = new AppServerClient(streams.client, {
logger: silentLogger,
onRequest: async (method) => ({
decision: method === "applyPatchApproval" ? "approved" : "denied",
}),
});
const server = makeFakeServer(streams.agent);

await server.send({
id: 99,
method: "applyPatchApproval",
params: {},
});

const response = await server.readMessage();
expect(response.id).toBe(99);
expect(response.result).toEqual({ decision: "approved" });
await client.close();
});

it("rejects in-flight requests when closed", async () => {
const streams = createBidirectionalStreams();
const client = new AppServerClient(streams.client, {
logger: silentLogger,
});

const pending = client.request("thread/start", {});
await client.close();

await expect(pending).rejects.toThrow(/closed/i);
});
});
Loading
Loading