diff --git a/pkg/controller/tas/topology_ungater.go b/pkg/controller/tas/topology_ungater.go
index f0f1fdaeb4..d37abc4410 100644
--- a/pkg/controller/tas/topology_ungater.go
+++ b/pkg/controller/tas/topology_ungater.go
@@ -26,6 +26,7 @@ import (
 	"time"
 
 	"github.com/go-logr/logr"
+	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -433,6 +434,10 @@ func determineRanksLookup(pod *corev1.Pod) (*string, *replicatedJobsInfo) {
 	if _, found := pod.Labels[batchv1.JobCompletionIndexAnnotation]; found {
 		return ptr.To(batchv1.JobCompletionIndexAnnotation), nil
 	}
+	// Check if this Pod belongs to a Kubeflow Job, which exposes the Pod's rank via the ReplicaIndex label.
+	if _, found := pod.Labels[kftraining.ReplicaIndexLabel]; found {
+		return ptr.To(kftraining.ReplicaIndexLabel), nil
+	}
 	return nil, nil
 }
 
diff --git a/pkg/controller/tas/topology_ungater_test.go b/pkg/controller/tas/topology_ungater_test.go
index d4b1042077..5c5095b4c4 100644
--- a/pkg/controller/tas/topology_ungater_test.go
+++ b/pkg/controller/tas/topology_ungater_test.go
@@ -24,6 +24,7 @@ import (
 	gocmp "github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
 	jsoniter "github.com/json-iterator/go"
+	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -1697,6 +1698,260 @@ func TestReconcile(t *testing.T) {
 				},
 			},
 		},
+		"ranks: support rank-based ordering for kubeflow - for all Pods": {
+			workloads: []kueue.Workload{
+				*utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
+					PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 4).Request(corev1.ResourceCPU, "1").Obj()).
+					ReserveQuota(
+						utiltesting.MakeAdmission("cq").
+							Assignment(corev1.ResourceCPU, "unit-test-flavor", "4").
+							AssignmentPodCount(4).
+							TopologyAssignment(&kueue.TopologyAssignment{
+								Levels: defaultTestLevels,
+								Domains: []kueue.TopologyDomainAssignment{
+									{
+										Count: 2,
+										Values: []string{
+											"b1",
+											"r1",
+										},
+									},
+									{
+										Count: 1,
+										Values: []string{
+											"b1",
+											"r2",
+										},
+									},
+									{
+										Count: 1,
+										Values: []string{
+											"b2",
+											"r1",
+										},
+									},
+								},
+							}).
+							Obj(),
+					).
+					Admitted(true).
+					Obj(),
+			},
+			pods: []corev1.Pod{
+				*testingpod.MakePod("l0", "ns").
+					Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
+					Label(kftraining.JobRoleLabel, "launcher").
+					Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
+					TopologySchedulingGate().
+					Obj(),
+				*testingpod.MakePod("w0", "ns").
+					Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
+					Label(kftraining.JobRoleLabel, "worker").
+					Label(kftraining.ReplicaIndexLabel, "0").
+					Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
+					TopologySchedulingGate().
+					Obj(),
+				*testingpod.MakePod("w1", "ns").
+					Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
+					Label(kftraining.JobRoleLabel, "worker").
+					Label(kftraining.ReplicaIndexLabel, "1").
+					Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
+					TopologySchedulingGate().
+					Obj(),
+				*testingpod.MakePod("w2", "ns").
+					Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
+					Label(kftraining.JobRoleLabel, "worker").
+					Label(kftraining.ReplicaIndexLabel, "2").
+					Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
+					TopologySchedulingGate().
+					Obj(),
+			},
+			cmpNS: true,
+			wantPods: []corev1.Pod{
+				*testingpod.MakePod("l0", "ns").
+					Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
+					Label(kftraining.JobRoleLabel, "launcher").
+					Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
+					NodeSelector(tasBlockLabel, "b1").
+					NodeSelector(tasRackLabel, "r1").
+					Obj(),
+				*testingpod.MakePod("w0", "ns").
+					Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
+					Label(kftraining.JobRoleLabel, "worker").
+					Label(kftraining.ReplicaIndexLabel, "0").
+					Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
+					NodeSelector(tasBlockLabel, "b1").
+					NodeSelector(tasRackLabel, "r1").
+					Obj(),
+				*testingpod.MakePod("w1", "ns").
+					Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
+					Label(kftraining.JobRoleLabel, "worker").
+					Label(kftraining.ReplicaIndexLabel, "1").
+					Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
+					NodeSelector(tasBlockLabel, "b1").
+					NodeSelector(tasRackLabel, "r2").
+					Obj(),
+				*testingpod.MakePod("w2", "ns").
+					Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
+					Label(kftraining.JobRoleLabel, "worker").
+					Label(kftraining.ReplicaIndexLabel, "2").
+					Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
+					NodeSelector(tasBlockLabel, "b2").
+					NodeSelector(tasRackLabel, "r1").
+					Obj(),
+			},
+			wantCounts: []counts{
+				{
+					NodeSelector: map[string]string{
+						tasBlockLabel: "b1",
+						tasRackLabel:  "r1",
+					},
+					Count: 2,
+				},
+				{
+					NodeSelector: map[string]string{
+						tasBlockLabel: "b1",
+						tasRackLabel:  "r2",
+					},
+					Count: 1,
+				},
+				{
+					NodeSelector: map[string]string{
+						tasBlockLabel: "b2",
+						tasRackLabel:  "r1",
+					},
+					Count: 1,
+				},
+			},
+		},
+		"ranks: support rank-based ordering for kubeflow - some Pods already scheduled": {
+			workloads: []kueue.Workload{
+				*utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
+					PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 4).Request(corev1.ResourceCPU, "1").Obj()).
+					ReserveQuota(
+						utiltesting.MakeAdmission("cq").
+							Assignment(corev1.ResourceCPU, "unit-test-flavor", "4").
+							AssignmentPodCount(4).
+							TopologyAssignment(&kueue.TopologyAssignment{
+								Levels: defaultTestLevels,
+								Domains: []kueue.TopologyDomainAssignment{
+									{
+										Count: 2,
+										Values: []string{
+											"b1",
+											"r1",
+										},
+									},
+									{
+										Count: 1,
+										Values: []string{
+											"b1",
+											"r2",
+										},
+									},
+									{
+										Count: 1,
+										Values: []string{
+											"b2",
+											"r1",
+										},
+									},
+								},
+							}).
+							Obj(),
+					).
+					Admitted(true).
+					Obj(),
+			},
+			pods: []corev1.Pod{
+				*testingpod.MakePod("l0", "ns").
+					Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
+					Label(kftraining.JobRoleLabel, "launcher").
+					Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
+					TopologySchedulingGate().
+					Obj(),
+				*testingpod.MakePod("w0", "ns").
+					Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
+					Label(kftraining.JobRoleLabel, "worker").
+					Label(kftraining.ReplicaIndexLabel, "0").
+					Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
+					TopologySchedulingGate().
+					Obj(),
+				*testingpod.MakePod("w1", "ns").
+					Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
+					Label(kftraining.JobRoleLabel, "worker").
+					Label(kftraining.ReplicaIndexLabel, "1").
+					Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
+					NodeSelector(tasBlockLabel, "b1").
+					NodeSelector(tasRackLabel, "r1").
+					Obj(),
+				*testingpod.MakePod("w2", "ns").
+					Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
+					Label(kftraining.JobRoleLabel, "worker").
+					Label(kftraining.ReplicaIndexLabel, "2").
+					Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
+					NodeSelector(tasBlockLabel, "b1").
+					NodeSelector(tasRackLabel, "r2").
+					Obj(),
+			},
+			cmpNS: true,
+			wantPods: []corev1.Pod{
+				*testingpod.MakePod("l0", "ns").
+					Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
+					Label(kftraining.JobRoleLabel, "launcher").
+					Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
+					NodeSelector(tasBlockLabel, "b1").
+					NodeSelector(tasRackLabel, "r1").
+					Obj(),
+				*testingpod.MakePod("w0", "ns").
+					Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
+					Label(kftraining.JobRoleLabel, "worker").
+					Label(kftraining.ReplicaIndexLabel, "0").
+					Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
+					NodeSelector(tasBlockLabel, "b2").
+					NodeSelector(tasRackLabel, "r1").
+					Obj(),
+				*testingpod.MakePod("w1", "ns").
+					Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
+					Label(kftraining.JobRoleLabel, "worker").
+					Label(kftraining.ReplicaIndexLabel, "1").
+					Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
+					NodeSelector(tasBlockLabel, "b1").
+					NodeSelector(tasRackLabel, "r1").
+					Obj(),
+				*testingpod.MakePod("w2", "ns").
+					Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
+					Label(kftraining.JobRoleLabel, "worker").
+					Label(kftraining.ReplicaIndexLabel, "2").
+					Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
+					NodeSelector(tasBlockLabel, "b1").
+					NodeSelector(tasRackLabel, "r2").
+					Obj(),
+			},
+			wantCounts: []counts{
+				{
+					NodeSelector: map[string]string{
+						tasBlockLabel: "b1",
+						tasRackLabel:  "r1",
+					},
+					Count: 2,
+				},
+				{
+					NodeSelector: map[string]string{
+						tasBlockLabel: "b1",
+						tasRackLabel:  "r2",
+					},
+					Count: 1,
+				},
+				{
+					NodeSelector: map[string]string{
+						tasBlockLabel: "b2",
+						tasRackLabel:  "r1",
+					},
+					Count: 1,
+				},
+			},
+		},
 	}
 	for name, tc := range testCases {
diff --git a/pkg/util/testingjobs/mpijob/wrappers_mpijob.go b/pkg/util/testingjobs/mpijob/wrappers_mpijob.go
index d1d9098fcc..47291b3458 100644
--- a/pkg/util/testingjobs/mpijob/wrappers_mpijob.go
+++ b/pkg/util/testingjobs/mpijob/wrappers_mpijob.go
@@ -49,6 +49,8 @@ func MakeMPIJob(name, ns string) *MPIJobWrapper {
 }
 
 type MPIJobReplicaSpecRequirement struct {
+	Image         string
+	Args          []string
 	ReplicaType   kfmpi.MPIReplicaType
 	ReplicaCount  int32
 	Annotations   map[string]string
@@ -58,6 +60,8 @@ type MPIJobReplicaSpecRequirement struct {
 
 func (j *MPIJobWrapper) MPIJobReplicaSpecs(replicaSpecs ...MPIJobReplicaSpecRequirement) *MPIJobWrapper {
 	j = j.GenericLauncherAndWorker()
 	for _, rs := range replicaSpecs {
+		j.Spec.MPIReplicaSpecs[rs.ReplicaType].Template.Spec.Containers[0].Image = rs.Image
+		j.Spec.MPIReplicaSpecs[rs.ReplicaType].Template.Spec.Containers[0].Args = rs.Args
 		j.Spec.MPIReplicaSpecs[rs.ReplicaType].Replicas = ptr.To[int32](rs.ReplicaCount)
 		j.Spec.MPIReplicaSpecs[rs.ReplicaType].Template.Spec.RestartPolicy = rs.RestartPolicy
@@ -77,10 +81,13 @@ func (j *MPIJobWrapper) GenericLauncherAndWorker() *MPIJobWrapper {
 				RestartPolicy: corev1.RestartPolicyNever,
 				Containers: []corev1.Container{
 					{
-						Name:      "mpijob",
-						Image:     "pause",
-						Command:   []string{},
-						Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
+						Name:    "mpijob",
+						Image:   "pause",
+						Command: []string{},
+						Resources: corev1.ResourceRequirements{
+							Requests: corev1.ResourceList{},
+							Limits:   corev1.ResourceList{},
+						},
 					},
 				},
 				NodeSelector: map[string]string{},
@@ -95,10 +102,13 @@
 				RestartPolicy: corev1.RestartPolicyNever,
 				Containers: []corev1.Container{
 					{
-						Name:      "mpijob",
-						Image:     "pause",
-						Command:   []string{},
-						Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
+						Name:    "mpijob",
+						Image:   "pause",
+						Command: []string{},
+						Resources: corev1.ResourceRequirements{
+							Requests: corev1.ResourceList{},
+							Limits:   corev1.ResourceList{},
+						},
 					},
 				},
 				NodeSelector: map[string]string{},
@@ -161,6 +171,12 @@ func (j *MPIJobWrapper) Request(replicaType kfmpi.MPIReplicaType, r corev1.Resou
 	return j
 }
 
+// Limit adds a resource limit to the default container.
+func (j *MPIJobWrapper) Limit(replicaType kfmpi.MPIReplicaType, r corev1.ResourceName, v string) *MPIJobWrapper {
+	j.Spec.MPIReplicaSpecs[replicaType].Template.Spec.Containers[0].Resources.Limits[r] = resource.MustParse(v)
+	return j
+}
+
 // Parallelism updates job parallelism.
 func (j *MPIJobWrapper) Parallelism(p int32) *MPIJobWrapper {
 	j.Spec.MPIReplicaSpecs[kfmpi.MPIReplicaTypeWorker].Replicas = ptr.To(p)
diff --git a/pkg/util/testingjobs/pytorchjob/wrappers_pytorchjob.go b/pkg/util/testingjobs/pytorchjob/wrappers_pytorchjob.go
index 6f163b2a00..6cb4d40f0e 100644
--- a/pkg/util/testingjobs/pytorchjob/wrappers_pytorchjob.go
+++ b/pkg/util/testingjobs/pytorchjob/wrappers_pytorchjob.go
@@ -48,6 +48,8 @@ func MakePyTorchJob(name, ns string) *PyTorchJobWrapper {
 }
 
 type PyTorchReplicaSpecRequirement struct {
+	Image         string
+	Args          []string
 	ReplicaType   kftraining.ReplicaType
 	Name          string
 	ReplicaCount  int32
@@ -62,6 +64,8 @@ func (j *PyTorchJobWrapper) PyTorchReplicaSpecs(replicaSpecs ...PyTorchReplicaSp
 		j.Spec.PyTorchReplicaSpecs[rs.ReplicaType].Template.Name = rs.Name
 		j.Spec.PyTorchReplicaSpecs[rs.ReplicaType].Template.Spec.RestartPolicy = corev1.RestartPolicy(rs.RestartPolicy)
 		j.Spec.PyTorchReplicaSpecs[rs.ReplicaType].Template.Spec.Containers[0].Name = "pytorch"
+		j.Spec.PyTorchReplicaSpecs[rs.ReplicaType].Template.Spec.Containers[0].Image = rs.Image
+		j.Spec.PyTorchReplicaSpecs[rs.ReplicaType].Template.Spec.Containers[0].Args = rs.Args
 
 		if rs.Annotations != nil {
 			j.Spec.PyTorchReplicaSpecs[rs.ReplicaType].Template.ObjectMeta.Annotations = rs.Annotations
@@ -79,10 +83,13 @@ func (j *PyTorchJobWrapper) PyTorchReplicaSpecsDefault() *PyTorchJobWrapper {
 				RestartPolicy: "Never",
 				Containers: []corev1.Container{
 					{
-						Name:      "c",
-						Image:     "pause",
-						Command:   []string{},
-						Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
+						Name:    "c",
+						Image:   "pause",
+						Command: []string{},
+						Resources: corev1.ResourceRequirements{
+							Requests: corev1.ResourceList{},
+							Limits:   corev1.ResourceList{},
+						},
 					},
 				},
 				NodeSelector: map[string]string{},
@@ -97,10 +104,13 @@
 				RestartPolicy: "Never",
 				Containers: []corev1.Container{
 					{
-						Name:      "c",
-						Image:     "pause",
-						Command:   []string{},
-						Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
+						Name:    "c",
+						Image:   "pause",
+						Command: []string{},
+						Resources: corev1.ResourceRequirements{
+							Requests: corev1.ResourceList{},
+							Limits:   corev1.ResourceList{},
+						},
 					},
 				},
 				NodeSelector: map[string]string{},
@@ -163,6 +173,12 @@ func (j *PyTorchJobWrapper) Request(replicaType kftraining.ReplicaType, r corev1
 	return j
 }
 
+// Limit adds a resource limit to the default container.
+func (j *PyTorchJobWrapper) Limit(replicaType kftraining.ReplicaType, r corev1.ResourceName, v string) *PyTorchJobWrapper {
+	j.Spec.PyTorchReplicaSpecs[replicaType].Template.Spec.Containers[0].Resources.Limits[r] = resource.MustParse(v)
+	return j
+}
+
 // Parallelism updates job parallelism.
 func (j *PyTorchJobWrapper) Parallelism(p int32) *PyTorchJobWrapper {
 	j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Replicas = ptr.To(p)
diff --git a/test/e2e/tas/mpijob_test.go b/test/e2e/tas/mpijob_test.go
new file mode 100644
index 0000000000..8b0b76f3c0
--- /dev/null
+++ b/test/e2e/tas/mpijob_test.go
@@ -0,0 +1,178 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tase2e
+
+import (
+	"fmt"
+
+	kfmpi "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
+	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/utils/ptr"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	"sigs.k8s.io/kueue/pkg/util/testing"
+	testingmpijob "sigs.k8s.io/kueue/pkg/util/testingjobs/mpijob"
+	"sigs.k8s.io/kueue/test/util"
+)
+
+var _ = ginkgo.Describe("TopologyAwareScheduling for MPIJob", func() {
+	var (
+		ns           *corev1.Namespace
+		topology     *kueuealpha.Topology
+		tasFlavor    *kueue.ResourceFlavor
+		clusterQueue *kueue.ClusterQueue
+		localQueue   *kueue.LocalQueue
+	)
+
+	ginkgo.BeforeEach(func() {
+		ns = &corev1.Namespace{
+			ObjectMeta: metav1.ObjectMeta{
+				GenerateName: "e2e-tas-mpijob-",
+			},
+		}
+		gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed())
+
+		topology = testing.MakeTopology("datacenter").
+			Levels([]string{topologyLevelBlock, topologyLevelRack, topologyLevelHostname}).
+			Obj()
+		gomega.Expect(k8sClient.Create(ctx, topology)).Should(gomega.Succeed())
+
+		tasFlavor = testing.MakeResourceFlavor("tas-flavor").
+			NodeLabel(tasNodeGroupLabel, instanceType).
+			TopologyName(topology.Name).
+			Obj()
+		gomega.Expect(k8sClient.Create(ctx, tasFlavor)).Should(gomega.Succeed())
+
+		clusterQueue = testing.MakeClusterQueue("cluster-queue").
+			ResourceGroup(
+				*testing.MakeFlavorQuotas(tasFlavor.Name).
+					Resource(corev1.ResourceCPU, "1").
+					Resource(extraResource, "8").
+					Obj(),
+			).
+			Obj()
+		gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed())
+		util.ExpectClusterQueuesToBeActive(ctx, k8sClient, clusterQueue)
+
+		localQueue = testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
+		gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
+		util.ExpectLocalQueuesToBeActive(ctx, k8sClient, localQueue)
+	})
+	ginkgo.AfterEach(func() {
+		gomega.Expect(util.DeleteAllMPIJobsInNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
+		gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
+		util.ExpectObjectToBeDeleted(ctx, k8sClient, clusterQueue, true)
+		util.ExpectObjectToBeDeleted(ctx, k8sClient, tasFlavor, true)
+		util.ExpectObjectToBeDeleted(ctx, k8sClient, topology, true)
+	})
+
+	ginkgo.When("Creating a MPIJob", func() {
+		ginkgo.It("Should place pods based on the ranks-ordering", func() {
+			const (
+				launcherReplicas = 1
+				workerReplicas   = 3
+			)
+
+			numPods := launcherReplicas + workerReplicas
+
+			mpijob := testingmpijob.MakeMPIJob("ranks-mpi", ns.Name).
+				Queue(localQueue.Name).
+				MPIJobReplicaSpecs(
+					testingmpijob.MPIJobReplicaSpecRequirement{
+						Image:         util.E2eTestSleepImage,
+						Args:          []string{"60s"},
+						ReplicaType:   kfmpi.MPIReplicaTypeLauncher,
+						ReplicaCount:  launcherReplicas,
+						RestartPolicy: corev1.RestartPolicyOnFailure,
+						Annotations: map[string]string{
+							kueuealpha.PodSetPreferredTopologyAnnotation: topologyLevelRack,
+						},
+					},
+					testingmpijob.MPIJobReplicaSpecRequirement{
+						Image:         util.E2eTestSleepImage,
+						Args:          []string{"60s"},
+						ReplicaType:   kfmpi.MPIReplicaTypeWorker,
+						ReplicaCount:  workerReplicas,
+						RestartPolicy: corev1.RestartPolicyOnFailure,
+						Annotations: map[string]string{
+							kueuealpha.PodSetPreferredTopologyAnnotation: topologyLevelBlock,
+						},
+					},
+				).
+				Request(kfmpi.MPIReplicaTypeLauncher, corev1.ResourceCPU, "100m").
+				Limit(kfmpi.MPIReplicaTypeLauncher, corev1.ResourceCPU, "100m").
+				Request(kfmpi.MPIReplicaTypeWorker, extraResource, "1").
+				Limit(kfmpi.MPIReplicaTypeWorker, extraResource, "1").
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, mpijob)).Should(gomega.Succeed())
+
+			ginkgo.By("MPIJob is unsuspended", func() {
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mpijob), mpijob)).To(gomega.Succeed())
+					g.Expect(mpijob.Spec.RunPolicy.Suspend).Should(gomega.Equal(ptr.To(false)))
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			pods := &corev1.PodList{}
+			ginkgo.By("ensure all pods are created", func() {
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.List(ctx, pods, client.InNamespace(ns.Name))).To(gomega.Succeed())
+					g.Expect(pods.Items).Should(gomega.HaveLen(numPods))
+				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("ensure all pods are scheduled", func() {
+				listOpts := &client.ListOptions{
+					FieldSelector: fields.OneTermNotEqualSelector("spec.nodeName", ""),
+				}
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.List(ctx, pods, client.InNamespace(ns.Name), listOpts)).To(gomega.Succeed())
+					g.Expect(pods.Items).Should(gomega.HaveLen(numPods))
+				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("verify the assignment of pods are as expected with rank-based ordering", func() {
+				gomega.Expect(k8sClient.List(ctx, pods, client.InNamespace(ns.Name))).To(gomega.Succeed())
+				gotAssignment := readRankAssignmentsFromMPIJobPods(pods.Items)
+				wantAssignment := map[string]string{
+					"worker/0": "kind-worker",
+					"worker/1": "kind-worker2",
+					"worker/2": "kind-worker3",
+				}
+				gomega.Expect(wantAssignment).Should(gomega.BeComparableTo(gotAssignment))
+			})
+		})
+	})
+})
+
+func readRankAssignmentsFromMPIJobPods(pods []corev1.Pod) map[string]string {
+	assignment := make(map[string]string, len(pods))
+	for _, pod := range pods {
+		if role := pod.Labels[kftraining.JobRoleLabel]; role == "worker" {
+			key := fmt.Sprintf("%s/%s", role, pod.Labels[kftraining.ReplicaIndexLabel])
+			assignment[key] = pod.Spec.NodeName
+		}
+	}
+	return assignment
+}
diff --git a/test/e2e/tas/pytorch_test.go b/test/e2e/tas/pytorch_test.go
new file mode 100644
index 0000000000..e58eb53484
--- /dev/null
+++ b/test/e2e/tas/pytorch_test.go
@@ -0,0 +1,177 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tase2e
+
+import (
+	"fmt"
+
+	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/utils/ptr"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	"sigs.k8s.io/kueue/pkg/util/testing"
+	testingpytorchjob "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob"
+	"sigs.k8s.io/kueue/test/util"
+)
+
+var _ = ginkgo.Describe("TopologyAwareScheduling for PyTorchJob", func() {
+	var (
+		ns           *corev1.Namespace
+		topology     *kueuealpha.Topology
+		tasFlavor    *kueue.ResourceFlavor
+		clusterQueue *kueue.ClusterQueue
+		localQueue   *kueue.LocalQueue
+	)
+
+	ginkgo.BeforeEach(func() {
+		ns = &corev1.Namespace{
+			ObjectMeta: metav1.ObjectMeta{
+				GenerateName: "e2e-tas-pytorchjob-",
+			},
+		}
+		gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed())
+
+		topology = testing.MakeTopology("datacenter").
+			Levels([]string{topologyLevelBlock, topologyLevelRack, topologyLevelHostname}).
+			Obj()
+		gomega.Expect(k8sClient.Create(ctx, topology)).Should(gomega.Succeed())
+
+		tasFlavor = testing.MakeResourceFlavor("tas-flavor").
+			NodeLabel(tasNodeGroupLabel, instanceType).
+			TopologyName(topology.Name).
+			Obj()
+		gomega.Expect(k8sClient.Create(ctx, tasFlavor)).Should(gomega.Succeed())
+
+		clusterQueue = testing.MakeClusterQueue("cluster-queue").
+			ResourceGroup(
+				*testing.MakeFlavorQuotas(tasFlavor.Name).
+					Resource(corev1.ResourceCPU, "1").
+					Resource(extraResource, "8").
+					Obj(),
+			).
+			Obj()
+		gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed())
+		util.ExpectClusterQueuesToBeActive(ctx, k8sClient, clusterQueue)
+
+		localQueue = testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
+		gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
+		util.ExpectLocalQueuesToBeActive(ctx, k8sClient, localQueue)
+	})
+	ginkgo.AfterEach(func() {
+		gomega.Expect(util.DeleteAllPyTorchJobsInNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
+		gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
+		util.ExpectObjectToBeDeleted(ctx, k8sClient, clusterQueue, true)
+		util.ExpectObjectToBeDeleted(ctx, k8sClient, tasFlavor, true)
+		util.ExpectObjectToBeDeleted(ctx, k8sClient, topology, true)
+	})
+
+	ginkgo.When("Creating a PyTorchJob", func() {
+		ginkgo.It("Should place pods based on the ranks-ordering", func() {
+			const (
+				masterReplicas = 1
+				workerReplicas = 3
+			)
+
+			numPods := masterReplicas + workerReplicas
+
+			pytorchjob := testingpytorchjob.MakePyTorchJob("ranks-pytorch", ns.Name).
+				Queue(localQueue.Name).
+				PyTorchReplicaSpecs(
+					testingpytorchjob.PyTorchReplicaSpecRequirement{
+						Image:         util.E2eTestSleepImage,
+						Args:          []string{"60s"},
+						ReplicaType:   kftraining.PyTorchJobReplicaTypeMaster,
+						ReplicaCount:  masterReplicas,
+						RestartPolicy: kftraining.RestartPolicyOnFailure,
+						Annotations: map[string]string{
+							kueuealpha.PodSetPreferredTopologyAnnotation: topologyLevelRack,
+						},
+					},
+					testingpytorchjob.PyTorchReplicaSpecRequirement{
+						Image:         util.E2eTestSleepImage,
+						Args:          []string{"60s"},
+						ReplicaType:   kftraining.PyTorchJobReplicaTypeWorker,
+						ReplicaCount:  workerReplicas,
+						RestartPolicy: kftraining.RestartPolicyOnFailure,
+						Annotations: map[string]string{
+							kueuealpha.PodSetPreferredTopologyAnnotation: topologyLevelBlock,
+						},
+					},
+				).
+				Request(kftraining.PyTorchJobReplicaTypeMaster, corev1.ResourceCPU, "100m").
+				Limit(kftraining.PyTorchJobReplicaTypeMaster, corev1.ResourceCPU, "100m").
+				Request(kftraining.PyTorchJobReplicaTypeWorker, extraResource, "1").
+				Limit(kftraining.PyTorchJobReplicaTypeWorker, extraResource, "1").
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, pytorchjob)).Should(gomega.Succeed())
+
+			ginkgo.By("PyTorchJob is unsuspended", func() {
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(pytorchjob), pytorchjob)).To(gomega.Succeed())
+					g.Expect(pytorchjob.Spec.RunPolicy.Suspend).Should(gomega.Equal(ptr.To(false)))
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			pods := &corev1.PodList{}
+			ginkgo.By("ensure all pods are created", func() {
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.List(ctx, pods, client.InNamespace(ns.Name))).To(gomega.Succeed())
+					g.Expect(pods.Items).Should(gomega.HaveLen(numPods))
+				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("ensure all pods are scheduled", func() {
+				listOpts := &client.ListOptions{
+					FieldSelector: fields.OneTermNotEqualSelector("spec.nodeName", ""),
+				}
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.List(ctx, pods, client.InNamespace(ns.Name), listOpts)).To(gomega.Succeed())
+					g.Expect(pods.Items).Should(gomega.HaveLen(numPods))
+				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("verify the assignment of pods are as expected with rank-based ordering", func() {
+				gomega.Expect(k8sClient.List(ctx, pods, client.InNamespace(ns.Name))).To(gomega.Succeed())
+				gotAssignment := readRankAssignmentsFromPyTorchJobPods(pods.Items)
+				wantAssignment := map[string]string{
+					"worker/0": "kind-worker",
+					"worker/1": "kind-worker2",
+					"worker/2": "kind-worker3",
+				}
+				gomega.Expect(wantAssignment).Should(gomega.BeComparableTo(gotAssignment))
+			})
+		})
+	})
+})
+
+func readRankAssignmentsFromPyTorchJobPods(pods []corev1.Pod) map[string]string {
+	assignment := make(map[string]string, len(pods))
+	for _, pod := range pods {
+		if role := pod.Labels[kftraining.ReplicaTypeLabel]; role == "worker" {
+			key := fmt.Sprintf("%s/%s", role, pod.Labels[kftraining.ReplicaIndexLabel])
+			assignment[key] = pod.Spec.NodeName
+		}
+	}
+	return assignment
+}
diff --git a/test/util/util.go b/test/util/util.go
index b912ecaa2c..1843663ca4 100644
--- a/test/util/util.go
+++ b/test/util/util.go
@@ -25,6 +25,8 @@ import (
 
 	"github.com/go-logr/logr"
 	"github.com/google/go-cmp/cmp/cmpopts"
+	kfmpi "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
+	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus" @@ -139,6 +141,22 @@ func DeleteAllJobsetsInNamespace(ctx context.Context, c client.Client, ns *corev return nil } +func DeleteAllMPIJobsInNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace) error { + err := c.DeleteAllOf(ctx, &kfmpi.MPIJob{}, client.InNamespace(ns.Name), client.PropagationPolicy(metav1.DeletePropagationBackground)) + if err != nil && !apierrors.IsNotFound(err) && !errors.Is(err, &apimeta.NoKindMatchError{}) { + return err + } + return nil +} + +func DeleteAllPyTorchJobsInNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace) error { + err := c.DeleteAllOf(ctx, &kftraining.PyTorchJob{}, client.InNamespace(ns.Name), client.PropagationPolicy(metav1.DeletePropagationBackground)) + if err != nil && !apierrors.IsNotFound(err) && !errors.Is(err, &apimeta.NoKindMatchError{}) { + return err + } + return nil +} + func DeleteAllPodsInNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace) error { if err := client.IgnoreNotFound(c.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(ns.Name))); err != nil { return fmt.Errorf("deleting all Pods in namespace %q: %w", ns.Name, err) @@ -765,3 +783,15 @@ func ExpectClusterQueuesToBeActive(ctx context.Context, c client.Client, cqs ... } }, Timeout, Interval).Should(gomega.Succeed()) } + +func ExpectLocalQueuesToBeActive(ctx context.Context, c client.Client, lqs ...*kueue.LocalQueue) { + gomega.EventuallyWithOffset(1, func(g gomega.Gomega) { + readLq := &kueue.LocalQueue{} + for _, lq := range lqs { + g.Expect(c.Get(ctx, client.ObjectKeyFromObject(lq), readLq)).To(gomega.Succeed()) + cond := apimeta.FindStatusCondition(readLq.Status.Conditions, kueue.LocalQueueActive) + g.Expect(cond).NotTo(gomega.BeNil(), "no %q condition found in %q cq status", kueue.LocalQueueActive, lq.Name) + g.Expect(cond.Status).To(gomega.Equal(metav1.ConditionTrue), "%q is not active status: %q message: %q", lq.Name, cond.Status, cond.Message) + } + }, Timeout, Interval).Should(gomega.Succeed()) +}