Skip to content

Commit

Permalink
add feature gate
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin <[email protected]>
  • Loading branch information
KPostOffice committed Dec 2, 2024
1 parent 3901168 commit a2c952d
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 57 deletions.
3 changes: 3 additions & 0 deletions apis/config/v1beta1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ type ControllerMetrics struct {
// metrics will be reported.
// +optional
EnableClusterQueueResources bool `json:"enableClusterQueueResources,omitempty"`

// +optional
EnableLocalQueueMetrics bool `json:"enableLocalQueueMetrics,omitempty"`
}

// ControllerHealth defines the health configs.
Expand Down
5 changes: 5 additions & 0 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func main() {
cacheOptions = append(cacheOptions, cache.WithResourceTransformations(cfg.Resources.Transformations))
queueOptions = append(queueOptions, queue.WithResourceTransformations(cfg.Resources.Transformations))
}
if cfg.Metrics.EnableLocalQueueMetrics {
cacheOptions = append(cacheOptions, cache.WithLocalQueueMetrics(true))
queueOptions = append(queueOptions, queue.WithLocalQueueMetrics(true))
}
if cfg.FairSharing != nil {
cacheOptions = append(cacheOptions, cache.WithFairSharing(cfg.FairSharing.Enable))
}
Expand Down Expand Up @@ -348,6 +352,7 @@ func setupScheduler(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager
mgr.GetEventRecorderFor(constants.AdmissionName),
scheduler.WithPodsReadyRequeuingTimestamp(podsReadyRequeuingTimestamp(cfg)),
scheduler.WithFairSharing(cfg.FairSharing),
scheduler.WithLocalQueueMetrics(cfg.Metrics.EnableLocalQueueMetrics),
)
if err := mgr.Add(sched); err != nil {
setupLog.Error(err, "Unable to add scheduler to manager")
Expand Down
31 changes: 21 additions & 10 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type options struct {
workloadInfoOptions []workload.InfoOption
podsReadyTracking bool
fairSharingEnabled bool
localQueueMetrics bool
}

// Option configures the reconciler.
Expand Down Expand Up @@ -93,6 +94,12 @@ func WithFairSharing(enabled bool) Option {
}
}

// WithLocalQueueMetrics returns an Option that enables or disables
// per-LocalQueue metrics reporting in the cache.
func WithLocalQueueMetrics(enabled bool) Option {
	return func(opts *options) {
		opts.localQueueMetrics = enabled
	}
}

// defaultOptions is the zero-value baseline configuration applied before
// any Option functions run in New.
var defaultOptions = options{}

// Cache keeps track of the Workloads that got admitted through ClusterQueues.
Expand All @@ -107,6 +114,7 @@ type Cache struct {
admissionChecks map[string]AdmissionCheck
workloadInfoOptions []workload.InfoOption
fairSharingEnabled bool
localQueueMetrics bool

hm hierarchy.Manager[*clusterQueue, *cohort]

Expand All @@ -126,6 +134,7 @@ func New(client client.Client, opts ...Option) *Cache {
podsReadyTracking: options.podsReadyTracking,
workloadInfoOptions: options.workloadInfoOptions,
fairSharingEnabled: options.fairSharingEnabled,
localQueueMetrics: options.localQueueMetrics,
hm: hierarchy.NewManager[*clusterQueue, *cohort](newCohort),
tasCache: NewTASCache(client),
}
Expand Down Expand Up @@ -431,8 +440,10 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) {
if !ok {
return
}
for _, q := range c.hm.ClusterQueues[cq.Name].localQueues {
metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromWorkloadKey(q.key))
if c.localQueueMetrics {
for _, q := range c.hm.ClusterQueues[cq.Name].localQueues {
metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromWorkloadKey(q.key))
}
}
c.hm.DeleteClusterQueue(cq.Name)
metrics.ClearCacheMetrics(cq.Name)
Expand Down Expand Up @@ -517,13 +528,13 @@ func (c *Cache) addOrUpdateWorkload(w *kueue.Workload) bool {
c.cleanupAssumedState(w)

if _, exist := clusterQueue.Workloads[workload.Key(w)]; exist {
clusterQueue.deleteWorkload(w)
clusterQueue.deleteWorkload(w, c.localQueueMetrics)
}

if c.podsReadyTracking {
c.podsReadyCond.Broadcast()
}
return clusterQueue.addWorkload(w) == nil
return clusterQueue.addWorkload(w, c.localQueueMetrics) == nil
}

func (c *Cache) UpdateWorkload(oldWl, newWl *kueue.Workload) error {
Expand All @@ -534,7 +545,7 @@ func (c *Cache) UpdateWorkload(oldWl, newWl *kueue.Workload) error {
if !ok {
return errors.New("old ClusterQueue doesn't exist")
}
cq.deleteWorkload(oldWl)
cq.deleteWorkload(oldWl, c.localQueueMetrics)
}
c.cleanupAssumedState(oldWl)

Expand All @@ -548,7 +559,7 @@ func (c *Cache) UpdateWorkload(oldWl, newWl *kueue.Workload) error {
if c.podsReadyTracking {
c.podsReadyCond.Broadcast()
}
return cq.addWorkload(newWl)
return cq.addWorkload(newWl, c.localQueueMetrics)
}

func (c *Cache) DeleteWorkload(w *kueue.Workload) error {
Expand All @@ -562,7 +573,7 @@ func (c *Cache) DeleteWorkload(w *kueue.Workload) error {

c.cleanupAssumedState(w)

cq.deleteWorkload(w)
cq.deleteWorkload(w, c.localQueueMetrics)
if c.podsReadyTracking {
c.podsReadyCond.Broadcast()
}
Expand Down Expand Up @@ -604,7 +615,7 @@ func (c *Cache) AssumeWorkload(w *kueue.Workload) error {
return ErrCqNotFound
}

if err := cq.addWorkload(w); err != nil {
if err := cq.addWorkload(w, c.localQueueMetrics); err != nil {
return err
}
c.assumedWorkloads[k] = string(w.Status.Admission.ClusterQueue)
Expand All @@ -628,7 +639,7 @@ func (c *Cache) ForgetWorkload(w *kueue.Workload) error {
if !ok {
return ErrCqNotFound
}
cq.deleteWorkload(w)
cq.deleteWorkload(w, c.localQueueMetrics)
if c.podsReadyTracking {
c.podsReadyCond.Broadcast()
}
Expand Down Expand Up @@ -794,7 +805,7 @@ func (c *Cache) cleanupAssumedState(w *kueue.Workload) {
// one, then we should also clean up the assumed one.
if workload.HasQuotaReservation(w) && assumedCQName != string(w.Status.Admission.ClusterQueue) {
if assumedCQ, exist := c.hm.ClusterQueues[assumedCQName]; exist {
assumedCQ.deleteWorkload(w)
assumedCQ.deleteWorkload(w, c.localQueueMetrics)
}
}
delete(c.assumedWorkloads, k)
Expand Down
15 changes: 8 additions & 7 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,28 +465,28 @@ func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionChec
}
}

func (c *clusterQueue) addWorkload(w *kueue.Workload) error {
// addWorkload registers w in the ClusterQueue and updates the queue's
// resource usage accounting for it. It returns an error if a workload with
// the same key is already tracked.
//
// lqMetrics controls whether per-LocalQueue metrics are reported when the
// usage is updated (see updateWorkloadUsage).
func (c *clusterQueue) addWorkload(w *kueue.Workload, lqMetrics bool) error {
k := workload.Key(w)
if _, exist := c.Workloads[k]; exist {
return errors.New("workload already exists in ClusterQueue")
}
wi := workload.NewInfo(w, c.workloadInfoOptions...)
c.Workloads[k] = wi
// Charge the workload's usage (+1) to the ClusterQueue and its LocalQueue.
c.updateWorkloadUsage(wi, 1, lqMetrics)
// Track workloads whose pods are not yet ready only when podsReady
// tracking is enabled.
if c.podsReadyTracking && !apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadPodsReady) {
c.WorkloadsNotReady.Insert(k)
}
c.reportActiveWorkloads()
return nil
}

func (c *clusterQueue) deleteWorkload(w *kueue.Workload) {
func (c *clusterQueue) deleteWorkload(w *kueue.Workload, lqMetrics bool) {
k := workload.Key(w)
wi, exist := c.Workloads[k]
if !exist {
return
}
c.updateWorkloadUsage(wi, -1)
c.updateWorkloadUsage(wi, -1, lqMetrics)
if c.podsReadyTracking && !apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadPodsReady) {
c.WorkloadsNotReady.Delete(k)
}
Expand All @@ -512,7 +512,7 @@ func (q *queue) reportActiveWorkloads() {

// updateWorkloadUsage updates the usage of the ClusterQueue for the workload
// and the number of admitted workloads for local queues.
func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64, lqMetrics bool) {
admitted := workload.IsAdmitted(wi.Obj)
frUsage := wi.FlavorResourceUsage()
for fr, q := range frUsage {
Expand Down Expand Up @@ -547,7 +547,9 @@ func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
updateFlavorUsage(frUsage, lq.admittedUsage, m)
lq.admittedWorkloads += int(m)
}
lq.reportActiveWorkloads()
if lqMetrics {
lq.reportActiveWorkloads()
}
}
}

Expand Down Expand Up @@ -618,7 +620,6 @@ func (q *queue) resetFlavorsAndResources(cqUsage resources.FlavorResourceQuantit
// Clean up removed flavors or resources.
q.usage = resetUsage(q.usage, cqUsage)
q.admittedUsage = resetUsage(q.admittedUsage, cqAdmittedUsage)
// KTODO: report local queue flavor usage metrics
}

func resetUsage(lqUsage resources.FlavorResourceQuantities, cqUsage resources.FlavorResourceQuantities) resources.FlavorResourceQuantities {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
if err := acRec.SetupWithManager(mgr, cfg); err != nil {
return "AdmissionCheck", err
}
qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc)
qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc, WithLocalQueueMetricsEnabled(cfg.Metrics.EnableLocalQueueMetrics))
if err := qRec.SetupWithManager(mgr, cfg); err != nil {
return "LocalQueue", err
}
Expand Down
60 changes: 45 additions & 15 deletions pkg/controller/core/localqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,45 @@ const (

// LocalQueueReconciler reconciles a LocalQueue object
type LocalQueueReconciler struct {
client client.Client
log logr.Logger
queues *queue.Manager
cache *cache.Cache
wlUpdateCh chan event.GenericEvent
client client.Client
log logr.Logger
queues *queue.Manager
cache *cache.Cache
wlUpdateCh chan event.GenericEvent
localQueueMetricsEnabled bool
}

func NewLocalQueueReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache) *LocalQueueReconciler {
// LocalQueueReconcilerOptions holds the configurable settings for a
// LocalQueueReconciler.
type LocalQueueReconcilerOptions struct {
// LocalQueueMetricsEnabled gates recording and clearing of
// per-LocalQueue metrics in the reconciler's event handlers.
LocalQueueMetricsEnabled bool
}

// LocalQueueReconcilerOption mutates a LocalQueueReconcilerOptions value;
// pass these to NewLocalQueueReconciler.
type LocalQueueReconcilerOption func(*LocalQueueReconcilerOptions)

// defaultLQOptions is the baseline configuration before options are applied.
var defaultLQOptions = LocalQueueReconcilerOptions{}

// WithLocalQueueMetricsEnabled returns an option that toggles per-LocalQueue
// metrics handling in the LocalQueueReconciler.
func WithLocalQueueMetricsEnabled(enabled bool) LocalQueueReconcilerOption {
	return func(opts *LocalQueueReconcilerOptions) {
		opts.LocalQueueMetricsEnabled = enabled
	}
}

// NewLocalQueueReconciler constructs a LocalQueueReconciler wired to the
// given client, queue manager, and cache, applying any functional options
// on top of the defaults.
func NewLocalQueueReconciler(
	client client.Client,
	queues *queue.Manager,
	cache *cache.Cache,
	opts ...LocalQueueReconcilerOption,
) *LocalQueueReconciler {
	cfg := defaultLQOptions
	for _, apply := range opts {
		apply(&cfg)
	}
	return &LocalQueueReconciler{
		client:                   client,
		log:                      ctrl.Log.WithName("localqueue-reconciler"),
		queues:                   queues,
		cache:                    cache,
		wlUpdateCh:               make(chan event.GenericEvent, updateChBuffer),
		localQueueMetricsEnabled: cfg.LocalQueueMetricsEnabled,
	}
}

Expand Down Expand Up @@ -144,7 +169,9 @@ func (r *LocalQueueReconciler) Create(e event.CreateEvent) bool {
log.Error(err, "Failed to add localQueue to the cache")
}

recordLocalQueueUsageMetrics(q)
if r.localQueueMetricsEnabled {
recordLocalQueueUsageMetrics(q)
}

return true
}
Expand All @@ -156,7 +183,9 @@ func (r *LocalQueueReconciler) Delete(e event.DeleteEvent) bool {
return true
}

metrics.ClearLocalQueueResourceMetrics(localQueueReferenceFromLocalQueue(q))
if r.localQueueMetricsEnabled {
metrics.ClearLocalQueueResourceMetrics(localQueueReferenceFromLocalQueue(q))
}

r.log.V(2).Info("LocalQueue delete event", "localQueue", klog.KObj(q))
r.queues.DeleteLocalQueue(q)
Expand Down Expand Up @@ -198,7 +227,9 @@ func (r *LocalQueueReconciler) Update(e event.UpdateEvent) bool {
}

r.queues.DeleteLocalQueue(oldLq)
updateLocalQueueResourceMetrics(newLq)
if r.localQueueMetricsEnabled {
updateLocalQueueResourceMetrics(newLq)
}

return true
}
Expand Down Expand Up @@ -354,7 +385,6 @@ func (r *LocalQueueReconciler) UpdateStatusIfChanged(
}
}
stats, err := r.cache.LocalQueueUsage(queue)
// KTODO: report LQ usage stats
if err != nil {
r.log.Error(err, failedUpdateLqStatusMsg)
return err
Expand Down
Loading

0 comments on commit a2c952d

Please sign in to comment.