Skip to content

Commit

Permalink
Implement default LocalQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed Nov 21, 2024
1 parent 4e45d4c commit 809e481
Show file tree
Hide file tree
Showing 21 changed files with 548 additions and 25 deletions.
10 changes: 9 additions & 1 deletion pkg/controller/jobframework/base_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"sigs.k8s.io/kueue/pkg/controller/jobframework/webhook"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/queue"
)

// BaseWebhook applies basic defaulting and validation for jobs.
//
// Its Default method either stamps the default LocalQueue label on the job
// or falls back to suspend defaulting, depending on the KueueDefaulting
// feature gate and on what Queues reports for the job's namespace.
type BaseWebhook struct {
// ManageJobsWithoutQueueName is passed through to ApplyDefaultForSuspend
// when the suspend-defaulting path is taken.
ManageJobsWithoutQueueName bool
// FromObject converts the runtime.Object handed to the webhook into the
// GenericJob the defaulting helpers operate on.
FromObject func(runtime.Object) GenericJob
// Queues is queried via DefaultLocalQueue(namespace); a non-empty result
// selects default-LocalQueue labelling instead of suspend defaulting.
// NOTE(review): Default dereferences this without a nil check — confirm
// every caller supplies a queue manager when KueueDefaulting is enabled.
Queues *queue.Manager
}

func BaseWebhookFactory(job GenericJob, fromObject func(runtime.Object) GenericJob) func(ctrl.Manager, ...Option) error {
Expand All @@ -39,6 +42,7 @@ func BaseWebhookFactory(job GenericJob, fromObject func(runtime.Object) GenericJ
wh := &BaseWebhook{
ManageJobsWithoutQueueName: options.ManageJobsWithoutQueueName,
FromObject: fromObject,
Queues: options.Queues,
}
return webhook.WebhookManagedBy(mgr).
For(job.Object()).
Expand All @@ -55,7 +59,11 @@ func (w *BaseWebhook) Default(ctx context.Context, obj runtime.Object) error {
job := w.FromObject(obj)
log := ctrl.LoggerFrom(ctx)
log.V(5).Info("Applying defaults", "job", klog.KObj(job.Object()))
ApplyDefaultForSuspend(job, w.ManageJobsWithoutQueueName)
if features.Enabled(features.KueueDefaulting) && w.Queues.DefaultLocalQueue(job.Object().GetNamespace()) != "" {
ApplyDefaultLocalQueue(job)
} else {
ApplyDefaultForSuspend(job, w.ManageJobsWithoutQueueName)
}
return nil
}

Expand Down
81 changes: 78 additions & 3 deletions pkg/controller/jobframework/base_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/google/go-cmp/cmp"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -31,9 +32,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/queue"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
)

type testGenericJob struct {
Expand Down Expand Up @@ -125,9 +130,10 @@ func makeTestGenericJob() *testGenericJob {
func TestBaseWebhookDefault(t *testing.T) {
testcases := map[string]struct {
manageJobsWithoutQueueName bool

job *batchv1.Job
want *batchv1.Job
kueueDefaulting bool
defaultLqExist bool
job *batchv1.Job
want *batchv1.Job
}{
"update the suspend field with 'manageJobsWithoutQueueName=false'": {
job: &batchv1.Job{
Expand Down Expand Up @@ -164,12 +170,81 @@ func TestBaseWebhookDefault(t *testing.T) {
Spec: batchv1.JobSpec{Suspend: ptr.To(true)},
},
},
"KueueDefaulting enabled, default lq created, job doesn't have queue label": {
kueueDefaulting: true,
defaultLqExist: true,
job: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{},
},
},
want: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{constants.QueueLabel: "default"},
},
},
},
"KueueDefaulting enabled, default lq created, job has queue label": {
kueueDefaulting: true,
defaultLqExist: true,
job: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{constants.QueueLabel: "queue"},
},
},
want: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{constants.QueueLabel: "queue"},
},
},
},
"KueueDefaulting enabled, default lq doesn't created, job doesn't have queue label": {
kueueDefaulting: true,
defaultLqExist: false,
job: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{},
},
},
want: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{},
},
},
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
ctx, _ := utiltesting.ContextWithLog(t)
features.SetFeatureGateDuringTest(t, features.KueueDefaulting, tc.kueueDefaulting)
clientBuilder := utiltesting.NewClientBuilder().
WithObjects(
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}},
)
cl := clientBuilder.Build()
cqCache := cache.New(cl)
queueManager := queue.NewManager(cl, cqCache)
if tc.defaultLqExist {
queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default").
ClusterQueue("cluster-queue").
Obj())
}
w := &jobframework.BaseWebhook{
ManageJobsWithoutQueueName: tc.manageJobsWithoutQueueName,
FromObject: makeTestGenericJob().fromObject,
Queues: queueManager,
}
if err := w.Default(context.Background(), tc.job); err != nil {
t.Errorf("set defaults to a kubeflow/mpijob by a Defaulter")
Expand Down
13 changes: 13 additions & 0 deletions pkg/controller/jobframework/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package jobframework

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/queue"
)

func ApplyDefaultForSuspend(job GenericJob, manageJobsWithoutQueueName bool) {
Expand All @@ -32,3 +34,14 @@ func ApplyDefaultForSuspend(job GenericJob, manageJobsWithoutQueueName bool) {
}
}
}

// ApplyDefaultLocalQueue points the job at the default LocalQueue when the
// job does not already name a queue.
//
// A job whose queue label is present and non-empty is left untouched.
// Otherwise (label missing, label empty, or no labels at all) the label is set
// to queue.DefaultLocalQueueName and the updated label map is written back to
// the job's object.
func ApplyDefaultLocalQueue(job GenericJob) {
	jobLabels := job.Object().GetLabels()

	// Indexing a nil map yields the zero value, so this guard is safe even
	// when the object carries no labels.
	if jobLabels[constants.QueueLabel] != "" {
		return
	}

	// Writing to a nil map would panic; allocate one before assigning.
	if jobLabels == nil {
		jobLabels = map[string]string{}
	}
	jobLabels[constants.QueueLabel] = queue.DefaultLocalQueueName
	job.Object().SetLabels(jobLabels)
}
15 changes: 14 additions & 1 deletion pkg/controller/jobs/deployment/deployment_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,22 @@ import (
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/controller/jobframework/webhook"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/queue"
)

// Webhook is the admission webhook state for Deployments. SetupWebhook
// populates it from the controller manager and the jobframework options.
type Webhook struct {
// client is the manager's client, obtained via mgr.GetClient().
client client.Client
// manageJobsWithoutQueueName mirrors options.ManageJobsWithoutQueueName.
manageJobsWithoutQueueName bool
// queues mirrors options.Queues. Default asks it for the namespace's
// default LocalQueue when the KueueDefaulting feature gate is enabled.
queues *queue.Manager
}

func SetupWebhook(mgr ctrl.Manager, opts ...jobframework.Option) error {
options := jobframework.ProcessOptions(opts...)
wh := &Webhook{
client: mgr.GetClient(),
manageJobsWithoutQueueName: options.ManageJobsWithoutQueueName,
queues: options.Queues,
}
obj := &appsv1.Deployment{}
return webhook.WebhookManagedBy(mgr).
Expand All @@ -62,11 +66,20 @@ func (wh *Webhook) Default(ctx context.Context, obj runtime.Object) error {
log.V(5).Info("Applying defaults")

cqLabel, ok := d.Labels[constants.QueueLabel]
if ok {
if ok || features.Enabled(features.KueueDefaulting) && wh.queues.DefaultLocalQueue(d.ObjectMeta.Namespace) != "" {
if d.Spec.Template.Labels == nil {
d.Spec.Template.Labels = make(map[string]string, 1)
}
}
if cqLabel != "" {
d.Spec.Template.Labels[constants.QueueLabel] = cqLabel
} else if features.Enabled(features.KueueDefaulting) && wh.queues.DefaultLocalQueue(d.ObjectMeta.Namespace) != "" {
queue := queue.DefaultLocalQueueName
if d.Labels == nil {
d.Labels = make(map[string]string)
}
d.Labels[constants.QueueLabel] = queue
d.Spec.Template.Labels[constants.QueueLabel] = queue
}

return nil
Expand Down
43 changes: 40 additions & 3 deletions pkg/controller/jobs/deployment/deployment_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import (
"github.com/google/go-cmp/cmp"
appsv1 "k8s.io/api/apps/v1"

"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/queue"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingdeployment "sigs.k8s.io/kueue/pkg/util/testingjobs/deployment"
)
Expand All @@ -31,6 +34,8 @@ func TestDefault(t *testing.T) {
testCases := map[string]struct {
deployment *appsv1.Deployment
manageJobsWithoutQueueName bool
kueueDefaulting bool
defaultLqExist bool
enableIntegrations []string
want *appsv1.Deployment
}{
Expand All @@ -44,21 +49,53 @@ func TestDefault(t *testing.T) {
PodTemplateSpecQueue("test-queue").
Obj(),
},
"KueueDefaulting enabled, default lq created, job doesn't have queue label": {
kueueDefaulting: true,
defaultLqExist: true,
deployment: testingdeployment.MakeDeployment("test-pod", "default").Obj(),
want: testingdeployment.MakeDeployment("test-pod", "default").
Queue("default").
PodTemplateSpecQueue("default").
Obj(),
},
"KueueDefaulting enabled, default lq created, job has queue label": {
kueueDefaulting: true,
defaultLqExist: true,
deployment: testingdeployment.MakeDeployment("test-pod", "").Queue("test-queue").Obj(),
want: testingdeployment.MakeDeployment("test-pod", "").
Queue("test-queue").
PodTemplateSpecQueue("test-queue").
Obj(),
},
"KueueDefaulting enabled, default lq doesn't created, job doesn't have queue label": {
kueueDefaulting: true,
defaultLqExist: false,
deployment: testingdeployment.MakeDeployment("test-pod", "").Obj(),
want: testingdeployment.MakeDeployment("test-pod", "").
Obj(),
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
ctx, _ := utiltesting.ContextWithLog(t)
features.SetFeatureGateDuringTest(t, features.KueueDefaulting, tc.kueueDefaulting)
t.Cleanup(jobframework.EnableIntegrationsForTest(t, tc.enableIntegrations...))
builder := utiltesting.NewClientBuilder()
cli := builder.Build()

cqCache := cache.New(cli)
queueManager := queue.NewManager(cli, cqCache)
if tc.defaultLqExist {
queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default").
ClusterQueue("cluster-queue").
Obj())
}
w := &Webhook{
client: cli,
manageJobsWithoutQueueName: tc.manageJobsWithoutQueueName,
queues: queueManager,
}

ctx, _ := utiltesting.ContextWithLog(t)

if err := w.Default(ctx, tc.deployment); err != nil {
t.Errorf("failed to set defaults for v1/deployment: %s", err)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/jobs/job/job_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ func (w *JobWebhook) Default(ctx context.Context, obj runtime.Object) error {
log := ctrl.LoggerFrom(ctx).WithName("job-webhook")
log.V(5).Info("Applying defaults", "job", klog.KObj(job))

jobframework.ApplyDefaultForSuspend(job, w.manageJobsWithoutQueueName)
if features.Enabled(features.KueueDefaulting) && w.queues.DefaultLocalQueue(job.ObjectMeta.Namespace) != "" {
jobframework.ApplyDefaultLocalQueue(job)
} else {
jobframework.ApplyDefaultForSuspend(job, w.manageJobsWithoutQueueName)
}

if canDefaultManagedBy(job.Spec.ManagedBy) {
localQueueName, found := job.Labels[constants.QueueLabel]
Expand Down
31 changes: 31 additions & 0 deletions pkg/controller/jobs/job/job_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,8 @@ func TestDefault(t *testing.T) {
manageJobsWithoutQueueName bool
multiKueueEnabled bool
multiKueueBatchJobWithManagedByEnabled bool
kueueDefaulting bool
defaultLqExist bool
want *batchv1.Job
wantErr error
}{
Expand Down Expand Up @@ -683,11 +685,35 @@ func TestDefault(t *testing.T) {
multiKueueEnabled: true,
multiKueueBatchJobWithManagedByEnabled: true,
},
"KueueDefaulting enabled, default lq created, job doesn't have queue label": {
kueueDefaulting: true,
defaultLqExist: true,
job: testingutil.MakeJob("test-job", "default").Obj(),
want: testingutil.MakeJob("test-job", "default").
Queue("default").
Obj(),
},
"KueueDefaulting enabled, default lq created, job has queue label": {
kueueDefaulting: true,
defaultLqExist: true,
job: testingutil.MakeJob("test-job", "").Queue("test-queue").Obj(),
want: testingutil.MakeJob("test-job", "").
Queue("test-queue").
Obj(),
},
"KueueDefaulting enabled, default lq doesn't created, job doesn't have queue label": {
kueueDefaulting: true,
defaultLqExist: false,
job: testingutil.MakeJob("test-job", "").Obj(),
want: testingutil.MakeJob("test-job", "").
Obj(),
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
features.SetFeatureGateDuringTest(t, features.MultiKueue, tc.multiKueueEnabled)
features.SetFeatureGateDuringTest(t, features.MultiKueueBatchJobWithManagedBy, tc.multiKueueBatchJobWithManagedByEnabled)
features.SetFeatureGateDuringTest(t, features.KueueDefaulting, tc.kueueDefaulting)

ctx, _ := utiltesting.ContextWithLog(t)

Expand All @@ -698,6 +724,11 @@ func TestDefault(t *testing.T) {
cl := clientBuilder.Build()
cqCache := cache.New(cl)
queueManager := queue.NewManager(cl, cqCache)
if tc.defaultLqExist {
queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default").
ClusterQueue("cluster-queue").
Obj())
}

for _, q := range tc.queues {
if err := queueManager.AddLocalQueue(ctx, &q); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/jobs/jobset/jobset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ func (w *JobSetWebhook) Default(ctx context.Context, obj runtime.Object) error {
log := ctrl.LoggerFrom(ctx).WithName("jobset-webhook")
log.V(5).Info("Applying defaults", "jobset", klog.KObj(jobSet))

jobframework.ApplyDefaultForSuspend(jobSet, w.manageJobsWithoutQueueName)
if features.Enabled(features.KueueDefaulting) && w.queues.DefaultLocalQueue(jobSet.ObjectMeta.Namespace) != "" {
jobframework.ApplyDefaultLocalQueue(jobSet)
} else {
jobframework.ApplyDefaultForSuspend(jobSet, w.manageJobsWithoutQueueName)
}

if canDefaultManagedBy(jobSet.Spec.ManagedBy) {
localQueueName, found := jobSet.Labels[constants.QueueLabel]
Expand Down
Loading

0 comments on commit 809e481

Please sign in to comment.