From 507a5b5e2f6d57c91d965a8b248f98f957e5cc84 Mon Sep 17 00:00:00 2001 From: LY-today <724102053@qq.com> Date: Fri, 20 Dec 2024 17:02:57 +0800 Subject: [PATCH] feat: add noderesourcefitplus and scarceresourceavoidance scheduler plugins Signed-off-by: LY-today <724102053@qq.com> --- cmd/koord-scheduler/main.go | 18 +- pkg/scheduler/apis/config/register.go | 2 + pkg/scheduler/apis/config/types.go | 23 ++ pkg/scheduler/apis/config/v1beta3/register.go | 2 + pkg/scheduler/apis/config/v1beta3/types.go | 23 ++ .../config/v1beta3/zz_generated.conversion.go | 92 ++++++ .../config/v1beta3/zz_generated.deepcopy.go | 78 +++++ .../apis/config/zz_generated.deepcopy.go | 78 +++++ .../node_resources_fit_plus.go | 171 +++++++++++ .../node_resources_fit_plus_test.go | 287 ++++++++++++++++++ .../scarce_resource_avoidance.go | 175 +++++++++++ .../scarce_resource_avoidance_test.go | 250 +++++++++++++++ 12 files changed, 1192 insertions(+), 7 deletions(-) create mode 100644 pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go create mode 100644 pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus_test.go create mode 100644 pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance.go create mode 100644 pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance_test.go diff --git a/cmd/koord-scheduler/main.go b/cmd/koord-scheduler/main.go index 214509e41..bc77d53d5 100644 --- a/cmd/koord-scheduler/main.go +++ b/cmd/koord-scheduler/main.go @@ -31,7 +31,9 @@ import ( "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/elasticquota" "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/loadaware" "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/nodenumaresource" + noderesourcesfitplus "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/noderesourcefitplus" "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/reservation" + "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/scarceresourceavoidance" // Ensure metric package is initialized _ "k8s.io/component-base/metrics/prometheus/clientgo" @@ -40,13 +42,15 @@ import ( ) var koordinatorPlugins = map[string]frameworkruntime.PluginFactory{ - loadaware.Name: loadaware.New, - nodenumaresource.Name: nodenumaresource.New, - reservation.Name: reservation.New, - coscheduling.Name: coscheduling.New, - deviceshare.Name: deviceshare.New, - elasticquota.Name: elasticquota.New, - defaultprebind.Name: defaultprebind.New, + loadaware.Name: loadaware.New, + nodenumaresource.Name: nodenumaresource.New, + reservation.Name: reservation.New, + coscheduling.Name: coscheduling.New, + deviceshare.Name: deviceshare.New, + elasticquota.Name: elasticquota.New, + defaultprebind.Name: defaultprebind.New, + noderesourcesfitplus.Name: noderesourcesfitplus.New, + scarceresourceavoidance.Name: scarceresourceavoidance.New, } func flatten(plugins map[string]frameworkruntime.PluginFactory) []app.Option { diff --git a/pkg/scheduler/apis/config/register.go b/pkg/scheduler/apis/config/register.go index 312a9f6a4..6e165a0ab 100644 --- a/pkg/scheduler/apis/config/register.go +++ b/pkg/scheduler/apis/config/register.go @@ -40,6 +40,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &ElasticQuotaArgs{}, &CoschedulingArgs{}, &DeviceShareArgs{}, + &NodeResourcesFitPlusArgs{}, + &ScarceResourceAvoidanceArgs{}, ) return nil } diff --git a/pkg/scheduler/apis/config/types.go b/pkg/scheduler/apis/config/types.go index 39ae2ca3b..e09fd0db2 100644 --- a/pkg/scheduler/apis/config/types.go +++
b/pkg/scheduler/apis/config/types.go @@ -19,9 +19,11 @@ package config import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/scheduler/apis/config" schedconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" "github.com/koordinator-sh/koordinator/apis/extension" + v1 "k8s.io/api/core/v1" ) // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -237,3 +239,24 @@ type DeviceShareArgs struct { // DisableDeviceNUMATopologyAlignment indicates device don't need to align with other resources' numa topology DisableDeviceNUMATopologyAlignment bool } + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ScarceResourceAvoidanceArgs defines the parameters for ScarceResourceAvoidance plugin. +type ScarceResourceAvoidanceArgs struct { + metav1.TypeMeta + Resources []v1.ResourceName +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// NodeResourcesFitPlusArgs defines the parameters for NodeResourcesFitPlus plugin. +type NodeResourcesFitPlusArgs struct { + metav1.TypeMeta + Resources map[v1.ResourceName]ResourcesType +} + +type ResourcesType struct { + Type config.ScoringStrategyType + Weight int64 +} diff --git a/pkg/scheduler/apis/config/v1beta3/register.go b/pkg/scheduler/apis/config/v1beta3/register.go index 54ebd3936..cb821860a 100644 --- a/pkg/scheduler/apis/config/v1beta3/register.go +++ b/pkg/scheduler/apis/config/v1beta3/register.go @@ -40,6 +40,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &ElasticQuotaArgs{}, &CoschedulingArgs{}, &DeviceShareArgs{}, + &NodeResourcesFitPlusArgs{}, + &ScarceResourceAvoidanceArgs{}, ) return nil } diff --git a/pkg/scheduler/apis/config/v1beta3/types.go b/pkg/scheduler/apis/config/v1beta3/types.go index 36a872864..74803030a 100644 --- a/pkg/scheduler/apis/config/v1beta3/types.go +++ b/pkg/scheduler/apis/config/v1beta3/types.go @@ -22,6 +22,8 @@ import ( schedconfigv1beta3 "k8s.io/kube-scheduler/config/v1beta3" "github.com/koordinator-sh/koordinator/apis/extension" + v1 "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/scheduler/apis/config" ) // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -232,3 +234,24 @@ type DeviceShareArgs struct { // DisableDeviceNUMATopologyAlignment indicates device don't need to align with other resources' numa topology DisableDeviceNUMATopologyAlignment bool `json:"disableDeviceNUMATopologyAlignment,omitempty"` } + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ScarceResourceAvoidanceArgs defines the parameters for ScarceResourceAvoidance plugin. +type ScarceResourceAvoidanceArgs struct { + metav1.TypeMeta + Resources []v1.ResourceName `json:"resources,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// NodeResourcesFitPlusArgs defines the parameters for NodeResourcesFitPlus plugin. 
+type NodeResourcesFitPlusArgs struct { + metav1.TypeMeta + Resources map[v1.ResourceName]ResourcesType `json:"resources"` +} + +type ResourcesType struct { + Type config.ScoringStrategyType `json:"type"` + Weight int64 `json:"weight"` +} diff --git a/pkg/scheduler/apis/config/v1beta3/zz_generated.conversion.go b/pkg/scheduler/apis/config/v1beta3/zz_generated.conversion.go index 5b9b56c37..f0d35f078 100644 --- a/pkg/scheduler/apis/config/v1beta3/zz_generated.conversion.go +++ b/pkg/scheduler/apis/config/v1beta3/zz_generated.conversion.go @@ -96,6 +96,16 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*NodeResourcesFitPlusArgs)(nil), (*config.NodeResourcesFitPlusArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1beta3_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs(a.(*NodeResourcesFitPlusArgs), b.(*config.NodeResourcesFitPlusArgs), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.NodeResourcesFitPlusArgs)(nil), (*NodeResourcesFitPlusArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_NodeResourcesFitPlusArgs_To_v1beta3_NodeResourcesFitPlusArgs(a.(*config.NodeResourcesFitPlusArgs), b.(*NodeResourcesFitPlusArgs), scope) + }); err != nil { + return err + } if err := s.AddGeneratedConversionFunc((*ReservationArgs)(nil), (*config.ReservationArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta3_ReservationArgs_To_config_ReservationArgs(a.(*ReservationArgs), b.(*config.ReservationArgs), scope) }); err != nil { @@ -106,6 +116,26 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*ResourcesType)(nil), (*config.ResourcesType)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1beta3_ResourcesType_To_config_ResourcesType(a.(*ResourcesType), b.(*config.ResourcesType), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.ResourcesType)(nil), (*ResourcesType)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_ResourcesType_To_v1beta3_ResourcesType(a.(*config.ResourcesType), b.(*ResourcesType), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*ScarceResourceAvoidanceArgs)(nil), (*config.ScarceResourceAvoidanceArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1beta3_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs(a.(*ScarceResourceAvoidanceArgs), b.(*config.ScarceResourceAvoidanceArgs), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.ScarceResourceAvoidanceArgs)(nil), (*ScarceResourceAvoidanceArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_ScarceResourceAvoidanceArgs_To_v1beta3_ScarceResourceAvoidanceArgs(a.(*config.ScarceResourceAvoidanceArgs), b.(*ScarceResourceAvoidanceArgs), scope) + }); err != nil { + return err + } if err := s.AddGeneratedConversionFunc((*ScoringStrategy)(nil), (*config.ScoringStrategy)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta3_ScoringStrategy_To_config_ScoringStrategy(a.(*ScoringStrategy), b.(*config.ScoringStrategy), scope) }); err != nil { @@ -355,6 +385,26 @@ func Convert_config_NodeNUMAResourceArgs_To_v1beta3_NodeNUMAResourceArgs(in *con return 
autoConvert_config_NodeNUMAResourceArgs_To_v1beta3_NodeNUMAResourceArgs(in, out, s) } +func autoConvert_v1beta3_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs(in *NodeResourcesFitPlusArgs, out *config.NodeResourcesFitPlusArgs, s conversion.Scope) error { + out.Resources = *(*map[corev1.ResourceName]config.ResourcesType)(unsafe.Pointer(&in.Resources)) + return nil +} + +// Convert_v1beta3_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs is an autogenerated conversion function. +func Convert_v1beta3_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs(in *NodeResourcesFitPlusArgs, out *config.NodeResourcesFitPlusArgs, s conversion.Scope) error { + return autoConvert_v1beta3_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs(in, out, s) +} + +func autoConvert_config_NodeResourcesFitPlusArgs_To_v1beta3_NodeResourcesFitPlusArgs(in *config.NodeResourcesFitPlusArgs, out *NodeResourcesFitPlusArgs, s conversion.Scope) error { + out.Resources = *(*map[corev1.ResourceName]ResourcesType)(unsafe.Pointer(&in.Resources)) + return nil +} + +// Convert_config_NodeResourcesFitPlusArgs_To_v1beta3_NodeResourcesFitPlusArgs is an autogenerated conversion function. +func Convert_config_NodeResourcesFitPlusArgs_To_v1beta3_NodeResourcesFitPlusArgs(in *config.NodeResourcesFitPlusArgs, out *NodeResourcesFitPlusArgs, s conversion.Scope) error { + return autoConvert_config_NodeResourcesFitPlusArgs_To_v1beta3_NodeResourcesFitPlusArgs(in, out, s) +} + func autoConvert_v1beta3_ReservationArgs_To_config_ReservationArgs(in *ReservationArgs, out *config.ReservationArgs, s conversion.Scope) error { if err := v1.Convert_Pointer_bool_To_bool(&in.EnablePreemption, &out.EnablePreemption, s); err != nil { return err @@ -391,6 +441,48 @@ func Convert_config_ReservationArgs_To_v1beta3_ReservationArgs(in *config.Reserv return autoConvert_config_ReservationArgs_To_v1beta3_ReservationArgs(in, out, s) } +func autoConvert_v1beta3_ResourcesType_To_config_ResourcesType(in *ResourcesType, out *config.ResourcesType, s conversion.Scope) error { + out.Type = apisconfig.ScoringStrategyType(in.Type) + out.Weight = in.Weight + return nil +} + +// Convert_v1beta3_ResourcesType_To_config_ResourcesType is an autogenerated conversion function. +func Convert_v1beta3_ResourcesType_To_config_ResourcesType(in *ResourcesType, out *config.ResourcesType, s conversion.Scope) error { + return autoConvert_v1beta3_ResourcesType_To_config_ResourcesType(in, out, s) +} + +func autoConvert_config_ResourcesType_To_v1beta3_ResourcesType(in *config.ResourcesType, out *ResourcesType, s conversion.Scope) error { + out.Type = apisconfig.ScoringStrategyType(in.Type) + out.Weight = in.Weight + return nil +} + +// Convert_config_ResourcesType_To_v1beta3_ResourcesType is an autogenerated conversion function. +func Convert_config_ResourcesType_To_v1beta3_ResourcesType(in *config.ResourcesType, out *ResourcesType, s conversion.Scope) error { + return autoConvert_config_ResourcesType_To_v1beta3_ResourcesType(in, out, s) +} + +func autoConvert_v1beta3_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs(in *ScarceResourceAvoidanceArgs, out *config.ScarceResourceAvoidanceArgs, s conversion.Scope) error { + out.Resources = *(*[]corev1.ResourceName)(unsafe.Pointer(&in.Resources)) + return nil +} + +// Convert_v1beta3_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs is an autogenerated conversion function. 
+func Convert_v1beta3_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs(in *ScarceResourceAvoidanceArgs, out *config.ScarceResourceAvoidanceArgs, s conversion.Scope) error { + return autoConvert_v1beta3_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs(in, out, s) +} + +func autoConvert_config_ScarceResourceAvoidanceArgs_To_v1beta3_ScarceResourceAvoidanceArgs(in *config.ScarceResourceAvoidanceArgs, out *ScarceResourceAvoidanceArgs, s conversion.Scope) error { + out.Resources = *(*[]corev1.ResourceName)(unsafe.Pointer(&in.Resources)) + return nil +} + +// Convert_config_ScarceResourceAvoidanceArgs_To_v1beta3_ScarceResourceAvoidanceArgs is an autogenerated conversion function. +func Convert_config_ScarceResourceAvoidanceArgs_To_v1beta3_ScarceResourceAvoidanceArgs(in *config.ScarceResourceAvoidanceArgs, out *ScarceResourceAvoidanceArgs, s conversion.Scope) error { + return autoConvert_config_ScarceResourceAvoidanceArgs_To_v1beta3_ScarceResourceAvoidanceArgs(in, out, s) +} + func autoConvert_v1beta3_ScoringStrategy_To_config_ScoringStrategy(in *ScoringStrategy, out *config.ScoringStrategy, s conversion.Scope) error { out.Type = config.ScoringStrategyType(in.Type) out.Resources = *(*[]apisconfig.ResourceSpec)(unsafe.Pointer(&in.Resources)) diff --git a/pkg/scheduler/apis/config/v1beta3/zz_generated.deepcopy.go b/pkg/scheduler/apis/config/v1beta3/zz_generated.deepcopy.go index ae708e93a..61b213ca3 100644 --- a/pkg/scheduler/apis/config/v1beta3/zz_generated.deepcopy.go +++ b/pkg/scheduler/apis/config/v1beta3/zz_generated.deepcopy.go @@ -313,6 +313,38 @@ func (in *NodeNUMAResourceArgs) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeResourcesFitPlusArgs) DeepCopyInto(out *NodeResourcesFitPlusArgs) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = make(map[corev1.ResourceName]ResourcesType, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeResourcesFitPlusArgs. +func (in *NodeResourcesFitPlusArgs) DeepCopy() *NodeResourcesFitPlusArgs { + if in == nil { + return nil + } + out := new(NodeResourcesFitPlusArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NodeResourcesFitPlusArgs) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ReservationArgs) DeepCopyInto(out *ReservationArgs) { *out = *in @@ -353,6 +385,52 @@ func (in *ReservationArgs) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ResourcesType) DeepCopyInto(out *ResourcesType) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourcesType. +func (in *ResourcesType) DeepCopy() *ResourcesType { + if in == nil { + return nil + } + out := new(ResourcesType) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ScarceResourceAvoidanceArgs) DeepCopyInto(out *ScarceResourceAvoidanceArgs) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = make([]corev1.ResourceName, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScarceResourceAvoidanceArgs. +func (in *ScarceResourceAvoidanceArgs) DeepCopy() *ScarceResourceAvoidanceArgs { + if in == nil { + return nil + } + out := new(ScarceResourceAvoidanceArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ScarceResourceAvoidanceArgs) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ScoringStrategy) DeepCopyInto(out *ScoringStrategy) { *out = *in diff --git a/pkg/scheduler/apis/config/zz_generated.deepcopy.go b/pkg/scheduler/apis/config/zz_generated.deepcopy.go index 97dac7428..fd010dfb2 100644 --- a/pkg/scheduler/apis/config/zz_generated.deepcopy.go +++ b/pkg/scheduler/apis/config/zz_generated.deepcopy.go @@ -257,6 +257,38 @@ func (in *NodeNUMAResourceArgs) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeResourcesFitPlusArgs) DeepCopyInto(out *NodeResourcesFitPlusArgs) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = make(map[v1.ResourceName]ResourcesType, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeResourcesFitPlusArgs. +func (in *NodeResourcesFitPlusArgs) DeepCopy() *NodeResourcesFitPlusArgs { + if in == nil { + return nil + } + out := new(NodeResourcesFitPlusArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NodeResourcesFitPlusArgs) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ReservationArgs) DeepCopyInto(out *ReservationArgs) { *out = *in @@ -282,6 +314,52 @@ func (in *ReservationArgs) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ResourcesType) DeepCopyInto(out *ResourcesType) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourcesType. +func (in *ResourcesType) DeepCopy() *ResourcesType { + if in == nil { + return nil + } + out := new(ResourcesType) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ScarceResourceAvoidanceArgs) DeepCopyInto(out *ScarceResourceAvoidanceArgs) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = make([]v1.ResourceName, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScarceResourceAvoidanceArgs. +func (in *ScarceResourceAvoidanceArgs) DeepCopy() *ScarceResourceAvoidanceArgs { + if in == nil { + return nil + } + out := new(ScarceResourceAvoidanceArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ScarceResourceAvoidanceArgs) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ScoringStrategy) DeepCopyInto(out *ScoringStrategy) { *out = *in diff --git a/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go new file mode 100644 index 000000000..ad8837c8d --- /dev/null +++ b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go @@ -0,0 +1,171 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package noderesourcesfitplus + +import ( + "context" + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/api/v1/resource" + k8sConfig "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/framework" + plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" + + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" +) + +const ( + // Name is plugin name + Name = "NodeResourcesFitPlus" +) + +var ( + _ framework.ScorePlugin = &Plugin{} +) + +type Plugin struct { + handle framework.Handle + args *config.NodeResourcesFitPlusArgs +} + +func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) { + + sampleArgs2, ok := args.(*config.NodeResourcesFitPlusArgs) + + if !ok { + return nil, fmt.Errorf("want args to be of type NodeResourcesFitPlusArgs, got %T", args) + } + + return &Plugin{ + handle: handle, + args: sampleArgs2, + }, nil +} + +func (s *Plugin) Name() string { + return Name +} + +func (s *Plugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { + + nodeInfo, err := s.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) + } + + var nodeScore int64 + var weightSum int64 + + podRequest, _ := fitsRequest(computePodResourceRequest(p).Resource, nodeInfo) + + for _, requestSourceName := range podRequest { + v, ok := s.args.Resources[requestSourceName] + if !ok { + continue + } + fit, err := noderesources.NewFit( + &k8sConfig.NodeResourcesFitArgs{ + ScoringStrategy: &k8sConfig.ScoringStrategy{ + Type: v.Type, // MostAllocated or LeastAllocated + Resources: []k8sConfig.ResourceSpec{ + {Name: string(requestSourceName), Weight: 1}, + }, + }, + }, s.handle, plfeature.Features{}) + + if err != nil { + return 0, framework.NewStatus(framework.Error, err.Error()) + } + + resourceScore, status := fit.(framework.ScorePlugin).Score(ctx, state, p, nodeName) + if !status.IsSuccess() { + return 0, framework.NewStatus(framework.Error, status.Message()) + } + + nodeScore += resourceScore * v.Weight + weightSum += v.Weight + } + + if weightSum == 0 { + return framework.MaxNodeScore, framework.NewStatus(framework.Success, "") + } + scores := nodeScore / weightSum + + return scores, framework.NewStatus(framework.Success, "") +} + +func (p *Plugin) ScoreExtensions() framework.ScoreExtensions { + return nil +} + +type preFilterState struct { + framework.Resource +} + +func computePodResourceRequest(pod *v1.Pod) *preFilterState { + // pod hasn't scheduled yet so we don't need to worry about InPlacePodVerticalScalingEnabled + reqs := resource.PodRequests(pod, resource.PodResourcesOptions{}) + result := &preFilterState{} + result.SetMaxResource(reqs) + return result +} + +func fitsRequest(podRequest framework.Resource, nodeInfo *framework.NodeInfo) ([]v1.ResourceName, []v1.ResourceName) { + var podRequestResource []v1.ResourceName + var nodeRequestResource []v1.ResourceName + + if podRequest.MilliCPU > 0 { + podRequestResource = append(podRequestResource, v1.ResourceCPU) + } + + if nodeInfo.Allocatable.MilliCPU > 0 { + nodeRequestResource = append(nodeRequestResource, v1.ResourceCPU) + } + + if podRequest.Memory > 0 { + podRequestResource = append(podRequestResource, v1.ResourceMemory) + } + + if nodeInfo.Allocatable.Memory > 0 {
nodeRequestResource = append(nodeRequestResource, v1.ResourceMemory) + } + + if podRequest.EphemeralStorage > 0 { + podRequestResource = append(podRequestResource, v1.ResourceEphemeralStorage) + } + + if nodeInfo.Allocatable.EphemeralStorage > 0 { + nodeRequestResource = append(nodeRequestResource, v1.ResourceEphemeralStorage) + } + + for rName, rQuant := range podRequest.ScalarResources { + if rQuant > 0 { + podRequestResource = append(podRequestResource, rName) + } + } + + for rName, rQuant := range nodeInfo.Allocatable.ScalarResources { + if rQuant > 0 { + nodeRequestResource = append(nodeRequestResource, rName) + } + } + + return podRequestResource, nodeRequestResource +} diff --git a/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus_test.go b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus_test.go new file mode 100644 index 000000000..0ee87ee2b --- /dev/null +++ b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus_test.go @@ -0,0 +1,287 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package noderesourcesfitplus + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apiruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + kubefake "k8s.io/client-go/kubernetes/fake" + k8sConfig "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" + frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" + + koordinatorclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned" + koordfake "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/fake" + koordinatorinformers "github.com/koordinator-sh/koordinator/pkg/client/informers/externalversions" + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/v1beta3" + "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext" +) + +var _ framework.SharedLister = &testSharedLister{} + +type testSharedLister struct { + nodes []*corev1.Node + nodeInfos []*framework.NodeInfo + nodeInfoMap map[string]*framework.NodeInfo +} + +func newTestSharedLister(pods []*corev1.Pod, nodes []*corev1.Node) *testSharedLister { + nodeInfoMap := make(map[string]*framework.NodeInfo) + nodeInfos := make([]*framework.NodeInfo, 0) + for _, pod := range pods { + nodeName := pod.Spec.NodeName + if _, ok := nodeInfoMap[nodeName]; !ok { + nodeInfoMap[nodeName] = framework.NewNodeInfo() + } + nodeInfoMap[nodeName].AddPod(pod) + } + for _, node := range nodes { + if _, ok := nodeInfoMap[node.Name]; !ok { + nodeInfoMap[node.Name] = framework.NewNodeInfo() + } + 
nodeInfoMap[node.Name].SetNode(node) + } + + for _, v := range nodeInfoMap { + nodeInfos = append(nodeInfos, v) + } + + return &testSharedLister{ + nodes: nodes, + nodeInfos: nodeInfos, + nodeInfoMap: nodeInfoMap, + } +} + +type PredicateClientSetAndHandle struct { + frameworkext.ExtendedHandle + koordinatorClientSet koordinatorclientset.Interface + koordInformerFactory koordinatorinformers.SharedInformerFactory +} + +func NodeResourcesPluginFactoryProxy(factoryFn frameworkruntime.PluginFactory, plugin *framework.Plugin) frameworkruntime.PluginFactory { + return func(args apiruntime.Object, handle framework.Handle) (framework.Plugin, error) { + koordClient := koordfake.NewSimpleClientset() + koordInformerFactory := koordinatorinformers.NewSharedInformerFactory(koordClient, 0) + extenderFactory, err := frameworkext.NewFrameworkExtenderFactory( + frameworkext.WithKoordinatorClientSet(koordClient), + frameworkext.WithKoordinatorSharedInformerFactory(koordInformerFactory)) + if err != nil { + return nil, err + } + extender := extenderFactory.NewFrameworkExtender(handle.(framework.Framework)) + *plugin, err = factoryFn(args, &PredicateClientSetAndHandle{ + ExtendedHandle: extender, + koordinatorClientSet: koordClient, + koordInformerFactory: koordInformerFactory, + }) + return *plugin, err + } +} + +func TestPlugin_Score(t *testing.T) { + + var v1beta2args v1beta3.NodeResourcesFitPlusArgs + v1beta2args.Resources = map[v1.ResourceName]v1beta3.ResourcesType{ + "nvidia.com/gpu": {Type: k8sConfig.MostAllocated, Weight: 2}, + "cpu": {Type: k8sConfig.LeastAllocated, Weight: 1}, + "memory": {Type: k8sConfig.LeastAllocated, Weight: 1}, + } + + var nodeResourcesFitPlusArgs config.NodeResourcesFitPlusArgs + err := v1beta3.Convert_v1beta3_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs(&v1beta2args, &nodeResourcesFitPlusArgs, nil) + assert.NoError(t, err) + + var ptplugin framework.Plugin + proxyNew := NodeResourcesPluginFactoryProxy(New, &ptplugin) + + cs := kubefake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(cs, 0) + + nodes := []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + "nvidia.com/gpu": resource.MustParse("8"), + }, + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + "nvidia.com/gpu": resource.MustParse("8"), + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode2", + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + "nvidia.com/gpu": resource.MustParse("8"), + "xx.xx/xx": resource.MustParse("8"), + }, + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + "nvidia.com/gpu": resource.MustParse("8"), + "xx.xx/xx": resource.MustParse("8"), + }, + }, + }, + } + + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test-pod-0", + }, + Spec: corev1.PodSpec{ + NodeName: "testNode1", + Containers: []corev1.Container{ + { + Name: "test-container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + 
"nvidia.com/gpu": resource.MustParse("4"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + "nvidia.com/gpu": resource.MustParse("4"), + }, + }, + }, + }, + }, + }, + } + registeredPlugins := []schedulertesting.RegisterPluginFunc{ + schedulertesting.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + schedulertesting.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + } + + snapshot := newTestSharedLister(pods, nodes) + fh, err := schedulertesting.NewFramework(context.TODO(), registeredPlugins, "koord-scheduler", + frameworkruntime.WithClientSet(cs), + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithSnapshotSharedLister(snapshot), + ) + assert.Nil(t, err) + + p, err := proxyNew(&nodeResourcesFitPlusArgs, fh) + p.Name() + assert.NotNil(t, p) + assert.Nil(t, err) + plug := p.(*Plugin) + h := plug.handle.(*PredicateClientSetAndHandle) + + informerFactory.Start(context.TODO().Done()) + informerFactory.WaitForCacheSync(context.TODO().Done()) + + h.koordInformerFactory.Start(context.TODO().Done()) + h.koordInformerFactory.WaitForCacheSync(context.TODO().Done()) + + cycleState := framework.NewCycleState() + + nodeInfo, err := snapshot.Get("testNode1") + assert.NoError(t, err) + assert.NotNil(t, nodeInfo) + nodeInfo, err = snapshot.Get("testNode2") + assert.NoError(t, err) + assert.NotNil(t, nodeInfo) + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test-pod-1", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + "nvidia.com/gpu": resource.MustParse("2"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + "nvidia.com/gpu": resource.MustParse("2"), + }, + }, + }, + }, + }, + } + scoreNode1, _ := p.(*Plugin).Score(context.TODO(), cycleState, pod, "testNode1") + scoreNode2, _ := p.(*Plugin).Score(context.TODO(), cycleState, pod, "testNode2") + if scoreNode1 <= scoreNode2 { + t.Fatal("scoreNode1 must <= scoreNode2") + } +} + +func (f *testSharedLister) StorageInfos() framework.StorageInfoLister { + return f +} + +func (f *testSharedLister) IsPVCUsedByPods(key string) bool { + return false +} + +func (f *testSharedLister) NodeInfos() framework.NodeInfoLister { + return f +} + +func (f *testSharedLister) List() ([]*framework.NodeInfo, error) { + return f.nodeInfos, nil +} + +func (f *testSharedLister) HavePodsWithAffinityList() ([]*framework.NodeInfo, error) { + return nil, nil +} + +func (f *testSharedLister) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) { + return nil, nil +} + +func (f *testSharedLister) Get(nodeName string) (*framework.NodeInfo, error) { + return f.nodeInfoMap[nodeName], nil +} diff --git a/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance.go b/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance.go new file mode 100644 index 000000000..487cd7221 --- /dev/null +++ b/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance.go @@ -0,0 +1,175 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scarceresourceavoidance + +import ( + "context" + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/api/v1/resource" + "k8s.io/kubernetes/pkg/scheduler/framework" + + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" +) + +const ( + // Name is plugin name + Name = "ScarceResourceAvoidance" +) + +var ( + _ framework.ScorePlugin = &Plugin{} +) + +type Plugin struct { + handle framework.Handle + args *config.ScarceResourceAvoidanceArgs +} + +func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) { + + sampleArgs2, ok := args.(*config.ScarceResourceAvoidanceArgs) + + if !ok { + return nil, fmt.Errorf("want args to be of type ScarceResourceAvoidanceArgs, got %T", args) + } + + return &Plugin{ + handle: handle, + args: sampleArgs2, + }, nil +} + +func (s *Plugin) Name() string { + return Name +} + +func (s *Plugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { + nodeInfo, err := s.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) + } + + podRequest := computePodResourceRequest(p) + podRequestResource, nodeAllocatableResource := fitsRequest(podRequest.Resource, nodeInfo) + diffNames := difference(nodeAllocatableResource, podRequestResource) + intersectNames := intersection(diffNames, s.args.Resources) + + if len(diffNames) == 0 || len(intersectNames) == 0 { + return framework.MaxNodeScore, framework.NewStatus(framework.Success, "") + } + scores := resourceTypesScore(int64(len(intersectNames)), int64(len(diffNames))) + + return scores, framework.NewStatus(framework.Success, "") +} + +func intersection(slice1, slice2 []v1.ResourceName) []v1.ResourceName { + m := make(map[v1.ResourceName]struct{}) + result := []v1.ResourceName{} + + for _, v := range slice2 { + m[v] = struct{}{} + } + + for _, v := range slice1 { + if _, found := m[v]; found { + result = append(result, v) + } + } + + return result +} + +func difference(slice1, slice2 []v1.ResourceName) []v1.ResourceName { + var result []v1.ResourceName + m := make(map[v1.ResourceName]struct{}) + for _, v := range slice2 { + m[v] = struct{}{} + } + + for _, v := range slice1 { + if _, found := m[v]; !found { + result = append(result, v) + } + } + + return result +} + +func (p *Plugin) ScoreExtensions() framework.ScoreExtensions { + return nil +} + +type preFilterState struct { + framework.Resource +} + +func computePodResourceRequest(pod *v1.Pod) *preFilterState { + // pod hasn't scheduled yet so we don't need to worry about InPlacePodVerticalScalingEnabled + reqs := resource.PodRequests(pod, resource.PodResourcesOptions{}) + result := &preFilterState{} + result.SetMaxResource(reqs) + return result +} + +func fitsRequest(podRequest framework.Resource, nodeInfo *framework.NodeInfo) ([]v1.ResourceName, []v1.ResourceName) { + var podRequestResource []v1.ResourceName + var nodeRequestResource []v1.ResourceName + + if podRequest.MilliCPU > 0 { + podRequestResource =
append(podRequestResource, v1.ResourceCPU) + } + + if nodeInfo.Allocatable.MilliCPU > 0 { + nodeRequestResource = append(nodeRequestResource, v1.ResourceCPU) + } + + if podRequest.Memory > 0 { + podRequestResource = append(podRequestResource, v1.ResourceMemory) + } + + if nodeInfo.Allocatable.Memory > 0 { + nodeRequestResource = append(nodeRequestResource, v1.ResourceMemory) + } + + if podRequest.EphemeralStorage > 0 { + podRequestResource = append(podRequestResource, v1.ResourceEphemeralStorage) + } + + if nodeInfo.Allocatable.EphemeralStorage > 0 { + nodeRequestResource = append(nodeRequestResource, v1.ResourceEphemeralStorage) + } + + for rName, rQuant := range podRequest.ScalarResources { + if rQuant > 0 { + podRequestResource = append(podRequestResource, rName) + } + } + + for rName, rQuant := range nodeInfo.Allocatable.ScalarResources { + if rQuant > 0 { + nodeRequestResource = append(nodeRequestResource, rName) + } + } + + return podRequestResource, nodeRequestResource +} +func resourceTypesScore(requestsSourcesNum, allocatablesSourcesNum int64) int64 { + return (allocatablesSourcesNum - requestsSourcesNum) * framework.MaxNodeScore / allocatablesSourcesNum +} diff --git a/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance_test.go b/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance_test.go new file mode 100644 index 000000000..e5b2f9822 --- /dev/null +++ b/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance_test.go @@ -0,0 +1,250 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scarceresourceavoidance + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apiruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + kubefake "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" + frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" + + koordinatorclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned" + koordfake "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/fake" + koordinatorinformers "github.com/koordinator-sh/koordinator/pkg/client/informers/externalversions" + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/v1beta3" + "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext" +) + +var _ framework.SharedLister = &testSharedLister{} + +type testSharedLister struct { + nodes []*corev1.Node + nodeInfos []*framework.NodeInfo + nodeInfoMap map[string]*framework.NodeInfo +} + +func newTestSharedLister(pods []*corev1.Pod, nodes []*corev1.Node) *testSharedLister { + nodeInfoMap := make(map[string]*framework.NodeInfo) + nodeInfos := make([]*framework.NodeInfo, 0) + for _, pod := range pods { + nodeName := pod.Spec.NodeName + if _, ok := nodeInfoMap[nodeName]; !ok { + nodeInfoMap[nodeName] = framework.NewNodeInfo() + } + nodeInfoMap[nodeName].AddPod(pod) + } + for _, node := range nodes { + if _, ok := nodeInfoMap[node.Name]; !ok { + nodeInfoMap[node.Name] = framework.NewNodeInfo() + } + nodeInfoMap[node.Name].SetNode(node) + } + + for _, v := range nodeInfoMap { + nodeInfos = append(nodeInfos, v) + } + + return &testSharedLister{ + nodes: nodes, + nodeInfos: nodeInfos, + nodeInfoMap: nodeInfoMap, + } +} + +type PredicateClientSetAndHandle struct { + frameworkext.ExtendedHandle + koordinatorClientSet koordinatorclientset.Interface + koordInformerFactory koordinatorinformers.SharedInformerFactory +} + +func NodeResourcesPluginFactoryProxy(factoryFn frameworkruntime.PluginFactory, plugin *framework.Plugin) frameworkruntime.PluginFactory { + return func(args apiruntime.Object, handle framework.Handle) (framework.Plugin, error) { + koordClient := koordfake.NewSimpleClientset() + koordInformerFactory := koordinatorinformers.NewSharedInformerFactory(koordClient, 0) + extenderFactory, err := frameworkext.NewFrameworkExtenderFactory( + frameworkext.WithKoordinatorClientSet(koordClient), + frameworkext.WithKoordinatorSharedInformerFactory(koordInformerFactory)) + if err != nil { + return nil, err + } + extender := extenderFactory.NewFrameworkExtender(handle.(framework.Framework)) + *plugin, err = factoryFn(args, &PredicateClientSetAndHandle{ + ExtendedHandle: extender, + koordinatorClientSet: koordClient, + koordInformerFactory: koordInformerFactory, + }) + return *plugin, err + } +} + +func TestPlugin_Score(t *testing.T) { + + var v1beta2args v1beta3.ScarceResourceAvoidanceArgs + v1beta2args.Resources = []v1.ResourceName{"nvidia.com/gpu"} + + var scarceResourceAvoidanceArgs config.ScarceResourceAvoidanceArgs + err := 
v1beta3.Convert_v1beta3_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs(&v1beta2args, &scarceResourceAvoidanceArgs, nil) + assert.NoError(t, err) + + var ptplugin framework.Plugin + proxyNew := NodeResourcesPluginFactoryProxy(New, &ptplugin) + + cs := kubefake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(cs, 0) + + nodes := []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + "nvidia.com/gpu": resource.MustParse("8"), + "xx.xx/xx": resource.MustParse("8"), + }, + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + "nvidia.com/gpu": resource.MustParse("8"), + "xx.xx/xx": resource.MustParse("8"), + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode2", + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + }, + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + }, + }, + }, + } + + registeredPlugins := []schedulertesting.RegisterPluginFunc{ + schedulertesting.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + schedulertesting.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + } + + snapshot := newTestSharedLister(nil, nodes) + fh, err := schedulertesting.NewFramework(context.TODO(), registeredPlugins, "koord-scheduler", + frameworkruntime.WithClientSet(cs), + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithSnapshotSharedLister(snapshot), + ) + assert.Nil(t, err) + + p, err := proxyNew(&scarceResourceAvoidanceArgs, fh) + p.Name() + assert.NotNil(t, p) + assert.Nil(t, err) + plug := p.(*Plugin) + h := plug.handle.(*PredicateClientSetAndHandle) + + informerFactory.Start(context.TODO().Done()) + informerFactory.WaitForCacheSync(context.TODO().Done()) + + h.koordInformerFactory.Start(context.TODO().Done()) + h.koordInformerFactory.WaitForCacheSync(context.TODO().Done()) + + cycleState := framework.NewCycleState() + + nodeInfo, err := snapshot.Get("testNode1") + assert.NoError(t, err) + assert.NotNil(t, nodeInfo) + nodeInfo, err = snapshot.Get("testNode2") + assert.NoError(t, err) + assert.NotNil(t, nodeInfo) + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test-pod-1", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + }, + }, + }, + } + scoreNode1, _ := p.(*Plugin).Score(context.TODO(), cycleState, pod, "testNode1") + scoreNode2, _ := p.(*Plugin).Score(context.TODO(), cycleState, pod, "testNode2") + if scoreNode1 >= scoreNode2 { + t.Fatal("scoreNode1 must >= scoreNode2") + } +} + +func (f *testSharedLister) StorageInfos() framework.StorageInfoLister { + return f +} + +func (f *testSharedLister) IsPVCUsedByPods(key string) bool { + return false +} + +func (f *testSharedLister) NodeInfos() framework.NodeInfoLister 
{ + return f +} + +func (f *testSharedLister) List() ([]*framework.NodeInfo, error) { + return f.nodeInfos, nil +} + +func (f *testSharedLister) HavePodsWithAffinityList() ([]*framework.NodeInfo, error) { + return nil, nil +} + +func (f *testSharedLister) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) { + return nil, nil +} + +func (f *testSharedLister) Get(nodeName string) (*framework.NodeInfo, error) { + return f.nodeInfoMap[nodeName], nil +}
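
Reviewer note: the patch registers NodeResourcesFitPlusArgs and ScarceResourceAvoidanceArgs but ships no sample configuration. Below is a minimal sketch of how the two plugins could be enabled in a koord-scheduler profile, assuming the args are decoded from the kubescheduler.config.k8s.io/v1beta3 group like the other Koordinator plugin args. The scheduler name, score weights, and resource choices mirror the unit tests in this patch and are illustrative only; they are not part of the change.

# Hypothetical scheduler configuration sketch; apiVersion, profile name, and
# weights are assumptions, only the plugin names and args fields come from this patch.
apiVersion: kubescheduler.config.k8s.io/v1beta3
kind: KubeSchedulerConfiguration
profiles:
  - schedulerName: koord-scheduler
    plugins:
      score:
        enabled:
          - name: NodeResourcesFitPlus
            weight: 2
          - name: ScarceResourceAvoidance
            weight: 2
    pluginConfig:
      - name: NodeResourcesFitPlus
        args:
          apiVersion: kubescheduler.config.k8s.io/v1beta3
          kind: NodeResourcesFitPlusArgs
          resources:
            nvidia.com/gpu:
              type: MostAllocated
              weight: 2
            cpu:
              type: LeastAllocated
              weight: 1
            memory:
              type: LeastAllocated
              weight: 1
      - name: ScarceResourceAvoidance
        args:
          apiVersion: kubescheduler.config.k8s.io/v1beta3
          kind: ScarceResourceAvoidanceArgs
          resources:
            - nvidia.com/gpu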