diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 0d5f2c224b..4f723b0dd4 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -43,4 +43,6 @@ const ( // ManagedByKueueLabel label that signalize that an object is managed by Kueue ManagedByKueueLabel = "kueue.x-k8s.io/managed" + + DefaultLocalQueueName = "default" ) diff --git a/pkg/controller/jobframework/base_webhook.go b/pkg/controller/jobframework/base_webhook.go index 70060fae6b..3e1afc95e6 100644 --- a/pkg/controller/jobframework/base_webhook.go +++ b/pkg/controller/jobframework/base_webhook.go @@ -24,12 +24,14 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" "sigs.k8s.io/kueue/pkg/controller/jobframework/webhook" + "sigs.k8s.io/kueue/pkg/queue" ) // BaseWebhook applies basic defaulting and validation for jobs. type BaseWebhook struct { ManageJobsWithoutQueueName bool FromObject func(runtime.Object) GenericJob + Queues *queue.Manager } func BaseWebhookFactory(job GenericJob, fromObject func(runtime.Object) GenericJob) func(ctrl.Manager, ...Option) error { @@ -38,6 +40,7 @@ func BaseWebhookFactory(job GenericJob, fromObject func(runtime.Object) GenericJ wh := &BaseWebhook{ ManageJobsWithoutQueueName: options.ManageJobsWithoutQueueName, FromObject: fromObject, + Queues: options.Queues, } return webhook.WebhookManagedBy(mgr). For(job.Object()). 
@@ -54,6 +57,7 @@ func (w *BaseWebhook) Default(ctx context.Context, obj runtime.Object) error { job := w.FromObject(obj) log := ctrl.LoggerFrom(ctx) log.V(5).Info("Applying defaults") + ApplyDefaultLocalQueue(job, w.Queues.DefaultLocalQueue(job.Object().GetNamespace())) ApplyDefaultForSuspend(job, w.ManageJobsWithoutQueueName) return nil } diff --git a/pkg/controller/jobframework/base_webhook_test.go b/pkg/controller/jobframework/base_webhook_test.go index 60d2f5f9e7..0bed141e84 100644 --- a/pkg/controller/jobframework/base_webhook_test.go +++ b/pkg/controller/jobframework/base_webhook_test.go @@ -22,6 +22,7 @@ import ( "github.com/google/go-cmp/cmp" batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -31,9 +32,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" + "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/podset" + "sigs.k8s.io/kueue/pkg/queue" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" ) type testGenericJob struct { @@ -125,9 +130,10 @@ func makeTestGenericJob() *testGenericJob { func TestBaseWebhookDefault(t *testing.T) { testcases := map[string]struct { manageJobsWithoutQueueName bool - - job *batchv1.Job - want *batchv1.Job + LocalQueueDefaulting bool + defaultLqExist bool + job *batchv1.Job + want *batchv1.Job }{ "update the suspend field with 'manageJobsWithoutQueueName=false'": { job: &batchv1.Job{ @@ -164,12 +170,82 @@ func TestBaseWebhookDefault(t *testing.T) { Spec: batchv1.JobSpec{Suspend: ptr.To(true)}, }, }, + "LocalQueueDefaulting enabled, default lq created, job doesn't have queue label": { + LocalQueueDefaulting: true, + defaultLqExist: true, + job: &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + 
Namespace: "default", + Labels: map[string]string{}, + }, + }, + want: &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + Namespace: "default", + Labels: map[string]string{constants.QueueLabel: "default"}, + }, + }, + }, + "LocalQueueDefaulting enabled, default lq created, job has queue label": { + LocalQueueDefaulting: true, + defaultLqExist: true, + job: &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + Namespace: "default", + Labels: map[string]string{constants.QueueLabel: "queue"}, + }, + }, + want: &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + Namespace: "default", + Labels: map[string]string{constants.QueueLabel: "queue"}, + }, + }, + }, + "LocalQueueDefaulting enabled, default lq isn't created, job doesn't have queue label": { + LocalQueueDefaulting: true, + defaultLqExist: false, + job: &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + Namespace: "default", + Labels: map[string]string{}, + }, + }, + want: &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + Namespace: "default", + Labels: map[string]string{}, + }, + }, + }, } for name, tc := range testcases { t.Run(name, func(t *testing.T) { + ctx, _ := utiltesting.ContextWithLog(t) + features.SetFeatureGateDuringTest(t, features.LocalQueueDefaulting, tc.LocalQueueDefaulting) + clientBuilder := utiltesting.NewClientBuilder(). + WithObjects( + &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}}, + ) + cl := clientBuilder.Build() + cqCache := cache.New(cl) + queueManager := queue.NewManager(cl, cqCache) + if tc.defaultLqExist { + if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default"). 
+ ClusterQueue("cluster-queue").Obj()); err != nil { + t.Errorf("failed to create default local queue: %s", err) + } + } w := &jobframework.BaseWebhook{ ManageJobsWithoutQueueName: tc.manageJobsWithoutQueueName, FromObject: makeTestGenericJob().fromObject, + Queues: queueManager, } if err := w.Default(context.Background(), tc.job); err != nil { t.Errorf("set defaults to a kubeflow/mpijob by a Defaulter") diff --git a/pkg/controller/jobframework/defaults.go b/pkg/controller/jobframework/defaults.go index d24e471331..5408bca80d 100644 --- a/pkg/controller/jobframework/defaults.go +++ b/pkg/controller/jobframework/defaults.go @@ -18,6 +18,8 @@ package jobframework import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/features" ) func ApplyDefaultForSuspend(job GenericJob, manageJobsWithoutQueueName bool) { @@ -32,3 +34,17 @@ func ApplyDefaultForSuspend(job GenericJob, manageJobsWithoutQueueName bool) { } } } + +func ApplyDefaultLocalQueue(job GenericJob, defaultQueue string) { + if !features.Enabled(features.LocalQueueDefaulting) || defaultQueue == "" { + return + } + labels := job.Object().GetLabels() + if labels == nil { + labels = make(map[string]string, 1) + } + if labels[constants.QueueLabel] == "" { + labels[constants.QueueLabel] = defaultQueue + job.Object().SetLabels(labels) + } +} diff --git a/pkg/controller/jobs/deployment/deployment_webhook.go b/pkg/controller/jobs/deployment/deployment_webhook.go index 4489d0b3d9..bead2cd012 100644 --- a/pkg/controller/jobs/deployment/deployment_webhook.go +++ b/pkg/controller/jobs/deployment/deployment_webhook.go @@ -27,18 +27,24 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + kueue_constants "sigs.k8s.io/kueue/pkg/constants" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/jobframework/webhook" + 
"sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" ) type Webhook struct { client client.Client + queues *queue.Manager } -func SetupWebhook(mgr ctrl.Manager, _ ...jobframework.Option) error { +func SetupWebhook(mgr ctrl.Manager, opts ...jobframework.Option) error { + options := jobframework.ProcessOptions(opts...) wh := &Webhook{ client: mgr.GetClient(), + queues: options.Queues, } obj := &appsv1.Deployment{} return webhook.WebhookManagedBy(mgr). @@ -58,6 +64,23 @@ func (wh *Webhook) Default(ctx context.Context, obj runtime.Object) error { log := ctrl.LoggerFrom(ctx).WithName("deployment-webhook") log.V(5).Info("Applying defaults") + queue, ok := deployment.Labels[constants.QueueLabel] + if ok || features.Enabled(features.LocalQueueDefaulting) && wh.queues.DefaultLocalQueue(deployment.ObjectMeta.Namespace) != "" { + if deployment.Spec.Template.Labels == nil { + deployment.Spec.Template.Labels = make(map[string]string, 1) + } + } + if ok { + deployment.Spec.Template.Labels[constants.QueueLabel] = queue + } else if features.Enabled(features.LocalQueueDefaulting) && wh.queues.DefaultLocalQueue(deployment.ObjectMeta.Namespace) != "" { + queue := kueue_constants.DefaultLocalQueueName + if deployment.Labels == nil { + deployment.Labels = make(map[string]string) + } + deployment.Labels[constants.QueueLabel] = queue + deployment.Spec.Template.Labels[constants.QueueLabel] = queue + } + if queueName := jobframework.QueueNameForObject(deployment.Object()); queueName != "" { if deployment.Spec.Template.Labels == nil { deployment.Spec.Template.Labels = make(map[string]string, 1) diff --git a/pkg/controller/jobs/deployment/deployment_webhook_test.go b/pkg/controller/jobs/deployment/deployment_webhook_test.go index e2371ba0e7..d55d89a52f 100644 --- a/pkg/controller/jobs/deployment/deployment_webhook_test.go +++ b/pkg/controller/jobs/deployment/deployment_webhook_test.go @@ -25,15 +25,20 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" 
"sigs.k8s.io/controller-runtime/pkg/webhook/admission" + "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/controller/jobframework" + "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingdeployment "sigs.k8s.io/kueue/pkg/util/testingjobs/deployment" ) func TestDefault(t *testing.T) { testCases := map[string]struct { - deployment *appsv1.Deployment - want *appsv1.Deployment + deployment *appsv1.Deployment + LocalQueueDefaulting bool + defaultLqExist bool + want *appsv1.Deployment }{ "deployment without queue": { deployment: testingdeployment.MakeDeployment("test-pod", "").Obj(), @@ -62,20 +67,54 @@ func TestDefault(t *testing.T) { deployment: testingdeployment.MakeDeployment("test-pod", "").PodTemplateSpecQueue("test-queue").Obj(), want: testingdeployment.MakeDeployment("test-pod", "").PodTemplateSpecQueue("test-queue").Obj(), }, + "LocalQueueDefaulting enabled, default lq created, job doesn't have queue label": { + LocalQueueDefaulting: true, + defaultLqExist: true, + deployment: testingdeployment.MakeDeployment("test-pod", "default").Obj(), + want: testingdeployment.MakeDeployment("test-pod", "default"). + Queue("default"). + PodTemplateSpecQueue("default"). + Obj(), + }, + "LocalQueueDefaulting enabled, default lq created, job has queue label": { + LocalQueueDefaulting: true, + defaultLqExist: true, + deployment: testingdeployment.MakeDeployment("test-pod", "").Queue("test-queue").Obj(), + want: testingdeployment.MakeDeployment("test-pod", ""). + Queue("test-queue"). + PodTemplateSpecQueue("test-queue"). + Obj(), + }, + "LocalQueueDefaulting enabled, default lq isn't created, job doesn't have queue label": { + LocalQueueDefaulting: true, + defaultLqExist: false, + deployment: testingdeployment.MakeDeployment("test-pod", "").Obj(), + want: testingdeployment.MakeDeployment("test-pod", ""). 
+ Obj(), + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { + ctx, _ := utiltesting.ContextWithLog(t) + features.SetFeatureGateDuringTest(t, features.LocalQueueDefaulting, tc.LocalQueueDefaulting) t.Cleanup(jobframework.EnableIntegrationsForTest(t, "pod")) builder := utiltesting.NewClientBuilder() - client := builder.Build() - + cli := builder.Build() + cqCache := cache.New(cli) + queueManager := queue.NewManager(cli, cqCache) + if tc.defaultLqExist { + if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default"). + ClusterQueue("cluster-queue"). + Obj()); err != nil { + t.Errorf("failed to create default local queue: %s", err) + } + } w := &Webhook{ - client: client, + client: cli, + queues: queueManager, } - ctx, _ := utiltesting.ContextWithLog(t) - if err := w.Default(ctx, tc.deployment); err != nil { t.Errorf("failed to set defaults for v1/deployment: %s", err) } diff --git a/pkg/controller/jobs/job/job_webhook.go b/pkg/controller/jobs/job/job_webhook.go index 9c38b95b78..56ac27e83b 100644 --- a/pkg/controller/jobs/job/job_webhook.go +++ b/pkg/controller/jobs/job/job_webhook.go @@ -76,6 +76,7 @@ func (w *JobWebhook) Default(ctx context.Context, obj runtime.Object) error { log := ctrl.LoggerFrom(ctx).WithName("job-webhook") log.V(5).Info("Applying defaults") + jobframework.ApplyDefaultLocalQueue(job, w.queues.DefaultLocalQueue(job.ObjectMeta.Namespace)) jobframework.ApplyDefaultForSuspend(job, w.manageJobsWithoutQueueName) if canDefaultManagedBy(job.Spec.ManagedBy) { diff --git a/pkg/controller/jobs/job/job_webhook_test.go b/pkg/controller/jobs/job/job_webhook_test.go index fc4e8382b4..13ee599dd8 100644 --- a/pkg/controller/jobs/job/job_webhook_test.go +++ b/pkg/controller/jobs/job/job_webhook_test.go @@ -551,6 +551,8 @@ func TestDefault(t *testing.T) { manageJobsWithoutQueueName bool multiKueueEnabled bool multiKueueBatchJobWithManagedByEnabled bool + LocalQueueDefaulting bool + defaultLqExist bool want 
*batchv1.Job wantErr error }{ @@ -637,11 +639,35 @@ func TestDefault(t *testing.T) { multiKueueEnabled: true, multiKueueBatchJobWithManagedByEnabled: true, }, + "LocalQueueDefaulting enabled, default lq created, job doesn't have queue label": { + LocalQueueDefaulting: true, + defaultLqExist: true, + job: testingutil.MakeJob("test-job", "default").Obj(), + want: testingutil.MakeJob("test-job", "default"). + Queue("default"). + Obj(), + }, + "LocalQueueDefaulting enabled, default lq created, job has queue label": { + LocalQueueDefaulting: true, + defaultLqExist: true, + job: testingutil.MakeJob("test-job", "").Queue("test-queue").Obj(), + want: testingutil.MakeJob("test-job", ""). + Queue("test-queue"). + Obj(), + }, + "LocalQueueDefaulting enabled, default lq isn't created, job doesn't have queue label": { + LocalQueueDefaulting: true, + defaultLqExist: false, + job: testingutil.MakeJob("test-job", "").Obj(), + want: testingutil.MakeJob("test-job", ""). + Obj(), + }, } for name, tc := range testcases { t.Run(name, func(t *testing.T) { features.SetFeatureGateDuringTest(t, features.MultiKueue, tc.multiKueueEnabled) features.SetFeatureGateDuringTest(t, features.MultiKueueBatchJobWithManagedBy, tc.multiKueueBatchJobWithManagedByEnabled) + features.SetFeatureGateDuringTest(t, features.LocalQueueDefaulting, tc.LocalQueueDefaulting) ctx, _ := utiltesting.ContextWithLog(t) @@ -652,6 +678,12 @@ func TestDefault(t *testing.T) { cl := clientBuilder.Build() cqCache := cache.New(cl) queueManager := queue.NewManager(cl, cqCache) + if tc.defaultLqExist { + if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default"). 
+ ClusterQueue("cluster-queue").Obj()); err != nil { + t.Errorf("failed to create default local queue: %s", err) + } + } for _, q := range tc.queues { if err := queueManager.AddLocalQueue(ctx, &q); err != nil { diff --git a/pkg/controller/jobs/jobset/jobset_webhook.go b/pkg/controller/jobs/jobset/jobset_webhook.go index 2a97dce550..2a1aa3f0e6 100644 --- a/pkg/controller/jobs/jobset/jobset_webhook.go +++ b/pkg/controller/jobs/jobset/jobset_webhook.go @@ -71,6 +71,7 @@ func (w *JobSetWebhook) Default(ctx context.Context, obj runtime.Object) error { log := ctrl.LoggerFrom(ctx).WithName("jobset-webhook") log.V(5).Info("Applying defaults") + jobframework.ApplyDefaultLocalQueue(jobSet, w.queues.DefaultLocalQueue(jobSet.ObjectMeta.Namespace)) jobframework.ApplyDefaultForSuspend(jobSet, w.manageJobsWithoutQueueName) if canDefaultManagedBy(jobSet.Spec.ManagedBy) { diff --git a/pkg/controller/jobs/jobset/jobset_webhook_test.go b/pkg/controller/jobs/jobset/jobset_webhook_test.go index 5ef33de870..c5f405037e 100644 --- a/pkg/controller/jobs/jobset/jobset_webhook_test.go +++ b/pkg/controller/jobs/jobset/jobset_webhook_test.go @@ -164,14 +164,17 @@ func TestValidateUpdate(t *testing.T) { func TestDefault(t *testing.T) { testCases := []struct { - name string - jobSet *jobset.JobSet - queues []kueue.LocalQueue - clusterQueues []kueue.ClusterQueue - admissionCheck *kueue.AdmissionCheck - multiKueueEnabled bool - wantManagedBy *string - wantErr error + name string + jobSet *jobset.JobSet + queues []kueue.LocalQueue + clusterQueues []kueue.ClusterQueue + admissionCheck *kueue.AdmissionCheck + multiKueueEnabled bool + localQueueDefaulting bool + defaultLqExist bool + want *jobset.JobSet + wantManagedBy *string + wantErr error }{ { name: "TestDefault_JobSetManagedBy_jobsetapi.JobSetControllerName", @@ -362,11 +365,72 @@ func TestDefault(t *testing.T) { multiKueueEnabled: true, wantManagedBy: nil, }, + { + name: "LocalQueueDefaulting enabled, default lq created, job doesn't have queue 
label", + localQueueDefaulting: true, + defaultLqExist: true, + jobSet: &jobset.JobSet{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + }, + }, + want: &jobset.JobSet{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + Labels: map[string]string{ + constants.QueueLabel: "default", + }, + }, + }, + }, + { + name: "LocalQueueDefaulting enabled, default lq created, job has queue label", + localQueueDefaulting: true, + defaultLqExist: true, + jobSet: &jobset.JobSet{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + Labels: map[string]string{ + constants.QueueLabel: "local-queue", + }, + }, + }, + want: &jobset.JobSet{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + Labels: map[string]string{ + constants.QueueLabel: "local-queue", + }, + }, + }, + }, + { + name: "LocalQueueDefaulting enabled, default lq isn't created, job doesn't have queue label", + localQueueDefaulting: true, + defaultLqExist: false, + jobSet: &jobset.JobSet{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + }, + }, + want: &jobset.JobSet{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + }, + }, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { features.SetFeatureGateDuringTest(t, features.MultiKueue, tc.multiKueueEnabled) + features.SetFeatureGateDuringTest(t, features.LocalQueueDefaulting, tc.localQueueDefaulting) ctx, _ := utiltesting.ContextWithLog(t) @@ -378,6 +442,13 @@ func TestDefault(t *testing.T) { cqCache := cache.New(cl) queueManager := queue.NewManager(cl, cqCache) + if tc.defaultLqExist { + if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default"). 
+ ClusterQueue("cluster-queue").Obj()); err != nil { + t.Errorf("failed to create default local queue: %s", err) + } + } + for _, q := range tc.queues { if err := queueManager.AddLocalQueue(ctx, &q); err != nil { t.Fatalf("Inserting queue %s/%s in manager: %v", q.Namespace, q.Name, err) @@ -407,6 +478,11 @@ func TestDefault(t *testing.T) { if diff := cmp.Diff(tc.wantManagedBy, tc.jobSet.Spec.ManagedBy); diff != "" { t.Errorf("Default() jobSet.Spec.ManagedBy mismatch (-want +got):\n%s", diff) } + if tc.want != nil { + if diff := cmp.Diff(tc.want, tc.jobSet); diff != "" { + t.Errorf("Default() mismatch (-want,+got):\n%s", diff) + } + } }) } } diff --git a/pkg/controller/jobs/mpijob/mpijob_webhook.go b/pkg/controller/jobs/mpijob/mpijob_webhook.go index a8f831e8d4..17bc2e2cd1 100644 --- a/pkg/controller/jobs/mpijob/mpijob_webhook.go +++ b/pkg/controller/jobs/mpijob/mpijob_webhook.go @@ -75,6 +75,7 @@ func (w *MpiJobWebhook) Default(ctx context.Context, obj runtime.Object) error { log := ctrl.LoggerFrom(ctx).WithName("mpijob-webhook") log.V(5).Info("Applying defaults") + jobframework.ApplyDefaultLocalQueue(mpiJob, w.queues.DefaultLocalQueue(mpiJob.ObjectMeta.Namespace)) jobframework.ApplyDefaultForSuspend(mpiJob, w.manageJobsWithoutQueueName) if canDefaultManagedBy(mpiJob.Spec.RunPolicy.ManagedBy) { diff --git a/pkg/controller/jobs/mpijob/mpijob_webhook_test.go b/pkg/controller/jobs/mpijob/mpijob_webhook_test.go index 086e664dd7..68858b90dc 100644 --- a/pkg/controller/jobs/mpijob/mpijob_webhook_test.go +++ b/pkg/controller/jobs/mpijob/mpijob_webhook_test.go @@ -135,14 +135,17 @@ func TestValidateCreate(t *testing.T) { func TestDefault(t *testing.T) { testCases := []struct { - name string - mpiJob *v2beta1.MPIJob - queues []kueue.LocalQueue - clusterQueues []kueue.ClusterQueue - admissionCheck *kueue.AdmissionCheck - multiKueueEnabled bool - wantManagedBy *string - wantErr error + name string + mpiJob *v2beta1.MPIJob + queues []kueue.LocalQueue + clusterQueues 
[]kueue.ClusterQueue + admissionCheck *kueue.AdmissionCheck + multiKueueEnabled bool + localQueueDefaulting bool + defaultLqExist bool + want *v2beta1.MPIJob + wantManagedBy *string + wantErr error }{ { name: "TestDefault_MPIJobManagedBy_mpijobapi.MPIJobControllerName", @@ -337,11 +340,72 @@ func TestDefault(t *testing.T) { multiKueueEnabled: true, wantManagedBy: nil, }, + { + name: "LocalQueueDefaulting enabled, default lq created, job doesn't have queue label", + localQueueDefaulting: true, + defaultLqExist: true, + mpiJob: &v2beta1.MPIJob{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + }, + }, + want: &v2beta1.MPIJob{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + Labels: map[string]string{ + constants.QueueLabel: "default", + }, + }, + }, + }, + { + name: "LocalQueueDefaulting enabled, default lq created, job has queue label", + localQueueDefaulting: true, + defaultLqExist: true, + mpiJob: &v2beta1.MPIJob{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + Labels: map[string]string{ + constants.QueueLabel: "local-queue", + }, + }, + }, + want: &v2beta1.MPIJob{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + Labels: map[string]string{ + constants.QueueLabel: "local-queue", + }, + }, + }, + }, + { + name: "LocalQueueDefaulting enabled, default lq isn't created, job doesn't have queue label", + localQueueDefaulting: true, + defaultLqExist: false, + mpiJob: &v2beta1.MPIJob{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + }, + }, + want: &v2beta1.MPIJob{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test-js", + Namespace: "default", + }, + }, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { features.SetFeatureGateDuringTest(t, features.MultiKueue, tc.multiKueueEnabled) + features.SetFeatureGateDuringTest(t, features.LocalQueueDefaulting, tc.localQueueDefaulting) ctx, _ := utiltesting.ContextWithLog(t) @@ -353,6 
+417,13 @@ func TestDefault(t *testing.T) { cqCache := cache.New(cl) queueManager := queue.NewManager(cl, cqCache) + if tc.defaultLqExist { + if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default"). + ClusterQueue("cluster-queue").Obj()); err != nil { + t.Errorf("failed to create default local queue: %s", err) + } + } + for _, q := range tc.queues { if err := queueManager.AddLocalQueue(ctx, &q); err != nil { t.Fatalf("Inserting queue %s/%s in manager: %v", q.Namespace, q.Name, err) @@ -382,6 +453,11 @@ func TestDefault(t *testing.T) { if diff := cmp.Diff(tc.wantManagedBy, tc.mpiJob.Spec.RunPolicy.ManagedBy); diff != "" { t.Errorf("Default() mpijob.Spec.RunPolicy.ManagedBy mismatch (-want +got):\n%s", diff) } + if tc.want != nil { + if diff := cmp.Diff(tc.want, tc.mpiJob); diff != "" { + t.Errorf("Default() mismatch (-want,+got):\n%s", diff) + } + } }) } } diff --git a/pkg/controller/jobs/pod/pod_webhook.go b/pkg/controller/jobs/pod/pod_webhook.go index b634a563ac..f612912275 100644 --- a/pkg/controller/jobs/pod/pod_webhook.go +++ b/pkg/controller/jobs/pod/pod_webhook.go @@ -38,6 +38,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/jobframework/webhook" "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" utilpod "sigs.k8s.io/kueue/pkg/util/pod" ) @@ -66,6 +67,7 @@ var ( type PodWebhook struct { client client.Client + queues *queue.Manager manageJobsWithoutQueueName bool namespaceSelector *metav1.LabelSelector podSelector *metav1.LabelSelector @@ -80,6 +82,7 @@ func SetupWebhook(mgr ctrl.Manager, opts ...jobframework.Option) error { } wh := &PodWebhook{ client: mgr.GetClient(), + queues: options.Queues, manageJobsWithoutQueueName: options.ManageJobsWithoutQueueName, namespaceSelector: podOpts.NamespaceSelector, podSelector: podOpts.PodSelector, @@ -147,6 +150,8 @@ func (w *PodWebhook) Default(ctx context.Context, obj runtime.Object) error { return nil } + 
jobframework.ApplyDefaultLocalQueue(pod, w.queues.DefaultLocalQueue(pod.pod.Namespace)) + // Check for pod label selector match podSelector, err := metav1.LabelSelectorAsSelector(w.podSelector) if err != nil { @@ -174,7 +179,7 @@ func (w *PodWebhook) Default(ctx context.Context, obj runtime.Object) error { return nil } - if jobframework.QueueName(pod) != "" || w.manageJobsWithoutQueueName { + if jobframework.QueueName(pod) != "" || w.manageJobsWithoutQueueName || features.Enabled(features.LocalQueueDefaulting) { controllerutil.AddFinalizer(pod.Object(), PodFinalizer) if pod.pod.Labels == nil { diff --git a/pkg/controller/jobs/raycluster/raycluster_webhook.go b/pkg/controller/jobs/raycluster/raycluster_webhook.go index e9403d59c3..83d372ac0c 100644 --- a/pkg/controller/jobs/raycluster/raycluster_webhook.go +++ b/pkg/controller/jobs/raycluster/raycluster_webhook.go @@ -26,6 +26,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/jobframework/webhook" + "sigs.k8s.io/kueue/pkg/queue" ) var ( @@ -35,6 +36,7 @@ var ( ) type RayClusterWebhook struct { + queues *queue.Manager manageJobsWithoutQueueName bool } @@ -64,6 +66,7 @@ func (w *RayClusterWebhook) Default(ctx context.Context, obj runtime.Object) err job := fromObject(obj) log := ctrl.LoggerFrom(ctx).WithName("raycluster-webhook") log.V(10).Info("Applying defaults") + jobframework.ApplyDefaultLocalQueue(job, w.queues.DefaultLocalQueue(job.Object().GetNamespace())) jobframework.ApplyDefaultForSuspend(job, w.manageJobsWithoutQueueName) return nil } diff --git a/pkg/controller/jobs/raycluster/raycluster_webhook_test.go b/pkg/controller/jobs/raycluster/raycluster_webhook_test.go index 75de14db66..9d2d47a537 100644 --- a/pkg/controller/jobs/raycluster/raycluster_webhook_test.go +++ b/pkg/controller/jobs/raycluster/raycluster_webhook_test.go @@ -27,7 +27,11 @@ import ( "k8s.io/utils/ptr" kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" + "sigs.k8s.io/kueue/pkg/cache" 
"sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingrayutil "sigs.k8s.io/kueue/pkg/util/testingjobs/raycluster" ) @@ -38,9 +42,11 @@ var ( func TestValidateDefault(t *testing.T) { testcases := map[string]struct { - oldJob *rayv1.RayCluster - newJob *rayv1.RayCluster - manageAll bool + oldJob *rayv1.RayCluster + newJob *rayv1.RayCluster + manageAll bool + localQueueDefaulting bool + defaultLqExist bool }{ "unmanaged": { oldJob: testingrayutil.MakeCluster("job", "ns"). @@ -69,12 +75,49 @@ func TestValidateDefault(t *testing.T) { Suspend(true). Obj(), }, + "LocalQueueDefaulting enabled, default lq created, job doesn't have queue label": { + localQueueDefaulting: true, + defaultLqExist: true, + oldJob: testingrayutil.MakeCluster("test-job", "default").Obj(), + newJob: testingrayutil.MakeCluster("test-job", "default"). + Queue("default"). + Obj(), + }, + "LocalQueueDefaulting enabled, default lq created, job has queue label": { + localQueueDefaulting: true, + defaultLqExist: true, + oldJob: testingrayutil.MakeCluster("test-job", "").Queue("test-queue").Obj(), + newJob: testingrayutil.MakeCluster("test-job", ""). + Queue("test-queue"). + Obj(), + }, + "LocalQueueDefaulting enabled, default lq isn't created, job doesn't have queue label": { + localQueueDefaulting: true, + defaultLqExist: false, + oldJob: testingrayutil.MakeCluster("test-job", "").Obj(), + newJob: testingrayutil.MakeCluster("test-job", ""). 
+ Obj(), + }, } for name, tc := range testcases { t.Run(name, func(t *testing.T) { + features.SetFeatureGateDuringTest(t, features.LocalQueueDefaulting, tc.localQueueDefaulting) + ctx, _ := utiltesting.ContextWithLog(t) + builder := utiltesting.NewClientBuilder() + cli := builder.Build() + cqCache := cache.New(cli) + queueManager := queue.NewManager(cli, cqCache) + if tc.defaultLqExist { + if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default"). + ClusterQueue("cluster-queue").Obj()); err != nil { + t.Errorf("failed to create default local queue: %s", err) + } + } + wh := &RayClusterWebhook{ manageJobsWithoutQueueName: tc.manageAll, + queues: queueManager, } result := tc.oldJob.DeepCopy() if err := wh.Default(context.Background(), result); err != nil { diff --git a/pkg/controller/jobs/rayjob/rayjob_webhook.go b/pkg/controller/jobs/rayjob/rayjob_webhook.go index b8d950f062..8c60d643d3 100644 --- a/pkg/controller/jobs/rayjob/rayjob_webhook.go +++ b/pkg/controller/jobs/rayjob/rayjob_webhook.go @@ -29,6 +29,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/jobframework/webhook" + "sigs.k8s.io/kueue/pkg/queue" ) var ( @@ -39,6 +40,7 @@ var ( type RayJobWebhook struct { manageJobsWithoutQueueName bool + queues *queue.Manager } // SetupRayJobWebhook configures the webhook for RayJob. 
@@ -64,6 +66,7 @@ func (w *RayJobWebhook) Default(ctx context.Context, obj runtime.Object) error { job := obj.(*rayv1.RayJob) log := ctrl.LoggerFrom(ctx).WithName("rayjob-webhook") log.V(5).Info("Applying defaults") + jobframework.ApplyDefaultLocalQueue((*RayJob)(job), w.queues.DefaultLocalQueue((*RayJob)(job).GetNamespace())) jobframework.ApplyDefaultForSuspend((*RayJob)(job), w.manageJobsWithoutQueueName) return nil } diff --git a/pkg/controller/jobs/rayjob/rayjob_webhook_test.go b/pkg/controller/jobs/rayjob/rayjob_webhook_test.go index 03e46631c8..9fc6c81efa 100644 --- a/pkg/controller/jobs/rayjob/rayjob_webhook_test.go +++ b/pkg/controller/jobs/rayjob/rayjob_webhook_test.go @@ -30,7 +30,11 @@ import ( "k8s.io/utils/ptr" kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" + "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingrayutil "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob" ) @@ -41,9 +45,11 @@ var ( func TestValidateDefault(t *testing.T) { testcases := map[string]struct { - oldJob *rayv1.RayJob - newJob *rayv1.RayJob - manageAll bool + oldJob *rayv1.RayJob + newJob *rayv1.RayJob + manageAll bool + localQueueDefaulting bool + defaultLqExist bool }{ "unmanaged": { oldJob: testingrayutil.MakeJob("job", "ns"). @@ -72,12 +78,48 @@ func TestValidateDefault(t *testing.T) { Suspend(true). Obj(), }, + "LocalQueueDefaulting enabled, default lq created, job doesn't have queue label": { + localQueueDefaulting: true, + defaultLqExist: true, + oldJob: testingrayutil.MakeJob("test-job", "default").Obj(), + newJob: testingrayutil.MakeJob("test-job", "default"). + Queue("default"). 
+ Obj(), + }, + "LocalQueueDefaulting enabled, default lq created, job has queue label": { + localQueueDefaulting: true, + defaultLqExist: true, + oldJob: testingrayutil.MakeJob("test-job", "").Queue("test-queue").Obj(), + newJob: testingrayutil.MakeJob("test-job", ""). + Queue("test-queue"). + Obj(), + }, + "LocalQueueDefaulting enabled, default lq isn't created, job doesn't have queue label": { + localQueueDefaulting: true, + defaultLqExist: false, + oldJob: testingrayutil.MakeJob("test-job", "").Obj(), + newJob: testingrayutil.MakeJob("test-job", ""). + Obj(), + }, } for name, tc := range testcases { t.Run(name, func(t *testing.T) { + features.SetFeatureGateDuringTest(t, features.LocalQueueDefaulting, tc.localQueueDefaulting) + ctx, _ := utiltesting.ContextWithLog(t) + builder := utiltesting.NewClientBuilder() + cli := builder.Build() + cqCache := cache.New(cli) + queueManager := queue.NewManager(cli, cqCache) + if tc.defaultLqExist { + if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default"). 
+ ClusterQueue("cluster-queue").Obj()); err != nil { + t.Errorf("failed to create default local queue: %s", err) + } + } wh := &RayJobWebhook{ manageJobsWithoutQueueName: tc.manageAll, + queues: queueManager, } result := tc.oldJob.DeepCopy() if err := wh.Default(context.Background(), result); err != nil { diff --git a/pkg/controller/jobs/statefulset/statefulset_webhook.go b/pkg/controller/jobs/statefulset/statefulset_webhook.go index c7b381e2e4..67206184fa 100644 --- a/pkg/controller/jobs/statefulset/statefulset_webhook.go +++ b/pkg/controller/jobs/statefulset/statefulset_webhook.go @@ -30,13 +30,17 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + kueue_constants "sigs.k8s.io/kueue/pkg/constants" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" ) type Webhook struct { client client.Client + queues *queue.Manager manageJobsWithoutQueueName bool } @@ -62,9 +66,19 @@ func (wh *Webhook) Default(ctx context.Context, obj runtime.Object) error { log := ctrl.LoggerFrom(ctx).WithName("statefulset-webhook") log.V(5).Info("Applying defaults") - cqLabel, ok := ss.Labels[constants.QueueLabel] - if !ok { - return nil + labels := ss.Labels + cqLabel, ok := labels[constants.QueueLabel] + if !ok || cqLabel == "" { + if features.Enabled(features.LocalQueueDefaulting) && wh.queues.DefaultLocalQueue(ss.ObjectMeta.Namespace) != "" { + cqLabel = kueue_constants.DefaultLocalQueueName + if labels == nil { + labels = make(map[string]string) + } + labels[constants.QueueLabel] = cqLabel + ss.SetLabels(labels) + } else { + return nil + } } if ss.Spec.Template.Labels == nil { diff --git a/pkg/controller/jobs/statefulset/statefulset_webhook_test.go b/pkg/controller/jobs/statefulset/statefulset_webhook_test.go index 709cf3d488..46d3c464c2 100644 --- 
a/pkg/controller/jobs/statefulset/statefulset_webhook_test.go +++ b/pkg/controller/jobs/statefulset/statefulset_webhook_test.go @@ -29,9 +29,12 @@ import ( "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/queue" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingstatefulset "sigs.k8s.io/kueue/pkg/util/testingjobs/statefulset" ) @@ -40,6 +43,8 @@ func TestDefault(t *testing.T) { testCases := map[string]struct { statefulset *appsv1.StatefulSet manageJobsWithoutQueueName bool + localQueueDefaulting bool + defaultLqExist bool enableIntegrations []string want *appsv1.StatefulSet }{ @@ -71,21 +76,61 @@ func TestDefault(t *testing.T) { PodTemplateSpecPodGroupFastAdmissionAnnotation(true). Obj(), }, + "LocalQueueDefaulting enabled, default lq created, job doesn't have queue label": { + localQueueDefaulting: true, + defaultLqExist: true, + statefulset: testingstatefulset.MakeStatefulSet("test-pod", "default").Obj(), + want: testingstatefulset.MakeStatefulSet("test-pod", "default"). + Queue("default"). + PodTemplateSpecQueue("default"). + PodTemplateSpecPodGroupNameLabel("test-pod", "", gvk). + PodTemplateSpecPodGroupTotalCountAnnotation(1). + PodTemplateSpecPodGroupFastAdmissionAnnotation(true). + Obj(), + }, + "LocalQueueDefaulting enabled, default lq created, job has queue label": { + localQueueDefaulting: true, + defaultLqExist: true, + statefulset: testingstatefulset.MakeStatefulSet("test-pod", "").Queue("test-queue").Obj(), + want: testingstatefulset.MakeStatefulSet("test-pod", ""). + Queue("test-queue"). + PodTemplateSpecQueue("test-queue"). + PodTemplateSpecPodGroupNameLabel("test-pod", "", gvk). + PodTemplateSpecPodGroupTotalCountAnnotation(1). + PodTemplateSpecPodGroupFastAdmissionAnnotation(true). 
+ Obj(), + }, + "LocalQueueDefaulting enabled, default lq isn't created, job doesn't have queue label": { + localQueueDefaulting: true, + defaultLqExist: false, + statefulset: testingstatefulset.MakeStatefulSet("test-pod", "").Obj(), + want: testingstatefulset.MakeStatefulSet("test-pod", "").Obj(), + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { + features.SetFeatureGateDuringTest(t, features.LocalQueueDefaulting, tc.localQueueDefaulting) t.Cleanup(jobframework.EnableIntegrationsForTest(t, tc.enableIntegrations...)) + ctx, _ := utiltesting.ContextWithLog(t) + builder := utiltesting.NewClientBuilder() cli := builder.Build() + cqCache := cache.New(cli) + queueManager := queue.NewManager(cli, cqCache) + if tc.defaultLqExist { + if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default"). + ClusterQueue("cluster-queue").Obj()); err != nil { + t.Errorf("failed to create default local queue: %s", err) + } + } w := &Webhook{ client: cli, manageJobsWithoutQueueName: tc.manageJobsWithoutQueueName, + queues: queueManager, } - ctx, _ := utiltesting.ContextWithLog(t) - if err := w.Default(ctx, tc.statefulset); err != nil { t.Errorf("failed to set defaults for v1/statefulset: %s", err) } diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index fbe9cbcc20..5686784de8 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -151,6 +151,12 @@ const ( // // Workloads keeps allocated quota and preserves QuotaReserved=True when ProvisioningRequest fails KeepQuotaForProvReqRetry featuregate.Feature = "KeepQuotaForProvReqRetry" + + // owner: @yaroslava-serdiuk + // alpha: v0.10 + // + // Enable defaulting the queue name of jobs that do not set one to the LocalQueue named "default", if it exists in the job's namespace. 
+ LocalQueueDefaulting featuregate.Feature = "LocalQueueDefaulting" ) func init() { @@ -180,6 +186,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ ExposeFlavorsInLocalQueue: {Default: true, PreRelease: featuregate.Beta}, AdmissionCheckValidationRules: {Default: false, PreRelease: featuregate.Deprecated}, KeepQuotaForProvReqRetry: {Default: false, PreRelease: featuregate.Deprecated}, + LocalQueueDefaulting: {Default: false, PreRelease: featuregate.Alpha}, } func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) { diff --git a/pkg/queue/local_queue.go b/pkg/queue/local_queue.go index 8fd13bf00c..38fb89bcec 100644 --- a/pkg/queue/local_queue.go +++ b/pkg/queue/local_queue.go @@ -20,6 +20,7 @@ import ( "fmt" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/constants" "sigs.k8s.io/kueue/pkg/workload" ) @@ -28,6 +29,10 @@ func Key(q *kueue.LocalQueue) string { return fmt.Sprintf("%s/%s", q.Namespace, q.Name) } +func DefaultQueue(namespace string) string { + return fmt.Sprintf("%s/%s", namespace, constants.DefaultLocalQueueName) +} + // LocalQueue is the internal implementation of kueue.LocalQueue. type LocalQueue struct { Key string diff --git a/pkg/queue/manager.go b/pkg/queue/manager.go index dcc5f89ba3..a7672eae0c 100644 --- a/pkg/queue/manager.go +++ b/pkg/queue/manager.go @@ -206,6 +206,16 @@ func (m *Manager) DeleteClusterQueue(cq *kueue.ClusterQueue) { metrics.ClearClusterQueueMetrics(cq.Name) } +func (m *Manager) DefaultLocalQueue(namespace string) string { + m.Lock() + defer m.Unlock() + + if _, ok := m.localQueues[DefaultQueue(namespace)]; ok { + return DefaultQueue(namespace) + } + return "" +} + func (m *Manager) AddLocalQueue(ctx context.Context, q *kueue.LocalQueue) error { m.Lock() defer m.Unlock()