From 18e793e9c88150053f1de47832338c6f0e774869 Mon Sep 17 00:00:00 2001 From: Junjie Wang Date: Fri, 12 Jun 2026 17:23:43 -0700 Subject: [PATCH 1/3] Add e2e substrate tests with mocks. --- .github/workflows/go.yml | 7 +- internal/harness/antigravity_test.go | 186 +++--------------- internal/harness/mocks_test.go | 277 +++++++++++++++++++++++++++ internal/harness/substrate_test.go | 168 ++++++++++++++++ 4 files changed, 474 insertions(+), 164 deletions(-) create mode 100644 internal/harness/mocks_test.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index d13b224..1609eaf 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -48,7 +48,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: '1.25' + go-version-file: go.mod - name: Build run: go build -v ./... @@ -56,6 +56,11 @@ jobs: - name: Test run: go test -v ./... + - name: Build (harness) + run: go build -tags harness -v ./... + + - name: Test (harness) + run: go test -tags harness -v ./... - name: Check for binary files in PR diff --git a/internal/harness/antigravity_test.go b/internal/harness/antigravity_test.go index ca7392f..e328663 100644 --- a/internal/harness/antigravity_test.go +++ b/internal/harness/antigravity_test.go @@ -16,204 +16,64 @@ package harness import ( "context" - "net" "strings" - "sync" "testing" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "github.com/google/ax/proto" ) -type mockHandler struct { - mu sync.Mutex - messages []*proto.Message - complete bool - err error -} - -func (h *mockHandler) OnMessage(ctx context.Context, execID string, msg *proto.Message) error { - h.mu.Lock() - defer h.mu.Unlock() - h.messages = append(h.messages, msg) - return h.err -} - -func (h *mockHandler) OnComplete(ctx context.Context, execID string) error { - h.mu.Lock() - defer h.mu.Unlock() - h.complete = true - return nil -} - -// mockHarnessServer implements proto.HarnessServiceServer for testing. -type mockHarnessServer struct { - proto.UnimplementedHarnessServiceServer - failConnect bool -} - -func (s *mockHarnessServer) Connect(stream proto.HarnessService_ConnectServer) error { - if s.failConnect { - return status.Error(codes.Internal, "internal mock server crash") - } - - // Read the initiating HarnessRequest{start}. - req, err := stream.Recv() - if err != nil { - return err - } - - // 1. Verify conversation details - if req.GetConversationId() != "conv-test" { - return status.Error(codes.InvalidArgument, "invalid conversation_id") - } - - // 2. Stream thought frame - tMsg := &proto.Message{ - Role: "model", - Content: &proto.Content{ - Type: &proto.Content_Thought{ - Thought: &proto.ThoughtContent{ - Summary: []*proto.ThoughtSummaryContent{ - { - Type: &proto.ThoughtSummaryContent_Text{ - Text: &proto.TextContent{Text: "Analyzing"}, - }, - }, - }, - }, - }, - }, - } - if err := stream.Send(&proto.HarnessResponse{ - ConversationId: req.GetConversationId(), - Type: &proto.HarnessResponse_Outputs{ - Outputs: &proto.HarnessOutputs{Messages: []*proto.Message{tMsg}}, - }, - }); err != nil { - return err - } - - // 3. Stream text frame - txtMsg := &proto.Message{ - Role: "assistant", - Content: &proto.Content{ - Type: &proto.Content_Text{ - Text: &proto.TextContent{Text: "Hello world"}, - }, - }, - } - if err := stream.Send(&proto.HarnessResponse{ - ConversationId: req.GetConversationId(), - Type: &proto.HarnessResponse_Outputs{ - Outputs: &proto.HarnessOutputs{Messages: []*proto.Message{txtMsg}}, - }, - }); err != nil { - return err - } - - // 4. Stream end frame - return stream.Send(&proto.HarnessResponse{ - ConversationId: req.GetConversationId(), - Type: &proto.HarnessResponse_End{ - End: &proto.HarnessEnd{State: proto.State_STATE_COMPLETED}, - }, - }) -} - func TestAntigravityHarness_Run_Success(t *testing.T) { - // Spin up a local TCP listener - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("failed to listen: %v", err) + srv := &fakeHarnessServer{ + outputs: []*proto.Message{thoughtText("Analyzing"), assistantText("Hello world")}, } - defer lis.Close() - - // Initialize and start local gRPC server - grpcServer := grpc.NewServer() - mockServer := &mockHarnessServer{} - proto.RegisterHarnessServiceServer(grpcServer, mockServer) - - go func() { - if err := grpcServer.Serve(lis); err != nil && err != grpc.ErrServerStopped { - t.Errorf("Serve failed: %v", err) - } - }() - defer grpcServer.Stop() + harnessClient := NewAntigravityHarness(startHarnessServer(t, srv)) - harnessClient := NewAntigravityHarness(lis.Addr().String()) exec, err := harnessClient.Start(context.Background(), "conv-test") if err != nil { t.Fatalf("failed to start execution: %v", err) } defer exec.Close(context.Background()) - msg := &proto.Message{ - Role: "user", - Content: &proto.Content{ - Type: &proto.Content_Text{Text: &proto.TextContent{Text: "Hi"}}, - }, - } - if err := exec.Queue(context.Background(), msg); err != nil { + if err := exec.Queue(context.Background(), userText("Hi")); err != nil { t.Fatalf("failed to queue message: %v", err) } - handler := &mockHandler{} - err = exec.Run(context.Background(), handler) - if err != nil { + handler := &fakeHandler{} + if err := exec.Run(context.Background(), handler); err != nil { t.Fatalf("Run failed: %v", err) } - handler.mu.Lock() - defer handler.mu.Unlock() - - if !handler.complete { + if !handler.isDone() { t.Error("expected OnComplete to be called") } - if len(handler.messages) != 2 { - t.Fatalf("expected 2 messages, got %d", len(handler.messages)) + msgs := handler.collected() + if len(msgs) != 2 { + t.Fatalf("expected 2 messages, got %d", len(msgs)) } - if handler.messages[0].GetContent().GetThought().GetSummary()[0].GetText().GetText() != "Analyzing" { - t.Errorf("expected 'Analyzing', got %q", handler.messages[0].GetContent().GetThought().GetSummary()[0].GetText().GetText()) + if got := msgs[0].GetContent().GetThought().GetSummary()[0].GetText().GetText(); got != "Analyzing" { + t.Errorf("expected 'Analyzing', got %q", got) } - if handler.messages[1].GetContent().GetText().GetText() != "Hello world" { - t.Errorf("expected 'Hello world', got %q", handler.messages[1].GetContent().GetText().GetText()) + if got := msgs[1].GetContent().GetText().GetText(); got != "Hello world" { + t.Errorf("expected 'Hello world', got %q", got) + } + // The harness propagated the conversation id to the server. + if convID, _, _ := srv.received(); convID != "conv-test" { + t.Errorf("server got convID=%q, want conv-test", convID) } } func TestAntigravityHarness_Run_ErrorFrame(t *testing.T) { - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("failed to listen: %v", err) - } - defer lis.Close() - - grpcServer := grpc.NewServer() - mockServer := &mockHarnessServer{failConnect: true} - proto.RegisterHarnessServiceServer(grpcServer, mockServer) + srv := &fakeHarnessServer{failConnect: true, errMessage: "internal mock server crash"} + harnessClient := NewAntigravityHarness(startHarnessServer(t, srv)) - go func() { - _ = grpcServer.Serve(lis) - }() - defer grpcServer.Stop() - - harnessClient := NewAntigravityHarness(lis.Addr().String()) exec, _ := harnessClient.Start(context.Background(), "conv-test") defer exec.Close(context.Background()) - msg := &proto.Message{ - Role: "user", - Content: &proto.Content{ - Type: &proto.Content_Text{Text: &proto.TextContent{Text: "Hi"}}, - }, + if err := exec.Queue(context.Background(), userText("Hi")); err != nil { + t.Fatalf("failed to queue message: %v", err) } - _ = exec.Queue(context.Background(), msg) - handler := &mockHandler{} - err = exec.Run(context.Background(), handler) + err := exec.Run(context.Background(), &fakeHandler{}) if err == nil { t.Fatal("expected error from Run(), got nil") } diff --git a/internal/harness/mocks_test.go b/internal/harness/mocks_test.go new file mode 100644 index 0000000..31262fe --- /dev/null +++ b/internal/harness/mocks_test.go @@ -0,0 +1,277 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package harness + +// Shared in-process fakes for the harness tests: a fake ATE Control server +// (the substrate control plane), a fake HarnessService server (the harness +// inside an actor), a recording Handler, and message builders. + +import ( + "context" + "net" + "sync" + "testing" + + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" + "github.com/google/ax/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" +) + +// fakeControlServer is an in-process ateapipb.ControlServer that records the +// actor lifecycle calls SubstrateHarness makes and lets tests steer the +// CreateActor/ResumeActor responses. Only the three RPCs SubstrateHarness uses +// are implemented; the rest come from the embedded Unimplemented server. +type fakeControlServer struct { + ateapipb.UnimplementedControlServer + + mu sync.Mutex + createCalls []string + resumeCalls []string + suspendCalls []string + + createErr error // returned from CreateActor when non-nil + resumeIP string // AteomPodIp returned from ResumeActor + resumeNilActor bool // when true, ResumeActor returns a nil Actor +} + +func (f *fakeControlServer) CreateActor(_ context.Context, req *ateapipb.CreateActorRequest) (*ateapipb.CreateActorResponse, error) { + f.mu.Lock() + f.createCalls = append(f.createCalls, req.GetActorId()) + f.mu.Unlock() + if f.createErr != nil { + return nil, f.createErr + } + return &ateapipb.CreateActorResponse{Actor: &ateapipb.Actor{ActorId: req.GetActorId()}}, nil +} + +func (f *fakeControlServer) ResumeActor(_ context.Context, req *ateapipb.ResumeActorRequest) (*ateapipb.ResumeActorResponse, error) { + f.mu.Lock() + f.resumeCalls = append(f.resumeCalls, req.GetActorId()) + f.mu.Unlock() + if f.resumeNilActor { + return &ateapipb.ResumeActorResponse{}, nil + } + return &ateapipb.ResumeActorResponse{Actor: &ateapipb.Actor{ActorId: req.GetActorId(), AteomPodIp: f.resumeIP}}, nil +} + +func (f *fakeControlServer) SuspendActor(_ context.Context, req *ateapipb.SuspendActorRequest) (*ateapipb.SuspendActorResponse, error) { + f.mu.Lock() + f.suspendCalls = append(f.suspendCalls, req.GetActorId()) + f.mu.Unlock() + return &ateapipb.SuspendActorResponse{}, nil +} + +// calls returns copies of the recorded call lists. +func (f *fakeControlServer) calls() (create, resume, suspend []string) { + f.mu.Lock() + defer f.mu.Unlock() + return append([]string(nil), f.createCalls...), + append([]string(nil), f.resumeCalls...), + append([]string(nil), f.suspendCalls...) +} + +// fakeHarnessServer is an in-process proto.HarnessServiceServer standing in for +// the harness running inside an actor (substrate) or a local subprocess +// (antigravity). It records the start frame and emits its configured outputs +// followed by a terminal HarnessEnd. +type fakeHarnessServer struct { + proto.UnimplementedHarnessServiceServer + + // outputs are the messages emitted (in a single Outputs frame) before the + // terminal HarnessEnd. When nil, each input is echoed as "ack: ". + outputs []*proto.Message + // failConnect makes Connect return an RPC error before any frame. + failConnect bool + // failFrame makes Connect terminate the turn with HarnessEnd{STATE_FAILED}. + failFrame bool + // errMessage is the error text used by failConnect/failFrame. + errMessage string + + mu sync.Mutex + gotConvID string + gotHarnessID string + gotInputs []string +} + +func (s *fakeHarnessServer) Connect(stream proto.HarnessService_ConnectServer) error { + if s.failConnect { + return status.Error(codes.Internal, s.errMessage) + } + + req, err := stream.Recv() + if err != nil { + return err + } + + var inputs []string + for _, m := range req.GetStart().GetMessages() { + if text := m.GetContent().GetText().GetText(); text != "" { + inputs = append(inputs, text) + } + } + s.mu.Lock() + s.gotConvID = req.GetConversationId() + s.gotHarnessID = req.GetHarnessId() + s.gotInputs = inputs + s.mu.Unlock() + + convID := req.GetConversationId() + if s.failFrame { + return stream.Send(&proto.HarnessResponse{ + ConversationId: convID, + Type: &proto.HarnessResponse_End{ + End: &proto.HarnessEnd{State: proto.State_STATE_FAILED, ErrorMessage: s.errMessage}, + }, + }) + } + + msgs := s.outputs + if msgs == nil { + for _, in := range inputs { + msgs = append(msgs, assistantText("ack: "+in)) + } + } + if len(msgs) > 0 { + if err := stream.Send(&proto.HarnessResponse{ + ConversationId: convID, + Type: &proto.HarnessResponse_Outputs{ + Outputs: &proto.HarnessOutputs{Messages: msgs}, + }, + }); err != nil { + return err + } + } + return stream.Send(&proto.HarnessResponse{ + ConversationId: convID, + Type: &proto.HarnessResponse_End{End: &proto.HarnessEnd{State: proto.State_STATE_COMPLETED}}, + }) +} + +// received returns a copy of the start frame the server received. +func (s *fakeHarnessServer) received() (convID, harnessID string, inputs []string) { + s.mu.Lock() + defer s.mu.Unlock() + return s.gotConvID, s.gotHarnessID, append([]string(nil), s.gotInputs...) +} + +// fakeHandler records the messages and completion streamed during a turn. +type fakeHandler struct { + mu sync.Mutex + messages []*proto.Message + complete bool +} + +func (h *fakeHandler) OnMessage(_ context.Context, _ string, msg *proto.Message) error { + h.mu.Lock() + defer h.mu.Unlock() + h.messages = append(h.messages, msg) + return nil +} + +func (h *fakeHandler) OnComplete(_ context.Context, _ string) error { + h.mu.Lock() + defer h.mu.Unlock() + h.complete = true + return nil +} + +func (h *fakeHandler) isDone() bool { + h.mu.Lock() + defer h.mu.Unlock() + return h.complete +} + +// collected returns a copy of the messages received via OnMessage. +func (h *fakeHandler) collected() []*proto.Message { + h.mu.Lock() + defer h.mu.Unlock() + return append([]*proto.Message(nil), h.messages...) +} + +// texts returns the text content of each received message, in order. +func (h *fakeHandler) texts() []string { + h.mu.Lock() + defer h.mu.Unlock() + var out []string + for _, m := range h.messages { + out = append(out, m.GetContent().GetText().GetText()) + } + return out +} + +func assistantText(text string) *proto.Message { + return &proto.Message{ + Role: "assistant", + Content: &proto.Content{Type: &proto.Content_Text{Text: &proto.TextContent{Text: text}}}, + } +} + +func userText(text string) *proto.Message { + return &proto.Message{ + Role: "user", + Content: &proto.Content{Type: &proto.Content_Text{Text: &proto.TextContent{Text: text}}}, + } +} + +func thoughtText(summary string) *proto.Message { + return &proto.Message{ + Role: "model", + Content: &proto.Content{ + Type: &proto.Content_Thought{ + Thought: &proto.ThoughtContent{ + Summary: []*proto.ThoughtSummaryContent{ + {Type: &proto.ThoughtSummaryContent_Text{Text: &proto.TextContent{Text: summary}}}, + }, + }, + }, + }, + } +} + +// startHarnessServer starts a HarnessService + health server (status SERVING) +// on a random local port and returns its address. +func startHarnessServer(t *testing.T, srv *fakeHarnessServer) string { + t.Helper() + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + s := grpc.NewServer() + proto.RegisterHarnessServiceServer(s, srv) + hs := health.NewServer() + hs.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) + grpc_health_v1.RegisterHealthServer(s, hs) + go func() { _ = s.Serve(lis) }() + t.Cleanup(s.Stop) + return lis.Addr().String() +} + +// startControlServer starts a fake ATE Control server on a random local port. +func startControlServer(t *testing.T, srv *fakeControlServer) string { + t.Helper() + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + s := grpc.NewServer() + ateapipb.RegisterControlServer(s, srv) + go func() { _ = s.Serve(lis) }() + t.Cleanup(s.Stop) + return lis.Addr().String() +} diff --git a/internal/harness/substrate_test.go b/internal/harness/substrate_test.go index 0ec3d3b..76aa102 100644 --- a/internal/harness/substrate_test.go +++ b/internal/harness/substrate_test.go @@ -17,13 +17,19 @@ package harness import ( "context" "net" + "slices" + "strconv" + "strings" "testing" "time" + "github.com/google/ax/internal/experimental/k8s/ate" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" ) // startHealthTestServer starts a gRPC server on a random local port. If hs is @@ -113,3 +119,165 @@ func TestWaitForHealthy_ServerDown(t *testing.T) { t.Fatal("expected timeout error when server is down, got nil") } } + +// newTestSubstrateHarness builds a SubstrateHarness wired to the fake control +// server and the fake harness server. It constructs the struct directly (rather +// than via NewSubstrateHarness) so the control client can use insecure +// credentials instead of the TLS that NewSubstrateHarness hard-codes. +func newTestSubstrateHarness(t *testing.T, ctrlAddr, harnessAddr string) *SubstrateHarness { + t.Helper() + _, portStr, err := net.SplitHostPort(harnessAddr) + if err != nil { + t.Fatalf("bad harness addr %q: %v", harnessAddr, err) + } + port, err := strconv.Atoi(portStr) + if err != nil { + t.Fatalf("bad harness port %q: %v", portStr, err) + } + client, err := ate.NewClient("ax", "antigravity-template", ctrlAddr, + grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("failed to create ate client: %v", err) + } + return &SubstrateHarness{ + harnessID: "antigravity", + ateClient: client, + port: port, + dialOpts: []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, + } +} + +// Test full SubstrateHarness Start -> Run -> Close flow against the shared +// in-process mocks (see mocks_test.go). +// They lock in the wiring that a substrate bump or an ax-side change could silently +// break: create/resume idempotency, worker-IP extraction, the health gate, the +// Connect streaming protocol, and suspend-on-close. +func TestSubstrateHarness_EndToEnd(t *testing.T) { + ctrl := &fakeControlServer{resumeIP: "127.0.0.1"} + srv := &fakeHarnessServer{} + h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, srv)) + + ctx := context.Background() + exec, err := h.Start(ctx, "conv-1") + if err != nil { + t.Fatalf("Start: %v", err) + } + if err := exec.Queue(ctx, userText("hi")); err != nil { + t.Fatalf("Queue: %v", err) + } + handler := &fakeHandler{} + if err := exec.Run(ctx, handler); err != nil { + t.Fatalf("Run: %v", err) + } + + // The harness server received the start frame with the right identifiers. + convID, harnessID, inputs := srv.received() + if convID != "conv-1" || harnessID != "antigravity" { + t.Errorf("server got convID=%q harnessID=%q, want conv-1/antigravity", convID, harnessID) + } + if !slices.Equal(inputs, []string{"hi"}) { + t.Errorf("server got inputs=%v, want [hi]", inputs) + } + + // The handler streamed the output and completed. + if !handler.isDone() { + t.Error("handler did not complete") + } + if got := handler.texts(); !slices.Equal(got, []string{"ack: hi"}) { + t.Errorf("handler messages=%v, want [ack: hi]", got) + } + + // CreateActor then ResumeActor ran for the conversation; no suspend yet. + create, resume, suspend := ctrl.calls() + want := []string{"conv-1"} + if !slices.Equal(create, want) || !slices.Equal(resume, want) { + t.Errorf("create=%v resume=%v, want %v each", create, resume, want) + } + if len(suspend) != 0 { + t.Errorf("suspend called before Close: %v", suspend) + } + + // Close suspends the actor. + if err := exec.Close(ctx); err != nil { + t.Fatalf("Close: %v", err) + } + if _, _, suspend = ctrl.calls(); !slices.Equal(suspend, want) { + t.Errorf("suspend=%v, want %v", suspend, want) + } +} + +func TestSubstrateHarness_CreateAlreadyExistsTolerated(t *testing.T) { + ctrl := &fakeControlServer{ + resumeIP: "127.0.0.1", + createErr: status.Error(codes.AlreadyExists, "exists"), + } + h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, &fakeHarnessServer{})) + + ctx := context.Background() + exec, err := h.Start(ctx, "conv-1") + if err != nil { + t.Fatalf("Start should tolerate AlreadyExists: %v", err) + } + t.Cleanup(func() { _ = exec.Close(ctx) }) + + if err := exec.Queue(ctx, userText("hi")); err != nil { + t.Fatalf("Queue: %v", err) + } + handler := &fakeHandler{} + if err := exec.Run(ctx, handler); err != nil { + t.Fatalf("Run: %v", err) + } + if !handler.isDone() { + t.Error("handler did not complete") + } + if _, resume, _ := ctrl.calls(); !slices.Equal(resume, []string{"conv-1"}) { + t.Errorf("resume=%v, want [conv-1]", resume) + } +} + +func TestSubstrateHarness_ResumeNoWorkerIP(t *testing.T) { + ctrl := &fakeControlServer{resumeIP: ""} // empty AteomPodIp + h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, &fakeHarnessServer{})) + + _, err := h.Start(context.Background(), "conv-1") + if err == nil { + t.Fatal("expected error for empty worker IP, got nil") + } + if !strings.Contains(err.Error(), "no active worker IP") { + t.Errorf("error = %v, want it to mention 'no active worker IP'", err) + } +} + +func TestSubstrateHarness_ResumeNilActor(t *testing.T) { + ctrl := &fakeControlServer{resumeNilActor: true} + h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, &fakeHarnessServer{})) + + _, err := h.Start(context.Background(), "conv-1") + if err == nil { + t.Fatal("expected error for nil actor, got nil") + } + if !strings.Contains(err.Error(), "nil actor") { + t.Errorf("error = %v, want it to mention 'nil actor'", err) + } +} + +func TestSubstrateHarness_HarnessFailedFrame(t *testing.T) { + ctrl := &fakeControlServer{resumeIP: "127.0.0.1"} + srv := &fakeHarnessServer{failFrame: true, errMessage: "boom"} + h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, srv)) + + ctx := context.Background() + exec, err := h.Start(ctx, "conv-1") + if err != nil { + t.Fatalf("Start: %v", err) + } + t.Cleanup(func() { _ = exec.Close(ctx) }) + if err := exec.Queue(ctx, userText("hi")); err != nil { + t.Fatalf("Queue: %v", err) + } + if err := exec.Run(ctx, &fakeHandler{}); err == nil { + t.Fatal("expected error from failed harness frame, got nil") + } else if !strings.Contains(err.Error(), "harness failed") { + t.Errorf("error = %v, want it to mention 'harness failed'", err) + } +} From e7fc68edd09b08f54ce51ff13518472e8d6ee1e1 Mon Sep 17 00:00:00 2001 From: Junjie Wang Date: Fri, 12 Jun 2026 17:43:26 -0700 Subject: [PATCH 2/3] Rename mock structs. --- internal/harness/antigravity_test.go | 8 ++--- internal/harness/mocks_test.go | 44 ++++++++++++++-------------- internal/harness/substrate_test.go | 30 +++++++++---------- 3 files changed, 41 insertions(+), 41 deletions(-) diff --git a/internal/harness/antigravity_test.go b/internal/harness/antigravity_test.go index e328663..4d2f02a 100644 --- a/internal/harness/antigravity_test.go +++ b/internal/harness/antigravity_test.go @@ -23,7 +23,7 @@ import ( ) func TestAntigravityHarness_Run_Success(t *testing.T) { - srv := &fakeHarnessServer{ + srv := &mockHarnessServer{ outputs: []*proto.Message{thoughtText("Analyzing"), assistantText("Hello world")}, } harnessClient := NewAntigravityHarness(startHarnessServer(t, srv)) @@ -38,7 +38,7 @@ func TestAntigravityHarness_Run_Success(t *testing.T) { t.Fatalf("failed to queue message: %v", err) } - handler := &fakeHandler{} + handler := &mockHandler{} if err := exec.Run(context.Background(), handler); err != nil { t.Fatalf("Run failed: %v", err) } @@ -63,7 +63,7 @@ func TestAntigravityHarness_Run_Success(t *testing.T) { } func TestAntigravityHarness_Run_ErrorFrame(t *testing.T) { - srv := &fakeHarnessServer{failConnect: true, errMessage: "internal mock server crash"} + srv := &mockHarnessServer{failConnect: true, errMessage: "internal mock server crash"} harnessClient := NewAntigravityHarness(startHarnessServer(t, srv)) exec, _ := harnessClient.Start(context.Background(), "conv-test") @@ -73,7 +73,7 @@ func TestAntigravityHarness_Run_ErrorFrame(t *testing.T) { t.Fatalf("failed to queue message: %v", err) } - err := exec.Run(context.Background(), &fakeHandler{}) + err := exec.Run(context.Background(), &mockHandler{}) if err == nil { t.Fatal("expected error from Run(), got nil") } diff --git a/internal/harness/mocks_test.go b/internal/harness/mocks_test.go index 31262fe..68bf2e4 100644 --- a/internal/harness/mocks_test.go +++ b/internal/harness/mocks_test.go @@ -14,8 +14,8 @@ package harness -// Shared in-process fakes for the harness tests: a fake ATE Control server -// (the substrate control plane), a fake HarnessService server (the harness +// Shared in-process mocks for the harness tests: a mock Substrate Control server +// (the substrate control plane), a mock HarnessService server (the harness // inside an actor), a recording Handler, and message builders. import ( @@ -33,11 +33,11 @@ import ( "google.golang.org/grpc/status" ) -// fakeControlServer is an in-process ateapipb.ControlServer that records the +// mockControlServer is an in-process ateapipb.ControlServer that records the // actor lifecycle calls SubstrateHarness makes and lets tests steer the // CreateActor/ResumeActor responses. Only the three RPCs SubstrateHarness uses // are implemented; the rest come from the embedded Unimplemented server. -type fakeControlServer struct { +type mockControlServer struct { ateapipb.UnimplementedControlServer mu sync.Mutex @@ -50,7 +50,7 @@ type fakeControlServer struct { resumeNilActor bool // when true, ResumeActor returns a nil Actor } -func (f *fakeControlServer) CreateActor(_ context.Context, req *ateapipb.CreateActorRequest) (*ateapipb.CreateActorResponse, error) { +func (f *mockControlServer) CreateActor(_ context.Context, req *ateapipb.CreateActorRequest) (*ateapipb.CreateActorResponse, error) { f.mu.Lock() f.createCalls = append(f.createCalls, req.GetActorId()) f.mu.Unlock() @@ -60,7 +60,7 @@ func (f *fakeControlServer) CreateActor(_ context.Context, req *ateapipb.CreateA return &ateapipb.CreateActorResponse{Actor: &ateapipb.Actor{ActorId: req.GetActorId()}}, nil } -func (f *fakeControlServer) ResumeActor(_ context.Context, req *ateapipb.ResumeActorRequest) (*ateapipb.ResumeActorResponse, error) { +func (f *mockControlServer) ResumeActor(_ context.Context, req *ateapipb.ResumeActorRequest) (*ateapipb.ResumeActorResponse, error) { f.mu.Lock() f.resumeCalls = append(f.resumeCalls, req.GetActorId()) f.mu.Unlock() @@ -70,7 +70,7 @@ func (f *fakeControlServer) ResumeActor(_ context.Context, req *ateapipb.ResumeA return &ateapipb.ResumeActorResponse{Actor: &ateapipb.Actor{ActorId: req.GetActorId(), AteomPodIp: f.resumeIP}}, nil } -func (f *fakeControlServer) SuspendActor(_ context.Context, req *ateapipb.SuspendActorRequest) (*ateapipb.SuspendActorResponse, error) { +func (f *mockControlServer) SuspendActor(_ context.Context, req *ateapipb.SuspendActorRequest) (*ateapipb.SuspendActorResponse, error) { f.mu.Lock() f.suspendCalls = append(f.suspendCalls, req.GetActorId()) f.mu.Unlock() @@ -78,7 +78,7 @@ func (f *fakeControlServer) SuspendActor(_ context.Context, req *ateapipb.Suspen } // calls returns copies of the recorded call lists. -func (f *fakeControlServer) calls() (create, resume, suspend []string) { +func (f *mockControlServer) calls() (create, resume, suspend []string) { f.mu.Lock() defer f.mu.Unlock() return append([]string(nil), f.createCalls...), @@ -86,11 +86,11 @@ func (f *fakeControlServer) calls() (create, resume, suspend []string) { append([]string(nil), f.suspendCalls...) } -// fakeHarnessServer is an in-process proto.HarnessServiceServer standing in for +// mockHarnessServer is an in-process proto.HarnessServiceServer standing in for // the harness running inside an actor (substrate) or a local subprocess // (antigravity). It records the start frame and emits its configured outputs // followed by a terminal HarnessEnd. -type fakeHarnessServer struct { +type mockHarnessServer struct { proto.UnimplementedHarnessServiceServer // outputs are the messages emitted (in a single Outputs frame) before the @@ -109,7 +109,7 @@ type fakeHarnessServer struct { gotInputs []string } -func (s *fakeHarnessServer) Connect(stream proto.HarnessService_ConnectServer) error { +func (s *mockHarnessServer) Connect(stream proto.HarnessService_ConnectServer) error { if s.failConnect { return status.Error(codes.Internal, s.errMessage) } @@ -164,48 +164,48 @@ func (s *fakeHarnessServer) Connect(stream proto.HarnessService_ConnectServer) e } // received returns a copy of the start frame the server received. -func (s *fakeHarnessServer) received() (convID, harnessID string, inputs []string) { +func (s *mockHarnessServer) received() (convID, harnessID string, inputs []string) { s.mu.Lock() defer s.mu.Unlock() return s.gotConvID, s.gotHarnessID, append([]string(nil), s.gotInputs...) } -// fakeHandler records the messages and completion streamed during a turn. -type fakeHandler struct { +// mockHandler records the messages and completion streamed during a turn. +type mockHandler struct { mu sync.Mutex messages []*proto.Message complete bool } -func (h *fakeHandler) OnMessage(_ context.Context, _ string, msg *proto.Message) error { +func (h *mockHandler) OnMessage(_ context.Context, _ string, msg *proto.Message) error { h.mu.Lock() defer h.mu.Unlock() h.messages = append(h.messages, msg) return nil } -func (h *fakeHandler) OnComplete(_ context.Context, _ string) error { +func (h *mockHandler) OnComplete(_ context.Context, _ string) error { h.mu.Lock() defer h.mu.Unlock() h.complete = true return nil } -func (h *fakeHandler) isDone() bool { +func (h *mockHandler) isDone() bool { h.mu.Lock() defer h.mu.Unlock() return h.complete } // collected returns a copy of the messages received via OnMessage. -func (h *fakeHandler) collected() []*proto.Message { +func (h *mockHandler) collected() []*proto.Message { h.mu.Lock() defer h.mu.Unlock() return append([]*proto.Message(nil), h.messages...) } // texts returns the text content of each received message, in order. -func (h *fakeHandler) texts() []string { +func (h *mockHandler) texts() []string { h.mu.Lock() defer h.mu.Unlock() var out []string @@ -246,7 +246,7 @@ func thoughtText(summary string) *proto.Message { // startHarnessServer starts a HarnessService + health server (status SERVING) // on a random local port and returns its address. -func startHarnessServer(t *testing.T, srv *fakeHarnessServer) string { +func startHarnessServer(t *testing.T, srv *mockHarnessServer) string { t.Helper() lis, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { @@ -262,8 +262,8 @@ func startHarnessServer(t *testing.T, srv *fakeHarnessServer) string { return lis.Addr().String() } -// startControlServer starts a fake ATE Control server on a random local port. -func startControlServer(t *testing.T, srv *fakeControlServer) string { +// startControlServer starts a mock Substrate Control server on a random local port. +func startControlServer(t *testing.T, srv *mockControlServer) string { t.Helper() lis, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { diff --git a/internal/harness/substrate_test.go b/internal/harness/substrate_test.go index 76aa102..ee7ada5 100644 --- a/internal/harness/substrate_test.go +++ b/internal/harness/substrate_test.go @@ -120,8 +120,8 @@ func TestWaitForHealthy_ServerDown(t *testing.T) { } } -// newTestSubstrateHarness builds a SubstrateHarness wired to the fake control -// server and the fake harness server. It constructs the struct directly (rather +// newTestSubstrateHarness builds a SubstrateHarness wired to the mock control +// server and the mock harness server. It constructs the struct directly (rather // than via NewSubstrateHarness) so the control client can use insecure // credentials instead of the TLS that NewSubstrateHarness hard-codes. func newTestSubstrateHarness(t *testing.T, ctrlAddr, harnessAddr string) *SubstrateHarness { @@ -153,8 +153,8 @@ func newTestSubstrateHarness(t *testing.T, ctrlAddr, harnessAddr string) *Substr // break: create/resume idempotency, worker-IP extraction, the health gate, the // Connect streaming protocol, and suspend-on-close. func TestSubstrateHarness_EndToEnd(t *testing.T) { - ctrl := &fakeControlServer{resumeIP: "127.0.0.1"} - srv := &fakeHarnessServer{} + ctrl := &mockControlServer{resumeIP: "127.0.0.1"} + srv := &mockHarnessServer{} h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, srv)) ctx := context.Background() @@ -165,7 +165,7 @@ func TestSubstrateHarness_EndToEnd(t *testing.T) { if err := exec.Queue(ctx, userText("hi")); err != nil { t.Fatalf("Queue: %v", err) } - handler := &fakeHandler{} + handler := &mockHandler{} if err := exec.Run(ctx, handler); err != nil { t.Fatalf("Run: %v", err) } @@ -207,11 +207,11 @@ func TestSubstrateHarness_EndToEnd(t *testing.T) { } func TestSubstrateHarness_CreateAlreadyExistsTolerated(t *testing.T) { - ctrl := &fakeControlServer{ + ctrl := &mockControlServer{ resumeIP: "127.0.0.1", createErr: status.Error(codes.AlreadyExists, "exists"), } - h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, &fakeHarnessServer{})) + h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, &mockHarnessServer{})) ctx := context.Background() exec, err := h.Start(ctx, "conv-1") @@ -223,7 +223,7 @@ func TestSubstrateHarness_CreateAlreadyExistsTolerated(t *testing.T) { if err := exec.Queue(ctx, userText("hi")); err != nil { t.Fatalf("Queue: %v", err) } - handler := &fakeHandler{} + handler := &mockHandler{} if err := exec.Run(ctx, handler); err != nil { t.Fatalf("Run: %v", err) } @@ -236,8 +236,8 @@ func TestSubstrateHarness_CreateAlreadyExistsTolerated(t *testing.T) { } func TestSubstrateHarness_ResumeNoWorkerIP(t *testing.T) { - ctrl := &fakeControlServer{resumeIP: ""} // empty AteomPodIp - h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, &fakeHarnessServer{})) + ctrl := &mockControlServer{resumeIP: ""} // empty AteomPodIp + h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, &mockHarnessServer{})) _, err := h.Start(context.Background(), "conv-1") if err == nil { @@ -249,8 +249,8 @@ func TestSubstrateHarness_ResumeNoWorkerIP(t *testing.T) { } func TestSubstrateHarness_ResumeNilActor(t *testing.T) { - ctrl := &fakeControlServer{resumeNilActor: true} - h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, &fakeHarnessServer{})) + ctrl := &mockControlServer{resumeNilActor: true} + h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, &mockHarnessServer{})) _, err := h.Start(context.Background(), "conv-1") if err == nil { @@ -262,8 +262,8 @@ func TestSubstrateHarness_ResumeNilActor(t *testing.T) { } func TestSubstrateHarness_HarnessFailedFrame(t *testing.T) { - ctrl := &fakeControlServer{resumeIP: "127.0.0.1"} - srv := &fakeHarnessServer{failFrame: true, errMessage: "boom"} + ctrl := &mockControlServer{resumeIP: "127.0.0.1"} + srv := &mockHarnessServer{failFrame: true, errMessage: "boom"} h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, srv)) ctx := context.Background() @@ -275,7 +275,7 @@ func TestSubstrateHarness_HarnessFailedFrame(t *testing.T) { if err := exec.Queue(ctx, userText("hi")); err != nil { t.Fatalf("Queue: %v", err) } - if err := exec.Run(ctx, &fakeHandler{}); err == nil { + if err := exec.Run(ctx, &mockHandler{}); err == nil { t.Fatal("expected error from failed harness frame, got nil") } else if !strings.Contains(err.Error(), "harness failed") { t.Errorf("error = %v, want it to mention 'harness failed'", err) From 6eca7b530695dddde0a2e229cbcfb8f95671349a Mon Sep 17 00:00:00 2001 From: Junjie Wang Date: Tue, 16 Jun 2026 10:50:30 -1000 Subject: [PATCH 3/3] Remove the undefined endpoint. --- cmd/ax/internal/cliutil/cliutil_harness.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/ax/internal/cliutil/cliutil_harness.go b/cmd/ax/internal/cliutil/cliutil_harness.go index 1187bf1..08a65ad 100644 --- a/cmd/ax/internal/cliutil/cliutil_harness.go +++ b/cmd/ax/internal/cliutil/cliutil_harness.go @@ -84,7 +84,7 @@ func NewControllerFromConfig(ctx context.Context, cfg *Config) (*controller2.Con return nil, fmt.Errorf("custom substrate harnesses require AX_SUBSTRATE=1") } for _, sc := range cfg.Harnesses.Substrate { - h, err := sc.NewHarness(endpoint) + h, err := sc.NewHarness("") if err != nil { return nil, fmt.Errorf("substrate harness %q: %w", sc.ID, err) }