Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions docs/api-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@ The `WorkerPool` defines the pool of physical "warm" compute capacity. It manage
| :--- | :--- | :--- |
| `replicas` | `int32` | **Required.** Number of physical standby pods to maintain in the cluster. |
| `ateomImage` | `string` | **Required.** The container image for the `ateom` herder process (e.g. `ko://github.com/agent-substrate/substrate/cmd/ateom-gvisor`). |
| `template` | `WorkerPoolPodTemplate` | **Optional.** Pod scheduling and resource settings for worker pods. |

#### `WorkerPoolPodTemplate` (`spec.template`)

| Field | Type | Pod mapping |
| :--- | :--- | :--- |
| `resources` | `WorkerPoolResourceRequirements` | `containers[name=ateom].resources` (`requests` and `limits` only) |
| `nodeSelector` | `map[string]string` | `spec.nodeSelector` |
| `tolerations` | `[]Toleration` | `spec.tolerations` (max 16) |
| `priorityClassName` | `string` | `spec.priorityClassName` |
| `nodeAffinity` | `NodeAffinity` | `spec.affinity.nodeAffinity` |

### Example

Expand All @@ -26,6 +37,40 @@ spec:
ateomImage: ko://github.com/agent-substrate/substrate/cmd/ateom-gvisor
```

### Example with GPU node scheduling

```yaml
apiVersion: ate.dev/v1alpha1
kind: WorkerPool
metadata:
  name: gpu-pool
  namespace: ate-demo
spec:
  replicas: 5
  ateomImage: ko://github.com/agent-substrate/substrate/cmd/ateom-gvisor
  template:
    resources:
      requests:
        cpu: "4"
        memory: 16Gi
      limits:
        memory: 16Gi
    nodeSelector:
      cloud.google.com/gke-accelerator: nvidia-tesla-t4
    tolerations:
      - key: nvidia.com/gpu
        operator: Exists
        effect: NoSchedule
    priorityClassName: substrate-workers
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
          - matchExpressions:
              - key: workload
                operator: In
                values: [substrate]
```

---

## 2. ActorTemplate: The Workload Blueprint
Expand Down
7 changes: 4 additions & 3 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,10 @@ These resources define the intended state of the system and are managed via
Kubernetes CRD APIs. They are used for administrative operations and actor
environment definitions.

* **WorkerPool**: Defines a pool of "warm" compute capacity. It specifies the
hardware shape (CPU, memory, accelerators), manages a fleet of standby
worker pods initialized and ready to receive resumed actor states.
* **WorkerPool**: Defines a pool of "warm" compute capacity. It manages a
fleet of standby worker pods initialized and ready to receive resumed actor
states. Optional `spec.template` fields configure worker pod resources,
node selection, tolerations, priority class, and node affinity.

* **ActorTemplate**: An immutable definition of an actor-version. It
encapsulates the container image, configuration, and environment required
Expand Down
141 changes: 141 additions & 0 deletions internal/controllers/workerpool_apply.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controllers

import (
corev1 "k8s.io/api/core/v1"
corev1ac "k8s.io/client-go/applyconfigurations/core/v1"

atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1"
)

// applyWorkerPoolPodTemplate copies the scheduling and resource settings from
// tmpl onto the pod spec and container apply configurations.
//
// Every field managed here is always written: the node selector and
// tolerations start out empty, the priority class name as "", and the affinity
// and resources as empty apply configurations. A nil tmpl leaves exactly those
// empty values in place; a non-nil tmpl overrides them field by field.
// NOTE(review): presumably the explicit empty values exist so that settings
// removed from the WorkerPool are also dropped on the next apply — confirm
// against the caller.
func applyWorkerPoolPodTemplate(
	podSpecAC *corev1ac.PodSpecApplyConfiguration,
	containerAC *corev1ac.ContainerApplyConfiguration,
	tmpl *atev1alpha1.WorkerPoolPodTemplate,
) {
	// Defaults used when the template is absent or leaves a field unset.
	affinity := corev1ac.Affinity()
	resources := corev1ac.ResourceRequirements()
	priorityClass := ""
	tolerations := []corev1ac.TolerationApplyConfiguration{}
	podSpecAC.NodeSelector = map[string]string{}

	if tmpl != nil {
		if tmpl.NodeSelector != nil {
			podSpecAC.WithNodeSelector(tmpl.NodeSelector)
		}
		tolerations = tolerationApplyValues(tolerationsToApply(tmpl.Tolerations))
		priorityClass = tmpl.PriorityClassName
		if tmpl.NodeAffinity != nil {
			affinity = affinity.WithNodeAffinity(nodeAffinityToApply(tmpl.NodeAffinity))
		}
		if tmpl.Resources != nil {
			resources = resourceRequirementsToApply(tmpl.Resources)
		}
	}

	podSpecAC.Tolerations = tolerations
	podSpecAC.WithPriorityClassName(priorityClass).WithAffinity(affinity)
	containerAC.WithResources(resources)
}

// resourceRequirementsToApply builds a ResourceRequirements apply
// configuration from r. Requests and limits are only set when their
// respective lists are non-empty; r must not be nil.
func resourceRequirementsToApply(r *atev1alpha1.WorkerPoolResourceRequirements) *corev1ac.ResourceRequirementsApplyConfiguration {
	reqs := corev1ac.ResourceRequirements()
	if len(r.Limits) != 0 {
		reqs = reqs.WithLimits(r.Limits)
	}
	if len(r.Requests) != 0 {
		reqs = reqs.WithRequests(r.Requests)
	}
	return reqs
}

// tolerationApplyValues dereferences each toleration apply configuration and
// returns the values as a new, always non-nil slice in the same order. Every
// element of tolerations must be non-nil.
func tolerationApplyValues(tolerations []*corev1ac.TolerationApplyConfiguration) []corev1ac.TolerationApplyConfiguration {
	values := make([]corev1ac.TolerationApplyConfiguration, len(tolerations))
	for i, ptr := range tolerations {
		values[i] = *ptr
	}
	return values
}

// tolerationsToApply converts core/v1 tolerations into apply configurations,
// one per input element and in the same order. String fields that are empty
// and a nil TolerationSeconds are left unset on the resulting configuration.
// The returned slice is always non-nil.
func tolerationsToApply(tolerations []corev1.Toleration) []*corev1ac.TolerationApplyConfiguration {
	converted := make([]*corev1ac.TolerationApplyConfiguration, len(tolerations))
	for i := range tolerations {
		src := &tolerations[i]
		dst := corev1ac.Toleration()
		if src.Key != "" {
			dst.WithKey(src.Key)
		}
		if src.Operator != "" {
			dst.WithOperator(src.Operator)
		}
		if src.Value != "" {
			dst.WithValue(src.Value)
		}
		if src.Effect != "" {
			dst.WithEffect(src.Effect)
		}
		if seconds := src.TolerationSeconds; seconds != nil {
			dst.WithTolerationSeconds(*seconds)
		}
		converted[i] = dst
	}
	return converted
}

// nodeAffinityToApply converts a core/v1 NodeAffinity into its apply
// configuration. The required node selector is set only when present, and
// each preferred scheduling term is converted and added in order. na must not
// be nil.
func nodeAffinityToApply(na *corev1.NodeAffinity) *corev1ac.NodeAffinityApplyConfiguration {
	affinity := corev1ac.NodeAffinity()
	if required := na.RequiredDuringSchedulingIgnoredDuringExecution; required != nil {
		affinity.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelectorToApply(required))
	}
	preferred := na.PreferredDuringSchedulingIgnoredDuringExecution
	for idx := range preferred {
		affinity.WithPreferredDuringSchedulingIgnoredDuringExecution(
			preferredSchedulingTermToApply(&preferred[idx]),
		)
	}
	return affinity
}

// nodeSelectorToApply converts a core/v1 NodeSelector into its apply
// configuration, converting and adding each node selector term in order.
// ns must not be nil.
func nodeSelectorToApply(ns *corev1.NodeSelector) *corev1ac.NodeSelectorApplyConfiguration {
	selector := corev1ac.NodeSelector()
	terms := ns.NodeSelectorTerms
	for idx := range terms {
		selector.WithNodeSelectorTerms(nodeSelectorTermToApply(&terms[idx]))
	}
	return selector
}

// preferredSchedulingTermToApply converts a core/v1 PreferredSchedulingTerm
// into its apply configuration, carrying over the weight and the converted
// preference. term must not be nil.
func preferredSchedulingTermToApply(term *corev1.PreferredSchedulingTerm) *corev1ac.PreferredSchedulingTermApplyConfiguration {
	preferred := corev1ac.PreferredSchedulingTerm()
	preferred.WithWeight(term.Weight)
	preferred.WithPreference(nodeSelectorTermToApply(&term.Preference))
	return preferred
}

// nodeSelectorTermToApply converts a core/v1 NodeSelectorTerm into its apply
// configuration, converting the match expressions first and then the match
// fields, each in their original order. term must not be nil.
func nodeSelectorTermToApply(term *corev1.NodeSelectorTerm) *corev1ac.NodeSelectorTermApplyConfiguration {
	out := corev1ac.NodeSelectorTerm()
	exprs, fields := term.MatchExpressions, term.MatchFields
	for idx := range exprs {
		out.WithMatchExpressions(nodeSelectorRequirementToApply(&exprs[idx]))
	}
	for idx := range fields {
		out.WithMatchFields(nodeSelectorRequirementToApply(&fields[idx]))
	}
	return out
}

// nodeSelectorRequirementToApply converts a core/v1 NodeSelectorRequirement
// into its apply configuration. Key and operator are always set; values are
// added only when the requirement has at least one. req must not be nil.
func nodeSelectorRequirementToApply(req *corev1.NodeSelectorRequirement) *corev1ac.NodeSelectorRequirementApplyConfiguration {
	out := corev1ac.NodeSelectorRequirement()
	out.WithKey(req.Key)
	out.WithOperator(req.Operator)
	if len(req.Values) == 0 {
		return out
	}
	return out.WithValues(req.Values...)
}
102 changes: 102 additions & 0 deletions internal/controllers/workerpool_apply_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controllers

import (
"encoding/json"
"testing"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1"
)

// TestBuildDeploymentApplyConfigPodTemplate builds a Deployment apply
// configuration for a WorkerPool that uses the shared sample pod template,
// round-trips it through JSON, and checks that the node selector, priority
// class name and CPU request appear in the rendered pod spec.
func TestBuildDeploymentApplyConfigPodTemplate(t *testing.T) {
	// object returns parent[key] as a JSON object. It fails the test with a
	// descriptive message when the key is missing or has another type; a bare
	// type assertion would panic and abort the whole test binary instead.
	object := func(parent map[string]any, key string) map[string]any {
		t.Helper()
		child, ok := parent[key].(map[string]any)
		if !ok {
			t.Fatalf("expected %q to be a JSON object, got %T", key, parent[key])
		}
		return child
	}

	wp := &atev1alpha1.WorkerPool{
		ObjectMeta: metav1.ObjectMeta{Name: "pool", Namespace: "default", UID: "uid"},
		Spec: atev1alpha1.WorkerPoolSpec{
			Replicas:   2,
			AteomImage: "ateom:v1",
			Template:   sampleWorkerPoolPodTemplate(),
		},
	}

	depAC := buildDeploymentApplyConfig(wp)
	data, err := json.Marshal(depAC)
	if err != nil {
		t.Fatalf("marshal: %v", err)
	}

	var dep map[string]any
	if err := json.Unmarshal(data, &dep); err != nil {
		t.Fatalf("unmarshal: %v", err)
	}

	podSpec := object(object(object(dep, "spec"), "template"), "spec")

	nodeSelector := object(podSpec, "nodeSelector")
	if nodeSelector["workload"] != "substrate" {
		t.Fatalf("unexpected nodeSelector: %v", nodeSelector)
	}
	if podSpec["priorityClassName"] != "substrate-workers" {
		t.Fatalf("unexpected priorityClassName: %v", podSpec["priorityClassName"])
	}

	containers, ok := podSpec["containers"].([]any)
	if !ok || len(containers) == 0 {
		t.Fatalf("expected at least one container, got %v", podSpec["containers"])
	}
	// The first container is the one inspected for resources.
	container, ok := containers[0].(map[string]any)
	if !ok {
		t.Fatalf("expected containers[0] to be a JSON object, got %T", containers[0])
	}
	requests := object(object(container, "resources"), "requests")
	if requests["cpu"] != "2" {
		t.Fatalf("unexpected cpu request: %v", requests["cpu"])
	}
}

// TestBuildDeploymentApplyConfigClearsNodeSelector builds a Deployment apply
// configuration for a WorkerPool whose template sets only a CPU request and
// checks that the rendered pod spec carries no node selector entries (the key
// may be absent or map to an empty object).
func TestBuildDeploymentApplyConfigClearsNodeSelector(t *testing.T) {
	// object returns parent[key] as a JSON object. It fails the test with a
	// descriptive message when the key is missing or has another type; a bare
	// type assertion would panic and abort the whole test binary instead.
	object := func(parent map[string]any, key string) map[string]any {
		t.Helper()
		child, ok := parent[key].(map[string]any)
		if !ok {
			t.Fatalf("expected %q to be a JSON object, got %T", key, parent[key])
		}
		return child
	}

	wp := &atev1alpha1.WorkerPool{
		ObjectMeta: metav1.ObjectMeta{Name: "pool", Namespace: "default", UID: "uid"},
		Spec: atev1alpha1.WorkerPoolSpec{
			Replicas:   1,
			AteomImage: "ateom:v1",
			Template: &atev1alpha1.WorkerPoolPodTemplate{
				Resources: &atev1alpha1.WorkerPoolResourceRequirements{
					Requests: corev1.ResourceList{
						corev1.ResourceCPU: resource.MustParse("1"),
					},
				},
			},
		},
	}

	depAC := buildDeploymentApplyConfig(wp)
	data, err := json.Marshal(depAC)
	if err != nil {
		t.Fatalf("marshal: %v", err)
	}

	var dep map[string]any
	if err := json.Unmarshal(data, &dep); err != nil {
		t.Fatalf("unmarshal: %v", err)
	}

	podSpec := object(object(object(dep, "spec"), "template"), "spec")
	nodeSelector, ok := podSpec["nodeSelector"].(map[string]any)
	if ok && len(nodeSelector) != 0 {
		t.Fatalf("expected empty nodeSelector, got %v", nodeSelector)
	}
}
81 changes: 81 additions & 0 deletions internal/controllers/workerpool_apply_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controllers

import (
"testing"

corev1 "k8s.io/api/core/v1"
)

// TestTolerationsToApply converts a single fully populated toleration and
// checks that the key and toleration seconds survive the conversion.
func TestTolerationsToApply(t *testing.T) {
	input := []corev1.Toleration{
		{
			Key:               "gpu",
			Operator:          corev1.TolerationOpEqual,
			Value:             "true",
			Effect:            corev1.TaintEffectNoSchedule,
			TolerationSeconds: ptrInt64(300),
		},
	}

	converted := tolerationsToApply(input)
	if n := len(converted); n != 1 {
		t.Fatalf("expected 1 toleration, got %d", n)
	}

	first := converted[0]
	if key := first.Key; key == nil || *key != "gpu" {
		t.Fatalf("unexpected key: %v", key)
	}
	if seconds := first.TolerationSeconds; seconds == nil || *seconds != 300 {
		t.Fatalf("unexpected tolerationSeconds: %v", seconds)
	}
}

// TestNodeAffinityToApply converts a node affinity with one required term and
// one preferred term, then checks that the required selector is present and
// that the single preferred term keeps its weight.
func TestNodeAffinityToApply(t *testing.T) {
	required := &corev1.NodeSelector{
		NodeSelectorTerms: []corev1.NodeSelectorTerm{{
			MatchExpressions: []corev1.NodeSelectorRequirement{{
				Key:      "zone",
				Operator: corev1.NodeSelectorOpIn,
				Values:   []string{"us-west1-a"},
			}},
		}},
	}
	preferred := []corev1.PreferredSchedulingTerm{{
		Weight: 50,
		Preference: corev1.NodeSelectorTerm{
			MatchExpressions: []corev1.NodeSelectorRequirement{{
				Key:      "disk",
				Operator: corev1.NodeSelectorOpIn,
				Values:   []string{"ssd"},
			}},
		},
	}}
	input := &corev1.NodeAffinity{
		RequiredDuringSchedulingIgnoredDuringExecution:  required,
		PreferredDuringSchedulingIgnoredDuringExecution: preferred,
	}

	result := nodeAffinityToApply(input)
	if result.RequiredDuringSchedulingIgnoredDuringExecution == nil {
		t.Fatal("expected required node selector")
	}

	terms := result.PreferredDuringSchedulingIgnoredDuringExecution
	if len(terms) != 1 {
		t.Fatalf("expected 1 preferred term, got %d", len(terms))
	}
	if weight := terms[0].Weight; weight == nil || *weight != 50 {
		t.Fatalf("unexpected weight: %v", weight)
	}
}

// ptrInt64 returns a pointer to a freshly allocated int64 holding v.
func ptrInt64(v int64) *int64 {
	p := new(int64)
	*p = v
	return p
}
Loading