
feat(event-loop): WS reconnect with exponential backoff per stream (COW-1071)#40

Open
brunota20 wants to merge 1 commit into feat/supervisor-restart-cow-1033 from feat/ws-reconnect-cow-1071

Conversation

@brunota20

Collaborator

What does this PR do?

Replaces the previous "bail on WS drop" semantic (flagged as the "0.3 fix" in the source) with per-stream reconnect-aware tasks. Each chain's block subscription and each (module, chain) log subscription gets a dedicated task that survives WS drops with exponential backoff.

Sixth M4 issue landed.

Behaviour change

Public Sepolia WS (wss://ethereum-sepolia-rpc.publicnode.com) drops connections after ~20 min of sustained load. Before: the engine bailed within seconds of the first drop; an operator restart re-opened the subscription, but every event during the gap was missed. After: the reconnect task waits 1s and reopens, so only events that arrived in the 1s gap are missed. Multi-minute drops get progressively longer waits, capped at 5 min.

How it works

Each (chain_id) block subscription and each (module, chain_id, filter) log subscription gets a tokio::spawn'd task that:

  1. Opens the subscription via ProviderPool.
  2. Pumps items to an mpsc channel until the underlying stream yields None (WS drop) or Err (transport error).
  3. Logs the drop + sleeps for restart_policy::backoff_for(attempt) (1s -> 2s -> 4s -> ... cap 5 min, reusing the COW-1033 policy).
  4. Reopens. The first event after a reopen emits an INFO ... reopened line + increments shepherd_stream_reconnects_total.
  5. Resets attempt = 0 once the stream has been healthy for HEALTHY_WINDOW (60s of uninterrupted events).
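
The backoff schedule in step 3 can be sketched as follows. This is a minimal illustration only: the real `restart_policy::backoff_for` from COW-1033 is not shown in this PR, so the name `backoff_sketch`, the constants, and the assumption that `attempt = 0` maps to the 1s wait are all hypothetical, derived from the "1s -> 2s -> 4s -> ... cap 5 min" description.

```rust
use std::time::Duration;

// Assumed values, taken from the PR description rather than the source.
const BASE: Duration = Duration::from_secs(1);
const CAP: Duration = Duration::from_secs(5 * 60);

/// Hypothetical stand-in for `restart_policy::backoff_for`:
/// doubles the wait per attempt and clamps it at `CAP`.
fn backoff_sketch(attempt: u32) -> Duration {
    // 2^attempt, saturating so very large attempt counts cannot overflow.
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    let secs = BASE.as_secs().saturating_mul(factor);
    Duration::from_secs(secs).min(CAP)
}
```

Under these assumptions the cap is reached on the tenth consecutive failure (attempt 9 would be 512s, clamped to 300s), which is why resetting `attempt` after a healthy period matters for a connection that later stabilises.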

The event loop reads the channel as a Stream via futures::stream::unfold (no new dep). A bare None from the merged stream now indicates a reconnect task panicked or the channel closed; the existing bail-on-None remains as a defensive safety net with an updated log message.

New metric (consumed via COW-1034)

shepherd_stream_reconnects_total{kind, chain_id, module}

Incremented on every successful reopen. SRE alerts can be wired against "stream churn" thresholds.

Changes

| File | Change |
| --- | --- |
| crates/nexum-engine/src/runtime/event_loop.rs | open_block_streams / open_log_streams now spawn reconnect tasks; new reconnecting_block_task / reconnecting_log_task private fns; receiver_stream helper wraps mpsc::Receiver<T> as a Stream<Item = T>. run() bail-on-None log message updated to reflect the new semantic. |

1 file changed, +204/-53.

Breaking changes

None. Public surface (open_block_streams, open_log_streams, run, TaggedBlockStream, TaggedLogStream) preserved.

Tests

  • cargo test --workspace -> 159 host tests + 6 doctests passing.
  • cargo clippy --all-targets --workspace -- -D warnings clean.
  • cargo fmt --all --check clean.
  • Live Sepolia happy path: just run-m3 boots, all 3 modules dispatch normally, block subscription open chain_id=11155111 log emitted, no reconnect activity in 60s window (network was stable). Clean SIGTERM shutdown.
  • Existing run_does_not_bail_when_both_stream_kinds_are_empty regression guard still passes (the empty-stream tolerance is independent of reconnect).

Out of scope (already filed)

  • WS endpoint failover (Alchemy <-> publicnode on failure) - operator concern.
  • Backfill of events missed during the drop window - live-stream semantic only.
  • Operator-tunable backoff / healthy-window via engine.toml (configurable in 0.3).
  • Multi-chain supervisor isolation - COW-1073 (partial isolation is already in place from this PR's task-per-chain model; COW-1073 adds the explicit guarantee + tests).

AI assistance disclosure

AI Assistance: this change + description was produced by a Claude Code agent (Claude Opus 4.7 1M context). The agent designed the reconnect task shape, implemented the receiver_stream wrapper, validated the happy path on live Sepolia, and authored this PR description. A human (Bruno) reviewed and is accountable for the result.

Linear: COW-1071. Stacks on #39 (COW-1033 supervisor restart).

…OW-1071)

Replaces the previous "bail on WS drop" semantic (flagged as the
"0.3 fix" in the source) with per-stream reconnect-aware tasks. Each
chain's block subscription and each (module, chain) log subscription
gets a dedicated task that:

1. Opens the subscription via `ProviderPool`.
2. Pumps items to an mpsc channel until the underlying stream
   yields `None` (WS drop) or `Err` (transport-level error).
3. Logs the drop + sleeps for `restart_policy::backoff_for(attempt)`
   (1s -> 2s -> 4s -> ... cap 5 min, reusing the COW-1033 policy).
4. Reopens. The first event after a reopen emits an `INFO ...
   reopened` line + increments `shepherd_stream_reconnects_total`.
5. Resets `attempt = 0` once the stream has been healthy for the
   `HEALTHY_WINDOW` (60 s of uninterrupted events) so a flaky-but-
   then-stable connection reverts to fast retries on the next drop.
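
The attempt bookkeeping in steps 3 to 5 might look roughly like the sketch below. The struct, its fields, and its method names are hypothetical and not taken from `event_loop.rs`; it also assumes "healthy" is measured from the moment of the last reopen, which the description does not spell out.

```rust
use std::time::{Duration, Instant};

const HEALTHY_WINDOW: Duration = Duration::from_secs(60);

/// Hypothetical per-stream state; names are illustrative only.
struct ReconnectState {
    attempt: u32,
    opened_at: Instant,
}

impl ReconnectState {
    fn new(now: Instant) -> Self {
        Self { attempt: 0, opened_at: now }
    }

    /// Called for every item pumped from the subscription. Once the stream
    /// has been open for at least `HEALTHY_WINDOW`, the next drop starts
    /// again from the shortest backoff.
    fn on_event(&mut self, now: Instant) {
        if now.duration_since(self.opened_at) >= HEALTHY_WINDOW {
            self.attempt = 0;
        }
    }

    /// Called when the stream yields `None` or `Err`. Returns the attempt
    /// number to feed into the backoff policy, then bumps the counter.
    fn on_stream_drop(&mut self) -> u32 {
        let current = self.attempt;
        self.attempt = self.attempt.saturating_add(1);
        current
    }

    /// Called after a successful reopen; restarts the health clock.
    fn on_reopen(&mut self, now: Instant) {
        self.opened_at = now;
    }
}
```

Passing `now` in explicitly, instead of calling `Instant::now()` inside the methods, keeps the logic testable without sleeping.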

The event loop reads the channel as a regular `Stream` (wrapped
with `futures::stream::unfold` to avoid pulling in `tokio-stream`
just for `ReceiverStream`). A bare `None` from the merged stream
now indicates the reconnect task itself exited (panic or channel
closed); that path still bails the engine as before, but the log
message is updated to reflect the new semantic.

## Key behavioural change

Public Sepolia (`wss://ethereum-sepolia-rpc.publicnode.com`) drops
WS connections after ~20 min of sustained load. Pre-fix: the engine
bailed within seconds of the first drop; an operator restart
re-opened the subscription, but the engine had missed every event in
between. Post-fix: the reconnect task waits 1s and reopens, so only
events that arrived during the 1s gap are missed. Multi-minute drops
get progressively longer waits, capped at 5 min.

## New metric (consumed via COW-1034)

`shepherd_stream_reconnects_total{kind, chain_id, module}` counter,
incremented on every successful reopen. Operators write SLO alerts
against this for "stream churn" (e.g. > 5 reconnects per 10 min on
the same chain).
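
To make the example threshold concrete, the sketch below evaluates "> 5 reconnects per 10 min" over a sliding window. It is purely illustrative: in practice this would be an alert rule over the counter in the metrics backend, not engine code, and the `ChurnWindow` type and its API are hypothetical.

```rust
use std::collections::VecDeque;
use std::time::Duration;

/// Hypothetical sliding-window check over reopen timestamps for one
/// (kind, chain_id, module) label set.
struct ChurnWindow {
    window: Duration,
    threshold: usize,
    // Reopen times as offsets from an arbitrary start, oldest first.
    reopen_times: VecDeque<Duration>,
}

impl ChurnWindow {
    fn new(window: Duration, threshold: usize) -> Self {
        Self { window, threshold, reopen_times: VecDeque::new() }
    }

    /// Records a reopen at `now` and reports whether the window now holds
    /// more than `threshold` reopens.
    fn record(&mut self, now: Duration) -> bool {
        self.reopen_times.push_back(now);
        // Evict reopens that have aged out of the window.
        while let Some(&oldest) = self.reopen_times.front() {
            if now.saturating_sub(oldest) > self.window {
                self.reopen_times.pop_front();
            } else {
                break;
            }
        }
        self.reopen_times.len() > self.threshold
    }
}
```

With a 10 min window and a threshold of 5, the sixth reopen inside the window is the first one that would fire the alert.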

## Channel buffer + back-pressure

Buffer is 64 events per task. Real-time dispatch usually keeps pace
with the ~12 s Sepolia block time, so the buffer is overkill for
normal operation; it exists to absorb a brief dispatch-side stall
(e.g. a stop-loss cow-api submit that takes 2 s) without dropping
events at the WS boundary.
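
The bounded-buffer behaviour can be illustrated with the standard library's `sync_channel`, used here only as a dependency-free analog of the tokio `mpsc` channel the PR describes. The function name is hypothetical. Note one difference: an awaited send on a bounded tokio channel waits for capacity rather than failing, whereas this sketch uses the non-blocking `try_send` to make the capacity limit visible.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Buffer size taken from the PR description.
const BUFFER: usize = 64;

/// Fills a bounded channel without draining it and returns how many sends
/// were accepted before the channel reported `Full`.
fn accepted_before_full() -> usize {
    // `_rx` stays bound so the receiver is alive for the whole function.
    let (tx, _rx) = sync_channel::<u64>(BUFFER);
    let mut accepted = 0usize;
    loop {
        match tx.try_send(accepted as u64) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => {
                unreachable!("receiver is still alive")
            }
        }
    }
}
```

A stalled consumer therefore has 64 items of slack before the producing side has to wait (or, with a non-blocking send, sees a full channel).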

## Tests

- `cargo test --workspace` -> 159 host tests + 6 doctests passing
  (unchanged shape - all existing tests still pass, including the
  `run_does_not_bail_when_both_stream_kinds_are_empty` regression
  guard which verifies the empty-stream path).
- `cargo clippy --all-targets --workspace -- -D warnings` clean.
- `cargo fmt --all --check` clean.
- Live Sepolia happy path: `just run-m3` boots, all 3 modules
  dispatch normally, `subscription open` log line emitted, no
  reconnect activity in 60 s window (network was stable). Clean
  SIGTERM shutdown.

## Out of scope

- WS endpoint failover (swap Alchemy <-> publicnode on failure).
  Operator concern; track via `[engine.chains.<id>]` schema if
  demand arises.
- Backfill of events missed during the drop window. Live-stream
  semantic only; backfill is an indexer concern outside the M4
  engine scope.
- Operator-tunable backoff / healthy-window via `engine.toml`. The
  current constants are workspace literals; configurable in 0.3.
- Per-chain isolation across reconnects (COW-1073). The current
  patch already gives partial isolation: each chain's task drops
  + reconnects independently and one task's failure does not
  starve the others. COW-1073 covers the supervisor-side
  multi-chain coordination.

Linear: COW-1071. Sixth M4 issue landed; stacks on #39 (COW-1033).
@linear-code

linear-code Bot commented Jun 18, 2026


COW-1071

