Skip to content

Commit

Permalink
[MultiKueue] Introduce Admission Check Validation feature gate (#3254)
Browse files Browse the repository at this point in the history
* Wrap AdmissionChecksSingleInstanceInClusterQueue and FlavorIndependentAdmissionCheck status conditions with a feature gate.
Preserve the logic without the status conditions.

* Approach with SingleInstanceInClusterQueue and FlavorIndependent set to true when the AdmissionCheck controller is MultiKueue

* Apply code review comments

* Update after code review
  • Loading branch information
mszadkow authored Oct 22, 2024
1 parent 78c234c commit 6300ff3
Show file tree
Hide file tree
Showing 10 changed files with 367 additions and 77 deletions.
2 changes: 2 additions & 0 deletions apis/kueue/v1beta1/clusterqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
ClusterQueueActiveReasonAdmissionCheckInactive = "AdmissionCheckInactive"
ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks = "MultipleSingleInstanceControllerAdmissionChecks"
ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor = "FlavorIndependentAdmissionCheckAppliedPerFlavor"
ClusterQueueActiveReasonMultipleMultiKueueAdmissionChecks = "MultipleMultiKueueAdmissionChecks"
ClusterQueueActiveReasonMutliKueueAdmissionCheckAppliedPerFlavor = "MutliKueueAdmissionCheckAppliedPerFlavor"
ClusterQueueActiveReasonUnknown = "Unknown"
ClusterQueueActiveReasonReady = "Ready"
)
Expand Down
17 changes: 12 additions & 5 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,19 @@ func (c *Cache) DeleteResourceFlavor(rf *kueue.ResourceFlavor) sets.Set[string]
func (c *Cache) AddOrUpdateAdmissionCheck(ac *kueue.AdmissionCheck) sets.Set[string] {
c.Lock()
defer c.Unlock()
c.admissionChecks[ac.Name] = AdmissionCheck{
Active: apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionCheckActive),
Controller: ac.Spec.ControllerName,
SingleInstanceInClusterQueue: apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionChecksSingleInstanceInClusterQueue),
FlavorIndependent: apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.FlavorIndependentAdmissionCheck),

newAC := AdmissionCheck{
Active: apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionCheckActive),
Controller: ac.Spec.ControllerName,
}
if features.Enabled(features.AdmissionCheckValidationRules) {
newAC.SingleInstanceInClusterQueue = apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionChecksSingleInstanceInClusterQueue)
newAC.FlavorIndependent = apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.FlavorIndependentAdmissionCheck)
} else if ac.Spec.ControllerName == kueue.MultiKueueControllerName {
newAC.SingleInstanceInClusterQueue = true
newAC.FlavorIndependent = true
}
c.admissionChecks[ac.Name] = newAC

return c.updateClusterQueues()
}
Expand Down
55 changes: 49 additions & 6 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type clusterQueue struct {
inactiveAdmissionChecks []string
multipleSingleInstanceControllersChecks map[string][]string // key = controllerName
flavorIndependentAdmissionCheckAppliedPerFlavor []string
multipleMultiKueueAdmissionChecks []string
perFlavorMultiKueueAdmissionChecks []string
admittedWorkloadsCount int
isStopped bool
workloadInfoOptions []workload.InfoOption
Expand Down Expand Up @@ -233,7 +235,10 @@ func (c *clusterQueue) updateQueueStatus() {
len(c.missingAdmissionChecks) > 0 ||
len(c.inactiveAdmissionChecks) > 0 ||
len(c.multipleSingleInstanceControllersChecks) > 0 ||
len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 ||
// one multikueue admission check is allowed
len(c.multipleMultiKueueAdmissionChecks) > 1 ||
len(c.perFlavorMultiKueueAdmissionChecks) > 0 {
status = pending
}
if c.Status == terminating {
Expand Down Expand Up @@ -268,13 +273,25 @@ func (c *clusterQueue) inactiveReason() (string, string) {
reasons = append(reasons, kueue.ClusterQueueActiveReasonAdmissionCheckInactive)
messages = append(messages, fmt.Sprintf("references inactive AdmissionCheck(s): %v", c.inactiveAdmissionChecks))
}

if len(c.multipleMultiKueueAdmissionChecks) > 1 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonMultipleMultiKueueAdmissionChecks)
messages = append(messages, fmt.Sprintf("Cannot use multiple MultiKueue AdmissionChecks on the same ClusterQueue, found: %v", strings.Join(c.multipleMultiKueueAdmissionChecks, ",")))
}

if len(c.perFlavorMultiKueueAdmissionChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonMutliKueueAdmissionCheckAppliedPerFlavor)
messages = append(messages, fmt.Sprintf("Cannot specify MultiKueue AdmissionCheck per flavor, found: %s", strings.Join(c.perFlavorMultiKueueAdmissionChecks, ",")))
}

// This doesn't need to be behind the feature gate, because the map is only populated when the gate is enabled
if len(c.multipleSingleInstanceControllersChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks)
for _, controller := range utilmaps.SortedKeys(c.multipleSingleInstanceControllersChecks) {
messages = append(messages, fmt.Sprintf("only one AdmissionCheck of %v can be referenced for controller %q", c.multipleSingleInstanceControllersChecks[controller], controller))
}
}

// This doesn't need to be behind the feature gate, because the list is only populated when the gate is enabled
if len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor)
messages = append(messages, fmt.Sprintf("AdmissionCheck(s): %v cannot be set at flavor level", c.flavorIndependentAdmissionCheckAppliedPerFlavor))
Expand Down Expand Up @@ -325,9 +342,11 @@ func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference
func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionCheck) {
checksPerController := make(map[string][]string, len(c.AdmissionChecks))
singleInstanceControllers := sets.New[string]()
multipleMultiKueueControllers := sets.New[string]()
var missing []string
var inactive []string
var flavorIndependentCheckOnFlavors []string
var perFlavorMultiKueueChecks []string
for acName, flavors := range c.AdmissionChecks {
if ac, found := checks[acName]; !found {
missing = append(missing, acName)
Expand All @@ -342,13 +361,25 @@ func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionChec
if ac.FlavorIndependent && flavors.Len() != 0 {
flavorIndependentCheckOnFlavors = append(flavorIndependentCheckOnFlavors, acName)
}

if ac.Controller == kueue.MultiKueueControllerName {
// MultiKueue AdmissionChecks have extra constraints:
// - cannot use multiple MultiKueue AdmissionChecks on the same ClusterQueue
// - cannot specify a MultiKueue AdmissionCheck per flavor
multipleMultiKueueControllers.Insert(acName)
if flavors.Len() != 0 {
perFlavorMultiKueueChecks = append(perFlavorMultiKueueChecks, acName)
}
}
}
}

// sort the lists since c.AdmissionChecks is a map
slices.Sort(missing)
slices.Sort(inactive)
slices.Sort(flavorIndependentCheckOnFlavors)
slices.Sort(perFlavorMultiKueueChecks)
multipleMultiKueueControllersList := sets.List(multipleMultiKueueControllers)

update := false
if !slices.Equal(c.missingAdmissionChecks, missing) {
Expand All @@ -371,13 +402,25 @@ func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionChec
slices.Sort(checksPerController[c])
}

if !maps.EqualFunc(checksPerController, c.multipleSingleInstanceControllersChecks, slices.Equal) {
c.multipleSingleInstanceControllersChecks = checksPerController
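// Kept behind the feature gate: with the gate disabled, MultiKueue AdmissionChecks are still
// marked single-instance and flavor-independent, so these generic checks would trigger for them;
// the MultiKueue-specific checks below cover that case instead.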
if features.Enabled(features.AdmissionCheckValidationRules) {
if !maps.EqualFunc(checksPerController, c.multipleSingleInstanceControllersChecks, slices.Equal) {
c.multipleSingleInstanceControllersChecks = checksPerController
update = true
}
if !slices.Equal(c.flavorIndependentAdmissionCheckAppliedPerFlavor, flavorIndependentCheckOnFlavors) {
c.flavorIndependentAdmissionCheckAppliedPerFlavor = flavorIndependentCheckOnFlavors
update = true
}
}

if !slices.Equal(c.multipleMultiKueueAdmissionChecks, multipleMultiKueueControllersList) {
c.multipleMultiKueueAdmissionChecks = multipleMultiKueueControllersList
update = true
}

if !slices.Equal(c.flavorIndependentAdmissionCheckAppliedPerFlavor, flavorIndependentCheckOnFlavors) {
c.flavorIndependentAdmissionCheckAppliedPerFlavor = flavorIndependentCheckOnFlavors
if !slices.Equal(c.perFlavorMultiKueueAdmissionChecks, perFlavorMultiKueueChecks) {
c.perFlavorMultiKueueAdmissionChecks = perFlavorMultiKueueChecks
update = true
}

Expand Down
149 changes: 129 additions & 20 deletions pkg/cache/clusterqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,13 +497,14 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
Obj()

testcases := []struct {
name string
cq *kueue.ClusterQueue
cqStatus metrics.ClusterQueueStatus
admissionChecks map[string]AdmissionCheck
wantStatus metrics.ClusterQueueStatus
wantReason string
wantMessage string
name string
cq *kueue.ClusterQueue
cqStatus metrics.ClusterQueueStatus
admissionChecks map[string]AdmissionCheck
wantStatus metrics.ClusterQueueStatus
wantReason string
wantMessage string
acValidationRulesEnabled bool
}{
{
name: "Pending clusterQueue updated valid AC list",
Expand All @@ -527,6 +528,29 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
wantReason: "Ready",
wantMessage: "Can admit new workloads",
},
{
name: "Pending clusterQueue updated valid AC list - AdmissionCheckValidationRules enabled",
cq: cqWithAC,
cqStatus: pending,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: "controller1",
},
"check2": {
Active: true,
Controller: "controller2",
},
"check3": {
Active: true,
Controller: "controller3",
},
},
wantStatus: active,
wantReason: "Ready",
wantMessage: "Can admit new workloads",
acValidationRulesEnabled: true,
},
{
name: "Pending clusterQueue with an AC strategy updated valid AC list",
cq: cqWithACStrategy,
Expand Down Expand Up @@ -630,7 +654,7 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
wantMessage: "Can't admit new workloads: references inactive AdmissionCheck(s): [check3].",
},
{
name: "Active clusterQueue updated with duplicate single instance AC Controller",
name: "Active clusterQueue updated with duplicate single instance AC Controller - AdmissionCheckValidationRules enabled",
cq: cqWithAC,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
Expand All @@ -649,12 +673,13 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
SingleInstanceInClusterQueue: true,
},
},
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: `Can't admit new workloads: only one AdmissionCheck of [check2 check3] can be referenced for controller "controller2".`,
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: `Can't admit new workloads: only one AdmissionCheck of [check2 check3] can be referenced for controller "controller2".`,
acValidationRulesEnabled: true,
},
{
name: "Active clusterQueue with an AC strategy updated with duplicate single instance AC Controller",
name: "Active clusterQueue with an AC strategy updated with duplicate single instance AC Controller - AdmissionCheckValidationRules enabled",
cq: cqWithACStrategy,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
Expand All @@ -673,12 +698,35 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
SingleInstanceInClusterQueue: true,
},
},
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: `Can't admit new workloads: only one AdmissionCheck of [check2 check3] can be referenced for controller "controller2".`,
acValidationRulesEnabled: true,
},
{
name: "Active clusterQueue with an MultiKueue AC strategy updated with duplicate single instance AC Controller",
cq: cqWithACStrategy,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: kueue.MultiKueueControllerName,
},
"check2": {
Active: true,
Controller: "controller2",
},
"check3": {
Active: true,
Controller: kueue.MultiKueueControllerName,
},
},
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: `Can't admit new workloads: only one AdmissionCheck of [check2 check3] can be referenced for controller "controller2".`,
wantReason: kueue.ClusterQueueActiveReasonMultipleMultiKueueAdmissionChecks,
wantMessage: `Can't admit new workloads: Cannot use multiple MultiKueue AdmissionChecks on the same ClusterQueue, found: check1,check3.`,
},
{
name: "Active clusterQueue with a FlavorIndependent AC applied per ResourceFlavor",
name: "Pending clusterQueue with a FlavorIndependent AC applied per ResourceFlavor",
cq: cqWithACPerFlavor,
cqStatus: pending,
admissionChecks: map[string]AdmissionCheck{
Expand All @@ -688,9 +736,9 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
FlavorIndependent: true,
},
},
wantStatus: pending,
wantReason: "FlavorIndependentAdmissionCheckAppliedPerFlavor",
wantMessage: "Can't admit new workloads: AdmissionCheck(s): [check1] cannot be set at flavor level.",
wantStatus: active,
wantReason: "Ready",
wantMessage: "Can admit new workloads",
},
{
name: "Terminating clusterQueue updated with valid AC list",
Expand Down Expand Up @@ -772,10 +820,68 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
wantReason: "Terminating",
wantMessage: "Can't admit new workloads; clusterQueue is terminating",
},
{
name: "Active clusterQueue with an AC strategy updated with AdmissionCheckValidationRules disabled and no MultiKueue",
cq: cqWithACStrategy,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: "controller1",
SingleInstanceInClusterQueue: true,
},
"check2": {
Active: true,
Controller: "controller2",
},
"check3": {
Active: true,
Controller: "controller2",
SingleInstanceInClusterQueue: true,
},
},
wantStatus: active,
wantReason: "Ready",
wantMessage: "Can admit new workloads",
},
{
name: "Active clusterQueue with a FlavorIndependent AC applied per ResourceFlavor - AdmissionCheckValidationRules enabled",
cq: cqWithACPerFlavor,
cqStatus: pending,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: "controller1",
FlavorIndependent: true,
},
},
wantStatus: pending,
wantReason: "FlavorIndependentAdmissionCheckAppliedPerFlavor",
wantMessage: "Can't admit new workloads: AdmissionCheck(s): [check1] cannot be set at flavor level.",
acValidationRulesEnabled: true,
},
{
name: "Active clusterQueue with a FlavorIndependent MultiKueue AC applied per ResourceFlavor",
cq: cqWithACPerFlavor,
cqStatus: pending,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: kueue.MultiKueueControllerName,
FlavorIndependent: true,
},
},
wantStatus: pending,
wantReason: "MutliKueueAdmissionCheckAppliedPerFlavor",
wantMessage: `Can't admit new workloads: Cannot specify MultiKueue AdmissionCheck per flavor, found: check1.`,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
if tc.acValidationRulesEnabled {
features.SetFeatureGateDuringTest(t, features.AdmissionCheckValidationRules, true)
}
cache := New(utiltesting.NewFakeClient())
cq, err := cache.newClusterQueue(tc.cq)
if err != nil {
Expand All @@ -791,10 +897,13 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
cq.inactiveAdmissionChecks = nil
cq.flavorIndependentAdmissionCheckAppliedPerFlavor = nil
} else {
cq.multipleSingleInstanceControllersChecks = map[string][]string{"c1": {"ac1", "ac2"}}
cq.missingAdmissionChecks = []string{"missing-ac"}
cq.inactiveAdmissionChecks = []string{"inactive-ac"}
cq.flavorIndependentAdmissionCheckAppliedPerFlavor = []string{"not-on-flavor"}
// can only be cleaned up when feature gate is enabled
if tc.acValidationRulesEnabled {
cq.multipleSingleInstanceControllersChecks = map[string][]string{"c1": {"ac1", "ac2"}}
cq.flavorIndependentAdmissionCheckAppliedPerFlavor = []string{"not-on-flavor"}
}
}
cq.updateWithAdmissionChecks(tc.admissionChecks)

Expand Down
Loading

0 comments on commit 6300ff3

Please sign in to comment.