Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement default LocalQueue #3610

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ const (

// ManagedByKueueLabel is the label that signals that an object is managed by Kueue.
ManagedByKueueLabel = "kueue.x-k8s.io/managed"

DefaultLocalQueueName = "default"
)
4 changes: 4 additions & 0 deletions pkg/controller/jobframework/base_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"sigs.k8s.io/kueue/pkg/controller/jobframework/webhook"
"sigs.k8s.io/kueue/pkg/queue"
)

// BaseWebhook applies basic defaulting and validation for jobs.
type BaseWebhook struct {
// ManageJobsWithoutQueueName is forwarded to ApplyDefaultForSuspend when
// defaults are applied.
ManageJobsWithoutQueueName bool
// FromObject converts the runtime.Object handed to the webhook into a
// GenericJob.
FromObject func(runtime.Object) GenericJob
// Queues is asked for the default LocalQueue of the job's namespace when
// defaults are applied; BaseWebhookFactory fills it from options.Queues.
Queues *queue.Manager
}

func BaseWebhookFactory(job GenericJob, fromObject func(runtime.Object) GenericJob) func(ctrl.Manager, ...Option) error {
Expand All @@ -38,6 +40,7 @@ func BaseWebhookFactory(job GenericJob, fromObject func(runtime.Object) GenericJ
wh := &BaseWebhook{
ManageJobsWithoutQueueName: options.ManageJobsWithoutQueueName,
FromObject: fromObject,
Queues: options.Queues,
}
return webhook.WebhookManagedBy(mgr).
For(job.Object()).
Expand All @@ -54,6 +57,7 @@ func (w *BaseWebhook) Default(ctx context.Context, obj runtime.Object) error {
job := w.FromObject(obj)
log := ctrl.LoggerFrom(ctx)
log.V(5).Info("Applying defaults")
ApplyDefaultLocalQueue(job, w.Queues.DefaultLocalQueue(job.Object().GetNamespace()))
ApplyDefaultForSuspend(job, w.ManageJobsWithoutQueueName)
return nil
}
Expand Down
82 changes: 79 additions & 3 deletions pkg/controller/jobframework/base_webhook_test.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use JobWrapper from pkg/util/testingjobs/job/wrappers.go?

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/google/go-cmp/cmp"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -31,9 +32,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/queue"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
)

type testGenericJob struct {
Expand Down Expand Up @@ -125,9 +130,10 @@ func makeTestGenericJob() *testGenericJob {
func TestBaseWebhookDefault(t *testing.T) {
testcases := map[string]struct {
manageJobsWithoutQueueName bool

job *batchv1.Job
want *batchv1.Job
LocalQueueDefaulting bool
defaultLqExist bool
job *batchv1.Job
want *batchv1.Job
}{
"update the suspend field with 'manageJobsWithoutQueueName=false'": {
job: &batchv1.Job{
Expand Down Expand Up @@ -164,12 +170,82 @@ func TestBaseWebhookDefault(t *testing.T) {
Spec: batchv1.JobSpec{Suspend: ptr.To(true)},
},
},
"LocalQueueDefaulting enabled, default lq created, job doesn't have queue label": {
LocalQueueDefaulting: true,
defaultLqExist: true,
job: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{},
},
},
want: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{constants.QueueLabel: "default"},
},
},
},
"LocalQueueDefaulting enabled, default lq created, job has queue label": {
LocalQueueDefaulting: true,
defaultLqExist: true,
job: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{constants.QueueLabel: "queue"},
},
},
want: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{constants.QueueLabel: "queue"},
},
},
},
"LocalQueueDefaulting enabled, default lq isn't created, job doesn't have queue label": {
LocalQueueDefaulting: true,
defaultLqExist: false,
job: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{},
},
},
want: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{},
},
},
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
ctx, _ := utiltesting.ContextWithLog(t)
features.SetFeatureGateDuringTest(t, features.LocalQueueDefaulting, tc.LocalQueueDefaulting)
clientBuilder := utiltesting.NewClientBuilder().
WithObjects(
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}},
)
cl := clientBuilder.Build()
cqCache := cache.New(cl)
queueManager := queue.NewManager(cl, cqCache)
if tc.defaultLqExist {
if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default").
ClusterQueue("cluster-queue").Obj()); err != nil {
t.Errorf("failed to create default local queue: %s", err)
}
}
w := &jobframework.BaseWebhook{
ManageJobsWithoutQueueName: tc.manageJobsWithoutQueueName,
FromObject: makeTestGenericJob().fromObject,
Queues: queueManager,
}
if err := w.Default(context.Background(), tc.job); err != nil {
t.Errorf("set defaults to a kubeflow/mpijob by a Defaulter")
Expand Down
16 changes: 16 additions & 0 deletions pkg/controller/jobframework/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package jobframework

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/features"
)

func ApplyDefaultForSuspend(job GenericJob, manageJobsWithoutQueueName bool) {
Expand All @@ -32,3 +34,17 @@ func ApplyDefaultForSuspend(job GenericJob, manageJobsWithoutQueueName bool) {
}
}
}

// ApplyDefaultLocalQueue labels the job with defaultQueue as its queue name.
//
// The label is only written when the LocalQueueDefaulting feature gate is
// enabled, defaultQueue is non-empty, and the job's queue label is absent or
// empty. In every other case the job is left untouched.
func ApplyDefaultLocalQueue(job GenericJob, defaultQueue string) {
	if features.Enabled(features.LocalQueueDefaulting) && defaultQueue != "" {
		current := job.Object().GetLabels()
		// Reading from a nil map is safe and yields "", so the "already set"
		// check can run before the map is allocated.
		if current[constants.QueueLabel] != "" {
			return
		}
		if current == nil {
			current = make(map[string]string, 1)
		}
		current[constants.QueueLabel] = defaultQueue
		job.Object().SetLabels(current)
	}
}
25 changes: 24 additions & 1 deletion pkg/controller/jobs/deployment/deployment_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,24 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

kueue_constants "sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/controller/jobframework/webhook"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/queue"
)

// Webhook holds the dependencies of the Deployment admission webhook.
type Webhook struct {
// client is the controller-runtime client obtained from the manager in
// SetupWebhook.
client client.Client
// queues is taken from the jobframework options and is asked for the
// default LocalQueue of the Deployment's namespace during defaulting.
queues *queue.Manager
}

func SetupWebhook(mgr ctrl.Manager, _ ...jobframework.Option) error {
func SetupWebhook(mgr ctrl.Manager, opts ...jobframework.Option) error {
options := jobframework.ProcessOptions(opts...)
wh := &Webhook{
client: mgr.GetClient(),
queues: options.Queues,
}
obj := &appsv1.Deployment{}
return webhook.WebhookManagedBy(mgr).
Expand All @@ -58,6 +64,23 @@ func (wh *Webhook) Default(ctx context.Context, obj runtime.Object) error {
log := ctrl.LoggerFrom(ctx).WithName("deployment-webhook")
log.V(5).Info("Applying defaults")

queue, ok := deployment.Labels[constants.QueueLabel]
if ok || features.Enabled(features.LocalQueueDefaulting) && wh.queues.DefaultLocalQueue(deployment.ObjectMeta.Namespace) != "" {
if deployment.Spec.Template.Labels == nil {
deployment.Spec.Template.Labels = make(map[string]string, 1)
}
}
if ok {
deployment.Spec.Template.Labels[constants.QueueLabel] = queue
} else if features.Enabled(features.LocalQueueDefaulting) && wh.queues.DefaultLocalQueue(deployment.ObjectMeta.Namespace) != "" {
queue := kueue_constants.DefaultLocalQueueName
if deployment.Labels == nil {
deployment.Labels = make(map[string]string)
yaroslava-serdiuk marked this conversation as resolved.
Show resolved Hide resolved
}
deployment.Labels[constants.QueueLabel] = queue
deployment.Spec.Template.Labels[constants.QueueLabel] = queue
}

if queueName := jobframework.QueueNameForObject(deployment.Object()); queueName != "" {
if deployment.Spec.Template.Labels == nil {
deployment.Spec.Template.Labels = make(map[string]string, 1)
Expand Down
53 changes: 46 additions & 7 deletions pkg/controller/jobs/deployment/deployment_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,20 @@ import (
"k8s.io/apimachinery/pkg/util/validation/field"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/queue"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingdeployment "sigs.k8s.io/kueue/pkg/util/testingjobs/deployment"
)

func TestDefault(t *testing.T) {
testCases := map[string]struct {
deployment *appsv1.Deployment
want *appsv1.Deployment
deployment *appsv1.Deployment
LocalQueueDefaulting bool
defaultLqExist bool
want *appsv1.Deployment
}{
"deployment without queue": {
deployment: testingdeployment.MakeDeployment("test-pod", "").Obj(),
Expand Down Expand Up @@ -62,20 +67,54 @@ func TestDefault(t *testing.T) {
deployment: testingdeployment.MakeDeployment("test-pod", "").PodTemplateSpecQueue("test-queue").Obj(),
want: testingdeployment.MakeDeployment("test-pod", "").PodTemplateSpecQueue("test-queue").Obj(),
},
"LocalQueueDefaulting enabled, default lq created, job doesn't have queue label": {
LocalQueueDefaulting: true,
defaultLqExist: true,
deployment: testingdeployment.MakeDeployment("test-pod", "default").Obj(),
want: testingdeployment.MakeDeployment("test-pod", "default").
Queue("default").
PodTemplateSpecQueue("default").
Obj(),
},
"LocalQueueDefaulting enabled, default lq created, job has queue label": {
LocalQueueDefaulting: true,
defaultLqExist: true,
deployment: testingdeployment.MakeDeployment("test-pod", "").Queue("test-queue").Obj(),
want: testingdeployment.MakeDeployment("test-pod", "").
Queue("test-queue").
PodTemplateSpecQueue("test-queue").
Obj(),
},
"LocalQueueDefaulting enabled, default lq isn't created, job doesn't have queue label": {
LocalQueueDefaulting: true,
defaultLqExist: false,
deployment: testingdeployment.MakeDeployment("test-pod", "").Obj(),
want: testingdeployment.MakeDeployment("test-pod", "").
Obj(),
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
ctx, _ := utiltesting.ContextWithLog(t)
features.SetFeatureGateDuringTest(t, features.LocalQueueDefaulting, tc.LocalQueueDefaulting)
t.Cleanup(jobframework.EnableIntegrationsForTest(t, "pod"))
builder := utiltesting.NewClientBuilder()
client := builder.Build()

cli := builder.Build()
cqCache := cache.New(cli)
queueManager := queue.NewManager(cli, cqCache)
if tc.defaultLqExist {
if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default").
ClusterQueue("cluster-queue").
Obj()); err != nil {
t.Errorf("failed to create default local queue: %s", err)
}
}
w := &Webhook{
client: client,
client: cli,
queues: queueManager,
}

ctx, _ := utiltesting.ContextWithLog(t)

if err := w.Default(ctx, tc.deployment); err != nil {
t.Errorf("failed to set defaults for v1/deployment: %s", err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/jobs/job/job_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (w *JobWebhook) Default(ctx context.Context, obj runtime.Object) error {
log := ctrl.LoggerFrom(ctx).WithName("job-webhook")
log.V(5).Info("Applying defaults")

jobframework.ApplyDefaultLocalQueue(job, w.queues.DefaultLocalQueue(job.ObjectMeta.Namespace))
jobframework.ApplyDefaultForSuspend(job, w.manageJobsWithoutQueueName)

if canDefaultManagedBy(job.Spec.ManagedBy) {
Expand Down
32 changes: 32 additions & 0 deletions pkg/controller/jobs/job/job_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,8 @@ func TestDefault(t *testing.T) {
manageJobsWithoutQueueName bool
multiKueueEnabled bool
multiKueueBatchJobWithManagedByEnabled bool
LocalQueueDefaulting bool
defaultLqExist bool
want *batchv1.Job
wantErr error
}{
Expand Down Expand Up @@ -637,11 +639,35 @@ func TestDefault(t *testing.T) {
multiKueueEnabled: true,
multiKueueBatchJobWithManagedByEnabled: true,
},
"LocalQueueDefaulting enabled, default lq created, job doesn't have queue label": {
LocalQueueDefaulting: true,
defaultLqExist: true,
job: testingutil.MakeJob("test-job", "default").Obj(),
want: testingutil.MakeJob("test-job", "default").
Queue("default").
Obj(),
},
"LocalQueueDefaulting enabled, default lq created, job has queue label": {
LocalQueueDefaulting: true,
defaultLqExist: true,
job: testingutil.MakeJob("test-job", "").Queue("test-queue").Obj(),
want: testingutil.MakeJob("test-job", "").
Queue("test-queue").
Obj(),
},
"LocalQueueDefaulting enabled, default lq isn't created, job doesn't have queue label": {
LocalQueueDefaulting: true,
defaultLqExist: false,
job: testingutil.MakeJob("test-job", "").Obj(),
want: testingutil.MakeJob("test-job", "").
Obj(),
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
features.SetFeatureGateDuringTest(t, features.MultiKueue, tc.multiKueueEnabled)
features.SetFeatureGateDuringTest(t, features.MultiKueueBatchJobWithManagedBy, tc.multiKueueBatchJobWithManagedByEnabled)
features.SetFeatureGateDuringTest(t, features.LocalQueueDefaulting, tc.LocalQueueDefaulting)

ctx, _ := utiltesting.ContextWithLog(t)

Expand All @@ -652,6 +678,12 @@ func TestDefault(t *testing.T) {
cl := clientBuilder.Build()
cqCache := cache.New(cl)
queueManager := queue.NewManager(cl, cqCache)
if tc.defaultLqExist {
if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default").
ClusterQueue("cluster-queue").Obj()); err != nil {
t.Errorf("failed to create default local queue: %s", err)
}
}

for _, q := range tc.queues {
if err := queueManager.AddLocalQueue(ctx, &q); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/jobs/jobset/jobset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (w *JobSetWebhook) Default(ctx context.Context, obj runtime.Object) error {
log := ctrl.LoggerFrom(ctx).WithName("jobset-webhook")
log.V(5).Info("Applying defaults")

jobframework.ApplyDefaultLocalQueue(jobSet, w.queues.DefaultLocalQueue(jobSet.ObjectMeta.Namespace))
jobframework.ApplyDefaultForSuspend(jobSet, w.manageJobsWithoutQueueName)

if canDefaultManagedBy(jobSet.Spec.ManagedBy) {
Expand Down
Loading