Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions cmd/ateapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var (
sessionIDJWTPoolFile = pflag.String("session-id-jwt-pool", "", "The file that contains the serialized JWT authority pool for signing session JWTs")

sessionIDCAPoolFile = pflag.String("session-id-ca-pool", "", "The file that contains the CA pool for signing session JWTs")
workerpoolCACerts = pflag.String("workerpool-ca-certs", "", "The file that contains the CA for verifying workerpool client certificates.")
podidentityCACerts = pflag.String("podidentity-ca-certs", "", "The file that contains the CA for verifying workerpool client certificates.")

showVersion = pflag.Bool("version", false, "Print version and exit.")
)
Expand Down Expand Up @@ -133,7 +133,7 @@ func main() {
dialer := controlapi.NewAteletDialer(workerPodInformer.GetIndexer(), ateletPodInformer.GetIndexer())
sm := controlapi.NewService(redisPersistence, actorTemplateLister, dialer, clientset)

sessionIdentitySrv := sessionidentity.New(*clientJWTIssuer, *clientJWTAudience, *sessionIDJWTPoolFile, *sessionIDCAPoolFile, *workerpoolCACerts)
sessionIdentitySrv := sessionidentity.New(*clientJWTIssuer, *clientJWTAudience, *sessionIDJWTPoolFile, *sessionIDCAPoolFile, *podidentityCACerts)

lisCfg := &net.ListenConfig{}
lis, err := lisCfg.Listen(ctx, "tcp", *listenAddr)
Expand Down Expand Up @@ -195,7 +195,7 @@ func logFlagValues(ctx context.Context) {
slog.String("client-jwt-audience", *clientJWTAudience),
slog.String("session-id-jwt-pool", *sessionIDJWTPoolFile),
slog.String("session-id-ca-pool", *sessionIDCAPoolFile),
slog.String("workerpool-ca-certs", *workerpoolCACerts),
slog.String("podidentity-ca-certs", *podidentityCACerts),
)
}

Expand Down Expand Up @@ -307,17 +307,17 @@ func newKubeClients() (*kubernetes.Clientset, versioned.Interface, error) {
// client-cert verification.
func buildServerCreds(ctx context.Context) (credentials.TransportCredentials, error) {
var clientCAs *x509.CertPool
if *workerpoolCACerts != "" {
if *podidentityCACerts != "" {
// TODO: Periodically reload these to handle rotations. Consult with Tina to see how she did it for client-go.
ca, err := os.ReadFile(*workerpoolCACerts)
ca, err := os.ReadFile(*podidentityCACerts)
if err != nil {
return nil, fmt.Errorf("read workerpool CA: %w", err)
}
clientCAs = x509.NewCertPool()
if !clientCAs.AppendCertsFromPEM(ca) {
return nil, fmt.Errorf("parse workerpool CA from %s", *workerpoolCACerts)
return nil, fmt.Errorf("parse workerpool CA from %s", *podidentityCACerts)
}
slog.InfoContext(ctx, "Using custom CA for workerpool clients", slog.String("path", *workerpoolCACerts))
slog.InfoContext(ctx, "Using custom CA for workerpool clients", slog.String("path", *podidentityCACerts))
}
return credentials.NewTLS(&tls.Config{
GetCertificate: credbundle.Loader(*grpcServerCredBundle),
Expand Down
7 changes: 5 additions & 2 deletions cmd/atenet/internal/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ func NewRouterCmd() *cobra.Command {
cmd.Flags().IntVar(&cfg.StatusPort, "status-port", 4040, "Port to serve /statusz on (set <= 0 to disable serving status)")
cmd.Flags().DurationVar(&cfg.HealthInterval, "health-interval", 1*time.Second, "Interval for checking health of dependent services")
cmd.Flags().IntVar(&cfg.HttpsPort, "port-https", 8443, "TCP port for HTTPS workload traffic entering through the Envoy Router")
cmd.Flags().StringVar(&cfg.EnvoyCertPath, "envoy-cert-path", "", "Path to the Envoy certificate file (if empty, a self-signed cert will be generated for testing)")

cmd.Flags().StringVar(&cfg.EnvoyCertPath, "envoy-cert-path", "", "Path to the Envoy certificate file")
cmd.Flags().StringVar(&cfg.AteapiServerCAPath, "ate-apiserver-ca-path", "", "Path to the CA bundle used to verify the ateapi server certificate.")
cmd.Flags().StringVar(&cfg.AteapiClientCredPath, "ateapi-client-cred-bundle", "", "Path to the credential bundle presented as this router's client certificate when talking to the ateapi server.")
cmd.MarkFlagRequired("envoy-cert-path")
cmd.MarkFlagRequired("ate-apiserver-ca-path")
return cmd
}
2 changes: 1 addition & 1 deletion cmd/kubectl-ate/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ Commands for bootstrapping the Substrate control plane and debugging local envir
```bash
# Generate a new CA pool and push it directly to a Kubernetes Secret
kubectl ate admin make-ca-pool \
--name workerpool-ca-certs \
--name podidentity-ca-certs \
--secret-namespace ate-system \
--ca-id "1"

Expand Down
10 changes: 10 additions & 0 deletions internal/credbundle/credbundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ func Loader(path string) func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
}
}

// ClientLoader reads a private key and certificate chain from a credential bundle file as written
// by the Kubernetes Pod Certificates mechanism.
//
// Returns a function that can be used as GetClientCertificate in a tls.Config.
func ClientLoader(path string) func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
return func(_ *tls.CertificateRequestInfo) (*tls.Certificate, error) {
return Parse(path)
}
}

// Parse reads a private key and certificate chain from a credential bundle file as written by the
// Kubernetes Pod Certificates mechanism.
func Parse(bundlePath string) (*tls.Certificate, error) {
Expand Down
104 changes: 41 additions & 63 deletions internal/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,11 @@ package router

import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"errors"
"fmt"
"log/slog"
"math/big"
"net"
"net/http"
"os"
Expand All @@ -46,6 +41,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"

"github.com/agent-substrate/substrate/internal/credbundle"
"github.com/agent-substrate/substrate/internal/serverboot"
v1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1"
"github.com/agent-substrate/substrate/pkg/proto/ateapipb"
Expand All @@ -62,22 +58,24 @@ func init() {

// RouterConfig holds deployment setup and endpoint options for the router node instance.
type RouterConfig struct {
Standalone bool
Namespace string
Kubeconfig string
AteapiAddr string
HttpPort int
XdsPort int
ExtprocPort int
ExtprocAddr string
EnvoyImage string
TemplatesFile string
StatusPort int
HealthInterval time.Duration
HttpsPort int
EnvoyCertPath string
LogLevel string
MetricsAddr string
Standalone bool
Namespace string
Kubeconfig string
AteapiAddr string
HttpPort int
XdsPort int
ExtprocPort int
ExtprocAddr string
EnvoyImage string
TemplatesFile string
StatusPort int
HealthInterval time.Duration
HttpsPort int
EnvoyCertPath string
LogLevel string
MetricsAddr string
AteapiServerCAPath string
AteapiClientCredPath string
}

// RouterServer instantiates and coordinates runtime threads executing system modules.
Expand Down Expand Up @@ -125,7 +123,13 @@ func NewRouterServer(cfg RouterConfig) (*RouterServer, error) {
}
}

conn, err := grpc.NewClient(cfg.AteapiAddr, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})))
tlsCfg, err := buildTLSConfig(cfg)
if err != nil {
slog.Error("Failed to build TLS config", slog.String("err", err.Error()))
return nil, err
}
conn, err := grpc.NewClient(cfg.AteapiAddr, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg)))

if err != nil {
return nil, fmt.Errorf("failed to establish grpc channel to ateapi client: %w", err)
}
Expand Down Expand Up @@ -188,17 +192,7 @@ func (s *RouterServer) Run(ctx context.Context) error {
xdsSrv := NewXdsServer(s.cfg.XdsPort)
xdsSrv.SetConfig(s.cfg.HttpPort, s.cfg.ExtprocPort, s.cfg.ExtprocAddr)

var certContent, keyContent string
if s.cfg.EnvoyCertPath == "" {
slog.InfoContext(ctx, "No Envoy certificate path provided, generating self-signed certificate for testing")
var err error
certContent, keyContent, err = generateSelfSignedCert()
if err != nil {
return fmt.Errorf("failed to generate self-signed cert: %w", err)
}
}

xdsSrv.SetTlsConfig(s.cfg.HttpsPort, s.cfg.EnvoyCertPath, certContent, keyContent)
xdsSrv.SetTlsConfig(s.cfg.HttpsPort, s.cfg.EnvoyCertPath)
if s.extprocSrv == nil {
routeDuration, err := newRouteDurationHistogram()
if err != nil {
Expand Down Expand Up @@ -275,38 +269,22 @@ func (s *RouterServer) Run(ctx context.Context) error {
return g.Wait()
}

func generateSelfSignedCert() (string, string, error) {
priv, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return "", "", err
}

template := x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{
Organization: []string{"Substrate Local Test"},
},
NotBefore: time.Now(),
NotAfter: time.Now().Add(time.Hour * 24 * 365),

KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
BasicConstraintsValid: true,
DNSNames: []string{"localhost"},
func buildTLSConfig(cfg RouterConfig) (*tls.Config, error) {
tlsCfg := &tls.Config{MinVersion: tls.VersionTLS12}
if cfg.AteapiServerCAPath == "" {
return nil, fmt.Errorf("ateapi server CA path not configured")
}

derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv)
caBytes, err := os.ReadFile(cfg.AteapiServerCAPath)
if err != nil {
return "", "", err
return nil, fmt.Errorf("failed to read ateapi server CA bundle: %w", err)
}

certPem := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes})

privBytes, err := x509.MarshalPKCS8PrivateKey(priv)
if err != nil {
return "", "", err
roots := x509.NewCertPool()
if ok := roots.AppendCertsFromPEM(caBytes); !ok {
return nil, fmt.Errorf("failed to parse ateapi server CA bundle from %s", cfg.AteapiServerCAPath)
}
keyPem := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes})

return string(certPem), string(keyPem), nil
tlsCfg.RootCAs = roots
if cfg.AteapiClientCredPath != "" {
tlsCfg.GetClientCertificate = credbundle.ClientLoader(cfg.AteapiClientCredPath)
}
return tlsCfg, nil
}
52 changes: 45 additions & 7 deletions internal/router/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,20 @@ package router

import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/x509"
"crypto/x509/pkix"
"encoding/json"
"encoding/pem"
"fmt"
"io"
"math/big"
"net"
"net/http"
"os"
"path/filepath"
"strings"
"testing"
"time"
Expand All @@ -48,13 +56,14 @@ func TestStatuszEndpoint(t *testing.T) {
tmpFile.Close()

cfg := RouterConfig{
Standalone: true,
Namespace: "default",
StatusPort: httpPort,
HttpPort: 8080,
XdsPort: 18000,
ExtprocPort: 50051,
TemplatesFile: tmpFile.Name(),
Standalone: true,
Namespace: "default",
StatusPort: httpPort,
HttpPort: 8080,
XdsPort: 18000,
ExtprocPort: 50051,
TemplatesFile: tmpFile.Name(),
AteapiServerCAPath: writeTestCA(t),
}

srv, err := NewRouterServer(cfg)
Expand Down Expand Up @@ -140,3 +149,32 @@ func TestStatuszEndpoint(t *testing.T) {
t.Errorf("Target parameters unassigned inside context payload context properties: found %s", dashboard.Queries[0].Target)
}
}

// writeTestCA writes a self-signed CA certificate PEM to a temp file, for
// configuring AteapiServerCAPath in tests.
func writeTestCA(t *testing.T) string {
t.Helper()
key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
t.Fatalf("generating CA key: %v", err)
}
template := &x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{CommonName: "test-ca"},
NotBefore: time.Now().Add(-time.Hour),
NotAfter: time.Now().Add(time.Hour),
IsCA: true,
BasicConstraintsValid: true,
KeyUsage: x509.KeyUsageCertSign,
}
der, err := x509.CreateCertificate(rand.Reader, template, template, &key.PublicKey, key)
if err != nil {
t.Fatalf("creating CA certificate: %v", err)
}
path := filepath.Join(t.TempDir(), "ca.pem")
pemBytes := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: der})
if err := os.WriteFile(path, pemBytes, 0o600); err != nil {
t.Fatalf("writing CA file: %v", err)
}
return path
}
33 changes: 7 additions & 26 deletions internal/router/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ type XdsServer struct {

mu sync.Mutex

httpsPort int
certPath string
certContent string
keyContent string
httpsPort int
certPath string
}

func NewXdsServer(xdsPort int) *XdsServer {
Expand All @@ -104,13 +102,11 @@ func (x *XdsServer) SetConfig(ingressPort int, extprocPort int, extprocAddr stri
x.extprocAddr = extprocAddr
}

func (x *XdsServer) SetTlsConfig(httpsPort int, certPath string, certContent string, keyContent string) {
func (x *XdsServer) SetTlsConfig(httpsPort int, certPath string) {
x.mu.Lock()
defer x.mu.Unlock()
x.httpsPort = httpsPort
x.certPath = certPath
x.certContent = certContent
x.keyContent = keyContent
}

func (x *XdsServer) UpdateSnapshot() error {
Expand Down Expand Up @@ -453,30 +449,15 @@ func (x *XdsServer) buildHttpsListener() *listenerv3.Listener {
}

func (x *XdsServer) buildTlsCertificate() *tlsv3.TlsCertificate {
if x.certPath != "" {
return &tlsv3.TlsCertificate{
CertificateChain: &corev3.DataSource{
Specifier: &corev3.DataSource_Filename{
Filename: x.certPath,
},
},
PrivateKey: &corev3.DataSource{
Specifier: &corev3.DataSource_Filename{
Filename: x.certPath, // Assuming combined file
},
},
}
}

return &tlsv3.TlsCertificate{
CertificateChain: &corev3.DataSource{
Specifier: &corev3.DataSource_InlineString{
InlineString: x.certContent,
Specifier: &corev3.DataSource_Filename{
Filename: x.certPath,
},
},
PrivateKey: &corev3.DataSource{
Specifier: &corev3.DataSource_InlineString{
InlineString: x.keyContent,
Specifier: &corev3.DataSource_Filename{
Filename: x.certPath,
},
},
}
Expand Down
2 changes: 1 addition & 1 deletion internal/router/xds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestXdsServer_UpdateSnapshot(t *testing.T) {
func TestXdsServer_UpdateSnapshot_WithHttps(t *testing.T) {
server := NewXdsServer(18000)
server.SetConfig(8085, 50053, "127.0.0.1")
server.SetTlsConfig(8443, "", "dummy-cert", "dummy-key")
server.SetTlsConfig(8443, "")

err := server.UpdateSnapshot()
if err != nil {
Expand Down
Loading
Loading