Skip to content

Commit

Permalink
koord-scheduler: Add non-preemptible feature in Elastic Quota Management (#1668)
Browse files Browse the repository at this point in the history

Signed-off-by: wangyang55 <[email protected]>
  • Loading branch information
tan90github authored Oct 10, 2023
1 parent 14087cb commit 5941c67
Show file tree
Hide file tree
Showing 13 changed files with 261 additions and 73 deletions.
5 changes: 5 additions & 0 deletions apis/extension/elastic_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
LabelQuotaIsRoot = QuotaKoordinatorPrefix + "/is-root"
LabelQuotaTreeID = QuotaKoordinatorPrefix + "/tree-id"
LabelQuotaIgnoreDefaultTree = QuotaKoordinatorPrefix + "/ignore-default-tree"
LabelPreemptible = QuotaKoordinatorPrefix + "/preemptible"
AnnotationSharedWeight = QuotaKoordinatorPrefix + "/shared-weight"
AnnotationRuntime = QuotaKoordinatorPrefix + "/runtime"
AnnotationRequest = QuotaKoordinatorPrefix + "/request"
Expand Down Expand Up @@ -70,6 +71,10 @@ func IsTreeRootQuota(quota *v1alpha1.ElasticQuota) bool {
return quota.Labels[LabelQuotaIsRoot] == "true"
}

// IsPodNonPreemptible reports whether the pod carries the LabelPreemptible
// label with the exact value "false". A pod without that label, or with any
// other value for it, is not reported as non-preemptible.
// pod must be non-nil.
func IsPodNonPreemptible(pod *corev1.Pod) bool {
	// Indexing a nil Labels map is safe and simply reports the key as absent.
	value, found := pod.Labels[LabelPreemptible]
	return found && value == "false"
}

// GetQuotaTreeID returns the value stored under the LabelQuotaTreeID label of
// the given quota, or the empty string when the label is not present.
// quota must be non-nil.
func GetQuotaTreeID(quota *v1alpha1.ElasticQuota) string {
	treeID := quota.Labels[LabelQuotaTreeID]
	return treeID
}
Expand Down
33 changes: 23 additions & 10 deletions pkg/scheduler/plugins/elasticquota/core/group_quota_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (gqm *GroupQuotaManager) recursiveUpdateGroupTreeWithDeltaRequest(deltaReq

// updateGroupDeltaUsedNoLock updates the usedQuota of a node; it also updates all parent nodes.
// There is no need to lock gqm.hierarchyUpdateLock.
func (gqm *GroupQuotaManager) updateGroupDeltaUsedNoLock(quotaName string, delta v1.ResourceList) {
func (gqm *GroupQuotaManager) updateGroupDeltaUsedNoLock(quotaName string, delta, deltaNonPreemptibleUsed v1.ResourceList) {
curToAllParInfos := gqm.getCurToAllParentGroupQuotaInfoNoLock(quotaName)
allQuotaInfoLen := len(curToAllParInfos)
if allQuotaInfoLen <= 0 {
Expand All @@ -234,7 +234,7 @@ func (gqm *GroupQuotaManager) updateGroupDeltaUsedNoLock(quotaName string, delta
defer gqm.scopedLockForQuotaInfo(curToAllParInfos)()
for i := 0; i < allQuotaInfoLen; i++ {
quotaInfo := curToAllParInfos[i]
quotaInfo.addUsedNonNegativeNoLock(delta)
quotaInfo.addUsedNonNegativeNoLock(delta, deltaNonPreemptibleUsed)
}

if utilfeature.DefaultFeatureGate.Enabled(features.ElasticQuotaGuaranteeUsage) {
Expand Down Expand Up @@ -472,7 +472,7 @@ func (gqm *GroupQuotaManager) buildSubParGroupTopoNoLock() {

// resetAllGroupQuotaNoLock: no need to lock gqm.lock.
func (gqm *GroupQuotaManager) resetAllGroupQuotaNoLock() {
childRequestMap, childUsedMap := make(quotaResMapType), make(quotaResMapType)
childRequestMap, childNonPreemptibleUsedMap, childUsedMap := make(quotaResMapType), make(quotaResMapType), make(quotaResMapType)
for quotaName, topoNode := range gqm.quotaTopoNodeMap {
if quotaName == extension.RootQuotaName {
gqm.resetRootQuotaUsedAndRequest()
Expand All @@ -481,6 +481,7 @@ func (gqm *GroupQuotaManager) resetAllGroupQuotaNoLock() {
topoNode.quotaInfo.lock.Lock()
if !topoNode.quotaInfo.IsParent {
childRequestMap[quotaName] = topoNode.quotaInfo.CalculateInfo.ChildRequest.DeepCopy()
childNonPreemptibleUsedMap[quotaName] = topoNode.quotaInfo.CalculateInfo.NonPreemptibleUsed.DeepCopy()
childUsedMap[quotaName] = topoNode.quotaInfo.CalculateInfo.Used.DeepCopy()
}
topoNode.quotaInfo.clearForResetNoLock()
Expand All @@ -500,7 +501,7 @@ func (gqm *GroupQuotaManager) resetAllGroupQuotaNoLock() {
for quotaName, topoNode := range gqm.quotaTopoNodeMap {
if !topoNode.quotaInfo.IsParent {
gqm.updateGroupDeltaRequestNoLock(quotaName, childRequestMap[quotaName])
gqm.updateGroupDeltaUsedNoLock(quotaName, childUsedMap[quotaName])
gqm.updateGroupDeltaUsedNoLock(quotaName, childUsedMap[quotaName], childNonPreemptibleUsedMap[quotaName])
}
}
}
Expand Down Expand Up @@ -615,28 +616,36 @@ func (gqm *GroupQuotaManager) updatePodUsedNoLock(quotaName string, oldPod, newP
return
}

var oldPodUsed, newPodUsed v1.ResourceList
var oldPodUsed, newPodUsed, oldNonPreemptibleUsed, newNonPreemptibleUsed v1.ResourceList
if oldPod != nil {
oldPodUsed, _ = PodRequestsAndLimits(oldPod)
if extension.IsPodNonPreemptible(oldPod) {
oldNonPreemptibleUsed = oldPodUsed
}
} else {
oldPodUsed = make(v1.ResourceList)
oldNonPreemptibleUsed = make(v1.ResourceList)
}

if newPod != nil {
newPodUsed, _ = PodRequestsAndLimits(newPod)
if extension.IsPodNonPreemptible(newPod) {
newNonPreemptibleUsed = newPodUsed
}
} else {
newPodUsed = make(v1.ResourceList)
newNonPreemptibleUsed = make(v1.ResourceList)
}

deltaUsed := quotav1.Subtract(newPodUsed, oldPodUsed)
if quotav1.IsZero(deltaUsed) {
deltaNonPreemptibleUsed := quotav1.Subtract(newNonPreemptibleUsed, oldNonPreemptibleUsed)
if quotav1.IsZero(deltaUsed) && quotav1.IsZero(deltaNonPreemptibleUsed) {
if klog.V(5).Enabled() {
klog.Infof("updatePodUsed, deltaUsedIsZero, quotaName: %v, podName: %v, podUsed: %v",
quotaName, getPodName(oldPod, newPod), util.DumpJSON(newPodUsed))
klog.Infof("updatePodUsed, deltaUsedIsZero and deltaNonPreemptibleUsedIsZero, quotaName: %v, podName: %v, podUsed: %v, podNonPreemptibleUsed: %v",
quotaName, getPodName(oldPod, newPod), util.DumpJSON(newPodUsed), util.DumpJSON(newNonPreemptibleUsed))
}
return
}
gqm.updateGroupDeltaUsedNoLock(quotaName, deltaUsed)
gqm.updateGroupDeltaUsedNoLock(quotaName, deltaUsed, deltaNonPreemptibleUsed)
}

func (gqm *GroupQuotaManager) updatePodCacheNoLock(quotaName string, pod *v1.Pod, isAdd bool) {
Expand Down Expand Up @@ -878,21 +887,25 @@ func (gqm *GroupQuotaManager) resetRootQuotaUsedAndRequest() {

used := v1.ResourceList{}
request := v1.ResourceList{}
nonPreemptUsed := v1.ResourceList{}

systemQuotaInfo := gqm.getQuotaInfoByNameNoLock(extension.SystemQuotaName)
if systemQuotaInfo != nil {
used = quotav1.Add(used, systemQuotaInfo.GetUsed())
request = quotav1.Add(request, systemQuotaInfo.GetRequest())
nonPreemptUsed = quotav1.Add(nonPreemptUsed, systemQuotaInfo.GetNonPreemptibleUsed())
}

defaultQuotaInfo := gqm.getQuotaInfoByNameNoLock(extension.DefaultQuotaName)
if defaultQuotaInfo != nil {
used = quotav1.Add(used, defaultQuotaInfo.GetUsed())
request = quotav1.Add(request, defaultQuotaInfo.GetRequest())
nonPreemptUsed = quotav1.Add(nonPreemptUsed, defaultQuotaInfo.GetNonPreemptibleUsed())
}

rootQuotaInfo.CalculateInfo.Used = used
rootQuotaInfo.CalculateInfo.Request = request
rootQuotaInfo.CalculateInfo.NonPreemptibleUsed = nonPreemptUsed
}

func (gqm *GroupQuotaManager) recursiveUpdateGroupTreeWithDeltaAllocated(deltaAllocated v1.ResourceList, curToAllParInfos []*QuotaInfo) {
Expand Down
48 changes: 29 additions & 19 deletions pkg/scheduler/plugins/elasticquota/core/group_quota_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,31 +430,37 @@ func TestGroupQuotaManager_UpdateQuotaRequest(t *testing.T) {
assert.Equal(t, createResourceList(43, 80*GigaByte), runtime)
}

func TestGroupQuotaManager_UpdateGroupDeltaUsed(t *testing.T) {
func TestGroupQuotaManager_UpdateGroupDeltaUsedAndNonPreemptibleUsed(t *testing.T) {
gqm := NewGroupQuotaManagerForTest()

AddQuotaToManager(t, gqm, "test1", extension.RootQuotaName, 96, 160*GigaByte, 50, 80*GigaByte, true, false)

// 1. test1 used[120, 290] runtime == maxQuota
// 1. test1 used[120, 290] runtime == maxQuota nonPreemptibleUsed[20, 30]
used := createResourceList(120, 290*GigaByte)
gqm.updateGroupDeltaUsedNoLock("test1", used)
nonPreemptibleUsed := createResourceList(20, 30*GigaByte)
gqm.updateGroupDeltaUsedNoLock("test1", used, nonPreemptibleUsed)
quotaInfo := gqm.GetQuotaInfoByName("test1")
assert.NotNil(t, quotaInfo)
assert.Equal(t, used, quotaInfo.CalculateInfo.Used)
assert.Equal(t, nonPreemptibleUsed, quotaInfo.CalculateInfo.NonPreemptibleUsed)

// 2. used increases to [130,300]
used = createResourceList(10, 10*GigaByte)
gqm.updateGroupDeltaUsedNoLock("test1", used)
nonPreemptibleUsed = createResourceList(10, 10*GigaByte)
gqm.updateGroupDeltaUsedNoLock("test1", used, nonPreemptibleUsed)
quotaInfo = gqm.GetQuotaInfoByName("test1")
assert.NotNil(t, quotaInfo)
assert.Equal(t, createResourceList(130, 300*GigaByte), quotaInfo.CalculateInfo.Used)
assert.Equal(t, createResourceList(30, 40*GigaByte), quotaInfo.CalculateInfo.NonPreemptibleUsed)

// 3. used decreases to [90,100]
used = createResourceList(-40, -200*GigaByte)
gqm.updateGroupDeltaUsedNoLock("test1", used)
nonPreemptibleUsed = createResourceList(-15, -20*GigaByte)
gqm.updateGroupDeltaUsedNoLock("test1", used, nonPreemptibleUsed)
quotaInfo = gqm.GetQuotaInfoByName("test1")
assert.NotNil(t, quotaInfo)
assert.Equal(t, createResourceList(90, 100*GigaByte), quotaInfo.CalculateInfo.Used)
assert.Equal(t, createResourceList(15, 20*GigaByte), quotaInfo.CalculateInfo.NonPreemptibleUsed)
}

func TestGroupQuotaManager_MultiQuotaAdd(t *testing.T) {
Expand Down Expand Up @@ -712,26 +718,30 @@ func TestGroupQuotaManager_MultiUpdateQuotaRequest_WithScaledMinQuota2(t *testin
assert.Equal(t, createResourceList2(66666, 200*GigaByte/3), quotaInfo.CalculateInfo.AutoScaleMin)
}

func TestGroupQuotaManager_MultiUpdateQuotaUsed(t *testing.T) {
func TestGroupQuotaManager_MultiUpdateQuotaUsedAndNonPreemptibleUsed(t *testing.T) {
gqm := NewGroupQuotaManagerForTest()

AddQuotaToManager(t, gqm, "test1", extension.RootQuotaName, 96, 160*GigaByte, 50, 80*GigaByte, true, true)
AddQuotaToManager(t, gqm, "test1-sub1", "test1", 96, 160*GigaByte, 50, 80*GigaByte, true, true)
AddQuotaToManager(t, gqm, "test1-sub1-1", "test1-sub1", 96, 160*GigaByte, 50, 80*GigaByte, true, false)

used := createResourceList(120, 290*GigaByte)
gqm.updateGroupDeltaUsedNoLock("test1-sub1", used)
nonPreemptibleUsed := createResourceList(50, 100*GigaByte)
gqm.updateGroupDeltaUsedNoLock("test1-sub1", used, nonPreemptibleUsed)
quotaInfo := gqm.GetQuotaInfoByName("test1-sub1")
assert.True(t, quotaInfo != nil)
assert.Equal(t, used, quotaInfo.CalculateInfo.Used)
assert.Equal(t, nonPreemptibleUsed, quotaInfo.CalculateInfo.NonPreemptibleUsed)

quotaInfo = gqm.GetQuotaInfoByName("test1-sub1")
assert.True(t, quotaInfo != nil)
assert.Equal(t, used, quotaInfo.CalculateInfo.Used)
assert.Equal(t, nonPreemptibleUsed, quotaInfo.CalculateInfo.NonPreemptibleUsed)

quotaInfo = gqm.GetQuotaInfoByName("test1")
assert.True(t, quotaInfo != nil)
assert.Equal(t, used, quotaInfo.CalculateInfo.Used)
assert.Equal(t, nonPreemptibleUsed, quotaInfo.CalculateInfo.NonPreemptibleUsed)
}

func TestGroupQuotaManager_UpdateQuotaParentName(t *testing.T) {
Expand All @@ -758,7 +768,7 @@ func TestGroupQuotaManager_UpdateQuotaParentName(t *testing.T) {
// a-123 request [60,100]
request := createResourceList(60, 100*GigaByte)
gqm.updateGroupDeltaRequestNoLock("a-123", request)
gqm.updateGroupDeltaUsedNoLock("a-123", request)
gqm.updateGroupDeltaUsedNoLock("a-123", request, createResourceList(0, 0))
runtime := gqm.RefreshRuntime("a-123")
assert.Equal(t, request, runtime)

Expand All @@ -771,7 +781,7 @@ func TestGroupQuotaManager_UpdateQuotaParentName(t *testing.T) {
// test2-a request [20,40]
request = createResourceList(20, 40*GigaByte)
gqm.updateGroupDeltaRequestNoLock("test2-a", request)
gqm.updateGroupDeltaUsedNoLock("test2-a", request)
gqm.updateGroupDeltaUsedNoLock("test2-a", request, createResourceList(0, 0))
runtime = gqm.RefreshRuntime("test2-a")
assert.Equal(t, request, runtime)

Expand Down Expand Up @@ -855,7 +865,7 @@ func TestGroupQuotaManager_UpdateClusterTotalResource(t *testing.T) {
assert.Equal(t, totalRes, quotaTotalRes)

sysUsed := createResourceList(10, 30*GigaByte)
gqm.updateGroupDeltaUsedNoLock(extension.SystemQuotaName, sysUsed)
gqm.updateGroupDeltaUsedNoLock(extension.SystemQuotaName, sysUsed, createResourceList(0, 0))
assert.Equal(t, sysUsed, gqm.GetQuotaInfoByName(extension.SystemQuotaName).GetUsed())

// 90, 510
Expand All @@ -867,21 +877,21 @@ func TestGroupQuotaManager_UpdateClusterTotalResource(t *testing.T) {
assert.Equal(t, delta, quotaTotalRes)

// 80, 480
gqm.updateGroupDeltaUsedNoLock(extension.SystemQuotaName, createResourceList(10, 30))
gqm.updateGroupDeltaUsedNoLock(extension.SystemQuotaName, createResourceList(10, 30), createResourceList(0, 0))
delta = quotav1.Subtract(delta, createResourceList(10, 30))
assert.Equal(t, totalRes, gqm.totalResource)
assert.Equal(t, delta, gqm.totalResourceExceptSystemAndDefaultUsed)

// 70, 450
defaultUsed := createResourceList(10, 30)
gqm.updateGroupDeltaUsedNoLock(extension.DefaultQuotaName, defaultUsed)
gqm.updateGroupDeltaUsedNoLock(extension.DefaultQuotaName, defaultUsed, createResourceList(0, 0))
assert.Equal(t, defaultUsed, gqm.GetQuotaInfoByName(extension.DefaultQuotaName).GetUsed())
delta = quotav1.Subtract(delta, defaultUsed)
assert.Equal(t, totalRes, gqm.totalResource)
assert.Equal(t, delta, gqm.totalResourceExceptSystemAndDefaultUsed)

// 60 420
gqm.updateGroupDeltaUsedNoLock(extension.DefaultQuotaName, defaultUsed)
gqm.updateGroupDeltaUsedNoLock(extension.DefaultQuotaName, defaultUsed, createResourceList(0, 0))
delta = quotav1.Subtract(delta, defaultUsed)
assert.Equal(t, totalRes, gqm.totalResource)
assert.Equal(t, delta, gqm.totalResourceExceptSystemAndDefaultUsed)
Expand Down Expand Up @@ -1428,7 +1438,7 @@ func TestGroupQuotaManager_GetQuotaInformationForSyncHandler(t *testing.T) {
gqm.UpdateClusterTotalResource(createResourceList(1000, 1000))
gqm.updateGroupDeltaRequestNoLock("1", createResourceList(100, 100))
gqm.RefreshRuntime("1")
gqm.updateGroupDeltaUsedNoLock("1", createResourceList(10, 10))
gqm.updateGroupDeltaUsedNoLock("1", createResourceList(10, 10), createResourceList(0, 0))
used, request, childRequest, runtime, _, _, err := gqm.GetQuotaInformationForSyncHandler("1")
assert.Nil(t, err)
assert.Equal(t, used, createResourceList(10, 10))
Expand All @@ -1446,7 +1456,7 @@ func TestGroupQuotaManager_GetQuotaInformationForSyncHandlerWithUsageGuarantee(t
gqm.UpdateClusterTotalResource(createResourceList(1000, 1000))
gqm.updateGroupDeltaRequestNoLock("1", createResourceList(100, 100))
gqm.RefreshRuntime("1")
gqm.updateGroupDeltaUsedNoLock("1", createResourceList(10, 10))
gqm.updateGroupDeltaUsedNoLock("1", createResourceList(10, 10), createResourceList(0, 0))
used, request, childRequest, runtime, guaranteed, allocated, err := gqm.GetQuotaInformationForSyncHandler("1")
assert.Nil(t, err)
assert.Equal(t, used, createResourceList(10, 10))
Expand Down Expand Up @@ -1492,12 +1502,12 @@ func TestGroupQuotaManager_UpdateRootQuotaUsed(t *testing.T) {

sysUsed := createResourceList(10, 30)
expectedTotalUsed = quotav1.Add(expectedTotalUsed, sysUsed)
gqm.updateGroupDeltaUsedNoLock(extension.SystemQuotaName, sysUsed)
gqm.updateGroupDeltaUsedNoLock(extension.SystemQuotaName, sysUsed, createResourceList(0, 0))
assert.Equal(t, sysUsed, gqm.GetQuotaInfoByName(extension.SystemQuotaName).GetUsed())

defaultUsed := createResourceList(2, 5)
expectedTotalUsed = quotav1.Add(expectedTotalUsed, defaultUsed)
gqm.updateGroupDeltaUsedNoLock(extension.DefaultQuotaName, defaultUsed)
gqm.updateGroupDeltaUsedNoLock(extension.DefaultQuotaName, defaultUsed, createResourceList(0, 0))
assert.Equal(t, defaultUsed, gqm.GetQuotaInfoByName(extension.DefaultQuotaName).GetUsed())

//case1: no quota, root quota used
Expand Down Expand Up @@ -1530,8 +1540,8 @@ func TestGroupQuotaManager_UpdateRootQuotaUsed(t *testing.T) {
gqm.UpdateQuota(qi1, false)
gqm.UpdateQuota(qi2, false)
gqm.UpdateQuota(qi3, false)
gqm.updateGroupDeltaUsedNoLock("2", createResourceList(5, 5))
gqm.updateGroupDeltaUsedNoLock("3", createResourceList(7, 5))
gqm.updateGroupDeltaUsedNoLock("2", createResourceList(5, 5), createResourceList(0, 0))
gqm.updateGroupDeltaUsedNoLock("3", createResourceList(7, 5), createResourceList(0, 0))

expectedTotalUsed = quotav1.Add(expectedTotalUsed, createResourceList(5, 5))
expectedTotalUsed = quotav1.Add(expectedTotalUsed, createResourceList(7, 5))
Expand Down
Loading

0 comments on commit 5941c67

Please sign in to comment.