Skip to content

Commit a8754d4

Browse files
committed
better split + fix da inclusion for p2p
1 parent b751bb0 commit a8754d4

2 files changed

Lines changed: 53 additions & 56 deletions

File tree

block/internal/syncing/syncer.go

Lines changed: 30 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -243,11 +243,11 @@ func (s *Syncer) syncLoop() {
243243
return
244244
}
245245

246-
lastHeaderHeight := initialHeight
247-
lastDataHeight := initialHeight
246+
lastHeaderHeight := &initialHeight
247+
lastDataHeight := &initialHeight
248248

249249
// Backoff control when DA replies with errors
250-
var nextDARequestAt time.Time
250+
nextDARequestAt := &time.Time{}
251251

252252
blockTicker := time.NewTicker(s.config.Node.BlockTime.Duration)
253253
defer blockTicker.Stop()
@@ -261,44 +261,27 @@ func (s *Syncer) syncLoop() {
261261
// Process pending events from cache on every iteration
262262
s.processPendingEvents()
263263

264-
// Try fetching from both DA and P2P in configurable order
265-
if s.tryFetchStrategies(&nextDARequestAt, &lastHeaderHeight, &lastDataHeight, blockTicker.C) {
266-
continue // events were processed, restart loop immediately
264+
// Fetch events from the DA layer first, opportunistically falling back to P2P when necessary.
265+
// This is the default behavior.
266+
if !s.config.Sync.PreferP2P {
267+
if s.tryFetchFromDA(nextDARequestAt) {
268+
continue
269+
}
270+
if s.tryFetchFromP2P(lastHeaderHeight, lastDataHeight, blockTicker.C) {
271+
continue
272+
}
273+
} else {
274+
if s.tryFetchFromP2P(lastHeaderHeight, lastDataHeight, blockTicker.C) {
275+
continue
276+
}
277+
if s.tryFetchFromDA(nextDARequestAt) {
278+
continue
279+
}
267280
}
268281

269282
// Prevent busy-waiting when no events are available
270-
waitTime := min(10*time.Millisecond, s.config.Node.BlockTime.Duration)
271-
time.Sleep(waitTime)
272-
}
273-
}
274-
275-
// tryFetchStrategies attempts to fetch from both DA and P2P based on configuration priority.
276-
// The order of fetching depends on the PreferP2P configuration setting:
277-
// - PreferP2P=false (default): DA first, then P2P (original and recommended behavior)
278-
// - PreferP2P=true: P2P first, then DA
279-
//
280-
// Both strategies are always attempted on each call, but the order affects which
281-
// events are processed first and may impact overall sync performance.
282-
func (s *Syncer) tryFetchStrategies(nextDARequestAt *time.Time, lastHeaderHeight, lastDataHeight *uint64, blockTicker <-chan time.Time) bool {
283-
eventsProcessed := false
284-
285-
if s.config.PreferP2P {
286-
if s.tryFetchFromP2P(lastHeaderHeight, lastDataHeight, blockTicker) {
287-
eventsProcessed = true
288-
}
289-
if s.tryFetchFromDA(nextDARequestAt) {
290-
eventsProcessed = true
291-
}
292-
} else {
293-
if s.tryFetchFromDA(nextDARequestAt) {
294-
eventsProcessed = true
295-
}
296-
if s.tryFetchFromP2P(lastHeaderHeight, lastDataHeight, blockTicker) {
297-
eventsProcessed = true
298-
}
283+
time.Sleep(min(10*time.Millisecond, s.config.Node.BlockTime.Duration))
299284
}
300-
301-
return eventsProcessed
302285
}
303286

304287
// tryFetchFromDA attempts to fetch events from the DA layer.
@@ -474,15 +457,17 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
474457
return errors.Join(errInvalidBlock, fmt.Errorf("failed to validate block: %w", err))
475458
}
476459

477-
// Mark as DA included
478-
headerHash := header.Hash().String()
479-
s.cache.SetHeaderDAIncluded(headerHash, event.HeaderDaIncludedHeight)
460+
// Mark as DA included (only if set, p2p sync does not set it)
461+
if event.HeaderDaIncludedHeight > 0 {
462+
headerHash := header.Hash().String()
463+
s.cache.SetHeaderDAIncluded(headerHash, event.HeaderDaIncludedHeight)
480464

481-
s.logger.Info().
482-
Str("header_hash", headerHash).
483-
Uint64("da_height", event.HeaderDaIncludedHeight).
484-
Uint64("height", header.Height()).
485-
Msg("header marked as DA included")
465+
s.logger.Info().
466+
Str("header_hash", headerHash).
467+
Uint64("da_height", event.HeaderDaIncludedHeight).
468+
Uint64("height", header.Height()).
469+
Msg("header marked as DA included")
470+
}
486471

487472
// Apply block
488473
newState, err := s.applyBlock(header.Header, data, currentState)

pkg/config/config.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ const (
2828
FlagRootDir = "home"
2929
// FlagDBPath is a flag for specifying the database path
3030
FlagDBPath = FlagPrefixEvnode + "db_path"
31+
// FlagClearCache is a flag for clearing the cache
32+
FlagClearCache = FlagPrefixEvnode + "clear_cache"
3133

3234
// Node configuration flags
3335

@@ -47,10 +49,11 @@ const (
4749
FlagLazyBlockTime = FlagPrefixEvnode + "node.lazy_block_interval"
4850
// FlagReadinessMaxBlocksBehind configures how many blocks behind best-known head is still considered ready
4951
FlagReadinessMaxBlocksBehind = FlagPrefixEvnode + "node.readiness_max_blocks_behind"
50-
// FlagClearCache is a flag for clearing the cache
51-
FlagClearCache = FlagPrefixEvnode + "clear_cache"
52-
// FlagP2PPrefer is a flag to prioritize p2p over da fetching
53-
FlagP2PPrefer = FlagPrefixEvnode + "prefer_p2p"
52+
53+
// Sync configuration flags
54+
55+
// FlagSyncP2PPrefer is a flag to prioritize p2p over da fetching
56+
FlagSyncP2PPrefer = FlagPrefixEvnode + "sync.prefer_p2p"
5457

5558
// Data Availability configuration flags
5659

@@ -132,11 +135,14 @@ const (
132135
// Config stores Rollkit configuration.
133136
type Config struct {
134137
RootDir string `mapstructure:"-" yaml:"-" comment:"Root directory where rollkit files are located"`
135-
ClearCache bool `mapstructure:"-" yaml:"-" comment:"Clear the cache"`
136-
PreferP2P bool `mapstructure:"-" yaml:"-" comment:"Prefer P2P over DA fetching"`
138+
ClearCache bool `mapstructure:"clear_cache" yaml:"-" comment:"Clear the cache"`
137139

138140
// Base configuration
139141
DBPath string `mapstructure:"db_path" yaml:"db_path" comment:"Path inside the root directory where the database is located"`
142+
143+
// Sync configuration (not in yaml)
144+
Sync SyncConfig `mapstructure:"sync" yaml:"-"`
145+
140146
// P2P configuration
141147
P2P P2PConfig `mapstructure:"p2p" yaml:"p2p"`
142148

@@ -159,6 +165,12 @@ type Config struct {
159165
Signer SignerConfig `mapstructure:"signer" yaml:"signer"`
160166
}
161167

168+
// SyncConfig contains synchronization configuration parameters
169+
// These are set via flags and are not stored in the ev-node config.
170+
type SyncConfig struct {
171+
PreferP2P bool `mapstructure:"prefer_p2p" yaml:"-" comment:"Prefer P2P over DA fetching"`
172+
}
173+
162174
// DAConfig contains all Data Availability configuration parameters
163175
type DAConfig struct {
164176
Address string `mapstructure:"address" yaml:"address" comment:"Address of the data availability layer service (host:port). This is the endpoint where Rollkit will connect to submit and retrieve data."`
@@ -311,7 +323,9 @@ func AddFlags(cmd *cobra.Command) {
311323
// Add base flags
312324
cmd.Flags().String(FlagDBPath, def.DBPath, "path for the node database")
313325
cmd.Flags().Bool(FlagClearCache, def.ClearCache, "clear the cache")
314-
cmd.Flags().Bool(FlagP2PPrefer, def.PreferP2P, "prefer P2P over DA fetching")
326+
327+
// Sync configuration flags
328+
cmd.Flags().Bool(FlagSyncP2PPrefer, def.Sync.PreferP2P, "prefer P2P over DA fetching")
315329

316330
// Node configuration flags
317331
cmd.Flags().Bool(FlagAggregator, def.Node.Aggregator, "run node in aggregator mode")
@@ -359,10 +373,8 @@ func AddFlags(cmd *cobra.Command) {
359373
cmd.Flags().String(FlagSignerPath, def.Signer.SignerPath, "path to the signer file or address")
360374
cmd.Flags().String(FlagSignerPassphrase, "", "passphrase for the signer (required for file signer and if aggregator is enabled)")
361375

362-
// Only sync nodes can use the FlagP2PPrefer
363-
cmd.MarkFlagsMutuallyExclusive(FlagP2PPrefer, FlagAggregator)
364-
// Clearing cache on an aggregator is dangerous, should be done manually
365-
cmd.MarkFlagsMutuallyExclusive(FlagClearCache, FlagAggregator)
376+
// FlagSyncP2PPrefer is only relevant for sync nodes.
377+
cmd.MarkFlagsMutuallyExclusive(FlagSyncP2PPrefer, FlagAggregator)
366378
}
367379

368380
// Load loads the node configuration in the following order of precedence:

0 commit comments

Comments
 (0)