scheduler: take all assigned cpu cores into account instead of only those part of the largest lifecycle (#24304)

Fixes a bug in the AllocatedResources.Comparable method, where the scheduler
would only take into account the cpusets of the tasks in the largest lifecycle.
This could result in overlapping cgroup cpusets. Now we make the distinction
between reserved and fungible resources throughout the lifespan of the alloc.
In addition, logging was added so that future regressions surface in the client
logs rather than requiring manual inspection of cgroup files.
mvegter authored Nov 21, 2024
1 parent a9e7166 commit 997da25
Showing 5 changed files with 240 additions and 28 deletions.
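In essence, reserved cores are now counted for the entire lifespan of the alloc, while fungible CPU shares only need to cover the most expensive lifecycle phase. A minimal, self-contained sketch of that rule — with simplified, hypothetical types, not Nomad's actual AllocatedResources structs:

```go
// Illustrative sketch only; the types and function names here are hypothetical
// simplifications of the aggregation rule introduced by this commit.
package main

import "fmt"

type taskCPU struct {
	shares        uint64   // fungible CPU bandwidth in MHz
	reservedCores []uint16 // exclusively pinned cores, if any
	lifecycle     string   // "prestart", "main", or "poststop"
}

// flattenCPU mirrors the new rule: CPU reserved via cores is claimed for the
// whole lifespan of the alloc and is always counted, while fungible shares are
// charged only for the most expensive lifecycle phase.
func flattenCPU(tasks []taskCPU) (shares uint64, cores []uint16) {
	byPhase := map[string]uint64{}
	for _, t := range tasks {
		if len(t.reservedCores) > 0 {
			// Reserved cores (and their bandwidth) are not fungible.
			cores = append(cores, t.reservedCores...)
			shares += t.shares
			continue
		}
		byPhase[t.lifecycle] += t.shares
	}
	// Lifecycle phases do not run concurrently, so only the peak phase counts.
	peak := byPhase["main"]
	for _, v := range byPhase {
		if v > peak {
			peak = v
		}
	}
	return shares + peak, cores
}

func main() {
	tasks := []taskCPU{
		{shares: 1000, reservedCores: []uint16{0}, lifecycle: "prestart"},
		{shares: 2000, reservedCores: []uint16{3, 4}, lifecycle: "main"},
		{shares: 1000, lifecycle: "poststop"},
	}
	shares, cores := flattenCPU(tasks)
	// Before this fix, only the cpuset of the largest lifecycle survived the
	// flattening, so core 0 could be handed out to another alloc as well.
	fmt.Printf("shares=%d cores=%v\n", shares, cores) // shares=4000 cores=[0 3 4]
}
```

The test cases added below exercise the same rule against Nomad's real types.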
3 changes: 3 additions & 0 deletions .changelog/24304.txt
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: take all assigned cpu cores into account instead of only those part of the largest lifecycle
```
12 changes: 10 additions & 2 deletions client/lib/cgroupslib/partition_linux.go
@@ -79,10 +79,18 @@ func (p *partition) Reserve(cores *idset.Set[hw.CoreID]) error {
	p.lock.Lock()
	defer p.lock.Unlock()

+	// Use the intersection with the usable cores to avoid adding more cores than available.
+	usableCores := p.usableCores.Intersect(cores)
+
+	overlappingCores := p.reserve.Intersect(usableCores)
+	if overlappingCores.Size() > 0 {
+		// COMPAT: prior to Nomad 1.9.X this would silently happen, this should probably return an error instead
+		p.log.Warn("Unable to exclusively reserve the requested cores", "cores", cores, "overlapping_cores", overlappingCores)
+	}
+
	p.share.RemoveSet(cores)
+	p.reserve.InsertSet(usableCores)

-	// Use the intersection with the usable cores to avoid adding more cores than available.
-	p.reserve.InsertSet(p.usableCores.Intersect(cores))
	return p.write()
}
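For the Reserve change above, a rough illustration of what the new warning catches — plain map-based sets stand in for Nomad's idset package, so the types here are hypothetical:

```go
// Hypothetical stand-in for idset-style core sets; illustration only.
package main

import "fmt"

type coreSet map[uint16]struct{}

// intersect returns the cores present in both sets.
func (s coreSet) intersect(other coreSet) coreSet {
	out := coreSet{}
	for c := range other {
		if _, ok := s[c]; ok {
			out[c] = struct{}{}
		}
	}
	return out
}

func main() {
	reserved := coreSet{0: {}, 1: {}}  // cores already pinned by earlier allocs
	requested := coreSet{1: {}, 2: {}} // usable cores the new alloc asks to pin

	if overlap := reserved.intersect(requested); len(overlap) > 0 {
		// The real client logs this via p.log.Warn; previously it happened silently.
		fmt.Printf("unable to exclusively reserve cores, overlapping: %v\n", overlap)
	}
}
```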

73 changes: 73 additions & 0 deletions nomad/structs/funcs_test.go
@@ -248,6 +248,79 @@ func TestAllocsFit(t *testing.T) {
must.Eq(t, 1024, used.Flattened.Memory.MemoryMB)
}

func TestAllocsFit_Cores(t *testing.T) {
ci.Parallel(t)

n := node2k()

a1 := &Allocation{
AllocatedResources: &AllocatedResources{
Tasks: map[string]*AllocatedTaskResources{
"web": {
Cpu: AllocatedCpuResources{
CpuShares: 500,
ReservedCores: []uint16{0},
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
},
},
},
},
}

a2 := &Allocation{
AllocatedResources: &AllocatedResources{
Tasks: map[string]*AllocatedTaskResources{
"web-prestart": {
Cpu: AllocatedCpuResources{
CpuShares: 500,
ReservedCores: []uint16{1},
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
},
},
"web": {
Cpu: AllocatedCpuResources{
CpuShares: 500,
ReservedCores: []uint16{0},
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
},
},
},
TaskLifecycles: map[string]*TaskLifecycleConfig{
"web-prestart": {
Hook: TaskLifecycleHookPrestart,
Sidecar: false,
},
},
},
}

// Should fit one allocation
fit, dim, used, err := AllocsFit(n, []*Allocation{a1}, nil, false)
must.NoError(t, err)
must.True(t, fit, must.Sprintf("failed for dimension %q", dim))
must.Eq(t, 500, used.Flattened.Cpu.CpuShares)
must.Eq(t, 1024, used.Flattened.Memory.MemoryMB)

// Should fit one allocation
fit, dim, used, err = AllocsFit(n, []*Allocation{a2}, nil, false)
must.NoError(t, err)
must.True(t, fit, must.Sprintf("failed for dimension %q", dim))
must.Eq(t, 1000, used.Flattened.Cpu.CpuShares)
must.Eq(t, 1024, used.Flattened.Memory.MemoryMB)

// Should not fit both allocations
fit, dim, used, err = AllocsFit(n, []*Allocation{a1, a2}, nil, false)
must.NoError(t, err)
must.False(t, fit)
must.Eq(t, dim, "cores")
}

func TestAllocsFit_TerminalAlloc(t *testing.T) {
ci.Parallel(t)

58 changes: 34 additions & 24 deletions nomad/structs/structs.go
@@ -3856,35 +3856,45 @@ func (a *AllocatedResources) Comparable() *ComparableResources {
		Shared: a.Shared,
	}

-	prestartSidecarTasks := &AllocatedTaskResources{}
-	prestartEphemeralTasks := &AllocatedTaskResources{}
-	main := &AllocatedTaskResources{}
-	poststartTasks := &AllocatedTaskResources{}
-	poststopTasks := &AllocatedTaskResources{}
-
-	for taskName, r := range a.Tasks {
-		lc := a.TaskLifecycles[taskName]
-		if lc == nil {
-			main.Add(r)
-		} else if lc.Hook == TaskLifecycleHookPrestart {
-			if lc.Sidecar {
-				prestartSidecarTasks.Add(r)
+	// The lifecycle in which a task could run
+	prestartLifecycle := &AllocatedTaskResources{}
+	mainLifecycle := &AllocatedTaskResources{}
+	stopLifecycle := &AllocatedTaskResources{}
+
+	for taskName, taskResources := range a.Tasks {
+		taskLifecycle := a.TaskLifecycles[taskName]
+		fungibleTaskResources := taskResources.Copy()
+
+		// Reserved cores (and their respective bandwidth) are not fungible,
+		// hence we should always include it as part of the Flattened resources.
+		if len(fungibleTaskResources.Cpu.ReservedCores) > 0 {
+			c.Flattened.Cpu.Add(&fungibleTaskResources.Cpu)
+			fungibleTaskResources.Cpu = AllocatedCpuResources{}
+		}
+
+		if taskLifecycle == nil {
+			mainLifecycle.Add(fungibleTaskResources)
+		} else if taskLifecycle.Hook == TaskLifecycleHookPrestart {
+			if taskLifecycle.Sidecar {
+				// These tasks span both the prestart and main lifecycle
+				prestartLifecycle.Add(fungibleTaskResources)
+				mainLifecycle.Add(fungibleTaskResources)
			} else {
-				prestartEphemeralTasks.Add(r)
+				prestartLifecycle.Add(fungibleTaskResources)
			}
-		} else if lc.Hook == TaskLifecycleHookPoststart {
-			poststartTasks.Add(r)
-		} else if lc.Hook == TaskLifecycleHookPoststop {
-			poststopTasks.Add(r)
+		} else if taskLifecycle.Hook == TaskLifecycleHookPoststart {
+			mainLifecycle.Add(fungibleTaskResources)
+		} else if taskLifecycle.Hook == TaskLifecycleHookPoststop {
+			stopLifecycle.Add(fungibleTaskResources)
		}
	}

-	// update this loop to account for lifecycle hook
-	main.Add(poststartTasks)
-	prestartEphemeralTasks.Max(main)
-	prestartEphemeralTasks.Max(poststopTasks)
-	prestartSidecarTasks.Add(prestartEphemeralTasks)
-	c.Flattened.Add(prestartSidecarTasks)
+	// Update the main lifecycle to reflect the largest fungible resource set
+	mainLifecycle.Max(prestartLifecycle)
+	mainLifecycle.Max(stopLifecycle)
+
+	// Add the fungible resources
+	c.Flattened.Add(mainLifecycle)

// Add network resources that are at the task group level
for _, network := range a.Shared.Networks {
122 changes: 120 additions & 2 deletions nomad/structs/structs_test.go
@@ -7763,6 +7763,124 @@ func TestAllocatedResources_Comparable_Flattened(t *testing.T) {
ci.Parallel(t)

allocationResources := AllocatedResources{
TaskLifecycles: map[string]*TaskLifecycleConfig{
"prestart-task": {
Hook: TaskLifecycleHookPrestart,
Sidecar: false,
},
"poststop-task": {
Hook: TaskLifecycleHookPoststop,
Sidecar: false,
},
},
Tasks: map[string]*AllocatedTaskResources{
"prestart-task": {
Cpu: AllocatedCpuResources{
CpuShares: 2000,
},
},
"main-task": {
Cpu: AllocatedCpuResources{
CpuShares: 4000,
},
},
"poststop-task": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
},
},
},
}

// The output of Flattened should return the resource required during the execution of the largest lifecycle
must.Eq(t, 4000, allocationResources.Comparable().Flattened.Cpu.CpuShares)
must.Len(t, 0, allocationResources.Comparable().Flattened.Cpu.ReservedCores)

allocationResources = AllocatedResources{
TaskLifecycles: map[string]*TaskLifecycleConfig{
"prestart-task": {
Hook: TaskLifecycleHookPrestart,
Sidecar: false,
},
"prestart-sidecar-task": {
Hook: TaskLifecycleHookPrestart,
Sidecar: true,
},
"poststop-task": {
Hook: TaskLifecycleHookPoststop,
Sidecar: false,
},
},
Tasks: map[string]*AllocatedTaskResources{
"prestart-task": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
ReservedCores: []uint16{0},
},
},
"prestart-sidecar-task": {
Cpu: AllocatedCpuResources{
CpuShares: 2000,
ReservedCores: []uint16{1, 2},
},
},
"main-task": {
Cpu: AllocatedCpuResources{
CpuShares: 2000,
ReservedCores: []uint16{3, 4},
},
},
"poststop-task": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
ReservedCores: []uint16{5},
},
},
},
}

// Reserved core resources are claimed throughout the lifespan of the allocation
must.Eq(t, 6000, allocationResources.Comparable().Flattened.Cpu.CpuShares)
must.Len(t, 6, allocationResources.Comparable().Flattened.Cpu.ReservedCores)

allocationResources = AllocatedResources{
TaskLifecycles: map[string]*TaskLifecycleConfig{
"prestart-task": {
Hook: TaskLifecycleHookPrestart,
Sidecar: false,
},
"poststop-task": {
Hook: TaskLifecycleHookPoststop,
Sidecar: false,
},
},
Tasks: map[string]*AllocatedTaskResources{
"prestart-task": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
},
},
"main-task": {
Cpu: AllocatedCpuResources{
CpuShares: 2000,
ReservedCores: []uint16{1, 2},
},
},
"poststop-task": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
},
},
},
}

// Reserved core resources are claimed throughout the lifespan of the allocation,
// but the prestart and poststop task can reuse the CpuShares. It's important to
// note that we will only claim 1000 MHz as part of the share slice.
must.Eq(t, 3000, allocationResources.Comparable().Flattened.Cpu.CpuShares)
must.Len(t, 2, allocationResources.Comparable().Flattened.Cpu.ReservedCores)

allocationResources = AllocatedResources{
TaskLifecycles: map[string]*TaskLifecycleConfig{
"prestart-task": {
Hook: TaskLifecycleHookPrestart,
@@ -7816,8 +7934,8 @@ func TestAllocatedResources_Comparable_Flattened(t *testing.T) {
}

// The output of Flattened should return the resource required during the execution of the largest lifecycle
-	must.Eq(t, 5000, allocationResources.Comparable().Flattened.Cpu.CpuShares)
-	must.Len(t, 5, allocationResources.Comparable().Flattened.Cpu.ReservedCores)
+	must.Eq(t, 9000, allocationResources.Comparable().Flattened.Cpu.CpuShares)
+	must.Len(t, 9, allocationResources.Comparable().Flattened.Cpu.ReservedCores)
}

func requireErrors(t *testing.T, err error, expected ...string) {
