feat(event-loop): WS reconnect with exponential backoff per stream (COW-1071) #40
Open
brunota20 wants to merge 1 commit into
…OW-1071)
Replaces the previous "bail on WS drop" semantic (flagged as the
"0.3 fix" in the source) with per-stream reconnect-aware tasks. Each
chain's block subscription and each (module, chain) log subscription
gets a dedicated task that:
1. Opens the subscription via `ProviderPool`.
2. Pumps items to an mpsc channel until the underlying stream
yields `None` (WS drop) or `Err` (transport-level error).
3. Logs the drop + sleeps for `restart_policy::backoff_for(attempt)`
(1s -> 2s -> 4s -> ... cap 5 min, reusing the COW-1033 policy).
4. Reopens. The first event after a reopen emits an `INFO ...
reopened` line + increments `shepherd_stream_reconnects_total`.
5. Resets `attempt = 0` once the stream has been healthy for the
`HEALTHY_WINDOW` (60 s of uninterrupted events) so a flaky-but-
then-stable connection reverts to fast retries on the next drop.
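The attempt bookkeeping in steps 3 to 5 can be sketched as a small state machine. This is a minimal sketch, not the code in this patch: `ReconnectState`, `on_event` and `on_drop` are hypothetical names, and it assumes that "healthy for `HEALTHY_WINDOW`" means 60 s have passed since the first event on the current connection without an intervening drop.

```rust
use std::time::{Duration, Instant};

/// Value taken from the description above (60 s); the real constant may differ.
const HEALTHY_WINDOW: Duration = Duration::from_secs(60);

/// Hypothetical per-stream bookkeeping for the backoff attempt counter.
struct ReconnectState {
    /// Index passed to the backoff policy on the next drop.
    attempt: u32,
    /// Time of the first event on the current connection, if any.
    healthy_since: Option<Instant>,
}

impl ReconnectState {
    fn new() -> Self {
        Self { attempt: 0, healthy_since: None }
    }

    /// Call for every item pumped to the channel. Once the connection has
    /// been delivering for HEALTHY_WINDOW, the next drop retries fast again.
    fn on_event(&mut self, now: Instant) {
        let since = *self.healthy_since.get_or_insert(now);
        if now.duration_since(since) >= HEALTHY_WINDOW {
            self.attempt = 0;
        }
    }

    /// Call when the stream yields `None` or `Err`. Returns the attempt
    /// index to use for this backoff and bumps the counter for the next one.
    fn on_drop(&mut self) -> u32 {
        self.healthy_since = None;
        let current = self.attempt;
        self.attempt = self.attempt.saturating_add(1);
        current
    }
}
```

Under these assumptions, two quick drops back off with attempts 0 and 1, and a later drop that follows a full minute of events starts again from attempt 0.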
The event loop reads the channel as a regular `Stream` (wrapped
with `futures::stream::unfold` to avoid pulling in `tokio-stream`
just for `ReceiverStream`). A bare `None` from the merged stream
now indicates the reconnect task itself exited (panic or channel
closed); that path still bails the engine as before, but the log
message is updated to reflect the new semantic.
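The "`None` means the producer is gone" semantic can be illustrated with a synchronous, std-only analogue. This is not the `futures::stream::unfold` wrapper from the patch: `receiver_iter` is a hypothetical helper over `std::sync::mpsc`, shown only because it has the same shape (items flow through until every sender has been dropped, and only then does the consumer see `None`).

```rust
use std::sync::mpsc::Receiver;

/// Synchronous stand-in for wrapping a channel receiver as a stream.
/// The iterator yields items as they arrive and ends (returns `None`)
/// only after every sender has been dropped and the buffer is drained.
fn receiver_iter<T>(rx: Receiver<T>) -> impl Iterator<Item = T> {
    // `recv()` returns `Err` once the channel is closed and empty;
    // `.ok()` maps that to `None`, which terminates the iterator.
    std::iter::from_fn(move || rx.recv().ok())
}
```

In this analogy, a consumer that sees the iterator end knows the sending side has exited, which mirrors why the event loop treats a bare `None` as a reason to bail.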
## Key behavioural change
Public Sepolia (`wss://ethereum-sepolia-rpc.publicnode.com`) drops
WS connections after ~20 min of sustained load. Pre-fix: the engine
bailed within seconds of the first drop; an operator restart
re-opened the subscription, but the engine had missed every event in
between. Post-fix: the reconnect task waits 1s and reopens; only
events that arrived during the 1s gap are missed. Multi-minute drops
get progressively longer waits, capped at 5 min.
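The wait schedule described above (1s, 2s, 4s, ... capped at 5 min) can be sketched as follows. This is an illustrative stand-in for `restart_policy::backoff_for`, whose actual signature and constants are not shown here; `BASE` and `CAP` are assumed names.

```rust
use std::time::Duration;

/// Assumed first wait, per the description above.
const BASE: Duration = Duration::from_secs(1);
/// Assumed ceiling, per the description above (5 min).
const CAP: Duration = Duration::from_secs(5 * 60);

/// Doubling backoff: BASE * 2^attempt, clamped to CAP.
fn backoff_for(attempt: u32) -> Duration {
    // `checked_shl` returns `None` for shifts of 32 or more, so very large
    // attempt counts saturate instead of overflowing.
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    BASE.saturating_mul(factor).min(CAP)
}
```

With these constants the cap is reached at attempt 9 (512 s would exceed 300 s), so every later attempt waits the full 5 min.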
## New metric (consumed via COW-1034)
`shepherd_stream_reconnects_total{kind, chain_id, module}` counter,
incremented on every successful reopen. Operators write SLO alerts
against this for "stream churn" (e.g. > 5 reconnects per 10 min on
the same chain).
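An alert like this would normally live in the monitoring system rather than in the engine. Purely to make the example threshold concrete, the "> 5 reconnects per 10 min" rule can be sketched as a sliding-window check; `is_churning`, `WINDOW` and `MAX_RECONNECTS` are hypothetical names, and timestamps are modelled as offsets from an arbitrary start.

```rust
use std::time::Duration;

/// Example window from the text above (10 min).
const WINDOW: Duration = Duration::from_secs(10 * 60);
/// Example threshold from the text above (more than 5 reconnects).
const MAX_RECONNECTS: usize = 5;

/// Returns true if more than MAX_RECONNECTS reconnects happened within
/// the WINDOW ending at `now`. Both arguments are offsets from the same
/// arbitrary start time.
fn is_churning(reconnects: &[Duration], now: Duration) -> bool {
    let recent = reconnects
        .iter()
        // `checked_sub` is `None` for timestamps after `now`; those are ignored.
        .filter(|t| now.checked_sub(**t).map_or(false, |age| age < WINDOW))
        .count();
    recent > MAX_RECONNECTS
}
```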
## Channel buffer + back-pressure
Buffer is 64 events per task. Real-time dispatch usually drains
within one Sepolia block time (~12 s), so the buffer is overkill for normal
operation; it absorbs a brief dispatch-side stall (e.g. a stop-loss
cow-api submit that takes 2 s) without dropping events at the WS
boundary.
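How much a 64-slot buffer absorbs while the consumer is stalled can be shown with a bounded std channel. This is a sketch under assumptions: `std::sync::mpsc::sync_channel` stands in for the async mpsc channel the tasks presumably use, and `accepted_during_stall` is a hypothetical helper. With an async bounded channel the producer would wait for capacity instead of seeing a "full" error; `try_send` is used here only to make the capacity visible.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Buffer size from the text above.
const BUFFER: usize = 64;

/// Pushes up to `n` events into a bounded channel while nothing is
/// receiving, and returns how many were accepted before the buffer filled.
fn accepted_during_stall(n: usize) -> usize {
    // `_rx` is kept alive so the channel stays open for the whole loop.
    let (tx, _rx) = sync_channel::<usize>(BUFFER);
    let mut accepted = 0;
    for i in 0..n {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // Buffer is full (or the receiver vanished): stop producing.
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

A stall shorter than the time it takes to produce 64 events is absorbed entirely; beyond that, the producer side has to wait.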
## Tests
- `cargo test --workspace` -> 159 host tests + 6 doctests passing
(unchanged shape - all existing tests still pass, including the
`run_does_not_bail_when_both_stream_kinds_are_empty` regression
guard which verifies the empty-stream path).
- `cargo clippy --all-targets --workspace -- -D warnings` clean.
- `cargo fmt --all --check` clean.
- Live Sepolia happy path: `just run-m3` boots, all 3 modules
dispatch normally, `subscription open` log line emitted, no
reconnect activity in 60 s window (network was stable). Clean
SIGTERM shutdown.
## Out of scope
- WS endpoint failover (swap Alchemy <-> publicnode on failure).
Operator concern; track via `[engine.chains.<id>]` schema if
demand arises.
- Backfill of events missed during the drop window. Live-stream
semantic only; backfill is an indexer concern outside the M4
engine scope.
- Operator-tunable backoff / healthy-window via `engine.toml`. The
current constants are workspace literals; configurable in 0.3.
- Per-chain isolation across reconnects (COW-1073). The current
patch already gives partial isolation: each chain's task drops
+ reconnects independently and one task's failure does not
starve the others. COW-1073 covers the supervisor-side
multi-chain coordination.
Linear: COW-1071. Sixth M4 issue landed; stacks on #39 (COW-1033).
What does this PR do?
Replaces the previous "bail on WS drop" semantic (flagged as the "0.3 fix" in the source) with per-stream reconnect-aware tasks. Each chain's block subscription and each (module, chain) log subscription gets a dedicated task that survives WS drops with exponential backoff.
Sixth M4 issue landed.
Behaviour change
Public Sepolia WS (`wss://ethereum-sepolia-rpc.publicnode.com`) drops connections after ~20 min of sustained load. Before: the engine bailed within seconds of the first drop; an operator restart re-opened the subscription, but every event during the gap was missed. After: the reconnect task waits 1s and reopens; only events that arrived in the 1s gap are missed. Multi-minute drops get progressively longer waits, capped at 5 min.
How it works
Each `(chain_id)` block subscription and each `(module, chain_id, filter)` log subscription gets a `tokio::spawn`'d task that:
1. Opens the subscription via `ProviderPool`.
2. Pumps items to an mpsc channel until the underlying stream yields `None` (WS drop) or `Err` (transport error).
3. Logs the drop + sleeps for `restart_policy::backoff_for(attempt)` (1s -> 2s -> 4s -> ... cap 5 min, reusing the COW-1033 policy).
4. Reopens. The first event after a reopen emits an `INFO ... reopened` line + increments `shepherd_stream_reconnects_total`.
5. Resets `attempt = 0` once the stream has been healthy for `HEALTHY_WINDOW` (60s of uninterrupted events).
The event loop reads the channel as a `Stream` via `futures::stream::unfold` (no new dep). A bare `None` from the merged stream now indicates a reconnect task panicked or the channel closed; the existing bail-on-None remains as a defensive safety net with an updated log message.
New metric (consumed via COW-1034)
`shepherd_stream_reconnects_total{kind, chain_id, module}` counter. Incremented on every successful reopen. SRE alerts can be wired against "stream churn" thresholds.
Changes
- `crates/nexum-engine/src/runtime/event_loop.rs`: `open_block_streams` / `open_log_streams` now spawn reconnect tasks; new `reconnecting_block_task` / `reconnecting_log_task` private fns; `receiver_stream` helper wraps `mpsc::Receiver<T>` as a `Stream<Item = T>`.
- `run()` bail-on-None log message updated to reflect the new semantic.
Single file, 1 file changed, +204/-53.
Breaking changes
None. Public surface (`open_block_streams`, `open_log_streams`, `run`, `TaggedBlockStream`, `TaggedLogStream`) preserved.
Tests
- `cargo test --workspace` -> 159 host tests + 6 doctests passing.
- `cargo clippy --all-targets --workspace -- -D warnings` clean.
- `cargo fmt --all --check` clean.
- Live Sepolia happy path: `just run-m3` boots, all 3 modules dispatch normally, `block subscription open chain_id=11155111` log emitted, no reconnect activity in 60s window (network was stable). Clean SIGTERM shutdown.
- `run_does_not_bail_when_both_stream_kinds_are_empty` regression guard still passes (the empty-stream tolerance is independent of reconnect).
Out of scope (already filed)
- Operator-tunable backoff / healthy-window via `engine.toml` (configurable in 0.3).
AI assistance disclosure
AI Assistance: this change + description was produced by a Claude Code agent (Claude Opus 4.7 1M context). The agent designed the reconnect task shape, implemented the receiver_stream wrapper, validated the happy path on live Sepolia, and authored this PR description. A human (Bruno) reviewed and is accountable for the result.
Linear: COW-1071. Stacks on #39 (COW-1033 supervisor restart).