Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/reconnect-wake-stream-after-sleep.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-ax/agents-runtime': patch
---

Reconnect pull-wake streams after the runner resumes from sleep or a long event-loop stall so desktop agents keep receiving wake notifications.
25 changes: 25 additions & 0 deletions packages/agents-runtime/src/pull-wake-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ export interface PullWakeRunnerConfig {
wakeStreamPath?: string
heartbeatIntervalMs?: number
eventHeartbeatThrottleMs?: number
/**
* If the heartbeat timer fires after a wall-clock gap larger than this,
* assume the process resumed from sleep or a long event-loop stall and force
* the long-lived wake stream to reconnect. Defaults to the runner lease.
*/
resumeGapResetMs?: number
leaseMs?: number
heartbeatPath?: string
claimPath?: string
Expand Down Expand Up @@ -146,6 +152,8 @@ export function createPullWakeRunner(
config.eventHeartbeatThrottleMs ?? DEFAULT_EVENT_HEARTBEAT_THROTTLE_MS
)
const leaseMs = config.leaseMs ?? heartbeatIntervalMs * 3
const resumeGapResetMs = config.resumeGapResetMs ?? leaseMs
let lastHeartbeatTickAt = Date.now()
const heartbeatPath =
config.heartbeatPath ??
`/_electric/runners/${encodeURIComponent(config.runnerId)}/heartbeat`
Expand Down Expand Up @@ -296,8 +304,25 @@ export function createPullWakeRunner(

const startHeartbeat = (signal: AbortSignal): void => {
if (heartbeatIntervalMs <= 0) return
lastHeartbeatTickAt = Date.now()
requestHeartbeat(signal)
heartbeatTimer = setInterval(() => {
const now = Date.now()
const elapsedMs = now - lastHeartbeatTickAt
lastHeartbeatTickAt = now
if (
resumeGapResetMs > 0 &&
elapsedMs > resumeGapResetMs &&
isRunningState() &&
!signal.aborted
) {
actor.send({
type: `STREAM_RESET`,
error: new Error(
`Pull-wake runner heartbeat timer resumed after ${elapsedMs}ms gap`
),
})
}
requestHeartbeat(signal)
}, heartbeatIntervalMs)
}
Expand Down
63 changes: 63 additions & 0 deletions packages/agents-runtime/test/pull-wake-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,69 @@ describe(`createPullWakeRunner`, () => {
await runner.stop()
})

it(`forces the stream to reconnect when heartbeat timer observes a resume gap`, async () => {
vi.useFakeTimers()
const firstStreamOpened = deferred<void>()
const secondStreamOpened = deferred<void>()
const firstStreamClosed = deferred<void>()
const secondStreamClosed = deferred<void>()
const firstCancel = vi.fn(() => firstStreamClosed.resolve())
const streamFactory = vi.fn(async () => {
if (streamFactory.mock.calls.length === 1) {
firstStreamOpened.resolve()
return {
async *jsonStream() {
await firstStreamClosed.promise
},
cancel: firstCancel,
closed: firstStreamClosed.promise,
}
}
secondStreamOpened.resolve()
return {
async *jsonStream() {
await secondStreamClosed.promise
},
cancel: () => secondStreamClosed.resolve(),
closed: secondStreamClosed.promise,
}
})
vi.stubGlobal(
`fetch`,
vi.fn(async () => Response.json({}))
)

const runner = createPullWakeRunner({
baseUrl: `http://server`,
runnerId: `runner-1`,
runtime: runtime(),
heartbeatIntervalMs: 10,
resumeGapResetMs: 25,
eventHeartbeatThrottleMs: 0,
streamFactory,
})

runner.start()
await firstStreamOpened.promise

// Simulate the computer being asleep: wall-clock time jumps forward before
// the next heartbeat timer gets CPU again, while networking is healthy.
vi.setSystemTime(Date.now() + 60)
await vi.advanceTimersByTimeAsync(10)

await waitFor(() => {
expect(firstCancel).toHaveBeenCalledWith(expect.any(Error))
})

await vi.advanceTimersByTimeAsync(1_000)
await secondStreamOpened.promise

expect(streamFactory).toHaveBeenCalledTimes(2)
expect(runner.getHealth().reconnect_count).toBe(1)

await runner.stop()
})

it(`forces the stream to reconnect after repeated heartbeat failures`, async () => {
vi.useFakeTimers()
const firstStreamOpened = deferred<void>()
Expand Down
Loading