From 4f215105b4c94d95c8eae2e4a375db61c2f7f5b1 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Wed, 15 Apr 2026 18:48:38 +0200 Subject: [PATCH] feat(da): add Go DA interface and in-memory mock for fibre Adds a fibremock package with: - DA interface (Upload/Download/Listen) matching the fibre gRPC service - In-memory MockDA implementation with LRU eviction and configurable retention - Tests covering all paths Migrated from celestiaorg/x402-risotto#16 as-is for integration. --- pkg/da/fibremock/fibre.go | 79 +++++++++++ pkg/da/fibremock/mock.go | 246 ++++++++++++++++++++++++++++++++++ pkg/da/fibremock/mock_test.go | 197 +++++++++++++++++++++++++++ 3 files changed, 522 insertions(+) create mode 100644 pkg/da/fibremock/fibre.go create mode 100644 pkg/da/fibremock/mock.go create mode 100644 pkg/da/fibremock/mock_test.go diff --git a/pkg/da/fibremock/fibre.go b/pkg/da/fibremock/fibre.go new file mode 100644 index 0000000000..986c27cc3b --- /dev/null +++ b/pkg/da/fibremock/fibre.go @@ -0,0 +1,79 @@ +// Package fibre provides a Go client interface and mock implementation for the +// Fibre DA (Data Availability) gRPC service. +// +// # Design Assumptions +// +// - The sequencer trusts the encoder to eventually confirm blob inclusion. +// Upload returns after the blob is uploaded and the PFF transaction is +// broadcast, NOT after on-chain confirmation. This keeps the sequencer's +// write path fast (~2s per 128 MB blob). +// +// - Callers are expected to batch/buffer their data into blobs sized for the +// protocol maximum (128 MiB - 5 byte header = 134,217,723 bytes). +// The interface accepts arbitrary sizes but the implementation may batch +// or reject oversized blobs. +// +// - Confirmation/finality is intentionally omitted from the initial API. +// The sequencer does not need it; the read path (Listen + Download) is +// sufficient for full nodes. A Status or Confirm RPC can be added later +// if needed without breaking existing callers. +// +// - Blob ordering is encoded in the blob data itself by the caller. +// The interface does not impose or guarantee ordering. +// +// - The interface is the same whether the encoder runs in-process or as an +// external gRPC service. For in-process use, call the mock or real +// implementation directly; for external use, connect via gRPC. +package fibremock + +import ( + "context" + "time" +) + +// BlobID uniquely identifies an uploaded blob (version byte + 32-byte commitment). +type BlobID []byte + +// UploadResult is returned by Upload after the blob is accepted. +type UploadResult struct { + // BlobID uniquely identifies the uploaded blob. + BlobID BlobID + // ExpiresAt is when the blob will be pruned from the DA network. + // Consumers must download before this time. + ExpiresAt time.Time +} + +// BlobEvent is delivered via Listen when a blob is confirmed on-chain. +type BlobEvent struct { + // BlobID of the confirmed blob. + BlobID BlobID + // Height is the chain height at which the blob was confirmed. + Height uint64 + // DataSize is the size of the original blob data in bytes (from the PFF). + // This allows full nodes to know the size before downloading. + DataSize uint64 +} + +// DA is the interface for interacting with the Fibre data availability layer. +// +// Implementations include: +// - MockDA: in-memory mock for testing +// - (future) gRPC client wrapping the Fibre service +// - (future) in-process encoder using fibre.Client directly +type DA interface { + // Upload submits a blob under the given namespace to the DA network. + // Returns after the blob is uploaded and the payment transaction is broadcast. + // Does NOT wait for on-chain confirmation (see package doc for rationale). + // + // The caller is responsible for batching data to the target blob size. + Upload(ctx context.Context, namespace []byte, data []byte) (UploadResult, error) + + // Download retrieves and reconstructs a blob by its ID. + // Returns the original data that was passed to Upload. + Download(ctx context.Context, blobID BlobID) ([]byte, error) + + // Listen streams confirmed blob events for the given namespace. + // The returned channel is closed when the context is cancelled. + // Each event includes the blob ID, confirmation height, and data size. + Listen(ctx context.Context, namespace []byte) (<-chan BlobEvent, error) +} diff --git a/pkg/da/fibremock/mock.go b/pkg/da/fibremock/mock.go new file mode 100644 index 0000000000..ee88552121 --- /dev/null +++ b/pkg/da/fibremock/mock.go @@ -0,0 +1,246 @@ +package fibremock + +import ( + "context" + "crypto/sha256" + "errors" + "fmt" + "sync" + "time" +) + +var ( + // ErrBlobNotFound is returned when a blob ID is not in the store. + ErrBlobNotFound = errors.New("blob not found") + // ErrDataEmpty is returned when Upload is called with empty data. + ErrDataEmpty = errors.New("data cannot be empty") +) + +// MockDAConfig configures the mock DA implementation. +type MockDAConfig struct { + // MaxBlobs is the maximum number of blobs stored in memory. + // When exceeded, the oldest blob is evicted regardless of retention. + // 0 means no limit (use with caution — large blobs will OOM). + MaxBlobs int + // Retention is how long blobs are kept before automatic pruning. + // 0 means blobs are kept until evicted by MaxBlobs. + Retention time.Duration +} + +// DefaultMockDAConfig returns a config suitable for testing: +// 100 blobs max, 10 minute retention. +func DefaultMockDAConfig() MockDAConfig { + return MockDAConfig{ + MaxBlobs: 100, + Retention: 10 * time.Minute, + } +} + +// storedBlob holds a blob and its metadata in the mock store. +type storedBlob struct { + namespace []byte + data []byte + height uint64 + expiresAt time.Time + createdAt time.Time +} + +// subscriber tracks a Listen subscription. +type subscriber struct { + namespace []byte + ch chan BlobEvent +} + +// MockDA is an in-memory mock implementation of the DA interface. +// It stores blobs in memory with configurable retention and max blob count. +// Safe for concurrent use. +type MockDA struct { + cfg MockDAConfig + + mu sync.RWMutex + blobs map[string]*storedBlob // keyed by hex(blobID) + order []string // insertion order for LRU eviction + height uint64 + subscribers []subscriber +} + +// NewMockDA creates a new mock DA with the given config. +func NewMockDA(cfg MockDAConfig) *MockDA { + return &MockDA{ + cfg: cfg, + blobs: make(map[string]*storedBlob), + } +} + +// Upload stores the blob in memory and notifies listeners. +func (m *MockDA) Upload(ctx context.Context, namespace []byte, data []byte) (UploadResult, error) { + if len(data) == 0 { + return UploadResult{}, ErrDataEmpty + } + + blobID := mockBlobID(data) + key := fmt.Sprintf("%x", blobID) + now := time.Now() + + var expiresAt time.Time + if m.cfg.Retention > 0 { + expiresAt = now.Add(m.cfg.Retention) + } + + m.mu.Lock() + + // Evict oldest if at capacity + if m.cfg.MaxBlobs > 0 && len(m.blobs) >= m.cfg.MaxBlobs { + m.evictOldestLocked() + } + + // Prune expired blobs opportunistically + if m.cfg.Retention > 0 { + m.pruneExpiredLocked(now) + } + + m.height++ + height := m.height + + m.blobs[key] = &storedBlob{ + namespace: namespace, + data: data, + height: height, + expiresAt: expiresAt, + createdAt: now, + } + m.order = append(m.order, key) + + // Notify subscribers (non-blocking) + event := BlobEvent{ + BlobID: blobID, + Height: height, + DataSize: uint64(len(data)), + } + for i := range m.subscribers { + if namespaceMatch(m.subscribers[i].namespace, namespace) { + select { + case m.subscribers[i].ch <- event: + default: + // Channel full, drop event. Subscriber is too slow. + } + } + } + + m.mu.Unlock() + + return UploadResult{ + BlobID: blobID, + ExpiresAt: expiresAt, + }, nil +} + +// Download retrieves a blob by ID. +func (m *MockDA) Download(ctx context.Context, blobID BlobID) ([]byte, error) { + key := fmt.Sprintf("%x", blobID) + + m.mu.RLock() + blob, ok := m.blobs[key] + m.mu.RUnlock() + + if !ok { + return nil, ErrBlobNotFound + } + + if !blob.expiresAt.IsZero() && time.Now().After(blob.expiresAt) { + return nil, ErrBlobNotFound + } + + return blob.data, nil +} + +// Listen returns a channel that receives events when blobs matching the +// namespace are uploaded. The channel is closed when ctx is cancelled. +func (m *MockDA) Listen(ctx context.Context, namespace []byte) (<-chan BlobEvent, error) { + ch := make(chan BlobEvent, 64) + + m.mu.Lock() + idx := len(m.subscribers) + m.subscribers = append(m.subscribers, subscriber{ + namespace: namespace, + ch: ch, + }) + m.mu.Unlock() + + // Clean up when context is done. + go func() { + <-ctx.Done() + m.mu.Lock() + // Remove subscriber by swapping with last + last := len(m.subscribers) - 1 + if idx <= last { + m.subscribers[idx] = m.subscribers[last] + } + m.subscribers = m.subscribers[:last] + m.mu.Unlock() + close(ch) + }() + + return ch, nil +} + +// BlobCount returns the number of blobs currently stored. +func (m *MockDA) BlobCount() int { + m.mu.RLock() + defer m.mu.RUnlock() + return len(m.blobs) +} + +// evictOldestLocked removes the oldest blob. Caller must hold m.mu. +func (m *MockDA) evictOldestLocked() { + if len(m.order) == 0 { + return + } + key := m.order[0] + m.order = m.order[1:] + delete(m.blobs, key) +} + +// pruneExpiredLocked removes blobs past their retention. Caller must hold m.mu. +func (m *MockDA) pruneExpiredLocked(now time.Time) { + surviving := m.order[:0] + for _, key := range m.order { + blob, ok := m.blobs[key] + if !ok { + continue + } + if !blob.expiresAt.IsZero() && now.After(blob.expiresAt) { + delete(m.blobs, key) + } else { + surviving = append(surviving, key) + } + } + m.order = surviving +} + +// namespaceMatch returns true if the subscription namespace matches the blob namespace. +// An empty subscription namespace matches all namespaces (wildcard). +func namespaceMatch(subNS, blobNS []byte) bool { + if len(subNS) == 0 { + return true + } + if len(subNS) != len(blobNS) { + return false + } + for i := range subNS { + if subNS[i] != blobNS[i] { + return false + } + } + return true +} + +// mockBlobID produces a deterministic blob ID from the data. +// Format: 1 byte version (0) + 32 bytes SHA256 hash. +func mockBlobID(data []byte) BlobID { + hash := sha256.Sum256(data) + id := make([]byte, 33) + id[0] = 0 // version byte + copy(id[1:], hash[:]) + return id +} diff --git a/pkg/da/fibremock/mock_test.go b/pkg/da/fibremock/mock_test.go new file mode 100644 index 0000000000..e5aa62aed0 --- /dev/null +++ b/pkg/da/fibremock/mock_test.go @@ -0,0 +1,197 @@ +package fibremock + +import ( + "bytes" + "context" + "testing" + "time" +) + +func TestMockDA_UploadDownload(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + ctx := context.Background() + + ns := []byte("test-ns") + data := []byte("hello fibre") + + result, err := m.Upload(ctx, ns, data) + if err != nil { + t.Fatal(err) + } + if len(result.BlobID) != 33 { + t.Fatalf("expected 33-byte blob ID, got %d", len(result.BlobID)) + } + + got, err := m.Download(ctx, result.BlobID) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(got, data) { + t.Fatalf("data mismatch: got %q, want %q", got, data) + } +} + +func TestMockDA_UploadEmpty(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + _, err := m.Upload(context.Background(), []byte("ns"), nil) + if err != ErrDataEmpty { + t.Fatalf("expected ErrDataEmpty, got %v", err) + } +} + +func TestMockDA_DownloadNotFound(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + _, err := m.Download(context.Background(), BlobID{0, 1, 2}) + if err != ErrBlobNotFound { + t.Fatalf("expected ErrBlobNotFound, got %v", err) + } +} + +func TestMockDA_Listen(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ns := []byte("test-ns") + ch, err := m.Listen(ctx, ns) + if err != nil { + t.Fatal(err) + } + + // Upload a blob — should trigger the listener + data := []byte("listened blob") + result, err := m.Upload(ctx, ns, data) + if err != nil { + t.Fatal(err) + } + + select { + case event := <-ch: + if !bytes.Equal(event.BlobID, result.BlobID) { + t.Fatal("blob ID mismatch in event") + } + if event.Height == 0 { + t.Fatal("expected non-zero height") + } + if event.DataSize != uint64(len(data)) { + t.Fatalf("expected data size %d, got %d", len(data), event.DataSize) + } + case <-time.After(time.Second): + t.Fatal("timeout waiting for event") + } +} + +func TestMockDA_ListenNamespaceFilter(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch, err := m.Listen(ctx, []byte("ns-A")) + if err != nil { + t.Fatal(err) + } + + // Upload to different namespace — should NOT trigger + m.Upload(ctx, []byte("ns-B"), []byte("wrong namespace")) + + select { + case <-ch: + t.Fatal("should not receive event for different namespace") + case <-time.After(50 * time.Millisecond): + // good + } +} + +func TestMockDA_ListenWildcard(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Empty namespace = wildcard + ch, err := m.Listen(ctx, nil) + if err != nil { + t.Fatal(err) + } + + m.Upload(ctx, []byte("any-ns"), []byte("wildcard test")) + + select { + case event := <-ch: + if event.Height == 0 { + t.Fatal("expected event") + } + case <-time.After(time.Second): + t.Fatal("timeout waiting for wildcard event") + } +} + +func TestMockDA_MaxBlobsEviction(t *testing.T) { + m := NewMockDA(MockDAConfig{MaxBlobs: 3}) + ctx := context.Background() + + var ids []BlobID + for i := range 5 { + r, err := m.Upload(ctx, nil, []byte{byte(i), 1, 2, 3}) + if err != nil { + t.Fatal(err) + } + ids = append(ids, r.BlobID) + } + + // First two should be evicted + if _, err := m.Download(ctx, ids[0]); err != ErrBlobNotFound { + t.Fatal("expected first blob to be evicted") + } + if _, err := m.Download(ctx, ids[1]); err != ErrBlobNotFound { + t.Fatal("expected second blob to be evicted") + } + + // Last three should still be there + for i := 2; i < 5; i++ { + if _, err := m.Download(ctx, ids[i]); err != nil { + t.Fatalf("blob %d should exist: %v", i, err) + } + } + + if m.BlobCount() != 3 { + t.Fatalf("expected 3 blobs, got %d", m.BlobCount()) + } +} + +func TestMockDA_Retention(t *testing.T) { + m := NewMockDA(MockDAConfig{Retention: 50 * time.Millisecond}) + ctx := context.Background() + + r, err := m.Upload(ctx, nil, []byte("ephemeral")) + if err != nil { + t.Fatal(err) + } + + // Should exist immediately + if _, err := m.Download(ctx, r.BlobID); err != nil { + t.Fatal("blob should exist immediately") + } + + // Wait for expiry + time.Sleep(100 * time.Millisecond) + + if _, err := m.Download(ctx, r.BlobID); err != ErrBlobNotFound { + t.Fatal("blob should have expired") + } +} + +func TestMockDA_DeterministicBlobID(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + ctx := context.Background() + + data := []byte("deterministic") + r1, _ := m.Upload(ctx, nil, data) + r2, _ := m.Upload(ctx, nil, data) + + if !bytes.Equal(r1.BlobID, r2.BlobID) { + t.Fatal("same data should produce same blob ID") + } +} + +// Verify MockDA satisfies the DA interface at compile time. +var _ DA = (*MockDA)(nil)