Skip to content

Commit 0982a03

Browse files
committed
lazy init
1 parent 1b8da2e commit 0982a03

3 files changed

Lines changed: 86 additions & 147 deletions

File tree

node/failover.go

Lines changed: 2 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -183,32 +183,6 @@ func setupFailoverState(
183183
}, nil
184184
}
185185

186-
// shouldStartSyncInPublisherMode avoids startup deadlock when a raft leader boots
187-
// with empty sync stores and no peer can serve height 1 yet.
188-
func (f *failoverState) shouldStartSyncInPublisherMode(ctx context.Context) bool {
189-
if !f.isAggregator || f.raftNode == nil || !f.raftNode.IsLeader() {
190-
return false
191-
}
192-
193-
storeHeight, err := f.store.Height(ctx)
194-
if err != nil {
195-
f.logger.Warn().Err(err).Msg("cannot determine store height; keeping blocking sync startup")
196-
return false
197-
}
198-
headerHeight := f.headerSyncService.Store().Height()
199-
dataHeight := f.dataSyncService.Store().Height()
200-
if headerHeight > 0 || dataHeight > 0 {
201-
return false
202-
}
203-
204-
f.logger.Info().
205-
Uint64("store_height", storeHeight).
206-
Uint64("header_height", headerHeight).
207-
Uint64("data_height", dataHeight).
208-
Msg("raft-enabled aggregator with empty sync stores: starting sync services in publisher mode")
209-
return true
210-
}
211-
212186
func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
213187
stopService := func(stoppable func(context.Context) error, name string) { //nolint:contextcheck // shutdown uses context.Background intentionally
214188
// parent context is cancelled already, so we need to create a new one
@@ -237,28 +211,15 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
237211
})
238212

239213
// start header and data sync services concurrently to avoid cumulative startup delay.
240-
startSyncInPublisherMode := f.shouldStartSyncInPublisherMode(ctx)
241214
syncWg, syncCtx := errgroup.WithContext(ctx)
242215
syncWg.Go(func() error {
243-
var err error
244-
if startSyncInPublisherMode {
245-
err = f.headerSyncService.StartForPublishing(syncCtx)
246-
} else {
247-
err = f.headerSyncService.Start(syncCtx)
248-
}
249-
if err != nil {
216+
if err := f.headerSyncService.Start(syncCtx); err != nil {
250217
return fmt.Errorf("header sync service: %w", err)
251218
}
252219
return nil
253220
})
254221
syncWg.Go(func() error {
255-
var err error
256-
if startSyncInPublisherMode {
257-
err = f.dataSyncService.StartForPublishing(syncCtx)
258-
} else {
259-
err = f.dataSyncService.Start(syncCtx)
260-
}
261-
if err != nil {
222+
if err := f.dataSyncService.Start(syncCtx); err != nil {
262223
return fmt.Errorf("data sync service: %w", err)
263224
}
264225
return nil

pkg/sync/sync_service.go

Lines changed: 82 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"strings"
8+
"sync"
89
"sync/atomic"
910
"time"
1011

@@ -67,6 +68,10 @@ type SyncService[H store.EntityWithDAHint[H]] struct {
6768
topicSubscription header.Subscription[H]
6869

6970
storeInitialized atomic.Bool
71+
72+
ctx context.Context
73+
cancel context.CancelFunc
74+
wg sync.WaitGroup
7075
}
7176

7277
// NewDataSyncService returns a new DataSyncService.
@@ -186,56 +191,104 @@ func (s *SyncService[H]) AppendDAHint(ctx context.Context, daHeight uint64, heig
186191
}
187192

188193
// Start is a part of Service interface.
194+
//
195+
// Sets up P2P infrastructure and starts the subscriber. A background goroutine
196+
// periodically retries P2P initialization until the syncer starts, then stops.
189197
func (syncService *SyncService[H]) Start(ctx context.Context) error {
198+
syncService.ctx, syncService.cancel = context.WithCancel(ctx)
199+
190200
peerIDs, err := syncService.prepareStart(ctx)
191201
if err != nil {
192202
return err
193203
}
194204

195-
// initialize stores from P2P (blocking until genesis is fetched for followers)
196-
// Aggregators (no peers configured) return immediately and initialize on first produced block.
197-
if err := syncService.initFromP2PWithRetry(ctx, peerIDs); err != nil {
198-
return fmt.Errorf("failed to initialize stores from P2P: %w", err)
199-
}
200-
201-
// start the subscriber, stores are guaranteed to have genesis for followers.
202-
//
203-
// NOTE: we must start the subscriber after the syncer is initialized in initFromP2PWithRetry to ensure p2p syncing
204-
// works correctly.
205205
if err := syncService.startSubscriber(ctx); err != nil {
206206
return fmt.Errorf("failed to start subscriber: %w", err)
207207
}
208208

209+
syncService.wg.Go(func() { syncService.p2pInitRetryLoop(peerIDs) })
210+
209211
return nil
210212
}
211213

212-
// StartForPublishing starts the sync service in publisher mode.
213-
//
214-
// This mode is used by a raft leader with an empty local store: no peer can serve
215-
// height 1 yet, so waiting for initFromP2PWithRetry would deadlock block production.
216-
// We still need the P2P exchange server and pubsub subscriber to be ready before the
217-
// first block is produced, because WriteToStoreAndBroadcast relies on them to gossip
218-
// the block that bootstraps the network.
219-
func (syncService *SyncService[H]) StartForPublishing(ctx context.Context) error {
220-
if _, err := syncService.prepareStart(ctx); err != nil {
221-
return err
214+
// initFromP2POnce makes a single best-effort attempt to initialize the store
215+
// from P2P peers. If peers are unavailable it returns nil so startup continues
216+
// and DA sync provides blocks instead.
217+
func (syncService *SyncService[H]) initFromP2POnce(ctx context.Context, peerIDs []peer.ID) error {
218+
if len(peerIDs) == 0 {
219+
return nil
222220
}
223221

224-
if err := syncService.startSubscriber(ctx); err != nil {
225-
return fmt.Errorf("failed to start subscriber: %w", err)
222+
var heightToQuery uint64
223+
head, headErr := syncService.store.Head(ctx)
224+
switch {
225+
case errors.Is(headErr, header.ErrNotFound), errors.Is(headErr, header.ErrEmptyStore):
226+
heightToQuery = syncService.genesis.InitialHeight
227+
case headErr != nil:
228+
return fmt.Errorf("failed to inspect local store head: %w", headErr)
229+
default:
230+
heightToQuery = head.Height()
231+
}
232+
233+
trusted, err := syncService.ex.GetByHeight(ctx, heightToQuery)
234+
if err != nil {
235+
syncService.logger.Info().Err(err).Msg("P2P init failed, deferring to DA sync")
236+
return nil
237+
}
238+
239+
if syncService.storeInitialized.CompareAndSwap(false, true) {
240+
if _, err := syncService.initStore(ctx, trusted); err != nil {
241+
syncService.storeInitialized.Store(false)
242+
return fmt.Errorf("failed to initialize the store: %w", err)
243+
}
244+
}
245+
246+
if _, err := syncService.startSyncer(ctx); err != nil {
247+
return err
226248
}
227249

228250
return nil
229251
}
230252

253+
// p2pInitRetryLoop periodically retries P2P initialization until the syncer
254+
// has been started. Once started (either by this loop, WriteToStoreAndBroadcast,
255+
// or initFromP2POnce), the loop exits.
256+
func (syncService *SyncService[H]) p2pInitRetryLoop(peerIDs []peer.ID) {
257+
if len(peerIDs) == 0 {
258+
return
259+
}
260+
261+
backoff := 1 * time.Second
262+
maxBackoff := 10 * time.Second
263+
264+
for {
265+
if syncService.syncerStatus.isStarted() {
266+
return
267+
}
268+
269+
if err := syncService.initFromP2POnce(syncService.ctx, peerIDs); err != nil {
270+
syncService.logger.Error().Err(err).Msg("P2P init retry failed")
271+
}
272+
273+
select {
274+
case <-syncService.ctx.Done():
275+
return
276+
case <-time.After(backoff):
277+
}
278+
279+
backoff *= 2
280+
if backoff > maxBackoff {
281+
backoff = maxBackoff
282+
}
283+
}
284+
}
285+
231286
func (syncService *SyncService[H]) prepareStart(ctx context.Context) ([]peer.ID, error) {
232-
// setup P2P infrastructure, but don't start Subscriber yet.
233287
peerIDs, err := syncService.setupP2PInfrastructure(ctx)
234288
if err != nil {
235289
return nil, fmt.Errorf("failed to setup syncer P2P infrastructure: %w", err)
236290
}
237291

238-
// create syncer, must be before initFromP2PWithRetry which calls startSyncer.
239292
if syncService.syncer, err = newSyncer(
240293
syncService.ex,
241294
syncService.store,
@@ -362,90 +415,15 @@ func (s *SyncService[H]) Height() uint64 {
362415
return s.store.Height()
363416
}
364417

365-
// initFromP2PWithRetry initializes the syncer from P2P with a retry mechanism.
366-
// It inspects the local store to determine the first height to request:
367-
// - when the store already contains items, it reuses the latest height as the starting point;
368-
// - otherwise, it falls back to the configured genesis height.
369-
func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, peerIDs []peer.ID) error {
370-
if len(peerIDs) == 0 {
371-
return nil
372-
}
373-
374-
tryInit := func(ctx context.Context) (bool, error) {
375-
var (
376-
trusted H
377-
err error
378-
heightToQuery uint64
379-
)
380-
381-
head, headErr := syncService.store.Head(ctx)
382-
switch {
383-
case errors.Is(headErr, header.ErrNotFound), errors.Is(headErr, header.ErrEmptyStore):
384-
heightToQuery = syncService.genesis.InitialHeight
385-
case headErr != nil:
386-
return false, fmt.Errorf("failed to inspect local store head: %w", headErr)
387-
default:
388-
heightToQuery = head.Height()
389-
}
390-
391-
if trusted, err = syncService.ex.GetByHeight(ctx, heightToQuery); err != nil {
392-
return false, fmt.Errorf("failed to fetch height %d from peers: %w", heightToQuery, err)
393-
}
394-
395-
if syncService.storeInitialized.CompareAndSwap(false, true) {
396-
if _, err := syncService.initStore(ctx, trusted); err != nil {
397-
syncService.storeInitialized.Store(false)
398-
return false, fmt.Errorf("failed to initialize the store: %w", err)
399-
}
400-
}
401-
if _, err := syncService.startSyncer(ctx); err != nil {
402-
return false, err
403-
}
404-
return true, nil
405-
}
406-
407-
// block with exponential backoff until initialization succeeds, context is canceled, or timeout.
408-
// If timeout is reached, we return nil to allow startup to continue - DA sync will
409-
// provide headers and WriteToStoreAndBroadcast will lazily initialize the store/syncer.
410-
backoff := 1 * time.Second
411-
maxBackoff := 10 * time.Second
412-
413-
p2pInitTimeout := 30 * time.Second
414-
timeoutTimer := time.NewTimer(p2pInitTimeout)
415-
defer timeoutTimer.Stop()
416-
retryTimer := time.NewTimer(backoff)
417-
defer retryTimer.Stop()
418-
419-
for {
420-
ok, err := tryInit(ctx)
421-
if ok {
422-
return nil
423-
}
424-
425-
syncService.logger.Info().Err(err).Dur("retry_in", backoff).Msg("headers not yet available from peers, waiting to initialize header sync")
426-
427-
select {
428-
case <-ctx.Done():
429-
return ctx.Err()
430-
case <-timeoutTimer.C:
431-
syncService.logger.Warn().
432-
Dur("timeout", p2pInitTimeout).
433-
Msg("P2P header sync initialization timed out, deferring to DA sync")
434-
return nil
435-
case <-retryTimer.C:
436-
}
437-
backoff *= 2
438-
if backoff > maxBackoff {
439-
backoff = maxBackoff
440-
}
441-
retryTimer.Reset(backoff)
442-
}
443-
}
444-
445418
// Stop is a part of Service interface.
446419
//
447420
// `store` is closed last because it's used by other services.
448421
func (syncService *SyncService[H]) Stop(ctx context.Context) error {
422+
if syncService.cancel != nil {
423+
syncService.cancel()
424+
}
425+
syncService.wg.Wait()
426+
449427
// unsubscribe from topic first so that sub.Stop() does not fail
450428
syncService.topicSubscription.Cancel()
451429
err := errors.Join(

pkg/sync/sync_service_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"github.com/evstack/ev-node/types"
2727
)
2828

29-
func TestHeaderSyncServiceStartForPublishingWithPeers(t *testing.T) {
29+
func TestHeaderSyncServiceStartWithPeers(t *testing.T) {
3030
mainKV := sync.MutexWrap(datastore.NewMapDatastore())
3131
pk, _, err := crypto.GenerateEd25519Key(cryptoRand.Reader)
3232
require.NoError(t, err)
@@ -79,7 +79,7 @@ func TestHeaderSyncServiceStartForPublishingWithPeers(t *testing.T) {
7979
evStore := store.New(mainKV)
8080
svc, err := NewHeaderSyncService(evStore, conf, genesisDoc, client1, logger)
8181
require.NoError(t, err)
82-
require.NoError(t, svc.StartForPublishing(ctx))
82+
require.NoError(t, svc.Start(ctx))
8383
t.Cleanup(func() { _ = svc.Stop(context.Background()) })
8484

8585
headerConfig := types.HeaderConfig{

0 commit comments

Comments
 (0)