Skip to content

Commit f56244c

Browse files
committed
feat(session): shard-aware todo reads and update export
1 parent 8f0c84c commit f56244c

4 files changed

Lines changed: 267 additions & 143 deletions

File tree

packages/opencode/src/session/todo.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,15 @@ export namespace Todo {
6262
})
6363

6464
const get = Effect.fn("Todo.get")(function* (sessionID: SessionID) {
65-
const rows = yield* Effect.sync(() =>
66-
Database.resolveSession(sessionID)
65+
const rows = yield* Effect.sync(() => {
66+
const db = Database.resolveSession(sessionID)
67+
return db
6768
.select()
6869
.from(TodoTable)
6970
.where(eq(TodoTable.session_id, sessionID))
7071
.orderBy(asc(TodoTable.position))
71-
.all(),
72-
)
72+
.all()
73+
})
7374
return rows.map((row) => ({
7475
content: row.content,
7576
status: row.status,
@@ -87,4 +88,8 @@ export namespace Todo {
8788
export async function get(sessionID: SessionID) {
8889
return runPromise((svc) => svc.get(sessionID))
8990
}
91+
92+
export async function update(input: { sessionID: SessionID; todos: Info[] }) {
93+
return runPromise((svc) => svc.update(input))
94+
}
9095
}

packages/opencode/src/storage/db.ts

Lines changed: 113 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -417,43 +417,94 @@ export namespace Database {
417417

418418
log.info("migrate.start", { root, sessions: ids.length })
419419

420-
dst.exec("BEGIN IMMEDIATE")
420+
const ph = ids.map(() => "?").join(",")
421+
const mem = Path.endsWith(":memory:")
422+
if (!mem) dst.exec(`ATTACH DATABASE '${Path}' AS global`)
421423
try {
422-
const msg = dst.prepare(
423-
"INSERT OR IGNORE INTO message (id, session_id, time_created, time_updated, data) VALUES (?, ?, ?, ?, ?)",
424-
)
425-
const part = dst.prepare(
426-
"INSERT OR IGNORE INTO part (id, message_id, session_id, time_created, time_updated, data) VALUES (?, ?, ?, ?, ?, ?)",
427-
)
428-
const todo = dst.prepare(
429-
"INSERT OR IGNORE INTO todo (session_id, content, status, priority, position, time_created, time_updated) VALUES (?, ?, ?, ?, ?, ?, ?)",
430-
)
431-
const seq = dst.prepare("INSERT OR IGNORE INTO event_sequence (aggregate_id, seq) VALUES (?, ?)")
432-
const evt = dst.prepare(
433-
"INSERT OR IGNORE INTO event (id, aggregate_id, seq, type, data, origin) VALUES (?, ?, ?, ?, ?, ?)",
434-
)
435-
436-
for (const sid of ids) {
437-
for (const r of src.query("SELECT * FROM message WHERE session_id = ?").all(sid) as any[])
438-
msg.run(r.id, r.session_id, r.time_created, r.time_updated, r.data)
439-
for (const r of src.query("SELECT * FROM part WHERE session_id = ?").all(sid) as any[])
440-
part.run(r.id, r.message_id, r.session_id, r.time_created, r.time_updated, r.data)
441-
for (const r of src.query("SELECT * FROM todo WHERE session_id = ?").all(sid) as any[])
442-
todo.run(r.session_id, r.content, r.status, r.priority, r.position, r.time_created, r.time_updated)
443-
for (const r of src.query("SELECT * FROM event_sequence WHERE aggregate_id = ?").all(sid) as any[])
444-
seq.run(r.aggregate_id, r.seq)
445-
for (const r of src.query("SELECT * FROM event WHERE aggregate_id = ?").all(sid) as any[])
446-
evt.run(r.id, r.aggregate_id, r.seq, r.type, r.data, r.origin)
424+
dst.exec("BEGIN IMMEDIATE")
425+
try {
426+
if (mem) {
427+
const msg = dst.prepare(
428+
"INSERT OR IGNORE INTO message (id, session_id, time_created, time_updated, data) VALUES (?, ?, ?, ?, ?)",
429+
)
430+
for (const r of src.query(`SELECT * FROM message WHERE session_id IN (${ph})`).all(...ids) as any[])
431+
msg.run(r.id, r.session_id, r.time_created, r.time_updated, r.data)
432+
const part = dst.prepare(
433+
"INSERT OR IGNORE INTO part (id, message_id, session_id, time_created, time_updated, data) VALUES (?, ?, ?, ?, ?, ?)",
434+
)
435+
for (const r of src.query(`SELECT * FROM part WHERE session_id IN (${ph})`).all(...ids) as any[])
436+
part.run(r.id, r.message_id, r.session_id, r.time_created, r.time_updated, r.data)
437+
const todo = dst.prepare(
438+
"INSERT OR IGNORE INTO todo (session_id, content, status, priority, position, time_created, time_updated) VALUES (?, ?, ?, ?, ?, ?, ?)",
439+
)
440+
for (const r of src.query(`SELECT * FROM todo WHERE session_id IN (${ph})`).all(...ids) as any[])
441+
todo.run(r.session_id, r.content, r.status, r.priority, r.position, r.time_created, r.time_updated)
442+
const seq = dst.prepare("INSERT OR IGNORE INTO event_sequence (aggregate_id, seq) VALUES (?, ?)")
443+
for (const r of src
444+
.query(`SELECT * FROM event_sequence WHERE aggregate_id IN (${ph})`)
445+
.all(...ids) as any[])
446+
seq.run(r.aggregate_id, r.seq)
447+
const evt = dst.prepare(
448+
"INSERT OR IGNORE INTO event (id, aggregate_id, seq, type, data, origin) VALUES (?, ?, ?, ?, ?, ?)",
449+
)
450+
for (const r of src.query(`SELECT * FROM event WHERE aggregate_id IN (${ph})`).all(...ids) as any[])
451+
evt.run(r.id, r.aggregate_id, r.seq, r.type, r.data, r.origin)
452+
} else {
453+
dst
454+
.query(
455+
`INSERT OR IGNORE INTO message (id, session_id, time_created, time_updated, data) SELECT id, session_id, time_created, time_updated, data FROM global.message WHERE session_id IN (${ph})`,
456+
)
457+
.run(...ids)
458+
dst
459+
.query(
460+
`INSERT OR IGNORE INTO part (id, message_id, session_id, time_created, time_updated, data) SELECT id, message_id, session_id, time_created, time_updated, data FROM global.part WHERE session_id IN (${ph})`,
461+
)
462+
.run(...ids)
463+
dst
464+
.query(
465+
`INSERT OR IGNORE INTO todo (session_id, content, status, priority, position, time_created, time_updated) SELECT session_id, content, status, priority, position, time_created, time_updated FROM global.todo WHERE session_id IN (${ph})`,
466+
)
467+
.run(...ids)
468+
dst
469+
.query(
470+
`INSERT OR IGNORE INTO event_sequence (aggregate_id, seq) SELECT aggregate_id, seq FROM global.event_sequence WHERE aggregate_id IN (${ph})`,
471+
)
472+
.run(...ids)
473+
dst
474+
.query(
475+
`INSERT OR IGNORE INTO event (id, aggregate_id, seq, type, data, origin) SELECT id, aggregate_id, seq, type, data, origin FROM global.event WHERE aggregate_id IN (${ph})`,
476+
)
477+
.run(...ids)
478+
}
479+
dst.exec("COMMIT")
480+
dst.run(meta)
481+
for (const sid of ids) {
482+
const msg = src.query("SELECT MAX(time_created) as t FROM message WHERE session_id = ?").get(sid) as {
483+
t: number | null
484+
} | null
485+
const part = src.query("SELECT MAX(time_created) as t FROM part WHERE session_id = ?").get(sid) as {
486+
t: number | null
487+
} | null
488+
const todo = src.query("SELECT MAX(time_created) as t FROM todo WHERE session_id = ?").get(sid) as {
489+
t: number | null
490+
} | null
491+
dst
492+
.query("INSERT OR REPLACE INTO _meta (key, value) VALUES (?, ?)")
493+
.run(`swept:msg:${sid}`, String(msg?.t ?? 0))
494+
dst
495+
.query("INSERT OR REPLACE INTO _meta (key, value) VALUES (?, ?)")
496+
.run(`swept:part:${sid}`, String(part?.t ?? 0))
497+
dst
498+
.query("INSERT OR REPLACE INTO _meta (key, value) VALUES (?, ?)")
499+
.run(`swept:todo:${sid}`, String(todo?.t ?? 0))
500+
}
501+
log.info("migrate.done", { root, sessions: ids.length })
502+
} catch (err) {
503+
dst.exec("ROLLBACK")
504+
throw err
447505
}
448-
449-
dst.exec("COMMIT")
450-
dst.run(meta)
451-
const max = src.query("SELECT MAX(time_created) as t FROM message WHERE session_id = ?").get(root) as { t: number | null } | null
452-
dst.query("INSERT OR REPLACE INTO _meta (key, value) VALUES ('swept', ?)").run(String(max?.t ?? 0))
453-
log.info("migrate.done", { root, sessions: ids.length })
454-
} catch (err) {
455-
dst.exec("ROLLBACK")
456-
throw err
506+
} finally {
507+
if (!mem) dst.exec("DETACH DATABASE global")
457508
}
458509
} finally {
459510
migrating.delete(root)
@@ -463,13 +514,34 @@ export namespace Database {
463514
export function ensureShard(id: string): string | undefined {
464515
const root = sessionRoot(id)
465516
if (root) {
466-
if (!swept.has(root)) {
467-
swept.add(root)
517+
if (!swept.has(id)) {
518+
swept.add(id)
468519
const db = session(root).$client
469520
db.run(meta)
470-
const marker = db.query("SELECT value FROM _meta WHERE key = 'swept'").get() as { value: string } | null
471-
const max = Client().$client.query("SELECT MAX(time_created) as t FROM message WHERE session_id = ? LIMIT 1").get(id) as { t: number | null } | null
472-
if (!marker || String(max?.t ?? 0) !== marker.value) seed(root)
521+
const msg = db.query("SELECT value FROM _meta WHERE key = ?").get(`swept:msg:${id}`) as { value: string } | null
522+
const part = db.query("SELECT value FROM _meta WHERE key = ?").get(`swept:part:${id}`) as {
523+
value: string
524+
} | null
525+
const todo = db.query("SELECT value FROM _meta WHERE key = ?").get(`swept:todo:${id}`) as {
526+
value: string
527+
} | null
528+
const gmsg = Client()
529+
.$client.query("SELECT MAX(time_created) as t FROM message WHERE session_id = ?")
530+
.get(id) as { t: number | null } | null
531+
const gpart = Client()
532+
.$client.query("SELECT MAX(time_created) as t FROM part WHERE session_id = ?")
533+
.get(id) as { t: number | null } | null
534+
const gtodo = Client()
535+
.$client.query("SELECT MAX(time_created) as t FROM todo WHERE session_id = ?")
536+
.get(id) as { t: number | null } | null
537+
const stale =
538+
!msg ||
539+
String(gmsg?.t ?? 0) !== msg.value ||
540+
!part ||
541+
String(gpart?.t ?? 0) !== part.value ||
542+
!todo ||
543+
String(gtodo?.t ?? 0) !== todo.value
544+
if (stale) seed(root)
473545
}
474546
return root
475547
}
@@ -501,6 +573,7 @@ export namespace Database {
501573
export function close() {
502574
for (const item of cache.values()) item.db.$client.close()
503575
cache.clear()
576+
swept.clear()
504577
Client().$client.close()
505578
Client.reset()
506579
}

packages/opencode/test/server/session-sharding.test.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ async function msg(sid: SessionID, text: string) {
6363
agent: "test",
6464
model: { providerID: ProviderID.make("test"), modelID: ModelID.make("test") },
6565
tools: {},
66-
mode: "",
6766
} satisfies MessageV2.User)
6867
await Session.updatePart({
6968
id: PartID.ascending(),
@@ -199,7 +198,7 @@ describe("sharding via serve API", () => {
199198
})
200199

201200
describe("getPart shard routing", () => {
202-
test("tool parts written to shard are readable via getPart", async () => {
201+
test("resolveSession routes to shard for tool parts", async () => {
203202
await using tmp = await tmpdir({ git: true })
204203
await Instance.provide({
205204
directory: tmp.path,
@@ -220,7 +219,6 @@ describe("getPart shard routing", () => {
220219
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
221220
time: { created: Date.now() },
222221
agent: "test",
223-
tools: {},
224222
mode: "",
225223
} satisfies MessageV2.Assistant)
226224

0 commit comments

Comments
 (0)