diff --git a/cmd/ateapi/internal/controlapi/capacity_pressure.go b/cmd/ateapi/internal/controlapi/capacity_pressure.go new file mode 100644 index 000000000..b6ebd773e --- /dev/null +++ b/cmd/ateapi/internal/controlapi/capacity_pressure.go @@ -0,0 +1,79 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controlapi + +import "sync" + +// poolKey identifies a worker pool by namespace and name. +type poolKey struct { + namespace string + name string +} + +// CapacityPressureHub fans out capacity-pressure notifications — a pool had no +// free worker for a resume — to any number of subscribers (the WatchCapacity- +// Pressure RPC handlers). Publish is called on the resume hot path and must +// never block: a subscriber whose buffer is full simply misses the event, and +// the autoscaler's periodic reconcile is the backstop. +type CapacityPressureHub struct { + mu sync.Mutex + nextID int + subs map[int]chan poolKey +} + +// NewCapacityPressureHub returns an empty hub. +func NewCapacityPressureHub() *CapacityPressureHub { + return &CapacityPressureHub{subs: make(map[int]chan poolKey)} +} + +// Subscribe registers a subscriber and returns its event channel plus a cancel +// func that unregisters and closes the channel. cancel is idempotent. The +// channel is buffered so short bursts aren't dropped, and lossy beyond that by +// design. +func (h *CapacityPressureHub) Subscribe() (<-chan poolKey, func()) { + ch := make(chan poolKey, 64) + + h.mu.Lock() + id := h.nextID + h.nextID++ + h.subs[id] = ch + h.mu.Unlock() + + var once sync.Once + cancel := func() { + once.Do(func() { + h.mu.Lock() + defer h.mu.Unlock() + delete(h.subs, id) + close(ch) + }) + } + return ch, cancel +} + +// Publish notifies every subscriber that the named pool had no free worker. It +// never blocks: a full subscriber buffer drops the event. +func (h *CapacityPressureHub) Publish(namespace, name string) { + key := poolKey{namespace: namespace, name: name} + + h.mu.Lock() + defer h.mu.Unlock() + for _, ch := range h.subs { + select { + case ch <- key: + default: + } + } +} diff --git a/cmd/ateapi/internal/controlapi/capacity_pressure_test.go b/cmd/ateapi/internal/controlapi/capacity_pressure_test.go new file mode 100644 index 000000000..46e210240 --- /dev/null +++ b/cmd/ateapi/internal/controlapi/capacity_pressure_test.go @@ -0,0 +1,75 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controlapi + +import ( + "testing" + "time" +) + +func TestCapacityPressureHubFanOut(t *testing.T) { + h := NewCapacityPressureHub() + a, cancelA := h.Subscribe() + defer cancelA() + b, cancelB := h.Subscribe() + defer cancelB() + + h.Publish("ns", "pool") + + for i, ch := range []<-chan poolKey{a, b} { + select { + case got := <-ch: + if got.namespace != "ns" || got.name != "pool" { + t.Fatalf("subscriber %d: got %+v, want {ns pool}", i, got) + } + case <-time.After(time.Second): + t.Fatalf("subscriber %d: no event delivered", i) + } + } +} + +func TestCapacityPressureHubUnsubscribe(t *testing.T) { + h := NewCapacityPressureHub() + ch, cancel := h.Subscribe() + + cancel() + // Publishing after unsubscribe must not panic, and the channel is closed. + h.Publish("ns", "pool") + if _, ok := <-ch; ok { + t.Fatal("channel should be closed after cancel") + } + // cancel is idempotent. + cancel() +} + +func TestCapacityPressureHubPublishNeverBlocks(t *testing.T) { + h := NewCapacityPressureHub() + _, cancel := h.Subscribe() // a subscriber that never drains + defer cancel() + + done := make(chan struct{}) + go func() { + for i := 0; i < 10_000; i++ { + h.Publish("ns", "pool") // must drop, not block, once the buffer fills + } + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("Publish blocked on a full subscriber buffer") + } +} diff --git a/cmd/ateapi/internal/controlapi/service.go b/cmd/ateapi/internal/controlapi/service.go index 39841729e..f60775495 100644 --- a/cmd/ateapi/internal/controlapi/service.go +++ b/cmd/ateapi/internal/controlapi/service.go @@ -28,17 +28,22 @@ type Service struct { dialer *AteletDialer actorTemplateLister listersv1alpha1.ActorTemplateLister actorWorkflow *ActorWorkflow + pressure *CapacityPressureHub } var _ ateapipb.ControlServer = (*Service)(nil) // NewService creates a service. func NewService(persistence store.Interface, actorTemplateLister listersv1alpha1.ActorTemplateLister, dialer *AteletDialer, kubeClient kubernetes.Interface) *Service { + // The hub is shared: the resume workflow publishes capacity-pressure events + // to it, and the WatchCapacityPressure RPC streams them to subscribers. + pressure := NewCapacityPressureHub() s := &Service{ persistence: persistence, actorTemplateLister: actorTemplateLister, dialer: dialer, - actorWorkflow: NewActorWorkflow(persistence, dialer, actorTemplateLister, kubeClient), + actorWorkflow: NewActorWorkflow(persistence, dialer, actorTemplateLister, kubeClient, pressure), + pressure: pressure, } return s diff --git a/cmd/ateapi/internal/controlapi/watch_capacity_pressure.go b/cmd/ateapi/internal/controlapi/watch_capacity_pressure.go new file mode 100644 index 000000000..eb922a2d2 --- /dev/null +++ b/cmd/ateapi/internal/controlapi/watch_capacity_pressure.go @@ -0,0 +1,43 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controlapi + +import ( + "google.golang.org/grpc" + + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" +) + +// WatchCapacityPressure streams a CapacityPressureEvent every time a pool has +// no free worker for a resume, until the client disconnects. +func (s *Service) WatchCapacityPressure(_ *ateapipb.WatchCapacityPressureRequest, stream grpc.ServerStreamingServer[ateapipb.CapacityPressureEvent]) error { + events, cancel := s.pressure.Subscribe() + defer cancel() + + ctx := stream.Context() + for { + select { + case <-ctx.Done(): + return nil + case key := <-events: + if err := stream.Send(&ateapipb.CapacityPressureEvent{ + WorkerNamespace: key.namespace, + WorkerPool: key.name, + }); err != nil { + return err + } + } + } +} diff --git a/cmd/ateapi/internal/controlapi/workflow.go b/cmd/ateapi/internal/controlapi/workflow.go index 37abb258a..f92af7684 100644 --- a/cmd/ateapi/internal/controlapi/workflow.go +++ b/cmd/ateapi/internal/controlapi/workflow.go @@ -119,16 +119,18 @@ type ActorWorkflow struct { actorTemplateLister listersv1alpha1.ActorTemplateLister kubeClient kubernetes.Interface secretCache *envSecretCache + pressure *CapacityPressureHub } // NewActorWorkflow creates a new ActorWorkflow. -func NewActorWorkflow(store store.Interface, dialer *AteletDialer, actorTemplateLister listersv1alpha1.ActorTemplateLister, kubeClient kubernetes.Interface) *ActorWorkflow { +func NewActorWorkflow(store store.Interface, dialer *AteletDialer, actorTemplateLister listersv1alpha1.ActorTemplateLister, kubeClient kubernetes.Interface, pressure *CapacityPressureHub) *ActorWorkflow { return &ActorWorkflow{ store: store, dialer: dialer, actorTemplateLister: actorTemplateLister, kubeClient: kubeClient, secretCache: newEnvSecretCache(envSecretCacheTTL), + pressure: pressure, } } @@ -150,7 +152,7 @@ func (w *ActorWorkflow) ResumeActor(ctx context.Context, id string, boot bool) ( steps := []WorkflowStep[*ResumeInput, *ResumeState]{ &LoadActorForResumeStep{store: w.store, actorTemplateLister: w.actorTemplateLister}, - &AssignWorkerStep{store: w.store}, + &AssignWorkerStep{store: w.store, pressure: w.pressure}, &CallAteletRestoreStep{dialer: w.dialer, kubeClient: w.kubeClient, secretCache: w.secretCache}, &FinalizeRunningStep{store: w.store}, } diff --git a/cmd/ateapi/internal/controlapi/workflow_resume.go b/cmd/ateapi/internal/controlapi/workflow_resume.go index 9f06eec0d..bad377d13 100644 --- a/cmd/ateapi/internal/controlapi/workflow_resume.go +++ b/cmd/ateapi/internal/controlapi/workflow_resume.go @@ -77,7 +77,8 @@ func (s *LoadActorForResumeStep) Execute(ctx context.Context, input *ResumeInput func (s *LoadActorForResumeStep) RetryBackoff() *wait.Backoff { return nil } type AssignWorkerStep struct { - store store.Interface + store store.Interface + pressure *CapacityPressureHub } func (s *AssignWorkerStep) Name() string { return "AssignWorker" } @@ -105,6 +106,11 @@ func (s *AssignWorkerStep) Execute(ctx context.Context, input *ResumeInput, stat if assignedWorker == nil { pickedWorker := s.findFreeWorker(workers, state.ActorTemplate.Spec.WorkerPoolRef.Namespace, state.ActorTemplate.Spec.WorkerPoolRef.Name) if pickedWorker == nil { + // Signal capacity pressure for this pool so the autoscaler can react + // at the request edge instead of waiting for its next poll. + if s.pressure != nil { + s.pressure.Publish(state.ActorTemplate.Spec.WorkerPoolRef.Namespace, state.ActorTemplate.Spec.WorkerPoolRef.Name) + } return status.Errorf(codes.FailedPrecondition, "no free workers available") } diff --git a/cmd/atecontroller/main.go b/cmd/atecontroller/main.go index f7e922273..746cd7537 100644 --- a/cmd/atecontroller/main.go +++ b/cmd/atecontroller/main.go @@ -16,7 +16,9 @@ package main import ( "crypto/tls" "os" + "time" + "github.com/agent-substrate/substrate/internal/autoscaler" "github.com/agent-substrate/substrate/internal/controllers" clientv1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" "github.com/agent-substrate/substrate/pkg/proto/ateapipb" @@ -88,6 +90,20 @@ func main() { os.Exit(1) } + if err = (&controllers.WorkerPoolAutoscaler{ + Client: mgr.GetClient(), + AteClient: ateapiClient, + Config: autoscaler.Config{ + // TODO: surface these as flags once we tune them in a cluster. + ScaleDownStabilization: 60 * time.Second, + MaxScaleUpStep: 0, // unlimited + }, + Interval: 10 * time.Second, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "WorkerPoolAutoscaler") + os.Exit(1) + } + //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { diff --git a/demos/autoscaling/README.md b/demos/autoscaling/README.md new file mode 100644 index 000000000..0bd3caa3e --- /dev/null +++ b/demos/autoscaling/README.md @@ -0,0 +1,98 @@ +# WorkerPool autoscaling demo + +Demonstrates demand-reactive autoscaling of a `WorkerPool` ([issue #198]) on a +running ate cluster: + +- **Reactive scale-up** — when a resume finds no free worker, ateapi emits a + capacity-pressure signal and the autoscaler raises `spec.replicas` + *immediately*, at the request edge — not at its next poll. +- **Hysteretic scale-down** — once idle workers are in surplus, the pool shrinks + back to `minReady`, but only after a stabilization window, so a brief lull + never discards warm capacity. + +The pool keeps a small warm **buffer** (`targetBuffer`) of idle workers so a +burst is served by capacity that *already exists*; the autoscaler's job is to +refill that buffer fast and trim it slowly. + +[issue #198]: https://github.com/agent-substrate/substrate/issues/198 + +## What gets deployed + +`autoscaling.yaml.tmpl` creates the `ate-demo-autoscaling` namespace, an +autoscaled `WorkerPool`, and an `ActorTemplate` that reuses the `counter` +workload (the workload is irrelevant — the focus is the pool): + +```yaml +spec: + replicas: 2 # starting point; the autoscaler owns it from here + minReady: 2 # reservation floor — never scale below 2 warm workers + targetBuffer: 2 # keep ~2 idle workers ready to absorb a burst + maxReplicas: 8 # ceiling +``` + +## Prerequisites + +- An ate cluster deployed +- A kind cluster via `hack/install-ate-kind.sh` works. +- `KO_DOCKER_REPO` and `BUCKET_NAME` set (same as the other demos — `ko` + resolves the `ko://` images and `BUCKET_NAME` is the GCS bucket for actor + snapshots). +- The `kubectl-ate` plugin available as `kubectl ate` (or run the demo with + `ATE=./bin/kubectl-ate ./demo.sh`). If your CLI needs an explicit API + endpoint, set it the same way you do for other `kubectl ate` calls. + +## Run + +```sh +# From the repo root: +./demos/autoscaling/demo.sh # deploys if needed, then runs the scenario +``` + +Watch it react live in another terminal: + +```sh +kubectl get workerpool autoscaling -n ate-demo-autoscaling -w +kubectl logs -n ate-system deploy/ate-controller -f | grep 'autoscaled WorkerPool' +``` + +The driver also accepts: + +```sh +./demos/autoscaling/demo.sh deploy # just apply the manifest +./demos/autoscaling/demo.sh cleanup # remove the demo actors + manifest +``` + +Tunables (env vars): `N` (actors woken, default 6), `SCALE_DOWN_WAIT` +(seconds to wait for the shrink, default 180), `ATE`, `KO`. + +## What you should see + +1. **Steady** — the pool settles at `DESIRED 2 / REPLICAS 2`: two idle workers + held warm. +2. **Burst** — the demo wakes 6 actors. The first two consume the warm buffer; + the next resume finds no free worker and returns `503`. That miss is the + trigger: `ate-controller` logs `autoscaled WorkerPool` and `spec.replicas` + jumps up toward `occupied + targetBuffer`, capped at `maxReplicas=8`. The + `503`'d resumes succeed on retry once the new workers finish booting (cold + start takes a little while — that delay is *why* the warm buffer exists). +3. **Idle** — the demo suspends all actors, freeing their workers. The buffer is + now in surplus. After the stabilization window (~60s) the pool shrinks back + to the floor, `DESIRED 2`. + +To see the ceiling enforced, run with `N=12 ./demos/autoscaling/demo.sh`: the +pool grows to `maxReplicas=8` and no further, so the excess resumes keep +getting `503` (there is no capacity left to create). + +## How it works + +- ateapi publishes a pool-scoped `CapacityPressureEvent` whenever + `AssignWorkerStep` finds no free worker, exposed via the + `WatchCapacityPressure` streaming RPC. +- `WorkerPoolAutoscaler` (in `ate-controller`) subscribes to that stream and + turns each event into an immediate reconcile of the pool. It also re-evaluates + on a ~10s poll (the scale-down path and a safety net). It reads occupancy via + `ListWorkers`, runs the decision policy, and is the **single writer** of + `spec.replicas`; `WorkerPoolReconciler` still owns the Deployment. + +> Note: `ate-controller` runs without leader election today, so the +> single-writer guarantee assumes a single controller replica. diff --git a/demos/autoscaling/autoscaling.yaml.tmpl b/demos/autoscaling/autoscaling.yaml.tmpl new file mode 100644 index 000000000..092cc4eeb --- /dev/null +++ b/demos/autoscaling/autoscaling.yaml.tmpl @@ -0,0 +1,68 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: Namespace +metadata: + name: ate-demo-autoscaling + +--- + +apiVersion: ate.dev/v1alpha1 +kind: WorkerPool +metadata: + name: autoscaling + namespace: ate-demo-autoscaling +spec: + # replicas becomes the autoscaler's to own once the bounds below are set; + # this is only the starting point. At steady state the autoscaler keeps + # `targetBuffer` idle workers, so a fresh pool settles at minReady warm. + replicas: 2 + ateomImage: ko://github.com/agent-substrate/substrate/cmd/ateom-gvisor + + # --- Autoscaling bounds (the feature this demo exercises, issue #198) --- + # Never scale below 2 warm workers (the reservation floor). + minReady: 2 + # Keep ~2 idle (warm) workers ready to absorb a resume burst. + targetBuffer: 2 + # Ceiling the autoscaler may grow to. + maxReplicas: 8 + +--- + +apiVersion: ate.dev/v1alpha1 +kind: ActorTemplate +metadata: + name: autoscaling + namespace: ate-demo-autoscaling +spec: + runsc: + amd64: + url: "gs://gvisor/releases/nightly/2026-05-19/x86_64/runsc" + sha256Hash: "a397be1abc2420d26bce6c70e6e2ff96c73aaaab929756c56f5e2089ea842b63" + arm64: + url: "gs://gvisor/releases/nightly/2026-05-19/aarch64/runsc" + sha256Hash: "1ba2366ae2efceba166046f51a4104f9261c9cb72c6db8f5b3fe2dc57dea86b9" + pauseImage: "registry.k8s.io/pause:3.10.2@sha256:f548e0e8e3dc1896ca956272154dde3314e8cc4fde0a57577ee9fa1c63f5baf4" + containers: + - name: workload + # Any workload works; the counter demo binary is reused so this demo has no + # source of its own — the focus is the WorkerPool, not the actor. + image: ko://github.com/agent-substrate/substrate/demos/counter + command: ["/ko-app/counter"] + workerPoolRef: + namespace: ate-demo-autoscaling + name: autoscaling + snapshotsConfig: + location: gs://${BUCKET_NAME}/ate-demo-autoscaling/ diff --git a/demos/autoscaling/demo.sh b/demos/autoscaling/demo.sh new file mode 100755 index 000000000..a40ed5242 --- /dev/null +++ b/demos/autoscaling/demo.sh @@ -0,0 +1,140 @@ +#!/usr/bin/env bash + +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Demo: WorkerPool autoscaling (issue #198). +# +# reactive scale-UP — a resume that finds no free worker emits a +# capacity-pressure signal; the autoscaler raises +# spec.replicas immediately (not at the next poll). +# hysteretic scale-DOWN — once the buffer is in surplus, the pool shrinks +# back to minReady, but only after a stabilization +# window so a brief lull never throws away warm workers. +# +# See README.md for prerequisites. Usage: +# ./demo.sh [run] deploy if needed, then run the up + down scenario +# ./demo.sh deploy just apply the manifest +# ./demo.sh cleanup suspend/delete the demo actors and remove the manifest + +set -o errexit -o nounset -o pipefail + +ATE="${ATE:-kubectl ate}" # override e.g. ATE=./bin/kubectl-ate +KO="${KO:-hack/run-tool.sh ko}" # how to resolve ko:// images +NS="ate-demo-autoscaling" +POOL="autoscaling" +TEMPLATE="${NS}/${POOL}" +N="${N:-6}" # actors to wake (drives the burst) +SCALE_DOWN_WAIT="${SCALE_DOWN_WAIT:-180}" # seconds to wait for hysteretic shrink +ROOT="$(cd "$(dirname "$0")/../.." && pwd)" +TMPL="${ROOT}/demos/autoscaling/autoscaling.yaml.tmpl" + +banner() { printf '\n\033[1;36m== %s ==\033[0m\n' "$*"; } +pool() { kubectl get workerpool "$POOL" -n "$NS"; } +desired() { kubectl get workerpool "$POOL" -n "$NS" -o jsonpath='{.spec.replicas}'; } + +render() { + : "${BUCKET_NAME:?set BUCKET_NAME (GCS bucket for actor snapshots)}" + sed "s|\${BUCKET_NAME}|${BUCKET_NAME}|g" "$TMPL" +} + +deploy() { + banner "Deploying the autoscaled WorkerPool + ActorTemplate" + render | ( cd "$ROOT" && $KO apply -f - ) + kubectl rollout status "deployment/${POOL}-deployment" -n "$NS" --timeout=300s + kubectl wait --for=condition=Ready "actortemplate/${POOL}" -n "$NS" --timeout=300s +} + +cleanup() { + banner "Cleanup" + for i in $(seq 1 "$N"); do + $ATE suspend actor "demo-${i}" >/dev/null 2>&1 || true + $ATE delete actor "demo-${i}" >/dev/null 2>&1 || true + done + render | kubectl delete --ignore-not-found -f - +} + +# resume_with_retry tolerates the 503 a resume gets when the buffer is empty: +# that miss is exactly what triggers the scale-up, and the retry lands once a +# fresh worker has booted. Bounded so the demo never hangs on slow cold starts. +resume_with_retry() { + local id="$1" tries=0 + until $ATE resume actor "$id" >/dev/null 2>&1; do + tries=$((tries + 1)) + if [ "$tries" -ge 15 ]; then + echo " resume ${id}: still no capacity after ${tries} tries (workers may still be booting)" + return 0 + fi + echo " resume ${id}: no free worker (503) — autoscaler reacting; retrying (${tries})..." + sleep 4 + done + echo " resume ${id}: running" +} + +main() { + if ! kubectl get workerpool "$POOL" -n "$NS" >/dev/null 2>&1; then + deploy + fi + + banner "Initial state — autoscaler holds the warm buffer (minReady=2, targetBuffer=2)" + pool + echo + echo "Tip: watch live in another terminal:" + echo " kubectl get workerpool ${POOL} -n ${NS} -w" + echo " kubectl logs -n ate-system deploy/ate-controller -f | grep 'autoscaled WorkerPool'" + + banner "Burst: waking ${N} actors — drains the buffer and triggers reactive scale-up" + for i in $(seq 1 "$N"); do + $ATE create actor "demo-${i}" -t "$TEMPLATE" >/dev/null 2>&1 || true + done + for i in $(seq 1 "$N"); do + resume_with_retry "demo-${i}" + done + sleep 3 + echo + echo "After the burst (spec.replicas climbed toward occupied + targetBuffer, capped at maxReplicas=8):" + pool + + banner "Idle: suspending all actors — frees workers, buffer goes into surplus" + for i in $(seq 1 "$N"); do + $ATE suspend actor "demo-${i}" >/dev/null 2>&1 || true + done + pool + echo + echo "Scale-down is hysteretic — waiting up to ${SCALE_DOWN_WAIT}s for the stabilization window..." + local deadline + deadline=$(( $(date +%s) + SCALE_DOWN_WAIT )) + while [ "$(date +%s)" -lt "$deadline" ]; do + local d + d="$(desired)" + echo " $(date +%H:%M:%S) spec.replicas=${d}" + if [ "${d:-99}" -le 2 ]; then + echo " shrunk to the reservation floor (minReady=2)." + break + fi + sleep 10 + done + + banner "Done" + pool + echo + echo "Run './demo.sh cleanup' to remove the demo actors and manifest." +} + +case "${1:-run}" in + run) main ;; + deploy) deploy ;; + cleanup) cleanup ;; + *) echo "usage: $0 [run|deploy|cleanup]" >&2; exit 1 ;; +esac diff --git a/internal/autoscaler/policy.go b/internal/autoscaler/policy.go new file mode 100644 index 000000000..5f0cf863a --- /dev/null +++ b/internal/autoscaler/policy.go @@ -0,0 +1,149 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package autoscaler holds the WorkerPool autoscaling control logic. This file +// is the pure decision core: given a pool's declarative bounds and a live +// observation it computes the next desired replica count. It has no Kubernetes +// or gRPC dependencies so it can be unit-tested exhaustively; the reconciler +// (see reconciler.go) supplies the observation and persists the small amount of +// timing state Step needs. +package autoscaler + +import "time" + +// Bounds are the declarative autoscaling inputs from WorkerPoolSpec. A nil +// pointer means the operator left the field unset. +type Bounds struct { + MinReady *int32 + TargetBuffer *int32 + MaxReplicas *int32 +} + +// Enabled reports whether autoscaling is configured for the pool. With neither +// minReady nor targetBuffer set, the autoscaler leaves the pool alone — replicas +// stay whatever a human (or some other tool) set on the scale subresource. +func (b Bounds) Enabled() bool { + return b.MinReady != nil || b.TargetBuffer != nil +} + +// Observation is the measured live state of a pool at decision time. +type Observation struct { + // Current is spec.replicas the autoscaler last set (the desired pod count). + Current int32 + // Free is the number of registered workers with no actor assigned (warm/idle). + Free int32 + // InFlight is pods requested but not yet registered as workers (still starting). + InFlight int32 +} + +// Config tunes the loop's deliberate up/down asymmetry. +type Config struct { + // ScaleDownStabilization is how long a shrink must be continuously wanted + // before it is applied. Scale-up ignores it. + ScaleDownStabilization time.Duration + // MaxScaleUpStep caps how many replicas a single up-step may add above the + // reservation floor. Zero means unlimited. The floor itself (MinReady) is + // always reached in one step regardless of this cap. + MaxScaleUpStep int32 +} + +// Decision is the loop's output for one tick. +type Decision struct { + // Target is the replica count to write. It equals Observation.Current when + // Changed is false. + Target int32 + Changed bool + Reason string +} + +// clamp constrains v to [MinReady, MaxReplicas] (and to a non-negative value). +// MinReady can only raise v; MaxReplicas can only lower it. Admission already +// guarantees MinReady <= MaxReplicas (CEL rule on WorkerPoolSpec). +func clamp(v int32, b Bounds) int32 { + if v < 0 { + v = 0 + } + if b.MinReady != nil && v < *b.MinReady { + v = *b.MinReady + } + if b.MaxReplicas != nil && v > *b.MaxReplicas { + v = *b.MaxReplicas + } + return v +} + +// ideal is the instantaneous target that keeps Free ≈ TargetBuffer: +// +// ideal = Current + TargetBuffer - (Free + InFlight) +// +// i.e. add the buffer deficit (when idle headroom is short) or subtract the +// surplus (when it is over-stocked), then clamp to [MinReady, MaxReplicas]. +// Netting against InFlight is the anti-windup term: pods already starting count +// toward the buffer, so the loop does not pile on scale-ups while they boot. +// +// With TargetBuffer unset there is no buffer goal, so the ideal is just Current +// clamped to the bounds — which still lets MinReady raise an under-floored pool +// and MaxReplicas cap an over-sized one. +func ideal(b Bounds, o Observation) int32 { + target := o.Current + if b.TargetBuffer != nil { + target = o.Current + *b.TargetBuffer - (o.Free + o.InFlight) + } + return clamp(target, b) +} + +// Step computes the next Decision. now is the current time; downWantedSince is +// the instant the pool first became eligible to scale down, or the zero Time if +// it is not currently eligible. Both are state the caller persists between +// ticks. Step returns the decision and the updated downWantedSince to carry +// into the next call. +// +// The asymmetry encodes the design's core constraint: +// - scale UP is latency-critical, so it is applied immediately (capped by +// MaxScaleUpStep for buffer-driven growth, but never throttled below the +// reservation floor); +// - scale DOWN is safety-critical, so it is applied only after the shrink has +// been wanted continuously for ScaleDownStabilization — any tick that no +// longer wants to shrink resets the timer; +// - the target is always within [MinReady, MaxReplicas]. +func Step(b Bounds, o Observation, c Config, now, downWantedSince time.Time) (Decision, time.Time) { + target := ideal(b, o) + + switch { + case target > o.Current: + // Scale up now. Cap buffer-driven growth, but re-clamp so the floor is + // still reached in a single step. + next := target + if c.MaxScaleUpStep > 0 && next-o.Current > c.MaxScaleUpStep { + next = clamp(o.Current+c.MaxScaleUpStep, b) + } + if next <= o.Current { + return Decision{Target: o.Current, Reason: "steady"}, time.Time{} + } + return Decision{Target: next, Changed: true, Reason: "scale up: refill buffer"}, time.Time{} + + case target < o.Current: + // Want to shrink: hold until the desire has persisted long enough. + if downWantedSince.IsZero() { + downWantedSince = now + } + if now.Sub(downWantedSince) >= c.ScaleDownStabilization { + return Decision{Target: target, Changed: true, Reason: "scale down: surplus buffer"}, time.Time{} + } + return Decision{Target: o.Current, Reason: "scale down pending stabilization"}, downWantedSince + + default: + return Decision{Target: o.Current, Reason: "steady"}, time.Time{} + } +} diff --git a/internal/autoscaler/policy_test.go b/internal/autoscaler/policy_test.go new file mode 100644 index 000000000..f186aa0d6 --- /dev/null +++ b/internal/autoscaler/policy_test.go @@ -0,0 +1,220 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package autoscaler + +import ( + "testing" + "time" +) + +func p(v int32) *int32 { return &v } + +var base = time.Unix(1_700_000_000, 0) + +func TestBoundsEnabled(t *testing.T) { + for _, tc := range []struct { + name string + b Bounds + want bool + }{ + {"none", Bounds{}, false}, + {"maxOnly", Bounds{MaxReplicas: p(5)}, false}, + {"minReady", Bounds{MinReady: p(1)}, true}, + {"targetBuffer", Bounds{TargetBuffer: p(2)}, true}, + } { + if got := tc.b.Enabled(); got != tc.want { + t.Errorf("%s: Enabled()=%v want %v", tc.name, got, tc.want) + } + } +} + +func TestStep(t *testing.T) { + const stab = 30 * time.Second + for _, tc := range []struct { + name string + b Bounds + o Observation + c Config + now time.Time + downSince time.Time + wantTarget int32 + wantChanged bool + wantDown time.Time // zero unless a timer should be carried + }{ + { + name: "buffer deficit scales up immediately", + b: Bounds{TargetBuffer: p(3)}, + o: Observation{Current: 5, Free: 0}, + now: base, + wantTarget: 8, wantChanged: true, + }, + { + name: "inflight counts toward buffer (anti-windup)", + b: Bounds{TargetBuffer: p(3)}, + o: Observation{Current: 5, Free: 0, InFlight: 3}, + now: base, + wantTarget: 5, wantChanged: false, + }, + { + name: "partial deficit", + b: Bounds{TargetBuffer: p(3)}, + o: Observation{Current: 5, Free: 1}, + now: base, + wantTarget: 7, wantChanged: true, + }, + { + name: "scale up capped by MaxScaleUpStep", + b: Bounds{TargetBuffer: p(10)}, + o: Observation{Current: 5}, + c: Config{MaxScaleUpStep: 2}, + now: base, + wantTarget: 7, wantChanged: true, + }, + { + name: "reservation floor reached in one step despite small cap", + b: Bounds{MinReady: p(10)}, + o: Observation{Current: 0}, + c: Config{MaxScaleUpStep: 2}, + now: base, + wantTarget: 10, wantChanged: true, + }, + { + name: "maxReplicas caps scale up", + b: Bounds{TargetBuffer: p(100), MaxReplicas: p(8)}, + o: Observation{Current: 5}, + now: base, + wantTarget: 8, wantChanged: true, + }, + { + name: "surplus buffer: shrink held, timer starts", + b: Bounds{TargetBuffer: p(2)}, + o: Observation{Current: 10, Free: 8}, + c: Config{ScaleDownStabilization: stab}, + now: base, + wantTarget: 10, wantChanged: false, wantDown: base, + }, + { + name: "shrink applied once stabilization elapsed", + b: Bounds{TargetBuffer: p(2)}, + o: Observation{Current: 10, Free: 8}, + c: Config{ScaleDownStabilization: stab}, + now: base.Add(stab), + downSince: base, + wantTarget: 4, wantChanged: true, + }, + { + name: "shrink still pending within window", + b: Bounds{TargetBuffer: p(2)}, + o: Observation{Current: 10, Free: 8}, + c: Config{ScaleDownStabilization: stab}, + now: base.Add(10 * time.Second), + downSince: base, + wantTarget: 10, wantChanged: false, wantDown: base, + }, + { + name: "no targetBuffer: minReady raises under-floored pool", + b: Bounds{MinReady: p(3)}, + o: Observation{Current: 1}, + now: base, + wantTarget: 3, wantChanged: true, + }, + { + name: "no targetBuffer: maxReplicas trims oversized pool after window", + b: Bounds{MaxReplicas: p(5)}, + o: Observation{Current: 8}, + c: Config{ScaleDownStabilization: stab}, + now: base.Add(stab), + downSince: base, + wantTarget: 5, wantChanged: true, + }, + { + name: "no autoscaling fields: steady no-op", + b: Bounds{}, + o: Observation{Current: 4}, + now: base, + wantTarget: 4, wantChanged: false, + }, + { + name: "reaching steady clears a pending shrink timer", + b: Bounds{TargetBuffer: p(2)}, + o: Observation{Current: 4, Free: 2}, + c: Config{ScaleDownStabilization: stab}, + now: base.Add(5 * time.Second), + downSince: base, // a shrink was pending... + wantTarget: 4, wantChanged: false, // ...but ideal now equals current, so timer resets + }, + { + name: "minReady=0 allows shrink to zero after window", + b: Bounds{MinReady: p(0), TargetBuffer: p(0)}, + o: Observation{Current: 3, Free: 3}, + c: Config{ScaleDownStabilization: stab}, + now: base.Add(stab), + downSince: base, + wantTarget: 0, wantChanged: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + got, gotDown := Step(tc.b, tc.o, tc.c, tc.now, tc.downSince) + if got.Target != tc.wantTarget || got.Changed != tc.wantChanged { + t.Errorf("Step => {Target:%d Changed:%v Reason:%q}, want {Target:%d Changed:%v}", + got.Target, got.Changed, got.Reason, tc.wantTarget, tc.wantChanged) + } + if !gotDown.Equal(tc.wantDown) { + t.Errorf("downWantedSince => %v, want %v", gotDown, tc.wantDown) + } + }) + } +} + +// TestStepScaleDownLifecycle walks the hysteresis state machine across ticks. +func TestStepScaleDownLifecycle(t *testing.T) { + b := Bounds{TargetBuffer: p(2)} + c := Config{ScaleDownStabilization: 30 * time.Second} + surplus := Observation{Current: 10, Free: 8} // ideal 4, wants shrink + + // Tick 1: shrink wanted, timer starts, held. + d, down := Step(b, surplus, c, base, time.Time{}) + if d.Changed || !down.Equal(base) { + t.Fatalf("tick1: got changed=%v down=%v, want held with timer=%v", d.Changed, down, base) + } + // Tick 2: still within window, still held, timer carried. + d, down = Step(b, surplus, c, base.Add(10*time.Second), down) + if d.Changed || !down.Equal(base) { + t.Fatalf("tick2: got changed=%v down=%v, want held", d.Changed, down) + } + // Tick 3: window elapsed, shrink applied, timer cleared. + d, down = Step(b, surplus, c, base.Add(31*time.Second), down) + if !d.Changed || d.Target != 4 || !down.IsZero() { + t.Fatalf("tick3: got {changed:%v target:%d} down=%v, want shrink to 4 + cleared timer", d.Changed, d.Target, down) + } +} + +// TestStepScaleDownTimerResetsOnDemand verifies a returning burst cancels a +// pending shrink so a brief lull never throws away warm capacity. +func TestStepScaleDownTimerResetsOnDemand(t *testing.T) { + b := Bounds{TargetBuffer: p(2)} + c := Config{ScaleDownStabilization: 30 * time.Second} + + // Shrink wanted: timer starts. + _, down := Step(b, Observation{Current: 10, Free: 8}, c, base, time.Time{}) + if !down.Equal(base) { + t.Fatalf("expected timer to start at %v, got %v", base, down) + } + // Demand returns (free drops below buffer): scale up now, timer cleared. + d, down := Step(b, Observation{Current: 10, Free: 0}, c, base.Add(5*time.Second), down) + if !d.Changed || d.Target != 12 || !down.IsZero() { + t.Fatalf("burst: got {changed:%v target:%d} down=%v, want scale up to 12 + cleared timer", d.Changed, d.Target, down) + } +} diff --git a/internal/controllers/workerpool_autoscaler.go b/internal/controllers/workerpool_autoscaler.go new file mode 100644 index 000000000..8d3dc20c9 --- /dev/null +++ b/internal/controllers/workerpool_autoscaler.go @@ -0,0 +1,285 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controllers + +import ( + "context" + "fmt" + "sync" + "time" + + k8errors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/agent-substrate/substrate/internal/autoscaler" + atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" +) + +// pressureReconnectDelay is how long the capacity-pressure watcher waits before +// re-opening the stream after it ends or errors. +const pressureReconnectDelay = 2 * time.Second + +// defaultAutoscaleInterval is how often each autoscaled pool is re-evaluated +// when nothing else triggers a reconcile. Occupancy lives in ateapi rather than +// in Kubernetes, so the loop polls on this cadence instead of waking on events. +const defaultAutoscaleInterval = 10 * time.Second + +// WorkerPoolAutoscaler is the single writer of spec.replicas for autoscaled +// WorkerPools. Each tick it reads a pool's declarative bounds, measures live +// occupancy from ateapi, runs the decision policy (internal/autoscaler), and +// patches the replica count. It deliberately does not touch the Deployment — +// WorkerPoolReconciler still materializes spec.replicas — so the two +// controllers own disjoint fields and never fight. +type WorkerPoolAutoscaler struct { + client.Client + + // AteClient is the control-plane API used to read per-pool worker occupancy. + AteClient ateapipb.ControlClient + // Config tunes the decision policy (stabilization window, max up-step). + Config autoscaler.Config + // Interval is the re-evaluation cadence. Defaults to defaultAutoscaleInterval. + Interval time.Duration + + // now is the clock, overridable in tests. + now func() time.Time + // downSince remembers, per pool, when a scale-down first became eligible, so + // the stabilization window survives across reconciles. Lost on restart, which + // is safe: it merely restarts the (conservative) down timer. + mu sync.Mutex + downSince map[types.NamespacedName]time.Time + + // pressureEvents carries capacity-pressure notifications from ateapi into the + // controller's workqueue as immediate reconciles (the reactive up-path). + pressureEvents chan event.GenericEvent +} + +//+kubebuilder:rbac:groups=ate.dev,resources=workerpools,verbs=get;list;watch;update;patch + +// Reconcile evaluates one WorkerPool and, if autoscaling is configured, moves +// spec.replicas toward the policy's target. +func (r *WorkerPoolAutoscaler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := log.FromContext(ctx) + + wp := &atev1alpha1.WorkerPool{} + if err := r.Get(ctx, req.NamespacedName, wp); err != nil { + if k8errors.IsNotFound(err) { + r.forget(req.NamespacedName) + return ctrl.Result{}, nil + } + return ctrl.Result{}, fmt.Errorf("failed to get worker pool %q: %w", req.NamespacedName, err) + } + + if !wp.GetDeletionTimestamp().IsZero() { + r.forget(req.NamespacedName) + return ctrl.Result{}, nil + } + + bounds := autoscaler.Bounds{ + MinReady: wp.Spec.MinReady, + TargetBuffer: wp.Spec.TargetBuffer, + MaxReplicas: wp.Spec.MaxReplicas, + } + if !bounds.Enabled() { + // Pool is not autoscaled: leave spec.replicas to whoever owns it and stop + // requeuing it. + r.forget(req.NamespacedName) + return ctrl.Result{}, nil + } + + obs, err := r.observe(ctx, wp) + if err != nil { + return ctrl.Result{}, fmt.Errorf("while observing pool occupancy: %w", err) + } + + decision, downSince := autoscaler.Step(bounds, obs, r.Config, r.nowFn(), r.loadDownSince(req.NamespacedName)) + r.storeDownSince(req.NamespacedName, downSince) + + if decision.Changed { + if err := r.scaleTo(ctx, wp, decision.Target); err != nil { + return ctrl.Result{}, fmt.Errorf("while scaling worker pool: %w", err) + } + log.Info("autoscaled WorkerPool", + "from", obs.Current, "to", decision.Target, + "free", obs.Free, "inFlight", obs.InFlight, "reason", decision.Reason) + } + + return ctrl.Result{RequeueAfter: r.interval()}, nil +} + +// observe measures the pool's live occupancy from ateapi. Free is the number of +// registered workers with no actor; InFlight is pods requested but not yet +// registered (current desired count minus everything that has registered). +func (r *WorkerPoolAutoscaler) observe(ctx context.Context, wp *atev1alpha1.WorkerPool) (autoscaler.Observation, error) { + resp, err := r.AteClient.ListWorkers(ctx, &ateapipb.ListWorkersRequest{}) + if err != nil { + return autoscaler.Observation{}, fmt.Errorf("listing workers: %w", err) + } + + var registered, free int32 + for _, w := range resp.GetWorkers() { + if w.GetWorkerNamespace() != wp.Namespace || w.GetWorkerPool() != wp.Name { + continue + } + registered++ + if w.GetActorId() == "" { + free++ + } + } + + inFlight := wp.Spec.Replicas - registered + if inFlight < 0 { + inFlight = 0 + } + return autoscaler.Observation{Current: wp.Spec.Replicas, Free: free, InFlight: inFlight}, nil +} + +// scaleTo patches only spec.replicas — the field the scale subresource maps to — +// leaving every other field to its owner. +func (r *WorkerPoolAutoscaler) scaleTo(ctx context.Context, wp *atev1alpha1.WorkerPool, target int32) error { + patch := client.MergeFrom(wp.DeepCopy()) + wp.Spec.Replicas = target + return r.Patch(ctx, wp, patch) +} + +func (r *WorkerPoolAutoscaler) nowFn() time.Time { + if r.now != nil { + return r.now() + } + return time.Now() +} + +func (r *WorkerPoolAutoscaler) interval() time.Duration { + if r.Interval > 0 { + return r.Interval + } + return defaultAutoscaleInterval +} + +func (r *WorkerPoolAutoscaler) loadDownSince(key types.NamespacedName) time.Time { + r.mu.Lock() + defer r.mu.Unlock() + return r.downSince[key] +} + +func (r *WorkerPoolAutoscaler) storeDownSince(key types.NamespacedName, t time.Time) { + r.mu.Lock() + defer r.mu.Unlock() + if r.downSince == nil { + r.downSince = map[types.NamespacedName]time.Time{} + } + if t.IsZero() { + delete(r.downSince, key) + return + } + r.downSince[key] = t +} + +func (r *WorkerPoolAutoscaler) forget(key types.NamespacedName) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.downSince, key) +} + +// SetupWithManager registers the autoscaler. It uses a distinct controller name +// (WorkerPoolReconciler also watches WorkerPool) and a generation predicate so +// status-only writes don't wake the periodic path. A second event source — +// fed by the capacity-pressure watcher — supplies the reactive up-path. +func (r *WorkerPoolAutoscaler) SetupWithManager(mgr ctrl.Manager) error { + r.mu.Lock() + if r.downSince == nil { + r.downSince = map[types.NamespacedName]time.Time{} + } + r.mu.Unlock() + + // Reactive up-path: ateapi streams a capacity-pressure event the instant a + // pool has no free worker; the watcher turns each into an immediate reconcile + // of that pool. The For() generation predicate does not apply to this source, + // so pressure always enqueues. The periodic requeue remains the slow path. + r.pressureEvents = make(chan event.GenericEvent, 128) + if err := mgr.Add(manager.RunnableFunc(r.watchCapacityPressure)); err != nil { + return err + } + + return ctrl.NewControllerManagedBy(mgr). + For(&atev1alpha1.WorkerPool{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + WatchesRawSource(source.Channel(r.pressureEvents, &handler.EnqueueRequestForObject{})). + Named("workerpool-autoscaler"). + Complete(r) +} + +// watchCapacityPressure subscribes to ateapi's capacity-pressure stream and +// enqueues an immediate reconcile for each pool that misses. It runs for the +// life of the manager, reconnecting (with a short delay) whenever the stream +// ends or errors. It only returns when the manager context is cancelled, so a +// failing stream never takes the manager down. +func (r *WorkerPoolAutoscaler) watchCapacityPressure(ctx context.Context) error { + log := log.FromContext(ctx).WithName("capacity-pressure-watch") + for { + if ctx.Err() != nil { + return nil + } + stream, err := r.AteClient.WatchCapacityPressure(ctx, &ateapipb.WatchCapacityPressureRequest{}) + if err != nil { + log.V(1).Info("capacity-pressure stream open failed; will retry", "err", err) + if !sleep(ctx, pressureReconnectDelay) { + return nil + } + continue + } + + for { + ev, err := stream.Recv() + if err != nil { + log.V(1).Info("capacity-pressure stream ended; will reconnect", "err", err) + break + } + select { + case r.pressureEvents <- event.GenericEvent{Object: &atev1alpha1.WorkerPool{ + ObjectMeta: metav1.ObjectMeta{Namespace: ev.GetWorkerNamespace(), Name: ev.GetWorkerPool()}, + }}: + case <-ctx.Done(): + return nil + } + } + + if !sleep(ctx, pressureReconnectDelay) { + return nil + } + } +} + +// sleep waits for d or until ctx is cancelled. It reports whether the full +// delay elapsed (true) versus being cut short by cancellation (false). +func sleep(ctx context.Context, d time.Duration) bool { + t := time.NewTimer(d) + defer t.Stop() + select { + case <-ctx.Done(): + return false + case <-t.C: + return true + } +} diff --git a/internal/controllers/workerpool_autoscaler_test.go b/internal/controllers/workerpool_autoscaler_test.go new file mode 100644 index 000000000..4a638dec1 --- /dev/null +++ b/internal/controllers/workerpool_autoscaler_test.go @@ -0,0 +1,208 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controllers + +import ( + "context" + "io" + "testing" + "time" + + "google.golang.org/grpc" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/event" + + "github.com/agent-substrate/substrate/internal/autoscaler" + atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" +) + +func ptrInt32(v int32) *int32 { return &v } + +// stubControl is a ControlClient whose ListWorkers returns a fixed worker set. +// The autoscaler only calls ListWorkers; any other method would panic via the +// nil embedded interface, which keeps the stub honest about what it relies on. +type stubControl struct { + ateapipb.ControlClient + workers []*ateapipb.Worker + pressure <-chan *ateapipb.CapacityPressureEvent +} + +func (s *stubControl) ListWorkers(context.Context, *ateapipb.ListWorkersRequest, ...grpc.CallOption) (*ateapipb.ListWorkersResponse, error) { + return &ateapipb.ListWorkersResponse{Workers: s.workers}, nil +} + +func (s *stubControl) WatchCapacityPressure(ctx context.Context, _ *ateapipb.WatchCapacityPressureRequest, _ ...grpc.CallOption) (grpc.ServerStreamingClient[ateapipb.CapacityPressureEvent], error) { + return &stubPressureStream{ctx: ctx, events: s.pressure}, nil +} + +// stubPressureStream is a minimal server-streaming client backed by a channel. +type stubPressureStream struct { + grpc.ClientStream + ctx context.Context + events <-chan *ateapipb.CapacityPressureEvent +} + +func (s *stubPressureStream) Recv() (*ateapipb.CapacityPressureEvent, error) { + select { + case e, ok := <-s.events: + if !ok { + return nil, io.EOF + } + return e, nil + case <-s.ctx.Done(): + return nil, s.ctx.Err() + } +} + +// TestAutoscalerCapacityPressureTriggersReconcile checks that a streamed +// capacity-pressure event is turned into an immediate reconcile request for the +// named pool (the reactive up-path). +func TestAutoscalerCapacityPressureTriggersReconcile(t *testing.T) { + events := make(chan *ateapipb.CapacityPressureEvent, 1) + r := &WorkerPoolAutoscaler{ + AteClient: &stubControl{pressure: events}, + pressureEvents: make(chan event.GenericEvent, 1), + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = r.watchCapacityPressure(ctx) }() + + events <- &ateapipb.CapacityPressureEvent{WorkerNamespace: "ns", WorkerPool: "pool"} + + select { + case ev := <-r.pressureEvents: + if ev.Object.GetNamespace() != "ns" || ev.Object.GetName() != "pool" { + t.Fatalf("reconcile event for %s/%s, want ns/pool", ev.Object.GetNamespace(), ev.Object.GetName()) + } + case <-time.After(2 * time.Second): + t.Fatal("capacity-pressure event did not produce a reconcile request") + } +} + +// poolWorkers builds `total` workers for a pool, the first `occupied` of which +// carry an actor (the rest are free/idle). +func poolWorkers(ns, pool string, total, occupied int) []*ateapipb.Worker { + ws := make([]*ateapipb.Worker, 0, total) + for i := 0; i < total; i++ { + actor := "" + if i < occupied { + actor = "actor" + } + ws = append(ws, &ateapipb.Worker{WorkerNamespace: ns, WorkerPool: pool, ActorId: actor}) + } + return ws +} + +func TestAutoscalerScalesUpToRefillBuffer(t *testing.T) { + wp := makeWorkerPool("autoscale-up", "default", 5, "ateom:v1") + wp.Spec.TargetBuffer = ptrInt32(3) + if err := k8sClient.Create(testCtx, wp); err != nil { + t.Fatalf("create WorkerPool: %v", err) + } + + // 5 registered, 1 free, 0 in-flight => ideal = 5 + 3 - 1 = 7. + r := &WorkerPoolAutoscaler{ + Client: k8sClient, + AteClient: &stubControl{workers: poolWorkers("default", "autoscale-up", 5, 4)}, + now: func() time.Time { return time.Unix(1_700_000_000, 0) }, + } + key := types.NamespacedName{Namespace: "default", Name: "autoscale-up"} + if _, err := r.Reconcile(testCtx, ctrl.Request{NamespacedName: key}); err != nil { + t.Fatalf("reconcile: %v", err) + } + + got := &atev1alpha1.WorkerPool{} + if err := k8sClient.Get(testCtx, key, got); err != nil { + t.Fatal(err) + } + if got.Spec.Replicas != 7 { + t.Fatalf("replicas = %d, want 7", got.Spec.Replicas) + } +} + +func TestAutoscalerSkipsUnconfiguredPool(t *testing.T) { + wp := makeWorkerPool("autoscale-skip", "default", 4, "ateom:v1") // no autoscaling fields + if err := k8sClient.Create(testCtx, wp); err != nil { + t.Fatalf("create WorkerPool: %v", err) + } + + r := &WorkerPoolAutoscaler{ + Client: k8sClient, + AteClient: &stubControl{}, // ListWorkers must not be reached + now: func() time.Time { return time.Unix(1_700_000_000, 0) }, + } + key := types.NamespacedName{Namespace: "default", Name: "autoscale-skip"} + res, err := r.Reconcile(testCtx, ctrl.Request{NamespacedName: key}) + if err != nil { + t.Fatalf("reconcile: %v", err) + } + if res.RequeueAfter != 0 { + t.Fatalf("unconfigured pool should not requeue, got %v", res.RequeueAfter) + } + + got := &atev1alpha1.WorkerPool{} + if err := k8sClient.Get(testCtx, key, got); err != nil { + t.Fatal(err) + } + if got.Spec.Replicas != 4 { + t.Fatalf("replicas = %d, want 4 (unchanged)", got.Spec.Replicas) + } +} + +func TestAutoscalerScaleDownAfterStabilization(t *testing.T) { + wp := makeWorkerPool("autoscale-down", "default", 10, "ateom:v1") + wp.Spec.TargetBuffer = ptrInt32(2) + if err := k8sClient.Create(testCtx, wp); err != nil { + t.Fatalf("create WorkerPool: %v", err) + } + + // 10 registered, 8 free => ideal = 10 + 2 - 8 = 4 (wants to shrink). + base := time.Unix(1_700_000_000, 0) + now := base + r := &WorkerPoolAutoscaler{ + Client: k8sClient, + AteClient: &stubControl{workers: poolWorkers("default", "autoscale-down", 10, 2)}, + Config: autoscaler.Config{ScaleDownStabilization: 30 * time.Second}, + now: func() time.Time { return now }, + } + key := types.NamespacedName{Namespace: "default", Name: "autoscale-down"} + + // Tick 1: shrink wanted but within the window => held at 10. + if _, err := r.Reconcile(testCtx, ctrl.Request{NamespacedName: key}); err != nil { + t.Fatalf("reconcile tick1: %v", err) + } + got := &atev1alpha1.WorkerPool{} + if err := k8sClient.Get(testCtx, key, got); err != nil { + t.Fatal(err) + } + if got.Spec.Replicas != 10 { + t.Fatalf("after tick1 replicas = %d, want 10 (held)", got.Spec.Replicas) + } + + // Tick 2: window elapsed => shrink applied to 4. + now = base.Add(31 * time.Second) + if _, err := r.Reconcile(testCtx, ctrl.Request{NamespacedName: key}); err != nil { + t.Fatalf("reconcile tick2: %v", err) + } + if err := k8sClient.Get(testCtx, key, got); err != nil { + t.Fatal(err) + } + if got.Spec.Replicas != 4 { + t.Fatalf("after tick2 replicas = %d, want 4 (applied)", got.Spec.Replicas) + } +} diff --git a/manifests/ate-install/generated/ate.dev_workerpools.yaml b/manifests/ate-install/generated/ate.dev_workerpools.yaml index 634512b23..ed4727f6f 100644 --- a/manifests/ate-install/generated/ate.dev_workerpools.yaml +++ b/manifests/ate-install/generated/ate.dev_workerpools.yaml @@ -70,8 +70,37 @@ spec: workers. minLength: 1 type: string + maxReplicas: + description: |- + MaxReplicas is the upper bound the autoscaler may grow the pool to. When + unset the autoscaler applies no ceiling of its own. + format: int32 + minimum: 0 + type: integer + minReady: + description: |- + MinReady is the minimum number of worker pods the autoscaler keeps the + pool at — the reservation floor it must never scale below. When unset the + pool may be scaled to zero. The floor is enforced by the autoscaler; the + WorkerPool controller never clamps Replicas itself, so that the scale + subresource keeps a single writer. + format: int32 + minimum: 0 + type: integer replicas: - description: Replicas is the number of worker pods to run. + description: |- + Replicas is the number of worker pods to run. When autoscaling is enabled + it is owned by the autoscaler (written via the scale subresource); the + fields below are the declarative inputs that drive it. + format: int32 + minimum: 0 + type: integer + targetBuffer: + description: |- + TargetBuffer is the desired number of idle (warm) workers the autoscaler + keeps available to absorb resume bursts. When the idle count falls below + this target the autoscaler provisions more workers, net of pods already + starting. When unset, buffer-based scale-up is disabled. format: int32 minimum: 0 type: integer @@ -79,6 +108,10 @@ spec: - ateomImage - replicas type: object + x-kubernetes-validations: + - message: minReady must not exceed maxReplicas + rule: '!has(self.minReady) || !has(self.maxReplicas) || self.minReady + <= self.maxReplicas' status: description: status is the observed state of WorkerPool properties: diff --git a/pkg/api/v1alpha1/workerpool_types.go b/pkg/api/v1alpha1/workerpool_types.go index 2350432b6..ec01428e2 100644 --- a/pkg/api/v1alpha1/workerpool_types.go +++ b/pkg/api/v1alpha1/workerpool_types.go @@ -18,8 +18,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// +kubebuilder:validation:XValidation:rule="!has(self.minReady) || !has(self.maxReplicas) || self.minReady <= self.maxReplicas",message="minReady must not exceed maxReplicas" type WorkerPoolSpec struct { - // Replicas is the number of worker pods to run. + // Replicas is the number of worker pods to run. When autoscaling is enabled + // it is owned by the autoscaler (written via the scale subresource); the + // fields below are the declarative inputs that drive it. // +required // +kubebuilder:validation:Minimum=0 Replicas int32 `json:"replicas"` @@ -28,6 +31,29 @@ type WorkerPoolSpec struct { // +kubebuilder:validation:MinLength=1 // +required AteomImage string `json:"ateomImage"` + + // MinReady is the minimum number of worker pods the autoscaler keeps the + // pool at — the reservation floor it must never scale below. When unset the + // pool may be scaled to zero. The floor is enforced by the autoscaler; the + // WorkerPool controller never clamps Replicas itself, so that the scale + // subresource keeps a single writer. + // +optional + // +kubebuilder:validation:Minimum=0 + MinReady *int32 `json:"minReady,omitempty"` + + // TargetBuffer is the desired number of idle (warm) workers the autoscaler + // keeps available to absorb resume bursts. When the idle count falls below + // this target the autoscaler provisions more workers, net of pods already + // starting. When unset, buffer-based scale-up is disabled. + // +optional + // +kubebuilder:validation:Minimum=0 + TargetBuffer *int32 `json:"targetBuffer,omitempty"` + + // MaxReplicas is the upper bound the autoscaler may grow the pool to. When + // unset the autoscaler applies no ceiling of its own. + // +optional + // +kubebuilder:validation:Minimum=0 + MaxReplicas *int32 `json:"maxReplicas,omitempty"` } type WorkerPoolStatus struct { diff --git a/pkg/api/v1alpha1/zz_generated.deepcopy.go b/pkg/api/v1alpha1/zz_generated.deepcopy.go index 4bbfe02b6..dc215517d 100644 --- a/pkg/api/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/api/v1alpha1/zz_generated.deepcopy.go @@ -318,7 +318,7 @@ func (in *WorkerPool) DeepCopyInto(out *WorkerPool) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) out.Status = in.Status } @@ -375,6 +375,21 @@ func (in *WorkerPoolList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *WorkerPoolSpec) DeepCopyInto(out *WorkerPoolSpec) { *out = *in + if in.MinReady != nil { + in, out := &in.MinReady, &out.MinReady + *out = new(int32) + **out = **in + } + if in.TargetBuffer != nil { + in, out := &in.TargetBuffer, &out.TargetBuffer + *out = new(int32) + **out = **in + } + if in.MaxReplicas != nil { + in, out := &in.MaxReplicas, &out.MaxReplicas + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkerPoolSpec. diff --git a/pkg/proto/ateapipb/ateapi.pb.go b/pkg/proto/ateapipb/ateapi.pb.go index a11f341fa..a3a992875 100644 --- a/pkg/proto/ateapipb/ateapi.pb.go +++ b/pkg/proto/ateapipb/ateapi.pb.go @@ -1052,6 +1052,96 @@ func (*DebugClearResponse) Descriptor() ([]byte, []int) { return file_ateapi_proto_rawDescGZIP(), []int{17} } +type WatchCapacityPressureRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WatchCapacityPressureRequest) Reset() { + *x = WatchCapacityPressureRequest{} + mi := &file_ateapi_proto_msgTypes[18] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WatchCapacityPressureRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WatchCapacityPressureRequest) ProtoMessage() {} + +func (x *WatchCapacityPressureRequest) ProtoReflect() protoreflect.Message { + mi := &file_ateapi_proto_msgTypes[18] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WatchCapacityPressureRequest.ProtoReflect.Descriptor instead. +func (*WatchCapacityPressureRequest) Descriptor() ([]byte, []int) { + return file_ateapi_proto_rawDescGZIP(), []int{18} +} + +// CapacityPressureEvent signals that a worker pool had no free worker for a +// resume, identifying the pool so a consumer can react per-pool. +type CapacityPressureEvent struct { + state protoimpl.MessageState `protogen:"open.v1"` + WorkerNamespace string `protobuf:"bytes,1,opt,name=worker_namespace,json=workerNamespace,proto3" json:"worker_namespace,omitempty"` + WorkerPool string `protobuf:"bytes,2,opt,name=worker_pool,json=workerPool,proto3" json:"worker_pool,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CapacityPressureEvent) Reset() { + *x = CapacityPressureEvent{} + mi := &file_ateapi_proto_msgTypes[19] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CapacityPressureEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CapacityPressureEvent) ProtoMessage() {} + +func (x *CapacityPressureEvent) ProtoReflect() protoreflect.Message { + mi := &file_ateapi_proto_msgTypes[19] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CapacityPressureEvent.ProtoReflect.Descriptor instead. +func (*CapacityPressureEvent) Descriptor() ([]byte, []int) { + return file_ateapi_proto_rawDescGZIP(), []int{19} +} + +func (x *CapacityPressureEvent) GetWorkerNamespace() string { + if x != nil { + return x.WorkerNamespace + } + return "" +} + +func (x *CapacityPressureEvent) GetWorkerPool() string { + if x != nil { + return x.WorkerPool + } + return "" +} + type MintJWTRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Audience []string `protobuf:"bytes,1,rep,name=audience,proto3" json:"audience,omitempty"` @@ -1064,7 +1154,7 @@ type MintJWTRequest struct { func (x *MintJWTRequest) Reset() { *x = MintJWTRequest{} - mi := &file_ateapi_proto_msgTypes[18] + mi := &file_ateapi_proto_msgTypes[20] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1076,7 +1166,7 @@ func (x *MintJWTRequest) String() string { func (*MintJWTRequest) ProtoMessage() {} func (x *MintJWTRequest) ProtoReflect() protoreflect.Message { - mi := &file_ateapi_proto_msgTypes[18] + mi := &file_ateapi_proto_msgTypes[20] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1089,7 +1179,7 @@ func (x *MintJWTRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use MintJWTRequest.ProtoReflect.Descriptor instead. func (*MintJWTRequest) Descriptor() ([]byte, []int) { - return file_ateapi_proto_rawDescGZIP(), []int{18} + return file_ateapi_proto_rawDescGZIP(), []int{20} } func (x *MintJWTRequest) GetAudience() []string { @@ -1149,7 +1239,7 @@ type MintJWTResponse struct { func (x *MintJWTResponse) Reset() { *x = MintJWTResponse{} - mi := &file_ateapi_proto_msgTypes[19] + mi := &file_ateapi_proto_msgTypes[21] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1161,7 +1251,7 @@ func (x *MintJWTResponse) String() string { func (*MintJWTResponse) ProtoMessage() {} func (x *MintJWTResponse) ProtoReflect() protoreflect.Message { - mi := &file_ateapi_proto_msgTypes[19] + mi := &file_ateapi_proto_msgTypes[21] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1174,7 +1264,7 @@ func (x *MintJWTResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use MintJWTResponse.ProtoReflect.Descriptor instead. func (*MintJWTResponse) Descriptor() ([]byte, []int) { - return file_ateapi_proto_rawDescGZIP(), []int{19} + return file_ateapi_proto_rawDescGZIP(), []int{21} } func (x *MintJWTResponse) GetSessionJwt() string { @@ -1199,7 +1289,7 @@ type MintCertRequest struct { func (x *MintCertRequest) Reset() { *x = MintCertRequest{} - mi := &file_ateapi_proto_msgTypes[20] + mi := &file_ateapi_proto_msgTypes[22] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1211,7 +1301,7 @@ func (x *MintCertRequest) String() string { func (*MintCertRequest) ProtoMessage() {} func (x *MintCertRequest) ProtoReflect() protoreflect.Message { - mi := &file_ateapi_proto_msgTypes[20] + mi := &file_ateapi_proto_msgTypes[22] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1224,7 +1314,7 @@ func (x *MintCertRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use MintCertRequest.ProtoReflect.Descriptor instead. func (*MintCertRequest) Descriptor() ([]byte, []int) { - return file_ateapi_proto_rawDescGZIP(), []int{20} + return file_ateapi_proto_rawDescGZIP(), []int{22} } func (x *MintCertRequest) GetAppId() string { @@ -1267,7 +1357,7 @@ type MintCertResponse struct { func (x *MintCertResponse) Reset() { *x = MintCertResponse{} - mi := &file_ateapi_proto_msgTypes[21] + mi := &file_ateapi_proto_msgTypes[23] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1279,7 +1369,7 @@ func (x *MintCertResponse) String() string { func (*MintCertResponse) ProtoMessage() {} func (x *MintCertResponse) ProtoReflect() protoreflect.Message { - mi := &file_ateapi_proto_msgTypes[21] + mi := &file_ateapi_proto_msgTypes[23] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1292,7 +1382,7 @@ func (x *MintCertResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use MintCertResponse.ProtoReflect.Descriptor instead. func (*MintCertResponse) Descriptor() ([]byte, []int) { - return file_ateapi_proto_rawDescGZIP(), []int{21} + return file_ateapi_proto_rawDescGZIP(), []int{23} } func (x *MintCertResponse) GetSessionCertificates() [][]byte { @@ -1372,7 +1462,12 @@ const file_ateapi_proto_rawDesc = "" + "\aversion\x18\b \x01(\x03R\aversion\x12$\n" + "\x0eworker_pod_uid\x18\t \x01(\tR\fworkerPodUid\"\x13\n" + "\x11DebugClearRequest\"\x14\n" + - "\x12DebugClearResponse\"{\n" + + "\x12DebugClearResponse\"\x1e\n" + + "\x1cWatchCapacityPressureRequest\"c\n" + + "\x15CapacityPressureEvent\x12)\n" + + "\x10worker_namespace\x18\x01 \x01(\tR\x0fworkerNamespace\x12\x1f\n" + + "\vworker_pool\x18\x02 \x01(\tR\n" + + "workerPool\"{\n" + "\x0eMintJWTRequest\x12\x1a\n" + "\baudience\x18\x01 \x03(\tR\baudience\x12\x15\n" + "\x06app_id\x18\x02 \x01(\tR\x05appId\x12\x17\n" + @@ -1389,7 +1484,7 @@ const file_ateapi_proto_rawDesc = "" + "session_id\x18\x03 \x01(\tR\tsessionId\x12>\n" + "\x1bcertificate_signing_request\x18\x04 \x01(\fR\x19certificateSigningRequest\"E\n" + "\x10MintCertResponse\x121\n" + - "\x14session_certificates\x18\x01 \x03(\fR\x13sessionCertificates2\xcd\x04\n" + + "\x14session_certificates\x18\x01 \x03(\fR\x13sessionCertificates2\xaf\x05\n" + "\aControl\x12?\n" + "\bGetActor\x12\x17.ateapi.GetActorRequest\x1a\x18.ateapi.GetActorResponse\"\x00\x12H\n" + "\vCreateActor\x12\x1a.ateapi.CreateActorRequest\x1a\x1b.ateapi.CreateActorResponse\"\x00\x12K\n" + @@ -1400,7 +1495,8 @@ const file_ateapi_proto_rawDesc = "" + "\n" + "ListActors\x12\x19.ateapi.ListActorsRequest\x1a\x1a.ateapi.ListActorsResponse\"\x00\x12E\n" + "\n" + - "DebugClear\x12\x19.ateapi.DebugClearRequest\x1a\x1a.ateapi.DebugClearResponse\"\x002\x8c\x01\n" + + "DebugClear\x12\x19.ateapi.DebugClearRequest\x1a\x1a.ateapi.DebugClearResponse\"\x00\x12`\n" + + "\x15WatchCapacityPressure\x12$.ateapi.WatchCapacityPressureRequest\x1a\x1d.ateapi.CapacityPressureEvent\"\x000\x012\x8c\x01\n" + "\x0fSessionIdentity\x12:\n" + "\aMintJWT\x12\x16.ateapi.MintJWTRequest\x1a\x17.ateapi.MintJWTResponse\x12=\n" + "\bMintCert\x12\x17.ateapi.MintCertRequest\x1a\x18.ateapi.MintCertResponseB9Z7github.com/agent-substrate/substrate/pkg/proto/ateapipbb\x06proto3" @@ -1418,31 +1514,33 @@ func file_ateapi_proto_rawDescGZIP() []byte { } var file_ateapi_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_ateapi_proto_msgTypes = make([]protoimpl.MessageInfo, 22) +var file_ateapi_proto_msgTypes = make([]protoimpl.MessageInfo, 24) var file_ateapi_proto_goTypes = []any{ - (Actor_Status)(0), // 0: ateapi.Actor.Status - (*Actor)(nil), // 1: ateapi.Actor - (*GetActorRequest)(nil), // 2: ateapi.GetActorRequest - (*GetActorResponse)(nil), // 3: ateapi.GetActorResponse - (*CreateActorRequest)(nil), // 4: ateapi.CreateActorRequest - (*CreateActorResponse)(nil), // 5: ateapi.CreateActorResponse - (*SuspendActorRequest)(nil), // 6: ateapi.SuspendActorRequest - (*SuspendActorResponse)(nil), // 7: ateapi.SuspendActorResponse - (*ResumeActorRequest)(nil), // 8: ateapi.ResumeActorRequest - (*ResumeActorResponse)(nil), // 9: ateapi.ResumeActorResponse - (*DeleteActorRequest)(nil), // 10: ateapi.DeleteActorRequest - (*DeleteActorResponse)(nil), // 11: ateapi.DeleteActorResponse - (*ListWorkersRequest)(nil), // 12: ateapi.ListWorkersRequest - (*ListWorkersResponse)(nil), // 13: ateapi.ListWorkersResponse - (*ListActorsRequest)(nil), // 14: ateapi.ListActorsRequest - (*ListActorsResponse)(nil), // 15: ateapi.ListActorsResponse - (*Worker)(nil), // 16: ateapi.Worker - (*DebugClearRequest)(nil), // 17: ateapi.DebugClearRequest - (*DebugClearResponse)(nil), // 18: ateapi.DebugClearResponse - (*MintJWTRequest)(nil), // 19: ateapi.MintJWTRequest - (*MintJWTResponse)(nil), // 20: ateapi.MintJWTResponse - (*MintCertRequest)(nil), // 21: ateapi.MintCertRequest - (*MintCertResponse)(nil), // 22: ateapi.MintCertResponse + (Actor_Status)(0), // 0: ateapi.Actor.Status + (*Actor)(nil), // 1: ateapi.Actor + (*GetActorRequest)(nil), // 2: ateapi.GetActorRequest + (*GetActorResponse)(nil), // 3: ateapi.GetActorResponse + (*CreateActorRequest)(nil), // 4: ateapi.CreateActorRequest + (*CreateActorResponse)(nil), // 5: ateapi.CreateActorResponse + (*SuspendActorRequest)(nil), // 6: ateapi.SuspendActorRequest + (*SuspendActorResponse)(nil), // 7: ateapi.SuspendActorResponse + (*ResumeActorRequest)(nil), // 8: ateapi.ResumeActorRequest + (*ResumeActorResponse)(nil), // 9: ateapi.ResumeActorResponse + (*DeleteActorRequest)(nil), // 10: ateapi.DeleteActorRequest + (*DeleteActorResponse)(nil), // 11: ateapi.DeleteActorResponse + (*ListWorkersRequest)(nil), // 12: ateapi.ListWorkersRequest + (*ListWorkersResponse)(nil), // 13: ateapi.ListWorkersResponse + (*ListActorsRequest)(nil), // 14: ateapi.ListActorsRequest + (*ListActorsResponse)(nil), // 15: ateapi.ListActorsResponse + (*Worker)(nil), // 16: ateapi.Worker + (*DebugClearRequest)(nil), // 17: ateapi.DebugClearRequest + (*DebugClearResponse)(nil), // 18: ateapi.DebugClearResponse + (*WatchCapacityPressureRequest)(nil), // 19: ateapi.WatchCapacityPressureRequest + (*CapacityPressureEvent)(nil), // 20: ateapi.CapacityPressureEvent + (*MintJWTRequest)(nil), // 21: ateapi.MintJWTRequest + (*MintJWTResponse)(nil), // 22: ateapi.MintJWTResponse + (*MintCertRequest)(nil), // 23: ateapi.MintCertRequest + (*MintCertResponse)(nil), // 24: ateapi.MintCertResponse } var file_ateapi_proto_depIdxs = []int32{ 0, // 0: ateapi.Actor.status:type_name -> ateapi.Actor.Status @@ -1460,20 +1558,22 @@ var file_ateapi_proto_depIdxs = []int32{ 12, // 12: ateapi.Control.ListWorkers:input_type -> ateapi.ListWorkersRequest 14, // 13: ateapi.Control.ListActors:input_type -> ateapi.ListActorsRequest 17, // 14: ateapi.Control.DebugClear:input_type -> ateapi.DebugClearRequest - 19, // 15: ateapi.SessionIdentity.MintJWT:input_type -> ateapi.MintJWTRequest - 21, // 16: ateapi.SessionIdentity.MintCert:input_type -> ateapi.MintCertRequest - 3, // 17: ateapi.Control.GetActor:output_type -> ateapi.GetActorResponse - 5, // 18: ateapi.Control.CreateActor:output_type -> ateapi.CreateActorResponse - 7, // 19: ateapi.Control.SuspendActor:output_type -> ateapi.SuspendActorResponse - 9, // 20: ateapi.Control.ResumeActor:output_type -> ateapi.ResumeActorResponse - 11, // 21: ateapi.Control.DeleteActor:output_type -> ateapi.DeleteActorResponse - 13, // 22: ateapi.Control.ListWorkers:output_type -> ateapi.ListWorkersResponse - 15, // 23: ateapi.Control.ListActors:output_type -> ateapi.ListActorsResponse - 18, // 24: ateapi.Control.DebugClear:output_type -> ateapi.DebugClearResponse - 20, // 25: ateapi.SessionIdentity.MintJWT:output_type -> ateapi.MintJWTResponse - 22, // 26: ateapi.SessionIdentity.MintCert:output_type -> ateapi.MintCertResponse - 17, // [17:27] is the sub-list for method output_type - 7, // [7:17] is the sub-list for method input_type + 19, // 15: ateapi.Control.WatchCapacityPressure:input_type -> ateapi.WatchCapacityPressureRequest + 21, // 16: ateapi.SessionIdentity.MintJWT:input_type -> ateapi.MintJWTRequest + 23, // 17: ateapi.SessionIdentity.MintCert:input_type -> ateapi.MintCertRequest + 3, // 18: ateapi.Control.GetActor:output_type -> ateapi.GetActorResponse + 5, // 19: ateapi.Control.CreateActor:output_type -> ateapi.CreateActorResponse + 7, // 20: ateapi.Control.SuspendActor:output_type -> ateapi.SuspendActorResponse + 9, // 21: ateapi.Control.ResumeActor:output_type -> ateapi.ResumeActorResponse + 11, // 22: ateapi.Control.DeleteActor:output_type -> ateapi.DeleteActorResponse + 13, // 23: ateapi.Control.ListWorkers:output_type -> ateapi.ListWorkersResponse + 15, // 24: ateapi.Control.ListActors:output_type -> ateapi.ListActorsResponse + 18, // 25: ateapi.Control.DebugClear:output_type -> ateapi.DebugClearResponse + 20, // 26: ateapi.Control.WatchCapacityPressure:output_type -> ateapi.CapacityPressureEvent + 22, // 27: ateapi.SessionIdentity.MintJWT:output_type -> ateapi.MintJWTResponse + 24, // 28: ateapi.SessionIdentity.MintCert:output_type -> ateapi.MintCertResponse + 18, // [18:29] is the sub-list for method output_type + 7, // [7:18] is the sub-list for method input_type 7, // [7:7] is the sub-list for extension type_name 7, // [7:7] is the sub-list for extension extendee 0, // [0:7] is the sub-list for field type_name @@ -1490,7 +1590,7 @@ func file_ateapi_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_ateapi_proto_rawDesc), len(file_ateapi_proto_rawDesc)), NumEnums: 1, - NumMessages: 22, + NumMessages: 24, NumExtensions: 0, NumServices: 2, }, diff --git a/pkg/proto/ateapipb/ateapi.proto b/pkg/proto/ateapipb/ateapi.proto index 8cce2b41c..0f8541352 100644 --- a/pkg/proto/ateapipb/ateapi.proto +++ b/pkg/proto/ateapipb/ateapi.proto @@ -46,6 +46,11 @@ service Control { // Debugging: drop all data from the ate database. rpc DebugClear(DebugClearRequest) returns (DebugClearResponse) {} + + // Stream a notification whenever a resume finds no free worker in a pool — a + // capacity-pressure signal sensed at the request edge. The autoscaler + // subscribes so it can react immediately instead of waiting for its poll. + rpc WatchCapacityPressure(WatchCapacityPressureRequest) returns (stream CapacityPressureEvent) {} } message Actor { @@ -168,6 +173,15 @@ message DebugClearRequest {} message DebugClearResponse {} +message WatchCapacityPressureRequest {} + +// CapacityPressureEvent signals that a worker pool had no free worker for a +// resume, identifying the pool so a consumer can react per-pool. +message CapacityPressureEvent { + string worker_namespace = 1; + string worker_pool = 2; +} + // SessionIdentity allows substrate workloads to exchange their // infrastructure-level credentials (k8s service account token, etc.) for a // substrate session-level credential. A given substrate session might migrate diff --git a/pkg/proto/ateapipb/ateapi_grpc.pb.go b/pkg/proto/ateapipb/ateapi_grpc.pb.go index c79b0cbff..d6cb7a380 100644 --- a/pkg/proto/ateapipb/ateapi_grpc.pb.go +++ b/pkg/proto/ateapipb/ateapi_grpc.pb.go @@ -35,14 +35,15 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - Control_GetActor_FullMethodName = "/ateapi.Control/GetActor" - Control_CreateActor_FullMethodName = "/ateapi.Control/CreateActor" - Control_SuspendActor_FullMethodName = "/ateapi.Control/SuspendActor" - Control_ResumeActor_FullMethodName = "/ateapi.Control/ResumeActor" - Control_DeleteActor_FullMethodName = "/ateapi.Control/DeleteActor" - Control_ListWorkers_FullMethodName = "/ateapi.Control/ListWorkers" - Control_ListActors_FullMethodName = "/ateapi.Control/ListActors" - Control_DebugClear_FullMethodName = "/ateapi.Control/DebugClear" + Control_GetActor_FullMethodName = "/ateapi.Control/GetActor" + Control_CreateActor_FullMethodName = "/ateapi.Control/CreateActor" + Control_SuspendActor_FullMethodName = "/ateapi.Control/SuspendActor" + Control_ResumeActor_FullMethodName = "/ateapi.Control/ResumeActor" + Control_DeleteActor_FullMethodName = "/ateapi.Control/DeleteActor" + Control_ListWorkers_FullMethodName = "/ateapi.Control/ListWorkers" + Control_ListActors_FullMethodName = "/ateapi.Control/ListActors" + Control_DebugClear_FullMethodName = "/ateapi.Control/DebugClear" + Control_WatchCapacityPressure_FullMethodName = "/ateapi.Control/WatchCapacityPressure" ) // ControlClient is the client API for Control service. @@ -67,6 +68,10 @@ type ControlClient interface { ListActors(ctx context.Context, in *ListActorsRequest, opts ...grpc.CallOption) (*ListActorsResponse, error) // Debugging: drop all data from the ate database. DebugClear(ctx context.Context, in *DebugClearRequest, opts ...grpc.CallOption) (*DebugClearResponse, error) + // Stream a notification whenever a resume finds no free worker in a pool — a + // capacity-pressure signal sensed at the request edge. The autoscaler + // subscribes so it can react immediately instead of waiting for its poll. + WatchCapacityPressure(ctx context.Context, in *WatchCapacityPressureRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[CapacityPressureEvent], error) } type controlClient struct { @@ -157,6 +162,25 @@ func (c *controlClient) DebugClear(ctx context.Context, in *DebugClearRequest, o return out, nil } +func (c *controlClient) WatchCapacityPressure(ctx context.Context, in *WatchCapacityPressureRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[CapacityPressureEvent], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &Control_ServiceDesc.Streams[0], Control_WatchCapacityPressure_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[WatchCapacityPressureRequest, CapacityPressureEvent]{ClientStream: stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Control_WatchCapacityPressureClient = grpc.ServerStreamingClient[CapacityPressureEvent] + // ControlServer is the server API for Control service. // All implementations must embed UnimplementedControlServer // for forward compatibility. @@ -179,6 +203,10 @@ type ControlServer interface { ListActors(context.Context, *ListActorsRequest) (*ListActorsResponse, error) // Debugging: drop all data from the ate database. DebugClear(context.Context, *DebugClearRequest) (*DebugClearResponse, error) + // Stream a notification whenever a resume finds no free worker in a pool — a + // capacity-pressure signal sensed at the request edge. The autoscaler + // subscribes so it can react immediately instead of waiting for its poll. + WatchCapacityPressure(*WatchCapacityPressureRequest, grpc.ServerStreamingServer[CapacityPressureEvent]) error mustEmbedUnimplementedControlServer() } @@ -213,6 +241,9 @@ func (UnimplementedControlServer) ListActors(context.Context, *ListActorsRequest func (UnimplementedControlServer) DebugClear(context.Context, *DebugClearRequest) (*DebugClearResponse, error) { return nil, status.Error(codes.Unimplemented, "method DebugClear not implemented") } +func (UnimplementedControlServer) WatchCapacityPressure(*WatchCapacityPressureRequest, grpc.ServerStreamingServer[CapacityPressureEvent]) error { + return status.Error(codes.Unimplemented, "method WatchCapacityPressure not implemented") +} func (UnimplementedControlServer) mustEmbedUnimplementedControlServer() {} func (UnimplementedControlServer) testEmbeddedByValue() {} @@ -378,6 +409,17 @@ func _Control_DebugClear_Handler(srv interface{}, ctx context.Context, dec func( return interceptor(ctx, in, info, handler) } +func _Control_WatchCapacityPressure_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(WatchCapacityPressureRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ControlServer).WatchCapacityPressure(m, &grpc.GenericServerStream[WatchCapacityPressureRequest, CapacityPressureEvent]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Control_WatchCapacityPressureServer = grpc.ServerStreamingServer[CapacityPressureEvent] + // Control_ServiceDesc is the grpc.ServiceDesc for Control service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -418,7 +460,13 @@ var Control_ServiceDesc = grpc.ServiceDesc{ Handler: _Control_DebugClear_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "WatchCapacityPressure", + Handler: _Control_WatchCapacityPressure_Handler, + ServerStreams: true, + }, + }, Metadata: "ateapi.proto", }