Skip to content

Commit

Permalink
scheduler: refactor reset quota (#2301)
Browse files Browse the repository at this point in the history
Signed-off-by: chuanyun.lcy <[email protected]>
Co-authored-by: chuanyun.lcy <[email protected]>
  • Loading branch information
shaloulcy and chuanyun.lcy authored Jan 6, 2025
1 parent 84cebbf commit b3ea5b6
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 156 deletions.
67 changes: 27 additions & 40 deletions pkg/scheduler/plugins/elasticquota/core/group_quota_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (gqm *GroupQuotaManager) SetTotalResourceForTree(total v1.ResourceList) v1.
}

// updateGroupDeltaRequestNoLock no need lock gqm.lock
func (gqm *GroupQuotaManager) updateGroupDeltaRequestNoLock(quotaName string, deltaReq, deltaNonPreemptibleRequest v1.ResourceList) {
func (gqm *GroupQuotaManager) updateGroupDeltaRequestNoLock(quotaName string, deltaReq, deltaNonPreemptibleRequest v1.ResourceList, selfQuotaIndex int) {
curToAllParInfos := gqm.getCurToAllParentGroupQuotaInfoNoLock(quotaName)
allQuotaInfoLen := len(curToAllParInfos)
if allQuotaInfoLen <= 0 {
Expand All @@ -177,17 +177,16 @@ func (gqm *GroupQuotaManager) updateGroupDeltaRequestNoLock(quotaName string, de

defer gqm.scopedLockForQuotaInfo(curToAllParInfos)()

gqm.recursiveUpdateGroupTreeWithDeltaRequest(deltaReq, deltaNonPreemptibleRequest, curToAllParInfos)
gqm.recursiveUpdateGroupTreeWithDeltaRequest(deltaReq, deltaNonPreemptibleRequest, curToAllParInfos, selfQuotaIndex)
}

// recursiveUpdateGroupTreeWithDeltaRequest update the quota of a node, also need update all parentNode, the lock operation
// of all quotaInfo is done by gqm. scopedLockForQuotaInfo, so just get treeWrappers' lock when calling treeWrappers' function
func (gqm *GroupQuotaManager) recursiveUpdateGroupTreeWithDeltaRequest(deltaReq, deltaNonPreemptibleRequest v1.ResourceList, curToAllParInfos []*QuotaInfo) {
func (gqm *GroupQuotaManager) recursiveUpdateGroupTreeWithDeltaRequest(deltaReq, deltaNonPreemptibleRequest v1.ResourceList, curToAllParInfos []*QuotaInfo, selfQuotaIndex int) {
for i := 0; i < len(curToAllParInfos); i++ {
curQuotaInfo := curToAllParInfos[i]
oldSubLimitReq := curQuotaInfo.getLimitRequestNoLock()
isSelfRequest := i == 0
curQuotaInfo.addRequestNonNegativeNoLock(deltaReq, deltaNonPreemptibleRequest, isSelfRequest)
curQuotaInfo.addRequestNonNegativeNoLock(deltaReq, deltaNonPreemptibleRequest, i == selfQuotaIndex)
if curQuotaInfo.Name == extension.RootQuotaName {
return
}
Expand Down Expand Up @@ -226,7 +225,7 @@ func (gqm *GroupQuotaManager) recursiveUpdateGroupTreeWithDeltaRequest(deltaReq,

// updateGroupDeltaUsedNoLock updates the usedQuota of a node, it also updates all parent nodes
// no need to lock gqm.hierarchyUpdateLock
func (gqm *GroupQuotaManager) updateGroupDeltaUsedNoLock(quotaName string, delta, deltaNonPreemptibleUsed v1.ResourceList) {
func (gqm *GroupQuotaManager) updateGroupDeltaUsedNoLock(quotaName string, delta, deltaNonPreemptibleUsed v1.ResourceList, selfQuotaIndex int) {
curToAllParInfos := gqm.getCurToAllParentGroupQuotaInfoNoLock(quotaName)
allQuotaInfoLen := len(curToAllParInfos)
if allQuotaInfoLen <= 0 {
Expand All @@ -236,8 +235,7 @@ func (gqm *GroupQuotaManager) updateGroupDeltaUsedNoLock(quotaName string, delta
defer gqm.scopedLockForQuotaInfo(curToAllParInfos)()
for i := 0; i < allQuotaInfoLen; i++ {
quotaInfo := curToAllParInfos[i]
isSelfUsed := i == 0
quotaInfo.addUsedNonNegativeNoLock(delta, deltaNonPreemptibleUsed, isSelfUsed)
quotaInfo.addUsedNonNegativeNoLock(delta, deltaNonPreemptibleUsed, i == selfQuotaIndex)
}

if utilfeature.DefaultFeatureGate.Enabled(features.ElasticQuotaGuaranteeUsage) {
Expand Down Expand Up @@ -390,6 +388,7 @@ func (gqm *GroupQuotaManager) UpdateQuota(quota *v1alpha1.ElasticQuota, isDelete

quotaName := quota.Name
if isDelete {
// TODO: inplace delete
_, exist := gqm.quotaInfoMap[quotaName]
if !exist {
return fmt.Errorf("get quota info failed, quotaName:%v", quotaName)
Expand All @@ -407,28 +406,33 @@ func (gqm *GroupQuotaManager) UpdateQuota(quota *v1alpha1.ElasticQuota, isDelete
}
localQuotaInfo.updateQuotaInfoFromRemote(newQuotaInfo)
} else {
// TODO: inplace add
gqm.quotaInfoMap[quotaName] = newQuotaInfo
}
}

klog.Infof("reset quota tree %v, for quota %v updated", gqm.treeID, quota.Name)
gqm.updateQuotaGroupConfigNoLock()
gqm.resetQuotaNoLock()

return nil
}

func (gqm *GroupQuotaManager) updateQuotaGroupConfigNoLock() {
func (gqm *GroupQuotaManager) resetQuotaNoLock() {
start := time.Now()
defer func() {
klog.Infof("reset quota tree %v take %v", time.Since(start))
}()
// rebuild gqm.quotaTopoNodeMap
gqm.buildSubParGroupTopoNoLock()
gqm.rebuildQuotaTopoNodeMapNoLock()
// reset gqm.runtimeQuotaCalculator
gqm.resetAllGroupQuotaNoLock()
gqm.rebuildAllGroupQuotaNoLock()
}

// buildSubParGroupTopoNoLock reBuild a nodeTree from root, no need to lock gqm.lock
func (gqm *GroupQuotaManager) buildSubParGroupTopoNoLock() {
// buildSubParGroupTopoNoLock rebuild a nodeTree from root, no need to lock gqm.lock
func (gqm *GroupQuotaManager) rebuildQuotaTopoNodeMapNoLock() {
// rebuild QuotaTopoNodeMap
gqm.quotaTopoNodeMap = make(map[string]*QuotaTopoNode)
rootNode := NewQuotaTopoNode(extension.RootQuotaName, NewQuotaInfo(false, true, extension.RootQuotaName, extension.RootQuotaName))
rootNode := NewQuotaTopoNode(extension.RootQuotaName, NewQuotaInfo(true, true, extension.RootQuotaName, extension.RootQuotaName))
gqm.quotaTopoNodeMap[extension.RootQuotaName] = rootNode

// add node according to the quotaInfoMap
Expand All @@ -454,27 +458,10 @@ func (gqm *GroupQuotaManager) buildSubParGroupTopoNoLock() {
topoNode.parQuotaTopoNode = parQuotaTopoNode
parQuotaTopoNode.addChildGroupQuotaInfo(topoNode)
}

for _, topoNode := range gqm.quotaTopoNodeMap {
if topoNode.name == extension.RootQuotaName {
continue
}
if len(topoNode.childGroupQuotaInfos) > 0 {
topoNode.quotaInfo.IsParent = true
} else {
// the parent node become leaf node. clean used and childRequest
if topoNode.quotaInfo.IsParent == true {
topoNode.quotaInfo.IsParent = false
topoNode.quotaInfo.CalculateInfo.ChildRequest = v1.ResourceList{}
topoNode.quotaInfo.CalculateInfo.Used = v1.ResourceList{}
}
}
}

}

// ResetAllGroupQuotaNoLock no need to lock gqm.lock
func (gqm *GroupQuotaManager) resetAllGroupQuotaNoLock() {
// rebuildAllGroupQuotaNoLock will reset quota info and runtimeQuotaCalculator
func (gqm *GroupQuotaManager) rebuildAllGroupQuotaNoLock() {
toUpdateRequestMap, toUpdateNonPreemptibleUsedMap, toUpdateUsedMap, toUpdateNonPreemptibleRequestMap :=
make(quotaResMapType), make(quotaResMapType), make(quotaResMapType), make(quotaResMapType)
for quotaName, topoNode := range gqm.quotaTopoNodeMap {
Expand Down Expand Up @@ -510,8 +497,8 @@ func (gqm *GroupQuotaManager) resetAllGroupQuotaNoLock() {
// subGroup's topo relation may change; refresh the request/used from bottom to top
for quotaName, topoNode := range gqm.quotaTopoNodeMap {
if _, ok := toUpdateRequestMap[topoNode.quotaInfo.Name]; ok {
gqm.updateGroupDeltaRequestNoLock(quotaName, toUpdateRequestMap[quotaName], toUpdateNonPreemptibleRequestMap[quotaName])
gqm.updateGroupDeltaUsedNoLock(quotaName, toUpdateUsedMap[quotaName], toUpdateNonPreemptibleUsedMap[quotaName])
gqm.updateGroupDeltaRequestNoLock(quotaName, toUpdateRequestMap[quotaName], toUpdateNonPreemptibleRequestMap[quotaName], 0)
gqm.updateGroupDeltaUsedNoLock(quotaName, toUpdateUsedMap[quotaName], toUpdateNonPreemptibleUsedMap[quotaName], 0)
}
}
}
Expand Down Expand Up @@ -621,7 +608,7 @@ func (gqm *GroupQuotaManager) updatePodRequestNoLock(quotaName string, oldPod, n
if quotav1.IsZero(deltaReq) && quotav1.IsZero(deltaNonPreemptibleRequest) {
return
}
gqm.updateGroupDeltaRequestNoLock(quotaName, deltaReq, deltaNonPreemptibleRequest)
gqm.updateGroupDeltaRequestNoLock(quotaName, deltaReq, deltaNonPreemptibleRequest, 0)
}

func (gqm *GroupQuotaManager) updatePodUsedNoLock(quotaName string, oldPod, newPod *v1.Pod) {
Expand Down Expand Up @@ -659,7 +646,7 @@ func (gqm *GroupQuotaManager) updatePodUsedNoLock(quotaName string, oldPod, newP
quotaName, getPodName(oldPod, newPod), util.DumpJSON(newPodUsed), util.DumpJSON(newNonPreemptibleUsed))
}
}
gqm.updateGroupDeltaUsedNoLock(quotaName, deltaUsed, deltaNonPreemptibleUsed)
gqm.updateGroupDeltaUsedNoLock(quotaName, deltaUsed, deltaNonPreemptibleUsed, 0)
}

func (gqm *GroupQuotaManager) updatePodCacheNoLock(quotaName string, pod *v1.Pod, isAdd bool) {
Expand Down Expand Up @@ -1034,7 +1021,7 @@ func (gqm *GroupQuotaManager) doUpdateOneGroupMaxQuotaNoLock(quotaName string, n

newSubLimitReq := curQuotaInfo.getLimitRequestNoLock()
deltaRequest := quotav1.Subtract(newSubLimitReq, oldSubLimitReq)
gqm.recursiveUpdateGroupTreeWithDeltaRequest(deltaRequest, nil, curToAllParInfos[1:])
gqm.recursiveUpdateGroupTreeWithDeltaRequest(deltaRequest, nil, curToAllParInfos[1:], -1)
}
}

Expand Down Expand Up @@ -1072,7 +1059,7 @@ func (gqm *GroupQuotaManager) doUpdateOneGroupMinQuotaNoLock(quotaName string, n

newSubLimitReq := curQuotaInfo.getLimitRequestNoLock()
deltaRequest := quotav1.Subtract(newSubLimitReq, oldSubLimitReq)
gqm.recursiveUpdateGroupTreeWithDeltaRequest(deltaRequest, nil, curToAllParInfos[1:])
gqm.recursiveUpdateGroupTreeWithDeltaRequest(deltaRequest, nil, curToAllParInfos[1:], -1)
}

// update the guarantee.
Expand Down
Loading

0 comments on commit b3ea5b6

Please sign in to comment.