Skip to content

Commit 8f0c84c

Browse files
committed
test(session): add shard integration coverage
1 parent 6ed85f4 commit 8f0c84c

2 files changed

Lines changed: 731 additions & 57 deletions

File tree

packages/opencode/src/sync/index.ts

Lines changed: 104 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ export namespace SyncEvent {
3030
export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
3131

3232
type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
33+
type Sync<T> = T extends Promise<any> ? never : T
34+
const sessionTypes = new Set(["session.created", "session.updated", "session.deleted"])
3335

3436
export const registry = new Map<string, Definition>()
3537
let projectors: Map<Definition, ProjectorFunc> | undefined
@@ -65,7 +67,7 @@ export namespace SyncEvent {
6567
}
6668

6769
export function versionedType<A extends string>(type: A): A
68-
export function versionedType<A extends string, B extends number>(type: A, version: B): `${A}/${B}`
70+
export function versionedType<A extends string, B extends number>(type: A, version: B): `${A}.${B}`
6971
export function versionedType(type: string, version?: number) {
7072
return version ? `${type}.${version}` : type
7173
}
@@ -102,6 +104,90 @@ export namespace SyncEvent {
102104
return [def, func as ProjectorFunc]
103105
}
104106

107+
function root(type: string, agg: string): string | undefined {
108+
if (sessionTypes.has(type)) return
109+
const version = versions.get(type)
110+
if (!version) return
111+
const def = registry.get(versionedType(type, version))
112+
if (!def) return
113+
if (def.aggregate !== "sessionID") return
114+
return Database.sessionRoot(agg)
115+
}
116+
117+
function seq(type: string, agg: string) {
118+
const id = root(type, agg)
119+
if (id) {
120+
return Database.session(id)
121+
.select({ seq: EventSequenceTable.seq })
122+
.from(EventSequenceTable)
123+
.where(eq(EventSequenceTable.aggregate_id, agg))
124+
.get()
125+
}
126+
return Database.use((db) =>
127+
db
128+
.select({ seq: EventSequenceTable.seq })
129+
.from(EventSequenceTable)
130+
.where(eq(EventSequenceTable.aggregate_id, agg))
131+
.get(),
132+
)
133+
}
134+
135+
function transact<T>(
136+
type: string,
137+
agg: string,
138+
cb: (tx: Database.TxOrDb) => Sync<T>,
139+
options?: { behavior?: "deferred" | "immediate" | "exclusive" },
140+
): Sync<T> {
141+
const id = root(type, agg)
142+
if (id) {
143+
return Database.session(id).transaction((tx) => cb(tx), { behavior: options?.behavior }) as Sync<T>
144+
}
145+
return Database.transaction(cb, options)
146+
}
147+
148+
function apply<Def extends Definition>(tx: Database.TxOrDb, projector: ProjectorFunc, def: Def, event: Event<Def>) {
149+
projector(tx, event.data)
150+
151+
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
152+
tx.insert(EventSequenceTable)
153+
.values({
154+
aggregate_id: event.aggregateID,
155+
seq: event.seq,
156+
})
157+
.onConflictDoUpdate({
158+
target: EventSequenceTable.aggregate_id,
159+
set: { seq: event.seq },
160+
})
161+
.run()
162+
tx.insert(EventTable)
163+
.values({
164+
id: event.id,
165+
seq: event.seq,
166+
aggregate_id: event.aggregateID,
167+
type: versionedType(def.type, def.version),
168+
data: event.data as Record<string, unknown>,
169+
})
170+
.run()
171+
}
172+
}
173+
174+
function emit<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
175+
return () => {
176+
Bus.emit("event", { def, event })
177+
178+
if (options?.publish) {
179+
const result = convertEvent(def.type, event.data)
180+
if (result instanceof Promise) {
181+
result.then((data) => {
182+
ProjectBus.publish({ type: def.type, properties: def.schema }, data)
183+
})
184+
return
185+
}
186+
ProjectBus.publish({ type: def.type, properties: def.schema }, result)
187+
}
188+
}
189+
}
190+
105191
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
106192
if (projectors == null) {
107193
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
@@ -114,49 +200,8 @@ export namespace SyncEvent {
114200

115201
// idempotent: need to ignore any events already logged
116202

117-
Database.transaction((tx) => {
118-
projector(tx, event.data)
119-
120-
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
121-
tx.insert(EventSequenceTable)
122-
.values({
123-
aggregate_id: event.aggregateID,
124-
seq: event.seq,
125-
})
126-
.onConflictDoUpdate({
127-
target: EventSequenceTable.aggregate_id,
128-
set: { seq: event.seq },
129-
})
130-
.run()
131-
tx.insert(EventTable)
132-
.values({
133-
id: event.id,
134-
seq: event.seq,
135-
aggregate_id: event.aggregateID,
136-
type: versionedType(def.type, def.version),
137-
data: event.data as Record<string, unknown>,
138-
})
139-
.run()
140-
}
141-
142-
Database.effect(() => {
143-
Bus.emit("event", {
144-
def,
145-
event,
146-
})
147-
148-
if (options?.publish) {
149-
const result = convertEvent(def.type, event.data)
150-
if (result instanceof Promise) {
151-
result.then((data) => {
152-
ProjectBus.publish({ type: def.type, properties: def.schema }, data)
153-
})
154-
} else {
155-
ProjectBus.publish({ type: def.type, properties: def.schema }, result)
156-
}
157-
}
158-
})
159-
})
203+
transact(def.type, event.aggregateID, (tx) => apply(tx, projector, def, event))
204+
Database.effect(emit(def, event, options))
160205
}
161206

162207
// TODO:
@@ -165,19 +210,13 @@ export namespace SyncEvent {
165210
// and it validets all the sequence ids
166211
// * when loading events from db, apply zod validation to ensure shape
167212

168-
export function replay(event: SerializedEvent, options?: { publish: boolean }) {
213+
export function replay(event: SerializedEvent, options?: { republish: boolean }) {
169214
const def = registry.get(event.type)
170215
if (!def) {
171216
throw new Error(`Unknown event type: ${event.type}`)
172217
}
173218

174-
const row = Database.use((db) =>
175-
db
176-
.select({ seq: EventSequenceTable.seq })
177-
.from(EventSequenceTable)
178-
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
179-
.get(),
180-
)
219+
const row = seq(def.type, event.aggregateID)
181220

182221
const latest = row?.seq ?? -1
183222
if (event.seq <= latest) {
@@ -189,11 +228,12 @@ export namespace SyncEvent {
189228
throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
190229
}
191230

192-
process(def, event, { publish: !!options?.publish })
231+
process(def, event, { publish: !!options?.republish })
193232
}
194233

195234
export function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
196235
const agg = (data as Record<string, string>)[def.aggregate]
236+
const publish = options?.publish ?? true
197237
// This should never happen: we've enforced it via typescript in
198238
// the definition
199239
if (agg == null) {
@@ -204,12 +244,13 @@ export namespace SyncEvent {
204244
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
205245
}
206246

207-
const { publish = true } = options || {}
208-
209247
// Note that this is an "immediate" transaction which is critical.
210248
// We need to make sure we can safely read and write with nothing
211249
// else changing the data from under us
212-
Database.transaction(
250+
let fn = () => {}
251+
transact(
252+
def.type,
253+
agg,
213254
(tx) => {
214255
const id = EventID.ascending()
215256
const row = tx
@@ -220,12 +261,18 @@ export namespace SyncEvent {
220261
const seq = row?.seq != null ? row.seq + 1 : 0
221262

222263
const event = { id, seq, aggregateID: agg, data }
223-
process(def, event, { publish })
264+
const projector = projectors?.get(def)
265+
if (!projector) {
266+
throw new Error(`Projector not found for event: ${def.type}`)
267+
}
268+
apply(tx, projector, def, event)
269+
fn = emit(def, event, { publish })
224270
},
225271
{
226272
behavior: "immediate",
227273
},
228274
)
275+
Database.effect(fn)
229276
}
230277

231278
export function remove(aggregateID: string) {

0 commit comments

Comments
 (0)