Skip to content

Commit

Permalink
schedler: move gang OnceSatified to gangGroupInfo (#2176)
Browse files Browse the repository at this point in the history
Signed-off-by: xingbao.zy <[email protected]>
Co-authored-by: xingbao.zy <[email protected]>
  • Loading branch information
buptcozy and xingbao.zy authored Aug 29, 2024
1 parent 0ddf353 commit f7c6414
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/coscheduling/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ func TestPermit(t *testing.T) {
gangId := util.GetId(pg.Namespace, pg.Name)
gang := mgr.cache.getGangFromCacheByGangId(gangId, false)
gang.lock.Lock()
gang.OnceResourceSatisfied = tt.onceSatisfy
gang.GangGroupInfo.OnceResourceSatisfied = tt.onceSatisfy
gang.GangMatchPolicy = tt.matchPolicy
gang.lock.Unlock()
}
Expand Down
17 changes: 5 additions & 12 deletions pkg/scheduler/plugins/coscheduling/core/gang.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ type Gang struct {
WaitingForBindChildren map[string]*v1.Pod
// pods that have already bound
BoundChildren map[string]*v1.Pod
// OnceResourceSatisfied indicates whether the gang has ever reached the ResourceSatisfied state,which means the
// children number has reached the minNum in the early step,
// once this variable is set true, it is irreversible.
OnceResourceSatisfied bool

// only-waiting, only consider waiting pods
// waiting-and-running, consider waiting and running pods
Expand Down Expand Up @@ -344,7 +340,7 @@ func (gang *Gang) isGangOnceResourceSatisfied() bool {
gang.lock.Lock()
defer gang.lock.Unlock()

return gang.OnceResourceSatisfied
return gang.GangGroupInfo.isGangOnceResourceSatisfied()
}

func (gang *Gang) setChild(pod *v1.Pod) {
Expand Down Expand Up @@ -472,10 +468,7 @@ func (gang *Gang) setResourceSatisfied() {
gang.lock.Lock()
defer gang.lock.Unlock()

if !gang.OnceResourceSatisfied {
gang.OnceResourceSatisfied = true
klog.Infof("Gang ResourceSatisfied, gangName: %v", gang.Name)
}
gang.GangGroupInfo.setResourceSatisfied()
}

func (gang *Gang) addBoundPod(pod *v1.Pod) {
Expand All @@ -487,8 +480,8 @@ func (gang *Gang) addBoundPod(pod *v1.Pod) {
gang.BoundChildren[podId] = pod

klog.Infof("AddBoundPod, gangName: %v, podName: %v", gang.Name, podId)
if !gang.OnceResourceSatisfied && len(gang.BoundChildren) >= gang.MinRequiredNumber {
gang.OnceResourceSatisfied = true
if !gang.GangGroupInfo.isGangOnceResourceSatisfied() {
gang.GangGroupInfo.setResourceSatisfied()
klog.Infof("Gang ResourceSatisfied due to addBoundPod, gangName: %v", gang.Name)
}
}
Expand All @@ -507,6 +500,6 @@ func (gang *Gang) isGangValidForPermit() bool {
case extension.GangMatchPolicyWaitingAndRunning:
return len(gang.WaitingForBindChildren)+len(gang.BoundChildren) >= gang.MinRequiredNumber
default:
return len(gang.WaitingForBindChildren) >= gang.MinRequiredNumber || gang.OnceResourceSatisfied == true
return len(gang.WaitingForBindChildren) >= gang.MinRequiredNumber || gang.GangGroupInfo.isGangOnceResourceSatisfied()
}
}
8 changes: 6 additions & 2 deletions pkg/scheduler/plugins/coscheduling/core/gang_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func (gangCache *GangCache) deleteGangFromCacheByGangId(gangId string) {
}

func (gangCache *GangCache) onPodAdd(obj interface{}) {
gangCache.onPodAddInternal(obj, "create")
}

func (gangCache *GangCache) onPodAddInternal(obj interface{}, action string) {
pod, ok := obj.(*v1.Pod)
if !ok {
return
Expand Down Expand Up @@ -152,7 +156,7 @@ func (gangCache *GangCache) onPodAdd(obj interface{}) {
gang.setResourceSatisfied()
}

klog.Infof("watch pod created, Name:%v, pgLabel:%v", pod.Name, pod.Labels[v1alpha1.PodGroupLabel])
klog.Infof("watch pod %v, Name:%v, pgLabel:%v", action, pod.Name, pod.Labels[v1alpha1.PodGroupLabel])
}

func (gangCache *GangCache) onPodUpdate(oldObj, newObj interface{}) {
Expand All @@ -170,7 +174,7 @@ func (gangCache *GangCache) onPodUpdate(oldObj, newObj interface{}) {
return
}

gangCache.onPodAdd(newObj)
gangCache.onPodAddInternal(newObj, "update")
}

func (gangCache *GangCache) onPodDelete(obj interface{}) {
Expand Down
24 changes: 14 additions & 10 deletions pkg/scheduler/plugins/coscheduling/core/gang_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ func TestGangCache_OnPodAdd(t *testing.T) {

defaultArgs := getTestDefaultCoschedulingArgs(t)
tests := []struct {
name string
pods []*corev1.Pod
wantCache map[string]*Gang
name string
pods []*corev1.Pod
wantCache map[string]*Gang
onceSatisfied bool
}{
{
name: "add invalid pod",
Expand Down Expand Up @@ -216,9 +217,9 @@ func TestGangCache_OnPodAdd(t *testing.T) {
},
},
},
OnceResourceSatisfied: true,
},
},
onceSatisfied: true,
},
{
name: "add pod announcing Gang in lightweight-coscheduling way",
Expand Down Expand Up @@ -314,9 +315,9 @@ func TestGangCache_OnPodAdd(t *testing.T) {
},
},
},
OnceResourceSatisfied: true,
},
},
onceSatisfied: true,
},
{
name: "add pods announcing Gang in Annotation way,but with illegal args",
Expand Down Expand Up @@ -506,6 +507,7 @@ func TestGangCache_OnPodAdd(t *testing.T) {
continue
}
tt.wantCache[k].GangGroupId = util.GetGangGroupId(v.GangGroup)
tt.wantCache[k].GangGroupInfo.OnceResourceSatisfied = tt.onceSatisfied
}

for _, pod := range tt.pods {
Expand Down Expand Up @@ -543,9 +545,10 @@ func TestGangCache_OnPodUpdate(t *testing.T) {

defaultArgs := getTestDefaultCoschedulingArgs(t)
tests := []struct {
name string
pods []*corev1.Pod
wantCache map[string]*Gang
name string
pods []*corev1.Pod
wantCache map[string]*Gang
onceSatisfied bool
}{
{
name: "add invalid pod",
Expand Down Expand Up @@ -649,9 +652,9 @@ func TestGangCache_OnPodUpdate(t *testing.T) {
},
},
},
OnceResourceSatisfied: true,
},
},
onceSatisfied: true,
},
}

Expand All @@ -671,6 +674,7 @@ func TestGangCache_OnPodUpdate(t *testing.T) {
continue
}
tt.wantCache[k].GangGroupId = util.GetGangGroupId(v.GangGroup)
tt.wantCache[k].GangGroupInfo.OnceResourceSatisfied = true
}

for _, pod := range tt.pods {
Expand Down Expand Up @@ -1125,8 +1129,8 @@ func TestGangCache_OnGangDelete(t *testing.T) {
},
},
},
OnceResourceSatisfied: true,
}
wantedGang.GangGroupInfo.OnceResourceSatisfied = true

wantedGang.GangGroupInfo.GangTotalChildrenNumMap[wantedGang.Name] = wantedGang.TotalChildrenNum
wantedGang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"] = wantedGang.GangGroupInfo.LastScheduleTime
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/coscheduling/core/gang_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (gang *Gang) GetGangSummary() *GangSummary {
gangSummary.GangMatchPolicy = gang.GangMatchPolicy
gangSummary.MinRequiredNumber = gang.MinRequiredNumber
gangSummary.TotalChildrenNum = gang.TotalChildrenNum
gangSummary.OnceResourceSatisfied = gang.OnceResourceSatisfied
gangSummary.OnceResourceSatisfied = gang.GangGroupInfo.isGangOnceResourceSatisfied()
gangSummary.GangGroupInfo = gang.GangGroupInfo
gangSummary.GangFrom = gang.GangFrom
gangSummary.HasGangInit = gang.HasGangInit
Expand Down
22 changes: 22 additions & 0 deletions pkg/scheduler/plugins/coscheduling/core/ganggroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ type GangGroupInfo struct {
GangTotalChildrenNumMap map[string]int
ChildrenScheduleRoundMap map[string]int

// OnceResourceSatisfied indicates whether the gang has ever reached the ResourceSatisfied state,which means the
// children number has reached the minNum in the early step,
// once this variable is set true, it is irreversible.
OnceResourceSatisfied bool

LastScheduleTime time.Time
ChildrenLastScheduleTime map[string]time.Time
}
Expand Down Expand Up @@ -224,3 +229,20 @@ func (gg *GangGroupInfo) resetPodLastScheduleTime(pod *corev1.Pod) {
podId := util.GetId(pod.Namespace, pod.Name)
gg.ChildrenLastScheduleTime[podId] = gg.LastScheduleTime
}

func (gg *GangGroupInfo) isGangOnceResourceSatisfied() bool {
gg.lock.Lock()
defer gg.lock.Unlock()

return gg.OnceResourceSatisfied
}

func (gg *GangGroupInfo) setResourceSatisfied() {
gg.lock.Lock()
defer gg.lock.Unlock()

if !gg.OnceResourceSatisfied {
gg.OnceResourceSatisfied = true
klog.Infof("Gang ResourceSatisfied, gangName: %v", gg.GangGroupId)
}
}
7 changes: 7 additions & 0 deletions pkg/scheduler/plugins/coscheduling/core/ganggroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,11 @@ func TestGangGroupInfo(t *testing.T) {
gg.deletePodLastScheduleTime("test/pod1")
assert.Equal(t, 1, len(gg.ChildrenLastScheduleTime))
}
{
gg := NewGangGroupInfo("aa", []string{"aa"})
assert.Equal(t, false, gg.isGangOnceResourceSatisfied())

gg.setResourceSatisfied()
assert.Equal(t, true, gg.isGangOnceResourceSatisfied())
}
}
2 changes: 2 additions & 0 deletions pkg/scheduler/plugins/coscheduling/coscheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ func TestPostFilter(t *testing.T) {
if tt.pod.Name == "pod3" {
wg.Add(2)
}

for _, pod := range tt.pods {
tmpPod := pod
suit.Handle.(framework.Framework).RunPermitPlugins(context.Background(), cycleState, tmpPod, "")
Expand All @@ -652,6 +653,7 @@ func TestPostFilter(t *testing.T) {
defer wg.Done()
}()
}

if tt.pod.Name == "pod3" {
totalWaitingPods := 0
suit.Handle.IterateOverWaitingPods(
Expand Down

0 comments on commit f7c6414

Please sign in to comment.