From 9493af33e0b419e54f841730fcd7f1af1b57df56 Mon Sep 17 00:00:00 2001 From: Kevin Date: Wed, 27 Nov 2024 14:04:24 -0500 Subject: [PATCH] cleanup todos and add more feature gates Signed-off-by: Kevin --- cmd/kueue/main.go | 4 ++ pkg/cache/cache.go | 10 ++-- pkg/cache/clusterqueue.go | 26 +++++---- pkg/cache/clusterqueue_test.go | 4 +- pkg/controller/core/core.go | 3 +- pkg/controller/core/localqueue_controller.go | 2 +- pkg/controller/core/workload_controller.go | 59 ++++++++++++-------- pkg/metrics/metrics.go | 31 ++++++---- pkg/queue/cluster_queue.go | 5 -- pkg/queue/manager.go | 4 +- pkg/workload/workload.go | 7 ++- 11 files changed, 89 insertions(+), 66 deletions(-) diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go index 655a323844..d0c627846f 100644 --- a/cmd/kueue/main.go +++ b/cmd/kueue/main.go @@ -131,6 +131,10 @@ func main() { metrics.Register() + if cfg.Metrics.EnableLocalQueueMetrics { + metrics.RegisterLQMetrics() + } + kubeConfig := ctrl.GetConfigOrDie() if kubeConfig.UserAgent == "" { kubeConfig.UserAgent = useragent.Default() diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index d2b759e835..a26d29b871 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -156,7 +156,7 @@ func (c *Cache) newClusterQueue(cq *kueue.ClusterQueue) (*clusterQueue, error) { } c.hm.AddClusterQueue(cqImpl) c.hm.UpdateClusterQueueEdge(cq.Name, cq.Spec.Cohort) - if err := cqImpl.updateClusterQueue(c.hm.CycleChecker, cq, c.resourceFlavors, c.admissionChecks, nil); err != nil { + if err := cqImpl.updateClusterQueue(c.hm.CycleChecker, cq, c.resourceFlavors, c.admissionChecks, nil, c.localQueueMetrics); err != nil { return nil, err } @@ -228,8 +228,8 @@ func (c *Cache) updateClusterQueues() sets.Set[string] { // We call update on all ClusterQueues irrespective of which CQ actually use this flavor // because it is not expensive to do so, and is not worth tracking which ClusterQueues use // which flavors. - cq.UpdateWithFlavors(c.resourceFlavors) - cq.updateWithAdmissionChecks(c.admissionChecks) + cq.UpdateWithFlavors(c.resourceFlavors, c.localQueueMetrics) + cq.updateWithAdmissionChecks(c.admissionChecks, c.localQueueMetrics) curStatus := cq.Status if prevStatus == pending && curStatus == active { cqs.Insert(cq.Name) @@ -421,7 +421,7 @@ func (c *Cache) UpdateClusterQueue(cq *kueue.ClusterQueue) error { } oldParent := cqImpl.Parent() c.hm.UpdateClusterQueueEdge(cq.Name, cq.Spec.Cohort) - if err := cqImpl.updateClusterQueue(c.hm.CycleChecker, cq, c.resourceFlavors, c.admissionChecks, oldParent); err != nil { + if err := cqImpl.updateClusterQueue(c.hm.CycleChecker, cq, c.resourceFlavors, c.admissionChecks, oldParent, c.localQueueMetrics); err != nil { return err } for _, qImpl := range cqImpl.localQueues { @@ -442,7 +442,7 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) { } if c.localQueueMetrics { for _, q := range c.hm.ClusterQueues[cq.Name].localQueues { - metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromWorkloadKey(q.key)) + metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(q.key)) } } c.hm.DeleteClusterQueue(cq.Name) diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go index 07479f3526..074ec4218f 100644 --- a/pkg/cache/clusterqueue.go +++ b/pkg/cache/clusterqueue.go @@ -132,7 +132,7 @@ var defaultPreemption = kueue.ClusterQueuePreemption{ var defaultFlavorFungibility = kueue.FlavorFungibility{WhenCanBorrow: kueue.Borrow, WhenCanPreempt: kueue.TryNextFlavor} -func (c *clusterQueue) updateClusterQueue(cycleChecker hierarchy.CycleChecker, in *kueue.ClusterQueue, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, admissionChecks map[string]AdmissionCheck, oldParent *cohort) error { +func (c *clusterQueue) updateClusterQueue(cycleChecker hierarchy.CycleChecker, in *kueue.ClusterQueue, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, admissionChecks map[string]AdmissionCheck, oldParent *cohort, lqMetrics bool) error { if c.updateQuotasAndResourceGroups(in.Spec.ResourceGroups) || oldParent != c.Parent() { if oldParent != nil && oldParent != c.Parent() { // ignore error when old Cohort has cycle. @@ -166,8 +166,8 @@ func (c *clusterQueue) updateClusterQueue(cycleChecker hierarchy.CycleChecker, i c.Preemption = defaultPreemption } - c.UpdateWithFlavors(resourceFlavors) - c.updateWithAdmissionChecks(admissionChecks) + c.UpdateWithFlavors(resourceFlavors, lqMetrics) + c.updateWithAdmissionChecks(admissionChecks, lqMetrics) if in.Spec.FlavorFungibility != nil { c.FlavorFungibility = *in.Spec.FlavorFungibility @@ -217,7 +217,7 @@ func (c *clusterQueue) updateQuotasAndResourceGroups(in []kueue.ResourceGroup) b !equality.Semantic.DeepEqual(oldQuotas, c.resourceNode.Quotas) } -func (c *clusterQueue) updateQueueStatus() { +func (c *clusterQueue) updateQueueStatus(lqMetrics bool) { status := active if c.isStopped || len(c.missingFlavors) > 0 || @@ -237,8 +237,10 @@ func (c *clusterQueue) updateQueueStatus() { if status != c.Status { c.Status = status metrics.ReportClusterQueueStatus(c.Name, c.Status) - for _, lq := range c.localQueues { - metrics.ReportLocalQueueStatus(metrics.LQRefFromWorkloadKey(lq.key), c.Status) + if lqMetrics { + for _, lq := range c.localQueues { + metrics.ReportLocalQueueStatus(metrics.LQRefFromLocalQueueKey(lq.key), c.Status) + } } } } @@ -330,9 +332,9 @@ func (c *clusterQueue) isTASViolated() bool { // UpdateWithFlavors updates a ClusterQueue based on the passed ResourceFlavors set. // Exported only for testing. -func (c *clusterQueue) UpdateWithFlavors(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) { +func (c *clusterQueue) UpdateWithFlavors(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, lqMetrics bool) { c.updateLabelKeys(flavors) - c.updateQueueStatus() + c.updateQueueStatus(lqMetrics) } func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) { @@ -365,7 +367,7 @@ func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference } // updateWithAdmissionChecks updates a ClusterQueue based on the passed AdmissionChecks set. -func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionCheck) { +func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionCheck, lqMetrics bool) { checksPerController := make(map[string][]string, len(c.AdmissionChecks)) singleInstanceControllers := sets.New[string]() multiKueueAdmissionChecks := sets.New[string]() @@ -461,7 +463,7 @@ func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionChec } if update { - c.updateQueueStatus() + c.updateQueueStatus(lqMetrics) } } @@ -595,13 +597,13 @@ func (c *clusterQueue) addLocalQueue(q *kueue.LocalQueue) error { } c.localQueues[qKey] = qImpl qImpl.reportActiveWorkloads() - metrics.ReportLocalQueueStatus(metrics.LQRefFromWorkloadKey(qKey), c.Status) + metrics.ReportLocalQueueStatus(metrics.LQRefFromLocalQueueKey(qKey), c.Status) return nil } func (c *clusterQueue) deleteLocalQueue(q *kueue.LocalQueue) { qKey := queueKey(q) - metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromWorkloadKey(qKey)) + metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(qKey)) delete(c.localQueues, qKey) } diff --git a/pkg/cache/clusterqueue_test.go b/pkg/cache/clusterqueue_test.go index 39516ed407..d8898d6d54 100644 --- a/pkg/cache/clusterqueue_test.go +++ b/pkg/cache/clusterqueue_test.go @@ -85,7 +85,7 @@ func TestClusterQueueUpdateWithFlavors(t *testing.T) { } cq.Status = tc.curStatus - cq.UpdateWithFlavors(tc.flavors) + cq.UpdateWithFlavors(tc.flavors, false) if cq.Status != tc.wantStatus { t.Fatalf("got different status, want: %v, got: %v", tc.wantStatus, cq.Status) @@ -608,7 +608,7 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) { cq.flavorIndependentAdmissionCheckAppliedPerFlavor = []string{"not-on-flavor"} } } - cq.updateWithAdmissionChecks(tc.admissionChecks) + cq.updateWithAdmissionChecks(tc.admissionChecks, false) if cq.Status != tc.wantStatus { t.Errorf("got different status, want: %v, got: %v", tc.wantStatus, cq.Status) diff --git a/pkg/controller/core/core.go b/pkg/controller/core/core.go index 617cc73b0b..91aa802286 100644 --- a/pkg/controller/core/core.go +++ b/pkg/controller/core/core.go @@ -42,7 +42,7 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache if err := acRec.SetupWithManager(mgr, cfg); err != nil { return "AdmissionCheck", err } - qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc, WithLocalQueueMetricsEnabled(cfg.Metrics.EnableLocalQueueMetrics)) + qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc, LqControllerWithLocalQueueMetricsEnabled(cfg.Metrics.EnableLocalQueueMetrics)) if err := qRec.SetupWithManager(mgr, cfg); err != nil { return "LocalQueue", err } @@ -80,6 +80,7 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache mgr.GetEventRecorderFor(constants.WorkloadControllerName), WithWorkloadUpdateWatchers(qRec, cqRec), WithWaitForPodsReady(waitForPodsReady(cfg.WaitForPodsReady)), + WlControllerWithLocalQueueMetricsEnabled(cfg.Metrics.EnableLocalQueueMetrics), ).SetupWithManager(mgr, cfg); err != nil { return "Workload", err } diff --git a/pkg/controller/core/localqueue_controller.go b/pkg/controller/core/localqueue_controller.go index 832ca08244..3092083967 100644 --- a/pkg/controller/core/localqueue_controller.go +++ b/pkg/controller/core/localqueue_controller.go @@ -74,7 +74,7 @@ type LocalQueueReconcilerOption func(*LocalQueueReconcilerOptions) var defaultLQOptions = LocalQueueReconcilerOptions{} -func WithLocalQueueMetricsEnabled(enabled bool) LocalQueueReconcilerOption { +func LqControllerWithLocalQueueMetricsEnabled(enabled bool) LocalQueueReconcilerOption { return func(o *LocalQueueReconcilerOptions) { o.LocalQueueMetricsEnabled = enabled } diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index 48a6a7aeed..715f03ecbf 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -70,6 +70,7 @@ type waitForPodsReadyConfig struct { type options struct { watchers []WorkloadUpdateWatcher waitForPodsReadyConfig *waitForPodsReadyConfig + localQueueMetrics bool } // Option configures the reconciler. @@ -89,6 +90,12 @@ func WithWorkloadUpdateWatchers(value ...WorkloadUpdateWatcher) Option { } } +func WlControllerWithLocalQueueMetricsEnabled(enabled bool) Option { + return func(o *options) { + o.localQueueMetrics = enabled + } +} + var defaultOptions = options{} type WorkloadUpdateWatcher interface { @@ -97,14 +104,15 @@ type WorkloadUpdateWatcher interface { // WorkloadReconciler reconciles a Workload object type WorkloadReconciler struct { - log logr.Logger - queues *queue.Manager - cache *cache.Cache - client client.Client - watchers []WorkloadUpdateWatcher - waitForPodsReady *waitForPodsReadyConfig - recorder record.EventRecorder - clock clock.Clock + log logr.Logger + queues *queue.Manager + cache *cache.Cache + client client.Client + watchers []WorkloadUpdateWatcher + waitForPodsReady *waitForPodsReadyConfig + localQueueMetrics bool + recorder record.EventRecorder + clock clock.Clock } func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache, recorder record.EventRecorder, opts ...Option) *WorkloadReconciler { @@ -114,14 +122,15 @@ func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *c } return &WorkloadReconciler{ - log: ctrl.Log.WithName("workload-reconciler"), - client: client, - queues: queues, - cache: cache, - watchers: options.watchers, - waitForPodsReady: options.waitForPodsReadyConfig, - recorder: recorder, - clock: realClock, + log: ctrl.Log.WithName("workload-reconciler"), + client: client, + queues: queues, + cache: cache, + watchers: options.watchers, + waitForPodsReady: options.waitForPodsReadyConfig, + localQueueMetrics: options.localQueueMetrics, + recorder: recorder, + clock: realClock, } } @@ -209,7 +218,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, fmt.Errorf("setting eviction: %w", err) } if evicted && wl.Status.Admission != nil { - workload.ReportEvictedWorkload(r.recorder, &wl, string(wl.Status.Admission.ClusterQueue), reason, message) + workload.ReportEvictedWorkload(r.recorder, &wl, string(wl.Status.Admission.ClusterQueue), reason, message, r.localQueueMetrics) } return ctrl.Result{}, nil } @@ -258,8 +267,10 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c r.recorder.Eventf(&wl, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time since reservation was %.0fs", wl.Status.Admission.ClusterQueue, quotaReservedWaitTime.Seconds()) metrics.AdmittedWorkload(kueue.ClusterQueueReference(cqName), queuedWaitTime) metrics.AdmissionChecksWaitTime(kueue.ClusterQueueReference(cqName), quotaReservedWaitTime) - metrics.LocalQueueAdmittedWorkload(metrics.LQRefFromWorkload(&wl), queuedWaitTime) - metrics.LocalQueueAdmissionChecksWaitTime(metrics.LQRefFromWorkload(&wl), quotaReservedWaitTime) + if r.localQueueMetrics { + metrics.LocalQueueAdmittedWorkload(metrics.LQRefFromWorkload(&wl), queuedWaitTime) + metrics.LocalQueueAdmissionChecksWaitTime(metrics.LQRefFromWorkload(&wl), quotaReservedWaitTime) + } } return ctrl.Result{}, nil } @@ -392,7 +403,7 @@ func (r *WorkloadReconciler) reconcileCheckBasedEviction(ctx context.Context, wl return false, client.IgnoreNotFound(err) } cqName, _ := r.queues.ClusterQueueForWorkload(wl) - workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByAdmissionCheck, message) + workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByAdmissionCheck, message, r.localQueueMetrics) return true, nil } @@ -430,7 +441,9 @@ func (r *WorkloadReconciler) reconcileOnLocalQueueActiveState(ctx context.Contex cqName := string(lq.Spec.ClusterQueue) if slices.Contains(r.queues.GetClusterQueueNames(), cqName) { metrics.ReportEvictedWorkloads(cqName, kueue.WorkloadEvictedByLocalQueueStopped) - metrics.ReportLocalQueueEvictedWorkloads(metrics.LQRefFromWorkload(wl), kueue.WorkloadEvictedByLocalQueueStopped) + if r.localQueueMetrics { + metrics.ReportLocalQueueEvictedWorkloads(metrics.LQRefFromWorkload(wl), kueue.WorkloadEvictedByLocalQueueStopped) + } } } return true, client.IgnoreNotFound(err) @@ -476,7 +489,7 @@ func (r *WorkloadReconciler) reconcileOnClusterQueueActiveState(ctx context.Cont workload.ResetChecksOnEviction(wl, r.clock.Now()) err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true) if err == nil { - workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByClusterQueueStopped, message) + workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByClusterQueueStopped, message, r.localQueueMetrics) } return true, client.IgnoreNotFound(err) } @@ -556,7 +569,7 @@ func (r *WorkloadReconciler) reconcileNotReadyTimeout(ctx context.Context, req c err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true) if err == nil { cqName, _ := r.queues.ClusterQueueForWorkload(wl) - workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByPodsReadyTimeout, message) + workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByPodsReadyTimeout, message, r.localQueueMetrics) } return 0, client.IgnoreNotFound(err) } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 556476f92c..a7f76799b9 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -412,11 +412,9 @@ func ReportPendingWorkloads(cqName string, active, inadmissible int) { PendingWorkloads.WithLabelValues(cqName, PendingStatusInadmissible).Set(float64(inadmissible)) } -func ReportLocalQueuePendingWorkloads(lqKey string, active, inadmissible int) { - keySlice := strings.Split(lqKey, "/") - // KTODO: make sure index is correct for lqname and namespace - LocalQueuePendingWorkloads.WithLabelValues(keySlice[0], keySlice[1], PendingStatusActive).Set(float64(active)) - LocalQueuePendingWorkloads.WithLabelValues(keySlice[0], keySlice[1], PendingStatusInadmissible).Set(float64(inadmissible)) +func ReportLocalQueuePendingWorkloads(lq LocalQueueReference, active, inadmissible int) { + LocalQueuePendingWorkloads.WithLabelValues(lq.Name, lq.Namespace, PendingStatusActive).Set(float64(active)) + LocalQueuePendingWorkloads.WithLabelValues(lq.Name, lq.Namespace, PendingStatusInadmissible).Set(float64(inadmissible)) } func ReportEvictedWorkloads(cqName, reason string) { @@ -439,8 +437,7 @@ func LQRefFromWorkload(wl *kueue.Workload) LocalQueueReference { } } -func LQRefFromWorkloadKey(wlKey string) LocalQueueReference { - // KTODO: make sure split is correct +func LQRefFromLocalQueueKey(wlKey string) LocalQueueReference { split := strings.Split(wlKey, "/") return LocalQueueReference{ Name: split[1], @@ -461,7 +458,6 @@ func ClearClusterQueueMetrics(cqName string) { PreemptedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"preempting_cluster_queue": cqName}) } -// KTODO: call clear func func ClearLocalQueueMetrics(lq LocalQueueReference) { LocalQueuePendingWorkloads.DeleteLabelValues(lq.Name, lq.Namespace, PendingStatusActive) LocalQueuePendingWorkloads.DeleteLabelValues(lq.Name, lq.Namespace, PendingStatusInadmissible) @@ -501,7 +497,6 @@ func ClearCacheMetrics(cqName string) { } } -// KTODO: call clear func func ClearLocalQueueCacheMetrics(lq LocalQueueReference) { LocalQueueReservingActiveWorkloads.DeleteLabelValues(lq.Name, lq.Namespace) LocalQueueAdmittedActiveWorkloads.DeleteLabelValues(lq.Name, lq.Namespace) @@ -522,7 +517,6 @@ func ReportClusterQueueResourceReservations(cohort, queue, flavor, resource stri ClusterQueueResourceReservations.WithLabelValues(cohort, queue, flavor, resource).Set(usage) } -// KTODO: call func func ReportLocalQueueResourceReservations(lq LocalQueueReference, flavor, resource string, usage float64) { LocalQueueResourceReservations.WithLabelValues(lq.Name, lq.Namespace, flavor, resource).Set(usage) } @@ -531,7 +525,6 @@ func ReportClusterQueueResourceUsage(cohort, queue, flavor, resource string, usa ClusterQueueResourceUsage.WithLabelValues(cohort, queue, flavor, resource).Set(usage) } -// KTODO: call func func ReportLocalQueueResourceUsage(lq LocalQueueReference, flavor, resource string, usage float64) { LocalQueueResourceUsage.WithLabelValues(lq.Name, lq.Namespace, flavor, resource).Set(usage) } @@ -628,8 +621,22 @@ func Register() { ClusterQueueResourceBorrowingLimit, ClusterQueueResourceLendingLimit, ClusterQueueWeightedShare, + ) +} + +func RegisterLQMetrics() { + metrics.Registry.MustRegister( + LocalQueuePendingWorkloads, + LocalQueueReservingActiveWorkloads, + LocalQueueAdmittedActiveWorkloads, + LocalQueueQuotaReservedWorkloadsTotal, + localQueueQuotaReservedWaitTime, + LocalQueueAdmittedWorkloadsTotal, + localQueueAdmissionWaitTime, + localQueueAdmissionChecksWaitTime, + LocalQueueEvictedWorkloadsTotal, + LocalQueueByStatus, LocalQueueResourceReservations, LocalQueueResourceUsage, - LocalQueueByStatus, ) } diff --git a/pkg/queue/cluster_queue.go b/pkg/queue/cluster_queue.go index 27c642fa74..74ec3388e1 100644 --- a/pkg/queue/cluster_queue.go +++ b/pkg/queue/cluster_queue.go @@ -333,11 +333,6 @@ func (m *Manager) PendingInadmissibleInLocalQueue(lq *LocalQueue) int { return result } -// KTODO: is this function necessary? -func (m *Manager) PendingInLocalQueue(lq *LocalQueue) int { - return m.PendingActiveInLocalQueue(lq) + m.PendingInadmissibleInLocalQueue(lq) -} - // PendingInadmissible returns the number of inadmissible pending workloads, // workloads that were already tried and are waiting for cluster conditions // to change to potentially become admissible. diff --git a/pkg/queue/manager.go b/pkg/queue/manager.go index 4553c7ee33..419fccdbc0 100644 --- a/pkg/queue/manager.go +++ b/pkg/queue/manager.go @@ -289,7 +289,7 @@ func (m *Manager) DeleteLocalQueue(q *kueue.LocalQueue) { cq.DeleteFromLocalQueue(qImpl) } if m.localQueueMetrics { - metrics.ClearLocalQueueMetrics(metrics.LQRefFromWorkloadKey(key)) + metrics.ClearLocalQueueMetrics(metrics.LQRefFromLocalQueueKey(key)) } delete(m.localQueues, key) } @@ -607,7 +607,7 @@ func (m *Manager) reportLQPendingWorkloads(lq *LocalQueue) { inadmissible += active active = 0 } - metrics.ReportLocalQueuePendingWorkloads(lq.Key, active, inadmissible) + metrics.ReportLocalQueuePendingWorkloads(metrics.LQRefFromLocalQueueKey(lq.Key), active, inadmissible) } func (m *Manager) reportPendingWorkloads(cqName string, cq *ClusterQueue) { diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index 4b71351020..20940758f2 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -845,10 +845,11 @@ func AdmissionChecksForWorkload(log logr.Logger, wl *kueue.Workload, admissionCh return acNames } -func ReportEvictedWorkload(recorder record.EventRecorder, wl *kueue.Workload, cqName, reason, message string) { +func ReportEvictedWorkload(recorder record.EventRecorder, wl *kueue.Workload, cqName, reason, message string, lqMetrics bool) { metrics.ReportEvictedWorkloads(cqName, reason) - // KTODO: make LQ metric feature gate - metrics.ReportLocalQueueEvictedWorkloads(metrics.LQRefFromWorkload(wl), reason) + if lqMetrics { + metrics.ReportLocalQueueEvictedWorkloads(metrics.LQRefFromWorkload(wl), reason) + } recorder.Event(wl, corev1.EventTypeNormal, fmt.Sprintf("%sDueTo%s", kueue.WorkloadEvicted, reason), message) }