diff --git a/Makefile b/Makefile
index ef26083..d4501c7 100644
--- a/Makefile
+++ b/Makefile
@@ -57,8 +57,9 @@ generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and
 	$(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..."
 
 .PHONY: fmt
-fmt: ## Run go fmt against code.
+fmt: goimports ## Run go fmt against code.
 	go fmt ./...
+	$(GOIMPORTS) -w ./
 
 .PHONY: vet
 vet: ## Run go vet against code.
@@ -232,6 +233,7 @@ ENVTEST ?= $(LOCALBIN)/setup-envtest
 ENVSUBST ?= $(LOCALBIN)/envsubst
 KUBECTL ?= $(LOCALBIN)/kubectl
 GOLANGCI_LINT ?= $(LOCALBIN)/golangci-lint
+GOIMPORTS ?= $(LOCALBIN)/goimports
 
 ## Tool Versions
 KUSTOMIZE_VERSION ?= v5.0.0
@@ -280,3 +282,8 @@ $(SETUP_ENVTEST): go.mod # Build setup-envtest from tools folder.
 golangci-lint: $(GOLANGCI_LINT)
 $(GOLANGCI_LINT): $(LOCALBIN)
 	curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(LOCALBIN) v1.54.0
+
+.PHONY: goimports
+goimports: $(GOIMPORTS)
+$(GOIMPORTS): $(LOCALBIN)
+	GOBIN=$(LOCALBIN) go install golang.org/x/tools/cmd/goimports@latest
diff --git a/README.md b/README.md
index 4f9bf08..c154c1d 100644
--- a/README.md
+++ b/README.md
@@ -63,6 +63,8 @@ kubectl delete cluster cappx-test
 
 - Supports custom cloud-config (user data). CAPPX uses VNC websockert for bootstrapping nodes so it can applies custom cloud-config that can not be achieved by only Proxmox API.
 
+- Flexible vmid/node assignment. You can flexibly assign vmids to your QEMUs and flexibly schedule QEMUs onto Proxmox nodes. For more details, please check [qemu-scheduler](./cloud/scheduler/).
+
 ### Node Images
 
 CAPPX is compatible with `iso`, `qcow2`, `qed`, `raw`, `vdi`, `vpc`, `vmdk` format of image. You can build your own node image and use it for `ProxmoxMachine`.
diff --git a/cloud/interfaces.go b/cloud/interfaces.go
index d311bea..d6548e7 100644
--- a/cloud/interfaces.go
+++ b/cloud/interfaces.go
@@ -8,6 +8,7 @@ import (
 	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
 
 	infrav1 "github.com/sp-yduck/cluster-api-provider-proxmox/api/v1beta1"
+	"github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler"
 )
 
 type Reconciler interface {
@@ -45,8 +46,10 @@ type ClusterSettter interface {
 // MachineGetter is an interface which can get machine information.
 type MachineGetter interface {
 	Client
+	GetScheduler(client *proxmox.Service) *scheduler.Scheduler
 	Name() string
 	Namespace() string
+	Annotations() map[string]string
 	// Zone() string
 	// Role() string
 	// IsControlPlane() bool
diff --git a/cloud/scheduler/README.md b/cloud/scheduler/README.md
new file mode 100644
index 0000000..c825b73
--- /dev/null
+++ b/cloud/scheduler/README.md
@@ -0,0 +1,67 @@
+# qemu-scheduler
+
+Scheduling refers to making sure that VMs (QEMU) are matched to Proxmox nodes.
+
+## How qemu-scheduler selects a proxmox node to run a qemu
+
+The basic flow of the node selection process is `filter => score => select the node with the highest score`.
+
+### Filter Plugins
+
+Filter plugins filter nodes based on node name, overcommit ratio, etc.
+
+#### regex plugin
+
+The regex plugin is one of the default filter plugins of qemu-scheduler. You can specify the node name in regex format.
+```sh
+key: node.qemu-scheduler/regex
+value(example): node[0-9]+
+```
+
+### Score Plugins
+
+Score plugins score nodes based on resources, etc.
+
+## How to specify vmid
+qemu-scheduler reads the context and looks for keys registered to the scheduler. If the context has a value for a registered key, qemu-scheduler uses the plugin that matches that key.
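+
+For illustration only, here is a minimal sketch of how such key-value pairs reach the scheduler through its context. The helper function, its arguments (a configured Proxmox client and a logger), and the example values are assumptions for this sketch, not part of CAPPX:
+
+```go
+package example
+
+import (
+	"context"
+
+	"github.com/go-logr/logr"
+	"github.com/sp-yduck/proxmox-go/api"
+	"github.com/sp-yduck/proxmox-go/proxmox"
+
+	"github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler"
+	"github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework"
+)
+
+// scheduleExample binds scheduler keys into a context and submits one QEMU spec.
+func scheduleExample(client *proxmox.Service, logger logr.Logger) (framework.SchedulerResult, error) {
+	// Get (or create) the scheduler for this Proxmox cluster and start it.
+	manager := scheduler.NewManager(scheduler.SchedulerParams{Logger: logger})
+	sched := manager.GetOrCreateScheduler(client)
+	sched.RunAsync()
+
+	// Bind plugin keys into the context. CAPPX does the equivalent of this with
+	// the ProxmoxMachine annotations (see "How qemu-scheduler works with CAPPX").
+	ctx := framework.ContextWithMap(context.Background(), map[string]string{
+		"vmid.qemu-scheduler/range": "100-150",
+		"node.qemu-scheduler/regex": "node[0-9]+",
+	})
+
+	// The scheduler filters and scores nodes, picks a vmid, and creates the QEMU.
+	return sched.CreateQEMU(ctx, &api.VirtualMachineCreateOptions{Name: "sample-machine"})
+}
+```
+
+In normal CAPPX operation you do not call this directly; the controller wires the scheduler up for you, and you only set the annotations described in the following sections.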
+
+### Range Plugin
+You can specify a vmid range in `(start id)-(end id)` format.
+```sh
+key: vmid.qemu-scheduler/range
+value(example): 100-150
+```
+
+### Regex Plugin
+```sh
+key: vmid.qemu-scheduler/regex
+value(example): (12[0-9]|130)
+```
+
+## How qemu-scheduler works with CAPPX
+CAPPX passes all the annotations (key-value pairs) of the `ProxmoxMachine` to the scheduler's context. So if you want to use the Range plugin for your `ProxmoxMachine`, your manifest should look like the following.
+```yaml
+apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
+kind: ProxmoxMachine
+metadata:
+  name: sample-machine
+  annotations:
+    vmid.qemu-scheduler/range: 100-150 # this means your vmid will be chosen from the range of 100 to 150.
+```
+
+Also, you can specify these annotations via `MachineDeployment`, since Cluster API propagates some metadata (ref: [metadata-propagation](https://cluster-api.sigs.k8s.io/developer/architecture/controllers/metadata-propagation.html#metadata-propagation)).
+
+For example, your `MachineDeployment` may look like the following.
+```yaml
+apiVersion: cluster.x-k8s.io/v1beta1
+kind: MachineDeployment
+metadata:
+  annotations:
+    caution: "# do not use here, because this annotation won't be propagated to your ProxmoxMachine"
+  name: sample-machine-deployment
+spec:
+  template:
+    metadata:
+      annotations:
+        node.qemu-scheduler/regex: node[0-9]+ # this annotation will be propagated to your ProxmoxMachine via MachineSet
+```
\ No newline at end of file
diff --git a/cloud/scheduler/framework/cycle_state.go b/cloud/scheduler/framework/cycle_state.go
new file mode 100644
index 0000000..ebbfc8f
--- /dev/null
+++ b/cloud/scheduler/framework/cycle_state.go
@@ -0,0 +1,77 @@
+package framework
+
+import (
+	"github.com/sp-yduck/proxmox-go/api"
+	"github.com/sp-yduck/proxmox-go/proxmox"
+)
+
+type CycleState struct {
+	completed bool
+	err       error
+	messages  map[string]string
+	result    SchedulerResult
+}
+
+type SchedulerResult struct {
+	vmid     int
+	node     string
+	instance *proxmox.VirtualMachine
+}
+
+func NewCycleState() CycleState {
+	return CycleState{completed: false, err: nil, messages: map[string]string{}}
+}
+
+func (c *CycleState) SetComplete() {
+	c.completed = true
+}
+
+func (c *CycleState) IsCompleted() bool {
+	return c.completed
+}
+
+func (c *CycleState) SetError(err error) {
+	c.err = err
+}
+
+func (c *CycleState) Error() error {
+	return c.err
+}
+
+func (c *CycleState) SetMessage(pluginName, message string) {
+	c.messages[pluginName] = message
+}
+
+func (c *CycleState) Messages() map[string]string {
+	return c.messages
+}
+
+func (c *CycleState) QEMU() *api.VirtualMachine {
+	return c.result.instance.VM
+}
+
+func (c *CycleState) UpdateState(completed bool, err error, result SchedulerResult) {
+	c.completed = completed
+	c.err = err
+	c.result = result
+}
+
+func NewSchedulerResult(vmid int, node string, instance *proxmox.VirtualMachine) SchedulerResult {
+	return SchedulerResult{vmid: vmid, node: node, instance: instance}
+}
+
+func (c *CycleState) Result() SchedulerResult {
+	return c.result
+}
+
+func (r *SchedulerResult) Node() string {
+	return r.node
+}
+
+func (r *SchedulerResult) VMID() int {
+	return r.vmid
+}
+
+func (r *SchedulerResult) Instance() *proxmox.VirtualMachine {
+	return r.instance
+}
diff --git a/cloud/scheduler/framework/interface.go b/cloud/scheduler/framework/interface.go
new file mode 100644
index 0000000..fc3f386
--- /dev/null
+++ b/cloud/scheduler/framework/interface.go
@@ -0,0 +1,28 @@
+package framework
+
+import (
+	"context"
+
+	
"github.com/sp-yduck/proxmox-go/api" +) + +type Plugin interface { + // return plugin name + Name() string +} + +type NodeFilterPlugin interface { + Plugin + Filter(ctx context.Context, state *CycleState, config api.VirtualMachineCreateOptions, nodeInfo *NodeInfo) *Status +} + +type NodeScorePlugin interface { + Plugin + Score(ctx context.Context, state *CycleState, config api.VirtualMachineCreateOptions, nodeInfo *NodeInfo) (int64, *Status) +} + +type VMIDPlugin interface { + Plugin + PluginKey() CtxKey + Select(ctx context.Context, state *CycleState, config api.VirtualMachineCreateOptions, nextid int, usedID map[int]bool) (int, error) +} diff --git a/cloud/scheduler/framework/suite_test.go b/cloud/scheduler/framework/suite_test.go new file mode 100644 index 0000000..94276bd --- /dev/null +++ b/cloud/scheduler/framework/suite_test.go @@ -0,0 +1,45 @@ +package framework_test + +import ( + "os" + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/sp-yduck/proxmox-go/proxmox" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +var ( + proxmoxSvc *proxmox.Service +) + +func TestFrameworks(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Scheduler Framework Suite") +} + +var _ = BeforeSuite(func() { + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + + if GinkgoLabelFilter() != "unit" { + By("setup proxmox client to do integration test") + url := os.Getenv("PROXMOX_URL") + user := os.Getenv("PROXMOX_USER") + password := os.Getenv("PROXMOX_PASSWORD") + tokenid := os.Getenv("PROXMOX_TOKENID") + secret := os.Getenv("PROXMOX_SECRET") + + authConfig := proxmox.AuthConfig{ + Username: user, + Password: password, + TokenID: tokenid, + Secret: secret, + } + param := proxmox.NewParams(url, authConfig, proxmox.ClientConfig{InsecureSkipVerify: true}) + var err error + proxmoxSvc, err = proxmox.GetOrCreateService(param) + Expect(err).NotTo(HaveOccurred()) + } +}) diff --git a/cloud/scheduler/framework/types.go b/cloud/scheduler/framework/types.go new file mode 100644 index 0000000..0707d68 --- /dev/null +++ b/cloud/scheduler/framework/types.go @@ -0,0 +1,91 @@ +package framework + +import ( + "context" + + "github.com/sp-yduck/proxmox-go/api" + "github.com/sp-yduck/proxmox-go/proxmox" +) + +type Status struct { + code int + reasons []string + err error + failedPlugin string +} + +func NewStatus() *Status { + return &Status{code: 0} +} + +func (s *Status) Code() int { + return s.code +} + +func (s *Status) SetCode(code int) { + s.code = code +} + +func (s *Status) Reasons() []string { + if s.err != nil { + return append([]string{s.err.Error()}, s.reasons...) 
+ } + return s.reasons +} + +func (s *Status) FailedPlugin() string { + return s.failedPlugin +} + +func (s *Status) SetFailedPlugin(name string) { + s.failedPlugin = name +} + +func (s *Status) IsSuccess() bool { + return s.code == 0 +} + +func (s *Status) Error() error { + return s.err +} + +// NodeInfo is node level aggregated information +type NodeInfo struct { + node *api.Node + + // qemus assigned to the node + qemus []*api.VirtualMachine +} + +func GetNodeInfoList(ctx context.Context, client *proxmox.Service) ([]*NodeInfo, error) { + nodes, err := client.Nodes(ctx) + if err != nil { + return nil, err + } + nodeInfos := []*NodeInfo{} + for _, node := range nodes { + qemus, err := client.RESTClient().GetVirtualMachines(ctx, node.Node) + if err != nil { + return nil, err + } + nodeInfos = append(nodeInfos, &NodeInfo{node: node, qemus: qemus}) + } + return nodeInfos, nil +} + +func (n NodeInfo) Node() *api.Node { + return n.node +} + +func (n NodeInfo) QEMUs() []*api.VirtualMachine { + return n.qemus +} + +// NodeScoreList declares a list of nodes and their scores. +type NodeScoreList []NodeScore + +// NodeScore is a struct with node name and score. +type NodeScore struct { + Name string + Score int64 +} diff --git a/cloud/scheduler/framework/types_test.go b/cloud/scheduler/framework/types_test.go new file mode 100644 index 0000000..7b3cae8 --- /dev/null +++ b/cloud/scheduler/framework/types_test.go @@ -0,0 +1,21 @@ +package framework_test + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework" +) + +var _ = Describe("GetNodeInfoList", Label("integration", "framework"), func() { + ctx := context.Background() + + It("should not error", func() { + nodes, err := framework.GetNodeInfoList(ctx, proxmoxSvc) + Expect(err).To(BeNil()) + Expect(len(nodes)).ToNot(Equal(0)) + }) + +}) diff --git a/cloud/scheduler/framework/utils.go b/cloud/scheduler/framework/utils.go new file mode 100644 index 0000000..da07d0b --- /dev/null +++ b/cloud/scheduler/framework/utils.go @@ -0,0 +1,14 @@ +package framework + +import "context" + +type CtxKey string + +// bind map's key-value to context key-value. +// type of key is translated to CtxKey type +func ContextWithMap(ctx context.Context, m map[string]string) context.Context { + for key, value := range m { + ctx = context.WithValue(ctx, CtxKey(key), value) + } + return ctx +} diff --git a/cloud/scheduler/framework/utils_test.go b/cloud/scheduler/framework/utils_test.go new file mode 100644 index 0000000..fb4b6f7 --- /dev/null +++ b/cloud/scheduler/framework/utils_test.go @@ -0,0 +1,39 @@ +package framework_test + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework" +) + +var _ = Describe("ContextWithMap", Label("unit", "framework"), func() { + c := context.Background() + + Context("input empty map", func() { + It("should not panic", func() { + m := map[string]string{} + ctx := framework.ContextWithMap(c, m) + Expect(ctx).To(Equal(c)) + }) + }) + + Context("input nil", func() { + It("should not panic", func() { + ctx := framework.ContextWithMap(c, nil) + Expect(ctx).To(Equal(c)) + }) + }) + + Context("input random map", func() { + It("should get map's key-value", func() { + m := map[string]string{} + m["abc"] = "ABC" + ctx := framework.ContextWithMap(c, m) + Expect(ctx.Value("abc")).ToNot(Equal("ABC")) + Expect(ctx.Value(framework.CtxKey("abc"))).To(Equal("ABC")) + }) + }) +}) diff --git a/cloud/scheduler/plugins/idrange/export_test.go b/cloud/scheduler/plugins/idrange/export_test.go new file mode 100644 index 0000000..abeef9a --- /dev/null +++ b/cloud/scheduler/plugins/idrange/export_test.go @@ -0,0 +1,7 @@ +package idrange + +import "context" + +func FindVMIDRange(ctx context.Context) (int, int, error) { + return findVMIDRange(ctx) +} diff --git a/cloud/scheduler/plugins/idrange/idrange.go b/cloud/scheduler/plugins/idrange/idrange.go new file mode 100644 index 0000000..00398b6 --- /dev/null +++ b/cloud/scheduler/plugins/idrange/idrange.go @@ -0,0 +1,66 @@ +package idrange + +import ( + "context" + "fmt" + "strconv" + "strings" + + "github.com/sp-yduck/proxmox-go/api" + + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/plugins/names" +) + +type Range struct{} + +var _ framework.VMIDPlugin = &Range{} + +const ( + Name = names.Range + VMIDRangeKey = "vmid.qemu-scheduler/range" +) + +func (pl *Range) Name() string { + return Name +} + +func (pl *Range) PluginKey() framework.CtxKey { + return framework.CtxKey(VMIDRangeKey) +} + +// select minimum id being not used in specified range +// range is specified in ctx value (key=vmid.qemu-scheduler/range) +func (pl *Range) Select(ctx context.Context, state *framework.CycleState, _ api.VirtualMachineCreateOptions, nextid int, usedID map[int]bool) (int, error) { + start, end, err := findVMIDRange(ctx) + if err != nil { + state.SetMessage(pl.Name(), "no idrange is specified, use nextid.") + return nextid, nil + } + for i := start; i <= end; i++ { + _, used := usedID[i] + if !used { + return i, nil + } + } + return 0, fmt.Errorf("no available vmid in range %d-%d", start, end) +} + +// specify available vmid as range +// example: vmid.qemu-scheduler/range=start-end +func findVMIDRange(ctx context.Context) (int, int, error) { + value := ctx.Value(framework.CtxKey(VMIDRangeKey)) + if value == nil { + return 0, 0, fmt.Errorf("no vmid range is specified") + } + rangeStrs := strings.Split(fmt.Sprintf("%s", value), "-") + start, err := strconv.Atoi(rangeStrs[0]) + if err != nil { + return 0, 0, fmt.Errorf("invalid range is specified: %w", err) + } + end, err := strconv.Atoi(rangeStrs[1]) + if err != nil { + return 0, 0, fmt.Errorf("invalid range is specified: %w", err) + } + return start, end, nil +} diff --git a/cloud/scheduler/plugins/idrange/idrange_test.go b/cloud/scheduler/plugins/idrange/idrange_test.go new file mode 100644 index 0000000..3031b83 --- /dev/null +++ b/cloud/scheduler/plugins/idrange/idrange_test.go @@ -0,0 +1,60 @@ +package idrange_test + +import ( + "context" + "testing" + + . 
"github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/plugins/idrange" +) + +func TestIDRange(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "idrange plugin") +} + +var _ = Describe("findVMIDRange", Label("unit", "plugins"), func() { + ctx := context.Background() + + Context("no 'vmid.qemu-scheduler/range' key in the context", func() { + It("should error", func() { + start, end, err := idrange.FindVMIDRange(ctx) + Expect(start).To(Equal(0)) + Expect(end).To(Equal(0)) + Expect(err.Error()).To(Equal("no vmid range is specified")) + }) + }) + + Context("specify invalid range (start)", func() { + It("should error", func() { + c := context.WithValue(ctx, framework.CtxKey(idrange.VMIDRangeKey), "a-10") + start, end, err := idrange.FindVMIDRange(c) + Expect(start).To(Equal(0)) + Expect(end).To(Equal(0)) + Expect(err.Error()).To(ContainSubstring("invalid range is specified")) + }) + }) + + Context("specify invalid range (end)", func() { + It("should error", func() { + c := context.WithValue(ctx, framework.CtxKey(idrange.VMIDRangeKey), "10-b") + start, end, err := idrange.FindVMIDRange(c) + Expect(start).To(Equal(0)) + Expect(end).To(Equal(0)) + Expect(err.Error()).To(ContainSubstring("invalid range is specified")) + }) + }) + + Context("specify valid range", func() { + It("should not error", func() { + c := context.WithValue(ctx, framework.CtxKey(idrange.VMIDRangeKey), "10-20") + start, end, err := idrange.FindVMIDRange(c) + Expect(start).To(Equal(10)) + Expect(end).To(Equal(20)) + Expect(err).ToNot(HaveOccurred()) + }) + }) +}) diff --git a/cloud/scheduler/plugins/names/names.go b/cloud/scheduler/plugins/names/names.go new file mode 100644 index 0000000..94482ce --- /dev/null +++ b/cloud/scheduler/plugins/names/names.go @@ -0,0 +1,26 @@ +package names + +// node plugins +const ( + // filter plugins + // filter by node name + NodeName = "NodeName" + // filter by node name regex + NodeRegex = "NodeRegex" + // filter by cpu overcommit ratio + CPUOvercommit = "CPUOvercommit" + // filter by memory overcommit ratio + MemoryOvercommit = "MemoryOvercommit" + + // score plugins + // random score + Random = "Random" + // resource utilization score + NodeResource = "NodeResource" + + // vmid plugins + // select by range + Range = "Range" + // select by regex + Regex = "Regex" +) diff --git a/cloud/scheduler/plugins/nodename/node_name.go b/cloud/scheduler/plugins/nodename/node_name.go new file mode 100644 index 0000000..e5cbe87 --- /dev/null +++ b/cloud/scheduler/plugins/nodename/node_name.go @@ -0,0 +1,37 @@ +package nodename + +import ( + "context" + + "github.com/sp-yduck/proxmox-go/api" + + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/plugins/names" +) + +type NodeName struct{} + +var _ framework.NodeFilterPlugin = &NodeName{} + +const ( + Name = names.NodeName + ErrReason = "node didn't match the requested node name" +) + +func (pl *NodeName) Name() string { + return Name +} + +func (pl *NodeName) Filter(ctx context.Context, _ *framework.CycleState, config api.VirtualMachineCreateOptions, nodeInfo *framework.NodeInfo) *framework.Status { + if !Fits(config, nodeInfo) { + status := framework.NewStatus() + status.SetCode(1) + return status + } + return &framework.Status{} +} + +// return true if config.Node is empty or match with node name +func 
Fits(config api.VirtualMachineCreateOptions, nodeInfo *framework.NodeInfo) bool { + return config.Node == "" || config.Node == nodeInfo.Node().Node +} diff --git a/cloud/scheduler/plugins/noderesource/node_resrouce.go b/cloud/scheduler/plugins/noderesource/node_resrouce.go new file mode 100644 index 0000000..8508ed4 --- /dev/null +++ b/cloud/scheduler/plugins/noderesource/node_resrouce.go @@ -0,0 +1,32 @@ +package noderesource + +import ( + "context" + + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/plugins/names" + "github.com/sp-yduck/proxmox-go/api" +) + +type NodeResource struct{} + +var _ framework.NodeScorePlugin = &NodeResource{} + +const ( + Name = names.NodeResource +) + +func (pl *NodeResource) Name() string { + return Name +} + +// score = 1/(cpu/maxcpu * mem/maxmem) +func (pl *NodeResource) Score(ctx context.Context, state *framework.CycleState, _ api.VirtualMachineCreateOptions, nodeInfo *framework.NodeInfo) (int64, *framework.Status) { + cpu := nodeInfo.Node().Cpu + maxCPU := nodeInfo.Node().MaxCpu + mem := nodeInfo.Node().Mem + maxMem := nodeInfo.Node().MaxMem + u := cpu / float32(maxCPU) * float32(mem/maxMem) + score := int64(1 / u) + return score, nil +} diff --git a/cloud/scheduler/plugins/overcommit/cpu_overcommit.go b/cloud/scheduler/plugins/overcommit/cpu_overcommit.go new file mode 100644 index 0000000..beb4134 --- /dev/null +++ b/cloud/scheduler/plugins/overcommit/cpu_overcommit.go @@ -0,0 +1,48 @@ +package overcommit + +import ( + "context" + + "github.com/sp-yduck/proxmox-go/api" + + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/plugins/names" +) + +type CPUOvercommit struct{} + +var _ framework.NodeFilterPlugin = &CPUOvercommit{} + +const ( + CPUOvercommitName = names.CPUOvercommit + defaultCPUOvercommitRatio = 4 +) + +func (pl *CPUOvercommit) Name() string { + return CPUOvercommitName +} + +// filter by cpu overcommit ratio +func (pl *CPUOvercommit) Filter(ctx context.Context, _ *framework.CycleState, config api.VirtualMachineCreateOptions, nodeInfo *framework.NodeInfo) *framework.Status { + cpu := sumCPUs(nodeInfo.QEMUs()) + maxCPU := nodeInfo.Node().MaxCpu + sockets := config.Sockets + if sockets == 0 { + sockets = 1 + } + ratio := float32(cpu+config.Cores*sockets) / float32(maxCPU) + if ratio > defaultCPUOvercommitRatio { + status := framework.NewStatus() + status.SetCode(1) + return status + } + return &framework.Status{} +} + +func sumCPUs(qemus []*api.VirtualMachine) int { + var result int + for _, q := range qemus { + result += q.Cpus + } + return result +} diff --git a/cloud/scheduler/plugins/overcommit/memory_overcommit.go b/cloud/scheduler/plugins/overcommit/memory_overcommit.go new file mode 100644 index 0000000..c6b7586 --- /dev/null +++ b/cloud/scheduler/plugins/overcommit/memory_overcommit.go @@ -0,0 +1,44 @@ +package overcommit + +import ( + "context" + + "github.com/sp-yduck/proxmox-go/api" + + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/plugins/names" +) + +type MemoryOvercommit struct{} + +var _ framework.NodeFilterPlugin = &MemoryOvercommit{} + +const ( + MemoryOvercommitName = names.MemoryOvercommit + defaultMemoryOvercommitRatio = 1 +) + +func (pl *MemoryOvercommit) Name() string { + return MemoryOvercommitName +} + +// filter by memory overcommit 
ratio +func (pl *MemoryOvercommit) Filter(ctx context.Context, _ *framework.CycleState, config api.VirtualMachineCreateOptions, nodeInfo *framework.NodeInfo) *framework.Status { + mem := sumMems(nodeInfo.QEMUs()) + maxMem := nodeInfo.Node().MaxMem + ratio := float32(mem+1024*1024*config.Memory) / float32(maxMem) + if ratio >= defaultMemoryOvercommitRatio { + status := framework.NewStatus() + status.SetCode(1) + return status + } + return &framework.Status{} +} + +func sumMems(qemus []*api.VirtualMachine) int { + var result int + for _, q := range qemus { + result += q.MaxMem + } + return result +} diff --git a/cloud/scheduler/plugins/random/random.go b/cloud/scheduler/plugins/random/random.go new file mode 100644 index 0000000..ecf181f --- /dev/null +++ b/cloud/scheduler/plugins/random/random.go @@ -0,0 +1,33 @@ +package random + +import ( + "context" + "math/rand" + "time" + + "github.com/sp-yduck/proxmox-go/api" + + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/plugins/names" +) + +type Random struct{} + +var _ framework.NodeScorePlugin = &Random{} + +const ( + Name = names.Random +) + +func (pl *Random) Name() string { + return Name +} + +// return random score: 0 <= n < 100. +// just a sample plugin +func (pl *Random) Score(ctx context.Context, state *framework.CycleState, config api.VirtualMachineCreateOptions, nodeInfo *framework.NodeInfo) (int64, *framework.Status) { + src := rand.NewSource(time.Now().Unix()) + r := rand.New(src) + score := r.Int63n(100) + return score, nil +} diff --git a/cloud/scheduler/plugins/regex/node_regex.go b/cloud/scheduler/plugins/regex/node_regex.go new file mode 100644 index 0000000..60ac33b --- /dev/null +++ b/cloud/scheduler/plugins/regex/node_regex.go @@ -0,0 +1,54 @@ +package regex + +import ( + "context" + "fmt" + "regexp" + + "github.com/sp-yduck/proxmox-go/api" + + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/plugins/names" +) + +type NodeRegex struct{} + +var _ framework.NodeFilterPlugin = &NodeRegex{} + +const ( + NodeRegexName = names.NodeRegex + NodeRegexKey = "node.qemu-scheduler/regex" +) + +func (pl *NodeRegex) Name() string { + return NodeRegexName +} + +// regex is specified in ctx value (key=node.qemu-scheduler/regex) +func (pl *NodeRegex) Filter(ctx context.Context, state *framework.CycleState, config api.VirtualMachineCreateOptions, nodeInfo *framework.NodeInfo) *framework.Status { + reg, err := findNodeRegex(ctx) + if err != nil { + state.SetMessage(pl.Name(), "no valid regex is specified, skip") + return &framework.Status{} + } + if !reg.MatchString(nodeInfo.Node().Node) { + status := framework.NewStatus() + status.SetCode(1) + return status + } + return &framework.Status{} +} + +// specify available node name as regex +// example: node.qemu-scheduler/regex=node[0-9]+ +func findNodeRegex(ctx context.Context) (*regexp.Regexp, error) { + value := ctx.Value(framework.CtxKey(NodeRegexKey)) + if value == nil { + return nil, fmt.Errorf("no node name regex is specified") + } + reg, err := regexp.Compile(fmt.Sprintf("%s", value)) + if err != nil { + return nil, err + } + return reg, nil +} diff --git a/cloud/scheduler/plugins/regex/vmid_regex.go b/cloud/scheduler/plugins/regex/vmid_regex.go new file mode 100644 index 0000000..428643a --- /dev/null +++ b/cloud/scheduler/plugins/regex/vmid_regex.go @@ -0,0 +1,60 @@ +package regex + +import ( + 
"context" + "fmt" + "regexp" + + "github.com/sp-yduck/proxmox-go/api" + + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/plugins/names" +) + +type Regex struct{} + +var _ framework.VMIDPlugin = &Regex{} + +const ( + Name = names.Regex + VMIDRegexKey = "vmid.qemu-scheduler/regex" +) + +func (pl *Regex) Name() string { + return Name +} + +func (pl *Regex) PluginKey() framework.CtxKey { + return framework.CtxKey(VMIDRegexKey) +} + +// select minimum id being not used and matching regex +// regex is specified in ctx value (key=vmid.qemu-scheduler/regex) +func (pl *Regex) Select(ctx context.Context, state *framework.CycleState, _ api.VirtualMachineCreateOptions, nextid int, usedID map[int]bool) (int, error) { + idregex, err := findVMIDRegex(ctx) + if err != nil { + state.SetMessage(pl.Name(), "no idregex is specified, use nextid.") + return nextid, nil + } + for i := nextid; i < 1000000000; i++ { + _, used := usedID[i] + if idregex.MatchString(fmt.Sprintf("%d", i)) && !used { + return i, nil + } + } + return 0, fmt.Errorf("no available vmid") +} + +// specify available vmid as regex +// example: vmid.qemu-scheduler/regex=(12[0-9]|130) +func findVMIDRegex(ctx context.Context) (*regexp.Regexp, error) { + value := ctx.Value(framework.CtxKey(VMIDRegexKey)) + if value == nil { + return nil, fmt.Errorf("no vmid regex is specified") + } + reg, err := regexp.Compile(fmt.Sprintf("%s", value)) + if err != nil { + return nil, err + } + return reg, nil +} diff --git a/cloud/scheduler/plugins/registry.go b/cloud/scheduler/plugins/registry.go new file mode 100644 index 0000000..1d728a0 --- /dev/null +++ b/cloud/scheduler/plugins/registry.go @@ -0,0 +1,33 @@ +package plugins + +import ( + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/plugins/idrange" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/plugins/nodename" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/plugins/noderesource" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/plugins/overcommit" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/plugins/regex" +) + +func NewNodeFilterPlugins() []framework.NodeFilterPlugin { + return []framework.NodeFilterPlugin{ + &nodename.NodeName{}, + &overcommit.CPUOvercommit{}, + &overcommit.MemoryOvercommit{}, + ®ex.NodeRegex{}, + } +} + +func NewNodeScorePlugins() []framework.NodeScorePlugin { + return []framework.NodeScorePlugin{ + // &random.Random{}, + &noderesource.NodeResource{}, + } +} + +func NewVMIDPlugins() []framework.VMIDPlugin { + return []framework.VMIDPlugin{ + &idrange.Range{}, + ®ex.Regex{}, + } +} diff --git a/cloud/scheduler/queue/queue.go b/cloud/scheduler/queue/queue.go new file mode 100644 index 0000000..6f6fb2a --- /dev/null +++ b/cloud/scheduler/queue/queue.go @@ -0,0 +1,83 @@ +package queue + +import ( + "context" + "sync" + + "github.com/sp-yduck/proxmox-go/api" +) + +type SchedulingQueue struct { + activeQ []*qemuSpec + lock *sync.Cond + shuttingDown bool +} + +func New() *SchedulingQueue { + return &SchedulingQueue{ + activeQ: []*qemuSpec{}, + lock: sync.NewCond(&sync.Mutex{}), + } +} + +// qemu create option and context. 
+// each scheduling plugins retrieves values from this context +type qemuSpec struct { + ctx context.Context + config *api.VirtualMachineCreateOptions +} + +// add new qemuSpec to queue +func (s *SchedulingQueue) Add(ctx context.Context, config *api.VirtualMachineCreateOptions) { + s.lock.L.Lock() + defer s.lock.L.Unlock() + + if s.shuttingDown { + return + } + + s.activeQ = append(s.activeQ, &qemuSpec{ctx: ctx, config: config}) + s.lock.Signal() +} + +// return length of active queue +// func (s *SchedulingQueue) Len() int { +// s.lock.L.Lock() +// defer s.lock.L.Unlock() +// return len(s.activeQ) +// } + +// return nex qemuSpec +func (s *SchedulingQueue) Get() (spec *qemuSpec, shutdown bool) { + s.lock.L.Lock() + defer s.lock.L.Unlock() + for len(s.activeQ) == 0 && !s.shuttingDown { + s.lock.Wait() + } + if len(s.activeQ) == 0 { + return nil, true + } + + spec = s.activeQ[0] + // The underlying array still exists and reference this object, + // so the object will not be garbage collected. + s.activeQ[0] = nil + s.activeQ = s.activeQ[1:] + return spec, false +} + +// shut down the queue +func (s *SchedulingQueue) ShutDown() { + s.lock.L.Lock() + defer s.lock.L.Unlock() + s.shuttingDown = true + s.lock.Broadcast() +} + +func (s *qemuSpec) Config() *api.VirtualMachineCreateOptions { + return s.config +} + +func (s *qemuSpec) Context() context.Context { + return s.ctx +} diff --git a/cloud/scheduler/queue/queue_test.go b/cloud/scheduler/queue/queue_test.go new file mode 100644 index 0000000..2071bda --- /dev/null +++ b/cloud/scheduler/queue/queue_test.go @@ -0,0 +1,62 @@ +package queue_test + +import ( + "context" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/queue" + "github.com/sp-yduck/proxmox-go/api" +) + +func TestQueue(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Queue Suite") +} + +var _ = Describe("New", Label("unit", "queue"), func() { + It("shoud not error", func() { + q := queue.New() + Expect(q).ToNot(BeNil()) + }) +}) + +var _ = Describe("Add", Label("unit", "queue"), func() { + q := queue.New() + + It("should not error", func() { + q.Add(context.Background(), &api.VirtualMachineCreateOptions{Name: "foo"}) + }) +}) + +var _ = Describe("Get", Label("unit", "queue"), func() { + var q *queue.SchedulingQueue + + BeforeEach(func() { + q = queue.New() + }) + + Context("normal", func() { + It("should run properly", func() { + c := &api.VirtualMachineCreateOptions{Name: "foo"} + q.Add(context.Background(), c) + qemu, shutdown := q.Get() + Expect(qemu.Config()).To(Equal(c)) + Expect(shutdown).To(BeFalse()) + }) + }) + + Context("shutdown empty queue after 1 sec", func() { + It("should get nil", func() { + go func() { + time.Sleep(1 * time.Second) + q.ShutDown() + }() + qemu, shutdown := q.Get() + Expect(qemu).To(BeNil()) + Expect(shutdown).To(BeTrue()) + }) + }) +}) diff --git a/cloud/scheduler/scheduler.go b/cloud/scheduler/scheduler.go new file mode 100644 index 0000000..fb8dcda --- /dev/null +++ b/cloud/scheduler/scheduler.go @@ -0,0 +1,406 @@ +package scheduler + +import ( + "context" + "fmt" + "time" + + "github.com/go-logr/logr" + "github.com/sp-yduck/proxmox-go/api" + "github.com/sp-yduck/proxmox-go/proxmox" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/plugins" + 
"github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/queue" +) + +var ( + // ErrNoNodesAvailable is used to describe the error that no nodes available to schedule qemus. + ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule qemus") + + // ErrNoVMIDAvailable is used to describe the error that no vmid available to schedule qemus. + ErrNoVMIDAvailable = fmt.Errorf("no vmid available to schedule qemus") +) + +// manager manages schedulers +type Manager struct { + ctx context.Context + + // params is used for initializing each scheduler + params SchedulerParams + + // scheduler map + table map[schedulerID]*Scheduler +} + +// return manager with initialized scheduler-table +func NewManager(params SchedulerParams) *Manager { + table := make(map[schedulerID]*Scheduler) + return &Manager{ctx: context.Background(), params: params, table: table} +} + +// return new/existing scheduler +func (m *Manager) GetOrCreateScheduler(client *proxmox.Service) *Scheduler { + m.params.Logger = m.params.Logger.WithValues("Name", "qemu-scheduler") + schedID, err := m.getSchedulerID(client) + if err != nil { + // create new scheduler without registering + // to not make it zombie scheduler set timeout to context + sched := m.NewScheduler(client, WithTimeout(1*time.Minute)) + return sched + } + m.params.Logger = m.params.Logger.WithValues("scheduler ID", *schedID) + sched, ok := m.table[*schedID] + if !ok { + // create and register new scheduler + m.params.Logger.V(5).Info("registering new scheduler") + sched := m.NewScheduler(client) + m.table[*schedID] = sched + return sched + } + m.params.Logger.V(5).Info("using existing scheduler") + return sched +} + +// return new scheduler. +// usually better to use GetOrCreateScheduler instead. +func (m *Manager) NewScheduler(client *proxmox.Service, opts ...SchedulerOption) *Scheduler { + ctx, cancel := context.WithCancel(m.ctx) + sched := &Scheduler{ + client: client, + schedulingQueue: queue.New(), + + filterPlugins: plugins.NewNodeFilterPlugins(), + scorePlugins: plugins.NewNodeScorePlugins(), + vmidPlugins: plugins.NewVMIDPlugins(), + + resultMap: make(map[string]chan *framework.CycleState), + logger: m.params.Logger, + + ctx: ctx, + cancel: cancel, + } + + for _, fn := range opts { + fn(sched) + } + + return sched +} + +type SchedulerOption func(s *Scheduler) +type CancelFunc func() + +func (s *Scheduler) WithCancel() (*Scheduler, CancelFunc) { + return s, s.Stop +} + +// set timeout to scheduler +func WithTimeout(timeout time.Duration) SchedulerOption { + return func(s *Scheduler) { + _, cancel := s.WithCancel() + go time.AfterFunc(timeout, cancel) + } +} + +// get scheduler identifier +// (treat ipaddr&fingreprint of node having id=1 as proxmox cluster identifier) +func (m *Manager) getSchedulerID(client *proxmox.Service) (*schedulerID, error) { + joinConfig, err := client.JoinConfig(context.Background()) + if err != nil { + return nil, err + } + for _, node := range joinConfig.NodeList { + if node.NodeID == "1" { + return &schedulerID{IPAddress: node.PVEAddr, Fingreprint: node.PVEFP}, nil + } + } + return nil, fmt.Errorf("no nodes with id=1") +} + +type Scheduler struct { + client *proxmox.Service + schedulingQueue *queue.SchedulingQueue + + filterPlugins []framework.NodeFilterPlugin + scorePlugins []framework.NodeScorePlugin + vmidPlugins []framework.VMIDPlugin + + // to do : cache + + // map[qemu name]chan *framework.CycleState + resultMap map[string]chan *framework.CycleState + logger logr.Logger + + // scheduler status + running bool + + 
// scheduler runs until this context done + ctx context.Context + + // to stop itself + cancel context.CancelFunc +} + +type SchedulerParams struct { + Logger logr.Logger +} + +type schedulerID struct { + IPAddress string + Fingreprint string +} + +// run scheduler +// and ensure only one process is running +func (s *Scheduler) Run() { + if s.IsRunning() { + s.logger.Info("this scheduler is already running") + return + } + defer func() { s.running = false }() + s.running = true + s.logger.Info("Start Running Scheduler") + wait.UntilWithContext(s.ctx, s.ScheduleOne, 0) + s.logger.Info("Stop Running Scheduler") +} + +func (s *Scheduler) IsRunning() bool { + return s.running +} + +// run scheduelr in parallel +func (s *Scheduler) RunAsync() { + go s.Run() +} + +// stop scheduler +func (s *Scheduler) Stop() { + defer s.cancel() + s.schedulingQueue.ShutDown() +} + +// retrieve one qemuSpec from queue and try to create +// new qemu according to the qemuSpec +func (s *Scheduler) ScheduleOne(ctx context.Context) { + qemu, shutdown := s.schedulingQueue.Get() + if shutdown { + return + } + config := qemu.Config() + qemuCtx := qemu.Context() + s.logger = s.logger.WithValues("qemu", config.Name) + s.logger.Info("scheduling qemu") + + state := framework.NewCycleState() + s.resultMap[config.Name] = make(chan *framework.CycleState, 1) + defer func() { s.resultMap[config.Name] <- &state }() + + // select node to run qemu + node, err := s.SelectNode(qemuCtx, *config) + if err != nil { + state.UpdateState(true, err, framework.SchedulerResult{}) + return + } + + // select vmid to be assigned to qemu + // to do: do this in parallel with SelectNode + vmid, err := s.SelectVMID(qemuCtx, *config) + if err != nil { + state.UpdateState(true, err, framework.SchedulerResult{}) + return + } + + // actually create qemu + vm, err := s.client.CreateVirtualMachine(ctx, node, vmid, *config) + if err != nil { + state.UpdateState(true, err, framework.SchedulerResult{}) + return + } + + result := framework.NewSchedulerResult(vmid, node, vm) + state.UpdateState(true, nil, result) +} + +// wait until CycleState is put into channel and then return it +func (s *Scheduler) WaitStatus(ctx context.Context, config *api.VirtualMachineCreateOptions) (framework.CycleState, error) { + ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) + defer cancel() + var done chan *framework.CycleState + ok := false + for !ok { + done, ok = s.resultMap[config.Name] + if !ok { + time.Sleep(100 * time.Millisecond) + } + } + select { + case state := <-done: + delete(s.resultMap, config.Name) + return *state, nil + case <-ctx.Done(): + err := fmt.Errorf("exceed timeout deadline. 
schedulingQueue might be shutdowned") + s.logger.Error(err, fmt.Sprintf("schedulingQueue: %v", *s.schedulingQueue)) + return framework.CycleState{}, err + } +} + +// create new qemu with given spec and context +func (s *Scheduler) CreateQEMU(ctx context.Context, config *api.VirtualMachineCreateOptions) (framework.SchedulerResult, error) { + // add qemu spec into the queue + s.schedulingQueue.Add(ctx, config) + + // wait until the scheduller finishes its job + var err error + status, err := s.WaitStatus(ctx, config) + if err != nil { + return status.Result(), err + } + if status.Error() != nil { + s.logger.Error(status.Error(), fmt.Sprintf("failed to create qemu: %v", status.Messages())) + return status.Result(), status.Error() + } + s.logger.Info(fmt.Sprintf("%v", status.Messages())) + return status.Result(), nil +} + +func (s *Scheduler) SelectNode(ctx context.Context, config api.VirtualMachineCreateOptions) (string, error) { + s.logger.Info("finding proxmox node matching qemu") + nodes, err := s.client.Nodes(ctx) + if err != nil { + return "", err + } + + state := framework.NewCycleState() + + // filter + nodelist, _ := s.RunFilterPlugins(ctx, &state, config, nodes) + if len(nodelist) == 0 { + return "", ErrNoNodesAvailable + } + if len(nodelist) == 1 { + return nodelist[0].Node, nil + } + + // score + scorelist, status := s.RunScorePlugins(ctx, &state, config, nodelist) + if !status.IsSuccess() { + s.logger.Error(status.Error(), "scoring failed") + } + selectedNode, err := selectHighestScoreNode(scorelist) + if err != nil { + return "", err + } + s.logger.Info(fmt.Sprintf("proxmox node %s was selected for vm %s", selectedNode, config.Name)) + return selectedNode, nil +} + +func (s *Scheduler) SelectVMID(ctx context.Context, config api.VirtualMachineCreateOptions) (int, error) { + s.logger.Info("finding proxmox vmid to be assigned to qemu") + if config.VMID != nil { + return *config.VMID, nil + } + nextid, err := s.client.NextID(ctx) + if err != nil { + return 0, err + } + usedID, err := usedIDMap(ctx, s.client) + if err != nil { + return 0, err + } + return s.RunVMIDPlugins(ctx, nil, config, nextid, *usedID) +} + +func (s *Scheduler) RunFilterPlugins(ctx context.Context, state *framework.CycleState, config api.VirtualMachineCreateOptions, nodes []*api.Node) ([]*api.Node, error) { + s.logger.Info("filtering proxmox node") + feasibleNodes := make([]*api.Node, 0, len(nodes)) + nodeInfos, err := framework.GetNodeInfoList(ctx, s.client) + if err != nil { + return nil, err + } + for _, nodeInfo := range nodeInfos { + status := framework.NewStatus() + for _, pl := range s.filterPlugins { + status = pl.Filter(ctx, state, config, nodeInfo) + if !status.IsSuccess() { + status.SetFailedPlugin(pl.Name()) + break + } + } + if status.IsSuccess() { + feasibleNodes = append(feasibleNodes, nodeInfo.Node()) + } + } + return feasibleNodes, nil +} + +func (s *Scheduler) RunScorePlugins(ctx context.Context, state *framework.CycleState, config api.VirtualMachineCreateOptions, nodes []*api.Node) (framework.NodeScoreList, *framework.Status) { + s.logger.Info("scoring proxmox node") + var scoresMap map[string](map[int]framework.NodeScore) + nodeInfos, err := framework.GetNodeInfoList(ctx, s.client) + if err != nil { + status := framework.NewStatus() + status.SetCode(1) + return nil, status + } + for index, nodeInfo := range nodeInfos { + for _, pl := range s.scorePlugins { + score, status := pl.Score(ctx, state, config, nodeInfo) + if !status.IsSuccess() { + return nil, status + } + scoresMap[pl.Name()][index] 
= framework.NodeScore{ + Name: nodeInfo.Node().Node, + Score: score, + } + } + } + result := make(framework.NodeScoreList, 0, len(nodes)) + for i := range nodes { + result = append(result, framework.NodeScore{Name: nodes[i].Node, Score: 0}) + for j := range scoresMap { + result[i].Score += scoresMap[j][i].Score + } + } + return result, nil +} + +func selectHighestScoreNode(scoreList framework.NodeScoreList) (string, error) { + if len(scoreList) == 0 { + return "", fmt.Errorf("empty node score list") + } + selectedScore := framework.NodeScore{Score: -1} + for _, nodescore := range scoreList { + if selectedScore.Score < nodescore.Score { + selectedScore = nodescore + } + } + return selectedScore.Name, nil +} + +func (s *Scheduler) RunVMIDPlugins(ctx context.Context, state *framework.CycleState, config api.VirtualMachineCreateOptions, nextid int, usedID map[int]bool) (int, error) { + for _, pl := range s.vmidPlugins { + key := pl.PluginKey() + value := ctx.Value(key) + if value != nil { + s.logger.WithValues("vmid plugin", pl.Name()).Info("selecting vmid") + return pl.Select(ctx, state, config, nextid, usedID) + } + } + s.logger.Info("no vmid key found. using nextid") + return nextid, nil +} + +// return map[vmid]bool +func usedIDMap(ctx context.Context, client *proxmox.Service) (*map[int]bool, error) { + vms, err := client.VirtualMachines(ctx) + if err != nil { + return nil, err + } + result := make(map[int]bool) + for _, vm := range vms { + result[vm.VMID] = true + } + return &result, nil +} diff --git a/cloud/scheduler/scheduler_test.go b/cloud/scheduler/scheduler_test.go new file mode 100644 index 0000000..cdf9941 --- /dev/null +++ b/cloud/scheduler/scheduler_test.go @@ -0,0 +1,98 @@ +package scheduler_test + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/sp-yduck/proxmox-go/api" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework" +) + +var _ = Describe("NewManager", Label("unit", "scheduler"), func() { + It("should not error", func() { + params := scheduler.SchedulerParams{} + manager := scheduler.NewManager(params) + Expect(manager).NotTo(BeNil()) + }) +}) + +var _ = Describe("NewScheduler", Label("unit", "scheduler"), func() { + manager := scheduler.NewManager(scheduler.SchedulerParams{}) + + It("should not error", func() { + sched := manager.NewScheduler(proxmoxSvc) + Expect(sched).NotTo(BeNil()) + }) +}) + +var _ = Describe("GetOrCreateScheduler", Label("integration", "scheduler"), func() { + manager := scheduler.NewManager(scheduler.SchedulerParams{}) + + It("should not error", func() { + sched := manager.GetOrCreateScheduler(proxmoxSvc) + Expect(sched).NotTo(BeNil()) + }) +}) + +var _ = Describe("Run (RunAsync) / IsRunning / Stop", Label("unit", "scheduler"), func() { + manager := scheduler.NewManager(scheduler.SchedulerParams{zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))}) + + Context("with minimal scheduler", func() { + It("should not error", func() { + sched := manager.NewScheduler(proxmoxSvc) + sched.RunAsync() + time.Sleep(1 * time.Second) + Expect(sched.IsRunning()).To(BeTrue()) + sched.Stop() + time.Sleep(1 * time.Second) + Expect(sched.IsRunning()).To(BeFalse()) + }) + }) +}) + +var _ = Describe("WithTimeout", Label("integration", "scheduler"), func() { + manager := scheduler.NewManager(scheduler.SchedulerParams{}) + + It("should not error", func() { + sched := manager.NewScheduler(proxmoxSvc, scheduler.WithTimeout(2*time.Second)) + Expect(sched).NotTo(BeNil()) + sched.RunAsync() + time.Sleep(1 * time.Second) + Expect(sched.IsRunning()).To(BeTrue()) + time.Sleep(2 * time.Second) + Expect(sched.IsRunning()).To(BeFalse()) + }) +}) + +var _ = Describe("CreateQEMU", Label("integration"), func() { + manager := scheduler.NewManager(scheduler.SchedulerParams{zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))}) + var result framework.SchedulerResult + + AfterEach(func() { + vm, err := proxmoxSvc.VirtualMachine(context.Background(), result.VMID()) + if err == nil { + err := vm.Delete(context.Background()) + Expect(err).NotTo(HaveOccurred()) + } + }) + + Context("with minimal scheduler", func() { + It("should not error", func() { + sched := manager.NewScheduler(proxmoxSvc) + sched.RunAsync() + var err error + result, err = sched.CreateQEMU(context.Background(), &api.VirtualMachineCreateOptions{ + Name: "qemu-scheduler-test-createqemu", + }) + Expect(err).ToNot(HaveOccurred()) + Expect(result).ToNot(BeNil()) + Expect(result.Node()).To(Equal(result.Instance().Node)) + Expect(result.VMID()).To(Equal(result.Instance().VM.VMID)) + }) + }) +}) diff --git a/cloud/scheduler/suite_test.go b/cloud/scheduler/suite_test.go new file mode 100644 index 0000000..e328558 --- /dev/null +++ b/cloud/scheduler/suite_test.go @@ -0,0 +1,45 @@ +package scheduler_test + +import ( + "os" + "testing" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/sp-yduck/proxmox-go/proxmox" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +var ( + proxmoxSvc *proxmox.Service +) + +func TestScheduler(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Scheduler Suite") +} + +var _ = BeforeSuite(func() { + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + + if GinkgoLabelFilter() != "unit" { + By("setup proxmox client to do integration test") + url := os.Getenv("PROXMOX_URL") + user := os.Getenv("PROXMOX_USER") + password := os.Getenv("PROXMOX_PASSWORD") + tokenid := os.Getenv("PROXMOX_TOKENID") + secret := os.Getenv("PROXMOX_SECRET") + + authConfig := proxmox.AuthConfig{ + Username: user, + Password: password, + TokenID: tokenid, + Secret: secret, + } + param := proxmox.NewParams(url, authConfig, proxmox.ClientConfig{InsecureSkipVerify: true}) + var err error + proxmoxSvc, err = proxmox.GetOrCreateService(param) + Expect(err).NotTo(HaveOccurred()) + } +}) diff --git a/cloud/scope/machine.go b/cloud/scope/machine.go index d1a7574..192225e 100644 --- a/cloud/scope/machine.go +++ b/cloud/scope/machine.go @@ -33,14 +33,16 @@ import ( infrav1 "github.com/sp-yduck/cluster-api-provider-proxmox/api/v1beta1" "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/providerid" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler" ) type MachineScopeParams struct { ProxmoxServices - Client client.Client - Machine *clusterv1.Machine - ProxmoxMachine *infrav1.ProxmoxMachine - ClusterGetter *ClusterScope + Client client.Client + Machine *clusterv1.Machine + ProxmoxMachine *infrav1.ProxmoxMachine + ClusterGetter *ClusterScope + SchedulerManager *scheduler.Manager } func NewMachineScope(params MachineScopeParams) (*MachineScope, error) { @@ -56,6 +58,9 @@ func NewMachineScope(params MachineScopeParams) (*MachineScope, error) { if params.ClusterGetter == nil { return nil, errors.New("failed to generate new scope form nil ClusterScope") } + if params.SchedulerManager == nil { + return nil, errors.New("failed to generate new scope form nil SchedulerManager") + } helper, err := patch.NewHelper(params.ProxmoxMachine, params.Client) if err != nil { @@ -63,26 +68,34 @@ func NewMachineScope(params MachineScopeParams) (*MachineScope, error) { } return &MachineScope{ - client: params.Client, - Machine: params.Machine, - ProxmoxMachine: params.ProxmoxMachine, - patchHelper: helper, - ClusterGetter: params.ClusterGetter, + client: params.Client, + Machine: params.Machine, + ProxmoxMachine: params.ProxmoxMachine, + patchHelper: helper, + ClusterGetter: params.ClusterGetter, + SchedulerManager: params.SchedulerManager, }, err } type MachineScope struct { - client client.Client - patchHelper *patch.Helper - Machine *clusterv1.Machine - ProxmoxMachine *infrav1.ProxmoxMachine - ClusterGetter *ClusterScope + client client.Client + patchHelper *patch.Helper + Machine *clusterv1.Machine + ProxmoxMachine *infrav1.ProxmoxMachine + ClusterGetter *ClusterScope + SchedulerManager *scheduler.Manager } func (m *MachineScope) CloudClient() *proxmox.Service { return m.ClusterGetter.CloudClient() } +func (m *MachineScope) GetScheduler(client *proxmox.Service) *scheduler.Scheduler { + sched := m.SchedulerManager.GetOrCreateScheduler(client) + sched.RunAsync() + return sched +} + func (m *MachineScope) GetClusterStorage() infrav1.Storage { return m.ClusterGetter.Storage() } @@ -99,6 +112,10 @@ func (m *MachineScope) Namespace() string { return 
m.ProxmoxMachine.Namespace } +func (m *MachineScope) Annotations() map[string]string { + return m.ProxmoxMachine.Annotations +} + func (m *MachineScope) NodeName() string { return m.ProxmoxMachine.Spec.Node } diff --git a/cloud/services/compute/instance/qemu.go b/cloud/services/compute/instance/qemu.go index ecf4aa6..74e3d99 100644 --- a/cloud/services/compute/instance/qemu.go +++ b/cloud/services/compute/instance/qemu.go @@ -3,10 +3,8 @@ package instance import ( "context" "fmt" - "math/rand" - "time" - "github.com/pkg/errors" + "github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler/framework" "github.com/sp-yduck/proxmox-go/api" "github.com/sp-yduck/proxmox-go/proxmox" "github.com/sp-yduck/proxmox-go/rest" @@ -22,91 +20,51 @@ func (s *Service) reconcileQEMU(ctx context.Context) (*proxmox.VirtualMachine, e log := log.FromContext(ctx) log.Info("Reconciling QEMU") - nodeName := s.scope.NodeName() - vmid := s.scope.GetVMID() - qemu, err := s.getQEMU(ctx, vmid) + qemu, err := s.getQEMU(ctx) if err == nil { // if qemu is found, return it return qemu, nil } if !rest.IsNotFound(err) { - log.Error(err, fmt.Sprintf("failed to get qemu: node=%s,vmid=%d", nodeName, *vmid)) + log.Error(err, "failed to get qemu") return nil, err } // no qemu found, create new one - return s.createQEMU(ctx, nodeName, vmid) + return s.createQEMU(ctx) } // get QEMU gets proxmox vm from vmid -func (s *Service) getQEMU(ctx context.Context, vmid *int) (*proxmox.VirtualMachine, error) { +func (s *Service) getQEMU(ctx context.Context) (*proxmox.VirtualMachine, error) { + vmid := s.scope.GetVMID() if vmid != nil { return s.client.VirtualMachine(ctx, *vmid) } return nil, rest.NotFoundErr } -func (s *Service) createQEMU(ctx context.Context, nodeName string, vmid *int) (*proxmox.VirtualMachine, error) { +func (s *Service) createQEMU(ctx context.Context) (*proxmox.VirtualMachine, error) { log := log.FromContext(ctx) if err := s.ensureStorageAvailable(ctx); err != nil { return nil, err } - // get node - if nodeName == "" { - // temp solution - node, err := s.getRandomNode(ctx) - if err != nil { - log.Error(err, "failed to get random node") - return nil, err - } - nodeName = node.Node - s.scope.SetNodeName(nodeName) - } - - // if vmid is empty, generate new vmid - if vmid == nil { - nextid, err := s.getNextID(ctx) - if err != nil { - log.Error(err, "failed to get available vmid") - return nil, err - } - vmid = &nextid - } - + // create qemu vmoption := s.generateVMOptions() - vm, err := s.client.CreateVirtualMachine(ctx, nodeName, *vmid, vmoption) + // bind annotation key-values to context + schedCtx := framework.ContextWithMap(ctx, s.scope.Annotations()) + result, err := s.scheduler.CreateQEMU(schedCtx, &vmoption) if err != nil { - log.Error(err, fmt.Sprintf("failed to create qemu instance %s", vm.VM.Name)) - return nil, err - } - s.scope.SetVMID(*vmid) - if err := s.scope.PatchObject(); err != nil { + log.Error(err, "failed to create qemu instance") return nil, err } - return vm, nil -} - -func (s *Service) getNextID(ctx context.Context) (int, error) { - return s.client.RESTClient().GetNextID(ctx) -} - -func (s *Service) getNodes(ctx context.Context) ([]*api.Node, error) { - return s.client.Nodes(ctx) -} -// GetRandomNode returns a node chosen randomly -func (s *Service) getRandomNode(ctx context.Context) (*api.Node, error) { - nodes, err := s.getNodes(ctx) - if err != nil { + s.scope.SetNodeName(result.Node()) + s.scope.SetVMID(result.VMID()) + if err := s.scope.PatchObject(); err != nil { return nil, err } - if 
len(nodes) <= 0 {
-		return nil, errors.Errorf("no nodes found")
-	}
-	src := rand.NewSource(time.Now().Unix())
-	r := rand.New(src)
-	return nodes[r.Intn(len(nodes))], nil
+	return result.Instance(), nil
 }
 
 func (s *Service) generateVMOptions() api.VirtualMachineCreateOptions {
@@ -144,6 +102,7 @@ func (s *Service) generateVMOptions() api.VirtualMachineCreateOptions {
 		NameServer:  network.NameServer,
 		Net:         api.Net{Net0: net0},
 		Numa:        boolToInt8(options.NUMA),
+		Node:        s.scope.NodeName(),
 		OnBoot:      boolToInt8(options.OnBoot),
 		OSType:      api.OSType(options.OSType),
 		Protection:  boolToInt8(options.Protection),
@@ -160,6 +119,7 @@
 		Template: boolToInt8(options.Template),
 		VCPUs:    options.VCPUs,
 		VMGenID:  options.VMGenerationID,
+		VMID:     s.scope.GetVMID(),
 		VGA:      "serial0",
 	}
 	return vmoptions
diff --git a/cloud/services/compute/instance/reconcile.go b/cloud/services/compute/instance/reconcile.go
index a299455..5fd5d25 100644
--- a/cloud/services/compute/instance/reconcile.go
+++ b/cloud/services/compute/instance/reconcile.go
@@ -54,7 +54,7 @@ func (s *Service) Delete(ctx context.Context) error {
 	log := log.FromContext(ctx)
 	log.Info("Deleting instance resources")
 
-	instance, err := s.getQEMU(ctx, s.scope.GetVMID())
+	instance, err := s.getQEMU(ctx)
 	if err != nil {
 		if !rest.IsNotFound(err) {
 			return err
diff --git a/cloud/services/compute/instance/service.go b/cloud/services/compute/instance/service.go
index c1f5593..b5a763c 100644
--- a/cloud/services/compute/instance/service.go
+++ b/cloud/services/compute/instance/service.go
@@ -6,6 +6,7 @@ import (
 	"github.com/sp-yduck/proxmox-go/proxmox"
 
 	"github.com/sp-yduck/cluster-api-provider-proxmox/cloud"
+	"github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler"
 )
 
 type Scope interface {
@@ -13,14 +14,16 @@ type Scope interface {
 }
 
 type Service struct {
-	scope  Scope
-	client proxmox.Service
+	scope     Scope
+	client    proxmox.Service
+	scheduler *scheduler.Scheduler
 }
 
 func NewService(s Scope) *Service {
 	return &Service{
-		scope:  s,
-		client: *s.CloudClient(),
+		scope:     s,
+		client:    *s.CloudClient(),
+		scheduler: s.GetScheduler(s.CloudClient()),
 	}
 }
diff --git a/cmd/main.go b/cmd/main.go
index 99458ff..83bca3f 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -33,6 +33,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/log/zap"
 
 	infrastructurev1beta1 "github.com/sp-yduck/cluster-api-provider-proxmox/api/v1beta1"
+	"github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler"
 	controller "github.com/sp-yduck/cluster-api-provider-proxmox/controllers"
 	//+kubebuilder:scaffold:imports
 )
@@ -93,8 +94,9 @@ func main() {
 	}
 
 	if err = (&controller.ProxmoxMachineReconciler{
-		Client: mgr.GetClient(),
-		Scheme: mgr.GetScheme(),
+		Client:           mgr.GetClient(),
+		Scheme:           mgr.GetScheme(),
+		SchedulerManager: scheduler.NewManager(scheduler.SchedulerParams{Logger: zap.New(zap.UseFlagOptions(&opts))}),
 	}).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "ProxmoxMachine")
 		os.Exit(1)
diff --git a/config/crd/bases/infrastructure.cluster.x-k8s.io_proxmoxmachines.yaml b/config/crd/bases/infrastructure.cluster.x-k8s.io_proxmoxmachines.yaml
index d8f4872..e60ba3f 100644
--- a/config/crd/bases/infrastructure.cluster.x-k8s.io_proxmoxmachines.yaml
+++ b/config/crd/bases/infrastructure.cluster.x-k8s.io_proxmoxmachines.yaml
@@ -613,11 +613,90 @@ spec:
               type: integer
             hookscript:
               type: string
+            hostpci0:
+              type: string
+            hostpci1:
+              type: string
+            hostpci2:
+              type: string
+            hostpci3:
+              type: string
             hotplug:
-              description: HostPci
               type: string
             hugepages:
               type: string
+            ide0:
+              type: string
+            ide1:
+              type: string
+            ide2:
+              type: string
+            ide3:
+              type: string
+            ipconfig0:
+              type: string
+            ipconfig1:
+              type: string
+            ipconfig2:
+              type: string
+            ipconfig3:
+              type: string
+            ipconfig4:
+              type: string
+            ipconfig5:
+              type: string
+            ipconfig6:
+              type: string
+            ipconfig7:
+              type: string
+            ipconfig8:
+              type: string
+            ipconfig9:
+              type: string
+            ipconfig10:
+              type: string
+            ipconfig11:
+              type: string
+            ipconfig12:
+              type: string
+            ipconfig13:
+              type: string
+            ipconfig14:
+              type: string
+            ipconfig15:
+              type: string
+            ipconfig16:
+              type: string
+            ipconfig17:
+              type: string
+            ipconfig18:
+              type: string
+            ipconfig19:
+              type: string
+            ipconfig20:
+              type: string
+            ipconfig21:
+              type: string
+            ipconfig22:
+              type: string
+            ipconfig23:
+              type: string
+            ipconfig24:
+              type: string
+            ipconfig25:
+              type: string
+            ipconfig26:
+              type: string
+            ipconfig27:
+              type: string
+            ipconfig28:
+              type: string
+            ipconfig29:
+              type: string
+            ipconfig30:
+              type: string
+            ipconfig31:
+              type: string
             ivshmem:
               type: string
             keephugepages:
@@ -650,8 +729,88 @@ spec:
                 Create will automatically use the setting from the host if neither
                 searchdomain nor nameserver are set.'
               type: string
+            net0:
+              type: string
+            net1:
+              type: string
+            net2:
+              type: string
+            net3:
+              type: string
+            net4:
+              type: string
+            net5:
+              type: string
+            net6:
+              type: string
+            net7:
+              type: string
+            net8:
+              type: string
+            net9:
+              type: string
+            net10:
+              type: string
+            net11:
+              type: string
+            net12:
+              type: string
+            net13:
+              type: string
+            net14:
+              type: string
+            net15:
+              type: string
+            net16:
+              type: string
+            net17:
+              type: string
+            net18:
+              type: string
+            net19:
+              type: string
+            net20:
+              type: string
+            net21:
+              type: string
+            net22:
+              type: string
+            net23:
+              type: string
+            net24:
+              type: string
+            net25:
+              type: string
+            net26:
+              type: string
+            net27:
+              type: string
+            net28:
+              type: string
+            net29:
+              type: string
+            net30:
+              type: string
+            net31:
+              type: string
             numa:
               type: integer
+            numa0:
+              type: string
+            numa1:
+              type: string
+            numa2:
+              type: string
+            numa3:
+              type: string
+            numa4:
+              type: string
+            numa5:
+              type: string
+            numa6:
+              type: string
+            numa7:
+              type: string
             onboot:
               description: specifies whether a VM will be started during system
                 bootup
@@ -659,6 +818,12 @@ spec:
             ostype:
               description: quest OS
               type: string
+            parallel0:
+              type: string
+            parallel1:
+              type: string
+            parallel2:
+              type: string
             protection:
               type: integer
             reboot:
@@ -666,6 +831,80 @@ spec:
               type: integer
             rng0:
               type: string
+            sata0:
+              type: string
+            sata1:
+              type: string
+            sata2:
+              type: string
+            sata3:
+              type: string
+            sata4:
+              type: string
+            sata5:
+              type: string
+            scsi0:
+              type: string
+            scsi1:
+              type: string
+            scsi2:
+              type: string
+            scsi3:
+              type: string
+            scsi4:
+              type: string
+            scsi5:
+              type: string
+            scsi6:
+              type: string
+            scsi7:
+              type: string
+            scsi8:
+              type: string
+            scsi9:
+              type: string
+            scsi10:
+              type: string
+            scsi11:
+              type: string
+            scsi12:
+              type: string
+            scsi13:
+              type: string
+            scsi14:
+              type: string
+            scsi15:
+              type: string
+            scsi16:
+              type: string
+            scsi17:
+              type: string
+            scsi18:
+              type: string
+            scsi19:
+              type: string
+            scsi20:
+              type: string
+            scsi21:
+              type: string
+            scsi22:
+              type: string
+            scsi23:
+              type: string
+            scsi24:
+              type: string
+            scsi25:
+              type: string
+            scsi26:
+              type: string
+            scsi27:
+              type: string
+            scsi28:
+              type: string
+            scsi29:
+              type: string
+            scsi30:
+              type: string
             scsihw:
               description: SCSI controller model
               type: string
@@ -674,6 +913,14 @@ spec:
                 Create will automatically use the setting from the host if neither
                 searchdomain nor nameserver are set.'
               type: string
+            serial0:
+              type: string
+            serial1:
+              type: string
+            serial2:
+              type: string
+            serial3:
+              type: string
             shares:
               type: integer
             smbios1:
@@ -705,10 +952,58 @@ spec:
               type: integer
             tpmstate:
               type: string
+            unused0:
+              type: string
+            unused1:
+              type: string
+            unused2:
+              type: string
+            unused3:
+              type: string
+            unused4:
+              type: string
+            unused5:
+              type: string
+            unused6:
+              type: string
+            unused7:
+              type: string
             vcpus:
               type: integer
             vga:
               type: string
+            virtio0:
+              type: string
+            virtio1:
+              type: string
+            virtio2:
+              type: string
+            virtio3:
+              type: string
+            virtio4:
+              type: string
+            virtio5:
+              type: string
+            virtio6:
+              type: string
+            virtio7:
+              type: string
+            virtio8:
+              type: string
+            virtio9:
+              type: string
+            virtio10:
+              type: string
+            virtio11:
+              type: string
+            virtio12:
+              type: string
+            virtio13:
+              type: string
+            virtio14:
+              type: string
+            virtio15:
+              type: string
             vmgenid:
               type: string
             vmstatestorage:
diff --git a/controllers/proxmoxmachine_controller.go b/controllers/proxmoxmachine_controller.go
index 0fad385..4e14847 100644
--- a/controllers/proxmoxmachine_controller.go
+++ b/controllers/proxmoxmachine_controller.go
@@ -34,6 +34,7 @@ import (
 
 	infrav1 "github.com/sp-yduck/cluster-api-provider-proxmox/api/v1beta1"
 	"github.com/sp-yduck/cluster-api-provider-proxmox/cloud"
+	"github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scheduler"
 	"github.com/sp-yduck/cluster-api-provider-proxmox/cloud/scope"
 	"github.com/sp-yduck/cluster-api-provider-proxmox/cloud/services/compute/instance"
 )
@@ -41,7 +42,8 @@
 // ProxmoxMachineReconciler reconciles a ProxmoxMachine object
 type ProxmoxMachineReconciler struct {
 	client.Client
-	Scheme *runtime.Scheme
+	Scheme           *runtime.Scheme
+	SchedulerManager *scheduler.Manager
 }
 
 //+kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=proxmoxmachines,verbs=get;list;watch;create;update;patch;delete
@@ -111,10 +113,11 @@ func (r *ProxmoxMachineReconciler) Reconcile(ctx context.Context, req ctrl.Reque
 
 	// Create the machine scope
 	machineScope, err := scope.NewMachineScope(scope.MachineScopeParams{
-		Client:         r.Client,
-		Machine:        machine,
-		ProxmoxMachine: proxmoxMachine,
-		ClusterGetter:  clusterScope,
+		Client:           r.Client,
+		Machine:          machine,
+		ProxmoxMachine:   proxmoxMachine,
+		ClusterGetter:    clusterScope,
+		SchedulerManager: r.SchedulerManager,
 	})
 	if err != nil {
 		return ctrl.Result{}, errors.Errorf("failed to create scope: %+v", err)
diff --git a/go.mod b/go.mod
index 47fa37d..e8838f7 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/sp-yduck/cluster-api-provider-proxmox
 go 1.19
 
 require (
+	github.com/go-logr/logr v1.3.0
 	github.com/imdario/mergo v0.3.13
 	github.com/onsi/ginkgo/v2 v2.13.0
 	github.com/onsi/gomega v1.29.0
@@ -45,7 +46,6 @@ require (
 	github.com/evanphx/json-patch v5.6.0+incompatible // indirect
 	github.com/evanphx/json-patch/v5 v5.6.0 // indirect
 	github.com/fsnotify/fsnotify v1.6.0 // indirect
-	github.com/go-logr/logr v1.3.0 // indirect
 	github.com/go-logr/zapr v1.2.4 // indirect
 	github.com/go-openapi/jsonpointer v0.19.6 // indirect
 	github.com/go-openapi/jsonreference v0.20.1 // indirect