Skip to content

Commit

Permalink
Implement default LocalQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed Dec 2, 2024
1 parent 08bbbe0 commit 198d947
Show file tree
Hide file tree
Showing 22 changed files with 563 additions and 39 deletions.
2 changes: 2 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ const (

// ManagedByKueueLabel label that signalize that an object is managed by Kueue
ManagedByKueueLabel = "kueue.x-k8s.io/managed"

DefaultLocalQueueName = "default"
)
4 changes: 4 additions & 0 deletions pkg/controller/jobframework/base_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"sigs.k8s.io/kueue/pkg/controller/jobframework/webhook"
"sigs.k8s.io/kueue/pkg/queue"
)

// BaseWebhook applies basic defaulting and validation for jobs.
type BaseWebhook struct {
ManageJobsWithoutQueueName bool
FromObject func(runtime.Object) GenericJob
Queues *queue.Manager
}

func BaseWebhookFactory(job GenericJob, fromObject func(runtime.Object) GenericJob) func(ctrl.Manager, ...Option) error {
Expand All @@ -38,6 +40,7 @@ func BaseWebhookFactory(job GenericJob, fromObject func(runtime.Object) GenericJ
wh := &BaseWebhook{
ManageJobsWithoutQueueName: options.ManageJobsWithoutQueueName,
FromObject: fromObject,
Queues: options.Queues,
}
return webhook.WebhookManagedBy(mgr).
For(job.Object()).
Expand All @@ -54,6 +57,7 @@ func (w *BaseWebhook) Default(ctx context.Context, obj runtime.Object) error {
job := w.FromObject(obj)
log := ctrl.LoggerFrom(ctx)
log.V(5).Info("Applying defaults")
ApplyDefaultLocalQueue(job, w.Queues.DefaultLocalQueue(job.Object().GetNamespace()))
ApplyDefaultForSuspend(job, w.ManageJobsWithoutQueueName)
return nil
}
Expand Down
82 changes: 79 additions & 3 deletions pkg/controller/jobframework/base_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/google/go-cmp/cmp"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -31,9 +32,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/queue"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
)

type testGenericJob struct {
Expand Down Expand Up @@ -125,9 +130,10 @@ func makeTestGenericJob() *testGenericJob {
func TestBaseWebhookDefault(t *testing.T) {
testcases := map[string]struct {
manageJobsWithoutQueueName bool

job *batchv1.Job
want *batchv1.Job
LocalQueueDefaulting bool
defaultLqExist bool
job *batchv1.Job
want *batchv1.Job
}{
"update the suspend field with 'manageJobsWithoutQueueName=false'": {
job: &batchv1.Job{
Expand Down Expand Up @@ -164,12 +170,82 @@ func TestBaseWebhookDefault(t *testing.T) {
Spec: batchv1.JobSpec{Suspend: ptr.To(true)},
},
},
"LocalQueueDefaulting enabled, default lq created, job doesn't have queue label": {
LocalQueueDefaulting: true,
defaultLqExist: true,
job: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{},
},
},
want: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{constants.QueueLabel: "default"},
},
},
},
"LocalQueueDefaulting enabled, default lq created, job has queue label": {
LocalQueueDefaulting: true,
defaultLqExist: true,
job: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{constants.QueueLabel: "queue"},
},
},
want: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{constants.QueueLabel: "queue"},
},
},
},
"LocalQueueDefaulting enabled, default lq isn't created, job doesn't have queue label": {
LocalQueueDefaulting: true,
defaultLqExist: false,
job: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{},
},
},
want: &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
Labels: map[string]string{},
},
},
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
ctx, _ := utiltesting.ContextWithLog(t)
features.SetFeatureGateDuringTest(t, features.LocalQueueDefaulting, tc.LocalQueueDefaulting)
clientBuilder := utiltesting.NewClientBuilder().
WithObjects(
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}},
)
cl := clientBuilder.Build()
cqCache := cache.New(cl)
queueManager := queue.NewManager(cl, cqCache)
if tc.defaultLqExist {
if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default").
ClusterQueue("cluster-queue").Obj()); err != nil {
t.Errorf("failed to create default local queue: %s", err)
}
}
w := &jobframework.BaseWebhook{
ManageJobsWithoutQueueName: tc.manageJobsWithoutQueueName,
FromObject: makeTestGenericJob().fromObject,
Queues: queueManager,
}
if err := w.Default(context.Background(), tc.job); err != nil {
t.Errorf("set defaults to a kubeflow/mpijob by a Defaulter")
Expand Down
16 changes: 16 additions & 0 deletions pkg/controller/jobframework/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package jobframework

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/features"
)

func ApplyDefaultForSuspend(job GenericJob, manageJobsWithoutQueueName bool) {
Expand All @@ -32,3 +34,17 @@ func ApplyDefaultForSuspend(job GenericJob, manageJobsWithoutQueueName bool) {
}
}
}

func ApplyDefaultLocalQueue(job GenericJob, defaultQueue string) {
if !features.Enabled(features.LocalQueueDefaulting) || defaultQueue == "" {
return
}
labels := job.Object().GetLabels()
if labels == nil {
labels = make(map[string]string, 1)
}
if labels[constants.QueueLabel] == "" {
labels[constants.QueueLabel] = defaultQueue
job.Object().SetLabels(labels)
}
}
25 changes: 24 additions & 1 deletion pkg/controller/jobs/deployment/deployment_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,24 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

kueue_constants "sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/controller/jobframework/webhook"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/queue"
)

type Webhook struct {
client client.Client
queues *queue.Manager
}

func SetupWebhook(mgr ctrl.Manager, _ ...jobframework.Option) error {
func SetupWebhook(mgr ctrl.Manager, opts ...jobframework.Option) error {
options := jobframework.ProcessOptions(opts...)
wh := &Webhook{
client: mgr.GetClient(),
queues: options.Queues,
}
obj := &appsv1.Deployment{}
return webhook.WebhookManagedBy(mgr).
Expand All @@ -58,6 +64,23 @@ func (wh *Webhook) Default(ctx context.Context, obj runtime.Object) error {
log := ctrl.LoggerFrom(ctx).WithName("deployment-webhook")
log.V(5).Info("Applying defaults")

queue, ok := deployment.Labels[constants.QueueLabel]
if ok || features.Enabled(features.LocalQueueDefaulting) && wh.queues.DefaultLocalQueue(deployment.ObjectMeta.Namespace) != "" {
if deployment.Spec.Template.Labels == nil {
deployment.Spec.Template.Labels = make(map[string]string, 1)
}
}
if queue != "" {
deployment.Spec.Template.Labels[constants.QueueLabel] = queue
} else if features.Enabled(features.LocalQueueDefaulting) && wh.queues.DefaultLocalQueue(deployment.ObjectMeta.Namespace) != "" {
queue := kueue_constants.DefaultLocalQueueName
if deployment.Labels == nil {
deployment.Labels = make(map[string]string)
}
deployment.Labels[constants.QueueLabel] = queue
deployment.Spec.Template.Labels[constants.QueueLabel] = queue
}

if queueName := jobframework.QueueNameForObject(deployment.Object()); queueName != "" {
if deployment.Spec.Template.Labels == nil {
deployment.Spec.Template.Labels = make(map[string]string, 1)
Expand Down
53 changes: 46 additions & 7 deletions pkg/controller/jobs/deployment/deployment_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,20 @@ import (
"k8s.io/apimachinery/pkg/util/validation/field"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/queue"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingdeployment "sigs.k8s.io/kueue/pkg/util/testingjobs/deployment"
)

func TestDefault(t *testing.T) {
testCases := map[string]struct {
deployment *appsv1.Deployment
want *appsv1.Deployment
deployment *appsv1.Deployment
LocalQueueDefaulting bool
defaultLqExist bool
want *appsv1.Deployment
}{
"deployment without queue": {
deployment: testingdeployment.MakeDeployment("test-pod", "").Obj(),
Expand Down Expand Up @@ -62,20 +67,54 @@ func TestDefault(t *testing.T) {
deployment: testingdeployment.MakeDeployment("test-pod", "").PodTemplateSpecQueue("test-queue").Obj(),
want: testingdeployment.MakeDeployment("test-pod", "").PodTemplateSpecQueue("test-queue").Obj(),
},
"LocalQueueDefaulting enabled, default lq created, job doesn't have queue label": {
LocalQueueDefaulting: true,
defaultLqExist: true,
deployment: testingdeployment.MakeDeployment("test-pod", "default").Obj(),
want: testingdeployment.MakeDeployment("test-pod", "default").
Queue("default").
PodTemplateSpecQueue("default").
Obj(),
},
"LocalQueueDefaulting enabled, default lq created, job has queue label": {
LocalQueueDefaulting: true,
defaultLqExist: true,
deployment: testingdeployment.MakeDeployment("test-pod", "").Queue("test-queue").Obj(),
want: testingdeployment.MakeDeployment("test-pod", "").
Queue("test-queue").
PodTemplateSpecQueue("test-queue").
Obj(),
},
"LocalQueueDefaulting enabled, default lq isn't created, job doesn't have queue label": {
LocalQueueDefaulting: true,
defaultLqExist: false,
deployment: testingdeployment.MakeDeployment("test-pod", "").Obj(),
want: testingdeployment.MakeDeployment("test-pod", "").
Obj(),
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
ctx, _ := utiltesting.ContextWithLog(t)
features.SetFeatureGateDuringTest(t, features.LocalQueueDefaulting, tc.LocalQueueDefaulting)
t.Cleanup(jobframework.EnableIntegrationsForTest(t, "pod"))
builder := utiltesting.NewClientBuilder()
client := builder.Build()

cli := builder.Build()
cqCache := cache.New(cli)
queueManager := queue.NewManager(cli, cqCache)
if tc.defaultLqExist {
if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default").
ClusterQueue("cluster-queue").
Obj()); err != nil {
t.Errorf("failed to create default local queue: %s", err)
}
}
w := &Webhook{
client: client,
client: cli,
queues: queueManager,
}

ctx, _ := utiltesting.ContextWithLog(t)

if err := w.Default(ctx, tc.deployment); err != nil {
t.Errorf("failed to set defaults for v1/deployment: %s", err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/jobs/job/job_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (w *JobWebhook) Default(ctx context.Context, obj runtime.Object) error {
log := ctrl.LoggerFrom(ctx).WithName("job-webhook")
log.V(5).Info("Applying defaults")

jobframework.ApplyDefaultLocalQueue(job, w.queues.DefaultLocalQueue(job.ObjectMeta.Namespace))
jobframework.ApplyDefaultForSuspend(job, w.manageJobsWithoutQueueName)

if canDefaultManagedBy(job.Spec.ManagedBy) {
Expand Down
32 changes: 32 additions & 0 deletions pkg/controller/jobs/job/job_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,8 @@ func TestDefault(t *testing.T) {
manageJobsWithoutQueueName bool
multiKueueEnabled bool
multiKueueBatchJobWithManagedByEnabled bool
LocalQueueDefaulting bool
defaultLqExist bool
want *batchv1.Job
wantErr error
}{
Expand Down Expand Up @@ -637,11 +639,35 @@ func TestDefault(t *testing.T) {
multiKueueEnabled: true,
multiKueueBatchJobWithManagedByEnabled: true,
},
"LocalQueueDefaulting enabled, default lq created, job doesn't have queue label": {
LocalQueueDefaulting: true,
defaultLqExist: true,
job: testingutil.MakeJob("test-job", "default").Obj(),
want: testingutil.MakeJob("test-job", "default").
Queue("default").
Obj(),
},
"LocalQueueDefaulting enabled, default lq created, job has queue label": {
LocalQueueDefaulting: true,
defaultLqExist: true,
job: testingutil.MakeJob("test-job", "").Queue("test-queue").Obj(),
want: testingutil.MakeJob("test-job", "").
Queue("test-queue").
Obj(),
},
"LocalQueueDefaulting enabled, default lq isn't created, job doesn't have queue label": {
LocalQueueDefaulting: true,
defaultLqExist: false,
job: testingutil.MakeJob("test-job", "").Obj(),
want: testingutil.MakeJob("test-job", "").
Obj(),
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
features.SetFeatureGateDuringTest(t, features.MultiKueue, tc.multiKueueEnabled)
features.SetFeatureGateDuringTest(t, features.MultiKueueBatchJobWithManagedBy, tc.multiKueueBatchJobWithManagedByEnabled)
features.SetFeatureGateDuringTest(t, features.LocalQueueDefaulting, tc.LocalQueueDefaulting)

ctx, _ := utiltesting.ContextWithLog(t)

Expand All @@ -652,6 +678,12 @@ func TestDefault(t *testing.T) {
cl := clientBuilder.Build()
cqCache := cache.New(cl)
queueManager := queue.NewManager(cl, cqCache)
if tc.defaultLqExist {
if err := queueManager.AddLocalQueue(ctx, utiltesting.MakeLocalQueue("default", "default").
ClusterQueue("cluster-queue").Obj()); err != nil {
t.Errorf("failed to create default local queue: %s", err)
}
}

for _, q := range tc.queues {
if err := queueManager.AddLocalQueue(ctx, &q); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/jobs/jobset/jobset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (w *JobSetWebhook) Default(ctx context.Context, obj runtime.Object) error {
log := ctrl.LoggerFrom(ctx).WithName("jobset-webhook")
log.V(5).Info("Applying defaults")

jobframework.ApplyDefaultLocalQueue(jobSet, w.queues.DefaultLocalQueue(jobSet.ObjectMeta.Namespace))
jobframework.ApplyDefaultForSuspend(jobSet, w.manageJobsWithoutQueueName)

if canDefaultManagedBy(jobSet.Spec.ManagedBy) {
Expand Down
Loading

0 comments on commit 198d947

Please sign in to comment.