Skip to content

Commit

Permalink
scheduler: support reservation name, taints and tolerations (#2207)
Browse files Browse the repository at this point in the history
Signed-off-by: saintube <[email protected]>
  • Loading branch information
saintube authored Sep 27, 2024
1 parent 0da015a commit 83267af
Show file tree
Hide file tree
Showing 8 changed files with 533 additions and 132 deletions.
60 changes: 37 additions & 23 deletions pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,18 @@ func MakeReservationErrorHandler(
schedulingErr := status.AsError()

if _, reserveAffExist := pod.Annotations[extension.AnnotationReservationAffinity]; reserveAffExist {
// for pod specified reservation affinity, export new event on reservation level
reservationLevelMsg, hasReservation := generatePodEventOnReservationLevel(schedulingErr.Error())
klog.V(7).Infof("origin scheduling error info: %s. hasReservation %v. reservation msg: %s",
schedulingErr.Error(), hasReservation, reservationLevelMsg)
if hasReservation {
msg := truncateMessage(reservationLevelMsg)
// user reason=FailedScheduling-Reservation to avoid event being auto-merged
fwk.EventRecorder().Eventf(pod, nil, corev1.EventTypeWarning, "FailedScheduling-Reservation", "Scheduling", msg)
}
// export event on reservation level asynchronously
go func() {
// for pod specified reservation affinity, export new event on reservation level
reservationLevelMsg, hasReservation := generatePodEventOnReservationLevel(schedulingErr.Error())
klog.V(7).Infof("origin scheduling error info: %s. hasReservation %v. reservation msg: %s",
schedulingErr.Error(), hasReservation, reservationLevelMsg)
if hasReservation {
msg := truncateMessage(reservationLevelMsg)
// user reason=FailedScheduling-Reservation to avoid event being auto-merged
fwk.EventRecorder().Eventf(pod, nil, corev1.EventTypeWarning, "FailedScheduling-Reservation", "Scheduling", msg)
}
}()
return false
} else if reservationutil.IsReservePod(pod) {
// NOTE: Since the failure handler is asynchronous and not locked with the reservation event handler,
Expand Down Expand Up @@ -151,30 +154,34 @@ func generatePodEventOnReservationLevel(errorMsg string) (string, bool) {
return "", false
}

// "3 Reservations ..., 1 Reservation ..."
// "3 Reservations ..., 1 Reservation xxx. 1 Reservation ..."
detailedMsg := prefixSplit[1]

splitFunc := func(c rune) bool {
detailSeparators := ",."
return strings.ContainsRune(detailSeparators, c)
}
// ["3 Reservation(s) ...", " 1 Reservation(s) ...", ..., " 8 Reservation(s) matched owner total", " Gang rejected..."]
detailSplit := strings.FieldsFunc(detailedMsg, splitFunc)
// "3 Reservations ..., 1 Reservation xxx, 1 Reservation ..."
detailedMsg = strings.ReplaceAll(detailedMsg, ". ", ", ")
// ["3 Reservation(s) ...", " 1 Reservation(s) ...", ..., " 8 Reservation(s) matched owner total.", " Gang rejected..."]
detailSplit := strings.FieldsFunc(detailedMsg, func(c rune) bool {
return c == ','
})

total := int64(-1)
resultDetails := make([]string, 0, len(detailSplit))
nodeRelatedDetails := make([]string, 0, len(detailSplit))
var reservationNameDetail []string

// for reservation total item
reserveTotalRe := regexp.MustCompile("^([0-9]+) Reservation\\(s\\) matched owner total$")

// for reservation name matched item
reserveNameTotalRe := regexp.MustCompile("^([0-9]+) Reservation\\(s\\) exactly matches the requested reservation name$")

// for node related item
reserveNodeDetailRe := regexp.MustCompile("^([0-9]+ Reservation\\(s\\)) (for node reason that .*)$")

// for reservation detail item
reserveDetailRe := regexp.MustCompile("^([0-9]+) Reservation\\(s\\) .*$")

for _, item := range detailSplit {
trimItem := strings.TrimSpace(item)
trimItem := strings.Trim(item, ". ")
totalStr := reserveTotalRe.FindAllStringSubmatch(trimItem, -1)

if len(totalStr) > 0 && len(totalStr[0]) == 2 {
Expand All @@ -197,16 +204,23 @@ func generatePodEventOnReservationLevel(errorMsg string) (string, bool) {
}
nodeReasonWords = append(nodeReasonWords, vv)
}
resultDetails = append(resultDetails, strings.Join(nodeReasonWords, " "))
nodeRelatedDetails = append(nodeRelatedDetails, strings.Join(nodeReasonWords, " "))
} else if reserveNameTotalRe.MatchString(trimItem) {
reservationNameDetail = append(reservationNameDetail, trimItem)
} else if reserveDetailRe.MatchString(trimItem) {
// reservation itself item, append to details, e.g. " 1 Reservation(s) ..."
// for 1 Reservation(s) Insufficient nvidia, replace nvidia with nvidia.com/gpu
// TODO support other extend resource fields like kubernetes.io/batch-cpu
itemReplaced := strings.Replace(trimItem, "nvidia", "nvidia.com/gpu", -1)
resultDetails = append(resultDetails, itemReplaced)
resultDetails = append(resultDetails, trimItem)
}
}

// put the reservation name at the front, and put the node-related details at the end
if d := len(reservationNameDetail); d > 0 {
resultDetails = append(resultDetails, reservationNameDetail...)
copy(resultDetails[d:], resultDetails[:len(resultDetails)-d])
copy(resultDetails[:d], reservationNameDetail)
}
resultDetails = append(resultDetails, nodeRelatedDetails...)

reserveLevelMsgFmt := "0/%d reservations are available: %s."

return fmt.Sprintf(reserveLevelMsgFmt, total, strings.Join(resultDetails, ", ")), total >= 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1349,8 +1349,8 @@ func Test_generatePodEventOnReservationLevel(t *testing.T) {
"3 Reservation(s) for node reason that didn't match pod topology spread constraints (missing required label), " +
"2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory, " +
"8 Reservation(s) matched owner total.",
wantMsg: "0/8 reservations are available: 3 Reservation(s) for node reason that didn't match pod topology spread constraints (missing required label), " +
"2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory.",
wantMsg: "0/8 reservations are available: 2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory, " +
"3 Reservation(s) for node reason that didn't match pod topology spread constraints (missing required label).",
wantIsReserve: true,
},
{
Expand All @@ -1360,8 +1360,8 @@ func Test_generatePodEventOnReservationLevel(t *testing.T) {
"3 Reservation(s) for node reason that didn't match pod topology spread constraints," +
"2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory. " +
"8 Reservation(s) matched owner total.",
wantMsg: "0/8 reservations are available: 3 Reservation(s) for node reason that didn't match pod topology spread constraints, " +
"2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory.",
wantMsg: "0/8 reservations are available: 2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory, " +
"3 Reservation(s) for node reason that didn't match pod topology spread constraints.",
wantIsReserve: true,
},
{
Expand All @@ -1371,8 +1371,8 @@ func Test_generatePodEventOnReservationLevel(t *testing.T) {
"3 Reservation(s) for node reason that didn't satisfy existing pods anti-affinity rules," +
"2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory. " +
"8 Reservation(s) matched owner total.",
wantMsg: "0/8 reservations are available: 3 Reservation(s) for node reason that didn't satisfy existing pods anti-affinity rules, " +
"2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory.",
wantMsg: "0/8 reservations are available: 2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory, " +
"3 Reservation(s) for node reason that didn't satisfy existing pods anti-affinity rules.",
wantIsReserve: true,
},
{
Expand All @@ -1382,8 +1382,8 @@ func Test_generatePodEventOnReservationLevel(t *testing.T) {
"3 Reservation(s) for node reason that didn't match pod affinity rules, " +
"2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory. " +
"8 Reservation(s) matched owner total.",
wantMsg: "0/8 reservations are available: 3 Reservation(s) for node reason that didn't match pod affinity rules, " +
"2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory.",
wantMsg: "0/8 reservations are available: 2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory, " +
"3 Reservation(s) for node reason that didn't match pod affinity rules.",
wantIsReserve: true,
},
{
Expand All @@ -1393,8 +1393,8 @@ func Test_generatePodEventOnReservationLevel(t *testing.T) {
"3 Reservation(s) for node reason that didn't match pod anti-affinity rules, " +
"2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory. " +
"8 Reservation(s) matched owner total.",
wantMsg: "0/8 reservations are available: 3 Reservation(s) for node reason that didn't match pod anti-affinity rules, " +
"2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory.",
wantMsg: "0/8 reservations are available: 2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory, " +
"3 Reservation(s) for node reason that didn't match pod anti-affinity rules.",
wantIsReserve: true,
},
{
Expand All @@ -1406,9 +1406,21 @@ func Test_generatePodEventOnReservationLevel(t *testing.T) {
"1 Reservation(s) for node reason that didn't match pod affinity rules, " +
"2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory. " +
"8 Reservation(s) matched owner total.",
wantMsg: "0/8 reservations are available: 3 Reservation(s) for node reason that didn't match pod topology spread constraints, " +
"1 Reservation(s) for node reason that didn't match pod affinity rules, " +
"2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory.",
wantMsg: "0/8 reservations are available: 2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory, " +
"3 Reservation(s) for node reason that didn't match pod topology spread constraints, " +
"1 Reservation(s) for node reason that didn't match pod affinity rules.",
wantIsReserve: true,
},
{
name: "reservation taints errors",
errorMsg: "0/5 nodes are available: 3 node(s) didn't match pod topology spread constraints," +
"1 Insufficient cpu, 1 Insufficient memory, " +
"3 Reservation(s) for node reason that didn't match pod topology spread constraints," +
"2 Reservation(s) had untolerated taint {koordinator.sh/reservation-pool: test}, 1 Reservation(s) Insufficient memory. " +
"8 Reservation(s) matched owner total.",
wantMsg: "0/8 reservations are available: 2 Reservation(s) had untolerated taint {koordinator.sh/reservation-pool: test}, " +
"1 Reservation(s) Insufficient memory, " +
"3 Reservation(s) for node reason that didn't match pod topology spread constraints.",
wantIsReserve: true,
},
{
Expand All @@ -1419,9 +1431,10 @@ func Test_generatePodEventOnReservationLevel(t *testing.T) {
"2 Reservation(s) for node reason that didn't match pod affinity rules," +
"2 Reservation(s) Insufficient cpu, 1 Reservation(s) Insufficient memory. " +
"8 Reservation(s) matched owner total.",
wantMsg: "0/8 reservations are available: 3 Reservation(s) for node reason that didn't match pod topology spread constraints, " +
"2 Reservation(s) for node reason that didn't match pod affinity rules, 2 Reservation(s) Insufficient cpu, " +
"1 Reservation(s) Insufficient memory.",
wantMsg: "0/8 reservations are available: 2 Reservation(s) Insufficient cpu, " +
"1 Reservation(s) Insufficient memory, " +
"3 Reservation(s) for node reason that didn't match pod topology spread constraints, " +
"2 Reservation(s) for node reason that didn't match pod affinity rules.",
wantIsReserve: true,
},
{
Expand All @@ -1435,6 +1448,18 @@ func Test_generatePodEventOnReservationLevel(t *testing.T) {
errorMsg: `0/5 nodes are available: 3 Insufficient cpu, 2 Insufficient memory.`,
wantIsReserve: false,
},
{
name: "reservation name specified with insufficient resource",
errorMsg: "0/10 nodes are available: 1 Reservation(s) Insufficient cpu, requested: 4000, used: 30000, capacity: 32000, " +
"4 Insufficient cpu, 5 Insufficient memory, " +
"1 Reservation(s) exactly matches the requested reservation name, " +
"3 Reservation(s) didn't match the requested reservation name, " +
"4 Reservation(s) matched owner total.",
wantMsg: "0/4 reservations are available: 1 Reservation(s) exactly matches the requested reservation name, " +
"1 Reservation(s) Insufficient cpu, " +
"3 Reservation(s) didn't match the requested reservation name.",
wantIsReserve: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
43 changes: 42 additions & 1 deletion pkg/scheduler/frameworkext/reservation_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (ri *ReservationInfo) GetPodOwners() []schedulingv1alpha1.ReservationOwner
return nil
}

func (ri *ReservationInfo) Match(pod *corev1.Pod) bool {
func (ri *ReservationInfo) MatchOwners(pod *corev1.Pod) bool {
if ri.ParseError != nil {
return false
}
Expand All @@ -284,6 +284,45 @@ func (ri *ReservationInfo) IsTerminating() bool {
return !ri.GetObject().GetDeletionTimestamp().IsZero()
}

func (ri *ReservationInfo) GetTaints() []corev1.Taint {
if ri.Reservation != nil {
return ri.Reservation.Spec.Taints
}
return nil
}

// MatchReservationAffinity returns the statuses of whether the reservation affinity matches, whether the reservation
// taints are tolerated, and whether the reservation name matches.
func (ri *ReservationInfo) MatchReservationAffinity(reservationAffinity *reservationutil.RequiredReservationAffinity, nodeLabels map[string]string) bool {
if reservationAffinity != nil {
// NOTE: There are some special scenarios.
// For example, the AZ where the Pod wants to select the Reservation is cn-hangzhou, but the Reservation itself
// does not have this information, so it needs to perceive the label of the Node when Matching Affinity.
fakeNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: ri.GetName(),
Labels: map[string]string{},
},
}
for k, v := range nodeLabels {
fakeNode.Labels[k] = v
}
for k, v := range ri.GetObject().GetLabels() {
fakeNode.Labels[k] = v
}
return reservationAffinity.MatchAffinity(fakeNode)
}
return true
}

func (ri *ReservationInfo) MatchExactMatchSpec(podRequests corev1.ResourceList, spec *apiext.ExactMatchReservationSpec) bool {
return apiext.ExactMatchReservation(podRequests, ri.Allocatable, spec)
}

func (ri *ReservationInfo) FindMatchingUntoleratedTaint(reservationAffinity *reservationutil.RequiredReservationAffinity) (corev1.Taint, bool) {
return reservationAffinity.FindMatchingUntoleratedTaint(ri.GetTaints(), reservationutil.DoNotScheduleTaintsFilter)
}

func (ri *ReservationInfo) Clone() *ReservationInfo {
resourceNames := make([]corev1.ResourceName, 0, len(ri.ResourceNames))
for _, v := range ri.ResourceNames {
Expand All @@ -304,6 +343,8 @@ func (ri *ReservationInfo) Clone() *ReservationInfo {
AllocatablePorts: util.CloneHostPorts(ri.AllocatablePorts),
AllocatedPorts: util.CloneHostPorts(ri.AllocatedPorts),
AssignedPods: assignedPods,
OwnerMatchers: ri.OwnerMatchers,
ParseError: ri.ParseError,
}
}

Expand Down
Loading

0 comments on commit 83267af

Please sign in to comment.