diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index c0aff9f..4bf580c 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -48,7 +48,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: '1.25' + go-version-file: go.mod - name: Verify go mod tidy run: | @@ -61,6 +61,11 @@ jobs: - name: Test run: go test -v ./... + - name: Build (harness) + run: go build -tags harness -v ./... + + - name: Test (harness) + run: go test -tags harness -v ./... - name: Check for binary files in PR diff --git a/cmd/ax/internal/cliutil/cliutil_harness.go b/cmd/ax/internal/cliutil/cliutil_harness.go index 1187bf1..08a65ad 100644 --- a/cmd/ax/internal/cliutil/cliutil_harness.go +++ b/cmd/ax/internal/cliutil/cliutil_harness.go @@ -84,7 +84,7 @@ func NewControllerFromConfig(ctx context.Context, cfg *Config) (*controller2.Con return nil, fmt.Errorf("custom substrate harnesses require AX_SUBSTRATE=1") } for _, sc := range cfg.Harnesses.Substrate { - h, err := sc.NewHarness(endpoint) + h, err := sc.NewHarness("") if err != nil { return nil, fmt.Errorf("substrate harness %q: %w", sc.ID, err) } diff --git a/internal/harness/antigravity_test.go b/internal/harness/antigravity_test.go index ca7392f..4d2f02a 100644 --- a/internal/harness/antigravity_test.go +++ b/internal/harness/antigravity_test.go @@ -16,204 +16,64 @@ package harness import ( "context" - "net" "strings" - "sync" "testing" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "github.com/google/ax/proto" ) -type mockHandler struct { - mu sync.Mutex - messages []*proto.Message - complete bool - err error -} - -func (h *mockHandler) OnMessage(ctx context.Context, execID string, msg *proto.Message) error { - h.mu.Lock() - defer h.mu.Unlock() - h.messages = append(h.messages, msg) - return h.err -} - -func (h *mockHandler) OnComplete(ctx context.Context, execID string) error { - h.mu.Lock() - defer h.mu.Unlock() - h.complete = true - return nil -} - -// mockHarnessServer implements proto.HarnessServiceServer for testing. -type mockHarnessServer struct { - proto.UnimplementedHarnessServiceServer - failConnect bool -} - -func (s *mockHarnessServer) Connect(stream proto.HarnessService_ConnectServer) error { - if s.failConnect { - return status.Error(codes.Internal, "internal mock server crash") - } - - // Read the initiating HarnessRequest{start}. - req, err := stream.Recv() - if err != nil { - return err - } - - // 1. Verify conversation details - if req.GetConversationId() != "conv-test" { - return status.Error(codes.InvalidArgument, "invalid conversation_id") - } - - // 2. Stream thought frame - tMsg := &proto.Message{ - Role: "model", - Content: &proto.Content{ - Type: &proto.Content_Thought{ - Thought: &proto.ThoughtContent{ - Summary: []*proto.ThoughtSummaryContent{ - { - Type: &proto.ThoughtSummaryContent_Text{ - Text: &proto.TextContent{Text: "Analyzing"}, - }, - }, - }, - }, - }, - }, - } - if err := stream.Send(&proto.HarnessResponse{ - ConversationId: req.GetConversationId(), - Type: &proto.HarnessResponse_Outputs{ - Outputs: &proto.HarnessOutputs{Messages: []*proto.Message{tMsg}}, - }, - }); err != nil { - return err - } - - // 3. Stream text frame - txtMsg := &proto.Message{ - Role: "assistant", - Content: &proto.Content{ - Type: &proto.Content_Text{ - Text: &proto.TextContent{Text: "Hello world"}, - }, - }, - } - if err := stream.Send(&proto.HarnessResponse{ - ConversationId: req.GetConversationId(), - Type: &proto.HarnessResponse_Outputs{ - Outputs: &proto.HarnessOutputs{Messages: []*proto.Message{txtMsg}}, - }, - }); err != nil { - return err - } - - // 4. Stream end frame - return stream.Send(&proto.HarnessResponse{ - ConversationId: req.GetConversationId(), - Type: &proto.HarnessResponse_End{ - End: &proto.HarnessEnd{State: proto.State_STATE_COMPLETED}, - }, - }) -} - func TestAntigravityHarness_Run_Success(t *testing.T) { - // Spin up a local TCP listener - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("failed to listen: %v", err) + srv := &mockHarnessServer{ + outputs: []*proto.Message{thoughtText("Analyzing"), assistantText("Hello world")}, } - defer lis.Close() - - // Initialize and start local gRPC server - grpcServer := grpc.NewServer() - mockServer := &mockHarnessServer{} - proto.RegisterHarnessServiceServer(grpcServer, mockServer) + harnessClient := NewAntigravityHarness(startHarnessServer(t, srv)) - go func() { - if err := grpcServer.Serve(lis); err != nil && err != grpc.ErrServerStopped { - t.Errorf("Serve failed: %v", err) - } - }() - defer grpcServer.Stop() - - harnessClient := NewAntigravityHarness(lis.Addr().String()) exec, err := harnessClient.Start(context.Background(), "conv-test") if err != nil { t.Fatalf("failed to start execution: %v", err) } defer exec.Close(context.Background()) - msg := &proto.Message{ - Role: "user", - Content: &proto.Content{ - Type: &proto.Content_Text{Text: &proto.TextContent{Text: "Hi"}}, - }, - } - if err := exec.Queue(context.Background(), msg); err != nil { + if err := exec.Queue(context.Background(), userText("Hi")); err != nil { t.Fatalf("failed to queue message: %v", err) } handler := &mockHandler{} - err = exec.Run(context.Background(), handler) - if err != nil { + if err := exec.Run(context.Background(), handler); err != nil { t.Fatalf("Run failed: %v", err) } - handler.mu.Lock() - defer handler.mu.Unlock() - - if !handler.complete { + if !handler.isDone() { t.Error("expected OnComplete to be called") } - if len(handler.messages) != 2 { - t.Fatalf("expected 2 messages, got %d", len(handler.messages)) + msgs := handler.collected() + if len(msgs) != 2 { + t.Fatalf("expected 2 messages, got %d", len(msgs)) + } + if got := msgs[0].GetContent().GetThought().GetSummary()[0].GetText().GetText(); got != "Analyzing" { + t.Errorf("expected 'Analyzing', got %q", got) } - if handler.messages[0].GetContent().GetThought().GetSummary()[0].GetText().GetText() != "Analyzing" { - t.Errorf("expected 'Analyzing', got %q", handler.messages[0].GetContent().GetThought().GetSummary()[0].GetText().GetText()) + if got := msgs[1].GetContent().GetText().GetText(); got != "Hello world" { + t.Errorf("expected 'Hello world', got %q", got) } - if handler.messages[1].GetContent().GetText().GetText() != "Hello world" { - t.Errorf("expected 'Hello world', got %q", handler.messages[1].GetContent().GetText().GetText()) + // The harness propagated the conversation id to the server. + if convID, _, _ := srv.received(); convID != "conv-test" { + t.Errorf("server got convID=%q, want conv-test", convID) } } func TestAntigravityHarness_Run_ErrorFrame(t *testing.T) { - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("failed to listen: %v", err) - } - defer lis.Close() + srv := &mockHarnessServer{failConnect: true, errMessage: "internal mock server crash"} + harnessClient := NewAntigravityHarness(startHarnessServer(t, srv)) - grpcServer := grpc.NewServer() - mockServer := &mockHarnessServer{failConnect: true} - proto.RegisterHarnessServiceServer(grpcServer, mockServer) - - go func() { - _ = grpcServer.Serve(lis) - }() - defer grpcServer.Stop() - - harnessClient := NewAntigravityHarness(lis.Addr().String()) exec, _ := harnessClient.Start(context.Background(), "conv-test") defer exec.Close(context.Background()) - msg := &proto.Message{ - Role: "user", - Content: &proto.Content{ - Type: &proto.Content_Text{Text: &proto.TextContent{Text: "Hi"}}, - }, + if err := exec.Queue(context.Background(), userText("Hi")); err != nil { + t.Fatalf("failed to queue message: %v", err) } - _ = exec.Queue(context.Background(), msg) - handler := &mockHandler{} - err = exec.Run(context.Background(), handler) + err := exec.Run(context.Background(), &mockHandler{}) if err == nil { t.Fatal("expected error from Run(), got nil") } diff --git a/internal/harness/mocks_test.go b/internal/harness/mocks_test.go new file mode 100644 index 0000000..68bf2e4 --- /dev/null +++ b/internal/harness/mocks_test.go @@ -0,0 +1,277 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package harness + +// Shared in-process mocks for the harness tests: a mock Substrate Control server +// (the substrate control plane), a mock HarnessService server (the harness +// inside an actor), a recording Handler, and message builders. + +import ( + "context" + "net" + "sync" + "testing" + + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" + "github.com/google/ax/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" +) + +// mockControlServer is an in-process ateapipb.ControlServer that records the +// actor lifecycle calls SubstrateHarness makes and lets tests steer the +// CreateActor/ResumeActor responses. Only the three RPCs SubstrateHarness uses +// are implemented; the rest come from the embedded Unimplemented server. +type mockControlServer struct { + ateapipb.UnimplementedControlServer + + mu sync.Mutex + createCalls []string + resumeCalls []string + suspendCalls []string + + createErr error // returned from CreateActor when non-nil + resumeIP string // AteomPodIp returned from ResumeActor + resumeNilActor bool // when true, ResumeActor returns a nil Actor +} + +func (f *mockControlServer) CreateActor(_ context.Context, req *ateapipb.CreateActorRequest) (*ateapipb.CreateActorResponse, error) { + f.mu.Lock() + f.createCalls = append(f.createCalls, req.GetActorId()) + f.mu.Unlock() + if f.createErr != nil { + return nil, f.createErr + } + return &ateapipb.CreateActorResponse{Actor: &ateapipb.Actor{ActorId: req.GetActorId()}}, nil +} + +func (f *mockControlServer) ResumeActor(_ context.Context, req *ateapipb.ResumeActorRequest) (*ateapipb.ResumeActorResponse, error) { + f.mu.Lock() + f.resumeCalls = append(f.resumeCalls, req.GetActorId()) + f.mu.Unlock() + if f.resumeNilActor { + return &ateapipb.ResumeActorResponse{}, nil + } + return &ateapipb.ResumeActorResponse{Actor: &ateapipb.Actor{ActorId: req.GetActorId(), AteomPodIp: f.resumeIP}}, nil +} + +func (f *mockControlServer) SuspendActor(_ context.Context, req *ateapipb.SuspendActorRequest) (*ateapipb.SuspendActorResponse, error) { + f.mu.Lock() + f.suspendCalls = append(f.suspendCalls, req.GetActorId()) + f.mu.Unlock() + return &ateapipb.SuspendActorResponse{}, nil +} + +// calls returns copies of the recorded call lists. +func (f *mockControlServer) calls() (create, resume, suspend []string) { + f.mu.Lock() + defer f.mu.Unlock() + return append([]string(nil), f.createCalls...), + append([]string(nil), f.resumeCalls...), + append([]string(nil), f.suspendCalls...) +} + +// mockHarnessServer is an in-process proto.HarnessServiceServer standing in for +// the harness running inside an actor (substrate) or a local subprocess +// (antigravity). It records the start frame and emits its configured outputs +// followed by a terminal HarnessEnd. +type mockHarnessServer struct { + proto.UnimplementedHarnessServiceServer + + // outputs are the messages emitted (in a single Outputs frame) before the + // terminal HarnessEnd. When nil, each input is echoed as "ack: ". + outputs []*proto.Message + // failConnect makes Connect return an RPC error before any frame. + failConnect bool + // failFrame makes Connect terminate the turn with HarnessEnd{STATE_FAILED}. + failFrame bool + // errMessage is the error text used by failConnect/failFrame. + errMessage string + + mu sync.Mutex + gotConvID string + gotHarnessID string + gotInputs []string +} + +func (s *mockHarnessServer) Connect(stream proto.HarnessService_ConnectServer) error { + if s.failConnect { + return status.Error(codes.Internal, s.errMessage) + } + + req, err := stream.Recv() + if err != nil { + return err + } + + var inputs []string + for _, m := range req.GetStart().GetMessages() { + if text := m.GetContent().GetText().GetText(); text != "" { + inputs = append(inputs, text) + } + } + s.mu.Lock() + s.gotConvID = req.GetConversationId() + s.gotHarnessID = req.GetHarnessId() + s.gotInputs = inputs + s.mu.Unlock() + + convID := req.GetConversationId() + if s.failFrame { + return stream.Send(&proto.HarnessResponse{ + ConversationId: convID, + Type: &proto.HarnessResponse_End{ + End: &proto.HarnessEnd{State: proto.State_STATE_FAILED, ErrorMessage: s.errMessage}, + }, + }) + } + + msgs := s.outputs + if msgs == nil { + for _, in := range inputs { + msgs = append(msgs, assistantText("ack: "+in)) + } + } + if len(msgs) > 0 { + if err := stream.Send(&proto.HarnessResponse{ + ConversationId: convID, + Type: &proto.HarnessResponse_Outputs{ + Outputs: &proto.HarnessOutputs{Messages: msgs}, + }, + }); err != nil { + return err + } + } + return stream.Send(&proto.HarnessResponse{ + ConversationId: convID, + Type: &proto.HarnessResponse_End{End: &proto.HarnessEnd{State: proto.State_STATE_COMPLETED}}, + }) +} + +// received returns a copy of the start frame the server received. +func (s *mockHarnessServer) received() (convID, harnessID string, inputs []string) { + s.mu.Lock() + defer s.mu.Unlock() + return s.gotConvID, s.gotHarnessID, append([]string(nil), s.gotInputs...) +} + +// mockHandler records the messages and completion streamed during a turn. +type mockHandler struct { + mu sync.Mutex + messages []*proto.Message + complete bool +} + +func (h *mockHandler) OnMessage(_ context.Context, _ string, msg *proto.Message) error { + h.mu.Lock() + defer h.mu.Unlock() + h.messages = append(h.messages, msg) + return nil +} + +func (h *mockHandler) OnComplete(_ context.Context, _ string) error { + h.mu.Lock() + defer h.mu.Unlock() + h.complete = true + return nil +} + +func (h *mockHandler) isDone() bool { + h.mu.Lock() + defer h.mu.Unlock() + return h.complete +} + +// collected returns a copy of the messages received via OnMessage. +func (h *mockHandler) collected() []*proto.Message { + h.mu.Lock() + defer h.mu.Unlock() + return append([]*proto.Message(nil), h.messages...) +} + +// texts returns the text content of each received message, in order. +func (h *mockHandler) texts() []string { + h.mu.Lock() + defer h.mu.Unlock() + var out []string + for _, m := range h.messages { + out = append(out, m.GetContent().GetText().GetText()) + } + return out +} + +func assistantText(text string) *proto.Message { + return &proto.Message{ + Role: "assistant", + Content: &proto.Content{Type: &proto.Content_Text{Text: &proto.TextContent{Text: text}}}, + } +} + +func userText(text string) *proto.Message { + return &proto.Message{ + Role: "user", + Content: &proto.Content{Type: &proto.Content_Text{Text: &proto.TextContent{Text: text}}}, + } +} + +func thoughtText(summary string) *proto.Message { + return &proto.Message{ + Role: "model", + Content: &proto.Content{ + Type: &proto.Content_Thought{ + Thought: &proto.ThoughtContent{ + Summary: []*proto.ThoughtSummaryContent{ + {Type: &proto.ThoughtSummaryContent_Text{Text: &proto.TextContent{Text: summary}}}, + }, + }, + }, + }, + } +} + +// startHarnessServer starts a HarnessService + health server (status SERVING) +// on a random local port and returns its address. +func startHarnessServer(t *testing.T, srv *mockHarnessServer) string { + t.Helper() + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + s := grpc.NewServer() + proto.RegisterHarnessServiceServer(s, srv) + hs := health.NewServer() + hs.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) + grpc_health_v1.RegisterHealthServer(s, hs) + go func() { _ = s.Serve(lis) }() + t.Cleanup(s.Stop) + return lis.Addr().String() +} + +// startControlServer starts a mock Substrate Control server on a random local port. +func startControlServer(t *testing.T, srv *mockControlServer) string { + t.Helper() + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + s := grpc.NewServer() + ateapipb.RegisterControlServer(s, srv) + go func() { _ = s.Serve(lis) }() + t.Cleanup(s.Stop) + return lis.Addr().String() +} diff --git a/internal/harness/substrate_test.go b/internal/harness/substrate_test.go index 0ec3d3b..ee7ada5 100644 --- a/internal/harness/substrate_test.go +++ b/internal/harness/substrate_test.go @@ -17,13 +17,19 @@ package harness import ( "context" "net" + "slices" + "strconv" + "strings" "testing" "time" + "github.com/google/ax/internal/experimental/k8s/ate" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" ) // startHealthTestServer starts a gRPC server on a random local port. If hs is @@ -113,3 +119,165 @@ func TestWaitForHealthy_ServerDown(t *testing.T) { t.Fatal("expected timeout error when server is down, got nil") } } + +// newTestSubstrateHarness builds a SubstrateHarness wired to the mock control +// server and the mock harness server. It constructs the struct directly (rather +// than via NewSubstrateHarness) so the control client can use insecure +// credentials instead of the TLS that NewSubstrateHarness hard-codes. +func newTestSubstrateHarness(t *testing.T, ctrlAddr, harnessAddr string) *SubstrateHarness { + t.Helper() + _, portStr, err := net.SplitHostPort(harnessAddr) + if err != nil { + t.Fatalf("bad harness addr %q: %v", harnessAddr, err) + } + port, err := strconv.Atoi(portStr) + if err != nil { + t.Fatalf("bad harness port %q: %v", portStr, err) + } + client, err := ate.NewClient("ax", "antigravity-template", ctrlAddr, + grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("failed to create ate client: %v", err) + } + return &SubstrateHarness{ + harnessID: "antigravity", + ateClient: client, + port: port, + dialOpts: []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, + } +} + +// Test full SubstrateHarness Start -> Run -> Close flow against the shared +// in-process mocks (see mocks_test.go). +// They lock in the wiring that a substrate bump or an ax-side change could silently +// break: create/resume idempotency, worker-IP extraction, the health gate, the +// Connect streaming protocol, and suspend-on-close. +func TestSubstrateHarness_EndToEnd(t *testing.T) { + ctrl := &mockControlServer{resumeIP: "127.0.0.1"} + srv := &mockHarnessServer{} + h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, srv)) + + ctx := context.Background() + exec, err := h.Start(ctx, "conv-1") + if err != nil { + t.Fatalf("Start: %v", err) + } + if err := exec.Queue(ctx, userText("hi")); err != nil { + t.Fatalf("Queue: %v", err) + } + handler := &mockHandler{} + if err := exec.Run(ctx, handler); err != nil { + t.Fatalf("Run: %v", err) + } + + // The harness server received the start frame with the right identifiers. + convID, harnessID, inputs := srv.received() + if convID != "conv-1" || harnessID != "antigravity" { + t.Errorf("server got convID=%q harnessID=%q, want conv-1/antigravity", convID, harnessID) + } + if !slices.Equal(inputs, []string{"hi"}) { + t.Errorf("server got inputs=%v, want [hi]", inputs) + } + + // The handler streamed the output and completed. + if !handler.isDone() { + t.Error("handler did not complete") + } + if got := handler.texts(); !slices.Equal(got, []string{"ack: hi"}) { + t.Errorf("handler messages=%v, want [ack: hi]", got) + } + + // CreateActor then ResumeActor ran for the conversation; no suspend yet. + create, resume, suspend := ctrl.calls() + want := []string{"conv-1"} + if !slices.Equal(create, want) || !slices.Equal(resume, want) { + t.Errorf("create=%v resume=%v, want %v each", create, resume, want) + } + if len(suspend) != 0 { + t.Errorf("suspend called before Close: %v", suspend) + } + + // Close suspends the actor. + if err := exec.Close(ctx); err != nil { + t.Fatalf("Close: %v", err) + } + if _, _, suspend = ctrl.calls(); !slices.Equal(suspend, want) { + t.Errorf("suspend=%v, want %v", suspend, want) + } +} + +func TestSubstrateHarness_CreateAlreadyExistsTolerated(t *testing.T) { + ctrl := &mockControlServer{ + resumeIP: "127.0.0.1", + createErr: status.Error(codes.AlreadyExists, "exists"), + } + h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, &mockHarnessServer{})) + + ctx := context.Background() + exec, err := h.Start(ctx, "conv-1") + if err != nil { + t.Fatalf("Start should tolerate AlreadyExists: %v", err) + } + t.Cleanup(func() { _ = exec.Close(ctx) }) + + if err := exec.Queue(ctx, userText("hi")); err != nil { + t.Fatalf("Queue: %v", err) + } + handler := &mockHandler{} + if err := exec.Run(ctx, handler); err != nil { + t.Fatalf("Run: %v", err) + } + if !handler.isDone() { + t.Error("handler did not complete") + } + if _, resume, _ := ctrl.calls(); !slices.Equal(resume, []string{"conv-1"}) { + t.Errorf("resume=%v, want [conv-1]", resume) + } +} + +func TestSubstrateHarness_ResumeNoWorkerIP(t *testing.T) { + ctrl := &mockControlServer{resumeIP: ""} // empty AteomPodIp + h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, &mockHarnessServer{})) + + _, err := h.Start(context.Background(), "conv-1") + if err == nil { + t.Fatal("expected error for empty worker IP, got nil") + } + if !strings.Contains(err.Error(), "no active worker IP") { + t.Errorf("error = %v, want it to mention 'no active worker IP'", err) + } +} + +func TestSubstrateHarness_ResumeNilActor(t *testing.T) { + ctrl := &mockControlServer{resumeNilActor: true} + h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, &mockHarnessServer{})) + + _, err := h.Start(context.Background(), "conv-1") + if err == nil { + t.Fatal("expected error for nil actor, got nil") + } + if !strings.Contains(err.Error(), "nil actor") { + t.Errorf("error = %v, want it to mention 'nil actor'", err) + } +} + +func TestSubstrateHarness_HarnessFailedFrame(t *testing.T) { + ctrl := &mockControlServer{resumeIP: "127.0.0.1"} + srv := &mockHarnessServer{failFrame: true, errMessage: "boom"} + h := newTestSubstrateHarness(t, startControlServer(t, ctrl), startHarnessServer(t, srv)) + + ctx := context.Background() + exec, err := h.Start(ctx, "conv-1") + if err != nil { + t.Fatalf("Start: %v", err) + } + t.Cleanup(func() { _ = exec.Close(ctx) }) + if err := exec.Queue(ctx, userText("hi")); err != nil { + t.Fatalf("Queue: %v", err) + } + if err := exec.Run(ctx, &mockHandler{}); err == nil { + t.Fatal("expected error from failed harness frame, got nil") + } else if !strings.Contains(err.Error(), "harness failed") { + t.Errorf("error = %v, want it to mention 'harness failed'", err) + } +}