use feature gate instead of config
Signed-off-by: Kevin <[email protected]>
KPostOffice committed Dec 2, 2024
1 parent d19a172 commit fa02f8b
Showing 13 changed files with 129 additions and 169 deletions.
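For context: the commit assumes a LocalQueueMetrics feature gate declared in Kueue's pkg/features package, and switches call sites from the Configuration field cfg.Metrics.EnableLocalQueueMetrics to features.Enabled(features.LocalQueueMetrics). A minimal sketch of that gate-registration pattern (the gate name matches the diff; the default and maturity level are assumptions, not copied from the repository):

```go
// Sketch only: illustrates the feature-gate pattern the diff relies on.
package features

import (
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/component-base/featuregate"
)

const (
	// LocalQueueMetrics gates the per-LocalQueue Prometheus metrics.
	LocalQueueMetrics featuregate.Feature = "LocalQueueMetrics"
)

var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
	LocalQueueMetrics: {Default: false, PreRelease: featuregate.Alpha}, // assumed default/maturity
}

func init() {
	utilruntime.Must(utilfeature.DefaultMutableFeatureGate.Add(defaultFeatureGates))
}

// Enabled reports whether the given feature gate is turned on.
func Enabled(f featuregate.Feature) bool {
	return utilfeature.DefaultFeatureGate.Enabled(f)
}
```

With a gate like this in place, the behavior is toggled at deploy time (for example via a --feature-gates=LocalQueueMetrics=true flag on the controller manager, if Kueue exposes one) rather than through the Metrics section of the Configuration object.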
7 changes: 1 addition & 6 deletions cmd/kueue/main.go
@@ -131,7 +131,7 @@ func main() {

metrics.Register()

if cfg.Metrics.EnableLocalQueueMetrics {
if features.Enabled(features.LocalQueueMetrics) {
metrics.RegisterLQMetrics()
}

@@ -170,10 +170,6 @@ func main() {
cacheOptions = append(cacheOptions, cache.WithResourceTransformations(cfg.Resources.Transformations))
queueOptions = append(queueOptions, queue.WithResourceTransformations(cfg.Resources.Transformations))
}
if cfg.Metrics.EnableLocalQueueMetrics {
cacheOptions = append(cacheOptions, cache.WithLocalQueueMetrics(true))
queueOptions = append(queueOptions, queue.WithLocalQueueMetrics(true))
}
if cfg.FairSharing != nil {
cacheOptions = append(cacheOptions, cache.WithFairSharing(cfg.FairSharing.Enable))
}
@@ -356,7 +352,6 @@ func setupScheduler(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager
mgr.GetEventRecorderFor(constants.AdmissionName),
scheduler.WithPodsReadyRequeuingTimestamp(podsReadyRequeuingTimestamp(cfg)),
scheduler.WithFairSharing(cfg.FairSharing),
scheduler.WithLocalQueueMetrics(cfg.Metrics.EnableLocalQueueMetrics),
)
if err := mgr.Add(sched); err != nil {
setupLog.Error(err, "Unable to add scheduler to manager")
35 changes: 13 additions & 22 deletions pkg/cache/cache.go
@@ -60,7 +60,6 @@ type options struct {
workloadInfoOptions []workload.InfoOption
podsReadyTracking bool
fairSharingEnabled bool
localQueueMetrics bool
}

// Option configures the reconciler.
@@ -94,12 +93,6 @@ func WithFairSharing(enabled bool) Option {
}
}

func WithLocalQueueMetrics(enabled bool) Option {
return func(o *options) {
o.localQueueMetrics = enabled
}
}

var defaultOptions = options{}

// Cache keeps track of the Workloads that got admitted through ClusterQueues.
@@ -114,7 +107,6 @@ type Cache struct {
admissionChecks map[string]AdmissionCheck
workloadInfoOptions []workload.InfoOption
fairSharingEnabled bool
localQueueMetrics bool

hm hierarchy.Manager[*clusterQueue, *cohort]

@@ -134,7 +126,6 @@ func New(client client.Client, opts ...Option) *Cache {
podsReadyTracking: options.podsReadyTracking,
workloadInfoOptions: options.workloadInfoOptions,
fairSharingEnabled: options.fairSharingEnabled,
localQueueMetrics: options.localQueueMetrics,
hm: hierarchy.NewManager[*clusterQueue, *cohort](newCohort),
tasCache: NewTASCache(client),
}
@@ -156,7 +147,7 @@ func (c *Cache) newClusterQueue(cq *kueue.ClusterQueue) (*clusterQueue, error) {
}
c.hm.AddClusterQueue(cqImpl)
c.hm.UpdateClusterQueueEdge(cq.Name, cq.Spec.Cohort)
if err := cqImpl.updateClusterQueue(c.hm.CycleChecker, cq, c.resourceFlavors, c.admissionChecks, nil, c.localQueueMetrics); err != nil {
if err := cqImpl.updateClusterQueue(c.hm.CycleChecker, cq, c.resourceFlavors, c.admissionChecks, nil); err != nil {
return nil, err
}

@@ -228,8 +219,8 @@ func (c *Cache) updateClusterQueues() sets.Set[string] {
// We call update on all ClusterQueues irrespective of which CQ actually use this flavor
// because it is not expensive to do so, and is not worth tracking which ClusterQueues use
// which flavors.
cq.UpdateWithFlavors(c.resourceFlavors, c.localQueueMetrics)
cq.updateWithAdmissionChecks(c.admissionChecks, c.localQueueMetrics)
cq.UpdateWithFlavors(c.resourceFlavors)
cq.updateWithAdmissionChecks(c.admissionChecks)
curStatus := cq.Status
if prevStatus == pending && curStatus == active {
cqs.Insert(cq.Name)
@@ -421,7 +412,7 @@ func (c *Cache) UpdateClusterQueue(cq *kueue.ClusterQueue) error {
}
oldParent := cqImpl.Parent()
c.hm.UpdateClusterQueueEdge(cq.Name, cq.Spec.Cohort)
if err := cqImpl.updateClusterQueue(c.hm.CycleChecker, cq, c.resourceFlavors, c.admissionChecks, oldParent, c.localQueueMetrics); err != nil {
if err := cqImpl.updateClusterQueue(c.hm.CycleChecker, cq, c.resourceFlavors, c.admissionChecks, oldParent); err != nil {
return err
}
for _, qImpl := range cqImpl.localQueues {
@@ -440,7 +431,7 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) {
if !ok {
return
}
if c.localQueueMetrics {
if features.Enabled(features.LocalQueueMetrics) {
for _, q := range c.hm.ClusterQueues[cq.Name].localQueues {
metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(q.key))
}
@@ -528,13 +519,13 @@ func (c *Cache) addOrUpdateWorkload(w *kueue.Workload) bool {
c.cleanupAssumedState(w)

if _, exist := clusterQueue.Workloads[workload.Key(w)]; exist {
clusterQueue.deleteWorkload(w, c.localQueueMetrics)
clusterQueue.deleteWorkload(w)
}

if c.podsReadyTracking {
c.podsReadyCond.Broadcast()
}
return clusterQueue.addWorkload(w, c.localQueueMetrics) == nil
return clusterQueue.addWorkload(w) == nil
}

func (c *Cache) UpdateWorkload(oldWl, newWl *kueue.Workload) error {
@@ -545,7 +536,7 @@ func (c *Cache) UpdateWorkload(oldWl, newWl *kueue.Workload) error {
if !ok {
return errors.New("old ClusterQueue doesn't exist")
}
cq.deleteWorkload(oldWl, c.localQueueMetrics)
cq.deleteWorkload(oldWl)
}
c.cleanupAssumedState(oldWl)

@@ -559,7 +550,7 @@ func (c *Cache) UpdateWorkload(oldWl, newWl *kueue.Workload) error {
if c.podsReadyTracking {
c.podsReadyCond.Broadcast()
}
return cq.addWorkload(newWl, c.localQueueMetrics)
return cq.addWorkload(newWl)
}

func (c *Cache) DeleteWorkload(w *kueue.Workload) error {
@@ -573,7 +564,7 @@ func (c *Cache) DeleteWorkload(w *kueue.Workload) error {

c.cleanupAssumedState(w)

cq.deleteWorkload(w, c.localQueueMetrics)
cq.deleteWorkload(w)
if c.podsReadyTracking {
c.podsReadyCond.Broadcast()
}
@@ -615,7 +606,7 @@ func (c *Cache) AssumeWorkload(w *kueue.Workload) error {
return ErrCqNotFound
}

if err := cq.addWorkload(w, c.localQueueMetrics); err != nil {
if err := cq.addWorkload(w); err != nil {
return err
}
c.assumedWorkloads[k] = string(w.Status.Admission.ClusterQueue)
@@ -639,7 +630,7 @@ func (c *Cache) ForgetWorkload(w *kueue.Workload) error {
if !ok {
return ErrCqNotFound
}
cq.deleteWorkload(w, c.localQueueMetrics)
cq.deleteWorkload(w)
if c.podsReadyTracking {
c.podsReadyCond.Broadcast()
}
@@ -805,7 +796,7 @@ func (c *Cache) cleanupAssumedState(w *kueue.Workload) {
// one, then we should also clean up the assumed one.
if workload.HasQuotaReservation(w) && assumedCQName != string(w.Status.Admission.ClusterQueue) {
if assumedCQ, exist := c.hm.ClusterQueues[assumedCQName]; exist {
assumedCQ.deleteWorkload(w, c.localQueueMetrics)
assumedCQ.deleteWorkload(w)
}
}
delete(c.assumedWorkloads, k)
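Note how the cache now consults the gate directly with features.Enabled(features.LocalQueueMetrics) at each call site, instead of threading a localQueueMetrics boolean through the options struct and every method signature. The metrics calls first convert the cache's LocalQueue key into a reference; a hedged sketch of what a helper like metrics.LQRefFromLocalQueueKey could look like, assuming the key format is "namespace/name" (the struct and field names are illustrative, not taken from pkg/metrics):

```go
package metrics

import "strings"

// LocalQueueReference identifies a LocalQueue for metric labels.
// Sketch only: the real type in pkg/metrics may differ.
type LocalQueueReference struct {
	Namespace string
	Name      string
}

// LQRefFromLocalQueueKey splits a "namespace/name" cache key into a reference.
func LQRefFromLocalQueueKey(key string) LocalQueueReference {
	parts := strings.SplitN(key, "/", 2)
	return LocalQueueReference{Namespace: parts[0], Name: parts[1]}
}
```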
41 changes: 22 additions & 19 deletions pkg/cache/clusterqueue.go
@@ -145,7 +145,7 @@ var defaultPreemption = kueue.ClusterQueuePreemption{

var defaultFlavorFungibility = kueue.FlavorFungibility{WhenCanBorrow: kueue.Borrow, WhenCanPreempt: kueue.TryNextFlavor}

func (c *clusterQueue) updateClusterQueue(cycleChecker hierarchy.CycleChecker, in *kueue.ClusterQueue, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, admissionChecks map[string]AdmissionCheck, oldParent *cohort, lqMetrics bool) error {
func (c *clusterQueue) updateClusterQueue(cycleChecker hierarchy.CycleChecker, in *kueue.ClusterQueue, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, admissionChecks map[string]AdmissionCheck, oldParent *cohort) error {
if c.updateQuotasAndResourceGroups(in.Spec.ResourceGroups) || oldParent != c.Parent() {
if oldParent != nil && oldParent != c.Parent() {
// ignore error when old Cohort has cycle.
@@ -179,8 +179,8 @@ func (c *clusterQueue) updateClusterQueue(cycleChecker hierarchy.CycleChecker, i
c.Preemption = defaultPreemption
}

c.UpdateWithFlavors(resourceFlavors, lqMetrics)
c.updateWithAdmissionChecks(admissionChecks, lqMetrics)
c.UpdateWithFlavors(resourceFlavors)
c.updateWithAdmissionChecks(admissionChecks)

if in.Spec.FlavorFungibility != nil {
c.FlavorFungibility = *in.Spec.FlavorFungibility
@@ -230,7 +230,7 @@ func (c *clusterQueue) updateQuotasAndResourceGroups(in []kueue.ResourceGroup) b
!equality.Semantic.DeepEqual(oldQuotas, c.resourceNode.Quotas)
}

func (c *clusterQueue) updateQueueStatus(lqMetrics bool) {
func (c *clusterQueue) updateQueueStatus() {
status := active
if c.isStopped ||
len(c.missingFlavors) > 0 ||
@@ -250,7 +250,7 @@ func (c *clusterQueue) updateQueueStatus(lqMetrics bool) {
if status != c.Status {
c.Status = status
metrics.ReportClusterQueueStatus(c.Name, c.Status)
if lqMetrics {
if features.Enabled(features.LocalQueueMetrics) {
for _, lq := range c.localQueues {
metrics.ReportLocalQueueStatus(metrics.LQRefFromLocalQueueKey(lq.key), c.Status)
}
@@ -345,9 +345,9 @@ func (c *clusterQueue) isTASViolated() bool {

// UpdateWithFlavors updates a ClusterQueue based on the passed ResourceFlavors set.
// Exported only for testing.
func (c *clusterQueue) UpdateWithFlavors(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, lqMetrics bool) {
func (c *clusterQueue) UpdateWithFlavors(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) {
c.updateLabelKeys(flavors)
c.updateQueueStatus(lqMetrics)
c.updateQueueStatus()
}

func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) {
@@ -380,7 +380,7 @@ func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference
}

// updateWithAdmissionChecks updates a ClusterQueue based on the passed AdmissionChecks set.
func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionCheck, lqMetrics bool) {
func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionCheck) {
checksPerController := make(map[string][]string, len(c.AdmissionChecks))
singleInstanceControllers := sets.New[string]()
multiKueueAdmissionChecks := sets.New[string]()
@@ -476,32 +476,32 @@ func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionChec
}

if update {
c.updateQueueStatus(lqMetrics)
c.updateQueueStatus()
}
}

func (c *clusterQueue) addWorkload(w *kueue.Workload, lqMetrics bool) error {
func (c *clusterQueue) addWorkload(w *kueue.Workload) error {
k := workload.Key(w)
if _, exist := c.Workloads[k]; exist {
return errors.New("workload already exists in ClusterQueue")
}
wi := workload.NewInfo(w, c.workloadInfoOptions...)
c.Workloads[k] = wi
c.updateWorkloadUsage(wi, 1, lqMetrics)
c.updateWorkloadUsage(wi, 1)
if c.podsReadyTracking && !apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadPodsReady) {
c.WorkloadsNotReady.Insert(k)
}
c.reportActiveWorkloads()
return nil
}

func (c *clusterQueue) deleteWorkload(w *kueue.Workload, lqMetrics bool) {
func (c *clusterQueue) deleteWorkload(w *kueue.Workload) {
k := workload.Key(w)
wi, exist := c.Workloads[k]
if !exist {
return
}
c.updateWorkloadUsage(wi, -1, lqMetrics)
c.updateWorkloadUsage(wi, -1)
if c.podsReadyTracking && !apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadPodsReady) {
c.WorkloadsNotReady.Delete(k)
}
@@ -519,15 +519,14 @@ func (c *clusterQueue) reportActiveWorkloads() {
}

func (q *queue) reportActiveWorkloads() {
// KTODO: report local queue metrics
qKeySlice := strings.Split(q.key, "/")
metrics.LocalQueueAdmittedActiveWorkloads.WithLabelValues(qKeySlice[1], qKeySlice[0]).Set(float64(q.admittedWorkloads))
metrics.LocalQueueReservingActiveWorkloads.WithLabelValues(qKeySlice[1], qKeySlice[0]).Set(float64(q.reservingWorkloads))
}

// updateWorkloadUsage updates the usage of the ClusterQueue for the workload
// and the number of admitted workloads for local queues.
func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64, lqMetrics bool) {
func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
admitted := workload.IsAdmitted(wi.Obj)
frUsage := wi.FlavorResourceUsage()
for fr, q := range frUsage {
@@ -562,7 +561,7 @@ func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64, lqMetrics
updateFlavorUsage(frUsage, lq.admittedUsage, m)
lq.admittedWorkloads += int(m)
}
if lqMetrics {
if features.Enabled(features.LocalQueueMetrics) {
lq.reportActiveWorkloads()
}
}
@@ -609,14 +608,18 @@ func (c *clusterQueue) addLocalQueue(q *kueue.LocalQueue) error {
}
}
c.localQueues[qKey] = qImpl
qImpl.reportActiveWorkloads()
metrics.ReportLocalQueueStatus(metrics.LQRefFromLocalQueueKey(qKey), c.Status)
if features.Enabled(features.LocalQueueMetrics) {
qImpl.reportActiveWorkloads()
metrics.ReportLocalQueueStatus(metrics.LQRefFromLocalQueueKey(qKey), c.Status)
}
return nil
}

func (c *clusterQueue) deleteLocalQueue(q *kueue.LocalQueue) {
qKey := queueKey(q)
metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(qKey))
if features.Enabled(features.LocalQueueMetrics) {
metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(qKey))
}
delete(c.localQueues, qKey)
}

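The per-LocalQueue gauges used above (LocalQueueAdmittedActiveWorkloads, LocalQueueReservingActiveWorkloads) are labelled by name and namespace, which is why reportActiveWorkloads splits the "namespace/name" key before calling WithLabelValues. A rough sketch of how such a gauge vector and the gated registration seen in main.go could be defined (metric and subsystem names are assumptions, not copied from pkg/metrics):

```go
package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
)

var (
	// LocalQueueAdmittedActiveWorkloads tracks admitted, still-active Workloads per LocalQueue.
	LocalQueueAdmittedActiveWorkloads = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "kueue",
		Name:      "local_queue_admitted_active_workloads", // assumed metric name
		Help:      "Number of admitted Workloads that are still active, per LocalQueue.",
	}, []string{"name", "namespace"})
)

// RegisterLQMetrics registers the LocalQueue metrics; main.go calls it only
// when the LocalQueueMetrics feature gate is enabled.
func RegisterLQMetrics() {
	crmetrics.Registry.MustRegister(LocalQueueAdmittedActiveWorkloads)
}
```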
4 changes: 2 additions & 2 deletions pkg/cache/clusterqueue_test.go
@@ -85,7 +85,7 @@ func TestClusterQueueUpdateWithFlavors(t *testing.T) {
}

cq.Status = tc.curStatus
cq.UpdateWithFlavors(tc.flavors, false)
cq.UpdateWithFlavors(tc.flavors)

if cq.Status != tc.wantStatus {
t.Fatalf("got different status, want: %v, got: %v", tc.wantStatus, cq.Status)
@@ -911,7 +911,7 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
cq.flavorIndependentAdmissionCheckAppliedPerFlavor = []string{"not-on-flavor"}
}
}
cq.updateWithAdmissionChecks(tc.admissionChecks, false)
cq.updateWithAdmissionChecks(tc.admissionChecks)

if cq.Status != tc.wantStatus {
t.Errorf("got different status, want: %v, got: %v", tc.wantStatus, cq.Status)
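Since UpdateWithFlavors and updateWithAdmissionChecks no longer take a metrics flag, tests that want the LocalQueue-metrics path would enable the gate for the test's duration instead. A sketch using the upstream feature-gate test helper (whether Kueue wraps this in a helper of its own is an assumption):

```go
package cache

import (
	"testing"

	utilfeature "k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"

	"sigs.k8s.io/kueue/pkg/features"
)

func TestClusterQueueUpdateReportsLocalQueueMetrics(t *testing.T) {
	// Flip the gate for this test only; recent versions of the helper
	// register automatic cleanup that restores the previous value.
	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LocalQueueMetrics, true)

	// ... build a clusterQueue, call cq.UpdateWithFlavors(flavors), and assert
	// on the reported per-LocalQueue metrics ...
}
```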
4 changes: 1 addition & 3 deletions pkg/controller/core/core.go
@@ -42,7 +42,7 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
if err := acRec.SetupWithManager(mgr, cfg); err != nil {
return "AdmissionCheck", err
}
qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc, LqControllerWithLocalQueueMetricsEnabled(cfg.Metrics.EnableLocalQueueMetrics))
qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc)
if err := qRec.SetupWithManager(mgr, cfg); err != nil {
return "LocalQueue", err
}
@@ -58,7 +58,6 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
cc,
WithQueueVisibilityUpdateInterval(queueVisibilityUpdateInterval(cfg)),
WithQueueVisibilityClusterQueuesMaxCount(queueVisibilityClusterQueuesMaxCount(cfg)),
WithReportResourceMetrics(cfg.Metrics.EnableClusterQueueResources),
WithFairSharing(fairSharingEnabled),
WithWatchers(rfRec, acRec),
)
@@ -80,7 +79,6 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
mgr.GetEventRecorderFor(constants.WorkloadControllerName),
WithWorkloadUpdateWatchers(qRec, cqRec),
WithWaitForPodsReady(waitForPodsReady(cfg.WaitForPodsReady)),
WlControllerWithLocalQueueMetricsEnabled(cfg.Metrics.EnableLocalQueueMetrics),
).SetupWithManager(mgr, cfg); err != nil {
return "Workload", err
}
(The remaining 8 changed files are not rendered in this view.)
