Skip to content

Commit 6ed85f4

Browse files
committed
fix(session): shard-aware getPart + sweep marker to skip redundant seed
1 parent 9c04c93 commit 6ed85f4

3 files changed

Lines changed: 148 additions & 14 deletions

File tree

packages/opencode/src/session/index.ts

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -476,19 +476,17 @@ export namespace Session {
476476
}).pipe(Effect.withSpan("Session.updatePart"))
477477

478478
const getPart: Interface["getPart"] = Effect.fn("Session.getPart")(function* (input) {
479-
const row = Database.use((db) =>
480-
db
481-
.select()
482-
.from(PartTable)
483-
.where(
484-
and(
485-
eq(PartTable.session_id, input.sessionID),
486-
eq(PartTable.message_id, input.messageID),
487-
eq(PartTable.id, input.partID),
488-
),
489-
)
490-
.get(),
491-
)
479+
const row = Database.resolveSession(input.sessionID)
480+
.select()
481+
.from(PartTable)
482+
.where(
483+
and(
484+
eq(PartTable.session_id, input.sessionID),
485+
eq(PartTable.message_id, input.messageID),
486+
eq(PartTable.id, input.partID),
487+
),
488+
)
489+
.get()
492490
if (!row) return
493491
return {
494492
...row.data,

packages/opencode/src/storage/db.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ export namespace Database {
6969

7070
const cache = new Map<string, Entry>()
7171
const migrating = new Set<string>()
72+
const swept = new Set<string>()
7273

7374
const schema = [
7475
`CREATE TABLE IF NOT EXISTS message (
@@ -113,6 +114,7 @@ export namespace Database {
113114
origin TEXT
114115
)`,
115116
]
117+
const meta = `CREATE TABLE IF NOT EXISTS _meta (key TEXT PRIMARY KEY, value TEXT)`
116118

117119
const tables = ["message", "part", "todo", "event_sequence", "event"]
118120

@@ -325,6 +327,7 @@ export namespace Database {
325327

326328
if (fresh) {
327329
for (const sql of schema) db.run(sql)
330+
db.run(meta)
328331
}
329332

330333
cache.set(id, { db, at: Date.now() })
@@ -444,6 +447,9 @@ export namespace Database {
444447
}
445448

446449
dst.exec("COMMIT")
450+
dst.run(meta)
451+
const max = src.query("SELECT MAX(time_created) as t FROM message WHERE session_id = ?").get(root) as { t: number | null } | null
452+
dst.query("INSERT OR REPLACE INTO _meta (key, value) VALUES ('swept', ?)").run(String(max?.t ?? 0))
447453
log.info("migrate.done", { root, sessions: ids.length })
448454
} catch (err) {
449455
dst.exec("ROLLBACK")
@@ -456,7 +462,17 @@ export namespace Database {
456462

457463
export function ensureShard(id: string): string | undefined {
458464
const root = sessionRoot(id)
459-
if (root) return root
465+
if (root) {
466+
if (!swept.has(root)) {
467+
swept.add(root)
468+
const db = session(root).$client
469+
db.run(meta)
470+
const marker = db.query("SELECT value FROM _meta WHERE key = 'swept'").get() as { value: string } | null
471+
const max = Client().$client.query("SELECT MAX(time_created) as t FROM message WHERE session_id = ? LIMIT 1").get(id) as { t: number | null } | null
472+
if (!marker || String(max?.t ?? 0) !== marker.value) seed(root)
473+
}
474+
return root
475+
}
460476

461477
const target = findRoot(id)
462478
if (!target) return undefined
@@ -478,6 +494,10 @@ export namespace Database {
478494
cache.delete(id)
479495
}
480496

497+
export function resetSwept() {
498+
swept.clear()
499+
}
500+
481501
export function close() {
482502
for (const item of cache.values()) item.db.$client.close()
483503
cache.clear()

packages/opencode/test/session/resolve-routing.test.ts

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,3 +482,119 @@ describe("resolve() routing", () => {
482482
})
483483
})
484484
})
485+
486+
describe("sweep marker", () => {
487+
test("seed writes marker, second ensureShard skips seed", async () => {
488+
await scope(async () => {
489+
const session = await Session.create({})
490+
const sid = session.id
491+
492+
// Write a message via Sync → goes to shard
493+
await msg(sid, "shard-msg")
494+
495+
// Simulate orphan: insert directly into global
496+
const src = Database.Client().$client
497+
const now = Date.now()
498+
src.query("INSERT INTO message (id, session_id, time_created, time_updated, data) VALUES (?, ?, ?, ?, ?)").run(
499+
MessageID.ascending(), sid, now, now, JSON.stringify({ role: "user", text: "orphan" }),
500+
)
501+
502+
// First ensureShard: no marker → should seed
503+
Database.closeSession(sid)
504+
const db1 = Database.ensureShard(sid)
505+
expect(db1).toBe(sid)
506+
507+
// Verify marker was written
508+
const shard = Database.session(sid).$client
509+
const marker = shard.query("SELECT value FROM _meta WHERE key = 'swept'").get() as { value: string } | null
510+
expect(marker).not.toBeNull()
511+
expect(Number(marker!.value)).toBeGreaterThan(0)
512+
513+
// Close and reopen to reset in-memory swept set
514+
Database.closeSession(sid)
515+
Database.resetSwept()
516+
517+
// Second ensureShard: marker matches global MAX → should skip seed
518+
const before = performance.now()
519+
const db2 = Database.ensureShard(sid)
520+
const elapsed = performance.now() - before
521+
expect(db2).toBe(sid)
522+
// Should be fast (< 50ms) because seed was skipped
523+
expect(elapsed).toBeLessThan(50)
524+
})
525+
})
526+
527+
test("new orphan after sweep triggers re-seed", async () => {
528+
await scope(async () => {
529+
const session = await Session.create({})
530+
const sid = session.id
531+
532+
// Write initial message and trigger first sweep
533+
await msg(sid, "initial")
534+
const src = Database.Client().$client
535+
const now = Date.now()
536+
src.query("INSERT INTO message (id, session_id, time_created, time_updated, data) VALUES (?, ?, ?, ?, ?)").run(
537+
MessageID.ascending(), sid, now, now, JSON.stringify({ role: "user", text: "orphan-1" }),
538+
)
539+
540+
Database.closeSession(sid)
541+
Database.ensureShard(sid)
542+
543+
// Verify orphan was copied to shard
544+
const shard1 = Database.session(sid)
545+
const count1 = shard1
546+
.select()
547+
.from(MessageTable)
548+
.where(eq(MessageTable.session_id, sid))
549+
.all().length
550+
expect(count1).toBe(2) // shard-msg + orphan-1
551+
552+
// Now add ANOTHER orphan with a newer timestamp
553+
const later = now + 10000
554+
src.query("INSERT INTO message (id, session_id, time_created, time_updated, data) VALUES (?, ?, ?, ?, ?)").run(
555+
MessageID.ascending(), sid, later, later, JSON.stringify({ role: "user", text: "orphan-2" }),
556+
)
557+
558+
// Reset swept set to simulate process restart
559+
Database.closeSession(sid)
560+
Database.resetSwept()
561+
562+
// ensureShard should detect stale marker and re-seed
563+
Database.ensureShard(sid)
564+
565+
const shard2 = Database.session(sid)
566+
const count2 = shard2
567+
.select()
568+
.from(MessageTable)
569+
.where(eq(MessageTable.session_id, sid))
570+
.all().length
571+
expect(count2).toBe(3) // shard-msg + orphan-1 + orphan-2
572+
})
573+
})
574+
575+
test("marker survives shard cache eviction", async () => {
576+
await scope(async () => {
577+
const session = await Session.create({})
578+
const sid = session.id
579+
580+
// Plant orphan and sweep
581+
const src = Database.Client().$client
582+
const now = Date.now()
583+
src.query("INSERT INTO message (id, session_id, time_created, time_updated, data) VALUES (?, ?, ?, ?, ?)").run(
584+
MessageID.ascending(), sid, now, now, JSON.stringify({ role: "user", text: "orphan" }),
585+
)
586+
587+
Database.ensureShard(sid)
588+
589+
// Close the shard (simulates cache eviction)
590+
Database.closeSession(sid)
591+
Database.resetSwept()
592+
593+
// Reopen — marker should still be in the sqlite file
594+
const shard = Database.session(sid).$client
595+
const marker = shard.query("SELECT value FROM _meta WHERE key = 'swept'").get() as { value: string } | null
596+
expect(marker).not.toBeNull()
597+
expect(Number(marker!.value)).toBe(now)
598+
})
599+
})
600+
})

0 commit comments

Comments
 (0)