Skip to content

Commit 066544c

Browse files
committed
refactor: reaper to drain mempool
1 parent 6f09600 commit 066544c

5 files changed

Lines changed: 275 additions & 234 deletions

File tree

block/components.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,9 +278,9 @@ func newAggregatorComponents(
278278
sequencer,
279279
genesis,
280280
logger,
281-
executor,
282281
cacheManager,
283282
config.Node.ScrapeInterval.Duration,
283+
executor.NotifyNewTransactions,
284284
)
285285
if err != nil {
286286
return nil, fmt.Errorf("failed to create reaper: %w", err)

block/internal/cache/manager.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type CacheManager interface {
5353
// Transaction operations
5454
IsTxSeen(hash string) bool
5555
SetTxSeen(hash string)
56+
SetTxsSeen(hashes []string)
5657
CleanupOldTxs(olderThan time.Duration) int
5758

5859
// Pending events syncing coordination
@@ -210,6 +211,14 @@ func (m *implementation) SetTxSeen(hash string) {
210211
m.txTimestamps.Store(hash, time.Now())
211212
}
212213

214+
func (m *implementation) SetTxsSeen(hashes []string) {
215+
now := time.Now()
216+
for _, hash := range hashes {
217+
m.txCache.setSeen(hash, 0)
218+
m.txTimestamps.Store(hash, now)
219+
}
220+
}
221+
213222
// CleanupOldTxs removes transaction hashes older than olderThan and returns
214223
// the count removed. Defaults to DefaultTxCacheRetention if olderThan <= 0.
215224
func (m *implementation) CleanupOldTxs(olderThan time.Duration) int {

block/internal/reaping/reaper.go

Lines changed: 110 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"github.com/rs/zerolog"
1313

1414
"github.com/evstack/ev-node/block/internal/cache"
15-
"github.com/evstack/ev-node/block/internal/executing"
1615
coreexecutor "github.com/evstack/ev-node/core/execution"
1716
coresequencer "github.com/evstack/ev-node/core/sequencer"
1817
"github.com/evstack/ev-node/pkg/genesis"
@@ -21,40 +20,35 @@ import (
2120
const (
2221
// MaxBackoffInterval is the maximum backoff interval for retries
2322
MaxBackoffInterval = 30 * time.Second
23+
CleanupInterval = 1 * time.Hour
2424
)
2525

2626
// Reaper is responsible for periodically retrieving transactions from the executor,
2727
// filtering out already seen transactions, and submitting new transactions to the sequencer.
2828
type Reaper struct {
29-
exec coreexecutor.Executor
30-
sequencer coresequencer.Sequencer
31-
chainID string
32-
interval time.Duration
33-
cache cache.CacheManager
34-
executor *executing.Executor
35-
36-
// shared components
29+
exec coreexecutor.Executor
30+
sequencer coresequencer.Sequencer
31+
chainID string
32+
interval time.Duration
33+
cache cache.CacheManager
34+
onTxsSubmitted func()
35+
3736
logger zerolog.Logger
3837

39-
// Lifecycle
4038
ctx context.Context
4139
cancel context.CancelFunc
4240
wg sync.WaitGroup
4341
}
4442

45-
// NewReaper creates a new Reaper instance.
4643
func NewReaper(
4744
exec coreexecutor.Executor,
4845
sequencer coresequencer.Sequencer,
4946
genesis genesis.Genesis,
5047
logger zerolog.Logger,
51-
executor *executing.Executor,
5248
cache cache.CacheManager,
5349
scrapeInterval time.Duration,
50+
onTxsSubmitted func(),
5451
) (*Reaper, error) {
55-
if executor == nil {
56-
return nil, errors.New("executor cannot be nil")
57-
}
5852
if cache == nil {
5953
return nil, errors.New("cache cannot be nil")
6054
}
@@ -63,13 +57,13 @@ func NewReaper(
6357
}
6458

6559
return &Reaper{
66-
exec: exec,
67-
sequencer: sequencer,
68-
chainID: genesis.ChainID,
69-
interval: scrapeInterval,
70-
logger: logger.With().Str("component", "reaper").Logger(),
71-
cache: cache,
72-
executor: executor,
60+
exec: exec,
61+
sequencer: sequencer,
62+
chainID: genesis.ChainID,
63+
interval: scrapeInterval,
64+
logger: logger.With().Str("component", "reaper").Logger(),
65+
cache: cache,
66+
onTxsSubmitted: onTxsSubmitted,
7367
}, nil
7468
}
7569

@@ -80,54 +74,56 @@ func (r *Reaper) Start(ctx context.Context) error {
8074
// Start reaper loop
8175
r.wg.Go(r.reaperLoop)
8276

83-
r.logger.Info().Dur("interval", r.interval).Msg("reaper started")
77+
r.logger.Info().Dur("idle_interval", r.interval).Msg("reaper started")
8478
return nil
8579
}
8680

8781
func (r *Reaper) reaperLoop() {
88-
ticker := time.NewTicker(r.interval)
89-
defer ticker.Stop()
90-
91-
cleanupTicker := time.NewTicker(1 * time.Hour)
82+
cleanupTicker := time.NewTicker(CleanupInterval)
9283
defer cleanupTicker.Stop()
9384

9485
consecutiveFailures := 0
9586

9687
for {
97-
select {
98-
case <-r.ctx.Done():
99-
return
100-
case <-ticker.C:
101-
err := r.SubmitTxs()
102-
if err != nil {
103-
// Increment failure counter and apply exponential backoff
104-
consecutiveFailures++
105-
backoff := r.interval * time.Duration(1<<min(consecutiveFailures, 5)) // Cap at 2^5 = 32x
106-
backoff = min(backoff, MaxBackoffInterval)
107-
r.logger.Warn().
108-
Err(err).
109-
Int("consecutive_failures", consecutiveFailures).
110-
Dur("next_retry_in", backoff).
111-
Msg("reaper encountered error, applying backoff")
112-
113-
// Reset ticker with backoff interval
114-
ticker.Reset(backoff)
115-
} else {
116-
// Reset failure counter and backoff on success
117-
if consecutiveFailures > 0 {
118-
r.logger.Info().Msg("reaper recovered from errors, resetting backoff")
119-
consecutiveFailures = 0
120-
ticker.Reset(r.interval)
121-
}
122-
}
123-
case <-cleanupTicker.C:
124-
// Clean up transaction hashes older than 24 hours
125-
// This prevents unbounded growth of the transaction seen cache
126-
removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention)
127-
if removed > 0 {
128-
r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes")
129-
}
88+
submitted, err := r.drainMempool()
89+
90+
if err != nil {
91+
consecutiveFailures++
92+
backoff := r.interval * time.Duration(1<<min(consecutiveFailures, 5))
93+
backoff = min(backoff, MaxBackoffInterval)
94+
r.logger.Warn().
95+
Err(err).
96+
Int("consecutive_failures", consecutiveFailures).
97+
Dur("backoff", backoff).
98+
Msg("reaper error, backing off")
99+
r.wait(backoff, cleanupTicker.C)
100+
continue
101+
}
102+
103+
if consecutiveFailures > 0 {
104+
r.logger.Info().Msg("reaper recovered from errors")
105+
consecutiveFailures = 0
106+
}
107+
108+
if submitted {
109+
continue
110+
}
111+
112+
r.wait(r.interval, cleanupTicker.C)
113+
}
114+
}
115+
116+
func (r *Reaper) wait(d time.Duration, cleanupCh <-chan time.Time) {
117+
timer := time.NewTimer(d)
118+
defer timer.Stop()
119+
select {
120+
case <-r.ctx.Done():
121+
case <-cleanupCh:
122+
removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention)
123+
if removed > 0 {
124+
r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes")
130125
}
126+
case <-timer.C:
131127
}
132128
}
133129

@@ -137,60 +133,78 @@ func (r *Reaper) Stop() error {
137133
r.cancel()
138134
}
139135
r.wg.Wait()
140-
141136
r.logger.Info().Msg("reaper stopped")
142137
return nil
143138
}
144139

145-
// SubmitTxs retrieves transactions from the executor and submits them to the sequencer.
146-
// Returns an error if any critical operation fails.
147-
func (r *Reaper) SubmitTxs() error {
148-
txs, err := r.exec.GetTxs(r.ctx)
149-
if err != nil {
150-
r.logger.Error().Err(err).Msg("failed to get txs from executor")
151-
return fmt.Errorf("failed to get txs from executor: %w", err)
140+
type pendingTx struct {
141+
tx []byte
142+
hash string
143+
}
144+
145+
func (r *Reaper) drainMempool() (bool, error) {
146+
var totalSubmitted int
147+
148+
for {
149+
txs, err := r.exec.GetTxs(r.ctx)
150+
if err != nil {
151+
return totalSubmitted > 0, fmt.Errorf("failed to get txs from executor: %w", err)
152+
}
153+
if len(txs) == 0 {
154+
break
155+
}
156+
157+
filtered := r.filterNewTxs(txs)
158+
if len(filtered) == 0 {
159+
continue
160+
}
161+
162+
n, err := r.submitFiltered(filtered)
163+
if err != nil {
164+
return totalSubmitted > 0, err
165+
}
166+
totalSubmitted += n
152167
}
153-
if len(txs) == 0 {
154-
r.logger.Debug().Msg("no new txs")
155-
return nil
168+
169+
if totalSubmitted > 0 {
170+
r.logger.Debug().Int("total_txs", totalSubmitted).Msg("drained mempool")
171+
if r.onTxsSubmitted != nil {
172+
r.onTxsSubmitted()
173+
}
156174
}
157175

158-
var newTxs [][]byte
176+
return totalSubmitted > 0, nil
177+
}
178+
179+
func (r *Reaper) filterNewTxs(txs [][]byte) []pendingTx {
180+
pending := make([]pendingTx, 0, len(txs))
159181
for _, tx := range txs {
160-
txHash := hashTx(tx)
161-
if !r.cache.IsTxSeen(txHash) {
162-
newTxs = append(newTxs, tx)
182+
h := hashTx(tx)
183+
if !r.cache.IsTxSeen(h) {
184+
pending = append(pending, pendingTx{tx: tx, hash: h})
163185
}
164186
}
187+
return pending
188+
}
165189

166-
if len(newTxs) == 0 {
167-
r.logger.Debug().Msg("no new txs to submit")
168-
return nil
190+
func (r *Reaper) submitFiltered(batch []pendingTx) (int, error) {
191+
txs := make([][]byte, len(batch))
192+
hashes := make([]string, len(batch))
193+
for i, p := range batch {
194+
txs[i] = p.tx
195+
hashes[i] = p.hash
169196
}
170197

171-
r.logger.Debug().Int("txCount", len(newTxs)).Msg("submitting txs to sequencer")
172-
173-
_, err = r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{
198+
_, err := r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{
174199
Id: []byte(r.chainID),
175-
Batch: &coresequencer.Batch{Transactions: newTxs},
200+
Batch: &coresequencer.Batch{Transactions: txs},
176201
})
177202
if err != nil {
178-
return fmt.Errorf("failed to submit txs to sequencer: %w", err)
179-
}
180-
181-
for _, tx := range newTxs {
182-
txHash := hashTx(tx)
183-
r.cache.SetTxSeen(txHash)
184-
}
185-
186-
// Notify the executor that new transactions are available
187-
if len(newTxs) > 0 {
188-
r.logger.Debug().Msg("notifying executor of new transactions")
189-
r.executor.NotifyNewTransactions()
203+
return 0, fmt.Errorf("failed to submit txs to sequencer: %w", err)
190204
}
191205

192-
r.logger.Debug().Msg("successfully submitted txs")
193-
return nil
206+
r.cache.SetTxsSeen(hashes)
207+
return len(txs), nil
194208
}
195209

196210
func hashTx(tx []byte) string {

0 commit comments

Comments
 (0)