diff --git a/.changeset/fix-deferred-pull-wakes.md b/.changeset/fix-deferred-pull-wakes.md new file mode 100644 index 0000000000..961199ed32 --- /dev/null +++ b/.changeset/fix-deferred-pull-wakes.md @@ -0,0 +1,8 @@ +--- +'@electric-ax/agents-runtime': patch +'@electric-ax/agents-server': patch +--- + +Fix child wake delivery so same-stream wake trigger notifications are queued while an active claim is already running, ensuring the runner checks for all pending wake rows again after the active work drains. + +Refactor the server wake registry to use TanStack DB collections and optimistic actions over `wake_registrations`, removing the manual ShapeStream-backed registration cache and stale-cache reload fallback. diff --git a/docs/superpowers/plans/2026-06-19-wake-registry-tanstack-db.md b/docs/superpowers/plans/2026-06-19-wake-registry-tanstack-db.md new file mode 100644 index 0000000000..fc67c1a38f --- /dev/null +++ b/docs/superpowers/plans/2026-06-19-wake-registry-tanstack-db.md @@ -0,0 +1,1525 @@ +# Wake Registry TanStack DB Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace `WakeRegistry`'s manual Postgres row cache and ShapeStream sync with TanStack DB collections, optimistic actions, `queryOnce`, and collection effects. + +**Architecture:** `WakeRegistry` will own one TanStack DB collection of `wake_registrations` rows. Runtime uses `electricCollectionOptions` over the Postgres table; unit tests use a local-only collection. All mutations go through `createOptimisticAction`; each action persists to Postgres in its `mutationFn`, then awaits the Electric collection txid. Evaluation reads with `queryOnce`, and timeout wake timers are driven by `createEffect` over collection rows. 
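
The mutation flow in the Architecture paragraph (apply optimistically, persist, keep or roll back) can be pictured with a small model. This is a minimal sketch under stated assumptions, not the TanStack DB API: `Row`, `runOptimistic`, and the `persist` callback are hypothetical names, the store is a plain `Map`, and the flow is synchronous, whereas the real actions are asynchronous and also await an Electric txid.

```typescript
// Hypothetical stand-in for a collection row; only the fields this sketch needs.
interface Row {
  id: number
  sourceUrl: string
}

// Simplified, synchronous model of an optimistic action (an assumption for
// illustration). It shows ordering only: apply first, persist second,
// undo the optimistic write if persistence throws.
function runOptimistic(
  store: Map<number, Row>,
  row: Row,
  persist: (row: Row) => void
): boolean {
  store.set(row.id, row) // optimistic apply, visible to readers immediately
  try {
    persist(row) // stands in for the Postgres write inside mutationFn
    return true
  } catch {
    store.delete(row.id) // roll back the optimistic insert
    return false
  }
}

const store = new Map<number, Row>()

// A persist callback that succeeds keeps the row.
const ok = runOptimistic(store, { id: 1, sourceUrl: '/child/c1' }, () => {})

// A persist callback that throws removes the optimistic row again.
const failed = runOptimistic(store, { id: 2, sourceUrl: '/child/c2' }, () => {
  throw new Error('simulated Postgres failure')
})

console.log(ok, failed, Array.from(store.keys()))
```

The sketch ignores the case where a row with the same id already existed before the optimistic write; a fuller model would restore the previous value instead of deleting.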
+ +**Tech Stack:** TypeScript, Vitest, Drizzle ORM, Postgres, Electric `@electric-sql/client`, TanStack DB `@tanstack/db`, TanStack Electric collection `@tanstack/electric-db-collection`. + +## Global Constraints + +- Runtime requires Electric; there is no no-Electric Postgres fallback. +- No `wake_registrations` schema migration. +- No custom TanStack DB adapter. +- No pull-wake runner changes. +- No changes to persisted wake row payloads. +- All public `WakeRegistry` mutations invoke optimistic actions. +- Bulk unregister operations are one action per domain intent. +- Wake evaluation uses `queryOnce` over the collection. +- Timeout wake timers are driven by `createEffect` over collection rows. +- `WakeRegistry` must not instantiate `Shape` or `ShapeStream` directly. +- Before running package typecheck/tests in this worktree, run `pnpm install` from the repository root if workspace links are missing. + +--- + +## File Structure + +- `packages/agents-server/package.json` + - Add runtime dependencies: `@tanstack/db` and `@tanstack/electric-db-collection`. +- `packages/agents-server/src/wake-registry.ts` + - Main refactor. Owns TanStack DB collection, actions, query-based evaluation, and timeout effects. +- `packages/agents-server/src/entity-manager.ts` + - Await async `wakeRegistry.evaluate(...)`, remove reload-on-miss fallback, and require `startSync(...)` for runtime startup. +- `packages/agents-server/src/host.ts` + - Require `electricUrl` for host startup; remove `loadRegistrations()` fallback. +- `packages/agents-server/src/standalone-runtime.ts` + - Keep passing Electric URL into `rebuildWakeRegistry(...)`; missing Electric URL should fail through `EntityManager.rebuildWakeRegistry(...)`. +- `packages/agents-server/test/wake-registry.test.ts` + - Convert unit tests to local-only collection startup and async evaluation. 
+- `packages/agents-server/test/wake-registry-sync.test.ts` + - Replace manual ShapeStream mock tests with Postgres + Electric collection integration tests or remove if covered elsewhere. +- `packages/agents-server/test/server-start.test.ts` + - Update mocked `WakeRegistry` API and assert missing Electric URL fails when applicable. +- `.changeset/fix-deferred-pull-wakes.md` + - Update if package dependencies/behavior summary changes. + +--- + +### Task 1: Add TanStack DB Dependencies and Local Collection Foundation + +**Files:** + +- Modify: `packages/agents-server/package.json:46-67` +- Modify: `packages/agents-server/src/wake-registry.ts:1-180` +- Test: `packages/agents-server/test/wake-registry.test.ts:32-61` + +**Interfaces:** + +- Consumes: existing `WakeRegistration`, `WakeEvalResult`, `DrizzleDB`, `DEFAULT_TENANT_ID`. +- Produces: + - `interface WakeRegistrationCollectionRow` + - `WakeRegistry.startLocalForTests(): Promise<void>` + - `WakeRegistry.evaluate(...): Promise<Array<WakeEvalResult>>` + - `WakeRegistry.requireCollection(): Collection<WakeRegistrationCollectionRow>` + +- [ ] **Step 1: Add dependencies** + +Add these dependencies to `packages/agents-server/package.json` under `dependencies`: + +```json +"@tanstack/db": "^0.6.7", +"@tanstack/electric-db-collection": "^0.3.5" +``` + +Run from repo root: + +```bash +pnpm install +``` + +Expected: lockfile updates cleanly and package workspace links are materialized. + +- [ ] **Step 2: Write failing local-collection evaluation test** + +In `packages/agents-server/test/wake-registry.test.ts`, add this test near the first simple `Wake Registry` tests. 
This test intentionally uses a local collection helper that does not exist yet: + +```ts +it(`evaluates registrations from local TanStack DB collection`, async () => { + const registry = new WakeRegistry(createMockDb()) + await registry.startLocalForTests() + + await registry.register({ + subscriberUrl: `/parent/p1`, + sourceUrl: `/child/c1`, + condition: `runFinished`, + oneShot: false, + }) + + const results = await registry.evaluate(`/child/c1`, { + type: `run`, + key: `run-1`, + value: { status: `completed` }, + headers: { operation: `update` }, + }) + + expect(results).toHaveLength(1) + expect(results[0]!.subscriberUrl).toBe(`/parent/p1`) + expect(results[0]!.registrationDbId).toBe(1) + expect(results[0]!.sourceEventKey).toBe(`update:run-1`) +}) +``` + +- [ ] **Step 3: Run the focused test to verify it fails** + +Run: + +```bash +pnpm --filter @electric-ax/agents-server test test/wake-registry.test.ts -t "evaluates registrations from local TanStack DB collection" --run +``` + +Expected: FAIL with TypeScript/runtime error that `startLocalForTests` does not exist or `evaluate` is not awaitable yet. + +- [ ] **Step 4: Add collection foundation alongside the legacy runtime cache** + +Task 1 is an incremental local-only foundation. Keep the existing `registrationCache`, `ShapeStream`, `startSync(...)`, and runtime cache code in place until Task 4 removes them. 
In `packages/agents-server/src/wake-registry.ts`, add TanStack DB imports at the top while keeping imports that are still needed by the legacy runtime path: + +```ts +import { + and as dbAnd, + createCollection, + createOptimisticAction, + eq as dbEq, + localOnlyCollectionOptions, + queryOnce, +} from '@tanstack/db' +import { and, eq } from 'drizzle-orm' +import { wakeRegistrations } from './db/schema.js' +import { DEFAULT_TENANT_ID } from './tenant.js' +import type { Collection } from '@tanstack/db' +import type { DrizzleDB } from './db/index.js' +``` + +Do not remove the existing `ShapeStream`, `serverLog`, `electricUrlWithPath`, `Row`, or `Value` imports in Task 1 if they are still used by the legacy runtime path. Task 4 removes manual ShapeStream runtime sync. + +Add this row type and local test state after `WakeDebounceCallback`: + +```ts +interface WakeRegistrationCollectionRow { + id: number + tenantId: string + subscriberUrl: string + sourceUrl: string + condition: WakeRegistration[`condition`] + debounceMs: number + timeoutMs: number + oneShot: boolean + timeoutConsumed: boolean + includeResponse: boolean + manifestKey: string | null + createdAt: Date +} + +type WakeRegistryMode = `unstarted` | `local-test` | `electric` +``` + +In `WakeRegistry`, add these fields next to the existing legacy cache/sync fields. Do not remove the old cache fields in Task 1 because the existing runtime `startSync(...)` path still uses them until Task 4. 
+ +```ts +private registrationsCollection: Collection | null = null +private mode: WakeRegistryMode = `unstarted` +private nextLocalId = 1 +``` + +Add these helpers inside `WakeRegistry`: + +```ts +private requireCollection(): Collection { + if (!this.registrationsCollection) { + throw new Error(`WakeRegistry has not been started`) + } + return this.registrationsCollection +} + +async startLocalForTests(): Promise { + if (this.registrationsCollection) return + this.mode = `local-test` + this.registrationsCollection = createCollection( + localOnlyCollectionOptions({ + id: `wake-registrations-local:${this.tenantId ?? `all`}`, + getKey: (row) => row.id, + initialData: [], + }) + ) + await this.registrationsCollection.preload() +} + +private allocateLocalId(): number { + return this.nextLocalId++ +} + +private normalizeRegistration( + reg: WakeRegistration, + tenantId: string, + id: number +): WakeRegistrationCollectionRow { + return { + id, + tenantId, + subscriberUrl: reg.subscriberUrl, + sourceUrl: reg.sourceUrl, + condition: reg.condition, + debounceMs: reg.debounceMs ?? 0, + timeoutMs: reg.timeoutMs ?? 0, + oneShot: reg.oneShot, + timeoutConsumed: false, + includeResponse: reg.includeResponse !== false, + manifestKey: reg.manifestKey ?? null, + createdAt: new Date(), + } +} +``` + +- [ ] **Step 5: Implement local register action and async queryOnce evaluation** + +Add this action field and initializer inside `WakeRegistry`. The local action is enough for this task; runtime persistence is added later. 
+ +```ts +private registerAction = createOptimisticAction({ + onMutate: (row) => { + this.requireCollection().insert(row) + }, + mutationFn: async () => { + if (this.mode === `local-test`) return + throw new Error(`WakeRegistry registerAction runtime persistence is not initialized`) + }, +}) +``` + +Replace `register(reg: WakeRegistration): Promise` with this minimal local implementation: + +```ts +async register(reg: WakeRegistration): Promise { + const tenantId = this.resolveTenantId(reg.tenantId) + const id = this.mode === `local-test` ? this.allocateLocalId() : this.allocateLocalId() + const tx = this.registerAction(this.normalizeRegistration(reg, tenantId, id)) + await tx.isPersisted.promise +} +``` + +Replace the start of `evaluate(...)` with an async collection query. Keep the existing matching/debounce/result code, but iterate over `regs` from `queryOnce` instead of the old `registrationCache` array: + +```ts +async evaluate( + sourceUrl: string, + event: Record, + tenantId?: string +): Promise> { + const resolvedTenantId = this.resolveTenantId(tenantId) + const regs = await queryOnce((q) => + q + .from({ reg: this.requireCollection() }) + .where(({ reg }) => + dbAnd(dbEq(reg.tenantId, resolvedTenantId), dbEq(reg.sourceUrl, sourceUrl)) + ) + ) + if (regs.length === 0) return [] + + const results: Array = [] + const oneShotRows: Array = [] + + for (const reg of regs) { + const match = this.matchCondition(reg, event) + if (!match) continue + + const timerKey = this.registrationKey(reg) + const timeoutTimer = this.timeoutTimers.get(timerKey) + if (timeoutTimer) { + clearTimeout(timeoutTimer) + this.timeoutTimers.delete(timerKey) + void this.markTimeoutConsumed(reg.id, reg.tenantId) + } + + if (reg.debounceMs > 0) { + const buffer = this.debounceBuffers.get(timerKey) ?? 
[] + buffer.push(match.change) + this.debounceBuffers.set(timerKey, buffer) + if (match.runFinishedStatus) { + this.debounceRunStatus.set(timerKey, match.runFinishedStatus) + } + const existing = this.debounceTimers.get(timerKey) + if (existing) clearTimeout(existing) + const timer = setTimeout(() => { + this.debounceTimers.delete(timerKey) + const flushed = this.debounceBuffers.get(timerKey) + if (flushed && flushed.length > 0) { + this.debounceBuffers.delete(timerKey) + const runStatus = this.debounceRunStatus.get(timerKey) + this.debounceRunStatus.delete(timerKey) + this.deliverDebounce({ + tenantId: reg.tenantId, + subscriberUrl: reg.subscriberUrl, + registrationDbId: reg.id, + sourceEventKey: flushed[flushed.length - 1]!.key, + wakeMessage: { + source: sourceUrl, + timeout: false, + changes: flushed, + }, + runFinishedStatus: runStatus, + includeResponse: reg.includeResponse, + }) + } + }, reg.debounceMs) + this.debounceTimers.set(timerKey, timer) + } else { + results.push({ + tenantId: reg.tenantId, + subscriberUrl: reg.subscriberUrl, + registrationDbId: reg.id, + sourceEventKey: wakeSourceEventId(event), + wakeMessage: { + source: sourceUrl, + timeout: false, + changes: [match.change], + }, + runFinishedStatus: match.runFinishedStatus, + includeResponse: reg.includeResponse, + }) + } + + if (reg.oneShot) oneShotRows.push(reg) + } + + for (const reg of oneShotRows) { + this.clearRegistrationState(reg) + this.timeoutDelivered.delete(reg.id) + this.requireCollection().delete(reg.id) + } + + return results +} +``` + +Update `registrationKey`, `clearRegistrationState`, `matchCondition`, and timeout helper parameter types from `CachedWakeRegistration` to `WakeRegistrationCollectionRow` where needed: + +```ts +private registrationKey(reg: WakeRegistrationCollectionRow): string +private clearRegistrationState(reg: WakeRegistrationCollectionRow): void +private matchCondition(reg: WakeRegistrationCollectionRow, event: Record): ... 
+``` + +- [ ] **Step 6: Run the focused test to verify it passes** + +Run: + +```bash +pnpm --filter @electric-ax/agents-server test test/wake-registry.test.ts -t "evaluates registrations from local TanStack DB collection" --run +``` + +Expected: PASS. + +- [ ] **Step 7: Commit** + +```bash +git add packages/agents-server/package.json pnpm-lock.yaml packages/agents-server/src/wake-registry.ts packages/agents-server/test/wake-registry.test.ts +git commit -m "Introduce TanStack DB wake registry collection" +``` + +--- + +### Task 2: Convert All Unit-Test Registry Mutations to Optimistic Actions + +**Files:** + +- Modify: `packages/agents-server/src/wake-registry.ts:298-530, 730-910` +- Modify: `packages/agents-server/test/wake-registry.test.ts` + +**Interfaces:** + +- Consumes: `WakeRegistrationCollectionRow`, `WakeRegistry.requireCollection()`, async `WakeRegistry.evaluate(...)`. +- Produces: + - `unregisterByManifestKeyAction` + - `unregisterBySubscriberAction` + - `unregisterBySourceAction` + - `unregisterBySubscriberAndSourceAction` + - `markTimeoutConsumedAction` + - `consumeMatchedRegistrationsAction` + +- [ ] **Step 1: Write failing tests for bulk unregister actions** + +Add these tests to `packages/agents-server/test/wake-registry.test.ts` near existing unregister tests: + +```ts +it(`unregisterBySource removes all matching local collection rows`, async () => { + const registry = new WakeRegistry(createMockDb()) + await registry.startLocalForTests() + + await registry.register({ + subscriberUrl: `/parent/a`, + sourceUrl: `/source/1`, + condition: { on: `change` }, + oneShot: false, + }) + await registry.register({ + subscriberUrl: `/parent/b`, + sourceUrl: `/source/1`, + condition: { on: `change` }, + oneShot: false, + }) + await registry.register({ + subscriberUrl: `/parent/c`, + sourceUrl: `/source/2`, + condition: { on: `change` }, + oneShot: false, + }) + + await registry.unregisterBySource(`/source/1`) + + expect( + await registry.evaluate(`/source/1`, { 
+ type: `texts`, + key: `t1`, + value: {}, + headers: { operation: `insert` }, + }) + ).toHaveLength(0) + expect( + await registry.evaluate(`/source/2`, { + type: `texts`, + key: `t2`, + value: {}, + headers: { operation: `insert` }, + }) + ).toHaveLength(1) +}) + +it(`oneShot match removes row before a second immediate evaluation`, async () => { + const registry = new WakeRegistry(createMockDb()) + await registry.startLocalForTests() + + await registry.register({ + subscriberUrl: `/parent/p1`, + sourceUrl: `/child/c1`, + condition: `runFinished`, + oneShot: true, + }) + + const event = { + type: `run`, + key: `run-1`, + value: { status: `completed` }, + headers: { operation: `update` }, + } + + expect(await registry.evaluate(`/child/c1`, event)).toHaveLength(1) + expect(await registry.evaluate(`/child/c1`, event)).toHaveLength(0) +}) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: + +```bash +pnpm --filter @electric-ax/agents-server test test/wake-registry.test.ts -t "unregisterBySource removes|oneShot match removes" --run +``` + +Expected: FAIL if unregisters or one-shot cleanup still use old cache/DB paths. + +- [ ] **Step 3: Add query helpers for matching rows** + +Add these helpers to `WakeRegistry`: + +```ts +private async rowsByPredicate( + predicate: (row: WakeRegistrationCollectionRow) => boolean +): Promise> { + const rows = await queryOnce((q) => q.from({ reg: this.requireCollection() })) + return rows.filter(predicate) +} + +private async rowsForSource( + tenantId: string, + sourceUrl: string +): Promise> { + return await queryOnce((q) => + q + .from({ reg: this.requireCollection() }) + .where(({ reg }) => + dbAnd(dbEq(reg.tenantId, tenantId), dbEq(reg.sourceUrl, sourceUrl)) + ) + ) +} +``` + +This task may use `rowsByPredicate` for local actions because action `onMutate` must be synchronous. Capture the rows before invoking the action and pass their ids into the action input. 
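
The capture-then-mutate split described above can be sketched with plain data structures. This is an illustration under assumptions rather than the plan's code: `Row`, `captureRows`, and `deleteCaptured` are hypothetical names, and a `Map` stands in for the collection. The point is that row selection happens first (and may be asynchronous in the real registry), while the mutation step only consumes already-captured rows and stays synchronous.

```typescript
// Hypothetical row shape, reduced to the fields the example filters on.
interface Row {
  id: number
  tenantId: string
  sourceUrl: string
}

// Step 1: decide which rows are affected. In the real registry this is the
// awaited query; here it is a plain filter over current values.
function captureRows(
  store: Map<number, Row>,
  predicate: (row: Row) => boolean
): Array<Row> {
  return Array.from(store.values()).filter(predicate)
}

// Step 2: synchronous mutation that relies only on the captured rows,
// mirroring what an onMutate callback is allowed to do.
function deleteCaptured(store: Map<number, Row>, captured: Array<Row>): void {
  for (const row of captured) store.delete(row.id)
}

const rowsById = new Map<number, Row>([
  [1, { id: 1, tenantId: 't1', sourceUrl: '/source/1' }],
  [2, { id: 2, tenantId: 't1', sourceUrl: '/source/1' }],
  [3, { id: 3, tenantId: 't1', sourceUrl: '/source/2' }],
])

const captured = captureRows(
  rowsById,
  (row) => row.tenantId === 't1' && row.sourceUrl === '/source/1'
)
deleteCaptured(rowsById, captured)

console.log(captured.map((row) => row.id), Array.from(rowsById.keys()))
```

Passing the captured rows as action input also gives the persistence step an exact list of ids to delete, so the optimistic state and the database write cover the same rows.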
+ +- [ ] **Step 4: Implement unregister optimistic actions** + +Add action input types near `WakeRegistryMode`: + +```ts +type DeleteRowsInput = { + rows: Array + persist: + | { + kind: `manifestKey` + tenantId: string + subscriberUrl: string + manifestKey: string + } + | { + kind: `subscriber` + tenantId: string + subscriberUrl: string + } + | { + kind: `source` + tenantId: string + sourceUrl: string + } + | { + kind: `subscriberAndSource` + tenantId: string + subscriberUrl: string + sourceUrl: string + } + | { + kind: `oneShot` + } +} +``` + +Add one shared delete action: + +```ts +private deleteRowsAction = createOptimisticAction({ + onMutate: ({ rows }) => { + const collection = this.requireCollection() + for (const row of rows) { + this.clearRegistrationState(row) + this.timeoutDelivered.delete(row.id) + collection.delete(row.id) + } + }, + mutationFn: async ({ persist }) => { + if (this.mode === `local-test` || persist.kind === `oneShot`) return + throw new Error(`WakeRegistry deleteRowsAction runtime persistence is not initialized`) + }, +}) +``` + +Replace unregister methods with wrappers that query rows, invoke the action, and await persistence: + +```ts +async unregisterBySource(sourceUrl: string, tenantId?: string): Promise { + const resolvedTenantId = this.resolveTenantId(tenantId) + const rows = await this.rowsForSource(resolvedTenantId, sourceUrl) + const tx = this.deleteRowsAction({ + rows, + persist: { kind: `source`, tenantId: resolvedTenantId, sourceUrl }, + }) + await tx.isPersisted.promise +} +``` + +Implement the other three unregister methods with the same pattern: + +```ts +async unregisterBySubscriber(subscriberUrl: string, tenantId?: string): Promise { + const resolvedTenantId = this.resolveTenantId(tenantId) + const rows = await this.rowsByPredicate( + (row) => row.tenantId === resolvedTenantId && row.subscriberUrl === subscriberUrl + ) + const tx = this.deleteRowsAction({ + rows, + persist: { kind: `subscriber`, tenantId: 
resolvedTenantId, subscriberUrl }, + }) + await tx.isPersisted.promise +} + +async unregisterByManifestKey( + subscriberUrl: string, + manifestKey: string, + tenantId?: string +): Promise { + const resolvedTenantId = this.resolveTenantId(tenantId) + const rows = await this.rowsByPredicate( + (row) => + row.tenantId === resolvedTenantId && + row.subscriberUrl === subscriberUrl && + row.manifestKey === manifestKey + ) + const tx = this.deleteRowsAction({ + rows, + persist: { kind: `manifestKey`, tenantId: resolvedTenantId, subscriberUrl, manifestKey }, + }) + await tx.isPersisted.promise +} + +async unregisterBySubscriberAndSource( + subscriberUrl: string, + sourceUrl: string, + tenantId?: string +): Promise { + const resolvedTenantId = this.resolveTenantId(tenantId) + const rows = await this.rowsByPredicate( + (row) => + row.tenantId === resolvedTenantId && + row.subscriberUrl === subscriberUrl && + row.sourceUrl === sourceUrl + ) + const tx = this.deleteRowsAction({ + rows, + persist: { kind: `subscriberAndSource`, tenantId: resolvedTenantId, subscriberUrl, sourceUrl }, + }) + await tx.isPersisted.promise +} +``` + +- [ ] **Step 5: Implement mark-timeout-consumed action** + +Add this action: + +```ts +private markTimeoutConsumedAction = createOptimisticAction<{ + row: WakeRegistrationCollectionRow +}>({ + onMutate: ({ row }) => { + this.requireCollection().update(row.id, (draft) => { + draft.timeoutConsumed = true + }) + }, + mutationFn: async () => { + if (this.mode === `local-test`) return + throw new Error(`WakeRegistry markTimeoutConsumedAction runtime persistence is not initialized`) + }, +}) +``` + +Replace `markTimeoutConsumed(dbId, tenantId)` with: + +```ts +private async markTimeoutConsumed(dbId: number, tenantId: string): Promise { + const row = await queryOnce((q) => + q + .from({ reg: this.requireCollection() }) + .where(({ reg }) => dbAnd(dbEq(reg.tenantId, tenantId), dbEq(reg.id, dbId))) + .findOne() + ) + if (!row) return + const tx = 
this.markTimeoutConsumedAction({ row }) + await tx.isPersisted.promise +} +``` + +- [ ] **Step 6: Route one-shot cleanup through the shared action** + +In `evaluate(...)`, replace direct `collection.delete(...)` one-shot cleanup with: + +```ts +if (oneShotRows.length > 0) { + const tx = this.deleteRowsAction({ + rows: oneShotRows, + persist: { kind: `oneShot` }, + }) + void tx.isPersisted.promise.catch((error) => { + console.warn(`[wake-registry] failed to persist one-shot cleanup:`, error) + }) +} +``` + +- [ ] **Step 7: Run focused tests** + +Run: + +```bash +pnpm --filter @electric-ax/agents-server test test/wake-registry.test.ts -t "unregisterBySource removes|oneShot match removes|evaluates registrations from local" --run +``` + +Expected: PASS. + +- [ ] **Step 8: Commit** + +```bash +git add packages/agents-server/src/wake-registry.ts packages/agents-server/test/wake-registry.test.ts +git commit -m "Move wake registry mutations to optimistic actions" +``` + +--- + +### Task 3: Drive Timeout Wakes with TanStack DB Effects + +**Files:** + +- Modify: `packages/agents-server/src/wake-registry.ts:104-177, 562-728` +- Modify: `packages/agents-server/test/wake-registry.test.ts` + +**Interfaces:** + +- Consumes: `WakeRegistrationCollectionRow`, local collection startup, optimistic timeout action. 
+- Produces: + - `private registrationsEffect: { dispose(): Promise<void> } | null` + - `private startRegistrationEffect(): void` + - timeout timers synchronized from collection rows via `createEffect` + +- [ ] **Step 1: Write failing timeout effect tests** + +Add this test near existing timeout tests: + +```ts +it(`timeout effect delivers timeout wake once and marks row consumed`, async () => { + const registry = new WakeRegistry(createMockDb()) + await registry.startLocalForTests() + const delivered: Array<WakeEvalResult> = [] + registry.setTimeoutCallback((result) => delivered.push(result)) + + await registry.register({ + subscriberUrl: `/parent/p1`, + sourceUrl: `/child/c1`, + condition: `runFinished`, + oneShot: false, + timeoutMs: 25, + }) + + await new Promise((resolve) => setTimeout(resolve, 80)) + + expect(delivered).toHaveLength(1) + expect(delivered[0]!.wakeMessage.timeout).toBe(true) + + await new Promise((resolve) => setTimeout(resolve, 80)) + expect(delivered).toHaveLength(1) +}) +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: + +```bash +pnpm --filter @electric-ax/agents-server test test/wake-registry.test.ts -t "timeout effect delivers" --run +``` + +Expected: FAIL until the collection effect starts timers from row enter/update events. 
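
The timer decision this test exercises can be sketched as a pure function of the row and the current time. This is a hedged sketch, not the registry's code: `TimeoutDecision` and `decideTimeout` are hypothetical names, and the rule it encodes (deadline equals `createdAt` plus `timeoutMs`, with no timer when the timeout is disabled or already consumed) is taken from the plan's timeout helpers.

```typescript
// Hypothetical result type describing what the effect should do for one row.
type TimeoutDecision =
  | { kind: 'clear' }
  | { kind: 'schedule'; delayMs: number }
  | { kind: 'deliver-now' }

// Only the row fields that influence the decision.
interface TimeoutRow {
  createdAt: Date
  timeoutMs: number
  timeoutConsumed: boolean
}

function decideTimeout(reg: TimeoutRow, nowMs: number): TimeoutDecision {
  // Disabled or already-consumed timeouts never hold a timer.
  if (reg.timeoutConsumed || reg.timeoutMs <= 0) return { kind: 'clear' }

  // Time left until the deadline derived from the row itself.
  const remaining = reg.createdAt.getTime() + reg.timeoutMs - nowMs
  if (remaining > 0) return { kind: 'schedule', delayMs: remaining }

  // Deadline already passed, for example after a restart.
  return { kind: 'deliver-now' }
}

const row: TimeoutRow = {
  createdAt: new Date(1000),
  timeoutMs: 25,
  timeoutConsumed: false,
}

const pending = decideTimeout(row, 1010)
const overdue = decideTimeout(row, 1030)
const consumed = decideTimeout({ ...row, timeoutConsumed: true }, 1010)

console.log(pending, overdue, consumed)
```

Deriving the delay from `createdAt` rather than from the moment the effect fires means a row that syncs in late, or a process that restarts, still honors the original deadline.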
+ +- [ ] **Step 3: Import `createEffect` and add effect field** + +Update imports from `@tanstack/db`: + +```ts +import { + and as dbAnd, + createCollection, + createEffect, + createOptimisticAction, + eq as dbEq, + localOnlyCollectionOptions, + queryOnce, +} from '@tanstack/db' +``` + +Add field: + +```ts +private registrationsEffect: { dispose(): Promise } | null = null +``` + +- [ ] **Step 4: Start the effect after collection preload** + +Add this method: + +```ts +private startRegistrationEffect(): void { + if (this.registrationsEffect) return + const collection = this.requireCollection() + this.registrationsEffect = createEffect({ + query: (q) => q.from({ reg: collection }), + skipInitial: false, + onEnter: ({ value }) => { + this.syncTimeoutTimer(value) + }, + onUpdate: ({ value }) => { + this.syncTimeoutTimer(value) + }, + onExit: ({ value }) => { + this.clearRegistrationState(value) + this.timeoutDelivered.delete(value.id) + }, + }) +} +``` + +Call it at the end of `startLocalForTests()` after `await this.registrationsCollection.preload()`: + +```ts +this.startRegistrationEffect() +``` + +- [ ] **Step 5: Update timeout helpers to collection row type** + +Replace old timeout helper signatures with: + +```ts +private startTimeoutTimer(reg: WakeRegistrationCollectionRow): void +private startTimeoutTimerWithDuration(reg: WakeRegistrationCollectionRow, durationMs: number): void +private syncTimeoutTimer(reg: WakeRegistrationCollectionRow): void +private deliverTimeoutForRegistration(reg: WakeRegistrationCollectionRow): void +private timeoutWakeResult(reg: WakeRegistrationCollectionRow): WakeEvalResult +``` + +Use `reg.id` instead of `dbId` inside these helpers. 
The key bodies should be: + +```ts +private startTimeoutTimer(reg: WakeRegistrationCollectionRow): void { + if (reg.timeoutMs <= 0) return + this.startTimeoutTimerWithDuration(reg, reg.timeoutMs) +} + +private startTimeoutTimerWithDuration( + reg: WakeRegistrationCollectionRow, + durationMs: number +): void { + const timerKey = this.registrationKey(reg) + const timer = setTimeout(() => { + this.timeoutTimers.delete(timerKey) + this.deliverTimeoutForRegistration(reg) + }, durationMs) + this.timeoutTimers.set(timerKey, timer) +} + +private syncTimeoutTimer(reg: WakeRegistrationCollectionRow): void { + const timerKey = this.registrationKey(reg) + + if (reg.timeoutConsumed || reg.timeoutMs <= 0) { + this.clearTimeoutState(timerKey) + return + } + + if (this.timeoutTimers.has(timerKey)) return + + const remaining = reg.createdAt.getTime() + reg.timeoutMs - Date.now() + if (remaining > 0) { + this.startTimeoutTimerWithDuration(reg, remaining) + return + } + + if (this.timeoutDelivered.has(reg.id)) return + this.deliverTimeoutForRegistration(reg) +} + +private deliverTimeoutForRegistration(reg: WakeRegistrationCollectionRow): void { + if (this.deliverTimeout(this.timeoutWakeResult(reg))) { + this.timeoutDelivered.add(reg.id) + void this.markTimeoutConsumed(reg.id, reg.tenantId).catch((error) => { + console.warn(`[wake-registry] failed to mark timeout consumed:`, error) + }) + } +} + +private timeoutWakeResult(reg: WakeRegistrationCollectionRow): WakeEvalResult { + return { + tenantId: reg.tenantId, + subscriberUrl: reg.subscriberUrl, + registrationDbId: reg.id, + sourceEventKey: `timeout`, + wakeMessage: { + source: reg.sourceUrl, + timeout: true, + changes: [], + }, + } +} +``` + +- [ ] **Step 6: Stop effect and clear timers on shutdown** + +Replace `stopSync()` with a general cleanup that still keeps the public method name for current call sites: + +```ts +async stopSync(): Promise { + await this.registrationsEffect?.dispose() + this.registrationsEffect = null + 
this.registrationsCollection = null + this.mode = `unstarted` + this.resetRuntimeState() +} + +private resetRuntimeState(): void { + for (const timer of this.debounceTimers.values()) clearTimeout(timer) + this.debounceTimers.clear() + this.debounceBuffers.clear() + this.debounceRunStatus.clear() + for (const timer of this.timeoutTimers.values()) clearTimeout(timer) + this.timeoutTimers.clear() + this.timeoutDelivered.clear() +} +``` + +- [ ] **Step 7: Run timeout tests** + +Run: + +```bash +pnpm --filter @electric-ax/agents-server test test/wake-registry.test.ts -t "timeout" --run +``` + +Expected: timeout tests PASS or fail only where assertions still assume old synchronous cache behavior. + +- [ ] **Step 8: Commit** + +```bash +git add packages/agents-server/src/wake-registry.ts packages/agents-server/test/wake-registry.test.ts +git commit -m "Drive wake timeouts from registry collection effects" +``` + +--- + +### Task 4: Implement Runtime Electric Collection and Postgres Txid Mutation Handlers + +**Files:** + +- Modify: `packages/agents-server/src/wake-registry.ts:1-360` +- Test: `packages/agents-server/test/wake-registry-sync.test.ts` + +**Interfaces:** + +- Consumes: local collection/action architecture from Tasks 1-3. +- Produces: + - Runtime `WakeRegistry.startSync(electricUrl, electricSecret?)` + - Electric collection with `snakeCamelMapper()` + - runtime `registerAction`, `deleteRowsAction`, `markTimeoutConsumedAction` persistence + - no direct `Shape` or `ShapeStream` usage in `WakeRegistry` + +- [ ] **Step 1: Write failing integration test for Electric-backed registry sync** + +Replace `packages/agents-server/test/wake-registry-sync.test.ts` with this integration test skeleton. It uses the existing managed test backend. 
+ +```ts +import { afterAll, beforeAll, describe, expect, it } from 'vitest' +import { eq } from 'drizzle-orm' +import { createDb } from '../src/db' +import { wakeRegistrations } from '../src/db/schema' +import { WakeRegistry } from '../src/wake-registry' +import { + TEST_ELECTRIC_URL, + TEST_POSTGRES_URL, + resetElectricAgentsTestBackend, +} from './test-backend' + +const db = createDb(TEST_POSTGRES_URL) + +describe(`WakeRegistry Electric collection sync`, () => { + beforeAll(async () => { + await resetElectricAgentsTestBackend() + }) + + afterAll(async () => { + await db.end?.() + }) + + it(`syncs a registered wake through Postgres and Electric`, async () => { + const registry = new WakeRegistry(db as any) + await registry.startSync(TEST_ELECTRIC_URL) + + await registry.register({ + subscriberUrl: `/parent/p1`, + sourceUrl: `/child/c1`, + condition: `runFinished`, + oneShot: false, + }) + + const rows = await db + .select() + .from(wakeRegistrations) + .where(eq(wakeRegistrations.sourceUrl, `/child/c1`)) + + expect(rows).toHaveLength(1) + + const results = await registry.evaluate(`/child/c1`, { + type: `run`, + key: `run-1`, + value: { status: `completed` }, + headers: { operation: `update` }, + }) + + expect(results).toHaveLength(1) + expect(results[0]!.registrationDbId).toBe(rows[0]!.id) + + await registry.stopSync() + }) +}) +``` + +If `createDb(TEST_POSTGRES_URL)` does not expose `end`, remove the `afterAll` block and follow the existing DB helper pattern in nearby integration tests. + +- [ ] **Step 2: Run integration test to verify it fails** + +Run: + +```bash +pnpm --filter @electric-ax/agents-server test test/wake-registry-sync.test.ts --run +``` + +Expected: FAIL because `startSync` still uses manual ShapeStream or runtime actions do not persist and await Electric txids yet. 
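
The column mapping that this task's interfaces mention can be illustrated with a tiny conversion helper. This is only a sketch of the naming rule: `snakeToCamel` is a hypothetical function written for this example and is not the implementation of `snakeCamelMapper()` from `@electric-sql/client`. It shows how the snake_case column names of `wake_registrations` line up with the camelCase fields of `WakeRegistrationCollectionRow`.

```typescript
// Hypothetical helper: converts one snake_case identifier to camelCase.
function snakeToCamel(name: string): string {
  // Replace each "_x" pair with the upper-cased character "X".
  return name.replace(/_([a-z0-9])/g, (_match: string, ch: string) =>
    ch.toUpperCase()
  )
}

// A subset of the wake_registrations columns named in the plan.
const columns = [
  'id',
  'tenant_id',
  'subscriber_url',
  'timeout_consumed',
  'created_at',
]

const mapped = columns.map((column) => snakeToCamel(column))

console.log(mapped)
```

With a mapper of this kind in place, the row type can use the same camelCase field names in both the local-only collection and the Electric-backed one.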
+ +- [ ] **Step 3: Import Electric collection and column mapper** + +Update imports in `wake-registry.ts`: + +```ts +import { snakeCamelMapper } from '@electric-sql/client' +import { electricCollectionOptions } from '@tanstack/electric-db-collection' +import { sql } from 'drizzle-orm' +import { electricUrlWithPath } from './utils/electric-url.js' +import { serverLog } from './utils/log.js' +``` + +Keep Drizzle `and`/`eq` imports for mutation predicates: + +```ts +import { and, eq, sql } from 'drizzle-orm' +``` + +- [ ] **Step 4: Implement sequence id allocation and txid helper** + +Add these helpers to `WakeRegistry`: + +```ts +private async allocateRuntimeId(): Promise { + const rows = await this.db.execute(sql<{ id: string }>`select nextval('wake_registrations_id_seq')::text as id`) + const value = Array.isArray(rows) ? rows[0]?.id : (rows as any)[0]?.id + const id = Number(value) + if (!Number.isInteger(id)) { + throw new Error(`Failed to allocate wake registration id`) + } + return id +} + +private async currentTxid(): Promise { + const rows = await this.db.execute(sql<{ txid: string }>`select pg_current_xact_id()::xid::text as txid`) + const value = Array.isArray(rows) ? rows[0]?.txid : (rows as any)[0]?.txid + const txid = Number(value) + if (!Number.isInteger(txid)) { + throw new Error(`Failed to read Postgres transaction id`) + } + return txid +} +``` + +If `DrizzleDB.execute(...)` returns `{ rows }` rather than an array in this codebase, adapt the extraction to: + +```ts +const result = await this.db.execute(...) +const value = result.rows[0]?.txid +``` + +Use the actual shape observed by TypeScript/tests; keep the method returning `number`. + +- [ ] **Step 5: Implement Electric `startSync`** + +Replace old `startSync(...)` with an Electric collection that syncs rows but does not persist writes itself. Persistence belongs to the optimistic action `mutationFn` methods in later steps. 
+ +```ts +async startSync(electricUrl: string, electricSecret?: string): Promise<void> { + if (this.registrationsCollection) { + await this.registrationsCollection.preload() + return + } + + this.mode = `electric` + this.registrationsCollection = createCollection( + electricCollectionOptions({ + id: `wake-registrations:${this.tenantId ?? `all`}`, + getKey: (row) => row.id, + shapeOptions: { + url: electricUrlWithPath(electricUrl, `/v1/shape`).toString(), + params: { + table: `wake_registrations`, + ...(this.tenantId + ? { where: `tenant_id = ${sqlStringLiteral(this.tenantId)}` } + : {}), + ...(electricSecret ? { secret: electricSecret } : {}), + columns: [ + `id`, + `tenant_id`, + `subscriber_url`, + `source_url`, + `condition`, + `debounce_ms`, + `timeout_ms`, + `one_shot`, + `timeout_consumed`, + `include_response`, + `manifest_key`, + `created_at`, + ], + replica: `full`, + }, + parser: { + timestamptz: (value: string) => new Date(value), + }, + columnMapper: snakeCamelMapper(), + }, + }) + ) + + await this.registrationsCollection.preload() + this.startRegistrationEffect() +} +``` + +- [ ] **Step 6: Implement runtime persistence helpers** + +Add these helpers: + +```ts +private async persistInsert(row: WakeRegistrationCollectionRow): Promise<number> { + const result = await this.db.transaction(async (tx) => { + await tx + .insert(wakeRegistrations) + .values({ + id: row.id, + tenantId: row.tenantId, + subscriberUrl: row.subscriberUrl, + sourceUrl: row.sourceUrl, + condition: row.condition, + debounceMs: row.debounceMs, + timeoutMs: row.timeoutMs, + oneShot: row.oneShot, + timeoutConsumed: row.timeoutConsumed, + includeResponse: row.includeResponse, + manifestKey: row.manifestKey, + createdAt: row.createdAt, + }) + .onConflictDoNothing() + const rows = await tx.execute(sql<{ txid: string }>`select pg_current_xact_id()::xid::text as txid`) + return Number(Array.isArray(rows) ? 
rows[0]!.txid : (rows as any)[0]!.txid) + }) + return result +} + +private async persistTimeoutConsumed(row: WakeRegistrationCollectionRow): Promise<number> { + return await this.db.transaction(async (tx) => { + await tx + .update(wakeRegistrations) + .set({ timeoutConsumed: row.timeoutConsumed }) + .where(and(eq(wakeRegistrations.tenantId, row.tenantId), eq(wakeRegistrations.id, row.id))) + const rows = await tx.execute(sql<{ txid: string }>`select pg_current_xact_id()::xid::text as txid`) + return Number(Array.isArray(rows) ? rows[0]!.txid : (rows as any)[0]!.txid) + }) +} + +private async persistDeleteRows(rows: Array<WakeRegistrationCollectionRow>): Promise<number> { + return await this.db.transaction(async (tx) => { + for (const row of rows) { + await tx + .delete(wakeRegistrations) + .where(and(eq(wakeRegistrations.tenantId, row.tenantId), eq(wakeRegistrations.id, row.id))) + } + const txRows = await tx.execute(sql<{ txid: string }>`select pg_current_xact_id()::xid::text as txid`) + return Number(Array.isArray(txRows) ? txRows[0]!.txid : (txRows as any)[0]!.txid) + }) +} +``` + +If Drizzle's transaction type does not accept `.execute` in this form, follow the pattern already used in `packages/agents-server/src/entity-registry.ts` for `pg_current_xact_id()::xid::text`. + +- [ ] **Step 7: Wire runtime actions to persistence** + +Update `register(...)` to allocate runtime ids from Postgres: + +```ts +async register(reg: WakeRegistration): Promise<void> { + const tenantId = this.resolveTenantId(reg.tenantId) + const id = this.mode === `electric` ? 
await this.allocateRuntimeId() : this.allocateLocalId() + const tx = this.registerAction(this.normalizeRegistration(reg, tenantId, id)) + await tx.isPersisted.promise +} +``` + +Update `registerAction.mutationFn` so the action persists once, then waits for Electric to sync that txid into the collection: + +```ts +mutationFn: async (row) => { + if (this.mode === `local-test`) return + if (this.mode === `electric`) { + const txid = await this.persistInsert(row) + await this.requireCollection().utils.awaitTxId(txid, 10_000) + return { txid } + } + throw new Error(`WakeRegistry registerAction called before startup`) +} +``` + +Update `deleteRowsAction.mutationFn`: + +```ts +mutationFn: async ({ rows }) => { + if (this.mode === `local-test`) return + if (this.mode === `electric`) { + const txid = await this.persistDeleteRows(rows) + await this.requireCollection().utils.awaitTxId(txid, 10_000) + return { txid } + } + throw new Error(`WakeRegistry deleteRowsAction called before startup`) +} +``` + +Update `markTimeoutConsumedAction.mutationFn`: + +```ts +mutationFn: async ({ row }) => { + if (this.mode === `local-test`) return + if (this.mode === `electric`) { + const txid = await this.persistTimeoutConsumed({ + ...row, + timeoutConsumed: true, + }) + await this.requireCollection().utils.awaitTxId(txid, 10_000) + return { txid } + } + throw new Error( + `WakeRegistry markTimeoutConsumedAction called before startup` + ) +} +``` + +- [ ] **Step 8: Remove manual Shape code** + +Delete these obsolete items from `wake-registry.ts`: + +```ts +interface WakeRegistrationShapeRow ... +normalizeShapeRow(...) +shapeRowMatchesRegistration(...) +waitForRegistrationInShape(...) +recoverSync(...) +loadRegistrations(...) +replaceCachedRegistrations(...) +upsertCachedRegistration(...) +removeCachedRegistrationByDbId(...) +findCachedRegistration(...) +resetCachedRegistrations(...) +``` + +Also remove all direct imports/usages of `Shape`, `ShapeStream`, `Row`, and `Value`. 
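The "persist once, then wait for the txid to sync back" flow used by the Step 7 `mutationFn` bodies can be sketched in isolation. This is a minimal illustration under stated assumptions: `TxidTracker` and `persistThenAwait` are hypothetical names invented for this sketch, and the tracker only imitates the observable contract of `utils.awaitTxId(txid, timeoutMs)` as it is used in this plan. It is not the `@tanstack/electric-db-collection` implementation.

```ts
// Hypothetical stand-in for the collection's txid tracking (assumption:
// only the "resolve when seen, reject on timeout" contract is modelled).
class TxidTracker {
  private readonly seen = new Set<number>()
  private readonly waiters = new Map<number, Array<() => void>>()

  // Called when the sync stream reports a committed transaction id.
  markSeen(txid: number): void {
    this.seen.add(txid)
    const pending = this.waiters.get(txid) ?? []
    this.waiters.delete(txid)
    for (const resolve of pending) resolve()
  }

  // Resolves once `txid` has been seen, or rejects after `timeoutMs`.
  awaitTxId(txid: number, timeoutMs: number): Promise<void> {
    if (this.seen.has(txid)) return Promise.resolve()
    return new Promise<void>((resolve, reject) => {
      const timer = setTimeout(
        () => reject(new Error('Timed out waiting for txid ' + txid)),
        timeoutMs
      )
      const list = this.waiters.get(txid) ?? []
      list.push(() => {
        clearTimeout(timer)
        resolve()
      })
      this.waiters.set(txid, list)
    })
  }
}

// Mirrors the shape of the Step 7 mutationFn bodies: persist, then wait.
async function persistThenAwait(
  persist: () => Promise<number>,
  tracker: TxidTracker,
  timeoutMs = 10_000
): Promise<{ txid: number }> {
  const txid = await persist() // write to Postgres, get the txid back
  await tracker.awaitTxId(txid, timeoutMs) // hold until sync catches up
  return { txid }
}
```

The point of the second `await` is that the optimistic row is only released once the synced row has replaced it, so a reader of the collection should not observe a gap between the two.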
+ +- [ ] **Step 9: Run sync integration test** + +Run: + +```bash +pnpm --filter @electric-ax/agents-server test test/wake-registry-sync.test.ts --run +``` + +Expected: PASS. + +- [ ] **Step 10: Commit** + +```bash +git add packages/agents-server/src/wake-registry.ts packages/agents-server/test/wake-registry-sync.test.ts +git commit -m "Use Electric collection for wake registrations" +``` + +--- + +### Task 5: Update EntityManager, Host Startup, and Async Evaluation Call Sites + +**Files:** + +- Modify: `packages/agents-server/src/entity-manager.ts:430-440, 3291-3310` +- Modify: `packages/agents-server/src/host.ts:111-120` +- Modify: `packages/agents-server/src/standalone-runtime.ts:151-157` +- Modify: `packages/agents-server/test/server-start.test.ts` +- Modify: `packages/agents-server/test/wake-registry.test.ts` + +**Interfaces:** + +- Consumes: async `WakeRegistry.evaluate(...)`, `WakeRegistry.startSync(...)`, removed `loadRegistrations()`. +- Produces: runtime startup fails without Electric URL, no reload-on-miss fallback, all tests await evaluation. + +- [ ] **Step 1: Write failing startup test for missing Electric URL** + +In `packages/agents-server/test/server-start.test.ts`, update or add a test with this assertion: + +```ts +it(`fails host startup without Electric URL for wake registry sync`, async () => { + const host = createTestHost({ electricUrl: undefined }) + + await expect(host.start()).rejects.toThrow( + `WakeRegistry runtime requires an Electric URL` + ) +}) +``` + +If `createTestHost` does not exist, follow the existing host construction helper in this file and assert the same error around `host.start()`. + +- [ ] **Step 2: Run startup test to verify it fails** + +Run: + +```bash +pnpm --filter @electric-ax/agents-server test test/server-start.test.ts -t "fails host startup without Electric URL" --run +``` + +Expected: FAIL because host still calls `loadRegistrations()` or the test helper needs updating. 
+ +- [ ] **Step 3: Update `EntityManager.rebuildWakeRegistry`** + +Replace `rebuildWakeRegistry(...)` in `packages/agents-server/src/entity-manager.ts` with: + +```ts +async rebuildWakeRegistry( + electricUrl?: string, + electricSecret?: string +): Promise<void> { + if (!electricUrl) { + throw new Error(`WakeRegistry runtime requires an Electric URL`) + } + await this.wakeRegistry.startSync(electricUrl, electricSecret) +} +``` + +- [ ] **Step 4: Await async registry evaluation and remove reload-on-miss workaround** + +In `evaluateWakes(...)`, replace: + +```ts +const results = this.wakeRegistry.evaluate(sourceUrl, event, this.tenantId) +``` + +with: + +```ts +const results = await this.wakeRegistry.evaluate( + sourceUrl, + event, + this.tenantId +) +``` + +Ensure no code remains that calls `wakeRegistry.loadRegistrations()` or retries `evaluate(...)` after a miss. + +- [ ] **Step 5: Update `AgentsHost.start`** + +Replace the wake registry startup block in `packages/agents-server/src/host.ts` with: + +```ts +if (!this.electricUrl) { + throw new Error(`WakeRegistry runtime requires an Electric URL`) +} +await this.wakeRegistry.startSync(this.electricUrl, this.electricSecret) +``` + +- [ ] **Step 6: Update test mocks** + +In `packages/agents-server/test/server-start.test.ts`, remove mocked `loadRegistrations()` from `MockWakeRegistry`. Keep: + +```ts +startSync(): Promise<void> { + return Promise.resolve() +} +``` + +If tests need a local-only registry, use `startLocalForTests()` directly in wake registry unit tests, not host startup tests. 
+ +- [ ] **Step 7: Convert direct unit-test evaluations to await** + +In `packages/agents-server/test/wake-registry.test.ts`, replace patterns like: + +```ts +const results = registry.evaluate(`/child/c1`, event) +``` + +with: + +```ts +const results = await registry.evaluate(`/child/c1`, event) +``` + +For inline assertions, replace: + +```ts +expect(registry.evaluate(`/child/c1`, event)).toHaveLength(1) +``` + +with: + +```ts +await expect(registry.evaluate(`/child/c1`, event)).resolves.toHaveLength(1) +``` + +- [ ] **Step 8: Run affected tests** + +Run: + +```bash +pnpm --filter @electric-ax/agents-server test test/server-start.test.ts test/wake-registry.test.ts --run +``` + +Expected: PASS or only failures from tests still asserting deleted manual Shape/cache behavior. + +- [ ] **Step 9: Commit** + +```bash +git add packages/agents-server/src/entity-manager.ts packages/agents-server/src/host.ts packages/agents-server/src/standalone-runtime.ts packages/agents-server/test/server-start.test.ts packages/agents-server/test/wake-registry.test.ts +git commit -m "Require Electric for wake registry runtime" +``` + +--- + +### Task 6: Remove Obsolete Manual Cache Tests and Verify Full Wake Registry Behavior + +**Files:** + +- Modify: `packages/agents-server/test/wake-registry.test.ts` +- Modify: `packages/agents-server/test/wake-registry-sync.test.ts` +- Modify: `.changeset/fix-deferred-pull-wakes.md` + +**Interfaces:** + +- Consumes: completed TanStack DB-backed registry and async call sites. +- Produces: passing targeted test suite and updated changeset summary. + +- [ ] **Step 1: Delete obsolete manual Shape/cache tests** + +Remove tests whose only subject is deleted internals: + +```ts +it(`removes cached registrations from shape delete old_value ids`, ...) +it(`ignores malformed shape messages without headers while waiting for up-to-date`, ...) +it(`hydrates and updates the cache from shape changes`, ...) +it(`reloads wake registrations ... cache miss`, ...) 
+``` + +Do not delete behavior tests for tenant scoping, wake matching, debounce, timeout, one-shot, unregister, or end-to-end wake delivery. + +- [ ] **Step 2: Ensure every unit test starts local registry explicitly** + +For every unit test that creates `new WakeRegistry(createMockDb())`, add: + +```ts +await registry.startLocalForTests() +``` + +before `register(...)` or `evaluate(...)`. + +Do not add `startLocalForTests()` to integration tests that use `ElectricAgentsServer`, `EntityManager`, or real `TEST_ELECTRIC_URL`; those should exercise runtime `startSync(...)`. + +- [ ] **Step 3: Run all wake registry tests** + +Run: + +```bash +pnpm --filter @electric-ax/agents-server test test/wake-registry.test.ts test/wake-registry-sync.test.ts --run +``` + +Expected: PASS. + +- [ ] **Step 4: Run broader affected agent-server tests** + +Run: + +```bash +pnpm --filter @electric-ax/agents-server test test/server-start.test.ts test/horton-pull-wake-e2e.test.ts test/pg-sync-wake-delivery.test.ts --run +``` + +Expected: PASS. + +- [ ] **Step 5: Run typecheck** + +Run: + +```bash +pnpm --filter @electric-ax/agents-server typecheck +``` + +Expected: PASS with no TypeScript errors. + +- [ ] **Step 6: Update changeset** + +Edit `.changeset/fix-deferred-pull-wakes.md` so the `@electric-ax/agents-server` entry mentions the TanStack DB registry refactor. Keep the existing runtime changeset text for `@electric-ax/agents-runtime` intact. + +Use wording like: + +```md +Refactor the server wake registry to use TanStack DB collections and optimistic actions over `wake_registrations`, removing the manual ShapeStream-backed registration cache and stale-cache reload fallback. +``` + +- [ ] **Step 7: Validate changeset coverage** + +Run: + +```bash +GITHUB_BASE_REF=main node scripts/check-changeset.mjs +``` + +Expected: success message that changesets cover affected packages. 
+ +- [ ] **Step 8: Commit** + +```bash +git add packages/agents-server/test/wake-registry.test.ts packages/agents-server/test/wake-registry-sync.test.ts .changeset/fix-deferred-pull-wakes.md +git commit -m "Verify TanStack DB wake registry behavior" +``` + +--- + +## Self-Review + +**Spec coverage:** + +- Collection as only in-memory state: Tasks 1, 4, 6 remove `registrationCache`, `Shape`, and Shape tests. +- Runtime Electric collection: Task 4. +- Unit-test local-only collection: Tasks 1, 2, 6. +- Optimistic actions for all mutations: Task 2 and runtime persistence in Task 4. +- `queryOnce` evaluation: Task 1, call sites in Task 5. +- `createEffect` timeout side effects: Task 3. +- Remove reload-on-miss and `loadRegistrations()`: Tasks 4 and 5. +- Tests and verification: Tasks 1-6 include red/green commands and final typecheck. + +**Placeholder scan:** No `TBD`, `TODO`, `implement later`, or `similar to Task N` placeholders remain. The one conditional instruction about Drizzle execute shape gives exact fallback code paths and a required return type. + +**Type consistency:** The plan consistently uses `WakeRegistrationCollectionRow.id: number` as the TanStack DB key, `WakeRegistry.startLocalForTests()`, async `WakeRegistry.evaluate(...)`, and shared optimistic actions. Later tasks consume names introduced in earlier tasks. diff --git a/docs/superpowers/specs/2026-03-23-wake-registry-tanstack-db-design.md b/docs/superpowers/specs/2026-03-23-wake-registry-tanstack-db-design.md new file mode 100644 index 0000000000..ffd8119e8e --- /dev/null +++ b/docs/superpowers/specs/2026-03-23-wake-registry-tanstack-db-design.md @@ -0,0 +1,370 @@ +# Wake Registry TanStack DB Design + +## Context + +The current child wake delivery fix compensates for stale or incomplete wake registry state by reloading `wake_registrations` from Postgres when `runFinished` evaluation misses the in-memory cache. 
That fixes one symptom, but the underlying architecture still has two flawed state paths: + +1. Tests imperatively manage an in-memory cache. +2. Runtime manually syncs a Postgres table into that cache with `Shape` / `ShapeStream`. + +The desired replacement is to use TanStack DB as the wake registry state engine. + +## Goals + +- Make TanStack DB collection state the only in-memory source of wake registration rows. +- Use an Electric-backed collection in runtime so Postgres changes sync through Electric. +- Use a local-only collection only in unit tests. +- Use optimistic actions for every wake registry mutation. +- Use `queryOnce` for one-shot evaluation reads. +- Use `createEffect` for wake timeout timer side effects. +- Remove manual Shape subscription/cache code and the reload-on-miss workaround. + +## Non-goals + +- No `wake_registrations` schema migration. +- No custom TanStack DB adapter. +- No pull-wake runner changes. +- No changes to persisted wake row payloads. +- No broad `EntityManager` refactor beyond async wake evaluation call sites. + +## Architecture + +`WakeRegistry` owns one TanStack DB collection and one effect handle: + +```ts +private registrationsCollection: Collection<WakeRegistrationCollectionRow> | null +private registrationsEffect: { dispose(): Promise<void> } | null +``` + +It no longer owns an authoritative `registrationCache`, `Shape`, `ShapeStream`, shape unsubscribe callback, or shape recovery promise. 
+ +The existing runtime delivery state remains in `WakeRegistry` because it is not registration row state: + +- debounce timers +- debounce buffers +- latest debounced run status +- timeout timers +- timeout-delivered tracking +- timeout/debounce callbacks + +### Collection row shape + +Use camelCase fields internally: + +```ts +type WakeRegistrationCollectionRow = { + id: number + tenantId: string + subscriberUrl: string + sourceUrl: string + condition: WakeRegistration['condition'] + debounceMs: number + timeoutMs: number + oneShot: boolean + timeoutConsumed: boolean + includeResponse: boolean + manifestKey: string | null + createdAt: Date +} +``` + +`id` is both the TanStack DB collection key and the Postgres `wake_registrations.id`. Persisted rows already have a unique serial id, so the collection should use it directly rather than inventing a separate deterministic key. + +For newly registered rows, the action input must include an id before `onMutate` runs, because optimistic actions apply collection changes synchronously. Runtime can preallocate the id from the Postgres sequence and then insert with that explicit id in the action `mutationFn`. Local-only tests can allocate ids from an in-memory counter. + +## Runtime collection + +`startSync(electricUrl, electricSecret)` creates an Electric collection over `wake_registrations`: + +```ts +createCollection( + electricCollectionOptions({ + id: `wake-registrations:${tenantId ?? 'all'}`, + getKey: (row) => row.id, + schema, + shapeOptions: { + url: electricUrlWithPath(electricUrl, '/v1/shape').toString(), + params: { + table: 'wake_registrations', + ...(tenantId + ? { where: `tenant_id = ${sqlStringLiteral(tenantId)}` } + : {}), + columns: [ + 'id', + 'tenant_id', + 'subscriber_url', + 'source_url', + 'condition', + 'debounce_ms', + 'timeout_ms', + 'one_shot', + 'timeout_consumed', + 'include_response', + 'manifest_key', + 'created_at', + ], + replica: 'full', + ...(electricSecret ? 
{ secret: electricSecret } : {}), + }, + parser: { + timestamptz: (value: string) => new Date(value), + }, + columnMapper: snakeCamelMapper(), + }, + onInsert, + onUpdate, + onDelete, + }) +) +``` + +Use Electric's built-in `snakeCamelMapper()` in `shapeOptions.columnMapper` to map snake_case Postgres columns to camelCase collection fields. `created_at` must still parse to `Date` on the sync path via the `timestamptz` parser. + +Startup fails if the collection cannot preload. Running with an empty registry after failed sync would silently drop terminal wake events. + +## Local-only collection + +Runtime requires Electric. There is no no-Electric Postgres fallback and no replacement for the old `loadRegistrations()` rebuild path. + +For unit tests only, create or replace a local-only collection: + +```ts +createCollection( + localOnlyCollectionOptions({ + id: `wake-registrations-local:${tenantId ?? 'all'}`, + getKey: (row) => row.id, + initialData: [], + }) +) +``` + +Unit tests should not need Postgres or Electric just to exercise registry logic. They can start with an empty local-only collection and create rows through the same optimistic actions used by runtime. + +Integration tests that need real persistence or cross-process sync should use Postgres + Electric and `startSync(...)`, not a local collection seeded from Postgres. Server startup without an Electric URL should fail rather than silently rebuilding local state from Postgres. + +## Mutation model + +All `WakeRegistry` mutations use `createOptimisticAction`. Public registry methods do not call `collection.insert`, `collection.update`, or `collection.delete` directly. + +Actions: + +- `registerAction` +- `unregisterByManifestKeyAction` +- `unregisterBySubscriberAction` +- `unregisterBySourceAction` +- `unregisterBySubscriberAndSourceAction` +- `markTimeoutConsumedAction` +- `consumeMatchedRegistrationsAction` or equivalent one-shot cleanup action + +Each action has two responsibilities: + +1. 
`onMutate`: synchronously apply optimistic changes to the collection. +2. `mutationFn`: persist the intent to Postgres in runtime mode and return/await a txid. In local-only mode, this is a no-op or minimal test hook. + +### Register + +`register(reg)` resolves the tenant, normalizes defaults, invokes `registerAction`, and awaits `tx.isPersisted.promise`. + +`registerAction.onMutate` inserts or upserts the normalized row into the collection. + +`registerAction.mutationFn` performs the existing insert: + +- insert into `wake_registrations` +- `ON CONFLICT DO NOTHING` +- insert with the preallocated id +- if conflicted, fetch the existing row and transaction id + +The transaction id comes from the same Postgres transaction: + +```sql +SELECT pg_current_xact_id()::xid::text AS txid +``` + +The Electric collection should hold optimistic state until that txid syncs back. Because the action preallocates the numeric id, `WakeEvalResult.registrationDbId` can use the collection key directly. + +### Bulk unregister actions + +Bulk unregister methods are domain intents, not repeated ad-hoc row deletes. Each uses one optimistic action: + +- `unregisterByManifestKey(subscriberUrl, manifestKey, tenantId?)` +- `unregisterBySubscriber(subscriberUrl, tenantId?)` +- `unregisterBySource(sourceUrl, tenantId?)` +- `unregisterBySubscriberAndSource(subscriberUrl, sourceUrl, tenantId?)` + +Each action’s `onMutate` deletes all currently matching collection rows synchronously. + +Each action’s `mutationFn` runs one SQL/Drizzle delete statement for the same predicate and returns/awaits the resulting txid. + +Public methods await `tx.isPersisted.promise` because callers expect unregister completion. + +### Timeout consumed + +When a wake timeout is delivered or a matching event clears a timeout, `markTimeoutConsumedAction` updates the row: + +```ts +draft.timeoutConsumed = true +``` + +Its runtime `mutationFn` persists `timeout_consumed = true` and returns/awaits a txid. 
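The two responsibilities described under "Mutation model" (a synchronous optimistic change, then asynchronous persistence with rollback on failure) can be illustrated for the timeout-consumed case with a self-contained sketch. Everything here is an assumption made for illustration: `createSketchAction`, the `undo` callback returned from `onMutate`, and the plain `isPersisted` promise are hypothetical simplifications. TanStack DB's `createOptimisticAction` has a richer API and handles rollback itself.

```ts
type Row = { id: number; timeoutConsumed: boolean }

// Hypothetical, simplified stand-in for an optimistic action.
// Assumption: onMutate returns an undo callback; the real library
// tracks and rolls back optimistic state internally instead.
function createSketchAction<T>(opts: {
  onMutate: (input: T) => () => void
  mutationFn: (input: T) => Promise<void>
}): (input: T) => { isPersisted: Promise<void> } {
  return (input) => {
    // 1. Apply the optimistic change synchronously.
    const undo = opts.onMutate(input)
    // 2. Persist; if persistence fails, roll back and re-throw.
    const isPersisted = opts.mutationFn(input).catch((error: unknown) => {
      undo()
      throw error
    })
    return { isPersisted }
  }
}

// In-memory stand-in for the registrations collection.
const rows = new Map<number, Row>([[1, { id: 1, timeoutConsumed: false }]])

// `persist` stands in for the runtime write of `timeout_consumed = true`.
function makeMarkTimeoutConsumed(persist: (id: number) => Promise<void>) {
  return createSketchAction<{ id: number }>({
    onMutate: ({ id }) => {
      const row = rows.get(id)
      if (!row) return () => {}
      const previous = row.timeoutConsumed
      row.timeoutConsumed = true
      return () => {
        row.timeoutConsumed = previous
      }
    },
    mutationFn: ({ id }) => persist(id),
  })
}
```

A fire-and-forget caller would attach a `.catch(...)` to `isPersisted` that logs the failure, while an awaited caller lets the rejection propagate.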
+ +Current fire-and-forget semantics can stay fire-and-forget, but failures must be logged with tenant id, registration id, source url, and subscriber url. + +### One-shot cleanup + +When `evaluate()` matches one-shot registrations, it invokes a cleanup action that deletes the matched rows optimistically before `evaluate()` returns. This prevents immediate repeated evaluation from double-delivering. + +Persistence can remain fire-and-forget if the current path is fire-and-forget, with logged failures. + +## Evaluation reads + +`evaluate()` becomes async: + +```ts +async evaluate( + sourceUrl: string, + event: Record<string, unknown>, + tenantId?: string +): Promise<Array<WakeEvalResult>> +``` + +It reads current registrations with `queryOnce`: + +```ts +const regs = await queryOnce((q) => + q + .from({ reg: this.requireRegistrationsCollection() }) + .where(({ reg }) => + and(eq(reg.tenantId, resolvedTenantId), eq(reg.sourceUrl, sourceUrl)) + ) +) +``` + +`queryOnce` is async because a source collection may need preload. In the registry’s normal runtime path the collection is already loaded, so this should usually be a cheap microtick async boundary. + +The existing condition matching and wake result construction can remain mostly unchanged: + +- `runFinished` matches terminal run updates +- collection-change conditions match collection names and ops +- immediate matches return `WakeEvalResult[]` +- debounced matches append to debounce buffers +- timeout timers are cleared when the expected event arrives first + +No code path should reload all registrations from Postgres on an evaluation miss. + +## Timeout wake side effects + +Wake registration `timeoutMs` measures how long a registration has been waiting for its matching source event. 
If that event does not arrive before `createdAt + timeoutMs`, the subscriber receives a timeout wake: + +```ts +wakeMessage: { + source: reg.sourceUrl, + timeout: true, + changes: [], +} +``` + +Use `createEffect` over the registration collection to manage these Node timers: + +```ts +createEffect({ + query: (q) => q.from({ reg: registrationsCollection }), + skipInitial: false, + onEnter: ({ value }) => syncTimeoutTimer(value), + onUpdate: ({ value }) => syncTimeoutTimer(value), + onExit: ({ value }) => clearRegistrationState(value), +}) +``` + +Timer behavior: + +- On enter/update, if `timeoutMs <= 0` or `timeoutConsumed`, ensure no timer is active. +- If `createdAt + timeoutMs` is in the future, schedule one Node timer. +- If the deadline has already passed and the timeout has not been delivered, deliver the timeout wake once if a tenant callback is registered. +- On exit, clear timeout/debounce state and remove the row from `timeoutDelivered`. + +When a timeout fires, it delivers the existing timeout wake result and invokes `markTimeoutConsumedAction`. + +## Error handling + +- Startup fails if the Electric collection preload fails. +- Local-only unit-test initialization should not perform DB I/O. +- Runtime startup without an Electric URL fails clearly; there is no Postgres-seeded fallback. +- Awaited public mutation methods reject if persistence fails; TanStack DB rolls back optimistic state. +- Fire-and-forget action failures are logged with enough context to diagnose the row and intent. +- Electric collection owns Shape lifecycle and retry behavior. `WakeRegistry` should not reimplement Shape recovery. +- Evaluation misses are not retried by reloading all rows from Postgres. + +## Call-site changes + +- Update every `wakeRegistry.evaluate(...)` call to `await wakeRegistry.evaluate(...)`. +- Remove `EntityManager.evaluateWakes()` retry/reload-on-cache-miss logic. +- Remove `loadRegistrations()` and update startup paths to require `startSync(...)` for runtime. 
+- Keep `flushDebounce(...)` synchronous unless it starts reading registration rows. It currently only drains debounce buffers. + +## Test plan + +Update existing wake registry unit tests: + +- Convert direct `registry.evaluate(...)` assertions to `await registry.evaluate(...)`. +- Preserve coverage for: + - tenant scoping + - `runFinished` matching + - collection-change matching + - debounce coalescing + - timeout delivery + - timeout not consumed before callback registration + - one-shot cleanup + - unregister variants + - `includeResponse` + - concurrent child `runFinished` delivery + +Remove or replace tests that only cover manual Shape cache machinery: + +- direct `applyShapeMessage` tests +- malformed Shape message handling +- custom Shape recovery behavior + +Add TanStack DB-specific tests: + +- `registerAction` preallocates an id, inserts into the local collection, and `evaluate()` sees it through `queryOnce`. +- Each bulk unregister action removes all matching rows optimistically. +- One-shot cleanup removes matched rows before a second immediate evaluation. +- `markTimeoutConsumedAction` prevents repeat timeout delivery. +- Local-only unit tests require no Postgres, Electric, or Electric mock. 
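One way to make the timer rules from "Timeout wake side effects" easy to unit test is to express them as a pure decision function that the effect handlers call before touching any Node timer. The sketch below is only an illustration of those rules: `decideTimeoutTimer`, `TimerDecision`, and the `alreadyDelivered` / `hasCallback` parameters are hypothetical names, not part of the planned `WakeRegistry` API.

```ts
type TimeoutRow = {
  createdAt: Date
  timeoutMs: number
  timeoutConsumed: boolean
}

type TimerDecision =
  | { kind: 'none' }
  | { kind: 'schedule'; delayMs: number }
  | { kind: 'deliverNow' }

// Pure decision for one registration row at time `nowMs`.
// Assumptions: `alreadyDelivered` reflects timeout-delivered tracking and
// `hasCallback` reflects whether a tenant callback is registered.
function decideTimeoutTimer(
  row: TimeoutRow,
  nowMs: number,
  alreadyDelivered: boolean,
  hasCallback: boolean
): TimerDecision {
  // No timeout configured, or it was already consumed: no timer.
  if (row.timeoutMs <= 0 || row.timeoutConsumed) return { kind: 'none' }

  const deadlineMs = row.createdAt.getTime() + row.timeoutMs

  // Deadline still in the future: schedule exactly one timer.
  if (deadlineMs > nowMs) {
    return { kind: 'schedule', delayMs: deadlineMs - nowMs }
  }

  // Deadline passed: deliver once, and only if a callback can receive it.
  if (!alreadyDelivered && hasCallback) return { kind: 'deliverNow' }
  return { kind: 'none' }
}
```

Keeping the decision separate from `setTimeout` means tests can cover the enter/update branches with fixed timestamps instead of fake timers.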
+ +Add or keep an integration-style Electric test if practical: + +- registry A registers a wake through Postgres/Electric collection +- registry B observes the row through Electric collection +- a terminal child run event evaluated by registry B produces the parent wake without any reload-on-miss fallback + +## Files likely touched + +- `packages/agents-server/package.json` + - add `@tanstack/db` + - add `@tanstack/electric-db-collection` +- `packages/agents-server/src/wake-registry.ts` + - main refactor +- `packages/agents-server/src/entity-manager.ts` + - await async evaluation and remove reload-on-miss fallback +- `packages/agents-server/test/wake-registry.test.ts` + - update async assertions and action expectations +- `packages/agents-server/test/wake-registry-sync.test.ts` + - remove/replace manual Shape tests with Postgres + Electric integration coverage where needed +- `.changeset/*` + - update package changeset if implementation changes dependencies or behavior + +## Acceptance criteria + +- `WakeRegistry` has no authoritative `Map` cache. +- `WakeRegistry` does not instantiate `Shape` or `ShapeStream` directly. +- Runtime registration state comes from an Electric TanStack DB collection. +- Unit-test registration state comes from a local-only TanStack DB collection. +- Runtime startup without Electric fails instead of calling `loadRegistrations()`. +- All public registry mutations invoke optimistic actions. +- Bulk unregister operations are one action per domain intent. +- Runtime mutation handlers return/await real Postgres txids. +- Wake evaluation uses `queryOnce` over the collection. +- Timeout wake timers are driven by `createEffect` over collection rows. +- `EntityManager` no longer reloads registrations on terminal evaluation miss. +- Existing wake registry behavior tests pass after async/action updates. 
diff --git a/packages/agents-runtime/src/pull-wake-runner.ts b/packages/agents-runtime/src/pull-wake-runner.ts index 04175fff6f..5ea066afaf 100644 --- a/packages/agents-runtime/src/pull-wake-runner.ts +++ b/packages/agents-runtime/src/pull-wake-runner.ts @@ -108,6 +108,7 @@ export function createPullWakeRunner( let heartbeatTimer: ReturnType | null = null let eventHeartbeatTimer: ReturnType | null = null let heartbeatInFlight: Promise | null = null + let heartbeatInFlightSignal: AbortSignal | null = null let heartbeatPending = false let currentOffset = config.offset ?? `-1` let startedAt: string | null = null @@ -235,10 +236,15 @@ export function createPullWakeRunner( const requestHeartbeat = (signal: AbortSignal): void => { if (signal.aborted) return heartbeatPending = true - if (heartbeatInFlight) return - heartbeatInFlight = flushHeartbeats(signal).finally(() => { - heartbeatInFlight = null + if (heartbeatInFlight && heartbeatInFlightSignal === signal) return + const inFlight = flushHeartbeats(signal).finally(() => { + if (heartbeatInFlight === inFlight) { + heartbeatInFlight = null + heartbeatInFlightSignal = null + } }) + heartbeatInFlight = inFlight + heartbeatInFlightSignal = signal } const flushHeartbeats = async (signal: AbortSignal): Promise => { @@ -358,35 +364,60 @@ export function createPullWakeRunner( deferredWakeEventsByStreamPath.clear() } - const scheduleDeferredWakeClaim = ( - event: PullWakeEvent, + const drainQueuedWakeClaims = ( + streamPath: string, signal: AbortSignal ): void => { - const streamPath = normalizeStreamPath(event.stream) - deferredWakeEventsByStreamPath.set(streamPath, event) - if (deferredWakeTimersByStreamPath.has(streamPath)) return - - const retry = (): void => { + const timer = deferredWakeTimersByStreamPath.get(streamPath) + if (timer) { + clearTimeout(timer) deferredWakeTimersByStreamPath.delete(streamPath) - const deferredEvent = deferredWakeEventsByStreamPath.get(streamPath) - if (!deferredEvent || signal.aborted || 
!isRunningState()) { - deferredWakeEventsByStreamPath.delete(streamPath) - return - } - if (hasActiveStreamClaim(streamPath)) { - scheduleDeferredWakeClaim(deferredEvent, signal) - return - } + } + const deferredEvent = deferredWakeEventsByStreamPath.get(streamPath) + if (!deferredEvent || signal.aborted || !isRunningState()) { deferredWakeEventsByStreamPath.delete(streamPath) - spawnClaimActor(deferredEvent, signal) + return } + if (hasActiveStreamClaim(streamPath)) { + scheduleDeferredWakeRetry(streamPath, signal) + return + } + + deferredWakeEventsByStreamPath.delete(streamPath) + spawnClaimActor(deferredEvent, signal) + } + + const scheduleDeferredWakeRetry = ( + streamPath: string, + signal: AbortSignal + ): void => { + if (deferredWakeTimersByStreamPath.has(streamPath)) return - const timer = setTimeout(retry, DEFERRED_WAKE_RETRY_MS) + const timer = setTimeout( + () => drainQueuedWakeClaims(streamPath, signal), + DEFERRED_WAKE_RETRY_MS + ) timer.unref?.() deferredWakeTimersByStreamPath.set(streamPath, timer) } + const scheduleDeferredWakeClaim = ( + event: PullWakeEvent, + signal: AbortSignal + ): void => { + const streamPath = normalizeStreamPath(event.stream) + if (!deferredWakeEventsByStreamPath.has(streamPath)) { + deferredWakeEventsByStreamPath.set(streamPath, event) + } + if (hasActiveStreamClaim(streamPath)) { + recordClaimSkipped() + scheduleDeferredWakeRetry(streamPath, signal) + return + } + drainQueuedWakeClaims(streamPath, signal) + } + const claimWake = async ( event: PullWakeEvent, signal: AbortSignal @@ -523,7 +554,7 @@ export function createPullWakeRunner( eventsReceived++ notifyHeartbeatChange() const signal = controller?.signal - if (signal && !signal.aborted) spawnClaimActor(event, signal) + if (signal && !signal.aborted) scheduleDeferredWakeClaim(event, signal) }, onOffset: (offset) => { if (offset !== currentOffset) { diff --git a/packages/agents-runtime/test/pull-wake-runner.test.ts b/packages/agents-runtime/test/pull-wake-runner.test.ts 
index 519a9c86f2..24cf53358b 100644 --- a/packages/agents-runtime/test/pull-wake-runner.test.ts +++ b/packages/agents-runtime/test/pull-wake-runner.test.ts @@ -278,6 +278,108 @@ describe(`createPullWakeRunner`, () => { await runner.stop() }) + it(`coalesces wake notifications skipped while the same stream is already being claimed`, async () => { + const events = [ + { ...wakeEvent(`parent`), generation: 1 }, + { ...wakeEvent(`parent`), generation: 2 }, + { ...wakeEvent(`parent`), generation: 3 }, + ] + const claims = [`one`, `coalesced`].map(notification) + const firstClaimResponse = deferred() + const fetchMock = vi + .fn() + .mockImplementationOnce( + (_input: RequestInfo | URL) => firstClaimResponse.promise + ) + .mockImplementationOnce(async (_input: RequestInfo | URL) => + Response.json(claims[1]) + ) + vi.stubGlobal(`fetch`, fetchMock) + const testRuntime = runtime() + const streamFactory = vi.fn(async () => ({ + offset: `42`, + async *jsonStream() { + yield* events + }, + closed: Promise.resolve(), + })) + + const runner = createPullWakeRunner({ + baseUrl: `http://server`, + runnerId: `runner-1`, + runtime: testRuntime, + heartbeatIntervalMs: 0, + eventHeartbeatThrottleMs: 0, + streamFactory, + }) + + runner.start() + await waitFor(() => { + expect(fetchMock).toHaveBeenCalledTimes(1) + }) + expect(testRuntime.dispatchWake).not.toHaveBeenCalled() + + firstClaimResponse.resolve(Response.json(claims[0])) + await waitFor(() => { + expect(fetchMock).toHaveBeenCalledTimes(2) + }) + await waitFor(() => { + expect(testRuntime.dispatchWake).toHaveBeenCalledTimes(2) + }) + + await runner.stop() + }) + + it(`does not let a new same-stream notification jump ahead of a queued trigger`, async () => { + const continueStream = deferred() + const firstClaimResponse = deferred() + const requestGenerations: Array = [] + const fetchMock = vi.fn( + async (_input: RequestInfo | URL, init?: RequestInit) => { + requestGenerations.push(JSON.parse(String(init?.body)).generation) + if 
(requestGenerations.length === 1) return firstClaimResponse.promise + return Response.json(notification(`coalesced`)) + } + ) + vi.stubGlobal(`fetch`, fetchMock) + const testRuntime = runtime() + const streamFactory = vi.fn(async () => ({ + offset: `42`, + async *jsonStream() { + yield { ...wakeEvent(`parent`), generation: 1 } + yield { ...wakeEvent(`parent`), generation: 2 } + await continueStream.promise + yield { ...wakeEvent(`parent`), generation: 3 } + }, + closed: Promise.resolve(), + })) + + const runner = createPullWakeRunner({ + baseUrl: `http://server`, + runnerId: `runner-1`, + runtime: testRuntime, + heartbeatIntervalMs: 0, + eventHeartbeatThrottleMs: 0, + streamFactory, + }) + + runner.start() + await waitFor(() => { + expect(requestGenerations).toEqual([1]) + }) + + firstClaimResponse.resolve(Response.json(notification(`one`))) + continueStream.resolve() + + await waitFor(() => { + expect(requestGenerations).toEqual([1, 2]) + }) + await new Promise((resolve) => setTimeout(resolve, 50)) + expect(requestGenerations).toEqual([1, 2]) + + await runner.stop() + }) + it(`skips stale wake events when claim returns no pending work`, async () => { const event = wakeEvent(`one`) const fetchMock = vi.fn(async (_input: RequestInfo | URL) => diff --git a/packages/agents-server/package.json b/packages/agents-server/package.json index 9fcdbae14a..6c74c5c5e9 100644 --- a/packages/agents-server/package.json +++ b/packages/agents-server/package.json @@ -53,6 +53,8 @@ "@mariozechner/pi-agent-core": "^0.70.2", "@opentelemetry/api": "^1.9.1", "@sinclair/typebox": "^0.34.48", + "@tanstack/db": "^0.6.7", + "@tanstack/electric-db-collection": "^0.3.5", "@whatwg-node/server": "^0.10.18", "ajv": "^8.18.0", "cron-parser": "^5.5.0", diff --git a/packages/agents-server/src/entity-manager.ts b/packages/agents-server/src/entity-manager.ts index 200f518661..26d9c972f1 100644 --- a/packages/agents-server/src/entity-manager.ts +++ b/packages/agents-server/src/entity-manager.ts @@ -81,6 
+81,7 @@ type SpawnPersistResult = [ PromiseSettledResult, ] type SpawnPersistJob = () => Promise + type WriteTokenValidator = ( entity: ElectricAgentsEntity, token: string @@ -429,12 +430,10 @@ export class EntityManager { electricUrl?: string, electricSecret?: string ): Promise { - if (electricUrl) { - await this.wakeRegistry.startSync(electricUrl, electricSecret) - return + if (!electricUrl) { + throw new Error(`WakeRegistry runtime requires an Electric URL`) } - - await this.wakeRegistry.loadRegistrations() + await this.wakeRegistry.startSync(electricUrl, electricSecret) } setWriteTokenValidator(validator: WriteTokenValidator): void { @@ -3292,7 +3291,7 @@ export class EntityManager { ): Promise { return await withSpan(`electric_agents.evaluateWakes`, async (span) => { span.setAttribute(ATTR.WAKE_SOURCE, sourceUrl) - const results = this.wakeRegistry.evaluate( + let results = await this.wakeRegistry.evaluate( sourceUrl, event, this.tenantId @@ -4079,9 +4078,19 @@ export class EntityManager { // Ensure the backing stream exists const exists = await this.streamClient.exists(streamPath) if (!exists) { - await this.streamClient.create(streamPath, { - contentType: `application/json`, - }) + try { + await this.streamClient.create(streamPath, { + contentType: `application/json`, + }) + } catch (err) { + const status = + err instanceof Error && `status` in err + ? 
(err as { status?: unknown }).status + : undefined + if (status !== 409 && !/\b409\b/.test(String(err))) { + throw err + } + } } const fireAt = getNextCronFireAt(spec.expression, spec.timezone) diff --git a/packages/agents-server/src/host.ts b/packages/agents-server/src/host.ts index de556f8bdd..21d281b614 100644 --- a/packages/agents-server/src/host.ts +++ b/packages/agents-server/src/host.ts @@ -113,11 +113,10 @@ export class AgentsHost { this.running = true try { - if (this.electricUrl) { - await this.wakeRegistry.startSync(this.electricUrl, this.electricSecret) - } else { - await this.wakeRegistry.loadRegistrations() + if (!this.electricUrl) { + throw new Error(`WakeRegistry runtime requires an Electric URL`) } + await this.wakeRegistry.startSync(this.electricUrl, this.electricSecret) if (this.startEntityBridgeManager) { await this.entityProjector.start() } diff --git a/packages/agents-server/src/wake-registry.ts b/packages/agents-server/src/wake-registry.ts index ff8ef6e75b..be2ff670d9 100644 --- a/packages/agents-server/src/wake-registry.ts +++ b/packages/agents-server/src/wake-registry.ts @@ -1,15 +1,35 @@ +import { snakeCamelMapper } from '@electric-sql/client' import { - ShapeStream, - isChangeMessage, - isControlMessage, -} from '@electric-sql/client' -import { and, eq } from 'drizzle-orm' + and as dbAnd, + createCollection, + createEffect, + createOptimisticAction, + eq as dbEq, + localOnlyCollectionOptions, + queryOnce, +} from '@tanstack/db' +import { and, eq, inArray, sql } from 'drizzle-orm' +import { electricCollectionOptions } from '@tanstack/electric-db-collection' import { wakeRegistrations } from './db/schema.js' import { serverLog } from './utils/log.js' import { electricUrlWithPath } from './utils/electric-url.js' import { DEFAULT_TENANT_ID } from './tenant.js' import type { DrizzleDB } from './db/index.js' -import type { Message, Row, Value } from '@electric-sql/client' +import type { Collection } from '@tanstack/db' + +class 
WakeRegistrationConflictError extends Error { + constructor(readonly row: WakeRegistrationCollectionRow) { + super(`Wake registration insert conflicted with an existing row`) + this.name = `WakeRegistrationConflictError` + } +} + +class WakeRegistrationStaleError extends Error { + constructor() { + super(`Wake registration row was no longer present`) + this.name = `WakeRegistrationStaleError` + } +} export interface WakeRegistration { tenantId?: string @@ -58,26 +78,46 @@ export interface WakeEvalResult { export type WakeTimeoutCallback = (result: WakeEvalResult) => void export type WakeDebounceCallback = (result: WakeEvalResult) => void -interface CachedWakeRegistration extends WakeRegistration { +export interface WakeRegistrationCollectionRow { + id: number tenantId: string - dbId: number - createdAt?: Date - timeoutConsumed?: boolean + subscriberUrl: string + sourceUrl: string + condition: WakeRegistration[`condition`] + debounceMs: number + timeoutMs: number + oneShot: boolean + timeoutConsumed: boolean + includeResponse: boolean + manifestKey: string | null + createdAt: Date } -interface WakeRegistrationShapeRow extends Row { - id: number - tenant_id: string - subscriber_url: string - source_url: string - condition: WakeRegistration[`condition`] & Value - debounce_ms: number - timeout_ms: number - one_shot: boolean - timeout_consumed: boolean - include_response: boolean - manifest_key: string | null - created_at: Date +type WakeRegistryMode = `unstarted` | `local-test` | `electric` + +type PersistInsertResult = { + txid: number + row: WakeRegistrationCollectionRow +} + +type DeleteRowsInput = { + rows: Array + persist: + | { + kind: `manifestKey` + tenantId: string + subscriberUrl: string + manifestKey: string + } + | { kind: `subscriber`; tenantId: string; subscriberUrl: string } + | { kind: `source`; tenantId: string; sourceUrl: string } + | { + kind: `subscriberAndSource` + tenantId: string + subscriberUrl: string + sourceUrl: string + } + | { kind: 
`oneShot` } } function wakeSourceEventId(event: Record): string { @@ -105,9 +145,18 @@ function sqlStringLiteral(value: string): string { return `'${value.replace(/'/g, `''`)}'` } +let nextWakeRegistryCollectionInstance = 1 + export class WakeRegistry { private db: DrizzleDB - private registrationCache = new Map>() + private registrationsCollection: Collection< + WakeRegistrationCollectionRow, + number, + any + > | null = null + private mode: WakeRegistryMode = `unstarted` + private nextLocalId = 1 + private registrationsEffect: { dispose(): Promise } | null = null private debounceTimers = new Map() private debounceBuffers = new Map< string, @@ -118,12 +167,7 @@ export class WakeRegistry { private timeoutDelivered = new Set() private timeoutCallbacks = new Map() private debounceCallbacks = new Map() - private syncElectricUrl: string | null = null - private syncElectricSecret: string | undefined - private syncAbortController: AbortController | null = null - private syncUnsubscribe: (() => void) | null = null - private syncReadyPromise: Promise | null = null - private syncRecoveryPromise: Promise | null = null + private readonly collectionInstance = nextWakeRegistryCollectionInstance++ constructor( db: DrizzleDB, @@ -132,10 +176,220 @@ export class WakeRegistry { this.db = db } + requireCollection(): Collection { + if (!this.registrationsCollection) { + throw new Error(`WakeRegistry has not been started`) + } + return this.registrationsCollection + } + + async startLocalForTests(): Promise { + if (this.registrationsCollection) return + this.mode = `local-test` + this.registrationsCollection = createCollection( + localOnlyCollectionOptions({ + id: `wake-registrations-local:${this.tenantId ?? 
`all`}`, + getKey: (row) => row.id, + initialData: [], + }) + ) + await this.requireCollection().preload() + this.startRegistrationEffect() + } + + private startRegistrationEffect(): void { + if (this.registrationsEffect) return + const collection = this.requireCollection() + this.registrationsEffect = createEffect< + WakeRegistrationCollectionRow, + number + >({ + query: (q) => q.from({ reg: collection }), + skipInitial: false, + onEnter: ({ value }) => { + this.syncTimeoutTimer(value) + }, + onUpdate: ({ value }) => { + this.syncTimeoutTimer(value) + }, + onExit: ({ value }) => { + this.clearRegistrationState(value) + this.timeoutDelivered.delete(value.id) + }, + }) + } + + private allocateLocalId(): number { + return this.nextLocalId++ + } + + private normalizeRegistration( + reg: WakeRegistration, + tenantId: string, + id: number + ): WakeRegistrationCollectionRow { + return { + id, + tenantId, + subscriberUrl: reg.subscriberUrl, + sourceUrl: reg.sourceUrl, + condition: reg.condition, + debounceMs: reg.debounceMs ?? 0, + timeoutMs: reg.timeoutMs ?? 0, + oneShot: reg.oneShot, + timeoutConsumed: false, + includeResponse: reg.includeResponse !== false, + manifestKey: reg.manifestKey ?? ``, + createdAt: new Date(), + } + } + + private normalizeQueriedRows( + rows: Array< + WakeRegistrationCollectionRow | { reg: WakeRegistrationCollectionRow } + > + ): Array { + return rows.map( + (row) => + ((row as { reg?: WakeRegistrationCollectionRow }).reg ?? 
+ row) as WakeRegistrationCollectionRow + ) + } + + private async rowsByPredicate( + predicate: (row: WakeRegistrationCollectionRow) => boolean + ): Promise> { + const rows = await queryOnce((q) => + q.from({ reg: this.requireCollection() }) + ) + return this.normalizeQueriedRows( + rows as Array< + WakeRegistrationCollectionRow | { reg: WakeRegistrationCollectionRow } + > + ).filter(predicate) + } + + private async rowsForSource( + tenantId: string, + sourceUrl: string + ): Promise> { + const rows = await queryOnce((q) => + q + .from({ reg: this.requireCollection() }) + .where(({ reg }) => + dbAnd(dbEq(reg.tenantId, tenantId), dbEq(reg.sourceUrl, sourceUrl)) + ) + ) + return this.normalizeQueriedRows( + rows as Array< + WakeRegistrationCollectionRow | { reg: WakeRegistrationCollectionRow } + > + ) + } + + private registerAction = + createOptimisticAction({ + onMutate: (row) => { + this.requireCollection().insert(row) + }, + mutationFn: async (row, { transaction }) => { + if (this.mode === `local-test`) { + this.requireCollection().utils.acceptMutations(transaction) + return + } + if (this.mode === `electric`) { + const result = await this.persistInsert(row) + try { + await this.requireCollection().utils.awaitTxId(result.txid, 10_000) + } catch (error) { + if ( + error instanceof Error && + error.name === `TimeoutWaitingForTxIdError` + ) { + this.requireCollection().utils.acceptMutations(transaction) + return { txid: result.txid } + } + throw error + } + return { txid: result.txid } + } + throw new Error(`WakeRegistry registerAction called before startup`) + }, + }) + + private deleteRowsAction = createOptimisticAction({ + onMutate: ({ rows }) => { + const collection = this.requireCollection() + for (const row of rows) { + this.clearRegistrationState(row) + this.timeoutDelivered.delete(row.id) + collection.delete(row.id) + } + }, + mutationFn: async (input, { transaction }) => { + if (this.mode === `local-test`) { + 
this.requireCollection().utils.acceptMutations(transaction) + return + } + if (this.mode === `electric`) { + const txid = await this.persistDeleteRows(input) + if (txid === undefined) { + this.requireCollection().utils.acceptMutations(transaction) + return + } + try { + await this.requireCollection().utils.awaitTxId(txid, 10_000) + } catch (error) { + if ( + error instanceof Error && + error.name === `TimeoutWaitingForTxIdError` + ) { + this.requireCollection().utils.acceptMutations(transaction) + return { txid } + } + throw error + } + return { txid } + } + throw new Error(`WakeRegistry deleteRowsAction called before startup`) + }, + }) + + private markTimeoutConsumedAction = createOptimisticAction<{ + row: WakeRegistrationCollectionRow + }>({ + onMutate: ({ row }) => { + this.requireCollection().update(row.id, (draft) => { + draft.timeoutConsumed = true + }) + }, + mutationFn: async ({ row }, { transaction }) => { + if (this.mode === `local-test`) { + this.requireCollection().utils.acceptMutations(transaction) + return + } + if (this.mode === `electric`) { + const txid = await this.persistTimeoutConsumed({ + ...row, + timeoutConsumed: true, + }) + if (txid === undefined) { + throw new WakeRegistrationStaleError() + } + await this.requireCollection().utils.awaitTxId(txid, 10_000) + return { txid } + } + throw new Error( + `WakeRegistry markTimeoutConsumedAction called before startup` + ) + }, + }) + setTimeoutCallback(cb: WakeTimeoutCallback, tenantId?: string): void { const resolvedTenantId = this.resolveTenantId(tenantId) this.timeoutCallbacks.set(resolvedTenantId, cb) this.syncTenantTimeoutTimers(resolvedTenantId) + void this.syncTenantCollectionTimeoutTimers(resolvedTenantId) } setDebounceCallback(cb: WakeDebounceCallback, tenantId?: string): void { @@ -148,11 +402,7 @@ export class WakeRegistry { throw new Error(`WakeRegistry tenantId is required in shared mode`) } - private cacheKey(tenantId: string, sourceUrl: string): string { - return 
`${tenantId}:${sourceUrl}` - } - - private registrationKey(reg: CachedWakeRegistration): string { + private registrationKey(reg: WakeRegistrationCollectionRow): string { return [ reg.tenantId, reg.subscriberUrl, @@ -178,197 +428,358 @@ export class WakeRegistry { } async startSync(electricUrl: string, electricSecret?: string): Promise { - if (this.syncReadyPromise) { - await this.syncReadyPromise + if (this.registrationsCollection) { + await this.registrationsCollection.preload() + this.startRegistrationEffect() return } - this.syncElectricUrl = electricUrl - this.syncElectricSecret = electricSecret - - const abortController = new AbortController() - const stream = new ShapeStream({ - url: electricUrlWithPath(electricUrl, `/v1/shape`).toString(), - params: { - table: `wake_registrations`, - ...(this.tenantId - ? { where: `tenant_id = ${sqlStringLiteral(this.tenantId)}` } - : {}), - ...(electricSecret ? { secret: electricSecret } : {}), - columns: [ - `id`, - `tenant_id`, - `subscriber_url`, - `source_url`, - `condition`, - `debounce_ms`, - `timeout_ms`, - `one_shot`, - `timeout_consumed`, - `include_response`, - `manifest_key`, - `created_at`, - ], - replica: `full`, - }, - parser: { - timestamptz: (value: string) => new Date(value), - }, - signal: abortController.signal, - onError: (error) => { - if (abortController.signal.aborted) { - return {} - } - if (this.syncReadyPromise) { - void this.recoverSync(error, `shape stream error`) - } - return {} - }, - }) - - this.syncAbortController = abortController - this.syncReadyPromise = new Promise((resolve, reject) => { - let settled = false - - this.syncUnsubscribe = stream.subscribe( - async (messages) => { - try { - for (const message of messages) { - await this.applyShapeMessage(message) - if ( - !settled && - isControlMessage(message) && - message.headers.control === `up-to-date` - ) { - settled = true - resolve() - } - } - } catch (error) { - if (!settled) { - settled = true - reject(error) - return - } - 
serverLog.error( - `[wake-registry] failed to apply shape change:`, - error - ) - } + this.mode = `electric` + this.registrationsCollection = createCollection( + electricCollectionOptions({ + id: `wake-registrations:${this.tenantId ?? `all`}:${electricUrlWithPath(electricUrl, `/v1/shape`).toString()}:${this.collectionInstance}`, + getKey: (row: any) => row.id as number, + shapeOptions: { + url: electricUrlWithPath(electricUrl, `/v1/shape`).toString(), + params: { + table: `wake_registrations`, + ...(this.tenantId + ? { where: `tenant_id = ${sqlStringLiteral(this.tenantId)}` } + : {}), + ...(electricSecret ? { secret: electricSecret } : {}), + columns: [ + `id`, + `tenant_id`, + `subscriber_url`, + `source_url`, + `condition`, + `debounce_ms`, + `timeout_ms`, + `one_shot`, + `timeout_consumed`, + `include_response`, + `manifest_key`, + `created_at`, + ], + replica: `full`, + }, + parser: { + timestamptz: (value: string) => new Date(value), + }, + columnMapper: snakeCamelMapper(), }, - (error) => { - if (!settled) { - settled = true - reject(error) - return - } - void this.recoverSync(error, `subscription error`) - } - ) - }) + } as any) + ) as any try { - await this.syncReadyPromise + await this.requireCollection().preload() } catch (error) { - await this.stopSync() + const collection = this.registrationsCollection + this.registrationsCollection = null + this.mode = `unstarted` + await collection?.cleanup?.() throw error } + this.startRegistrationEffect() } async stopSync(): Promise { - this.syncUnsubscribe?.() - this.syncUnsubscribe = null - this.syncAbortController?.abort() - this.syncAbortController = null - this.syncReadyPromise = null + const collection = this.registrationsCollection + await this.registrationsEffect?.dispose() + this.registrationsEffect = null + this.registrationsCollection = null + this.mode = `unstarted` + this.resetRuntimeState() + await collection?.cleanup?.() } - private async recoverSync( - error: unknown, - source: `shape stream error` | 
`subscription error` - ): Promise { - if (this.syncRecoveryPromise) { - return this.syncRecoveryPromise + private resetRuntimeState(): void { + for (const timer of this.debounceTimers.values()) { + clearTimeout(timer) } + this.debounceTimers.clear() + this.debounceBuffers.clear() + this.debounceRunStatus.clear() - const electricUrl = this.syncElectricUrl - if (!electricUrl) { - serverLog.error( - `[wake-registry] Electric sync failed (${source}):`, - error - ) - return + for (const timer of this.timeoutTimers.values()) { + clearTimeout(timer) } + this.timeoutTimers.clear() + this.timeoutDelivered.clear() + } - this.syncRecoveryPromise = (async () => { - serverLog.error( - `[wake-registry] Electric sync failed (${source}):`, - error - ) + private async allocateRegistrationId(): Promise { + if (this.mode !== `electric`) { + return this.allocateLocalId() + } - await this.stopSync() - await this.loadRegistrations() + for (let attempt = 0; attempt < 100; attempt++) { + const id = await this.allocateRuntimeId() + const existing = await this.rowsByPredicate((row) => row.id === id) + if (existing.length === 0) return id + } - try { - await this.startSync(electricUrl, this.syncElectricSecret) - serverLog.info(`[wake-registry] Electric sync recovered`) - } catch (recoveryError) { - serverLog.error( - `[wake-registry] Electric sync recovery failed:`, - recoveryError - ) - } finally { - this.syncRecoveryPromise = null - } - })() + throw new Error(`Failed to allocate unused wake registration id`) + } - return this.syncRecoveryPromise + private async allocateRuntimeId(): Promise { + const rows = await this.db.execute( + sql<{ + id: string + }>`select nextval('wake_registrations_id_seq')::text as id` + ) + const value = Array.isArray(rows) + ? rows[0]?.id + : ((rows as any).rows?.[0]?.id ?? 
(rows as any)[0]?.id) + const id = Number(value) + if (!Number.isInteger(id)) { + throw new Error(`Failed to allocate wake registration id`) + } + return id } - async register(reg: WakeRegistration): Promise { - const tenantId = this.resolveTenantId(reg.tenantId) - const result = await this.db - .insert(wakeRegistrations) - .values({ - tenantId, - subscriberUrl: reg.subscriberUrl, - sourceUrl: reg.sourceUrl, - condition: reg.condition, - debounceMs: reg.debounceMs ?? 0, - timeoutMs: reg.timeoutMs ?? 0, - oneShot: reg.oneShot, - includeResponse: reg.includeResponse !== false, - manifestKey: reg.manifestKey ?? null, - }) - .onConflictDoNothing() - .returning({ id: wakeRegistrations.id }) + private async persistInsert( + row: WakeRegistrationCollectionRow + ): Promise { + return await this.db.transaction(async (tx) => { + const rows = await tx + .insert(wakeRegistrations) + .values({ + id: row.id, + tenantId: row.tenantId, + subscriberUrl: row.subscriberUrl, + sourceUrl: row.sourceUrl, + condition: row.condition, + debounceMs: row.debounceMs, + timeoutMs: row.timeoutMs, + oneShot: row.oneShot, + timeoutConsumed: row.timeoutConsumed, + includeResponse: row.includeResponse, + manifestKey: row.manifestKey ?? 
``, + createdAt: row.createdAt, + }) + .onConflictDoUpdate({ + target: [ + wakeRegistrations.tenantId, + wakeRegistrations.subscriberUrl, + wakeRegistrations.sourceUrl, + wakeRegistrations.oneShot, + wakeRegistrations.debounceMs, + wakeRegistrations.timeoutMs, + wakeRegistrations.condition, + wakeRegistrations.manifestKey, + ], + set: { + id: row.id, + timeoutConsumed: row.timeoutConsumed, + includeResponse: row.includeResponse, + createdAt: sql`${wakeRegistrations.createdAt}`, + }, + }) + .returning({ + id: wakeRegistrations.id, + tenantId: wakeRegistrations.tenantId, + subscriberUrl: wakeRegistrations.subscriberUrl, + sourceUrl: wakeRegistrations.sourceUrl, + condition: wakeRegistrations.condition, + debounceMs: wakeRegistrations.debounceMs, + timeoutMs: wakeRegistrations.timeoutMs, + oneShot: wakeRegistrations.oneShot, + timeoutConsumed: wakeRegistrations.timeoutConsumed, + includeResponse: wakeRegistrations.includeResponse, + manifestKey: wakeRegistrations.manifestKey, + createdAt: wakeRegistrations.createdAt, + txid: sql`pg_current_xact_id()::xid::text`, + }) + const inserted = rows[0] + if (!inserted) throw new Error(`Wake registration insert returned no row`) + return { + txid: Number(inserted.txid), + row: { + id: inserted.id, + tenantId: inserted.tenantId, + subscriberUrl: inserted.subscriberUrl, + sourceUrl: inserted.sourceUrl, + condition: inserted.condition as WakeRegistration[`condition`], + debounceMs: inserted.debounceMs, + timeoutMs: inserted.timeoutMs, + oneShot: inserted.oneShot, + timeoutConsumed: inserted.timeoutConsumed, + includeResponse: inserted.includeResponse, + manifestKey: inserted.manifestKey, + createdAt: inserted.createdAt, + }, + } + }) + } - if (result.length === 0) { - // Another path (e.g. manifest-sync) may have created the row first. - // Refresh the cache so this process still sees the effective registration. 
- await this.loadRegistrations() - return + private async persistTimeoutConsumed( + row: WakeRegistrationCollectionRow + ): Promise { + return await this.db.transaction(async (tx) => { + const rows = await tx + .update(wakeRegistrations) + .set({ timeoutConsumed: row.timeoutConsumed }) + .where( + and( + eq(wakeRegistrations.tenantId, row.tenantId), + eq(wakeRegistrations.id, row.id) + ) + ) + .returning({ txid: sql`pg_current_xact_id()::xid::text` }) + return rows[0]?.txid === undefined ? undefined : Number(rows[0].txid) + }) + } + + private deletePredicate(input: DeleteRowsInput) { + switch (input.persist.kind) { + case `manifestKey`: + return and( + eq(wakeRegistrations.tenantId, input.persist.tenantId), + eq(wakeRegistrations.subscriberUrl, input.persist.subscriberUrl), + eq(wakeRegistrations.manifestKey, input.persist.manifestKey) + ) + case `subscriber`: + return and( + eq(wakeRegistrations.tenantId, input.persist.tenantId), + eq(wakeRegistrations.subscriberUrl, input.persist.subscriberUrl) + ) + case `source`: + return and( + eq(wakeRegistrations.tenantId, input.persist.tenantId), + eq(wakeRegistrations.sourceUrl, input.persist.sourceUrl) + ) + case `subscriberAndSource`: + return and( + eq(wakeRegistrations.tenantId, input.persist.tenantId), + eq(wakeRegistrations.subscriberUrl, input.persist.subscriberUrl), + eq(wakeRegistrations.sourceUrl, input.persist.sourceUrl) + ) + case `oneShot`: + return input.rows.length === 0 + ? 
undefined + : inArray( + wakeRegistrations.id, + input.rows.map((row) => row.id) + ) } + } - const dbId = result[0]!.id - this.upsertCachedRegistration({ - ...reg, - tenantId, - dbId, - createdAt: new Date(), - timeoutConsumed: false, + private async persistDeleteRows( + input: DeleteRowsInput + ): Promise { + const predicate = this.deletePredicate(input) + if (!predicate) return undefined + return await this.db.transaction(async (tx) => { + const deleted = await tx + .delete(wakeRegistrations) + .where(predicate) + .returning({ txid: sql`pg_current_xact_id()::xid::text` }) + return deleted[0]?.txid === undefined + ? undefined + : Number(deleted[0].txid) }) } - private startTimeoutTimer(reg: CachedWakeRegistration, dbId: number): void { - if (reg.timeoutMs == null || reg.timeoutMs <= 0) return - this.startTimeoutTimerWithDuration(reg, dbId, reg.timeoutMs) + private registrationRowsMatch( + row: WakeRegistrationCollectionRow, + other: WakeRegistrationCollectionRow + ): boolean { + return ( + row.tenantId === other.tenantId && + row.subscriberUrl === other.subscriberUrl && + row.sourceUrl === other.sourceUrl && + JSON.stringify(row.condition) === JSON.stringify(other.condition) && + row.debounceMs === other.debounceMs && + row.timeoutMs === other.timeoutMs && + row.oneShot === other.oneShot && + row.manifestKey === other.manifestKey + ) + } + + private registrationMatches( + row: WakeRegistrationCollectionRow, + reg: WakeRegistration, + tenantId: string + ): boolean { + return this.registrationRowsMatch( + row, + this.normalizeRegistration(reg, tenantId, row.id) + ) + } + + private async waitForRegistrationVisible( + row: WakeRegistrationCollectionRow, + timeoutMs = 10_000 + ): Promise { + const deadline = Date.now() + timeoutMs + do { + const rows = await this.rowsByPredicate((candidate) => + this.registrationRowsMatch(candidate, row) + ) + if (rows.length > 0) return + await new Promise((resolve) => setTimeout(resolve, 25)) + } while (Date.now() < deadline) + } + + 
async register(reg: WakeRegistration): Promise { + const tenantId = this.resolveTenantId(reg.tenantId) + if (this.registrationsCollection) { + const existing = await this.rowsByPredicate((row) => + this.registrationMatches(row, reg, tenantId) + ) + if (existing.length > 0) return + } + const id = await this.allocateRegistrationId() + const tx = this.registerAction( + this.normalizeRegistration(reg, tenantId, id) + ) + try { + await tx.isPersisted.promise + } catch (error) { + if (error instanceof WakeRegistrationConflictError) { + await this.waitForRegistrationVisible(error.row) + return + } + throw error + } + } + + private startTimeoutTimer(reg: WakeRegistrationCollectionRow): void { + if (reg.timeoutMs <= 0) return + this.startTimeoutTimerWithDuration(reg, reg.timeoutMs) } private async markTimeoutConsumed( dbId: number, tenantId: string ): Promise { + if (this.registrationsCollection) { + const row = await queryOnce((q) => + q + .from({ reg: this.requireCollection() }) + .where(({ reg }) => + dbAnd(dbEq(reg.tenantId, tenantId), dbEq(reg.id, dbId)) + ) + .findOne() + ) + const queried = row as + | { reg?: WakeRegistrationCollectionRow } + | WakeRegistrationCollectionRow + | undefined + const normalized = ( + queried && `reg` in queried ? 
queried.reg : queried + ) as WakeRegistrationCollectionRow | undefined + if (!normalized) return + const tx = this.markTimeoutConsumedAction({ row: normalized }) + try { + await tx.isPersisted.promise + } catch (error) { + if (error instanceof WakeRegistrationStaleError) return + throw error + } + return + } await this.db .update(wakeRegistrations) .set({ timeoutConsumed: true }) @@ -380,11 +791,42 @@ export class WakeRegistry { ) } + private async deleteRows(input: DeleteRowsInput): Promise { + if (this.registrationsCollection && input.rows.length > 0) { + const tx = this.deleteRowsAction(input) + await tx.isPersisted.promise + return + } + if (this.mode === `electric`) { + await this.persistDeleteRows(input) + } + } + async unregisterByManifestKey( subscriberUrl: string, manifestKey: string, tenantId?: string ): Promise { + if (this.registrationsCollection) { + const resolvedTenantId = this.resolveTenantId(tenantId) + const rows = await this.rowsByPredicate( + (row) => + row.tenantId === resolvedTenantId && + row.subscriberUrl === subscriberUrl && + row.manifestKey === manifestKey + ) + await this.deleteRows({ + rows, + persist: { + kind: `manifestKey`, + tenantId: resolvedTenantId, + subscriberUrl, + manifestKey, + }, + }) + return + } + const resolvedTenantId = this.resolveTenantId(tenantId) await this.db .delete(wakeRegistrations) @@ -395,28 +837,30 @@ export class WakeRegistry { eq(wakeRegistrations.manifestKey, manifestKey) ) ) - - const toRemove = Array.from(this.registrationCache.values()).flatMap( - (regs) => - regs - .filter( - (r) => - r.tenantId === resolvedTenantId && - r.subscriberUrl === subscriberUrl && - r.manifestKey === manifestKey - ) - .map((r) => r.dbId) - ) - - for (const dbId of toRemove) { - this.removeCachedRegistrationByDbId(dbId) - } } async unregisterBySubscriber( subscriberUrl: string, tenantId?: string ): Promise { + if (this.registrationsCollection) { + const resolvedTenantId = this.resolveTenantId(tenantId) + const rows = await 
this.rowsByPredicate( + (row) => + row.tenantId === resolvedTenantId && + row.subscriberUrl === subscriberUrl + ) + await this.deleteRows({ + rows, + persist: { + kind: `subscriber`, + tenantId: resolvedTenantId, + subscriberUrl, + }, + }) + return + } + const resolvedTenantId = this.resolveTenantId(tenantId) await this.db .delete(wakeRegistrations) @@ -426,26 +870,22 @@ export class WakeRegistry { eq(wakeRegistrations.subscriberUrl, subscriberUrl) ) ) - - const toRemove = Array.from(this.registrationCache.values()).flatMap( - (regs) => - regs - .filter( - (r) => - r.tenantId === resolvedTenantId && - r.subscriberUrl === subscriberUrl - ) - .map((r) => r.dbId) - ) - for (const dbId of toRemove) { - this.removeCachedRegistrationByDbId(dbId) - } } async unregisterBySource( sourceUrl: string, tenantId?: string ): Promise<void> { + if (this.registrationsCollection) { + const resolvedTenantId = this.resolveTenantId(tenantId) + const rows = await this.rowsForSource(resolvedTenantId, sourceUrl) + await this.deleteRows({ + rows, + persist: { kind: `source`, tenantId: resolvedTenantId, sourceUrl }, + }) + return + } + const resolvedTenantId = this.resolveTenantId(tenantId) await this.db .delete(wakeRegistrations) @@ -455,15 +895,6 @@ export class WakeRegistry { eq(wakeRegistrations.sourceUrl, sourceUrl) ) ) - - const key = this.cacheKey(resolvedTenantId, sourceUrl) - const regs = this.registrationCache.get(key) - if (regs) { - for (const reg of [...regs]) { - this.removeCachedRegistrationByDbId(reg.dbId) - } - this.registrationCache.delete(key) - } } async unregisterBySubscriberAndSource( @@ -471,6 +902,26 @@ export class WakeRegistry { sourceUrl: string, tenantId?: string ): Promise<void> { + if (this.registrationsCollection) { + const resolvedTenantId = this.resolveTenantId(tenantId) + const rows = await this.rowsByPredicate( + (row) => + row.tenantId === resolvedTenantId && + row.subscriberUrl === subscriberUrl && + row.sourceUrl === sourceUrl + ) + await this.deleteRows({ + rows, + 
persist: { + kind: `subscriberAndSource`, + tenantId: resolvedTenantId, + subscriberUrl, + sourceUrl, + }, + }) + return + } + const resolvedTenantId = this.resolveTenantId(tenantId) await this.db .delete(wakeRegistrations) @@ -481,62 +932,16 @@ export class WakeRegistry { eq(wakeRegistrations.sourceUrl, sourceUrl) ) ) - - const regs = this.registrationCache.get( - this.cacheKey(resolvedTenantId, sourceUrl) - ) - if (regs) { - const toRemove = regs - .filter( - (r) => - r.tenantId === resolvedTenantId && r.subscriberUrl === subscriberUrl - ) - .map((r) => r.dbId) - for (const dbId of toRemove) { - this.removeCachedRegistrationByDbId(dbId) - } - } - } - - async loadRegistrations(): Promise<void> { - const rows = - this.tenantId === null - ? await this.db.select().from(wakeRegistrations) - : await this.db - .select() - .from(wakeRegistrations) - .where(eq(wakeRegistrations.tenantId, this.tenantId)) - - this.resetCachedRegistrations() - - for (const row of rows) { - const reg: CachedWakeRegistration = { - tenantId: row.tenantId, - subscriberUrl: row.subscriberUrl, - sourceUrl: row.sourceUrl, - condition: row.condition as WakeRegistration[`condition`], - debounceMs: row.debounceMs || undefined, - timeoutMs: row.timeoutMs || undefined, - oneShot: row.oneShot, - includeResponse: row.includeResponse === false ? false : undefined, - manifestKey: row.manifestKey ?? 
undefined, - dbId: row.id, - createdAt: row.createdAt, - timeoutConsumed: row.timeoutConsumed, - } - this.upsertCachedRegistration(reg) - } } private startTimeoutTimerWithDuration( - reg: CachedWakeRegistration, - dbId: number, + reg: WakeRegistrationCollectionRow, durationMs: number ): void { const timerKey = this.registrationKey(reg) const timer = setTimeout(() => { this.timeoutTimers.delete(timerKey) - this.deliverTimeoutForRegistration(reg, dbId) + this.deliverTimeoutForRegistration(reg) }, durationMs) this.timeoutTimers.set(timerKey, timer) } @@ -559,88 +964,17 @@ export class WakeRegistry { } } - private clearRegistrationState(reg: CachedWakeRegistration): void { + private clearRegistrationState(reg: WakeRegistrationCollectionRow): void { const timerKey = this.registrationKey(reg) this.clearDebounceState(timerKey) this.clearTimeoutState(timerKey) } - private resetCachedRegistrations(): void { - for (const timer of this.debounceTimers.values()) { - clearTimeout(timer) - } - this.debounceTimers.clear() - this.debounceBuffers.clear() - this.debounceRunStatus.clear() - - for (const timer of this.timeoutTimers.values()) { - clearTimeout(timer) - } - this.timeoutTimers.clear() - this.registrationCache.clear() - } - - private findCachedRegistration( - dbId: number - ): { cacheKey: string; index: number; reg: CachedWakeRegistration } | null { - for (const [cacheKey, regs] of this.registrationCache) { - const index = regs.findIndex((reg) => reg.dbId === dbId) - if (index >= 0) { - return { - cacheKey, - index, - reg: regs[index]!, - } - } - } - - return null - } - - private upsertCachedRegistration(reg: CachedWakeRegistration): void { - const existing = this.findCachedRegistration(reg.dbId) - const nextKey = this.registrationKey(reg) - - if (existing) { - const previousKey = this.registrationKey(existing.reg) - const regs = this.registrationCache.get(existing.cacheKey) - if (regs) { - regs.splice(existing.index, 1) - if (regs.length === 0) { - 
this.registrationCache.delete(existing.cacheKey) - } - } - if (previousKey !== nextKey) { - this.clearRegistrationState(existing.reg) - } - } - - const cacheKey = this.cacheKey(reg.tenantId, reg.sourceUrl) - const cached = this.registrationCache.get(cacheKey) ?? [] - cached.push(reg) - this.registrationCache.set(cacheKey, cached) - this.syncTimeoutTimer(reg) - } - - private removeCachedRegistrationByDbId(dbId: number): void { - const existing = this.findCachedRegistration(dbId) - if (!existing) return - - this.clearRegistrationState(existing.reg) - this.timeoutDelivered.delete(dbId) - - const regs = this.registrationCache.get(existing.cacheKey) - if (!regs) return - regs.splice(existing.index, 1) - if (regs.length === 0) { - this.registrationCache.delete(existing.cacheKey) - } - } - - private syncTimeoutTimer(reg: CachedWakeRegistration): void { + private syncTimeoutTimer(registration: WakeRegistrationCollectionRow): void { + const reg = registration const timerKey = this.registrationKey(reg) - if (reg.timeoutConsumed || reg.timeoutMs == null || reg.timeoutMs <= 0) { + if (reg.timeoutConsumed || reg.timeoutMs <= 0) { this.clearTimeoutState(timerKey) return } @@ -649,52 +983,60 @@ export class WakeRegistry { return } - if (!reg.createdAt) { - this.startTimeoutTimer(reg, reg.dbId) - return - } - const remaining = reg.createdAt.getTime() + reg.timeoutMs - Date.now() if (remaining > 0) { - this.startTimeoutTimerWithDuration(reg, reg.dbId, remaining) + this.startTimeoutTimerWithDuration(reg, remaining) return } - if (this.timeoutDelivered.has(reg.dbId)) { + if (this.timeoutDelivered.has(reg.id)) { return } - this.deliverTimeoutForRegistration(reg, reg.dbId) + this.deliverTimeoutForRegistration(reg) } private deliverTimeoutForRegistration( - reg: CachedWakeRegistration, - dbId: number + reg: WakeRegistrationCollectionRow ): void { - if (this.deliverTimeout(this.timeoutWakeResult(reg, dbId))) { - this.timeoutDelivered.add(dbId) - void this.markTimeoutConsumed(dbId, 
reg.tenantId) + if (this.deliverTimeout(this.timeoutWakeResult(reg))) { + this.timeoutDelivered.add(reg.id) + void this.markTimeoutConsumed(reg.id, reg.tenantId).catch((error) => { + serverLog.warn( + `[wake-registry] failed to mark timeout consumed for registration ${reg.id} (${reg.tenantId}, ${reg.sourceUrl} -> ${reg.subscriberUrl}):`, + error + ) + }) } } private syncTenantTimeoutTimers(tenantId: string): void { - for (const regs of this.registrationCache.values()) { - for (const reg of regs) { - if (reg.tenantId === tenantId) { - this.syncTimeoutTimer(reg) - } - } + void this.syncTenantCollectionTimeoutTimers(tenantId) + } + + private async syncTenantCollectionTimeoutTimers( + tenantId: string + ): Promise<void> { + if (!this.registrationsCollection) return + const rows = await queryOnce((q) => + q + .from({ reg: this.requireCollection() }) + .where(({ reg }) => dbEq(reg.tenantId, tenantId)) + ) + for (const queriedRow of rows) { + const row = ((queriedRow as { reg?: WakeRegistrationCollectionRow }) + .reg ?? queriedRow) as WakeRegistrationCollectionRow + this.syncTimeoutTimer(row) } } private timeoutWakeResult( - reg: CachedWakeRegistration, - dbId: number + reg: WakeRegistrationCollectionRow ): WakeEvalResult { return { tenantId: reg.tenantId, subscriberUrl: reg.subscriberUrl, - registrationDbId: dbId, + registrationDbId: reg.id, sourceEventKey: `timeout`, wakeMessage: { source: reg.sourceUrl, @@ -704,79 +1046,33 @@ export class WakeRegistry { } } - private normalizeShapeRow( - row: WakeRegistrationShapeRow - ): CachedWakeRegistration { - return { - tenantId: - (row as { tenant_id?: string }).tenant_id ?? this.resolveTenantId(), - subscriberUrl: row.subscriber_url, - sourceUrl: row.source_url, - condition: row.condition, - debounceMs: row.debounce_ms || undefined, - timeoutMs: row.timeout_ms || undefined, - oneShot: row.one_shot, - includeResponse: row.include_response === false ? false : undefined, - manifestKey: row.manifest_key ?? 
undefined, - dbId: row.id, - createdAt: row.created_at, - timeoutConsumed: row.timeout_consumed, - } - } - - private async applyShapeMessage( - message: Message - ): Promise { - if (isControlMessage(message)) { - if (message.headers.control === `must-refetch`) { - this.resetCachedRegistrations() - } - return - } - - if (!isChangeMessage(message)) { - return - } - - if (message.headers.operation === `delete`) { - // Shape keys are protocol-level identifiers and are not guaranteed to be - // the table primary key. The wake_registrations shape uses - // `replica: full`, so deletes should carry the deleted row in old_value; - // use that row id to remove the matching in-memory registration. If the - // id is unavailable, reset the cache so we fail closed rather than - // keeping a stale wake registration alive. - const oldValue = ( - message as unknown as { - old_value?: { id?: unknown } - } - ).old_value - const oldId = Number(oldValue?.id) - if (Number.isFinite(oldId)) { - this.removeCachedRegistrationByDbId(oldId) - } else { - this.resetCachedRegistrations() - } - return - } - - this.upsertCachedRegistration(this.normalizeShapeRow(message.value)) - } - - evaluate( + async evaluate( sourceUrl: string, event: Record, tenantId?: string - ): Array { + ): Promise> { const resolvedTenantId = this.resolveTenantId(tenantId) - const cacheKey = this.cacheKey(resolvedTenantId, sourceUrl) - const regs = this.registrationCache.get(cacheKey) - if (!regs || regs.length === 0) return [] + const queriedRegs = await queryOnce((q) => + q + .from({ reg: this.requireCollection() }) + .where(({ reg }) => + dbAnd( + dbEq(reg.tenantId, resolvedTenantId), + dbEq(reg.sourceUrl, sourceUrl) + ) + ) + ) + const regs = queriedRegs.map( + (queriedReg) => + ((queriedReg as { reg?: WakeRegistrationCollectionRow }).reg ?? 
+ queriedReg) as WakeRegistrationCollectionRow + ) + if (regs.length === 0) return [] const results: Array = [] - const toRemove: Array = [] + const oneShotRows: Array = [] - for (let i = 0; i < regs.length; i++) { - const reg = regs[i]! + for (const reg of regs) { const match = this.matchCondition(reg, event) if (!match) continue @@ -785,15 +1081,19 @@ export class WakeRegistry { if (timeoutTimer) { clearTimeout(timeoutTimer) this.timeoutTimers.delete(timerKey) - void this.markTimeoutConsumed(reg.dbId, reg.tenantId) + void this.markTimeoutConsumed(reg.id, reg.tenantId).catch((error) => { + console.warn( + `[wake-registry] failed to persist timeout consumption:`, + error + ) + }) } - if (reg.debounceMs != null && reg.debounceMs > 0) { + if (reg.debounceMs > 0) { const buffer = this.debounceBuffers.get(timerKey) ?? [] buffer.push(match.change) this.debounceBuffers.set(timerKey, buffer) - // Preserve the latest runFinished status for debounced delivery if (match.runFinishedStatus) { this.debounceRunStatus.set(timerKey, match.runFinishedStatus) } @@ -811,7 +1111,7 @@ export class WakeRegistry { this.deliverDebounce({ tenantId: reg.tenantId, subscriberUrl: reg.subscriberUrl, - registrationDbId: reg.dbId, + registrationDbId: reg.id, sourceEventKey: flushed[flushed.length - 1]!.key, wakeMessage: { source: sourceUrl, @@ -828,7 +1128,7 @@ export class WakeRegistry { results.push({ tenantId: reg.tenantId, subscriberUrl: reg.subscriberUrl, - registrationDbId: reg.dbId, + registrationDbId: reg.id, sourceEventKey: wakeSourceEventId(event), wakeMessage: { source: sourceUrl, @@ -841,27 +1141,21 @@ export class WakeRegistry { } if (reg.oneShot) { - toRemove.push(i) + oneShotRows.push(reg) } } - for (let j = toRemove.length - 1; j >= 0; j--) { - const removed = regs.splice(toRemove[j]!, 1) - if (removed[0]) { - this.clearRegistrationState(removed[0]) - this.timeoutDelivered.delete(removed[0].dbId) - void this.db - .delete(wakeRegistrations) - .where( - and( - 
eq(wakeRegistrations.tenantId, removed[0].tenantId), - eq(wakeRegistrations.id, removed[0].dbId) - ) - ) - } - } - if (regs.length === 0) { - this.registrationCache.delete(cacheKey) + if (oneShotRows.length > 0) { + const tx = this.deleteRowsAction({ + rows: oneShotRows, + persist: { kind: `oneShot` }, + }) + void tx.isPersisted.promise.catch((error) => { + console.warn( + `[wake-registry] failed to persist one-shot cleanup:`, + error + ) + }) } return results @@ -906,7 +1200,7 @@ export class WakeRegistry { } private matchCondition( - reg: WakeRegistration, + reg: WakeRegistration | WakeRegistrationCollectionRow, event: Record ): { change: WakeEvalResult[`wakeMessage`][`changes`][number] diff --git a/packages/agents-server/test/horton-pull-wake-e2e.test.ts b/packages/agents-server/test/horton-pull-wake-e2e.test.ts index 17bfd5ea79..66640a02ae 100644 --- a/packages/agents-server/test/horton-pull-wake-e2e.test.ts +++ b/packages/agents-server/test/horton-pull-wake-e2e.test.ts @@ -10,6 +10,7 @@ import { waitFor, } from './test-utils' import { + TEST_ELECTRIC_URL, TEST_POSTGRES_URL, resetElectricAgentsTestBackend, } from './test-backend' @@ -266,7 +267,7 @@ describe(`pull-wake Horton e2e with mocked LLM`, () => { durableStreamsUrl: durableStreamTestServerUrl(dsServer.url), port: 0, postgresUrl: TEST_POSTGRES_URL, - electricUrl: undefined, + electricUrl: TEST_ELECTRIC_URL, authenticateRequest: (req) => req.headers.get(`authorization`) === authHeaders.authorization ? 
testPrincipal @@ -276,6 +277,7 @@ describe(`pull-wake Horton e2e with mocked LLM`, () => { streamBaseUrl = electricAgentsServer.streamClient.baseUrl builtinAgentsServer = new BuiltinAgentsServer({ agentServerUrl: baseUrl, + durableStreamsFetchCache: false, mockStreamFn, pullWake: { runnerId, diff --git a/packages/agents-server/test/host.test.ts b/packages/agents-server/test/host.test.ts index 3ac40d548a..903b62a90f 100644 --- a/packages/agents-server/test/host.test.ts +++ b/packages/agents-server/test/host.test.ts @@ -86,8 +86,11 @@ describe(`AgentsHost`, () => { const host = new AgentsHost({ db: createMockDb(), pgClient: vi.fn() as any, + electricUrl: `http://electric.test`, }) + vi.spyOn(host.wakeRegistry, `startSync`).mockResolvedValue(undefined) + const runtime = await host.registerTenant({ serviceId: `svc-before-start`, durableStreamsUrl: `https://streams.test/v1/streams/svc-before-start`, @@ -98,6 +101,8 @@ describe(`AgentsHost`, () => { const loadTenantBridges = vi .spyOn(host.entityProjector, `loadTenantBridges`) .mockResolvedValue(undefined) + vi.spyOn(host.entityProjector, `start`).mockResolvedValue(undefined) + vi.spyOn(host.entityProjector, `stop`).mockResolvedValue(undefined) vi.spyOn(host.scheduler, `start`).mockResolvedValue(undefined) vi.spyOn(host.scheduler, `stop`).mockResolvedValue(undefined) vi.spyOn(host.tagStreamOutboxDrainer, `start`).mockImplementation(() => {}) diff --git a/packages/agents-server/test/pg-sync-wake-delivery.test.ts b/packages/agents-server/test/pg-sync-wake-delivery.test.ts index 5a0b70d60a..7a0559abd1 100644 --- a/packages/agents-server/test/pg-sync-wake-delivery.test.ts +++ b/packages/agents-server/test/pg-sync-wake-delivery.test.ts @@ -31,6 +31,7 @@ function event(operation: `insert` | `update` | `delete`, key = operation) { describe(`pgSync wake delivery matching`, () => { it(`insert wakes insert subscriber and delete does not wake insert-only subscriber`, async () => { const registry = new WakeRegistry(createDb() as 
any, `default`) + await registry.startLocalForTests() await registry.register({ subscriberUrl: `/horton/a`, sourceUrl: `/_electric/pg-sync/test`, @@ -38,7 +39,7 @@ describe(`pgSync wake delivery matching`, () => { oneShot: false, }) - const insertResults = registry.evaluate( + const insertResults = await registry.evaluate( `/_electric/pg-sync/test`, event(`insert`), `default` @@ -49,12 +50,17 @@ describe(`pgSync wake delivery matching`, () => { oldValue: { id: `entity-1`, status: `spawning` }, }) expect( - registry.evaluate(`/_electric/pg-sync/test`, event(`delete`), `default`) + await registry.evaluate( + `/_electric/pg-sync/test`, + event(`delete`), + `default` + ) ).toEqual([]) }) it(`splits two subscribers on the same source by operation`, async () => { const registry = new WakeRegistry(createDb() as any, `default`) + await registry.startLocalForTests() await registry.register({ subscriberUrl: `/horton/a`, sourceUrl: `/_electric/pg-sync/test`, @@ -69,19 +75,28 @@ describe(`pgSync wake delivery matching`, () => { }) expect( - registry - .evaluate(`/_electric/pg-sync/test`, event(`insert`), `default`) - .map((r) => r.subscriberUrl) + ( + await registry.evaluate( + `/_electric/pg-sync/test`, + event(`insert`), + `default` + ) + ).map((r) => r.subscriberUrl) ).toEqual([`/horton/a`]) expect( - registry - .evaluate(`/_electric/pg-sync/test`, event(`delete`), `default`) - .map((r) => r.subscriberUrl) + ( + await registry.evaluate( + `/_electric/pg-sync/test`, + event(`delete`), + `default` + ) + ).map((r) => r.subscriberUrl) ).toEqual([`/horton/b`]) }) it(`filters pgSync events by collection`, async () => { const registry = new WakeRegistry(createDb() as any, `default`) + await registry.startLocalForTests() await registry.register({ subscriberUrl: `/horton/a`, sourceUrl: `/_electric/pg-sync/test`, @@ -90,10 +105,14 @@ describe(`pgSync wake delivery matching`, () => { }) expect( - registry.evaluate(`/_electric/pg-sync/test`, event(`insert`), `default`) + await 
registry.evaluate( + `/_electric/pg-sync/test`, + event(`insert`), + `default` + ) ).toHaveLength(1) expect( - registry.evaluate( + await registry.evaluate( `/_electric/pg-sync/test`, { ...event(`insert`), type: `other` }, `default` diff --git a/packages/agents-server/test/scheduler-integration.test.ts b/packages/agents-server/test/scheduler-integration.test.ts index af5ba79fbb..f1f4be9e38 100644 --- a/packages/agents-server/test/scheduler-integration.test.ts +++ b/packages/agents-server/test/scheduler-integration.test.ts @@ -1,6 +1,10 @@ +import { execFile } from 'node:child_process' +import { promisify } from 'node:util' +import postgres from 'postgres' import { afterAll, beforeAll, describe, expect, it } from 'vitest' import { getCronStreamPath } from '@electric-ax/agents-runtime' import { DurableStreamTestServer } from '@durable-streams/server' +import { runMigrations } from '../src/db' import { ElectricAgentsServer } from '../src/server' import { durableStreamTestServerUrl, @@ -8,10 +12,81 @@ import { waitFor, } from './test-utils' import { - TEST_ELECTRIC_URL, - TEST_POSTGRES_URL, - resetElectricAgentsTestBackend, -} from './test-backend' + ELECTRIC_AGENTS_COMPOSE_FILE, + getElectricAgentsComposeProject, + getElectricAgentsDevPorts, +} from './electric-agents-compose-utils' + +const execFileAsync = promisify(execFile) +const { postgresPort, electricPort } = getElectricAgentsDevPorts() +const TEST_POSTGRES_PORT = postgresPort + 20 +const TEST_ELECTRIC_PORT = electricPort + 20 +const TEST_COMPOSE_PROJECT = `${getElectricAgentsComposeProject()}-agent-server-scheduler` +const TEST_POSTGRES_URL = `postgres://electric_agents:electric_agents@localhost:${TEST_POSTGRES_PORT}/electric_agents` +const TEST_ELECTRIC_URL = `http://localhost:${TEST_ELECTRIC_PORT}` +const TEST_BACKEND_ENV = { + ...process.env, + ELECTRIC_AGENTS_COMPOSE_PROJECT: TEST_COMPOSE_PROJECT, + PG_HOST_PORT: String(TEST_POSTGRES_PORT), + ELECTRIC_HOST_PORT: String(TEST_ELECTRIC_PORT), + 
JAEGER_UI_PORT: `0`, + JAEGER_OTLP_HTTP_PORT: `0`, + JAEGER_OTLP_GRPC_PORT: `0`, + DATABASE_URL: TEST_POSTGRES_URL, + ELECTRIC_URL: TEST_ELECTRIC_URL, + ELECTRIC_AGENTS_TEST_BACKEND_MANAGED: `1`, +} + +async function startTestBackend(): Promise { + await execFileAsync( + `docker`, + [ + `compose`, + `-p`, + TEST_COMPOSE_PROJECT, + `-f`, + ELECTRIC_AGENTS_COMPOSE_FILE, + `up`, + `-d`, + `--wait`, + ], + { env: TEST_BACKEND_ENV } + ) +} + +async function stopTestBackend(): Promise { + await execFileAsync( + `docker`, + [ + `compose`, + `-p`, + TEST_COMPOSE_PROJECT, + `-f`, + ELECTRIC_AGENTS_COMPOSE_FILE, + `down`, + `-v`, + ], + { env: TEST_BACKEND_ENV } + ) +} + +async function resetTestBackend(): Promise { + await startTestBackend() + const pg = postgres(TEST_POSTGRES_URL, { + max: 1, + onnotice: () => {}, + }) + try { + await pg.unsafe(` + DROP SCHEMA IF EXISTS drizzle CASCADE; + DROP SCHEMA IF EXISTS public CASCADE; + CREATE SCHEMA public AUTHORIZATION CURRENT_USER; + `) + } finally { + await pg.end() + } + await runMigrations(TEST_POSTGRES_URL) +} describe(`Scheduler Integration`, () => { let dsServer: DurableStreamTestServer @@ -90,12 +165,16 @@ describe(`Scheduler Integration`, () => { longPollTimeout: 500, webhooks: true, }) - await Promise.all([resetElectricAgentsTestBackend(), dsServer.start()]) + await Promise.all([resetTestBackend(), dsServer.start()]) await startElectricAgentsServer() }, 120_000) afterAll(async () => { - await Promise.allSettled([stopElectricAgentsServer(), dsServer.stop()]) + await Promise.allSettled([ + stopElectricAgentsServer(), + dsServer.stop(), + stopTestBackend(), + ]) }, 120_000) it(`delayed_send survives server restart and lands exactly once`, async () => { @@ -289,7 +368,7 @@ describe(`Scheduler Integration`, () => { }), } ) - expect(scheduleRes.status).toBe(200) + expect(scheduleRes.status, await scheduleRes.clone().text()).toBe(200) await waitFor( async () => { diff --git a/packages/agents-server/test/server-start.test.ts 
b/packages/agents-server/test/server-start.test.ts index 20532f2819..067969bb55 100644 --- a/packages/agents-server/test/server-start.test.ts +++ b/packages/agents-server/test/server-start.test.ts @@ -147,9 +147,6 @@ vi.mock(`../src/wake-registry`, () => ({ stopSync(): Promise { return Promise.resolve() } - loadRegistrations(): Promise { - return Promise.resolve() - } }, })) @@ -279,6 +276,19 @@ describe(`ElectricAgentsServer.start`, () => { } }) + it(`fails host startup without Electric URL for wake registry sync`, async () => { + server = new ElectricAgentsServer({ + durableStreamsUrl: `http://durable.test`, + port: 0, + postgresUrl: TEST_POSTGRES_URL, + electricUrl: undefined, + }) + + await expect(server.start()).rejects.toThrow( + `WakeRegistry runtime requires an Electric URL` + ) + }) + it(`rejects startup and cleans up when scheduler startup fails`, async () => { schedulerStartMock.mockRejectedValueOnce(new Error(`scheduler exploded`)) @@ -286,6 +296,7 @@ describe(`ElectricAgentsServer.start`, () => { durableStreamsUrl: `http://durable.test`, port: 0, postgresUrl: TEST_POSTGRES_URL, + electricUrl: `http://electric.test`, }) await expect(server.start()).rejects.toThrow(`scheduler exploded`) @@ -313,6 +324,7 @@ describe(`ElectricAgentsServer.start`, () => { durableStreamsUrl: `http://durable.test`, port: 0, postgresUrl: `postgres://electric_agents:electric_agents@localhost:5432/electric_agents`, + electricUrl: `http://electric.test`, }) await expect(server.start()).resolves.toMatch(/^http:\/\//) @@ -354,6 +366,7 @@ describe(`ElectricAgentsServer.start`, () => { durableStreamsUrl: `http://durable.test`, port: 0, postgresUrl: `postgres://electric_agents:electric_agents@localhost:5432/electric_agents`, + electricUrl: `http://electric.test`, }) await expect(server.start()).resolves.toMatch(/^http:\/\//) @@ -376,6 +389,7 @@ describe(`ElectricAgentsServer.start`, () => { mockStreamFn: streamFn as any, port: 0, postgresUrl: TEST_POSTGRES_URL, + electricUrl: 
`http://electric.test`, }) await expect(server.start()).resolves.toMatch(/^http:\/\//) @@ -399,6 +413,7 @@ describe(`ElectricAgentsServer.start`, () => { mockStreamFn: streamFn as any, port: 0, postgresUrl: TEST_POSTGRES_URL, + electricUrl: `http://electric.test`, }) await expect(server.start()).resolves.toMatch(/^http:\/\//) diff --git a/packages/agents-server/test/wake-registry-sync.test.ts b/packages/agents-server/test/wake-registry-sync.test.ts index 1e60ac71a5..79dd27fa58 100644 --- a/packages/agents-server/test/wake-registry-sync.test.ts +++ b/packages/agents-server/test/wake-registry-sync.test.ts @@ -1,155 +1,141 @@ -import { describe, expect, it, vi } from 'vitest' +import { execFile } from 'node:child_process' +import { randomUUID } from 'node:crypto' +import { promisify } from 'node:util' +import postgres from 'postgres' +import { afterAll, beforeAll, describe, expect, it } from 'vitest' +import { eq } from 'drizzle-orm' +import { createDb, runMigrations } from '../src/db' +import { wakeRegistrations } from '../src/db/schema' import { WakeRegistry } from '../src/wake-registry' +import { + ELECTRIC_AGENTS_COMPOSE_FILE, + getElectricAgentsComposeProject, + getElectricAgentsDevPorts, +} from './electric-agents-compose-utils' + +const execFileAsync = promisify(execFile) +const { postgresPort, electricPort } = getElectricAgentsDevPorts() +const TEST_POSTGRES_PORT = postgresPort + 40 +const TEST_ELECTRIC_PORT = electricPort + 40 +const TEST_COMPOSE_PROJECT = `${getElectricAgentsComposeProject()}-wake-registry-sync` +const TEST_POSTGRES_URL = `postgres://electric_agents:electric_agents@localhost:${TEST_POSTGRES_PORT}/electric_agents` +const TEST_ELECTRIC_URL = `http://localhost:${TEST_ELECTRIC_PORT}` +const TEST_BACKEND_ENV = { + ...process.env, + ELECTRIC_AGENTS_COMPOSE_PROJECT: TEST_COMPOSE_PROJECT, + PG_HOST_PORT: String(TEST_POSTGRES_PORT), + ELECTRIC_HOST_PORT: String(TEST_ELECTRIC_PORT), + JAEGER_UI_PORT: `0`, + JAEGER_OTLP_HTTP_PORT: `0`, + 
JAEGER_OTLP_GRPC_PORT: `0`, + DATABASE_URL: TEST_POSTGRES_URL, + ELECTRIC_URL: TEST_ELECTRIC_URL, + ELECTRIC_AGENTS_TEST_BACKEND_MANAGED: `1`, +} -const { shapeStreamState } = vi.hoisted(() => ({ - shapeStreamState: { - latest: null as null | { - emit: (messages: Array>) => Promise - signal?: AbortSignal - }, - }, -})) - -vi.mock(`@electric-sql/client`, () => ({ - isControlMessage: (message: { headers?: Record }) => - typeof message.headers?.control === `string`, - isChangeMessage: (message: { headers?: Record }) => - typeof message.headers?.operation === `string`, - ShapeStream: class MockShapeStream { - private onMessages: - | ((messages: Array>) => Promise | void) - | null = null - - constructor(options: { signal?: AbortSignal }) { - shapeStreamState.latest = { - signal: options.signal, - emit: async (messages) => { - await this.onMessages?.(messages) - }, - } - } +async function startTestBackend(): Promise { + await execFileAsync( + `docker`, + [ + `compose`, + `-p`, + TEST_COMPOSE_PROJECT, + `-f`, + ELECTRIC_AGENTS_COMPOSE_FILE, + `up`, + `-d`, + `--wait`, + ], + { env: TEST_BACKEND_ENV } + ) +} - subscribe( - callback: (messages: Array>) => Promise, - _onError?: (error: Error) => void - ): () => void { - this.onMessages = callback - return () => { - this.onMessages = null - } - } - }, -})) +async function stopTestBackend(): Promise { + await execFileAsync( + `docker`, + [ + `compose`, + `-p`, + TEST_COMPOSE_PROJECT, + `-f`, + ELECTRIC_AGENTS_COMPOSE_FILE, + `down`, + `-v`, + ], + { env: TEST_BACKEND_ENV } + ) +} -function createMockDb(): any { - return { - insert: () => ({ - values: () => ({ - onConflictDoNothing: () => ({ - returning: () => Promise.resolve([{ id: 1 }]), - }), - }), - }), - delete: () => ({ - where: () => Promise.resolve(), - }), - update: () => ({ - set: () => ({ - where: () => Promise.resolve(), - }), - }), - select: () => ({ - from: () => Promise.resolve([]), - }), +async function resetTestBackend(): Promise { + await startTestBackend() + 
const pg = postgres(TEST_POSTGRES_URL, { max: 1, onnotice: () => {} }) + try { + await pg.unsafe(` + DROP SCHEMA IF EXISTS drizzle CASCADE; + DROP SCHEMA IF EXISTS public CASCADE; + CREATE SCHEMA public AUTHORIZATION CURRENT_USER; + `) + } finally { + await pg.end() } + await runMigrations(TEST_POSTGRES_URL) } -describe(`WakeRegistry Electric sync`, () => { - it(`ignores malformed shape messages without headers while waiting for up-to-date`, async () => { - const registry = new WakeRegistry(createMockDb()) - - const startPromise = registry.startSync(`http://electric.test`) - - await expect( - shapeStreamState.latest!.emit([ - { - key: `ignored-malformed-message`, - }, - { - headers: { - control: `up-to-date`, - }, - }, - ]) - ).resolves.toBeUndefined() - - await expect(startPromise).resolves.toBeUndefined() - - await registry.stopSync() - }) - - it(`hydrates and updates the cache from shape changes`, async () => { - const registry = new WakeRegistry(createMockDb()) - - const startPromise = registry.startSync(`http://electric.test`) - - await shapeStreamState.latest!.emit([ - { - key: `1`, - value: { - id: 1, - subscriber_url: `/parent/p1`, - source_url: `/child/c1`, - condition: `runFinished`, - debounce_ms: 0, - timeout_ms: 0, - one_shot: false, - timeout_consumed: false, - include_response: true, - manifest_key: null, - created_at: new Date(), - }, - headers: { - operation: `insert`, - }, - }, - { - headers: { - control: `up-to-date`, - }, - }, - ]) - - await startPromise - - expect( - registry.evaluate(`/child/c1`, { - type: `run`, - key: `run-1`, - value: { status: `completed` }, - headers: { operation: `update` }, +type DbConnection = ReturnType + +let connection: DbConnection +let db: DbConnection[`db`] + +describe(`WakeRegistry Electric collection sync`, () => { + beforeAll(async () => { + await resetTestBackend() + connection = createDb(TEST_POSTGRES_URL) + db = connection.db + }, 120_000) + + afterAll(async () => { + await 
Promise.allSettled([connection?.client.end(), stopTestBackend()]) + }, 120_000) + + it(`syncs wake rows from Postgres through Electric`, async () => { + const suffix = randomUUID() + const subscriberUrl = `/parent/sync-${suffix}` + const sourceUrl = `/child/sync-${suffix}` + const registry = new WakeRegistry(db as any) + await registry.startSync(TEST_ELECTRIC_URL) + + const rows = await db + .insert(wakeRegistrations) + .values({ + subscriberUrl, + sourceUrl, + condition: `runFinished`, + oneShot: false, }) - ).toHaveLength(1) - - await shapeStreamState.latest!.emit([ - { - key: `1`, - headers: { - operation: `delete`, - }, - }, - ]) + .returning() - expect( - registry.evaluate(`/child/c1`, { + try { + let results: Awaited> = [] + const event = { type: `run`, - key: `run-2`, + key: `run-1`, value: { status: `completed` }, headers: { operation: `update` }, - }) - ).toHaveLength(0) - - await registry.stopSync() - expect(shapeStreamState.latest!.signal?.aborted).toBe(true) - }) + } + const deadline = Date.now() + 15_000 + do { + results = await registry.evaluate(sourceUrl, event) + if (results.length > 0) break + await new Promise((resolve) => setTimeout(resolve, 100)) + } while (Date.now() < deadline) + + expect(results).toHaveLength(1) + expect(results[0]!.registrationDbId).toBe(rows[0]!.id) + } finally { + await registry.stopSync() + await db + .delete(wakeRegistrations) + .where(eq(wakeRegistrations.sourceUrl, sourceUrl)) + } + }, 25_000) }) diff --git a/packages/agents-server/test/wake-registry.test.ts b/packages/agents-server/test/wake-registry.test.ts index c879a124ca..28d4eae148 100644 --- a/packages/agents-server/test/wake-registry.test.ts +++ b/packages/agents-server/test/wake-registry.test.ts @@ -29,6 +29,14 @@ import { import type { Server } from 'node:http' import type { WakeEvalResult } from '../src/wake-registry' +async function createLocalRegistry( + tenantId?: string | null +): Promise { + const registry = new WakeRegistry(createMockDb(), tenantId) + 
await registry.startLocalForTests() + return registry +} + let nextDbId = 1 function createMockDb(): any { return { @@ -61,8 +69,237 @@ function createMockDb(): any { } describe(`Wake Registry`, () => { + it(`evaluates registrations from local TanStack DB collection`, async () => { + const registry = await createLocalRegistry() + + await registry.register({ + subscriberUrl: `/parent/p1`, + sourceUrl: `/child/c1`, + condition: `runFinished`, + oneShot: false, + }) + + const results = await registry.evaluate(`/child/c1`, { + type: `run`, + key: `run-1`, + value: { status: `completed` }, + headers: { operation: `update` }, + }) + + expect(results).toHaveLength(1) + expect(results[0]!.subscriberUrl).toBe(`/parent/p1`) + expect(results[0]!.registrationDbId).toBe(1) + expect(results[0]!.sourceEventKey).toBe(`update:run-1`) + }) + + it(`reconciles invisible insert conflicts to the locally allocated database row`, async () => { + const registry = await createLocalRegistry() + ;(registry as any).mode = `electric` + ;(registry as any).allocateRuntimeId = async () => 42 + ;(registry as any).persistInsert = async (row: any) => ({ + txid: 123, + row, + }) + const timeout = new Error(`timed out`) + timeout.name = `TimeoutWaitingForTxIdError` + ;(registry as any).requireCollection().utils.awaitTxId = async () => { + throw timeout + } + + await registry.register({ + subscriberUrl: `/parent/p1`, + sourceUrl: `/child/conflict`, + condition: `runFinished`, + oneShot: false, + }) + + const results = await registry.evaluate(`/child/conflict`, { + type: `run`, + key: `run-1`, + value: { status: `completed` }, + headers: { operation: `update` }, + }) + + expect(results).toHaveLength(1) + expect(results[0]!.registrationDbId).toBe(42) + }) + + it(`keeps registrations locally visible when Electric tx visibility times out`, async () => { + const registry = await createLocalRegistry() + ;(registry as any).mode = `electric` + ;(registry as any).allocateRuntimeId = async () => 42 + ;(registry 
as any).persistInsert = async (row: any) => ({ txid: 123, row }) + const timeout = new Error(`timed out`) + timeout.name = `TimeoutWaitingForTxIdError` + ;(registry as any).requireCollection().utils.awaitTxId = async () => { + throw timeout + } + + await registry.register({ + subscriberUrl: `/parent/p1`, + sourceUrl: `/child/slow-visible`, + condition: `runFinished`, + oneShot: false, + }) + + const results = await registry.evaluate(`/child/slow-visible`, { + type: `run`, + key: `run-1`, + value: { status: `completed` }, + headers: { operation: `update` }, + }) + + expect(results).toHaveLength(1) + }) + + it(`keeps committed one-shot deletes locally deleted when Electric tx visibility times out`, async () => { + const registry = await createLocalRegistry() + + await registry.register({ + subscriberUrl: `/parent/p1`, + sourceUrl: `/child/one-shot-timeout`, + condition: `runFinished`, + oneShot: true, + }) + ;(registry as any).mode = `electric` + ;(registry as any).persistDeleteRows = async () => 123 + const timeout = new Error(`timed out`) + timeout.name = `TimeoutWaitingForTxIdError` + ;(registry as any).requireCollection().utils.awaitTxId = async () => { + throw timeout + } + + const first = await registry.evaluate(`/child/one-shot-timeout`, { + type: `run`, + key: `run-1`, + value: { status: `completed` }, + headers: { operation: `update` }, + }) + await new Promise((resolve) => setTimeout(resolve, 0)) + const second = await registry.evaluate(`/child/one-shot-timeout`, { + type: `run`, + key: `run-2`, + value: { status: `completed` }, + headers: { operation: `update` }, + }) + + expect(first).toHaveLength(1) + expect(second).toHaveLength(0) + }) + + it(`persists unregister predicates even when matching rows are not locally visible`, async () => { + const registry = await createLocalRegistry() + ;(registry as any).mode = `electric` + let persisted: unknown + ;(registry as any).persistDeleteRows = async (input: unknown) => { + persisted = input + return undefined + 
} + + await registry.unregisterBySubscriber(`/parent/missing`) + + expect(persisted).toEqual({ + rows: [], + persist: { + kind: `subscriber`, + tenantId: `default`, + subscriberUrl: `/parent/missing`, + }, + }) + }) + + it(`cleans up the Electric collection on stopSync`, async () => { + const registry = await createLocalRegistry() + const cleanup = vi.fn(async () => undefined) + const dispose = vi.fn(async () => undefined) + ;(registry as any).registrationsCollection = { cleanup } + ;(registry as any).registrationsEffect = { dispose } + + await registry.stopSync() + + expect(dispose).toHaveBeenCalledTimes(1) + expect(cleanup).toHaveBeenCalledTimes(1) + }) + + it(`starts the registration effect after retrying an existing collection preload`, async () => { + const registry = await createLocalRegistry() + const preload = vi.fn(async () => undefined) + const startEffect = vi.fn() + ;(registry as any).registrationsCollection = { preload } + ;(registry as any).startRegistrationEffect = startEffect + + await registry.startSync(`http://electric.test`) + + expect(preload).toHaveBeenCalledTimes(1) + expect(startEffect).toHaveBeenCalledTimes(1) + }) + + it(`does not await an invisible txid when runtime timeout update matches no rows`, async () => { + const registry = await createLocalRegistry() + + await registry.register({ + subscriberUrl: `/parent/p1`, + sourceUrl: `/child/stale-timeout`, + condition: `runFinished`, + oneShot: false, + timeoutMs: 1_000, + }) + ;(registry as any).mode = `electric` + ;(registry as any).persistTimeoutConsumed = async () => undefined + + await expect( + (registry as any).markTimeoutConsumed(1, `default`) + ).resolves.toBeUndefined() + }) + + it(`timeout effect delivers timeout wake once and marks row consumed`, async () => { + const registry = await createLocalRegistry() + const delivered: Array<WakeEvalResult> = [] + registry.setTimeoutCallback((result) => delivered.push(result)) + + await registry.register({ + subscriberUrl: `/parent/p1`, + sourceUrl: 
`/child/c1`, + condition: `runFinished`, + oneShot: false, + timeoutMs: 25, + }) + + await new Promise((resolve) => setTimeout(resolve, 80)) + + expect(delivered).toHaveLength(1) + expect(delivered[0]!.wakeMessage.timeout).toBe(true) + + await new Promise((resolve) => setTimeout(resolve, 80)) + expect(delivered).toHaveLength(1) + }) + + it(`local timeout effect does not consume timeout before callback is registered`, async () => { + const registry = await createLocalRegistry() + const delivered: Array<WakeEvalResult> = [] + + await registry.register({ + subscriberUrl: `/parent/p1`, + sourceUrl: `/child/c1`, + condition: `runFinished`, + oneShot: false, + timeoutMs: 25, + }) + + await new Promise((resolve) => setTimeout(resolve, 80)) + + expect(delivered).toHaveLength(0) + + registry.setTimeoutCallback((result) => delivered.push(result)) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(delivered).toHaveLength(1) + expect(delivered[0]!.sourceEventKey).toBe(`timeout`) + }) + it(`keeps shared registrations scoped by tenant`, async () => { - const registry = new WakeRegistry(createMockDb(), null) + const registry = await createLocalRegistry(null) await registry.register({ tenantId: `tenant-a`, subscriberUrl: `/tenant-a/parent`, @@ -84,8 +321,8 @@ describe(`Wake Registry`, () => { value: {}, headers: { operation: `insert` }, } - const tenantA = registry.evaluate(`/source/shared`, event, `tenant-a`) - const tenantB = registry.evaluate(`/source/shared`, event, `tenant-b`) + const tenantA = await registry.evaluate(`/source/shared`, event, `tenant-a`) + const tenantB = await registry.evaluate(`/source/shared`, event, `tenant-b`) expect(tenantA).toHaveLength(1) expect(tenantA[0]!.tenantId).toBe(`tenant-a`) @@ -96,7 +333,7 @@ describe(`Wake Registry`, () => { }) it(`dispatches shared timeout callbacks by tenant`, async () => { - const registry = new WakeRegistry(createMockDb(), null) + const registry = await createLocalRegistry(null) const tenantA: Array<WakeEvalResult> = [] const 
tenantB: Array<WakeEvalResult> = [] registry.setTimeoutCallback((result) => tenantA.push(result), `tenant-a`) @@ -128,7 +365,7 @@ describe(`Wake Registry`, () => { }) it(`does not consume shared timeout before tenant callback is registered`, async () => { - const registry = new WakeRegistry(createMockDb(), null) + const registry = await createLocalRegistry(null) const delivered: Array<WakeEvalResult> = [] await registry.register({ @@ -148,6 +385,7 @@ describe(`Wake Registry`, () => { (result) => delivered.push(result), `tenant-late` ) + await new Promise((resolve) => setTimeout(resolve, 0)) expect(delivered).toHaveLength(1) expect(delivered[0]!.tenantId).toBe(`tenant-late`) @@ -155,7 +393,7 @@ describe(`Wake Registry`, () => { }) it(`evaluates runFinished condition on completed run event`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() await registry.register({ subscriberUrl: `/parent/p1`, sourceUrl: `/child/c1`, @@ -163,7 +401,7 @@ describe(`Wake Registry`, () => { oneShot: false, }) - const results = registry.evaluate(`/child/c1`, { + const results = await registry.evaluate(`/child/c1`, { type: `run`, key: `run-1`, value: { status: `completed` }, @@ -181,7 +419,7 @@ describe(`Wake Registry`, () => { }) it(`returns registrationDbId and sourceEventKey for immediate matches`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() await registry.register({ subscriberUrl: `/watcher/w1`, sourceUrl: `/_cron/abc`, @@ -189,7 +427,7 @@ describe(`Wake Registry`, () => { oneShot: false, }) - const results = registry.evaluate(`/_cron/abc`, { + const results = await registry.evaluate(`/_cron/abc`, { type: `cron_tick`, key: `tick-7`, value: {}, @@ -201,40 +439,8 @@ describe(`Wake Registry`, () => { expect(results[0]!.sourceEventKey).toBe(`insert:tick-7`) }) - it(`removes cached registrations from shape delete old_value ids`, async () => { - const registry = new 
WakeRegistry(createMockDb()) - await registry.register({ - subscriberUrl: `/watcher/w1`, - sourceUrl: `/_cron/abc`, - condition: { on: `change` }, - oneShot: false, - }) - - const before = registry.evaluate(`/_cron/abc`, { - type: `cron_tick`, - key: `tick-7`, - value: {}, - headers: { operation: `insert` }, - }) - expect(before).toHaveLength(1) - - await (registry as any).applyShapeMessage({ - key: `shape-key-is-not-the-registration-id`, - old_value: { id: before[0]!.registrationDbId }, - headers: { operation: `delete` }, - }) - - const after = registry.evaluate(`/_cron/abc`, { - type: `cron_tick`, - key: `tick-8`, - value: {}, - headers: { operation: `insert` }, - }) - expect(after).toHaveLength(0) - }) - it(`keeps distinct registrations distinct for the same source event`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() await registry.register({ subscriberUrl: `/watcher/w1`, sourceUrl: `/_cron/abc`, @@ -248,7 +454,7 @@ describe(`Wake Registry`, () => { oneShot: false, }) - const results = registry.evaluate(`/_cron/abc`, { + const results = await registry.evaluate(`/_cron/abc`, { type: `cron_tick`, key: `tick-7`, value: {}, @@ -263,7 +469,7 @@ describe(`Wake Registry`, () => { }) it(`runFinished remains active for repeated child completions`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() await registry.register({ subscriberUrl: `/parent/p1`, sourceUrl: `/child/c1`, @@ -271,7 +477,7 @@ describe(`Wake Registry`, () => { oneShot: false, }) - const first = registry.evaluate(`/child/c1`, { + const first = await registry.evaluate(`/child/c1`, { type: `run`, key: `run-1`, value: { status: `completed` }, @@ -279,7 +485,7 @@ describe(`Wake Registry`, () => { }) expect(first).toHaveLength(1) - const second = registry.evaluate(`/child/c1`, { + const second = await registry.evaluate(`/child/c1`, { type: `run`, key: `run-2`, value: { status: 
`completed` }, @@ -289,7 +495,7 @@ describe(`Wake Registry`, () => { }) it(`evaluates collection change condition`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() await registry.register({ subscriberUrl: `/watcher/w1`, sourceUrl: `/source/s1`, @@ -297,7 +503,7 @@ describe(`Wake Registry`, () => { oneShot: false, }) - const results = registry.evaluate(`/source/s1`, { + const results = await registry.evaluate(`/source/s1`, { type: `texts`, key: `text-1`, value: { content: `hello` }, @@ -310,7 +516,7 @@ describe(`Wake Registry`, () => { }) it(`includes inbox message details in wake changes`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() await registry.register({ subscriberUrl: `/agent/self`, sourceUrl: `/agent/self`, @@ -318,7 +524,7 @@ describe(`Wake Registry`, () => { oneShot: false, }) - const results = registry.evaluate(`/agent/self`, { + const results = await registry.evaluate(`/agent/self`, { type: `inbox`, key: `msg-1`, value: { @@ -347,7 +553,7 @@ describe(`Wake Registry`, () => { }) it(`ignores events for non-matching collections`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() await registry.register({ subscriberUrl: `/watcher/w1`, sourceUrl: `/source/s1`, @@ -355,7 +561,7 @@ describe(`Wake Registry`, () => { oneShot: false, }) - const results = registry.evaluate(`/source/s1`, { + const results = await registry.evaluate(`/source/s1`, { type: `toolCalls`, key: `tc-1`, value: {}, @@ -366,7 +572,7 @@ describe(`Wake Registry`, () => { }) it(`filters collection wakes by operation kind when ops is provided`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() await registry.register({ subscriberUrl: `/watcher/w1`, sourceUrl: `/source/s1`, @@ -378,13 +584,13 @@ describe(`Wake Registry`, () => { oneShot: false, 
}) - const insertResults = registry.evaluate(`/source/s1`, { + const insertResults = await registry.evaluate(`/source/s1`, { type: `members`, key: `/task/a`, value: { url: `/task/a` }, headers: { operation: `insert` }, }) - const deleteResults = registry.evaluate(`/source/s1`, { + const deleteResults = await registry.evaluate(`/source/s1`, { type: `members`, key: `/task/a`, old_value: { url: `/task/a` }, @@ -401,7 +607,7 @@ describe(`Wake Registry`, () => { }) it(`maps pg-sync old_value from the event value into change.oldValue`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() await registry.register({ subscriberUrl: `/watcher/w1`, sourceUrl: `/_electric/pg-sync/default/pg-source-1`, @@ -409,7 +615,7 @@ describe(`Wake Registry`, () => { oneShot: false, }) - const results = registry.evaluate( + const results = await registry.evaluate( `/_electric/pg-sync/default/pg-source-1`, { type: `pg_sync_change`, @@ -434,8 +640,71 @@ describe(`Wake Registry`, () => { }) }) + it(`unregisterBySource removes all matching local collection rows`, async () => { + const registry = await createLocalRegistry() + + await registry.register({ + subscriberUrl: `/parent/a`, + sourceUrl: `/source/1`, + condition: { on: `change` }, + oneShot: false, + }) + await registry.register({ + subscriberUrl: `/parent/b`, + sourceUrl: `/source/1`, + condition: { on: `change` }, + oneShot: false, + }) + await registry.register({ + subscriberUrl: `/parent/c`, + sourceUrl: `/source/2`, + condition: { on: `change` }, + oneShot: false, + }) + + await registry.unregisterBySource(`/source/1`) + + expect( + await registry.evaluate(`/source/1`, { + type: `texts`, + key: `t1`, + value: {}, + headers: { operation: `insert` }, + }) + ).toHaveLength(0) + expect( + await registry.evaluate(`/source/2`, { + type: `texts`, + key: `t2`, + value: {}, + headers: { operation: `insert` }, + }) + ).toHaveLength(1) + }) + + it(`oneShot match removes row before a 
second immediate evaluation`, async () => { + const registry = await createLocalRegistry() + + await registry.register({ + subscriberUrl: `/parent/p1`, + sourceUrl: `/child/c1`, + condition: `runFinished`, + oneShot: true, + }) + + const event = { + type: `run`, + key: `run-1`, + value: { status: `completed` }, + headers: { operation: `update` }, + } + + expect(await registry.evaluate(`/child/c1`, event)).toHaveLength(1) + expect(await registry.evaluate(`/child/c1`, event)).toHaveLength(0) + }) + it(`cleanup on subscriber deletion removes all registrations`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() await registry.register({ subscriberUrl: `/parent/p1`, sourceUrl: `/child/c1`, @@ -451,13 +720,13 @@ describe(`Wake Registry`, () => { await registry.unregisterBySubscriber(`/parent/p1`) - const r1 = registry.evaluate(`/child/c1`, { + const r1 = await registry.evaluate(`/child/c1`, { type: `run`, key: `run-1`, value: { status: `completed` }, headers: { operation: `update` }, }) - const r2 = registry.evaluate(`/child/c2`, { + const r2 = await registry.evaluate(`/child/c2`, { type: `texts`, key: `t-1`, value: {}, @@ -469,7 +738,7 @@ describe(`Wake Registry`, () => { }) it(`surgical unregister removes only the targeted subscriber+source pair`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() await registry.register({ subscriberUrl: `/parent/p1`, sourceUrl: `/child/c1`, @@ -485,13 +754,13 @@ describe(`Wake Registry`, () => { await registry.unregisterBySubscriberAndSource(`/parent/p1`, `/child/c1`) - const r1 = registry.evaluate(`/child/c1`, { + const r1 = await registry.evaluate(`/child/c1`, { type: `run`, key: `run-1`, value: { status: `completed` }, headers: { operation: `update` }, }) - const r2 = registry.evaluate(`/child/c2`, { + const r2 = await registry.evaluate(`/child/c2`, { type: `texts`, key: `t-1`, value: {}, @@ -502,7 +771,7 @@ 
describe(`Wake Registry`, () => { expect(r2).toHaveLength(1) }) - it(`register() rejects when DB insert fails so callers can catch`, async () => { + it(`local register does not require a backing DB insert`, async () => { const failingDb = { ...createMockDb(), insert: () => ({ @@ -514,6 +783,7 @@ describe(`Wake Registry`, () => { }), } const registry = new WakeRegistry(failingDb) + await registry.startLocalForTests() await expect( registry.register({ @@ -522,11 +792,11 @@ describe(`Wake Registry`, () => { condition: `runFinished`, oneShot: false, }) - ).rejects.toThrow(`connection refused`) + ).resolves.toBeUndefined() }) it(`rebuilds registry from register calls`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() await registry.register({ subscriberUrl: `/parent/p1`, sourceUrl: `/child/c1`, @@ -540,7 +810,7 @@ describe(`Wake Registry`, () => { oneShot: false, }) - const runResults = registry.evaluate(`/child/c1`, { + const runResults = await registry.evaluate(`/child/c1`, { type: `run`, key: `run-1`, value: { status: `completed` }, @@ -549,7 +819,7 @@ describe(`Wake Registry`, () => { expect(runResults).toHaveLength(1) expect(runResults[0]!.subscriberUrl).toBe(`/parent/p1`) - const changeResults = registry.evaluate(`/source/s1`, { + const changeResults = await registry.evaluate(`/source/s1`, { type: `texts`, key: `t-1`, value: {}, @@ -560,7 +830,7 @@ describe(`Wake Registry`, () => { }) it(`evaluates shared-state wakes against the shared-state stream path`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() await registry.register({ subscriberUrl: `/parent/p1`, sourceUrl: `/_electric/shared-state/board-1`, @@ -568,7 +838,7 @@ describe(`Wake Registry`, () => { oneShot: false, }) - const results = registry.evaluate(`/_electric/shared-state/board-1`, { + const results = await registry.evaluate(`/_electric/shared-state/board-1`, { type: `texts`, 
key: `t-1`, value: {}, @@ -580,7 +850,7 @@ describe(`Wake Registry`, () => { }) it(`delivers timeout wake when timeoutMs expires`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() const delivered: Array<WakeEvalResult> = [] registry.setTimeoutCallback((result) => { delivered.push(result) @@ -605,7 +875,7 @@ describe(`Wake Registry`, () => { }) it(`debounce coalesces rapid events into single wake`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() const debounced: Array<WakeEvalResult> = [] registry.setDebounceCallback((result) => { debounced.push(result) @@ -620,7 +890,7 @@ describe(`Wake Registry`, () => { }) for (let i = 0; i < 3; i++) { - const immediate = registry.evaluate(`/source/s1`, { + const immediate = await registry.evaluate(`/source/s1`, { type: `texts`, key: `text-${i}`, value: { content: `msg-${i}` }, @@ -639,7 +909,7 @@ describe(`Wake Registry`, () => { }) it(`keeps debounced registrations distinct for different conditions on the same source`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() const debounced: Array<WakeEvalResult> = [] registry.setDebounceCallback((result) => { debounced.push(result) @@ -661,7 +931,7 @@ describe(`Wake Registry`, () => { }) expect( - registry.evaluate(`/source/s1`, { + await registry.evaluate(`/source/s1`, { type: `texts`, key: `text-1`, value: { content: `hello` }, @@ -670,7 +940,7 @@ describe(`Wake Registry`, () => { ).toHaveLength(0) expect( - registry.evaluate(`/source/s1`, { + await registry.evaluate(`/source/s1`, { type: `toolCalls`, key: `tool-1`, value: { name: `search` }, @@ -694,6 +964,40 @@ describe(`Wake Registry`, () => { ).toBe(2) }) + it(`awaits wake registration evaluation results`, async () => { + const wakeResult = { + tenantId: `default`, + subscriberUrl: `/parent/p1`, + registrationDbId: 12, + sourceEventKey: `run-1`, + wakeMessage: { + source: `/child/c1`, + 
timeout: false, + changes: [{ collection: `runs`, kind: `update`, key: `run-1` }], + }, + runFinishedStatus: `completed`, + } + const wakeRegistry = { + evaluate: vi.fn().mockResolvedValue([wakeResult]), + } + const deliverWakeResult = vi.fn().mockResolvedValue(undefined) + const manager = { + tenantId: `default`, + wakeRegistry, + deliverWakeResult, + } as any + + await EntityManager.prototype.evaluateWakes.call(manager, `/child/c1`, { + type: `run`, + key: `run-1`, + value: { status: `completed` }, + headers: { operation: `update` }, + }) + + expect(wakeRegistry.evaluate).toHaveBeenCalledTimes(1) + expect(deliverWakeResult).toHaveBeenCalledWith(wakeResult) + }) + it(`deliverWakeResult keys fanout by registrationDbId and sourceEventKey`, async () => { const appendIdempotent = vi.fn().mockResolvedValue(undefined) const manager = { @@ -736,7 +1040,7 @@ describe(`Wake Registry`, () => { }) it(`passes includeResponse through evaluate result`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() await registry.register({ subscriberUrl: `/parent/p1`, sourceUrl: `/child/c1`, @@ -745,7 +1049,7 @@ describe(`Wake Registry`, () => { includeResponse: false, }) - const results = registry.evaluate(`/child/c1`, { + const results = await registry.evaluate(`/child/c1`, { type: `run`, key: `run-1`, value: { status: `completed` }, @@ -756,8 +1060,8 @@ describe(`Wake Registry`, () => { expect(results[0]!.includeResponse).toBe(false) }) - it(`includeResponse defaults to undefined when not set`, async () => { - const registry = new WakeRegistry(createMockDb()) + it(`includeResponse defaults to true when not set`, async () => { + const registry = await createLocalRegistry() await registry.register({ subscriberUrl: `/parent/p1`, sourceUrl: `/child/c1`, @@ -765,7 +1069,7 @@ describe(`Wake Registry`, () => { oneShot: false, }) - const results = registry.evaluate(`/child/c1`, { + const results = await registry.evaluate(`/child/c1`, 
{ type: `run`, key: `run-1`, value: { status: `completed` }, @@ -773,11 +1077,11 @@ describe(`Wake Registry`, () => { }) expect(results).toHaveLength(1) - expect(results[0]!.includeResponse).toBeUndefined() + expect(results[0]!.includeResponse).toBe(true) }) it(`debounced runFinished preserves runFinishedStatus and includeResponse`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() const delivered: Array<WakeEvalResult> = [] registry.setDebounceCallback((result) => { delivered.push(result) @@ -793,7 +1097,7 @@ describe(`Wake Registry`, () => { }) // Immediate evaluate should return nothing (debounced) - const results = registry.evaluate(`/child/c1`, { + const results = await registry.evaluate(`/child/c1`, { type: `run`, key: `run-1`, value: { status: `completed` }, @@ -812,7 +1116,7 @@ describe(`Wake Registry`, () => { }) it(`debounced multi-run: status matches changes[0] run key`, async () => { - const registry = new WakeRegistry(createMockDb()) + const registry = await createLocalRegistry() const delivered: Array<WakeEvalResult> = [] registry.setDebounceCallback((result) => { delivered.push(result) @@ -827,7 +1131,7 @@ describe(`Wake Registry`, () => { }) // run-1 completes - registry.evaluate(`/child/c1`, { + await registry.evaluate(`/child/c1`, { type: `run`, key: `run-1`, value: { status: `completed` }, @@ -835,7 +1139,7 @@ describe(`Wake Registry`, () => { }) // run-2 fails in the same debounce window - registry.evaluate(`/child/c1`, { + await registry.evaluate(`/child/c1`, { type: `run`, key: `run-2`, value: { status: `failed` }, @@ -968,6 +1272,79 @@ describe(`Wake Registry Integration`, () => { expect(subRes.status).toBeLessThan(300) } + it(`delivers concurrent runFinished wakes from multiple children to the same parent`, async () => { + const ts = Date.now() + const typeName = `wakemulti${ts}` + + const typeRes = await fetch(`${baseUrl}/_electric/entity-types`, { + method: `POST`, + headers: { 'content-type': 
`application/json` }, + body: JSON.stringify({ + name: typeName, + description: `multiple child runFinished wake test`, + }), + }) + expect(typeRes.status).toBe(201) + + const parentRes = await fetch( + `${baseUrl}/_electric/entities/${typeName}/parent-${ts}`, + { + method: `PUT`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify({}), + } + ) + expect(parentRes.status).toBe(201) + const parent = (await parentRes.json()) as { + url: string + streams: { main: string } + } + + const manager = getElectricAgentsManager() + const children: Array<{ url: string; streams: { main: string } }> = [] + for (const childName of [`a`, `b`, `c`]) { + const childRes = await fetch( + `${baseUrl}/_electric/entities/${typeName}/child-${childName}-${ts}`, + { + method: `PUT`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify({ parent: parent.url }), + } + ) + expect(childRes.status).toBe(201) + const child = (await childRes.json()) as { + url: string + streams: { main: string } + } + children.push(child) + await manager.wakeRegistry.register({ + subscriberUrl: parent.url, + sourceUrl: child.url, + condition: `runFinished`, + oneShot: false, + }) + } + + await Promise.all( + children.map((child, index) => + manager.evaluateWakes(child.url, { + type: `run`, + key: `run-${index}`, + value: { status: `completed`, result: `done-${index}` }, + headers: { operation: `update` }, + }) + ) + ) + + const wakeEvents = await waitForWakeEvents(parent.streams.main, 3) + expect(wakeEvents).toHaveLength(3) + expect( + new Set( + wakeEvents.map((event) => (event.value as { source?: string }).source) + ) + ).toEqual(new Set(children.map((child) => child.url))) + }, 15_000) + it(`spawn with wake registers condition and delivers wake on child run completion`, async () => { const ts = Date.now() const typeName = `wakerf${ts}` diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f0f939c41c..78a910c6b0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1946,6 
+1946,12 @@ importers: '@sinclair/typebox': specifier: ^0.34.48 version: 0.34.49 + '@tanstack/db': + specifier: ^0.6.7 + version: 0.6.7(typescript@5.8.3) + '@tanstack/electric-db-collection': + specifier: ^0.3.5 + version: 0.3.5(typescript@5.8.3) '@whatwg-node/server': specifier: ^0.10.18 version: 0.10.18 @@ -4768,8 +4774,8 @@ packages: '@electric-sql/client@1.2.0': resolution: {integrity: sha512-K/MEjti3UF4aPKJJqO6Tp4f5noqc2/3icU1NPdpKfQaHwbzGtEX2aJmL2vxTEUJbfyrISkPKbOPnrz/lAvw1Vg==} - '@electric-sql/client@1.5.20': - resolution: {integrity: sha512-Ftf+ze/rZK9dBzvZqW7wrS74aQBToUlhQeUFm7KCTfOCkNJZ+7iZMm9JW/NjyIQjPOz4ApXg3VWOpeCbzsU3IQ==} + '@electric-sql/client@1.5.21': + resolution: {integrity: sha512-Aize0Aa2ZyTZySQMfMSTDPP/EJWcR1HfH7qE1DgMhI5f6dDxm1B74CN4o1fnIQeW/ZLg9TdbTgFYo1tsUg7mXg==} hasBin: true '@electric-sql/d2mini@0.1.8': @@ -25333,7 +25339,7 @@ snapshots: optionalDependencies: '@rollup/rollup-darwin-arm64': 4.46.1 - '@electric-sql/client@1.5.20': + '@electric-sql/client@1.5.21': dependencies: '@microsoft/fetch-event-source': 2.0.1(patch_hash=46f4e76dd960e002a542732bb4323817a24fce1673cb71e2f458fe09776fa188) optionalDependencies: @@ -31426,9 +31432,20 @@ snapshots: - supports-color - typescript + '@tanstack/electric-db-collection@0.3.5(typescript@5.8.3)': + dependencies: + '@electric-sql/client': 1.5.21 + '@standard-schema/spec': 1.1.0 + '@tanstack/db': 0.6.7(typescript@5.8.3) + '@tanstack/store': 0.9.3 + debug: 4.4.3 + transitivePeerDependencies: + - supports-color + - typescript + '@tanstack/electric-db-collection@0.3.5(typescript@5.9.3)': dependencies: - '@electric-sql/client': 1.5.20 + '@electric-sql/client': 1.5.21 '@standard-schema/spec': 1.1.0 '@tanstack/db': 0.6.7(typescript@5.9.3) '@tanstack/store': 0.9.3