Skip to content

Commit 96df768

Browse files
committed
feat(session): harden SQLite sharding with retries, monitoring, and load tests
1 parent cd8e8a9 commit 96df768

20 files changed

Lines changed: 1926 additions & 221 deletions

packages/opencode/src/effect/runner.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,8 @@ export namespace Runner {
192192
case "ShellThenRun":
193193
return [
194194
Effect.gen(function* () {
195-
yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid)
196195
yield* stopShell(st.shell)
196+
yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid)
197197
yield* idleIfCurrent()
198198
}),
199199
{ _tag: "Idle" } as const,

packages/opencode/src/project/project.ts

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,12 @@ export namespace Project {
134134
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
135135
Effect.sync(() => Database.use(fn))
136136

137+
type Sync<T> = T extends Promise<any> ? never : T
138+
139+
const tx = <T>(
140+
fn: (d: Parameters<typeof Database.transaction>[0] extends (trx: infer D) => any ? D : never) => Sync<T>,
141+
) => Effect.sync(() => Database.transaction(fn, { behavior: "immediate" }))
142+
137143
const emitUpdated = (data: Info) =>
138144
Effect.sync(() =>
139145
GlobalBus.emit("event", {
@@ -245,7 +251,7 @@ export namespace Project {
245251
})
246252

247253
// Phase 2: upsert
248-
const row = yield* db((d) => d.select().from(ProjectTable).where(eq(ProjectTable.id, data.id)).get())
254+
const row = yield* tx((d) => d.select().from(ProjectTable).where(eq(ProjectTable.id, data.id)).get())
249255
const existing = row
250256
? fromRow(row)
251257
: {
@@ -277,9 +283,8 @@ export namespace Project {
277283
{ concurrency: "unbounded" },
278284
).pipe(Effect.map((arr) => arr.filter((x): x is string => x !== undefined)))
279285

280-
yield* db((d) =>
281-
d
282-
.insert(ProjectTable)
286+
yield* tx((d) => {
287+
d.insert(ProjectTable)
283288
.values({
284289
id: result.id,
285290
worktree: result.worktree,
@@ -307,18 +312,15 @@ export namespace Project {
307312
commands: result.commands,
308313
},
309314
})
310-
.run(),
311-
)
315+
.run()
312316

313-
if (data.id !== ProjectID.global) {
314-
yield* db((d) =>
315-
d
316-
.update(SessionTable)
317-
.set({ project_id: data.id })
318-
.where(and(eq(SessionTable.project_id, ProjectID.global), eq(SessionTable.directory, data.worktree)))
319-
.run(),
320-
)
321-
}
317+
if (data.id === ProjectID.global) return
318+
319+
d.update(SessionTable)
320+
.set({ project_id: data.id })
321+
.where(and(eq(SessionTable.project_id, ProjectID.global), eq(SessionTable.directory, data.worktree)))
322+
.run()
323+
})
322324

323325
yield* emitUpdated(result)
324326
return { project: result, sandbox: data.sandbox }

packages/opencode/src/session/index.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,14 @@ export namespace Session {
403403

404404
yield* Effect.sync(() => SyncEvent.run(Event.Created, { sessionID: result.id, info: result }))
405405

406+
if (!input.parentID) {
407+
try {
408+
Database.session(result.id)
409+
} catch (e) {
410+
log.error("failed to create per-tree db", { error: e })
411+
}
412+
}
413+
406414
const cfg = yield* config.get()
407415
if (!result.parentID && (Flag.OPENCODE_AUTO_SHARE || cfg.share === "auto")) {
408416
yield* share(result.id).pipe(Effect.ignore, Effect.forkIn(scope))
@@ -592,12 +600,13 @@ export namespace Session {
592600
)
593601
})
594602

595-
const messages = Effect.fn("Session.messages")(function* (input: { sessionID: SessionID; limit?: number }) {
596-
if (input.limit) {
597-
return MessageV2.page({ sessionID: input.sessionID, limit: input.limit }).items
598-
}
599-
return Array.from(MessageV2.stream(input.sessionID)).reverse()
600-
})
603+
const messages = Effect.fn("Session.messages")((input: { sessionID: SessionID; limit?: number }) =>
604+
Effect.succeed(
605+
input.limit
606+
? MessageV2.page({ sessionID: input.sessionID, limit: input.limit }).items
607+
: Array.from(MessageV2.stream(input.sessionID)).reverse(),
608+
),
609+
)
601610

602611
const removeMessage = Effect.fn("Session.removeMessage")(function* (input: {
603612
sessionID: SessionID

packages/opencode/src/session/llm.ts

Lines changed: 34 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -235,13 +235,45 @@ export namespace LLM {
235235
// and results sent back over the WebSocket.
236236
if (language instanceof GitLabWorkflowLanguageModel) {
237237
const workflowModel = language
238-
workflowModel.sessionID = input.sessionID
239238
workflowModel.systemPrompt = system.join("\n")
240-
workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
239+
240+
const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
241+
const preapprovedTools = Object.keys(tools).filter((name) => {
242+
const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
243+
return !match || match.action !== "ask"
244+
})
245+
const approvedToolsForSession = new Set<string>()
246+
workflowModel.toolExecutor = async (toolName: string, argsJson: string, _requestID: string) => {
241247
const t = tools[toolName]
242248
if (!t || !t.execute) {
243249
return { result: "", error: `Unknown tool: ${toolName}` }
244250
}
251+
252+
if (!preapprovedTools.includes(toolName) && !approvedToolsForSession.has(toolName)) {
253+
const id = PermissionID.ascending()
254+
let unsub: (() => void) | undefined
255+
try {
256+
const parsed = JSON.parse(argsJson) as Record<string, unknown>
257+
const title = (parsed?.title ?? parsed?.name ?? "") as string
258+
const pattern = title ? `${toolName}: ${title}` : toolName
259+
unsub = Bus.subscribe(Permission.Event.Replied, (_evt) => {})
260+
await Permission.ask({
261+
id,
262+
sessionID: SessionID.make(input.sessionID),
263+
permission: "workflow_tool_approval",
264+
patterns: [pattern],
265+
metadata: { tools: [{ name: toolName, args: argsJson }] },
266+
always: [pattern],
267+
ruleset: [],
268+
})
269+
approvedToolsForSession.add(toolName)
270+
} catch {
271+
return { result: "", error: `Tool approval rejected: ${toolName}` }
272+
} finally {
273+
unsub?.()
274+
}
275+
}
276+
245277
try {
246278
const result = await t.execute!(JSON.parse(argsJson), {
247279
toolCallId: _requestID,
@@ -258,57 +290,6 @@ export namespace LLM {
258290
return { result: "", error: e.message ?? String(e) }
259291
}
260292
}
261-
262-
const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
263-
workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => {
264-
const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
265-
return !match || match.action !== "ask"
266-
})
267-
268-
const approvedToolsForSession = new Set<string>()
269-
workflowModel.approvalHandler = Instance.bind(async (approvalTools) => {
270-
const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[]
271-
// Auto-approve tools that were already approved in this session
272-
// (prevents infinite approval loops for server-side MCP tools)
273-
if (uniqueNames.every((name) => approvedToolsForSession.has(name))) {
274-
return { approved: true }
275-
}
276-
277-
const id = PermissionID.ascending()
278-
let reply: Permission.Reply | undefined
279-
let unsub: (() => void) | undefined
280-
try {
281-
unsub = Bus.subscribe(Permission.Event.Replied, (evt) => {
282-
if (evt.properties.requestID === id) reply = evt.properties.reply
283-
})
284-
const toolPatterns = approvalTools.map((t: { name: string; args: string }) => {
285-
try {
286-
const parsed = JSON.parse(t.args) as Record<string, unknown>
287-
const title = (parsed?.title ?? parsed?.name ?? "") as string
288-
return title ? `${t.name}: ${title}` : t.name
289-
} catch {
290-
return t.name
291-
}
292-
})
293-
const uniquePatterns = [...new Set(toolPatterns)] as string[]
294-
await Permission.ask({
295-
id,
296-
sessionID: SessionID.make(input.sessionID),
297-
permission: "workflow_tool_approval",
298-
patterns: uniquePatterns,
299-
metadata: { tools: approvalTools },
300-
always: uniquePatterns,
301-
ruleset: [],
302-
})
303-
for (const name of uniqueNames) approvedToolsForSession.add(name)
304-
workflowModel.sessionPreapprovedTools = [...workflowModel.sessionPreapprovedTools, ...uniqueNames]
305-
return { approved: true }
306-
} catch {
307-
return { approved: false }
308-
} finally {
309-
unsub?.()
310-
}
311-
})
312293
}
313294

314295
return streamText({

packages/opencode/src/session/message-v2.ts

Lines changed: 39 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -547,18 +547,20 @@ export namespace MessageV2 {
547547
and(eq(MessageTable.time_created, row.time), lt(MessageTable.id, row.id)),
548548
)
549549

550-
function hydrate(rows: (typeof MessageTable.$inferSelect)[]) {
550+
function resolve(sid: SessionID) {
551+
return Database.resolveSession(sid)
552+
}
553+
554+
function hydrate(db: Database.TxOrDb, rows: (typeof MessageTable.$inferSelect)[]) {
551555
const ids = rows.map((row) => row.id)
552556
const partByMessage = new Map<string, MessageV2.Part[]>()
553557
if (ids.length > 0) {
554-
const partRows = Database.use((db) =>
555-
db
556-
.select()
557-
.from(PartTable)
558-
.where(inArray(PartTable.message_id, ids))
559-
.orderBy(PartTable.message_id, PartTable.id)
560-
.all(),
561-
)
558+
const partRows = db
559+
.select()
560+
.from(PartTable)
561+
.where(inArray(PartTable.message_id, ids))
562+
.orderBy(PartTable.message_id, PartTable.id)
563+
.all()
562564
for (const row of partRows) {
563565
const next = part(row)
564566
const list = partByMessage.get(row.message_id)
@@ -831,15 +833,14 @@ export namespace MessageV2 {
831833
const where = before
832834
? and(eq(MessageTable.session_id, input.sessionID), older(before))
833835
: eq(MessageTable.session_id, input.sessionID)
834-
const rows = Database.use((db) =>
835-
db
836-
.select()
837-
.from(MessageTable)
838-
.where(where)
839-
.orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
840-
.limit(input.limit + 1)
841-
.all(),
842-
)
836+
const db = resolve(input.sessionID)
837+
const rows = db
838+
.select()
839+
.from(MessageTable)
840+
.where(where)
841+
.orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
842+
.limit(input.limit + 1)
843+
.all()
843844
if (rows.length === 0) {
844845
const row = Database.use((db) =>
845846
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(),
@@ -853,7 +854,7 @@ export namespace MessageV2 {
853854

854855
const more = rows.length > input.limit
855856
const slice = more ? rows.slice(0, input.limit) : rows
856-
const items = hydrate(slice)
857+
const items = hydrate(db, slice)
857858
items.reverse()
858859
const tail = slice.at(-1)
859860
return {
@@ -877,33 +878,28 @@ export namespace MessageV2 {
877878
}
878879
}
879880

880-
export function parts(message_id: MessageID) {
881-
const rows = Database.use((db) =>
882-
db.select().from(PartTable).where(eq(PartTable.message_id, message_id)).orderBy(PartTable.id).all(),
883-
)
884-
return rows.map(
885-
(row) =>
886-
({
887-
...row.data,
888-
id: row.id,
889-
sessionID: row.session_id,
890-
messageID: row.message_id,
891-
}) as MessageV2.Part,
892-
)
881+
export function parts(mid: MessageID, sid?: SessionID) {
882+
const db = sid ? resolve(sid) : Database.Client()
883+
return db.select().from(PartTable).where(eq(PartTable.message_id, mid)).orderBy(PartTable.id).all().map(part)
893884
}
894885

895886
export function get(input: { sessionID: SessionID; messageID: MessageID }): WithParts {
896-
const row = Database.use((db) =>
897-
db
898-
.select()
899-
.from(MessageTable)
900-
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
901-
.get(),
902-
)
887+
const db = resolve(input.sessionID)
888+
const row = db
889+
.select()
890+
.from(MessageTable)
891+
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
892+
.get()
903893
if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` })
904894
return {
905895
info: info(row),
906-
parts: parts(input.messageID),
896+
parts: db
897+
.select()
898+
.from(PartTable)
899+
.where(eq(PartTable.message_id, input.messageID))
900+
.orderBy(PartTable.id)
901+
.all()
902+
.map(part),
907903
}
908904
}
909905

@@ -925,9 +921,7 @@ export namespace MessageV2 {
925921
return result
926922
}
927923

928-
export const filterCompactedEffect = Effect.fnUntraced(function* (sessionID: SessionID) {
929-
return filterCompacted(stream(sessionID))
930-
})
924+
export const filterCompactedEffect = (sessionID: SessionID) => Effect.succeed(filterCompacted(stream(sessionID)))
931925

932926
export function fromError(
933927
e: unknown,
@@ -979,7 +973,7 @@ export namespace MessageV2 {
979973
},
980974
{ cause: e },
981975
).toObject()
982-
case APICallError.isInstance(e):
976+
case APICallError.isInstance(e): {
983977
const parsed = ProviderError.parseAPICallError({
984978
providerID: ctx.providerID,
985979
error: e,
@@ -1005,6 +999,7 @@ export namespace MessageV2 {
1005999
},
10061000
{ cause: e },
10071001
).toObject()
1002+
}
10081003
case e instanceof Error:
10091004
return new NamedError.Unknown({ message: errorMessage(e) }, { cause: e }).toObject()
10101005
default:

packages/opencode/src/session/processor.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export namespace SessionProcessor {
3030
export interface Handle {
3131
readonly message: MessageV2.Assistant
3232
readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined
33+
readonly abort: Effect.Effect<void>
3334
readonly process: (streamInput: LLM.StreamInput) => Effect.Effect<Result>
3435
}
3536

@@ -186,7 +187,7 @@ export namespace SessionProcessor {
186187
: value.providerMetadata,
187188
} satisfies MessageV2.ToolPart)
188189

189-
const parts = MessageV2.parts(ctx.assistantMessage.id)
190+
const parts = MessageV2.parts(ctx.assistantMessage.id, ctx.assistantMessage.sessionID)
190191
const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)
191192

192193
if (
@@ -398,7 +399,7 @@ export namespace SessionProcessor {
398399
}
399400
ctx.reasoningMap = {}
400401

401-
const parts = MessageV2.parts(ctx.assistantMessage.id)
402+
const parts = MessageV2.parts(ctx.assistantMessage.id, ctx.assistantMessage.sessionID)
402403
for (const part of parts) {
403404
if (part.type !== "tool" || part.state.status === "completed" || part.state.status === "error") continue
404405
yield* session.updatePart({
@@ -489,6 +490,7 @@ export namespace SessionProcessor {
489490
partFromToolCall(toolCallID: string) {
490491
return ctx.toolcalls[toolCallID]
491492
},
493+
abort: cleanup(),
492494
process,
493495
} satisfies Handle
494496
})

0 commit comments

Comments
 (0)