Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/ateapi/internal/controlapi/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func setupTest(t *testing.T, ns string) *testContext {

// 4. Initialize Service
dialer := NewAteletDialer(workerInformer.GetIndexer(), ateletInformer.GetIndexer())
service := NewService(persistence, actorTemplateLister, dialer, k8sClient)
service := NewService(persistence, actorTemplateLister, dialer, k8sClient, 30*time.Second)

// 5. Start REAL gRPC Server for ATE API
grpcServer := grpc.NewServer(grpc.UnaryInterceptor(ateinterceptors.ServerUnaryInterceptor))
Expand Down
9 changes: 6 additions & 3 deletions cmd/ateapi/internal/controlapi/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package controlapi

import (
"time"

"github.com/agent-substrate/substrate/cmd/ateapi/internal/store"
listersv1alpha1 "github.com/agent-substrate/substrate/pkg/client/listers/api/v1alpha1"
"github.com/agent-substrate/substrate/pkg/proto/ateapipb"
Expand All @@ -32,13 +34,14 @@ type Service struct {

var _ ateapipb.ControlServer = (*Service)(nil)

// NewService creates a service.
func NewService(persistence store.Interface, actorTemplateLister listersv1alpha1.ActorTemplateLister, dialer *AteletDialer, kubeClient kubernetes.Interface) *Service {
// NewService creates a service. actorWorkflowDeadline bounds how long a single
// Resume/Suspend workflow can run end-to-end.
func NewService(persistence store.Interface, actorTemplateLister listersv1alpha1.ActorTemplateLister, dialer *AteletDialer, kubeClient kubernetes.Interface, actorWorkflowDeadline time.Duration) *Service {
s := &Service{
persistence: persistence,
actorTemplateLister: actorTemplateLister,
dialer: dialer,
actorWorkflow: NewActorWorkflow(persistence, dialer, actorTemplateLister, kubeClient),
actorWorkflow: NewActorWorkflow(persistence, dialer, actorTemplateLister, kubeClient, actorWorkflowDeadline),
}

return s
Expand Down
97 changes: 78 additions & 19 deletions cmd/ateapi/internal/controlapi/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"time"

"github.com/agent-substrate/substrate/cmd/ateapi/internal/store"
Expand All @@ -32,6 +33,18 @@ import (
"k8s.io/client-go/kubernetes"
)

// actorLockTTL is the Redis TTL on the per-actor workflow lock. It bounds how
// long a peer must wait to retry an actor after this process crashes mid-workflow.
const actorLockTTL = 30 * time.Second

// actorLockHeartbeatInterval is how often the heartbeat refreshes the lock.
// Chosen so we get ~3 attempts before the TTL would otherwise lapse.
const actorLockHeartbeatInterval = actorLockTTL / 3

// errLostActorLock is the context cause set when the heartbeat can no longer
// keep the actor lock alive (peer stole it, or Redis returned an error).
var errLostActorLock = errors.New("lost actor lock during workflow")

// WorkflowStep represents a single, idempotent operation in a workflow graph.
// Params is the immutable parameters used to start the workflow.
// Context is the mutable context fetched or modified during execution.
Expand Down Expand Up @@ -119,16 +132,22 @@ type ActorWorkflow struct {
actorTemplateLister listersv1alpha1.ActorTemplateLister
kubeClient kubernetes.Interface
secretCache *envSecretCache
// workflowDeadline is the maximum duration of a single Resume/Suspend
// workflow. The lock is kept alive across this duration by a heartbeat,
// independent of actorLockTTL.
workflowDeadline time.Duration
}

// NewActorWorkflow creates a new ActorWorkflow.
func NewActorWorkflow(store store.Interface, dialer *AteletDialer, actorTemplateLister listersv1alpha1.ActorTemplateLister, kubeClient kubernetes.Interface) *ActorWorkflow {
// NewActorWorkflow creates a new ActorWorkflow. workflowDeadline bounds how
// long a single Resume/Suspend can run end-to-end.
func NewActorWorkflow(store store.Interface, dialer *AteletDialer, actorTemplateLister listersv1alpha1.ActorTemplateLister, kubeClient kubernetes.Interface, workflowDeadline time.Duration) *ActorWorkflow {
return &ActorWorkflow{
store: store,
dialer: dialer,
actorTemplateLister: actorTemplateLister,
kubeClient: kubeClient,
secretCache: newEnvSecretCache(envSecretCacheTTL),
workflowDeadline: workflowDeadline,
}
}

Expand All @@ -140,9 +159,7 @@ func (w *ActorWorkflow) ResumeActor(ctx context.Context, id string, boot bool) (
}
state := &ResumeState{}

// Acquire lock and get the timeout context for the workflow
// Lock TTL is 7 seconds, with 2 seconds padding for workflow timeout
ctx, releaseLock, err := w.acquireActorLock(ctx, id, 30*time.Second, 2*time.Second)
ctx, releaseLock, err := w.acquireActorLock(ctx, id, actorLockTTL, actorLockHeartbeatInterval)
if err != nil {
return nil, err
}
Expand All @@ -169,9 +186,7 @@ func (w *ActorWorkflow) SuspendActor(ctx context.Context, id string) (*ateapipb.
}
state := &SuspendState{}

// Acquire lock and get the timeout context for the workflow
// Lock TTL is 7 seconds, with 2 seconds padding for workflow timeout
ctx, releaseLock, err := w.acquireActorLock(ctx, id, 30*time.Second, 2*time.Second)
ctx, releaseLock, err := w.acquireActorLock(ctx, id, actorLockTTL, actorLockHeartbeatInterval)
if err != nil {
return nil, err
}
Expand All @@ -191,27 +206,71 @@ func (w *ActorWorkflow) SuspendActor(ctx context.Context, id string) (*ateapipb.
return state.Actor, nil
}

func (w *ActorWorkflow) acquireActorLock(ctx context.Context, id string, ttl time.Duration, padding time.Duration) (context.Context, func(), error) {
// acquireActorLock takes the per-actor workflow lock and returns a workflow
// context bounded by w.workflowDeadline. A background heartbeat keeps the lock
// alive — independent of lockTTL — for as long as the workflow runs. If the
// heartbeat fails (Redis error or another peer stole the lock) the returned
// context is cancelled with errLostActorLock as the cause, and in-flight steps
// will see ctx.Err() and unwind. The returned release function stops the
// heartbeat, waits for it to exit, then best-effort releases the lock.
func (w *ActorWorkflow) acquireActorLock(ctx context.Context, id string, lockTTL, heartbeatInterval time.Duration) (context.Context, func(), error) {
lockKey := "lock:actor:" + id
lockValue := uuid.New().String()

// Create a child context for the workflow that expires BEFORE the lock
workflowTimeout := ttl - padding
workflowCtx, cancel := context.WithTimeout(ctx, workflowTimeout)

acquired, err := w.store.AcquireLock(workflowCtx, lockKey, lockValue, ttl)
acquired, err := w.store.AcquireLock(ctx, lockKey, lockValue, lockTTL)
if err != nil {
cancel()
return nil, nil, fmt.Errorf("while acquiring lock: %w", err)
}
if !acquired {
cancel()
return nil, nil, status.Error(grpcCodes.Aborted, "another operation is in progress for this actor")
}

return workflowCtx, func() {
cancel()
cancellableCtx, cancelCause := context.WithCancelCause(ctx)
workflowCtx, cancelDeadline := context.WithTimeout(cancellableCtx, w.workflowDeadline)

heartbeatDone := make(chan struct{})
go w.runLockHeartbeat(workflowCtx, lockKey, lockValue, id, lockTTL, heartbeatInterval, cancelCause, heartbeatDone)

release := func() {
cancelDeadline()
cancelCause(context.Canceled)
<-heartbeatDone
// Use context.Background() to ensure the lock is released even if the workflow context was canceled.
w.store.ReleaseLock(context.Background(), lockKey, lockValue) //nolint:errcheck // best-effort release; the lock TTL is the safety net.
}, nil
}
return workflowCtx, release, nil
}

// runLockHeartbeat refreshes the actor lock on a ticker until ctx is done. If
// a refresh fails or returns false (we no longer own the lock), it cancels the
// workflow context with errLostActorLock so workflow steps tear down promptly.
func (w *ActorWorkflow) runLockHeartbeat(ctx context.Context, lockKey, lockValue, actorID string, lockTTL, heartbeatInterval time.Duration, cancelCause context.CancelCauseFunc, done chan<- struct{}) {
defer close(done)
ticker := time.NewTicker(heartbeatInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
ok, err := w.store.RefreshLock(ctx, lockKey, lockValue, lockTTL)
if err != nil {
// If ctx was cancelled out from under us we're already tearing
// down — no need to set a misleading cause.
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
slog.WarnContext(ctx, "Lock heartbeat failed; cancelling workflow",
slog.String("actor_id", actorID),
slog.String("err", err.Error()))
cancelCause(fmt.Errorf("%w: %w", errLostActorLock, err))
}
return
}
if !ok {
slog.WarnContext(ctx, "Actor lock no longer owned; cancelling workflow",
slog.String("actor_id", actorID))
cancelCause(errLostActorLock)
return
}
}
}
}
124 changes: 124 additions & 0 deletions cmd/ateapi/internal/controlapi/workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controlapi

import (
"context"
"errors"
"testing"
"time"

"github.com/agent-substrate/substrate/cmd/ateapi/internal/store/ateredis"
"github.com/alicebob/miniredis/v2"
"github.com/redis/go-redis/v9"
)

func newLockTestWorkflow(t *testing.T) (*miniredis.Miniredis, *ActorWorkflow) {
t.Helper()
mr, err := miniredis.Run()
if err != nil {
t.Fatalf("miniredis.Run: %v", err)
}
t.Cleanup(mr.Close)
rdb := redis.NewClusterClient(&redis.ClusterOptions{Addrs: []string{mr.Addr()}})
return mr, &ActorWorkflow{
store: ateredis.NewPersistence(rdb),
workflowDeadline: 30 * time.Second,
}
}

func TestAcquireActorLock_HeartbeatKeepsLockAlivePastTTL(t *testing.T) {
mr, w := newLockTestWorkflow(t)

lockTTL := 150 * time.Millisecond
heartbeat := 40 * time.Millisecond

ctx, release, err := w.acquireActorLock(context.Background(), "actor-1", lockTTL, heartbeat)
if err != nil {
t.Fatalf("acquireActorLock: %v", err)
}
defer release()

// Wait through multiple TTLs. If the heartbeat is working the lock key
// must still be in Redis — its TTL is being PEXPIRE'd back to `lockTTL`
// every `heartbeat`.
time.Sleep(4 * lockTTL)

if !mr.Exists("lock:actor:actor-1") {
t.Fatalf("lock key disappeared from Redis despite heartbeat; ctx err=%v cause=%v", ctx.Err(), context.Cause(ctx))
}
if ctx.Err() != nil {
t.Fatalf("workflow ctx cancelled while heartbeat was healthy: err=%v cause=%v", ctx.Err(), context.Cause(ctx))
}
}

func TestAcquireActorLock_LostLockCancelsWorkflow(t *testing.T) {
mr, w := newLockTestWorkflow(t)

lockTTL := 200 * time.Millisecond
heartbeat := 30 * time.Millisecond

ctx, release, err := w.acquireActorLock(context.Background(), "actor-2", lockTTL, heartbeat)
if err != nil {
t.Fatalf("acquireActorLock: %v", err)
}
defer release()

// Simulate a peer stealing the lock (or the TTL lapsing and someone else
// re-acquiring): wipe our key so the next heartbeat refresh's CAS fails.
mr.Del("lock:actor:actor-2")

select {
case <-ctx.Done():
case <-time.After(2 * time.Second):
t.Fatalf("workflow ctx was not cancelled after lock was lost")
}

if cause := context.Cause(ctx); !errors.Is(cause, errLostActorLock) {
t.Errorf("context.Cause = %v, want errLostActorLock", cause)
}
}

func TestAcquireActorLock_ReleaseRemovesLock(t *testing.T) {
mr, w := newLockTestWorkflow(t)

_, release, err := w.acquireActorLock(context.Background(), "actor-3", 200*time.Millisecond, 60*time.Millisecond)
if err != nil {
t.Fatalf("acquireActorLock: %v", err)
}

if !mr.Exists("lock:actor:actor-3") {
t.Fatalf("lock key not in Redis after acquire")
}
release()
if mr.Exists("lock:actor:actor-3") {
t.Errorf("lock key still in Redis after release")
}
}

func TestAcquireActorLock_ConflictReturnsAborted(t *testing.T) {
_, w := newLockTestWorkflow(t)

_, release, err := w.acquireActorLock(context.Background(), "actor-4", 5*time.Second, 1*time.Second)
if err != nil {
t.Fatalf("first acquireActorLock: %v", err)
}
defer release()

_, _, err = w.acquireActorLock(context.Background(), "actor-4", 5*time.Second, 1*time.Second)
if err == nil {
t.Fatalf("expected second acquireActorLock to fail")
}
}
20 changes: 20 additions & 0 deletions cmd/ateapi/internal/store/ateredis/ateredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,3 +589,23 @@ func (s *Persistence) ReleaseLock(ctx context.Context, key string, value string)
}
return nil
}

func (s *Persistence) RefreshLock(ctx context.Context, key string, value string, ttl time.Duration) (bool, error) {
var luaRefresh = redis.NewScript(`
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("pexpire", KEYS[1], ARGV[2])
else
return 0
end
`)

res, err := luaRefresh.Run(ctx, s.rdb, []string{key}, value, ttl.Milliseconds()).Result()
if err != nil {
return false, fmt.Errorf("while refreshing lock for %q with value %q: %w", key, value, err)
}
n, ok := res.(int64)
if !ok {
return false, fmt.Errorf("while refreshing lock for %q: unexpected result type %T", key, res)
}
return n == 1, nil
}
Loading