From 45485ff6c5eb1ec5ad6d015547653ffed3dc2dc1 Mon Sep 17 00:00:00 2001
From: Jonathan Jamroga
Date: Fri, 12 Jun 2026 13:04:44 -0400
Subject: [PATCH] Decouple actor lock TTL from workflow deadline via heartbeat
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

ActorWorkflow.ResumeActor and SuspendActor used to derive their workflow
ctx from the Redis lock TTL via acquireActorLock(ctx, id, 30s, 2s) — the
workflow deadline and the lock TTL were a single knob (30s TTL minus 2s
padding = 28s). That meant image pulls / restores that legitimately need
more than 28s timed out and were retried forever without ever completing,
while raising the knob also raised how long peers must wait to retry an
actor after an ateapi replica crashes mid-workflow.

Split the two concerns:

- Lock TTL stays short (30s constant, internal). Bounds peer failover.
- Workflow deadline is a separate operator-configurable knob via the new
  --actor-workflow-deadline pflag (default 5m). Bounds a single
  Resume/Suspend.
- A heartbeat goroutine refreshes the lock every lockTTL/3 (~10s) for the
  full workflow duration. On RefreshLock=false or any Redis error (peer
  stole the lock, Redis blip), the workflow ctx is cancelled with
  errLostActorLock as the cause so in-flight steps unwind cleanly and the
  mutual-exclusion invariant is preserved.
- The release function stops the heartbeat (waits for goroutine exit)
  before best-effort ReleaseLock.

Adds store.Interface.RefreshLock with a Redis CAS Lua script mirroring the
existing ReleaseLock script.
--- .../internal/controlapi/functional_test.go | 2 +- cmd/ateapi/internal/controlapi/service.go | 9 +- cmd/ateapi/internal/controlapi/workflow.go | 97 +++++++++++--- .../internal/controlapi/workflow_test.go | 124 ++++++++++++++++++ .../internal/store/ateredis/ateredis.go | 20 +++ .../internal/store/ateredis/ateredis_test.go | 91 +++++++++++++ cmd/ateapi/internal/store/store.go | 6 + cmd/ateapi/main.go | 5 +- 8 files changed, 330 insertions(+), 24 deletions(-) create mode 100644 cmd/ateapi/internal/controlapi/workflow_test.go diff --git a/cmd/ateapi/internal/controlapi/functional_test.go b/cmd/ateapi/internal/controlapi/functional_test.go index 1c49a26c7..7554715e2 100644 --- a/cmd/ateapi/internal/controlapi/functional_test.go +++ b/cmd/ateapi/internal/controlapi/functional_test.go @@ -282,7 +282,7 @@ func setupTest(t *testing.T, ns string) *testContext { // 4. Initialize Service dialer := NewAteletDialer(workerInformer.GetIndexer(), ateletInformer.GetIndexer()) - service := NewService(persistence, actorTemplateLister, dialer, k8sClient) + service := NewService(persistence, actorTemplateLister, dialer, k8sClient, 30*time.Second) // 5. Start REAL gRPC Server for ATE API grpcServer := grpc.NewServer(grpc.UnaryInterceptor(ateinterceptors.ServerUnaryInterceptor)) diff --git a/cmd/ateapi/internal/controlapi/service.go b/cmd/ateapi/internal/controlapi/service.go index 39841729e..7bc7f0e90 100644 --- a/cmd/ateapi/internal/controlapi/service.go +++ b/cmd/ateapi/internal/controlapi/service.go @@ -15,6 +15,8 @@ package controlapi import ( + "time" + "github.com/agent-substrate/substrate/cmd/ateapi/internal/store" listersv1alpha1 "github.com/agent-substrate/substrate/pkg/client/listers/api/v1alpha1" "github.com/agent-substrate/substrate/pkg/proto/ateapipb" @@ -32,13 +34,14 @@ type Service struct { var _ ateapipb.ControlServer = (*Service)(nil) -// NewService creates a service. 
-func NewService(persistence store.Interface, actorTemplateLister listersv1alpha1.ActorTemplateLister, dialer *AteletDialer, kubeClient kubernetes.Interface) *Service { +// NewService creates a service. actorWorkflowDeadline bounds how long a single +// Resume/Suspend workflow can run end-to-end. +func NewService(persistence store.Interface, actorTemplateLister listersv1alpha1.ActorTemplateLister, dialer *AteletDialer, kubeClient kubernetes.Interface, actorWorkflowDeadline time.Duration) *Service { s := &Service{ persistence: persistence, actorTemplateLister: actorTemplateLister, dialer: dialer, - actorWorkflow: NewActorWorkflow(persistence, dialer, actorTemplateLister, kubeClient), + actorWorkflow: NewActorWorkflow(persistence, dialer, actorTemplateLister, kubeClient, actorWorkflowDeadline), } return s diff --git a/cmd/ateapi/internal/controlapi/workflow.go b/cmd/ateapi/internal/controlapi/workflow.go index 37abb258a..066f6bd91 100644 --- a/cmd/ateapi/internal/controlapi/workflow.go +++ b/cmd/ateapi/internal/controlapi/workflow.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "log/slog" "time" "github.com/agent-substrate/substrate/cmd/ateapi/internal/store" @@ -32,6 +33,18 @@ import ( "k8s.io/client-go/kubernetes" ) +// actorLockTTL is the Redis TTL on the per-actor workflow lock. It bounds how +// long a peer must wait to retry an actor after this process crashes mid-workflow. +const actorLockTTL = 30 * time.Second + +// actorLockHeartbeatInterval is how often the heartbeat refreshes the lock. +// Chosen so we get ~3 attempts before the TTL would otherwise lapse. +const actorLockHeartbeatInterval = actorLockTTL / 3 + +// errLostActorLock is the context cause set when the heartbeat can no longer +// keep the actor lock alive (peer stole it, or Redis returned an error). +var errLostActorLock = errors.New("lost actor lock during workflow") + // WorkflowStep represents a single, idempotent operation in a workflow graph. 
// Params is the immutable parameters used to start the workflow. // Context is the mutable context fetched or modified during execution. @@ -119,16 +132,22 @@ type ActorWorkflow struct { actorTemplateLister listersv1alpha1.ActorTemplateLister kubeClient kubernetes.Interface secretCache *envSecretCache + // workflowDeadline is the maximum duration of a single Resume/Suspend + // workflow. The lock is kept alive across this duration by a heartbeat, + // independent of actorLockTTL. + workflowDeadline time.Duration } -// NewActorWorkflow creates a new ActorWorkflow. -func NewActorWorkflow(store store.Interface, dialer *AteletDialer, actorTemplateLister listersv1alpha1.ActorTemplateLister, kubeClient kubernetes.Interface) *ActorWorkflow { +// NewActorWorkflow creates a new ActorWorkflow. workflowDeadline bounds how +// long a single Resume/Suspend can run end-to-end. +func NewActorWorkflow(store store.Interface, dialer *AteletDialer, actorTemplateLister listersv1alpha1.ActorTemplateLister, kubeClient kubernetes.Interface, workflowDeadline time.Duration) *ActorWorkflow { return &ActorWorkflow{ store: store, dialer: dialer, actorTemplateLister: actorTemplateLister, kubeClient: kubeClient, secretCache: newEnvSecretCache(envSecretCacheTTL), + workflowDeadline: workflowDeadline, } } @@ -140,9 +159,7 @@ func (w *ActorWorkflow) ResumeActor(ctx context.Context, id string, boot bool) ( } state := &ResumeState{} - // Acquire lock and get the timeout context for the workflow - // Lock TTL is 7 seconds, with 2 seconds padding for workflow timeout - ctx, releaseLock, err := w.acquireActorLock(ctx, id, 30*time.Second, 2*time.Second) + ctx, releaseLock, err := w.acquireActorLock(ctx, id, actorLockTTL, actorLockHeartbeatInterval) if err != nil { return nil, err } @@ -169,9 +186,7 @@ func (w *ActorWorkflow) SuspendActor(ctx context.Context, id string) (*ateapipb. 
} state := &SuspendState{} - // Acquire lock and get the timeout context for the workflow - // Lock TTL is 7 seconds, with 2 seconds padding for workflow timeout - ctx, releaseLock, err := w.acquireActorLock(ctx, id, 30*time.Second, 2*time.Second) + ctx, releaseLock, err := w.acquireActorLock(ctx, id, actorLockTTL, actorLockHeartbeatInterval) if err != nil { return nil, err } @@ -191,27 +206,71 @@ func (w *ActorWorkflow) SuspendActor(ctx context.Context, id string) (*ateapipb. return state.Actor, nil } -func (w *ActorWorkflow) acquireActorLock(ctx context.Context, id string, ttl time.Duration, padding time.Duration) (context.Context, func(), error) { +// acquireActorLock takes the per-actor workflow lock and returns a workflow +// context bounded by w.workflowDeadline. A background heartbeat keeps the lock +// alive — independent of lockTTL — for as long as the workflow runs. If the +// heartbeat fails (Redis error or another peer stole the lock) the returned +// context is cancelled with errLostActorLock as the cause, and in-flight steps +// will see ctx.Err() and unwind. The returned release function stops the +// heartbeat, waits for it to exit, then best-effort releases the lock. 
+func (w *ActorWorkflow) acquireActorLock(ctx context.Context, id string, lockTTL, heartbeatInterval time.Duration) (context.Context, func(), error) { lockKey := "lock:actor:" + id lockValue := uuid.New().String() - // Create a child context for the workflow that expires BEFORE the lock - workflowTimeout := ttl - padding - workflowCtx, cancel := context.WithTimeout(ctx, workflowTimeout) - - acquired, err := w.store.AcquireLock(workflowCtx, lockKey, lockValue, ttl) + acquired, err := w.store.AcquireLock(ctx, lockKey, lockValue, lockTTL) if err != nil { - cancel() return nil, nil, fmt.Errorf("while acquiring lock: %w", err) } if !acquired { - cancel() return nil, nil, status.Error(grpcCodes.Aborted, "another operation is in progress for this actor") } - return workflowCtx, func() { - cancel() + cancellableCtx, cancelCause := context.WithCancelCause(ctx) + workflowCtx, cancelDeadline := context.WithTimeout(cancellableCtx, w.workflowDeadline) + + heartbeatDone := make(chan struct{}) + go w.runLockHeartbeat(workflowCtx, lockKey, lockValue, id, lockTTL, heartbeatInterval, cancelCause, heartbeatDone) + + release := func() { + cancelDeadline() + cancelCause(context.Canceled) + <-heartbeatDone // Use context.Background() to ensure the lock is released even if the workflow context was canceled. w.store.ReleaseLock(context.Background(), lockKey, lockValue) //nolint:errcheck // best-effort release; the lock TTL is the safety net. - }, nil + } + return workflowCtx, release, nil +} + +// runLockHeartbeat refreshes the actor lock on a ticker until ctx is done. If +// a refresh fails or returns false (we no longer own the lock), it cancels the +// workflow context with errLostActorLock so workflow steps tear down promptly. 
+func (w *ActorWorkflow) runLockHeartbeat(ctx context.Context, lockKey, lockValue, actorID string, lockTTL, heartbeatInterval time.Duration, cancelCause context.CancelCauseFunc, done chan<- struct{}) {
+	defer close(done) // Signals goroutine exit to whoever is waiting on done.
+	ticker := time.NewTicker(heartbeatInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			ok, err := w.store.RefreshLock(ctx, lockKey, lockValue, lockTTL)
+			if err != nil {
+				// Stay silent only when the workflow ctx itself is done (teardown). Judge
+				// that from ctx, not the error's type: a timeout surfaced by the store while
+				// ctx is live means the refresh failed and the workflow must be cancelled.
+				if ctx.Err() == nil {
+					slog.WarnContext(ctx, "Lock heartbeat failed; cancelling workflow",
+						slog.String("actor_id", actorID),
+						slog.String("err", err.Error()))
+					cancelCause(fmt.Errorf("%w: %w", errLostActorLock, err))
+				}
+				return
+			}
+			if !ok {
+				slog.WarnContext(ctx, "Actor lock no longer owned; cancelling workflow",
+					slog.String("actor_id", actorID))
+				cancelCause(errLostActorLock)
+				return
+			}
+		}
+	}
+}
diff --git a/cmd/ateapi/internal/controlapi/workflow_test.go b/cmd/ateapi/internal/controlapi/workflow_test.go
new file mode 100644
index 000000000..57d99e711
--- /dev/null
+++ b/cmd/ateapi/internal/controlapi/workflow_test.go
@@ -0,0 +1,124 @@
+// Copyright 2026 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package controlapi + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/agent-substrate/substrate/cmd/ateapi/internal/store/ateredis" + "github.com/alicebob/miniredis/v2" + "github.com/redis/go-redis/v9" +) + +func newLockTestWorkflow(t *testing.T) (*miniredis.Miniredis, *ActorWorkflow) { + t.Helper() + mr, err := miniredis.Run() + if err != nil { + t.Fatalf("miniredis.Run: %v", err) + } + t.Cleanup(mr.Close) + rdb := redis.NewClusterClient(&redis.ClusterOptions{Addrs: []string{mr.Addr()}}) + return mr, &ActorWorkflow{ + store: ateredis.NewPersistence(rdb), + workflowDeadline: 30 * time.Second, + } +} + +func TestAcquireActorLock_HeartbeatKeepsLockAlivePastTTL(t *testing.T) { + mr, w := newLockTestWorkflow(t) + + lockTTL := 150 * time.Millisecond + heartbeat := 40 * time.Millisecond + + ctx, release, err := w.acquireActorLock(context.Background(), "actor-1", lockTTL, heartbeat) + if err != nil { + t.Fatalf("acquireActorLock: %v", err) + } + defer release() + + // Wait through multiple TTLs. If the heartbeat is working the lock key + // must still be in Redis — its TTL is being PEXPIRE'd back to `lockTTL` + // every `heartbeat`. 
+ time.Sleep(4 * lockTTL) + + if !mr.Exists("lock:actor:actor-1") { + t.Fatalf("lock key disappeared from Redis despite heartbeat; ctx err=%v cause=%v", ctx.Err(), context.Cause(ctx)) + } + if ctx.Err() != nil { + t.Fatalf("workflow ctx cancelled while heartbeat was healthy: err=%v cause=%v", ctx.Err(), context.Cause(ctx)) + } +} + +func TestAcquireActorLock_LostLockCancelsWorkflow(t *testing.T) { + mr, w := newLockTestWorkflow(t) + + lockTTL := 200 * time.Millisecond + heartbeat := 30 * time.Millisecond + + ctx, release, err := w.acquireActorLock(context.Background(), "actor-2", lockTTL, heartbeat) + if err != nil { + t.Fatalf("acquireActorLock: %v", err) + } + defer release() + + // Simulate a peer stealing the lock (or the TTL lapsing and someone else + // re-acquiring): wipe our key so the next heartbeat refresh's CAS fails. + mr.Del("lock:actor:actor-2") + + select { + case <-ctx.Done(): + case <-time.After(2 * time.Second): + t.Fatalf("workflow ctx was not cancelled after lock was lost") + } + + if cause := context.Cause(ctx); !errors.Is(cause, errLostActorLock) { + t.Errorf("context.Cause = %v, want errLostActorLock", cause) + } +} + +func TestAcquireActorLock_ReleaseRemovesLock(t *testing.T) { + mr, w := newLockTestWorkflow(t) + + _, release, err := w.acquireActorLock(context.Background(), "actor-3", 200*time.Millisecond, 60*time.Millisecond) + if err != nil { + t.Fatalf("acquireActorLock: %v", err) + } + + if !mr.Exists("lock:actor:actor-3") { + t.Fatalf("lock key not in Redis after acquire") + } + release() + if mr.Exists("lock:actor:actor-3") { + t.Errorf("lock key still in Redis after release") + } +} + +func TestAcquireActorLock_ConflictReturnsAborted(t *testing.T) { + _, w := newLockTestWorkflow(t) + + _, release, err := w.acquireActorLock(context.Background(), "actor-4", 5*time.Second, 1*time.Second) + if err != nil { + t.Fatalf("first acquireActorLock: %v", err) + } + defer release() + + _, _, err = w.acquireActorLock(context.Background(), 
"actor-4", 5*time.Second, 1*time.Second) + if err == nil { + t.Fatalf("expected second acquireActorLock to fail") + } +} diff --git a/cmd/ateapi/internal/store/ateredis/ateredis.go b/cmd/ateapi/internal/store/ateredis/ateredis.go index 206395c97..e4420930e 100644 --- a/cmd/ateapi/internal/store/ateredis/ateredis.go +++ b/cmd/ateapi/internal/store/ateredis/ateredis.go @@ -589,3 +589,23 @@ func (s *Persistence) ReleaseLock(ctx context.Context, key string, value string) } return nil } + +func (s *Persistence) RefreshLock(ctx context.Context, key string, value string, ttl time.Duration) (bool, error) { + var luaRefresh = redis.NewScript(` + if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("pexpire", KEYS[1], ARGV[2]) + else + return 0 + end + `) + + res, err := luaRefresh.Run(ctx, s.rdb, []string{key}, value, ttl.Milliseconds()).Result() + if err != nil { + return false, fmt.Errorf("while refreshing lock for %q with value %q: %w", key, value, err) + } + n, ok := res.(int64) + if !ok { + return false, fmt.Errorf("while refreshing lock for %q: unexpected result type %T", key, res) + } + return n == 1, nil +} diff --git a/cmd/ateapi/internal/store/ateredis/ateredis_test.go b/cmd/ateapi/internal/store/ateredis/ateredis_test.go index 61e058887..f56cc9d12 100644 --- a/cmd/ateapi/internal/store/ateredis/ateredis_test.go +++ b/cmd/ateapi/internal/store/ateredis/ateredis_test.go @@ -741,6 +741,97 @@ func TestAcquireLock_TTLExpiration(t *testing.T) { } } +func TestRefreshLock_ExtendsTTL(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + key := "test-lock" + value := "token-1" + otherValue := "token-2" + ttl := 10 * time.Second + + acquired, err := s.AcquireLock(ctx, key, value, ttl) + if err != nil { + t.Fatalf("AcquireLock failed: %v", err) + } + if !acquired { + t.Fatalf("expected lock to be acquired") + } + + // Advance to just before expiry, refresh, and verify another holder still + // can't acquire after the original TTL would have 
elapsed. + mr.FastForward(8 * time.Second) + refreshed, err := s.RefreshLock(ctx, key, value, ttl) + if err != nil { + t.Fatalf("RefreshLock failed: %v", err) + } + if !refreshed { + t.Fatalf("expected lock to be refreshed") + } + + mr.FastForward(5 * time.Second) + stolen, err := s.AcquireLock(ctx, key, otherValue, ttl) + if err != nil { + t.Fatalf("AcquireLock failed: %v", err) + } + if stolen { + t.Errorf("expected refreshed lock to still be held, but it was stolen") + } +} + +func TestRefreshLock_WrongValueReturnsFalse(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + key := "test-lock" + value := "token-1" + otherValue := "token-2" + ttl := 10 * time.Second + + acquired, err := s.AcquireLock(ctx, key, value, ttl) + if err != nil { + t.Fatalf("AcquireLock failed: %v", err) + } + if !acquired { + t.Fatalf("expected lock to be acquired") + } + + refreshed, err := s.RefreshLock(ctx, key, otherValue, ttl) + if err != nil { + t.Fatalf("RefreshLock failed: %v", err) + } + if refreshed { + t.Errorf("expected RefreshLock with wrong value to return false") + } +} + +func TestRefreshLock_AfterExpirationReturnsFalse(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + key := "test-lock" + value := "token-1" + ttl := 5 * time.Second + + acquired, err := s.AcquireLock(ctx, key, value, ttl) + if err != nil { + t.Fatalf("AcquireLock failed: %v", err) + } + if !acquired { + t.Fatalf("expected lock to be acquired") + } + + mr.FastForward(6 * time.Second) + + refreshed, err := s.RefreshLock(ctx, key, value, ttl) + if err != nil { + t.Fatalf("RefreshLock failed: %v", err) + } + if refreshed { + t.Errorf("expected RefreshLock after expiration to return false") + } +} + func TestAcquireLock_NonReentry(t *testing.T) { mr, s, ctx := setupTest(t) defer mr.Close() diff --git a/cmd/ateapi/internal/store/store.go b/cmd/ateapi/internal/store/store.go index 638e7f0ee..2d92bf405 100644 --- a/cmd/ateapi/internal/store/store.go +++ 
b/cmd/ateapi/internal/store/store.go @@ -81,6 +81,12 @@ type Interface interface { // Returns an error only on database failure. ReleaseLock(ctx context.Context, key string, value string) error + // RefreshLock extends the TTL on a distributed lock iff the stored value still matches. + // Returns true if the TTL was extended. + // Returns false if we no longer own the lock (either expired and reclaimed, or never held). + // Returns an error only on database failure. + RefreshLock(ctx context.Context, key string, value string, ttl time.Duration) (bool, error) + // DebugClearAll drop all data from the database. Useful for debugging / local testing/ DebugClearAll(ctx context.Context) error } diff --git a/cmd/ateapi/main.go b/cmd/ateapi/main.go index 87d5a7433..d96f2826d 100644 --- a/cmd/ateapi/main.go +++ b/cmd/ateapi/main.go @@ -64,6 +64,8 @@ var ( sessionIDCAPoolFile = pflag.String("session-id-ca-pool", "", "The file that contains the CA pool for signing session JWTs") workerpoolCACerts = pflag.String("workerpool-ca-certs", "", "The file that contains the CA for verifying workerpool client certificates.") + actorWorkflowDeadline = pflag.Duration("actor-workflow-deadline", 5*time.Minute, "Maximum wall-clock duration of a single Resume/Suspend workflow. 
A heartbeat keeps the per-actor lock alive across this duration; raise it for slow image registries.") + showVersion = pflag.Bool("version", false, "Print version and exit.") ) @@ -131,7 +133,7 @@ func main() { ateFactory.WaitForCacheSync(stopCh) dialer := controlapi.NewAteletDialer(workerPodInformer.GetIndexer(), ateletPodInformer.GetIndexer()) - sm := controlapi.NewService(redisPersistence, actorTemplateLister, dialer, clientset) + sm := controlapi.NewService(redisPersistence, actorTemplateLister, dialer, clientset, *actorWorkflowDeadline) sessionIdentitySrv := sessionidentity.New(*clientJWTIssuer, *clientJWTAudience, *sessionIDJWTPoolFile, *sessionIDCAPoolFile, *workerpoolCACerts) @@ -196,6 +198,7 @@ func logFlagValues(ctx context.Context) { slog.String("session-id-jwt-pool", *sessionIDJWTPoolFile), slog.String("session-id-ca-pool", *sessionIDCAPoolFile), slog.String("workerpool-ca-certs", *workerpoolCACerts), + slog.Duration("actor-workflow-deadline", *actorWorkflowDeadline), ) }