-
Notifications
You must be signed in to change notification settings - Fork 268
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added Log output for LocalQueue and ClusterQueue #3605
base: main
Are you sure you want to change the base?
Changes from 4 commits
f4df8fe
fa8c077
fd7bc6a
c77a3a1
449e24e
84d415a
27af647
bf0d9e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -599,8 +599,9 @@ func (r *WorkloadReconciler) Create(e event.CreateEvent) bool { | |||||
workload.AdjustResources(ctx, r.client, wlCopy) | ||||||
|
||||||
if !workload.HasQuotaReservation(wl) { | ||||||
if !r.queues.AddOrUpdateWorkload(wlCopy) { | ||||||
log.V(2).Info("LocalQueue for workload didn't exist or not active; ignored for now") | ||||||
err := r.queues.AddOrUpdateWorkload(wlCopy) | ||||||
if err != nil { | ||||||
log.V(2).Info(err.Error()) | ||||||
} | ||||||
return true | ||||||
} | ||||||
|
@@ -703,10 +704,10 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool { | |||||
}) | ||||||
|
||||||
case prevStatus == workload.StatusPending && status == workload.StatusPending: | ||||||
if !r.queues.UpdateWorkload(oldWl, wlCopy) { | ||||||
log.V(2).Info("Queue for updated workload didn't exist; ignoring for now") | ||||||
err := r.queues.UpdateWorkload(oldWl, wlCopy) | ||||||
if err != nil { | ||||||
log.V(2).Info(err.Error()) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
|
||||||
case prevStatus == workload.StatusPending && (status == workload.StatusQuotaReserved || status == workload.StatusAdmitted): | ||||||
r.queues.DeleteWorkload(oldWl) | ||||||
if !r.cache.AddOrUpdateWorkload(wlCopy) { | ||||||
|
@@ -729,8 +730,9 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool { | |||||
// Here we don't take the lock as it is already taken by the wrapping | ||||||
// function. | ||||||
if immediate { | ||||||
if !r.queues.AddOrUpdateWorkloadWithoutLock(wlCopy) { | ||||||
log.V(2).Info("LocalQueue for workload didn't exist or not active; ignored for now") | ||||||
err := r.queues.AddOrUpdateWorkloadWithoutLock(wlCopy) | ||||||
if err != nil { | ||||||
log.V(2).Info(err.Error()) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
} | ||||||
}) | ||||||
|
@@ -741,8 +743,9 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool { | |||||
updatedWl := kueue.Workload{} | ||||||
err := r.client.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWl) | ||||||
if err == nil && workload.Status(&updatedWl) == workload.StatusPending { | ||||||
if !r.queues.AddOrUpdateWorkload(wlCopy) { | ||||||
log.V(2).Info("LocalQueue for workload didn't exist or not active; ignored for now") | ||||||
err := r.queues.AddOrUpdateWorkload(wlCopy) | ||||||
if err != nil { | ||||||
log.V(2).Info(err.Error()) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} else { | ||||||
log.V(3).Info("Workload requeued after backoff") | ||||||
} | ||||||
|
@@ -886,8 +889,9 @@ func (h *resourceUpdatesHandler) queueReconcileForPending(ctx context.Context, _ | |||||
log := log.WithValues("workload", klog.KObj(wlCopy)) | ||||||
log.V(5).Info("Queue reconcile for") | ||||||
workload.AdjustResources(ctrl.LoggerInto(ctx, log), h.r.client, wlCopy) | ||||||
if !h.r.queues.AddOrUpdateWorkload(wlCopy) { | ||||||
log.V(2).Info("Queue for workload didn't exist") | ||||||
err := h.r.queues.AddOrUpdateWorkload(wlCopy) | ||||||
if err != nil { | ||||||
log.V(2).Info(err.Error()) | ||||||
} | ||||||
} | ||||||
} | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -319,28 +319,28 @@ func (m *Manager) ClusterQueueForWorkload(wl *kueue.Workload) (string, bool) { | |||||
|
||||||
// AddOrUpdateWorkload adds or updates workload to the corresponding queue. | ||||||
// Returns whether the queue existed. | ||||||
func (m *Manager) AddOrUpdateWorkload(w *kueue.Workload) bool { | ||||||
func (m *Manager) AddOrUpdateWorkload(w *kueue.Workload) error { | ||||||
m.Lock() | ||||||
defer m.Unlock() | ||||||
return m.AddOrUpdateWorkloadWithoutLock(w) | ||||||
} | ||||||
|
||||||
func (m *Manager) AddOrUpdateWorkloadWithoutLock(w *kueue.Workload) bool { | ||||||
func (m *Manager) AddOrUpdateWorkloadWithoutLock(w *kueue.Workload) error { | ||||||
qKey := workload.QueueKey(w) | ||||||
q := m.localQueues[qKey] | ||||||
if q == nil { | ||||||
return false | ||||||
return ErrQueueDoesNotExist | ||||||
} | ||||||
wInfo := workload.NewInfo(w, m.workloadInfoOptions...) | ||||||
q.AddOrUpdate(wInfo) | ||||||
cq := m.hm.ClusterQueues[q.ClusterQueue] | ||||||
if cq == nil { | ||||||
return false | ||||||
return errClusterQueueAlreadyExists | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
cq.PushOrUpdate(wInfo) | ||||||
m.reportPendingWorkloads(q.ClusterQueue, cq) | ||||||
m.Broadcast() | ||||||
return true | ||||||
return nil | ||||||
} | ||||||
|
||||||
// RequeueWorkload requeues the workload ensuring that the queue and the | ||||||
|
@@ -503,7 +503,7 @@ func requeueWorkloadsCohortSubtree(ctx context.Context, m *Manager, cohort *coho | |||||
|
||||||
// UpdateWorkload updates the workload to the corresponding queue or adds it if | ||||||
// it didn't exist. Returns whether the queue existed. | ||||||
func (m *Manager) UpdateWorkload(oldW, w *kueue.Workload) bool { | ||||||
func (m *Manager) UpdateWorkload(oldW, w *kueue.Workload) error { | ||||||
m.Lock() | ||||||
defer m.Unlock() | ||||||
if oldW.Spec.QueueName != w.Spec.QueueName { | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To have the same message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for sure I'll add them :)