From 7f473d241bd182277562fdb8f5c9c253bd55ef99 Mon Sep 17 00:00:00 2001 From: mblos Date: Wed, 17 Jun 2026 13:08:00 +0200 Subject: [PATCH 1/4] feat(cr): pre-allocate PAYG VMs into CR reservation slots on cr creation/modification --- cmd/manager/main.go | 9 +- .../committed_resource_controller.go | 4 + .../commitments/payg_candidates.go | 150 ++++++++ .../commitments/payg_candidates_test.go | 324 ++++++++++++++++++ .../commitments/reservation_manager.go | 148 ++++++++ 5 files changed, 631 insertions(+), 4 deletions(-) create mode 100644 internal/scheduling/reservations/commitments/payg_candidates.go create mode 100644 internal/scheduling/reservations/commitments/payg_candidates_test.go diff --git a/cmd/manager/main.go b/cmd/manager/main.go index d58be2a29..d9eb69883 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -598,10 +598,11 @@ func main() { metrics.Registry.MustRegister(&crControllerMonitor) if err := (&commitments.CommittedResourceController{ - Client: multiclusterClient, - Scheme: mgr.GetScheme(), - Conf: crControllerConf, - Monitor: &crControllerMonitor, + Client: multiclusterClient, + Scheme: mgr.GetScheme(), + Conf: crControllerConf, + Monitor: &crControllerMonitor, + VMSource: commitmentsVMSource, }).SetupWithManager(mgr, multiclusterClient); err != nil { setupLog.Error(err, "unable to create controller", "controller", "CommittedResource") os.Exit(1) diff --git a/internal/scheduling/reservations/commitments/committed_resource_controller.go b/internal/scheduling/reservations/commitments/committed_resource_controller.go index a24e0c9e3..c70f6f0a4 100644 --- a/internal/scheduling/reservations/commitments/committed_resource_controller.go +++ b/internal/scheduling/reservations/commitments/committed_resource_controller.go @@ -38,6 +38,9 @@ type CommittedResourceController struct { Scheme *runtime.Scheme Conf CommittedResourceControllerConfig Monitor *CRControllerMonitor + // VMSource enables PAYG pre-allocation when creating reservation slots. When nil the + // PAYG scan is skipped and all slots are created via blind scheduler probes. + VMSource reservations.VMSource } func (r *CommittedResourceController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { @@ -316,6 +319,7 @@ func (r *CommittedResourceController) applyReservationState(ctx context.Context, state.ParentGeneration = cr.Generation mgr := NewReservationManager(r.Client) + mgr.VMSource = r.VMSource mgr.SlotCreationDelay = r.Conf.SlotCreationDelay.Duration if cr.Spec.AllowRejection { mgr.MaxSlots = r.Conf.MaxSlotsPerCommitment diff --git a/internal/scheduling/reservations/commitments/payg_candidates.go b/internal/scheduling/reservations/commitments/payg_candidates.go new file mode 100644 index 000000000..c560b17a9 --- /dev/null +++ b/internal/scheduling/reservations/commitments/payg_candidates.go @@ -0,0 +1,150 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package commitments + +import ( + "context" + "sort" + + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins/compute" + "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// PAYGCandidate is an unallocated PAYG VM suitable for pre-allocation into a reservation slot. +type PAYGCandidate struct { + VMID string + HVName string + MemoryMB uint64 + FlavorName string // exact flavor of the VM — used directly as the slot ResourceName +} + +// ScanAZForPaygCandidates returns unallocated PAYG VMs across all HVs in az, grouped by HV name. +// Makes exactly one VMSource call regardless of the number of HVs in the AZ. +// HVs are identified by the label "topology.kubernetes.io/zone". +func ScanAZForPaygCandidates( + ctx context.Context, + k8sClient client.Client, + vmSource reservations.VMSource, + az string, + projectID string, + flavorGroup compute.FlavorGroupFeature, +) (map[string][]PAYGCandidate, error) { + // List all HVs from cache; filter to the target AZ. + var hvList hv1.HypervisorList + if err := k8sClient.List(ctx, &hvList); err != nil { + return nil, err + } + azHVs := make(map[string]*hv1.Hypervisor) + for i := range hvList.Items { + hv := &hvList.Items[i] + if hv.Labels["topology.kubernetes.io/zone"] == az { + azHVs[hv.Name] = hv + } + } + if len(azHVs) == 0 { + return nil, nil + } + + // One Postgres call for all VMs in the project. + projectVMs, err := vmSource.ListVMsByProject(ctx, projectID) + if err != nil { + return nil, err + } + + // Group project VMs by hypervisor for O(1) per-HV lookup. + vmsByHV := make(map[string][]reservations.VM, len(azHVs)) + for _, vm := range projectVMs { + if _, inAZ := azHVs[vm.CurrentHypervisor]; inAZ { + vmsByHV[vm.CurrentHypervisor] = append(vmsByHV[vm.CurrentHypervisor], vm) + } + } + + result := make(map[string][]PAYGCandidate) + for hvName, hv := range azHVs { + candidates, err := filterPaygCandidates(ctx, k8sClient, hvName, hv, vmsByHV[hvName], flavorGroup) + if err != nil { + return nil, err + } + if len(candidates) > 0 { + result[hvName] = candidates + } + } + return result, nil +} + +// filterPaygCandidates returns unallocated PAYG VMs from a pre-fetched list for one HV. +// vms must already be filtered to hvName (by the caller). Used by reuse sites (ticket #410, +// #372) where the caller already holds a pre-fetched VM list. +func filterPaygCandidates( + ctx context.Context, + k8sClient client.Client, + hvName string, + hv *hv1.Hypervisor, + vms []reservations.VM, + flavorGroup compute.FlavorGroupFeature, +) ([]PAYGCandidate, error) { + + if len(vms) == 0 { + return nil, nil + } + + // Build set of active instance UUIDs from the HV CRD for physical-presence check. + activeOnHV := make(map[string]bool, len(hv.Status.Instances)) + for _, inst := range hv.Status.Instances { + if inst.Active { + activeOnHV[inst.ID] = true + } + } + + // Build set of flavor names in the group for O(1) membership check. + flavorNames := make(map[string]uint64, len(flavorGroup.Flavors)) + for _, f := range flavorGroup.Flavors { + flavorNames[f.Name] = f.MemoryMB + } + + var candidates []PAYGCandidate + for _, vm := range vms { + memMB, inGroup := flavorNames[vm.FlavorName] + if !inGroup { + continue + } + if !activeOnHV[vm.UUID] { + continue + } + + // Exclude VMs already claimed by any Reservation.Spec.Allocations. + var allocated v1alpha1.ReservationList + if err := k8sClient.List(ctx, &allocated, + client.MatchingFields{idxReservationByAllocationVMUUID: vm.UUID}, + ); err != nil { + return nil, err + } + if len(allocated.Items) > 0 { + continue + } + + candidates = append(candidates, PAYGCandidate{ + VMID: vm.UUID, + HVName: hvName, + MemoryMB: memMB, + FlavorName: vm.FlavorName, + }) + } + + sortCandidatesDesc(candidates) + return candidates, nil +} + +// sortCandidatesDesc sorts candidates descending by memory, with UUID as a stable tie-break. +func sortCandidatesDesc(candidates []PAYGCandidate) { + sort.Slice(candidates, func(i, j int) bool { + if candidates[i].MemoryMB != candidates[j].MemoryMB { + return candidates[i].MemoryMB > candidates[j].MemoryMB + } + return candidates[i].VMID < candidates[j].VMID + }) +} diff --git a/internal/scheduling/reservations/commitments/payg_candidates_test.go b/internal/scheduling/reservations/commitments/payg_candidates_test.go new file mode 100644 index 000000000..002566c12 --- /dev/null +++ b/internal/scheduling/reservations/commitments/payg_candidates_test.go @@ -0,0 +1,324 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package commitments + +import ( + "context" + "errors" + "testing" + + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +// ============================================================================ +// Fake VMSource +// ============================================================================ + +type fakeVMSource struct { + vms []reservations.VM + err error +} + +func (f *fakeVMSource) ListVMs(_ context.Context) ([]reservations.VM, error) { + return f.vms, f.err +} +func (f *fakeVMSource) ListVMsByProject(_ context.Context, _ string) ([]reservations.VM, error) { + return f.vms, f.err +} +func (f *fakeVMSource) ListVMsOnHypervisors(_ context.Context, _ *hv1.HypervisorList, _ bool) ([]reservations.VM, error) { + return f.vms, f.err +} +func (f *fakeVMSource) GetVM(_ context.Context, _ string) (*reservations.VM, error) { + return nil, nil +} +func (f *fakeVMSource) IsServerActive(_ context.Context, _ string) (bool, error) { + return false, nil +} +func (f *fakeVMSource) GetDeletedVMInfo(_ context.Context, _ string) (*reservations.DeletedVMInfo, error) { + return nil, nil +} + +// ============================================================================ +// Helpers +// ============================================================================ + +func paygScheme(t *testing.T) *runtime.Scheme { + t.Helper() + s := runtime.NewScheme() + if err := v1alpha1.AddToScheme(s); err != nil { + t.Fatalf("add v1alpha1 scheme: %v", err) + } + if err := hv1.AddToScheme(s); err != nil { + t.Fatalf("add hv1 scheme: %v", err) + } + return s +} + +func hvWithInstances(name, az string, instances ...hv1.Instance) *hv1.Hypervisor { //nolint:unparam + return &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"topology.kubernetes.io/zone": az}, + }, + Status: hv1.HypervisorStatus{Instances: instances}, + } +} + +func activeInstance(id string) hv1.Instance { + return hv1.Instance{ID: id, Name: id, Active: true} +} + +func inactiveInstance(id string) hv1.Instance { + return hv1.Instance{ID: id, Name: id, Active: false} +} + +func vmOnHV(uuid, hvName, flavorName string, memMB uint64) reservations.VM { //nolint:unparam + return reservations.VM{ + UUID: uuid, + FlavorName: flavorName, + CurrentHypervisor: hvName, + Resources: map[string]resource.Quantity{ + "memory": *resource.NewQuantity(int64(memMB)*1024*1024, resource.BinarySI), //nolint:gosec + }, + } +} + +func reservationWithAlloc(name, vmUUID string) *v1alpha1.Reservation { + return &v1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: v1alpha1.ReservationSpec{ + Type: v1alpha1.ReservationTypeCommittedResource, + CommittedResourceReservation: &v1alpha1.CommittedResourceReservationSpec{ + CommitmentUUID: "other-cr", + Allocations: map[string]v1alpha1.CommittedResourceAllocation{ + vmUUID: {CreationTimestamp: metav1.Now()}, + }, + }, + }, + } +} + +func buildPaygClient(t *testing.T, objs ...client.Object) client.Client { + t.Helper() + return fake.NewClientBuilder(). + WithScheme(paygScheme(t)). + WithObjects(objs...). + WithIndex(&v1alpha1.Reservation{}, idxReservationByAllocationVMUUID, func(obj client.Object) []string { + res, ok := obj.(*v1alpha1.Reservation) + if !ok || res.Spec.CommittedResourceReservation == nil { + return nil + } + var uuids []string + for vmUUID := range res.Spec.CommittedResourceReservation.Allocations { + uuids = append(uuids, vmUUID) + } + return uuids + }). + Build() +} + +// ============================================================================ +// Tests: filterPaygCandidates +// ============================================================================ + +func TestFilterPaygCandidates(t *testing.T) { + fg := testFlavorGroup() // flavors: large=32GiB, medium=16GiB, small=8GiB + + tests := []struct { + name string + hv *hv1.Hypervisor + vms []reservations.VM + extraObjs []client.Object // additional k8s objects (e.g. reservations with allocs) + wantVMIDs []string // expected candidate UUIDs (order independent) + wantCount int + }{ + { + name: "VM matches project and flavor group, not allocated — included", + hv: hvWithInstances("host-1", "az-1", activeInstance("vm-a")), + vms: []reservations.VM{vmOnHV("vm-a", "host-1", "small", 8192)}, + wantVMIDs: []string{"vm-a"}, + }, + { + name: "VM already in Reservation.Spec.Allocations — excluded", + hv: hvWithInstances("host-1", "az-1", activeInstance("vm-a")), + vms: []reservations.VM{vmOnHV("vm-a", "host-1", "small", 8192)}, + extraObjs: []client.Object{reservationWithAlloc("res-1", "vm-a")}, + wantCount: 0, + }, + { + name: "VM flavor not in flavor group — excluded", + hv: hvWithInstances("host-1", "az-1", activeInstance("vm-a")), + vms: []reservations.VM{vmOnHV("vm-a", "host-1", "unknown-flavor", 8192)}, + wantCount: 0, + }, + { + name: "VM not active in HV CRD Status.Instances — excluded", + hv: hvWithInstances("host-1", "az-1", inactiveInstance("vm-a")), + vms: []reservations.VM{vmOnHV("vm-a", "host-1", "small", 8192)}, + wantCount: 0, + }, + { + name: "VM not present in HV CRD instances at all — excluded", + hv: hvWithInstances("host-1", "az-1"), // no instances + vms: []reservations.VM{vmOnHV("vm-a", "host-1", "small", 8192)}, + wantCount: 0, + }, + { + name: "empty VM list — returns empty, no error", + hv: hvWithInstances("host-1", "az-1", activeInstance("vm-a")), + vms: nil, + wantCount: 0, + }, + { + name: "multiple VMs — unallocated included, allocated excluded, sorted descending by memory", + hv: hvWithInstances("host-1", "az-1", + activeInstance("vm-small"), + activeInstance("vm-medium"), + activeInstance("vm-large"), + activeInstance("vm-allocated"), + ), + vms: []reservations.VM{ + vmOnHV("vm-small", "host-1", "small", 8192), + vmOnHV("vm-medium", "host-1", "medium", 16384), + vmOnHV("vm-large", "host-1", "large", 32768), + vmOnHV("vm-allocated", "host-1", "small", 8192), + }, + extraObjs: []client.Object{reservationWithAlloc("res-1", "vm-allocated")}, + wantVMIDs: []string{"vm-large", "vm-medium", "vm-small"}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + objs := []client.Object{tc.hv} + objs = append(objs, tc.extraObjs...) + k8sClient := buildPaygClient(t, objs...) + + candidates, err := filterPaygCandidates(context.Background(), k8sClient, tc.hv.Name, tc.hv, tc.vms, fg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if tc.wantVMIDs != nil { + if len(candidates) != len(tc.wantVMIDs) { + t.Fatalf("want %d candidates, got %d: %v", len(tc.wantVMIDs), len(candidates), candidates) + } + for i, want := range tc.wantVMIDs { + if candidates[i].VMID != want { + t.Errorf("candidates[%d]: want VMID %q, got %q", i, want, candidates[i].VMID) + } + } + } else if len(candidates) != tc.wantCount { + t.Errorf("want %d candidates, got %d", tc.wantCount, len(candidates)) + } + }) + } +} + +// ============================================================================ +// Tests: ScanAZForPaygCandidates +// ============================================================================ + +func TestScanAZForPaygCandidates(t *testing.T) { + fg := testFlavorGroup() + + tests := []struct { + name string + hvs []*hv1.Hypervisor + vmSourceVMs []reservations.VM + vmSourceErr error + az string + projectID string + wantHVs []string // HV names expected in result (non-empty candidate lists) + wantErr bool + }{ + { + name: "HV in AZ with matching PAYG VM — returned", + hvs: []*hv1.Hypervisor{hvWithInstances("host-1", "az-1", activeInstance("vm-a"))}, + vmSourceVMs: []reservations.VM{vmOnHV("vm-a", "host-1", "small", 8192)}, + az: "az-1", + projectID: "project-1", + wantHVs: []string{"host-1"}, + }, + { + name: "HV in different AZ — not returned", + hvs: []*hv1.Hypervisor{hvWithInstances("host-1", "az-2", activeInstance("vm-a"))}, + vmSourceVMs: []reservations.VM{vmOnHV("vm-a", "host-1", "small", 8192)}, + az: "az-1", + projectID: "project-1", + wantHVs: nil, + }, + { + name: "VMSource returns error — propagated", + hvs: []*hv1.Hypervisor{hvWithInstances("host-1", "az-1", activeInstance("vm-a"))}, + vmSourceErr: errors.New("db error"), + az: "az-1", + projectID: "project-1", + wantErr: true, + }, + { + name: "no HVs in AZ — returns nil, no error", + hvs: []*hv1.Hypervisor{}, + az: "az-1", + projectID: "project-1", + wantHVs: nil, + }, + { + name: "VMSource returns no VMs — returns empty, no error", + hvs: []*hv1.Hypervisor{hvWithInstances("host-1", "az-1", activeInstance("vm-a"))}, + vmSourceVMs: nil, + az: "az-1", + projectID: "project-1", + wantHVs: nil, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + objs := make([]client.Object, len(tc.hvs)) + for i, hv := range tc.hvs { + objs[i] = hv + } + k8sClient := buildPaygClient(t, objs...) + vmSource := &fakeVMSource{vms: tc.vmSourceVMs, err: tc.vmSourceErr} + + result, err := ScanAZForPaygCandidates(context.Background(), k8sClient, vmSource, tc.az, tc.projectID, fg) + if tc.wantErr { + if err == nil { + t.Fatal("expected error, got nil") + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + for _, hvName := range tc.wantHVs { + if _, ok := result[hvName]; !ok { + t.Errorf("expected candidates for HV %q, not present in result", hvName) + } + } + // No unexpected HVs. + for hvName := range result { + found := false + for _, want := range tc.wantHVs { + if want == hvName { + found = true + break + } + } + if !found { + t.Errorf("unexpected HV %q in result", hvName) + } + } + }) + } +} diff --git a/internal/scheduling/reservations/commitments/reservation_manager.go b/internal/scheduling/reservations/commitments/reservation_manager.go index 75c3cda7f..18187bd98 100644 --- a/internal/scheduling/reservations/commitments/reservation_manager.go +++ b/internal/scheduling/reservations/commitments/reservation_manager.go @@ -10,6 +10,7 @@ import ( "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins/compute" + "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" "github.com/go-logr/logr" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -50,6 +51,9 @@ func (e *SlotLimitExceededError) Error() string { // ReservationManager handles CRUD operations for Reservation CRDs. type ReservationManager struct { client.Client + // VMSource enables PAYG pre-allocation (Phase 4.5). When nil the PAYG scan is skipped + // and all slots are created via the blind scheduler path (Phase 5). + VMSource reservations.VMSource // SlotCreationDelay adds a pause between consecutive Reservation CRD creates to spread // scheduler load across time rather than bursting all creates at once. SlotCreationDelay time.Duration @@ -194,6 +198,74 @@ func (m *ReservationManager) ApplyCommitmentState( } } + // Phase 4.5 (PAYG PRE-ALLOCATE): absorb existing PAYG VMs into pre-populated slots. + // Runs only when VMSource is configured. Makes one Postgres call for all HVs in the AZ, + // then creates one slot per PAYG VM (largest-first), consuming the delta. + // Any remaining delta falls through to Phase 5 (blind scheduler). + if m.VMSource != nil && deltaMemoryBytes > 0 && desiredState.AvailabilityZone != "" { + scanStart := time.Now() + candidatesByHV, scanErr := ScanAZForPaygCandidates( + ctx, m.Client, m.VMSource, + desiredState.AvailabilityZone, desiredState.ProjectID, flavorGroup, + ) + if scanErr != nil { + log.Error(scanErr, "PAYG candidate scan failed, falling back to blind scheduling", + "durationMs", time.Since(scanStart).Milliseconds()) + } else { + // Flatten all candidates across HVs and sort descending by memory, then stable by UUID. + var allCandidates []PAYGCandidate + for _, hvCandidates := range candidatesByHV { + allCandidates = append(allCandidates, hvCandidates...) + } + sortCandidatesDesc(allCandidates) + + log.Info("PAYG candidate scan complete", + "candidates", len(allCandidates), + "durationMs", time.Since(scanStart).Milliseconds(), + ) + + for _, candidate := range allCandidates { + if deltaMemoryBytes <= 0 { + break + } + slotMemoryMB := candidate.MemoryMB + if int64(slotMemoryMB)*1024*1024 > deltaMemoryBytes { //nolint:gosec + slotMemoryMB = uint64(deltaMemoryBytes) / (1024 * 1024) + } + reservation := m.newPaygReservation(desiredState, nextSlotIndex, candidate, slotMemoryMB, flavorGroup, creator) + slotMemoryBytes := int64(slotMemoryMB) * 1024 * 1024 //nolint:gosec + deltaMemoryBytes -= slotMemoryBytes + result.Created++ + result.TouchedReservations = append(result.TouchedReservations, *reservation) + + if err := m.Create(ctx, reservation); err != nil { + if apierrors.IsAlreadyExists(err) { + return result, fmt.Errorf("reservation %s already exists (collision detected): %w", reservation.Name, err) + } + return result, fmt.Errorf("failed to create PAYG reservation slot %d: %w", nextSlotIndex, err) + } + log.Info("created PAYG pre-allocated reservation slot", + "slot", reservation.Name, + "host", candidate.HVName, + "vm", candidate.VMID, + "flavorName", candidate.FlavorName, + "slotMemoryMB", slotMemoryMB, + ) + nextSlotIndex++ + + if m.SlotCreationDelay > 0 && deltaMemoryBytes > 0 { + timer := time.NewTimer(m.SlotCreationDelay) + select { + case <-ctx.Done(): + timer.Stop() + return result, ctx.Err() + case <-timer.C: + } + } + } + } + } + // Phase 5 (CREATE): Create new reservations (capacity increased) if deltaMemoryBytes > 0 { newSlots := countNewSlots(deltaMemoryBytes, flavorGroup) @@ -415,3 +487,79 @@ func (m *ReservationManager) newReservation( Spec: spec, } } + +// newPaygReservation creates a Reservation pre-populated with a PAYG VM. +// The slot uses the candidate's exact flavor name and the given slotMemoryMB (which may be +// less than candidate.MemoryMB when the remaining CR delta is smaller than the VM's memory). +// VCPUs are taken from the flavor group to keep the slot spec consistent. +func (m *ReservationManager) newPaygReservation( + state *CommitmentState, + slotIndex int, + candidate PAYGCandidate, + slotMemoryMB uint64, + flavorGroup compute.FlavorGroupFeature, + creator string, +) *v1alpha1.Reservation { + + namePrefix := state.NamePrefix + if namePrefix == "" { + namePrefix = fmt.Sprintf("commitment-%s-", state.CommitmentUUID) + } + name := fmt.Sprintf("%s%d", namePrefix, slotIndex) + + // Look up VCPUs from the flavor group for the candidate flavor. + var cpus int64 + for _, f := range flavorGroup.Flavors { + if f.Name == candidate.FlavorName { + cpus = int64(f.VCPUs) //nolint:gosec // VCPUs from flavor specs, realistically bounded + break + } + } + + memoryBytes := int64(slotMemoryMB) * 1024 * 1024 //nolint:gosec // bounded by CR amount + spec := v1alpha1.ReservationSpec{ + Type: v1alpha1.ReservationTypeCommittedResource, + SchedulingDomain: v1alpha1.SchedulingDomainNova, + TargetHost: candidate.HVName, + Resources: map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceMemory: *resource.NewQuantity(memoryBytes, resource.BinarySI), + hv1.ResourceCPU: *resource.NewQuantity(cpus, resource.DecimalSI), + }, + CommittedResourceReservation: &v1alpha1.CommittedResourceReservationSpec{ + ProjectID: state.ProjectID, + CommitmentUUID: state.CommitmentUUID, + DomainID: state.DomainID, + ResourceGroup: state.FlavorGroupName, + ResourceName: candidate.FlavorName, + Creator: creator, + ParentGeneration: state.ParentGeneration, + Allocations: map[string]v1alpha1.CommittedResourceAllocation{ + candidate.VMID: { + CreationTimestamp: metav1.Now(), + }, + }, + }, + } + if state.AvailabilityZone != "" { + spec.AvailabilityZone = state.AvailabilityZone + } + if state.StartTime != nil { + spec.StartTime = &metav1.Time{Time: *state.StartTime} + } + if state.EndTime != nil { + spec.EndTime = &metav1.Time{Time: *state.EndTime} + } + + return &v1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource, + }, + Annotations: map[string]string{ + v1alpha1.AnnotationCreatorRequestID: state.CreatorRequestID, + }, + }, + Spec: spec, + } +} From 483c6438f4623c8da8adc454779ba2dfa46c027d Mon Sep 17 00:00:00 2001 From: mblos Date: Wed, 17 Jun 2026 13:27:54 +0200 Subject: [PATCH 2/4] refactor --- .../commitments/payg_candidates.go | 67 +++++++++++++------ .../commitments/payg_candidates_test.go | 22 +++--- .../commitments/reservation_manager.go | 3 +- 3 files changed, 57 insertions(+), 35 deletions(-) diff --git a/internal/scheduling/reservations/commitments/payg_candidates.go b/internal/scheduling/reservations/commitments/payg_candidates.go index c560b17a9..ed0ed6cc9 100644 --- a/internal/scheduling/reservations/commitments/payg_candidates.go +++ b/internal/scheduling/reservations/commitments/payg_candidates.go @@ -23,7 +23,10 @@ type PAYGCandidate struct { } // ScanAZForPaygCandidates returns unallocated PAYG VMs across all HVs in az, grouped by HV name. -// Makes exactly one VMSource call regardless of the number of HVs in the AZ. +// Makes exactly two cache/Postgres calls regardless of HV or VM count: +// 1. List all CR Reservations → build allocated VM UUID set from Spec.Allocations +// 2. VMSource.ListVMsByProject → enriched VM data for the project +// // HVs are identified by the label "topology.kubernetes.io/zone". func ScanAZForPaygCandidates( ctx context.Context, @@ -49,6 +52,13 @@ func ScanAZForPaygCandidates( return nil, nil } + // One cache scan: build the set of VM UUIDs already claimed by any CR Reservation. + // Spec.Allocations is the scheduling perspective — exactly what we need here. + allocatedVMIDs, err := buildAllocatedVMSet(ctx, k8sClient, az) + if err != nil { + return nil, err + } + // One Postgres call for all VMs in the project. projectVMs, err := vmSource.ListVMsByProject(ctx, projectID) if err != nil { @@ -65,10 +75,7 @@ func ScanAZForPaygCandidates( result := make(map[string][]PAYGCandidate) for hvName, hv := range azHVs { - candidates, err := filterPaygCandidates(ctx, k8sClient, hvName, hv, vmsByHV[hvName], flavorGroup) - if err != nil { - return nil, err - } + candidates := filterPaygCandidates(hvName, hv, vmsByHV[hvName], flavorGroup, allocatedVMIDs) if len(candidates) > 0 { result[hvName] = candidates } @@ -76,20 +83,45 @@ func ScanAZForPaygCandidates( return result, nil } +// buildAllocatedVMSet lists CR Reservations in az and returns the set of VM UUIDs present +// in any Spec.Allocations. Filtering by AZ keeps the set small — only slots relevant +// to the current scan are included. One cache scan shared across all HV filters. +func buildAllocatedVMSet(ctx context.Context, k8sClient client.Client, az string) (map[string]struct{}, error) { + var resList v1alpha1.ReservationList + if err := k8sClient.List(ctx, &resList, + client.MatchingLabels{v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource}, + ); err != nil { + return nil, err + } + allocated := make(map[string]struct{}) + for _, res := range resList.Items { + if res.Spec.AvailabilityZone != az { + continue + } + if res.Spec.CommittedResourceReservation == nil { + continue + } + for vmUUID := range res.Spec.CommittedResourceReservation.Allocations { + allocated[vmUUID] = struct{}{} + } + } + return allocated, nil +} + // filterPaygCandidates returns unallocated PAYG VMs from a pre-fetched list for one HV. -// vms must already be filtered to hvName (by the caller). Used by reuse sites (ticket #410, -// #372) where the caller already holds a pre-fetched VM list. +// vms must already be filtered to hvName by the caller. +// allocatedVMIDs is a pre-built set of UUIDs already claimed by any CR Reservation.Spec.Allocations. +// Used by reuse sites (ticket #410, #372) where the caller already holds VM data and the allocated set. func filterPaygCandidates( - ctx context.Context, - k8sClient client.Client, hvName string, hv *hv1.Hypervisor, vms []reservations.VM, flavorGroup compute.FlavorGroupFeature, -) ([]PAYGCandidate, error) { + allocatedVMIDs map[string]struct{}, +) []PAYGCandidate { if len(vms) == 0 { - return nil, nil + return nil } // Build set of active instance UUIDs from the HV CRD for physical-presence check. @@ -115,18 +147,9 @@ func filterPaygCandidates( if !activeOnHV[vm.UUID] { continue } - - // Exclude VMs already claimed by any Reservation.Spec.Allocations. - var allocated v1alpha1.ReservationList - if err := k8sClient.List(ctx, &allocated, - client.MatchingFields{idxReservationByAllocationVMUUID: vm.UUID}, - ); err != nil { - return nil, err - } - if len(allocated.Items) > 0 { + if _, isAllocated := allocatedVMIDs[vm.UUID]; isAllocated { continue } - candidates = append(candidates, PAYGCandidate{ VMID: vm.UUID, HVName: hvName, @@ -136,7 +159,7 @@ func filterPaygCandidates( } sortCandidatesDesc(candidates) - return candidates, nil + return candidates } // sortCandidatesDesc sorts candidates descending by memory, with UUID as a stable tie-break. diff --git a/internal/scheduling/reservations/commitments/payg_candidates_test.go b/internal/scheduling/reservations/commitments/payg_candidates_test.go index 002566c12..a607d51cf 100644 --- a/internal/scheduling/reservations/commitments/payg_candidates_test.go +++ b/internal/scheduling/reservations/commitments/payg_candidates_test.go @@ -136,8 +136,8 @@ func TestFilterPaygCandidates(t *testing.T) { name string hv *hv1.Hypervisor vms []reservations.VM - extraObjs []client.Object // additional k8s objects (e.g. reservations with allocs) - wantVMIDs []string // expected candidate UUIDs (order independent) + extraObjs []*v1alpha1.Reservation // reservations with pre-existing allocations + wantVMIDs []string // expected candidate UUIDs in order (largest-first) wantCount int }{ { @@ -150,7 +150,7 @@ func TestFilterPaygCandidates(t *testing.T) { name: "VM already in Reservation.Spec.Allocations — excluded", hv: hvWithInstances("host-1", "az-1", activeInstance("vm-a")), vms: []reservations.VM{vmOnHV("vm-a", "host-1", "small", 8192)}, - extraObjs: []client.Object{reservationWithAlloc("res-1", "vm-a")}, + extraObjs: []*v1alpha1.Reservation{reservationWithAlloc("res-1", "vm-a")}, wantCount: 0, }, { @@ -191,22 +191,22 @@ func TestFilterPaygCandidates(t *testing.T) { vmOnHV("vm-large", "host-1", "large", 32768), vmOnHV("vm-allocated", "host-1", "small", 8192), }, - extraObjs: []client.Object{reservationWithAlloc("res-1", "vm-allocated")}, + extraObjs: []*v1alpha1.Reservation{reservationWithAlloc("res-1", "vm-allocated")}, wantVMIDs: []string{"vm-large", "vm-medium", "vm-small"}, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - objs := []client.Object{tc.hv} - objs = append(objs, tc.extraObjs...) - k8sClient := buildPaygClient(t, objs...) - - candidates, err := filterPaygCandidates(context.Background(), k8sClient, tc.hv.Name, tc.hv, tc.vms, fg) - if err != nil { - t.Fatalf("unexpected error: %v", err) + allocatedVMIDs := make(map[string]struct{}) + for _, res := range tc.extraObjs { + for vmUUID := range res.Spec.CommittedResourceReservation.Allocations { + allocatedVMIDs[vmUUID] = struct{}{} + } } + candidates := filterPaygCandidates(tc.hv.Name, tc.hv, tc.vms, fg, allocatedVMIDs) + if tc.wantVMIDs != nil { if len(candidates) != len(tc.wantVMIDs) { t.Fatalf("want %d candidates, got %d: %v", len(tc.wantVMIDs), len(candidates), candidates) diff --git a/internal/scheduling/reservations/commitments/reservation_manager.go b/internal/scheduling/reservations/commitments/reservation_manager.go index 18187bd98..cab10c7c3 100644 --- a/internal/scheduling/reservations/commitments/reservation_manager.go +++ b/internal/scheduling/reservations/commitments/reservation_manager.go @@ -383,9 +383,8 @@ func (m *ReservationManager) syncReservationMetadata( } return reservation, nil - } else { - return nil, nil // No changes needed } + return nil, nil // No changes needed } // selectFlavor picks the largest flavor whose memory fits within deltaMemoryBytes. From 04f075cfecc811d93e9363136898ecdf37d115b6 Mon Sep 17 00:00:00 2001 From: mblos Date: Wed, 17 Jun 2026 13:36:13 +0200 Subject: [PATCH 3/4] testing --- .../commitments/integration_test.go | 119 +++++++++-- .../commitments/payg_candidates.go | 4 +- .../commitments/reservation_manager_test.go | 200 ++++++++++++++++++ 3 files changed, 305 insertions(+), 18 deletions(-) diff --git a/internal/scheduling/reservations/commitments/integration_test.go b/internal/scheduling/reservations/commitments/integration_test.go index 8c51a8162..3cc15f4d9 100644 --- a/internal/scheduling/reservations/commitments/integration_test.go +++ b/internal/scheduling/reservations/commitments/integration_test.go @@ -29,6 +29,7 @@ import ( schedulerdelegationapi "github.com/cobaltcore-dev/cortex/api/external/nova" "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" @@ -55,6 +56,9 @@ type CRIntegrationTestCase struct { // CRs to create and drive to terminal state. CommittedResources []*v1alpha1.CommittedResource + // When set, the CR controller is given a VMSource for PAYG pre-allocation. + VMSource reservations.VMSource + // When true the mock scheduler returns an empty hosts list (NoHostsFound). SchedulerRejects bool // SchedulerAcceptFirst, when > 0, makes the mock scheduler accept only the first N @@ -63,12 +67,13 @@ type CRIntegrationTestCase struct { SchedulerAcceptFirst int // Expected state after all CRs reach a terminal condition. - ExpectedSlots int // total Reservation CRDs remaining in the store - AcceptedCRs []string // CRs expected Ready=True / Accepted - RejectedCRs []string // CRs expected Ready=False / Rejected - PlannedCRs []string // CRs expected Ready=False / Planned - ExpiredCRs []string // CRs expected Ready=False / Expired - SupersededCRs []string // CRs expected Ready=False / Superseded + ExpectedSlots int // total Reservation CRDs remaining in the store + AcceptedCRs []string // CRs expected Ready=True / Accepted + RejectedCRs []string // CRs expected Ready=False / Rejected + PlannedCRs []string // CRs expected Ready=False / Planned + ExpiredCRs []string // CRs expected Ready=False / Expired + SupersededCRs []string // CRs expected Ready=False / Superseded + ValidateReservations func(t *testing.T, slots []v1alpha1.Reservation) // optional extra assertions } func TestCRIntegration(t *testing.T) { @@ -258,6 +263,70 @@ func TestCRIntegration(t *testing.T) { ExpectedSlots: 0, RejectedCRs: []string{"cr-partial"}, }, + // ------------------------------------------------------------------ + // PAYG pre-allocation + // ------------------------------------------------------------------ + { + // PAYG VM on host-1 matches the CR project + flavor group. + // CR controller pre-allocates the slot; reservation controller marks it + // Ready via the PreAllocated fast-path without calling the scheduler. + Name: "PAYG VM present: slot pre-allocated on HV, no scheduler call needed", + Hypervisors: []*hv1.Hypervisor{ + intgHypervisorWithAZ("host-1", "test-az", "vm-payg-1"), + }, + VMSource: &fakeVMSource{vms: []reservations.VM{{ + UUID: "vm-payg-1", + FlavorName: "test-flavor", + CurrentHypervisor: "host-1", + }}}, + CommittedResources: []*v1alpha1.CommittedResource{ + intgCR("cr-payg", "uuid-intg-payg-1", v1alpha1.CommitmentStatusConfirmed), + }, + SchedulerRejects: true, // scheduler would reject if called — proves it isn't + ExpectedSlots: 1, + AcceptedCRs: []string{"cr-payg"}, + ValidateReservations: func(t *testing.T, slots []v1alpha1.Reservation) { + t.Helper() + if len(slots) != 1 { + t.Fatalf("want 1 slot, got %d", len(slots)) + } + res := slots[0] + if res.Spec.TargetHost != "host-1" { + t.Errorf("TargetHost: want host-1, got %q", res.Spec.TargetHost) + } + if res.Spec.CommittedResourceReservation == nil { + t.Fatal("CommittedResourceReservation is nil") + } + if _, ok := res.Spec.CommittedResourceReservation.Allocations["vm-payg-1"]; !ok { + t.Error("expected vm-payg-1 in Spec.Allocations") + } + }, + }, + { + // No PAYG VMs → falls through to the scheduler. Scheduler accepts → CR accepted. + Name: "no PAYG VMs: falls back to scheduler, CR accepted normally", + Hypervisors: []*hv1.Hypervisor{ + intgHypervisorWithAZ("host-1", "test-az"), + }, + VMSource: &fakeVMSource{vms: nil}, + CommittedResources: []*v1alpha1.CommittedResource{ + intgCR("cr-nopayg", "uuid-intg-payg-2", v1alpha1.CommitmentStatusConfirmed), + }, + ExpectedSlots: 1, + AcceptedCRs: []string{"cr-nopayg"}, + ValidateReservations: func(t *testing.T, slots []v1alpha1.Reservation) { + t.Helper() + if len(slots) != 1 { + t.Fatalf("want 1 slot, got %d", len(slots)) + } + if slots[0].Spec.TargetHost == "" { + t.Error("expected TargetHost set by scheduler (Phase 5 path)") + } + if len(slots[0].Spec.CommittedResourceReservation.Allocations) != 0 { + t.Error("expected no pre-allocations on scheduler-placed slot") + } + }, + }, } for _, tc := range testCases { @@ -290,7 +359,7 @@ func runCRIntegrationTestCase(t *testing.T, tc CRIntegrationTestCase) { objects = append(objects, res) } - env := newIntgEnv(t, objects, schedulerFn) + env := newIntgEnv(t, objects, schedulerFn, tc.VMSource) defer env.close() crNames := make([]string, len(tc.CommittedResources)) @@ -320,6 +389,10 @@ func runCRIntegrationTestCase(t *testing.T, tc CRIntegrationTestCase) { intgAssertCRCondition(t, env.k8sClient, tc.PlannedCRs, metav1.ConditionFalse, v1alpha1.CommittedResourceReasonPlanned) intgAssertCRCondition(t, env.k8sClient, tc.ExpiredCRs, metav1.ConditionFalse, string(v1alpha1.CommitmentStatusExpired)) intgAssertCRCondition(t, env.k8sClient, tc.SupersededCRs, metav1.ConditionFalse, string(v1alpha1.CommitmentStatusSuperseded)) + + if tc.ValidateReservations != nil { + tc.ValidateReservations(t, resList.Items) + } } // ============================================================================ @@ -333,7 +406,7 @@ type intgEnv struct { schedulerSrv *httptest.Server } -func newIntgEnv(t *testing.T, initialObjects []client.Object, schedulerFn http.HandlerFunc) *intgEnv { +func newIntgEnv(t *testing.T, initialObjects []client.Object, schedulerFn http.HandlerFunc, vmSource reservations.VMSource) *intgEnv { t.Helper() scheme := newCRTestScheme(t) @@ -375,9 +448,10 @@ func newIntgEnv(t *testing.T, initialObjects []client.Object, schedulerFn http.H schedulerSrv := httptest.NewServer(schedulerFn) crCtrl := &CommittedResourceController{ - Client: k8sClient, - Scheme: scheme, - Conf: CommittedResourceControllerConfig{RequeueIntervalRetry: metav1.Duration{Duration: 5 * time.Minute}}, + Client: k8sClient, + Scheme: scheme, + Conf: CommittedResourceControllerConfig{RequeueIntervalRetry: metav1.Duration{Duration: 5 * time.Minute}}, + VMSource: vmSource, } resCtrl := &CommitmentReservationController{ Client: k8sClient, @@ -399,7 +473,7 @@ func (e *intgEnv) close() { e.schedulerSrv.Close() } func newDefaultIntgEnv(t *testing.T) *intgEnv { t.Helper() objects := []client.Object{newTestFlavorKnowledge(), intgHypervisor("host-1")} - return newIntgEnv(t, objects, intgAcceptScheduler) + return newIntgEnv(t, objects, intgAcceptScheduler, nil) } func (e *intgEnv) reconcileCR(t *testing.T, crName string) { @@ -630,6 +704,21 @@ func intgHypervisor(name string) *hv1.Hypervisor { return &hv1.Hypervisor{ObjectMeta: metav1.ObjectMeta{Name: name}} } +// intgHypervisorWithAZ returns a Hypervisor in the given AZ with optional active instances. +func intgHypervisorWithAZ(name, az string, instanceIDs ...string) *hv1.Hypervisor { + instances := make([]hv1.Instance, len(instanceIDs)) + for i, id := range instanceIDs { + instances[i] = hv1.Instance{ID: id, Name: id, Active: true} + } + return &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"topology.kubernetes.io/zone": az}, + }, + Status: hv1.HypervisorStatus{Instances: instances}, + } +} + // intgCR returns a CommittedResource with the default 4 GiB amount. // commitmentUUID must be unique per test case to avoid field-index collisions. func intgCR(name, commitmentUUID string, state v1alpha1.CommitmentStatus) *v1alpha1.CommittedResource { @@ -886,7 +975,7 @@ func TestCRLifecycle(t *testing.T) { }) t.Run("AllowRejection=false: stays Reserving when scheduler rejects", func(t *testing.T) { - env := newIntgEnv(t, []client.Object{newTestFlavorKnowledge(), intgHypervisor("host-1")}, intgRejectScheduler) + env := newIntgEnv(t, []client.Object{newTestFlavorKnowledge(), intgHypervisor("host-1")}, intgRejectScheduler, nil) defer env.close() cr := newTestCommittedResource("my-cr", v1alpha1.CommitmentStatusConfirmed) @@ -995,7 +1084,7 @@ func TestCRLifecycle(t *testing.T) { t.Run("resize failure: rolls back to AcceptedSpec, prior slot preserved", func(t *testing.T) { // Scheduler: accepts the first placement call (initial 4 GiB slot), rejects all subsequent. objects := []client.Object{newTestFlavorKnowledge(), intgHypervisor("host-1")} - env := newIntgEnv(t, objects, intgAcceptFirstScheduler(1)) + env := newIntgEnv(t, objects, intgAcceptFirstScheduler(1), nil) defer env.close() cr := intgCRAllowRejection("my-cr", "uuid-resize-0001", v1alpha1.CommitmentStatusConfirmed) @@ -1060,7 +1149,7 @@ func TestCRLifecycle(t *testing.T) { // then accepts all subsequent. AllowRejection=false means the CR controller retries rather // than rejecting, so the CR must eventually reach Accepted once the scheduler cooperates. objects := []client.Object{newTestFlavorKnowledge(), intgHypervisor("host-1")} - env := newIntgEnv(t, objects, intgRejectFirstScheduler(2)) + env := newIntgEnv(t, objects, intgRejectFirstScheduler(2), nil) defer env.close() cr := newTestCommittedResource("my-cr", v1alpha1.CommitmentStatusConfirmed) diff --git a/internal/scheduling/reservations/commitments/payg_candidates.go b/internal/scheduling/reservations/commitments/payg_candidates.go index ed0ed6cc9..b7acc6558 100644 --- a/internal/scheduling/reservations/commitments/payg_candidates.go +++ b/internal/scheduling/reservations/commitments/payg_candidates.go @@ -36,7 +36,7 @@ func ScanAZForPaygCandidates( projectID string, flavorGroup compute.FlavorGroupFeature, ) (map[string][]PAYGCandidate, error) { - // List all HVs from cache; filter to the target AZ. + var hvList hv1.HypervisorList if err := k8sClient.List(ctx, &hvList); err != nil { return nil, err @@ -59,13 +59,11 @@ func ScanAZForPaygCandidates( return nil, err } - // One Postgres call for all VMs in the project. projectVMs, err := vmSource.ListVMsByProject(ctx, projectID) if err != nil { return nil, err } - // Group project VMs by hypervisor for O(1) per-HV lookup. vmsByHV := make(map[string][]reservations.VM, len(azHVs)) for _, vm := range projectVMs { if _, inAZ := azHVs[vm.CurrentHypervisor]; inAZ { diff --git a/internal/scheduling/reservations/commitments/reservation_manager_test.go b/internal/scheduling/reservations/commitments/reservation_manager_test.go index d890bd496..fceca7970 100644 --- a/internal/scheduling/reservations/commitments/reservation_manager_test.go +++ b/internal/scheduling/reservations/commitments/reservation_manager_test.go @@ -9,6 +9,7 @@ import ( "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins/compute" + "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/resource" @@ -430,6 +431,205 @@ func TestApplyCommitmentState(t *testing.T) { } } +// ============================================================================ +// Tests: ApplyCommitmentState — PAYG pre-allocation (Phase 4.5) +// ============================================================================ + +func TestApplyCommitmentState_PAYG(t *testing.T) { + const ( + az = "test-az" + projectID = "project-1" + hvName = "host-1" + vmUUID = "vm-payg-1" + ) + fg := testFlavorGroup() // small=8GiB, medium=16GiB, large=32GiB + + // hvWithAZ returns an HV in az with one active instance. + hvWithAZ := func(name string, instanceIDs ...string) *hv1.Hypervisor { + instances := make([]hv1.Instance, len(instanceIDs)) + for i, id := range instanceIDs { + instances[i] = hv1.Instance{ID: id, Name: id, Active: true} + } + return &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"topology.kubernetes.io/zone": az}, + }, + Status: hv1.HypervisorStatus{Instances: instances}, + } + } + + // paygVM returns a VM matching the test flavor group on hvName. + paygVM := func(uuid, flavorName string) reservations.VM { + return reservations.VM{ + UUID: uuid, + FlavorName: flavorName, + CurrentHypervisor: hvName, + } + } + + tests := []struct { + name string + hypervisors []*hv1.Hypervisor + paygVMs []reservations.VM // returned by fake VMSource + existingSlots []v1alpha1.Reservation + desiredMemoryGiB int64 + validateTouched func(t *testing.T, touched []v1alpha1.Reservation) + }{ + { + name: "PAYG VM found — slot created pre-allocated with TargetHost and Allocations", + hypervisors: []*hv1.Hypervisor{hvWithAZ(hvName, vmUUID)}, + paygVMs: []reservations.VM{paygVM(vmUUID, "small")}, + desiredMemoryGiB: 8, + validateTouched: func(t *testing.T, touched []v1alpha1.Reservation) { + if len(touched) != 1 { + t.Fatalf("want 1 slot, got %d", len(touched)) + } + res := touched[0] + if res.Spec.TargetHost != hvName { + t.Errorf("TargetHost: want %q, got %q", hvName, res.Spec.TargetHost) + } + if res.Spec.CommittedResourceReservation == nil { + t.Fatal("CommittedResourceReservation is nil") + } + if _, ok := res.Spec.CommittedResourceReservation.Allocations[vmUUID]; !ok { + t.Errorf("expected vm %q in Spec.Allocations", vmUUID) + } + if res.Spec.CommittedResourceReservation.ResourceName != "small" { + t.Errorf("ResourceName: want %q, got %q", "small", res.Spec.CommittedResourceReservation.ResourceName) + } + }, + }, + { + name: "no PAYG VMs — falls through to Phase 5 (no TargetHost set)", + hypervisors: []*hv1.Hypervisor{hvWithAZ(hvName)}, // HV has no instances + paygVMs: nil, + desiredMemoryGiB: 8, + validateTouched: func(t *testing.T, touched []v1alpha1.Reservation) { + if len(touched) != 1 { + t.Fatalf("want 1 slot from Phase 5, got %d", len(touched)) + } + if touched[0].Spec.TargetHost != "" { + t.Errorf("expected no TargetHost for blind slot, got %q", touched[0].Spec.TargetHost) + } + if len(touched[0].Spec.CommittedResourceReservation.Allocations) != 0 { + t.Error("expected no pre-allocations for blind slot") + } + }, + }, + { + name: "PAYG VM already allocated — excluded, slot goes to Phase 5", + hypervisors: []*hv1.Hypervisor{hvWithAZ(hvName, vmUUID)}, + paygVMs: []reservations.VM{paygVM(vmUUID, "small")}, + existingSlots: []v1alpha1.Reservation{ + // Different commitment UUID so Phase 1 doesn't count it against our delta. + func() v1alpha1.Reservation { + s := withAZ(newTestCRSlot("other-cr-0", 8, hvName, "test-group", + map[string]v1alpha1.CommittedResourceAllocation{vmUUID: {}}), az) + s.Spec.CommittedResourceReservation.CommitmentUUID = "other-uuid" + return s + }(), + }, + desiredMemoryGiB: 8, + validateTouched: func(t *testing.T, touched []v1alpha1.Reservation) { + if len(touched) != 1 { + t.Fatalf("want 1 slot, got %d", len(touched)) + } + if touched[0].Spec.TargetHost != "" { + t.Errorf("expected Phase 5 blind slot (no TargetHost), got %q", touched[0].Spec.TargetHost) + } + }, + }, + { + name: "PAYG VM larger than remaining delta — slot undersized, VM pre-allocated", + hypervisors: []*hv1.Hypervisor{hvWithAZ(hvName, vmUUID)}, + paygVMs: []reservations.VM{paygVM(vmUUID, "small")}, + // delta = 4 GiB < VM 8 GiB — undersize path + desiredMemoryGiB: 4, + validateTouched: func(t *testing.T, touched []v1alpha1.Reservation) { + if len(touched) != 1 { + t.Fatalf("want 1 slot, got %d", len(touched)) + } + res := touched[0] + if res.Spec.TargetHost != hvName { + t.Errorf("TargetHost: want %q, got %q", hvName, res.Spec.TargetHost) + } + if _, ok := res.Spec.CommittedResourceReservation.Allocations[vmUUID]; !ok { + t.Errorf("expected vm %q in Spec.Allocations", vmUUID) + } + wantMem := int64(4) * 1024 * 1024 * 1024 + memQty := res.Spec.Resources[hv1.ResourceMemory] + gotMem := memQty.Value() + if gotMem != wantMem { + t.Errorf("slot memory: want %d, got %d", wantMem, gotMem) + } + }, + }, + { + name: "PAYG covers part of delta — remaining goes to Phase 5", + hypervisors: []*hv1.Hypervisor{hvWithAZ(hvName, vmUUID)}, + paygVMs: []reservations.VM{paygVM(vmUUID, "small")}, + // delta = 16 GiB; PAYG covers 8, remaining 8 → Phase 5 + desiredMemoryGiB: 16, + validateTouched: func(t *testing.T, touched []v1alpha1.Reservation) { + if len(touched) != 2 { + t.Fatalf("want 2 slots, got %d", len(touched)) + } + var preAllocated, blind int + for _, res := range touched { + if res.Spec.TargetHost != "" { + preAllocated++ + } else { + blind++ + } + } + if preAllocated != 1 { + t.Errorf("want 1 pre-allocated slot, got %d", preAllocated) + } + if blind != 1 { + t.Errorf("want 1 blind slot, got %d", blind) + } + }, + }, + } + + scheme := newCRTestScheme(t) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + objects := make([]client.Object, 0, len(tt.hypervisors)+len(tt.existingSlots)) + for _, hv := range tt.hypervisors { + objects = append(objects, hv) + } + for i := range tt.existingSlots { + objects = append(objects, &tt.existingSlots[i]) + } + k8sClient := newCRTestClient(scheme, objects...) + + mgr := NewReservationManager(k8sClient) + mgr.VMSource = &fakeVMSource{vms: tt.paygVMs} + + desiredState := &CommitmentState{ + CommitmentUUID: "abc123", + ProjectID: projectID, + FlavorGroupName: "test-group", + TotalMemoryBytes: tt.desiredMemoryGiB * 1024 * 1024 * 1024, + AvailabilityZone: az, + } + + result, err := mgr.ApplyCommitmentState( + context.Background(), logr.Discard(), desiredState, map[string]compute.FlavorGroupFeature{"test-group": fg}, "test", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if tt.validateTouched != nil { + tt.validateTouched(t, result.TouchedReservations) + } + }) + } +} + // ============================================================================ // Tests: newReservation flavor selection // ============================================================================ From 61cd2ee43c7f68b10f432b86ba8ace3a28b484f5 Mon Sep 17 00:00:00 2001 From: mblos Date: Wed, 17 Jun 2026 16:48:18 +0200 Subject: [PATCH 4/4] apply payg remapping also during rollback --- .../reservations/commitments/committed_resource_controller.go | 4 +++- .../reservations/commitments/reservation_manager.go | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/scheduling/reservations/commitments/committed_resource_controller.go b/internal/scheduling/reservations/commitments/committed_resource_controller.go index c70f6f0a4..10cefb67d 100644 --- a/internal/scheduling/reservations/commitments/committed_resource_controller.go +++ b/internal/scheduling/reservations/commitments/committed_resource_controller.go @@ -474,7 +474,9 @@ func (r *CommittedResourceController) rollbackToAccepted(ctx context.Context, lo state.NamePrefix = cr.Name + "-" state.CreatorRequestID = reservations.GlobalRequestIDFromContext(ctx) state.ParentGeneration = cr.Generation - if _, err := NewReservationManager(r.Client).ApplyCommitmentState(ctx, logger, state, flavorGroups, "committed-resource-controller-rollback"); err != nil { + rollbackMgr := NewReservationManager(r.Client) + rollbackMgr.VMSource = r.VMSource + if _, err := rollbackMgr.ApplyCommitmentState(ctx, logger, state, flavorGroups, "committed-resource-controller-rollback"); err != nil { return fmt.Errorf("rollback apply failed: %w", err) } return nil diff --git a/internal/scheduling/reservations/commitments/reservation_manager.go b/internal/scheduling/reservations/commitments/reservation_manager.go index cab10c7c3..529f769af 100644 --- a/internal/scheduling/reservations/commitments/reservation_manager.go +++ b/internal/scheduling/reservations/commitments/reservation_manager.go @@ -63,6 +63,7 @@ type ReservationManager struct { MaxSlots int } +// NewReservationManager creates a ReservationManager using the given client. func NewReservationManager(k8sClient client.Client) *ReservationManager { return &ReservationManager{ Client: k8sClient,