Skip to content

Commit 7109b0e

Browse files
authored
feat(da): add Go DA interface and in-memory mock for fibre (#3256)
Adds a fibremock package with: - DA interface (Upload/Download/Listen) matching the fibre gRPC service - In-memory MockDA implementation with LRU eviction and configurable retention - Tests covering all paths Migrated from celestiaorg/x402-risotto#16 as-is for integration.
1 parent 7278685 commit 7109b0e

File tree

3 files changed

+522
-0
lines changed

3 files changed

+522
-0
lines changed

pkg/da/fibremock/fibre.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Package fibre provides a Go client interface and mock implementation for the
2+
// Fibre DA (Data Availability) gRPC service.
3+
//
4+
// # Design Assumptions
5+
//
6+
// - The sequencer trusts the encoder to eventually confirm blob inclusion.
7+
// Upload returns after the blob is uploaded and the PFF transaction is
8+
// broadcast, NOT after on-chain confirmation. This keeps the sequencer's
9+
// write path fast (~2s per 128 MB blob).
10+
//
11+
// - Callers are expected to batch/buffer their data into blobs sized for the
12+
// protocol maximum (128 MiB - 5 byte header = 134,217,723 bytes).
13+
// The interface accepts arbitrary sizes but the implementation may batch
14+
// or reject oversized blobs.
15+
//
16+
// - Confirmation/finality is intentionally omitted from the initial API.
17+
// The sequencer does not need it; the read path (Listen + Download) is
18+
// sufficient for full nodes. A Status or Confirm RPC can be added later
19+
// if needed without breaking existing callers.
20+
//
21+
// - Blob ordering is encoded in the blob data itself by the caller.
22+
// The interface does not impose or guarantee ordering.
23+
//
24+
// - The interface is the same whether the encoder runs in-process or as an
25+
// external gRPC service. For in-process use, call the mock or real
26+
// implementation directly; for external use, connect via gRPC.
27+
package fibremock
28+
29+
import (
30+
"context"
31+
"time"
32+
)
33+
34+
// BlobID uniquely identifies an uploaded blob: a 1-byte version prefix
// followed by a 32-byte commitment (33 bytes total; see mockBlobID for
// the mock's construction).
type BlobID []byte
36+
37+
// UploadResult is returned by Upload once a blob has been accepted.
type UploadResult struct {
	// BlobID uniquely identifies the uploaded blob and is the handle
	// passed to Download.
	BlobID BlobID
	// ExpiresAt is when the blob will be pruned from the DA network;
	// consumers must download it before this time. A zero value means
	// no expiry is scheduled (e.g. the mock with Retention == 0).
	ExpiresAt time.Time
}
45+
46+
// BlobEvent is delivered via Listen when a blob is confirmed on-chain.
type BlobEvent struct {
	// BlobID of the confirmed blob.
	BlobID BlobID
	// Height is the chain height at which the blob was confirmed.
	Height uint64
	// DataSize is the size of the original blob data in bytes (from the
	// PFF), letting full nodes know the size before downloading.
	DataSize uint64
}
56+
57+
// DA is the interface for interacting with the Fibre data availability layer.
//
// Implementations include:
//   - MockDA: in-memory mock for testing
//   - (future) gRPC client wrapping the Fibre service
//   - (future) in-process encoder using fibre.Client directly
type DA interface {
	// Upload submits a blob under the given namespace to the DA network.
	// It returns after the blob is uploaded and the payment transaction is
	// broadcast; it does NOT wait for on-chain confirmation (see package
	// doc for rationale).
	//
	// The caller is responsible for batching data to the target blob size.
	Upload(ctx context.Context, namespace []byte, data []byte) (UploadResult, error)

	// Download retrieves and reconstructs a blob by its ID, returning the
	// original data that was passed to Upload.
	Download(ctx context.Context, blobID BlobID) ([]byte, error)

	// Listen streams confirmed blob events for the given namespace.
	// The returned channel is closed when the context is cancelled.
	// Each event carries the blob ID, confirmation height, and data size.
	Listen(ctx context.Context, namespace []byte) (<-chan BlobEvent, error)
}

pkg/da/fibremock/mock.go

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
package fibremock
2+
3+
import (
	"bytes"
	"context"
	"crypto/sha256"
	"errors"
	"fmt"
	"sync"
	"time"
)
11+
12+
var (
	// ErrBlobNotFound is returned by Download when the blob ID is unknown
	// or the blob's retention window has elapsed. Compare with errors.Is.
	ErrBlobNotFound = errors.New("blob not found")
	// ErrDataEmpty is returned by Upload when called with zero-length data.
	ErrDataEmpty = errors.New("data cannot be empty")
)
18+
19+
// MockDAConfig configures the mock DA implementation.
type MockDAConfig struct {
	// MaxBlobs is the maximum number of blobs stored in memory.
	// When exceeded, the oldest blob is evicted regardless of retention.
	// 0 means no limit (use with caution — large blobs will exhaust memory).
	MaxBlobs int
	// Retention is how long blobs are kept before automatic pruning.
	// 0 means blobs are kept until evicted by MaxBlobs.
	Retention time.Duration
}
29+
30+
// DefaultMockDAConfig returns a config suitable for testing:
31+
// 100 blobs max, 10 minute retention.
32+
func DefaultMockDAConfig() MockDAConfig {
33+
return MockDAConfig{
34+
MaxBlobs: 100,
35+
Retention: 10 * time.Minute,
36+
}
37+
}
38+
39+
// storedBlob holds one uploaded blob plus its metadata in the mock store.
type storedBlob struct {
	namespace []byte    // namespace supplied at upload time
	data      []byte    // original blob bytes (stored as-is, not copied)
	height    uint64    // mock chain height assigned when the blob was stored
	expiresAt time.Time // zero means the blob never expires (Retention == 0)
	createdAt time.Time // upload timestamp
}
47+
48+
// subscriber tracks one active Listen subscription.
type subscriber struct {
	namespace []byte         // empty acts as a wildcard (see namespaceMatch)
	ch        chan BlobEvent // buffered; Upload drops events when it is full
}
53+
54+
// MockDA is an in-memory mock implementation of the DA interface.
// It stores blobs with configurable retention and a maximum blob count.
// Safe for concurrent use: all mutable state below is guarded by mu.
type MockDA struct {
	cfg MockDAConfig

	mu          sync.RWMutex
	blobs       map[string]*storedBlob // keyed by hex(blobID)
	order       []string               // insertion order; eviction is FIFO (reads do not refresh recency, so not a true LRU)
	height      uint64                 // mock chain height, incremented once per upload
	subscribers []subscriber           // active Listen subscriptions
}
66+
67+
// NewMockDA creates a new mock DA with the given config.
68+
func NewMockDA(cfg MockDAConfig) *MockDA {
69+
return &MockDA{
70+
cfg: cfg,
71+
blobs: make(map[string]*storedBlob),
72+
}
73+
}
74+
75+
// Upload stores the blob in memory and notifies listeners.
76+
func (m *MockDA) Upload(ctx context.Context, namespace []byte, data []byte) (UploadResult, error) {
77+
if len(data) == 0 {
78+
return UploadResult{}, ErrDataEmpty
79+
}
80+
81+
blobID := mockBlobID(data)
82+
key := fmt.Sprintf("%x", blobID)
83+
now := time.Now()
84+
85+
var expiresAt time.Time
86+
if m.cfg.Retention > 0 {
87+
expiresAt = now.Add(m.cfg.Retention)
88+
}
89+
90+
m.mu.Lock()
91+
92+
// Evict oldest if at capacity
93+
if m.cfg.MaxBlobs > 0 && len(m.blobs) >= m.cfg.MaxBlobs {
94+
m.evictOldestLocked()
95+
}
96+
97+
// Prune expired blobs opportunistically
98+
if m.cfg.Retention > 0 {
99+
m.pruneExpiredLocked(now)
100+
}
101+
102+
m.height++
103+
height := m.height
104+
105+
m.blobs[key] = &storedBlob{
106+
namespace: namespace,
107+
data: data,
108+
height: height,
109+
expiresAt: expiresAt,
110+
createdAt: now,
111+
}
112+
m.order = append(m.order, key)
113+
114+
// Notify subscribers (non-blocking)
115+
event := BlobEvent{
116+
BlobID: blobID,
117+
Height: height,
118+
DataSize: uint64(len(data)),
119+
}
120+
for i := range m.subscribers {
121+
if namespaceMatch(m.subscribers[i].namespace, namespace) {
122+
select {
123+
case m.subscribers[i].ch <- event:
124+
default:
125+
// Channel full, drop event. Subscriber is too slow.
126+
}
127+
}
128+
}
129+
130+
m.mu.Unlock()
131+
132+
return UploadResult{
133+
BlobID: blobID,
134+
ExpiresAt: expiresAt,
135+
}, nil
136+
}
137+
138+
// Download retrieves a blob by ID.
139+
func (m *MockDA) Download(ctx context.Context, blobID BlobID) ([]byte, error) {
140+
key := fmt.Sprintf("%x", blobID)
141+
142+
m.mu.RLock()
143+
blob, ok := m.blobs[key]
144+
m.mu.RUnlock()
145+
146+
if !ok {
147+
return nil, ErrBlobNotFound
148+
}
149+
150+
if !blob.expiresAt.IsZero() && time.Now().After(blob.expiresAt) {
151+
return nil, ErrBlobNotFound
152+
}
153+
154+
return blob.data, nil
155+
}
156+
157+
// Listen returns a channel that receives events when blobs matching the
158+
// namespace are uploaded. The channel is closed when ctx is cancelled.
159+
func (m *MockDA) Listen(ctx context.Context, namespace []byte) (<-chan BlobEvent, error) {
160+
ch := make(chan BlobEvent, 64)
161+
162+
m.mu.Lock()
163+
idx := len(m.subscribers)
164+
m.subscribers = append(m.subscribers, subscriber{
165+
namespace: namespace,
166+
ch: ch,
167+
})
168+
m.mu.Unlock()
169+
170+
// Clean up when context is done.
171+
go func() {
172+
<-ctx.Done()
173+
m.mu.Lock()
174+
// Remove subscriber by swapping with last
175+
last := len(m.subscribers) - 1
176+
if idx <= last {
177+
m.subscribers[idx] = m.subscribers[last]
178+
}
179+
m.subscribers = m.subscribers[:last]
180+
m.mu.Unlock()
181+
close(ch)
182+
}()
183+
184+
return ch, nil
185+
}
186+
187+
// BlobCount returns the number of blobs currently stored.
188+
func (m *MockDA) BlobCount() int {
189+
m.mu.RLock()
190+
defer m.mu.RUnlock()
191+
return len(m.blobs)
192+
}
193+
194+
// evictOldestLocked removes the oldest blob. Caller must hold m.mu.
195+
func (m *MockDA) evictOldestLocked() {
196+
if len(m.order) == 0 {
197+
return
198+
}
199+
key := m.order[0]
200+
m.order = m.order[1:]
201+
delete(m.blobs, key)
202+
}
203+
204+
// pruneExpiredLocked removes blobs past their retention. Caller must hold m.mu.
205+
func (m *MockDA) pruneExpiredLocked(now time.Time) {
206+
surviving := m.order[:0]
207+
for _, key := range m.order {
208+
blob, ok := m.blobs[key]
209+
if !ok {
210+
continue
211+
}
212+
if !blob.expiresAt.IsZero() && now.After(blob.expiresAt) {
213+
delete(m.blobs, key)
214+
} else {
215+
surviving = append(surviving, key)
216+
}
217+
}
218+
m.order = surviving
219+
}
220+
221+
// namespaceMatch reports whether the subscription namespace matches the
// blob namespace. An empty subscription namespace is a wildcard that
// matches every namespace.
func namespaceMatch(subNS, blobNS []byte) bool {
	if len(subNS) == 0 {
		return true
	}
	// bytes.Equal replaces the hand-rolled length check plus byte loop.
	return bytes.Equal(subNS, blobNS)
}
237+
238+
// mockBlobID produces a deterministic blob ID from the data.
239+
// Format: 1 byte version (0) + 32 bytes SHA256 hash.
240+
func mockBlobID(data []byte) BlobID {
241+
hash := sha256.Sum256(data)
242+
id := make([]byte, 33)
243+
id[0] = 0 // version byte
244+
copy(id[1:], hash[:])
245+
return id
246+
}

0 commit comments

Comments
 (0)