Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/atenet/internal/app/router/extproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ type ExtProcServer struct {
resumer *ActorResumer
}

// NewExtProcServer returns an ExtProcServer populated with the given port and
// control client, a query recorder built with NewQueryRecorder(100), and an
// ActorResumer created from apiClient.
//
// actorResumeTimeout is not interpreted here; it is handed unchanged to
// NewActorResumer.
// NOTE(review): the meaning of the 100 passed to NewQueryRecorder is not
// visible from this hunk (presumably a capacity) — confirm before documenting.
func NewExtProcServer(port int, apiClient ateapipb.ControlClient) *ExtProcServer {
func NewExtProcServer(port int, apiClient ateapipb.ControlClient, actorResumeTimeout time.Duration) *ExtProcServer {
return &ExtProcServer{
port: port,
apiClient: apiClient,
recorder: NewQueryRecorder(100),
resumer: NewActorResumer(apiClient),
resumer: NewActorResumer(apiClient, actorResumeTimeout),
}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/atenet/internal/app/router/extproc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestExtProcHeadersEvaluation(t *testing.T) {
},
}

s := NewExtProcServer(50051, clientMock)
s := NewExtProcServer(50051, clientMock, routerDefaultActorResumeTimeout)

reqHeaders := &extprocv3.HttpHeaders{
Headers: &corev3.HeaderMap{
Expand Down
14 changes: 8 additions & 6 deletions cmd/atenet/internal/app/router/resumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,26 @@ import (

// ActorResumer coordinates safe, deduplicated resumption of actors.
//
// apiClient is the control client supplied to NewActorResumer. resumeTimeout is
// the duration ResumeActor gives to context.WithTimeout when it creates the
// background context for a resume operation. flight is the singleflight group
// ResumeActor keys by actor ID so that concurrent callers for the same actor
// share one in-flight operation.
type ActorResumer struct {
apiClient ateapipb.ControlClient
flight singleflight.Group
apiClient ateapipb.ControlClient
resumeTimeout time.Duration
flight singleflight.Group
}

// NewActorResumer returns an ActorResumer that uses apiClient and bounds each
// resume operation with resumeTimeout. The singleflight group is left at its
// zero value.
//
// NOTE(review): resumeTimeout is stored without validation. ResumeActor passes
// it straight to context.WithTimeout, so a zero or negative value would produce
// a context that is already expired — consider rejecting such values where the
// flag is parsed.
func NewActorResumer(apiClient ateapipb.ControlClient) *ActorResumer {
func NewActorResumer(apiClient ateapipb.ControlClient, resumeTimeout time.Duration) *ActorResumer {
return &ActorResumer{
apiClient: apiClient,
apiClient: apiClient,
resumeTimeout: resumeTimeout,
}
}

// ResumeActor ensures the requested actor is running. It deduplicates concurrent
// requests within the process and retries when needed.
func (r *ActorResumer) ResumeActor(ctx context.Context, actorID string) (*ateapipb.Actor, error) {
ch := r.flight.DoChan(actorID, func() (interface{}, error) {
// We detach the context from the first caller using a fixed background timeout.
// We detach the context from the first caller using a background timeout.
// This guarantees that if Caller 1 disconnects or times out, the underlying
// resume operation continues running for Caller 2 and Caller 3 without failing.
bgCtx, bgCancel := context.WithTimeout(context.Background(), 15*time.Second)
bgCtx, bgCancel := context.WithTimeout(context.Background(), r.resumeTimeout)
defer bgCancel()

backoff := wait.Backoff{
Expand Down
44 changes: 40 additions & 4 deletions cmd/atenet/internal/app/router/resumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestActorResumer_ResumeActor(t *testing.T) {
},
}

resumer := NewActorResumer(mock)
resumer := NewActorResumer(mock, routerDefaultActorResumeTimeout)
actor, err := resumer.ResumeActor(context.Background(), testActorID)
if err != nil {
t.Fatalf("unexpected error: %v", err)
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestActorResumer_ResumeActor(t *testing.T) {
},
}

resumer := NewActorResumer(mock)
resumer := NewActorResumer(mock, routerDefaultActorResumeTimeout)
actor, err := resumer.ResumeActor(context.Background(), testActorID)
if err != nil {
t.Fatalf("unexpected error: %v", err)
Expand All @@ -108,13 +108,49 @@ func TestActorResumer_ResumeActor(t *testing.T) {
},
}

resumer := NewActorResumer(mock)
resumer := NewActorResumer(mock, routerDefaultActorResumeTimeout)
_, err := resumer.ResumeActor(context.Background(), testActorID)
if got := status.Code(err); got != codes.NotFound {
t.Errorf("expected gRPC code NotFound, got %v (err=%v)", got, err)
}
})

// ConfiguredDeadline checks that the context reaching the mocked resume call
// carries a deadline that reflects the timeout given to NewActorResumer rather
// than the package default.
t.Run("ConfiguredDeadline", func(t *testing.T) {
const configuredTimeout = 2 * time.Second
var gotDeadline time.Time
// Taken before ResumeActor is called, so the measured distance to the
// deadline cannot be shorter than the configured timeout.
start := time.Now()

mock := &resumerMockClient{
// Records the deadline seen by the resume call and replies with a running actor.
resumeFn: func(ctx context.Context, in *ateapipb.ResumeActorRequest, opts ...grpc.CallOption) (*ateapipb.ResumeActorResponse, error) {
deadline, ok := ctx.Deadline()
if !ok {
t.Errorf("expected ResumeActor context deadline")
} else {
gotDeadline = deadline
}
return &ateapipb.ResumeActorResponse{
Actor: &ateapipb.Actor{
ActorId: testActorID,
Status: ateapipb.Actor_STATUS_RUNNING,
AteomPodIp: expectedIP,
},
}, nil
},
}

resumer := NewActorResumer(mock, configuredTimeout)
_, err := resumer.ResumeActor(context.Background(), testActorID)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// A zero value means resumeFn never stored a deadline.
if gotDeadline.IsZero() {
t.Fatalf("ResumeActor context deadline was not captured")
}
// Accept deadlines between configuredTimeout and configuredTimeout+1s after
// start; the extra second absorbs the time spent before the context is created.
if got := gotDeadline.Sub(start); got < configuredTimeout || got > configuredTimeout+time.Second {
t.Errorf("expected ResumeActor deadline near %s, got %s", configuredTimeout, got)
}
})

t.Run("SingleflightDeduplication", func(t *testing.T) {
var resumeCalled int
var mu sync.Mutex
Expand All @@ -135,7 +171,7 @@ func TestActorResumer_ResumeActor(t *testing.T) {
},
}

resumer := NewActorResumer(mock)
resumer := NewActorResumer(mock, routerDefaultActorResumeTimeout)

var wg sync.WaitGroup
const concurrentRequests = 10
Expand Down
46 changes: 29 additions & 17 deletions cmd/atenet/internal/app/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,28 +54,37 @@ var (
scheme = runtime.NewScheme()
)

// Default timeout values for the router. NewCmd uses them as the defaults of
// the corresponding command-line flags.
const (
// routerDefaultExtprocTimeout is the default for --extproc-timeout.
routerDefaultExtprocTimeout = 5 * time.Second
// routerDefaultActorResumeTimeout is the default for --actor-resume-timeout;
// the package tests also pass it when constructing servers and resumers.
routerDefaultActorResumeTimeout = 15 * time.Second
// routerDefaultRouteTimeout is the default for --route-timeout.
routerDefaultRouteTimeout = 10 * time.Second
)

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(v1alpha1.AddToScheme(scheme))
}

// RouterConfig holds deployment setup and endpoint options for the router node instance.
//
// The three timeout fields are filled from command-line flags in NewCmd:
// ExtprocTimeout from --extproc-timeout, ActorResumeTimeout from
// --actor-resume-timeout and RouteTimeout from --route-timeout. RouterServer.Run
// passes ExtprocTimeout and RouteTimeout to NewXdsServer and ActorResumeTimeout
// to NewExtProcServer. Per the --route-timeout flag help, a RouteTimeout of 0
// disables the route timeout.
type RouterConfig struct {
Standalone bool
Namespace string
Kubeconfig string
AteapiAddr string
HttpPort int
XdsPort int
ExtprocPort int
ExtprocAddr string
EnvoyImage string
TemplatesFile string
StatusPort int
HealthInterval time.Duration
HttpsPort int
EnvoyCertPath string
LogLevel string
Standalone bool
Namespace string
Kubeconfig string
AteapiAddr string
HttpPort int
XdsPort int
ExtprocPort int
ExtprocAddr string
ExtprocTimeout time.Duration
EnvoyImage string
TemplatesFile string
StatusPort int
HealthInterval time.Duration
ActorResumeTimeout time.Duration
RouteTimeout time.Duration
HttpsPort int
EnvoyCertPath string
LogLevel string
}

// RouterServer instantiates and coordinates runtime threads executing system modules.
Expand Down Expand Up @@ -140,10 +149,13 @@ func NewCmd() *cobra.Command {
cmd.Flags().IntVar(&cfg.XdsPort, "port-xds", 18000, "TCP port listening for the xDS dynamic Envoy connections")
cmd.Flags().IntVar(&cfg.ExtprocPort, "port-extproc", 50051, "Listen port for the Envoy dynamic External Processing (ext_proc) server")
cmd.Flags().StringVar(&cfg.ExtprocAddr, "extproc-address", "127.0.0.1", "Host IP or address of the Envoy External Processing (ext_proc) server")
cmd.Flags().DurationVar(&cfg.ExtprocTimeout, "extproc-timeout", routerDefaultExtprocTimeout, "Timeout for Envoy External Processing (ext_proc) service and message processing")
cmd.Flags().StringVar(&cfg.EnvoyImage, "envoy-image", "envoyproxy/envoy:v1.30-latest", "Image URI used for dynamically launched router instances")
cmd.Flags().StringVar(&cfg.TemplatesFile, "actor-templates-file", "", "Path to offline YAML configuration file listing ActorTemplates")
cmd.Flags().IntVar(&cfg.StatusPort, "status-port", 4040, "Port to serve /statusz on (set <= 0 to disable serving status)")
cmd.Flags().DurationVar(&cfg.HealthInterval, "health-interval", 1*time.Second, "Interval for checking health of dependent services")
cmd.Flags().DurationVar(&cfg.ActorResumeTimeout, "actor-resume-timeout", routerDefaultActorResumeTimeout, "Timeout for actor resume calls triggered by request routing")
cmd.Flags().DurationVar(&cfg.RouteTimeout, "route-timeout", routerDefaultRouteTimeout, "Timeout for Envoy route actions (0 disables route timeout)")
cmd.Flags().IntVar(&cfg.HttpsPort, "port-https", 8443, "TCP port for HTTPS workload traffic entering through the Envoy Router")
cmd.Flags().StringVar(&cfg.EnvoyCertPath, "envoy-cert-path", "", "Path to the Envoy certificate file (if empty, a self-signed cert will be generated for testing)")

Expand Down Expand Up @@ -211,7 +223,7 @@ func (s *RouterServer) Run(ctx context.Context) error {

g, ctx := errgroup.WithContext(ctx)

xdsSrv := NewXdsServer(s.cfg.XdsPort)
xdsSrv := NewXdsServer(s.cfg.XdsPort, s.cfg.ExtprocTimeout, s.cfg.RouteTimeout)
xdsSrv.SetConfig(s.cfg.HttpPort, s.cfg.ExtprocPort, s.cfg.ExtprocAddr)

var certContent, keyContent string
Expand All @@ -226,7 +238,7 @@ func (s *RouterServer) Run(ctx context.Context) error {

xdsSrv.SetTlsConfig(s.cfg.HttpsPort, s.cfg.EnvoyCertPath, certContent, keyContent)
if s.extprocSrv == nil {
s.extprocSrv = NewExtProcServer(s.cfg.ExtprocPort, s.apiClient)
s.extprocSrv = NewExtProcServer(s.cfg.ExtprocPort, s.apiClient, s.cfg.ActorResumeTimeout)
}
ctrl := NewController(s.k8sClient, s.clientset, s.cfg, xdsSrv, s.extprocSrv)

Expand Down
2 changes: 1 addition & 1 deletion cmd/atenet/internal/app/router/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestStatuszEndpoint(t *testing.T) {
t.Fatalf("Failed generating router server: %v", err)
}

srv.extprocSrv = NewExtProcServer(cfg.ExtprocPort, &mockClient{})
srv.extprocSrv = NewExtProcServer(cfg.ExtprocPort, &mockClient{}, routerDefaultActorResumeTimeout)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
38 changes: 21 additions & 17 deletions cmd/atenet/internal/app/router/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ const (

// XdsServer implements an aggregated discovery service server for dynamic Envoy router nodes.
type XdsServer struct {
xdsPort int
extprocPort int
extprocAddr string
ingressPort int
snapshot cachev3.SnapshotCache
srv serverv3.Server
versionCount int64
xdsPort int
extprocPort int
extprocAddr string
extprocTimeout time.Duration
ingressPort int
routeTimeout time.Duration
snapshot cachev3.SnapshotCache
srv serverv3.Server
versionCount int64

mu sync.Mutex

Expand All @@ -82,17 +84,19 @@ type XdsServer struct {
keyContent string
}

// NewXdsServer creates a snapshot cache and an xDS server backed by it (using a
// background context) and returns an XdsServer that holds both.
//
// xdsPort, extprocTimeout and routeTimeout are stored as given. extprocTimeout
// is later used for the ext_proc gRPC service timeout and message timeout in
// buildHcm, and routeTimeout for the route action timeout in buildRoutes.
// extprocPort, extprocAddr and ingressPort start at the fixed values 50051,
// "127.0.0.1" and 8080.
// NOTE(review): RouterServer.Run calls SetConfig right after construction,
// which presumably replaces those three fixed values — confirm in SetConfig.
func NewXdsServer(xdsPort int) *XdsServer {
func NewXdsServer(xdsPort int, extprocTimeout, routeTimeout time.Duration) *XdsServer {
cache := cachev3.NewSnapshotCache(true, cachev3.IDHash{}, nil)
srv := serverv3.NewServer(context.Background(), cache, nil)

return &XdsServer{
xdsPort: xdsPort,
snapshot: cache,
srv: srv,
extprocPort: 50051, // matches default extproc port
extprocAddr: "127.0.0.1",
ingressPort: 8080,
xdsPort: xdsPort,
snapshot: cache,
srv: srv,
extprocPort: 50051, // matches default extproc port
extprocAddr: "127.0.0.1",
extprocTimeout: extprocTimeout,
ingressPort: 8080,
routeTimeout: routeTimeout,
}
}

Expand Down Expand Up @@ -286,7 +290,7 @@ func (x *XdsServer) buildRoutes() *routev3.RouteConfiguration {
ClusterSpecifier: &routev3.RouteAction_Cluster{
Cluster: "dynamic_forward_proxy_cluster",
},
Timeout: durationpb.New(10 * time.Second),
Timeout: durationpb.New(x.routeTimeout),
},
},
},
Expand All @@ -304,13 +308,13 @@ func (x *XdsServer) buildHcm(statPrefix string) *anypb.Any {
ClusterName: ClusterName,
},
},
Timeout: durationpb.New(5 * time.Second),
Timeout: durationpb.New(x.extprocTimeout),
},
MutationRules: &mutationrulesv3.HeaderMutationRules{
AllowAllRouting: &wrapperspb.BoolValue{Value: true},
},
// Explicitly configure the message timeout to avoid the 200ms default
MessageTimeout: durationpb.New(5 * time.Second),
MessageTimeout: durationpb.New(x.extprocTimeout),
ProcessingMode: &extprocv3filter.ProcessingMode{
RequestHeaderMode: extprocv3filter.ProcessingMode_SEND,
ResponseHeaderMode: extprocv3filter.ProcessingMode_SKIP,
Expand Down
Loading
Loading