Skip to content

Commit bf6bb50

Browse files
auricom and claude committed
fix(raft): address julienrbrt review — logger, boltdb filter, ShutdownTimeout
- Remove stdlib log filter (boltTxClosedFilter / suppressBoltNoise): it redirected the global stdlib logger which is the wrong scope. raft-boltdb uses log.Printf directly with no Logger option, so the "tx closed" noise is now accepted as-is from stderr. - Wire hashicorp/raft's internal hclog output through zerolog: set raft.Config.Logger to an hclog.Logger backed by the zerolog writer so all raft-internal messages appear in the structured log stream under component=raft-hashicorp. - Remove ShutdownTimeout from public config: it was a second "how long to wait" knob that confused operators. ShutdownTimeout is now computed internally as 5 × SendTimeout at the initRaftNode call site. - Delete TestRaftRetrieverStopClearsApplyCallback: tested an implementation detail (Stop clears the apply callback pointer) rather than observable behaviour. The stubRaftNode helper it defined is moved to syncer_test.go where it is still needed. - Rename TestNewNode_SnapshotConfigApplied → TestBuildRaftConfig_SnapshotConfigApplied to reflect that it tests buildRaftConfig, not NewNode. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 40b7251 commit bf6bb50

8 files changed

Lines changed: 44 additions & 144 deletions

File tree

block/internal/syncing/raft_retriever_test.go

Lines changed: 0 additions & 61 deletions
This file was deleted.

block/internal/syncing/syncer_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"crypto/sha512"
77
"errors"
88
"math"
9+
"sync"
910
"sync/atomic"
1011
"testing"
1112
"testing/synctest"
@@ -35,6 +36,30 @@ import (
3536
"github.com/evstack/ev-node/types"
3637
)
3738

39+
// stubRaftNode is a minimal RaftNode stub that records SetApplyCallback calls.
40+
type stubRaftNode struct {
41+
mu sync.Mutex
42+
callbacks []chan<- raft.RaftApplyMsg
43+
}
44+
45+
func (s *stubRaftNode) IsLeader() bool { return false }
46+
func (s *stubRaftNode) HasQuorum() bool { return false }
47+
func (s *stubRaftNode) GetState() *raft.RaftBlockState { return nil }
48+
func (s *stubRaftNode) Broadcast(context.Context, *raft.RaftBlockState) error { return nil }
49+
func (s *stubRaftNode) SetApplyCallback(ch chan<- raft.RaftApplyMsg) {
50+
s.mu.Lock()
51+
defer s.mu.Unlock()
52+
s.callbacks = append(s.callbacks, ch)
53+
}
54+
55+
func (s *stubRaftNode) recordedCallbacks() []chan<- raft.RaftApplyMsg {
56+
s.mu.Lock()
57+
defer s.mu.Unlock()
58+
out := make([]chan<- raft.RaftApplyMsg, len(s.callbacks))
59+
copy(out, s.callbacks)
60+
return out
61+
}
62+
3863
// helper to create a signer, pubkey and address for tests
3964
func buildSyncTestSigner(tb testing.TB) (addr []byte, pub crypto.PubKey, signer signerpkg.Signer) {
4065
tb.Helper()

node/full.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func initRaftNode(nodeConfig config.Config, logger zerolog.Logger) (*raftpkg.Nod
155155
Bootstrap: nodeConfig.Raft.Bootstrap,
156156
SnapCount: nodeConfig.Raft.SnapCount,
157157
SendTimeout: nodeConfig.Raft.SendTimeout,
158-
ShutdownTimeout: nodeConfig.Raft.ShutdownTimeout,
158+
ShutdownTimeout: 5 * nodeConfig.Raft.SendTimeout,
159159
HeartbeatTimeout: nodeConfig.Raft.HeartbeatTimeout,
160160
LeaderLeaseTimeout: nodeConfig.Raft.LeaderLeaseTimeout,
161161
ElectionTimeout: nodeConfig.Raft.ElectionTimeout,

pkg/config/config.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,6 @@ const (
201201
FlagRaftSnapshotThreshold = FlagPrefixEvnode + "raft.snapshot_threshold"
202202
// FlagRaftTrailingLogs is the flag for the number of trailing logs after a snapshot.
203203
FlagRaftTrailingLogs = FlagPrefixEvnode + "raft.trailing_logs"
204-
// FlagRaftShutdownTimeout is the flag for how long to wait for committed logs to be applied on graceful shutdown.
205-
FlagRaftShutdownTimeout = FlagPrefixEvnode + "raft.shutdown_timeout"
206-
207204
// Pruning configuration flags
208205
FlagPruningMode = FlagPrefixEvnode + "pruning.pruning_mode"
209206
FlagPruningKeepRecent = FlagPrefixEvnode + "pruning.pruning_keep_recent"
@@ -412,7 +409,6 @@ type RaftConfig struct {
412409
Peers string `mapstructure:"peers" yaml:"peers" comment:"Comma-separated list of peer Raft addresses (nodeID@host:port)"`
413410
SnapCount uint64 `mapstructure:"snap_count" yaml:"snap_count" comment:"Number of snapshot files to retain on disk"`
414411
SendTimeout time.Duration `mapstructure:"send_timeout" yaml:"send_timeout" comment:"Max duration to wait for a message to be sent to a peer"`
415-
ShutdownTimeout time.Duration `mapstructure:"shutdown_timeout" yaml:"shutdown_timeout" comment:"Max duration to wait for committed raft logs to be applied on graceful shutdown"`
416412
HeartbeatTimeout time.Duration `mapstructure:"heartbeat_timeout" yaml:"heartbeat_timeout" comment:"Time between leader heartbeats to followers"`
417413
LeaderLeaseTimeout time.Duration `mapstructure:"leader_lease_timeout" yaml:"leader_lease_timeout" comment:"Duration of the leader lease"`
418414
ElectionTimeout time.Duration `mapstructure:"election_timeout" yaml:"election_timeout" comment:"Time a candidate waits for votes before restarting election; must be >= heartbeat_timeout"`
@@ -439,10 +435,6 @@ func (c RaftConfig) Validate() error {
439435
multiErr = errors.Join(multiErr, fmt.Errorf("send timeout must be positive"))
440436
}
441437

442-
if c.ShutdownTimeout <= 0 {
443-
multiErr = errors.Join(multiErr, fmt.Errorf("shutdown timeout must be positive"))
444-
}
445-
446438
if c.HeartbeatTimeout <= 0 {
447439
multiErr = errors.Join(multiErr, fmt.Errorf("heartbeat timeout must be positive"))
448440
}
@@ -672,7 +664,6 @@ func AddFlags(cmd *cobra.Command) {
672664
cmd.Flags().String(FlagRaftPeers, def.Raft.Peers, "comma-separated list of peer Raft addresses (nodeID@host:port)")
673665
cmd.Flags().Uint64(FlagRaftSnapCount, def.Raft.SnapCount, "number of snapshot files to retain on disk")
674666
cmd.Flags().Duration(FlagRaftSendTimeout, def.Raft.SendTimeout, "max duration to wait for a message to be sent to a peer")
675-
cmd.Flags().Duration(FlagRaftShutdownTimeout, def.Raft.ShutdownTimeout, "max duration to wait for committed raft logs to be applied on graceful shutdown")
676667
cmd.Flags().Duration(FlagRaftHeartbeatTimeout, def.Raft.HeartbeatTimeout, "time between leader heartbeats to followers")
677668
cmd.Flags().Duration(FlagRaftLeaderLeaseTimeout, def.Raft.LeaderLeaseTimeout, "duration of the leader lease")
678669
cmd.Flags().Duration(FlagRaftElectionTimeout, def.Raft.ElectionTimeout, "time a candidate waits for votes before restarting election")

pkg/config/config_test.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ func TestAddFlags(t *testing.T) {
131131
assertFlagValue(t, flags, FlagRaftPeers, DefaultConfig().Raft.Peers)
132132
assertFlagValue(t, flags, FlagRaftSnapCount, DefaultConfig().Raft.SnapCount)
133133
assertFlagValue(t, flags, FlagRaftSendTimeout, DefaultConfig().Raft.SendTimeout)
134-
assertFlagValue(t, flags, FlagRaftShutdownTimeout, DefaultConfig().Raft.ShutdownTimeout)
135134
assertFlagValue(t, flags, FlagRaftHeartbeatTimeout, DefaultConfig().Raft.HeartbeatTimeout)
136135
assertFlagValue(t, flags, FlagRaftLeaderLeaseTimeout, DefaultConfig().Raft.LeaderLeaseTimeout)
137136
assertFlagValue(t, flags, FlagRaftElectionTimeout, DefaultConfig().Raft.ElectionTimeout)
@@ -144,7 +143,7 @@ func TestAddFlags(t *testing.T) {
144143
assertFlagValue(t, flags, FlagPruningInterval, DefaultConfig().Pruning.Interval.Duration)
145144

146145
// Count the number of flags we're explicitly checking
147-
expectedFlagCount := 82 // Update this number if you add more flag checks above
146+
expectedFlagCount := 81 // Update this number if you add more flag checks above
148147

149148
// Get the actual number of flags (both regular and persistent)
150149
actualFlagCount := 0
@@ -405,7 +404,6 @@ func TestRaftConfig_Validate(t *testing.T) {
405404
Peers: "",
406405
SnapCount: 1,
407406
SendTimeout: 1 * time.Second,
408-
ShutdownTimeout: 5 * time.Second,
409407
HeartbeatTimeout: 1 * time.Second,
410408
LeaderLeaseTimeout: 1 * time.Second,
411409
ElectionTimeout: 2 * time.Second,
@@ -446,10 +444,6 @@ func TestRaftConfig_Validate(t *testing.T) {
446444
mutate: func(c *RaftConfig) { c.LeaderLeaseTimeout = 0 },
447445
expErr: "leader lease timeout must be positive",
448446
},
449-
"non-positive shutdown timeout": {
450-
mutate: func(c *RaftConfig) { c.ShutdownTimeout = 0 },
451-
expErr: "shutdown timeout must be positive",
452-
},
453447
"negative election timeout rejected": {
454448
mutate: func(c *RaftConfig) { c.ElectionTimeout = -1 * time.Second },
455449
expErr: "election timeout (-1s) must be >= 0",

pkg/config/defaults.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ func DefaultConfig() Config {
116116
},
117117
Raft: RaftConfig{
118118
SendTimeout: 200 * time.Millisecond,
119-
ShutdownTimeout: 5 * time.Second,
120119
HeartbeatTimeout: 350 * time.Millisecond,
121120
LeaderLeaseTimeout: 175 * time.Millisecond,
122121
ElectionTimeout: 1000 * time.Millisecond,

pkg/raft/node.go

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
package raft
22

33
import (
4-
"bytes"
54
"context"
65
"errors"
76
"fmt"
87
"io"
9-
"log"
108
"net"
119
"os"
1210
"path/filepath"
@@ -15,30 +13,13 @@ import (
1513
"sync/atomic"
1614
"time"
1715

16+
hclog "github.com/hashicorp/go-hclog"
1817
"github.com/hashicorp/raft"
1918
raftboltdb "github.com/hashicorp/raft-boltdb"
2019
"github.com/rs/zerolog"
2120
"google.golang.org/protobuf/proto"
2221
)
2322

24-
// suppressBoltNoise redirects the stdlib log output once to drop the
25-
// "Rollback failed: tx closed" messages emitted by hashicorp/raft-boltdb.
26-
// boltdb returns ErrTxClosed when Rollback is called after a successful
27-
// Commit; raft-boltdb unconditionally logs this as an error even though it
28-
// is the expected outcome of every successful transaction.
29-
var suppressBoltNoise sync.Once
30-
31-
// boltTxClosedFilter is an io.Writer that silently drops log lines containing
32-
// "tx closed" and forwards everything else to the underlying writer.
33-
type boltTxClosedFilter struct{ w io.Writer }
34-
35-
func (f *boltTxClosedFilter) Write(p []byte) (n int, err error) {
36-
if bytes.Contains(p, []byte("tx closed")) {
37-
return len(p), nil
38-
}
39-
return f.w.Write(p)
40-
}
41-
4223
// Node represents a raft consensus node
4324
type Node struct {
4425
raft *raft.Raft
@@ -73,10 +54,19 @@ type FSM struct {
7354
}
7455

7556
// buildRaftConfig converts a Node Config into a hashicorp/raft Config.
76-
func buildRaftConfig(cfg *Config) *raft.Config {
57+
// logger is used to bridge hashicorp/raft's internal hclog output to zerolog.
58+
func buildRaftConfig(cfg *Config, logger zerolog.Logger) *raft.Config {
7759
raftConfig := raft.DefaultConfig()
7860
raftConfig.LocalID = raft.ServerID(cfg.NodeID)
79-
raftConfig.LogLevel = "INFO"
61+
// Route raft's internal hclog messages through zerolog so all log output is
62+
// consistent. hclog writes formatted text lines; zerolog receives them via
63+
// its io.Writer implementation and emits them as structured JSON.
64+
raftConfig.Logger = hclog.New(&hclog.LoggerOptions{
65+
Name: "raft",
66+
Level: hclog.Info,
67+
Output: logger.With().Str("component", "raft-hashicorp").Logger(),
68+
DisableTime: true, // zerolog adds its own timestamp
69+
})
8070
raftConfig.HeartbeatTimeout = cfg.HeartbeatTimeout
8171
raftConfig.LeaderLeaseTimeout = cfg.LeaderLeaseTimeout
8272
if cfg.ElectionTimeout > 0 {
@@ -92,14 +82,11 @@ func buildRaftConfig(cfg *Config) *raft.Config {
9282
}
9383

9484
func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) {
95-
suppressBoltNoise.Do(func() {
96-
log.SetOutput(&boltTxClosedFilter{w: log.Writer()})
97-
})
9885
if err := os.MkdirAll(cfg.RaftDir, 0755); err != nil {
9986
return nil, fmt.Errorf("create raft dir: %w", err)
10087
}
10188

102-
raftConfig := buildRaftConfig(cfg)
89+
raftConfig := buildRaftConfig(cfg, logger)
10390

10491
startPointer := new(atomic.Pointer[RaftBlockState])
10592
startPointer.Store(&RaftBlockState{})

pkg/raft/node_test.go

Lines changed: 4 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,17 @@
11
package raft
22

33
import (
4-
"bytes"
54
"context"
65
"errors"
76
"testing"
87
"time"
98

109
"github.com/hashicorp/raft"
10+
"github.com/rs/zerolog"
1111
"github.com/stretchr/testify/assert"
1212
"github.com/stretchr/testify/require"
1313
)
1414

15-
func TestBoltTxClosedFilter_Write(t *testing.T) {
16-
specs := map[string]struct {
17-
input string
18-
expectFwd bool
19-
}{
20-
"passes through normal log line": {
21-
input: "some normal log message\n",
22-
expectFwd: true,
23-
},
24-
"drops line containing tx closed": {
25-
input: "Rollback failed: tx closed\n",
26-
expectFwd: false,
27-
},
28-
"drops line with tx closed anywhere": {
29-
input: "error: tx closed due to commit\n",
30-
expectFwd: false,
31-
},
32-
}
33-
34-
for name, spec := range specs {
35-
t.Run(name, func(t *testing.T) {
36-
var buf bytes.Buffer
37-
f := &boltTxClosedFilter{w: &buf}
38-
n, err := f.Write([]byte(spec.input))
39-
require.NoError(t, err)
40-
assert.Equal(t, len(spec.input), n)
41-
if spec.expectFwd {
42-
assert.Equal(t, spec.input, buf.String())
43-
} else {
44-
assert.Empty(t, buf.String())
45-
}
46-
})
47-
}
48-
}
49-
5015
func TestBuildRaftConfig_ElectionTimeout(t *testing.T) {
5116
specs := map[string]struct {
5217
cfg *Config
@@ -70,7 +35,7 @@ func TestBuildRaftConfig_ElectionTimeout(t *testing.T) {
7035

7136
for name, spec := range specs {
7237
t.Run(name, func(t *testing.T) {
73-
rc := buildRaftConfig(spec.cfg)
38+
rc := buildRaftConfig(spec.cfg, zerolog.Nop())
7439
assert.Equal(t, spec.expectedElectionTimeout, rc.ElectionTimeout)
7540
})
7641
}
@@ -192,7 +157,7 @@ func TestNodeResignLeader_NotLeaderNoop(t *testing.T) {
192157
assert.NoError(t, n.ResignLeader(context.Background()))
193158
}
194159

195-
func TestNewNode_SnapshotConfigApplied(t *testing.T) {
160+
func TestBuildRaftConfig_SnapshotConfigApplied(t *testing.T) {
196161
specs := map[string]struct {
197162
cfg *Config
198163
expectedSnapshotThreshold uint64
@@ -220,7 +185,7 @@ func TestNewNode_SnapshotConfigApplied(t *testing.T) {
220185

221186
for name, spec := range specs {
222187
t.Run(name, func(t *testing.T) {
223-
rc := buildRaftConfig(spec.cfg)
188+
rc := buildRaftConfig(spec.cfg, zerolog.Nop())
224189
assert.Equal(t, spec.expectedSnapshotThreshold, rc.SnapshotThreshold)
225190
assert.Equal(t, spec.expectedTrailingLogs, rc.TrailingLogs)
226191
})

0 commit comments

Comments (0)