Skip to content

Commit ebf5448

Browse files
authored
feat(cache): add 100MB MaxSize and sets Discard MaxBytes limit to NATS KV cache buckets (#3014)
Signed-off-by: Miguel Martinez Trivino <migmartri@gmail.com> Signed-off-by: Miguel Martinez Trivino <miguel@chainloop.dev>
1 parent b81f027 commit ebf5448

6 files changed

Lines changed: 69 additions & 11 deletions

File tree

pkg/cache/attestationbundle/attestationbundle.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
const (
2929
ttl = 5 * 24 * time.Hour
30+
maxBytes = 100 * 1024 * 1024 // 100 MB
3031
bucket = "chainloop-attestation-bundles"
3132
description = "Cache for attestation bundles"
3233
)
@@ -40,6 +41,7 @@ type Cache struct {
4041
func New(ctx context.Context, rc *natsconn.ReloadableConnection, logger log.Logger) (*Cache, error) {
4142
opts := []cache.Option{
4243
cache.WithTTL(ttl),
44+
cache.WithMaxBytes(maxBytes),
4345
cache.WithDescription(description),
4446
}
4547

pkg/cache/cache.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,9 @@ type Logger interface {
4040
Errorw(keyvals ...any)
4141
}
4242

43-
// defaultMaxSize is a sensible upper bound on in-memory cache entries
44-
// to prevent unbounded growth. 0 means no LRU eviction (TTL-only).
45-
const defaultMaxSize = 1000
46-
4743
type config struct {
4844
ttl time.Duration
49-
maxSize int
45+
maxBytes int64
5046
logger Logger
5147
natsConn *nats.Conn
5248
bucketName string
@@ -75,6 +71,12 @@ func WithNATS(conn *nats.Conn, bucketName string) Option {
7571
}
7672
}
7773

74+
// WithMaxBytes sets the maximum total size (in bytes) for the NATS KV bucket.
75+
// When the limit is reached, NATS discards the oldest entries. Ignored for in-memory backend.
76+
func WithMaxBytes(n int64) Option {
77+
return func(c *config) { c.maxBytes = n }
78+
}
79+
7880
// WithDescription sets the NATS KV bucket description. Ignored for in-memory backend.
7981
func WithDescription(desc string) Option {
8082
return func(c *config) { c.description = desc }
@@ -108,10 +110,6 @@ func New[T any](opts ...Option) (Cache[T], error) {
108110
return newNATSKV[T](cfg)
109111
}
110112

111-
if cfg.maxSize == 0 {
112-
cfg.maxSize = defaultMaxSize
113-
}
114-
115113
return newMemoryCache[T](cfg), nil
116114
}
117115

pkg/cache/memory.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,19 @@ import (
2121
"github.com/hashicorp/golang-lru/v2/expirable"
2222
)
2323

24+
// defaultMaxSize is a sensible upper bound on in-memory cache entries
25+
// to prevent unbounded growth. 0 means no LRU eviction (TTL-only).
26+
const defaultMaxSize = 1000
27+
2428
type memoryCache[T any] struct {
2529
lru *expirable.LRU[string, T]
2630
logger Logger
2731
}
2832

2933
func newMemoryCache[T any](cfg *config) *memoryCache[T] {
30-
cfg.logger.Infow("msg", "cache: using in-memory LRU backend", "ttl", cfg.ttl, "maxSize", cfg.maxSize)
34+
cfg.logger.Infow("msg", "cache: using in-memory LRU backend", "ttl", cfg.ttl, "maxSize", defaultMaxSize)
3135
return &memoryCache[T]{
32-
lru: expirable.NewLRU[string, T](cfg.maxSize, nil, cfg.ttl),
36+
lru: expirable.NewLRU[string, T](defaultMaxSize, nil, cfg.ttl),
3337
logger: cfg.logger,
3438
}
3539
}

pkg/cache/natskv.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"encoding/base64"
2121
"encoding/json"
2222
"errors"
23+
"fmt"
2324
"sync"
2425

2526
"github.com/nats-io/nats.go"
@@ -65,12 +66,29 @@ func (c *natsKVCache[T]) initBucket() error {
6566
Bucket: c.bucket,
6667
Description: c.cfg.description,
6768
TTL: c.cfg.ttl,
69+
MaxBytes: c.cfg.maxBytes,
6870
Storage: jetstream.MemoryStorage,
6971
})
7072
if err != nil {
7173
return err
7274
}
7375

76+
// NATS KV hardcodes DiscardNew on the backing stream, which rejects writes
77+
// when MaxBytes is reached. For cache use-cases we want DiscardOld so that
78+
// the oldest entries are evicted automatically to make room for new ones.
79+
if c.cfg.maxBytes > 0 {
80+
streamName := fmt.Sprintf("KV_%s", c.bucket)
81+
stream, err := js.Stream(context.Background(), streamName)
82+
if err != nil {
83+
return fmt.Errorf("cache: failed to get backing stream %s: %w", streamName, err)
84+
}
85+
cfg := stream.CachedInfo().Config
86+
cfg.Discard = jetstream.DiscardOld
87+
if _, err := js.UpdateStream(context.Background(), cfg); err != nil {
88+
return fmt.Errorf("cache: failed to set DiscardOld on stream %s: %w", streamName, err)
89+
}
90+
}
91+
7492
c.mu.Lock()
7593
c.kv = kv
7694
c.mu.Unlock()

pkg/cache/natskv_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,40 @@ func TestNATSKV_NilKVGracefulDegradation(t *testing.T) {
216216
require.NoError(t, nkv.Purge(ctx))
217217
}
218218

219+
func TestNATSKV_MaxBytesEvictsOldEntries(t *testing.T) {
220+
nc := startEmbeddedNATS(t)
221+
222+
// With MaxBytes set, the backing stream is updated to DiscardOld so that
223+
// the oldest entries are evicted when the bucket is full.
224+
const maxBytes int64 = 10 * 1024
225+
c, err := New[[]byte](
226+
WithTTL(time.Minute),
227+
WithNATS(nc, "test-maxbytes"),
228+
WithMaxBytes(maxBytes),
229+
)
230+
require.NoError(t, err)
231+
232+
ctx := context.Background()
233+
payload := make([]byte, 1024)
234+
235+
// Write 20 entries, well beyond the 10 KiB limit
236+
for i := range 20 {
237+
key := "key-" + strings.Repeat("x", i)
238+
require.NoError(t, c.Set(ctx, key, payload), "Set should not fail even when bucket is full")
239+
}
240+
241+
// The latest entry should still be retrievable
242+
lastKey := "key-" + strings.Repeat("x", 19)
243+
_, ok, err := c.Get(ctx, lastKey)
244+
require.NoError(t, err)
245+
assert.True(t, ok, "most recent entry should still be in the cache")
246+
247+
// The earliest entries should have been evicted
248+
_, ok, err = c.Get(ctx, "key-")
249+
require.NoError(t, err)
250+
assert.False(t, ok, "oldest entry should have been evicted")
251+
}
252+
219253
func TestNew_WithNATSReturnsNATSBackend(t *testing.T) {
220254
nc := startEmbeddedNATS(t)
221255
c, err := New[string](

pkg/cache/policyevalbundle/policyevalbundle.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
const (
2929
ttl = 24 * time.Hour
30+
maxBytes = 100 * 1024 * 1024 // 100 MB
3031
bucket = "chainloop-policy-eval-bundles"
3132
description = "Cache for policy evaluation bundles from CAS"
3233
)
@@ -40,6 +41,7 @@ type Cache struct {
4041
func New(ctx context.Context, rc *natsconn.ReloadableConnection, logger log.Logger) (*Cache, error) {
4142
opts := []cache.Option{
4243
cache.WithTTL(ttl),
44+
cache.WithMaxBytes(maxBytes),
4345
cache.WithDescription(description),
4446
}
4547

0 commit comments

Comments
 (0)