Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
444 changes: 444 additions & 0 deletions block/internal/da/fiber_client.go

Large diffs are not rendered by default.

604 changes: 604 additions & 0 deletions block/internal/da/fiber_client_test.go

Large diffs are not rendered by default.

79 changes: 79 additions & 0 deletions block/internal/da/fibremock/fibre.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Package fibremock provides a Go client interface and mock implementation for
// the Fibre DA (Data Availability) gRPC service.
//
// # Design Assumptions
//
// - The sequencer trusts the encoder to eventually confirm blob inclusion.
// Upload returns after the blob is uploaded and the PFF transaction is
// broadcast, NOT after on-chain confirmation. This keeps the sequencer's
// write path fast (~2s per 128 MB blob).
//
// - Callers are expected to batch/buffer their data into blobs sized for the
// protocol maximum (128 MiB - 5 byte header = 134,217,723 bytes).
// The interface accepts arbitrary sizes but the implementation may batch
// or reject oversized blobs.
//
// - Confirmation/finality is intentionally omitted from the initial API.
// The sequencer does not need it; the read path (Listen + Download) is
// sufficient for full nodes. A Status or Confirm RPC can be added later
// if needed without breaking existing callers.
//
// - Blob ordering is encoded in the blob data itself by the caller.
// The interface does not impose or guarantee ordering.
//
// - The interface is the same whether the encoder runs in-process or as an
// external gRPC service. For in-process use, call the mock or real
// implementation directly; for external use, connect via gRPC.
package fibremock

import (
"context"
"time"
)

// BlobID uniquely identifies an uploaded blob (version byte + 32-byte commitment).
// The mock implementation emits version 0 followed by a SHA-256 content hash;
// other implementations may use a different commitment scheme behind the same
// opaque byte layout.
type BlobID []byte

// UploadResult is returned by Upload after the blob is accepted.
type UploadResult struct {
	// BlobID uniquely identifies the uploaded blob.
	BlobID BlobID
	// ExpiresAt is when the blob will be pruned from the DA network.
	// Consumers must download before this time. A zero value means no
	// expiry is scheduled (e.g. the mock with Retention == 0).
	ExpiresAt time.Time
}

// BlobEvent is delivered via Listen when a blob is confirmed on-chain.
type BlobEvent struct {
	// BlobID of the confirmed blob.
	BlobID BlobID
	// Height is the chain height at which the blob was confirmed.
	Height uint64
	// DataSize is the size of the original blob data in bytes (from the PFF).
	// This allows full nodes to know the size before downloading.
	DataSize uint64
}

// DA is the interface for interacting with the Fibre data availability layer.
//
// Implementations include:
//   - MockDA: in-memory mock for testing
//   - (future) gRPC client wrapping the Fibre service
//   - (future) in-process encoder using fibre.Client directly
type DA interface {
	// Upload submits a blob under the given namespace to the DA network.
	// Returns after the blob is uploaded and the payment transaction is broadcast.
	// Does NOT wait for on-chain confirmation (see package doc for rationale).
	//
	// The caller is responsible for batching data to the target blob size.
	Upload(ctx context.Context, namespace []byte, data []byte) (UploadResult, error)

	// Download retrieves and reconstructs a blob by its ID.
	// Returns the original data that was passed to Upload.
	Download(ctx context.Context, blobID BlobID) ([]byte, error)

	// Listen streams confirmed blob events for the given namespace.
	// An empty namespace acts as a wildcard in the mock implementation.
	// The returned channel is closed when the context is cancelled.
	// Each event includes the blob ID, confirmation height, and data size.
	Listen(ctx context.Context, namespace []byte) (<-chan BlobEvent, error)
}
246 changes: 246 additions & 0 deletions block/internal/da/fibremock/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
package fibremock

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"sync"
"time"
)

var (
	// ErrBlobNotFound is returned by Download when a blob ID is not in the
	// store, or when the blob exists but has passed its retention expiry.
	// Compare with errors.Is.
	ErrBlobNotFound = errors.New("blob not found")
	// ErrDataEmpty is returned when Upload is called with empty data.
	ErrDataEmpty = errors.New("data cannot be empty")
)

// MockDAConfig configures the mock DA implementation.
type MockDAConfig struct {
	// MaxBlobs is the maximum number of blobs stored in memory.
	// When exceeded, the oldest blob (by insertion order) is evicted
	// regardless of retention.
	// 0 means no limit (use with caution — large blobs will OOM).
	MaxBlobs int
	// Retention is how long blobs are kept before automatic pruning.
	// Pruning happens opportunistically on Upload and is also honored by
	// Download. 0 means blobs are kept until evicted by MaxBlobs.
	Retention time.Duration
}

// DefaultMockDAConfig returns a config suitable for testing:
// a cap of 100 blobs and a 10 minute retention window.
func DefaultMockDAConfig() MockDAConfig {
	cfg := MockDAConfig{MaxBlobs: 100}
	cfg.Retention = 10 * time.Minute
	return cfg
}

// storedBlob holds a blob and its metadata in the mock store.
type storedBlob struct {
	namespace []byte    // namespace supplied to Upload
	data      []byte    // blob payload
	height    uint64    // mock "chain height" assigned at upload time
	expiresAt time.Time // zero means no expiry (Retention == 0)
	createdAt time.Time // upload timestamp
}

// subscriber tracks a Listen subscription.
type subscriber struct {
	namespace []byte         // filter; empty matches every namespace (see namespaceMatch)
	ch        chan BlobEvent // buffered channel created and closed by Listen
}

// MockDA is an in-memory mock implementation of the DA interface.
// It stores blobs in memory with configurable retention and max blob count.
// Safe for concurrent use.
type MockDA struct {
	cfg MockDAConfig

	mu    sync.RWMutex
	blobs map[string]*storedBlob // keyed by hex(blobID)
	// order records insertion order; eviction is FIFO (oldest upload first),
	// not true LRU — entries are never moved on access.
	order []string
	// height is a counter bumped on every Upload; it stands in for the
	// chain height at which a blob is "confirmed".
	height uint64
	// subscribers holds active Listen registrations; Upload fans events out
	// to each entry whose namespace matches.
	subscribers []subscriber
}

// NewMockDA creates a new mock DA with the given config.
func NewMockDA(cfg MockDAConfig) *MockDA {
	m := &MockDA{cfg: cfg}
	m.blobs = make(map[string]*storedBlob)
	return m
}

// Upload stores the blob in memory, assigns it a confirmation height, and
// notifies matching listeners. Returns ErrDataEmpty for empty data.
//
// The blob ID is content-derived (see mockBlobID), so uploading identical
// data twice refreshes the existing entry rather than storing a duplicate.
func (m *MockDA) Upload(ctx context.Context, namespace []byte, data []byte) (UploadResult, error) {
	if len(data) == 0 {
		return UploadResult{}, ErrDataEmpty
	}

	blobID := mockBlobID(data)
	key := fmt.Sprintf("%x", blobID)
	now := time.Now()

	var expiresAt time.Time
	if m.cfg.Retention > 0 {
		expiresAt = now.Add(m.cfg.Retention)
	}

	// Defensive copy: the caller may reuse its slice after Upload returns;
	// storing an alias would let later mutations corrupt the store.
	stored := append([]byte(nil), data...)

	m.mu.Lock()

	// Prune expired blobs before the capacity check so we only evict a
	// live blob when live blobs genuinely exceed MaxBlobs.
	if m.cfg.Retention > 0 {
		m.pruneExpiredLocked(now)
	}
	if m.cfg.MaxBlobs > 0 && len(m.blobs) >= m.cfg.MaxBlobs {
		m.evictOldestLocked()
	}

	m.height++
	height := m.height

	// Append to the insertion-order list only on first sight of this key.
	// Appending unconditionally would let re-uploads of identical data grow
	// order without bound and desync it from the blobs map.
	if _, exists := m.blobs[key]; !exists {
		m.order = append(m.order, key)
	}
	m.blobs[key] = &storedBlob{
		namespace: namespace,
		data:      stored,
		height:    height,
		expiresAt: expiresAt,
		createdAt: now,
	}

	// Notify subscribers without blocking: a full channel means the
	// subscriber is too slow and the event is dropped.
	event := BlobEvent{
		BlobID:   blobID,
		Height:   height,
		DataSize: uint64(len(data)),
	}
	for i := range m.subscribers {
		if namespaceMatch(m.subscribers[i].namespace, namespace) {
			select {
			case m.subscribers[i].ch <- event:
			default:
			}
		}
	}

	m.mu.Unlock()

	return UploadResult{
		BlobID:    blobID,
		ExpiresAt: expiresAt,
	}, nil
}

// Download retrieves a blob by ID, returning a copy of the original data
// that was passed to Upload.
//
// Returns ErrBlobNotFound if the ID is unknown or the blob has expired
// (expiry is honored here even before Upload's opportunistic prune runs).
func (m *MockDA) Download(ctx context.Context, blobID BlobID) ([]byte, error) {
	key := fmt.Sprintf("%x", blobID)

	m.mu.RLock()
	blob, ok := m.blobs[key]
	m.mu.RUnlock()

	if !ok {
		return nil, ErrBlobNotFound
	}

	if !blob.expiresAt.IsZero() && time.Now().After(blob.expiresAt) {
		return nil, ErrBlobNotFound
	}

	// Return a copy so callers cannot mutate the stored blob through the
	// returned slice (the old code aliased internal state).
	return append([]byte(nil), blob.data...), nil
}

// Listen returns a channel that receives events when blobs matching the
// namespace are uploaded. An empty namespace matches all namespaces.
// The channel is closed when ctx is cancelled.
func (m *MockDA) Listen(ctx context.Context, namespace []byte) (<-chan BlobEvent, error) {
	ch := make(chan BlobEvent, 64)

	m.mu.Lock()
	m.subscribers = append(m.subscribers, subscriber{
		namespace: namespace,
		ch:        ch,
	})
	m.mu.Unlock()

	// Clean up when the context is done.
	go func() {
		<-ctx.Done()
		m.mu.Lock()
		// Remove by channel identity, not by an index captured at
		// registration: swap-removal of other subscribers shifts positions,
		// so a stored index goes stale (the old code could delete the wrong
		// subscriber, or slice out of range, once several listeners were
		// cancelled out of order).
		for i := range m.subscribers {
			if m.subscribers[i].ch == ch {
				last := len(m.subscribers) - 1
				m.subscribers[i] = m.subscribers[last]
				m.subscribers[last] = subscriber{} // release references for GC
				m.subscribers = m.subscribers[:last]
				break
			}
		}
		m.mu.Unlock()
		// Safe to close now: Upload only sends while holding m.mu, and this
		// subscriber is no longer reachable from it.
		close(ch)
	}()

	return ch, nil
}

// BlobCount reports how many blobs are currently held in the store.
func (m *MockDA) BlobCount() int {
	m.mu.RLock()
	n := len(m.blobs)
	m.mu.RUnlock()
	return n
}

// evictOldestLocked drops the blob at the front of the insertion-order
// list (FIFO). Caller must hold m.mu.
func (m *MockDA) evictOldestLocked() {
	if len(m.order) == 0 {
		return
	}
	oldest := m.order[0]
	delete(m.blobs, oldest)
	m.order = m.order[1:]
}

// pruneExpiredLocked removes blobs past their retention window, filtering
// the insertion-order list in place. Caller must hold m.mu.
func (m *MockDA) pruneExpiredLocked(now time.Time) {
	kept := m.order[:0] // reuse the backing array
	for _, k := range m.order {
		b, present := m.blobs[k]
		switch {
		case !present:
			// Stale key with no backing blob: drop it from the order list.
		case !b.expiresAt.IsZero() && now.After(b.expiresAt):
			delete(m.blobs, k)
		default:
			kept = append(kept, k)
		}
	}
	m.order = kept
}

// namespaceMatch reports whether the subscription namespace matches the blob
// namespace. An empty subscription namespace is a wildcard matching all
// namespaces.
func namespaceMatch(subNS, blobNS []byte) bool {
	if len(subNS) == 0 {
		return true
	}
	// The string conversions in a == comparison are recognized by the Go
	// compiler and do not allocate; this replaces the hand-rolled byte loop.
	return string(subNS) == string(blobNS)
}

// mockBlobID derives a deterministic blob ID from the data contents.
// Layout: one version byte (0) followed by the 32-byte SHA-256 digest.
func mockBlobID(data []byte) BlobID {
	digest := sha256.Sum256(data)
	return append(BlobID{0}, digest[:]...)
}
Loading
Loading