Skip to content

Commit 3ea90e3

Browse files
committed
Merge branch 'julien/speedup' into julien/testapp
2 parents cf5966c + 989f43d commit 3ea90e3

2 files changed

Lines changed: 330 additions & 1 deletion

File tree

Lines changed: 330 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,330 @@
1+
package reaping
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"fmt"
7+
"sync"
8+
"sync/atomic"
9+
"testing"
10+
"time"
11+
12+
ds "github.com/ipfs/go-datastore"
13+
dssync "github.com/ipfs/go-datastore/sync"
14+
"github.com/rs/zerolog"
15+
16+
"github.com/evstack/ev-node/block/internal/cache"
17+
coreexecutor "github.com/evstack/ev-node/core/execution"
18+
coresequencer "github.com/evstack/ev-node/core/sequencer"
19+
"github.com/evstack/ev-node/pkg/config"
20+
"github.com/evstack/ev-node/pkg/genesis"
21+
"github.com/evstack/ev-node/pkg/store"
22+
)
23+
24+
func newBenchCache(b *testing.B) cache.CacheManager {
25+
b.Helper()
26+
cfg := config.Config{RootDir: b.TempDir()}
27+
memDS := dssync.MutexWrap(ds.NewMapDatastore())
28+
st := store.New(memDS)
29+
cm, err := cache.NewManager(cfg, st, zerolog.Nop())
30+
if err != nil {
31+
b.Fatal(err)
32+
}
33+
return cm
34+
}
35+
36+
type infiniteExecutor struct {
37+
mu sync.Mutex
38+
batch [][]byte
39+
pending atomic.Int64
40+
}
41+
42+
func (e *infiniteExecutor) Feed(n int, txSize int) {
43+
txs := make([][]byte, n)
44+
for i := range txs {
45+
tx := make([]byte, txSize)
46+
_, _ = rand.Read(tx)
47+
txs[i] = tx
48+
}
49+
e.mu.Lock()
50+
e.batch = txs
51+
e.mu.Unlock()
52+
e.pending.Add(int64(n))
53+
}
54+
55+
func (e *infiniteExecutor) GetTxs(_ context.Context) ([][]byte, error) {
56+
e.mu.Lock()
57+
txs := e.batch
58+
e.batch = nil
59+
e.mu.Unlock()
60+
return txs, nil
61+
}
62+
63+
func (e *infiniteExecutor) ExecuteTxs(_ context.Context, _ [][]byte, _ uint64, _ time.Time, _ []byte) ([]byte, error) {
64+
return nil, nil
65+
}
66+
67+
func (e *infiniteExecutor) FilterTxs(_ context.Context, txs [][]byte, _ uint64, _ uint64, _ bool) ([]coreexecutor.FilterStatus, error) {
68+
return make([]coreexecutor.FilterStatus, len(txs)), nil
69+
}
70+
71+
func (e *infiniteExecutor) GetExecutionInfo(_ context.Context) (coreexecutor.ExecutionInfo, error) {
72+
return coreexecutor.ExecutionInfo{}, nil
73+
}
74+
75+
func (e *infiniteExecutor) InitChain(_ context.Context, _ time.Time, _ uint64, _ string) ([]byte, error) {
76+
return nil, nil
77+
}
78+
79+
func (e *infiniteExecutor) SetFinal(_ context.Context, _ uint64) error { return nil }
80+
81+
type countingSequencer struct {
82+
submitted atomic.Int64
83+
}
84+
85+
func (s *countingSequencer) SubmitBatchTxs(_ context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) {
86+
s.submitted.Add(int64(len(req.Batch.Transactions)))
87+
return &coresequencer.SubmitBatchTxsResponse{}, nil
88+
}
89+
90+
func (s *countingSequencer) GetNextBatch(_ context.Context, _ coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) {
91+
return &coresequencer.GetNextBatchResponse{}, nil
92+
}
93+
94+
func (s *countingSequencer) VerifyBatch(_ context.Context, _ coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) {
95+
return &coresequencer.VerifyBatchResponse{}, nil
96+
}
97+
98+
func (s *countingSequencer) GetDAHeight() uint64 { return 0 }
99+
func (s *countingSequencer) SetDAHeight(_ uint64) {}
100+
101+
func benchmarkReaperFlow(b *testing.B, batchSize int, txSize int, feedInterval time.Duration) {
102+
scenario := fmt.Sprintf("batch=%d/txSize=%d", batchSize, txSize)
103+
104+
b.Run(scenario, func(b *testing.B) {
105+
exec := &infiniteExecutor{}
106+
seq := &countingSequencer{}
107+
108+
cm := newBenchCache(b)
109+
var notified atomic.Int64
110+
111+
r, err := NewReaper(
112+
exec,
113+
seq,
114+
genesis.Genesis{ChainID: "bench"},
115+
zerolog.Nop(),
116+
cm,
117+
50*time.Millisecond,
118+
func() { notified.Add(1) },
119+
)
120+
if err != nil {
121+
b.Fatal(err)
122+
}
123+
124+
ctx, cancel := context.WithCancel(context.Background())
125+
defer cancel()
126+
127+
if err := r.Start(ctx); err != nil {
128+
b.Fatal(err)
129+
}
130+
131+
stopFeeding := make(chan struct{})
132+
feederDone := make(chan struct{})
133+
go func() {
134+
defer close(feederDone)
135+
ticker := time.NewTicker(feedInterval)
136+
defer ticker.Stop()
137+
for {
138+
select {
139+
case <-stopFeeding:
140+
return
141+
case <-ticker.C:
142+
exec.Feed(batchSize, txSize)
143+
}
144+
}
145+
}()
146+
147+
b.ResetTimer()
148+
149+
var lastCount int64
150+
for i := 0; i < b.N; i++ {
151+
exec.Feed(batchSize, txSize)
152+
153+
deadline := time.After(5 * time.Second)
154+
expected := seq.submitted.Load() + int64(batchSize)
155+
for {
156+
cur := seq.submitted.Load()
157+
if cur >= expected {
158+
lastCount = cur
159+
break
160+
}
161+
select {
162+
case <-deadline:
163+
b.Fatalf("timeout: submitted %d, expected >= %d", cur, expected)
164+
default:
165+
}
166+
}
167+
}
168+
169+
b.StopTimer()
170+
close(stopFeeding)
171+
<-feederDone
172+
cancel()
173+
r.Stop()
174+
175+
b.ReportMetric(float64(lastCount)/b.Elapsed().Seconds(), "txs/sec")
176+
})
177+
}
178+
179+
func BenchmarkReaperFlow_Throughput(b *testing.B) {
180+
sizes := []int{256, 1024, 4096}
181+
batches := []int{10, 100, 500}
182+
183+
for _, batchSize := range batches {
184+
for _, txSize := range sizes {
185+
benchmarkReaperFlow(b, batchSize, txSize, 10*time.Millisecond)
186+
}
187+
}
188+
}
189+
190+
func BenchmarkReaperFlow_Sustained(b *testing.B) {
191+
b.Run("steady_100txs_256B", func(b *testing.B) {
192+
exec := &infiniteExecutor{}
193+
seq := &countingSequencer{}
194+
cm := newBenchCache(b)
195+
var notified atomic.Int64
196+
197+
r, err := NewReaper(
198+
exec, seq,
199+
genesis.Genesis{ChainID: "bench"},
200+
zerolog.Nop(), cm,
201+
10*time.Millisecond,
202+
func() { notified.Add(1) },
203+
)
204+
if err != nil {
205+
b.Fatal(err)
206+
}
207+
208+
ctx, cancel := context.WithCancel(context.Background())
209+
defer cancel()
210+
211+
if err := r.Start(ctx); err != nil {
212+
b.Fatal(err)
213+
}
214+
215+
const batchSize = 100
216+
const txSize = 256
217+
const duration = 3 * time.Second
218+
219+
feederDone := make(chan struct{})
220+
go func() {
221+
defer close(feederDone)
222+
for {
223+
exec.Feed(batchSize, txSize)
224+
time.Sleep(time.Millisecond)
225+
select {
226+
case <-ctx.Done():
227+
return
228+
default:
229+
}
230+
}
231+
}()
232+
233+
b.ResetTimer()
234+
time.Sleep(duration)
235+
b.StopTimer()
236+
237+
cancel()
238+
r.Stop()
239+
<-feederDone
240+
241+
total := seq.submitted.Load()
242+
elapsed := b.Elapsed().Seconds()
243+
b.ReportMetric(float64(total)/elapsed, "txs/sec")
244+
b.ReportMetric(float64(notified.Load())/elapsed, "notifies/sec")
245+
b.Logf("submitted %d txs in %.1fs (%.0f txs/sec, %d notifications)", total, elapsed, float64(total)/elapsed, notified.Load())
246+
})
247+
}
248+
249+
func BenchmarkReaperFlow_StartStop(b *testing.B) {
250+
b.Run("lifecycle", func(b *testing.B) {
251+
exec := &infiniteExecutor{}
252+
seq := &countingSequencer{}
253+
cm := newBenchCache(b)
254+
255+
for i := 0; i < b.N; i++ {
256+
r, err := NewReaper(
257+
exec, seq,
258+
genesis.Genesis{ChainID: "bench"},
259+
zerolog.Nop(), cm,
260+
100*time.Millisecond,
261+
func() {},
262+
)
263+
if err != nil {
264+
b.Fatal(err)
265+
}
266+
267+
ctx, cancel := context.WithCancel(context.Background())
268+
if err := r.Start(ctx); err != nil {
269+
cancel()
270+
b.Fatal(err)
271+
}
272+
r.Stop()
273+
cancel()
274+
}
275+
})
276+
277+
b.Run("start_with_backlog", func(b *testing.B) {
278+
const batchSize = 500
279+
const txSize = 256
280+
281+
for i := 0; i < b.N; i++ {
282+
b.StopTimer()
283+
284+
exec := &infiniteExecutor{}
285+
seq := &countingSequencer{}
286+
cm := newBenchCache(b)
287+
288+
txs := make([][]byte, batchSize)
289+
for j := range txs {
290+
txs[j] = make([]byte, txSize)
291+
_, _ = rand.Read(txs[j])
292+
}
293+
exec.mu.Lock()
294+
exec.batch = txs
295+
exec.mu.Unlock()
296+
297+
r, err := NewReaper(
298+
exec, seq,
299+
genesis.Genesis{ChainID: "bench"},
300+
zerolog.Nop(), cm,
301+
10*time.Millisecond,
302+
func() {},
303+
)
304+
if err != nil {
305+
b.Fatal(err)
306+
}
307+
308+
ctx, cancel := context.WithCancel(context.Background())
309+
310+
b.StartTimer()
311+
if err := r.Start(ctx); err != nil {
312+
cancel()
313+
b.Fatal(err)
314+
}
315+
316+
deadline := time.After(5 * time.Second)
317+
for seq.submitted.Load() < batchSize {
318+
select {
319+
case <-deadline:
320+
b.Fatal("timeout waiting for backlog drain")
321+
default:
322+
}
323+
}
324+
b.StopTimer()
325+
326+
r.Stop()
327+
cancel()
328+
}
329+
})
330+
}

block/internal/syncing/syncer.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,6 @@ func (s *Syncer) Stop(ctx context.Context) error {
279279

280280
s.logger.Info().Msg("syncer stopped")
281281
close(s.heightInCh)
282-
s.cancel = nil
283282
return nil
284283
}
285284

0 commit comments

Comments
 (0)