Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,10 +598,11 @@ func main() {
metrics.Registry.MustRegister(&crControllerMonitor)

if err := (&commitments.CommittedResourceController{
Client: multiclusterClient,
Scheme: mgr.GetScheme(),
Conf: crControllerConf,
Monitor: &crControllerMonitor,
Client: multiclusterClient,
Scheme: mgr.GetScheme(),
Conf: crControllerConf,
Monitor: &crControllerMonitor,
VMSource: commitmentsVMSource,
}).SetupWithManager(mgr, multiclusterClient); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CommittedResource")
os.Exit(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type CommittedResourceController struct {
Scheme *runtime.Scheme
Conf CommittedResourceControllerConfig
Monitor *CRControllerMonitor
// VMSource enables PAYG pre-allocation when creating reservation slots. When nil the
// PAYG scan is skipped and all slots are created via blind scheduler probes.
VMSource reservations.VMSource
}

func (r *CommittedResourceController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -316,6 +319,7 @@ func (r *CommittedResourceController) applyReservationState(ctx context.Context,
state.ParentGeneration = cr.Generation

mgr := NewReservationManager(r.Client)
mgr.VMSource = r.VMSource
mgr.SlotCreationDelay = r.Conf.SlotCreationDelay.Duration
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if cr.Spec.AllowRejection {
mgr.MaxSlots = r.Conf.MaxSlotsPerCommitment
Expand Down Expand Up @@ -470,7 +474,9 @@ func (r *CommittedResourceController) rollbackToAccepted(ctx context.Context, lo
state.NamePrefix = cr.Name + "-"
state.CreatorRequestID = reservations.GlobalRequestIDFromContext(ctx)
state.ParentGeneration = cr.Generation
if _, err := NewReservationManager(r.Client).ApplyCommitmentState(ctx, logger, state, flavorGroups, "committed-resource-controller-rollback"); err != nil {
rollbackMgr := NewReservationManager(r.Client)
rollbackMgr.VMSource = r.VMSource
if _, err := rollbackMgr.ApplyCommitmentState(ctx, logger, state, flavorGroups, "committed-resource-controller-rollback"); err != nil {
return fmt.Errorf("rollback apply failed: %w", err)
}
return nil
Expand Down
119 changes: 104 additions & 15 deletions internal/scheduling/reservations/commitments/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

schedulerdelegationapi "github.com/cobaltcore-dev/cortex/api/external/nova"
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
"github.com/cobaltcore-dev/cortex/internal/scheduling/reservations"
hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -55,6 +56,9 @@ type CRIntegrationTestCase struct {
// CRs to create and drive to terminal state.
CommittedResources []*v1alpha1.CommittedResource

// When set, the CR controller is given a VMSource for PAYG pre-allocation.
VMSource reservations.VMSource

// When true the mock scheduler returns an empty hosts list (NoHostsFound).
SchedulerRejects bool
// SchedulerAcceptFirst, when > 0, makes the mock scheduler accept only the first N
Expand All @@ -63,12 +67,13 @@ type CRIntegrationTestCase struct {
SchedulerAcceptFirst int

// Expected state after all CRs reach a terminal condition.
ExpectedSlots int // total Reservation CRDs remaining in the store
AcceptedCRs []string // CRs expected Ready=True / Accepted
RejectedCRs []string // CRs expected Ready=False / Rejected
PlannedCRs []string // CRs expected Ready=False / Planned
ExpiredCRs []string // CRs expected Ready=False / Expired
SupersededCRs []string // CRs expected Ready=False / Superseded
ExpectedSlots int // total Reservation CRDs remaining in the store
AcceptedCRs []string // CRs expected Ready=True / Accepted
RejectedCRs []string // CRs expected Ready=False / Rejected
PlannedCRs []string // CRs expected Ready=False / Planned
ExpiredCRs []string // CRs expected Ready=False / Expired
SupersededCRs []string // CRs expected Ready=False / Superseded
ValidateReservations func(t *testing.T, slots []v1alpha1.Reservation) // optional extra assertions
}

func TestCRIntegration(t *testing.T) {
Expand Down Expand Up @@ -258,6 +263,70 @@ func TestCRIntegration(t *testing.T) {
ExpectedSlots: 0,
RejectedCRs: []string{"cr-partial"},
},
// ------------------------------------------------------------------
// PAYG pre-allocation
// ------------------------------------------------------------------
{
// PAYG VM on host-1 matches the CR project + flavor group.
// CR controller pre-allocates the slot; reservation controller marks it
// Ready via the PreAllocated fast-path without calling the scheduler.
Name: "PAYG VM present: slot pre-allocated on HV, no scheduler call needed",
Hypervisors: []*hv1.Hypervisor{
intgHypervisorWithAZ("host-1", "test-az", "vm-payg-1"),
},
VMSource: &fakeVMSource{vms: []reservations.VM{{
UUID: "vm-payg-1",
FlavorName: "test-flavor",
CurrentHypervisor: "host-1",
}}},
CommittedResources: []*v1alpha1.CommittedResource{
intgCR("cr-payg", "uuid-intg-payg-1", v1alpha1.CommitmentStatusConfirmed),
},
SchedulerRejects: true, // scheduler would reject if called — proves it isn't
ExpectedSlots: 1,
AcceptedCRs: []string{"cr-payg"},
// Pre-allocation path: the single slot must target host-1 and list the
// PAYG VM vm-payg-1 among its allocations.
ValidateReservations: func(t *testing.T, slots []v1alpha1.Reservation) {
	t.Helper()
	if got := len(slots); got != 1 {
		t.Fatalf("want 1 slot, got %d", got)
	}
	spec := slots[0].Spec
	if host := spec.TargetHost; host != "host-1" {
		t.Errorf("TargetHost: want host-1, got %q", host)
	}
	// Stop here when the reservation payload is missing; the allocation
	// lookup below would otherwise dereference a nil pointer.
	crRes := spec.CommittedResourceReservation
	if crRes == nil {
		t.Fatal("CommittedResourceReservation is nil")
	}
	if _, found := crRes.Allocations["vm-payg-1"]; !found {
		t.Error("expected vm-payg-1 in Spec.Allocations")
	}
},
},
{
// No PAYG VMs → falls through to the scheduler. Scheduler accepts → CR accepted.
Name: "no PAYG VMs: falls back to scheduler, CR accepted normally",
Hypervisors: []*hv1.Hypervisor{
intgHypervisorWithAZ("host-1", "test-az"),
},
VMSource: &fakeVMSource{vms: nil},
CommittedResources: []*v1alpha1.CommittedResource{
intgCR("cr-nopayg", "uuid-intg-payg-2", v1alpha1.CommitmentStatusConfirmed),
},
ExpectedSlots: 1,
AcceptedCRs: []string{"cr-nopayg"},
// Scheduler fallback path: exactly one slot must exist, it must have been
// placed on a host, and it must carry no pre-allocated VMs.
ValidateReservations: func(t *testing.T, slots []v1alpha1.Reservation) {
	t.Helper()
	if len(slots) != 1 {
		t.Fatalf("want 1 slot, got %d", len(slots))
	}
	if slots[0].Spec.TargetHost == "" {
		t.Error("expected TargetHost set by scheduler (Phase 5 path)")
	}
	// Guard the pointer before dereferencing it so a malformed slot fails
	// this case with a clear message instead of panicking the test binary.
	if slots[0].Spec.CommittedResourceReservation == nil {
		t.Fatal("CommittedResourceReservation is nil")
	}
	if len(slots[0].Spec.CommittedResourceReservation.Allocations) != 0 {
		t.Error("expected no pre-allocations on scheduler-placed slot")
	}
},
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -290,7 +359,7 @@ func runCRIntegrationTestCase(t *testing.T, tc CRIntegrationTestCase) {
objects = append(objects, res)
}

env := newIntgEnv(t, objects, schedulerFn)
env := newIntgEnv(t, objects, schedulerFn, tc.VMSource)
defer env.close()

crNames := make([]string, len(tc.CommittedResources))
Expand Down Expand Up @@ -320,6 +389,10 @@ func runCRIntegrationTestCase(t *testing.T, tc CRIntegrationTestCase) {
intgAssertCRCondition(t, env.k8sClient, tc.PlannedCRs, metav1.ConditionFalse, v1alpha1.CommittedResourceReasonPlanned)
intgAssertCRCondition(t, env.k8sClient, tc.ExpiredCRs, metav1.ConditionFalse, string(v1alpha1.CommitmentStatusExpired))
intgAssertCRCondition(t, env.k8sClient, tc.SupersededCRs, metav1.ConditionFalse, string(v1alpha1.CommitmentStatusSuperseded))

if tc.ValidateReservations != nil {
tc.ValidateReservations(t, resList.Items)
}
}

// ============================================================================
Expand All @@ -333,7 +406,7 @@ type intgEnv struct {
schedulerSrv *httptest.Server
}

func newIntgEnv(t *testing.T, initialObjects []client.Object, schedulerFn http.HandlerFunc) *intgEnv {
func newIntgEnv(t *testing.T, initialObjects []client.Object, schedulerFn http.HandlerFunc, vmSource reservations.VMSource) *intgEnv {
t.Helper()
scheme := newCRTestScheme(t)

Expand Down Expand Up @@ -375,9 +448,10 @@ func newIntgEnv(t *testing.T, initialObjects []client.Object, schedulerFn http.H
schedulerSrv := httptest.NewServer(schedulerFn)

crCtrl := &CommittedResourceController{
Client: k8sClient,
Scheme: scheme,
Conf: CommittedResourceControllerConfig{RequeueIntervalRetry: metav1.Duration{Duration: 5 * time.Minute}},
Client: k8sClient,
Scheme: scheme,
Conf: CommittedResourceControllerConfig{RequeueIntervalRetry: metav1.Duration{Duration: 5 * time.Minute}},
VMSource: vmSource,
}
resCtrl := &CommitmentReservationController{
Client: k8sClient,
Expand All @@ -399,7 +473,7 @@ func (e *intgEnv) close() { e.schedulerSrv.Close() }
func newDefaultIntgEnv(t *testing.T) *intgEnv {
t.Helper()
objects := []client.Object{newTestFlavorKnowledge(), intgHypervisor("host-1")}
return newIntgEnv(t, objects, intgAcceptScheduler)
return newIntgEnv(t, objects, intgAcceptScheduler, nil)
}

func (e *intgEnv) reconcileCR(t *testing.T, crName string) {
Expand Down Expand Up @@ -630,6 +704,21 @@ func intgHypervisor(name string) *hv1.Hypervisor {
return &hv1.Hypervisor{ObjectMeta: metav1.ObjectMeta{Name: name}}
}

// intgHypervisorWithAZ builds a Hypervisor called name whose
// "topology.kubernetes.io/zone" label is set to az. Each entry of instanceIDs
// becomes one active instance in the hypervisor status, using the ID as both
// the instance ID and its name. With no IDs the instance list is empty.
func intgHypervisorWithAZ(name, az string, instanceIDs ...string) *hv1.Hypervisor {
	active := make([]hv1.Instance, 0, len(instanceIDs))
	for _, instanceID := range instanceIDs {
		active = append(active, hv1.Instance{ID: instanceID, Name: instanceID, Active: true})
	}

	hv := &hv1.Hypervisor{}
	hv.ObjectMeta = metav1.ObjectMeta{
		Name:   name,
		Labels: map[string]string{"topology.kubernetes.io/zone": az},
	}
	hv.Status = hv1.HypervisorStatus{Instances: active}
	return hv
}

// intgCR returns a CommittedResource with the default 4 GiB amount.
// commitmentUUID must be unique per test case to avoid field-index collisions.
func intgCR(name, commitmentUUID string, state v1alpha1.CommitmentStatus) *v1alpha1.CommittedResource {
Expand Down Expand Up @@ -886,7 +975,7 @@ func TestCRLifecycle(t *testing.T) {
})

t.Run("AllowRejection=false: stays Reserving when scheduler rejects", func(t *testing.T) {
env := newIntgEnv(t, []client.Object{newTestFlavorKnowledge(), intgHypervisor("host-1")}, intgRejectScheduler)
env := newIntgEnv(t, []client.Object{newTestFlavorKnowledge(), intgHypervisor("host-1")}, intgRejectScheduler, nil)
defer env.close()

cr := newTestCommittedResource("my-cr", v1alpha1.CommitmentStatusConfirmed)
Expand Down Expand Up @@ -995,7 +1084,7 @@ func TestCRLifecycle(t *testing.T) {
t.Run("resize failure: rolls back to AcceptedSpec, prior slot preserved", func(t *testing.T) {
// Scheduler: accepts the first placement call (initial 4 GiB slot), rejects all subsequent.
objects := []client.Object{newTestFlavorKnowledge(), intgHypervisor("host-1")}
env := newIntgEnv(t, objects, intgAcceptFirstScheduler(1))
env := newIntgEnv(t, objects, intgAcceptFirstScheduler(1), nil)
defer env.close()

cr := intgCRAllowRejection("my-cr", "uuid-resize-0001", v1alpha1.CommitmentStatusConfirmed)
Expand Down Expand Up @@ -1060,7 +1149,7 @@ func TestCRLifecycle(t *testing.T) {
// then accepts all subsequent. AllowRejection=false means the CR controller retries rather
// than rejecting, so the CR must eventually reach Accepted once the scheduler cooperates.
objects := []client.Object{newTestFlavorKnowledge(), intgHypervisor("host-1")}
env := newIntgEnv(t, objects, intgRejectFirstScheduler(2))
env := newIntgEnv(t, objects, intgRejectFirstScheduler(2), nil)
defer env.close()

cr := newTestCommittedResource("my-cr", v1alpha1.CommitmentStatusConfirmed)
Expand Down
Loading