Skip to content

Commit

Permalink
TAS: support rank-ordering for Kubeflow (#3604)
Browse files Browse the repository at this point in the history
* TAS: support rank-ordering for Kubeflow.

* Add ExpectLocalQueuesToBeActive.

* Add MPIJob e2e test.

* Add unit tests.

* Add PyTorchJob e2e test.
Loading branch information
mbobrovskyi authored Nov 21, 2024
1 parent 15e00ea commit 4bbddce
Show file tree
Hide file tree
Showing 7 changed files with 693 additions and 16 deletions.
5 changes: 5 additions & 0 deletions pkg/controller/tas/topology_ungater.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/go-logr/logr"
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -433,6 +434,10 @@ func determineRanksLookup(pod *corev1.Pod) (*string, *replicatedJobsInfo) {
if _, found := pod.Labels[batchv1.JobCompletionIndexAnnotation]; found {
return ptr.To(batchv1.JobCompletionIndexAnnotation), nil
}
// Check if this is kubeflow
if _, found := pod.Labels[kftraining.ReplicaIndexLabel]; found {
return ptr.To(kftraining.ReplicaIndexLabel), nil
}
return nil, nil
}

Expand Down
255 changes: 255 additions & 0 deletions pkg/controller/tas/topology_ungater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
gocmp "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
jsoniter "github.com/json-iterator/go"
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -1697,6 +1698,260 @@ func TestReconcile(t *testing.T) {
},
},
},
"ranks: support rank-based ordering for kubeflow - for all Pods": {
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 4).Request(corev1.ResourceCPU, "1").Obj()).
ReserveQuota(
utiltesting.MakeAdmission("cq").
Assignment(corev1.ResourceCPU, "unit-test-flavor", "4").
AssignmentPodCount(4).
TopologyAssignment(&kueue.TopologyAssignment{
Levels: defaultTestLevels,
Domains: []kueue.TopologyDomainAssignment{
{
Count: 2,
Values: []string{
"b1",
"r1",
},
},
{
Count: 1,
Values: []string{
"b1",
"r2",
},
},
{
Count: 1,
Values: []string{
"b2",
"r1",
},
},
},
}).
Obj(),
).
Admitted(true).
Obj(),
},
pods: []corev1.Pod{
*testingpod.MakePod("l0", "ns").
Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
Label(kftraining.JobRoleLabel, "launcher").
Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
TopologySchedulingGate().
Obj(),
*testingpod.MakePod("w0", "ns").
Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
Label(kftraining.JobRoleLabel, "worker").
Label(kftraining.ReplicaIndexLabel, "0").
Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
TopologySchedulingGate().
Obj(),
*testingpod.MakePod("w1", "ns").
Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
Label(kftraining.JobRoleLabel, "worker").
Label(kftraining.ReplicaIndexLabel, "1").
Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
TopologySchedulingGate().
Obj(),
*testingpod.MakePod("w2", "ns").
Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
Label(kftraining.JobRoleLabel, "worker").
Label(kftraining.ReplicaIndexLabel, "2").
Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
TopologySchedulingGate().
Obj(),
},
cmpNS: true,
wantPods: []corev1.Pod{
*testingpod.MakePod("l0", "ns").
Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
Label(kftraining.JobRoleLabel, "launcher").
Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
NodeSelector(tasBlockLabel, "b1").
NodeSelector(tasRackLabel, "r1").
Obj(),
*testingpod.MakePod("w0", "ns").
Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
Label(kftraining.JobRoleLabel, "worker").
Label(kftraining.ReplicaIndexLabel, "0").
Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
NodeSelector(tasBlockLabel, "b1").
NodeSelector(tasRackLabel, "r1").
Obj(),
*testingpod.MakePod("w1", "ns").
Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
Label(kftraining.JobRoleLabel, "worker").
Label(kftraining.ReplicaIndexLabel, "1").
Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
NodeSelector(tasBlockLabel, "b1").
NodeSelector(tasRackLabel, "r2").
Obj(),
*testingpod.MakePod("w2", "ns").
Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
Label(kftraining.JobRoleLabel, "worker").
Label(kftraining.ReplicaIndexLabel, "2").
Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
NodeSelector(tasBlockLabel, "b2").
NodeSelector(tasRackLabel, "r1").
Obj(),
},
wantCounts: []counts{
{
NodeSelector: map[string]string{
tasBlockLabel: "b1",
tasRackLabel: "r1",
},
Count: 2,
},
{
NodeSelector: map[string]string{
tasBlockLabel: "b1",
tasRackLabel: "r2",
},
Count: 1,
},
{
NodeSelector: map[string]string{
tasBlockLabel: "b2",
tasRackLabel: "r1",
},
Count: 1,
},
},
},
"ranks: support rank-based ordering for kubeflow - some Pods already scheduled": {
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 4).Request(corev1.ResourceCPU, "1").Obj()).
ReserveQuota(
utiltesting.MakeAdmission("cq").
Assignment(corev1.ResourceCPU, "unit-test-flavor", "4").
AssignmentPodCount(4).
TopologyAssignment(&kueue.TopologyAssignment{
Levels: defaultTestLevels,
Domains: []kueue.TopologyDomainAssignment{
{
Count: 2,
Values: []string{
"b1",
"r1",
},
},
{
Count: 1,
Values: []string{
"b1",
"r2",
},
},
{
Count: 1,
Values: []string{
"b2",
"r1",
},
},
},
}).
Obj(),
).
Admitted(true).
Obj(),
},
pods: []corev1.Pod{
*testingpod.MakePod("l0", "ns").
Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
Label(kftraining.JobRoleLabel, "launcher").
Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
TopologySchedulingGate().
Obj(),
*testingpod.MakePod("w0", "ns").
Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
Label(kftraining.JobRoleLabel, "worker").
Label(kftraining.ReplicaIndexLabel, "0").
Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
TopologySchedulingGate().
Obj(),
*testingpod.MakePod("w1", "ns").
Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
Label(kftraining.JobRoleLabel, "worker").
Label(kftraining.ReplicaIndexLabel, "1").
Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
NodeSelector(tasBlockLabel, "b1").
NodeSelector(tasRackLabel, "r1").
Obj(),
*testingpod.MakePod("w2", "ns").
Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
Label(kftraining.JobRoleLabel, "worker").
Label(kftraining.ReplicaIndexLabel, "2").
Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
NodeSelector(tasBlockLabel, "b1").
NodeSelector(tasRackLabel, "r2").
Obj(),
},
cmpNS: true,
wantPods: []corev1.Pod{
*testingpod.MakePod("l0", "ns").
Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
Label(kftraining.JobRoleLabel, "launcher").
Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
NodeSelector(tasBlockLabel, "b1").
NodeSelector(tasRackLabel, "r1").
Obj(),
*testingpod.MakePod("w0", "ns").
Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
Label(kftraining.JobRoleLabel, "worker").
Label(kftraining.ReplicaIndexLabel, "0").
Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
NodeSelector(tasBlockLabel, "b2").
NodeSelector(tasRackLabel, "r1").
Obj(),
*testingpod.MakePod("w1", "ns").
Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
Label(kftraining.JobRoleLabel, "worker").
Label(kftraining.ReplicaIndexLabel, "1").
Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
NodeSelector(tasBlockLabel, "b1").
NodeSelector(tasRackLabel, "r1").
Obj(),
*testingpod.MakePod("w2", "ns").
Annotation(kueuealpha.WorkloadAnnotation, "unit-test").
Label(kftraining.JobRoleLabel, "worker").
Label(kftraining.ReplicaIndexLabel, "2").
Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName).
NodeSelector(tasBlockLabel, "b1").
NodeSelector(tasRackLabel, "r2").
Obj(),
},
wantCounts: []counts{
{
NodeSelector: map[string]string{
tasBlockLabel: "b1",
tasRackLabel: "r1",
},
Count: 2,
},
{
NodeSelector: map[string]string{
tasBlockLabel: "b1",
tasRackLabel: "r2",
},
Count: 1,
},
{
NodeSelector: map[string]string{
tasBlockLabel: "b2",
tasRackLabel: "r1",
},
Count: 1,
},
},
},
}

for name, tc := range testCases {
Expand Down
32 changes: 24 additions & 8 deletions pkg/util/testingjobs/mpijob/wrappers_mpijob.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func MakeMPIJob(name, ns string) *MPIJobWrapper {
}

type MPIJobReplicaSpecRequirement struct {
Image string
Args []string
ReplicaType kfmpi.MPIReplicaType
ReplicaCount int32
Annotations map[string]string
Expand All @@ -58,6 +60,8 @@ type MPIJobReplicaSpecRequirement struct {
func (j *MPIJobWrapper) MPIJobReplicaSpecs(replicaSpecs ...MPIJobReplicaSpecRequirement) *MPIJobWrapper {
j = j.GenericLauncherAndWorker()
for _, rs := range replicaSpecs {
j.Spec.MPIReplicaSpecs[rs.ReplicaType].Template.Spec.Containers[0].Image = rs.Image
j.Spec.MPIReplicaSpecs[rs.ReplicaType].Template.Spec.Containers[0].Args = rs.Args
j.Spec.MPIReplicaSpecs[rs.ReplicaType].Replicas = ptr.To[int32](rs.ReplicaCount)
j.Spec.MPIReplicaSpecs[rs.ReplicaType].Template.Spec.RestartPolicy = rs.RestartPolicy

Expand All @@ -77,10 +81,13 @@ func (j *MPIJobWrapper) GenericLauncherAndWorker() *MPIJobWrapper {
RestartPolicy: corev1.RestartPolicyNever,
Containers: []corev1.Container{
{
Name: "mpijob",
Image: "pause",
Command: []string{},
Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
Name: "mpijob",
Image: "pause",
Command: []string{},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{},
Limits: corev1.ResourceList{},
},
},
},
NodeSelector: map[string]string{},
Expand All @@ -95,10 +102,13 @@ func (j *MPIJobWrapper) GenericLauncherAndWorker() *MPIJobWrapper {
RestartPolicy: corev1.RestartPolicyNever,
Containers: []corev1.Container{
{
Name: "mpijob",
Image: "pause",
Command: []string{},
Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
Name: "mpijob",
Image: "pause",
Command: []string{},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{},
Limits: corev1.ResourceList{},
},
},
},
NodeSelector: map[string]string{},
Expand Down Expand Up @@ -161,6 +171,12 @@ func (j *MPIJobWrapper) Request(replicaType kfmpi.MPIReplicaType, r corev1.Resou
return j
}

// Limit adds a resource limit to the default (first) container of the given
// replica type. It is the counterpart of Request, which populates
// Resources.Requests; this wrapper writes to Resources.Limits instead.
// NOTE(review): this indexes the Limits map directly, so it assumes the map
// was initialized (GenericLauncherAndWorker sets Limits: corev1.ResourceList{});
// calling it on a wrapper built another way would panic — confirm all callers.
func (j *MPIJobWrapper) Limit(replicaType kfmpi.MPIReplicaType, r corev1.ResourceName, v string) *MPIJobWrapper {
	j.Spec.MPIReplicaSpecs[replicaType].Template.Spec.Containers[0].Resources.Limits[r] = resource.MustParse(v)
	return j
}

// Parallelism updates job parallelism.
func (j *MPIJobWrapper) Parallelism(p int32) *MPIJobWrapper {
j.Spec.MPIReplicaSpecs[kfmpi.MPIReplicaTypeWorker].Replicas = ptr.To(p)
Expand Down
Loading

0 comments on commit 4bbddce

Please sign in to comment.