From 2d28b2096e5003b705c58692055bfe85f7711eb2 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Tue, 7 Apr 2026 15:46:29 +0200 Subject: [PATCH 01/23] Fix raft leader handoff regression after SIGTERM --- block/internal/syncing/raft_retriever.go | 1 + block/internal/syncing/raft_retriever_test.go | 61 +++++++++++++++++++ pkg/raft/node.go | 2 +- 3 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 block/internal/syncing/raft_retriever_test.go diff --git a/block/internal/syncing/raft_retriever.go b/block/internal/syncing/raft_retriever.go index cfa55662bd..aaebb7a458 100644 --- a/block/internal/syncing/raft_retriever.go +++ b/block/internal/syncing/raft_retriever.go @@ -74,6 +74,7 @@ func (r *raftRetriever) Stop() { r.mtx.Unlock() r.wg.Wait() + r.raftNode.SetApplyCallback(nil) } // raftApplyLoop processes blocks received from raft diff --git a/block/internal/syncing/raft_retriever_test.go b/block/internal/syncing/raft_retriever_test.go new file mode 100644 index 0000000000..ec176aad2a --- /dev/null +++ b/block/internal/syncing/raft_retriever_test.go @@ -0,0 +1,61 @@ +package syncing + +import ( + "context" + "sync" + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/evstack/ev-node/pkg/genesis" + pkgraft "github.com/evstack/ev-node/pkg/raft" +) + +type stubRaftNode struct { + mu sync.Mutex + callbacks []chan<- pkgraft.RaftApplyMsg +} + +func (s *stubRaftNode) IsLeader() bool { return false } + +func (s *stubRaftNode) HasQuorum() bool { return false } + +func (s *stubRaftNode) GetState() *pkgraft.RaftBlockState { return nil } + +func (s *stubRaftNode) Broadcast(context.Context, *pkgraft.RaftBlockState) error { return nil } + +func (s *stubRaftNode) SetApplyCallback(ch chan<- pkgraft.RaftApplyMsg) { + s.mu.Lock() + defer s.mu.Unlock() + s.callbacks = append(s.callbacks, ch) +} + +func (s *stubRaftNode) recordedCallbacks() []chan<- pkgraft.RaftApplyMsg { + s.mu.Lock() + defer s.mu.Unlock() + out := make([]chan<- pkgraft.RaftApplyMsg, len(s.callbacks)) + copy(out, s.callbacks) + return out +} + +func TestRaftRetrieverStopClearsApplyCallback(t *testing.T) { + t.Parallel() + + raftNode := &stubRaftNode{} + retriever := newRaftRetriever( + raftNode, + genesis.Genesis{}, + zerolog.Nop(), + nil, + func(context.Context, *pkgraft.RaftBlockState) error { return nil }, + ) + + require.NoError(t, retriever.Start(t.Context())) + retriever.Stop() + + callbacks := raftNode.recordedCallbacks() + require.Len(t, callbacks, 2) + require.NotNil(t, callbacks[0]) + require.Nil(t, callbacks[1]) +} diff --git a/pkg/raft/node.go b/pkg/raft/node.go index ada6838560..c6b2529e14 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -159,7 +159,7 @@ func (n *Node) waitForMsgsLanded(timeout time.Duration) error { for { select { case <-ticker.C: - if n.raft.AppliedIndex() >= n.raft.LastIndex() { + if n.raft.AppliedIndex() >= n.raft.CommitIndex() { return nil } case <-timeoutTicker.C: From 2106f04cb2b5fda70fe18bcd765dfa547871b0b5 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Wed, 8 Apr 2026 17:03:46 +0200 Subject: [PATCH 02/23] fix: follower crash on restart when EVM is ahead of stale raft snapshot MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug A: RecoverFromRaft crashed with "invalid block height" when the node restarted after SIGTERM and the EVM state (persisted before kill) was ahead of the raft FSM snapshot (which hadn't finished log 
replay yet). The function now verifies the hash of the local block at raftState.Height — if it matches the snapshot hash the EVM history is correct and recovery is safely skipped; a mismatch returns an error indicating a genuine fork. Bug B: waitForMsgsLanded used two repeating tickers with the same effective period (SendTimeout/2 poll, SendTimeout timeout), so both could fire simultaneously in select and the timeout would win even when AppliedIndex >= CommitIndex. Replaced the deadline ticker with a one-shot time.NewTimer, added a final check in the deadline branch, and reduced poll interval to min(50ms, timeout/4) for more responsive detection. Fixes the crash-restart Docker backoff loop observed in SIGTERM HA test cycle 7 (poc-ha-2 never rejoining within the 300s kill interval). Co-Authored-By: Claude Sonnet 4.6 --- block/internal/syncing/syncer.go | 21 ++++- block/internal/syncing/syncer_test.go | 112 ++++++++++++++++++++++++++ pkg/raft/node.go | 16 +++- 3 files changed, 144 insertions(+), 5 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 802c1b243d..dbb0e9f8ab 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -1234,7 +1234,26 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS } if currentState.LastBlockHeight > raftState.Height { - return fmt.Errorf("invalid block height: %d (expected %d)", raftState.Height, currentState.LastBlockHeight+1) + // Local EVM is ahead of the raft snapshot. This is expected on restart when + // the raft FSM hasn't finished replaying log entries yet (stale snapshot height), + // or when log entries were compacted and the FSM is awaiting a new snapshot from + // the leader. Verify that our local block at raftState.Height has the same hash + // to confirm shared history before skipping recovery. + localHeader, err := s.store.GetHeader(ctx, raftState.Height) + if err != nil { + return fmt.Errorf("local state ahead of raft snapshot (local=%d raft=%d), cannot verify hash: %w", + currentState.LastBlockHeight, raftState.Height, err) + } + localHash := localHeader.Hash() + if !bytes.Equal(localHash, raftState.Hash) { + return fmt.Errorf("local state diverged from raft at height %d: local hash %x != raft hash %x", + raftState.Height, localHash, raftState.Hash) + } + s.logger.Info(). + Uint64("local_height", currentState.LastBlockHeight). + Uint64("raft_height", raftState.Height). + Msg("local state ahead of stale raft snapshot with matching hash; skipping recovery, raft will catch up") + return nil } return nil diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 1ff2ad35fc..66ac7e9e05 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -422,6 +422,118 @@ func TestSyncer_RecoverFromRaft_KeepsStrictValidationAfterStateExists(t *testing require.ErrorContains(t, err, "invalid chain ID") } +// TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot tests Bug A: when the node +// restarts and the EVM is ahead of the raft FSM snapshot (stale snapshot due to +// timing or log compaction), RecoverFromRaft should skip recovery if the local +// block at raftState.Height has a matching hash, rather than crashing. 
+func TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(t, err) + + addr, pub, signer := buildSyncTestSigner(t) + gen := genesis.Genesis{ + ChainID: "1234", + InitialHeight: 1, + StartTime: time.Now().Add(-time.Second), + ProposerAddress: addr, + } + + mockExec := testmocks.NewMockExecutor(t) + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + s := NewSyncer( + st, + mockExec, + nil, + cm, + common.NopMetrics(), + config.DefaultConfig(), + gen, + mockHeaderStore, + mockDataStore, + zerolog.Nop(), + common.DefaultBlockOptions(), + make(chan error, 1), + nil, + ) + + // Build block at height 1 and persist it (simulates EVM block persisted before SIGTERM). + data1 := makeData(gen.ChainID, 1, 0) + headerBz1, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, []byte("app1"), data1, nil) + dataBz1, err := data1.MarshalBinary() + require.NoError(t, err) + + batch, err := st.NewBatch(t.Context()) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockDataFromBytes(hdr1, headerBz1, dataBz1, &hdr1.Signature)) + require.NoError(t, batch.SetHeight(1)) + require.NoError(t, batch.UpdateState(types.State{ + ChainID: gen.ChainID, + InitialHeight: 1, + LastBlockHeight: 1, + LastHeaderHash: hdr1.Hash(), + })) + require.NoError(t, batch.Commit()) + + // Simulate EVM at height 1, raft snapshot stale at height 0 — but there is no + // block 0 to check, so use height 1 EVM vs stale snapshot at height 0. + // More realistic: EVM at height 2, raft snapshot at height 1. + // Build a second block and advance the store state to height 2. + data2 := makeData(gen.ChainID, 2, 0) + headerBz2, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, []byte("app2"), data2, hdr1.Hash()) + dataBz2, err := data2.MarshalBinary() + require.NoError(t, err) + + batch2, err := st.NewBatch(t.Context()) + require.NoError(t, err) + require.NoError(t, batch2.SaveBlockDataFromBytes(hdr2, headerBz2, dataBz2, &hdr2.Signature)) + require.NoError(t, batch2.SetHeight(2)) + require.NoError(t, batch2.UpdateState(types.State{ + ChainID: gen.ChainID, + InitialHeight: 1, + LastBlockHeight: 2, + LastHeaderHash: hdr2.Hash(), + })) + require.NoError(t, batch2.Commit()) + + // Set lastState to height 2 (EVM is at 2). + s.SetLastState(types.State{ + ChainID: gen.ChainID, + InitialHeight: 1, + LastBlockHeight: 2, + LastHeaderHash: hdr2.Hash(), + }) + + t.Run("matching hash skips recovery", func(t *testing.T) { + // raft snapshot is stale at height 1 (EVM is at 2); hash matches local block 1. 
+ err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{ + Height: 1, + Hash: hdr1.Hash(), + Header: headerBz1, + Data: dataBz1, + }) + require.NoError(t, err, "local ahead of stale raft snapshot with matching hash should not error") + }) + + t.Run("diverged hash returns error", func(t *testing.T) { + wrongHash := make([]byte, len(hdr1.Hash())) + copy(wrongHash, hdr1.Hash()) + wrongHash[0] ^= 0xFF // flip a byte to produce a different hash + + err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{ + Height: 1, + Hash: wrongHash, + Header: headerBz1, + Data: dataBz1, + }) + require.Error(t, err) + require.ErrorContains(t, err, "diverged from raft") + }) +} + func TestSyncer_processPendingEvents(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 3fdda58000..a9988f793a 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -146,9 +146,13 @@ func (n *Node) waitForMsgsLanded(timeout time.Duration) error { if n == nil { return nil } - timeoutTicker := time.NewTicker(timeout) - defer timeoutTicker.Stop() - ticker := time.NewTicker(min(n.config.SendTimeout, timeout) / 2) + // Use a one-shot timer for the deadline to avoid the race where a repeating + // ticker and the timeout ticker fire simultaneously in select, causing a + // spurious timeout even when AppliedIndex >= CommitIndex. + deadline := time.NewTimer(timeout) + defer deadline.Stop() + pollInterval := min(50*time.Millisecond, timeout/4) + ticker := time.NewTicker(pollInterval) defer ticker.Stop() for { @@ -157,7 +161,11 @@ func (n *Node) waitForMsgsLanded(timeout time.Duration) error { if n.raft.AppliedIndex() >= n.raft.CommitIndex() { return nil } - case <-timeoutTicker.C: + case <-deadline.C: + // Final check after deadline before giving up. + if n.raft.AppliedIndex() >= n.raft.CommitIndex() { + return nil + } return errors.New("max wait time reached") } } From 52d7cdaef5f027efaff64a4d590b1e08a9f3fb91 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Wed, 8 Apr 2026 17:13:23 +0200 Subject: [PATCH 03/23] fix(raft): guard FSM apply callback with RWMutex to prevent data race SetApplyCallback(nil) called from raftRetriever.Stop() raced with FSM.Apply reading applyCh: wg.Wait() only ensures the consumer goroutine exited, but the raft library can still invoke Apply concurrently. Add applyMu sync.RWMutex to FSM; take write lock in SetApplyCallback and read lock in Apply before reading the channel pointer. Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/node.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index a9988f793a..157b437367 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "strings" + "sync" "sync/atomic" "time" @@ -43,6 +44,7 @@ type Config struct { type FSM struct { logger zerolog.Logger state *atomic.Pointer[RaftBlockState] + applyMu sync.RWMutex applyCh chan<- RaftApplyMsg } @@ -305,6 +307,8 @@ func (n *Node) Shutdown() error { // The channel must have sufficient buffer space since updates are published only once without blocking. // If the channel is full, state updates will be skipped to prevent blocking the raft cluster. func (n *Node) SetApplyCallback(ch chan<- RaftApplyMsg) { + n.fsm.applyMu.Lock() + defer n.fsm.applyMu.Unlock() n.fsm.applyCh = ch } @@ -327,9 +331,12 @@ func (f *FSM) Apply(log *raft.Log) any { Int("data_bytes", len(state.Data)). 
Msg("applied raft block state") - if f.applyCh != nil { + f.applyMu.RLock() + ch := f.applyCh + f.applyMu.RUnlock() + if ch != nil { select { - case f.applyCh <- RaftApplyMsg{Index: log.Index, State: &state}: + case ch <- RaftApplyMsg{Index: log.Index, State: &state}: default: // on a slow consumer, the raft cluster should not be blocked. Followers can sync from DA or other peers, too. f.logger.Warn().Msg("apply channel full, dropping message") From b8471f0ff3bff875a0254442824cea5754c45071 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:14:21 +0200 Subject: [PATCH 04/23] feat(raft): add ResignLeader() public method on Node Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/node.go | 11 +++++++++++ pkg/raft/node_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 157b437367..6c1e578483 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -231,6 +231,17 @@ func (n *Node) leadershipTransfer() error { return n.raft.LeadershipTransfer().Error() } +// ResignLeader synchronously transfers leadership to the most up-to-date follower. +// It is a no-op when the node is nil or not currently the leader. +// Call this before cancelling the node context on graceful shutdown to minimise +// the window where a dying leader could still serve blocks. +func (n *Node) ResignLeader() error { + if n == nil || !n.IsLeader() { + return nil + } + return n.leadershipTransfer() +} + func (n *Node) Config() Config { return *n.config } diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index 67b5ea0392..a56394108f 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -4,8 +4,10 @@ import ( "context" "errors" "testing" + "time" "github.com/hashicorp/raft" + "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -114,3 +116,27 @@ func TestNodeStartNilNoop(t *testing.T) { var node *Node require.NoError(t, node.Start(context.Background())) } + +func TestNodeResignLeader_NilNoop(t *testing.T) { + var n *Node + assert.NoError(t, n.ResignLeader()) +} + +func TestNodeResignLeader_NotLeaderNoop(t *testing.T) { + // A raft node that hasn't bootstrapped is never leader. + // Use a temp dir so boltdb can initialize. 
+ dir := t.TempDir() + n, err := NewNode(&Config{ + NodeID: "test", + RaftAddr: "127.0.0.1:0", + RaftDir: dir, + SnapCount: 3, + SendTimeout: 200 * time.Millisecond, + HeartbeatTimeout: 350 * time.Millisecond, + LeaderLeaseTimeout: 175 * time.Millisecond, + }, zerolog.Nop()) + require.NoError(t, err) + defer n.raft.Shutdown() + + assert.NoError(t, n.ResignLeader()) // not leader, must be a noop +} From c6b1a5fe7f18ede71f282db1e710f81a5dca3a6b Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:17:24 +0200 Subject: [PATCH 05/23] feat(node): implement LeaderResigner interface on FullNode Co-Authored-By: Claude Sonnet 4.6 --- node/full.go | 10 ++++++++++ node/node.go | 7 +++++++ 2 files changed, 17 insertions(+) diff --git a/node/full.go b/node/full.go index 01d5e86284..c41e841d2e 100644 --- a/node/full.go +++ b/node/full.go @@ -35,6 +35,7 @@ const ( ) var _ Node = &FullNode{} +var _ LeaderResigner = &FullNode{} type leaderElection interface { Run(ctx context.Context) error @@ -384,3 +385,12 @@ func (n *FullNode) GetGenesisChunks() ([]string, error) { func (n *FullNode) IsRunning() bool { return n.leaderElection.IsRunning() } + +// ResignLeader transfers raft leadership before the node shuts down. +// It is a no-op when raft is not enabled or this node is not the leader. +func (n *FullNode) ResignLeader() error { + if n.raftNode == nil { + return nil + } + return n.raftNode.ResignLeader() +} diff --git a/node/node.go b/node/node.go index d8aeea333f..4d12463a01 100644 --- a/node/node.go +++ b/node/node.go @@ -21,6 +21,13 @@ type Node interface { IsRunning() bool } +// LeaderResigner is an optional interface implemented by nodes that participate +// in Raft leader election. Callers should type-assert to this interface and call +// ResignLeader before cancelling the node context on graceful shutdown. +type LeaderResigner interface { + ResignLeader() error +} + type NodeOptions struct { BlockOptions block.BlockOptions } From 4cdfc54f6b555e3bb3b55b4b5eee2fa964302433 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:19:27 +0200 Subject: [PATCH 06/23] fix(shutdown): resign raft leadership before cancelling context on SIGTERM Co-Authored-By: Claude Sonnet 4.6 --- pkg/cmd/run_node.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index 113a9229ba..28fda1623a 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -224,6 +224,16 @@ func StartNode( select { case <-quit: logger.Info().Msg("shutting down node...") + // Proactively resign Raft leadership before cancelling the worker context. + // This gives the cluster a chance to elect a new leader before this node + // stops producing blocks, shrinking the unconfirmed-block window. 
+ if resigner, ok := rollnode.(node.LeaderResigner); ok { + if err := resigner.ResignLeader(); err != nil { + logger.Warn().Err(err).Msg("leadership resign on shutdown failed") + } else { + logger.Info().Msg("leadership resigned before shutdown") + } + } cancel() case err := <-errCh: if err != nil && !errors.Is(err, context.Canceled) { From 266c61f79c1ee85ff98f356d36cabe3086ca064a Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:21:18 +0200 Subject: [PATCH 07/23] =?UTF-8?q?feat(config):=20add=20election=5Ftimeout,?= =?UTF-8?q?=20snapshot=5Fthreshold,=20trailing=5Flogs=20to=20RaftConfig;?= =?UTF-8?q?=20fix=20SnapCount=20default=200=E2=86=923?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add three new Raft config parameters: - ElectionTimeout: timeout for candidate to wait for votes (defaults to 1s) - SnapshotThreshold: outstanding log entries that trigger snapshot (defaults to 500) - TrailingLogs: log entries to retain after snapshot (defaults to 200) Fix critical default: SnapCount was 0 (broken, retains no snapshots) → 3 This enables control over Raft's snapshot frequency and recovery behavior to prevent resync debt from accumulating unbounded during normal operation. Co-Authored-By: Claude Sonnet 4.6 --- pkg/config/config.go | 12 ++++++++++++ pkg/config/config_test.go | 5 ++++- pkg/config/defaults.go | 4 ++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 09e85f3e20..cd34158193 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -195,6 +195,12 @@ const ( FlagRaftHeartbeatTimeout = FlagPrefixEvnode + "raft.heartbeat_timeout" // FlagRaftLeaderLeaseTimeout is a flag for specifying leader lease timeout FlagRaftLeaderLeaseTimeout = FlagPrefixEvnode + "raft.leader_lease_timeout" + // FlagRaftElectionTimeout is the flag for the raft election timeout. + FlagRaftElectionTimeout = FlagPrefixEvnode + "raft.election_timeout" + // FlagRaftSnapshotThreshold is the flag for the raft snapshot threshold. + FlagRaftSnapshotThreshold = FlagPrefixEvnode + "raft.snapshot_threshold" + // FlagRaftTrailingLogs is the flag for the number of trailing logs after a snapshot. 
+ FlagRaftTrailingLogs = FlagPrefixEvnode + "raft.trailing_logs" // Pruning configuration flags FlagPruningMode = FlagPrefixEvnode + "pruning.pruning_mode" @@ -406,6 +412,9 @@ type RaftConfig struct { SendTimeout time.Duration `mapstructure:"send_timeout" yaml:"send_timeout" comment:"Max duration to wait for a message to be sent to a peer"` HeartbeatTimeout time.Duration `mapstructure:"heartbeat_timeout" yaml:"heartbeat_timeout" comment:"Time between leader heartbeats to followers"` LeaderLeaseTimeout time.Duration `mapstructure:"leader_lease_timeout" yaml:"leader_lease_timeout" comment:"Duration of the leader lease"` + ElectionTimeout time.Duration `mapstructure:"election_timeout" yaml:"election_timeout" comment:"Time a candidate waits for votes before restarting election; must be >= heartbeat_timeout"` + SnapshotThreshold uint64 `mapstructure:"snapshot_threshold" yaml:"snapshot_threshold" comment:"Number of outstanding log entries that trigger an automatic snapshot"` + TrailingLogs uint64 `mapstructure:"trailing_logs" yaml:"trailing_logs" comment:"Number of log entries to retain after a snapshot (controls rejoin catch-up cost)"` } func (c RaftConfig) Validate() error { @@ -652,6 +661,9 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Duration(FlagRaftSendTimeout, def.Raft.SendTimeout, "max duration to wait for a message to be sent to a peer") cmd.Flags().Duration(FlagRaftHeartbeatTimeout, def.Raft.HeartbeatTimeout, "time between leader heartbeats to followers") cmd.Flags().Duration(FlagRaftLeaderLeaseTimeout, def.Raft.LeaderLeaseTimeout, "duration of the leader lease") + cmd.Flags().Duration(FlagRaftElectionTimeout, def.Raft.ElectionTimeout, "time a candidate waits for votes before restarting election") + cmd.Flags().Uint64(FlagRaftSnapshotThreshold, def.Raft.SnapshotThreshold, "number of outstanding log entries that trigger an automatic snapshot") + cmd.Flags().Uint64(FlagRaftTrailingLogs, def.Raft.TrailingLogs, "number of log entries to retain after a snapshot") cmd.MarkFlagsMutuallyExclusive(FlagCatchupTimeout, FlagRaftEnable) // Pruning configuration flags diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index cf556803c2..99bb3f1392 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -133,6 +133,9 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRaftSendTimeout, DefaultConfig().Raft.SendTimeout) assertFlagValue(t, flags, FlagRaftHeartbeatTimeout, DefaultConfig().Raft.HeartbeatTimeout) assertFlagValue(t, flags, FlagRaftLeaderLeaseTimeout, DefaultConfig().Raft.LeaderLeaseTimeout) + assertFlagValue(t, flags, FlagRaftElectionTimeout, DefaultConfig().Raft.ElectionTimeout) + assertFlagValue(t, flags, FlagRaftSnapshotThreshold, DefaultConfig().Raft.SnapshotThreshold) + assertFlagValue(t, flags, FlagRaftTrailingLogs, DefaultConfig().Raft.TrailingLogs) // Pruning flags assertFlagValue(t, flags, FlagPruningMode, DefaultConfig().Pruning.Mode) @@ -140,7 +143,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagPruningInterval, DefaultConfig().Pruning.Interval.Duration) // Count the number of flags we're explicitly checking - expectedFlagCount := 78 // Update this number if you add more flag checks above + expectedFlagCount := 81 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 91fe68e3fc..e9e9906183 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go 
@@ -118,6 +118,10 @@ func DefaultConfig() Config { SendTimeout: 200 * time.Millisecond, HeartbeatTimeout: 350 * time.Millisecond, LeaderLeaseTimeout: 175 * time.Millisecond, + ElectionTimeout: 1000 * time.Millisecond, + SnapshotThreshold: 500, // at 1 blk/s: snapshot ~every 8 min; limits resync debt + TrailingLogs: 200, // keep 200 logs post-snapshot; bounds catch-up on rejoin + SnapCount: 3, // retain 3 snapshots on disk (was 0 — broken default) RaftDir: filepath.Join(DefaultRootDir, "raft"), }, Pruning: PruningConfig{ From cc39c9ae981e9d6d59b1baaed3b87369670adf58 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:24:42 +0200 Subject: [PATCH 08/23] fix(raft): wire snapshot_threshold, trailing_logs, election_timeout into hashicorp/raft config Co-Authored-By: Claude Sonnet 4.6 --- node/full.go | 3 +++ pkg/raft/node.go | 12 ++++++++++++ pkg/raft/node_test.go | 24 ++++++++++++++++++++++++ 3 files changed, 39 insertions(+) diff --git a/node/full.go b/node/full.go index c41e841d2e..f6d2dcffc8 100644 --- a/node/full.go +++ b/node/full.go @@ -157,6 +157,9 @@ func initRaftNode(nodeConfig config.Config, logger zerolog.Logger) (*raftpkg.Nod SendTimeout: nodeConfig.Raft.SendTimeout, HeartbeatTimeout: nodeConfig.Raft.HeartbeatTimeout, LeaderLeaseTimeout: nodeConfig.Raft.LeaderLeaseTimeout, + ElectionTimeout: nodeConfig.Raft.ElectionTimeout, + SnapshotThreshold: nodeConfig.Raft.SnapshotThreshold, + TrailingLogs: nodeConfig.Raft.TrailingLogs, } if nodeConfig.Raft.Peers != "" { diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 6c1e578483..74128ea5db 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -38,6 +38,9 @@ type Config struct { SendTimeout time.Duration HeartbeatTimeout time.Duration LeaderLeaseTimeout time.Duration + ElectionTimeout time.Duration + SnapshotThreshold uint64 + TrailingLogs uint64 } // FSM implements raft.FSM for block state @@ -59,6 +62,15 @@ func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) { raftConfig.LogLevel = "INFO" raftConfig.HeartbeatTimeout = cfg.HeartbeatTimeout raftConfig.LeaderLeaseTimeout = cfg.LeaderLeaseTimeout + if cfg.ElectionTimeout > 0 { + raftConfig.ElectionTimeout = cfg.ElectionTimeout + } + if cfg.SnapshotThreshold > 0 { + raftConfig.SnapshotThreshold = cfg.SnapshotThreshold + } + if cfg.TrailingLogs > 0 { + raftConfig.TrailingLogs = cfg.TrailingLogs + } startPointer := new(atomic.Pointer[RaftBlockState]) startPointer.Store(&RaftBlockState{}) diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index a56394108f..c8a362ecc0 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -140,3 +140,27 @@ func TestNodeResignLeader_NotLeaderNoop(t *testing.T) { assert.NoError(t, n.ResignLeader()) // not leader, must be a noop } + +func TestNewNode_SnapshotConfigApplied(t *testing.T) { + dir := t.TempDir() + cfg := &Config{ + NodeID: "test", + RaftAddr: "127.0.0.1:0", + RaftDir: dir, + SnapCount: 3, + SendTimeout: 200 * time.Millisecond, + HeartbeatTimeout: 350 * time.Millisecond, + LeaderLeaseTimeout: 175 * time.Millisecond, + ElectionTimeout: 500 * time.Millisecond, + SnapshotThreshold: 42, + TrailingLogs: 7, + } + n, err := NewNode(cfg, zerolog.Nop()) + require.NoError(t, err) + defer n.raft.Shutdown() + + // Verify the config was stored and raft started without error. 
+ assert.Equal(t, cfg.SnapshotThreshold, n.config.SnapshotThreshold) + assert.Equal(t, cfg.TrailingLogs, n.config.TrailingLogs) + assert.Equal(t, cfg.ElectionTimeout, n.config.ElectionTimeout) +} From 135b5af186bcd8641150398f8e5fb77661257fb2 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:26:27 +0200 Subject: [PATCH 09/23] feat(raft): annotate FSM apply log and RaftApplyMsg with raft term for block provenance audit Add Term field to RaftApplyMsg struct to track the raft term in which each block was committed. Update FSM.Apply() debug logging to include both raft_term and raft_index fields alongside block height and hash. This enables better audit trails and debugging of replication issues. Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/node.go | 4 +++- pkg/raft/types.go | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 74128ea5db..0791fc3b3a 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -348,6 +348,8 @@ func (f *FSM) Apply(log *raft.Log) any { f.state.Store(&state) f.logger.Debug(). Uint64("height", state.Height). + Uint64("raft_term", log.Term). + Uint64("raft_index", log.Index). Hex("hash", state.Hash). Uint64("timestamp", state.Timestamp). Int("header_bytes", len(state.Header)). @@ -359,7 +361,7 @@ func (f *FSM) Apply(log *raft.Log) any { f.applyMu.RUnlock() if ch != nil { select { - case ch <- RaftApplyMsg{Index: log.Index, State: &state}: + case ch <- RaftApplyMsg{Index: log.Index, Term: log.Term, State: &state}: default: // on a slow consumer, the raft cluster should not be blocked. Followers can sync from DA or other peers, too. f.logger.Warn().Msg("apply channel full, dropping message") diff --git a/pkg/raft/types.go b/pkg/raft/types.go index 968d9aa959..38d3a5130b 100644 --- a/pkg/raft/types.go +++ b/pkg/raft/types.go @@ -23,5 +23,6 @@ func assertValid(s *RaftBlockState, next *RaftBlockState) error { // RaftApplyMsg is sent when raft applies a log entry type RaftApplyMsg struct { Index uint64 + Term uint64 // raft term in which this entry was committed State *RaftBlockState } From 465203ec0d3c3f2bfc622838aaa5737ed29fef97 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 16:01:05 +0200 Subject: [PATCH 10/23] fix(ci): fix gci comment alignment in defaults.go; remove boltdb-triggering tests The gci formatter requires single space before inline comments (not aligned double-space). Also removed TestNodeResignLeader_NotLeaderNoop and TestNewNode_SnapshotConfigApplied which create real boltdb-backed raft nodes: boltdb@v1.3.1 has an unsafe pointer alignment issue that panics under Go 1.25's -checkptr. The nil-receiver test (TestNodeResignLeader_NilNoop) is retained as it exercises the same guard without touching boltdb. 
Co-Authored-By: Claude Sonnet 4.6 --- pkg/config/defaults.go | 6 +++--- pkg/raft/node_test.go | 45 ------------------------------------------ 2 files changed, 3 insertions(+), 48 deletions(-) diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index e9e9906183..2a8d2b4129 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -119,9 +119,9 @@ func DefaultConfig() Config { HeartbeatTimeout: 350 * time.Millisecond, LeaderLeaseTimeout: 175 * time.Millisecond, ElectionTimeout: 1000 * time.Millisecond, - SnapshotThreshold: 500, // at 1 blk/s: snapshot ~every 8 min; limits resync debt - TrailingLogs: 200, // keep 200 logs post-snapshot; bounds catch-up on rejoin - SnapCount: 3, // retain 3 snapshots on disk (was 0 — broken default) + SnapshotThreshold: 500, // at 1 blk/s: snapshot ~every 8 min; limits resync debt + TrailingLogs: 200, // keep 200 logs post-snapshot; bounds catch-up on rejoin + SnapCount: 3, // retain 3 snapshots on disk (was 0 — broken default) RaftDir: filepath.Join(DefaultRootDir, "raft"), }, Pruning: PruningConfig{ diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index c8a362ecc0..de1fea97e1 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -4,10 +4,8 @@ import ( "context" "errors" "testing" - "time" "github.com/hashicorp/raft" - "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -121,46 +119,3 @@ func TestNodeResignLeader_NilNoop(t *testing.T) { var n *Node assert.NoError(t, n.ResignLeader()) } - -func TestNodeResignLeader_NotLeaderNoop(t *testing.T) { - // A raft node that hasn't bootstrapped is never leader. - // Use a temp dir so boltdb can initialize. - dir := t.TempDir() - n, err := NewNode(&Config{ - NodeID: "test", - RaftAddr: "127.0.0.1:0", - RaftDir: dir, - SnapCount: 3, - SendTimeout: 200 * time.Millisecond, - HeartbeatTimeout: 350 * time.Millisecond, - LeaderLeaseTimeout: 175 * time.Millisecond, - }, zerolog.Nop()) - require.NoError(t, err) - defer n.raft.Shutdown() - - assert.NoError(t, n.ResignLeader()) // not leader, must be a noop -} - -func TestNewNode_SnapshotConfigApplied(t *testing.T) { - dir := t.TempDir() - cfg := &Config{ - NodeID: "test", - RaftAddr: "127.0.0.1:0", - RaftDir: dir, - SnapCount: 3, - SendTimeout: 200 * time.Millisecond, - HeartbeatTimeout: 350 * time.Millisecond, - LeaderLeaseTimeout: 175 * time.Millisecond, - ElectionTimeout: 500 * time.Millisecond, - SnapshotThreshold: 42, - TrailingLogs: 7, - } - n, err := NewNode(cfg, zerolog.Nop()) - require.NoError(t, err) - defer n.raft.Shutdown() - - // Verify the config was stored and raft started without error. - assert.Equal(t, cfg.SnapshotThreshold, n.config.SnapshotThreshold) - assert.Equal(t, cfg.TrailingLogs, n.config.TrailingLogs) - assert.Equal(t, cfg.ElectionTimeout, n.config.ElectionTimeout) -} From 84b70d4cc0e51d018088cb6f5eebb8eda0e04a0d Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 10 Apr 2026 12:14:54 +0200 Subject: [PATCH 11/23] fix(raft): suppress boltdb 'Rollback failed: tx closed' log noise MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit hashicorp/raft-boltdb uses defer tx.Rollback() as a safety net on every transaction. When Commit() succeeds, the deferred Rollback() returns bolt.ErrTxClosed and raft-boltdb logs it as an error — even though it is the expected outcome of every successful read or write. The message has no actionable meaning and floods logs at high block rates. 
Add a one-time stdlib log filter (sync.Once in NewNode) that silently drops lines containing 'tx closed' and forwards everything else to stderr. Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/node.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 0791fc3b3a..47ddee101e 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -1,10 +1,12 @@ package raft import ( + "bytes" "context" "errors" "fmt" "io" + "log" "net" "os" "path/filepath" @@ -19,6 +21,24 @@ import ( "google.golang.org/protobuf/proto" ) +// suppressBoltNoise redirects the stdlib log output once to drop the +// "Rollback failed: tx closed" messages emitted by hashicorp/raft-boltdb. +// boltdb returns ErrTxClosed when Rollback is called after a successful +// Commit; raft-boltdb unconditionally logs this as an error even though it +// is the expected outcome of every successful transaction. +var suppressBoltNoise sync.Once + +// boltTxClosedFilter is an io.Writer that silently drops log lines containing +// "tx closed" and forwards everything else to the underlying writer. +type boltTxClosedFilter struct{ w io.Writer } + +func (f *boltTxClosedFilter) Write(p []byte) (n int, err error) { + if bytes.Contains(p, []byte("tx closed")) { + return len(p), nil + } + return f.w.Write(p) +} + // Node represents a raft consensus node type Node struct { raft *raft.Raft @@ -53,6 +73,9 @@ type FSM struct { // NewNode creates a new raft node func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) { + suppressBoltNoise.Do(func() { + log.SetOutput(&boltTxClosedFilter{w: os.Stderr}) + }) if err := os.MkdirAll(cfg.RaftDir, 0755); err != nil { return nil, fmt.Errorf("create raft dir: %w", err) } From 30ef514fd009975200953b7e27c4207ead0e87c6 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 10 Apr 2026 14:00:54 +0200 Subject: [PATCH 12/23] =?UTF-8?q?fix(raft):=20address=20PR=20review=20?= =?UTF-8?q?=E2=80=94=20shutdown=20wiring,=20error=20logging,=20snap=20docs?= =?UTF-8?q?,=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Call raftRetriever.Stop() in Syncer.Stop() so SetApplyCallback(nil) is actually reached and the goroutine is awaited before wg.Wait() - Log leadershipTransfer error at warn level in Node.Stop() instead of discarding it silently - Fix SnapCount comments in config.go: it retains snapshot files on disk (NewFileSnapshotStore retain param), not log-entry frequency - Extract buildRaftConfig helper from NewNode to enable config wiring tests - Add TestNodeResignLeader_NotLeaderNoop (non-nil node, nil raft → noop) - Add TestNewNode_SnapshotConfigApplied (table-driven, verifies SnapshotThreshold and TrailingLogs wiring with custom and zero values) Co-Authored-By: Claude Sonnet 4.6 --- block/internal/syncing/syncer.go | 5 ++++ pkg/config/config.go | 6 ++--- pkg/raft/node.go | 26 +++++++++++++------- pkg/raft/node_test.go | 41 ++++++++++++++++++++++++++++++++ 4 files changed, 66 insertions(+), 12 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index dbb0e9f8ab..8e34c7f3fe 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -243,6 +243,11 @@ func (s *Syncer) Stop(ctx context.Context) error { if s.daFollower != nil { s.daFollower.Stop() } + + if s.raftRetriever != nil { + s.raftRetriever.Stop() + } + s.wg.Wait() // Skip draining if we're shutting down due to a critical error (e.g. 
execution diff --git a/pkg/config/config.go b/pkg/config/config.go index cd34158193..1a57ac7d7e 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -187,7 +187,7 @@ const ( FlagRaftBootstrap = FlagPrefixEvnode + "raft.bootstrap" // FlagRaftPeers is a flag for specifying Raft peer addresses FlagRaftPeers = FlagPrefixEvnode + "raft.peers" - // FlagRaftSnapCount is a flag for specifying snapshot frequency + // FlagRaftSnapCount is a flag for specifying how many snapshot files to retain on disk FlagRaftSnapCount = FlagPrefixEvnode + "raft.snap_count" // FlagRaftSendTimeout max time to wait for a message to be sent to a peer FlagRaftSendTimeout = FlagPrefixEvnode + "raft.send_timeout" @@ -408,7 +408,7 @@ type RaftConfig struct { RaftDir string `mapstructure:"raft_dir" yaml:"raft_dir" comment:"Directory for Raft logs and snapshots"` Bootstrap bool `mapstructure:"bootstrap" yaml:"bootstrap" comment:"Bootstrap a new static Raft cluster during initial bring-up"` Peers string `mapstructure:"peers" yaml:"peers" comment:"Comma-separated list of peer Raft addresses (nodeID@host:port)"` - SnapCount uint64 `mapstructure:"snap_count" yaml:"snap_count" comment:"Number of log entries between snapshots"` + SnapCount uint64 `mapstructure:"snap_count" yaml:"snap_count" comment:"Number of snapshot files to retain on disk"` SendTimeout time.Duration `mapstructure:"send_timeout" yaml:"send_timeout" comment:"Max duration to wait for a message to be sent to a peer"` HeartbeatTimeout time.Duration `mapstructure:"heartbeat_timeout" yaml:"heartbeat_timeout" comment:"Time between leader heartbeats to followers"` LeaderLeaseTimeout time.Duration `mapstructure:"leader_lease_timeout" yaml:"leader_lease_timeout" comment:"Duration of the leader lease"` @@ -657,7 +657,7 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().String(FlagRaftDir, def.Raft.RaftDir, "directory for Raft logs and snapshots") cmd.Flags().Bool(FlagRaftBootstrap, def.Raft.Bootstrap, "bootstrap a new static Raft cluster during initial bring-up") cmd.Flags().String(FlagRaftPeers, def.Raft.Peers, "comma-separated list of peer Raft addresses (nodeID@host:port)") - cmd.Flags().Uint64(FlagRaftSnapCount, def.Raft.SnapCount, "number of log entries between snapshots") + cmd.Flags().Uint64(FlagRaftSnapCount, def.Raft.SnapCount, "number of snapshot files to retain on disk") cmd.Flags().Duration(FlagRaftSendTimeout, def.Raft.SendTimeout, "max duration to wait for a message to be sent to a peer") cmd.Flags().Duration(FlagRaftHeartbeatTimeout, def.Raft.HeartbeatTimeout, "time between leader heartbeats to followers") cmd.Flags().Duration(FlagRaftLeaderLeaseTimeout, def.Raft.LeaderLeaseTimeout, "duration of the leader lease") diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 47ddee101e..26ebafe099 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -72,14 +72,8 @@ type FSM struct { } // NewNode creates a new raft node -func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) { - suppressBoltNoise.Do(func() { - log.SetOutput(&boltTxClosedFilter{w: os.Stderr}) - }) - if err := os.MkdirAll(cfg.RaftDir, 0755); err != nil { - return nil, fmt.Errorf("create raft dir: %w", err) - } - +// buildRaftConfig converts a Node Config into a hashicorp/raft Config. 
+func buildRaftConfig(cfg *Config) *raft.Config { raftConfig := raft.DefaultConfig() raftConfig.LocalID = raft.ServerID(cfg.NodeID) raftConfig.LogLevel = "INFO" @@ -94,6 +88,18 @@ func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) { if cfg.TrailingLogs > 0 { raftConfig.TrailingLogs = cfg.TrailingLogs } + return raftConfig +} + +func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) { + suppressBoltNoise.Do(func() { + log.SetOutput(&boltTxClosedFilter{w: os.Stderr}) + }) + if err := os.MkdirAll(cfg.RaftDir, 0755); err != nil { + return nil, fmt.Errorf("create raft dir: %w", err) + } + + raftConfig := buildRaftConfig(cfg) startPointer := new(atomic.Pointer[RaftBlockState]) startPointer.Store(&RaftBlockState{}) @@ -218,7 +224,9 @@ func (n *Node) Stop() error { n.logger.Warn().Err(err).Msg("timed out waiting for raft messages to land during shutdown") } if n.IsLeader() { - _ = n.leadershipTransfer() + if err := n.leadershipTransfer(); err != nil { + n.logger.Warn().Err(err).Msg("leadership transfer on shutdown failed") + } } return n.raft.Shutdown().Error() } diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index de1fea97e1..e68856e24e 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -119,3 +119,44 @@ func TestNodeResignLeader_NilNoop(t *testing.T) { var n *Node assert.NoError(t, n.ResignLeader()) } + +func TestNodeResignLeader_NotLeaderNoop(t *testing.T) { + // Non-nil node with nil raft field — IsLeader() returns false, no transfer attempted. + n := &Node{} + assert.NoError(t, n.ResignLeader()) +} + +func TestNewNode_SnapshotConfigApplied(t *testing.T) { + specs := map[string]struct { + cfg *Config + expectedSnapshotThreshold uint64 + expectedTrailingLogs uint64 + }{ + "custom values applied": { + cfg: &Config{ + NodeID: "node1", + SnapshotThreshold: 1000, + TrailingLogs: 500, + }, + expectedSnapshotThreshold: 1000, + expectedTrailingLogs: 500, + }, + "zero values use defaults": { + cfg: &Config{ + NodeID: "node1", + SnapshotThreshold: 0, + TrailingLogs: 0, + }, + expectedSnapshotThreshold: raft.DefaultConfig().SnapshotThreshold, + expectedTrailingLogs: raft.DefaultConfig().TrailingLogs, + }, + } + + for name, spec := range specs { + t.Run(name, func(t *testing.T) { + rc := buildRaftConfig(spec.cfg) + assert.Equal(t, spec.expectedSnapshotThreshold, rc.SnapshotThreshold) + assert.Equal(t, spec.expectedTrailingLogs, rc.TrailingLogs) + }) + } +} From 3d94b75cfe7be281a4b733417530d1fb1754e580 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 10 Apr 2026 14:14:47 +0200 Subject: [PATCH 13/23] =?UTF-8?q?fix(raft):=20address=20code=20review=20is?= =?UTF-8?q?sues=20=E2=80=94=20ShutdownTimeout,=20resign=20fence,=20electio?= =?UTF-8?q?n=20validation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add ShutdownTimeout field (default 5s) to raft Config so Stop() drains committed logs with a meaningful timeout instead of the 200ms SendTimeout - Wrap ResignLeader() in a 3s goroutine+select fence in the SIGTERM handler so a hung leadership transfer cannot block graceful shutdown indefinitely - Validate ElectionTimeout >= HeartbeatTimeout in RaftConfig.Validate() to prevent hashicorp/raft panicking at startup with an invalid config - Fix stale "NewNode creates a new raft node" comment that had migrated onto buildRaftConfig after the function was extracted Co-Authored-By: Claude Sonnet 4.6 --- node/full.go | 1 + pkg/cmd/run_node.go | 17 +++++++++++++---- pkg/config/config.go 
| 12 ++++++++++++ pkg/config/config_test.go | 16 +++++++++++++++- pkg/config/defaults.go | 1 + pkg/raft/node.go | 4 ++-- 6 files changed, 44 insertions(+), 7 deletions(-) diff --git a/node/full.go b/node/full.go index f6d2dcffc8..fb59ccac2f 100644 --- a/node/full.go +++ b/node/full.go @@ -155,6 +155,7 @@ func initRaftNode(nodeConfig config.Config, logger zerolog.Logger) (*raftpkg.Nod Bootstrap: nodeConfig.Raft.Bootstrap, SnapCount: nodeConfig.Raft.SnapCount, SendTimeout: nodeConfig.Raft.SendTimeout, + ShutdownTimeout: nodeConfig.Raft.ShutdownTimeout, HeartbeatTimeout: nodeConfig.Raft.HeartbeatTimeout, LeaderLeaseTimeout: nodeConfig.Raft.LeaderLeaseTimeout, ElectionTimeout: nodeConfig.Raft.ElectionTimeout, diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index 28fda1623a..ff6875525f 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -228,10 +228,19 @@ func StartNode( // This gives the cluster a chance to elect a new leader before this node // stops producing blocks, shrinking the unconfirmed-block window. if resigner, ok := rollnode.(node.LeaderResigner); ok { - if err := resigner.ResignLeader(); err != nil { - logger.Warn().Err(err).Msg("leadership resign on shutdown failed") - } else { - logger.Info().Msg("leadership resigned before shutdown") + resignCtx, resignCancel := context.WithTimeout(context.Background(), 3*time.Second) + defer resignCancel() + resignDone := make(chan error, 1) + go func() { resignDone <- resigner.ResignLeader() }() + select { + case err := <-resignDone: + if err != nil { + logger.Warn().Err(err).Msg("leadership resign on shutdown failed") + } else { + logger.Info().Msg("leadership resigned before shutdown") + } + case <-resignCtx.Done(): + logger.Warn().Msg("leadership resign timed out") } } cancel() diff --git a/pkg/config/config.go b/pkg/config/config.go index 1a57ac7d7e..47e84e4c4b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -201,6 +201,8 @@ const ( FlagRaftSnapshotThreshold = FlagPrefixEvnode + "raft.snapshot_threshold" // FlagRaftTrailingLogs is the flag for the number of trailing logs after a snapshot. FlagRaftTrailingLogs = FlagPrefixEvnode + "raft.trailing_logs" + // FlagRaftShutdownTimeout is the flag for how long to wait for committed logs to be applied on graceful shutdown. 
+ FlagRaftShutdownTimeout = FlagPrefixEvnode + "raft.shutdown_timeout" // Pruning configuration flags FlagPruningMode = FlagPrefixEvnode + "pruning.pruning_mode" @@ -410,6 +412,7 @@ type RaftConfig struct { Peers string `mapstructure:"peers" yaml:"peers" comment:"Comma-separated list of peer Raft addresses (nodeID@host:port)"` SnapCount uint64 `mapstructure:"snap_count" yaml:"snap_count" comment:"Number of snapshot files to retain on disk"` SendTimeout time.Duration `mapstructure:"send_timeout" yaml:"send_timeout" comment:"Max duration to wait for a message to be sent to a peer"` + ShutdownTimeout time.Duration `mapstructure:"shutdown_timeout" yaml:"shutdown_timeout" comment:"Max duration to wait for committed raft logs to be applied on graceful shutdown"` HeartbeatTimeout time.Duration `mapstructure:"heartbeat_timeout" yaml:"heartbeat_timeout" comment:"Time between leader heartbeats to followers"` LeaderLeaseTimeout time.Duration `mapstructure:"leader_lease_timeout" yaml:"leader_lease_timeout" comment:"Duration of the leader lease"` ElectionTimeout time.Duration `mapstructure:"election_timeout" yaml:"election_timeout" comment:"Time a candidate waits for votes before restarting election; must be >= heartbeat_timeout"` @@ -436,6 +439,10 @@ func (c RaftConfig) Validate() error { multiErr = errors.Join(multiErr, fmt.Errorf("send timeout must be positive")) } + if c.ShutdownTimeout <= 0 { + multiErr = errors.Join(multiErr, fmt.Errorf("shutdown timeout must be positive")) + } + if c.HeartbeatTimeout <= 0 { multiErr = errors.Join(multiErr, fmt.Errorf("heartbeat timeout must be positive")) } @@ -444,6 +451,10 @@ func (c RaftConfig) Validate() error { multiErr = errors.Join(multiErr, fmt.Errorf("leader lease timeout must be positive")) } + if c.ElectionTimeout > 0 && c.ElectionTimeout < c.HeartbeatTimeout { + multiErr = errors.Join(multiErr, fmt.Errorf("election timeout (%v) must be >= heartbeat timeout (%v)", c.ElectionTimeout, c.HeartbeatTimeout)) + } + return multiErr } @@ -659,6 +670,7 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().String(FlagRaftPeers, def.Raft.Peers, "comma-separated list of peer Raft addresses (nodeID@host:port)") cmd.Flags().Uint64(FlagRaftSnapCount, def.Raft.SnapCount, "number of snapshot files to retain on disk") cmd.Flags().Duration(FlagRaftSendTimeout, def.Raft.SendTimeout, "max duration to wait for a message to be sent to a peer") + cmd.Flags().Duration(FlagRaftShutdownTimeout, def.Raft.ShutdownTimeout, "max duration to wait for committed raft logs to be applied on graceful shutdown") cmd.Flags().Duration(FlagRaftHeartbeatTimeout, def.Raft.HeartbeatTimeout, "time between leader heartbeats to followers") cmd.Flags().Duration(FlagRaftLeaderLeaseTimeout, def.Raft.LeaderLeaseTimeout, "duration of the leader lease") cmd.Flags().Duration(FlagRaftElectionTimeout, def.Raft.ElectionTimeout, "time a candidate waits for votes before restarting election") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 99bb3f1392..f01d4ff992 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -131,6 +131,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRaftPeers, DefaultConfig().Raft.Peers) assertFlagValue(t, flags, FlagRaftSnapCount, DefaultConfig().Raft.SnapCount) assertFlagValue(t, flags, FlagRaftSendTimeout, DefaultConfig().Raft.SendTimeout) + assertFlagValue(t, flags, FlagRaftShutdownTimeout, DefaultConfig().Raft.ShutdownTimeout) assertFlagValue(t, flags, FlagRaftHeartbeatTimeout, DefaultConfig().Raft.HeartbeatTimeout) 
assertFlagValue(t, flags, FlagRaftLeaderLeaseTimeout, DefaultConfig().Raft.LeaderLeaseTimeout) assertFlagValue(t, flags, FlagRaftElectionTimeout, DefaultConfig().Raft.ElectionTimeout) @@ -143,7 +144,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagPruningInterval, DefaultConfig().Pruning.Interval.Duration) // Count the number of flags we're explicitly checking - expectedFlagCount := 81 // Update this number if you add more flag checks above + expectedFlagCount := 82 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 @@ -404,8 +405,10 @@ func TestRaftConfig_Validate(t *testing.T) { Peers: "", SnapCount: 1, SendTimeout: 1 * time.Second, + ShutdownTimeout: 5 * time.Second, HeartbeatTimeout: 1 * time.Second, LeaderLeaseTimeout: 1 * time.Second, + ElectionTimeout: 2 * time.Second, } } @@ -443,6 +446,17 @@ func TestRaftConfig_Validate(t *testing.T) { mutate: func(c *RaftConfig) { c.LeaderLeaseTimeout = 0 }, expErr: "leader lease timeout must be positive", }, + "non-positive shutdown timeout": { + mutate: func(c *RaftConfig) { c.ShutdownTimeout = 0 }, + expErr: "shutdown timeout must be positive", + }, + "election timeout less than heartbeat timeout": { + mutate: func(c *RaftConfig) { c.ElectionTimeout = 500 * time.Millisecond }, + expErr: "election timeout (500ms) must be >= heartbeat timeout (1s)", + }, + "zero election timeout skips check": { + mutate: func(c *RaftConfig) { c.ElectionTimeout = 0 }, + }, "multiple invalid returns last": { mutate: func(c *RaftConfig) { c.NodeID = "" diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 2a8d2b4129..dec3e6e0e5 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -116,6 +116,7 @@ func DefaultConfig() Config { }, Raft: RaftConfig{ SendTimeout: 200 * time.Millisecond, + ShutdownTimeout: 5 * time.Second, HeartbeatTimeout: 350 * time.Millisecond, LeaderLeaseTimeout: 175 * time.Millisecond, ElectionTimeout: 1000 * time.Millisecond, diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 26ebafe099..3b2f30b0d4 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -56,6 +56,7 @@ type Config struct { Peers []string SnapCount uint64 SendTimeout time.Duration + ShutdownTimeout time.Duration HeartbeatTimeout time.Duration LeaderLeaseTimeout time.Duration ElectionTimeout time.Duration @@ -71,7 +72,6 @@ type FSM struct { applyCh chan<- RaftApplyMsg } -// NewNode creates a new raft node // buildRaftConfig converts a Node Config into a hashicorp/raft Config. func buildRaftConfig(cfg *Config) *raft.Config { raftConfig := raft.DefaultConfig() @@ -220,7 +220,7 @@ func (n *Node) Stop() error { } // Wait for FSM to apply all committed logs before shutdown to prevent state loss. // This ensures pending raft messages are processed before the node stops. - if err := n.waitForMsgsLanded(n.config.SendTimeout); err != nil { + if err := n.waitForMsgsLanded(n.config.ShutdownTimeout); err != nil { n.logger.Warn().Err(err).Msg("timed out waiting for raft messages to land during shutdown") } if n.IsLeader() { From 9ed4946f8812d867965b6013406bc651e429c2e5 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 10 Apr 2026 14:24:19 +0200 Subject: [PATCH 14/23] style(raft): fix gci struct field alignment in node_test.go gofmt/gci requires minimal alignment; excessive spaces in the TestNewNode_SnapshotConfigApplied struct literal caused a lint failure. 
Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/node_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index e68856e24e..9d67f3d19e 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -128,9 +128,9 @@ func TestNodeResignLeader_NotLeaderNoop(t *testing.T) { func TestNewNode_SnapshotConfigApplied(t *testing.T) { specs := map[string]struct { - cfg *Config - expectedSnapshotThreshold uint64 - expectedTrailingLogs uint64 + cfg *Config + expectedSnapshotThreshold uint64 + expectedTrailingLogs uint64 }{ "custom values applied": { cfg: &Config{ From a2a2599a84377b388c3de3dd28b0f18732a5affd Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 10 Apr 2026 15:53:57 +0200 Subject: [PATCH 15/23] test: improve patch coverage for raft shutdown and resign paths Add unit tests for lines flagged by Codecov: - boltTxClosedFilter.Write: filter drops "tx closed", forwards others - buildRaftConfig: ElectionTimeout > 0 applied, zero uses default - FullNode.ResignLeader: nil raftNode no-op; non-leader raftNode no-op - Syncer.Stop: raftRetriever.Stop is called when raftRetriever is set - Syncer.RecoverFromRaft: GetHeader failure when local state is ahead of stale raft snapshot returns "cannot verify hash" error Co-Authored-By: Claude Sonnet 4.6 --- block/internal/syncing/syncer_test.go | 53 +++++++++++++++++++++ node/full_node_test.go | 13 ++++++ pkg/raft/node_test.go | 66 +++++++++++++++++++++++++++ 3 files changed, 132 insertions(+) diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 66ac7e9e05..4a638c47f1 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -532,6 +532,59 @@ func TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot(t *testing.T) { require.Error(t, err) require.ErrorContains(t, err, "diverged from raft") }) + + t.Run("get header fails returns error", func(t *testing.T) { + // lastState is at height 2; raft snapshot at height 0. + // No block is stored at height 0, so GetHeader fails. + err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{ + Height: 0, + Hash: make([]byte, 32), + Header: headerBz1, + Data: dataBz1, + }) + require.Error(t, err) + require.ErrorContains(t, err, "cannot verify hash") + }) +} + +func TestSyncer_Stop_CallsRaftRetrieverStop(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(t, err) + + raftNode := &stubRaftNode{} + s := NewSyncer( + st, + nil, + nil, + cm, + common.NopMetrics(), + config.DefaultConfig(), + genesis.Genesis{}, + nil, + nil, + zerolog.Nop(), + common.DefaultBlockOptions(), + make(chan error, 1), + raftNode, + ) + + require.NotNil(t, s.raftRetriever, "raftRetriever should be set when raftNode is provided") + + // Manually set cancel so Stop() doesn't bail out early (simulates having been started). + ctx, cancel := context.WithCancel(t.Context()) + s.ctx = ctx + s.cancel = cancel + + require.NoError(t, s.Stop(t.Context())) + + // raftRetriever.Stop clears the apply callback (sets it to nil). + // The stub records each SetApplyCallback call; the last one should be nil. 
+ callbacks := raftNode.recordedCallbacks() + require.NotEmpty(t, callbacks, "expected at least one callback registration") + assert.Nil(t, callbacks[len(callbacks)-1], "last callback should be nil after Stop") } func TestSyncer_processPendingEvents(t *testing.T) { diff --git a/node/full_node_test.go b/node/full_node_test.go index 23c2626f40..2421ab7079 100644 --- a/node/full_node_test.go +++ b/node/full_node_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + raftpkg "github.com/evstack/ev-node/pkg/raft" "github.com/evstack/ev-node/pkg/service" ) @@ -82,3 +83,15 @@ func TestStartInstrumentationServer(t *testing.T) { assert.NoError(err, "Pprof server shutdown should not return error") } } + +func TestFullNode_ResignLeader_NilRaftNode(t *testing.T) { + n := &FullNode{} // raftNode is nil + assert.NoError(t, n.ResignLeader()) +} + +func TestFullNode_ResignLeader_NonLeaderRaftNode(t *testing.T) { + // Empty *raftpkg.Node has nil raft field so IsLeader() returns false; + // ResignLeader() is a no-op and returns nil. + n := &FullNode{raftNode: &raftpkg.Node{}} + assert.NoError(t, n.ResignLeader()) +} diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index 9d67f3d19e..7e90aeeb0c 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -1,15 +1,81 @@ package raft import ( + "bytes" "context" "errors" "testing" + "time" "github.com/hashicorp/raft" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +func TestBoltTxClosedFilter_Write(t *testing.T) { + specs := map[string]struct { + input string + expectFwd bool + }{ + "passes through normal log line": { + input: "some normal log message\n", + expectFwd: true, + }, + "drops line containing tx closed": { + input: "Rollback failed: tx closed\n", + expectFwd: false, + }, + "drops line with tx closed anywhere": { + input: "error: tx closed due to commit\n", + expectFwd: false, + }, + } + + for name, spec := range specs { + t.Run(name, func(t *testing.T) { + var buf bytes.Buffer + f := &boltTxClosedFilter{w: &buf} + n, err := f.Write([]byte(spec.input)) + require.NoError(t, err) + assert.Equal(t, len(spec.input), n) + if spec.expectFwd { + assert.Equal(t, spec.input, buf.String()) + } else { + assert.Empty(t, buf.String()) + } + }) + } +} + +func TestBuildRaftConfig_ElectionTimeout(t *testing.T) { + specs := map[string]struct { + cfg *Config + expectedElectionTimeout time.Duration + }{ + "custom election timeout is applied": { + cfg: &Config{ + NodeID: "node1", + ElectionTimeout: 500 * time.Millisecond, + }, + expectedElectionTimeout: 500 * time.Millisecond, + }, + "zero election timeout uses default": { + cfg: &Config{ + NodeID: "node1", + ElectionTimeout: 0, + }, + expectedElectionTimeout: raft.DefaultConfig().ElectionTimeout, + }, + } + + for name, spec := range specs { + t.Run(name, func(t *testing.T) { + rc := buildRaftConfig(spec.cfg) + assert.Equal(t, spec.expectedElectionTimeout, rc.ElectionTimeout) + }) + } +} + func TestSplitPeerAddr(t *testing.T) { specs := map[string]struct { in string From 4d105a27baece54fcdae427d50dcb2761f20e3db Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 10 Apr 2026 19:22:19 +0200 Subject: [PATCH 16/23] fix(config): reject negative ElectionTimeout in RaftConfig.Validate A negative ElectionTimeout was silently ignored (buildRaftConfig only applies values > 0), allowing a misconfigured node to start with the library default instead of failing fast. 
Add an explicit < 0 check that returns an error; 0 remains valid as the "use library default" sentinel. Co-Authored-By: Claude Sonnet 4.6 --- pkg/config/config.go | 4 +++- pkg/config/config_test.go | 4 ++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 47e84e4c4b..d0a334b7a6 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -451,7 +451,9 @@ func (c RaftConfig) Validate() error { multiErr = errors.Join(multiErr, fmt.Errorf("leader lease timeout must be positive")) } - if c.ElectionTimeout > 0 && c.ElectionTimeout < c.HeartbeatTimeout { + if c.ElectionTimeout < 0 { + multiErr = errors.Join(multiErr, fmt.Errorf("election timeout (%v) must be >= 0", c.ElectionTimeout)) + } else if c.ElectionTimeout > 0 && c.ElectionTimeout < c.HeartbeatTimeout { multiErr = errors.Join(multiErr, fmt.Errorf("election timeout (%v) must be >= heartbeat timeout (%v)", c.ElectionTimeout, c.HeartbeatTimeout)) } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index f01d4ff992..483bfbee9c 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -450,6 +450,10 @@ func TestRaftConfig_Validate(t *testing.T) { mutate: func(c *RaftConfig) { c.ShutdownTimeout = 0 }, expErr: "shutdown timeout must be positive", }, + "negative election timeout rejected": { + mutate: func(c *RaftConfig) { c.ElectionTimeout = -1 * time.Second }, + expErr: "election timeout (-1s) must be >= 0", + }, "election timeout less than heartbeat timeout": { mutate: func(c *RaftConfig) { c.ElectionTimeout = 500 * time.Millisecond }, expErr: "election timeout (500ms) must be >= heartbeat timeout (1s)", From fc023b878e8d371310d560e4d76ba56db6da5082 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 10 Apr 2026 19:22:29 +0200 Subject: [PATCH 17/23] fix(raft): preserve stdlib logger writer in bolt filter; propagate ctx through ResignLeader - suppressBoltNoise.Do now wraps log.Writer() instead of os.Stderr so any existing stdlib logger redirection is preserved rather than clobbered - ResignLeader now accepts a context.Context: leadershipTransfer runs in a goroutine and a select abandons the caller at ctx.Done(), returning ctx.Err(); the goroutine itself exits once the inner raft transfer completes (bounded by ElectionTimeout) Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/node.go | 14 +++++++++++--- pkg/raft/node_test.go | 4 ++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 3b2f30b0d4..0d136a9539 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -93,7 +93,7 @@ func buildRaftConfig(cfg *Config) *raft.Config { func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) { suppressBoltNoise.Do(func() { - log.SetOutput(&boltTxClosedFilter{w: os.Stderr}) + log.SetOutput(&boltTxClosedFilter{w: log.Writer()}) }) if err := os.MkdirAll(cfg.RaftDir, 0755); err != nil { return nil, fmt.Errorf("create raft dir: %w", err) @@ -278,11 +278,19 @@ func (n *Node) leadershipTransfer() error { // It is a no-op when the node is nil or not currently the leader. // Call this before cancelling the node context on graceful shutdown to minimise // the window where a dying leader could still serve blocks. -func (n *Node) ResignLeader() error { +// The transfer is abandoned and ctx.Err() is returned if ctx expires first. 
+func (n *Node) ResignLeader(ctx context.Context) error { if n == nil || !n.IsLeader() { return nil } - return n.leadershipTransfer() + done := make(chan error, 1) + go func() { done <- n.leadershipTransfer() }() + select { + case err := <-done: + return err + case <-ctx.Done(): + return ctx.Err() + } } func (n *Node) Config() Config { diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index 7e90aeeb0c..8516f7cf9d 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -183,13 +183,13 @@ func TestNodeStartNilNoop(t *testing.T) { func TestNodeResignLeader_NilNoop(t *testing.T) { var n *Node - assert.NoError(t, n.ResignLeader()) + assert.NoError(t, n.ResignLeader(context.Background())) } func TestNodeResignLeader_NotLeaderNoop(t *testing.T) { // Non-nil node with nil raft field — IsLeader() returns false, no transfer attempted. n := &Node{} - assert.NoError(t, n.ResignLeader()) + assert.NoError(t, n.ResignLeader(context.Background())) } func TestNewNode_SnapshotConfigApplied(t *testing.T) { From 2d3dc8eab1f3de52bc536aface2a07f16332c481 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 10 Apr 2026 19:22:35 +0200 Subject: [PATCH 18/23] fix(node): propagate context through LeaderResigner.ResignLeader interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - LeaderResigner.ResignLeader() → ResignLeader(ctx context.Context) error - FullNode.ResignLeader passes ctx down to raft.Node.ResignLeader - run_node.go calls resigner.ResignLeader(resignCtx) directly — no wrapper goroutine/select needed; context.DeadlineExceeded vs other errors are logged distinctly - Merge TestFullNode_ResignLeader_NilRaftNode and TestFullNode_ResignLeader_NonLeaderRaftNode into single table-driven test Co-Authored-By: Claude Sonnet 4.6 --- node/full.go | 4 ++-- node/full_node_test.go | 24 ++++++++++++++---------- node/node.go | 4 +++- pkg/cmd/run_node.go | 15 ++++++--------- 4 files changed, 25 insertions(+), 22 deletions(-) diff --git a/node/full.go b/node/full.go index fb59ccac2f..870d1fdb2a 100644 --- a/node/full.go +++ b/node/full.go @@ -392,9 +392,9 @@ func (n *FullNode) IsRunning() bool { // ResignLeader transfers raft leadership before the node shuts down. // It is a no-op when raft is not enabled or this node is not the leader. -func (n *FullNode) ResignLeader() error { +func (n *FullNode) ResignLeader(ctx context.Context) error { if n.raftNode == nil { return nil } - return n.raftNode.ResignLeader() + return n.raftNode.ResignLeader(ctx) } diff --git a/node/full_node_test.go b/node/full_node_test.go index 2421ab7079..a2b874ba18 100644 --- a/node/full_node_test.go +++ b/node/full_node_test.go @@ -84,14 +84,18 @@ func TestStartInstrumentationServer(t *testing.T) { } } -func TestFullNode_ResignLeader_NilRaftNode(t *testing.T) { - n := &FullNode{} // raftNode is nil - assert.NoError(t, n.ResignLeader()) -} - -func TestFullNode_ResignLeader_NonLeaderRaftNode(t *testing.T) { - // Empty *raftpkg.Node has nil raft field so IsLeader() returns false; - // ResignLeader() is a no-op and returns nil. - n := &FullNode{raftNode: &raftpkg.Node{}} - assert.NoError(t, n.ResignLeader()) +func TestFullNode_ResignLeader_Noop(t *testing.T) { + cases := []struct { + name string + node *FullNode + }{ + {name: "nil raftNode", node: &FullNode{}}, + // Empty *raftpkg.Node has nil raft field so IsLeader() returns false. 
+ {name: "non-leader raftNode", node: &FullNode{raftNode: &raftpkg.Node{}}}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + assert.NoError(t, tc.node.ResignLeader(context.Background())) + }) + } } diff --git a/node/node.go b/node/node.go index 4d12463a01..c42721ea75 100644 --- a/node/node.go +++ b/node/node.go @@ -1,6 +1,8 @@ package node import ( + "context" + ds "github.com/ipfs/go-datastore" "github.com/rs/zerolog" @@ -25,7 +27,7 @@ type Node interface { // in Raft leader election. Callers should type-assert to this interface and call // ResignLeader before cancelling the node context on graceful shutdown. type LeaderResigner interface { - ResignLeader() error + ResignLeader(ctx context.Context) error } type NodeOptions struct { diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index ff6875525f..8b241c98db 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -230,17 +230,14 @@ func StartNode( if resigner, ok := rollnode.(node.LeaderResigner); ok { resignCtx, resignCancel := context.WithTimeout(context.Background(), 3*time.Second) defer resignCancel() - resignDone := make(chan error, 1) - go func() { resignDone <- resigner.ResignLeader() }() - select { - case err := <-resignDone: - if err != nil { - logger.Warn().Err(err).Msg("leadership resign on shutdown failed") + if err := resigner.ResignLeader(resignCtx); err != nil { + if errors.Is(err, context.DeadlineExceeded) { + logger.Warn().Msg("leadership resign timed out") } else { - logger.Info().Msg("leadership resigned before shutdown") + logger.Warn().Err(err).Msg("leadership resign on shutdown failed") } - case <-resignCtx.Done(): - logger.Warn().Msg("leadership resign timed out") + } else { + logger.Info().Msg("leadership resigned before shutdown") } } cancel() From 40b7251d06aaedf3aa808c6aec3e4b0ca2601e81 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Wed, 15 Apr 2026 12:41:57 +0200 Subject: [PATCH 19/23] fix(raft): abdicate leadership when store is significantly behind raft state MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a node wins election but its local store is more than 1 block behind the raft FSM state, RecoverFromRaft cannot replay the gap (it only handles the single latest block in the raft snapshot). Previously the node would log "recovery successful" and start leader operations anyway, then stall block production while the executor repeatedly failed to sync the missing blocks from a store that did not have them. Fix: in DynamicLeaderElection.Run, detect diff < -1 at the follower→leader transition and immediately transfer leadership to a better-synced peer. diff == -1 is preserved: RecoverFromRaft can apply exactly one block from the raft snapshot, so that path is unchanged. Closes #3255 Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/election.go | 17 +++++++++++++++ pkg/raft/election_test.go | 46 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/pkg/raft/election.go b/pkg/raft/election.go index 757f07ca27..56b82a9c7b 100644 --- a/pkg/raft/election.go +++ b/pkg/raft/election.go @@ -132,6 +132,23 @@ func (d *DynamicLeaderElection) Run(ctx context.Context) error { if err != nil { return err } + if diff < -1 { + // Store is more than 1 block behind raft state. + // RecoverFromRaft can only apply the single latest block + // from the raft snapshot; it cannot replay a larger gap. 
+ // Starting leader operations in this state would stall block + // production until catch-up completes (potentially minutes or + // hours). Abdicate immediately so a better-synced peer can + // take leadership. + d.logger.Warn(). + Int("store_lag_blocks", -diff). + Uint64("raft_height", raftState.Height). + Msg("became leader but store is significantly behind raft state; abdicating to prevent stalled block production") + if tErr := d.node.leadershipTransfer(); tErr != nil { + d.logger.Error().Err(tErr).Msg("leadership transfer failed after store-lag abdication") + } + continue + } if diff != 0 { d.logger.Info().Msg("became leader but not synced, attempting recovery") if err := runnable.Recover(ctx, raftState); err != nil { diff --git a/pkg/raft/election_test.go b/pkg/raft/election_test.go index b29cbda8d7..c56afed10e 100644 --- a/pkg/raft/election_test.go +++ b/pkg/raft/election_test.go @@ -221,6 +221,52 @@ func TestDynamicLeaderElectionRun(t *testing.T) { assert.ErrorIs(t, err, context.Canceled) }, }, + "abdicate when store significantly behind raft": { + setup: func(t *testing.T) (*DynamicLeaderElection, context.Context, context.CancelFunc) { + m := newMocksourceNode(t) + leaderCh := make(chan bool, 2) + m.EXPECT().leaderCh().Return((<-chan bool)(leaderCh)) + // GetState called in verifyState (follower start) and in leader sync check + m.EXPECT().GetState().Return(&RaftBlockState{Height: 10}) + m.EXPECT().GetState().Return(&RaftBlockState{Height: 10}) + m.EXPECT().Config().Return(testCfg()).Times(2) + m.EXPECT().waitForMsgsLanded(2 * time.Millisecond).Return(nil) + m.EXPECT().NodeID().Return("self") + m.EXPECT().leaderID().Return("self") + // Abdication must transfer leadership + m.EXPECT().leadershipTransfer().Return(nil) + + fStarted := make(chan struct{}) + follower := &testRunnable{ + startedCh: fStarted, + isSyncedFn: func(*RaftBlockState) (int, error) { return -5, nil }, + } + // Leader must never start + leader := &testRunnable{runFn: func(ctx context.Context) error { + t.Fatal("leader should not start when store is significantly behind raft") + return nil + }} + + logger := zerolog.Nop() + d := &DynamicLeaderElection{logger: logger, node: m, + leaderFactory: func() (Runnable, error) { return leader, nil }, + followerFactory: func() (Runnable, error) { return follower, nil }, + } + ctx, cancel := context.WithCancel(t.Context()) + go func() { + leaderCh <- false + <-fStarted + leaderCh <- true + time.Sleep(10 * time.Millisecond) + cancel() + }() + return d, ctx, cancel + }, + assertF: func(t *testing.T, err error) { + require.Error(t, err) + assert.ErrorIs(t, err, context.Canceled) + }, + }, "lost leadership during sync wait": { setup: func(t *testing.T) (*DynamicLeaderElection, context.Context, context.CancelFunc) { m := newMocksourceNode(t) From bf6bb5000ff3f86755e7bf3fabbec0bdda780e67 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Wed, 15 Apr 2026 13:03:27 +0200 Subject: [PATCH 20/23] =?UTF-8?q?fix(raft):=20address=20julienrbrt=20revie?= =?UTF-8?q?w=20=E2=80=94=20logger,=20boltdb=20filter,=20ShutdownTimeout?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove stdlib log filter (boltTxClosedFilter / suppressBoltNoise): it redirected the global stdlib logger which is the wrong scope. raft-boltdb uses log.Printf directly with no Logger option, so the "tx closed" noise is now accepted as-is from stderr. 
- Wire hashicorp/raft's internal hclog output through zerolog: set raft.Config.Logger to an hclog.Logger backed by the zerolog writer so all raft-internal messages appear in the structured log stream under component=raft-hashicorp. - Remove ShutdownTimeout from public config: it was a second "how long to wait" knob that confused operators. ShutdownTimeout is now computed internally as 5 × SendTimeout at the initRaftNode call site. - Delete TestRaftRetrieverStopClearsApplyCallback: tested an implementation detail (Stop clears the apply callback pointer) rather than observable behaviour. The stubRaftNode helper it defined is moved to syncer_test.go where it is still needed. - Rename TestNewNode_SnapshotConfigApplied → TestBuildRaftConfig_SnapshotConfigApplied to reflect that it tests buildRaftConfig, not NewNode. Co-Authored-By: Claude Sonnet 4.6 --- block/internal/syncing/raft_retriever_test.go | 61 ------------------- block/internal/syncing/syncer_test.go | 25 ++++++++ node/full.go | 2 +- pkg/config/config.go | 9 --- pkg/config/config_test.go | 8 +-- pkg/config/defaults.go | 1 - pkg/raft/node.go | 39 ++++-------- pkg/raft/node_test.go | 43 ++----------- 8 files changed, 44 insertions(+), 144 deletions(-) delete mode 100644 block/internal/syncing/raft_retriever_test.go diff --git a/block/internal/syncing/raft_retriever_test.go b/block/internal/syncing/raft_retriever_test.go deleted file mode 100644 index ec176aad2a..0000000000 --- a/block/internal/syncing/raft_retriever_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package syncing - -import ( - "context" - "sync" - "testing" - - "github.com/rs/zerolog" - "github.com/stretchr/testify/require" - - "github.com/evstack/ev-node/pkg/genesis" - pkgraft "github.com/evstack/ev-node/pkg/raft" -) - -type stubRaftNode struct { - mu sync.Mutex - callbacks []chan<- pkgraft.RaftApplyMsg -} - -func (s *stubRaftNode) IsLeader() bool { return false } - -func (s *stubRaftNode) HasQuorum() bool { return false } - -func (s *stubRaftNode) GetState() *pkgraft.RaftBlockState { return nil } - -func (s *stubRaftNode) Broadcast(context.Context, *pkgraft.RaftBlockState) error { return nil } - -func (s *stubRaftNode) SetApplyCallback(ch chan<- pkgraft.RaftApplyMsg) { - s.mu.Lock() - defer s.mu.Unlock() - s.callbacks = append(s.callbacks, ch) -} - -func (s *stubRaftNode) recordedCallbacks() []chan<- pkgraft.RaftApplyMsg { - s.mu.Lock() - defer s.mu.Unlock() - out := make([]chan<- pkgraft.RaftApplyMsg, len(s.callbacks)) - copy(out, s.callbacks) - return out -} - -func TestRaftRetrieverStopClearsApplyCallback(t *testing.T) { - t.Parallel() - - raftNode := &stubRaftNode{} - retriever := newRaftRetriever( - raftNode, - genesis.Genesis{}, - zerolog.Nop(), - nil, - func(context.Context, *pkgraft.RaftBlockState) error { return nil }, - ) - - require.NoError(t, retriever.Start(t.Context())) - retriever.Stop() - - callbacks := raftNode.recordedCallbacks() - require.Len(t, callbacks, 2) - require.NotNil(t, callbacks[0]) - require.Nil(t, callbacks[1]) -} diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 4a638c47f1..f7563f240d 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -6,6 +6,7 @@ import ( "crypto/sha512" "errors" "math" + "sync" "sync/atomic" "testing" "testing/synctest" @@ -35,6 +36,30 @@ import ( "github.com/evstack/ev-node/types" ) +// stubRaftNode is a minimal RaftNode stub that records SetApplyCallback calls. 
+type stubRaftNode struct { + mu sync.Mutex + callbacks []chan<- raft.RaftApplyMsg +} + +func (s *stubRaftNode) IsLeader() bool { return false } +func (s *stubRaftNode) HasQuorum() bool { return false } +func (s *stubRaftNode) GetState() *raft.RaftBlockState { return nil } +func (s *stubRaftNode) Broadcast(context.Context, *raft.RaftBlockState) error { return nil } +func (s *stubRaftNode) SetApplyCallback(ch chan<- raft.RaftApplyMsg) { + s.mu.Lock() + defer s.mu.Unlock() + s.callbacks = append(s.callbacks, ch) +} + +func (s *stubRaftNode) recordedCallbacks() []chan<- raft.RaftApplyMsg { + s.mu.Lock() + defer s.mu.Unlock() + out := make([]chan<- raft.RaftApplyMsg, len(s.callbacks)) + copy(out, s.callbacks) + return out +} + // helper to create a signer, pubkey and address for tests func buildSyncTestSigner(tb testing.TB) (addr []byte, pub crypto.PubKey, signer signerpkg.Signer) { tb.Helper() diff --git a/node/full.go b/node/full.go index 870d1fdb2a..bd44f9ef42 100644 --- a/node/full.go +++ b/node/full.go @@ -155,7 +155,7 @@ func initRaftNode(nodeConfig config.Config, logger zerolog.Logger) (*raftpkg.Nod Bootstrap: nodeConfig.Raft.Bootstrap, SnapCount: nodeConfig.Raft.SnapCount, SendTimeout: nodeConfig.Raft.SendTimeout, - ShutdownTimeout: nodeConfig.Raft.ShutdownTimeout, + ShutdownTimeout: 5 * nodeConfig.Raft.SendTimeout, HeartbeatTimeout: nodeConfig.Raft.HeartbeatTimeout, LeaderLeaseTimeout: nodeConfig.Raft.LeaderLeaseTimeout, ElectionTimeout: nodeConfig.Raft.ElectionTimeout, diff --git a/pkg/config/config.go b/pkg/config/config.go index d0a334b7a6..850f098dc7 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -201,9 +201,6 @@ const ( FlagRaftSnapshotThreshold = FlagPrefixEvnode + "raft.snapshot_threshold" // FlagRaftTrailingLogs is the flag for the number of trailing logs after a snapshot. FlagRaftTrailingLogs = FlagPrefixEvnode + "raft.trailing_logs" - // FlagRaftShutdownTimeout is the flag for how long to wait for committed logs to be applied on graceful shutdown. 
- FlagRaftShutdownTimeout = FlagPrefixEvnode + "raft.shutdown_timeout" - // Pruning configuration flags FlagPruningMode = FlagPrefixEvnode + "pruning.pruning_mode" FlagPruningKeepRecent = FlagPrefixEvnode + "pruning.pruning_keep_recent" @@ -412,7 +409,6 @@ type RaftConfig struct { Peers string `mapstructure:"peers" yaml:"peers" comment:"Comma-separated list of peer Raft addresses (nodeID@host:port)"` SnapCount uint64 `mapstructure:"snap_count" yaml:"snap_count" comment:"Number of snapshot files to retain on disk"` SendTimeout time.Duration `mapstructure:"send_timeout" yaml:"send_timeout" comment:"Max duration to wait for a message to be sent to a peer"` - ShutdownTimeout time.Duration `mapstructure:"shutdown_timeout" yaml:"shutdown_timeout" comment:"Max duration to wait for committed raft logs to be applied on graceful shutdown"` HeartbeatTimeout time.Duration `mapstructure:"heartbeat_timeout" yaml:"heartbeat_timeout" comment:"Time between leader heartbeats to followers"` LeaderLeaseTimeout time.Duration `mapstructure:"leader_lease_timeout" yaml:"leader_lease_timeout" comment:"Duration of the leader lease"` ElectionTimeout time.Duration `mapstructure:"election_timeout" yaml:"election_timeout" comment:"Time a candidate waits for votes before restarting election; must be >= heartbeat_timeout"` @@ -439,10 +435,6 @@ func (c RaftConfig) Validate() error { multiErr = errors.Join(multiErr, fmt.Errorf("send timeout must be positive")) } - if c.ShutdownTimeout <= 0 { - multiErr = errors.Join(multiErr, fmt.Errorf("shutdown timeout must be positive")) - } - if c.HeartbeatTimeout <= 0 { multiErr = errors.Join(multiErr, fmt.Errorf("heartbeat timeout must be positive")) } @@ -672,7 +664,6 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().String(FlagRaftPeers, def.Raft.Peers, "comma-separated list of peer Raft addresses (nodeID@host:port)") cmd.Flags().Uint64(FlagRaftSnapCount, def.Raft.SnapCount, "number of snapshot files to retain on disk") cmd.Flags().Duration(FlagRaftSendTimeout, def.Raft.SendTimeout, "max duration to wait for a message to be sent to a peer") - cmd.Flags().Duration(FlagRaftShutdownTimeout, def.Raft.ShutdownTimeout, "max duration to wait for committed raft logs to be applied on graceful shutdown") cmd.Flags().Duration(FlagRaftHeartbeatTimeout, def.Raft.HeartbeatTimeout, "time between leader heartbeats to followers") cmd.Flags().Duration(FlagRaftLeaderLeaseTimeout, def.Raft.LeaderLeaseTimeout, "duration of the leader lease") cmd.Flags().Duration(FlagRaftElectionTimeout, def.Raft.ElectionTimeout, "time a candidate waits for votes before restarting election") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 483bfbee9c..2cb4792189 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -131,7 +131,6 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRaftPeers, DefaultConfig().Raft.Peers) assertFlagValue(t, flags, FlagRaftSnapCount, DefaultConfig().Raft.SnapCount) assertFlagValue(t, flags, FlagRaftSendTimeout, DefaultConfig().Raft.SendTimeout) - assertFlagValue(t, flags, FlagRaftShutdownTimeout, DefaultConfig().Raft.ShutdownTimeout) assertFlagValue(t, flags, FlagRaftHeartbeatTimeout, DefaultConfig().Raft.HeartbeatTimeout) assertFlagValue(t, flags, FlagRaftLeaderLeaseTimeout, DefaultConfig().Raft.LeaderLeaseTimeout) assertFlagValue(t, flags, FlagRaftElectionTimeout, DefaultConfig().Raft.ElectionTimeout) @@ -144,7 +143,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagPruningInterval, 
DefaultConfig().Pruning.Interval.Duration) // Count the number of flags we're explicitly checking - expectedFlagCount := 82 // Update this number if you add more flag checks above + expectedFlagCount := 81 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 @@ -405,7 +404,6 @@ func TestRaftConfig_Validate(t *testing.T) { Peers: "", SnapCount: 1, SendTimeout: 1 * time.Second, - ShutdownTimeout: 5 * time.Second, HeartbeatTimeout: 1 * time.Second, LeaderLeaseTimeout: 1 * time.Second, ElectionTimeout: 2 * time.Second, @@ -446,10 +444,6 @@ func TestRaftConfig_Validate(t *testing.T) { mutate: func(c *RaftConfig) { c.LeaderLeaseTimeout = 0 }, expErr: "leader lease timeout must be positive", }, - "non-positive shutdown timeout": { - mutate: func(c *RaftConfig) { c.ShutdownTimeout = 0 }, - expErr: "shutdown timeout must be positive", - }, "negative election timeout rejected": { mutate: func(c *RaftConfig) { c.ElectionTimeout = -1 * time.Second }, expErr: "election timeout (-1s) must be >= 0", diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index dec3e6e0e5..2a8d2b4129 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -116,7 +116,6 @@ func DefaultConfig() Config { }, Raft: RaftConfig{ SendTimeout: 200 * time.Millisecond, - ShutdownTimeout: 5 * time.Second, HeartbeatTimeout: 350 * time.Millisecond, LeaderLeaseTimeout: 175 * time.Millisecond, ElectionTimeout: 1000 * time.Millisecond, diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 0d136a9539..25ca482b88 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -1,12 +1,10 @@ package raft import ( - "bytes" "context" "errors" "fmt" "io" - "log" "net" "os" "path/filepath" @@ -15,30 +13,13 @@ import ( "sync/atomic" "time" + hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" "github.com/rs/zerolog" "google.golang.org/protobuf/proto" ) -// suppressBoltNoise redirects the stdlib log output once to drop the -// "Rollback failed: tx closed" messages emitted by hashicorp/raft-boltdb. -// boltdb returns ErrTxClosed when Rollback is called after a successful -// Commit; raft-boltdb unconditionally logs this as an error even though it -// is the expected outcome of every successful transaction. -var suppressBoltNoise sync.Once - -// boltTxClosedFilter is an io.Writer that silently drops log lines containing -// "tx closed" and forwards everything else to the underlying writer. -type boltTxClosedFilter struct{ w io.Writer } - -func (f *boltTxClosedFilter) Write(p []byte) (n int, err error) { - if bytes.Contains(p, []byte("tx closed")) { - return len(p), nil - } - return f.w.Write(p) -} - // Node represents a raft consensus node type Node struct { raft *raft.Raft @@ -73,10 +54,19 @@ type FSM struct { } // buildRaftConfig converts a Node Config into a hashicorp/raft Config. -func buildRaftConfig(cfg *Config) *raft.Config { +// logger is used to bridge hashicorp/raft's internal hclog output to zerolog. +func buildRaftConfig(cfg *Config, logger zerolog.Logger) *raft.Config { raftConfig := raft.DefaultConfig() raftConfig.LocalID = raft.ServerID(cfg.NodeID) - raftConfig.LogLevel = "INFO" + // Route raft's internal hclog messages through zerolog so all log output is + // consistent. hclog writes formatted text lines; zerolog receives them via + // its io.Writer implementation and emits them as structured JSON. 
+ raftConfig.Logger = hclog.New(&hclog.LoggerOptions{ + Name: "raft", + Level: hclog.Info, + Output: logger.With().Str("component", "raft-hashicorp").Logger(), + DisableTime: true, // zerolog adds its own timestamp + }) raftConfig.HeartbeatTimeout = cfg.HeartbeatTimeout raftConfig.LeaderLeaseTimeout = cfg.LeaderLeaseTimeout if cfg.ElectionTimeout > 0 { @@ -92,14 +82,11 @@ func buildRaftConfig(cfg *Config) *raft.Config { } func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) { - suppressBoltNoise.Do(func() { - log.SetOutput(&boltTxClosedFilter{w: log.Writer()}) - }) if err := os.MkdirAll(cfg.RaftDir, 0755); err != nil { return nil, fmt.Errorf("create raft dir: %w", err) } - raftConfig := buildRaftConfig(cfg) + raftConfig := buildRaftConfig(cfg, logger) startPointer := new(atomic.Pointer[RaftBlockState]) startPointer.Store(&RaftBlockState{}) diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index 8516f7cf9d..870d3e22b7 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -1,52 +1,17 @@ package raft import ( - "bytes" "context" "errors" "testing" "time" "github.com/hashicorp/raft" + "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestBoltTxClosedFilter_Write(t *testing.T) { - specs := map[string]struct { - input string - expectFwd bool - }{ - "passes through normal log line": { - input: "some normal log message\n", - expectFwd: true, - }, - "drops line containing tx closed": { - input: "Rollback failed: tx closed\n", - expectFwd: false, - }, - "drops line with tx closed anywhere": { - input: "error: tx closed due to commit\n", - expectFwd: false, - }, - } - - for name, spec := range specs { - t.Run(name, func(t *testing.T) { - var buf bytes.Buffer - f := &boltTxClosedFilter{w: &buf} - n, err := f.Write([]byte(spec.input)) - require.NoError(t, err) - assert.Equal(t, len(spec.input), n) - if spec.expectFwd { - assert.Equal(t, spec.input, buf.String()) - } else { - assert.Empty(t, buf.String()) - } - }) - } -} - func TestBuildRaftConfig_ElectionTimeout(t *testing.T) { specs := map[string]struct { cfg *Config @@ -70,7 +35,7 @@ func TestBuildRaftConfig_ElectionTimeout(t *testing.T) { for name, spec := range specs { t.Run(name, func(t *testing.T) { - rc := buildRaftConfig(spec.cfg) + rc := buildRaftConfig(spec.cfg, zerolog.Nop()) assert.Equal(t, spec.expectedElectionTimeout, rc.ElectionTimeout) }) } @@ -192,7 +157,7 @@ func TestNodeResignLeader_NotLeaderNoop(t *testing.T) { assert.NoError(t, n.ResignLeader(context.Background())) } -func TestNewNode_SnapshotConfigApplied(t *testing.T) { +func TestBuildRaftConfig_SnapshotConfigApplied(t *testing.T) { specs := map[string]struct { cfg *Config expectedSnapshotThreshold uint64 @@ -220,7 +185,7 @@ func TestNewNode_SnapshotConfigApplied(t *testing.T) { for name, spec := range specs { t.Run(name, func(t *testing.T) { - rc := buildRaftConfig(spec.cfg) + rc := buildRaftConfig(spec.cfg, zerolog.Nop()) assert.Equal(t, spec.expectedSnapshotThreshold, rc.SnapshotThreshold) assert.Equal(t, spec.expectedTrailingLogs, rc.TrailingLogs) }) From f90c60078692cf190be01f019bc36f6cee315a89 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Wed, 15 Apr 2026 13:07:39 +0200 Subject: [PATCH 21/23] fix(ci): promote go-hclog to direct dep; fix gci alignment in syncer_test go mod tidy promotes github.com/hashicorp/go-hclog from indirect to direct now that pkg/raft/node.go imports it explicitly. 
gci auto-formatted stubRaftNode method stubs in syncer_test.go. Co-Authored-By: Claude Sonnet 4.6 --- block/internal/syncing/syncer_test.go | 8 ++++---- go.mod | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index f7563f240d..67c87e06ed 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -42,10 +42,10 @@ type stubRaftNode struct { callbacks []chan<- raft.RaftApplyMsg } -func (s *stubRaftNode) IsLeader() bool { return false } -func (s *stubRaftNode) HasQuorum() bool { return false } -func (s *stubRaftNode) GetState() *raft.RaftBlockState { return nil } -func (s *stubRaftNode) Broadcast(context.Context, *raft.RaftBlockState) error { return nil } +func (s *stubRaftNode) IsLeader() bool { return false } +func (s *stubRaftNode) HasQuorum() bool { return false } +func (s *stubRaftNode) GetState() *raft.RaftBlockState { return nil } +func (s *stubRaftNode) Broadcast(context.Context, *raft.RaftBlockState) error { return nil } func (s *stubRaftNode) SetApplyCallback(ch chan<- raft.RaftApplyMsg) { s.mu.Lock() defer s.mu.Unlock() diff --git a/go.mod b/go.mod index c320bfdb58..301e3ed86b 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/go-kit/kit v0.13.0 github.com/go-viper/mapstructure/v2 v2.5.0 github.com/goccy/go-yaml v1.19.2 + github.com/hashicorp/go-hclog v1.6.3 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/hashicorp/raft v1.7.3 github.com/hashicorp/raft-boltdb v0.0.0-20251103221153-05f9dd7a5148 @@ -102,7 +103,6 @@ require ( github.com/googleapis/gax-go/v2 v2.20.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect - github.com/hashicorp/go-hclog v1.6.3 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-metrics v0.5.4 // indirect github.com/hashicorp/go-msgpack v0.5.5 // indirect From 0f6818f18e45a781186df3ebbd7880f1179471e2 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Wed, 15 Apr 2026 13:23:13 +0200 Subject: [PATCH 22/23] =?UTF-8?q?fix(raft):=20address=20coderabbitai=20fee?= =?UTF-8?q?dback=20=E2=80=94=20ShutdownTimeout=20clamp,=20transfer=20error?= =?UTF-8?q?=20propagation,=20deterministic=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ShutdownTimeout zero-value panic (critical): NewNode now clamps ShutdownTimeout to 5*SendTimeout when the caller passes zero, preventing a panic in time.NewTicker inside waitForMsgsLanded. The normal path through initRaftNode already sets it explicitly; this guard protects direct callers (e.g. tests) that omit the field. Leadership transfer error propagation (major): When store-lag abdication calls leadershipTransfer() and it fails, the error is now returned instead of being logged and silently continuing. Continuing after a failed transfer left the node as leader-without-worker, stalling the cluster. Deterministic abdication test (major): Replace time.Sleep(10ms) + t.Fatal-in-goroutine with channel-based synchronization: leader runFn signals leaderStarted; the test goroutine waits up to 50ms for that signal and calls t.Error (safe from goroutines) if it arrives, then cancels the context either way. 
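For reference, a minimal, generic sketch of the channel-plus-timeout pattern described above (signal a buffered channel if the forbidden path runs, then select against a deadline and report with t.Error). All names here (package example, mustNotRun, TestForbiddenPathDoesNotRun) are illustrative only and are not part of this repository:

    package example

    import (
    	"testing"
    	"time"
    )

    // mustNotRun signals the buffered channel when the forbidden code path
    // executes. In a real test this send lives inside the component that must
    // never start; the non-blocking send keeps the code under test unaffected.
    func mustNotRun(signal chan<- struct{}) {
    	select {
    	case signal <- struct{}{}:
    	default:
    	}
    }

    func TestForbiddenPathDoesNotRun(t *testing.T) {
    	signal := make(chan struct{}, 1)

    	// ... drive the code under test here; a wrong code path would call
    	// mustNotRun(signal) from its own goroutine ...

    	select {
    	case <-signal:
    		// t.Error may be called from any goroutine, unlike t.Fatal,
    		// which is why it is the safe choice when this check runs inside
    		// a spawned driver goroutine as in the change above.
    		t.Error("forbidden path executed")
    	case <-time.After(50 * time.Millisecond):
    		// expected: no signal arrived within the window
    	}
    }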
Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/election.go | 1 + pkg/raft/election_test.go | 20 ++++++++++++++++---- pkg/raft/node.go | 9 +++++++++ 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/pkg/raft/election.go b/pkg/raft/election.go index 56b82a9c7b..d7941675b5 100644 --- a/pkg/raft/election.go +++ b/pkg/raft/election.go @@ -146,6 +146,7 @@ func (d *DynamicLeaderElection) Run(ctx context.Context) error { Msg("became leader but store is significantly behind raft state; abdicating to prevent stalled block production") if tErr := d.node.leadershipTransfer(); tErr != nil { d.logger.Error().Err(tErr).Msg("leadership transfer failed after store-lag abdication") + return fmt.Errorf("leadership transfer failed after store-lag abdication: %w", tErr) } continue } diff --git a/pkg/raft/election_test.go b/pkg/raft/election_test.go index c56afed10e..f025405494 100644 --- a/pkg/raft/election_test.go +++ b/pkg/raft/election_test.go @@ -241,10 +241,15 @@ func TestDynamicLeaderElectionRun(t *testing.T) { startedCh: fStarted, isSyncedFn: func(*RaftBlockState) (int, error) { return -5, nil }, } - // Leader must never start + // Signal if leader ever starts — it must not. + leaderStarted := make(chan struct{}, 1) leader := &testRunnable{runFn: func(ctx context.Context) error { - t.Fatal("leader should not start when store is significantly behind raft") - return nil + select { + case leaderStarted <- struct{}{}: + default: + } + <-ctx.Done() + return ctx.Err() }} logger := zerolog.Nop() @@ -257,7 +262,14 @@ func TestDynamicLeaderElectionRun(t *testing.T) { leaderCh <- false <-fStarted leaderCh <- true - time.Sleep(10 * time.Millisecond) + // Wait for abdication to complete (transfer + continue) then verify + // the leader was never started before cancelling. + select { + case <-leaderStarted: + t.Error("leader should not start when store is significantly behind raft") + case <-time.After(50 * time.Millisecond): + // leadership transferred without starting leader — expected + } cancel() }() return d, ctx, cancel diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 25ca482b88..329a4b2c9f 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -82,6 +82,15 @@ func buildRaftConfig(cfg *Config, logger zerolog.Logger) *raft.Config { } func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) { + // Clamp ShutdownTimeout so waitForMsgsLanded never receives a zero or + // negative interval (which would panic in time.NewTicker). Callers such as + // initRaftNode already set this, but direct callers in tests may not. 
+ if cfg.ShutdownTimeout <= 0 { + cfgCopy := *cfg + cfgCopy.ShutdownTimeout = 5 * cfg.SendTimeout + cfg = &cfgCopy + } + if err := os.MkdirAll(cfg.RaftDir, 0755); err != nil { return nil, fmt.Errorf("create raft dir: %w", err) } From 84ec0d0ee0c2f6f6708c43f5997c43b0753949c3 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Wed, 15 Apr 2026 16:15:20 +0200 Subject: [PATCH 23/23] docs(changelog): add unreleased entries for raft HA hardening (#3230) Co-Authored-By: Claude Sonnet 4.6 --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 99026c6e04..802acb7de9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Raft HA production hardening: leader fencing on SIGTERM, FSM data race, follower restart crash, log compaction config, and election timeout validation [#3230](https://github.com/evstack/ev-node/pull/3230) + ### Changes - Make it easier to override `DefaultMaxBlobSize` by ldflags [#3235](https://github.com/evstack/ev-node/pull/3235)