Skip to content

Commit a2c0fc1

Browse files
committed
feat(session): shard-aware todo reads and update export
1 parent 8ba4799 commit a2c0fc1

18 files changed

Lines changed: 3004 additions & 188 deletions

packages/opencode/src/effect/runner.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,8 @@ export namespace Runner {
184184
case "ShellThenRun":
185185
return [
186186
Effect.gen(function* () {
187-
yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid)
188187
yield* stopShell(st.shell)
188+
yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid)
189189
yield* idleIfCurrent()
190190
}),
191191
{ _tag: "Idle" } as const,

packages/opencode/src/project/project.ts

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,12 @@ export namespace Project {
133133
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
134134
Effect.sync(() => Database.use(fn))
135135

136+
type Sync<T> = T extends Promise<any> ? never : T
137+
138+
const tx = <T>(
139+
fn: (d: Parameters<typeof Database.transaction>[0] extends (trx: infer D) => any ? D : never) => Sync<T>,
140+
) => Effect.sync(() => Database.transaction(fn, { behavior: "immediate" }))
141+
136142
const emitUpdated = (data: Info) =>
137143
Effect.sync(() =>
138144
GlobalBus.emit("event", {
@@ -246,7 +252,7 @@ export namespace Project {
246252
})
247253

248254
// Phase 2: upsert
249-
const row = yield* db((d) => d.select().from(ProjectTable).where(eq(ProjectTable.id, data.id)).get())
255+
const row = yield* tx((d) => d.select().from(ProjectTable).where(eq(ProjectTable.id, data.id)).get())
250256
const existing = row
251257
? fromRow(row)
252258
: {
@@ -278,9 +284,8 @@ export namespace Project {
278284
{ concurrency: "unbounded" },
279285
).pipe(Effect.map((arr) => arr.filter((x): x is string => x !== undefined)))
280286

281-
yield* db((d) =>
282-
d
283-
.insert(ProjectTable)
287+
yield* tx((d) => {
288+
d.insert(ProjectTable)
284289
.values({
285290
id: result.id,
286291
worktree: result.worktree,
@@ -308,18 +313,15 @@ export namespace Project {
308313
commands: result.commands,
309314
},
310315
})
311-
.run(),
312-
)
316+
.run()
313317

314-
if (data.id !== ProjectID.global) {
315-
yield* db((d) =>
316-
d
317-
.update(SessionTable)
318-
.set({ project_id: data.id })
319-
.where(and(eq(SessionTable.project_id, ProjectID.global), eq(SessionTable.directory, data.worktree)))
320-
.run(),
321-
)
322-
}
318+
if (data.id === ProjectID.global) return
319+
320+
d.update(SessionTable)
321+
.set({ project_id: data.id })
322+
.where(and(eq(SessionTable.project_id, ProjectID.global), eq(SessionTable.directory, data.worktree)))
323+
.run()
324+
})
323325

324326
yield* emitUpdated(result)
325327
return { project: result, sandbox: data.sandbox }

packages/opencode/src/session/index.ts

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { Snapshot } from "@/snapshot"
2323
import { ProjectID } from "../project/schema"
2424
import { WorkspaceID } from "../control-plane/schema"
2525
import { SessionID, MessageID, PartID } from "./schema"
26+
import { makeRuntime } from "@/effect/run-service"
2627

2728
import type { Provider } from "@/provider/provider"
2829
import { Permission } from "@/permission"
@@ -424,6 +425,14 @@ export namespace Session {
424425

425426
yield* Effect.sync(() => SyncEvent.run(Event.Created, { sessionID: result.id, info: result }))
426427

428+
if (!input.parentID) {
429+
try {
430+
Database.session(result.id)
431+
} catch (e) {
432+
log.error("failed to create per-tree db", { error: e })
433+
}
434+
}
435+
427436
if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
428437
// This only exist for backwards compatibility. We should not be
429438
// manually publishing this event; it is a sync event now
@@ -498,19 +507,17 @@ export namespace Session {
498507
}).pipe(Effect.withSpan("Session.updatePart"))
499508

500509
const getPart: Interface["getPart"] = Effect.fn("Session.getPart")(function* (input) {
501-
const row = Database.use((db) =>
502-
db
503-
.select()
504-
.from(PartTable)
505-
.where(
506-
and(
507-
eq(PartTable.session_id, input.sessionID),
508-
eq(PartTable.message_id, input.messageID),
509-
eq(PartTable.id, input.partID),
510-
),
511-
)
512-
.get(),
513-
)
510+
const row = Database.resolveSession(input.sessionID)
511+
.select()
512+
.from(PartTable)
513+
.where(
514+
and(
515+
eq(PartTable.session_id, input.sessionID),
516+
eq(PartTable.message_id, input.messageID),
517+
eq(PartTable.id, input.partID),
518+
),
519+
)
520+
.get()
514521
if (!row) return
515522
return {
516523
...row.data,
@@ -704,6 +711,24 @@ export namespace Session {
704711

705712
export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Storage.defaultLayer))
706713

714+
const { runPromise } = makeRuntime(Service, defaultLayer)
715+
716+
export async function create(input?: CreateInput) {
717+
return runPromise((svc) => svc.create(input))
718+
}
719+
720+
export async function remove(sessionID: SessionID) {
721+
return runPromise((svc) => svc.remove(sessionID))
722+
}
723+
724+
export async function updateMessage<T extends MessageV2.Info>(msg: T) {
725+
return runPromise((svc) => svc.updateMessage(msg))
726+
}
727+
728+
export async function updatePart<T extends MessageV2.Part>(part: T) {
729+
return runPromise((svc) => svc.updatePart(part))
730+
}
731+
707732
export function* list(input?: {
708733
directory?: string
709734
workspaceID?: WorkspaceID

packages/opencode/src/session/message-v2.ts

Lines changed: 39 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -550,18 +550,20 @@ export namespace MessageV2 {
550550
and(eq(MessageTable.time_created, row.time), lt(MessageTable.id, row.id)),
551551
)
552552

553-
function hydrate(rows: (typeof MessageTable.$inferSelect)[]) {
553+
function resolve(sid: SessionID) {
554+
return Database.resolveSession(sid)
555+
}
556+
557+
function hydrate(db: Database.TxOrDb, rows: (typeof MessageTable.$inferSelect)[]) {
554558
const ids = rows.map((row) => row.id)
555559
const partByMessage = new Map<string, MessageV2.Part[]>()
556560
if (ids.length > 0) {
557-
const partRows = Database.use((db) =>
558-
db
559-
.select()
560-
.from(PartTable)
561-
.where(inArray(PartTable.message_id, ids))
562-
.orderBy(PartTable.message_id, PartTable.id)
563-
.all(),
564-
)
561+
const partRows = db
562+
.select()
563+
.from(PartTable)
564+
.where(inArray(PartTable.message_id, ids))
565+
.orderBy(PartTable.message_id, PartTable.id)
566+
.all()
565567
for (const row of partRows) {
566568
const next = part(row)
567569
const list = partByMessage.get(row.message_id)
@@ -850,15 +852,14 @@ export namespace MessageV2 {
850852
const where = before
851853
? and(eq(MessageTable.session_id, input.sessionID), older(before))
852854
: eq(MessageTable.session_id, input.sessionID)
853-
const rows = Database.use((db) =>
854-
db
855-
.select()
856-
.from(MessageTable)
857-
.where(where)
858-
.orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
859-
.limit(input.limit + 1)
860-
.all(),
861-
)
855+
const db = resolve(input.sessionID)
856+
const rows = db
857+
.select()
858+
.from(MessageTable)
859+
.where(where)
860+
.orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
861+
.limit(input.limit + 1)
862+
.all()
862863
if (rows.length === 0) {
863864
const row = Database.use((db) =>
864865
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(),
@@ -872,7 +873,7 @@ export namespace MessageV2 {
872873

873874
const more = rows.length > input.limit
874875
const slice = more ? rows.slice(0, input.limit) : rows
875-
const items = hydrate(slice)
876+
const items = hydrate(db, slice)
876877
items.reverse()
877878
const tail = slice.at(-1)
878879
return {
@@ -896,33 +897,28 @@ export namespace MessageV2 {
896897
}
897898
}
898899

899-
export function parts(message_id: MessageID) {
900-
const rows = Database.use((db) =>
901-
db.select().from(PartTable).where(eq(PartTable.message_id, message_id)).orderBy(PartTable.id).all(),
902-
)
903-
return rows.map(
904-
(row) =>
905-
({
906-
...row.data,
907-
id: row.id,
908-
sessionID: row.session_id,
909-
messageID: row.message_id,
910-
}) as MessageV2.Part,
911-
)
900+
export function parts(mid: MessageID, sid?: SessionID) {
901+
const db = sid ? resolve(sid) : Database.Client()
902+
return db.select().from(PartTable).where(eq(PartTable.message_id, mid)).orderBy(PartTable.id).all().map(part)
912903
}
913904

914905
export function get(input: { sessionID: SessionID; messageID: MessageID }): WithParts {
915-
const row = Database.use((db) =>
916-
db
917-
.select()
918-
.from(MessageTable)
919-
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
920-
.get(),
921-
)
906+
const db = resolve(input.sessionID)
907+
const row = db
908+
.select()
909+
.from(MessageTable)
910+
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
911+
.get()
922912
if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` })
923913
return {
924914
info: info(row),
925-
parts: parts(input.messageID),
915+
parts: db
916+
.select()
917+
.from(PartTable)
918+
.where(eq(PartTable.message_id, input.messageID))
919+
.orderBy(PartTable.id)
920+
.all()
921+
.map(part),
926922
}
927923
}
928924

@@ -944,9 +940,7 @@ export namespace MessageV2 {
944940
return result
945941
}
946942

947-
export const filterCompactedEffect = Effect.fnUntraced(function* (sessionID: SessionID) {
948-
return filterCompacted(stream(sessionID))
949-
})
943+
export const filterCompactedEffect = (sessionID: SessionID) => Effect.succeed(filterCompacted(stream(sessionID)))
950944

951945
export function fromError(
952946
e: unknown,
@@ -998,7 +992,7 @@ export namespace MessageV2 {
998992
},
999993
{ cause: e },
1000994
).toObject()
1001-
case APICallError.isInstance(e):
995+
case APICallError.isInstance(e): {
1002996
const parsed = ProviderError.parseAPICallError({
1003997
providerID: ctx.providerID,
1004998
error: e,
@@ -1024,6 +1018,7 @@ export namespace MessageV2 {
10241018
},
10251019
{ cause: e },
10261020
).toObject()
1021+
}
10271022
case e instanceof Error:
10281023
return new NamedError.Unknown({ message: errorMessage(e) }, { cause: e }).toObject()
10291024
default:

packages/opencode/src/session/todo.ts

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { SessionID } from "./schema"
44
import { Effect, Layer, Context } from "effect"
55
import z from "zod"
66
import { Database, eq, asc } from "../storage/db"
7+
import { makeRuntime } from "@/effect/run-service"
78
import { TodoTable } from "./session.sql"
89

910
export namespace Todo {
@@ -39,11 +40,12 @@ export namespace Todo {
3940
const bus = yield* Bus.Service
4041

4142
const update = Effect.fn("Todo.update")(function* (input: { sessionID: SessionID; todos: Info[] }) {
42-
yield* Effect.sync(() =>
43-
Database.transaction((db) => {
44-
db.delete(TodoTable).where(eq(TodoTable.session_id, input.sessionID)).run()
43+
yield* Effect.sync(() => {
44+
const db = Database.resolveSession(input.sessionID)
45+
db.transaction((tx) => {
46+
tx.delete(TodoTable).where(eq(TodoTable.session_id, input.sessionID)).run()
4547
if (input.todos.length === 0) return
46-
db.insert(TodoTable)
48+
tx.insert(TodoTable)
4749
.values(
4850
input.todos.map((todo, position) => ({
4951
session_id: input.sessionID,
@@ -54,22 +56,21 @@ export namespace Todo {
5456
})),
5557
)
5658
.run()
57-
}),
58-
)
59+
})
60+
})
5961
yield* bus.publish(Event.Updated, input)
6062
})
6163

6264
const get = Effect.fn("Todo.get")(function* (sessionID: SessionID) {
63-
const rows = yield* Effect.sync(() =>
64-
Database.use((db) =>
65-
db
66-
.select()
67-
.from(TodoTable)
68-
.where(eq(TodoTable.session_id, sessionID))
69-
.orderBy(asc(TodoTable.position))
70-
.all(),
71-
),
72-
)
65+
const rows = yield* Effect.sync(() => {
66+
const db = Database.resolveSession(sessionID)
67+
return db
68+
.select()
69+
.from(TodoTable)
70+
.where(eq(TodoTable.session_id, sessionID))
71+
.orderBy(asc(TodoTable.position))
72+
.all()
73+
})
7374
return rows.map((row) => ({
7475
content: row.content,
7576
status: row.status,
@@ -82,4 +83,14 @@ export namespace Todo {
8283
)
8384

8485
export const defaultLayer = layer.pipe(Layer.provide(Bus.layer))
86+
87+
const { runPromise } = makeRuntime(Service, defaultLayer)
88+
89+
export async function get(sessionID: SessionID) {
90+
return runPromise((svc) => svc.get(sessionID))
91+
}
92+
93+
export async function update(input: { sessionID: SessionID; todos: Info[] }) {
94+
return runPromise((svc) => svc.update(input))
95+
}
8596
}

0 commit comments

Comments
 (0)