Skip to content

Commit

Permalink
cleanup todos and add more feature gates
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin <[email protected]>
  • Loading branch information
KPostOffice committed Dec 2, 2024
1 parent a2c952d commit 9493af3
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 66 deletions.
4 changes: 4 additions & 0 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func main() {

metrics.Register()

if cfg.Metrics.EnableLocalQueueMetrics {
metrics.RegisterLQMetrics()
}

kubeConfig := ctrl.GetConfigOrDie()
if kubeConfig.UserAgent == "" {
kubeConfig.UserAgent = useragent.Default()
Expand Down
10 changes: 5 additions & 5 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (c *Cache) newClusterQueue(cq *kueue.ClusterQueue) (*clusterQueue, error) {
}
c.hm.AddClusterQueue(cqImpl)
c.hm.UpdateClusterQueueEdge(cq.Name, cq.Spec.Cohort)
if err := cqImpl.updateClusterQueue(c.hm.CycleChecker, cq, c.resourceFlavors, c.admissionChecks, nil); err != nil {
if err := cqImpl.updateClusterQueue(c.hm.CycleChecker, cq, c.resourceFlavors, c.admissionChecks, nil, c.localQueueMetrics); err != nil {
return nil, err
}

Expand Down Expand Up @@ -228,8 +228,8 @@ func (c *Cache) updateClusterQueues() sets.Set[string] {
// We call update on all ClusterQueues irrespective of which CQ actually use this flavor
// because it is not expensive to do so, and is not worth tracking which ClusterQueues use
// which flavors.
cq.UpdateWithFlavors(c.resourceFlavors)
cq.updateWithAdmissionChecks(c.admissionChecks)
cq.UpdateWithFlavors(c.resourceFlavors, c.localQueueMetrics)
cq.updateWithAdmissionChecks(c.admissionChecks, c.localQueueMetrics)
curStatus := cq.Status
if prevStatus == pending && curStatus == active {
cqs.Insert(cq.Name)
Expand Down Expand Up @@ -421,7 +421,7 @@ func (c *Cache) UpdateClusterQueue(cq *kueue.ClusterQueue) error {
}
oldParent := cqImpl.Parent()
c.hm.UpdateClusterQueueEdge(cq.Name, cq.Spec.Cohort)
if err := cqImpl.updateClusterQueue(c.hm.CycleChecker, cq, c.resourceFlavors, c.admissionChecks, oldParent); err != nil {
if err := cqImpl.updateClusterQueue(c.hm.CycleChecker, cq, c.resourceFlavors, c.admissionChecks, oldParent, c.localQueueMetrics); err != nil {
return err
}
for _, qImpl := range cqImpl.localQueues {
Expand All @@ -442,7 +442,7 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) {
}
if c.localQueueMetrics {
for _, q := range c.hm.ClusterQueues[cq.Name].localQueues {
metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromWorkloadKey(q.key))
metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(q.key))
}
}
c.hm.DeleteClusterQueue(cq.Name)
Expand Down
26 changes: 14 additions & 12 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ var defaultPreemption = kueue.ClusterQueuePreemption{

var defaultFlavorFungibility = kueue.FlavorFungibility{WhenCanBorrow: kueue.Borrow, WhenCanPreempt: kueue.TryNextFlavor}

func (c *clusterQueue) updateClusterQueue(cycleChecker hierarchy.CycleChecker, in *kueue.ClusterQueue, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, admissionChecks map[string]AdmissionCheck, oldParent *cohort) error {
func (c *clusterQueue) updateClusterQueue(cycleChecker hierarchy.CycleChecker, in *kueue.ClusterQueue, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, admissionChecks map[string]AdmissionCheck, oldParent *cohort, lqMetrics bool) error {
if c.updateQuotasAndResourceGroups(in.Spec.ResourceGroups) || oldParent != c.Parent() {
if oldParent != nil && oldParent != c.Parent() {
// ignore error when old Cohort has cycle.
Expand Down Expand Up @@ -166,8 +166,8 @@ func (c *clusterQueue) updateClusterQueue(cycleChecker hierarchy.CycleChecker, i
c.Preemption = defaultPreemption
}

c.UpdateWithFlavors(resourceFlavors)
c.updateWithAdmissionChecks(admissionChecks)
c.UpdateWithFlavors(resourceFlavors, lqMetrics)
c.updateWithAdmissionChecks(admissionChecks, lqMetrics)

if in.Spec.FlavorFungibility != nil {
c.FlavorFungibility = *in.Spec.FlavorFungibility
Expand Down Expand Up @@ -217,7 +217,7 @@ func (c *clusterQueue) updateQuotasAndResourceGroups(in []kueue.ResourceGroup) b
!equality.Semantic.DeepEqual(oldQuotas, c.resourceNode.Quotas)
}

func (c *clusterQueue) updateQueueStatus() {
func (c *clusterQueue) updateQueueStatus(lqMetrics bool) {
status := active
if c.isStopped ||
len(c.missingFlavors) > 0 ||
Expand All @@ -237,8 +237,10 @@ func (c *clusterQueue) updateQueueStatus() {
if status != c.Status {
c.Status = status
metrics.ReportClusterQueueStatus(c.Name, c.Status)
for _, lq := range c.localQueues {
metrics.ReportLocalQueueStatus(metrics.LQRefFromWorkloadKey(lq.key), c.Status)
if lqMetrics {
for _, lq := range c.localQueues {
metrics.ReportLocalQueueStatus(metrics.LQRefFromLocalQueueKey(lq.key), c.Status)
}
}
}
}
Expand Down Expand Up @@ -330,9 +332,9 @@ func (c *clusterQueue) isTASViolated() bool {

// UpdateWithFlavors updates a ClusterQueue based on the passed ResourceFlavors set.
// Exported only for testing.
func (c *clusterQueue) UpdateWithFlavors(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) {
func (c *clusterQueue) UpdateWithFlavors(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, lqMetrics bool) {
c.updateLabelKeys(flavors)
c.updateQueueStatus()
c.updateQueueStatus(lqMetrics)
}

func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) {
Expand Down Expand Up @@ -365,7 +367,7 @@ func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference
}

// updateWithAdmissionChecks updates a ClusterQueue based on the passed AdmissionChecks set.
func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionCheck) {
func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionCheck, lqMetrics bool) {
checksPerController := make(map[string][]string, len(c.AdmissionChecks))
singleInstanceControllers := sets.New[string]()
multiKueueAdmissionChecks := sets.New[string]()
Expand Down Expand Up @@ -461,7 +463,7 @@ func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionChec
}

if update {
c.updateQueueStatus()
c.updateQueueStatus(lqMetrics)
}
}

Expand Down Expand Up @@ -595,13 +597,13 @@ func (c *clusterQueue) addLocalQueue(q *kueue.LocalQueue) error {
}
c.localQueues[qKey] = qImpl
qImpl.reportActiveWorkloads()
metrics.ReportLocalQueueStatus(metrics.LQRefFromWorkloadKey(qKey), c.Status)
metrics.ReportLocalQueueStatus(metrics.LQRefFromLocalQueueKey(qKey), c.Status)
return nil
}

// deleteLocalQueue removes the LocalQueue from the ClusterQueue's bookkeeping
// and clears its cached per-LocalQueue metrics.
//
// NOTE(review): unlike Cache.DeleteClusterQueue, the metric clearing here is not
// gated on the local-queue-metrics feature flag; clearing label values on an
// unregistered collector is harmless, but confirm this asymmetry is intentional.
func (c *clusterQueue) deleteLocalQueue(q *kueue.LocalQueue) {
	qKey := queueKey(q)
	metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(qKey))
	delete(c.localQueues, qKey)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/clusterqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestClusterQueueUpdateWithFlavors(t *testing.T) {
}

cq.Status = tc.curStatus
cq.UpdateWithFlavors(tc.flavors)
cq.UpdateWithFlavors(tc.flavors, false)

if cq.Status != tc.wantStatus {
t.Fatalf("got different status, want: %v, got: %v", tc.wantStatus, cq.Status)
Expand Down Expand Up @@ -608,7 +608,7 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
cq.flavorIndependentAdmissionCheckAppliedPerFlavor = []string{"not-on-flavor"}
}
}
cq.updateWithAdmissionChecks(tc.admissionChecks)
cq.updateWithAdmissionChecks(tc.admissionChecks, false)

if cq.Status != tc.wantStatus {
t.Errorf("got different status, want: %v, got: %v", tc.wantStatus, cq.Status)
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
if err := acRec.SetupWithManager(mgr, cfg); err != nil {
return "AdmissionCheck", err
}
qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc, WithLocalQueueMetricsEnabled(cfg.Metrics.EnableLocalQueueMetrics))
qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc, LqControllerWithLocalQueueMetricsEnabled(cfg.Metrics.EnableLocalQueueMetrics))
if err := qRec.SetupWithManager(mgr, cfg); err != nil {
return "LocalQueue", err
}
Expand Down Expand Up @@ -80,6 +80,7 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
mgr.GetEventRecorderFor(constants.WorkloadControllerName),
WithWorkloadUpdateWatchers(qRec, cqRec),
WithWaitForPodsReady(waitForPodsReady(cfg.WaitForPodsReady)),
WlControllerWithLocalQueueMetricsEnabled(cfg.Metrics.EnableLocalQueueMetrics),
).SetupWithManager(mgr, cfg); err != nil {
return "Workload", err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/core/localqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type LocalQueueReconcilerOption func(*LocalQueueReconcilerOptions)

var defaultLQOptions = LocalQueueReconcilerOptions{}

func WithLocalQueueMetricsEnabled(enabled bool) LocalQueueReconcilerOption {
func LqControllerWithLocalQueueMetricsEnabled(enabled bool) LocalQueueReconcilerOption {
return func(o *LocalQueueReconcilerOptions) {
o.LocalQueueMetricsEnabled = enabled
}
Expand Down
59 changes: 36 additions & 23 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type waitForPodsReadyConfig struct {
type options struct {
watchers []WorkloadUpdateWatcher
waitForPodsReadyConfig *waitForPodsReadyConfig
localQueueMetrics bool
}

// Option configures the reconciler.
Expand All @@ -89,6 +90,12 @@ func WithWorkloadUpdateWatchers(value ...WorkloadUpdateWatcher) Option {
}
}

// WlControllerWithLocalQueueMetricsEnabled returns an Option that toggles
// reporting of per-LocalQueue metrics in the workload reconciler.
func WlControllerWithLocalQueueMetricsEnabled(enabled bool) Option {
	return func(o *options) {
		o.localQueueMetrics = enabled
	}
}

var defaultOptions = options{}

type WorkloadUpdateWatcher interface {
Expand All @@ -97,14 +104,15 @@ type WorkloadUpdateWatcher interface {

// WorkloadReconciler reconciles a Workload object
type WorkloadReconciler struct {
log logr.Logger
queues *queue.Manager
cache *cache.Cache
client client.Client
watchers []WorkloadUpdateWatcher
waitForPodsReady *waitForPodsReadyConfig
recorder record.EventRecorder
clock clock.Clock
log logr.Logger
queues *queue.Manager
cache *cache.Cache
client client.Client
watchers []WorkloadUpdateWatcher
waitForPodsReady *waitForPodsReadyConfig
localQueueMetrics bool
recorder record.EventRecorder
clock clock.Clock
}

func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache, recorder record.EventRecorder, opts ...Option) *WorkloadReconciler {
Expand All @@ -114,14 +122,15 @@ func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *c
}

return &WorkloadReconciler{
log: ctrl.Log.WithName("workload-reconciler"),
client: client,
queues: queues,
cache: cache,
watchers: options.watchers,
waitForPodsReady: options.waitForPodsReadyConfig,
recorder: recorder,
clock: realClock,
log: ctrl.Log.WithName("workload-reconciler"),
client: client,
queues: queues,
cache: cache,
watchers: options.watchers,
waitForPodsReady: options.waitForPodsReadyConfig,
localQueueMetrics: options.localQueueMetrics,
recorder: recorder,
clock: realClock,
}
}

Expand Down Expand Up @@ -209,7 +218,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, fmt.Errorf("setting eviction: %w", err)
}
if evicted && wl.Status.Admission != nil {
workload.ReportEvictedWorkload(r.recorder, &wl, string(wl.Status.Admission.ClusterQueue), reason, message)
workload.ReportEvictedWorkload(r.recorder, &wl, string(wl.Status.Admission.ClusterQueue), reason, message, r.localQueueMetrics)
}
return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -258,8 +267,10 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
r.recorder.Eventf(&wl, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time since reservation was %.0fs", wl.Status.Admission.ClusterQueue, quotaReservedWaitTime.Seconds())
metrics.AdmittedWorkload(kueue.ClusterQueueReference(cqName), queuedWaitTime)
metrics.AdmissionChecksWaitTime(kueue.ClusterQueueReference(cqName), quotaReservedWaitTime)
metrics.LocalQueueAdmittedWorkload(metrics.LQRefFromWorkload(&wl), queuedWaitTime)
metrics.LocalQueueAdmissionChecksWaitTime(metrics.LQRefFromWorkload(&wl), quotaReservedWaitTime)
if r.localQueueMetrics {
metrics.LocalQueueAdmittedWorkload(metrics.LQRefFromWorkload(&wl), queuedWaitTime)
metrics.LocalQueueAdmissionChecksWaitTime(metrics.LQRefFromWorkload(&wl), quotaReservedWaitTime)
}
}
return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -392,7 +403,7 @@ func (r *WorkloadReconciler) reconcileCheckBasedEviction(ctx context.Context, wl
return false, client.IgnoreNotFound(err)
}
cqName, _ := r.queues.ClusterQueueForWorkload(wl)
workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByAdmissionCheck, message)
workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByAdmissionCheck, message, r.localQueueMetrics)
return true, nil
}

Expand Down Expand Up @@ -430,7 +441,9 @@ func (r *WorkloadReconciler) reconcileOnLocalQueueActiveState(ctx context.Contex
cqName := string(lq.Spec.ClusterQueue)
if slices.Contains(r.queues.GetClusterQueueNames(), cqName) {
metrics.ReportEvictedWorkloads(cqName, kueue.WorkloadEvictedByLocalQueueStopped)
metrics.ReportLocalQueueEvictedWorkloads(metrics.LQRefFromWorkload(wl), kueue.WorkloadEvictedByLocalQueueStopped)
if r.localQueueMetrics {
metrics.ReportLocalQueueEvictedWorkloads(metrics.LQRefFromWorkload(wl), kueue.WorkloadEvictedByLocalQueueStopped)
}
}
}
return true, client.IgnoreNotFound(err)
Expand Down Expand Up @@ -476,7 +489,7 @@ func (r *WorkloadReconciler) reconcileOnClusterQueueActiveState(ctx context.Cont
workload.ResetChecksOnEviction(wl, r.clock.Now())
err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
if err == nil {
workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByClusterQueueStopped, message)
workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByClusterQueueStopped, message, r.localQueueMetrics)
}
return true, client.IgnoreNotFound(err)
}
Expand Down Expand Up @@ -556,7 +569,7 @@ func (r *WorkloadReconciler) reconcileNotReadyTimeout(ctx context.Context, req c
err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
if err == nil {
cqName, _ := r.queues.ClusterQueueForWorkload(wl)
workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByPodsReadyTimeout, message)
workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByPodsReadyTimeout, message, r.localQueueMetrics)
}
return 0, client.IgnoreNotFound(err)
}
Expand Down
31 changes: 19 additions & 12 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,9 @@ func ReportPendingWorkloads(cqName string, active, inadmissible int) {
PendingWorkloads.WithLabelValues(cqName, PendingStatusInadmissible).Set(float64(inadmissible))
}

func ReportLocalQueuePendingWorkloads(lqKey string, active, inadmissible int) {
keySlice := strings.Split(lqKey, "/")
// KTODO: make sure index is correct for lqname and namespace
LocalQueuePendingWorkloads.WithLabelValues(keySlice[0], keySlice[1], PendingStatusActive).Set(float64(active))
LocalQueuePendingWorkloads.WithLabelValues(keySlice[0], keySlice[1], PendingStatusInadmissible).Set(float64(inadmissible))
func ReportLocalQueuePendingWorkloads(lq LocalQueueReference, active, inadmissible int) {
LocalQueuePendingWorkloads.WithLabelValues(lq.Name, lq.Namespace, PendingStatusActive).Set(float64(active))
LocalQueuePendingWorkloads.WithLabelValues(lq.Name, lq.Namespace, PendingStatusInadmissible).Set(float64(inadmissible))
}

func ReportEvictedWorkloads(cqName, reason string) {
Expand All @@ -439,8 +437,7 @@ func LQRefFromWorkload(wl *kueue.Workload) LocalQueueReference {
}
}

func LQRefFromWorkloadKey(wlKey string) LocalQueueReference {
// KTODO: make sure split is correct
func LQRefFromLocalQueueKey(wlKey string) LocalQueueReference {
split := strings.Split(wlKey, "/")
return LocalQueueReference{
Name: split[1],
Expand All @@ -461,7 +458,6 @@ func ClearClusterQueueMetrics(cqName string) {
PreemptedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"preempting_cluster_queue": cqName})
}

// KTODO: call clear func
func ClearLocalQueueMetrics(lq LocalQueueReference) {
LocalQueuePendingWorkloads.DeleteLabelValues(lq.Name, lq.Namespace, PendingStatusActive)
LocalQueuePendingWorkloads.DeleteLabelValues(lq.Name, lq.Namespace, PendingStatusInadmissible)
Expand Down Expand Up @@ -501,7 +497,6 @@ func ClearCacheMetrics(cqName string) {
}
}

// KTODO: call clear func
func ClearLocalQueueCacheMetrics(lq LocalQueueReference) {
LocalQueueReservingActiveWorkloads.DeleteLabelValues(lq.Name, lq.Namespace)
LocalQueueAdmittedActiveWorkloads.DeleteLabelValues(lq.Name, lq.Namespace)
Expand All @@ -522,7 +517,6 @@ func ReportClusterQueueResourceReservations(cohort, queue, flavor, resource stri
ClusterQueueResourceReservations.WithLabelValues(cohort, queue, flavor, resource).Set(usage)
}

// KTODO: call func
func ReportLocalQueueResourceReservations(lq LocalQueueReference, flavor, resource string, usage float64) {
LocalQueueResourceReservations.WithLabelValues(lq.Name, lq.Namespace, flavor, resource).Set(usage)
}
Expand All @@ -531,7 +525,6 @@ func ReportClusterQueueResourceUsage(cohort, queue, flavor, resource string, usa
ClusterQueueResourceUsage.WithLabelValues(cohort, queue, flavor, resource).Set(usage)
}

// KTODO: call func
func ReportLocalQueueResourceUsage(lq LocalQueueReference, flavor, resource string, usage float64) {
LocalQueueResourceUsage.WithLabelValues(lq.Name, lq.Namespace, flavor, resource).Set(usage)
}
Expand Down Expand Up @@ -628,8 +621,22 @@ func Register() {
ClusterQueueResourceBorrowingLimit,
ClusterQueueResourceLendingLimit,
ClusterQueueWeightedShare,
)
}

// RegisterLQMetrics registers the optional per-LocalQueue metrics with the
// controller-runtime metrics registry. Call it at most once, and only when
// the local-queue-metrics feature gate is enabled: MustRegister panics on
// duplicate registration.
//
// Fix: the original listed LocalQueueByStatus twice, which would panic with
// an AlreadyRegisteredError at startup; the duplicate entry is removed.
func RegisterLQMetrics() {
	metrics.Registry.MustRegister(
		LocalQueuePendingWorkloads,
		LocalQueueReservingActiveWorkloads,
		LocalQueueAdmittedActiveWorkloads,
		LocalQueueQuotaReservedWorkloadsTotal,
		localQueueQuotaReservedWaitTime,
		LocalQueueAdmittedWorkloadsTotal,
		localQueueAdmissionWaitTime,
		localQueueAdmissionChecksWaitTime,
		LocalQueueEvictedWorkloadsTotal,
		LocalQueueByStatus,
		LocalQueueResourceReservations,
		LocalQueueResourceUsage,
	)
}
Loading

0 comments on commit 9493af3

Please sign in to comment.