Skip to content

Commit bc016bd

Browse files
committed
feedback
1 parent 066544c commit bc016bd

2 files changed

Lines changed: 32 additions & 5 deletions

File tree

block/internal/reaping/reaper.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,9 @@ func (r *Reaper) reaperLoop() {
9696
Int("consecutive_failures", consecutiveFailures).
9797
Dur("backoff", backoff).
9898
Msg("reaper error, backing off")
99-
r.wait(backoff, cleanupTicker.C)
99+
if r.wait(backoff, nil) {
100+
return
101+
}
100102
continue
101103
}
102104

@@ -109,21 +111,28 @@ func (r *Reaper) reaperLoop() {
109111
continue
110112
}
111113

112-
r.wait(r.interval, cleanupTicker.C)
114+
if r.wait(r.interval, cleanupTicker.C) {
115+
return
116+
}
113117
}
114118
}
115119

116-
func (r *Reaper) wait(d time.Duration, cleanupCh <-chan time.Time) {
120+
// wait blocks for the given duration. Returns true if the context was cancelled.
121+
// When cleanupCh is non-nil, processes cache cleanup if that channel fires first.
122+
func (r *Reaper) wait(d time.Duration, cleanupCh <-chan time.Time) bool {
117123
timer := time.NewTimer(d)
118124
defer timer.Stop()
119125
select {
120126
case <-r.ctx.Done():
127+
return true
121128
case <-cleanupCh:
122129
removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention)
123130
if removed > 0 {
124131
r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes")
125132
}
133+
return false
126134
case <-timer.C:
135+
return false
127136
}
128137
}
129138

@@ -156,7 +165,7 @@ func (r *Reaper) drainMempool() (bool, error) {
156165

157166
filtered := r.filterNewTxs(txs)
158167
if len(filtered) == 0 {
159-
continue
168+
break
160169
}
161170

162171
n, err := r.submitFiltered(filtered)

block/internal/reaping/reaper_test.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ func TestReaper_AllSeen_NoSubmit(t *testing.T) {
105105
env.cache.SetTxSeen(hashTx(tx2))
106106

107107
env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once()
108-
env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once()
109108

110109
submitted, err := env.reaper.drainMempool()
111110
assert.NoError(t, err)
@@ -232,3 +231,22 @@ func TestReaper_NilCallback_NoPanic(t *testing.T) {
232231
assert.NoError(t, err)
233232
assert.True(t, submitted)
234233
}
234+
235+
func TestReaper_StopTerminates(t *testing.T) {
236+
env := newTestEnv(t)
237+
env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Maybe()
238+
239+
require.NoError(t, env.reaper.Start(context.Background()))
240+
241+
done := make(chan struct{})
242+
go func() {
243+
env.reaper.Stop()
244+
close(done)
245+
}()
246+
247+
select {
248+
case <-done:
249+
case <-time.After(2 * time.Second):
250+
t.Fatal("Stop() did not return in time")
251+
}
252+
}

0 commit comments

Comments
 (0)