diff --git a/cmd/atenet/internal/app/router/extproc.go b/cmd/atenet/internal/app/router/extproc.go index 20aaf15ef..3d88a4636 100644 --- a/cmd/atenet/internal/app/router/extproc.go +++ b/cmd/atenet/internal/app/router/extproc.go @@ -39,12 +39,12 @@ type ExtProcServer struct { resumer *ActorResumer } -func NewExtProcServer(port int, apiClient ateapipb.ControlClient) *ExtProcServer { +func NewExtProcServer(port int, apiClient ateapipb.ControlClient, actorResumeTimeout time.Duration) *ExtProcServer { return &ExtProcServer{ port: port, apiClient: apiClient, recorder: NewQueryRecorder(100), - resumer: NewActorResumer(apiClient), + resumer: NewActorResumer(apiClient, actorResumeTimeout), } } diff --git a/cmd/atenet/internal/app/router/extproc_test.go b/cmd/atenet/internal/app/router/extproc_test.go index 3d66775de..eff1cd9ba 100644 --- a/cmd/atenet/internal/app/router/extproc_test.go +++ b/cmd/atenet/internal/app/router/extproc_test.go @@ -138,7 +138,7 @@ func TestExtProcHeadersEvaluation(t *testing.T) { }, } - s := NewExtProcServer(50051, clientMock) + s := NewExtProcServer(50051, clientMock, routerDefaultActorResumeTimeout) reqHeaders := &extprocv3.HttpHeaders{ Headers: &corev3.HeaderMap{ diff --git a/cmd/atenet/internal/app/router/resumer.go b/cmd/atenet/internal/app/router/resumer.go index 2f9a420f9..d8a040b93 100644 --- a/cmd/atenet/internal/app/router/resumer.go +++ b/cmd/atenet/internal/app/router/resumer.go @@ -27,13 +27,15 @@ import ( // ActorResumer coordinates safe, deduplicated resumption of actors. type ActorResumer struct { - apiClient ateapipb.ControlClient - flight singleflight.Group + apiClient ateapipb.ControlClient + resumeTimeout time.Duration + flight singleflight.Group } -func NewActorResumer(apiClient ateapipb.ControlClient) *ActorResumer { +func NewActorResumer(apiClient ateapipb.ControlClient, resumeTimeout time.Duration) *ActorResumer { return &ActorResumer{ - apiClient: apiClient, + apiClient: apiClient, + resumeTimeout: resumeTimeout, } } @@ -41,10 +43,10 @@ func NewActorResumer(apiClient ateapipb.ControlClient) *ActorResumer { // requests within the process and retries when needed. func (r *ActorResumer) ResumeActor(ctx context.Context, actorID string) (*ateapipb.Actor, error) { ch := r.flight.DoChan(actorID, func() (interface{}, error) { - // We detach the context from the first caller using a fixed background timeout. + // We detach the context from the first caller using a background timeout. // This guarantees that if Caller 1 disconnects or times out, the underlying // resume operation continues running for Caller 2 and Caller 3 without failing. - bgCtx, bgCancel := context.WithTimeout(context.Background(), 15*time.Second) + bgCtx, bgCancel := context.WithTimeout(context.Background(), r.resumeTimeout) defer bgCancel() backoff := wait.Backoff{ diff --git a/cmd/atenet/internal/app/router/resumer_test.go b/cmd/atenet/internal/app/router/resumer_test.go index 8e10f7f57..a9c658ad7 100644 --- a/cmd/atenet/internal/app/router/resumer_test.go +++ b/cmd/atenet/internal/app/router/resumer_test.go @@ -57,7 +57,7 @@ func TestActorResumer_ResumeActor(t *testing.T) { }, } - resumer := NewActorResumer(mock) + resumer := NewActorResumer(mock, routerDefaultActorResumeTimeout) actor, err := resumer.ResumeActor(context.Background(), testActorID) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -88,7 +88,7 @@ func TestActorResumer_ResumeActor(t *testing.T) { }, } - resumer := NewActorResumer(mock) + resumer := NewActorResumer(mock, routerDefaultActorResumeTimeout) actor, err := resumer.ResumeActor(context.Background(), testActorID) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -108,13 +108,49 @@ func TestActorResumer_ResumeActor(t *testing.T) { }, } - resumer := NewActorResumer(mock) + resumer := NewActorResumer(mock, routerDefaultActorResumeTimeout) _, err := resumer.ResumeActor(context.Background(), testActorID) if got := status.Code(err); got != codes.NotFound { t.Errorf("expected gRPC code NotFound, got %v (err=%v)", got, err) } }) + t.Run("ConfiguredDeadline", func(t *testing.T) { + const configuredTimeout = 2 * time.Second + var gotDeadline time.Time + start := time.Now() + + mock := &resumerMockClient{ + resumeFn: func(ctx context.Context, in *ateapipb.ResumeActorRequest, opts ...grpc.CallOption) (*ateapipb.ResumeActorResponse, error) { + deadline, ok := ctx.Deadline() + if !ok { + t.Errorf("expected ResumeActor context deadline") + } else { + gotDeadline = deadline + } + return &ateapipb.ResumeActorResponse{ + Actor: &ateapipb.Actor{ + ActorId: testActorID, + Status: ateapipb.Actor_STATUS_RUNNING, + AteomPodIp: expectedIP, + }, + }, nil + }, + } + + resumer := NewActorResumer(mock, configuredTimeout) + _, err := resumer.ResumeActor(context.Background(), testActorID) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if gotDeadline.IsZero() { + t.Fatalf("ResumeActor context deadline was not captured") + } + if got := gotDeadline.Sub(start); got < configuredTimeout || got > configuredTimeout+time.Second { + t.Errorf("expected ResumeActor deadline near %s, got %s", configuredTimeout, got) + } + }) + t.Run("SingleflightDeduplication", func(t *testing.T) { var resumeCalled int var mu sync.Mutex @@ -135,7 +171,7 @@ func TestActorResumer_ResumeActor(t *testing.T) { }, } - resumer := NewActorResumer(mock) + resumer := NewActorResumer(mock, routerDefaultActorResumeTimeout) var wg sync.WaitGroup const concurrentRequests = 10 diff --git a/cmd/atenet/internal/app/router/router.go b/cmd/atenet/internal/app/router/router.go index b0c1d2f26..f67ea9cdc 100644 --- a/cmd/atenet/internal/app/router/router.go +++ b/cmd/atenet/internal/app/router/router.go @@ -54,6 +54,12 @@ var ( scheme = runtime.NewScheme() ) +const ( + routerDefaultExtprocTimeout = 5 * time.Second + routerDefaultActorResumeTimeout = 15 * time.Second + routerDefaultRouteTimeout = 10 * time.Second +) + func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(v1alpha1.AddToScheme(scheme)) @@ -61,21 +67,24 @@ func init() { // RouterConfig holds deployment setup and endpoint options for the router node instance. type RouterConfig struct { - Standalone bool - Namespace string - Kubeconfig string - AteapiAddr string - HttpPort int - XdsPort int - ExtprocPort int - ExtprocAddr string - EnvoyImage string - TemplatesFile string - StatusPort int - HealthInterval time.Duration - HttpsPort int - EnvoyCertPath string - LogLevel string + Standalone bool + Namespace string + Kubeconfig string + AteapiAddr string + HttpPort int + XdsPort int + ExtprocPort int + ExtprocAddr string + ExtprocTimeout time.Duration + EnvoyImage string + TemplatesFile string + StatusPort int + HealthInterval time.Duration + ActorResumeTimeout time.Duration + RouteTimeout time.Duration + HttpsPort int + EnvoyCertPath string + LogLevel string } // RouterServer instantiates and coordinates runtime threads executing system modules. @@ -140,10 +149,13 @@ func NewCmd() *cobra.Command { cmd.Flags().IntVar(&cfg.XdsPort, "port-xds", 18000, "TCP port listening for the xDS dynamic Envoy connections") cmd.Flags().IntVar(&cfg.ExtprocPort, "port-extproc", 50051, "Listen port for the Envoy dynamic External Processing (ext_proc) server") cmd.Flags().StringVar(&cfg.ExtprocAddr, "extproc-address", "127.0.0.1", "Host IP or address of the Envoy External Processing (ext_proc) server") + cmd.Flags().DurationVar(&cfg.ExtprocTimeout, "extproc-timeout", routerDefaultExtprocTimeout, "Timeout for Envoy External Processing (ext_proc) service and message processing") cmd.Flags().StringVar(&cfg.EnvoyImage, "envoy-image", "envoyproxy/envoy:v1.30-latest", "Image URI used for dynamically launched router instances") cmd.Flags().StringVar(&cfg.TemplatesFile, "actor-templates-file", "", "Path to offline YAML configuration file listing ActorTemplates") cmd.Flags().IntVar(&cfg.StatusPort, "status-port", 4040, "Port to serve /statusz on (set <= 0 to disable serving status)") cmd.Flags().DurationVar(&cfg.HealthInterval, "health-interval", 1*time.Second, "Interval for checking health of dependent services") + cmd.Flags().DurationVar(&cfg.ActorResumeTimeout, "actor-resume-timeout", routerDefaultActorResumeTimeout, "Timeout for actor resume calls triggered by request routing") + cmd.Flags().DurationVar(&cfg.RouteTimeout, "route-timeout", routerDefaultRouteTimeout, "Timeout for Envoy route actions (0 disables route timeout)") cmd.Flags().IntVar(&cfg.HttpsPort, "port-https", 8443, "TCP port for HTTPS workload traffic entering through the Envoy Router") cmd.Flags().StringVar(&cfg.EnvoyCertPath, "envoy-cert-path", "", "Path to the Envoy certificate file (if empty, a self-signed cert will be generated for testing)") @@ -211,7 +223,7 @@ func (s *RouterServer) Run(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) - xdsSrv := NewXdsServer(s.cfg.XdsPort) + xdsSrv := NewXdsServer(s.cfg.XdsPort, s.cfg.ExtprocTimeout, s.cfg.RouteTimeout) xdsSrv.SetConfig(s.cfg.HttpPort, s.cfg.ExtprocPort, s.cfg.ExtprocAddr) var certContent, keyContent string @@ -226,7 +238,7 @@ func (s *RouterServer) Run(ctx context.Context) error { xdsSrv.SetTlsConfig(s.cfg.HttpsPort, s.cfg.EnvoyCertPath, certContent, keyContent) if s.extprocSrv == nil { - s.extprocSrv = NewExtProcServer(s.cfg.ExtprocPort, s.apiClient) + s.extprocSrv = NewExtProcServer(s.cfg.ExtprocPort, s.apiClient, s.cfg.ActorResumeTimeout) } ctrl := NewController(s.k8sClient, s.clientset, s.cfg, xdsSrv, s.extprocSrv) diff --git a/cmd/atenet/internal/app/router/status_test.go b/cmd/atenet/internal/app/router/status_test.go index 8641de263..efa4720f1 100644 --- a/cmd/atenet/internal/app/router/status_test.go +++ b/cmd/atenet/internal/app/router/status_test.go @@ -62,7 +62,7 @@ func TestStatuszEndpoint(t *testing.T) { t.Fatalf("Failed generating router server: %v", err) } - srv.extprocSrv = NewExtProcServer(cfg.ExtprocPort, &mockClient{}) + srv.extprocSrv = NewExtProcServer(cfg.ExtprocPort, &mockClient{}, routerDefaultActorResumeTimeout) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/cmd/atenet/internal/app/router/xds.go b/cmd/atenet/internal/app/router/xds.go index 964fc5e92..80fe5a1fc 100644 --- a/cmd/atenet/internal/app/router/xds.go +++ b/cmd/atenet/internal/app/router/xds.go @@ -66,13 +66,15 @@ const ( // XdsServer implements an aggregated discovery service server for dynamic Envoy router nodes. type XdsServer struct { - xdsPort int - extprocPort int - extprocAddr string - ingressPort int - snapshot cachev3.SnapshotCache - srv serverv3.Server - versionCount int64 + xdsPort int + extprocPort int + extprocAddr string + extprocTimeout time.Duration + ingressPort int + routeTimeout time.Duration + snapshot cachev3.SnapshotCache + srv serverv3.Server + versionCount int64 mu sync.Mutex @@ -82,17 +84,19 @@ type XdsServer struct { keyContent string } -func NewXdsServer(xdsPort int) *XdsServer { +func NewXdsServer(xdsPort int, extprocTimeout, routeTimeout time.Duration) *XdsServer { cache := cachev3.NewSnapshotCache(true, cachev3.IDHash{}, nil) srv := serverv3.NewServer(context.Background(), cache, nil) return &XdsServer{ - xdsPort: xdsPort, - snapshot: cache, - srv: srv, - extprocPort: 50051, // matches default extproc port - extprocAddr: "127.0.0.1", - ingressPort: 8080, + xdsPort: xdsPort, + snapshot: cache, + srv: srv, + extprocPort: 50051, // matches default extproc port + extprocAddr: "127.0.0.1", + extprocTimeout: extprocTimeout, + ingressPort: 8080, + routeTimeout: routeTimeout, } } @@ -286,7 +290,7 @@ func (x *XdsServer) buildRoutes() *routev3.RouteConfiguration { ClusterSpecifier: &routev3.RouteAction_Cluster{ Cluster: "dynamic_forward_proxy_cluster", }, - Timeout: durationpb.New(10 * time.Second), + Timeout: durationpb.New(x.routeTimeout), }, }, }, @@ -304,13 +308,13 @@ func (x *XdsServer) buildHcm(statPrefix string) *anypb.Any { ClusterName: ClusterName, }, }, - Timeout: durationpb.New(5 * time.Second), + Timeout: durationpb.New(x.extprocTimeout), }, MutationRules: &mutationrulesv3.HeaderMutationRules{ AllowAllRouting: &wrapperspb.BoolValue{Value: true}, }, // Explicitly configure the message timeout to avoid the 200ms default - MessageTimeout: durationpb.New(5 * time.Second), + MessageTimeout: durationpb.New(x.extprocTimeout), ProcessingMode: &extprocv3filter.ProcessingMode{ RequestHeaderMode: extprocv3filter.ProcessingMode_SEND, ResponseHeaderMode: extprocv3filter.ProcessingMode_SKIP, diff --git a/cmd/atenet/internal/app/router/xds_test.go b/cmd/atenet/internal/app/router/xds_test.go index 92e347648..8d3eaa45b 100644 --- a/cmd/atenet/internal/app/router/xds_test.go +++ b/cmd/atenet/internal/app/router/xds_test.go @@ -24,12 +24,14 @@ import ( clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + extprocv3filter "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" + hcmv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" ) func TestXdsServer_UpdateSnapshot(t *testing.T) { - server := NewXdsServer(18000) + server := NewXdsServer(18000, routerDefaultExtprocTimeout, routerDefaultRouteTimeout) server.SetConfig(8081, 50052, "10.0.0.1") err := server.UpdateSnapshot() @@ -51,6 +53,7 @@ func TestXdsServer_UpdateSnapshot(t *testing.T) { if err := snap.Consistent(); err != nil { t.Fatalf("Integrity check failed on snapshot: %v", err) } + assertSnapshotTimeouts(t, snap, routerDefaultExtprocTimeout, routerDefaultRouteTimeout) // Verify clusters generated clustersMap := snap.GetResources(resourcev3.ClusterType) @@ -138,8 +141,29 @@ func TestXdsServer_UpdateSnapshot(t *testing.T) { } } +func TestXdsServer_UpdateSnapshot_WithTimeoutOverrides(t *testing.T) { + const ( + extprocTimeout = 7 * time.Second + routeTimeout = 23 * time.Second + ) + + server := NewXdsServer(18000, extprocTimeout, routeTimeout) + server.SetConfig(8081, 50052, "10.0.0.1") + + snap := updatedSnapshot(t, server) + assertSnapshotTimeouts(t, snap, extprocTimeout, routeTimeout) +} + +func TestXdsServer_UpdateSnapshot_WithRouteTimeoutDisabled(t *testing.T) { + server := NewXdsServer(18000, routerDefaultExtprocTimeout, 0) + server.SetConfig(8081, 50052, "10.0.0.1") + + snap := updatedSnapshot(t, server) + assertSnapshotTimeouts(t, snap, routerDefaultExtprocTimeout, 0) +} + func TestXdsServer_UpdateSnapshot_WithHttps(t *testing.T) { - server := NewXdsServer(18000) + server := NewXdsServer(18000, routerDefaultExtprocTimeout, routerDefaultRouteTimeout) server.SetConfig(8085, 50053, "127.0.0.1") server.SetTlsConfig(8443, "", "dummy-cert", "dummy-key") @@ -182,7 +206,7 @@ func TestXdsServer_UpdateSnapshot_WithHttps(t *testing.T) { } func TestXdsServer_Serve_Shutdown(t *testing.T) { - server := NewXdsServer(18000) + server := NewXdsServer(18000, routerDefaultExtprocTimeout, routerDefaultRouteTimeout) server.SetConfig(8085, 50053, "127.0.0.1") lis, err := net.Listen("tcp", "127.0.0.1:0") @@ -210,3 +234,98 @@ func TestXdsServer_Serve_Shutdown(t *testing.T) { t.Error("Timeout exceeded waiting for Serve to finish graceful closure") } } + +func updatedSnapshot(t *testing.T, server *XdsServer) *cachev3.Snapshot { + t.Helper() + + if err := server.UpdateSnapshot(); err != nil { + t.Fatalf("UpdateSnapshot failed: %v", err) + } + + res, err := server.snapshot.GetSnapshot(NodeID) + if err != nil { + t.Fatalf("Failed to get generated snapshot: %v", err) + } + + snap, ok := res.(*cachev3.Snapshot) + if !ok { + t.Fatalf("Snapshot doesn't conform to type *cachev3.Snapshot, got %T", res) + } + + if err := snap.Consistent(); err != nil { + t.Fatalf("Integrity check failed on snapshot: %v", err) + } + + return snap +} + +func assertSnapshotTimeouts(t *testing.T, snap *cachev3.Snapshot, wantExtprocTimeout, wantRouteTimeout time.Duration) { + t.Helper() + + routesMap := snap.GetResources(resourcev3.RouteType) + rawRoute, exists := routesMap[RouteName] + if !exists { + t.Fatalf("Route name %q is missing from snapshot routes configuration", RouteName) + } + + routeConfig := rawRoute.(*routev3.RouteConfiguration) + if len(routeConfig.GetVirtualHosts()) != 1 { + t.Fatalf("Expected 1 VirtualHost definition, got %d", len(routeConfig.GetVirtualHosts())) + } + if len(routeConfig.GetVirtualHosts()[0].GetRoutes()) != 1 { + t.Fatalf("Expected 1 route, got %d", len(routeConfig.GetVirtualHosts()[0].GetRoutes())) + } + + routeTimeout := routeConfig.GetVirtualHosts()[0].GetRoutes()[0].GetRoute().GetTimeout() + if routeTimeout == nil { + t.Fatalf("Route timeout is missing") + } + if got := routeTimeout.AsDuration(); got != wantRouteTimeout { + t.Errorf("Expected route timeout %s, got %s", wantRouteTimeout, got) + } + + listenersMap := snap.GetResources(resourcev3.ListenerType) + rawListener, exists := listenersMap[IngressHTTPListener] + if !exists { + t.Fatalf("Listener name %q is missing from snapshot listeners", IngressHTTPListener) + } + + listener := rawListener.(*listenerv3.Listener) + filter := listener.GetFilterChains()[0].GetFilters()[0] + var hcm hcmv3.HttpConnectionManager + if err := filter.GetTypedConfig().UnmarshalTo(&hcm); err != nil { + t.Fatalf("Failed to unmarshal HTTP connection manager: %v", err) + } + + var extproc extprocv3filter.ExternalProcessor + found := false + for _, httpFilter := range hcm.GetHttpFilters() { + if httpFilter.GetName() != "envoy.filters.http.ext_proc" { + continue + } + if err := httpFilter.GetTypedConfig().UnmarshalTo(&extproc); err != nil { + t.Fatalf("Failed to unmarshal external processor filter: %v", err) + } + found = true + break + } + if !found { + t.Fatalf("External processor filter is missing") + } + + grpcTimeout := extproc.GetGrpcService().GetTimeout() + if grpcTimeout == nil { + t.Fatalf("External processor gRPC timeout is missing") + } + if got := grpcTimeout.AsDuration(); got != wantExtprocTimeout { + t.Errorf("Expected external processor gRPC timeout %s, got %s", wantExtprocTimeout, got) + } + + messageTimeout := extproc.GetMessageTimeout() + if messageTimeout == nil { + t.Fatalf("External processor message timeout is missing") + } + if got := messageTimeout.AsDuration(); got != wantExtprocTimeout { + t.Errorf("Expected external processor message timeout %s, got %s", wantExtprocTimeout, got) + } +}