Implement Prometheus metrics for LocalQueue #3673

Open · wants to merge 6 commits into main

Changes from 5 commits
3 changes: 3 additions & 0 deletions apis/config/v1beta1/configuration_types.go
@@ -146,6 +146,9 @@ type ControllerMetrics struct {
// metrics will be reported.
// +optional
EnableClusterQueueResources bool `json:"enableClusterQueueResources,omitempty"`

// +optional
EnableLocalQueueMetrics bool `json:"enableLocalQueueMetrics,omitempty"`
Contributor @mimowo commented on Nov 28, 2024:
What is the reason to favor an API field rather than a feature gate? We don't guard other metrics by API, so I don't see such a need, but let us know if there is something specific about these metrics. If the concern is stability of the system due to potential bugs, then a feature gate is enough; we can start from alpha. It would also allow us to simplify the code, since the feature gate status can be checked from any place, so there is no need to pass parameters.
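A minimal sketch of that point, using the upstream k8s.io/component-base/featuregate package directly (the gate name mirrors the one this PR adds; everything else is illustrative, not Kueue's actual wiring):

```go
package main

import (
	"fmt"

	"k8s.io/component-base/featuregate"
)

const LocalQueueMetrics featuregate.Feature = "LocalQueueMetrics"

func main() {
	// A mutable gate, standing in for a process-global feature-gate registry.
	gate := featuregate.NewFeatureGate()
	if err := gate.Add(map[featuregate.Feature]featuregate.FeatureSpec{
		LocalQueueMetrics: {Default: false, PreRelease: featuregate.Alpha},
	}); err != nil {
		panic(err)
	}
	// Typically flipped via --feature-gates=LocalQueueMetrics=true.
	if err := gate.Set("LocalQueueMetrics=true"); err != nil {
		panic(err)
	}

	// Any call site can consult the gate directly; no boolean has to be
	// plumbed through constructors or reconciler options.
	if gate.Enabled(LocalQueueMetrics) {
		fmt.Println("LocalQueue metrics enabled")
	}
}
```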

Contributor commented:

I very much agree, especially when it comes to passing parameters.

Contributor Author commented:

There was a comment about increasing cardinality and wanting to keep this behind a long-term config field.

Contributor commented:

I see, but in that case I would like to go via the KEP process. It's a pity the comment does not mention why cardinality is a problem: is it usability (which could be solved by aggregation), or performance? Do you have other references on why cardinality might be a problem in k8s?

I assume we don't have many more LQs than namespaces, which also led me to check what we do in core k8s. I see that we have metrics depending on Namespace (example). However, in this case CounterOpts.Namespace is used explicitly. Maybe we could also do it this way? PTAL.

If you want this feature in 0.10, I think the only chance is a short KEP, no API change, and guarding it with an alpha feature gate (disabled by default). Then, for a second iteration of alpha, investigate whether we need the API switch.

Contributor Author commented:

The Namespace in the example you link isn't a Kubernetes namespace, from what I understand. It is the project namespace, used to avoid Prometheus metric names clashing.
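A small self-contained sketch of that distinction (assuming the standard prometheus/client_golang API; metric names are illustrative, not the ones added by this PR): CounterOpts.Namespace only prefixes the metric name, whereas a Kubernetes namespace has to be carried as a label, which is what multiplies series cardinality.

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// The Prometheus "namespace" is just a name prefix: this series is exported
// as kueue_admitted_workloads_total, regardless of any Kubernetes namespace.
var admittedWorkloads = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "kueue",
	Name:      "admitted_workloads_total",
	Help:      "Total number of admitted workloads.",
})

// A Kubernetes namespace, by contrast, has to be a label, which is what
// multiplies cardinality: one series per (namespace, name) combination.
var localQueueAdmittedWorkloads = prometheus.NewCounterVec(prometheus.CounterOpts{
	Namespace: "kueue",
	Name:      "local_queue_admitted_workloads_total",
	Help:      "Total number of admitted workloads per LocalQueue.",
}, []string{"namespace", "name"})

func main() {
	prometheus.MustRegister(admittedWorkloads, localQueueAdmittedWorkloads)
	admittedWorkloads.Inc()
	localQueueAdmittedWorkloads.WithLabelValues("team-a", "lq-gpu").Inc()
}
```

With the labeled variant, the /metrics output grows with the number of (namespace, LocalQueue) pairs, which is the cardinality concern discussed above.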

Contributor @mimowo commented on Dec 2, 2024:

I see, I thought I had found such an example in k8s, but I was wrong. Seeing no such metrics in k8s suggests that it might indeed be better not to multiply the metrics by namespace. DISCLAIMER: I haven't done an extensive search, just looked at a couple of places.

Since we have such a use case in Kueue, I would be OK with the API knob, but a KEP would be useful either way.

Contributor Author commented:

https://kubernetes.io/docs/reference/instrumentation/metrics/

There are a few metrics here with a namespace label.

There's an existing KEP that I was having a bit of trouble implementing, since it included the ability to use namespace/local_queue selectors for metric collection.

Contributor @mimowo commented on Dec 2, 2024:

> https://kubernetes.io/docs/reference/instrumentation/metrics/
>
> There are a few metrics here with a namespace label.

Interesting, are these metrics opt-in or enabled by default? If k8s core enables them by default, I don't think we need to worry. Basically, I would like to better understand why cardinality is a problem.

Contributor @mimowo commented on Dec 2, 2024:

I would suggest updating the KEP and hiding the metrics behind an alpha feature gate. This will not impact users/customers, and we don't commit to maintaining the API. Then, as a graduation point for beta, we re-evaluate both approaches.

EDIT: to be clear, I'm hesitant; maybe it is actually OK to just preemptively prevent very large outputs from the metrics endpoint. So maybe the API is fine, I will look tomorrow. Cc @tenzen-y.

Contributor Author commented:

> Interesting, are these metrics opt-in or enabled by default? If k8s core enables them by default, I don't think we need to worry. Basically, I would like to better understand why cardinality is a problem.

There are a handful that have graduated to stable and about a dozen that are alpha.

}

// ControllerHealth defines the health configs.
4 changes: 4 additions & 0 deletions cmd/kueue/main.go
@@ -131,6 +131,10 @@ func main() {

metrics.Register()

if features.Enabled(features.LocalQueueMetrics) {
metrics.RegisterLQMetrics()
}

kubeConfig := ctrl.GetConfigOrDie()
if kubeConfig.UserAgent == "" {
kubeConfig.UserAgent = useragent.Default()
5 changes: 5 additions & 0 deletions pkg/cache/cache.go
@@ -431,6 +431,11 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) {
if !ok {
return
}
if features.Enabled(features.LocalQueueMetrics) {
for _, q := range c.hm.ClusterQueues[cq.Name].localQueues {
metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(q.key))
}
}
c.hm.DeleteClusterQueue(cq.Name)
metrics.ClearCacheMetrics(cq.Name)
}
21 changes: 21 additions & 0 deletions pkg/cache/clusterqueue.go
@@ -237,6 +237,11 @@ func (c *clusterQueue) updateQueueStatus() {
if status != c.Status {
c.Status = status
metrics.ReportClusterQueueStatus(c.Name, c.Status)
if features.Enabled(features.LocalQueueMetrics) {
for _, lq := range c.localQueues {
Contributor commented:

This iteration might add unnecessary performance cost. What is the scenario in which it needs to be called here? Maybe we could move the call to be per-LQ, when we update the specific LQ. PTAL.

Contributor Author @KPostOffice commented on Dec 2, 2024:

The LQ status is equal to the CQ status, so when the CQ status updates, all of the CQ's associated LQs should have their statuses updated as well.

metrics.ReportLocalQueueStatus(metrics.LQRefFromLocalQueueKey(lq.key), c.Status)
}
}
}
}

@@ -500,6 +505,12 @@ func (c *clusterQueue) reportActiveWorkloads() {
metrics.ReservingActiveWorkloads.WithLabelValues(c.Name).Set(float64(len(c.Workloads)))
}

func (q *queue) reportActiveWorkloads() {
// q.key has the form "<namespace>/<name>", so qKeySlice[0] is the namespace
// and qKeySlice[1] is the LocalQueue name.
qKeySlice := strings.Split(q.key, "/")
metrics.LocalQueueAdmittedActiveWorkloads.WithLabelValues(qKeySlice[1], qKeySlice[0]).Set(float64(q.admittedWorkloads))
metrics.LocalQueueReservingActiveWorkloads.WithLabelValues(qKeySlice[1], qKeySlice[0]).Set(float64(q.reservingWorkloads))
}

// updateWorkloadUsage updates the usage of the ClusterQueue for the workload
// and the number of admitted workloads for local queues.
func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
@@ -537,6 +548,9 @@ func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
updateFlavorUsage(frUsage, lq.admittedUsage, m)
lq.admittedWorkloads += int(m)
}
if features.Enabled(features.LocalQueueMetrics) {
lq.reportActiveWorkloads()
}
}
}

@@ -581,11 +595,18 @@ func (c *clusterQueue) addLocalQueue(q *kueue.LocalQueue) error {
}
}
c.localQueues[qKey] = qImpl
if features.Enabled(features.LocalQueueMetrics) {
qImpl.reportActiveWorkloads()
metrics.ReportLocalQueueStatus(metrics.LQRefFromLocalQueueKey(qKey), c.Status)
}
return nil
}

func (c *clusterQueue) deleteLocalQueue(q *kueue.LocalQueue) {
qKey := queueKey(q)
if features.Enabled(features.LocalQueueMetrics) {
metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(qKey))
}
delete(c.localQueues, qKey)
}

1 change: 0 additions & 1 deletion pkg/controller/core/core.go
@@ -58,7 +58,6 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
cc,
WithQueueVisibilityUpdateInterval(queueVisibilityUpdateInterval(cfg)),
WithQueueVisibilityClusterQueuesMaxCount(queueVisibilityClusterQueuesMaxCount(cfg)),
WithReportResourceMetrics(cfg.Metrics.EnableClusterQueueResources),
WithFairSharing(fairSharingEnabled),
WithWatchers(rfRec, acRec),
)
61 changes: 60 additions & 1 deletion pkg/controller/core/localqueue_controller.go
@@ -40,7 +40,10 @@ import (
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/resource"
)

const (
@@ -63,7 +66,24 @@ type LocalQueueReconciler struct {
wlUpdateCh chan event.GenericEvent
}

func NewLocalQueueReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache) *LocalQueueReconciler {
type LocalQueueReconcilerOptions struct {
LocalQueueMetricsEnabled bool
}

type LocalQueueReconcilerOption func(*LocalQueueReconcilerOptions)

var defaultLQOptions = LocalQueueReconcilerOptions{}

func NewLocalQueueReconciler(
client client.Client,
queues *queue.Manager,
cache *cache.Cache,
opts ...LocalQueueReconcilerOption,
) *LocalQueueReconciler {
options := defaultLQOptions
for _, opt := range opts {
opt(&options)
}
return &LocalQueueReconciler{
log: ctrl.Log.WithName("localqueue-reconciler"),
queues: queues,
@@ -142,6 +162,10 @@ func (r *LocalQueueReconciler) Create(e event.CreateEvent) bool {
log.Error(err, "Failed to add localQueue to the cache")
}

if features.Enabled(features.LocalQueueMetrics) {
recordLocalQueueUsageMetrics(q)
}

return true
}

@@ -151,6 +175,11 @@ func (r *LocalQueueReconciler) Delete(e event.DeleteEvent) bool {
// No need to interact with the queue manager for other objects.
return true
}

if features.Enabled(features.LocalQueueMetrics) {
metrics.ClearLocalQueueResourceMetrics(localQueueReferenceFromLocalQueue(q))
}

r.log.V(2).Info("LocalQueue delete event", "localQueue", klog.KObj(q))
r.queues.DeleteLocalQueue(q)
r.cache.DeleteLocalQueue(q)
@@ -191,10 +220,40 @@ func (r *LocalQueueReconciler) Update(e event.UpdateEvent) bool {
}

r.queues.DeleteLocalQueue(oldLq)
if features.Enabled(features.LocalQueueMetrics) {
updateLocalQueueResourceMetrics(newLq)
}

return true
}

func localQueueReferenceFromLocalQueue(lq *kueue.LocalQueue) metrics.LocalQueueReference {
return metrics.LocalQueueReference{
Name: lq.Name,
Namespace: lq.Namespace,
}
}

func recordLocalQueueUsageMetrics(queue *kueue.LocalQueue) {
for _, flavor := range queue.Status.FlavorUsage {
for _, r := range flavor.Resources {
metrics.ReportLocalQueueResourceUsage(localQueueReferenceFromLocalQueue(queue), string(flavor.Name), string(r.Name), resource.QuantityToFloat(&r.Total))
}
}

for _, flavor := range queue.Status.FlavorsReservation {
for _, r := range flavor.Resources {
metrics.ReportLocalQueueResourceReservations(localQueueReferenceFromLocalQueue(queue), string(flavor.Name), string(r.Name), resource.QuantityToFloat(&r.Total))
}
}
}

func updateLocalQueueResourceMetrics(queue *kueue.LocalQueue) {
metrics.ClearLocalQueueResourceMetrics(localQueueReferenceFromLocalQueue(queue))
recordLocalQueueUsageMetrics(queue)
}

func (r *LocalQueueReconciler) Generic(e event.GenericEvent) bool {
r.log.V(3).Info("Got Workload event", "workload", klog.KObj(e.Object))
return true
8 changes: 8 additions & 0 deletions pkg/controller/core/workload_controller.go
@@ -48,6 +48,7 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/queue"
utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
@@ -258,6 +259,10 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
r.recorder.Eventf(&wl, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time since reservation was %.0fs", wl.Status.Admission.ClusterQueue, quotaReservedWaitTime.Seconds())
metrics.AdmittedWorkload(kueue.ClusterQueueReference(cqName), queuedWaitTime)
metrics.AdmissionChecksWaitTime(kueue.ClusterQueueReference(cqName), quotaReservedWaitTime)
if features.Enabled(features.LocalQueueMetrics) {
metrics.LocalQueueAdmittedWorkload(metrics.LQRefFromWorkload(&wl), queuedWaitTime)
metrics.LocalQueueAdmissionChecksWaitTime(metrics.LQRefFromWorkload(&wl), quotaReservedWaitTime)
}
}
return ctrl.Result{}, nil
}
@@ -428,6 +433,9 @@ func (r *WorkloadReconciler) reconcileOnLocalQueueActiveState(ctx context.Contex
cqName := string(lq.Spec.ClusterQueue)
if slices.Contains(r.queues.GetClusterQueueNames(), cqName) {
metrics.ReportEvictedWorkloads(cqName, kueue.WorkloadEvictedByLocalQueueStopped)
if features.Enabled(features.LocalQueueMetrics) {
metrics.ReportLocalQueueEvictedWorkloads(metrics.LQRefFromWorkload(wl), kueue.WorkloadEvictedByLocalQueueStopped)
}
}
}
return true, client.IgnoreNotFound(err)
7 changes: 7 additions & 0 deletions pkg/features/kube_features.go
@@ -151,6 +151,12 @@ const (
//
// Workloads keeps allocated quota and preserves QuotaReserved=True when ProvisioningRequest fails
KeepQuotaForProvReqRetry featuregate.Feature = "KeepQuotaForProvReqRetry"

// owner: @kpostoffice
// alpha: v0.10
//
// Enables gathering of LocalQueue metrics
LocalQueueMetrics featuregate.Feature = "LocalQueueMetrics"
)

func init() {
Expand Down Expand Up @@ -180,6 +186,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
ExposeFlavorsInLocalQueue: {Default: true, PreRelease: featuregate.Beta},
AdmissionCheckValidationRules: {Default: false, PreRelease: featuregate.Deprecated},
KeepQuotaForProvReqRetry: {Default: false, PreRelease: featuregate.Deprecated},
LocalQueueMetrics: {Default: false, PreRelease: featuregate.Alpha},
}

func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) {