From 19ea929c1f6c7e072d258811e54cd5ecd8506ae6 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 13 Apr 2026 13:54:35 +0200 Subject: [PATCH 1/6] feat(da): support fiber (not via c-node) --- block/internal/da/fiber_client.go | 446 +++++++++++++++++++++ block/internal/da/fiber_client_test.go | 518 +++++++++++++++++++++++++ block/public.go | 22 ++ pkg/config/config.go | 54 +++ pkg/config/config_test.go | 14 +- pkg/config/defaults.go | 7 + 6 files changed, 1060 insertions(+), 1 deletion(-) create mode 100644 block/internal/da/fiber_client.go create mode 100644 block/internal/da/fiber_client_test.go diff --git a/block/internal/da/fiber_client.go b/block/internal/da/fiber_client.go new file mode 100644 index 0000000000..a64047eb44 --- /dev/null +++ b/block/internal/da/fiber_client.go @@ -0,0 +1,446 @@ +package da + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "sync" + "time" + + "github.com/rs/zerolog" + + "github.com/evstack/ev-node/block/internal/common" + datypes "github.com/evstack/ev-node/pkg/da/types" +) + +// FiberUploadResult contains the result of a Fiber upload operation. +type FiberUploadResult struct { + // BlobID is the Fiber blob identifier (typically 33 bytes: 1 version + 32 commitment). + BlobID []byte + // Height is the validator set height at which the blob was uploaded. + Height uint64 + // Promise is the serialized signed payment promise from validators, + // which serves as proof of data availability. + Promise []byte +} + +// FiberClient defines the interface for a Fiber protocol client backend. +// Implementations wrap the celestia-app fibre.Client or equivalent. +type FiberClient interface { + // Upload uploads data to the Fiber network under the given namespace. + // The namespace must be a valid share.Namespace (29 bytes). + Upload(ctx context.Context, namespace, data []byte) (FiberUploadResult, error) + + // Download downloads and reconstructs data from the Fiber network. + // blobID is the Fiber blob identifier returned by Upload. + // height is the validator set height (0 to use the current head). + Download(ctx context.Context, blobID []byte, height uint64) ([]byte, error) + + // GetLatestHeight returns the latest block height from the Fiber network. + GetLatestHeight(ctx context.Context) (uint64, error) +} + +// FiberConfig holds configuration for the Fiber DA client. +type FiberConfig struct { + // Client is the Fiber protocol client backend. + Client FiberClient + // Logger is the structured logger. + Logger zerolog.Logger + // DefaultTimeout is the default timeout for operations. + DefaultTimeout time.Duration + // Namespace is the header namespace string. + Namespace string + // DataNamespace is the data namespace string. + DataNamespace string + // ForcedInclusionNamespace is the forced inclusion namespace string. + ForcedInclusionNamespace string +} + +// fiberDAClient adapts a FiberClient to the ev-node FullClient interface. +// It bridges the Fiber push/pull model to the block-based DA interface by +// maintaining a local index of submitted blobs for height-based retrieval. +type fiberDAClient struct { + fiber FiberClient + logger zerolog.Logger + defaultTimeout time.Duration + namespaceBz []byte + dataNamespaceBz []byte + forcedNamespaceBz []byte + hasForcedNamespace bool + + mu sync.RWMutex + index map[uint64][]fiberIndexedBlob +} + +type fiberIndexedBlob struct { + id datypes.ID + data []byte + promise []byte + blobID []byte +} + +var _ FullClient = (*fiberDAClient)(nil) + +// NewFiberClient creates a new Fiber DA client adapter. +// Returns nil if the Fiber client backend is not provided. +func NewFiberClient(cfg FiberConfig) FullClient { + if cfg.Client == nil { + return nil + } + if cfg.DefaultTimeout == 0 { + cfg.DefaultTimeout = 60 * time.Second + } + + hasForced := cfg.ForcedInclusionNamespace != "" + var forcedBz []byte + if hasForced { + forcedBz = datypes.NamespaceFromString(cfg.ForcedInclusionNamespace).Bytes() + } + + return &fiberDAClient{ + fiber: cfg.Client, + logger: cfg.Logger.With().Str("component", "fiber_da_client").Logger(), + defaultTimeout: cfg.DefaultTimeout, + namespaceBz: datypes.NamespaceFromString(cfg.Namespace).Bytes(), + dataNamespaceBz: datypes.NamespaceFromString(cfg.DataNamespace).Bytes(), + forcedNamespaceBz: forcedBz, + hasForcedNamespace: hasForced, + index: make(map[uint64][]fiberIndexedBlob), + } +} + +// makeFiberID constructs an ev-node DA ID from a Fiber height and blob ID. +// Format: 8 bytes LE height + blobID bytes (compatible with datypes.SplitID). +func makeFiberID(height uint64, blobID []byte) datypes.ID { + id := make([]byte, 8+len(blobID)) + binary.LittleEndian.PutUint64(id, height) + copy(id[8:], blobID) + return id +} + +// splitFiberID extracts the Fiber height and blob ID from an ev-node DA ID. +func splitFiberID(id datypes.ID) (uint64, []byte) { + if len(id) <= 8 { + return 0, nil + } + return binary.LittleEndian.Uint64(id[:8]), id[8:] +} + +// Submit uploads each data blob to the Fiber network. +// All blobs are uploaded individually, then indexed under a single canonical height +// (the height of the last upload) to satisfy the ev-node DA contract that all +// submitted blobs appear at the same height. +func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, namespace []byte, _ []byte) datypes.ResultSubmit { + var blobSize uint64 + for _, b := range data { + blobSize += uint64(len(b)) + } + + type uploadResult struct { + blobID []byte + height uint64 + promise []byte + data []byte + } + + uploaded := make([]uploadResult, 0, len(data)) + + for i, raw := range data { + if uint64(len(raw)) > common.DefaultMaxBlobSize { + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusTooBig, + Message: fmt.Sprintf("blob %d exceeds max size (%d > %d)", i, len(raw), common.DefaultMaxBlobSize), + }, + } + } + + uploadCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + result, err := c.fiber.Upload(uploadCtx, namespace, raw) + cancel() + if err != nil { + code := datypes.StatusError + switch { + case errors.Is(err, context.Canceled): + code = datypes.StatusContextCanceled + case errors.Is(err, context.DeadlineExceeded): + code = datypes.StatusContextDeadline + } + c.logger.Error().Err(err).Int("blob_index", i).Msg("fiber upload failed") + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: code, + Message: fmt.Sprintf("fiber upload failed for blob %d: %v", i, err), + SubmittedCount: uint64(len(uploaded)), + BlobSize: blobSize, + Timestamp: time.Now(), + }, + } + } + + uploaded = append(uploaded, uploadResult{ + blobID: result.BlobID, + height: result.Height, + promise: result.Promise, + data: raw, + }) + } + + if len(uploaded) == 0 { + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusSuccess, + BlobSize: blobSize, + Timestamp: time.Now(), + }, + } + } + + // Use the height of the last upload as the canonical submit height. + // Re-index all blobs under this single height so that Retrieve(height) + // returns all blobs from the same submit call. + submitHeight := uploaded[len(uploaded)-1].height + ids := make([]datypes.ID, len(uploaded)) + + c.mu.Lock() + for i, u := range uploaded { + id := makeFiberID(submitHeight, u.blobID) + ids[i] = id + c.index[submitHeight] = append(c.index[submitHeight], fiberIndexedBlob{ + id: id, + data: u.data, + promise: u.promise, + blobID: u.blobID, + }) + } + c.mu.Unlock() + + c.logger.Debug().Int("num_ids", len(ids)).Uint64("height", submitHeight).Msg("fiber DA submission successful") + + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusSuccess, + IDs: ids, + SubmittedCount: uint64(len(ids)), + Height: submitHeight, + BlobSize: blobSize, + Timestamp: time.Now(), + }, + } +} + +// Retrieve retrieves blobs from the Fiber network at the specified height and namespace. +// It first checks the local submission index, then falls back to downloading via blob IDs. +func (c *fiberDAClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { + return c.retrieve(ctx, height, namespace, true) +} + +// RetrieveBlobs retrieves blobs without blocking on timestamp resolution. +func (c *fiberDAClient) RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { + return c.retrieve(ctx, height, namespace, false) +} + +func (c *fiberDAClient) retrieve(_ context.Context, height uint64, _ []byte, _ bool) datypes.ResultRetrieve { + c.mu.RLock() + blobs, ok := c.index[height] + c.mu.RUnlock() + + if !ok || len(blobs) == 0 { + return datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusNotFound, + Message: "no blobs found at height in fiber index", + Height: height, + Timestamp: time.Now(), + }, + } + } + + ids := make([]datypes.ID, len(blobs)) + data := make([]datypes.Blob, len(blobs)) + for i, b := range blobs { + ids[i] = b.id + data[i] = b.data + } + + return datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusSuccess, + Height: height, + IDs: ids, + Timestamp: time.Now(), + }, + Data: data, + } +} + +// Get downloads specific blobs by their IDs from the Fiber network. +// Each ID is decoded to extract the Fiber blob ID and height, +// then downloaded via the Fiber client. +func (c *fiberDAClient) Get(ctx context.Context, ids []datypes.ID, _ []byte) ([]datypes.Blob, error) { + if len(ids) == 0 { + return nil, nil + } + + res := make([]datypes.Blob, 0, len(ids)) + for _, id := range ids { + height, blobID := splitFiberID(id) + if blobID == nil { + return nil, fmt.Errorf("invalid fiber blob id: %x", id) + } + + downloadCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + data, err := c.fiber.Download(downloadCtx, blobID, height) + cancel() + if err != nil { + return nil, fmt.Errorf("fiber download failed for blob %x: %w", blobID, err) + } + res = append(res, data) + } + + return res, nil +} + +// Subscribe returns a channel that emits SubscriptionEvents for new DA heights. +// Since the Fiber protocol doesn't have a native subscription mechanism, this +// implementation polls GetLatestHeight and emits events for heights present in +// the local submission index. Only heights indexed by this client are emitted. +func (c *fiberDAClient) Subscribe(ctx context.Context, _ []byte, _ bool) (<-chan datypes.SubscriptionEvent, error) { + out := make(chan datypes.SubscriptionEvent, 16) + + go func() { + defer close(out) + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + var lastHeight uint64 + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + heightCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + height, err := c.fiber.GetLatestHeight(heightCtx) + cancel() + if err != nil { + c.logger.Error().Err(err).Msg("failed to get latest fiber height during subscribe") + continue + } + if height <= lastHeight { + continue + } + + for h := lastHeight + 1; h <= height; h++ { + c.mu.RLock() + blobs, ok := c.index[h] + c.mu.RUnlock() + if !ok || len(blobs) == 0 { + continue + } + + blobData := make([][]byte, len(blobs)) + for i, b := range blobs { + blobData[i] = b.data + } + + select { + case out <- datypes.SubscriptionEvent{ + Height: h, + Timestamp: time.Now(), + Blobs: blobData, + }: + case <-ctx.Done(): + return + } + } + lastHeight = height + } + } + }() + + return out, nil +} + +// GetLatestDAHeight returns the latest block height from the Fiber network. +func (c *fiberDAClient) GetLatestDAHeight(ctx context.Context) (uint64, error) { + heightCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + defer cancel() + + height, err := c.fiber.GetLatestHeight(heightCtx) + if err != nil { + return 0, fmt.Errorf("failed to get latest fiber height: %w", err) + } + return height, nil +} + +// GetProofs returns the serialized payment promises as proofs for the given IDs. +// In the Fiber protocol, the signed payment promise from validators serves as +// proof of data availability. +func (c *fiberDAClient) GetProofs(_ context.Context, ids []datypes.ID, _ []byte) ([]datypes.Proof, error) { + if len(ids) == 0 { + return []datypes.Proof{}, nil + } + + proofs := make([]datypes.Proof, len(ids)) + for i, id := range ids { + height, _ := splitFiberID(id) + + c.mu.RLock() + blobs := c.index[height] + c.mu.RUnlock() + + for _, b := range blobs { + if string(b.id) == string(id) { + proofs[i] = b.promise + break + } + } + } + + return proofs, nil +} + +// Validate verifies that the proofs (payment promises) correspond to the given IDs. +// It checks that each proof was stored for the matching blob during submission. +func (c *fiberDAClient) Validate(_ context.Context, ids []datypes.ID, proofs []datypes.Proof, _ []byte) ([]bool, error) { + if len(ids) != len(proofs) { + return nil, errors.New("number of IDs and proofs must match") + } + if len(ids) == 0 { + return []bool{}, nil + } + + results := make([]bool, len(ids)) + for i, id := range ids { + height, _ := splitFiberID(id) + + c.mu.RLock() + blobs := c.index[height] + c.mu.RUnlock() + + for _, b := range blobs { + if string(b.id) == string(id) { + // A non-empty promise proof that matches the stored promise is valid. + results[i] = len(proofs[i]) > 0 && string(proofs[i]) == string(b.promise) + break + } + } + } + + return results, nil +} + +// GetHeaderNamespace returns the header namespace bytes. +func (c *fiberDAClient) GetHeaderNamespace() []byte { return c.namespaceBz } + +// GetDataNamespace returns the data namespace bytes. +func (c *fiberDAClient) GetDataNamespace() []byte { return c.dataNamespaceBz } + +// GetForcedInclusionNamespace returns the forced inclusion namespace bytes. +func (c *fiberDAClient) GetForcedInclusionNamespace() []byte { return c.forcedNamespaceBz } + +// HasForcedInclusionNamespace reports whether forced inclusion namespace is configured. +func (c *fiberDAClient) HasForcedInclusionNamespace() bool { return c.hasForcedNamespace } diff --git a/block/internal/da/fiber_client_test.go b/block/internal/da/fiber_client_test.go new file mode 100644 index 0000000000..d119b8a931 --- /dev/null +++ b/block/internal/da/fiber_client_test.go @@ -0,0 +1,518 @@ +package da + +import ( + "context" + "crypto/sha256" + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + datypes "github.com/evstack/ev-node/pkg/da/types" +) + +// mockFiberClient is a test mock for the FiberClient interface. +type mockFiberClient struct { + mu sync.Mutex + uploads map[string][]byte // blobID hex -> data + height uint64 + uploadErr error + callCount atomic.Uint64 +} + +func newMockFiberClient() *mockFiberClient { + return &mockFiberClient{ + uploads: make(map[string][]byte), + height: 100, + } +} + +func (m *mockFiberClient) Upload(_ context.Context, namespace, data []byte) (FiberUploadResult, error) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.uploadErr != nil { + return FiberUploadResult{}, m.uploadErr + } + + m.height++ + callIdx := m.callCount.Add(1) + + // Generate a unique blob ID for each upload + blobID := make([]byte, 33) + blobID[0] = 0 // version 0 + hash := sha256.Sum256(append([]byte{byte(callIdx)}, data...)) + copy(blobID[1:], hash[:]) + + m.uploads[string(blobID)] = data + + return FiberUploadResult{ + BlobID: blobID, + Height: m.height, + Promise: []byte("signed-promise-" + string(blobID)), + }, nil +} + +func (m *mockFiberClient) Download(_ context.Context, blobID []byte, _ uint64) ([]byte, error) { + m.mu.Lock() + defer m.mu.Unlock() + + data, ok := m.uploads[string(blobID)] + if !ok { + return nil, errors.New("blob not found") + } + return data, nil +} + +func (m *mockFiberClient) GetLatestHeight(_ context.Context) (uint64, error) { + m.mu.Lock() + defer m.mu.Unlock() + return m.height, nil +} + +func makeTestFiberClient(t *testing.T) (*mockFiberClient, FullClient) { + t.Helper() + mock := newMockFiberClient() + cl := NewFiberClient(FiberConfig{ + Client: mock, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + require.NotNil(t, cl) + return mock, cl +} + +func TestFiberClient_NewClient_Nil(t *testing.T) { + cl := NewFiberClient(FiberConfig{Client: nil}) + assert.Nil(t, cl) +} + +func TestFiberClient_Submit_Success(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("hello"), []byte("world")}, 0, ns, nil) + + require.Equal(t, datypes.StatusSuccess, res.Code) + require.Len(t, res.IDs, 2) + require.Equal(t, uint64(2), res.SubmittedCount) + require.Greater(t, res.Height, uint64(0)) + require.Equal(t, uint64(10), res.BlobSize) +} + +func TestFiberClient_Submit_SingleBlob(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("single")}, 0, ns, nil) + + require.Equal(t, datypes.StatusSuccess, res.Code) + require.Len(t, res.IDs, 1) + require.Equal(t, uint64(6), res.BlobSize) +} + +func TestFiberClient_Submit_EmptyData(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{}, 0, ns, nil) + + require.Equal(t, datypes.StatusSuccess, res.Code) + require.Empty(t, res.IDs) + require.Equal(t, uint64(0), res.SubmittedCount) +} + +func TestFiberClient_Submit_UploadError(t *testing.T) { + mock := newMockFiberClient() + mock.uploadErr = errors.New("upload failed") + cl := NewFiberClient(FiberConfig{ + Client: mock, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("data")}, 0, ns, nil) + + require.Equal(t, datypes.StatusError, res.Code) + require.Contains(t, res.Message, "fiber upload failed") +} + +func TestFiberClient_Submit_CanceledContext(t *testing.T) { + mock := newMockFiberClient() + mock.uploadErr = context.Canceled + cl := NewFiberClient(FiberConfig{ + Client: mock, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("data")}, 0, ns, nil) + + require.Equal(t, datypes.StatusContextCanceled, res.Code) +} + +func TestFiberClient_Submit_DeadlineExceeded(t *testing.T) { + mock := newMockFiberClient() + mock.uploadErr = context.DeadlineExceeded + cl := NewFiberClient(FiberConfig{ + Client: mock, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("data")}, 0, ns, nil) + + require.Equal(t, datypes.StatusContextDeadline, res.Code) +} + +func TestFiberClient_Submit_BlobTooLarge(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + largeBlob := make([]byte, 6*1024*1024) // 6MB > 5MB default max + res := cl.Submit(context.Background(), [][]byte{largeBlob}, 0, ns, nil) + + require.Equal(t, datypes.StatusTooBig, res.Code) +} + +func TestFiberClient_Retrieve_Success(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("hello")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + retrieveRes := cl.Retrieve(context.Background(), submitRes.Height, ns) + require.Equal(t, datypes.StatusSuccess, retrieveRes.Code) + require.Len(t, retrieveRes.Data, 1) + require.Equal(t, []byte("hello"), retrieveRes.Data[0]) + require.Equal(t, submitRes.IDs, retrieveRes.IDs) +} + +func TestFiberClient_RetrieveBlobs_Success(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("blob1"), []byte("blob2")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + retrieveRes := cl.RetrieveBlobs(context.Background(), submitRes.Height, ns) + require.Equal(t, datypes.StatusSuccess, retrieveRes.Code) + require.Len(t, retrieveRes.Data, 2) + require.Equal(t, []byte("blob1"), retrieveRes.Data[0]) + require.Equal(t, []byte("blob2"), retrieveRes.Data[1]) +} + +func TestFiberClient_Retrieve_NotFound(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + retrieveRes := cl.Retrieve(context.Background(), 9999, ns) + require.Equal(t, datypes.StatusNotFound, retrieveRes.Code) +} + +func TestFiberClient_Get_Success(t *testing.T) { + mock, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("getme")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + require.Len(t, submitRes.IDs, 1) + + blobs, err := cl.Get(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + require.Len(t, blobs, 1) + require.Equal(t, []byte("getme"), blobs[0]) + + _ = mock // mock stores the data for download +} + +func TestFiberClient_Get_EmptyIDs(t *testing.T) { + _, cl := makeTestFiberClient(t) + + blobs, err := cl.Get(context.Background(), nil, nil) + require.NoError(t, err) + require.Nil(t, blobs) +} + +func TestFiberClient_Get_InvalidID(t *testing.T) { + _, cl := makeTestFiberClient(t) + + _, err := cl.Get(context.Background(), []datypes.ID{[]byte{0x01}}, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "invalid fiber blob id") +} + +func TestFiberClient_Get_DownloadError(t *testing.T) { + _, cl := makeTestFiberClient(t) + + // Construct a valid-looking ID but with a blob ID that doesn't exist + fakeBlobID := make([]byte, 33) + id := makeFiberID(1, fakeBlobID) + + _, err := cl.Get(context.Background(), []datypes.ID{id}, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "fiber download failed") +} + +func TestFiberClient_GetLatestDAHeight(t *testing.T) { + mock, cl := makeTestFiberClient(t) + + height, err := cl.GetLatestDAHeight(context.Background()) + require.NoError(t, err) + require.Equal(t, mock.height, height) +} + +func TestFiberClient_GetProofs_Success(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("prooftest")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + proofs, err := cl.GetProofs(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + require.Len(t, proofs, 1) + require.NotEmpty(t, proofs[0]) // Should contain the promise +} + +func TestFiberClient_GetProofs_Empty(t *testing.T) { + _, cl := makeTestFiberClient(t) + + proofs, err := cl.GetProofs(context.Background(), nil, nil) + require.NoError(t, err) + require.Empty(t, proofs) +} + +func TestFiberClient_Validate_Success(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("validateme")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + proofs, err := cl.GetProofs(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + + results, err := cl.Validate(context.Background(), submitRes.IDs, proofs, ns) + require.NoError(t, err) + require.Len(t, results, 1) + require.True(t, results[0]) +} + +func TestFiberClient_Validate_MismatchedLengths(t *testing.T) { + _, cl := makeTestFiberClient(t) + + _, err := cl.Validate(context.Background(), make([]datypes.ID, 3), make([]datypes.Proof, 2), nil) + require.Error(t, err) + require.Contains(t, err.Error(), "must match") +} + +func TestFiberClient_Validate_Empty(t *testing.T) { + _, cl := makeTestFiberClient(t) + + results, err := cl.Validate(context.Background(), nil, nil, nil) + require.NoError(t, err) + require.Empty(t, results) +} + +func TestFiberClient_Validate_WrongProof(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("validatewrong")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + // Use a wrong proof + fakeProofs := []datypes.Proof{[]byte("wrong-proof")} + results, err := cl.Validate(context.Background(), submitRes.IDs, fakeProofs, ns) + require.NoError(t, err) + require.Len(t, results, 1) + require.False(t, results[0]) // Wrong proof should fail validation +} + +func TestFiberClient_Validate_EmptyProof(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("data")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + emptyProofs := []datypes.Proof{[]byte{}} + results, err := cl.Validate(context.Background(), submitRes.IDs, emptyProofs, ns) + require.NoError(t, err) + require.False(t, results[0]) +} + +func TestFiberClient_Namespaces(t *testing.T) { + cl := NewFiberClient(FiberConfig{ + Client: newMockFiberClient(), + Logger: zerolog.Nop(), + Namespace: "header-ns", + DataNamespace: "data-ns", + ForcedInclusionNamespace: "forced-ns", + }) + require.NotNil(t, cl) + + require.Equal(t, datypes.NamespaceFromString("header-ns").Bytes(), cl.GetHeaderNamespace()) + require.Equal(t, datypes.NamespaceFromString("data-ns").Bytes(), cl.GetDataNamespace()) + require.Equal(t, datypes.NamespaceFromString("forced-ns").Bytes(), cl.GetForcedInclusionNamespace()) + require.True(t, cl.HasForcedInclusionNamespace()) +} + +func TestFiberClient_NoForcedNamespace(t *testing.T) { + cl := NewFiberClient(FiberConfig{ + Client: newMockFiberClient(), + Logger: zerolog.Nop(), + Namespace: "header-ns", + DataNamespace: "data-ns", + }) + require.NotNil(t, cl) + + require.Nil(t, cl.GetForcedInclusionNamespace()) + require.False(t, cl.HasForcedInclusionNamespace()) +} + +func TestFiberClient_Subscribe(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch, err := cl.Subscribe(ctx, nil, false) + require.NoError(t, err) + require.NotNil(t, ch) + + // Submit a blob so the index has something + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("sub-data")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + // The subscribe goroutine polls and should emit the event for the submitted height. + // Since the mock height starts at 100 and upload increments to 101, + // the subscribe loop should eventually pick it up. + select { + case ev := <-ch: + require.Equal(t, submitRes.Height, ev.Height) + require.Len(t, ev.Blobs, 1) + require.Equal(t, []byte("sub-data"), ev.Blobs[0]) + case <-time.After(5 * time.Second): + t.Fatal("subscribe did not emit event within timeout") + } +} + +func TestFiberClient_Submit_MultipleBlobs(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + data := [][]byte{[]byte("first"), []byte("second"), []byte("third")} + res := cl.Submit(context.Background(), data, 0, ns, nil) + + require.Equal(t, datypes.StatusSuccess, res.Code) + require.Len(t, res.IDs, 3) + require.Equal(t, uint64(3), res.SubmittedCount) + + // Verify all blobs can be retrieved + retrieveRes := cl.Retrieve(context.Background(), res.Height, ns) + require.Equal(t, datypes.StatusSuccess, retrieveRes.Code) + require.Len(t, retrieveRes.Data, 3) + require.Equal(t, []byte("first"), retrieveRes.Data[0]) + require.Equal(t, []byte("second"), retrieveRes.Data[1]) + require.Equal(t, []byte("third"), retrieveRes.Data[2]) +} + +func TestFiberClient_SubmitAndDownload(t *testing.T) { + mock, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + data := []byte("download-test") + submitRes := cl.Submit(context.Background(), [][]byte{data}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + // The mock stores the data, so Get should be able to download it + blobs, err := cl.Get(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + require.Len(t, blobs, 1) + require.Equal(t, data, blobs[0]) + + _ = mock +} + +func TestMakeFiberID_RoundTrip(t *testing.T) { + blobID := make([]byte, 33) + blobID[0] = 1 + for i := 1; i < 33; i++ { + blobID[i] = byte(i) + } + + id := makeFiberID(42, blobID) + height, extractedBlobID := splitFiberID(id) + + require.Equal(t, uint64(42), height) + require.Equal(t, blobID, extractedBlobID) +} + +func TestSplitFiberID_Invalid(t *testing.T) { + height, blobID := splitFiberID([]byte{0x01, 0x02}) + require.Equal(t, uint64(0), height) + require.Nil(t, blobID) +} + +func TestFiberClient_DefaultTimeout(t *testing.T) { + cl := NewFiberClient(FiberConfig{ + Client: newMockFiberClient(), + Logger: zerolog.Nop(), + Namespace: "ns", + DataNamespace: "ns", + }) + require.NotNil(t, cl) + fc := cl.(*fiberDAClient) + require.Equal(t, 60*time.Second, fc.defaultTimeout) +} + +func TestFiberClient_FullSubmitRetrieveCycle(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + + // Submit + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("cycle-data")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + require.Len(t, submitRes.IDs, 1) + submittedHeight := submitRes.Height + + // Retrieve + retrieveRes := cl.Retrieve(context.Background(), submittedHeight, ns) + require.Equal(t, datypes.StatusSuccess, retrieveRes.Code) + require.Equal(t, []byte("cycle-data"), retrieveRes.Data[0]) + + // Get + blobs, err := cl.Get(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + require.Equal(t, []byte("cycle-data"), blobs[0]) + + // GetProofs + Validate + proofs, err := cl.GetProofs(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + require.NotEmpty(t, proofs[0]) + + valid, err := cl.Validate(context.Background(), submitRes.IDs, proofs, ns) + require.NoError(t, err) + require.True(t, valid[0]) +} diff --git a/block/public.go b/block/public.go index cc7691c299..e8ae4d14c5 100644 --- a/block/public.go +++ b/block/public.go @@ -63,6 +63,28 @@ func NewDAClient( return base } +// NewFiberDAClient creates a new DA client backed by the Fiber protocol. +// The fiberClient parameter must implement the da.FiberClient interface. +// The returned client implements both DAClient and DAVerifier interfaces. +func NewFiberDAClient( + fiberClient da.FiberClient, + config config.Config, + logger zerolog.Logger, +) FullDAClient { + base := da.NewFiberClient(da.FiberConfig{ + Client: fiberClient, + Logger: logger, + DefaultTimeout: config.DA.RequestTimeout.Duration, + Namespace: config.DA.GetNamespace(), + DataNamespace: config.DA.GetDataNamespace(), + ForcedInclusionNamespace: config.DA.GetForcedInclusionNamespace(), + }) + if config.Instrumentation.IsTracingEnabled() { + return da.WithTracingClient(base) + } + return base +} + // Exported errors used by the sequencers var ( // ErrForceInclusionNotConfigured is returned when force inclusion is not configured. diff --git a/pkg/config/config.go b/pkg/config/config.go index 09e85f3e20..02c1338279 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -92,6 +92,21 @@ const ( // FlagDAStartHeight is a flag for forcing the DA retrieval height to start from a specific height FlagDAStartHeight = FlagPrefixEvnode + "da.start_height" + // Fiber DA configuration flags + + // FlagDAFiberEnabled enables the Fiber DA client instead of the default JSON-RPC blob client + FlagDAFiberEnabled = FlagPrefixEvnode + "da.fiber.enabled" + // FlagDAFiberStateAddress is the gRPC address of the celestia-app node for Fiber state queries + FlagDAFiberStateAddress = FlagPrefixEvnode + "da.fiber.state_address" + // FlagDAFiberKeyringPath is the path to the keyring directory for Fiber payment promise signing + FlagDAFiberKeyringPath = FlagPrefixEvnode + "da.fiber.keyring_path" + // FlagDAFiberKeyName is the key name in the keyring to use for signing payment promises + FlagDAFiberKeyName = FlagPrefixEvnode + "da.fiber.key_name" + // FlagDAFiberUploadConcurrency limits concurrent Fiber uploads to validators + FlagDAFiberUploadConcurrency = FlagPrefixEvnode + "da.fiber.upload_concurrency" + // FlagDAFiberDownloadConcurrency limits concurrent Fiber downloads from validators + FlagDAFiberDownloadConcurrency = FlagPrefixEvnode + "da.fiber.download_concurrency" + // P2P configuration flags // FlagP2PListenAddress is a flag for specifying the P2P listen address @@ -258,6 +273,37 @@ type DAConfig struct { BatchSizeThreshold float64 `mapstructure:"batch_size_threshold" yaml:"batch_size_threshold" comment:"Minimum blob size threshold (as fraction of max blob size, 0.0-1.0) before submitting. Only applies to 'size' and 'adaptive' strategies. Example: 0.8 means wait until batch is 80% full. Default: 0.8."` BatchMaxDelay DurationWrapper `mapstructure:"batch_max_delay" yaml:"batch_max_delay" comment:"Maximum time to wait before submitting a batch regardless of size. Applies to 'time' and 'adaptive' strategies. Lower values reduce latency but may increase costs. Examples: \"6s\", \"12s\", \"30s\". Default: DA BlockTime."` BatchMinItems uint64 `mapstructure:"batch_min_items" yaml:"batch_min_items" comment:"Minimum number of items (headers or data) to accumulate before considering submission. Helps avoid submitting single items when more are expected soon. Default: 1."` + + // Fiber DA client configuration + Fiber FiberDAConfig `mapstructure:"fiber" yaml:"fiber"` +} + +// FiberDAConfig contains configuration for the Fiber DA client. +// When Enabled is true, the Fiber client is used instead of the default +// JSON-RPC blob client for DA operations. +type FiberDAConfig struct { + // Enabled switches the DA backend from the default JSON-RPC blob client + // to the Fiber protocol client. + Enabled bool `mapstructure:"enabled" yaml:"enabled" comment:"Enable the Fiber DA client for direct validator communication instead of the default JSON-RPC blob client"` + // StateAddress is the gRPC address of the celestia-app node used for + // state queries (validator set, chain ID, promise verification). + StateAddress string `mapstructure:"state_address" yaml:"state_address" comment:"gRPC address of the celestia-app node for Fiber state queries (host:port)"` + // KeyringPath is the directory path containing the keyring for signing + // Fiber payment promises. + KeyringPath string `mapstructure:"keyring_path" yaml:"keyring_path" comment:"Path to the keyring directory for Fiber payment promise signing"` + // KeyName is the name of the key in the keyring to use for signing. + KeyName string `mapstructure:"key_name" yaml:"key_name" comment:"Name of the key in the keyring to use for signing Fiber payment promises"` + // UploadConcurrency limits the number of concurrent upload connections + // to validators. + UploadConcurrency int `mapstructure:"upload_concurrency" yaml:"upload_concurrency" comment:"Maximum number of concurrent upload connections to Fiber validators"` + // DownloadConcurrency limits the number of concurrent download connections + // from validators. + DownloadConcurrency int `mapstructure:"download_concurrency" yaml:"download_concurrency" comment:"Maximum number of concurrent download connections from Fiber validators"` +} + +// IsFiberEnabled returns true if the Fiber DA client is configured and enabled. +func (d *DAConfig) IsFiberEnabled() bool { + return d.Fiber.Enabled } // GetNamespace returns the namespace for header submissions. @@ -602,6 +648,14 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Uint64(FlagDAStartHeight, def.DA.StartHeight, "force DA retrieval to start from a specific height (0 for disabled)") cmd.Flags().MarkHidden(FlagDAStartHeight) + // Fiber DA configuration flags + cmd.Flags().Bool(FlagDAFiberEnabled, def.DA.Fiber.Enabled, "enable the Fiber DA client for direct validator communication") + cmd.Flags().String(FlagDAFiberStateAddress, def.DA.Fiber.StateAddress, "gRPC address of the celestia-app node for Fiber state queries (host:port)") + cmd.Flags().String(FlagDAFiberKeyringPath, def.DA.Fiber.KeyringPath, "path to the keyring directory for Fiber payment promise signing") + cmd.Flags().String(FlagDAFiberKeyName, def.DA.Fiber.KeyName, "name of the key in the keyring for signing Fiber payment promises") + cmd.Flags().Int(FlagDAFiberUploadConcurrency, def.DA.Fiber.UploadConcurrency, "maximum concurrent uploads to Fiber validators") + cmd.Flags().Int(FlagDAFiberDownloadConcurrency, def.DA.Fiber.DownloadConcurrency, "maximum concurrent downloads from Fiber validators") + // P2P configuration flags cmd.Flags().String(FlagP2PListenAddress, def.P2P.ListenAddress, "P2P listen address (host:port)") cmd.Flags().String(FlagP2PPeers, def.P2P.Peers, "Comma separated list of seed nodes to connect to") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index cf556803c2..9ad6447306 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -78,6 +78,18 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagDAMempoolTTL, DefaultConfig().DA.MempoolTTL) assertFlagValue(t, flags, FlagDAMaxSubmitAttempts, DefaultConfig().DA.MaxSubmitAttempts) assertFlagValue(t, flags, FlagDARequestTimeout, DefaultConfig().DA.RequestTimeout.Duration) + assertFlagValue(t, flags, FlagDABatchingStrategy, DefaultConfig().DA.BatchingStrategy) + assertFlagValue(t, flags, FlagDABatchSizeThreshold, DefaultConfig().DA.BatchSizeThreshold) + assertFlagValue(t, flags, FlagDABatchMaxDelay, DefaultConfig().DA.BatchMaxDelay.Duration) + assertFlagValue(t, flags, FlagDABatchMinItems, DefaultConfig().DA.BatchMinItems) + + // DA Fiber flags + assertFlagValue(t, flags, FlagDAFiberEnabled, DefaultConfig().DA.Fiber.Enabled) + assertFlagValue(t, flags, FlagDAFiberStateAddress, DefaultConfig().DA.Fiber.StateAddress) + assertFlagValue(t, flags, FlagDAFiberKeyringPath, DefaultConfig().DA.Fiber.KeyringPath) + assertFlagValue(t, flags, FlagDAFiberKeyName, DefaultConfig().DA.Fiber.KeyName) + assertFlagValue(t, flags, FlagDAFiberUploadConcurrency, DefaultConfig().DA.Fiber.UploadConcurrency) + assertFlagValue(t, flags, FlagDAFiberDownloadConcurrency, DefaultConfig().DA.Fiber.DownloadConcurrency) // P2P flags assertFlagValue(t, flags, FlagP2PListenAddress, DefaultConfig().P2P.ListenAddress) @@ -140,7 +152,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagPruningInterval, DefaultConfig().Pruning.Interval.Duration) // Count the number of flags we're explicitly checking - expectedFlagCount := 78 // Update this number if you add more flag checks above + expectedFlagCount := 84 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 91fe68e3fc..2dd22b3e8a 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -83,6 +83,13 @@ func DefaultConfig() Config { BatchSizeThreshold: 0.8, BatchMaxDelay: DurationWrapper{0}, // 0 means use DA BlockTime BatchMinItems: 1, + Fiber: FiberDAConfig{ + Enabled: false, + StateAddress: "127.0.0.1:9090", + KeyName: "default-fibre", + UploadConcurrency: 100, + DownloadConcurrency: 34, + }, }, Instrumentation: DefaultInstrumentationConfig(), Log: LogConfig{ From ef44db26599c3dccee0b871cd3db10c1d51b58bb Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 14 Apr 2026 15:12:38 +0200 Subject: [PATCH 2/6] wip --- block/internal/da/fiber_client.go | 228 +++++++++++++------------ block/internal/da/fiber_client_test.go | 194 +++++++++++++++++---- 2 files changed, 282 insertions(+), 140 deletions(-) diff --git a/block/internal/da/fiber_client.go b/block/internal/da/fiber_client.go index a64047eb44..a903af43f4 100644 --- a/block/internal/da/fiber_client.go +++ b/block/internal/da/fiber_client.go @@ -14,52 +14,33 @@ import ( datypes "github.com/evstack/ev-node/pkg/da/types" ) -// FiberUploadResult contains the result of a Fiber upload operation. +const ( + fiberIndexRetainHeights = 4096 + fiberSubscribePollFreq = 1 * time.Second + fiberSubscribeChanSize = 16 +) + type FiberUploadResult struct { - // BlobID is the Fiber blob identifier (typically 33 bytes: 1 version + 32 commitment). - BlobID []byte - // Height is the validator set height at which the blob was uploaded. - Height uint64 - // Promise is the serialized signed payment promise from validators, - // which serves as proof of data availability. + BlobID []byte + Height uint64 Promise []byte } -// FiberClient defines the interface for a Fiber protocol client backend. -// Implementations wrap the celestia-app fibre.Client or equivalent. type FiberClient interface { - // Upload uploads data to the Fiber network under the given namespace. - // The namespace must be a valid share.Namespace (29 bytes). Upload(ctx context.Context, namespace, data []byte) (FiberUploadResult, error) - - // Download downloads and reconstructs data from the Fiber network. - // blobID is the Fiber blob identifier returned by Upload. - // height is the validator set height (0 to use the current head). Download(ctx context.Context, blobID []byte, height uint64) ([]byte, error) - - // GetLatestHeight returns the latest block height from the Fiber network. GetLatestHeight(ctx context.Context) (uint64, error) } -// FiberConfig holds configuration for the Fiber DA client. type FiberConfig struct { - // Client is the Fiber protocol client backend. - Client FiberClient - // Logger is the structured logger. - Logger zerolog.Logger - // DefaultTimeout is the default timeout for operations. - DefaultTimeout time.Duration - // Namespace is the header namespace string. - Namespace string - // DataNamespace is the data namespace string. - DataNamespace string - // ForcedInclusionNamespace is the forced inclusion namespace string. + Client FiberClient + Logger zerolog.Logger + DefaultTimeout time.Duration + Namespace string + DataNamespace string ForcedInclusionNamespace string } -// fiberDAClient adapts a FiberClient to the ev-node FullClient interface. -// It bridges the Fiber push/pull model to the block-based DA interface by -// maintaining a local index of submitted blobs for height-based retrieval. type fiberDAClient struct { fiber FiberClient logger zerolog.Logger @@ -69,21 +50,22 @@ type fiberDAClient struct { forcedNamespaceBz []byte hasForcedNamespace bool - mu sync.RWMutex - index map[uint64][]fiberIndexedBlob + mu sync.RWMutex + index map[uint64][]fiberIndexedBlob + indexTail uint64 + indexWindow uint64 } type fiberIndexedBlob struct { - id datypes.ID - data []byte - promise []byte - blobID []byte + id datypes.ID + namespace []byte + data []byte + promise []byte + blobID []byte } var _ FullClient = (*fiberDAClient)(nil) -// NewFiberClient creates a new Fiber DA client adapter. -// Returns nil if the Fiber client backend is not provided. func NewFiberClient(cfg FiberConfig) FullClient { if cfg.Client == nil { return nil @@ -107,11 +89,10 @@ func NewFiberClient(cfg FiberConfig) FullClient { forcedNamespaceBz: forcedBz, hasForcedNamespace: hasForced, index: make(map[uint64][]fiberIndexedBlob), + indexWindow: fiberIndexRetainHeights, } } -// makeFiberID constructs an ev-node DA ID from a Fiber height and blob ID. -// Format: 8 bytes LE height + blobID bytes (compatible with datypes.SplitID). func makeFiberID(height uint64, blobID []byte) datypes.ID { id := make([]byte, 8+len(blobID)) binary.LittleEndian.PutUint64(id, height) @@ -119,7 +100,6 @@ func makeFiberID(height uint64, blobID []byte) datypes.ID { return id } -// splitFiberID extracts the Fiber height and blob ID from an ev-node DA ID. func splitFiberID(id datypes.ID) (uint64, []byte) { if len(id) <= 8 { return 0, nil @@ -127,24 +107,25 @@ func splitFiberID(id datypes.ID) (uint64, []byte) { return binary.LittleEndian.Uint64(id[:8]), id[8:] } -// Submit uploads each data blob to the Fiber network. -// All blobs are uploaded individually, then indexed under a single canonical height -// (the height of the last upload) to satisfy the ev-node DA contract that all -// submitted blobs appear at the same height. +func (c *fiberDAClient) pruneIndexLocked() { + if c.indexWindow == 0 || c.indexTail == 0 || c.indexTail < c.indexWindow { + return + } + cutoff := c.indexTail - c.indexWindow + 1 + for h := range c.index { + if h < cutoff { + delete(c.index, h) + } + } +} + func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, namespace []byte, _ []byte) datypes.ResultSubmit { var blobSize uint64 for _, b := range data { blobSize += uint64(len(b)) } - type uploadResult struct { - blobID []byte - height uint64 - promise []byte - data []byte - } - - uploaded := make([]uploadResult, 0, len(data)) + uploaded := make([]fiberUploadResult, 0, len(data)) for i, raw := range data { if uint64(len(raw)) > common.DefaultMaxBlobSize { @@ -167,7 +148,13 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na case errors.Is(err, context.DeadlineExceeded): code = datypes.StatusContextDeadline } + c.logger.Error().Err(err).Int("blob_index", i).Msg("fiber upload failed") + + if len(uploaded) > 0 { + c.indexUploaded(uploaded, namespace) + } + return datypes.ResultSubmit{ BaseResult: datypes.BaseResult{ Code: code, @@ -179,7 +166,7 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na } } - uploaded = append(uploaded, uploadResult{ + uploaded = append(uploaded, fiberUploadResult{ blobID: result.BlobID, height: result.Height, promise: result.Promise, @@ -197,9 +184,30 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na } } - // Use the height of the last upload as the canonical submit height. - // Re-index all blobs under this single height so that Retrieve(height) - // returns all blobs from the same submit call. + ids := c.indexUploaded(uploaded, namespace) + + c.logger.Debug().Int("num_ids", len(ids)).Uint64("height", uploaded[len(uploaded)-1].height).Msg("fiber DA submission successful") + + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusSuccess, + IDs: ids, + SubmittedCount: uint64(len(ids)), + Height: uploaded[len(uploaded)-1].height, + BlobSize: blobSize, + Timestamp: time.Now(), + }, + } +} + +type fiberUploadResult struct { + blobID []byte + height uint64 + promise []byte + data []byte +} + +func (c *fiberDAClient) indexUploaded(uploaded []fiberUploadResult, namespace []byte) []datypes.ID { submitHeight := uploaded[len(uploaded)-1].height ids := make([]datypes.ID, len(uploaded)) @@ -208,45 +216,36 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na id := makeFiberID(submitHeight, u.blobID) ids[i] = id c.index[submitHeight] = append(c.index[submitHeight], fiberIndexedBlob{ - id: id, - data: u.data, - promise: u.promise, - blobID: u.blobID, + id: id, + namespace: namespace, + data: u.data, + promise: u.promise, + blobID: u.blobID, }) } + if submitHeight > c.indexTail { + c.indexTail = submitHeight + } + c.pruneIndexLocked() c.mu.Unlock() - c.logger.Debug().Int("num_ids", len(ids)).Uint64("height", submitHeight).Msg("fiber DA submission successful") - - return datypes.ResultSubmit{ - BaseResult: datypes.BaseResult{ - Code: datypes.StatusSuccess, - IDs: ids, - SubmittedCount: uint64(len(ids)), - Height: submitHeight, - BlobSize: blobSize, - Timestamp: time.Now(), - }, - } + return ids } -// Retrieve retrieves blobs from the Fiber network at the specified height and namespace. -// It first checks the local submission index, then falls back to downloading via blob IDs. func (c *fiberDAClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { return c.retrieve(ctx, height, namespace, true) } -// RetrieveBlobs retrieves blobs without blocking on timestamp resolution. func (c *fiberDAClient) RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { return c.retrieve(ctx, height, namespace, false) } -func (c *fiberDAClient) retrieve(_ context.Context, height uint64, _ []byte, _ bool) datypes.ResultRetrieve { +func (c *fiberDAClient) retrieve(_ context.Context, height uint64, namespace []byte, _ bool) datypes.ResultRetrieve { c.mu.RLock() - blobs, ok := c.index[height] + allBlobs, ok := c.index[height] c.mu.RUnlock() - if !ok || len(blobs) == 0 { + if !ok || len(allBlobs) == 0 { return datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{ Code: datypes.StatusNotFound, @@ -257,9 +256,27 @@ func (c *fiberDAClient) retrieve(_ context.Context, height uint64, _ []byte, _ b } } - ids := make([]datypes.ID, len(blobs)) - data := make([]datypes.Blob, len(blobs)) - for i, b := range blobs { + var matching []fiberIndexedBlob + for _, b := range allBlobs { + if nsEqual(b.namespace, namespace) { + matching = append(matching, b) + } + } + + if len(matching) == 0 { + return datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusNotFound, + Message: "no blobs found at height for given namespace in fiber index", + Height: height, + Timestamp: time.Now(), + }, + } + } + + ids := make([]datypes.ID, len(matching)) + data := make([]datypes.Blob, len(matching)) + for i, b := range matching { ids[i] = b.id data[i] = b.data } @@ -275,9 +292,18 @@ func (c *fiberDAClient) retrieve(_ context.Context, height uint64, _ []byte, _ b } } -// Get downloads specific blobs by their IDs from the Fiber network. -// Each ID is decoded to extract the Fiber blob ID and height, -// then downloaded via the Fiber client. +func nsEqual(a, b []byte) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + func (c *fiberDAClient) Get(ctx context.Context, ids []datypes.ID, _ []byte) ([]datypes.Blob, error) { if len(ids) == 0 { return nil, nil @@ -302,17 +328,13 @@ func (c *fiberDAClient) Get(ctx context.Context, ids []datypes.ID, _ []byte) ([] return res, nil } -// Subscribe returns a channel that emits SubscriptionEvents for new DA heights. -// Since the Fiber protocol doesn't have a native subscription mechanism, this -// implementation polls GetLatestHeight and emits events for heights present in -// the local submission index. Only heights indexed by this client are emitted. func (c *fiberDAClient) Subscribe(ctx context.Context, _ []byte, _ bool) (<-chan datypes.SubscriptionEvent, error) { - out := make(chan datypes.SubscriptionEvent, 16) + out := make(chan datypes.SubscriptionEvent, fiberSubscribeChanSize) go func() { defer close(out) - ticker := time.NewTicker(1 * time.Second) + ticker := time.NewTicker(fiberSubscribePollFreq) defer ticker.Stop() var lastHeight uint64 @@ -364,7 +386,6 @@ func (c *fiberDAClient) Subscribe(ctx context.Context, _ []byte, _ bool) (<-chan return out, nil } -// GetLatestDAHeight returns the latest block height from the Fiber network. func (c *fiberDAClient) GetLatestDAHeight(ctx context.Context) (uint64, error) { heightCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) defer cancel() @@ -376,9 +397,6 @@ func (c *fiberDAClient) GetLatestDAHeight(ctx context.Context) (uint64, error) { return height, nil } -// GetProofs returns the serialized payment promises as proofs for the given IDs. -// In the Fiber protocol, the signed payment promise from validators serves as -// proof of data availability. func (c *fiberDAClient) GetProofs(_ context.Context, ids []datypes.ID, _ []byte) ([]datypes.Proof, error) { if len(ids) == 0 { return []datypes.Proof{}, nil @@ -403,8 +421,6 @@ func (c *fiberDAClient) GetProofs(_ context.Context, ids []datypes.ID, _ []byte) return proofs, nil } -// Validate verifies that the proofs (payment promises) correspond to the given IDs. -// It checks that each proof was stored for the matching blob during submission. func (c *fiberDAClient) Validate(_ context.Context, ids []datypes.ID, proofs []datypes.Proof, _ []byte) ([]bool, error) { if len(ids) != len(proofs) { return nil, errors.New("number of IDs and proofs must match") @@ -423,7 +439,6 @@ func (c *fiberDAClient) Validate(_ context.Context, ids []datypes.ID, proofs []d for _, b := range blobs { if string(b.id) == string(id) { - // A non-empty promise proof that matches the stored promise is valid. results[i] = len(proofs[i]) > 0 && string(proofs[i]) == string(b.promise) break } @@ -433,14 +448,7 @@ func (c *fiberDAClient) Validate(_ context.Context, ids []datypes.ID, proofs []d return results, nil } -// GetHeaderNamespace returns the header namespace bytes. -func (c *fiberDAClient) GetHeaderNamespace() []byte { return c.namespaceBz } - -// GetDataNamespace returns the data namespace bytes. -func (c *fiberDAClient) GetDataNamespace() []byte { return c.dataNamespaceBz } - -// GetForcedInclusionNamespace returns the forced inclusion namespace bytes. +func (c *fiberDAClient) GetHeaderNamespace() []byte { return c.namespaceBz } +func (c *fiberDAClient) GetDataNamespace() []byte { return c.dataNamespaceBz } func (c *fiberDAClient) GetForcedInclusionNamespace() []byte { return c.forcedNamespaceBz } - -// HasForcedInclusionNamespace reports whether forced inclusion namespace is configured. -func (c *fiberDAClient) HasForcedInclusionNamespace() bool { return c.hasForcedNamespace } +func (c *fiberDAClient) HasForcedInclusionNamespace() bool { return c.hasForcedNamespace } diff --git a/block/internal/da/fiber_client_test.go b/block/internal/da/fiber_client_test.go index d119b8a931..ae4fd5af84 100644 --- a/block/internal/da/fiber_client_test.go +++ b/block/internal/da/fiber_client_test.go @@ -16,10 +16,9 @@ import ( datypes "github.com/evstack/ev-node/pkg/da/types" ) -// mockFiberClient is a test mock for the FiberClient interface. type mockFiberClient struct { mu sync.Mutex - uploads map[string][]byte // blobID hex -> data + uploads map[string][]byte height uint64 uploadErr error callCount atomic.Uint64 @@ -43,9 +42,8 @@ func (m *mockFiberClient) Upload(_ context.Context, namespace, data []byte) (Fib m.height++ callIdx := m.callCount.Add(1) - // Generate a unique blob ID for each upload blobID := make([]byte, 33) - blobID[0] = 0 // version 0 + blobID[0] = 0 hash := sha256.Sum256(append([]byte{byte(callIdx)}, data...)) copy(blobID[1:], hash[:]) @@ -185,12 +183,93 @@ func TestFiberClient_Submit_BlobTooLarge(t *testing.T) { _, cl := makeTestFiberClient(t) ns := datypes.NamespaceFromString("test-ns").Bytes() - largeBlob := make([]byte, 6*1024*1024) // 6MB > 5MB default max + largeBlob := make([]byte, 6*1024*1024) res := cl.Submit(context.Background(), [][]byte{largeBlob}, 0, ns, nil) require.Equal(t, datypes.StatusTooBig, res.Code) } +func TestFiberClient_Submit_PartialFailureIndexesUploaded(t *testing.T) { + mock := newMockFiberClient() + cl := NewFiberClient(FiberConfig{ + Client: mock, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + + res1 := cl.Submit(context.Background(), [][]byte{[]byte("first")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, res1.Code) + + mock.uploadErr = errors.New("transient failure") + res2 := cl.Submit(context.Background(), [][]byte{[]byte("second")}, 0, ns, nil) + require.Equal(t, datypes.StatusError, res2.Code) + + mock.uploadErr = nil + res3 := cl.Submit(context.Background(), [][]byte{[]byte("third")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, res3.Code) + + retrieveRes := cl.Retrieve(context.Background(), res1.Height, ns) + require.Equal(t, datypes.StatusSuccess, retrieveRes.Code) + require.Len(t, retrieveRes.Data, 1) + require.Equal(t, []byte("first"), retrieveRes.Data[0]) + + retrieveRes3 := cl.Retrieve(context.Background(), res3.Height, ns) + require.Equal(t, datypes.StatusSuccess, retrieveRes3.Code) + require.Equal(t, []byte("third"), retrieveRes3.Data[0]) +} + +func TestFiberClient_Submit_PartialFailureOnSecondBlob(t *testing.T) { + failingMock := &failingOnSecondBlobFiberClient{inner: newMockFiberClient()} + cl := NewFiberClient(FiberConfig{ + Client: failingMock, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + + res := cl.Submit(context.Background(), [][]byte{[]byte("first"), []byte("second"), []byte("third")}, 0, ns, nil) + require.Equal(t, datypes.StatusError, res.Code) + require.Contains(t, res.Message, "blob 1") + require.Equal(t, uint64(1), res.SubmittedCount) + + fc := cl.(*fiberDAClient) + fc.mu.RLock() + totalBlobs := 0 + for _, blobs := range fc.index { + totalBlobs += len(blobs) + } + fc.mu.RUnlock() + require.Equal(t, 1, totalBlobs) +} + +type failingOnSecondBlobFiberClient struct { + inner *mockFiberClient + callCount atomic.Uint64 +} + +func (f *failingOnSecondBlobFiberClient) Upload(ctx context.Context, namespace, data []byte) (FiberUploadResult, error) { + idx := f.callCount.Add(1) + if idx == 2 { + return FiberUploadResult{}, errors.New("second blob fails") + } + return f.inner.Upload(ctx, namespace, data) +} + +func (f *failingOnSecondBlobFiberClient) Download(ctx context.Context, blobID []byte, height uint64) ([]byte, error) { + return f.inner.Download(ctx, blobID, height) +} + +func (f *failingOnSecondBlobFiberClient) GetLatestHeight(ctx context.Context) (uint64, error) { + return f.inner.GetLatestHeight(ctx) +} + func TestFiberClient_Retrieve_Success(t *testing.T) { _, cl := makeTestFiberClient(t) @@ -227,8 +306,35 @@ func TestFiberClient_Retrieve_NotFound(t *testing.T) { require.Equal(t, datypes.StatusNotFound, retrieveRes.Code) } +func TestFiberClient_Retrieve_NamespaceFiltering(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns1 := datypes.NamespaceFromString("ns-a").Bytes() + ns2 := datypes.NamespaceFromString("ns-b").Bytes() + + res1 := cl.Submit(context.Background(), [][]byte{[]byte("alpha")}, 0, ns1, nil) + require.Equal(t, datypes.StatusSuccess, res1.Code) + + res2 := cl.Submit(context.Background(), [][]byte{[]byte("beta")}, 0, ns2, nil) + require.Equal(t, datypes.StatusSuccess, res2.Code) + + // Retrieving at ns1's height with ns1 returns the blob + rr1 := cl.Retrieve(context.Background(), res1.Height, ns1) + require.Equal(t, datypes.StatusSuccess, rr1.Code) + require.Equal(t, []byte("alpha"), rr1.Data[0]) + + // Retrieving at ns1's height with ns2 returns not found (different height) + rr2 := cl.Retrieve(context.Background(), res1.Height, ns2) + require.Equal(t, datypes.StatusNotFound, rr2.Code) + + // Retrieving at ns2's height with ns2 returns the blob + rr3 := cl.Retrieve(context.Background(), res2.Height, ns2) + require.Equal(t, datypes.StatusSuccess, rr3.Code) + require.Equal(t, []byte("beta"), rr3.Data[0]) +} + func TestFiberClient_Get_Success(t *testing.T) { - mock, cl := makeTestFiberClient(t) + _, cl := makeTestFiberClient(t) ns := datypes.NamespaceFromString("test-ns").Bytes() submitRes := cl.Submit(context.Background(), [][]byte{[]byte("getme")}, 0, ns, nil) @@ -239,8 +345,6 @@ func TestFiberClient_Get_Success(t *testing.T) { require.NoError(t, err) require.Len(t, blobs, 1) require.Equal(t, []byte("getme"), blobs[0]) - - _ = mock // mock stores the data for download } func TestFiberClient_Get_EmptyIDs(t *testing.T) { @@ -262,7 +366,6 @@ func TestFiberClient_Get_InvalidID(t *testing.T) { func TestFiberClient_Get_DownloadError(t *testing.T) { _, cl := makeTestFiberClient(t) - // Construct a valid-looking ID but with a blob ID that doesn't exist fakeBlobID := make([]byte, 33) id := makeFiberID(1, fakeBlobID) @@ -289,7 +392,7 @@ func TestFiberClient_GetProofs_Success(t *testing.T) { proofs, err := cl.GetProofs(context.Background(), submitRes.IDs, ns) require.NoError(t, err) require.Len(t, proofs, 1) - require.NotEmpty(t, proofs[0]) // Should contain the promise + require.NotEmpty(t, proofs[0]) } func TestFiberClient_GetProofs_Empty(t *testing.T) { @@ -339,12 +442,11 @@ func TestFiberClient_Validate_WrongProof(t *testing.T) { submitRes := cl.Submit(context.Background(), [][]byte{[]byte("validatewrong")}, 0, ns, nil) require.Equal(t, datypes.StatusSuccess, submitRes.Code) - // Use a wrong proof fakeProofs := []datypes.Proof{[]byte("wrong-proof")} results, err := cl.Validate(context.Background(), submitRes.IDs, fakeProofs, ns) require.NoError(t, err) require.Len(t, results, 1) - require.False(t, results[0]) // Wrong proof should fail validation + require.False(t, results[0]) } func TestFiberClient_Validate_EmptyProof(t *testing.T) { @@ -360,13 +462,24 @@ func TestFiberClient_Validate_EmptyProof(t *testing.T) { require.False(t, results[0]) } +func TestFiberClient_Validate_UnknownID(t *testing.T) { + _, cl := makeTestFiberClient(t) + + fakeID := makeFiberID(99999, make([]byte, 33)) + proofs := []datypes.Proof{[]byte("some-proof")} + results, err := cl.Validate(context.Background(), []datypes.ID{fakeID}, proofs, nil) + require.NoError(t, err) + require.Len(t, results, 1) + require.False(t, results[0]) +} + func TestFiberClient_Namespaces(t *testing.T) { cl := NewFiberClient(FiberConfig{ - Client: newMockFiberClient(), - Logger: zerolog.Nop(), - Namespace: "header-ns", - DataNamespace: "data-ns", - ForcedInclusionNamespace: "forced-ns", + Client: newMockFiberClient(), + Logger: zerolog.Nop(), + Namespace: "header-ns", + DataNamespace: "data-ns", + ForcedInclusionNamespace: "forced-ns", }) require.NotNil(t, cl) @@ -399,14 +512,10 @@ func TestFiberClient_Subscribe(t *testing.T) { require.NoError(t, err) require.NotNil(t, ch) - // Submit a blob so the index has something ns := datypes.NamespaceFromString("test-ns").Bytes() submitRes := cl.Submit(context.Background(), [][]byte{[]byte("sub-data")}, 0, ns, nil) require.Equal(t, datypes.StatusSuccess, submitRes.Code) - // The subscribe goroutine polls and should emit the event for the submitted height. - // Since the mock height starts at 100 and upload increments to 101, - // the subscribe loop should eventually pick it up. select { case ev := <-ch: require.Equal(t, submitRes.Height, ev.Height) @@ -428,7 +537,6 @@ func TestFiberClient_Submit_MultipleBlobs(t *testing.T) { require.Len(t, res.IDs, 3) require.Equal(t, uint64(3), res.SubmittedCount) - // Verify all blobs can be retrieved retrieveRes := cl.Retrieve(context.Background(), res.Height, ns) require.Equal(t, datypes.StatusSuccess, retrieveRes.Code) require.Len(t, retrieveRes.Data, 3) @@ -438,20 +546,17 @@ func TestFiberClient_Submit_MultipleBlobs(t *testing.T) { } func TestFiberClient_SubmitAndDownload(t *testing.T) { - mock, cl := makeTestFiberClient(t) + _, cl := makeTestFiberClient(t) ns := datypes.NamespaceFromString("test-ns").Bytes() data := []byte("download-test") submitRes := cl.Submit(context.Background(), [][]byte{data}, 0, ns, nil) require.Equal(t, datypes.StatusSuccess, submitRes.Code) - // The mock stores the data, so Get should be able to download it blobs, err := cl.Get(context.Background(), submitRes.IDs, ns) require.NoError(t, err) require.Len(t, blobs, 1) require.Equal(t, data, blobs[0]) - - _ = mock } func TestMakeFiberID_RoundTrip(t *testing.T) { @@ -491,23 +596,19 @@ func TestFiberClient_FullSubmitRetrieveCycle(t *testing.T) { ns := datypes.NamespaceFromString("test-ns").Bytes() - // Submit submitRes := cl.Submit(context.Background(), [][]byte{[]byte("cycle-data")}, 0, ns, nil) require.Equal(t, datypes.StatusSuccess, submitRes.Code) require.Len(t, submitRes.IDs, 1) submittedHeight := submitRes.Height - // Retrieve retrieveRes := cl.Retrieve(context.Background(), submittedHeight, ns) require.Equal(t, datypes.StatusSuccess, retrieveRes.Code) require.Equal(t, []byte("cycle-data"), retrieveRes.Data[0]) - // Get blobs, err := cl.Get(context.Background(), submitRes.IDs, ns) require.NoError(t, err) require.Equal(t, []byte("cycle-data"), blobs[0]) - // GetProofs + Validate proofs, err := cl.GetProofs(context.Background(), submitRes.IDs, ns) require.NoError(t, err) require.NotEmpty(t, proofs[0]) @@ -516,3 +617,36 @@ func TestFiberClient_FullSubmitRetrieveCycle(t *testing.T) { require.NoError(t, err) require.True(t, valid[0]) } + +func TestFiberClient_IndexPruning(t *testing.T) { + mock := newMockFiberClient() + cl := NewFiberClient(FiberConfig{ + Client: mock, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + require.NotNil(t, cl) + fc := cl.(*fiberDAClient) + fc.indexWindow = 10 + + ns := datypes.NamespaceFromString("test-ns").Bytes() + + var lastHeight uint64 + for i := 0; i < 20; i++ { + res := cl.Submit(context.Background(), [][]byte{[]byte("data")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, res.Code) + lastHeight = res.Height + } + + fc.mu.RLock() + indexLen := len(fc.index) + _, hasOld := fc.index[lastHeight-20] + _, hasRecent := fc.index[lastHeight] + fc.mu.RUnlock() + + require.True(t, hasRecent, "most recent height should be in index") + require.False(t, hasOld, "old height should have been pruned") + require.LessOrEqual(t, indexLen, 10, "index should be bounded by window") +} From f3356c6043ed8f1b45fc87df1a408b5c50af6075 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 14 Apr 2026 15:59:24 +0200 Subject: [PATCH 3/6] reduce alloc --- pkg/sequencers/solo/sequencer.go | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/pkg/sequencers/solo/sequencer.go b/pkg/sequencers/solo/sequencer.go index 0fcae9f31c..86fa08d45d 100644 --- a/pkg/sequencers/solo/sequencer.go +++ b/pkg/sequencers/solo/sequencer.go @@ -16,6 +16,12 @@ import ( var ErrInvalidID = errors.New("invalid chain id") +var ( + emptyBatch = &coresequencer.Batch{} + submitBatchResp = &coresequencer.SubmitBatchTxsResponse{} + verifyBatchOKResp = &coresequencer.VerifyBatchResponse{Status: true} +) + var _ coresequencer.Sequencer = (*SoloSequencer)(nil) // SoloSequencer is a single-leader sequencer without forced inclusion @@ -55,14 +61,14 @@ func (s *SoloSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.Su } if req.Batch == nil || len(req.Batch.Transactions) == 0 { - return &coresequencer.SubmitBatchTxsResponse{}, nil + return submitBatchResp, nil } s.mu.Lock() defer s.mu.Unlock() s.queue = append(s.queue, req.Batch.Transactions...) - return &coresequencer.SubmitBatchTxsResponse{}, nil + return submitBatchResp, nil } func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { @@ -77,7 +83,7 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN if len(txs) == 0 { return &coresequencer.GetNextBatchResponse{ - Batch: &coresequencer.Batch{}, + Batch: emptyBatch, Timestamp: time.Now().UTC(), BatchData: req.LastBatchData, }, nil @@ -94,21 +100,22 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN filterStatuses, err := s.executor.FilterTxs(ctx, txs, req.MaxBytes, maxGas, false) if err != nil { s.logger.Warn().Err(err).Msg("failed to filter transactions, proceeding with unfiltered") - filterStatuses = make([]execution.FilterStatus, len(txs)) - for i := range filterStatuses { - filterStatuses[i] = execution.FilterOK - } + return &coresequencer.GetNextBatchResponse{ + Batch: &coresequencer.Batch{Transactions: txs}, + Timestamp: time.Now().UTC(), + BatchData: req.LastBatchData, + }, nil } - var validTxs [][]byte + write := 0 var postponedTxs [][]byte for i, status := range filterStatuses { switch status { case execution.FilterOK: - validTxs = append(validTxs, txs[i]) + txs[write] = txs[i] + write++ case execution.FilterPostpone: postponedTxs = append(postponedTxs, txs[i]) - case execution.FilterRemove: } } @@ -119,7 +126,7 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN } return &coresequencer.GetNextBatchResponse{ - Batch: &coresequencer.Batch{Transactions: validTxs}, + Batch: &coresequencer.Batch{Transactions: txs[:write]}, Timestamp: time.Now().UTC(), BatchData: req.LastBatchData, }, nil @@ -130,7 +137,7 @@ func (s *SoloSequencer) VerifyBatch(ctx context.Context, req coresequencer.Verif return nil, ErrInvalidID } - return &coresequencer.VerifyBatchResponse{Status: true}, nil + return verifyBatchOKResp, nil } func (s *SoloSequencer) SetDAHeight(height uint64) { From 72786857ca383e823fdb25c7de16b73fd47094e0 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 15 Apr 2026 13:54:24 +0200 Subject: [PATCH 4/6] lint --- block/public.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/block/public.go b/block/public.go index e8ae4d14c5..d056394270 100644 --- a/block/public.go +++ b/block/public.go @@ -72,12 +72,12 @@ func NewFiberDAClient( logger zerolog.Logger, ) FullDAClient { base := da.NewFiberClient(da.FiberConfig{ - Client: fiberClient, - Logger: logger, - DefaultTimeout: config.DA.RequestTimeout.Duration, - Namespace: config.DA.GetNamespace(), - DataNamespace: config.DA.GetDataNamespace(), - ForcedInclusionNamespace: config.DA.GetForcedInclusionNamespace(), + Client: fiberClient, + Logger: logger, + DefaultTimeout: config.DA.RequestTimeout.Duration, + Namespace: config.DA.GetNamespace(), + DataNamespace: config.DA.GetDataNamespace(), + ForcedInclusionNamespace: config.DA.GetForcedInclusionNamespace(), }) if config.Instrumentation.IsTracingEnabled() { return da.WithTracingClient(base) From 7109b0ef55767f8a3dbb5851e7b36ef75576d459 Mon Sep 17 00:00:00 2001 From: Vlad Krinitsyn Date: Wed, 15 Apr 2026 18:54:02 +0200 Subject: [PATCH 5/6] feat(da): add Go DA interface and in-memory mock for fibre (#3256) Adds a fibremock package with: - DA interface (Upload/Download/Listen) matching the fibre gRPC service - In-memory MockDA implementation with LRU eviction and configurable retention - Tests covering all paths Migrated from celestiaorg/x402-risotto#16 as-is for integration. --- pkg/da/fibremock/fibre.go | 79 +++++++++++ pkg/da/fibremock/mock.go | 246 ++++++++++++++++++++++++++++++++++ pkg/da/fibremock/mock_test.go | 197 +++++++++++++++++++++++++++ 3 files changed, 522 insertions(+) create mode 100644 pkg/da/fibremock/fibre.go create mode 100644 pkg/da/fibremock/mock.go create mode 100644 pkg/da/fibremock/mock_test.go diff --git a/pkg/da/fibremock/fibre.go b/pkg/da/fibremock/fibre.go new file mode 100644 index 0000000000..986c27cc3b --- /dev/null +++ b/pkg/da/fibremock/fibre.go @@ -0,0 +1,79 @@ +// Package fibre provides a Go client interface and mock implementation for the +// Fibre DA (Data Availability) gRPC service. +// +// # Design Assumptions +// +// - The sequencer trusts the encoder to eventually confirm blob inclusion. +// Upload returns after the blob is uploaded and the PFF transaction is +// broadcast, NOT after on-chain confirmation. This keeps the sequencer's +// write path fast (~2s per 128 MB blob). +// +// - Callers are expected to batch/buffer their data into blobs sized for the +// protocol maximum (128 MiB - 5 byte header = 134,217,723 bytes). +// The interface accepts arbitrary sizes but the implementation may batch +// or reject oversized blobs. +// +// - Confirmation/finality is intentionally omitted from the initial API. +// The sequencer does not need it; the read path (Listen + Download) is +// sufficient for full nodes. A Status or Confirm RPC can be added later +// if needed without breaking existing callers. +// +// - Blob ordering is encoded in the blob data itself by the caller. +// The interface does not impose or guarantee ordering. +// +// - The interface is the same whether the encoder runs in-process or as an +// external gRPC service. For in-process use, call the mock or real +// implementation directly; for external use, connect via gRPC. +package fibremock + +import ( + "context" + "time" +) + +// BlobID uniquely identifies an uploaded blob (version byte + 32-byte commitment). +type BlobID []byte + +// UploadResult is returned by Upload after the blob is accepted. +type UploadResult struct { + // BlobID uniquely identifies the uploaded blob. + BlobID BlobID + // ExpiresAt is when the blob will be pruned from the DA network. + // Consumers must download before this time. + ExpiresAt time.Time +} + +// BlobEvent is delivered via Listen when a blob is confirmed on-chain. +type BlobEvent struct { + // BlobID of the confirmed blob. + BlobID BlobID + // Height is the chain height at which the blob was confirmed. + Height uint64 + // DataSize is the size of the original blob data in bytes (from the PFF). + // This allows full nodes to know the size before downloading. + DataSize uint64 +} + +// DA is the interface for interacting with the Fibre data availability layer. +// +// Implementations include: +// - MockDA: in-memory mock for testing +// - (future) gRPC client wrapping the Fibre service +// - (future) in-process encoder using fibre.Client directly +type DA interface { + // Upload submits a blob under the given namespace to the DA network. + // Returns after the blob is uploaded and the payment transaction is broadcast. + // Does NOT wait for on-chain confirmation (see package doc for rationale). + // + // The caller is responsible for batching data to the target blob size. + Upload(ctx context.Context, namespace []byte, data []byte) (UploadResult, error) + + // Download retrieves and reconstructs a blob by its ID. + // Returns the original data that was passed to Upload. + Download(ctx context.Context, blobID BlobID) ([]byte, error) + + // Listen streams confirmed blob events for the given namespace. + // The returned channel is closed when the context is cancelled. + // Each event includes the blob ID, confirmation height, and data size. + Listen(ctx context.Context, namespace []byte) (<-chan BlobEvent, error) +} diff --git a/pkg/da/fibremock/mock.go b/pkg/da/fibremock/mock.go new file mode 100644 index 0000000000..ee88552121 --- /dev/null +++ b/pkg/da/fibremock/mock.go @@ -0,0 +1,246 @@ +package fibremock + +import ( + "context" + "crypto/sha256" + "errors" + "fmt" + "sync" + "time" +) + +var ( + // ErrBlobNotFound is returned when a blob ID is not in the store. + ErrBlobNotFound = errors.New("blob not found") + // ErrDataEmpty is returned when Upload is called with empty data. + ErrDataEmpty = errors.New("data cannot be empty") +) + +// MockDAConfig configures the mock DA implementation. +type MockDAConfig struct { + // MaxBlobs is the maximum number of blobs stored in memory. + // When exceeded, the oldest blob is evicted regardless of retention. + // 0 means no limit (use with caution — large blobs will OOM). + MaxBlobs int + // Retention is how long blobs are kept before automatic pruning. + // 0 means blobs are kept until evicted by MaxBlobs. + Retention time.Duration +} + +// DefaultMockDAConfig returns a config suitable for testing: +// 100 blobs max, 10 minute retention. +func DefaultMockDAConfig() MockDAConfig { + return MockDAConfig{ + MaxBlobs: 100, + Retention: 10 * time.Minute, + } +} + +// storedBlob holds a blob and its metadata in the mock store. +type storedBlob struct { + namespace []byte + data []byte + height uint64 + expiresAt time.Time + createdAt time.Time +} + +// subscriber tracks a Listen subscription. +type subscriber struct { + namespace []byte + ch chan BlobEvent +} + +// MockDA is an in-memory mock implementation of the DA interface. +// It stores blobs in memory with configurable retention and max blob count. +// Safe for concurrent use. +type MockDA struct { + cfg MockDAConfig + + mu sync.RWMutex + blobs map[string]*storedBlob // keyed by hex(blobID) + order []string // insertion order for LRU eviction + height uint64 + subscribers []subscriber +} + +// NewMockDA creates a new mock DA with the given config. +func NewMockDA(cfg MockDAConfig) *MockDA { + return &MockDA{ + cfg: cfg, + blobs: make(map[string]*storedBlob), + } +} + +// Upload stores the blob in memory and notifies listeners. +func (m *MockDA) Upload(ctx context.Context, namespace []byte, data []byte) (UploadResult, error) { + if len(data) == 0 { + return UploadResult{}, ErrDataEmpty + } + + blobID := mockBlobID(data) + key := fmt.Sprintf("%x", blobID) + now := time.Now() + + var expiresAt time.Time + if m.cfg.Retention > 0 { + expiresAt = now.Add(m.cfg.Retention) + } + + m.mu.Lock() + + // Evict oldest if at capacity + if m.cfg.MaxBlobs > 0 && len(m.blobs) >= m.cfg.MaxBlobs { + m.evictOldestLocked() + } + + // Prune expired blobs opportunistically + if m.cfg.Retention > 0 { + m.pruneExpiredLocked(now) + } + + m.height++ + height := m.height + + m.blobs[key] = &storedBlob{ + namespace: namespace, + data: data, + height: height, + expiresAt: expiresAt, + createdAt: now, + } + m.order = append(m.order, key) + + // Notify subscribers (non-blocking) + event := BlobEvent{ + BlobID: blobID, + Height: height, + DataSize: uint64(len(data)), + } + for i := range m.subscribers { + if namespaceMatch(m.subscribers[i].namespace, namespace) { + select { + case m.subscribers[i].ch <- event: + default: + // Channel full, drop event. Subscriber is too slow. + } + } + } + + m.mu.Unlock() + + return UploadResult{ + BlobID: blobID, + ExpiresAt: expiresAt, + }, nil +} + +// Download retrieves a blob by ID. +func (m *MockDA) Download(ctx context.Context, blobID BlobID) ([]byte, error) { + key := fmt.Sprintf("%x", blobID) + + m.mu.RLock() + blob, ok := m.blobs[key] + m.mu.RUnlock() + + if !ok { + return nil, ErrBlobNotFound + } + + if !blob.expiresAt.IsZero() && time.Now().After(blob.expiresAt) { + return nil, ErrBlobNotFound + } + + return blob.data, nil +} + +// Listen returns a channel that receives events when blobs matching the +// namespace are uploaded. The channel is closed when ctx is cancelled. +func (m *MockDA) Listen(ctx context.Context, namespace []byte) (<-chan BlobEvent, error) { + ch := make(chan BlobEvent, 64) + + m.mu.Lock() + idx := len(m.subscribers) + m.subscribers = append(m.subscribers, subscriber{ + namespace: namespace, + ch: ch, + }) + m.mu.Unlock() + + // Clean up when context is done. + go func() { + <-ctx.Done() + m.mu.Lock() + // Remove subscriber by swapping with last + last := len(m.subscribers) - 1 + if idx <= last { + m.subscribers[idx] = m.subscribers[last] + } + m.subscribers = m.subscribers[:last] + m.mu.Unlock() + close(ch) + }() + + return ch, nil +} + +// BlobCount returns the number of blobs currently stored. +func (m *MockDA) BlobCount() int { + m.mu.RLock() + defer m.mu.RUnlock() + return len(m.blobs) +} + +// evictOldestLocked removes the oldest blob. Caller must hold m.mu. +func (m *MockDA) evictOldestLocked() { + if len(m.order) == 0 { + return + } + key := m.order[0] + m.order = m.order[1:] + delete(m.blobs, key) +} + +// pruneExpiredLocked removes blobs past their retention. Caller must hold m.mu. +func (m *MockDA) pruneExpiredLocked(now time.Time) { + surviving := m.order[:0] + for _, key := range m.order { + blob, ok := m.blobs[key] + if !ok { + continue + } + if !blob.expiresAt.IsZero() && now.After(blob.expiresAt) { + delete(m.blobs, key) + } else { + surviving = append(surviving, key) + } + } + m.order = surviving +} + +// namespaceMatch returns true if the subscription namespace matches the blob namespace. +// An empty subscription namespace matches all namespaces (wildcard). +func namespaceMatch(subNS, blobNS []byte) bool { + if len(subNS) == 0 { + return true + } + if len(subNS) != len(blobNS) { + return false + } + for i := range subNS { + if subNS[i] != blobNS[i] { + return false + } + } + return true +} + +// mockBlobID produces a deterministic blob ID from the data. +// Format: 1 byte version (0) + 32 bytes SHA256 hash. +func mockBlobID(data []byte) BlobID { + hash := sha256.Sum256(data) + id := make([]byte, 33) + id[0] = 0 // version byte + copy(id[1:], hash[:]) + return id +} diff --git a/pkg/da/fibremock/mock_test.go b/pkg/da/fibremock/mock_test.go new file mode 100644 index 0000000000..e5aa62aed0 --- /dev/null +++ b/pkg/da/fibremock/mock_test.go @@ -0,0 +1,197 @@ +package fibremock + +import ( + "bytes" + "context" + "testing" + "time" +) + +func TestMockDA_UploadDownload(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + ctx := context.Background() + + ns := []byte("test-ns") + data := []byte("hello fibre") + + result, err := m.Upload(ctx, ns, data) + if err != nil { + t.Fatal(err) + } + if len(result.BlobID) != 33 { + t.Fatalf("expected 33-byte blob ID, got %d", len(result.BlobID)) + } + + got, err := m.Download(ctx, result.BlobID) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(got, data) { + t.Fatalf("data mismatch: got %q, want %q", got, data) + } +} + +func TestMockDA_UploadEmpty(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + _, err := m.Upload(context.Background(), []byte("ns"), nil) + if err != ErrDataEmpty { + t.Fatalf("expected ErrDataEmpty, got %v", err) + } +} + +func TestMockDA_DownloadNotFound(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + _, err := m.Download(context.Background(), BlobID{0, 1, 2}) + if err != ErrBlobNotFound { + t.Fatalf("expected ErrBlobNotFound, got %v", err) + } +} + +func TestMockDA_Listen(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ns := []byte("test-ns") + ch, err := m.Listen(ctx, ns) + if err != nil { + t.Fatal(err) + } + + // Upload a blob — should trigger the listener + data := []byte("listened blob") + result, err := m.Upload(ctx, ns, data) + if err != nil { + t.Fatal(err) + } + + select { + case event := <-ch: + if !bytes.Equal(event.BlobID, result.BlobID) { + t.Fatal("blob ID mismatch in event") + } + if event.Height == 0 { + t.Fatal("expected non-zero height") + } + if event.DataSize != uint64(len(data)) { + t.Fatalf("expected data size %d, got %d", len(data), event.DataSize) + } + case <-time.After(time.Second): + t.Fatal("timeout waiting for event") + } +} + +func TestMockDA_ListenNamespaceFilter(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch, err := m.Listen(ctx, []byte("ns-A")) + if err != nil { + t.Fatal(err) + } + + // Upload to different namespace — should NOT trigger + m.Upload(ctx, []byte("ns-B"), []byte("wrong namespace")) + + select { + case <-ch: + t.Fatal("should not receive event for different namespace") + case <-time.After(50 * time.Millisecond): + // good + } +} + +func TestMockDA_ListenWildcard(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Empty namespace = wildcard + ch, err := m.Listen(ctx, nil) + if err != nil { + t.Fatal(err) + } + + m.Upload(ctx, []byte("any-ns"), []byte("wildcard test")) + + select { + case event := <-ch: + if event.Height == 0 { + t.Fatal("expected event") + } + case <-time.After(time.Second): + t.Fatal("timeout waiting for wildcard event") + } +} + +func TestMockDA_MaxBlobsEviction(t *testing.T) { + m := NewMockDA(MockDAConfig{MaxBlobs: 3}) + ctx := context.Background() + + var ids []BlobID + for i := range 5 { + r, err := m.Upload(ctx, nil, []byte{byte(i), 1, 2, 3}) + if err != nil { + t.Fatal(err) + } + ids = append(ids, r.BlobID) + } + + // First two should be evicted + if _, err := m.Download(ctx, ids[0]); err != ErrBlobNotFound { + t.Fatal("expected first blob to be evicted") + } + if _, err := m.Download(ctx, ids[1]); err != ErrBlobNotFound { + t.Fatal("expected second blob to be evicted") + } + + // Last three should still be there + for i := 2; i < 5; i++ { + if _, err := m.Download(ctx, ids[i]); err != nil { + t.Fatalf("blob %d should exist: %v", i, err) + } + } + + if m.BlobCount() != 3 { + t.Fatalf("expected 3 blobs, got %d", m.BlobCount()) + } +} + +func TestMockDA_Retention(t *testing.T) { + m := NewMockDA(MockDAConfig{Retention: 50 * time.Millisecond}) + ctx := context.Background() + + r, err := m.Upload(ctx, nil, []byte("ephemeral")) + if err != nil { + t.Fatal(err) + } + + // Should exist immediately + if _, err := m.Download(ctx, r.BlobID); err != nil { + t.Fatal("blob should exist immediately") + } + + // Wait for expiry + time.Sleep(100 * time.Millisecond) + + if _, err := m.Download(ctx, r.BlobID); err != ErrBlobNotFound { + t.Fatal("blob should have expired") + } +} + +func TestMockDA_DeterministicBlobID(t *testing.T) { + m := NewMockDA(DefaultMockDAConfig()) + ctx := context.Background() + + data := []byte("deterministic") + r1, _ := m.Upload(ctx, nil, data) + r2, _ := m.Upload(ctx, nil, data) + + if !bytes.Equal(r1.BlobID, r2.BlobID) { + t.Fatal("same data should produce same blob ID") + } +} + +// Verify MockDA satisfies the DA interface at compile time. +var _ DA = (*MockDA)(nil) From 4485d912c6c9c2023c88b577f5668801975cc269 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 15 Apr 2026 19:41:08 +0200 Subject: [PATCH 6/6] updates --- block/internal/da/fiber_client.go | 80 ++++----- block/internal/da/fiber_client_test.go | 168 +++++++----------- {pkg => block/internal}/da/fibremock/fibre.go | 0 {pkg => block/internal}/da/fibremock/mock.go | 0 .../internal}/da/fibremock/mock_test.go | 0 5 files changed, 95 insertions(+), 153 deletions(-) rename {pkg => block/internal}/da/fibremock/fibre.go (100%) rename {pkg => block/internal}/da/fibremock/mock.go (100%) rename {pkg => block/internal}/da/fibremock/mock_test.go (100%) diff --git a/block/internal/da/fiber_client.go b/block/internal/da/fiber_client.go index a903af43f4..39988c67e5 100644 --- a/block/internal/da/fiber_client.go +++ b/block/internal/da/fiber_client.go @@ -11,6 +11,7 @@ import ( "github.com/rs/zerolog" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/block/internal/da/fibremock" datypes "github.com/evstack/ev-node/pkg/da/types" ) @@ -20,17 +21,12 @@ const ( fiberSubscribeChanSize = 16 ) -type FiberUploadResult struct { - BlobID []byte - Height uint64 - Promise []byte -} - -type FiberClient interface { - Upload(ctx context.Context, namespace, data []byte) (FiberUploadResult, error) - Download(ctx context.Context, blobID []byte, height uint64) ([]byte, error) - GetLatestHeight(ctx context.Context) (uint64, error) -} +type ( + FiberClient = fibremock.DA + BlobID = fibremock.BlobID + UploadResult = fibremock.UploadResult + BlobEvent = fibremock.BlobEvent +) type FiberConfig struct { Client FiberClient @@ -50,17 +46,17 @@ type fiberDAClient struct { forcedNamespaceBz []byte hasForcedNamespace bool - mu sync.RWMutex - index map[uint64][]fiberIndexedBlob - indexTail uint64 - indexWindow uint64 + mu sync.RWMutex + index map[uint64][]fiberIndexedBlob + indexTail uint64 + indexWindow uint64 + latestHeight uint64 } type fiberIndexedBlob struct { id datypes.ID namespace []byte data []byte - promise []byte blobID []byte } @@ -166,11 +162,15 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na } } + c.mu.Lock() + c.latestHeight++ + h := c.latestHeight + c.mu.Unlock() + uploaded = append(uploaded, fiberUploadResult{ - blobID: result.BlobID, - height: result.Height, - promise: result.Promise, - data: raw, + blobID: result.BlobID, + height: h, + data: raw, }) } @@ -201,10 +201,9 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na } type fiberUploadResult struct { - blobID []byte - height uint64 - promise []byte - data []byte + blobID []byte + height uint64 + data []byte } func (c *fiberDAClient) indexUploaded(uploaded []fiberUploadResult, namespace []byte) []datypes.ID { @@ -219,7 +218,6 @@ func (c *fiberDAClient) indexUploaded(uploaded []fiberUploadResult, namespace [] id: id, namespace: namespace, data: u.data, - promise: u.promise, blobID: u.blobID, }) } @@ -311,13 +309,13 @@ func (c *fiberDAClient) Get(ctx context.Context, ids []datypes.ID, _ []byte) ([] res := make([]datypes.Blob, 0, len(ids)) for _, id := range ids { - height, blobID := splitFiberID(id) + _, blobID := splitFiberID(id) if blobID == nil { return nil, fmt.Errorf("invalid fiber blob id: %x", id) } downloadCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) - data, err := c.fiber.Download(downloadCtx, blobID, height) + data, err := c.fiber.Download(downloadCtx, blobID) cancel() if err != nil { return nil, fmt.Errorf("fiber download failed for blob %x: %w", blobID, err) @@ -344,13 +342,10 @@ func (c *fiberDAClient) Subscribe(ctx context.Context, _ []byte, _ bool) (<-chan case <-ctx.Done(): return case <-ticker.C: - heightCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) - height, err := c.fiber.GetLatestHeight(heightCtx) - cancel() - if err != nil { - c.logger.Error().Err(err).Msg("failed to get latest fiber height during subscribe") - continue - } + c.mu.RLock() + height := c.latestHeight + c.mu.RUnlock() + if height <= lastHeight { continue } @@ -386,15 +381,10 @@ func (c *fiberDAClient) Subscribe(ctx context.Context, _ []byte, _ bool) (<-chan return out, nil } -func (c *fiberDAClient) GetLatestDAHeight(ctx context.Context) (uint64, error) { - heightCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) - defer cancel() - - height, err := c.fiber.GetLatestHeight(heightCtx) - if err != nil { - return 0, fmt.Errorf("failed to get latest fiber height: %w", err) - } - return height, nil +func (c *fiberDAClient) GetLatestDAHeight(context.Context) (uint64, error) { + c.mu.RLock() + defer c.mu.RUnlock() + return c.latestHeight, nil } func (c *fiberDAClient) GetProofs(_ context.Context, ids []datypes.ID, _ []byte) ([]datypes.Proof, error) { @@ -412,7 +402,7 @@ func (c *fiberDAClient) GetProofs(_ context.Context, ids []datypes.ID, _ []byte) for _, b := range blobs { if string(b.id) == string(id) { - proofs[i] = b.promise + proofs[i] = b.blobID break } } @@ -439,7 +429,7 @@ func (c *fiberDAClient) Validate(_ context.Context, ids []datypes.ID, proofs []d for _, b := range blobs { if string(b.id) == string(id) { - results[i] = len(proofs[i]) > 0 && string(proofs[i]) == string(b.promise) + results[i] = len(proofs[i]) > 0 && string(proofs[i]) == string(b.blobID) break } } diff --git a/block/internal/da/fiber_client_test.go b/block/internal/da/fiber_client_test.go index ae4fd5af84..a4c9843cf3 100644 --- a/block/internal/da/fiber_client_test.go +++ b/block/internal/da/fiber_client_test.go @@ -2,80 +2,21 @@ package da import ( "context" - "crypto/sha256" "errors" - "sync" "sync/atomic" "testing" "time" "github.com/rs/zerolog" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/evstack/ev-node/block/internal/da/fibremock" datypes "github.com/evstack/ev-node/pkg/da/types" ) -type mockFiberClient struct { - mu sync.Mutex - uploads map[string][]byte - height uint64 - uploadErr error - callCount atomic.Uint64 -} - -func newMockFiberClient() *mockFiberClient { - return &mockFiberClient{ - uploads: make(map[string][]byte), - height: 100, - } -} - -func (m *mockFiberClient) Upload(_ context.Context, namespace, data []byte) (FiberUploadResult, error) { - m.mu.Lock() - defer m.mu.Unlock() - - if m.uploadErr != nil { - return FiberUploadResult{}, m.uploadErr - } - - m.height++ - callIdx := m.callCount.Add(1) - - blobID := make([]byte, 33) - blobID[0] = 0 - hash := sha256.Sum256(append([]byte{byte(callIdx)}, data...)) - copy(blobID[1:], hash[:]) - - m.uploads[string(blobID)] = data - - return FiberUploadResult{ - BlobID: blobID, - Height: m.height, - Promise: []byte("signed-promise-" + string(blobID)), - }, nil -} - -func (m *mockFiberClient) Download(_ context.Context, blobID []byte, _ uint64) ([]byte, error) { - m.mu.Lock() - defer m.mu.Unlock() - - data, ok := m.uploads[string(blobID)] - if !ok { - return nil, errors.New("blob not found") - } - return data, nil -} - -func (m *mockFiberClient) GetLatestHeight(_ context.Context) (uint64, error) { - m.mu.Lock() - defer m.mu.Unlock() - return m.height, nil -} - -func makeTestFiberClient(t *testing.T) (*mockFiberClient, FullClient) { +func makeTestFiberClient(t *testing.T) (*fibremock.MockDA, FullClient) { t.Helper() - mock := newMockFiberClient() + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) cl := NewFiberClient(FiberConfig{ Client: mock, Logger: zerolog.Nop(), @@ -89,7 +30,7 @@ func makeTestFiberClient(t *testing.T) (*mockFiberClient, FullClient) { func TestFiberClient_NewClient_Nil(t *testing.T) { cl := NewFiberClient(FiberConfig{Client: nil}) - assert.Nil(t, cl) + require.Nil(t, cl) } func TestFiberClient_Submit_Success(t *testing.T) { @@ -128,10 +69,9 @@ func TestFiberClient_Submit_EmptyData(t *testing.T) { } func TestFiberClient_Submit_UploadError(t *testing.T) { - mock := newMockFiberClient() - mock.uploadErr = errors.New("upload failed") + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) cl := NewFiberClient(FiberConfig{ - Client: mock, + Client: &faultInjector{FiberClient: mock, err: errors.New("upload failed")}, Logger: zerolog.Nop(), DefaultTimeout: 5 * time.Second, Namespace: "test-ns", @@ -146,10 +86,9 @@ func TestFiberClient_Submit_UploadError(t *testing.T) { } func TestFiberClient_Submit_CanceledContext(t *testing.T) { - mock := newMockFiberClient() - mock.uploadErr = context.Canceled + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) cl := NewFiberClient(FiberConfig{ - Client: mock, + Client: &faultInjector{FiberClient: mock, err: context.Canceled}, Logger: zerolog.Nop(), DefaultTimeout: 5 * time.Second, Namespace: "test-ns", @@ -163,10 +102,9 @@ func TestFiberClient_Submit_CanceledContext(t *testing.T) { } func TestFiberClient_Submit_DeadlineExceeded(t *testing.T) { - mock := newMockFiberClient() - mock.uploadErr = context.DeadlineExceeded + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) cl := NewFiberClient(FiberConfig{ - Client: mock, + Client: &faultInjector{FiberClient: mock, err: context.DeadlineExceeded}, Logger: zerolog.Nop(), DefaultTimeout: 5 * time.Second, Namespace: "test-ns", @@ -190,9 +128,10 @@ func TestFiberClient_Submit_BlobTooLarge(t *testing.T) { } func TestFiberClient_Submit_PartialFailureIndexesUploaded(t *testing.T) { - mock := newMockFiberClient() + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) + fault := &faultInjector{FiberClient: mock} cl := NewFiberClient(FiberConfig{ - Client: mock, + Client: fault, Logger: zerolog.Nop(), DefaultTimeout: 5 * time.Second, Namespace: "test-ns", @@ -204,11 +143,11 @@ func TestFiberClient_Submit_PartialFailureIndexesUploaded(t *testing.T) { res1 := cl.Submit(context.Background(), [][]byte{[]byte("first")}, 0, ns, nil) require.Equal(t, datypes.StatusSuccess, res1.Code) - mock.uploadErr = errors.New("transient failure") + fault.SetError(errors.New("transient failure")) res2 := cl.Submit(context.Background(), [][]byte{[]byte("second")}, 0, ns, nil) require.Equal(t, datypes.StatusError, res2.Code) - mock.uploadErr = nil + fault.SetError(nil) res3 := cl.Submit(context.Background(), [][]byte{[]byte("third")}, 0, ns, nil) require.Equal(t, datypes.StatusSuccess, res3.Code) @@ -223,9 +162,10 @@ func TestFiberClient_Submit_PartialFailureIndexesUploaded(t *testing.T) { } func TestFiberClient_Submit_PartialFailureOnSecondBlob(t *testing.T) { - failingMock := &failingOnSecondBlobFiberClient{inner: newMockFiberClient()} + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) + failing := &failOnNthUpload{FiberClient: mock, failAt: 2, err: errors.New("second blob fails")} cl := NewFiberClient(FiberConfig{ - Client: failingMock, + Client: failing, Logger: zerolog.Nop(), DefaultTimeout: 5 * time.Second, Namespace: "test-ns", @@ -249,27 +189,6 @@ func TestFiberClient_Submit_PartialFailureOnSecondBlob(t *testing.T) { require.Equal(t, 1, totalBlobs) } -type failingOnSecondBlobFiberClient struct { - inner *mockFiberClient - callCount atomic.Uint64 -} - -func (f *failingOnSecondBlobFiberClient) Upload(ctx context.Context, namespace, data []byte) (FiberUploadResult, error) { - idx := f.callCount.Add(1) - if idx == 2 { - return FiberUploadResult{}, errors.New("second blob fails") - } - return f.inner.Upload(ctx, namespace, data) -} - -func (f *failingOnSecondBlobFiberClient) Download(ctx context.Context, blobID []byte, height uint64) ([]byte, error) { - return f.inner.Download(ctx, blobID, height) -} - -func (f *failingOnSecondBlobFiberClient) GetLatestHeight(ctx context.Context) (uint64, error) { - return f.inner.GetLatestHeight(ctx) -} - func TestFiberClient_Retrieve_Success(t *testing.T) { _, cl := makeTestFiberClient(t) @@ -318,16 +237,13 @@ func TestFiberClient_Retrieve_NamespaceFiltering(t *testing.T) { res2 := cl.Submit(context.Background(), [][]byte{[]byte("beta")}, 0, ns2, nil) require.Equal(t, datypes.StatusSuccess, res2.Code) - // Retrieving at ns1's height with ns1 returns the blob rr1 := cl.Retrieve(context.Background(), res1.Height, ns1) require.Equal(t, datypes.StatusSuccess, rr1.Code) require.Equal(t, []byte("alpha"), rr1.Data[0]) - // Retrieving at ns1's height with ns2 returns not found (different height) rr2 := cl.Retrieve(context.Background(), res1.Height, ns2) require.Equal(t, datypes.StatusNotFound, rr2.Code) - // Retrieving at ns2's height with ns2 returns the blob rr3 := cl.Retrieve(context.Background(), res2.Height, ns2) require.Equal(t, datypes.StatusSuccess, rr3.Code) require.Equal(t, []byte("beta"), rr3.Data[0]) @@ -375,11 +291,15 @@ func TestFiberClient_Get_DownloadError(t *testing.T) { } func TestFiberClient_GetLatestDAHeight(t *testing.T) { - mock, cl := makeTestFiberClient(t) + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("data")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, res.Code) height, err := cl.GetLatestDAHeight(context.Background()) require.NoError(t, err) - require.Equal(t, mock.height, height) + require.Equal(t, res.Height, height) } func TestFiberClient_GetProofs_Success(t *testing.T) { @@ -474,8 +394,9 @@ func TestFiberClient_Validate_UnknownID(t *testing.T) { } func TestFiberClient_Namespaces(t *testing.T) { + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) cl := NewFiberClient(FiberConfig{ - Client: newMockFiberClient(), + Client: mock, Logger: zerolog.Nop(), Namespace: "header-ns", DataNamespace: "data-ns", @@ -490,8 +411,9 @@ func TestFiberClient_Namespaces(t *testing.T) { } func TestFiberClient_NoForcedNamespace(t *testing.T) { + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) cl := NewFiberClient(FiberConfig{ - Client: newMockFiberClient(), + Client: mock, Logger: zerolog.Nop(), Namespace: "header-ns", DataNamespace: "data-ns", @@ -580,8 +502,9 @@ func TestSplitFiberID_Invalid(t *testing.T) { } func TestFiberClient_DefaultTimeout(t *testing.T) { + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) cl := NewFiberClient(FiberConfig{ - Client: newMockFiberClient(), + Client: mock, Logger: zerolog.Nop(), Namespace: "ns", DataNamespace: "ns", @@ -619,7 +542,7 @@ func TestFiberClient_FullSubmitRetrieveCycle(t *testing.T) { } func TestFiberClient_IndexPruning(t *testing.T) { - mock := newMockFiberClient() + mock := fibremock.NewMockDA(fibremock.DefaultMockDAConfig()) cl := NewFiberClient(FiberConfig{ Client: mock, Logger: zerolog.Nop(), @@ -650,3 +573,32 @@ func TestFiberClient_IndexPruning(t *testing.T) { require.False(t, hasOld, "old height should have been pruned") require.LessOrEqual(t, indexLen, 10, "index should be bounded by window") } + +type faultInjector struct { + FiberClient + err error +} + +func (f *faultInjector) SetError(err error) { f.err = err } + +func (f *faultInjector) Upload(ctx context.Context, namespace, data []byte) (fibremock.UploadResult, error) { + if f.err != nil { + return fibremock.UploadResult{}, f.err + } + return f.FiberClient.Upload(ctx, namespace, data) +} + +type failOnNthUpload struct { + FiberClient + failAt uint64 + err error + callCount atomic.Uint64 +} + +func (f *failOnNthUpload) Upload(ctx context.Context, namespace, data []byte) (fibremock.UploadResult, error) { + n := f.callCount.Add(1) + if n == f.failAt { + return fibremock.UploadResult{}, f.err + } + return f.FiberClient.Upload(ctx, namespace, data) +} diff --git a/pkg/da/fibremock/fibre.go b/block/internal/da/fibremock/fibre.go similarity index 100% rename from pkg/da/fibremock/fibre.go rename to block/internal/da/fibremock/fibre.go diff --git a/pkg/da/fibremock/mock.go b/block/internal/da/fibremock/mock.go similarity index 100% rename from pkg/da/fibremock/mock.go rename to block/internal/da/fibremock/mock.go diff --git a/pkg/da/fibremock/mock_test.go b/block/internal/da/fibremock/mock_test.go similarity index 100% rename from pkg/da/fibremock/mock_test.go rename to block/internal/da/fibremock/mock_test.go