From 53dc0e772e968a1acb5b8f972b266a2c7e677a3f Mon Sep 17 00:00:00 2001 From: Yaroslava Serdiuk Date: Thu, 21 Nov 2024 20:55:42 +0000 Subject: [PATCH] Implement default LocalQueue --- pkg/controller/jobframework/base_webhook.go | 10 ++- .../jobframework/base_webhook_test.go | 82 ++++++++++++++++++- pkg/controller/jobframework/defaults.go | 13 +++ .../jobs/deployment/deployment_webhook.go | 24 +++++- .../deployment/deployment_webhook_test.go | 53 ++++++++++-- pkg/controller/jobs/job/job_webhook.go | 6 +- pkg/controller/jobs/job/job_webhook_test.go | 32 ++++++++ pkg/controller/jobs/jobset/jobset_webhook.go | 6 +- .../jobs/jobset/jobset_webhook_test.go | 76 +++++++++++++++++ pkg/controller/jobs/mpijob/mpijob_webhook.go | 6 +- .../jobs/mpijob/mpijob_webhook_test.go | 76 +++++++++++++++++ pkg/controller/jobs/pod/pod_webhook.go | 9 +- .../jobs/raycluster/raycluster_webhook.go | 9 +- .../raycluster/raycluster_webhook_test.go | 49 ++++++++++- pkg/controller/jobs/rayjob/rayjob_webhook.go | 9 +- .../jobs/rayjob/rayjob_webhook_test.go | 48 ++++++++++- .../jobs/statefulset/statefulset_webhook.go | 19 ++++- .../statefulset/statefulset_webhook_test.go | 49 ++++++++++- pkg/features/kube_features.go | 7 ++ pkg/queue/local_queue.go | 6 ++ pkg/queue/manager.go | 10 +++ 21 files changed, 570 insertions(+), 29 deletions(-) diff --git a/pkg/controller/jobframework/base_webhook.go b/pkg/controller/jobframework/base_webhook.go index 4b8438a5450..5e76c9446ec 100644 --- a/pkg/controller/jobframework/base_webhook.go +++ b/pkg/controller/jobframework/base_webhook.go @@ -25,12 +25,15 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" "sigs.k8s.io/kueue/pkg/controller/jobframework/webhook" + "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" ) // BaseWebhook applies basic defaulting and validation for jobs. type BaseWebhook struct { ManageJobsWithoutQueueName bool FromObject func(runtime.Object) GenericJob + Queues *queue.Manager } func BaseWebhookFactory(job GenericJob, fromObject func(runtime.Object) GenericJob) func(ctrl.Manager, ...Option) error { @@ -39,6 +42,7 @@ func BaseWebhookFactory(job GenericJob, fromObject func(runtime.Object) GenericJ wh := &BaseWebhook{ ManageJobsWithoutQueueName: options.ManageJobsWithoutQueueName, FromObject: fromObject, + Queues: options.Queues, } return webhook.WebhookManagedBy(mgr). For(job.Object()). @@ -55,7 +59,11 @@ func (w *BaseWebhook) Default(ctx context.Context, obj runtime.Object) error { job := w.FromObject(obj) log := ctrl.LoggerFrom(ctx) log.V(5).Info("Applying defaults", "job", klog.KObj(job.Object())) - ApplyDefaultForSuspend(job, w.ManageJobsWithoutQueueName) + if features.Enabled(features.KueueDefaulting) && w.Queues.DefaultLocalQueue(job.Object().GetNamespace()) != "" { + ApplyDefaultLocalQueue(job) + } else { + ApplyDefaultForSuspend(job, w.ManageJobsWithoutQueueName) + } return nil } diff --git a/pkg/controller/jobframework/base_webhook_test.go b/pkg/controller/jobframework/base_webhook_test.go index 60d2f5f9e72..4241031c572 100644 --- a/pkg/controller/jobframework/base_webhook_test.go +++ b/pkg/controller/jobframework/base_webhook_test.go @@ -22,6 +22,7 @@ import ( "github.com/google/go-cmp/cmp" batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -31,9 +32,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" + "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/podset" + "sigs.k8s.io/kueue/pkg/queue" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" ) type testGenericJob struct { @@ -125,9 +130,10 @@ func makeTestGenericJob() *testGenericJob { func TestBaseWebhookDefault(t *testing.T) { testcases := map[string]struct { manageJobsWithoutQueueName bool - - job *batchv1.Job - want *batchv1.Job + kueueDefaulting bool + defaultLqExist bool + job *batchv1.Job + want *batchv1.Job }{ "update the suspend field with 'manageJobsWithoutQueueName=false'": { job: &batchv1.Job{ @@ -164,12 +170,82 @@ func TestBaseWebhookDefault(t *testing.T) { Spec: batchv1.JobSpec{Suspend: ptr.To(true)}, }, }, + "KueueDefaulting enabled, default lq created, job doesn't have queue label": { + kueueDefaulting: true, + defaultLqExist: true, + job: &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + Namespace: "default", + Labels: map[string]string{}, + }, + }, + want: &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + Namespace: "default", + Labels: map[string]string{constants.QueueLabel: "default"}, + }, + }, + }, + "KueueDefaulting enabled, default lq created, job has queue label": { + kueueDefaulting: true, + defaultLqExist: true, + job: &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + Namespace: "default", + Labels: map[string]string{constants.QueueLabel: "queue"}, + }, + }, + want: &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + Namespace: "default", + Labels: map[string]string{constants.QueueLabel: "queue"}, + }, + }, + }, + "KueueDefaulting enabled, default lq doesn't created, job doesn't have queue label": { + kueueDefaulting: true, + defaultLqExist: false, + job: &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + Namespace: "default", + Labels: map[string]string{}, + }, + }, + want: &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + Namespace: "default", + Labels: map[string]string{}, + }, + }, + }, } for name, tc := range testcases { t.Run(name, func(t *testing.T) { + ctx, _ := utiltesting.ContextWithLog(t) + features.SetFeatureGateDuringTest(t, features.KueueDefaulting, tc.kueueDefaulting) + clientBuilder := utiltesting.NewClientBuilder(). + WithObjects( + &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}}, + ) + cl := clientBuilder.Build() + cqCache := cache.New(cl) + queueManager := queue.NewManager(cl, cqCache) + if tc.defaultLqExist { + if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default"). + ClusterQueue("cluster-queue").Obj()); err != nil { + t.Errorf("failed to create default local queue: %s", err) + } + } w := &jobframework.BaseWebhook{ ManageJobsWithoutQueueName: tc.manageJobsWithoutQueueName, FromObject: makeTestGenericJob().fromObject, + Queues: queueManager, } if err := w.Default(context.Background(), tc.job); err != nil { t.Errorf("set defaults to a kubeflow/mpijob by a Defaulter") diff --git a/pkg/controller/jobframework/defaults.go b/pkg/controller/jobframework/defaults.go index d24e471331e..c9d49f9a217 100644 --- a/pkg/controller/jobframework/defaults.go +++ b/pkg/controller/jobframework/defaults.go @@ -18,6 +18,8 @@ package jobframework import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/queue" ) func ApplyDefaultForSuspend(job GenericJob, manageJobsWithoutQueueName bool) { @@ -32,3 +34,14 @@ func ApplyDefaultForSuspend(job GenericJob, manageJobsWithoutQueueName bool) { } } } + +func ApplyDefaultLocalQueue(job GenericJob) { + labels := job.Object().GetLabels() + if labels == nil { + labels = make(map[string]string) + } + if labels[constants.QueueLabel] == "" { + labels[constants.QueueLabel] = queue.DefaultLocalQueueName + job.Object().SetLabels(labels) + } +} diff --git a/pkg/controller/jobs/deployment/deployment_webhook.go b/pkg/controller/jobs/deployment/deployment_webhook.go index 4489d0b3d9a..e711cff06a5 100644 --- a/pkg/controller/jobs/deployment/deployment_webhook.go +++ b/pkg/controller/jobs/deployment/deployment_webhook.go @@ -30,15 +30,20 @@ import ( "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/jobframework/webhook" + "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" ) type Webhook struct { client client.Client + queues *queue.Manager } -func SetupWebhook(mgr ctrl.Manager, _ ...jobframework.Option) error { +func SetupWebhook(mgr ctrl.Manager, opts ...jobframework.Option) error { + options := jobframework.ProcessOptions(opts...) wh := &Webhook{ client: mgr.GetClient(), + queues: options.Queues, } obj := &appsv1.Deployment{} return webhook.WebhookManagedBy(mgr). @@ -58,6 +63,23 @@ func (wh *Webhook) Default(ctx context.Context, obj runtime.Object) error { log := ctrl.LoggerFrom(ctx).WithName("deployment-webhook") log.V(5).Info("Applying defaults") + cqLabel, ok := deployment.Labels[constants.QueueLabel] + if ok || features.Enabled(features.KueueDefaulting) && wh.queues.DefaultLocalQueue(deployment.ObjectMeta.Namespace) != "" { + if deployment.Spec.Template.Labels == nil { + deployment.Spec.Template.Labels = make(map[string]string, 1) + } + } + if cqLabel != "" { + deployment.Spec.Template.Labels[constants.QueueLabel] = cqLabel + } else if features.Enabled(features.KueueDefaulting) && wh.queues.DefaultLocalQueue(deployment.ObjectMeta.Namespace) != "" { + queue := queue.DefaultLocalQueueName + if deployment.Labels == nil { + deployment.Labels = make(map[string]string) + } + deployment.Labels[constants.QueueLabel] = queue + deployment.Spec.Template.Labels[constants.QueueLabel] = queue + } + if queueName := jobframework.QueueNameForObject(deployment.Object()); queueName != "" { if deployment.Spec.Template.Labels == nil { deployment.Spec.Template.Labels = make(map[string]string, 1) diff --git a/pkg/controller/jobs/deployment/deployment_webhook_test.go b/pkg/controller/jobs/deployment/deployment_webhook_test.go index e2371ba0e75..0f6f114ca15 100644 --- a/pkg/controller/jobs/deployment/deployment_webhook_test.go +++ b/pkg/controller/jobs/deployment/deployment_webhook_test.go @@ -25,15 +25,20 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/controller/jobframework" + "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingdeployment "sigs.k8s.io/kueue/pkg/util/testingjobs/deployment" ) func TestDefault(t *testing.T) { testCases := map[string]struct { - deployment *appsv1.Deployment - want *appsv1.Deployment + deployment *appsv1.Deployment + kueueDefaulting bool + defaultLqExist bool + want *appsv1.Deployment }{ "deployment without queue": { deployment: testingdeployment.MakeDeployment("test-pod", "").Obj(), @@ -62,20 +67,54 @@ func TestDefault(t *testing.T) { deployment: testingdeployment.MakeDeployment("test-pod", "").PodTemplateSpecQueue("test-queue").Obj(), want: testingdeployment.MakeDeployment("test-pod", "").PodTemplateSpecQueue("test-queue").Obj(), }, + "KueueDefaulting enabled, default lq created, job doesn't have queue label": { + kueueDefaulting: true, + defaultLqExist: true, + deployment: testingdeployment.MakeDeployment("test-pod", "default").Obj(), + want: testingdeployment.MakeDeployment("test-pod", "default"). + Queue("default"). + PodTemplateSpecQueue("default"). + Obj(), + }, + "KueueDefaulting enabled, default lq created, job has queue label": { + kueueDefaulting: true, + defaultLqExist: true, + deployment: testingdeployment.MakeDeployment("test-pod", "").Queue("test-queue").Obj(), + want: testingdeployment.MakeDeployment("test-pod", ""). + Queue("test-queue"). + PodTemplateSpecQueue("test-queue"). + Obj(), + }, + "KueueDefaulting enabled, default lq doesn't created, job doesn't have queue label": { + kueueDefaulting: true, + defaultLqExist: false, + deployment: testingdeployment.MakeDeployment("test-pod", "").Obj(), + want: testingdeployment.MakeDeployment("test-pod", ""). + Obj(), + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { + ctx, _ := utiltesting.ContextWithLog(t) + features.SetFeatureGateDuringTest(t, features.KueueDefaulting, tc.kueueDefaulting) t.Cleanup(jobframework.EnableIntegrationsForTest(t, "pod")) builder := utiltesting.NewClientBuilder() - client := builder.Build() - + cli := builder.Build() + cqCache := cache.New(cli) + queueManager := queue.NewManager(cli, cqCache) + if tc.defaultLqExist { + if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default"). + ClusterQueue("cluster-queue"). + Obj()); err != nil { + t.Errorf("failed to create default local queue: %s", err) + } + } w := &Webhook{ - client: client, + client: cli, + queues: queueManager, } - ctx, _ := utiltesting.ContextWithLog(t) - if err := w.Default(ctx, tc.deployment); err != nil { t.Errorf("failed to set defaults for v1/deployment: %s", err) } diff --git a/pkg/controller/jobs/job/job_webhook.go b/pkg/controller/jobs/job/job_webhook.go index e7859627578..70cc3b43bba 100644 --- a/pkg/controller/jobs/job/job_webhook.go +++ b/pkg/controller/jobs/job/job_webhook.go @@ -77,7 +77,11 @@ func (w *JobWebhook) Default(ctx context.Context, obj runtime.Object) error { log := ctrl.LoggerFrom(ctx).WithName("job-webhook") log.V(5).Info("Applying defaults", "job", klog.KObj(job)) - jobframework.ApplyDefaultForSuspend(job, w.manageJobsWithoutQueueName) + if features.Enabled(features.KueueDefaulting) && w.queues.DefaultLocalQueue(job.ObjectMeta.Namespace) != "" { + jobframework.ApplyDefaultLocalQueue(job) + } else { + jobframework.ApplyDefaultForSuspend(job, w.manageJobsWithoutQueueName) + } if canDefaultManagedBy(job.Spec.ManagedBy) { localQueueName, found := job.Labels[constants.QueueLabel] diff --git a/pkg/controller/jobs/job/job_webhook_test.go b/pkg/controller/jobs/job/job_webhook_test.go index fc4e8382b44..9b3fa884f6c 100644 --- a/pkg/controller/jobs/job/job_webhook_test.go +++ b/pkg/controller/jobs/job/job_webhook_test.go @@ -551,6 +551,8 @@ func TestDefault(t *testing.T) { manageJobsWithoutQueueName bool multiKueueEnabled bool multiKueueBatchJobWithManagedByEnabled bool + kueueDefaulting bool + defaultLqExist bool want *batchv1.Job wantErr error }{ @@ -637,11 +639,35 @@ func TestDefault(t *testing.T) { multiKueueEnabled: true, multiKueueBatchJobWithManagedByEnabled: true, }, + "KueueDefaulting enabled, default lq created, job doesn't have queue label": { + kueueDefaulting: true, + defaultLqExist: true, + job: testingutil.MakeJob("test-job", "default").Obj(), + want: testingutil.MakeJob("test-job", "default"). + Queue("default"). + Obj(), + }, + "KueueDefaulting enabled, default lq created, job has queue label": { + kueueDefaulting: true, + defaultLqExist: true, + job: testingutil.MakeJob("test-job", "").Queue("test-queue").Obj(), + want: testingutil.MakeJob("test-job", ""). + Queue("test-queue"). + Obj(), + }, + "KueueDefaulting enabled, default lq doesn't created, job doesn't have queue label": { + kueueDefaulting: true, + defaultLqExist: false, + job: testingutil.MakeJob("test-job", "").Obj(), + want: testingutil.MakeJob("test-job", ""). + Obj(), + }, } for name, tc := range testcases { t.Run(name, func(t *testing.T) { features.SetFeatureGateDuringTest(t, features.MultiKueue, tc.multiKueueEnabled) features.SetFeatureGateDuringTest(t, features.MultiKueueBatchJobWithManagedBy, tc.multiKueueBatchJobWithManagedByEnabled) + features.SetFeatureGateDuringTest(t, features.KueueDefaulting, tc.kueueDefaulting) ctx, _ := utiltesting.ContextWithLog(t) @@ -652,6 +678,12 @@ func TestDefault(t *testing.T) { cl := clientBuilder.Build() cqCache := cache.New(cl) queueManager := queue.NewManager(cl, cqCache) + if tc.defaultLqExist { + if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default"). + ClusterQueue("cluster-queue").Obj()); err != nil { + t.Errorf("failed to create default local queue: %s", err) + } + } for _, q := range tc.queues { if err := queueManager.AddLocalQueue(ctx, &q); err != nil { diff --git a/pkg/controller/jobs/jobset/jobset_webhook.go b/pkg/controller/jobs/jobset/jobset_webhook.go index 04e2dc78d0f..8804b7b7b8a 100644 --- a/pkg/controller/jobs/jobset/jobset_webhook.go +++ b/pkg/controller/jobs/jobset/jobset_webhook.go @@ -72,7 +72,11 @@ func (w *JobSetWebhook) Default(ctx context.Context, obj runtime.Object) error { log := ctrl.LoggerFrom(ctx).WithName("jobset-webhook") log.V(5).Info("Applying defaults", "jobset", klog.KObj(jobSet)) - jobframework.ApplyDefaultForSuspend(jobSet, w.manageJobsWithoutQueueName) + if features.Enabled(features.KueueDefaulting) && w.queues.DefaultLocalQueue(jobSet.ObjectMeta.Namespace) != "" { + jobframework.ApplyDefaultLocalQueue(jobSet) + } else { + jobframework.ApplyDefaultForSuspend(jobSet, w.manageJobsWithoutQueueName) + } if canDefaultManagedBy(jobSet.Spec.ManagedBy) { localQueueName, found := jobSet.Labels[constants.QueueLabel] diff --git a/pkg/controller/jobs/jobset/jobset_webhook_test.go b/pkg/controller/jobs/jobset/jobset_webhook_test.go index 5ef33de8708..263b8550f49 100644 --- a/pkg/controller/jobs/jobset/jobset_webhook_test.go +++ b/pkg/controller/jobs/jobset/jobset_webhook_test.go @@ -170,6 +170,9 @@ func TestDefault(t *testing.T) { clusterQueues []kueue.ClusterQueue admissionCheck *kueue.AdmissionCheck multiKueueEnabled bool + kueueDefaulting bool + defaultLqExist bool + want *jobset.JobSet wantManagedBy *string wantErr error }{ @@ -362,11 +365,72 @@ func TestDefault(t *testing.T) { multiKueueEnabled: true, wantManagedBy: nil, }, + { + name: "KueueDefaulting enabled, default lq created, job doesn't have queue label", + kueueDefaulting: true, + defaultLqExist: true, + jobSet: &jobset.JobSet{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + }, + }, + want: &jobset.JobSet{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + Labels: map[string]string{ + constants.QueueLabel: "default", + }, + }, + }, + }, + { + name: "KueueDefaulting enabled, default lq created, job has queue label", + kueueDefaulting: true, + defaultLqExist: true, + jobSet: &jobset.JobSet{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + Labels: map[string]string{ + constants.QueueLabel: "local-queue", + }, + }, + }, + want: &jobset.JobSet{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + Labels: map[string]string{ + constants.QueueLabel: "local-queue", + }, + }, + }, + }, + { + name: "KueueDefaulting enabled, default lq doesn't created, job doesn't have queue label", + kueueDefaulting: true, + defaultLqExist: false, + jobSet: &jobset.JobSet{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + }, + }, + want: &jobset.JobSet{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + }, + }, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { features.SetFeatureGateDuringTest(t, features.MultiKueue, tc.multiKueueEnabled) + features.SetFeatureGateDuringTest(t, features.KueueDefaulting, tc.kueueDefaulting) ctx, _ := utiltesting.ContextWithLog(t) @@ -378,6 +442,13 @@ func TestDefault(t *testing.T) { cqCache := cache.New(cl) queueManager := queue.NewManager(cl, cqCache) + if tc.defaultLqExist { + if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default"). + ClusterQueue("cluster-queue").Obj()); err != nil { + t.Errorf("failed to create default local queue: %s", err) + } + } + for _, q := range tc.queues { if err := queueManager.AddLocalQueue(ctx, &q); err != nil { t.Fatalf("Inserting queue %s/%s in manager: %v", q.Namespace, q.Name, err) @@ -407,6 +478,11 @@ func TestDefault(t *testing.T) { if diff := cmp.Diff(tc.wantManagedBy, tc.jobSet.Spec.ManagedBy); diff != "" { t.Errorf("Default() jobSet.Spec.ManagedBy mismatch (-want +got):\n%s", diff) } + if tc.want != nil { + if diff := cmp.Diff(tc.want, tc.jobSet); diff != "" { + t.Errorf("Default() mismatch (-want,+got):\n%s", diff) + } + } }) } } diff --git a/pkg/controller/jobs/mpijob/mpijob_webhook.go b/pkg/controller/jobs/mpijob/mpijob_webhook.go index 6e612e05895..367785a1ac1 100644 --- a/pkg/controller/jobs/mpijob/mpijob_webhook.go +++ b/pkg/controller/jobs/mpijob/mpijob_webhook.go @@ -76,7 +76,11 @@ func (w *MpiJobWebhook) Default(ctx context.Context, obj runtime.Object) error { log := ctrl.LoggerFrom(ctx).WithName("mpijob-webhook") log.V(5).Info("Applying defaults", "mpijob", klog.KObj(mpiJob)) - jobframework.ApplyDefaultForSuspend(mpiJob, w.manageJobsWithoutQueueName) + if features.Enabled(features.KueueDefaulting) && w.queues.DefaultLocalQueue(mpiJob.ObjectMeta.Namespace) != "" { + jobframework.ApplyDefaultLocalQueue(mpiJob) + } else { + jobframework.ApplyDefaultForSuspend(mpiJob, w.manageJobsWithoutQueueName) + } if canDefaultManagedBy(mpiJob.Spec.RunPolicy.ManagedBy) { localQueueName, found := mpiJob.Labels[constants.QueueLabel] diff --git a/pkg/controller/jobs/mpijob/mpijob_webhook_test.go b/pkg/controller/jobs/mpijob/mpijob_webhook_test.go index 086e664dd74..71c26c8f365 100644 --- a/pkg/controller/jobs/mpijob/mpijob_webhook_test.go +++ b/pkg/controller/jobs/mpijob/mpijob_webhook_test.go @@ -141,6 +141,9 @@ func TestDefault(t *testing.T) { clusterQueues []kueue.ClusterQueue admissionCheck *kueue.AdmissionCheck multiKueueEnabled bool + kueueDefaulting bool + defaultLqExist bool + want *v2beta1.MPIJob wantManagedBy *string wantErr error }{ @@ -337,11 +340,72 @@ func TestDefault(t *testing.T) { multiKueueEnabled: true, wantManagedBy: nil, }, + { + name: "KueueDefaulting enabled, default lq created, job doesn't have queue label", + kueueDefaulting: true, + defaultLqExist: true, + mpiJob: &v2beta1.MPIJob{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + }, + }, + want: &v2beta1.MPIJob{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + Labels: map[string]string{ + constants.QueueLabel: "default", + }, + }, + }, + }, + { + name: "KueueDefaulting enabled, default lq created, job has queue label", + kueueDefaulting: true, + defaultLqExist: true, + mpiJob: &v2beta1.MPIJob{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + Labels: map[string]string{ + constants.QueueLabel: "local-queue", + }, + }, + }, + want: &v2beta1.MPIJob{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + Labels: map[string]string{ + constants.QueueLabel: "local-queue", + }, + }, + }, + }, + { + name: "KueueDefaulting enabled, default lq doesn't created, job doesn't have queue label", + kueueDefaulting: true, + defaultLqExist: false, + mpiJob: &v2beta1.MPIJob{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + }, + }, + want: &v2beta1.MPIJob{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + }, + }, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { features.SetFeatureGateDuringTest(t, features.MultiKueue, tc.multiKueueEnabled) + features.SetFeatureGateDuringTest(t, features.KueueDefaulting, tc.kueueDefaulting) ctx, _ := utiltesting.ContextWithLog(t) @@ -353,6 +417,13 @@ func TestDefault(t *testing.T) { cqCache := cache.New(cl) queueManager := queue.NewManager(cl, cqCache) + if tc.defaultLqExist { + if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default"). + ClusterQueue("cluster-queue").Obj()); err != nil { + t.Errorf("failed to create default local queue: %s", err) + } + } + for _, q := range tc.queues { if err := queueManager.AddLocalQueue(ctx, &q); err != nil { t.Fatalf("Inserting queue %s/%s in manager: %v", q.Namespace, q.Name, err) @@ -382,6 +453,11 @@ func TestDefault(t *testing.T) { if diff := cmp.Diff(tc.wantManagedBy, tc.mpiJob.Spec.RunPolicy.ManagedBy); diff != "" { t.Errorf("Default() mpijob.Spec.RunPolicy.ManagedBy mismatch (-want +got):\n%s", diff) } + if tc.want != nil { + if diff := cmp.Diff(tc.want, tc.mpiJob); diff != "" { + t.Errorf("Default() mismatch (-want,+got):\n%s", diff) + } + } }) } } diff --git a/pkg/controller/jobs/pod/pod_webhook.go b/pkg/controller/jobs/pod/pod_webhook.go index 5640fb7277e..4563a98fe9b 100644 --- a/pkg/controller/jobs/pod/pod_webhook.go +++ b/pkg/controller/jobs/pod/pod_webhook.go @@ -39,6 +39,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/jobframework/webhook" "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" utilpod "sigs.k8s.io/kueue/pkg/util/pod" ) @@ -67,6 +68,7 @@ var ( type PodWebhook struct { client client.Client + queues *queue.Manager manageJobsWithoutQueueName bool namespaceSelector *metav1.LabelSelector podSelector *metav1.LabelSelector @@ -81,6 +83,7 @@ func SetupWebhook(mgr ctrl.Manager, opts ...jobframework.Option) error { } wh := &PodWebhook{ client: mgr.GetClient(), + queues: options.Queues, manageJobsWithoutQueueName: options.ManageJobsWithoutQueueName, namespaceSelector: podOpts.NamespaceSelector, podSelector: podOpts.PodSelector, @@ -148,6 +151,10 @@ func (w *PodWebhook) Default(ctx context.Context, obj runtime.Object) error { return nil } + if features.Enabled(features.KueueDefaulting) { + jobframework.ApplyDefaultLocalQueue(pod) + } + // Check for pod label selector match podSelector, err := metav1.LabelSelectorAsSelector(w.podSelector) if err != nil { @@ -175,7 +182,7 @@ func (w *PodWebhook) Default(ctx context.Context, obj runtime.Object) error { return nil } - if jobframework.QueueName(pod) != "" || w.manageJobsWithoutQueueName { + if jobframework.QueueName(pod) != "" || w.manageJobsWithoutQueueName || features.Enabled(features.KueueDefaulting) { controllerutil.AddFinalizer(pod.Object(), PodFinalizer) if pod.pod.Labels == nil { diff --git a/pkg/controller/jobs/raycluster/raycluster_webhook.go b/pkg/controller/jobs/raycluster/raycluster_webhook.go index e1ad061ef6f..f46b8be2293 100644 --- a/pkg/controller/jobs/raycluster/raycluster_webhook.go +++ b/pkg/controller/jobs/raycluster/raycluster_webhook.go @@ -27,6 +27,8 @@ import ( "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/jobframework/webhook" + "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" ) var ( @@ -36,6 +38,7 @@ var ( ) type RayClusterWebhook struct { + queues *queue.Manager manageJobsWithoutQueueName bool } @@ -65,7 +68,11 @@ func (w *RayClusterWebhook) Default(ctx context.Context, obj runtime.Object) err job := fromObject(obj) log := ctrl.LoggerFrom(ctx).WithName("raycluster-webhook") log.V(10).Info("Applying defaults", "job", klog.KObj(job)) - jobframework.ApplyDefaultForSuspend(job, w.manageJobsWithoutQueueName) + if features.Enabled(features.KueueDefaulting) && w.queues.DefaultLocalQueue(job.ObjectMeta.Namespace) != "" { + jobframework.ApplyDefaultLocalQueue(job) + } else { + jobframework.ApplyDefaultForSuspend(job, w.manageJobsWithoutQueueName) + } return nil } diff --git a/pkg/controller/jobs/raycluster/raycluster_webhook_test.go b/pkg/controller/jobs/raycluster/raycluster_webhook_test.go index 75de14db660..24ac322c0bf 100644 --- a/pkg/controller/jobs/raycluster/raycluster_webhook_test.go +++ b/pkg/controller/jobs/raycluster/raycluster_webhook_test.go @@ -27,7 +27,11 @@ import ( "k8s.io/utils/ptr" kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" + "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingrayutil "sigs.k8s.io/kueue/pkg/util/testingjobs/raycluster" ) @@ -38,9 +42,11 @@ var ( func TestValidateDefault(t *testing.T) { testcases := map[string]struct { - oldJob *rayv1.RayCluster - newJob *rayv1.RayCluster - manageAll bool + oldJob *rayv1.RayCluster + newJob *rayv1.RayCluster + manageAll bool + kueueDefaulting bool + defaultLqExist bool }{ "unmanaged": { oldJob: testingrayutil.MakeCluster("job", "ns"). @@ -69,12 +75,49 @@ func TestValidateDefault(t *testing.T) { Suspend(true). Obj(), }, + "KueueDefaulting enabled, default lq created, job doesn't have queue label": { + kueueDefaulting: true, + defaultLqExist: true, + oldJob: testingrayutil.MakeCluster("test-job", "default").Obj(), + newJob: testingrayutil.MakeCluster("test-job", "default"). + Queue("default"). + Obj(), + }, + "KueueDefaulting enabled, default lq created, job has queue label": { + kueueDefaulting: true, + defaultLqExist: true, + oldJob: testingrayutil.MakeCluster("test-job", "").Queue("test-queue").Obj(), + newJob: testingrayutil.MakeCluster("test-job", ""). + Queue("test-queue"). + Obj(), + }, + "KueueDefaulting enabled, default lq doesn't created, job doesn't have queue label": { + kueueDefaulting: true, + defaultLqExist: false, + oldJob: testingrayutil.MakeCluster("test-job", "").Obj(), + newJob: testingrayutil.MakeCluster("test-job", ""). + Obj(), + }, } for name, tc := range testcases { t.Run(name, func(t *testing.T) { + features.SetFeatureGateDuringTest(t, features.KueueDefaulting, tc.kueueDefaulting) + ctx, _ := utiltesting.ContextWithLog(t) + builder := utiltesting.NewClientBuilder() + cli := builder.Build() + cqCache := cache.New(cli) + queueManager := queue.NewManager(cli, cqCache) + if tc.defaultLqExist { + if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default"). + ClusterQueue("cluster-queue").Obj()); err != nil { + t.Errorf("failed to create default local queue: %s", err) + } + } + wh := &RayClusterWebhook{ manageJobsWithoutQueueName: tc.manageAll, + queues: queueManager, } result := tc.oldJob.DeepCopy() if err := wh.Default(context.Background(), result); err != nil { diff --git a/pkg/controller/jobs/rayjob/rayjob_webhook.go b/pkg/controller/jobs/rayjob/rayjob_webhook.go index ef3c66f906c..bac60b91a98 100644 --- a/pkg/controller/jobs/rayjob/rayjob_webhook.go +++ b/pkg/controller/jobs/rayjob/rayjob_webhook.go @@ -30,6 +30,8 @@ import ( "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/jobframework/webhook" + "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" ) var ( @@ -40,6 +42,7 @@ var ( type RayJobWebhook struct { manageJobsWithoutQueueName bool + queues *queue.Manager } // SetupRayJobWebhook configures the webhook for RayJob. @@ -65,7 +68,11 @@ func (w *RayJobWebhook) Default(ctx context.Context, obj runtime.Object) error { job := obj.(*rayv1.RayJob) log := ctrl.LoggerFrom(ctx).WithName("rayjob-webhook") log.V(5).Info("Applying defaults", "job", klog.KObj(job)) - jobframework.ApplyDefaultForSuspend((*RayJob)(job), w.manageJobsWithoutQueueName) + if features.Enabled(features.KueueDefaulting) && w.queues.DefaultLocalQueue(job.ObjectMeta.Namespace) != "" { + jobframework.ApplyDefaultLocalQueue((*RayJob)(job)) + } else { + jobframework.ApplyDefaultForSuspend((*RayJob)(job), w.manageJobsWithoutQueueName) + } return nil } diff --git a/pkg/controller/jobs/rayjob/rayjob_webhook_test.go b/pkg/controller/jobs/rayjob/rayjob_webhook_test.go index 03e46631c80..69ba76ba9b6 100644 --- a/pkg/controller/jobs/rayjob/rayjob_webhook_test.go +++ b/pkg/controller/jobs/rayjob/rayjob_webhook_test.go @@ -30,7 +30,11 @@ import ( "k8s.io/utils/ptr" kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" + "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingrayutil "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob" ) @@ -41,9 +45,11 @@ var ( func TestValidateDefault(t *testing.T) { testcases := map[string]struct { - oldJob *rayv1.RayJob - newJob *rayv1.RayJob - manageAll bool + oldJob *rayv1.RayJob + newJob *rayv1.RayJob + manageAll bool + kueueDefaulting bool + defaultLqExist bool }{ "unmanaged": { oldJob: testingrayutil.MakeJob("job", "ns"). @@ -72,12 +78,48 @@ func TestValidateDefault(t *testing.T) { Suspend(true). Obj(), }, + "KueueDefaulting enabled, default lq created, job doesn't have queue label": { + kueueDefaulting: true, + defaultLqExist: true, + oldJob: testingrayutil.MakeJob("test-job", "default").Obj(), + newJob: testingrayutil.MakeJob("test-job", "default"). + Queue("default"). + Obj(), + }, + "KueueDefaulting enabled, default lq created, job has queue label": { + kueueDefaulting: true, + defaultLqExist: true, + oldJob: testingrayutil.MakeJob("test-job", "").Queue("test-queue").Obj(), + newJob: testingrayutil.MakeJob("test-job", ""). + Queue("test-queue"). + Obj(), + }, + "KueueDefaulting enabled, default lq doesn't created, job doesn't have queue label": { + kueueDefaulting: true, + defaultLqExist: false, + oldJob: testingrayutil.MakeJob("test-job", "").Obj(), + newJob: testingrayutil.MakeJob("test-job", ""). + Obj(), + }, } for name, tc := range testcases { t.Run(name, func(t *testing.T) { + features.SetFeatureGateDuringTest(t, features.KueueDefaulting, tc.kueueDefaulting) + ctx, _ := utiltesting.ContextWithLog(t) + builder := utiltesting.NewClientBuilder() + cli := builder.Build() + cqCache := cache.New(cli) + queueManager := queue.NewManager(cli, cqCache) + if tc.defaultLqExist { + if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default"). + ClusterQueue("cluster-queue").Obj()); err != nil { + t.Errorf("failed to create default local queue: %s", err) + } + } wh := &RayJobWebhook{ manageJobsWithoutQueueName: tc.manageAll, + queues: queueManager, } result := tc.oldJob.DeepCopy() if err := wh.Default(context.Background(), result); err != nil { diff --git a/pkg/controller/jobs/statefulset/statefulset_webhook.go b/pkg/controller/jobs/statefulset/statefulset_webhook.go index c7b381e2e4b..a1206ce83cd 100644 --- a/pkg/controller/jobs/statefulset/statefulset_webhook.go +++ b/pkg/controller/jobs/statefulset/statefulset_webhook.go @@ -33,10 +33,13 @@ import ( "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" ) type Webhook struct { client client.Client + queues *queue.Manager manageJobsWithoutQueueName bool } @@ -62,9 +65,19 @@ func (wh *Webhook) Default(ctx context.Context, obj runtime.Object) error { log := ctrl.LoggerFrom(ctx).WithName("statefulset-webhook") log.V(5).Info("Applying defaults") - cqLabel, ok := ss.Labels[constants.QueueLabel] - if !ok { - return nil + labels := ss.Labels + cqLabel, ok := labels[constants.QueueLabel] + if !ok || cqLabel == "" { + if features.Enabled(features.KueueDefaulting) && wh.queues.DefaultLocalQueue(ss.ObjectMeta.Namespace) != "" { + cqLabel = queue.DefaultLocalQueueName + if labels == nil { + labels = make(map[string]string) + } + labels[constants.QueueLabel] = cqLabel + ss.SetLabels(labels) + } else { + return nil + } } if ss.Spec.Template.Labels == nil { diff --git a/pkg/controller/jobs/statefulset/statefulset_webhook_test.go b/pkg/controller/jobs/statefulset/statefulset_webhook_test.go index 709cf3d488b..7e04c73e471 100644 --- a/pkg/controller/jobs/statefulset/statefulset_webhook_test.go +++ b/pkg/controller/jobs/statefulset/statefulset_webhook_test.go @@ -29,9 +29,12 @@ import ( "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingstatefulset "sigs.k8s.io/kueue/pkg/util/testingjobs/statefulset" ) @@ -40,6 +43,8 @@ func TestDefault(t *testing.T) { testCases := map[string]struct { statefulset *appsv1.StatefulSet manageJobsWithoutQueueName bool + kueueDefaulting bool + defaultLqExist bool enableIntegrations []string want *appsv1.StatefulSet }{ @@ -71,21 +76,61 @@ func TestDefault(t *testing.T) { PodTemplateSpecPodGroupFastAdmissionAnnotation(true). Obj(), }, + "KueueDefaulting enabled, default lq created, job doesn't have queue label": { + kueueDefaulting: true, + defaultLqExist: true, + statefulset: testingstatefulset.MakeStatefulSet("test-pod", "default").Obj(), + want: testingstatefulset.MakeStatefulSet("test-pod", "default"). + Queue("default"). + PodTemplateSpecQueue("default"). + PodTemplateSpecPodGroupNameLabel("test-pod", "", gvk). + PodTemplateSpecPodGroupTotalCountAnnotation(1). + PodTemplateSpecPodGroupFastAdmissionAnnotation(true). + Obj(), + }, + "KueueDefaulting enabled, default lq created, job has queue label": { + kueueDefaulting: true, + defaultLqExist: true, + statefulset: testingstatefulset.MakeStatefulSet("test-pod", "").Queue("test-queue").Obj(), + want: testingstatefulset.MakeStatefulSet("test-pod", ""). + Queue("test-queue"). + PodTemplateSpecQueue("test-queue"). + PodTemplateSpecPodGroupNameLabel("test-pod", "", gvk). + PodTemplateSpecPodGroupTotalCountAnnotation(1). + PodTemplateSpecPodGroupFastAdmissionAnnotation(true). + Obj(), + }, + "KueueDefaulting enabled, default lq doesn't created, job doesn't have queue label": { + kueueDefaulting: true, + defaultLqExist: false, + statefulset: testingstatefulset.MakeStatefulSet("test-pod", "").Obj(), + want: testingstatefulset.MakeStatefulSet("test-pod", "").Obj(), + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { + features.SetFeatureGateDuringTest(t, features.KueueDefaulting, tc.kueueDefaulting) t.Cleanup(jobframework.EnableIntegrationsForTest(t, tc.enableIntegrations...)) + ctx, _ := utiltesting.ContextWithLog(t) + builder := utiltesting.NewClientBuilder() cli := builder.Build() + cqCache := cache.New(cli) + queueManager := queue.NewManager(cli, cqCache) + if tc.defaultLqExist { + if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default"). + ClusterQueue("cluster-queue").Obj()); err != nil { + t.Errorf("failed to create default local queue: %s", err) + } + } w := &Webhook{ client: cli, manageJobsWithoutQueueName: tc.manageJobsWithoutQueueName, + queues: queueManager, } - ctx, _ := utiltesting.ContextWithLog(t) - if err := w.Default(ctx, tc.statefulset); err != nil { t.Errorf("failed to set defaults for v1/statefulset: %s", err) } diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index aef372aae6a..6d626e35017 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -146,6 +146,12 @@ const ( // // Workloads keeps allocated quota and preserves QuotaReserved=True when ProvisioningRequest fails KeepQuotaForProvReqRetry featuregate.Feature = "KeepQuotaForProvReqRetry" + + // owner: @yaroslava-serdiuk + // alpha: v0.9 + // + // Enable to set default LocalQueue. + KueueDefaulting featuregate.Feature = "KueueDefaulting" ) func init() { @@ -175,6 +181,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ ExposeFlavorsInLocalQueue: {Default: true, PreRelease: featuregate.Beta}, AdmissionCheckValidationRules: {Default: false, PreRelease: featuregate.Deprecated}, KeepQuotaForProvReqRetry: {Default: false, PreRelease: featuregate.Deprecated}, + KueueDefaulting: {Default: false, PreRelease: featuregate.Alpha}, } func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) { diff --git a/pkg/queue/local_queue.go b/pkg/queue/local_queue.go index 8fd13bf00c0..e5a085cf32e 100644 --- a/pkg/queue/local_queue.go +++ b/pkg/queue/local_queue.go @@ -23,11 +23,17 @@ import ( "sigs.k8s.io/kueue/pkg/workload" ) +const DefaultLocalQueueName = "default" + // Key is the key used to index the queue. func Key(q *kueue.LocalQueue) string { return fmt.Sprintf("%s/%s", q.Namespace, q.Name) } +func DefaultQueue(namespace string) string { + return fmt.Sprintf("%s/%s", namespace, DefaultLocalQueueName) +} + // LocalQueue is the internal implementation of kueue.LocalQueue. type LocalQueue struct { Key string diff --git a/pkg/queue/manager.go b/pkg/queue/manager.go index dcc5f89ba30..a7672eae0cc 100644 --- a/pkg/queue/manager.go +++ b/pkg/queue/manager.go @@ -206,6 +206,16 @@ func (m *Manager) DeleteClusterQueue(cq *kueue.ClusterQueue) { metrics.ClearClusterQueueMetrics(cq.Name) } +func (m *Manager) DefaultLocalQueue(namespace string) string { + m.Lock() + defer m.Unlock() + + if _, ok := m.localQueues[DefaultQueue(namespace)]; ok { + return DefaultQueue(namespace) + } + return "" +} + func (m *Manager) AddLocalQueue(ctx context.Context, q *kueue.LocalQueue) error { m.Lock() defer m.Unlock()