diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go index 11b1704147..d65d119ecc 100644 --- a/cmd/kueue/main.go +++ b/cmd/kueue/main.go @@ -131,6 +131,10 @@ func main() { metrics.Register() + if features.Enabled(features.LocalQueueMetrics) { + metrics.RegisterLQMetrics() + } + kubeConfig := ctrl.GetConfigOrDie() if kubeConfig.UserAgent == "" { kubeConfig.UserAgent = useragent.Default() diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 0573ca6c0d..3fb03445dd 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -431,6 +431,11 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) { if !ok { return } + if features.Enabled(features.LocalQueueMetrics) { + for _, q := range c.hm.ClusterQueues[cq.Name].localQueues { + metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(q.key)) + } + } c.hm.DeleteClusterQueue(cq.Name) metrics.ClearCacheMetrics(cq.Name) } diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go index eee14e737a..83173072b3 100644 --- a/pkg/cache/clusterqueue.go +++ b/pkg/cache/clusterqueue.go @@ -237,6 +237,11 @@ func (c *clusterQueue) updateQueueStatus() { if status != c.Status { c.Status = status metrics.ReportClusterQueueStatus(c.Name, c.Status) + if features.Enabled(features.LocalQueueMetrics) { + for _, lq := range c.localQueues { + metrics.ReportLocalQueueStatus(metrics.LQRefFromLocalQueueKey(lq.key), c.Status) + } + } } } @@ -500,6 +505,12 @@ func (c *clusterQueue) reportActiveWorkloads() { metrics.ReservingActiveWorkloads.WithLabelValues(c.Name).Set(float64(len(c.Workloads))) } +func (q *queue) reportActiveWorkloads() { + qKeySlice := strings.Split(q.key, "/") + metrics.LocalQueueAdmittedActiveWorkloads.WithLabelValues(qKeySlice[1], qKeySlice[0]).Set(float64(q.admittedWorkloads)) + metrics.LocalQueueReservingActiveWorkloads.WithLabelValues(qKeySlice[1], qKeySlice[0]).Set(float64(q.reservingWorkloads)) +} + // updateWorkloadUsage updates the usage of the ClusterQueue for the workload // and the number of admitted workloads for local queues. 
func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) { @@ -537,6 +548,9 @@ func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) { updateFlavorUsage(frUsage, lq.admittedUsage, m) lq.admittedWorkloads += int(m) } + if features.Enabled(features.LocalQueueMetrics) { + lq.reportActiveWorkloads() + } } } @@ -581,11 +595,18 @@ func (c *clusterQueue) addLocalQueue(q *kueue.LocalQueue) error { } } c.localQueues[qKey] = qImpl + if features.Enabled(features.LocalQueueMetrics) { + qImpl.reportActiveWorkloads() + metrics.ReportLocalQueueStatus(metrics.LQRefFromLocalQueueKey(qKey), c.Status) + } return nil } func (c *clusterQueue) deleteLocalQueue(q *kueue.LocalQueue) { qKey := queueKey(q) + if features.Enabled(features.LocalQueueMetrics) { + metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(qKey)) + } delete(c.localQueues, qKey) } diff --git a/pkg/controller/core/core.go b/pkg/controller/core/core.go index cc7a48a7b4..08b7f4c06e 100644 --- a/pkg/controller/core/core.go +++ b/pkg/controller/core/core.go @@ -57,8 +57,8 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache qManager, cc, WithQueueVisibilityUpdateInterval(queueVisibilityUpdateInterval(cfg)), - WithQueueVisibilityClusterQueuesMaxCount(queueVisibilityClusterQueuesMaxCount(cfg)), WithReportResourceMetrics(cfg.Metrics.EnableClusterQueueResources), + WithQueueVisibilityClusterQueuesMaxCount(queueVisibilityClusterQueuesMaxCount(cfg)), WithFairSharing(fairSharingEnabled), WithWatchers(rfRec, acRec), ) diff --git a/pkg/controller/core/localqueue_controller.go b/pkg/controller/core/localqueue_controller.go index d9bf6d0162..64195a0b9e 100644 --- a/pkg/controller/core/localqueue_controller.go +++ b/pkg/controller/core/localqueue_controller.go @@ -40,7 +40,10 @@ import ( "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/constants" "sigs.k8s.io/kueue/pkg/controller/core/indexer" + "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/metrics" "sigs.k8s.io/kueue/pkg/queue" + "sigs.k8s.io/kueue/pkg/util/resource" ) const ( @@ -63,7 +66,11 @@ type LocalQueueReconciler struct { wlUpdateCh chan event.GenericEvent } -func NewLocalQueueReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache) *LocalQueueReconciler { +func NewLocalQueueReconciler( + client client.Client, + queues *queue.Manager, + cache *cache.Cache, +) *LocalQueueReconciler { return &LocalQueueReconciler{ log: ctrl.Log.WithName("localqueue-reconciler"), queues: queues, @@ -142,6 +149,10 @@ func (r *LocalQueueReconciler) Create(e event.CreateEvent) bool { log.Error(err, "Failed to add localQueue to the cache") } + if features.Enabled(features.LocalQueueMetrics) { + recordLocalQueueUsageMetrics(q) + } + return true } @@ -151,6 +162,11 @@ func (r *LocalQueueReconciler) Delete(e event.DeleteEvent) bool { // No need to interact with the queue manager for other objects. 
return true } + + if features.Enabled(features.LocalQueueMetrics) { + metrics.ClearLocalQueueResourceMetrics(localQueueReferenceFromLocalQueue(q)) + } + r.log.V(2).Info("LocalQueue delete event", "localQueue", klog.KObj(q)) r.queues.DeleteLocalQueue(q) r.cache.DeleteLocalQueue(q) @@ -191,10 +207,40 @@ func (r *LocalQueueReconciler) Update(e event.UpdateEvent) bool { } r.queues.DeleteLocalQueue(oldLq) + if features.Enabled(features.LocalQueueMetrics) { + updateLocalQueueResourceMetrics(newLq) + } return true } +func localQueueReferenceFromLocalQueue(lq *kueue.LocalQueue) metrics.LocalQueueReference { + return metrics.LocalQueueReference{ + Name: lq.Name, + Namespace: lq.Namespace, + } +} + +func recordLocalQueueUsageMetrics(queue *kueue.LocalQueue) { + for _, flavor := range queue.Status.FlavorUsage { + for _, r := range flavor.Resources { + metrics.ReportLocalQueueResourceUsage(localQueueReferenceFromLocalQueue(queue), string(flavor.Name), string(r.Name), resource.QuantityToFloat(&r.Total)) + } + } + + for _, flavor := range queue.Status.FlavorsReservation { + for _, r := range flavor.Resources { + metrics.ReportLocalQueueResourceReservations(localQueueReferenceFromLocalQueue(queue), string(flavor.Name), string(r.Name), resource.QuantityToFloat(&r.Total)) + } + } + +} + +func updateLocalQueueResourceMetrics(queue *kueue.LocalQueue) { + metrics.ClearLocalQueueResourceMetrics(localQueueReferenceFromLocalQueue(queue)) + recordLocalQueueUsageMetrics(queue) +} + func (r *LocalQueueReconciler) Generic(e event.GenericEvent) bool { r.log.V(3).Info("Got Workload event", "workload", klog.KObj(e.Object)) return true diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index cbb5e845ab..79add49116 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -48,6 +48,7 @@ import ( kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/controller/core/indexer" + "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/metrics" "sigs.k8s.io/kueue/pkg/queue" utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck" @@ -258,6 +259,10 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c r.recorder.Eventf(&wl, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time since reservation was %.0fs", wl.Status.Admission.ClusterQueue, quotaReservedWaitTime.Seconds()) metrics.AdmittedWorkload(kueue.ClusterQueueReference(cqName), queuedWaitTime) metrics.AdmissionChecksWaitTime(kueue.ClusterQueueReference(cqName), quotaReservedWaitTime) + if features.Enabled(features.LocalQueueMetrics) { + metrics.LocalQueueAdmittedWorkload(metrics.LQRefFromWorkload(&wl), queuedWaitTime) + metrics.LocalQueueAdmissionChecksWaitTime(metrics.LQRefFromWorkload(&wl), quotaReservedWaitTime) + } } return ctrl.Result{}, nil } @@ -428,6 +433,9 @@ func (r *WorkloadReconciler) reconcileOnLocalQueueActiveState(ctx context.Contex cqName := string(lq.Spec.ClusterQueue) if slices.Contains(r.queues.GetClusterQueueNames(), cqName) { metrics.ReportEvictedWorkloads(cqName, kueue.WorkloadEvictedByLocalQueueStopped) + if features.Enabled(features.LocalQueueMetrics) { + metrics.ReportLocalQueueEvictedWorkloads(metrics.LQRefFromWorkload(wl), kueue.WorkloadEvictedByLocalQueueStopped) + } } } return true, client.IgnoreNotFound(err) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index fbe9cbcc20..cb9110f664 100644 --- a/pkg/features/kube_features.go +++ 
b/pkg/features/kube_features.go @@ -151,6 +151,12 @@ const ( // // Workloads keeps allocated quota and preserves QuotaReserved=True when ProvisioningRequest fails KeepQuotaForProvReqRetry featuregate.Feature = "KeepQuotaForProvReqRetry" + + // owner: @kpostoffice + // alpha: v0.10 + // + // Enables gathering of LocalQueue metrics + LocalQueueMetrics featuregate.Feature = "LocalQueueMetrics" ) func init() { @@ -180,6 +186,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ ExposeFlavorsInLocalQueue: {Default: true, PreRelease: featuregate.Beta}, AdmissionCheckValidationRules: {Default: false, PreRelease: featuregate.Deprecated}, KeepQuotaForProvReqRetry: {Default: false, PreRelease: featuregate.Deprecated}, + LocalQueueMetrics: {Default: false, PreRelease: featuregate.Alpha}, } func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) { diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index af8fca6936..a7f76799b9 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -17,6 +17,7 @@ limitations under the License. package metrics import ( + "strings" "time" "github.com/prometheus/client_golang/prometheus" @@ -30,6 +31,11 @@ import ( type AdmissionResult string type ClusterQueueStatus string +type LocalQueueReference struct { + Name string + Namespace string +} + const ( AdmissionResultSuccess AdmissionResult = "success" AdmissionResultInadmissible AdmissionResult = "inadmissible" @@ -102,6 +108,17 @@ The label 'result' can have the following values: }, []string{"cluster_queue", "status"}, ) + LocalQueuePendingWorkloads = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: constants.KueueName, + Name: "local_queue_pending_workloads", + Help: `The number of pending workloads, per 'local_queue' and 'status'. +'status' can have the following values: +- "active" means that the workloads are in the admission queue. 
+- "inadmissible" means there was a failed admission attempt for these workloads and they won't be retried until cluster conditions, which could make this workload admissible, change`, + }, []string{"local_queue", "namespace", "status"}, + ) + QuotaReservedWorkloadsTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Subsystem: constants.KueueName, @@ -110,6 +127,14 @@ The label 'result' can have the following values: }, []string{"cluster_queue"}, ) + LocalQueueQuotaReservedWorkloadsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: constants.KueueName, + Name: "local_queue_quota_reserved_workloads_total", + Help: "The total number of quota reserved workloads per 'local_queue'", + }, []string{"local_queue", "namespace"}, + ) + quotaReservedWaitTime = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Subsystem: constants.KueueName, @@ -119,6 +144,15 @@ The label 'result' can have the following values: }, []string{"cluster_queue"}, ) + localQueueQuotaReservedWaitTime = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: constants.KueueName, + Name: "local_queue_quota_reserved_wait_time_seconds", + Help: "The time between a workload was created or requeued until it got quota reservation, per 'local_queue'", + Buckets: generateExponentialBuckets(14), + }, []string{"local_queue", "namespace"}, + ) + AdmittedWorkloadsTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Subsystem: constants.KueueName, @@ -127,6 +161,14 @@ The label 'result' can have the following values: }, []string{"cluster_queue"}, ) + LocalQueueAdmittedWorkloadsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: constants.KueueName, + Name: "local_queue_admitted_workloads_total", + Help: "The total number of admitted workloads per 'local_queue'", + }, []string{"local_queue", "namespace"}, + ) + admissionWaitTime = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Subsystem: constants.KueueName, @@ -136,6 +178,15 @@ The label 'result' can have the following values: }, []string{"cluster_queue"}, ) + localQueueAdmissionWaitTime = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: constants.KueueName, + Name: "local_queue_admission_wait_time_seconds", + Help: "The time between a workload was created or requeued until admission, per 'local_queue'", + Buckets: generateExponentialBuckets(14), + }, []string{"local_queue", "namespace"}, + ) + admissionChecksWaitTime = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Subsystem: constants.KueueName, @@ -145,6 +196,15 @@ The label 'result' can have the following values: }, []string{"cluster_queue"}, ) + localQueueAdmissionChecksWaitTime = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: constants.KueueName, + Name: "local_queue_admission_checks_wait_time_seconds", + Help: "The time from when a workload got the quota reservation until admission, per 'local_queue'", + Buckets: generateExponentialBuckets(14), + }, []string{"local_queue", "namespace"}, + ) + EvictedWorkloadsTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Subsystem: constants.KueueName, @@ -159,6 +219,20 @@ The label 'reason' can have the following values: }, []string{"cluster_queue", "reason"}, ) + LocalQueueEvictedWorkloadsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: constants.KueueName, + Name: "local_queue_evicted_workloads_total", + Help: `The number of evicted workloads per 'local_queue', +The label 'reason' can have the following values: +- "Preempted" means that 
the workload was evicted in order to free resources for a workload with a higher priority or reclamation of nominal quota. +- "PodsReadyTimeout" means that the eviction took place due to a PodsReady timeout. +- "AdmissionCheck" means that the workload was evicted because at least one admission check transitioned to False. +- "ClusterQueueStopped" means that the workload was evicted because the ClusterQueue is stopped. +- "Deactivated" means that the workload was evicted because spec.active is set to false`, + }, []string{"local_queue", "namespace", "reason"}, + ) + PreemptedWorkloadsTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Subsystem: constants.KueueName, @@ -182,6 +256,14 @@ The label 'reason' can have the following values: }, []string{"cluster_queue"}, ) + LocalQueueReservingActiveWorkloads = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: constants.KueueName, + Name: "local_queue_reserving_active_workloads", + Help: "The number of Workloads that are reserving quota, per 'local_queue'", + }, []string{"local_queue", "namespace"}, + ) + AdmittedActiveWorkloads = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Subsystem: constants.KueueName, @@ -190,6 +272,14 @@ The label 'reason' can have the following values: }, []string{"cluster_queue"}, ) + LocalQueueAdmittedActiveWorkloads = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: constants.KueueName, + Name: "local_queue_admitted_active_workloads", + Help: "The number of admitted Workloads that are active (unsuspended and not finished), per 'local_queue'", + }, []string{"local_queue", "namespace"}, + ) + ClusterQueueByStatus = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Subsystem: constants.KueueName, @@ -199,6 +289,15 @@ For a ClusterQueue, the metric only reports a value of 1 for one of the statuses }, []string{"cluster_queue", "status"}, ) + LocalQueueByStatus = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: constants.KueueName, + Name: "local_queue_status", + Help: `Reports 'local_queue' with its 'status' (with possible values 'pending', 'active' or 'terminated'). 
+For a LocalQueue, the metric only reports a value of 1 for one of the statuses.`, + }, []string{"local_queue", "namespace", "status"}, + ) + // Optional cluster queue metrics ClusterQueueResourceReservations = prometheus.NewGaugeVec( @@ -217,6 +316,22 @@ For a ClusterQueue, the metric only reports a value of 1 for one of the statuses }, []string{"cohort", "cluster_queue", "flavor", "resource"}, ) + LocalQueueResourceReservations = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: constants.KueueName, + Name: "local_queue_resource_reservation", + Help: `Reports the local_queue's total resource reservation within all the flavors`, + }, []string{"local_queue", "namespace", "flavor", "resource"}, + ) + + LocalQueueResourceUsage = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: constants.KueueName, + Name: "local_queue_resource_usage", + Help: `Reports the local_queue's total resource usage within all the flavors`, + }, []string{"local_queue", "namespace", "flavor", "resource"}, + ) + ClusterQueueResourceNominalQuota = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Subsystem: constants.KueueName, @@ -269,29 +384,67 @@ func QuotaReservedWorkload(cqName kueue.ClusterQueueReference, waitTime time.Dur quotaReservedWaitTime.WithLabelValues(string(cqName)).Observe(waitTime.Seconds()) } +func LocalQueueQuotaReservedWorkload(lq LocalQueueReference, waitTime time.Duration) { + LocalQueueQuotaReservedWorkloadsTotal.WithLabelValues(lq.Name, lq.Namespace).Inc() + localQueueQuotaReservedWaitTime.WithLabelValues(lq.Name, lq.Namespace).Observe(waitTime.Seconds()) +} + func AdmittedWorkload(cqName kueue.ClusterQueueReference, waitTime time.Duration) { AdmittedWorkloadsTotal.WithLabelValues(string(cqName)).Inc() admissionWaitTime.WithLabelValues(string(cqName)).Observe(waitTime.Seconds()) } +func LocalQueueAdmittedWorkload(lq LocalQueueReference, waitTime time.Duration) { + LocalQueueAdmittedWorkloadsTotal.WithLabelValues(lq.Name, lq.Namespace).Inc() + localQueueAdmissionWaitTime.WithLabelValues(lq.Name, lq.Namespace).Observe(waitTime.Seconds()) +} + func AdmissionChecksWaitTime(cqName kueue.ClusterQueueReference, waitTime time.Duration) { admissionChecksWaitTime.WithLabelValues(string(cqName)).Observe(waitTime.Seconds()) } +func LocalQueueAdmissionChecksWaitTime(lq LocalQueueReference, waitTime time.Duration) { + localQueueAdmissionChecksWaitTime.WithLabelValues(lq.Name, lq.Namespace).Observe(waitTime.Seconds()) +} + func ReportPendingWorkloads(cqName string, active, inadmissible int) { PendingWorkloads.WithLabelValues(cqName, PendingStatusActive).Set(float64(active)) PendingWorkloads.WithLabelValues(cqName, PendingStatusInadmissible).Set(float64(inadmissible)) } +func ReportLocalQueuePendingWorkloads(lq LocalQueueReference, active, inadmissible int) { + LocalQueuePendingWorkloads.WithLabelValues(lq.Name, lq.Namespace, PendingStatusActive).Set(float64(active)) + LocalQueuePendingWorkloads.WithLabelValues(lq.Name, lq.Namespace, PendingStatusInadmissible).Set(float64(inadmissible)) + } + func ReportEvictedWorkloads(cqName, reason string) { EvictedWorkloadsTotal.WithLabelValues(cqName, reason).Inc() } +func ReportLocalQueueEvictedWorkloads(lq LocalQueueReference, reason string) { + LocalQueueEvictedWorkloadsTotal.WithLabelValues(lq.Name, lq.Namespace, reason).Inc() +} + func ReportPreemption(preemptingCqName, preemptingReason, targetCqName string) { PreemptedWorkloadsTotal.WithLabelValues(preemptingCqName, preemptingReason).Inc() ReportEvictedWorkloads(targetCqName, 
kueue.WorkloadEvictedByPreemption) } +func LQRefFromWorkload(wl *kueue.Workload) LocalQueueReference { + return LocalQueueReference{ + Name: wl.Spec.QueueName, + Namespace: wl.Namespace, + } +} + +func LQRefFromLocalQueueKey(wlKey string) LocalQueueReference { + split := strings.Split(wlKey, "/") + return LocalQueueReference{ + Name: split[1], + Namespace: split[0], + } +} + func ClearClusterQueueMetrics(cqName string) { AdmissionCyclePreemptionSkips.DeleteLabelValues(cqName) PendingWorkloads.DeleteLabelValues(cqName, PendingStatusActive) @@ -305,6 +458,17 @@ func ClearClusterQueueMetrics(cqName string) { PreemptedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"preempting_cluster_queue": cqName}) } +func ClearLocalQueueMetrics(lq LocalQueueReference) { + LocalQueuePendingWorkloads.DeleteLabelValues(lq.Name, lq.Namespace, PendingStatusActive) + LocalQueuePendingWorkloads.DeleteLabelValues(lq.Name, lq.Namespace, PendingStatusInadmissible) + LocalQueueQuotaReservedWorkloadsTotal.DeleteLabelValues(lq.Name, lq.Namespace) + localQueueQuotaReservedWaitTime.DeleteLabelValues(lq.Name, lq.Namespace) + LocalQueueAdmittedWorkloadsTotal.DeleteLabelValues(lq.Name, lq.Namespace) + localQueueAdmissionWaitTime.DeleteLabelValues(lq.Name, lq.Namespace) + localQueueAdmissionChecksWaitTime.DeleteLabelValues(lq.Name, lq.Namespace) + LocalQueueEvictedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"local_queue": lq.Name, "namespace": lq.Namespace}) +} + func ReportClusterQueueStatus(cqName string, cqStatus ClusterQueueStatus) { for _, status := range CQStatuses { var v float64 @@ -315,6 +479,16 @@ func ReportClusterQueueStatus(cqName string, cqStatus ClusterQueueStatus) { } } +func ReportLocalQueueStatus(lq LocalQueueReference, status ClusterQueueStatus) { + for _, s := range CQStatuses { + var v float64 + if s == status { + v = 1 + } + LocalQueueByStatus.WithLabelValues(lq.Name, lq.Namespace, string(s)).Set(v) + } +} + func ClearCacheMetrics(cqName string) { ReservingActiveWorkloads.DeleteLabelValues(cqName) AdmittedActiveWorkloads.DeleteLabelValues(cqName) @@ -323,6 +497,14 @@ func ClearCacheMetrics(cqName string) { } } +func ClearLocalQueueCacheMetrics(lq LocalQueueReference) { + LocalQueueReservingActiveWorkloads.DeleteLabelValues(lq.Name, lq.Namespace) + LocalQueueAdmittedActiveWorkloads.DeleteLabelValues(lq.Name, lq.Namespace) + for _, status := range CQStatuses { + LocalQueueByStatus.DeleteLabelValues(lq.Name, lq.Namespace, string(status)) + } +} + func ReportClusterQueueQuotas(cohort, queue, flavor, resource string, nominal, borrowing, lending float64) { ClusterQueueResourceNominalQuota.WithLabelValues(cohort, queue, flavor, resource).Set(nominal) ClusterQueueResourceBorrowingLimit.WithLabelValues(cohort, queue, flavor, resource).Set(borrowing) @@ -335,10 +517,18 @@ func ReportClusterQueueResourceReservations(cohort, queue, flavor, resource stri ClusterQueueResourceReservations.WithLabelValues(cohort, queue, flavor, resource).Set(usage) } +func ReportLocalQueueResourceReservations(lq LocalQueueReference, flavor, resource string, usage float64) { + LocalQueueResourceReservations.WithLabelValues(lq.Name, lq.Namespace, flavor, resource).Set(usage) +} + func ReportClusterQueueResourceUsage(cohort, queue, flavor, resource string, usage float64) { ClusterQueueResourceUsage.WithLabelValues(cohort, queue, flavor, resource).Set(usage) } +func ReportLocalQueueResourceUsage(lq LocalQueueReference, flavor, resource string, usage float64) { + LocalQueueResourceUsage.WithLabelValues(lq.Name, lq.Namespace, 
flavor, resource).Set(usage) +} + func ReportClusterQueueWeightedShare(cq string, weightedShare int64) { ClusterQueueWeightedShare.WithLabelValues(cq).Set(float64(weightedShare)) } @@ -356,6 +546,16 @@ func ClearClusterQueueResourceMetrics(cqName string) { ClusterQueueResourceReservations.DeletePartialMatch(lbls) } +// ClearLocalQueueResourceMetrics deletes the resource reservation and usage metrics of the given LocalQueue. +func ClearLocalQueueResourceMetrics(lq LocalQueueReference) { + lbls := prometheus.Labels{ + "local_queue": lq.Name, + "namespace": lq.Namespace, + } + LocalQueueResourceReservations.DeletePartialMatch(lbls) + LocalQueueResourceUsage.DeletePartialMatch(lbls) +} + func ClearClusterQueueResourceQuotas(cqName, flavor, resource string) { lbls := prometheus.Labels{ "cluster_queue": cqName, @@ -423,3 +623,20 @@ func Register() { ClusterQueueWeightedShare, ) } + +func RegisterLQMetrics() { + metrics.Registry.MustRegister( + LocalQueuePendingWorkloads, + LocalQueueReservingActiveWorkloads, + LocalQueueAdmittedActiveWorkloads, + LocalQueueQuotaReservedWorkloadsTotal, + localQueueQuotaReservedWaitTime, + LocalQueueAdmittedWorkloadsTotal, + localQueueAdmissionWaitTime, + localQueueAdmissionChecksWaitTime, + LocalQueueEvictedWorkloadsTotal, + LocalQueueByStatus, + LocalQueueResourceReservations, + LocalQueueResourceUsage, + ) +} diff --git a/pkg/queue/local_queue.go b/pkg/queue/local_queue.go index 8fd13bf00c..2c2be3f4b9 100644 --- a/pkg/queue/local_queue.go +++ b/pkg/queue/local_queue.go @@ -53,3 +53,36 @@ func (q *LocalQueue) AddOrUpdate(info *workload.Info) { key := workload.Key(info.Obj) q.items[key] = info } + +func (m *Manager) PendingActiveInLocalQueue(lq *LocalQueue) int { + c := m.getClusterQueue(lq.ClusterQueue) + result := 0 + if c == nil { + return 0 + } + for _, wl := range c.heap.List() { + wlLqKey := workload.QueueKey(wl.Obj) + if wlLqKey == lq.Key { + result++ + } + } + if workloadKey(c.inflight) == lq.Key { + result++ + } + return result +} + +func (m *Manager) PendingInadmissibleInLocalQueue(lq *LocalQueue) int { + c := m.getClusterQueue(lq.ClusterQueue) + if c == nil { + return 0 + } + result := 0 + for _, wl := range c.inadmissibleWorkloads { + wlLqKey := workload.QueueKey(wl.Obj) + if wlLqKey == lq.Key { + result++ + } + } + return result +} diff --git a/pkg/queue/manager.go b/pkg/queue/manager.go index dcc5f89ba3..2fc52f88d6 100644 --- a/pkg/queue/manager.go +++ b/pkg/queue/manager.go @@ -32,6 +32,7 @@ import ( kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" utilindexer "sigs.k8s.io/kueue/pkg/controller/core/indexer" + "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/hierarchy" "sigs.k8s.io/kueue/pkg/metrics" "sigs.k8s.io/kueue/pkg/workload" @@ -161,6 +162,9 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e added := cqImpl.AddFromLocalQueue(qImpl) addedWorkloads = addedWorkloads || added } + if features.Enabled(features.LocalQueueMetrics) { + m.reportLQPendingWorkloads(qImpl) + } } queued := m.requeueWorkloadsCQ(ctx, cqImpl) @@ -190,6 +194,13 @@ func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue // If any workload becomes admissible or the queue becomes active. 
if (specUpdated && m.requeueWorkloadsCQ(ctx, cqImpl)) || (!oldActive && cqImpl.Active()) { m.reportPendingWorkloads(cq.Name, cqImpl) + if features.Enabled(features.LocalQueueMetrics) { + for _, q := range m.localQueues { + if q.ClusterQueue == cq.Name { + m.reportLQPendingWorkloads(q) + } + } + } m.Broadcast() } return nil @@ -269,6 +280,9 @@ func (m *Manager) DeleteLocalQueue(q *kueue.LocalQueue) { if cq != nil { cq.DeleteFromLocalQueue(qImpl) } + if features.Enabled(features.LocalQueueMetrics) { + metrics.ClearLocalQueueMetrics(metrics.LQRefFromLocalQueueKey(key)) + } delete(m.localQueues, key) } @@ -338,6 +352,9 @@ func (m *Manager) AddOrUpdateWorkloadWithoutLock(w *kueue.Workload) bool { return false } cq.PushOrUpdate(wInfo) + if features.Enabled(features.LocalQueueMetrics) { + m.reportLQPendingWorkloads(q) + } m.reportPendingWorkloads(q.ClusterQueue, cq) m.Broadcast() return true @@ -371,6 +388,9 @@ func (m *Manager) RequeueWorkload(ctx context.Context, info *workload.Info, reas added := cq.RequeueIfNotPresent(info, reason) m.reportPendingWorkloads(q.ClusterQueue, cq) + if features.Enabled(features.LocalQueueMetrics) { + m.reportLQPendingWorkloads(q) + } if added { m.Broadcast() } @@ -389,6 +409,9 @@ func (m *Manager) deleteWorkloadFromQueueAndClusterQueue(w *kueue.Workload, qKey return } delete(q.items, workload.Key(w)) + if features.Enabled(features.LocalQueueMetrics) { + m.reportLQPendingWorkloads(q) + } cq := m.hm.ClusterQueues[q.ClusterQueue] if cq != nil { cq.Delete(w) @@ -558,6 +581,9 @@ func (m *Manager) heads() []workload.Info { workloads = append(workloads, wlCopy) q := m.localQueues[workload.QueueKey(wl.Obj)] delete(q.items, workload.Key(wl.Obj)) + if features.Enabled(features.LocalQueueMetrics) { + m.reportLQPendingWorkloads(q) + } } return workloads } @@ -566,6 +592,16 @@ func (m *Manager) Broadcast() { m.cond.Broadcast() } +func (m *Manager) reportLQPendingWorkloads(lq *LocalQueue) { + active := m.PendingActiveInLocalQueue(lq) + inadmissible := m.PendingInadmissibleInLocalQueue(lq) + if m.statusChecker != nil && !m.statusChecker.ClusterQueueActive(lq.ClusterQueue) { + inadmissible += active + active = 0 + } + metrics.ReportLocalQueuePendingWorkloads(metrics.LQRefFromLocalQueueKey(lq.Key), active, inadmissible) +} + func (m *Manager) reportPendingWorkloads(cqName string, cq *ClusterQueue) { active := cq.PendingActive() inadmissible := cq.PendingInadmissible() diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 84892bb74f..e39fe18ea8 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -548,11 +548,20 @@ func (s *Scheduler) admit(ctx context.Context, e *entry, cq *cache.ClusterQueueS waitTime := workload.QueuedWaitTime(newWorkload) s.recorder.Eventf(newWorkload, corev1.EventTypeNormal, "QuotaReserved", "Quota reserved in ClusterQueue %v, wait time since queued was %.0fs", admission.ClusterQueue, waitTime.Seconds()) metrics.QuotaReservedWorkload(admission.ClusterQueue, waitTime) + if features.Enabled(features.LocalQueueMetrics) { + metrics.LocalQueueQuotaReservedWorkload(metrics.LQRefFromWorkload(newWorkload), waitTime) + } if workload.IsAdmitted(newWorkload) { s.recorder.Eventf(newWorkload, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time since reservation was 0s", admission.ClusterQueue) metrics.AdmittedWorkload(admission.ClusterQueue, waitTime) + if features.Enabled(features.LocalQueueMetrics) { + metrics.LocalQueueAdmittedWorkload(metrics.LQRefFromWorkload(newWorkload), waitTime) + } if 
len(newWorkload.Status.AdmissionChecks) > 0 { metrics.AdmissionChecksWaitTime(admission.ClusterQueue, 0) + if features.Enabled(features.LocalQueueMetrics) { + metrics.LocalQueueAdmissionChecksWaitTime(metrics.LQRefFromWorkload(newWorkload), 0) + } } } log.V(2).Info("Workload successfully admitted and assigned flavors", "assignments", admission.PodSetAssignments) diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index fbc1936c8a..98b48e974e 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -847,6 +847,9 @@ func AdmissionChecksForWorkload(log logr.Logger, wl *kueue.Workload, admissionCh func ReportEvictedWorkload(recorder record.EventRecorder, wl *kueue.Workload, cqName, reason, message string) { metrics.ReportEvictedWorkloads(cqName, reason) + if features.Enabled(features.LocalQueueMetrics) { + metrics.ReportLocalQueueEvictedWorkloads(metrics.LQRefFromWorkload(wl), reason) + } recorder.Event(wl, corev1.EventTypeNormal, fmt.Sprintf("%sDueTo%s", kueue.WorkloadEvicted, reason), message) }