diff --git a/cmd/atelet/main.go b/cmd/atelet/main.go index c4425bade..802e099e8 100644 --- a/cmd/atelet/main.go +++ b/cmd/atelet/main.go @@ -519,6 +519,17 @@ func (s *AteomHerder) prepareOCIBundles( ) error { netnsPath := ateompath.AteomNetNSPath(targetAteomUid) + // Populate the per-actor identity directory that gets bind-mounted into + // the application containers. Regenerated on every resume, so it carries + // the correct per-actor ID even when restoring from the golden snapshot. + identityDir := ateompath.ActorIdentityDirPath(actorTemplateNamespace, actorTemplateName, actorID) + if err := os.MkdirAll(identityDir, 0o755); err != nil { + return fmt.Errorf("while creating actor identity dir: %w", err) + } + if err := writeFileAtomic(filepath.Join(identityDir, ActorIDFileName), []byte(actorID), 0o644); err != nil { + return fmt.Errorf("while writing actor identity file: %w", err) + } + g, gCtx := errgroup.WithContext(ctx) // Pause container. @@ -536,6 +547,7 @@ func (s *AteomHerder) prepareOCIBundles( "io.kubernetes.cri.container-name": "pause", }, netnsPath, + "", // pause is sandbox infra; it gets no actor identity mount. ); err != nil { return fmt.Errorf("while creating pause OCI bundle: %w", err) } @@ -564,6 +576,7 @@ func (s *AteomHerder) prepareOCIBundles( "io.kubernetes.cri.container-name": ctr.GetName(), }, netnsPath, + identityDir, ); err != nil { return fmt.Errorf("while creating %q OCI bundle: %w", ctr.GetName(), err) } @@ -681,6 +694,45 @@ func validateActorRequest(namespace, template, actorID, targetAteomUID string, s return resources.ValidateContainerNames(names) } +// writeFileAtomic writes data to path by writing a temp file in the same +// directory, syncing, and renaming it over the target, then syncing the +// parent directory so the rename is durable. The identity directory is +// bind-mounted into actors, so the file must change atomically: a reader +// must never observe a truncated or partially written value. +func writeFileAtomic(path string, data []byte, perm os.FileMode) error { + f, err := os.CreateTemp(filepath.Dir(path), "."+filepath.Base(path)+".tmp-*") + if err != nil { + return err + } + defer os.Remove(f.Name()) // no-op once the rename succeeds + + if _, err := f.Write(data); err != nil { + f.Close() + return err + } + if err := f.Chmod(perm); err != nil { + f.Close() + return err + } + if err := f.Sync(); err != nil { + f.Close() + return err + } + if err := f.Close(); err != nil { + return err + } + if err := os.Rename(f.Name(), path); err != nil { + return err + } + + dir, err := os.Open(filepath.Dir(path)) + if err != nil { + return err + } + defer dir.Close() + return dir.Sync() +} + func resetActorDirs(actorTemplateNamespace, actorTemplateName, actorID string) error { // Explicitly leave runsc logs dir untouched. @@ -724,5 +776,15 @@ func resetActorDirs(actorTemplateNamespace, actorTemplateName, actorID string) e return fmt.Errorf("while creating restore-state dir: %w", err) } + // World-readable (0o755): bind-mounted into the actor, whose workload + // reads it through the gofer. + identityDir := ateompath.ActorIdentityDirPath(actorTemplateNamespace, actorTemplateName, actorID) + if err := os.RemoveAll(identityDir); err != nil { + return fmt.Errorf("while deleting actor identity dir: %w", err) + } + if err := os.MkdirAll(identityDir, 0o755); err != nil { + return fmt.Errorf("while creating actor identity dir: %w", err) + } + return nil } diff --git a/cmd/atelet/main_test.go b/cmd/atelet/main_test.go index aad83fa61..462b307ce 100644 --- a/cmd/atelet/main_test.go +++ b/cmd/atelet/main_test.go @@ -17,6 +17,7 @@ package main import ( "context" "os" + "path/filepath" "testing" "github.com/agent-substrate/substrate/internal/ateompath" @@ -25,6 +26,55 @@ import ( "google.golang.org/grpc/status" ) +func TestWriteFileAtomic(t *testing.T) { + dir := t.TempDir() + target := filepath.Join(dir, "actor-id") + + // One shared write over an existing value, as happens on every resume; + // each subtest checks one postcondition. + if err := os.WriteFile(target, []byte("golden-id"), 0o600); err != nil { + t.Fatalf("seeding target: %v", err) + } + if err := writeFileAtomic(target, []byte("counter-1"), 0o644); err != nil { + t.Fatalf("writeFileAtomic: %v", err) + } + + t.Run("replaces content", func(t *testing.T) { + got, err := os.ReadFile(target) + if err != nil { + t.Fatalf("reading target: %v", err) + } + if string(got) != "counter-1" { + t.Errorf("content = %q, want %q", got, "counter-1") + } + }) + + t.Run("sets permissions", func(t *testing.T) { + info, err := os.Stat(target) + if err != nil { + t.Fatalf("stat target: %v", err) + } + if perm := info.Mode().Perm(); perm != 0o644 { + t.Errorf("perm = %o, want 644", perm) + } + }) + + t.Run("leaves no temp files", func(t *testing.T) { + // The directory is visible inside the actor. + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("reading dir: %v", err) + } + if len(entries) != 1 { + names := make([]string, 0, len(entries)) + for _, e := range entries { + names = append(names, e.Name()) + } + t.Errorf("leftover files in identity dir: %v", names) + } + }) +} + func TestValidateActorRequest(t *testing.T) { const okNS, okTmpl, okID, okUID = "ate-demo", "counter", "counter-1", "422938ba-8860-4983-a25d-d6bcb0a69d4e" okSpec := &ateletpb.WorkloadSpec{Containers: []*ateletpb.Container{{Name: "worker"}}} diff --git a/cmd/atelet/oci.go b/cmd/atelet/oci.go index 3bf7503f6..cd5ebc12f 100644 --- a/cmd/atelet/oci.go +++ b/cmd/atelet/oci.go @@ -34,7 +34,24 @@ import ( "go.opentelemetry.io/otel/attribute" ) -func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryPullCache, actorTemplateNamespace, actorTemplateName, actorID, containerName, ref string, args []string, env []string, annotations map[string]string, netns string) error { +const ( + // IdentityMountPath is the in-actor directory at which atelet bind-mounts + // the actor's identity data. Workloads read the files inside it (at + // request time, not cached at startup) to learn about themselves. It is + // delivered as a per-actor bind mount rather than environment variables + // because env lives in the checkpointed process memory and would be + // frozen at the golden snapshot's values after a restore; a bind mount is + // re-attached per-actor on every resume. A directory (rather than a + // single-file mount) so further identity data can be added without + // changing the mount shape. + IdentityMountPath = "/run/ate" + + // ActorIDFileName is the file inside IdentityMountPath holding the + // actor's own ID, raw with no trailing newline. + ActorIDFileName = "actor-id" +) + +func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryPullCache, actorTemplateNamespace, actorTemplateName, actorID, containerName, ref string, args []string, env []string, annotations map[string]string, netns string, identityDir string) error { tracer := otel.Tracer("prepareOCIDirectory") ctx, span := tracer.Start(ctx, "prepareOCIDirectory") @@ -62,12 +79,77 @@ func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryP return fmt.Errorf("in untar: %w", err) } + // Bind-mount the per-actor identity directory so the workload can read its + // own ID at IdentityMountPath/ActorIDFileName. The bind target must exist + // in the rootfs for the mount to attach. + if identityDir != "" { + if err := createMountPoint(rootPath, IdentityMountPath); err != nil { + return fmt.Errorf("while creating identity mount point: %w", err) + } + } + + ociSpec := buildActorOCISpec(args, env, annotations, netns, identityDir) + ociSpecBytes, err := json.MarshalIndent(ociSpec, "", " ") + if err != nil { + return fmt.Errorf("while marshaling OCI spec: %w", err) + } + specPath := path.Join(bundlePath, "config.json") + if err := os.WriteFile(specPath, ociSpecBytes, 0o600); err != nil { + return fmt.Errorf("while writing OCI spec: %w", err) + } + + return nil +} + +// buildActorOCISpec assembles the OCI runtime spec for an actor container. +// When identityDir is non-empty it adds a read-only bind mount of that host +// directory at IdentityMountPath so the actor can read its own ID (see +// IdentityMountPath for why this is a bind mount rather than env vars). +func buildActorOCISpec(args []string, env []string, annotations map[string]string, netns string, identityDir string) *specs.Spec { envVars := []string{ "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", } envVars = append(envVars, env...) - ociSpec := &specs.Spec{ + mounts := []specs.Mount{ + { + Destination: "/proc", + Type: "proc", + Source: "proc", + }, + { + Destination: "/dev", + Type: "tmpfs", + Source: "tmpfs", + }, + { + Destination: "/sys", + Type: "sysfs", + Source: "sysfs", + Options: []string{ + "nosuid", + "noexec", + "nodev", + "ro", + }, + }, + { + Destination: "/etc/resolv.conf", + Type: "bind", + Source: "/etc/resolv.conf", + Options: []string{"ro"}, + }, + } + if identityDir != "" { + mounts = append(mounts, specs.Mount{ + Destination: IdentityMountPath, + Type: "bind", + Source: identityDir, + Options: []string{"ro"}, + }) + } + + return &specs.Spec{ Process: &specs.Process{ User: specs.User{ UID: 0, @@ -112,35 +194,7 @@ func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryP Readonly: false, }, Hostname: "runsc", - Mounts: []specs.Mount{ - { - Destination: "/proc", - Type: "proc", - Source: "proc", - }, - { - Destination: "/dev", - Type: "tmpfs", - Source: "tmpfs", - }, - { - Destination: "/sys", - Type: "sysfs", - Source: "sysfs", - Options: []string{ - "nosuid", - "noexec", - "nodev", - "ro", - }, - }, - { - Destination: "/etc/resolv.conf", - Type: "bind", - Source: "/etc/resolv.conf", - Options: []string{"ro"}, - }, - }, + Mounts: mounts, Linux: &specs.Linux{ Namespaces: []specs.LinuxNamespace{ { @@ -163,15 +217,23 @@ func prepareOCIDirectory(ctx context.Context, pullCache *memorypullcache.MemoryP }, Annotations: annotations, } - ociSpecBytes, err := json.MarshalIndent(ociSpec, "", " ") +} + +// createMountPoint creates the directory mountPath (an absolute in-rootfs +// path) to serve as a bind-mount target. It uses os.Root so the operation is +// confined to rootPath: a symlink planted by the image cannot redirect the +// write outside the extracted rootfs (same protection untar relies on). +func createMountPoint(rootPath, mountPath string) error { + root, err := os.OpenRoot(rootPath) if err != nil { - return fmt.Errorf("while marshaling OCI spec: %w", err) - } - specPath := path.Join(bundlePath, "config.json") - if err := os.WriteFile(specPath, ociSpecBytes, 0o600); err != nil { - return fmt.Errorf("while writing OCI spec: %w", err) + return fmt.Errorf("opening rootfs %q: %w", rootPath, err) } + defer root.Close() + rel := strings.TrimPrefix(mountPath, "/") + if err := root.MkdirAll(rel, 0o755); err != nil { + return fmt.Errorf("creating mount dir %q: %w", rel, err) + } return nil } diff --git a/cmd/atelet/oci_test.go b/cmd/atelet/oci_test.go index 4fbaf6a58..5b53f2520 100644 --- a/cmd/atelet/oci_test.go +++ b/cmd/atelet/oci_test.go @@ -20,6 +20,7 @@ import ( "context" "os" "path/filepath" + "slices" "strings" "testing" ) @@ -80,6 +81,81 @@ func runUntar(t *testing.T, entries []tarEntry) (string, error) { return dir, untar(context.Background(), bytes.NewReader(buildTar(t, entries)), dir) } +// With an identity dir, a read-only bind mount appears at IdentityMountPath. +func TestBuildActorOCISpec_IdentityMount(t *testing.T) { + spec := buildActorOCISpec( + []string{"/app"}, + []string{"FOO=bar"}, + map[string]string{"k": "v"}, + "/run/netns/x", + "/host/actors/ns:tmpl:id/identity", + ) + found := false + for _, m := range spec.Mounts { + if m.Destination != IdentityMountPath { + continue + } + found = true + if m.Source != "/host/actors/ns:tmpl:id/identity" { + t.Errorf("identity mount source = %q, want the per-actor identity dir", m.Source) + } + if m.Type != "bind" { + t.Errorf("identity mount type = %q, want bind", m.Type) + } + if !slices.Contains(m.Options, "ro") { + t.Errorf("identity mount must be read-only, options=%v", m.Options) + } + } + if !found { + t.Fatalf("identity mount %q missing; mounts=%v", IdentityMountPath, spec.Mounts) + } +} + +// Without an identity dir (the pause container), no identity mount appears. +func TestBuildActorOCISpec_NoIdentityMountForPause(t *testing.T) { + bare := buildActorOCISpec([]string{"/pause"}, nil, nil, "/run/netns/x", "") + for _, m := range bare.Mounts { + if m.Destination == IdentityMountPath { + t.Errorf("identity mount must be absent when identityDir is empty") + } + } +} + +func TestCreateMountPoint(t *testing.T) { + t.Run("creates target inside rootfs", func(t *testing.T) { + root := t.TempDir() + if err := createMountPoint(root, IdentityMountPath); err != nil { + t.Fatalf("createMountPoint: %v", err) + } + info, err := os.Stat(filepath.Join(root, "run", "ate")) + if err != nil { + t.Fatalf("mount point not created in rootfs: %v", err) + } + if !info.IsDir() { + t.Errorf("mount point must be a directory to host the identity bind mount") + } + }) + + t.Run("refuses symlink escaping the rootfs", func(t *testing.T) { + root := t.TempDir() + outside := t.TempDir() + // A malicious image could ship /run as a symlink pointing out of the + // rootfs; os.Root must refuse to follow it. + if err := os.Symlink(outside, filepath.Join(root, "run")); err != nil { + t.Fatalf("planting symlink: %v", err) + } + if err := createMountPoint(root, IdentityMountPath); err == nil { + t.Errorf("expected error when /run escapes the rootfs, got nil") + } + // Nothing may be created through the escaping symlink. + if entries, err := os.ReadDir(outside); err != nil { + t.Errorf("reading outside dir: %v", err) + } else if len(entries) != 0 { + t.Errorf("write escaped the rootfs: %s is not empty (%d entries)", outside, len(entries)) + } + }) +} + func TestValidateTarName(t *testing.T) { tests := []struct { name string diff --git a/docs/api-guide.md b/docs/api-guide.md index fe609e3ad..58cdba69d 100644 --- a/docs/api-guide.md +++ b/docs/api-guide.md @@ -49,6 +49,11 @@ Substrate has standardized on a **Uniform DNS Mesh**. You no longer need to defi **Format:** `.actors.resources.substrate.ate.dev` +### Actor Identity +Substrate bind-mounts a read-only, per-actor identity directory at **`/run/ate`** into each of the actor's containers. An actor can learn its own ID without parsing the `Host` header by reading the file **`/run/ate/actor-id`** inside it, which contains the raw actor ID with no trailing newline. Further identity and configuration data may appear in this directory over time. + +Read it fresh rather than caching it at process start. It is delivered as a per-actor bind mount, not an environment variable, precisely so it carries the correct ID after a resume from the golden snapshot — an env var (or a file baked into the image) would be frozen at the *golden* actor's ID, since it lives in the checkpointed process memory, and would therefore be identical for every actor of the template. + ### Example ```yaml diff --git a/internal/ateompath/ateompath.go b/internal/ateompath/ateompath.go index 304d53f5f..9fd93dc77 100644 --- a/internal/ateompath/ateompath.go +++ b/internal/ateompath/ateompath.go @@ -68,6 +68,18 @@ func ActorPath(actorTemplateNamespace, actorTemplateName, actorID string) string ) } +// ActorIdentityDirPath is the host directory atelet populates with the +// actor's identity data (currently the single file "actor-id") and +// bind-mounts read-only into the actor. It is per-actor and regenerated on +// every resume, so (unlike the checkpointed process environment) it reflects +// the correct ID after a restore from the golden snapshot. +func ActorIdentityDirPath(actorTemplateNamespace, actorTemplateName, actorID string) string { + return filepath.Join( + ActorPath(actorTemplateNamespace, actorTemplateName, actorID), + "identity", + ) +} + func RunSCStateDir(actorTemplateNamespace, actorTemplateName, actorID string) string { return filepath.Join( ActorPath(actorTemplateNamespace, actorTemplateName, actorID), diff --git a/internal/e2e/fixtures/probe/main.go b/internal/e2e/fixtures/probe/main.go new file mode 100644 index 000000000..6927a1d04 --- /dev/null +++ b/internal/e2e/fixtures/probe/main.go @@ -0,0 +1,69 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Command probe is a minimal introspection actor used by the e2e suites. It +// reports what the runtime looks like from inside the actor, so tests can +// assert on real in-actor state rather than the config atelet generates. +// +// Keep each endpoint small and independently assertable. New e2e suites add +// probes here. +package main + +import ( + "encoding/json" + "log" + "net/http" + "os" +) + +// identityFile is the actor-id file inside the identity directory atelet +// bind-mounts at IdentityMountPath. +const identityFile = "/run/ate/actor-id" + +// whoami reports the actor's identity as observed at request time from the +// bind-mounted identity file. A read failure is reported in the response +// rather than swallowed, so a failing e2e assertion explains itself. +func whoami(w http.ResponseWriter, _ *http.Request) { + host, _ := os.Hostname() + + resp := map[string]string{"hostname": host} + if b, err := os.ReadFile(identityFile); err == nil { + resp["file"] = string(b) + } else { + resp["file"] = "" + resp["error"] = err.Error() + } + + writeJSON(w, resp) +} + +func writeJSON(w http.ResponseWriter, v any) { + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(v); err != nil { + log.Printf("probe: encoding response: %v", err) + } +} + +func main() { + mux := http.NewServeMux() + mux.HandleFunc("/whoami", whoami) + mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) }) + + const addr = ":80" + log.Printf("probe listening on %s", addr) + server := &http.Server{Addr: addr, Handler: mux} + if err := server.ListenAndServe(); err != nil { + log.Fatalf("probe server: %v", err) + } +} diff --git a/internal/e2e/fixtures/probe/probe.yaml.tmpl b/internal/e2e/fixtures/probe/probe.yaml.tmpl new file mode 100644 index 000000000..bd276a5e0 --- /dev/null +++ b/internal/e2e/fixtures/probe/probe.yaml.tmpl @@ -0,0 +1,55 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: Namespace +metadata: + name: ate-e2e-probe + +--- + +apiVersion: ate.dev/v1alpha1 +kind: WorkerPool +metadata: + name: probe + namespace: ate-e2e-probe +spec: + replicas: 3 + ateomImage: ko://github.com/agent-substrate/substrate/cmd/ateom-gvisor + +--- + +apiVersion: ate.dev/v1alpha1 +kind: ActorTemplate +metadata: + name: probe + namespace: ate-e2e-probe +spec: + runsc: + amd64: + url: "gs://gvisor/releases/nightly/2026-05-19/x86_64/runsc" + sha256Hash: "a397be1abc2420d26bce6c70e6e2ff96c73aaaab929756c56f5e2089ea842b63" + arm64: + url: "gs://gvisor/releases/nightly/2026-05-19/aarch64/runsc" + sha256Hash: "1ba2366ae2efceba166046f51a4104f9261c9cb72c6db8f5b3fe2dc57dea86b9" + pauseImage: "registry.k8s.io/pause:3.10.2@sha256:f548e0e8e3dc1896ca956272154dde3314e8cc4fde0a57577ee9fa1c63f5baf4" + containers: + - name: probe + image: ko://github.com/agent-substrate/substrate/internal/e2e/fixtures/probe + command: ["/ko-app/probe"] + workerPoolRef: + namespace: ate-e2e-probe + name: probe + snapshotsConfig: + location: gs://${BUCKET_NAME}/ate-e2e-probe/ diff --git a/internal/e2e/router_client.go b/internal/e2e/router_client.go new file mode 100644 index 000000000..6896e5a64 --- /dev/null +++ b/internal/e2e/router_client.go @@ -0,0 +1,206 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "context" + "fmt" + "io" + "net/http" + "time" + + "github.com/agent-substrate/substrate/internal/ateclient" + "github.com/agent-substrate/substrate/internal/resources" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/portforward" + "k8s.io/client-go/transport/spdy" +) + +const ( + routerNamespace = "ate-system" + routerService = "atenet-router" +) + +// RouterClient sends HTTP requests to actors through the atenet router, the +// same way real traffic arrives (so the request is routed and, if needed, the +// actor is resumed). It port-forwards the router Service, mirroring the +// approach in internal/ateclient. +type RouterClient struct { + baseURL string + http *http.Client + stopCh chan struct{} +} + +// NewRouterClient establishes a port-forward to the atenet router. Call Close +// to tear it down. +func NewRouterClient(ctx context.Context) (*RouterClient, error) { + config, err := ateclient.LoadConfig(KubeConfig, KubeContext) + if err != nil { + return nil, fmt.Errorf("loading kubeconfig: %w", err) + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("creating k8s client: %w", err) + } + + // Resolve the router Service to one of its backing pods. + svc, err := clientset.CoreV1().Services(routerNamespace).Get(ctx, routerService, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("getting %s service: %w", routerService, err) + } + // A headless/selectorless Service would make SelectorFromSet match every + // pod in the namespace; refuse rather than forward to an arbitrary pod. + if len(svc.Spec.Selector) == 0 { + return nil, fmt.Errorf("service %s has no selector", routerService) + } + selector := labels.SelectorFromSet(svc.Spec.Selector).String() + pods, err := clientset.CoreV1().Pods(routerNamespace).List(ctx, metav1.ListOptions{LabelSelector: selector}) + if err != nil { + return nil, fmt.Errorf("listing %s pods: %w", routerService, err) + } + var targetPod *corev1.Pod + for i := range pods.Items { + if isPodReady(&pods.Items[i]) { + targetPod = &pods.Items[i] + break + } + } + if targetPod == nil { + return nil, fmt.Errorf("no ready %s pods in %s", routerService, routerNamespace) + } + + // Port-forward targets a pod's container port, so resolve the Service's + // HTTP port (80) to its backing targetPort (kubectl does this for us when + // forwarding a Service, but we forward the pod directly). + targetPort, err := resolveHTTPTargetPort(svc, targetPod) + if err != nil { + return nil, err + } + + req := clientset.CoreV1().RESTClient().Post(). + Resource("pods"). + Namespace(routerNamespace). + Name(targetPod.Name). + SubResource("portforward") + + transport, upgrader, err := spdy.RoundTripperFor(config) + if err != nil { + return nil, fmt.Errorf("creating SPDY transport: %w", err) + } + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, req.URL()) + + stopCh := make(chan struct{}) + readyCh := make(chan struct{}) + // Port 0 asks the OS for a random available local port. + fw, err := portforward.New(dialer, []string{fmt.Sprintf("0:%d", targetPort)}, stopCh, readyCh, io.Discard, io.Discard) + if err != nil { + return nil, fmt.Errorf("creating port forwarder: %w", err) + } + + errCh := make(chan error, 1) + go func() { + if err := fw.ForwardPorts(); err != nil { + errCh <- err + } + }() + select { + case <-readyCh: + case err := <-errCh: + return nil, fmt.Errorf("port forwarding: %w", err) + case <-ctx.Done(): + close(stopCh) + return nil, ctx.Err() + } + + ports, err := fw.GetPorts() + if err != nil || len(ports) == 0 { + close(stopCh) + return nil, fmt.Errorf("getting forwarded ports: %w", err) + } + + return &RouterClient{ + baseURL: fmt.Sprintf("http://127.0.0.1:%d", ports[0].Local), + http: &http.Client{Timeout: 30 * time.Second}, + stopCh: stopCh, + }, nil +} + +// isPodReady reports whether the pod is Running, not terminating, and has +// passed its readiness probe — i.e. actually serving, the same bar the Service +// uses to select endpoints. +func isPodReady(pod *corev1.Pod) bool { + if pod.Status.Phase != corev1.PodRunning || pod.DeletionTimestamp != nil { + return false + } + for _, c := range pod.Status.Conditions { + if c.Type == corev1.PodReady { + return c.Status == corev1.ConditionTrue + } + } + return false +} + +// resolveHTTPTargetPort maps the router Service's HTTP port (80) to the +// container port it targets on the given pod, resolving named targetPorts. +func resolveHTTPTargetPort(svc *corev1.Service, pod *corev1.Pod) (int32, error) { + for _, sp := range svc.Spec.Ports { + if sp.Port != 80 { + continue + } + var port int32 + switch sp.TargetPort.Type { + case intstr.Int: + port = sp.TargetPort.IntVal + case intstr.String: + for _, c := range pod.Spec.Containers { + for _, cp := range c.Ports { + if cp.Name == sp.TargetPort.StrVal { + port = cp.ContainerPort + } + } + } + if port == 0 { + return 0, fmt.Errorf("named targetPort %q not found on pod %s", sp.TargetPort.StrVal, pod.Name) + } + } + // Guard against an unset/zero targetPort, which would forward to nothing. + if port <= 0 { + return 0, fmt.Errorf("service %s port 80 has no usable targetPort", svc.Name) + } + return port, nil + } + return 0, fmt.Errorf("service %s has no port 80", svc.Name) +} + +// Close stops the port-forward tunnel. +func (c *RouterClient) Close() { + close(c.stopCh) +} + +// Get issues GET path to actorID through the router, setting the actor's mesh +// Host so the router routes (and resumes) it. The caller must close the body. +func (c *RouterClient) Get(ctx context.Context, actorID, path string) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil) + if err != nil { + return nil, err + } + // The router routes on the Host/:authority, not a header. + req.Host = fmt.Sprintf("%s.%s", actorID, resources.ActorDNSSuffix) + return c.http.Do(req) +} diff --git a/internal/e2e/router_client_test.go b/internal/e2e/router_client_test.go new file mode 100644 index 000000000..632534515 --- /dev/null +++ b/internal/e2e/router_client_test.go @@ -0,0 +1,109 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +func TestResolveHTTPTargetPort(t *testing.T) { + pod := &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "envoy", + Ports: []corev1.ContainerPort{{Name: "http", ContainerPort: 8080}}, + }}, + }, + } + svc := func(p corev1.ServicePort) *corev1.Service { + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "atenet-router"}, + Spec: corev1.ServiceSpec{Ports: []corev1.ServicePort{p}}, + } + } + + // Pod whose named "http" port resolves to an invalid zero container port. + zeroPortPod := &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "envoy", + Ports: []corev1.ContainerPort{{Name: "http", ContainerPort: 0}}, + }}, + }, + } + + tests := []struct { + name string + svc *corev1.Service + pod *corev1.Pod + want int32 + wantErr bool + }{ + {"int target port", svc(corev1.ServicePort{Port: 80, TargetPort: intstr.FromInt(8080)}), pod, 8080, false}, + {"named target port", svc(corev1.ServicePort{Port: 80, TargetPort: intstr.FromString("http")}), pod, 8080, false}, + {"named target not found", svc(corev1.ServicePort{Port: 80, TargetPort: intstr.FromString("nope")}), pod, 0, true}, + {"named target resolves to zero", svc(corev1.ServicePort{Port: 80, TargetPort: intstr.FromString("http")}), zeroPortPod, 0, true}, + {"unset target port", svc(corev1.ServicePort{Port: 80}), pod, 0, true}, + {"no port 80", svc(corev1.ServicePort{Port: 443, TargetPort: intstr.FromInt(8443)}), pod, 0, true}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got, err := resolveHTTPTargetPort(tc.svc, tc.pod) + if (err != nil) != tc.wantErr { + t.Fatalf("err = %v, wantErr %v", err, tc.wantErr) + } + if !tc.wantErr && got != tc.want { + t.Errorf("got %d, want %d", got, tc.want) + } + }) + } +} + +func TestIsPodReady(t *testing.T) { + pod := func(phase corev1.PodPhase, ready corev1.ConditionStatus, deleting bool) *corev1.Pod { + p := &corev1.Pod{Status: corev1.PodStatus{ + Phase: phase, + Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: ready}}, + }} + if deleting { + now := metav1.Now() + p.DeletionTimestamp = &now + } + return p + } + + tests := []struct { + name string + pod *corev1.Pod + want bool + }{ + {"running and ready", pod(corev1.PodRunning, corev1.ConditionTrue, false), true}, + {"running not ready", pod(corev1.PodRunning, corev1.ConditionFalse, false), false}, + {"pending", pod(corev1.PodPending, corev1.ConditionTrue, false), false}, + {"terminating", pod(corev1.PodRunning, corev1.ConditionTrue, true), false}, + {"no ready condition", &corev1.Pod{Status: corev1.PodStatus{Phase: corev1.PodRunning}}, false}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if got := isPodReady(tc.pod); got != tc.want { + t.Errorf("isPodReady = %v, want %v", got, tc.want) + } + }) + } +} diff --git a/internal/e2e/suites/identity/identity_test.go b/internal/e2e/suites/identity/identity_test.go new file mode 100644 index 000000000..3b0d48790 --- /dev/null +++ b/internal/e2e/suites/identity/identity_test.go @@ -0,0 +1,197 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package identity + +import ( + "context" + "encoding/json" + "io" + "net/http" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/agent-substrate/substrate/internal/e2e" + "github.com/agent-substrate/substrate/pkg/api/v1alpha1" + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + probeNamespace = "ate-e2e-probe" + probeTemplate = "probe" +) + +type whoamiResponse struct { + File string `json:"file"` + Hostname string `json:"hostname"` + // Error is the probe's identity-file read error, if any, so a failed + // assertion explains why the ID was missing. + Error string `json:"error"` +} + +// TestActorIdentity_AfterRestore_IsOwnID_NotGolden is the regression gate for +// per-actor identity. The env-var approach passed unit tests and config.json +// inspection yet was broken at runtime: actors restored from the shared golden +// snapshot all reported the golden actor's ID. This test catches that by +// restoring TWO actors from one golden snapshot and asserting each observes its +// OWN id — and explicitly that it is not the golden id. +func TestActorIdentity_AfterRestore_IsOwnID_NotGolden(t *testing.T) { + env, err := e2e.CheckEnv("BUCKET_NAME", "KO_DOCKER_REPO") + if err != nil { + t.Fatalf("CheckEnv failed: %v", err) + } + ctx := context.Background() + clients := e2e.GetClients() + + deployProbe(t, env["BUCKET_NAME"]) + golden := waitForGolden(t, ctx, clients) + + // Two distinct actors from the same golden snapshot. + ids := []string{"probe-alpha", "probe-beta"} + for _, id := range ids { + createAndResumeActor(t, ctx, clients, id) + } + + rc, err := e2e.NewRouterClient(ctx) + if err != nil { + t.Fatalf("NewRouterClient: %v", err) + } + defer rc.Close() + + seen := map[string]string{} + for _, id := range ids { + got := whoami(t, ctx, rc, id) + + if got.File != id { + t.Errorf("actor %q: /run/ate/actor-id = %q, want %q (probe read error: %q)", id, got.File, id, got.Error) + } + if got.File == golden { + t.Errorf("actor %q: identity is the GOLDEN snapshot id %q — restore leaked shared state", id, golden) + } + if other, dup := seen[got.File]; dup { + t.Errorf("actor %q and %q both report identity %q — actors are not distinct", id, other, got.File) + } + seen[got.File] = id + } +} + +func deployProbe(t *testing.T, bucket string) { + t.Helper() + root, err := e2e.FindRepoRoot() + if err != nil { + t.Fatalf("FindRepoRoot: %v", err) + } + + // Render the manifest template to a file so both apply and delete can + // consume it without any shell involved. + tmpl, err := os.ReadFile(filepath.Join(root, "internal/e2e/fixtures/probe/probe.yaml.tmpl")) + if err != nil { + t.Fatalf("reading probe manifest template: %v", err) + } + manifest := filepath.Join(t.TempDir(), "probe.yaml") + rendered := strings.ReplaceAll(string(tmpl), "${BUCKET_NAME}", bucket) + if err := os.WriteFile(manifest, []byte(rendered), 0o644); err != nil { + t.Fatalf("writing rendered probe manifest: %v", err) + } + + // Build/push the probe image and apply the manifest through the repo's + // pinned ko (hack/run-tool.sh ko); CI does not install ko on PATH, and every + // other deploy in this repo goes through this wrapper. The trailing + // `-- --context=...` mirrors run_ko in hack/install-ate.sh: ko's apply + // subcommand forwards args after `--` to kubectl. KO_CONFIG_PATH is + // required because ko resolves .ko.yaml from its working directory, which + // is the test's package dir, not the repo root; without it the build + // silently loses defaultPlatforms (and produces amd64-only images that + // cannot run on arm64 nodes). + applyArgs := []string{"ko", "apply", "-f", manifest} + if e2e.KubeContext != "" { + applyArgs = append(applyArgs, "--", "--context="+e2e.KubeContext) + } + e2e.RunCmdWithEnv(t, []string{"KO_CONFIG_PATH=" + root}, filepath.Join(root, "hack/run-tool.sh"), applyArgs...) + + t.Cleanup(func() { + // Deletion needs no image build, so go straight to kubectl (matching + // demo-counter_delete in hack/install-demo-counter.sh). `ko delete` + // rejects this arg shape ("you may not specify resource arguments as + // well"). + delArgs := []string{"delete", "--ignore-not-found", "-f", manifest} + if e2e.KubeContext != "" { + delArgs = append([]string{"--context=" + e2e.KubeContext}, delArgs...) + } + e2e.RunCmd(t, "kubectl", delArgs...) + }) +} + +func waitForGolden(t *testing.T, ctx context.Context, clients *e2e.Clients) string { + t.Helper() + deadline := time.Now().Add(5 * time.Minute) + for time.Now().Before(deadline) { + at, err := clients.SubstrateK8s.ApiV1alpha1().ActorTemplates(probeNamespace).Get(ctx, probeTemplate, metav1.GetOptions{}) + if err == nil { + switch at.Status.Phase { + case v1alpha1.PhaseReady: + t.Logf("probe ActorTemplate ready, golden=%s", at.Status.GoldenActorID) + return at.Status.GoldenActorID + case v1alpha1.PhaseFailed: + t.Fatalf("probe ActorTemplate entered PhaseFailed") + } + } + time.Sleep(2 * time.Second) + } + t.Fatalf("timed out waiting for probe ActorTemplate to be Ready") + return "" +} + +func createAndResumeActor(t *testing.T, ctx context.Context, clients *e2e.Clients, id string) { + t.Helper() + if _, err := clients.SubstrateAPI.CreateActor(ctx, &ateapipb.CreateActorRequest{ + ActorId: id, + ActorTemplateNamespace: probeNamespace, + ActorTemplateName: probeTemplate, + }); err != nil { + t.Fatalf("CreateActor %q: %v", id, err) + } + t.Cleanup(func() { + // DeleteActor requires the actor to be suspended. + _, _ = clients.SubstrateAPI.SuspendActor(ctx, &ateapipb.SuspendActorRequest{ActorId: id}) + _, _ = clients.SubstrateAPI.DeleteActor(ctx, &ateapipb.DeleteActorRequest{ActorId: id}) + }) + + // Resume from the golden snapshot (the restore path, not --boot). + if _, err := clients.SubstrateAPI.ResumeActor(ctx, &ateapipb.ResumeActorRequest{ActorId: id}); err != nil { + t.Fatalf("ResumeActor %q: %v", id, err) + } +} + +func whoami(t *testing.T, ctx context.Context, rc *e2e.RouterClient, id string) whoamiResponse { + t.Helper() + resp, err := rc.Get(ctx, id, "/whoami") + if err != nil { + t.Fatalf("GET /whoami for %q: %v", id, err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("GET /whoami for %q: status %d, body %q", id, resp.StatusCode, body) + } + var out whoamiResponse + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + t.Fatalf("decoding /whoami for %q: %v", id, err) + } + return out +} diff --git a/internal/e2e/suites/identity/testmain_test.go b/internal/e2e/suites/identity/testmain_test.go new file mode 100644 index 000000000..2ccbd931b --- /dev/null +++ b/internal/e2e/suites/identity/testmain_test.go @@ -0,0 +1,24 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package identity + +import ( + "os" + "testing" + + "github.com/agent-substrate/substrate/internal/e2e" +) + +func TestMain(m *testing.M) { os.Exit(e2e.RunTestMain(m)) }