diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go
index d0c627846f..d65d119ecc 100644
--- a/cmd/kueue/main.go
+++ b/cmd/kueue/main.go
@@ -131,7 +131,7 @@ func main() {
 	metrics.Register()
-	if cfg.Metrics.EnableLocalQueueMetrics {
+	if features.Enabled(features.LocalQueueMetrics) {
 		metrics.RegisterLQMetrics()
 	}
@@ -170,10 +170,6 @@ func main() {
 		cacheOptions = append(cacheOptions, cache.WithResourceTransformations(cfg.Resources.Transformations))
 		queueOptions = append(queueOptions, queue.WithResourceTransformations(cfg.Resources.Transformations))
 	}
-	if cfg.Metrics.EnableLocalQueueMetrics {
-		cacheOptions = append(cacheOptions, cache.WithLocalQueueMetrics(true))
-		queueOptions = append(queueOptions, queue.WithLocalQueueMetrics(true))
-	}
 	if cfg.FairSharing != nil {
 		cacheOptions = append(cacheOptions, cache.WithFairSharing(cfg.FairSharing.Enable))
 	}
@@ -356,7 +352,6 @@ func setupScheduler(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager
 		mgr.GetEventRecorderFor(constants.AdmissionName),
 		scheduler.WithPodsReadyRequeuingTimestamp(podsReadyRequeuingTimestamp(cfg)),
 		scheduler.WithFairSharing(cfg.FairSharing),
-		scheduler.WithLocalQueueMetrics(cfg.Metrics.EnableLocalQueueMetrics),
 	)
 	if err := mgr.Add(sched); err != nil {
 		setupLog.Error(err, "Unable to add scheduler to manager")
diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go
index a26d29b871..3fb03445dd 100644
--- a/pkg/cache/cache.go
+++ b/pkg/cache/cache.go
@@ -60,7 +60,6 @@ type options struct {
 	workloadInfoOptions []workload.InfoOption
 	podsReadyTracking   bool
 	fairSharingEnabled  bool
-	localQueueMetrics   bool
 }
 
 // Option configures the reconciler.
@@ -94,12 +93,6 @@ func WithFairSharing(enabled bool) Option {
 	}
 }
 
-func WithLocalQueueMetrics(enabled bool) Option {
-	return func(o *options) {
-		o.localQueueMetrics = enabled
-	}
-}
-
 var defaultOptions = options{}
 
 // Cache keeps track of the Workloads that got admitted through ClusterQueues.
@@ -114,7 +107,6 @@ type Cache struct {
 	admissionChecks     map[string]AdmissionCheck
 	workloadInfoOptions []workload.InfoOption
 	fairSharingEnabled  bool
-	localQueueMetrics   bool
 
 	hm hierarchy.Manager[*clusterQueue, *cohort]
 
@@ -134,7 +126,6 @@ func New(client client.Client, opts ...Option) *Cache {
 		podsReadyTracking:   options.podsReadyTracking,
 		workloadInfoOptions: options.workloadInfoOptions,
 		fairSharingEnabled:  options.fairSharingEnabled,
-		localQueueMetrics:   options.localQueueMetrics,
 		hm:                  hierarchy.NewManager[*clusterQueue, *cohort](newCohort),
 		tasCache:            NewTASCache(client),
 	}
@@ -156,7 +147,7 @@ func (c *Cache) newClusterQueue(cq *kueue.ClusterQueue) (*clusterQueue, error) {
 	}
 	c.hm.AddClusterQueue(cqImpl)
 	c.hm.UpdateClusterQueueEdge(cq.Name, cq.Spec.Cohort)
-	if err := cqImpl.updateClusterQueue(c.hm.CycleChecker, cq, c.resourceFlavors, c.admissionChecks, nil, c.localQueueMetrics); err != nil {
+	if err := cqImpl.updateClusterQueue(c.hm.CycleChecker, cq, c.resourceFlavors, c.admissionChecks, nil); err != nil {
 		return nil, err
 	}
 
@@ -228,8 +219,8 @@ func (c *Cache) updateClusterQueues() sets.Set[string] {
 		// We call update on all ClusterQueues irrespective of which CQ actually use this flavor
 		// because it is not expensive to do so, and is not worth tracking which ClusterQueues use
 		// which flavors.
-		cq.UpdateWithFlavors(c.resourceFlavors, c.localQueueMetrics)
-		cq.updateWithAdmissionChecks(c.admissionChecks, c.localQueueMetrics)
+		cq.UpdateWithFlavors(c.resourceFlavors)
+		cq.updateWithAdmissionChecks(c.admissionChecks)
 		curStatus := cq.Status
 		if prevStatus == pending && curStatus == active {
 			cqs.Insert(cq.Name)
@@ -421,7 +412,7 @@ func (c *Cache) UpdateClusterQueue(cq *kueue.ClusterQueue) error {
 	}
 	oldParent := cqImpl.Parent()
 	c.hm.UpdateClusterQueueEdge(cq.Name, cq.Spec.Cohort)
-	if err := cqImpl.updateClusterQueue(c.hm.CycleChecker, cq, c.resourceFlavors, c.admissionChecks, oldParent, c.localQueueMetrics); err != nil {
+	if err := cqImpl.updateClusterQueue(c.hm.CycleChecker, cq, c.resourceFlavors, c.admissionChecks, oldParent); err != nil {
 		return err
 	}
 	for _, qImpl := range cqImpl.localQueues {
@@ -440,7 +431,7 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) {
 	if !ok {
 		return
 	}
-	if c.localQueueMetrics {
+	if features.Enabled(features.LocalQueueMetrics) {
 		for _, q := range c.hm.ClusterQueues[cq.Name].localQueues {
 			metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(q.key))
 		}
@@ -528,13 +519,13 @@ func (c *Cache) addOrUpdateWorkload(w *kueue.Workload) bool {
 	c.cleanupAssumedState(w)
 
 	if _, exist := clusterQueue.Workloads[workload.Key(w)]; exist {
-		clusterQueue.deleteWorkload(w, c.localQueueMetrics)
+		clusterQueue.deleteWorkload(w)
 	}
 
 	if c.podsReadyTracking {
 		c.podsReadyCond.Broadcast()
 	}
 
-	return clusterQueue.addWorkload(w, c.localQueueMetrics) == nil
+	return clusterQueue.addWorkload(w) == nil
 }
 
 func (c *Cache) UpdateWorkload(oldWl, newWl *kueue.Workload) error {
@@ -545,7 +536,7 @@ func (c *Cache) UpdateWorkload(oldWl, newWl *kueue.Workload) error {
 		if !ok {
 			return errors.New("old ClusterQueue doesn't exist")
 		}
-		cq.deleteWorkload(oldWl, c.localQueueMetrics)
+		cq.deleteWorkload(oldWl)
 	}
 	c.cleanupAssumedState(oldWl)
 
@@ -559,7 +550,7 @@ func (c *Cache) UpdateWorkload(oldWl, newWl *kueue.Workload) error {
 	if c.podsReadyTracking {
 		c.podsReadyCond.Broadcast()
 	}
-	return cq.addWorkload(newWl, c.localQueueMetrics)
+	return cq.addWorkload(newWl)
 }
 
 func (c *Cache) DeleteWorkload(w *kueue.Workload) error {
@@ -573,7 +564,7 @@ func (c *Cache) DeleteWorkload(w *kueue.Workload) error {
 
 	c.cleanupAssumedState(w)
 
-	cq.deleteWorkload(w, c.localQueueMetrics)
+	cq.deleteWorkload(w)
 	if c.podsReadyTracking {
 		c.podsReadyCond.Broadcast()
 	}
@@ -615,7 +606,7 @@ func (c *Cache) AssumeWorkload(w *kueue.Workload) error {
 		return ErrCqNotFound
 	}
 
-	if err := cq.addWorkload(w, c.localQueueMetrics); err != nil {
+	if err := cq.addWorkload(w); err != nil {
 		return err
 	}
 	c.assumedWorkloads[k] = string(w.Status.Admission.ClusterQueue)
@@ -639,7 +630,7 @@ func (c *Cache) ForgetWorkload(w *kueue.Workload) error {
 	if !ok {
 		return ErrCqNotFound
 	}
-	cq.deleteWorkload(w, c.localQueueMetrics)
+	cq.deleteWorkload(w)
 	if c.podsReadyTracking {
 		c.podsReadyCond.Broadcast()
 	}
@@ -805,7 +796,7 @@ func (c *Cache) cleanupAssumedState(w *kueue.Workload) {
 		// one, then we should also clean up the assumed one.
 		if workload.HasQuotaReservation(w) && assumedCQName != string(w.Status.Admission.ClusterQueue) {
 			if assumedCQ, exist := c.hm.ClusterQueues[assumedCQName]; exist {
-				assumedCQ.deleteWorkload(w, c.localQueueMetrics)
+				assumedCQ.deleteWorkload(w)
 			}
 		}
 		delete(c.assumedWorkloads, k)
diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go
index a6ef1d8e6a..b4d4310da4 100644
--- a/pkg/cache/clusterqueue.go
+++ b/pkg/cache/clusterqueue.go
@@ -145,7 +145,7 @@ var defaultPreemption = kueue.ClusterQueuePreemption{
 
 var defaultFlavorFungibility = kueue.FlavorFungibility{WhenCanBorrow: kueue.Borrow, WhenCanPreempt: kueue.TryNextFlavor}
 
-func (c *clusterQueue) updateClusterQueue(cycleChecker hierarchy.CycleChecker, in *kueue.ClusterQueue, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, admissionChecks map[string]AdmissionCheck, oldParent *cohort, lqMetrics bool) error {
+func (c *clusterQueue) updateClusterQueue(cycleChecker hierarchy.CycleChecker, in *kueue.ClusterQueue, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, admissionChecks map[string]AdmissionCheck, oldParent *cohort) error {
 	if c.updateQuotasAndResourceGroups(in.Spec.ResourceGroups) || oldParent != c.Parent() {
 		if oldParent != nil && oldParent != c.Parent() {
 			// ignore error when old Cohort has cycle.
@@ -179,8 +179,8 @@ func (c *clusterQueue) updateClusterQueue(cycleChecker hierarchy.CycleChecker, i
 		c.Preemption = defaultPreemption
 	}
 
-	c.UpdateWithFlavors(resourceFlavors, lqMetrics)
-	c.updateWithAdmissionChecks(admissionChecks, lqMetrics)
+	c.UpdateWithFlavors(resourceFlavors)
+	c.updateWithAdmissionChecks(admissionChecks)
 
 	if in.Spec.FlavorFungibility != nil {
 		c.FlavorFungibility = *in.Spec.FlavorFungibility
@@ -230,7 +230,7 @@ func (c *clusterQueue) updateQuotasAndResourceGroups(in []kueue.ResourceGroup) b
 		!equality.Semantic.DeepEqual(oldQuotas, c.resourceNode.Quotas)
 }
 
-func (c *clusterQueue) updateQueueStatus(lqMetrics bool) {
+func (c *clusterQueue) updateQueueStatus() {
 	status := active
 	if c.isStopped ||
 		len(c.missingFlavors) > 0 ||
@@ -250,7 +250,7 @@ func (c *clusterQueue) updateQueueStatus(lqMetrics bool) {
 	if status != c.Status {
 		c.Status = status
 		metrics.ReportClusterQueueStatus(c.Name, c.Status)
-		if lqMetrics {
+		if features.Enabled(features.LocalQueueMetrics) {
 			for _, lq := range c.localQueues {
 				metrics.ReportLocalQueueStatus(metrics.LQRefFromLocalQueueKey(lq.key), c.Status)
 			}
@@ -345,9 +345,9 @@ func (c *clusterQueue) isTASViolated() bool {
 
 // UpdateWithFlavors updates a ClusterQueue based on the passed ResourceFlavors set.
 // Exported only for testing.
-func (c *clusterQueue) UpdateWithFlavors(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, lqMetrics bool) {
+func (c *clusterQueue) UpdateWithFlavors(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) {
 	c.updateLabelKeys(flavors)
-	c.updateQueueStatus(lqMetrics)
+	c.updateQueueStatus()
 }
 
 func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) {
@@ -380,7 +380,7 @@ func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference
 }
 
 // updateWithAdmissionChecks updates a ClusterQueue based on the passed AdmissionChecks set.
-func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionCheck, lqMetrics bool) {
+func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionCheck) {
 	checksPerController := make(map[string][]string, len(c.AdmissionChecks))
 	singleInstanceControllers := sets.New[string]()
 	multiKueueAdmissionChecks := sets.New[string]()
@@ -476,18 +476,18 @@ func (c *clusterQueue) updateWithAdmissionChec
 	}
 
 	if update {
-		c.updateQueueStatus(lqMetrics)
+		c.updateQueueStatus()
 	}
 }
 
-func (c *clusterQueue) addWorkload(w *kueue.Workload, lqMetrics bool) error {
+func (c *clusterQueue) addWorkload(w *kueue.Workload) error {
 	k := workload.Key(w)
 	if _, exist := c.Workloads[k]; exist {
 		return errors.New("workload already exists in ClusterQueue")
 	}
 	wi := workload.NewInfo(w, c.workloadInfoOptions...)
 	c.Workloads[k] = wi
-	c.updateWorkloadUsage(wi, 1, lqMetrics)
+	c.updateWorkloadUsage(wi, 1)
 	if c.podsReadyTracking && !apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadPodsReady) {
 		c.WorkloadsNotReady.Insert(k)
 	}
@@ -495,13 +495,13 @@ func (c *clusterQueue) addWorkload(w *kueue.Workload, lqMetrics bool) error {
 	return nil
 }
 
-func (c *clusterQueue) deleteWorkload(w *kueue.Workload, lqMetrics bool) {
+func (c *clusterQueue) deleteWorkload(w *kueue.Workload) {
 	k := workload.Key(w)
 	wi, exist := c.Workloads[k]
 	if !exist {
 		return
 	}
-	c.updateWorkloadUsage(wi, -1, lqMetrics)
+	c.updateWorkloadUsage(wi, -1)
 	if c.podsReadyTracking && !apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadPodsReady) {
 		c.WorkloadsNotReady.Delete(k)
 	}
@@ -519,7 +519,6 @@ func (c *clusterQueue) reportActiveWorkloads() {
 }
 
 func (q *queue) reportActiveWorkloads() {
-	// KTODO: report local queue metrics
 	qKeySlice := strings.Split(q.key, "/")
 	metrics.LocalQueueAdmittedActiveWorkloads.WithLabelValues(qKeySlice[1], qKeySlice[0]).Set(float64(q.admittedWorkloads))
 	metrics.LocalQueueReservingActiveWorkloads.WithLabelValues(qKeySlice[1], qKeySlice[0]).Set(float64(q.reservingWorkloads))
@@ -527,7 +526,7 @@
 
 // updateWorkloadUsage updates the usage of the ClusterQueue for the workload
 // and the number of admitted workloads for local queues.
-func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64, lqMetrics bool) {
+func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
 	admitted := workload.IsAdmitted(wi.Obj)
 	frUsage := wi.FlavorResourceUsage()
 	for fr, q := range frUsage {
@@ -562,7 +561,7 @@ func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64, lqMetrics
 			updateFlavorUsage(frUsage, lq.admittedUsage, m)
 			lq.admittedWorkloads += int(m)
 		}
-		if lqMetrics {
+		if features.Enabled(features.LocalQueueMetrics) {
 			lq.reportActiveWorkloads()
 		}
 	}
@@ -609,14 +608,18 @@ func (c *clusterQueue) addLocalQueue(q *kueue.LocalQueue) error {
 		}
 	}
 	c.localQueues[qKey] = qImpl
-	qImpl.reportActiveWorkloads()
-	metrics.ReportLocalQueueStatus(metrics.LQRefFromLocalQueueKey(qKey), c.Status)
+	if features.Enabled(features.LocalQueueMetrics) {
+		qImpl.reportActiveWorkloads()
+		metrics.ReportLocalQueueStatus(metrics.LQRefFromLocalQueueKey(qKey), c.Status)
+	}
 	return nil
 }
 
 func (c *clusterQueue) deleteLocalQueue(q *kueue.LocalQueue) {
 	qKey := queueKey(q)
-	metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(qKey))
+	if features.Enabled(features.LocalQueueMetrics) {
+		metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(qKey))
+	}
 	delete(c.localQueues, qKey)
 }
diff --git a/pkg/cache/clusterqueue_test.go b/pkg/cache/clusterqueue_test.go
index 76495aa3c6..f0a4483209 100644
--- a/pkg/cache/clusterqueue_test.go
+++ b/pkg/cache/clusterqueue_test.go
@@ -85,7 +85,7 @@ func TestClusterQueueUpdateWithFlavors(t *testing.T) {
 			}
 			cq.Status = tc.curStatus
-			cq.UpdateWithFlavors(tc.flavors, false)
+			cq.UpdateWithFlavors(tc.flavors)
 
 			if cq.Status != tc.wantStatus {
 				t.Fatalf("got different status, want: %v, got: %v", tc.wantStatus, cq.Status)
@@ -911,7 +911,7 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
 					cq.flavorIndependentAdmissionCheckAppliedPerFlavor = []string{"not-on-flavor"}
 				}
 			}
-			cq.updateWithAdmissionChecks(tc.admissionChecks, false)
+			cq.updateWithAdmissionChecks(tc.admissionChecks)
 			if cq.Status != tc.wantStatus {
 				t.Errorf("got different status, want: %v, got: %v", tc.wantStatus, cq.Status)
diff --git a/pkg/controller/core/core.go b/pkg/controller/core/core.go
index 91aa802286..b91faf6000 100644
--- a/pkg/controller/core/core.go
+++ b/pkg/controller/core/core.go
@@ -42,7 +42,7 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
 	if err := acRec.SetupWithManager(mgr, cfg); err != nil {
 		return "AdmissionCheck", err
 	}
-	qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc, LqControllerWithLocalQueueMetricsEnabled(cfg.Metrics.EnableLocalQueueMetrics))
+	qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc)
 	if err := qRec.SetupWithManager(mgr, cfg); err != nil {
 		return "LocalQueue", err
 	}
@@ -58,7 +58,6 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
 		cc,
 		WithQueueVisibilityUpdateInterval(queueVisibilityUpdateInterval(cfg)),
 		WithQueueVisibilityClusterQueuesMaxCount(queueVisibilityClusterQueuesMaxCount(cfg)),
-		WithReportResourceMetrics(cfg.Metrics.EnableClusterQueueResources),
 		WithFairSharing(fairSharingEnabled),
 		WithWatchers(rfRec, acRec),
 	)
@@ -80,7 +79,6 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
 		mgr.GetEventRecorderFor(constants.WorkloadControllerName),
 		WithWorkloadUpdateWatchers(qRec, cqRec),
 		WithWaitForPodsReady(waitForPodsReady(cfg.WaitForPodsReady)),
-		WlControllerWithLocalQueueMetricsEnabled(cfg.Metrics.EnableLocalQueueMetrics),
 	).SetupWithManager(mgr, cfg); err != nil {
 		return "Workload", err
 	}
diff --git a/pkg/controller/core/localqueue_controller.go b/pkg/controller/core/localqueue_controller.go
index 3092083967..ace9e9a101 100644
--- a/pkg/controller/core/localqueue_controller.go
+++ b/pkg/controller/core/localqueue_controller.go
@@ -40,6 +40,7 @@ import (
 	"sigs.k8s.io/kueue/pkg/cache"
 	"sigs.k8s.io/kueue/pkg/constants"
 	"sigs.k8s.io/kueue/pkg/controller/core/indexer"
+	"sigs.k8s.io/kueue/pkg/features"
 	"sigs.k8s.io/kueue/pkg/metrics"
 	"sigs.k8s.io/kueue/pkg/queue"
 	"sigs.k8s.io/kueue/pkg/util/resource"
@@ -58,12 +59,11 @@ const (
 
 // LocalQueueReconciler reconciles a LocalQueue object
 type LocalQueueReconciler struct {
-	client                   client.Client
-	log                      logr.Logger
-	queues                   *queue.Manager
-	cache                    *cache.Cache
-	wlUpdateCh               chan event.GenericEvent
-	localQueueMetricsEnabled bool
+	client     client.Client
+	log        logr.Logger
+	queues     *queue.Manager
+	cache      *cache.Cache
+	wlUpdateCh chan event.GenericEvent
 }
 
 type LocalQueueReconcilerOptions struct {
@@ -74,12 +74,6 @@ type LocalQueueReconcilerOption func(*LocalQueueReconcilerOptions)
 
 var defaultLQOptions = LocalQueueReconcilerOptions{}
 
-func LqControllerWithLocalQueueMetricsEnabled(enabled bool) LocalQueueReconcilerOption {
-	return func(o *LocalQueueReconcilerOptions) {
-		o.LocalQueueMetricsEnabled = enabled
-	}
-}
-
 func NewLocalQueueReconciler(
 	client client.Client,
 	queues *queue.Manager,
@@ -91,12 +85,11 @@ func NewLocalQueueReconciler(
 		opt(&options)
 	}
 	return &LocalQueueReconciler{
-		log:                      ctrl.Log.WithName("localqueue-reconciler"),
-		queues:                   queues,
-		cache:                    cache,
-		client:                   client,
-		wlUpdateCh:               make(chan event.GenericEvent, updateChBuffer),
-		localQueueMetricsEnabled: options.LocalQueueMetricsEnabled,
+		log:        ctrl.Log.WithName("localqueue-reconciler"),
+		queues:     queues,
+		cache:      cache,
+		client:     client,
+		wlUpdateCh: make(chan event.GenericEvent, updateChBuffer),
 	}
 }
 
@@ -169,7 +162,7 @@ func (r *LocalQueueReconciler) Create(e event.CreateEvent) bool {
 		log.Error(err, "Failed to add localQueue to the cache")
 	}
 
-	if r.localQueueMetricsEnabled {
+	if features.Enabled(features.LocalQueueMetrics) {
 		recordLocalQueueUsageMetrics(q)
 	}
 
@@ -183,7 +176,7 @@ func (r *LocalQueueReconciler) Delete(e event.DeleteEvent) bool {
 		return true
 	}
 
-	if r.localQueueMetricsEnabled {
+	if features.Enabled(features.LocalQueueMetrics) {
 		metrics.ClearLocalQueueResourceMetrics(localQueueReferenceFromLocalQueue(q))
 	}
 
@@ -227,7 +220,7 @@ func (r *LocalQueueReconciler) Update(e event.UpdateEvent) bool {
 	}
 	r.queues.DeleteLocalQueue(oldLq)
 
-	if r.localQueueMetricsEnabled {
+	if features.Enabled(features.LocalQueueMetrics) {
 		updateLocalQueueResourceMetrics(newLq)
 	}
 
diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go
index 715f03ecbf..79add49116 100644
--- a/pkg/controller/core/workload_controller.go
+++ b/pkg/controller/core/workload_controller.go
@@ -48,6 +48,7 @@ import (
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	"sigs.k8s.io/kueue/pkg/cache"
 	"sigs.k8s.io/kueue/pkg/controller/core/indexer"
+	"sigs.k8s.io/kueue/pkg/features"
 	"sigs.k8s.io/kueue/pkg/metrics"
 	"sigs.k8s.io/kueue/pkg/queue"
 	utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
@@ -70,7 +71,6 @@ type waitForPodsReadyConfig struct {
 type options struct {
 	watchers               []WorkloadUpdateWatcher
 	waitForPodsReadyConfig *waitForPodsReadyConfig
-	localQueueMetrics      bool
 }
 
 // Option configures the reconciler.
@@ -90,12 +90,6 @@ func WithWorkloadUpdateWatchers(value ...WorkloadUpdateWatcher) Option {
 	}
 }
 
-func WlControllerWithLocalQueueMetricsEnabled(enabled bool) Option {
-	return func(o *options) {
-		o.localQueueMetrics = enabled
-	}
-}
-
 var defaultOptions = options{}
 
 type WorkloadUpdateWatcher interface {
@@ -104,15 +98,14 @@ type WorkloadUpdateWatcher interface {
 
 // WorkloadReconciler reconciles a Workload object
 type WorkloadReconciler struct {
-	log               logr.Logger
-	queues            *queue.Manager
-	cache             *cache.Cache
-	client            client.Client
-	watchers          []WorkloadUpdateWatcher
-	waitForPodsReady  *waitForPodsReadyConfig
-	localQueueMetrics bool
-	recorder          record.EventRecorder
-	clock             clock.Clock
+	log              logr.Logger
+	queues           *queue.Manager
+	cache            *cache.Cache
+	client           client.Client
+	watchers         []WorkloadUpdateWatcher
+	waitForPodsReady *waitForPodsReadyConfig
+	recorder         record.EventRecorder
+	clock            clock.Clock
 }
 
 func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache, recorder record.EventRecorder, opts ...Option) *WorkloadReconciler {
@@ -122,15 +115,14 @@ func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *c
 	}
 
 	return &WorkloadReconciler{
-		log:               ctrl.Log.WithName("workload-reconciler"),
-		client:            client,
-		queues:            queues,
-		cache:             cache,
-		watchers:          options.watchers,
-		waitForPodsReady:  options.waitForPodsReadyConfig,
-		localQueueMetrics: options.localQueueMetrics,
-		recorder:          recorder,
-		clock:             realClock,
+		log:              ctrl.Log.WithName("workload-reconciler"),
+		client:           client,
+		queues:           queues,
+		cache:            cache,
+		watchers:         options.watchers,
+		waitForPodsReady: options.waitForPodsReadyConfig,
+		recorder:         recorder,
+		clock:            realClock,
 	}
 }
 
@@ -218,7 +210,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 			return ctrl.Result{}, fmt.Errorf("setting eviction: %w", err)
 		}
 		if evicted && wl.Status.Admission != nil {
-			workload.ReportEvictedWorkload(r.recorder, &wl, string(wl.Status.Admission.ClusterQueue), reason, message, r.localQueueMetrics)
+			workload.ReportEvictedWorkload(r.recorder, &wl, string(wl.Status.Admission.ClusterQueue), reason, message)
 		}
 		return ctrl.Result{}, nil
 	}
@@ -267,7 +259,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 			r.recorder.Eventf(&wl, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time since reservation was %.0fs", wl.Status.Admission.ClusterQueue, quotaReservedWaitTime.Seconds())
 			metrics.AdmittedWorkload(kueue.ClusterQueueReference(cqName), queuedWaitTime)
 			metrics.AdmissionChecksWaitTime(kueue.ClusterQueueReference(cqName), quotaReservedWaitTime)
-			if r.localQueueMetrics {
+			if features.Enabled(features.LocalQueueMetrics) {
 				metrics.LocalQueueAdmittedWorkload(metrics.LQRefFromWorkload(&wl), queuedWaitTime)
 				metrics.LocalQueueAdmissionChecksWaitTime(metrics.LQRefFromWorkload(&wl), quotaReservedWaitTime)
 			}
@@ -403,7 +395,7 @@ func (r *WorkloadReconciler) reconcileCheckBasedEviction(ctx context.Context, wl
 		return false, client.IgnoreNotFound(err)
 	}
 	cqName, _ := r.queues.ClusterQueueForWorkload(wl)
-	workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByAdmissionCheck, message, r.localQueueMetrics)
+	workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByAdmissionCheck, message)
 	return true, nil
 }
 
@@ -441,7 +433,7 @@ func (r *WorkloadReconciler) reconcileOnLocalQueueActiveState(ctx context.Contex
 		cqName := string(lq.Spec.ClusterQueue)
 		if slices.Contains(r.queues.GetClusterQueueNames(), cqName) {
 			metrics.ReportEvictedWorkloads(cqName, kueue.WorkloadEvictedByLocalQueueStopped)
-			if r.localQueueMetrics {
+			if features.Enabled(features.LocalQueueMetrics) {
 				metrics.ReportLocalQueueEvictedWorkloads(metrics.LQRefFromWorkload(wl), kueue.WorkloadEvictedByLocalQueueStopped)
 			}
 		}
@@ -489,7 +481,7 @@ func (r *WorkloadReconciler) reconcileOnClusterQueueActiveState(ctx context.Cont
 		workload.ResetChecksOnEviction(wl, r.clock.Now())
 		err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
 		if err == nil {
-			workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByClusterQueueStopped, message, r.localQueueMetrics)
+			workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByClusterQueueStopped, message)
 		}
 		return true, client.IgnoreNotFound(err)
 	}
@@ -569,7 +561,7 @@ func (r *WorkloadReconciler) reconcileNotReadyTimeout(ctx context.Context, req c
 	err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
 	if err == nil {
 		cqName, _ := r.queues.ClusterQueueForWorkload(wl)
-		workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByPodsReadyTimeout, message, r.localQueueMetrics)
+		workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByPodsReadyTimeout, message)
 	}
 	return 0, client.IgnoreNotFound(err)
 }
diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go
index bacb80b625..83758de6dd 100644
--- a/pkg/features/kube_features.go
+++ b/pkg/features/kube_features.go
@@ -148,6 +148,12 @@ const (
 	//
 	// Workloads keeps allocated quota and preserves QuotaReserved=True when ProvisioningRequest fails
 	KeepQuotaForProvReqRetry featuregate.Feature = "KeepQuotaForProvReqRetry"
+
+	// owner: @kpostoffice
+	// alpha: v0.10
+	//
+	// Enables gathering of LocalQueue metrics
+	LocalQueueMetrics featuregate.Feature = "LocalQueueMetrics"
 )
 
 func init() {
@@ -177,6 +183,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
 	ExposeFlavorsInLocalQueue:     {Default: true, PreRelease: featuregate.Beta},
 	AdmissionCheckValidationRules: {Default: false, PreRelease: featuregate.Deprecated},
 	KeepQuotaForProvReqRetry:      {Default: false, PreRelease: featuregate.Deprecated},
+	LocalQueueMetrics:             {Default: false, PreRelease: featuregate.Alpha},
 }
 
 func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) {
diff --git a/pkg/queue/cluster_queue.go b/pkg/queue/cluster_queue.go
index 74ec3388e1..7c201c7300 100644
--- a/pkg/queue/cluster_queue.go
+++ b/pkg/queue/cluster_queue.go
@@ -300,39 +300,6 @@ func (c *ClusterQueue) PendingActive() int {
 	return result
 }
 
-func (m *Manager) PendingActiveInLocalQueue(lq *LocalQueue) int {
-	c := m.getClusterQueue(lq.ClusterQueue)
-	result := 0
-	if c == nil {
-		return 0
-	}
-	for _, wl := range c.heap.List() {
-		wlLqKey := workload.QueueKey(wl.Obj)
-		if wlLqKey == lq.Key {
-			result++
-		}
-	}
-	if workloadKey(c.inflight) == lq.Key {
-		result++
-	}
-	return result
-}
-
-func (m *Manager) PendingInadmissibleInLocalQueue(lq *LocalQueue) int {
-	c := m.getClusterQueue(lq.ClusterQueue)
-	if c == nil {
-		return 0
-	}
-	result := 0
-	for _, wl := range c.inadmissibleWorkloads {
-		wlLqKey := workload.QueueKey(wl.Obj)
-		if wlLqKey == lq.Key {
-			result++
-		}
-	}
-	return result
-}
-
 // PendingInadmissible returns the number of inadmissible pending workloads,
 // workloads that were already tried and are waiting for cluster conditions
 // to change to potentially become admissible.
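
Reviewer note (not part of the patch): because the new LocalQueueMetrics gate is Alpha and defaults to false, tests that exercise the per-LocalQueue metric paths now have to opt in explicitly. Below is a minimal sketch using the SetFeatureGateDuringTest helper visible in the kube_features.go context above; the test name, package clause, and trailing comments are illustrative only.

```go
package cache_test

import (
	"testing"

	"sigs.k8s.io/kueue/pkg/features"
)

func TestLocalQueueMetricsGateEnabled(t *testing.T) {
	// The gate is off by default; flip it for the duration of this test.
	// The helper is expected to restore the previous value when the test ends.
	features.SetFeatureGateDuringTest(t, features.LocalQueueMetrics, true)

	if !features.Enabled(features.LocalQueueMetrics) {
		t.Fatal("expected LocalQueueMetrics to be enabled for this test")
	}
	// From here, gate-guarded paths such as clusterQueue.updateQueueStatus and
	// Manager.AddOrUpdateWorkloadWithoutLock take the LocalQueue metric branches.
}
```
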
diff --git a/pkg/queue/local_queue.go b/pkg/queue/local_queue.go
index 8fd13bf00c..2c2be3f4b9 100644
--- a/pkg/queue/local_queue.go
+++ b/pkg/queue/local_queue.go
@@ -53,3 +53,36 @@ func (q *LocalQueue) AddOrUpdate(info *workload.Info) {
 	key := workload.Key(info.Obj)
 	q.items[key] = info
 }
+
+func (m *Manager) PendingActiveInLocalQueue(lq *LocalQueue) int {
+	c := m.getClusterQueue(lq.ClusterQueue)
+	result := 0
+	if c == nil {
+		return 0
+	}
+	for _, wl := range c.heap.List() {
+		wlLqKey := workload.QueueKey(wl.Obj)
+		if wlLqKey == lq.Key {
+			result++
+		}
+	}
+	if workloadKey(c.inflight) == lq.Key {
+		result++
+	}
+	return result
+}
+
+func (m *Manager) PendingInadmissibleInLocalQueue(lq *LocalQueue) int {
+	c := m.getClusterQueue(lq.ClusterQueue)
+	if c == nil {
+		return 0
+	}
+	result := 0
+	for _, wl := range c.inadmissibleWorkloads {
+		wlLqKey := workload.QueueKey(wl.Obj)
+		if wlLqKey == lq.Key {
+			result++
+		}
+	}
+	return result
+}
diff --git a/pkg/queue/manager.go b/pkg/queue/manager.go
index 419fccdbc0..661822e8ee 100644
--- a/pkg/queue/manager.go
+++ b/pkg/queue/manager.go
@@ -32,6 +32,7 @@ import (
 	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	utilindexer "sigs.k8s.io/kueue/pkg/controller/core/indexer"
+	"sigs.k8s.io/kueue/pkg/features"
 	"sigs.k8s.io/kueue/pkg/hierarchy"
 	"sigs.k8s.io/kueue/pkg/metrics"
 	"sigs.k8s.io/kueue/pkg/workload"
@@ -55,7 +56,6 @@ type Option func(*options)
 var defaultOptions = options{
 	podsReadyRequeuingTimestamp: config.EvictionTimestamp,
 	workloadInfoOptions:         []workload.InfoOption{},
-	localQueueMetrics:           false,
 }
 
 // WithPodsReadyRequeuingTimestamp sets the timestamp that is used for ordering
@@ -80,12 +80,6 @@ func WithResourceTransformations(transforms []config.ResourceTransformation) Opt
 	}
 }
 
-func WithLocalQueueMetrics(enabled bool) Option {
-	return func(o *options) {
-		o.localQueueMetrics = enabled
-	}
-}
-
 type Manager struct {
 	sync.RWMutex
 	cond sync.Cond
@@ -101,8 +95,6 @@ type Manager struct {
 
 	workloadInfoOptions []workload.InfoOption
 
-	localQueueMetrics bool
-
 	hm hierarchy.Manager[*ClusterQueue, *cohort]
 }
 
@@ -121,7 +113,6 @@ func NewManager(client client.Client, checker StatusChecker, opts ...Option) *Ma
 			PodsReadyRequeuingTimestamp: options.podsReadyRequeuingTimestamp,
 		},
 		workloadInfoOptions: options.workloadInfoOptions,
-		localQueueMetrics:   options.localQueueMetrics,
 		hm:                  hierarchy.NewManager[*ClusterQueue, *cohort](newCohort),
 	}
 	m.cond.L = &m.RWMutex
@@ -172,7 +163,7 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e
 			added := cqImpl.AddFromLocalQueue(qImpl)
 			addedWorkloads = addedWorkloads || added
 		}
-		if m.localQueueMetrics {
+		if features.Enabled(features.LocalQueueMetrics) {
 			m.reportLQPendingWorkloads(qImpl)
 		}
 	}
@@ -205,7 +196,7 @@ func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue
 	if (specUpdated && m.requeueWorkloadsCQ(ctx, cqImpl)) || (!oldActive && cqImpl.Active()) {
 		m.reportPendingWorkloads(cq.Name, cqImpl)
 		for _, q := range m.localQueues {
-			if m.localQueueMetrics && q.ClusterQueue == cq.Name {
+			if features.Enabled(features.LocalQueueMetrics) && q.ClusterQueue == cq.Name {
 				m.reportLQPendingWorkloads(q)
 			}
 		}
@@ -288,7 +279,7 @@ func (m *Manager) DeleteLocalQueue(q *kueue.LocalQueue) {
 	if cq != nil {
 		cq.DeleteFromLocalQueue(qImpl)
 	}
-	if m.localQueueMetrics {
+	if features.Enabled(features.LocalQueueMetrics) {
 		metrics.ClearLocalQueueMetrics(metrics.LQRefFromLocalQueueKey(key))
 	}
 	delete(m.localQueues, key)
@@ -360,7 +351,7 @@ func (m *Manager) AddOrUpdateWorkloadWithoutLock(w *kueue.Workload) bool {
 		return false
 	}
 	cq.PushOrUpdate(wInfo)
-	if m.localQueueMetrics {
+	if features.Enabled(features.LocalQueueMetrics) {
 		m.reportLQPendingWorkloads(q)
 	}
 	m.reportPendingWorkloads(q.ClusterQueue, cq)
@@ -396,7 +387,7 @@ func (m *Manager) RequeueWorkload(ctx context.Context, info *workload.Info, reas
 
 	added := cq.RequeueIfNotPresent(info, reason)
 	m.reportPendingWorkloads(q.ClusterQueue, cq)
-	if m.localQueueMetrics {
+	if features.Enabled(features.LocalQueueMetrics) {
 		m.reportLQPendingWorkloads(q)
 	}
 	if added {
@@ -417,7 +408,7 @@ func (m *Manager) deleteWorkloadFromQueueAndClusterQueue(w *kueue.Workload, qKey
 		return
 	}
 	delete(q.items, workload.Key(w))
-	if m.localQueueMetrics {
+	if features.Enabled(features.LocalQueueMetrics) {
 		m.reportLQPendingWorkloads(q)
 	}
 	cq := m.hm.ClusterQueues[q.ClusterQueue]
@@ -589,7 +580,7 @@ func (m *Manager) heads() []workload.Info {
 		workloads = append(workloads, wlCopy)
 		q := m.localQueues[workload.QueueKey(wl.Obj)]
 		delete(q.items, workload.Key(wl.Obj))
-		if m.localQueueMetrics {
+		if features.Enabled(features.LocalQueueMetrics) {
 			m.reportLQPendingWorkloads(q)
 		}
 	}
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 5f11cc1920..98d4d1116b 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -78,8 +78,6 @@ type Scheduler struct {
 	// attemptCount identifies the number of scheduling attempt in logs, from the last restart.
 	attemptCount int64
 
-	localQueueMetrics bool
-
 	// Stubs.
 	applyAdmission func(context.Context, *kueue.Workload) error
 }
@@ -88,7 +86,6 @@ type options struct {
 	podsReadyRequeuingTimestamp config.RequeuingTimestamp
 	fairSharing                 config.FairSharing
 	clock                       clock.Clock
-	localQueueMetrics           bool
 }
 
 // Option configures the reconciler.
@@ -97,7 +94,6 @@ type Option func(*options)
 var defaultOptions = options{
 	podsReadyRequeuingTimestamp: config.EvictionTimestamp,
 	clock:                       realClock,
-	localQueueMetrics:           false,
 }
 
 // WithPodsReadyRequeuingTimestamp sets the timestamp that is used for ordering
@@ -122,12 +118,6 @@ func WithClock(_ testing.TB, c clock.Clock) Option {
 	}
 }
 
-func WithLocalQueueMetrics(enabled bool) Option {
-	return func(o *options) {
-		o.localQueueMetrics = enabled
-	}
-}
-
 func New(queues *queue.Manager, cache *cache.Cache, cl client.Client, recorder record.EventRecorder, opts ...Option) *Scheduler {
 	options := defaultOptions
 	for _, opt := range opts {
@@ -620,18 +610,18 @@ func (s *Scheduler) admit(ctx context.Context, e *entry, cq *cache.ClusterQueueS
 	waitTime := workload.QueuedWaitTime(newWorkload)
 	s.recorder.Eventf(newWorkload, corev1.EventTypeNormal, "QuotaReserved", "Quota reserved in ClusterQueue %v, wait time since queued was %.0fs", admission.ClusterQueue, waitTime.Seconds())
 	metrics.QuotaReservedWorkload(admission.ClusterQueue, waitTime)
-	if s.localQueueMetrics {
+	if features.Enabled(features.LocalQueueMetrics) {
 		metrics.LocalQueueQuotaReservedWorkload(metrics.LQRefFromWorkload(newWorkload), waitTime)
 	}
 	if workload.IsAdmitted(newWorkload) {
 		s.recorder.Eventf(newWorkload, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time since reservation was 0s", admission.ClusterQueue)
 		metrics.AdmittedWorkload(admission.ClusterQueue, waitTime)
-		if s.localQueueMetrics {
+		if features.Enabled(features.LocalQueueMetrics) {
 			metrics.LocalQueueAdmittedWorkload(metrics.LQRefFromWorkload(newWorkload), waitTime)
 		}
 		if len(newWorkload.Status.AdmissionChecks) > 0 {
 			metrics.AdmissionChecksWaitTime(admission.ClusterQueue, 0)
-			if s.localQueueMetrics {
+			if features.Enabled(features.LocalQueueMetrics) {
 				metrics.LocalQueueAdmissionChecksWaitTime(metrics.LQRefFromWorkload(newWorkload), 0)
 			}
 		}
diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go
index 20940758f2..98b48e974e 100644
--- a/pkg/workload/workload.go
+++ b/pkg/workload/workload.go
@@ -845,9 +845,9 @@ func AdmissionChecksForWorkload(log logr.Logger, wl *kueue.Workload, admissionCh
 	return acNames
 }
 
-func ReportEvictedWorkload(recorder record.EventRecorder, wl *kueue.Workload, cqName, reason, message string, lqMetrics bool) {
+func ReportEvictedWorkload(recorder record.EventRecorder, wl *kueue.Workload, cqName, reason, message string) {
 	metrics.ReportEvictedWorkloads(cqName, reason)
-	if lqMetrics {
+	if features.Enabled(features.LocalQueueMetrics) {
 		metrics.ReportLocalQueueEvictedWorkloads(metrics.LQRefFromWorkload(wl), reason)
 	}
 	recorder.Event(wl, corev1.EventTypeNormal, fmt.Sprintf("%sDueTo%s", kueue.WorkloadEvicted, reason), message)
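
Reviewer note (not part of the patch): with the cfg.Metrics.EnableLocalQueueMetrics plumbing removed, per-LocalQueue metrics are driven solely by the LocalQueueMetrics feature gate and stay off by default. Assuming Kueue follows the usual Kubernetes component convention for feature gates, operators would opt in with something like `--feature-gates=LocalQueueMetrics=true` on the controller manager; the exact flag wiring should be confirmed against the deployment manifests, which this diff does not touch.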