Skip to content

Commit 954a74a

Browse files
committed
Publisher-mode synchronization option for failover scenarios
1 parent 022b565 commit 954a74a

20 files changed

Lines changed: 671 additions & 131 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111

1212
### Changes
1313

14+
* Added publisher-mode synchronization option for failover scenarios with early P2P infrastructure readiness [#3222](https://github.com/evstack/ev-node/pull/3222)
1415
* Improve execution/evm check for stored meta not stale [#3221](https://github.com/evstack/ev-node/pull/3221)
1516

1617
## v1.1.0-rc.1

apps/evm/go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ module github.com/evstack/ev-node/apps/evm
22

33
go 1.25.7
44

5-
// replace (
6-
// github.com/evstack/ev-node => ../../
7-
// github.com/evstack/ev-node/execution/evm => ../../execution/evm
8-
// )
5+
replace (
6+
github.com/evstack/ev-node => ../../
7+
github.com/evstack/ev-node/execution/evm => ../../execution/evm
8+
)
99

1010
require (
1111
github.com/ethereum/go-ethereum v1.17.2

apps/evm/go.sum

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -472,12 +472,8 @@ github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab h1:rvv6MJ
472472
github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab/go.mod h1:IuLm4IsPipXKF7CW5Lzf68PIbZ5yl7FFd74l/E0o9A8=
473473
github.com/ethereum/go-ethereum v1.17.2 h1:ag6geu0kn8Hv5FLKTpH+Hm2DHD+iuFtuqKxEuwUsDOI=
474474
github.com/ethereum/go-ethereum v1.17.2/go.mod h1:KHcRXfGOUfUmKg51IhQ0IowiqZ6PqZf08CMtk0g5K1o=
475-
github.com/evstack/ev-node v1.1.0-rc.1 h1:NtPuuDLqN2h4/edu5zxRlZAxmLkTG3ncXBO2PlCDvVs=
476-
github.com/evstack/ev-node v1.1.0-rc.1/go.mod h1:6rhWWzuyiqNn/erDmWCk1aLxUuQphyOGIRq56/smSyk=
477475
github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8=
478476
github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY=
479-
github.com/evstack/ev-node/execution/evm v1.0.0 h1:UTAdCrnPsLoGzSgsBx4Kv76jkXpMmHBIpNv3MxyzWPo=
480-
github.com/evstack/ev-node/execution/evm v1.0.0/go.mod h1:UrqkiepfTMiot6M8jnswgu3VU8SSucZpaMIHIl22/1A=
481477
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
482478
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
483479
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=

block/internal/syncing/syncer.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,12 @@ var (
694694
// TrySyncNextBlock attempts to sync the next available block
695695
// the event is always the next block in sequence as processHeightEvent ensures it.
696696
func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error {
697+
return s.trySyncNextBlockWithState(ctx, event, s.getLastState())
698+
}
699+
700+
// trySyncNextBlockWithState attempts to sync the next available block using
701+
// the provided current state as the validation/apply baseline.
702+
func (s *Syncer) trySyncNextBlockWithState(ctx context.Context, event *common.DAHeightEvent, currentState types.State) error {
697703
select {
698704
case <-ctx.Done():
699705
return ctx.Err()
@@ -703,7 +709,6 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve
703709
header := event.Header
704710
data := event.Data
705711
nextHeight := event.Header.Height()
706-
currentState := s.getLastState()
707712
headerHash := header.Hash().String()
708713

709714
s.logger.Info().Uint64("height", nextHeight).Str("source", string(event.Source)).Msg("syncing block")
@@ -1189,6 +1194,7 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS
11891194
}
11901195

11911196
currentState := s.getLastState()
1197+
stateBootstrapped := false
11921198

11931199
// Defensive: if lastState is not yet initialized (e.g., RecoverFromRaft called before Start),
11941200
// load it from the store to ensure we have valid state for validation.
@@ -1201,8 +1207,10 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS
12011207
s.logger.Debug().Err(err).Msg("no state in store, using genesis defaults for recovery")
12021208
currentState = types.State{
12031209
ChainID: s.genesis.ChainID,
1210+
InitialHeight: s.genesis.InitialHeight,
12041211
LastBlockHeight: s.genesis.InitialHeight - 1,
12051212
}
1213+
stateBootstrapped = true
12061214
}
12071215
}
12081216

@@ -1214,11 +1222,18 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS
12141222
return nil
12151223
} else if currentState.LastBlockHeight+1 == raftState.Height { // raft is 1 block ahead
12161224
// apply block
1217-
err := s.TrySyncNextBlock(ctx, &common.DAHeightEvent{
1225+
event := &common.DAHeightEvent{
12181226
Header: &header,
12191227
Data: &data,
12201228
Source: "",
1221-
})
1229+
}
1230+
err := s.trySyncNextBlockWithState(ctx, event, currentState)
1231+
if err != nil && stateBootstrapped && errors.Is(err, errInvalidState) {
1232+
s.logger.Debug().Err(err).Msg("raft recovery failed after bootstrap state init, retrying once")
1233+
// Keep strict validation semantics; this retry only guards startup ordering races.
1234+
s.SetLastState(currentState)
1235+
err = s.trySyncNextBlockWithState(ctx, event, currentState)
1236+
}
12221237
if err != nil {
12231238
return err
12241239
}

block/internal/syncing/syncer_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/evstack/ev-node/pkg/config"
2727
datypes "github.com/evstack/ev-node/pkg/da/types"
2828
"github.com/evstack/ev-node/pkg/genesis"
29+
"github.com/evstack/ev-node/pkg/raft"
2930
signerpkg "github.com/evstack/ev-node/pkg/signer"
3031
"github.com/evstack/ev-node/pkg/signer/noop"
3132
"github.com/evstack/ev-node/pkg/store"
@@ -306,6 +307,121 @@ func TestSequentialBlockSync(t *testing.T) {
306307
requireEmptyChan(t, errChan)
307308
}
308309

310+
func TestSyncer_RecoverFromRaft_BootstrapsStateWhenUninitialized(t *testing.T) {
311+
ds := dssync.MutexWrap(datastore.NewMapDatastore())
312+
st := store.New(ds)
313+
314+
cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
315+
require.NoError(t, err)
316+
317+
addr, pub, signer := buildSyncTestSigner(t)
318+
gen := genesis.Genesis{
319+
ChainID: "1234",
320+
InitialHeight: 1,
321+
StartTime: time.Now().Add(-time.Second),
322+
ProposerAddress: addr,
323+
}
324+
325+
mockExec := testmocks.NewMockExecutor(t)
326+
mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t)
327+
mockDataStore := extmocks.NewMockStore[*types.P2PData](t)
328+
s := NewSyncer(
329+
st,
330+
mockExec,
331+
nil,
332+
cm,
333+
common.NopMetrics(),
334+
config.DefaultConfig(),
335+
gen,
336+
mockHeaderStore,
337+
mockDataStore,
338+
zerolog.Nop(),
339+
common.DefaultBlockOptions(),
340+
make(chan error, 1),
341+
nil,
342+
)
343+
344+
// lastState intentionally not initialized to simulate recovery-before-start path.
345+
data := makeData(gen.ChainID, 1, 0)
346+
headerBz, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, []byte("app0"), data, nil)
347+
dataBz, err := data.MarshalBinary()
348+
require.NoError(t, err)
349+
350+
mockExec.EXPECT().
351+
ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, mock.Anything).
352+
Return([]byte("app1"), nil).
353+
Once()
354+
355+
err = s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{
356+
Height: 1,
357+
Hash: hdr.Hash(),
358+
Header: headerBz,
359+
Data: dataBz,
360+
})
361+
require.NoError(t, err)
362+
363+
state, err := st.GetState(t.Context())
364+
require.NoError(t, err)
365+
require.Equal(t, gen.ChainID, state.ChainID)
366+
require.Equal(t, uint64(1), state.LastBlockHeight)
367+
}
368+
369+
func TestSyncer_RecoverFromRaft_KeepsStrictValidationAfterStateExists(t *testing.T) {
370+
ds := dssync.MutexWrap(datastore.NewMapDatastore())
371+
st := store.New(ds)
372+
373+
cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
374+
require.NoError(t, err)
375+
376+
addr, pub, signer := buildSyncTestSigner(t)
377+
gen := genesis.Genesis{
378+
ChainID: "1234",
379+
InitialHeight: 1,
380+
StartTime: time.Now().Add(-time.Second),
381+
ProposerAddress: addr,
382+
}
383+
384+
mockExec := testmocks.NewMockExecutor(t)
385+
mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t)
386+
mockDataStore := extmocks.NewMockStore[*types.P2PData](t)
387+
s := NewSyncer(
388+
st,
389+
mockExec,
390+
nil,
391+
cm,
392+
common.NopMetrics(),
393+
config.DefaultConfig(),
394+
gen,
395+
mockHeaderStore,
396+
mockDataStore,
397+
zerolog.Nop(),
398+
common.DefaultBlockOptions(),
399+
make(chan error, 1),
400+
nil,
401+
)
402+
403+
// Non-empty state must remain strictly validated.
404+
s.SetLastState(types.State{
405+
ChainID: "wrong-chain",
406+
InitialHeight: 1,
407+
LastBlockHeight: 0,
408+
})
409+
410+
data := makeData(gen.ChainID, 1, 0)
411+
headerBz, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, []byte("app0"), data, nil)
412+
dataBz, err := data.MarshalBinary()
413+
require.NoError(t, err)
414+
415+
err = s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{
416+
Height: 1,
417+
Hash: hdr.Hash(),
418+
Header: headerBz,
419+
Data: dataBz,
420+
})
421+
require.Error(t, err)
422+
require.ErrorContains(t, err, "invalid chain ID")
423+
}
424+
309425
func TestSyncer_processPendingEvents(t *testing.T) {
310426
ds := dssync.MutexWrap(datastore.NewMapDatastore())
311427
st := store.New(ds)

docs/guides/raft_production.md

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ Raft is configured via CLI flags or the `config.toml` file under the `[raft]` (o
3333
| `--evnode.raft.raft_addr` | `raft.raft_addr` | TCP address for Raft transport. | `0.0.0.0:5001` (Bind to private IP) |
3434
| `--evnode.raft.raft_dir` | `raft.raft_dir` | Directory for Raft data. | `/data/raft` (Must be persistent) |
3535
| `--evnode.raft.peers` | `raft.peers` | Comma-separated list of peer addresses in format `nodeID@host:port`. | `node-1@10.0.0.1:5001,node-2@10.0.0.2:5001,node-3@10.0.0.3:5001` |
36-
| `--evnode.raft.bootstrap` | `raft.bootstrap` | Bootstrap the cluster. **Required** for initial setup. | `true` (See Limitations) |
36+
| `--evnode.raft.bootstrap` | `raft.bootstrap` | Compatibility flag. Startup mode is selected automatically from persisted raft configuration state. | optional |
3737

3838
### Timeout Tuning
3939

@@ -55,11 +55,15 @@ Ideally, a failover should complete within `2 * BlockTime` to minimize user impa
5555
5656
## Production Deployment Principles
5757

58-
### 1. Static Peering & Bootstrap
59-
Current implementation requires **Bootstrap Mode** (`--evnode.raft.bootstrap=true`) for all nodes participating in the cluster initialization.
60-
* **All nodes** should list the full set of peers in `--evnode.raft.peers`.
58+
### 1. Static Peering & Automatic Startup Mode
59+
Use static peering with automatic mode selection from local raft configuration:
60+
* If local raft configuration already exists in `--evnode.raft.raft_dir`, the node starts in rejoin mode.
61+
* If no local raft configuration exists yet, the node bootstraps from configured peers.
62+
* `--evnode.raft.bootstrap` is retained for compatibility but does not control mode selection.
63+
* **All configured cluster members** should list the full set of peers in `--evnode.raft.peers`.
6164
* The `peers` list format is strict: `NodeID@Host:Port`.
62-
* **Limitation**: Dynamic addition of peers (Run-time Membership Changes) via RPC/CLI is not currently exposed. The cluster membership is static based on the initial bootstrap configuration.
65+
* **Limitation**: Dynamic addition of peers (run-time membership changes) via RPC/CLI is not currently exposed.
66+
* **Not supported**: Joining an existing cluster as a brand-new node that was not part of the initial static membership.
6367

6468
### 2. Infrastructure Requirements
6569
* **Encrypted Network (CRITICAL)**: Raft traffic is **unencrypted** (plain TCP). You **MUST** run the cluster inside a private network, VPN, or encrypted mesh (e.g., WireGuard, Tailscale). **Never expose Raft ports to the public internet**; doing so allows attackers to hijack the cluster consensus.
@@ -86,13 +90,13 @@ Monitor the following metrics (propagated via Prometheus if enabled):
8690

8791
```bash
8892
./ev-node start \
89-
--node.aggregator \
90-
--raft.enable \
91-
--raft.node_id="node-1" \
92-
--raft.raft_addr="0.0.0.0:5001" \
93-
--raft.raft_dir="/var/lib/ev-node/raft" \
94-
--raft.bootstrap=true \
95-
--raft.peers="node-1@10.0.1.1:5001,node-2@10.0.1.2:5001,node-3@10.0.1.3:5001" \
96-
--p2p.listen_address="/ip4/0.0.0.0/tcp/26656" \
93+
--evnode.node.aggregator=true \
94+
--evnode.raft.enable=true \
95+
--evnode.raft.node_id="node-1" \
96+
--evnode.raft.raft_addr="0.0.0.0:5001" \
97+
--evnode.raft.raft_dir="/var/lib/ev-node/raft" \
98+
--evnode.raft.bootstrap=true \
99+
--evnode.raft.peers="node-1@10.0.1.1:5001,node-2@10.0.1.2:5001,node-3@10.0.1.3:5001" \
100+
--evnode.p2p.listen_address="/ip4/0.0.0.0/tcp/26656" \
97101
...other flags
98102
```

docs/learn/config.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1321,7 +1321,7 @@ _Constant:_ `FlagRaftDir`
13211321
### Raft Bootstrap
13221322

13231323
**Description:**
1324-
If true, bootstraps a new Raft cluster. Only set this on the very first node when initializing a new cluster.
1324+
Legacy compatibility flag. Startup mode is now auto-selected from persisted raft configuration state, so this flag is not used to choose bootstrap vs rejoin.
13251325

13261326
**YAML:**
13271327

@@ -1352,6 +1352,16 @@ raft:
13521352
_Default:_ `""` (empty)
13531353
_Constant:_ `FlagRaftPeers`
13541354

1355+
### Raft Startup Mode
1356+
1357+
Raft startup mode is selected automatically from local raft configuration state:
1358+
1359+
* If the node already has persisted raft configuration in `raft.raft_dir`, it starts in rejoin mode.
1360+
* If no raft configuration exists yet, it bootstraps a cluster from configured peers.
1361+
* `raft.bootstrap` is retained for compatibility but does not control mode selection.
1362+
1363+
`--evnode.raft.rejoin` has been removed.
1364+
13551365
### Raft Snap Count
13561366

13571367
**Description:**

node/failover.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ type failoverState struct {
3333
dataSyncService *evsync.DataSyncService
3434
rpcServer *http.Server
3535
bc *block.Components
36+
raftNode *raft.Node
37+
isAggregator bool
3638

3739
// catchup fields — used when the aggregator needs to sync before producing
3840
catchupEnabled bool
@@ -172,13 +174,41 @@ func setupFailoverState(
172174
dataSyncService: dataSyncService,
173175
rpcServer: rpcServer,
174176
bc: bc,
177+
raftNode: raftNode,
178+
isAggregator: isAggregator,
175179
store: rktStore,
176180
catchupEnabled: catchupEnabled,
177181
catchupTimeout: nodeConfig.Node.CatchupTimeout.Duration,
178182
daBlockTime: nodeConfig.DA.BlockTime.Duration,
179183
}, nil
180184
}
181185

186+
// shouldStartSyncInPublisherMode avoids startup deadlock when a raft leader boots
187+
// with empty sync stores and no peer can serve height 1 yet.
188+
func (f *failoverState) shouldStartSyncInPublisherMode(ctx context.Context) bool {
189+
if !f.isAggregator || f.raftNode == nil || !f.raftNode.IsLeader() {
190+
return false
191+
}
192+
193+
storeHeight, err := f.store.Height(ctx)
194+
if err != nil {
195+
f.logger.Warn().Err(err).Msg("cannot determine store height; keeping blocking sync startup")
196+
return false
197+
}
198+
headerHeight := f.headerSyncService.Store().Height()
199+
dataHeight := f.dataSyncService.Store().Height()
200+
if headerHeight > 0 || dataHeight > 0 {
201+
return false
202+
}
203+
204+
f.logger.Info().
205+
Uint64("store_height", storeHeight).
206+
Uint64("header_height", headerHeight).
207+
Uint64("data_height", dataHeight).
208+
Msg("raft-enabled aggregator with empty sync stores: starting sync services in publisher mode")
209+
return true
210+
}
211+
182212
func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
183213
stopService := func(stoppable func(context.Context) error, name string) { //nolint:contextcheck // shutdown uses context.Background intentionally
184214
// parent context is cancelled already, so we need to create a new one
@@ -207,15 +237,28 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
207237
})
208238

209239
// start header and data sync services concurrently to avoid cumulative startup delay.
240+
startSyncInPublisherMode := f.shouldStartSyncInPublisherMode(ctx)
210241
syncWg, syncCtx := errgroup.WithContext(ctx)
211242
syncWg.Go(func() error {
212-
if err := f.headerSyncService.Start(syncCtx); err != nil {
243+
var err error
244+
if startSyncInPublisherMode {
245+
err = f.headerSyncService.StartForPublishing(syncCtx)
246+
} else {
247+
err = f.headerSyncService.Start(syncCtx)
248+
}
249+
if err != nil {
213250
return fmt.Errorf("header sync service: %w", err)
214251
}
215252
return nil
216253
})
217254
syncWg.Go(func() error {
218-
if err := f.dataSyncService.Start(syncCtx); err != nil {
255+
var err error
256+
if startSyncInPublisherMode {
257+
err = f.dataSyncService.StartForPublishing(syncCtx)
258+
} else {
259+
err = f.dataSyncService.Start(syncCtx)
260+
}
261+
if err != nil {
219262
return fmt.Errorf("data sync service: %w", err)
220263
}
221264
return nil

0 commit comments

Comments
 (0)