Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions cmd/ateapi/internal/controlapi/capacity_pressure.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controlapi

import "sync"

// poolKey is the (namespace, name) pair that names a single worker pool. Both
// fields are plain strings, so values can be compared with == and used as map
// keys.
type poolKey struct {
	namespace, name string
}

// CapacityPressureHub broadcasts capacity-pressure notifications ("this pool
// had no free worker for a resume") to every registered subscriber, i.e. the
// WatchCapacityPressure RPC handlers.
//
// Publish sits on the resume hot path and therefore must never block: when a
// subscriber's buffer is full the event is simply dropped for that subscriber,
// and the autoscaler's periodic reconcile acts as the backstop.
type CapacityPressureHub struct {
	// mu guards nextID and subs.
	mu sync.Mutex
	// nextID is the map key handed to the next subscriber.
	nextID int
	// subs maps a subscriber ID to that subscriber's event channel.
	subs map[int]chan poolKey
}

// NewCapacityPressureHub builds a hub with no subscribers and an initialized
// (non-nil) subscriber map.
func NewCapacityPressureHub() *CapacityPressureHub {
	hub := new(CapacityPressureHub)
	hub.subs = map[int]chan poolKey{}
	return hub
}

// Subscribe registers a new subscriber and returns its receive-only event
// channel together with a cancel func.
//
// The channel has a 64-slot buffer so short bursts are absorbed; once it is
// full, further events are dropped for this subscriber (lossy by design).
// cancel unregisters the subscriber and closes the channel; it is idempotent.
// Until cancel is called the hub keeps the channel registered.
//
// Subscribe also works on a zero-value CapacityPressureHub: the subscriber map
// is allocated on first use instead of panicking on a nil-map write.
func (h *CapacityPressureHub) Subscribe() (<-chan poolKey, func()) {
	ch := make(chan poolKey, 64)

	h.mu.Lock()
	if h.subs == nil {
		// The hub was not built via NewCapacityPressureHub; assigning into a
		// nil map would panic, so create it lazily.
		h.subs = make(map[int]chan poolKey)
	}
	id := h.nextID
	h.nextID++
	h.subs[id] = ch
	h.mu.Unlock()

	var once sync.Once
	cancel := func() {
		once.Do(func() {
			// Unregister and close under the lock: Publish sends while holding
			// the same lock, so it can never send on a closed channel.
			h.mu.Lock()
			defer h.mu.Unlock()
			delete(h.subs, id)
			close(ch)
		})
	}
	return ch, cancel
}

// Publish tells every current subscriber that the pool namespace/name had no
// free worker. Delivery is best-effort: each send is non-blocking, so a
// subscriber whose buffer is already full misses this event instead of
// stalling the caller.
func (h *CapacityPressureHub) Publish(namespace, name string) {
	h.mu.Lock()
	defer h.mu.Unlock()

	event := poolKey{namespace: namespace, name: name}
	for _, sub := range h.subs {
		select {
		case sub <- event:
		default:
			// Buffer full: drop the event for this subscriber.
		}
	}
}
75 changes: 75 additions & 0 deletions cmd/ateapi/internal/controlapi/capacity_pressure_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controlapi

import (
"testing"
"time"
)

// TestCapacityPressureHubFanOut verifies that a single Publish is delivered to
// every subscriber registered at the time of the call.
func TestCapacityPressureHubFanOut(t *testing.T) {
	hub := NewCapacityPressureHub()
	first, stopFirst := hub.Subscribe()
	defer stopFirst()
	second, stopSecond := hub.Subscribe()
	defer stopSecond()

	hub.Publish("ns", "pool")

	want := poolKey{namespace: "ns", name: "pool"}
	for idx, events := range []<-chan poolKey{first, second} {
		select {
		case got := <-events:
			if got != want {
				t.Fatalf("subscriber %d: got %+v, want {ns pool}", idx, got)
			}
		case <-time.After(time.Second):
			t.Fatalf("subscriber %d: no event delivered", idx)
		}
	}
}

// TestCapacityPressureHubUnsubscribe verifies that cancel closes the
// subscriber's channel, that a later Publish does not panic, and that calling
// cancel a second time is harmless.
func TestCapacityPressureHubUnsubscribe(t *testing.T) {
	hub := NewCapacityPressureHub()
	events, stop := hub.Subscribe()
	stop()

	// A publish after unsubscribe must not panic; the channel must read as
	// closed rather than yield an event.
	hub.Publish("ns", "pool")
	_, open := <-events
	if open {
		t.Fatal("channel should be closed after cancel")
	}

	// Second cancel is a no-op.
	stop()
}

// TestCapacityPressureHubPublishNeverBlocks verifies that Publish keeps
// returning when a subscriber never drains its channel: far more events are
// published than the buffer can hold, and the publisher must still finish
// within the timeout.
func TestCapacityPressureHubPublishNeverBlocks(t *testing.T) {
	hub := NewCapacityPressureHub()
	// Subscribe but never read, so the buffer fills and stays full.
	_, stop := hub.Subscribe()
	defer stop()

	const publishes = 10_000
	finished := make(chan struct{})
	go func() {
		for n := 0; n < publishes; n++ {
			// Must drop, not block, once the buffer fills.
			hub.Publish("ns", "pool")
		}
		close(finished)
	}()

	select {
	case <-finished:
	case <-time.After(2 * time.Second):
		t.Fatal("Publish blocked on a full subscriber buffer")
	}
}
7 changes: 6 additions & 1 deletion cmd/ateapi/internal/controlapi/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,22 @@ type Service struct {
dialer *AteletDialer
actorTemplateLister listersv1alpha1.ActorTemplateLister
actorWorkflow *ActorWorkflow
pressure *CapacityPressureHub
}

var _ ateapipb.ControlServer = (*Service)(nil)

// NewService creates a service.
func NewService(persistence store.Interface, actorTemplateLister listersv1alpha1.ActorTemplateLister, dialer *AteletDialer, kubeClient kubernetes.Interface) *Service {
// The hub is shared: the resume workflow publishes capacity-pressure events
// to it, and the WatchCapacityPressure RPC streams them to subscribers.
pressure := NewCapacityPressureHub()
s := &Service{
persistence: persistence,
actorTemplateLister: actorTemplateLister,
dialer: dialer,
actorWorkflow: NewActorWorkflow(persistence, dialer, actorTemplateLister, kubeClient),
actorWorkflow: NewActorWorkflow(persistence, dialer, actorTemplateLister, kubeClient, pressure),
pressure: pressure,
}

return s
Expand Down
43 changes: 43 additions & 0 deletions cmd/ateapi/internal/controlapi/watch_capacity_pressure.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controlapi

import (
"google.golang.org/grpc"

"github.com/agent-substrate/substrate/pkg/proto/ateapipb"
)

// WatchCapacityPressure subscribes the caller to the service's capacity-
// pressure hub and forwards every received pool key on the stream as a
// CapacityPressureEvent. It returns nil once the stream's context is done, and
// returns the Send error if forwarding an event fails. The subscription is
// cancelled when the handler returns.
func (s *Service) WatchCapacityPressure(_ *ateapipb.WatchCapacityPressureRequest, stream grpc.ServerStreamingServer[ateapipb.CapacityPressureEvent]) error {
	events, unsubscribe := s.pressure.Subscribe()
	defer unsubscribe()

	done := stream.Context().Done()
	for {
		select {
		case <-done:
			return nil
		case pool := <-events:
			event := &ateapipb.CapacityPressureEvent{
				WorkerNamespace: pool.namespace,
				WorkerPool:      pool.name,
			}
			if err := stream.Send(event); err != nil {
				return err
			}
		}
	}
}
6 changes: 4 additions & 2 deletions cmd/ateapi/internal/controlapi/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,18 @@ type ActorWorkflow struct {
actorTemplateLister listersv1alpha1.ActorTemplateLister
kubeClient kubernetes.Interface
secretCache *envSecretCache
pressure *CapacityPressureHub
}

// NewActorWorkflow creates a new ActorWorkflow.
func NewActorWorkflow(store store.Interface, dialer *AteletDialer, actorTemplateLister listersv1alpha1.ActorTemplateLister, kubeClient kubernetes.Interface) *ActorWorkflow {
func NewActorWorkflow(store store.Interface, dialer *AteletDialer, actorTemplateLister listersv1alpha1.ActorTemplateLister, kubeClient kubernetes.Interface, pressure *CapacityPressureHub) *ActorWorkflow {
return &ActorWorkflow{
store: store,
dialer: dialer,
actorTemplateLister: actorTemplateLister,
kubeClient: kubeClient,
secretCache: newEnvSecretCache(envSecretCacheTTL),
pressure: pressure,
}
}

Expand All @@ -150,7 +152,7 @@ func (w *ActorWorkflow) ResumeActor(ctx context.Context, id string, boot bool) (

steps := []WorkflowStep[*ResumeInput, *ResumeState]{
&LoadActorForResumeStep{store: w.store, actorTemplateLister: w.actorTemplateLister},
&AssignWorkerStep{store: w.store},
&AssignWorkerStep{store: w.store, pressure: w.pressure},
&CallAteletRestoreStep{dialer: w.dialer, kubeClient: w.kubeClient, secretCache: w.secretCache},
&FinalizeRunningStep{store: w.store},
}
Expand Down
8 changes: 7 additions & 1 deletion cmd/ateapi/internal/controlapi/workflow_resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func (s *LoadActorForResumeStep) Execute(ctx context.Context, input *ResumeInput
func (s *LoadActorForResumeStep) RetryBackoff() *wait.Backoff { return nil }

type AssignWorkerStep struct {
store store.Interface
store store.Interface
pressure *CapacityPressureHub
}

func (s *AssignWorkerStep) Name() string { return "AssignWorker" }
Expand Down Expand Up @@ -105,6 +106,11 @@ func (s *AssignWorkerStep) Execute(ctx context.Context, input *ResumeInput, stat
if assignedWorker == nil {
pickedWorker := s.findFreeWorker(workers, state.ActorTemplate.Spec.WorkerPoolRef.Namespace, state.ActorTemplate.Spec.WorkerPoolRef.Name)
if pickedWorker == nil {
// Signal capacity pressure for this pool so the autoscaler can react
// at the request edge instead of waiting for its next poll.
if s.pressure != nil {
s.pressure.Publish(state.ActorTemplate.Spec.WorkerPoolRef.Namespace, state.ActorTemplate.Spec.WorkerPoolRef.Name)
}
return status.Errorf(codes.FailedPrecondition, "no free workers available")
}

Expand Down
16 changes: 16 additions & 0 deletions cmd/atecontroller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package main
import (
"crypto/tls"
"os"
"time"

"github.com/agent-substrate/substrate/internal/autoscaler"
"github.com/agent-substrate/substrate/internal/controllers"
clientv1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1"
"github.com/agent-substrate/substrate/pkg/proto/ateapipb"
Expand Down Expand Up @@ -88,6 +90,20 @@ func main() {
os.Exit(1)
}

if err = (&controllers.WorkerPoolAutoscaler{
Client: mgr.GetClient(),
AteClient: ateapiClient,
Config: autoscaler.Config{

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be configurable per worker pool?

// TODO: surface these as flags once we tune them in a cluster.
ScaleDownStabilization: 60 * time.Second,
MaxScaleUpStep: 0, // unlimited
},
Interval: 10 * time.Second,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "WorkerPoolAutoscaler")
os.Exit(1)
}

//+kubebuilder:scaffold:builder

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
Expand Down
Loading