Skip to content

Commit b751bb0

Browse files
committed
feat(syncer): sync strategies
1 parent b8314a3 commit b751bb0

2 files changed

Lines changed: 142 additions & 78 deletions

File tree

block/internal/syncing/syncer.go

Lines changed: 133 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,6 @@ func (s *Syncer) syncLoop() {
247247
lastDataHeight := initialHeight
248248

249249
// Backoff control when DA replies with errors
250-
var hffDelay time.Duration
251250
var nextDARequestAt time.Time
252251

253252
blockTicker := time.NewTicker(s.config.Node.BlockTime.Duration)
@@ -262,94 +261,150 @@ func (s *Syncer) syncLoop() {
262261
// Process pending events from cache on every iteration
263262
s.processPendingEvents()
264263

265-
now := time.Now()
266-
daHeight := s.GetDAHeight()
267-
268-
// Respect backoff window if set
269-
if nextDARequestAt.IsZero() || now.After(nextDARequestAt) || now.Equal(nextDARequestAt) {
270-
// Retrieve from DA as fast as possible (unless throttled by HFF)
271-
// DaHeight is only increased on successful retrieval, it will retry on failure at the next iteration
272-
events, err := s.daRetriever.RetrieveFromDA(s.ctx, daHeight)
273-
if err != nil {
274-
if errors.Is(err, coreda.ErrBlobNotFound) {
275-
// no data at this height, increase DA height
276-
// we do still want to check p2p
277-
s.SetDAHeight(daHeight + 1)
278-
279-
// Reset backoff on success
280-
nextDARequestAt = time.Time{}
281-
} else {
282-
// Back off exactly by DA block time to avoid overloading
283-
hffDelay = s.config.DA.BlockTime.Duration
284-
if hffDelay <= 0 {
285-
hffDelay = 2 * time.Second
286-
}
287-
nextDARequestAt = now.Add(hffDelay)
288-
289-
if s.isHeightFromFutureError(err) {
290-
s.logger.Debug().Dur("delay", hffDelay).Uint64("da_height", daHeight).Msg("height from future; backing off DA requests")
291-
} else {
292-
s.logger.Error().Err(err).Dur("delay", hffDelay).Uint64("da_height", daHeight).Msg("failed to retrieve from DA; backing off DA requests")
293-
}
294-
}
295-
} else {
296-
// Reset backoff on success
297-
nextDARequestAt = time.Time{}
298-
299-
// Process DA events
300-
for _, event := range events {
301-
select {
302-
case s.heightInCh <- event:
303-
default:
304-
s.cache.SetPendingEvent(event.Header.Height(), &event)
305-
}
306-
}
264+
// Try fetching from both DA and P2P in configurable order
265+
if s.tryFetchStrategies(&nextDARequestAt, &lastHeaderHeight, &lastDataHeight, blockTicker.C) {
266+
continue // events were processed, restart loop immediately
267+
}
307268

308-
// increment DA height on successful retrieval and continue immediately
309-
s.SetDAHeight(daHeight + 1)
310-
continue // event sent, no need to check p2p
311-
}
269+
// Prevent busy-waiting when no events are available
270+
waitTime := min(10*time.Millisecond, s.config.Node.BlockTime.Duration)
271+
time.Sleep(waitTime)
272+
}
273+
}
274+
275+
// tryFetchStrategies attempts to fetch from both DA and P2P based on configuration priority.
276+
// The order of fetching depends on the PreferP2P configuration setting:
277+
// - PreferP2P=false (default): DA first, then P2P (original and recommended behavior)
278+
// - PreferP2P=true: P2P first, then DA
279+
//
280+
// Both strategies are always attempted on each call, but the order affects which
281+
// events are processed first and may impact overall sync performance.
282+
func (s *Syncer) tryFetchStrategies(nextDARequestAt *time.Time, lastHeaderHeight, lastDataHeight *uint64, blockTicker <-chan time.Time) bool {
283+
eventsProcessed := false
284+
285+
if s.config.PreferP2P {
286+
if s.tryFetchFromP2P(lastHeaderHeight, lastDataHeight, blockTicker) {
287+
eventsProcessed = true
288+
}
289+
if s.tryFetchFromDA(nextDARequestAt) {
290+
eventsProcessed = true
291+
}
292+
} else {
293+
if s.tryFetchFromDA(nextDARequestAt) {
294+
eventsProcessed = true
295+
}
296+
if s.tryFetchFromP2P(lastHeaderHeight, lastDataHeight, blockTicker) {
297+
eventsProcessed = true
298+
}
299+
}
300+
301+
return eventsProcessed
302+
}
303+
304+
// tryFetchFromDA attempts to fetch events from the DA layer.
305+
// It handles backoff timing, DA height management, and error classification.
306+
// Returns true if any events were successfully processed.
307+
func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool {
308+
now := time.Now()
309+
daHeight := s.GetDAHeight()
310+
311+
// Respect backoff window if set
312+
if !nextDARequestAt.IsZero() && now.Before(*nextDARequestAt) {
313+
return false
314+
}
315+
316+
// Retrieve from DA as fast as possible (unless throttled by HFF)
317+
// The DA height is only increased on successful retrieval; on failure the same height is retried at the next iteration
318+
events, err := s.daRetriever.RetrieveFromDA(s.ctx, daHeight)
319+
if err != nil {
320+
if errors.Is(err, coreda.ErrBlobNotFound) {
321+
// no data at this height, increase DA height
322+
s.SetDAHeight(daHeight + 1)
323+
// Reset backoff: a missing blob is an expected outcome, not a retrieval failure
324+
*nextDARequestAt = time.Time{}
325+
return false
326+
}
327+
328+
// Back off exactly by DA block time to avoid overloading
329+
hffDelay := s.config.DA.BlockTime.Duration
330+
if hffDelay <= 0 {
331+
hffDelay = 2 * time.Second
312332
}
333+
*nextDARequestAt = now.Add(hffDelay)
313334

314-
// Opportunistically process any P2P signals
335+
if s.isHeightFromFutureError(err) {
336+
s.logger.Debug().Dur("delay", hffDelay).Uint64("da_height", daHeight).Msg("height from future; backing off DA requests")
337+
} else {
338+
s.logger.Error().Err(err).Dur("delay", hffDelay).Uint64("da_height", daHeight).Msg("failed to retrieve from DA; backing off DA requests")
339+
}
340+
return false
341+
}
342+
343+
// Reset backoff on success
344+
*nextDARequestAt = time.Time{}
345+
346+
// Process DA events
347+
for _, event := range events {
315348
select {
316-
case <-blockTicker.C:
317-
newHeaderHeight := s.headerStore.Height()
318-
if newHeaderHeight > lastHeaderHeight {
319-
events := s.p2pHandler.ProcessHeaderRange(s.ctx, lastHeaderHeight+1, newHeaderHeight)
320-
for _, event := range events {
321-
select {
322-
case s.heightInCh <- event:
323-
default:
324-
s.cache.SetPendingEvent(event.Header.Height(), &event)
325-
}
349+
case s.heightInCh <- event:
350+
default:
351+
s.cache.SetPendingEvent(event.Header.Height(), &event)
352+
}
353+
}
354+
355+
// increment DA height on successful retrieval
356+
s.SetDAHeight(daHeight + 1)
357+
return len(events) > 0
358+
}
359+
360+
// tryFetchFromP2P attempts to fetch events from P2P stores.
361+
// It processes both header and data ranges when the block ticker fires.
362+
// Returns true if any events were successfully processed.
363+
func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, blockTicker <-chan time.Time) bool {
364+
eventsProcessed := false
365+
366+
select {
367+
case <-blockTicker:
368+
// Process headers
369+
newHeaderHeight := s.headerStore.Height()
370+
if newHeaderHeight > *lastHeaderHeight {
371+
events := s.p2pHandler.ProcessHeaderRange(s.ctx, *lastHeaderHeight+1, newHeaderHeight)
372+
for _, event := range events {
373+
select {
374+
case s.heightInCh <- event:
375+
default:
376+
s.cache.SetPendingEvent(event.Header.Height(), &event)
326377
}
327-
lastHeaderHeight = newHeaderHeight
328378
}
329-
330-
newDataHeight := s.dataStore.Height()
331-
if newDataHeight == newHeaderHeight {
332-
lastDataHeight = newDataHeight
333-
continue
379+
*lastHeaderHeight = newHeaderHeight
380+
if len(events) > 0 {
381+
eventsProcessed = true
334382
}
335-
if newDataHeight > lastDataHeight {
336-
events := s.p2pHandler.ProcessDataRange(s.ctx, lastDataHeight+1, newDataHeight)
337-
for _, event := range events {
338-
select {
339-
case s.heightInCh <- event:
340-
default:
341-
s.cache.SetPendingEvent(event.Header.Height(), &event)
342-
}
383+
}
384+
385+
// Process data
386+
newDataHeight := s.dataStore.Height()
387+
if newDataHeight == newHeaderHeight {
388+
*lastDataHeight = newDataHeight
389+
} else if newDataHeight > *lastDataHeight {
390+
events := s.p2pHandler.ProcessDataRange(s.ctx, *lastDataHeight+1, newDataHeight)
391+
for _, event := range events {
392+
select {
393+
case s.heightInCh <- event:
394+
default:
395+
s.cache.SetPendingEvent(event.Header.Height(), &event)
343396
}
344-
lastDataHeight = newDataHeight
345397
}
346-
default:
347-
// Prevent busy-waiting when no events are available.
348-
waitTime := min(10*time.Millisecond, s.config.Node.BlockTime.Duration)
349-
350-
time.Sleep(waitTime)
398+
*lastDataHeight = newDataHeight
399+
if len(events) > 0 {
400+
eventsProcessed = true
401+
}
351402
}
403+
default:
404+
// Block ticker has not fired yet; skip P2P processing this iteration
352405
}
406+
407+
return eventsProcessed
353408
}
354409

355410
func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {

pkg/config/config.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ const (
4949
FlagReadinessMaxBlocksBehind = FlagPrefixEvnode + "node.readiness_max_blocks_behind"
5050
// FlagClearCache is a flag for clearing the cache
5151
FlagClearCache = FlagPrefixEvnode + "clear_cache"
52+
// FlagP2PPrefer is a flag to prioritize P2P fetching over DA fetching
53+
FlagP2PPrefer = FlagPrefixEvnode + "prefer_p2p"
5254

5355
// Data Availability configuration flags
5456

@@ -131,6 +133,7 @@ const (
131133
type Config struct {
132134
RootDir string `mapstructure:"-" yaml:"-" comment:"Root directory where rollkit files are located"`
133135
ClearCache bool `mapstructure:"-" yaml:"-" comment:"Clear the cache"`
136+
PreferP2P bool `mapstructure:"-" yaml:"-" comment:"Prefer P2P over DA fetching"`
134137

135138
// Base configuration
136139
DBPath string `mapstructure:"db_path" yaml:"db_path" comment:"Path inside the root directory where the database is located"`
@@ -308,6 +311,7 @@ func AddFlags(cmd *cobra.Command) {
308311
// Add base flags
309312
cmd.Flags().String(FlagDBPath, def.DBPath, "path for the node database")
310313
cmd.Flags().Bool(FlagClearCache, def.ClearCache, "clear the cache")
314+
cmd.Flags().Bool(FlagP2PPrefer, def.PreferP2P, "prefer P2P over DA fetching")
311315

312316
// Node configuration flags
313317
cmd.Flags().Bool(FlagAggregator, def.Node.Aggregator, "run node in aggregator mode")
@@ -354,6 +358,11 @@ func AddFlags(cmd *cobra.Command) {
354358
cmd.Flags().String(FlagSignerType, def.Signer.SignerType, "type of signer to use (file, grpc)")
355359
cmd.Flags().String(FlagSignerPath, def.Signer.SignerPath, "path to the signer file or address")
356360
cmd.Flags().String(FlagSignerPassphrase, "", "passphrase for the signer (required for file signer and if aggregator is enabled)")
361+
362+
// Only sync (non-aggregator) nodes can use FlagP2PPrefer
363+
cmd.MarkFlagsMutuallyExclusive(FlagP2PPrefer, FlagAggregator)
364+
// Clearing cache on an aggregator is dangerous, should be done manually
365+
cmd.MarkFlagsMutuallyExclusive(FlagClearCache, FlagAggregator)
357366
}
358367

359368
// Load loads the node configuration in the following order of precedence:

0 commit comments

Comments
 (0)