Skip to content

Commit

Permalink
Introducing resource limits on Windows (#85)
Browse files Browse the repository at this point in the history
* Introducing resource limits on Windows

* Complete setting resource limits + add listing of limited processes

* Fix linux build

* Add a noop 'RemoveAllLimits' method on Windows

* Add tests for windows + fix handle permissions

* Set resource limit on actual execution

* Add test for job name config
  • Loading branch information
Maelkum authored Apr 29, 2023
1 parent c1b33fb commit e5f2d7d
Show file tree
Hide file tree
Showing 15 changed files with 434 additions and 23 deletions.
4 changes: 2 additions & 2 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ func run() int {
}

defer func() {
err = limiter.RemoveAllLimits()
err = limiter.Shutdown()
if err != nil {
log.Error().Err(err).Msg("could not remove resource limtis")
log.Error().Err(err).Msg("could not shutdown resource limiter")
}
}()

Expand Down
7 changes: 5 additions & 2 deletions executor/execute_ux.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ func (e *Executor) executeCommand(cmd *exec.Cmd) (execute.RuntimeOutput, execute
return execute.RuntimeOutput{}, execute.Usage{}, fmt.Errorf("could not start process: %w", err)
}

err = e.cfg.Limiter.LimitProcess(cmd.Process.Pid)
proc := execute.ProcessID{
PID: cmd.Process.Pid,
}
err = e.cfg.Limiter.LimitProcess(proc)
if err != nil {
return execute.RuntimeOutput{}, execute.Usage{}, fmt.Errorf("could not limit process: %w", err)
return execute.RuntimeOutput{}, execute.Usage{}, fmt.Errorf("could not set resource limits: %w", err)
}

// Return execution error with as much info below.
Expand Down
18 changes: 17 additions & 1 deletion executor/execute_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,14 @@ func (e *Executor) executeCommand(cmd *exec.Cmd) (execute.RuntimeOutput, execute
// Create a duplicate handle - only for me (current process), not inheritable.
var handle windows.Handle
me := windows.CurrentProcess()
err = windows.DuplicateHandle(me, childHandle, me, &handle, windows.PROCESS_QUERY_INFORMATION, false, 0)
err = windows.DuplicateHandle(
me,
childHandle,
me,
&handle,
windows.PROCESS_QUERY_INFORMATION|windows.PROCESS_TERMINATE|windows.PROCESS_SET_QUOTA,
false,
0)
if err != nil {
return execute.RuntimeOutput{}, execute.Usage{}, fmt.Errorf("could not duplicate process handle: %w", err)
}
Expand All @@ -59,6 +66,15 @@ func (e *Executor) executeCommand(cmd *exec.Cmd) (execute.RuntimeOutput, execute
}
}()

proc := execute.ProcessID{
PID: cmd.Process.Pid,
Handle: uintptr(handle),
}
err = e.cfg.Limiter.LimitProcess(proc)
if err != nil {
return execute.RuntimeOutput{}, execute.Usage{}, fmt.Errorf("could not set resource limits: %w", err)
}

// Now we can safely wait for the child process to complete.
cmdErr := cmd.Wait()
end := time.Now()
Expand Down
6 changes: 5 additions & 1 deletion executor/limiter.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package executor

import (
"github.com/blocklessnetworking/b7s/models/execute"
)

// noopLimiter is a dummy limiter used when processes run without any resource limitations.
type noopLimiter struct{}

func (n *noopLimiter) LimitProcess(pid int) error {
func (n *noopLimiter) LimitProcess(proc execute.ProcessID) error {
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion executor/limits.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package executor

import (
"github.com/blocklessnetworking/b7s/models/execute"
)

type Limiter interface {
LimitProcess(pid int) error
LimitProcess(proc execute.ProcessID) error
ListProcesses() ([]int, error)
}
12 changes: 11 additions & 1 deletion executor/limits/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package limits
// DefaultConfig describes the default process resource limits.
var DefaultConfig = Config{
Cgroup: DefaultCgroup,
JobName: DefaultJobObjectName,
MemoryKB: -1,
CPUPercentage: DefaultCPUPercentage,
}

// Config represents the resource limits to set.
type Config struct {
Cgroup string // Cgroup to use for limits.
Cgroup string // On Linux, Cgroup to use for limits.
JobName string // On Windows, job object name to use for limits.

MemoryKB int64 // Maximum amount of memory allowed in kilobytes.
CPUPercentage float64 // Percentage of the CPU time allowed.
}
Expand All @@ -24,6 +27,13 @@ func WithCgroup(path string) Option {
}
}

// WithJobObjectName sets the name for the job object to be used for the jobs.
func WithJobObjectName(name string) Option {
return func(cfg *Config) {
cfg.JobName = name
}
}

// WithCPUPercentage sets the percentage of CPU time allowed.
func WithCPUPercentage(p float64) Option {
return func(cfg *Config) {
Expand Down
12 changes: 12 additions & 0 deletions executor/limits/config_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,15 @@ func TestConfig_WithMemoryKB(t *testing.T) {
WithMemoryKB(limit)(&cfg)
require.Equal(t, limit, cfg.MemoryKB)
}

func TestConfig_JobName(t *testing.T) {

const jobName = "blockless-test"

cfg := Config{
JobName: DefaultJobObjectName,
}

WithJobObjectName(jobName)(&cfg)
require.Equal(t, jobName, cfg.JobName)
}
164 changes: 164 additions & 0 deletions executor/limits/job_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
//go:build windows
// +build windows

package limits

import (
"fmt"
"strings"
"unsafe"

"golang.org/x/sys/windows"
)

const (
jobObjectBasicProcessIdListInformationClass = 3
)

const (
JOB_OBJECT_CPU_RATE_CONTROL_ENABLE = 0x1
// The job's CPU rate is a hard limit. After the job reaches its CPU cycle limit for the current scheduling interval,
// no threads associated with the job will run until the next interval.
// => See https://learn.microsoft.com/en-us/windows/win32/api/winnt/ns-winnt-jobobject_cpu_rate_control_information
JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP = 0x4

// errMoreData is returned by QueryInformationJobObject to notify us on memory needed to store the response.
// Unfortunately there's no Go error defined for it.
errMoreDataStr = "More data is available."
)

type jobObjectCPURateControlInformation struct {
ControlFlags uint32
CPURate uint32
}

type jobObjectExtendedLimitInformation struct {
BasicLimitInformation windows.JOBOBJECT_BASIC_LIMIT_INFORMATION
IoInfo ioCounters
ProcessMemoryLimit uintptr
JobMemoryLimit uintptr
PeakProcessMemoryUsed uintptr
PeakJobMemoryUsed uintptr
}

type ioCounters struct {
ReadOperationCount uint64
WriteOperationCount uint64
OtherOperationCount uint64
ReadTransferCount uint64
WriteTransferCount uint64
OtherTransferCount uint64
}

type jobObjectBasicProcessIdList struct {
NumberOfAssignedProcesses uint32
NumberOfProcessIDsInList uint32
ProcessIDList [1]uintptr
}

func setCPULimit(h windows.Handle, cpuRate float64) error {

// Specifies the portion of processor cycles that the threads in a job object can use during each scheduling interval, as the number of cycles per 10,000 cycles.
// Set CpuRate to a percentage times 100. For example, to let the job use 20% of the CPU, set CpuRate to 20 times 100, or 2,000.
// => See https://learn.microsoft.com/en-us/windows/win32/api/winnt/ns-winnt-jobobject_cpu_rate_control_information

info := &jobObjectCPURateControlInformation{
ControlFlags: JOB_OBJECT_CPU_RATE_CONTROL_ENABLE | JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP,

// Convert rate from e.g. 0.8 to 80(%) * 100.
CPURate: uint32((100 * cpuRate) * 100),
}

_, err := windows.SetInformationJobObject(
h,
windows.JobObjectCpuRateControlInformation,
uintptr(unsafe.Pointer(info)),
uint32(unsafe.Sizeof(*info)),
)
if err != nil {
return fmt.Errorf("could not set CPU limit for job: %w", err)
}

return nil
}

func setMemLimit(h windows.Handle, memoryKB int64) error {

info := &jobObjectExtendedLimitInformation{
BasicLimitInformation: windows.JOBOBJECT_BASIC_LIMIT_INFORMATION{
LimitFlags: windows.JOB_OBJECT_LIMIT_JOB_MEMORY,
},
JobMemoryLimit: uintptr(memoryKB * 1000),
}

_, err := windows.SetInformationJobObject(
h,
windows.JobObjectExtendedLimitInformation,
uintptr(unsafe.Pointer(info)),
uint32(unsafe.Sizeof(*info)),
)
if err != nil {
return fmt.Errorf("could not set memory limit for job: %w", err)
}

return nil
}

func getJobObjectPids(h windows.Handle) ([]int, error) {

var info jobObjectBasicProcessIdList

err := windows.QueryInformationJobObject(
h,
jobObjectBasicProcessIdListInformationClass,
uintptr(unsafe.Pointer(&info)),
uint32(unsafe.Sizeof(info)),
nil,
)
if err == nil {
if info.NumberOfProcessIDsInList == 1 {

pids := []int{
int(info.ProcessIDList[0]),
}

return pids, nil
}

return []int{}, nil
}
if err != nil && !errIsMoreData(err) {
return nil, fmt.Errorf("could not list job object processes: %w", err)
}

bufSize := unsafe.Sizeof(info) + (unsafe.Sizeof(info.ProcessIDList[0]) * uintptr(info.NumberOfAssignedProcesses-1))
buf := make([]byte, bufSize)

err = windows.QueryInformationJobObject(
h,
jobObjectBasicProcessIdListInformationClass,
uintptr(unsafe.Pointer(&buf[0])),
uint32(len(buf)),
nil,
)
if err != nil {
return nil, fmt.Errorf("could not list job object processes: %w", err)
}

bufInfo := (*jobObjectBasicProcessIdList)(unsafe.Pointer(&buf[0]))

// Some dark sorcery ported from the MS `Host Compute Service Shim` library.
// => See `AllPids` method over at https://github.com/microsoft/hcsshim/blob/main/internal/winapi/jobobject.go#L101
pidList := (*[(1 << 27) - 1]uintptr)(unsafe.Pointer(&bufInfo.ProcessIDList[0]))[:bufInfo.NumberOfProcessIDsInList:bufInfo.NumberOfProcessIDsInList]

out := make([]int, 0, bufInfo.NumberOfProcessIDsInList)
for _, pid := range pidList {
out = append(out, int(pid))
}

return out, nil
}

func errIsMoreData(err error) bool {
return strings.Contains(err.Error(), errMoreDataStr)
}
9 changes: 7 additions & 2 deletions executor/limits/limits_gen.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//go:build !linux
// +build !linux
//go:build !linux && !windows
// +build !linux,!windows

package limits

Expand Down Expand Up @@ -32,3 +32,8 @@ func (l *Limits) ListProcesses() ([]int, error) {
func (l *Limits) RemoveAllLimits() error {
return errors.New("TBD: not implemented")
}

// Close will close the limiter.
func (l *Limits) Close() error {
return errors.New("TBD: not implemented")
}
9 changes: 6 additions & 3 deletions executor/limits/limits_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/containerd/cgroups/v3"
"github.com/containerd/cgroups/v3/cgroup2"

"github.com/blocklessnetworking/b7s/models/execute"
)

// TODO: Add support for cgroups v1 - determine on the fly which version to use
Expand Down Expand Up @@ -56,8 +58,9 @@ func New(opts ...Option) (*Limits, error) {
}

// LimitProcess will set the resource limits for the process with the given PID.
func (l *Limits) LimitProcess(pid int) error {
func (l *Limits) LimitProcess(proc execute.ProcessID) error {

pid := proc.PID
err := l.cgroup.AddProc(uint64(pid))
if err != nil {
return fmt.Errorf("could not set resouce limit for process (pid: %v): %w", pid, err)
Expand All @@ -82,8 +85,8 @@ func (l *Limits) ListProcesses() ([]int, error) {
return list, nil
}

// RemoveAllLimits will remove any set resource limits.
func (l *Limits) RemoveAllLimits() error {
// Shutdown will remove any set resource limits.
func (l *Limits) Shutdown() error {

// Remove all limits effectively sets them to very large values, which is different from "removing" them.
period := uint64(time.Second.Microseconds())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//go:build limits
// +build limits
//go:build limits && linux
// +build limits,linux

package limits_test

Expand All @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/blocklessnetworking/b7s/executor/limits"
"github.com/blocklessnetworking/b7s/models/execute"
)

const (
Expand All @@ -37,9 +38,9 @@ func TestLimits(t *testing.T) {
)
require.NoError(t, err)

// Always remove all resource limits.
// Always remove all resource limits on end of test.
defer func() {
err = limiter.RemoveAllLimits()
err = limiter.Shutdown()
require.NoError(t, err)
}()

Expand All @@ -53,18 +54,20 @@ func TestLimits(t *testing.T) {

// Put resource limit on self.
// This is effectively a limit on go test so we're conservative with limits.
pid := os.Getpid()
err = limiter.LimitProcess(pid)
proc := execute.ProcessID{
PID: os.Getpid(),
}
err = limiter.LimitProcess(proc)
require.NoError(t, err)

// Verify list of limited processes now has a single process.
pids, err = limiter.ListProcesses()
require.NoError(t, err)
require.Len(t, pids, 1)
require.Equal(t, pids[0], pid)
require.Equal(t, pids[0], proc.PID)

// Manually verify the PID limit.
verifyPids(t, cgroup, []int{pid})
verifyPids(t, cgroup, []int{proc.PID})
}

func verifyCPULImit(t *testing.T, cgroup string, limit float64) {
Expand Down
Loading

0 comments on commit e5f2d7d

Please sign in to comment.